summaryrefslogtreecommitdiff
path: root/gr-digital/lib/header_payload_demux_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/lib/header_payload_demux_impl.cc')
-rw-r--r--gr-digital/lib/header_payload_demux_impl.cc115
1 files changed, 65 insertions, 50 deletions
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc
index ab3f2875f3..fbc13f4069 100644
--- a/gr-digital/lib/header_payload_demux_impl.cc
+++ b/gr-digital/lib/header_payload_demux_impl.cc
@@ -89,7 +89,9 @@ namespace gr {
if (d_items_per_symbol < 1 || d_gi < 0 || d_itemsize < 1) {
throw std::invalid_argument("Items and symbol sizes must be at least 1.");
}
- set_output_multiple(d_items_per_symbol);
+ if (!d_output_symbols) {
+ set_output_multiple(d_items_per_symbol);
+ }
message_port_register_in(msg_port_id);
}
@@ -100,10 +102,20 @@ namespace gr {
void
header_payload_demux_impl::forecast (int noutput_items, gr_vector_int &ninput_items_required)
{
- // noutput_items is an integer multiple of d_items_per_symbol!
+ int n_items_reqd = 0;
+ if (d_state == STATE_HEADER) {
+ n_items_reqd = d_header_len * (d_items_per_symbol + d_gi);
+ //} else if (d_state == STATE_HEADER) {
+ } else {
+ n_items_reqd = noutput_items * (d_items_per_symbol + d_gi);
+ if (!d_output_symbols) {
+ // here, noutput_items is an integer multiple of d_items_per_symbol!
+ n_items_reqd /= d_items_per_symbol;
+ }
+ }
+
for (unsigned i = 0; i < ninput_items_required.size(); i++) {
- ninput_items_required[i] =
- noutput_items / d_items_per_symbol * (d_items_per_symbol + d_gi);
+ ninput_items_required[i] = n_items_reqd;
}
}
@@ -119,59 +131,62 @@ namespace gr {
int nread = 0;
bool exit_loop = false;
-
int produced_hdr = 0;
int produced_payload = 0;
- // FIXME ninput_items[1] does not have to be defined O_o
- while (nread < noutput_items && nread < ninput_items[0] && nread < ninput_items[1] && !exit_loop) {
+ while (
+ nread < noutput_items
+ && nread < ninput_items[0]
+ && (ninput_items.size() == 1 || nread < ninput_items[1])
+ && !exit_loop
+ ) {
switch (d_state) {
- case STATE_IDLE:
- // 1) Search for a trigger signal on input 1 (if present)
- // 2) Search for a trigger tag, make sure it's the first one
- // The first trigger to be found is used!
- // 3) Make sure the right number of items is skipped
- // 4) If trigger found, switch to STATE_HEADER
- if (find_trigger_signal(nread, noutput_items, input_items)) {
- d_remaining_symbols = d_header_len;
- d_state = STATE_HEADER;
- in += nread * d_itemsize;
- }
- break;
+ case STATE_IDLE:
+ // 1) Search for a trigger signal on input 1 (if present)
+ // 2) Search for a trigger tag, make sure it's the first one
+ // The first trigger to be found is used!
+ // 3) Make sure the right number of items is skipped
+ // 4) If trigger found, switch to STATE_HEADER
+ if (find_trigger_signal(nread, noutput_items, input_items)) {
+ d_remaining_symbols = d_header_len;
+ d_state = STATE_HEADER;
+ in += nread * d_itemsize;
+ }
+ break;
- case STATE_HEADER:
- copy_symbol(in, out_header, 0, nread, produced_hdr);
- if (d_remaining_symbols == 0) {
- d_state = STATE_WAIT_FOR_MSG;
- exit_loop = true;
- }
- break;
+ case STATE_HEADER:
+ copy_symbol(in, out_header, 0, nread, produced_hdr);
+ if (d_remaining_symbols == 0) {
+ d_state = STATE_WAIT_FOR_MSG;
+ exit_loop = true;
+ }
+ break;
- case STATE_WAIT_FOR_MSG:
- if (empty_p(msg_port_id)) return 0; //no message available
- // If we're in this state, nread is zero (because previous state exits loop)
- // 1) Wait for msg (blocking call)
- // 2) set d_remaining_symbols
- // 3) Write tags
- // 4) fall through to next state
- d_remaining_symbols = -1;
- if (!parse_header_data_msg()) {
- d_state = STATE_IDLE;
- exit_loop = true;
- break;
- }
- d_state = STATE_PAYLOAD;
+ case STATE_WAIT_FOR_MSG:
+ if (empty_p(msg_port_id)) return 0; //no message available
+ // If we're in this state, nread is zero (because previous state exits loop)
+ // 1) Wait for msg (blocking call)
+ // 2) set d_remaining_symbols
+ // 3) Write tags
+ // 4) fall through to next state
+ d_remaining_symbols = -1;
+ if (!parse_header_data_msg()) {
+ d_state = STATE_IDLE;
+ exit_loop = true;
+ break;
+ }
+ d_state = STATE_PAYLOAD;
- case STATE_PAYLOAD:
- copy_symbol(in, out_payload, 1, nread, produced_payload);
- if (d_remaining_symbols == 0) {
- d_state = STATE_IDLE;
- exit_loop = true;
- }
- break;
+ case STATE_PAYLOAD:
+ copy_symbol(in, out_payload, 1, nread, produced_payload);
+ if (d_remaining_symbols == 0) {
+ d_state = STATE_IDLE;
+ exit_loop = true;
+ }
+ break;
- default:
- throw std::runtime_error("invalid state");
+ default:
+ throw std::runtime_error("invalid state");
} /* switch */
} /* while(nread < noutput_items) */
@@ -225,7 +240,7 @@ namespace gr {
bool
header_payload_demux_impl::parse_header_data_msg()
{
- pmt::pmt_t msg(delete_head_blocking(msg_port_id));
+ pmt::pmt_t msg(delete_head_nowait(msg_port_id));
if (pmt::pmt_is_integer(msg)) {
d_remaining_symbols = pmt::pmt_to_long(msg);
add_item_tag(1, nitems_written(1), d_len_tag_key, msg);