summaryrefslogtreecommitdiff
path: root/gr-digital/lib/header_payload_demux_impl.cc
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@kit.edu>2013-06-11 19:28:13 +0200
committerJohnathan Corgan <johnathan@corganlabs.com>2013-06-14 06:38:40 -0700
commitb024982081ee4384e95d8a8958900de93c5fd064 (patch)
treec77af57c0222daea2b5d43d0f98d4eca9b89e3fb /gr-digital/lib/header_payload_demux_impl.cc
parent8ba7be2a1b0027747d256638c50a4c7b09677ea9 (diff)
digital: HPD: fixed tag propagation, minimized calls to work()
Diffstat (limited to 'gr-digital/lib/header_payload_demux_impl.cc')
-rw-r--r--gr-digital/lib/header_payload_demux_impl.cc246
1 files changed, 139 insertions, 107 deletions
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc
index 23992e7ce1..7cc3ec3836 100644
--- a/gr-digital/lib/header_payload_demux_impl.cc
+++ b/gr-digital/lib/header_payload_demux_impl.cc
@@ -28,7 +28,6 @@
#include <gnuradio/io_signature.h>
#include "header_payload_demux_impl.h"
-
namespace gr {
namespace digital {
@@ -86,6 +85,7 @@ namespace gr {
d_itemsize(itemsize),
d_uses_trigger_tag(!trigger_tag_key.empty()),
d_state(STATE_FIND_TRIGGER),
+ d_curr_payload_len(0),
d_payload_tag_keys(0),
d_payload_tag_values(0)
{
@@ -95,9 +95,13 @@ namespace gr {
if (d_items_per_symbol < 1 || d_gi < 0 || d_itemsize < 1) {
throw std::invalid_argument("Items and symbol sizes must be at least 1.");
}
- if (!d_output_symbols) {
+ if (d_output_symbols) {
+ set_relative_rate(1.0 / (d_items_per_symbol + d_gi));
+ } else {
+ set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi));
set_output_multiple(d_items_per_symbol);
}
+ set_tag_propagation_policy(TPP_DONT);
message_port_register_in(msg_port_id);
set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1));
}
@@ -112,7 +116,8 @@ namespace gr {
int n_items_reqd = 0;
if (d_state == STATE_HEADER) {
n_items_reqd = d_header_len * (d_items_per_symbol + d_gi);
- //} else if (d_state == STATE_HEADER) {
+ } else if (d_state == STATE_PAYLOAD) {
+ n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi);
} else {
n_items_reqd = noutput_items * (d_items_per_symbol + d_gi);
if (!d_output_symbols) {
@@ -126,6 +131,36 @@ namespace gr {
}
}
+
+ inline bool
+ header_payload_demux_impl::check_items_available(
+ int n_symbols,
+ gr_vector_int &ninput_items,
+ int noutput_items,
+ int nread
+ )
+ {
+ // Check there's enough items on the input
+ if ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[0]-nread)
+ || (ninput_items.size() == 2 && ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[1]-nread)))) {
+ return false;
+ }
+
+ // Check there's enough space on the output buffer
+ if (d_output_symbols) {
+ if (noutput_items < n_symbols) {
+ return false;
+ }
+ } else {
+ if (noutput_items < n_symbols * d_items_per_symbol) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+
int
header_payload_demux_impl::general_work (int noutput_items,
gr_vector_int &ninput_items,
@@ -137,89 +172,77 @@ namespace gr {
unsigned char *out_payload = (unsigned char *) output_items[1];
int nread = 0;
- bool exit_loop = false;
- int produced_hdr = 0;
- int produced_payload = 0;
-
- while (
- nread < noutput_items
- && nread < ninput_items[0]
- && (ninput_items.size() == 1 || nread < ninput_items[1])
- && !exit_loop
- ) {
- switch (d_state) {
- case STATE_WAIT_FOR_MSG:
- // In an ideal world, this would never be called
- return 0;
-
- case STATE_HEADER_RX_FAIL:
- // TODO: Consume one item from input when copy_symbols has been optimized
- d_state = STATE_FIND_TRIGGER;
+ int trigger_offset = 0;
- case STATE_FIND_TRIGGER:
- // 1) Search for a trigger signal on input 1 (if present)
- // 2) Search for a trigger tag, make sure it's the first one
- // The first trigger to be found is used!
- // 3) Make sure the right number of items is skipped
- // 4) If trigger found, switch to STATE_HEADER
- if (find_trigger_signal(nread, noutput_items, input_items)) {
- d_remaining_symbols = d_header_len;
- d_state = STATE_HEADER;
- in += nread * d_itemsize;
- }
- break;
+ switch (d_state) {
+ case STATE_WAIT_FOR_MSG:
+ // In an ideal world, this would never be called
+ return 0;
- case STATE_HEADER:
- copy_symbol(in, out_header, 0, nread, produced_hdr);
- if (d_remaining_symbols == 0) {
- d_state = STATE_WAIT_FOR_MSG;
- exit_loop = true;
- }
- break;
+ case STATE_HEADER_RX_FAIL:
+ consume_each (1);
+ in += d_itemsize;
+ nread++;
+ d_state = STATE_FIND_TRIGGER;
- case STATE_HEADER_RX_SUCCESS:
- // TODO: Consume the entire header from the input when copy_symbols has been optimized
- for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) {
- add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]);
- }
- d_state = STATE_PAYLOAD;
-
- case STATE_PAYLOAD:
- copy_symbol(in, out_payload, 1, nread, produced_payload);
- if (d_remaining_symbols == 0) {
- d_state = STATE_FIND_TRIGGER;
- exit_loop = true;
- }
+ case STATE_FIND_TRIGGER:
+ trigger_offset = find_trigger_signal(nread, noutput_items, input_items);
+ if (trigger_offset == -1) {
+ consume_each(noutput_items - nread);
break;
+ }
+ consume_each (trigger_offset);
+ in += trigger_offset * d_itemsize;
+ d_state = STATE_HEADER;
- default:
- throw std::runtime_error("invalid state");
- } /* switch */
- } /* while(nread < noutput_items) */
- if (!d_output_symbols) {
- produced_hdr *= d_items_per_symbol;
- produced_payload *= d_items_per_symbol;
- }
- produce(0, produced_hdr);
- produce(1, produced_payload);
- consume_each (nread);
+ case STATE_HEADER:
+ if (check_items_available(d_header_len, ninput_items, noutput_items, nread)) {
+ copy_n_symbols(in, out_header, 0, d_header_len);
+ d_state = STATE_WAIT_FOR_MSG;
+ produce(0, d_header_len * (d_output_symbols ? 1 : d_items_per_symbol));
+ }
+ break;
+
+ case STATE_HEADER_RX_SUCCESS:
+ for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) {
+ add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]);
+ }
+ consume_each (d_header_len * (d_items_per_symbol + d_gi));
+ in += d_header_len * (d_items_per_symbol + d_gi) * d_itemsize;
+ nread += d_header_len * (d_items_per_symbol + d_gi);
+ d_state = STATE_PAYLOAD;
+
+ case STATE_PAYLOAD:
+ if (check_items_available(d_curr_payload_len, ninput_items, noutput_items, nread)) {
+ copy_n_symbols(in, out_payload, 1, d_curr_payload_len);
+ produce(1, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
+ consume_each (d_curr_payload_len * (d_items_per_symbol + d_gi));
+ d_state = STATE_FIND_TRIGGER;
+ set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi));
+ }
+ break;
+
+ default:
+ throw std::runtime_error("invalid state");
+ } /* switch */
+
return WORK_CALLED_PRODUCE;
} /* general_work() */
- bool
+ int
header_payload_demux_impl::find_trigger_signal(
- int &pos,
+ int nread,
int noutput_items,
gr_vector_const_void_star &input_items)
{
if (input_items.size() == 2) {
unsigned char *in_trigger = (unsigned char *) input_items[1];
- for (int i = 0; i < noutput_items; i++) {
+ in_trigger += nread;
+ for (int i = 0; i < noutput_items-nread; i++) {
if (in_trigger[i]) {
- pos = i;
- return true;
+ return i;
}
}
}
@@ -235,12 +258,10 @@ namespace gr {
}
}
if (tag_index != -1) {
- pos = min_offset - nitems_read(0);
- return true;
+ return min_offset - nitems_read(0);
}
}
- pos += noutput_items;
- return false;
+ return -1;
}
@@ -252,7 +273,7 @@ namespace gr {
d_state = STATE_HEADER_RX_FAIL;
if (pmt::is_integer(header_data)) {
- d_remaining_symbols = pmt::to_long(header_data);
+ d_curr_payload_len = pmt::to_long(header_data);
d_payload_tag_keys.push_back(d_len_tag_key);
d_payload_tag_values.push_back(header_data);
d_state = STATE_HEADER_RX_SUCCESS;
@@ -263,7 +284,7 @@ namespace gr {
d_payload_tag_keys.push_back(pmt::car(this_item));
d_payload_tag_values.push_back(pmt::cdr(this_item));
if (pmt::equal(pmt::car(this_item), d_len_tag_key)) {
- d_remaining_symbols = pmt::to_long(pmt::cdr(this_item));
+ d_curr_payload_len = pmt::to_long(pmt::cdr(this_item));
d_state = STATE_HEADER_RX_SUCCESS;
}
dict_items = pmt::cdr(dict_items);
@@ -276,53 +297,64 @@ namespace gr {
} else {
GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data));
}
- if (d_state == STATE_HEADER_RX_SUCCESS
- && (d_remaining_symbols * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)
- ) {
- d_state = STATE_HEADER_RX_FAIL;
- GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_remaining_symbols);
+ if (d_state == STATE_HEADER_RX_SUCCESS)
+ {
+ if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)) {
+ d_state = STATE_HEADER_RX_FAIL;
+ GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len);
+ } else {
+ set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
+ }
}
}
- // This is a inefficient design: Can only copy one symbol at once. TODO fix.
+
void
- header_payload_demux_impl::copy_symbol(const unsigned char *&in, unsigned char *&out, int port, int &nread, int &nproduced)
+ header_payload_demux_impl::copy_n_symbols(
+ const unsigned char *in,
+ unsigned char *out,
+ int port,
+ int n_symbols
+ )
{
- std::vector<tag_t> tags;
- memcpy((void *) out,
- (void *) (in + d_gi * d_itemsize),
- d_itemsize * d_items_per_symbol
- );
- // Tags on GI
- get_tags_in_range(tags, 0,
- nitems_read(0) + nread,
- nitems_read(0) + nread + d_gi
- );
- for (unsigned t = 0; t < tags.size(); t++) {
- add_item_tag(port,
- nitems_written(port)+nproduced,
- tags[t].key,
- tags[t].value
+ // Copy samples
+ if (d_gi) {
+ for (int i = 0; i < n_symbols; i++) {
+ memcpy((void *) out, (void *) (in + d_gi * d_itemsize), d_items_per_symbol * d_itemsize);
+ in += d_itemsize * (d_items_per_symbol + d_gi);
+ out += d_itemsize * d_items_per_symbol;
+ }
+ } else {
+ memcpy(
+ (void *) out,
+ (void *) in,
+ n_symbols * d_items_per_symbol * d_itemsize
);
}
- // Tags on symbol
+ // Copy tags
+ std::vector<tag_t> tags;
get_tags_in_range(
tags, 0,
- nitems_read(port) + nread + d_gi,
- nitems_read(port) + nread + d_gi + d_items_per_symbol
+ nitems_read(0),
+ nitems_read(0) + n_symbols * (d_items_per_symbol + d_gi)
);
for (unsigned t = 0; t < tags.size(); t++) {
- add_item_tag(0,
- tags[t].offset - nitems_read(0)-nread + nitems_written(port)+nproduced,
+ int new_offset = tags[t].offset - nitems_read(0);
+ if (d_output_symbols) {
+ new_offset /= (d_items_per_symbol + d_gi);
+ } else if (d_gi) {
+ int pos_on_symbol = (new_offset % (d_items_per_symbol + d_gi)) - d_gi;
+ if (pos_on_symbol < 0) {
+ pos_on_symbol = 0;
+ }
+ new_offset = (new_offset / (d_items_per_symbol + d_gi)) + pos_on_symbol;
+ }
+ add_item_tag(port,
+ nitems_written(port) + new_offset,
tags[t].key,
tags[t].value
);
}
- in += d_itemsize * (d_items_per_symbol + d_gi);
- out += d_items_per_symbol * d_itemsize;
- nread += d_items_per_symbol + d_gi;
- nproduced++;
- d_remaining_symbols--;
}
} /* namespace digital */