summaryrefslogtreecommitdiff
path: root/gr-digital/lib/header_payload_demux_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/lib/header_payload_demux_impl.cc')
-rw-r--r--gr-digital/lib/header_payload_demux_impl.cc670
1 files changed, 399 insertions, 271 deletions
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc
index 89428fa86e..f887ea1eb3 100644
--- a/gr-digital/lib/header_payload_demux_impl.cc
+++ b/gr-digital/lib/header_payload_demux_impl.cc
@@ -1,5 +1,5 @@
/* -*- c++ -*- */
-/* Copyright 2012-2014 Free Software Foundation, Inc.
+/* Copyright 2012-2016 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
@@ -23,10 +23,10 @@
#include "config.h"
#endif
-#include <climits>
-#include <boost/format.hpp>
-#include <gnuradio/io_signature.h>
#include "header_payload_demux_impl.h"
+#include <gnuradio/io_signature.h>
+#include <boost/format.hpp>
+#include <climits>
namespace gr {
namespace digital {
@@ -55,55 +55,63 @@ namespace gr {
enum out_port_indexes_t {
PORT_HEADER = 0,
- PORT_PAYLOAD = 1
+ PORT_PAYLOAD = 1,
+ PORT_INPUTDATA = 0,
+ PORT_TRIGGER = 1
};
#define msg_port_id pmt::mp("header_data")
header_payload_demux::sptr
header_payload_demux::make(
- int header_len,
- int items_per_symbol,
- int guard_interval,
- const std::string &length_tag_key,
- const std::string &trigger_tag_key,
- bool output_symbols,
- size_t itemsize,
- const std::string &timing_tag_key,
- const double samp_rate,
- const std::vector<std::string> &special_tags
+ int header_len,
+ int items_per_symbol,
+ int guard_interval,
+ const std::string &length_tag_key,
+ const std::string &trigger_tag_key,
+ bool output_symbols,
+ size_t itemsize,
+ const std::string &timing_tag_key,
+ const double samp_rate,
+ const std::vector<std::string> &special_tags,
+ const size_t header_padding
){
return gnuradio::get_initial_sptr (
- new header_payload_demux_impl(
- header_len,
- items_per_symbol,
- guard_interval,
- length_tag_key,
- trigger_tag_key,
- output_symbols,
- itemsize,
- timing_tag_key,
- samp_rate,
- special_tags
- )
+ new header_payload_demux_impl(
+ header_len,
+ items_per_symbol,
+ guard_interval,
+ length_tag_key,
+ trigger_tag_key,
+ output_symbols,
+ itemsize,
+ timing_tag_key,
+ samp_rate,
+ special_tags,
+ header_padding
+ )
);
}
header_payload_demux_impl::header_payload_demux_impl(
- int header_len,
- int items_per_symbol,
- int guard_interval,
- const std::string &length_tag_key,
- const std::string &trigger_tag_key,
- bool output_symbols,
- size_t itemsize,
- const std::string &timing_tag_key,
- const double samp_rate,
- const std::vector<std::string> &special_tags
+ int header_len,
+ int items_per_symbol,
+ int guard_interval,
+ const std::string &length_tag_key,
+ const std::string &trigger_tag_key,
+ bool output_symbols,
+ size_t itemsize,
+ const std::string &timing_tag_key,
+ const double samp_rate,
+ const std::vector<std::string> &special_tags,
+ const size_t header_padding
) : block("header_payload_demux",
- io_signature::make2(1, 2, itemsize, sizeof(char)),
- io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))),
+ io_signature::make2(1, 2, itemsize, sizeof(char)),
+ io_signature::make(2, 2, (output_symbols ? itemsize * items_per_symbol : itemsize))),
d_header_len(header_len),
+ d_header_padding_symbols(header_padding / items_per_symbol),
+ d_header_padding_items(header_padding % items_per_symbol),
+ d_header_padding_total_items(header_padding),
d_items_per_symbol(items_per_symbol),
d_gi(guard_interval),
d_len_tag_key(pmt::string_to_symbol(length_tag_key)),
@@ -113,32 +121,42 @@ namespace gr {
d_uses_trigger_tag(!trigger_tag_key.empty()),
d_state(STATE_FIND_TRIGGER),
d_curr_payload_len(0),
+ d_curr_payload_offset(0),
d_payload_tag_keys(0),
d_payload_tag_values(0),
d_track_time(!timing_tag_key.empty()),
d_timing_key(pmt::intern(timing_tag_key)),
+ d_payload_offset_key(pmt::intern("payload_offset")),
d_last_time_offset(0),
d_last_time(pmt::make_tuple(pmt::from_uint64(0L), pmt::from_double(0.0))),
d_sampling_time(1.0/samp_rate)
{
if (d_header_len < 1) {
- throw std::invalid_argument("Header length must be at least 1 symbol.");
+ throw std::invalid_argument("Header length must be at least 1 symbol.");
+ }
+ if (header_padding < 0) {
+ throw std::invalid_argument("Header padding must be non-negative.");
}
if (d_items_per_symbol < 1 || d_gi < 0 || d_itemsize < 1) {
- throw std::invalid_argument("Items and symbol sizes must be at least 1.");
+ throw std::invalid_argument("Items and symbol sizes must be at least 1.");
}
if (d_output_symbols) {
- set_relative_rate(1.0 / (d_items_per_symbol + d_gi));
+ set_relative_rate(1.0 / (d_items_per_symbol + d_gi));
} else {
- set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi));
- set_output_multiple(d_items_per_symbol);
+ set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi));
+ set_output_multiple(d_items_per_symbol);
+ }
+ if ((d_output_symbols || d_gi) && d_header_padding_items) {
+ throw std::invalid_argument(
+ "If output_symbols is true or a guard interval is given, padding must be a multiple of items_per_symbol!"
+ );
}
set_tag_propagation_policy(TPP_DONT);
message_port_register_in(msg_port_id);
set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1));
- for (unsigned i = 0; i < special_tags.size(); i++) {
- d_special_tags.push_back(pmt::string_to_symbol(special_tags[i]));
- d_special_tags_last_value.push_back(pmt::PMT_NIL);
+ for (size_t i = 0; i < special_tags.size(); i++) {
+ d_special_tags.push_back(pmt::string_to_symbol(special_tags[i]));
+ d_special_tags_last_value.push_back(pmt::PMT_NIL);
}
}
@@ -146,144 +164,219 @@ namespace gr {
{
}
+ // forecast() depends on state:
+ // - When waiting for a header, we require at least the header length
+ // - when waiting for a payload, we require at least the payload length
+ // - Otherwise, pretend this is a sync block with a decimation/interpolation
+ // depending on symbol size and if we output symbols or items
void
- header_payload_demux_impl::forecast (int noutput_items, gr_vector_int &ninput_items_required)
- {
+ header_payload_demux_impl::forecast(
+ int noutput_items,
+ gr_vector_int &ninput_items_required
+ ) {
int n_items_reqd = 0;
if (d_state == STATE_HEADER) {
- n_items_reqd = d_header_len * (d_items_per_symbol + d_gi);
+ n_items_reqd = d_header_len * (d_items_per_symbol + d_gi)
+ + 2*d_header_padding_total_items;
} else if (d_state == STATE_PAYLOAD) {
- n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi);
+ n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi);
} else {
- n_items_reqd = noutput_items * (d_items_per_symbol + d_gi);
- if (!d_output_symbols) {
- // here, noutput_items is an integer multiple of d_items_per_symbol!
- n_items_reqd /= d_items_per_symbol;
- }
+ n_items_reqd = noutput_items * (d_items_per_symbol + d_gi);
+ if (!d_output_symbols) {
+ // Here, noutput_items is an integer multiple of d_items_per_symbol!
+ n_items_reqd /= d_items_per_symbol;
+ }
}
for (unsigned i = 0; i < ninput_items_required.size(); i++) {
- ninput_items_required[i] = n_items_reqd;
+ ninput_items_required[i] = n_items_reqd;
}
}
- inline bool
- header_payload_demux_impl::check_items_available(
- int n_symbols,
- gr_vector_int &ninput_items,
- int noutput_items,
- int nread
- )
- {
- // Check there's enough items on the input
- if ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[0]-nread)
- || (ninput_items.size() == 2 && ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[1]-nread)))) {
- return false;
- }
-
+ bool header_payload_demux_impl::check_buffers_ready(
+ int output_symbols_reqd,
+ int extra_output_items_reqd,
+ int noutput_items,
+ int input_items_reqd,
+ gr_vector_int &ninput_items,
+ int n_items_read
+ ) {
// Check there's enough space on the output buffer
if (d_output_symbols) {
- if (noutput_items < n_symbols) {
- return false;
- }
+ if (noutput_items < output_symbols_reqd + extra_output_items_reqd) {
+ return false;
+ }
} else {
- if (noutput_items < n_symbols * d_items_per_symbol) {
- return false;
- }
+ if (noutput_items < (output_symbols_reqd * d_items_per_symbol) + extra_output_items_reqd) {
+ return false;
+ }
+ }
+
+ // Check there's enough items on the input
+ if (input_items_reqd > (ninput_items[0]-n_items_read)
+ || (ninput_items.size() == 2 && (input_items_reqd > (ninput_items[1]-n_items_read)))) {
+ return false;
}
+ // All good
return true;
}
int
- header_payload_demux_impl::general_work (int noutput_items,
- gr_vector_int &ninput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
- const unsigned char *in = (const unsigned char *) input_items[0];
+ header_payload_demux_impl::general_work(
+ int noutput_items,
+ gr_vector_int &ninput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items
+ ) {
+ const unsigned char *in = (const unsigned char *) input_items[PORT_INPUTDATA];
unsigned char *out_header = (unsigned char *) output_items[PORT_HEADER];
unsigned char *out_payload = (unsigned char *) output_items[PORT_PAYLOAD];
- int nread = 0;
- int trigger_offset = 0;
-
+ const int n_input_items = (ninput_items.size() == 2) ?
+ std::min(ninput_items[0], ninput_items[1]) :
+ ninput_items[0];
+ // Items read going into general_work()
+ const uint64_t n_items_read_base = nitems_read(PORT_INPUTDATA);
+ // Items read during this call to general_work()
+ int n_items_read = 0;
+
+ #define CONSUME_ITEMS(items_to_consume) \
+ update_special_tags( \
+ n_items_read_base + n_items_read, \
+ n_items_read_base + n_items_read + (items_to_consume) \
+ ); \
+ consume_each(items_to_consume); \
+ n_items_read += (items_to_consume); \
+ in += (items_to_consume) * d_itemsize;
switch (d_state) {
- case STATE_WAIT_FOR_MSG:
- // In an ideal world, this would never be called
- return 0;
-
- case STATE_HEADER_RX_FAIL:
- update_special_tags(0, 1);
- consume_each (1);
- in += d_itemsize;
- nread++;
- d_state = STATE_FIND_TRIGGER;
- // The following break was added to this state as well as STATE_FIND_TRIGGER
- // and STATE_HEADER. There appears to be a bug somewhere in this code without
- // the breaks that can lead to failure of this block. With the breaks in the code
- // testing has shown more stable performance with various block paramters.
- // If an offset calculation bug is found and fixed, it should be possible to
- // remove these breaks for some performance increase.
- break;
-
- case STATE_FIND_TRIGGER:
- trigger_offset = find_trigger_signal(nread, noutput_items, input_items);
- if (trigger_offset == -1) {
- update_special_tags(0, noutput_items - nread);
- consume_each(noutput_items - nread);
- break;
- }
- update_special_tags(0, trigger_offset);
- consume_each (trigger_offset);
- in += trigger_offset * d_itemsize;
- d_state = STATE_HEADER;
- break;
-
- case STATE_HEADER:
- if (check_items_available(d_header_len, ninput_items, noutput_items, nread)) {
- copy_n_symbols(in, out_header, PORT_HEADER, d_header_len);
- d_state = STATE_WAIT_FOR_MSG;
- add_special_tags();
- produce(
- PORT_HEADER,
- d_header_len * (d_output_symbols ? 1 : d_items_per_symbol)
- );
- }
- break;
-
- case STATE_HEADER_RX_SUCCESS:
- for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) {
- add_item_tag(
- PORT_PAYLOAD,
- nitems_written(PORT_PAYLOAD),
- d_payload_tag_keys[i],
- d_payload_tag_values[i]
- );
- }
- nread += d_header_len * (d_items_per_symbol + d_gi);
- update_special_tags(0, nread);
- consume_each (nread);
- in += nread * d_itemsize;
- d_state = STATE_PAYLOAD;
- break;
-
- case STATE_PAYLOAD:
- if (check_items_available(d_curr_payload_len, ninput_items, noutput_items, nread)) {
- // The -1 because we won't consume the last item, it might hold the next trigger.
- update_special_tags(0, (d_curr_payload_len - 1) * (d_items_per_symbol + d_gi));
- copy_n_symbols(in, out_payload, PORT_PAYLOAD, d_curr_payload_len);
- produce(PORT_PAYLOAD, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
- consume_each ((d_curr_payload_len - 1) * (d_items_per_symbol + d_gi)); // Same here
- set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi));
- d_state = STATE_FIND_TRIGGER;
- }
- break;
-
- default:
- throw std::runtime_error("invalid state");
+ case STATE_WAIT_FOR_MSG:
+ // In an ideal world, this would never be called
+ // parse_header_data_msg() is the only place that can kick us out
+ // of this state.
+ return 0;
+
+ case STATE_HEADER_RX_FAIL:
+ // Actions:
+ // - Consume a single item to make sure we're not deleting any other
+ // info
+ CONSUME_ITEMS(1);
+ d_state = STATE_FIND_TRIGGER;
+ break;
+
+ case STATE_FIND_TRIGGER: {
+ // Assumptions going into this state:
+ // - No other state was active for this call to general_work()
+ // - i.e. n_items_read == 0
+ // Start looking for a trigger after any header padding.
+ // The trigger offset is relative to 'in'.
+ // => The absolute trigger offset is on n_items_read_base + n_items_read_base + trigger_offset
+ const int max_rel_offset = n_input_items - n_items_read;
+ const int trigger_offset = find_trigger_signal(
+ d_header_padding_total_items,
+ max_rel_offset,
+ n_items_read_base + n_items_read,
+ (input_items.size() == 2) ?
+ ((const unsigned char *) input_items[PORT_TRIGGER]) + n_items_read : NULL
+ );
+ if (trigger_offset < max_rel_offset) {
+ d_state = STATE_HEADER;
+ }
+ // If we're using padding, don't consume everything, or we might
+ // end up with not enough items before the trigger
+ const int items_to_consume = trigger_offset - d_header_padding_total_items;
+ CONSUME_ITEMS(items_to_consume);
+ break;
+ } /* case STATE_FIND_TRIGGER */
+
+ case STATE_HEADER:
+ // Assumptions going into this state:
+ // - The first items on `in' are the header samples (including padding)
+ // - So we can just copy from the beginning of `in'
+ // - The trigger is on item index `d_header_padding * d_items_per_symbol'
+ // Actions:
+ // - Copy the entire header (including padding) to the header port
+ // - Special tags are added to the header port
+ if (check_buffers_ready(
+ d_header_len + 2*d_header_padding_symbols,
+ d_header_padding_items,
+ noutput_items,
+ d_header_len * (d_items_per_symbol + d_gi) + 2*d_header_padding_total_items,
+ ninput_items,
+ n_items_read)) {
+ add_special_tags();
+ copy_n_symbols(
+ in,
+ out_header,
+ PORT_HEADER,
+ n_items_read_base + n_items_read,
+ d_header_len+2*d_header_padding_symbols, // Number of symbols to copy
+ 2*d_header_padding_items
+ );
+ d_state = STATE_WAIT_FOR_MSG;
+ }
+ break;
+
+ case STATE_HEADER_RX_SUCCESS:
+ // Copy tags from header to payload
+ for (size_t i = 0; i < d_payload_tag_keys.size(); i++) {
+ add_item_tag(
+ PORT_PAYLOAD,
+ nitems_written(PORT_PAYLOAD),
+ d_payload_tag_keys[i],
+ d_payload_tag_values[i]
+ );
+ }
+ // Consume header from input
+ {
+ // Consume the padding only once, we leave the second
+ // part in there because it might be part of the payload
+ const int items_to_consume =
+ d_header_len * (d_items_per_symbol + d_gi)
+ + d_header_padding_total_items
+ + d_curr_payload_offset;
+ CONSUME_ITEMS(items_to_consume);
+ d_curr_payload_offset = 0;
+ d_state = STATE_PAYLOAD;
+ }
+ break;
+
+ case STATE_PAYLOAD:
+ // Assumptions:
+ // - Input buffer is in the right spot to just start copying
+ if (check_buffers_ready(
+ d_curr_payload_len,
+ 0,
+ noutput_items,
+ d_curr_payload_len * (d_items_per_symbol + d_gi),
+ ninput_items,
+ n_items_read)) {
+ // Write payload
+ copy_n_symbols(
+ in,
+ out_payload,
+ PORT_PAYLOAD,
+ n_items_read_base + n_items_read,
+ d_curr_payload_len
+ );
+ // Consume payload
+ // We can't consume the full payload, because we need to hold off
+ // at least the padding value. We'll use a minimum padding of 1
+ // item here.
+ const int items_padding = std::max(d_header_padding_total_items, 1);
+ const int items_to_consume =
+ d_curr_payload_len * (d_items_per_symbol + d_gi)
+ - items_padding;
+ CONSUME_ITEMS(items_to_consume);
+ set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi));
+ d_state = STATE_FIND_TRIGGER;
+ }
+ break;
+
+ default:
+ throw std::runtime_error("invalid state");
} /* switch */
return WORK_CALLED_PRODUCE;
@@ -292,35 +385,41 @@ namespace gr {
int
header_payload_demux_impl::find_trigger_signal(
- int nread,
- int noutput_items,
- gr_vector_const_void_star &input_items)
- {
- if (input_items.size() == 2) {
- unsigned char *in_trigger = (unsigned char *) input_items[1];
- in_trigger += nread;
- for (int i = 0; i < noutput_items-nread; i++) {
- if (in_trigger[i]) {
- return i;
- }
- }
+ int skip_items,
+ int max_rel_offset,
+ uint64_t base_offset,
+ const unsigned char *in_trigger
+ ) {
+ int rel_offset = max_rel_offset;
+ if (max_rel_offset < skip_items) {
+ return rel_offset;
+ }
+ if (in_trigger) {
+ for (int i = skip_items; i < max_rel_offset; i++) {
+ if (in_trigger[i]) {
+ rel_offset = i;
+ break;
+ }
+ }
}
if (d_uses_trigger_tag) {
std::vector<tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items, d_trigger_tag_key);
- uint64_t min_offset = ULLONG_MAX;
- int tag_index = -1;
- for (unsigned i = 0; i < tags.size(); i++) {
- if (tags[i].offset < min_offset) {
- tag_index = (int) i;
- min_offset = tags[i].offset;
+ get_tags_in_range(
+ tags,
+ PORT_INPUTDATA,
+ base_offset + skip_items,
+ base_offset + max_rel_offset,
+ d_trigger_tag_key
+ );
+ if (!tags.empty()) {
+ std::sort(tags.begin(), tags.end(), tag_t::offset_compare);
+ const int tag_rel_offset = tags[0].offset - base_offset;
+ if (tag_rel_offset < rel_offset) {
+ rel_offset = tag_rel_offset;
}
}
- if (tag_index != -1) {
- return min_offset - nitems_read(0);
- }
}
- return -1;
+ return rel_offset;
} /* find_trigger_signal() */
@@ -332,77 +431,100 @@ namespace gr {
d_state = STATE_HEADER_RX_FAIL;
if (pmt::is_integer(header_data)) {
- d_curr_payload_len = pmt::to_long(header_data);
- d_payload_tag_keys.push_back(d_len_tag_key);
- d_payload_tag_values.push_back(header_data);
- d_state = STATE_HEADER_RX_SUCCESS;
+ d_curr_payload_len = pmt::to_long(header_data);
+ d_payload_tag_keys.push_back(d_len_tag_key);
+ d_payload_tag_values.push_back(header_data);
+ d_state = STATE_HEADER_RX_SUCCESS;
} else if (pmt::is_dict(header_data)) {
- pmt::pmt_t dict_items(pmt::dict_items(header_data));
- while (!pmt::is_null(dict_items)) {
- pmt::pmt_t this_item(pmt::car(dict_items));
- d_payload_tag_keys.push_back(pmt::car(this_item));
- d_payload_tag_values.push_back(pmt::cdr(this_item));
- if (pmt::equal(pmt::car(this_item), d_len_tag_key)) {
- d_curr_payload_len = pmt::to_long(pmt::cdr(this_item));
- d_state = STATE_HEADER_RX_SUCCESS;
- }
- dict_items = pmt::cdr(dict_items);
- }
- if (d_state == STATE_HEADER_RX_FAIL) {
- GR_LOG_CRIT(d_logger, "no length tag passed from header data");
- }
+ pmt::pmt_t dict_items(pmt::dict_items(header_data));
+ while (!pmt::is_null(dict_items)) {
+ pmt::pmt_t this_item(pmt::car(dict_items));
+ d_payload_tag_keys.push_back(pmt::car(this_item));
+ d_payload_tag_values.push_back(pmt::cdr(this_item));
+ if (pmt::equal(pmt::car(this_item), d_len_tag_key)) {
+ d_curr_payload_len = pmt::to_long(pmt::cdr(this_item));
+ d_state = STATE_HEADER_RX_SUCCESS;
+ }
+ if (pmt::equal(pmt::car(this_item), d_payload_offset_key)) {
+ d_curr_payload_offset = pmt::to_long(pmt::cdr(this_item));
+ if (std::abs(d_curr_payload_offset) > d_header_padding_total_items) {
+ GR_LOG_CRIT(d_logger, "Payload offset exceeds padding");
+ d_state = STATE_HEADER_RX_FAIL;
+ return;
+ }
+ }
+ dict_items = pmt::cdr(dict_items);
+ }
+ if (d_state == STATE_HEADER_RX_FAIL) {
+ GR_LOG_CRIT(d_logger, "no payload length passed from header data");
+ }
} else if (header_data == pmt::PMT_F || pmt::is_null(header_data)) {
- GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data));
+ GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data));
} else {
- GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data));
+ GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data));
}
if (d_state == STATE_HEADER_RX_SUCCESS)
{
- if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)/2) {
- d_state = STATE_HEADER_RX_FAIL;
- GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len);
- } else {
- set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
- }
+ if (d_curr_payload_len < 0) {
+ GR_LOG_WARN(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len);
+ d_curr_payload_len = 0;
+ d_state = STATE_HEADER_RX_FAIL;
+ }
+ if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)/2) {
+ d_state = STATE_HEADER_RX_FAIL;
+ GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len);
+ } else {
+ set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
+ }
}
} /* parse_header_data_msg() */
void
header_payload_demux_impl::copy_n_symbols(
- const unsigned char *in,
- unsigned char *out,
- int port,
- int n_symbols
- )
- {
+ const unsigned char *in,
+ unsigned char *out,
+ int port,
+ const uint64_t n_items_read_base,
+ int n_symbols,
+ int n_padding_items
+ ) {
// Copy samples
if (d_gi) {
- for (int i = 0; i < n_symbols; i++) {
- memcpy((void *) out, (void *) (in + d_gi * d_itemsize), d_items_per_symbol * d_itemsize);
- in += d_itemsize * (d_items_per_symbol + d_gi);
- out += d_itemsize * d_items_per_symbol;
- }
+ // Here we know n_padding_items must be 0 (see contract),
+ // because all padding items will be part of n_symbols
+ for (int i = 0; i < n_symbols; i++) {
+ memcpy(
+ (void *) out,
+ (void *) (in + d_gi * d_itemsize),
+ d_items_per_symbol * d_itemsize
+ );
+ in += d_itemsize * (d_items_per_symbol + d_gi);
+ out += d_itemsize * d_items_per_symbol;
+ }
} else {
- memcpy(
- (void *) out,
- (void *) in,
- n_symbols * d_items_per_symbol * d_itemsize
- );
+ memcpy(
+ (void *) out,
+ (void *) in,
+ (n_symbols * d_items_per_symbol + n_padding_items) * d_itemsize
+ );
}
// Copy tags
std::vector<tag_t> tags;
get_tags_in_range(
- tags, 0,
- nitems_read(0),
- nitems_read(0) + n_symbols * (d_items_per_symbol + d_gi)
+ tags,
+ PORT_INPUTDATA,
+ n_items_read_base,
+ n_items_read_base
+ + n_symbols * (d_items_per_symbol + d_gi)
+ + n_padding_items
);
for (size_t t = 0; t < tags.size(); t++) {
// The trigger tag is *not* propagated
if (tags[t].key == d_trigger_tag_key) {
continue;
}
- int new_offset = tags[t].offset - nitems_read(0);
+ int new_offset = tags[t].offset - n_items_read_base;
if (d_output_symbols) {
new_offset /= (d_items_per_symbol + d_gi);
} else if (d_gi) {
@@ -418,43 +540,49 @@ namespace gr {
tags[t].value
);
}
+ // Advance write pointers
+ // Items to produce might actually be symbols
+ const int items_to_produce = d_output_symbols ?
+ n_symbols :
+ (n_symbols * d_items_per_symbol + n_padding_items);
+ produce(port, items_to_produce);
} /* copy_n_symbols() */
void
header_payload_demux_impl::update_special_tags(
- int range_start,
- int range_end
+ uint64_t range_start,
+ uint64_t range_end
){
if (d_track_time) {
- std::vector<tag_t> tags;
- get_tags_in_range(tags, 0,
- nitems_read(0) + range_start,
- nitems_read(0) + range_end,
- d_timing_key
- );
- for (unsigned t = 0; t < tags.size(); t++) {
- if(tags[t].offset >= d_last_time_offset) {
- d_last_time = tags[t].value;
- d_last_time_offset = tags[t].offset;
- }
- }
+ std::vector<tag_t> tags;
+ get_tags_in_range(
+ tags,
+ PORT_INPUTDATA,
+ range_start,
+ range_end,
+ d_timing_key
+ );
+ if (!tags.empty()) {
+ std::sort(tags.begin(), tags.end(), tag_t::offset_compare);
+ d_last_time = tags.back().value;
+ d_last_time_offset = tags.back().offset;
+ }
}
std::vector<tag_t> tags;
- for (unsigned i = 0; i < d_special_tags.size(); i++) {
- uint64_t offset = 0;
- // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually
- get_tags_in_range(tags, 0,
- nitems_read(0) + range_start,
- nitems_read(0) + range_end,
- d_special_tags[i]
- );
- for (unsigned t = 0; t < tags.size(); t++) {
- if(tags[t].offset >= offset) {
- d_special_tags_last_value[i] = tags[t].value;
- offset = tags[t].offset;
- }
- }
+ for (size_t i = 0; i < d_special_tags.size(); i++) {
+ // TODO figure out if it's better to get all tags at once instead of doing this for every tag individually
+ get_tags_in_range(
+ tags,
+ PORT_INPUTDATA, // Read from port 0
+ range_start,
+ range_end,
+ d_special_tags[i]
+ );
+ std::sort(tags.begin(), tags.end(), tag_t::offset_compare);
+ for (size_t t = 0; t < tags.size(); t++) {
+ d_special_tags_last_value[i] = tags[t].value;
+ }
}
} /* update_special_tags() */
@@ -462,24 +590,24 @@ namespace gr {
header_payload_demux_impl::add_special_tags(
){
if (d_track_time) {
- add_item_tag(
- PORT_HEADER,
- nitems_written(PORT_HEADER),
- d_timing_key,
- _update_pmt_time(
- d_last_time,
- d_sampling_time * (nitems_read(0) - d_last_time_offset)
- )
- );
+ add_item_tag(
+ PORT_HEADER,
+ nitems_written(PORT_HEADER),
+ d_timing_key,
+ _update_pmt_time(
+ d_last_time,
+ d_sampling_time * (nitems_read(PORT_INPUTDATA) - d_last_time_offset)
+ )
+ );
}
for (unsigned i = 0; i < d_special_tags.size(); i++) {
- add_item_tag(
- PORT_HEADER,
- nitems_written(PORT_HEADER),
- d_special_tags[i],
- d_special_tags_last_value[i]
- );
+ add_item_tag(
+ PORT_HEADER,
+ nitems_written(PORT_HEADER),
+ d_special_tags[i],
+ d_special_tags_last_value[i]
+ );
}
} /* add_special_tags() */