diff options
-rw-r--r-- | gr-digital/grc/digital_header_payload_demux.xml | 31 | ||||
-rw-r--r-- | gr-digital/include/gnuradio/digital/header_payload_demux.h | 21 | ||||
-rw-r--r-- | gr-digital/lib/header_payload_demux_impl.cc | 154 | ||||
-rw-r--r-- | gr-digital/lib/header_payload_demux_impl.h | 38 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_header_payload_demux.py | 71 |
5 files changed, 279 insertions, 36 deletions
diff --git a/gr-digital/grc/digital_header_payload_demux.xml b/gr-digital/grc/digital_header_payload_demux.xml index 0925b75809..5d19e89e87 100644 --- a/gr-digital/grc/digital_header_payload_demux.xml +++ b/gr-digital/grc/digital_header_payload_demux.xml @@ -2,7 +2,18 @@ <name>Header/Payload Demux</name> <key>digital_header_payload_demux</key> <import>from gnuradio import digital</import> - <make>digital.header_payload_demux($header_len, $items_per_symbol, $guard_interval, $length_tag_key, $trigger_tag_key, $output_symbols, $(type.itemsize))</make> + <make>digital.header_payload_demux( + $header_len, + $items_per_symbol, + $guard_interval, + $length_tag_key, + $trigger_tag_key, + $output_symbols, + $(type.itemsize), + $timing_tag_key, + $samp_rate, + $special_tags, + )</make> <param> <name>Header Length (Symbols)</name> <key>header_len</key> @@ -71,6 +82,24 @@ <opt>itemsize:gr.sizeof_short</opt> </option> </param> + <param> + <name>Timing tag key</name> + <key>timing_tag_key</key> + <value>"rx_time"</value> + <type>string</type> + </param> + <param> + <name>Sampling Rate</name> + <key>samp_rate</key> + <value>samp_rate</value> + <type>int</type> + </param> + <param> + <name>Special Tag Keys</name> + <key>special_tags</key> + <value>(,)</value> + <type>raw</type> + </param> <sink> <name>in</name> <type>$type</type> diff --git a/gr-digital/include/gnuradio/digital/header_payload_demux.h b/gr-digital/include/gnuradio/digital/header_payload_demux.h index b3b7c596ef..4fd3f8c2f2 100644 --- a/gr-digital/include/gnuradio/digital/header_payload_demux.h +++ b/gr-digital/include/gnuradio/digital/header_payload_demux.h @@ -58,6 +58,19 @@ namespace gr { * should go on the payload. * A special case are tags on items that make up the guard interval. These are copied * to the first item of the following symbol. + * + * Tags outside of packets are generally discarded. If this information is important, + * there are two additional mechanisms to preserve the tags: + * - Timing tags might be relevant to know <em>when</em> a packet was received. By + * specifying the name of a timestamp tag and the sample rate at this block, it + * keeps track of the time and will add the time to the first item of every packet. + * The name of the timestamp tag is usually 'rx_time' (see gr::uhd::usrp_source::make()). + * The time value must be specified in the UHD time format. + * - Other tags are simply stored and updated. As an example, the user might want to know the + * rx frequency, which UHD stores in the rx_freq tag. In this case, add the tag name 'rx_freq' + * to the list of \p special_tags. This block will then always save the most current value of + * 'rx_freq' and add it to the beginning of every packet. + * */ class DIGITAL_API header_payload_demux : virtual public block { @@ -72,6 +85,9 @@ namespace gr { * \param trigger_tag_key Key of the trigger tag * \param output_symbols Output symbols (true) or items (false)? * \param itemsize Item size (bytes per item) + * \param timing_tag_key The name of the tag with timing information, usually 'rx_time' or empty (this means timing info is discarded) + * \param samp_rate Sampling rate at the input. Necessary to calculate the rx time of packets. + * \param special_tags A vector of strings denoting tags which shall be preserved. */ static sptr make( int header_len, @@ -80,7 +96,10 @@ namespace gr { const std::string &length_tag_key="frame_len", const std::string &trigger_tag_key="", bool output_symbols=false, - size_t itemsize=sizeof(gr_complex) + size_t itemsize=sizeof(gr_complex), + const std::string &timing_tag_key="", + const double samp_rate=1.0, + const std::vector<std::string> &special_tags=std::vector<std::string>() ); }; diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc index ba346e2ae7..406d59e246 100644 --- a/gr-digital/lib/header_payload_demux_impl.cc +++ b/gr-digital/lib/header_payload_demux_impl.cc @@ -31,6 +31,19 @@ namespace gr { namespace digital { + //! Returns a PMT time tuple (uint seconds, double fraction) as the sum of + // another PMT time tuple and a time diff in seconds. + pmt::pmt_t _update_pmt_time( + pmt::pmt_t old_time, + double time_difference + ){ + double diff_seconds; + double diff_frac = modf(time_difference, &diff_seconds); + uint64_t seconds = pmt::to_uint64(pmt::tuple_ref(old_time, 0)) + (uint64_t) diff_seconds; + double frac = pmt::to_double(pmt::tuple_ref(old_time, 1)) + diff_frac; + return pmt::make_tuple(pmt::from_uint64(seconds), pmt::from_double(frac)); + } + enum demux_states_t { STATE_FIND_TRIGGER, // "Idle" state (waiting for burst) STATE_HEADER, // Copy header @@ -40,6 +53,11 @@ namespace gr { STATE_PAYLOAD // Copy payload }; + enum out_port_indexes_t { + PORT_HEADER = 0, + PORT_PAYLOAD = 1 + }; + #define msg_port_id pmt::mp("header_data") header_payload_demux::sptr @@ -50,8 +68,11 @@ namespace gr { const std::string &length_tag_key, const std::string &trigger_tag_key, bool output_symbols, - size_t itemsize) - { + size_t itemsize, + const std::string &timing_tag_key, + const double samp_rate, + const std::vector<std::string> &special_tags + ){ return gnuradio::get_initial_sptr ( new header_payload_demux_impl( header_len, @@ -60,7 +81,10 @@ namespace gr { length_tag_key, trigger_tag_key, output_symbols, - itemsize + itemsize, + timing_tag_key, + samp_rate, + special_tags ) ); } @@ -72,7 +96,10 @@ namespace gr { const std::string &length_tag_key, const std::string &trigger_tag_key, bool output_symbols, - size_t itemsize + size_t itemsize, + const std::string &timing_tag_key, + const double samp_rate, + const std::vector<std::string> &special_tags ) : block("header_payload_demux", io_signature::make2(1, 2, itemsize, sizeof(char)), io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))), @@ -87,7 +114,12 @@ namespace gr { d_state(STATE_FIND_TRIGGER), d_curr_payload_len(0), d_payload_tag_keys(0), - d_payload_tag_values(0) + d_payload_tag_values(0), + d_track_time(!timing_tag_key.empty()), + d_timing_key(pmt::intern(timing_tag_key)), + d_last_time_offset(0), + d_last_time(pmt::make_tuple(pmt::from_uint64(0L), pmt::from_double(0.0))), + d_sampling_time(1.0/samp_rate) { if (d_header_len < 1) { throw std::invalid_argument("Header length must be at least 1 symbol."); @@ -104,6 +136,10 @@ namespace gr { set_tag_propagation_policy(TPP_DONT); message_port_register_in(msg_port_id); set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1)); + for (unsigned i = 0; i < special_tags.size(); i++) { + d_special_tags.push_back(pmt::string_to_symbol(special_tags[i])); + d_special_tags_last_value.push_back(pmt::PMT_NIL); + } } header_payload_demux_impl::~header_payload_demux_impl() @@ -168,8 +204,8 @@ namespace gr { gr_vector_void_star &output_items) { const unsigned char *in = (const unsigned char *) input_items[0]; - unsigned char *out_header = (unsigned char *) output_items[0]; - unsigned char *out_payload = (unsigned char *) output_items[1]; + unsigned char *out_header = (unsigned char *) output_items[PORT_HEADER]; + unsigned char *out_payload = (unsigned char *) output_items[PORT_PAYLOAD]; int nread = 0; int trigger_offset = 0; @@ -180,6 +216,7 @@ namespace gr { return 0; case STATE_HEADER_RX_FAIL: + update_special_tags(0, 1); consume_each (1); in += d_itemsize; nread++; @@ -189,40 +226,52 @@ namespace gr { case STATE_FIND_TRIGGER: trigger_offset = find_trigger_signal(nread, noutput_items, input_items); if (trigger_offset == -1) { + update_special_tags(0, noutput_items - nread); consume_each(noutput_items - nread); break; } + update_special_tags(0, trigger_offset); consume_each (trigger_offset); in += trigger_offset * d_itemsize; d_state = STATE_HEADER; // Fall through - case STATE_HEADER: if (check_items_available(d_header_len, ninput_items, noutput_items, nread)) { - copy_n_symbols(in, out_header, 0, d_header_len); + copy_n_symbols(in, out_header, PORT_HEADER, d_header_len); d_state = STATE_WAIT_FOR_MSG; - produce(0, d_header_len * (d_output_symbols ? 1 : d_items_per_symbol)); + add_special_tags(); + produce( + PORT_HEADER, + d_header_len * (d_output_symbols ? 1 : d_items_per_symbol) + ); } break; case STATE_HEADER_RX_SUCCESS: for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) { - add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]); + add_item_tag( + PORT_PAYLOAD, + nitems_written(PORT_PAYLOAD), + d_payload_tag_keys[i], + d_payload_tag_values[i] + ); } - consume_each (d_header_len * (d_items_per_symbol + d_gi)); - in += d_header_len * (d_items_per_symbol + d_gi) * d_itemsize; nread += d_header_len * (d_items_per_symbol + d_gi); + update_special_tags(0, nread); + consume_each (nread); + in += nread * d_itemsize; d_state = STATE_PAYLOAD; // Fall through case STATE_PAYLOAD: if (check_items_available(d_curr_payload_len, ninput_items, noutput_items, nread)) { - copy_n_symbols(in, out_payload, 1, d_curr_payload_len); - produce(1, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); + update_special_tags(0, d_curr_payload_len * (d_items_per_symbol + d_gi)); + copy_n_symbols(in, out_payload, PORT_PAYLOAD, d_curr_payload_len); + produce(PORT_PAYLOAD, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); consume_each (d_curr_payload_len * (d_items_per_symbol + d_gi)); - d_state = STATE_FIND_TRIGGER; set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi)); + d_state = STATE_FIND_TRIGGER; } break; @@ -251,11 +300,11 @@ namespace gr { } if (d_uses_trigger_tag) { std::vector<tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); + get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items, d_trigger_tag_key); uint64_t min_offset = ULLONG_MAX; int tag_index = -1; for (unsigned i = 0; i < tags.size(); i++) { - if (tags[i].key == d_trigger_tag_key && tags[i].offset < min_offset) { + if (tags[i].offset < min_offset) { tag_index = (int) i; min_offset = tags[i].offset; } @@ -266,7 +315,7 @@ namespace gr { } } return -1; - } + } /* find_trigger_signal() */ void @@ -310,7 +359,7 @@ namespace gr { set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); } } - } + } /* parse_header_data_msg() */ void @@ -359,7 +408,70 @@ namespace gr { tags[t].value ); } - } + } /* copy_n_symbols() */ + + void + header_payload_demux_impl::update_special_tags( + int range_start, + int range_end + ){ + if (d_track_time) { + std::vector<tag_t> tags; + get_tags_in_range(tags, 0, + nitems_read(0) + range_start, + nitems_read(0) + range_end, + d_timing_key + ); + for (unsigned t = 0; t < tags.size(); t++) { + if(tags[t].offset >= d_last_time_offset) { + d_last_time = tags[t].value; + d_last_time_offset = tags[t].offset; + } + } + } + + std::vector<tag_t> tags; + for (unsigned i = 0; i < d_special_tags.size(); i++) { + uint64_t offset = 0; + // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually + get_tags_in_range(tags, 0, + nitems_read(0) + range_start, + nitems_read(0) + range_end, + d_special_tags[i] + ); + for (unsigned t = 0; t < tags.size(); t++) { + if(tags[t].offset >= offset) { + d_special_tags_last_value[i] = tags[t].value; + offset = tags[t].offset; + } + } + } + } /* update_special_tags() */ + + void + header_payload_demux_impl::add_special_tags( + ){ + if (d_track_time) { + add_item_tag( + PORT_HEADER, + nitems_written(PORT_HEADER), + d_timing_key, + _update_pmt_time( + d_last_time, + d_sampling_time * (nitems_read(0) - d_last_time_offset) + ) + ); + } + + for (unsigned i = 0; i < d_special_tags.size(); i++) { + add_item_tag( + PORT_HEADER, + nitems_written(PORT_HEADER), + d_special_tags[i], + d_special_tags_last_value[i] + ); + } + } /* add_special_tags() */ } /* namespace digital */ } /* namespace gr */ diff --git a/gr-digital/lib/header_payload_demux_impl.h b/gr-digital/lib/header_payload_demux_impl.h index 61b5e10090..1d45dc7ce1 100644 --- a/gr-digital/lib/header_payload_demux_impl.h +++ b/gr-digital/lib/header_payload_demux_impl.h @@ -42,8 +42,15 @@ namespace gr { int d_curr_payload_len; //!< Length of the next payload (symbols) std::vector<pmt::pmt_t> d_payload_tag_keys; //!< Temporary buffer for PMTs that go on the payload (keys) std::vector<pmt::pmt_t> d_payload_tag_values; //!< Temporary buffer for PMTs that go on the payload (values) + bool d_track_time; //!< Whether or not to keep track of the rx time + pmt::pmt_t d_timing_key; //!< Key of the timing tag (usually 'rx_time') + uint64_t d_last_time_offset; //!< Item number of the last time tag + pmt::pmt_t d_last_time; //!< The actual time that was indicated + double d_sampling_time; //!< Inverse sampling rate + std::vector<pmt::pmt_t> d_special_tags; //!< List of special tags + std::vector<pmt::pmt_t> d_special_tags_last_value; //!< The current value of the special tags - // Helpers to make the state machine more readable + // Helper functions to make the state machine more readable //! Checks if there are enough items on the inputs and enough space on the output buffers to copy \p n_symbols symbols inline bool check_items_available(int n_symbols, gr_vector_int &ninput_items, int noutput_items, int nread); @@ -59,7 +66,7 @@ namespace gr { int noutput_items, gr_vector_const_void_star &input_items); - //! Copies n symbols from in to out, makes sure tags are propagated properly + //! Copies n symbols from in to out, makes sure tags are propagated properly. Does neither consume nor produce. void copy_n_symbols( const unsigned char *in, unsigned char *out, @@ -67,16 +74,29 @@ namespace gr { int n_symbols ); + //! Scans a given range for tags in d_special_tags + void update_special_tags( + int range_start, + int range_end + ); + + //! Adds all tags in d_special_tags and timing info to the first item of the header. + void add_special_tags(); + public: header_payload_demux_impl( - int header_len, - int items_per_symbol, - int guard_interval, - const std::string &length_tag_key, - const std::string &trigger_tag_key, - bool output_symbols, - size_t itemsize); + int header_len, + int items_per_symbol, + int guard_interval, + const std::string &length_tag_key, + const std::string &trigger_tag_key, + bool output_symbols, + size_t itemsize, + const std::string &timing_tag_key, + const double samp_rate, + const std::vector<std::string> &special_tags + ); ~header_payload_demux_impl(); void forecast (int noutput_items, gr_vector_int &ninput_items_required); diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py index 89df6d645a..2bacf72f62 100755 --- a/gr-digital/python/digital/qa_header_payload_demux.py +++ b/gr-digital/python/digital/qa_header_payload_demux.py @@ -64,8 +64,11 @@ class qa_header_payload_demux (gr_unittest.TestCase): testtag4.offset = n_zeros + len(header) + 3 testtag4.key = pmt.string_to_symbol('tag4') testtag4.value = pmt.from_long(314) - - data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4)) + data_src = blocks.vector_source_f( + data_signal, + False, + tags=(testtag1, testtag2, testtag3, testtag4) + ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float @@ -196,21 +199,54 @@ class qa_header_payload_demux (gr_unittest.TestCase): """ Like test 1, but twice, plus one fail """ + ### Tx Data n_zeros = 5 header = (1, 2, 3) header_fail = (-1, -2, -4) # Contents don't really matter payload1 = tuple(range(5, 20)) payload2 = (42,) + sampling_rate = 2 data_signal = (0,) * n_zeros + header + payload1 trigger_signal = [0,] * len(data_signal) * 2 trigger_signal[n_zeros] = 1 trigger_signal[len(data_signal)] = 1 trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1 tx_signal = data_signal + header_fail + (0,) * n_zeros + header + payload2 + (0,) * 1000 - data_src = blocks.vector_source_f(tx_signal, False) + # Timing tag: This is preserved and updated: + timing_tag = gr.tag_t() + timing_tag.offset = 0 + timing_tag.key = pmt.string_to_symbol('rx_time') + timing_tag.value = pmt.to_pmt((0, 0)) + # Rx freq tags: + rx_freq_tag1 = gr.tag_t() + rx_freq_tag1.offset = 0 + rx_freq_tag1.key = pmt.string_to_symbol('rx_freq') + rx_freq_tag1.value = pmt.from_double(1.0) + rx_freq_tag2 = gr.tag_t() + rx_freq_tag2.offset = 29 + rx_freq_tag2.key = pmt.string_to_symbol('rx_freq') + rx_freq_tag2.value = pmt.from_double(1.5) + rx_freq_tag3 = gr.tag_t() + rx_freq_tag3.offset = 30 + rx_freq_tag3.key = pmt.string_to_symbol('rx_freq') + rx_freq_tag3.value = pmt.from_double(2.0) + ### Flow graph + data_src = blocks.vector_source_f( + tx_signal, False, + tags=(timing_tag, rx_freq_tag1, rx_freq_tag2, rx_freq_tag3) + ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( - len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float + header_len=len(header), + items_per_symbol=1, + guard_interval=0, + length_tag_key="frame_len", + trigger_tag_key="detect", + output_symbols=False, + itemsize=gr.sizeof_float, + timing_tag_key='rx_time', + samp_rate=sampling_rate, + special_tags=('rx_freq',), ) self.assertEqual(pmt.length(hpd.message_ports_in()), 1) header_sink = blocks.vector_sink_f() @@ -242,8 +278,35 @@ class qa_header_payload_demux (gr_unittest.TestCase): time.sleep(.2) self.tb.stop() self.tb.wait() + # Signal description: + # 0: 5 zeros + # 5: header 1 + # 8: payload 1 (length: 15) + # 23: header 2 (fail) + # 26: 5 zeros + # 31: header 3 + # 34: payload 2 (length 1) + # 35: 1000 zeros self.assertEqual(header_sink.data(), header + header_fail + header) self.assertEqual(payload_sink.data(), payload1 + payload2) + tags_payload = [gr.tag_to_python(x) for x in payload_sink.tags()] + tags_payload = sorted([(x.offset, x.key, x.value) for x in tags_payload]) + tags_expected_payload = [ + (0, 'frame_len', len(payload1)), + (len(payload1), 'frame_len', len(payload2)), + ] + tags_header = [gr.tag_to_python(x) for x in header_sink.tags()] + tags_header = sorted([(x.offset, x.key, x.value) for x in tags_header]) + tags_expected_header = [ + (0, 'rx_freq', 1.0), + (0, 'rx_time', (2, 0.5)), # Hard coded time value :( Is n_zeros/sampling_rate + (len(header), 'rx_freq', 1.0), + (len(header), 'rx_time', (11, .5)), # Hard coded time value :(. See above. + (2*len(header), 'rx_freq', 2.0), + (2*len(header), 'rx_time', (15, .5)), # Hard coded time value :(. See above. + ] + self.assertEqual(tags_header, tags_expected_header) + self.assertEqual(tags_payload, tags_expected_payload) if __name__ == '__main__': gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml") |