diff options
author | Martin Braun <martin.braun@kit.edu> | 2013-06-09 17:29:21 +0200 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2013-06-14 06:38:30 -0700 |
commit | 8ba7be2a1b0027747d256638c50a4c7b09677ea9 (patch) | |
tree | e8cb55983f72846c64ffee8e38fa3f5b886382fa | |
parent | 7e62087ae6f8a41c9b8545edb37f1bb352472765 (diff) |
digital: HPD now uses a registered message handler
-rw-r--r-- | gr-digital/lib/header_payload_demux_impl.cc | 95 | ||||
-rw-r--r-- | gr-digital/lib/header_payload_demux_impl.h | 7 |
2 files changed, 57 insertions, 45 deletions
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc index 71868733b6..23992e7ce1 100644 --- a/gr-digital/lib/header_payload_demux_impl.cc +++ b/gr-digital/lib/header_payload_demux_impl.cc @@ -24,20 +24,21 @@ #endif #include <climits> +#include <boost/format.hpp> #include <gnuradio/io_signature.h> #include "header_payload_demux_impl.h" namespace gr { namespace digital { - // FIXME this is a completely arbitrary number, and depends on the signal params and the buffer sizes - const int MAX_SYMBOLS = 100; enum demux_states_t { - STATE_IDLE, - STATE_HEADER, - STATE_WAIT_FOR_MSG, - STATE_PAYLOAD + STATE_FIND_TRIGGER, // "Idle" state (waiting for burst) + STATE_HEADER, // Copy header + STATE_WAIT_FOR_MSG, // Null state (wait until msg from header demod) + STATE_HEADER_RX_SUCCESS, // Header processing + STATE_HEADER_RX_FAIL, // " + STATE_PAYLOAD // Copy payload }; #define msg_port_id pmt::mp("header_data") @@ -84,7 +85,9 @@ namespace gr { d_output_symbols(output_symbols), d_itemsize(itemsize), d_uses_trigger_tag(!trigger_tag_key.empty()), - d_state(STATE_IDLE) + d_state(STATE_FIND_TRIGGER), + d_payload_tag_keys(0), + d_payload_tag_values(0) { if (d_header_len < 1) { throw std::invalid_argument("Header length must be at least 1 symbol."); @@ -96,6 +99,7 @@ namespace gr { set_output_multiple(d_items_per_symbol); } message_port_register_in(msg_port_id); + set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1)); } header_payload_demux_impl::~header_payload_demux_impl() @@ -144,7 +148,15 @@ namespace gr { && !exit_loop ) { switch (d_state) { - case STATE_IDLE: + case STATE_WAIT_FOR_MSG: + // In an ideal world, this would never be called + return 0; + + case STATE_HEADER_RX_FAIL: + // TODO: Consume one item from input when copy_symbols has been optimized + d_state = STATE_FIND_TRIGGER; + + case STATE_FIND_TRIGGER: // 1) Search for a trigger signal on input 1 (if present) // 2) Search for a trigger tag, make sure it's the first one // The first trigger to be found is used! @@ -165,29 +177,17 @@ namespace gr { } break; - case STATE_WAIT_FOR_MSG: - if (empty_p(msg_port_id)) return 0; //no message available - // If we're in this state, nread is zero (because previous state exits loop) - // 1) Wait for msg (blocking call) - // 2) set d_remaining_symbols - // 3) Write tags - // 4) fall through to next state - d_remaining_symbols = -1; - // TODO MAX_SYMBOLS depends on the buffer size in the payload demod chain - if (!parse_header_data_msg() || d_remaining_symbols > MAX_SYMBOLS) { - if (d_remaining_symbols > MAX_SYMBOLS) { - GR_LOG_INFO(d_logger, "Detected a packet larger than max frame size"); - } - d_state = STATE_IDLE; - exit_loop = true; - break; + case STATE_HEADER_RX_SUCCESS: + // TODO: Consume the entire header from the input when copy_symbols has been optimized + for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) { + add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]); } d_state = STATE_PAYLOAD; case STATE_PAYLOAD: copy_symbol(in, out_payload, 1, nread, produced_payload); if (d_remaining_symbols == 0) { - d_state = STATE_IDLE; + d_state = STATE_FIND_TRIGGER; exit_loop = true; } break; @@ -244,36 +244,47 @@ namespace gr { } - bool - header_payload_demux_impl::parse_header_data_msg() + void + header_payload_demux_impl::parse_header_data_msg(pmt::pmt_t header_data) { - pmt::pmt_t msg(delete_head_nowait(msg_port_id)); - if (pmt::is_integer(msg)) { - d_remaining_symbols = pmt::to_long(msg); - add_item_tag(1, nitems_written(1), d_len_tag_key, msg); - } else if (pmt::is_dict(msg)) { - pmt::pmt_t dict_items(pmt::dict_items(msg)); + d_payload_tag_keys.clear(); + d_payload_tag_values.clear(); + d_state = STATE_HEADER_RX_FAIL; + + if (pmt::is_integer(header_data)) { + d_remaining_symbols = pmt::to_long(header_data); + d_payload_tag_keys.push_back(d_len_tag_key); + d_payload_tag_values.push_back(header_data); + d_state = STATE_HEADER_RX_SUCCESS; + } else if (pmt::is_dict(header_data)) { + pmt::pmt_t dict_items(pmt::dict_items(header_data)); while (!pmt::is_null(dict_items)) { pmt::pmt_t this_item(pmt::car(dict_items)); - add_item_tag(1, nitems_written(1), pmt::car(this_item), pmt::cdr(this_item)); + d_payload_tag_keys.push_back(pmt::car(this_item)); + d_payload_tag_values.push_back(pmt::cdr(this_item)); if (pmt::equal(pmt::car(this_item), d_len_tag_key)) { d_remaining_symbols = pmt::to_long(pmt::cdr(this_item)); + d_state = STATE_HEADER_RX_SUCCESS; } dict_items = pmt::cdr(dict_items); } - if (d_remaining_symbols == -1) { - throw std::runtime_error("no length tag passed from header data"); + if (d_state == STATE_HEADER_RX_FAIL) { + GR_LOG_CRIT(d_logger, "no length tag passed from header data"); } - } else if (pmt::is_null(msg)) { // Blocking call was interrupted - return false; - } else if (msg == pmt::PMT_F) { // Header was invalid - return false; + } else if (header_data == pmt::PMT_F || pmt::is_null(header_data)) { + GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data)); } else { - throw std::runtime_error("Received illegal header data"); + GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data)); + } + if (d_state == STATE_HEADER_RX_SUCCESS + && (d_remaining_symbols * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1) + ) { + d_state = STATE_HEADER_RX_FAIL; + GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_remaining_symbols); } - return true; } + // This is a inefficient design: Can only copy one symbol at once. TODO fix. void header_payload_demux_impl::copy_symbol(const unsigned char *&in, unsigned char *&out, int port, int &nread, int &nproduced) { diff --git a/gr-digital/lib/header_payload_demux_impl.h b/gr-digital/lib/header_payload_demux_impl.h index a11430b175..9843977b17 100644 --- a/gr-digital/lib/header_payload_demux_impl.h +++ b/gr-digital/lib/header_payload_demux_impl.h @@ -38,14 +38,15 @@ namespace gr { bool d_output_symbols; //!< If true, output is symbols, not items size_t d_itemsize; //!< Bytes per item bool d_uses_trigger_tag; //!< If a trigger tag is used - int d_ninput_items_reqd; //!< Helper for forecast() int d_state; //!< Current read state int d_remaining_symbols; //!< When in payload or header state, the number of symbols still to transmit + std::vector<pmt::pmt_t> d_payload_tag_keys; //!< Temporary buffer for PMTs that go on the payload (keys) + std::vector<pmt::pmt_t> d_payload_tag_values; //!< Temporary buffer for PMTs that go on the payload (values) // Helpers to make the state machine more readable - //! Helper function that does the reading from the msg port - bool parse_header_data_msg(); + //! Message handler: Reads the result from the header demod and sets length tag (and other tags) + void parse_header_data_msg(pmt::pmt_t header_data); //! Helper function that returns true if a trigger signal is detected. // Searches input 1 (if active), then the tags. Sets \p pos to the position |