diff options
Diffstat (limited to 'gr-digital/lib/header_payload_demux_impl.cc')
-rw-r--r-- | gr-digital/lib/header_payload_demux_impl.cc | 670 |
1 files changed, 399 insertions, 271 deletions
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc index 89428fa86e..f887ea1eb3 100644 --- a/gr-digital/lib/header_payload_demux_impl.cc +++ b/gr-digital/lib/header_payload_demux_impl.cc @@ -1,5 +1,5 @@ /* -*- c++ -*- */ -/* Copyright 2012-2014 Free Software Foundation, Inc. +/* Copyright 2012-2016 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -23,10 +23,10 @@ #include "config.h" #endif -#include <climits> -#include <boost/format.hpp> -#include <gnuradio/io_signature.h> #include "header_payload_demux_impl.h" +#include <gnuradio/io_signature.h> +#include <boost/format.hpp> +#include <climits> namespace gr { namespace digital { @@ -55,55 +55,63 @@ namespace gr { enum out_port_indexes_t { PORT_HEADER = 0, - PORT_PAYLOAD = 1 + PORT_PAYLOAD = 1, + PORT_INPUTDATA = 0, + PORT_TRIGGER = 1 }; #define msg_port_id pmt::mp("header_data") header_payload_demux::sptr header_payload_demux::make( - int header_len, - int items_per_symbol, - int guard_interval, - const std::string &length_tag_key, - const std::string &trigger_tag_key, - bool output_symbols, - size_t itemsize, - const std::string &timing_tag_key, - const double samp_rate, - const std::vector<std::string> &special_tags + int header_len, + int items_per_symbol, + int guard_interval, + const std::string &length_tag_key, + const std::string &trigger_tag_key, + bool output_symbols, + size_t itemsize, + const std::string &timing_tag_key, + const double samp_rate, + const std::vector<std::string> &special_tags, + const size_t header_padding ){ return gnuradio::get_initial_sptr ( - new header_payload_demux_impl( - header_len, - items_per_symbol, - guard_interval, - length_tag_key, - trigger_tag_key, - output_symbols, - itemsize, - timing_tag_key, - samp_rate, - special_tags - ) + new header_payload_demux_impl( + header_len, + items_per_symbol, + guard_interval, + length_tag_key, + trigger_tag_key, + output_symbols, + itemsize, + timing_tag_key, + samp_rate, + special_tags, + header_padding + ) ); } header_payload_demux_impl::header_payload_demux_impl( - int header_len, - int items_per_symbol, - int guard_interval, - const std::string &length_tag_key, - const std::string &trigger_tag_key, - bool output_symbols, - size_t itemsize, - const std::string &timing_tag_key, - const double samp_rate, - const std::vector<std::string> &special_tags + int header_len, + int items_per_symbol, + int guard_interval, + const std::string &length_tag_key, + const std::string &trigger_tag_key, + bool output_symbols, + size_t itemsize, + const std::string &timing_tag_key, + const double samp_rate, + const std::vector<std::string> &special_tags, + const size_t header_padding ) : block("header_payload_demux", - io_signature::make2(1, 2, itemsize, sizeof(char)), - io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))), + io_signature::make2(1, 2, itemsize, sizeof(char)), + io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))), d_header_len(header_len), + d_header_padding_symbols(header_padding / items_per_symbol), + d_header_padding_items(header_padding % items_per_symbol), + d_header_padding_total_items(header_padding), d_items_per_symbol(items_per_symbol), d_gi(guard_interval), d_len_tag_key(pmt::string_to_symbol(length_tag_key)), @@ -113,32 +121,42 @@ namespace gr { d_uses_trigger_tag(!trigger_tag_key.empty()), d_state(STATE_FIND_TRIGGER), d_curr_payload_len(0), + d_curr_payload_offset(0), d_payload_tag_keys(0), d_payload_tag_values(0), d_track_time(!timing_tag_key.empty()), d_timing_key(pmt::intern(timing_tag_key)), + d_payload_offset_key(pmt::intern("payload_offset")), d_last_time_offset(0), d_last_time(pmt::make_tuple(pmt::from_uint64(0L), pmt::from_double(0.0))), d_sampling_time(1.0/samp_rate) { if (d_header_len < 1) { - throw std::invalid_argument("Header length must be at least 1 symbol."); + throw std::invalid_argument("Header length must be at least 1 symbol."); + } + if (header_padding < 0) { + throw std::invalid_argument("Header padding must be non-negative."); } if (d_items_per_symbol < 1 || d_gi < 0 || d_itemsize < 1) { - throw std::invalid_argument("Items and symbol sizes must be at least 1."); + throw std::invalid_argument("Items and symbol sizes must be at least 1."); } if (d_output_symbols) { - set_relative_rate(1.0 / (d_items_per_symbol + d_gi)); + set_relative_rate(1.0 / (d_items_per_symbol + d_gi)); } else { - set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi)); - set_output_multiple(d_items_per_symbol); + set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi)); + set_output_multiple(d_items_per_symbol); + } + if ((d_output_symbols || d_gi) && d_header_padding_items) { + throw std::invalid_argument( + "If output_symbols is true or a guard interval is given, padding must be a multiple of items_per_symbol!" + ); } set_tag_propagation_policy(TPP_DONT); message_port_register_in(msg_port_id); set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1)); - for (unsigned i = 0; i < special_tags.size(); i++) { - d_special_tags.push_back(pmt::string_to_symbol(special_tags[i])); - d_special_tags_last_value.push_back(pmt::PMT_NIL); + for (size_t i = 0; i < special_tags.size(); i++) { + d_special_tags.push_back(pmt::string_to_symbol(special_tags[i])); + d_special_tags_last_value.push_back(pmt::PMT_NIL); } } @@ -146,144 +164,219 @@ namespace gr { { } + // forecast() depends on state: + // - When waiting for a header, we require at least the header length + // - when waiting for a payload, we require at least the payload length + // - Otherwise, pretend this is a sync block with a decimation/interpolation + // depending on symbol size and if we output symbols or items void - header_payload_demux_impl::forecast (int noutput_items, gr_vector_int &ninput_items_required) - { + header_payload_demux_impl::forecast( + int noutput_items, + gr_vector_int &ninput_items_required + ) { int n_items_reqd = 0; if (d_state == STATE_HEADER) { - n_items_reqd = d_header_len * (d_items_per_symbol + d_gi); + n_items_reqd = d_header_len * (d_items_per_symbol + d_gi) + + 2*d_header_padding_total_items; } else if (d_state == STATE_PAYLOAD) { - n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi); + n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi); } else { - n_items_reqd = noutput_items * (d_items_per_symbol + d_gi); - if (!d_output_symbols) { - // here, noutput_items is an integer multiple of d_items_per_symbol! - n_items_reqd /= d_items_per_symbol; - } + n_items_reqd = noutput_items * (d_items_per_symbol + d_gi); + if (!d_output_symbols) { + // Here, noutput_items is an integer multiple of d_items_per_symbol! + n_items_reqd /= d_items_per_symbol; + } } for (unsigned i = 0; i < ninput_items_required.size(); i++) { - ninput_items_required[i] = n_items_reqd; + ninput_items_required[i] = n_items_reqd; } } - inline bool - header_payload_demux_impl::check_items_available( - int n_symbols, - gr_vector_int &ninput_items, - int noutput_items, - int nread - ) - { - // Check there's enough items on the input - if ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[0]-nread) - || (ninput_items.size() == 2 && ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[1]-nread)))) { - return false; - } - + bool header_payload_demux_impl::check_buffers_ready( + int output_symbols_reqd, + int extra_output_items_reqd, + int noutput_items, + int input_items_reqd, + gr_vector_int &ninput_items, + int n_items_read + ) { // Check there's enough space on the output buffer if (d_output_symbols) { - if (noutput_items < n_symbols) { - return false; - } + if (noutput_items < output_symbols_reqd + extra_output_items_reqd) { + return false; + } } else { - if (noutput_items < n_symbols * d_items_per_symbol) { - return false; - } + if (noutput_items < (output_symbols_reqd * d_items_per_symbol) + extra_output_items_reqd) { + return false; + } + } + + // Check there's enough items on the input + if (input_items_reqd > (ninput_items[0]-n_items_read) + || (ninput_items.size() == 2 && (input_items_reqd > (ninput_items[1]-n_items_read)))) { + return false; } + // All good return true; } int - header_payload_demux_impl::general_work (int noutput_items, - gr_vector_int &ninput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - const unsigned char *in = (const unsigned char *) input_items[0]; + header_payload_demux_impl::general_work( + int noutput_items, + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items + ) { + const unsigned char *in = (const unsigned char *) input_items[PORT_INPUTDATA]; unsigned char *out_header = (unsigned char *) output_items[PORT_HEADER]; unsigned char *out_payload = (unsigned char *) output_items[PORT_PAYLOAD]; - int nread = 0; - int trigger_offset = 0; - + const int n_input_items = (ninput_items.size() == 2) ? + std::min(ninput_items[0], ninput_items[1]) : + ninput_items[0]; + // Items read going into general_work() + const uint64_t n_items_read_base = nitems_read(PORT_INPUTDATA); + // Items read during this call to general_work() + int n_items_read = 0; + + #define CONSUME_ITEMS(items_to_consume) \ + update_special_tags( \ + n_items_read_base + n_items_read, \ + n_items_read_base + n_items_read + (items_to_consume) \ + ); \ + consume_each(items_to_consume); \ + n_items_read += (items_to_consume); \ + in += (items_to_consume) * d_itemsize; switch (d_state) { - case STATE_WAIT_FOR_MSG: - // In an ideal world, this would never be called - return 0; - - case STATE_HEADER_RX_FAIL: - update_special_tags(0, 1); - consume_each (1); - in += d_itemsize; - nread++; - d_state = STATE_FIND_TRIGGER; - // The following break was added to this state as well as STATE_FIND_TRIGGER - // and STATE_HEADER. There appears to be a bug somewhere in this code without - // the breaks that can lead to failure of this block. With the breaks in the code - // testing has shown more stable performance with various block paramters. - // If an offset calculation bug is found and fixed, it should be possible to - // remove these breaks for some performance increase. - break; - - case STATE_FIND_TRIGGER: - trigger_offset = find_trigger_signal(nread, noutput_items, input_items); - if (trigger_offset == -1) { - update_special_tags(0, noutput_items - nread); - consume_each(noutput_items - nread); - break; - } - update_special_tags(0, trigger_offset); - consume_each (trigger_offset); - in += trigger_offset * d_itemsize; - d_state = STATE_HEADER; - break; - - case STATE_HEADER: - if (check_items_available(d_header_len, ninput_items, noutput_items, nread)) { - copy_n_symbols(in, out_header, PORT_HEADER, d_header_len); - d_state = STATE_WAIT_FOR_MSG; - add_special_tags(); - produce( - PORT_HEADER, - d_header_len * (d_output_symbols ? 1 : d_items_per_symbol) - ); - } - break; - - case STATE_HEADER_RX_SUCCESS: - for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) { - add_item_tag( - PORT_PAYLOAD, - nitems_written(PORT_PAYLOAD), - d_payload_tag_keys[i], - d_payload_tag_values[i] - ); - } - nread += d_header_len * (d_items_per_symbol + d_gi); - update_special_tags(0, nread); - consume_each (nread); - in += nread * d_itemsize; - d_state = STATE_PAYLOAD; - break; - - case STATE_PAYLOAD: - if (check_items_available(d_curr_payload_len, ninput_items, noutput_items, nread)) { - // The -1 because we won't consume the last item, it might hold the next trigger. - update_special_tags(0, (d_curr_payload_len - 1) * (d_items_per_symbol + d_gi)); - copy_n_symbols(in, out_payload, PORT_PAYLOAD, d_curr_payload_len); - produce(PORT_PAYLOAD, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); - consume_each ((d_curr_payload_len - 1) * (d_items_per_symbol + d_gi)); // Same here - set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi)); - d_state = STATE_FIND_TRIGGER; - } - break; - - default: - throw std::runtime_error("invalid state"); + case STATE_WAIT_FOR_MSG: + // In an ideal world, this would never be called + // parse_header_data_msg() is the only place that can kick us out + // of this state. + return 0; + + case STATE_HEADER_RX_FAIL: + // Actions: + // - Consume a single item to make sure we're not deleting any other + // info + CONSUME_ITEMS(1); + d_state = STATE_FIND_TRIGGER; + break; + + case STATE_FIND_TRIGGER: { + // Assumptions going into this state: + // - No other state was active for this call to general_work() + // - i.e. n_items_read == 0 + // Start looking for a trigger after any header padding. + // The trigger offset is relative to 'in'. + // => The absolute trigger offset is on n_items_read_base + n_items_read_base + trigger_offset + const int max_rel_offset = n_input_items - n_items_read; + const int trigger_offset = find_trigger_signal( + d_header_padding_total_items, + max_rel_offset, + n_items_read_base + n_items_read, + (input_items.size() == 2) ? + ((const unsigned char *) input_items[PORT_TRIGGER]) + n_items_read : NULL + ); + if (trigger_offset < max_rel_offset) { + d_state = STATE_HEADER; + } + // If we're using padding, don't consume everything, or we might + // end up with not enough items before the trigger + const int items_to_consume = trigger_offset - d_header_padding_total_items; + CONSUME_ITEMS(items_to_consume); + break; + } /* case STATE_FIND_TRIGGER */ + + case STATE_HEADER: + // Assumptions going into this state: + // - The first items on `in' are the header samples (including padding) + // - So we can just copy from the beginning of `in' + // - The trigger is on item index `d_header_padding * d_items_per_symbol' + // Actions: + // - Copy the entire header (including padding) to the header port + // - Special tags are added to the header port + if (check_buffers_ready( + d_header_len + 2*d_header_padding_symbols, + d_header_padding_items, + noutput_items, + d_header_len * (d_items_per_symbol + d_gi) + 2*d_header_padding_total_items, + ninput_items, + n_items_read)) { + add_special_tags(); + copy_n_symbols( + in, + out_header, + PORT_HEADER, + n_items_read_base + n_items_read, + d_header_len+2*d_header_padding_symbols, // Number of symbols to copy + 2*d_header_padding_items + ); + d_state = STATE_WAIT_FOR_MSG; + } + break; + + case STATE_HEADER_RX_SUCCESS: + // Copy tags from header to payload + for (size_t i = 0; i < d_payload_tag_keys.size(); i++) { + add_item_tag( + PORT_PAYLOAD, + nitems_written(PORT_PAYLOAD), + d_payload_tag_keys[i], + d_payload_tag_values[i] + ); + } + // Consume header from input + { + // Consume the padding only once, we leave the second + // part in there because it might be part of the payload + const int items_to_consume = + d_header_len * (d_items_per_symbol + d_gi) + + d_header_padding_total_items + + d_curr_payload_offset; + CONSUME_ITEMS(items_to_consume); + d_curr_payload_offset = 0; + d_state = STATE_PAYLOAD; + } + break; + + case STATE_PAYLOAD: + // Assumptions: + // - Input buffer is in the right spot to just start copying + if (check_buffers_ready( + d_curr_payload_len, + 0, + noutput_items, + d_curr_payload_len * (d_items_per_symbol + d_gi), + ninput_items, + n_items_read)) { + // Write payload + copy_n_symbols( + in, + out_payload, + PORT_PAYLOAD, + n_items_read_base + n_items_read, + d_curr_payload_len + ); + // Consume payload + // We can't consume the full payload, because we need to hold off + // at least the padding value. We'll use a minimum padding of 1 + // item here. + const int items_padding = std::max(d_header_padding_total_items, 1); + const int items_to_consume = + d_curr_payload_len * (d_items_per_symbol + d_gi) + - items_padding; + CONSUME_ITEMS(items_to_consume); + set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi)); + d_state = STATE_FIND_TRIGGER; + } + break; + + default: + throw std::runtime_error("invalid state"); } /* switch */ return WORK_CALLED_PRODUCE; @@ -292,35 +385,41 @@ namespace gr { int header_payload_demux_impl::find_trigger_signal( - int nread, - int noutput_items, - gr_vector_const_void_star &input_items) - { - if (input_items.size() == 2) { - unsigned char *in_trigger = (unsigned char *) input_items[1]; - in_trigger += nread; - for (int i = 0; i < noutput_items-nread; i++) { - if (in_trigger[i]) { - return i; - } - } + int skip_items, + int max_rel_offset, + uint64_t base_offset, + const unsigned char *in_trigger + ) { + int rel_offset = max_rel_offset; + if (max_rel_offset < skip_items) { + return rel_offset; + } + if (in_trigger) { + for (int i = skip_items; i < max_rel_offset; i++) { + if (in_trigger[i]) { + rel_offset = i; + break; + } + } } if (d_uses_trigger_tag) { std::vector<tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items, d_trigger_tag_key); - uint64_t min_offset = ULLONG_MAX; - int tag_index = -1; - for (unsigned i = 0; i < tags.size(); i++) { - if (tags[i].offset < min_offset) { - tag_index = (int) i; - min_offset = tags[i].offset; + get_tags_in_range( + tags, + PORT_INPUTDATA, + base_offset + skip_items, + base_offset + max_rel_offset, + d_trigger_tag_key + ); + if (!tags.empty()) { + std::sort(tags.begin(), tags.end(), tag_t::offset_compare); + const int tag_rel_offset = tags[0].offset - base_offset; + if (tag_rel_offset < rel_offset) { + rel_offset = tag_rel_offset; } } - if (tag_index != -1) { - return min_offset - nitems_read(0); - } } - return -1; + return rel_offset; } /* find_trigger_signal() */ @@ -332,77 +431,100 @@ namespace gr { d_state = STATE_HEADER_RX_FAIL; if (pmt::is_integer(header_data)) { - d_curr_payload_len = pmt::to_long(header_data); - d_payload_tag_keys.push_back(d_len_tag_key); - d_payload_tag_values.push_back(header_data); - d_state = STATE_HEADER_RX_SUCCESS; + d_curr_payload_len = pmt::to_long(header_data); + d_payload_tag_keys.push_back(d_len_tag_key); + d_payload_tag_values.push_back(header_data); + d_state = STATE_HEADER_RX_SUCCESS; } else if (pmt::is_dict(header_data)) { - pmt::pmt_t dict_items(pmt::dict_items(header_data)); - while (!pmt::is_null(dict_items)) { - pmt::pmt_t this_item(pmt::car(dict_items)); - d_payload_tag_keys.push_back(pmt::car(this_item)); - d_payload_tag_values.push_back(pmt::cdr(this_item)); - if (pmt::equal(pmt::car(this_item), d_len_tag_key)) { - d_curr_payload_len = pmt::to_long(pmt::cdr(this_item)); - d_state = STATE_HEADER_RX_SUCCESS; - } - dict_items = pmt::cdr(dict_items); - } - if (d_state == STATE_HEADER_RX_FAIL) { - GR_LOG_CRIT(d_logger, "no length tag passed from header data"); - } + pmt::pmt_t dict_items(pmt::dict_items(header_data)); + while (!pmt::is_null(dict_items)) { + pmt::pmt_t this_item(pmt::car(dict_items)); + d_payload_tag_keys.push_back(pmt::car(this_item)); + d_payload_tag_values.push_back(pmt::cdr(this_item)); + if (pmt::equal(pmt::car(this_item), d_len_tag_key)) { + d_curr_payload_len = pmt::to_long(pmt::cdr(this_item)); + d_state = STATE_HEADER_RX_SUCCESS; + } + if (pmt::equal(pmt::car(this_item), d_payload_offset_key)) { + d_curr_payload_offset = pmt::to_long(pmt::cdr(this_item)); + if (std::abs(d_curr_payload_offset) > d_header_padding_total_items) { + GR_LOG_CRIT(d_logger, "Payload offset exceeds padding"); + d_state = STATE_HEADER_RX_FAIL; + return; + } + } + dict_items = pmt::cdr(dict_items); + } + if (d_state == STATE_HEADER_RX_FAIL) { + GR_LOG_CRIT(d_logger, "no payload length passed from header data"); + } } else if (header_data == pmt::PMT_F || pmt::is_null(header_data)) { - GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data)); + GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data)); } else { - GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data)); + GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data)); } if (d_state == STATE_HEADER_RX_SUCCESS) { - if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)/2) { - d_state = STATE_HEADER_RX_FAIL; - GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len); - } else { - set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); - } + if (d_curr_payload_len < 0) { + GR_LOG_WARN(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len); + d_curr_payload_len = 0; + d_state = STATE_HEADER_RX_FAIL; + } + if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)/2) { + d_state = STATE_HEADER_RX_FAIL; + GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len); + } else { + set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); + } } } /* parse_header_data_msg() */ void header_payload_demux_impl::copy_n_symbols( - const unsigned char *in, - unsigned char *out, - int port, - int n_symbols - ) - { + const unsigned char *in, + unsigned char *out, + int port, + const uint64_t n_items_read_base, + int n_symbols, + int n_padding_items + ) { // Copy samples if (d_gi) { - for (int i = 0; i < n_symbols; i++) { - memcpy((void *) out, (void *) (in + d_gi * d_itemsize), d_items_per_symbol * d_itemsize); - in += d_itemsize * (d_items_per_symbol + d_gi); - out += d_itemsize * d_items_per_symbol; - } + // Here we know n_padding_items must be 0 (see contract), + // because all padding items will be part of n_symbols + for (int i = 0; i < n_symbols; i++) { + memcpy( + (void *) out, + (void *) (in + d_gi * d_itemsize), + d_items_per_symbol * d_itemsize + ); + in += d_itemsize * (d_items_per_symbol + d_gi); + out += d_itemsize * d_items_per_symbol; + } } else { - memcpy( - (void *) out, - (void *) in, - n_symbols * d_items_per_symbol * d_itemsize - ); + memcpy( + (void *) out, + (void *) in, + (n_symbols * d_items_per_symbol + n_padding_items) * d_itemsize + ); } // Copy tags std::vector<tag_t> tags; get_tags_in_range( - tags, 0, - nitems_read(0), - nitems_read(0) + n_symbols * (d_items_per_symbol + d_gi) + tags, + PORT_INPUTDATA, + n_items_read_base, + n_items_read_base + + n_symbols * (d_items_per_symbol + d_gi) + + n_padding_items ); for (size_t t = 0; t < tags.size(); t++) { // The trigger tag is *not* propagated if (tags[t].key == d_trigger_tag_key) { continue; } - int new_offset = tags[t].offset - nitems_read(0); + int new_offset = tags[t].offset - n_items_read_base; if (d_output_symbols) { new_offset /= (d_items_per_symbol + d_gi); } else if (d_gi) { @@ -418,43 +540,49 @@ namespace gr { tags[t].value ); } + // Advance write pointers + // Items to produce might actually be symbols + const int items_to_produce = d_output_symbols ? + n_symbols : + (n_symbols * d_items_per_symbol + n_padding_items); + produce(port, items_to_produce); } /* copy_n_symbols() */ void header_payload_demux_impl::update_special_tags( - int range_start, - int range_end + uint64_t range_start, + uint64_t range_end ){ if (d_track_time) { - std::vector<tag_t> tags; - get_tags_in_range(tags, 0, - nitems_read(0) + range_start, - nitems_read(0) + range_end, - d_timing_key - ); - for (unsigned t = 0; t < tags.size(); t++) { - if(tags[t].offset >= d_last_time_offset) { - d_last_time = tags[t].value; - d_last_time_offset = tags[t].offset; - } - } + std::vector<tag_t> tags; + get_tags_in_range( + tags, + PORT_INPUTDATA, + range_start, + range_end, + d_timing_key + ); + if (!tags.empty()) { + std::sort(tags.begin(), tags.end(), tag_t::offset_compare); + d_last_time = tags.back().value; + d_last_time_offset = tags.back().offset; + } } std::vector<tag_t> tags; - for (unsigned i = 0; i < d_special_tags.size(); i++) { - uint64_t offset = 0; - // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually - get_tags_in_range(tags, 0, - nitems_read(0) + range_start, - nitems_read(0) + range_end, - d_special_tags[i] - ); - for (unsigned t = 0; t < tags.size(); t++) { - if(tags[t].offset >= offset) { - d_special_tags_last_value[i] = tags[t].value; - offset = tags[t].offset; - } - } + for (size_t i = 0; i < d_special_tags.size(); i++) { + // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually + get_tags_in_range( + tags, + PORT_INPUTDATA, // Read from port 0 + range_start, + range_end, + d_special_tags[i] + ); + std::sort(tags.begin(), tags.end(), tag_t::offset_compare); + for (size_t t = 0; t < tags.size(); t++) { + d_special_tags_last_value[i] = tags[t].value; + } } } /* update_special_tags() */ @@ -462,24 +590,24 @@ namespace gr { header_payload_demux_impl::add_special_tags( ){ if (d_track_time) { - add_item_tag( - PORT_HEADER, - nitems_written(PORT_HEADER), - d_timing_key, - _update_pmt_time( - d_last_time, - d_sampling_time * (nitems_read(0) - d_last_time_offset) - ) - ); + add_item_tag( + PORT_HEADER, + nitems_written(PORT_HEADER), + d_timing_key, + _update_pmt_time( + d_last_time, + d_sampling_time * (nitems_read(PORT_INPUTDATA) - d_last_time_offset) + ) + ); } for (unsigned i = 0; i < d_special_tags.size(); i++) { - add_item_tag( - PORT_HEADER, - nitems_written(PORT_HEADER), - d_special_tags[i], - d_special_tags_last_value[i] - ); + add_item_tag( + PORT_HEADER, + nitems_written(PORT_HEADER), + d_special_tags[i], + d_special_tags_last_value[i] + ); } } /* add_special_tags() */ |