diff options
Diffstat (limited to 'gr-digital/lib/header_payload_demux_impl.cc')
-rw-r--r-- | gr-digital/lib/header_payload_demux_impl.cc | 115 |
1 files changed, 65 insertions, 50 deletions
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc index ab3f2875f3..fbc13f4069 100644 --- a/gr-digital/lib/header_payload_demux_impl.cc +++ b/gr-digital/lib/header_payload_demux_impl.cc @@ -89,7 +89,9 @@ namespace gr { if (d_items_per_symbol < 1 || d_gi < 0 || d_itemsize < 1) { throw std::invalid_argument("Items and symbol sizes must be at least 1."); } - set_output_multiple(d_items_per_symbol); + if (!d_output_symbols) { + set_output_multiple(d_items_per_symbol); + } message_port_register_in(msg_port_id); } @@ -100,10 +102,20 @@ namespace gr { void header_payload_demux_impl::forecast (int noutput_items, gr_vector_int &ninput_items_required) { - // noutput_items is an integer multiple of d_items_per_symbol! + int n_items_reqd = 0; + if (d_state == STATE_HEADER) { + n_items_reqd = d_header_len * (d_items_per_symbol + d_gi); + //} else if (d_state == STATE_HEADER) { + } else { + n_items_reqd = noutput_items * (d_items_per_symbol + d_gi); + if (!d_output_symbols) { + // here, noutput_items is an integer multiple of d_items_per_symbol! + n_items_reqd /= d_items_per_symbol; + } + } + for (unsigned i = 0; i < ninput_items_required.size(); i++) { - ninput_items_required[i] = - noutput_items / d_items_per_symbol * (d_items_per_symbol + d_gi); + ninput_items_required[i] = n_items_reqd; } } @@ -119,59 +131,62 @@ namespace gr { int nread = 0; bool exit_loop = false; - int produced_hdr = 0; int produced_payload = 0; - // FIXME ninput_items[1] does not have to be defined O_o - while (nread < noutput_items && nread < ninput_items[0] && nread < ninput_items[1] && !exit_loop) { + while ( + nread < noutput_items + && nread < ninput_items[0] + && (ninput_items.size() == 1 || nread < ninput_items[1]) + && !exit_loop + ) { switch (d_state) { - case STATE_IDLE: - // 1) Search for a trigger signal on input 1 (if present) - // 2) Search for a trigger tag, make sure it's the first one - // The first trigger to be found is used! - // 3) Make sure the right number of items is skipped - // 4) If trigger found, switch to STATE_HEADER - if (find_trigger_signal(nread, noutput_items, input_items)) { - d_remaining_symbols = d_header_len; - d_state = STATE_HEADER; - in += nread * d_itemsize; - } - break; + case STATE_IDLE: + // 1) Search for a trigger signal on input 1 (if present) + // 2) Search for a trigger tag, make sure it's the first one + // The first trigger to be found is used! + // 3) Make sure the right number of items is skipped + // 4) If trigger found, switch to STATE_HEADER + if (find_trigger_signal(nread, noutput_items, input_items)) { + d_remaining_symbols = d_header_len; + d_state = STATE_HEADER; + in += nread * d_itemsize; + } + break; - case STATE_HEADER: - copy_symbol(in, out_header, 0, nread, produced_hdr); - if (d_remaining_symbols == 0) { - d_state = STATE_WAIT_FOR_MSG; - exit_loop = true; - } - break; + case STATE_HEADER: + copy_symbol(in, out_header, 0, nread, produced_hdr); + if (d_remaining_symbols == 0) { + d_state = STATE_WAIT_FOR_MSG; + exit_loop = true; + } + break; - case STATE_WAIT_FOR_MSG: - if (empty_p(msg_port_id)) return 0; //no message available - // If we're in this state, nread is zero (because previous state exits loop) - // 1) Wait for msg (blocking call) - // 2) set d_remaining_symbols - // 3) Write tags - // 4) fall through to next state - d_remaining_symbols = -1; - if (!parse_header_data_msg()) { - d_state = STATE_IDLE; - exit_loop = true; - break; - } - d_state = STATE_PAYLOAD; + case STATE_WAIT_FOR_MSG: + if (empty_p(msg_port_id)) return 0; //no message available + // If we're in this state, nread is zero (because previous state exits loop) + // 1) Wait for msg (blocking call) + // 2) set d_remaining_symbols + // 3) Write tags + // 4) fall through to next state + d_remaining_symbols = -1; + if (!parse_header_data_msg()) { + d_state = STATE_IDLE; + exit_loop = true; + break; + } + d_state = STATE_PAYLOAD; - case STATE_PAYLOAD: - copy_symbol(in, out_payload, 1, nread, produced_payload); - if (d_remaining_symbols == 0) { - d_state = STATE_IDLE; - exit_loop = true; - } - break; + case STATE_PAYLOAD: + copy_symbol(in, out_payload, 1, nread, produced_payload); + if (d_remaining_symbols == 0) { + d_state = STATE_IDLE; + exit_loop = true; + } + break; - default: - throw std::runtime_error("invalid state"); + default: + throw std::runtime_error("invalid state"); } /* switch */ } /* while(nread < noutput_items) */ @@ -225,7 +240,7 @@ namespace gr { bool header_payload_demux_impl::parse_header_data_msg() { - pmt::pmt_t msg(delete_head_blocking(msg_port_id)); + pmt::pmt_t msg(delete_head_nowait(msg_port_id)); if (pmt::pmt_is_integer(msg)) { d_remaining_symbols = pmt::pmt_to_long(msg); add_item_tag(1, nitems_written(1), d_len_tag_key, msg); |