diff options
author | Martin Braun <martin.braun@kit.edu> | 2013-06-11 19:28:13 +0200 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2013-06-14 06:38:40 -0700 |
commit | b024982081ee4384e95d8a8958900de93c5fd064 (patch) | |
tree | c77af57c0222daea2b5d43d0f98d4eca9b89e3fb /gr-digital/lib/header_payload_demux_impl.cc | |
parent | 8ba7be2a1b0027747d256638c50a4c7b09677ea9 (diff) |
digital: HPD: fixed tag propagation, minimized calls to work()
Diffstat (limited to 'gr-digital/lib/header_payload_demux_impl.cc')
-rw-r--r-- | gr-digital/lib/header_payload_demux_impl.cc | 246 |
1 files changed, 139 insertions, 107 deletions
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc index 23992e7ce1..7cc3ec3836 100644 --- a/gr-digital/lib/header_payload_demux_impl.cc +++ b/gr-digital/lib/header_payload_demux_impl.cc @@ -28,7 +28,6 @@ #include <gnuradio/io_signature.h> #include "header_payload_demux_impl.h" - namespace gr { namespace digital { @@ -86,6 +85,7 @@ namespace gr { d_itemsize(itemsize), d_uses_trigger_tag(!trigger_tag_key.empty()), d_state(STATE_FIND_TRIGGER), + d_curr_payload_len(0), d_payload_tag_keys(0), d_payload_tag_values(0) { @@ -95,9 +95,13 @@ namespace gr { if (d_items_per_symbol < 1 || d_gi < 0 || d_itemsize < 1) { throw std::invalid_argument("Items and symbol sizes must be at least 1."); } - if (!d_output_symbols) { + if (d_output_symbols) { + set_relative_rate(1.0 / (d_items_per_symbol + d_gi)); + } else { + set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi)); set_output_multiple(d_items_per_symbol); } + set_tag_propagation_policy(TPP_DONT); message_port_register_in(msg_port_id); set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1)); } @@ -112,7 +116,8 @@ namespace gr { int n_items_reqd = 0; if (d_state == STATE_HEADER) { n_items_reqd = d_header_len * (d_items_per_symbol + d_gi); - //} else if (d_state == STATE_HEADER) { + } else if (d_state == STATE_PAYLOAD) { + n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi); } else { n_items_reqd = noutput_items * (d_items_per_symbol + d_gi); if (!d_output_symbols) { @@ -126,6 +131,36 @@ namespace gr { } } + + inline bool + header_payload_demux_impl::check_items_available( + int n_symbols, + gr_vector_int &ninput_items, + int noutput_items, + int nread + ) + { + // Check there's enough items on the input + if ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[0]-nread) + || (ninput_items.size() == 2 && ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[1]-nread)))) { + return false; + } + + // Check there's enough space on the output buffer + if (d_output_symbols) { + if (noutput_items < n_symbols) { + return false; + } + } else { + if (noutput_items < n_symbols * d_items_per_symbol) { + return false; + } + } + + return true; + } + + int header_payload_demux_impl::general_work (int noutput_items, gr_vector_int &ninput_items, @@ -137,89 +172,77 @@ namespace gr { unsigned char *out_payload = (unsigned char *) output_items[1]; int nread = 0; - bool exit_loop = false; - int produced_hdr = 0; - int produced_payload = 0; - - while ( - nread < noutput_items - && nread < ninput_items[0] - && (ninput_items.size() == 1 || nread < ninput_items[1]) - && !exit_loop - ) { - switch (d_state) { - case STATE_WAIT_FOR_MSG: - // In an ideal world, this would never be called - return 0; - - case STATE_HEADER_RX_FAIL: - // TODO: Consume one item from input when copy_symbols has been optimized - d_state = STATE_FIND_TRIGGER; + int trigger_offset = 0; - case STATE_FIND_TRIGGER: - // 1) Search for a trigger signal on input 1 (if present) - // 2) Search for a trigger tag, make sure it's the first one - // The first trigger to be found is used! - // 3) Make sure the right number of items is skipped - // 4) If trigger found, switch to STATE_HEADER - if (find_trigger_signal(nread, noutput_items, input_items)) { - d_remaining_symbols = d_header_len; - d_state = STATE_HEADER; - in += nread * d_itemsize; - } - break; + switch (d_state) { + case STATE_WAIT_FOR_MSG: + // In an ideal world, this would never be called + return 0; - case STATE_HEADER: - copy_symbol(in, out_header, 0, nread, produced_hdr); - if (d_remaining_symbols == 0) { - d_state = STATE_WAIT_FOR_MSG; - exit_loop = true; - } - break; + case STATE_HEADER_RX_FAIL: + consume_each (1); + in += d_itemsize; + nread++; + d_state = STATE_FIND_TRIGGER; - case STATE_HEADER_RX_SUCCESS: - // TODO: Consume the entire header from the input when copy_symbols has been optimized - for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) { - add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]); - } - d_state = STATE_PAYLOAD; - - case STATE_PAYLOAD: - copy_symbol(in, out_payload, 1, nread, produced_payload); - if (d_remaining_symbols == 0) { - d_state = STATE_FIND_TRIGGER; - exit_loop = true; - } + case STATE_FIND_TRIGGER: + trigger_offset = find_trigger_signal(nread, noutput_items, input_items); + if (trigger_offset == -1) { + consume_each(noutput_items - nread); break; + } + consume_each (trigger_offset); + in += trigger_offset * d_itemsize; + d_state = STATE_HEADER; - default: - throw std::runtime_error("invalid state"); - } /* switch */ - } /* while(nread < noutput_items) */ - if (!d_output_symbols) { - produced_hdr *= d_items_per_symbol; - produced_payload *= d_items_per_symbol; - } - produce(0, produced_hdr); - produce(1, produced_payload); - consume_each (nread); + case STATE_HEADER: + if (check_items_available(d_header_len, ninput_items, noutput_items, nread)) { + copy_n_symbols(in, out_header, 0, d_header_len); + d_state = STATE_WAIT_FOR_MSG; + produce(0, d_header_len * (d_output_symbols ? 1 : d_items_per_symbol)); + } + break; + + case STATE_HEADER_RX_SUCCESS: + for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) { + add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]); + } + consume_each (d_header_len * (d_items_per_symbol + d_gi)); + in += d_header_len * (d_items_per_symbol + d_gi) * d_itemsize; + nread += d_header_len * (d_items_per_symbol + d_gi); + d_state = STATE_PAYLOAD; + + case STATE_PAYLOAD: + if (check_items_available(d_curr_payload_len, ninput_items, noutput_items, nread)) { + copy_n_symbols(in, out_payload, 1, d_curr_payload_len); + produce(1, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); + consume_each (d_curr_payload_len * (d_items_per_symbol + d_gi)); + d_state = STATE_FIND_TRIGGER; + set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi)); + } + break; + + default: + throw std::runtime_error("invalid state"); + } /* switch */ + return WORK_CALLED_PRODUCE; } /* general_work() */ - bool + int header_payload_demux_impl::find_trigger_signal( - int &pos, + int nread, int noutput_items, gr_vector_const_void_star &input_items) { if (input_items.size() == 2) { unsigned char *in_trigger = (unsigned char *) input_items[1]; - for (int i = 0; i < noutput_items; i++) { + in_trigger += nread; + for (int i = 0; i < noutput_items-nread; i++) { if (in_trigger[i]) { - pos = i; - return true; + return i; } } } @@ -235,12 +258,10 @@ namespace gr { } } if (tag_index != -1) { - pos = min_offset - nitems_read(0); - return true; + return min_offset - nitems_read(0); } } - pos += noutput_items; - return false; + return -1; } @@ -252,7 +273,7 @@ namespace gr { d_state = STATE_HEADER_RX_FAIL; if (pmt::is_integer(header_data)) { - d_remaining_symbols = pmt::to_long(header_data); + d_curr_payload_len = pmt::to_long(header_data); d_payload_tag_keys.push_back(d_len_tag_key); d_payload_tag_values.push_back(header_data); d_state = STATE_HEADER_RX_SUCCESS; @@ -263,7 +284,7 @@ namespace gr { d_payload_tag_keys.push_back(pmt::car(this_item)); d_payload_tag_values.push_back(pmt::cdr(this_item)); if (pmt::equal(pmt::car(this_item), d_len_tag_key)) { - d_remaining_symbols = pmt::to_long(pmt::cdr(this_item)); + d_curr_payload_len = pmt::to_long(pmt::cdr(this_item)); d_state = STATE_HEADER_RX_SUCCESS; } dict_items = pmt::cdr(dict_items); @@ -276,53 +297,64 @@ namespace gr { } else { GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data)); } - if (d_state == STATE_HEADER_RX_SUCCESS - && (d_remaining_symbols * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1) - ) { - d_state = STATE_HEADER_RX_FAIL; - GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_remaining_symbols); + if (d_state == STATE_HEADER_RX_SUCCESS) + { + if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)) { + d_state = STATE_HEADER_RX_FAIL; + GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len); + } else { + set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); + } } } - // This is a inefficient design: Can only copy one symbol at once. TODO fix. + void - header_payload_demux_impl::copy_symbol(const unsigned char *&in, unsigned char *&out, int port, int &nread, int &nproduced) + header_payload_demux_impl::copy_n_symbols( + const unsigned char *in, + unsigned char *out, + int port, + int n_symbols + ) { - std::vector<tag_t> tags; - memcpy((void *) out, - (void *) (in + d_gi * d_itemsize), - d_itemsize * d_items_per_symbol - ); - // Tags on GI - get_tags_in_range(tags, 0, - nitems_read(0) + nread, - nitems_read(0) + nread + d_gi - ); - for (unsigned t = 0; t < tags.size(); t++) { - add_item_tag(port, - nitems_written(port)+nproduced, - tags[t].key, - tags[t].value + // Copy samples + if (d_gi) { + for (int i = 0; i < n_symbols; i++) { + memcpy((void *) out, (void *) (in + d_gi * d_itemsize), d_items_per_symbol * d_itemsize); + in += d_itemsize * (d_items_per_symbol + d_gi); + out += d_itemsize * d_items_per_symbol; + } + } else { + memcpy( + (void *) out, + (void *) in, + n_symbols * d_items_per_symbol * d_itemsize ); } - // Tags on symbol + // Copy tags + std::vector<tag_t> tags; get_tags_in_range( tags, 0, - nitems_read(port) + nread + d_gi, - nitems_read(port) + nread + d_gi + d_items_per_symbol + nitems_read(0), + nitems_read(0) + n_symbols * (d_items_per_symbol + d_gi) ); for (unsigned t = 0; t < tags.size(); t++) { - add_item_tag(0, - tags[t].offset - nitems_read(0)-nread + nitems_written(port)+nproduced, + int new_offset = tags[t].offset - nitems_read(0); + if (d_output_symbols) { + new_offset /= (d_items_per_symbol + d_gi); + } else if (d_gi) { + int pos_on_symbol = (new_offset % (d_items_per_symbol + d_gi)) - d_gi; + if (pos_on_symbol < 0) { + pos_on_symbol = 0; + } + new_offset = (new_offset / (d_items_per_symbol + d_gi)) + pos_on_symbol; + } + add_item_tag(port, + nitems_written(port) + new_offset, tags[t].key, tags[t].value ); } - in += d_itemsize * (d_items_per_symbol + d_gi); - out += d_items_per_symbol * d_itemsize; - nread += d_items_per_symbol + d_gi; - nproduced++; - d_remaining_symbols--; } } /* namespace digital */ |