summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gr-digital/lib/header_payload_demux_impl.cc95
-rw-r--r--gr-digital/lib/header_payload_demux_impl.h7
2 files changed, 57 insertions, 45 deletions
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc
index 71868733b6..23992e7ce1 100644
--- a/gr-digital/lib/header_payload_demux_impl.cc
+++ b/gr-digital/lib/header_payload_demux_impl.cc
@@ -24,20 +24,21 @@
#endif
#include <climits>
+#include <boost/format.hpp>
#include <gnuradio/io_signature.h>
#include "header_payload_demux_impl.h"
namespace gr {
namespace digital {
- // FIXME this is a completely arbitrary number, and depends on the signal params and the buffer sizes
- const int MAX_SYMBOLS = 100;
enum demux_states_t {
- STATE_IDLE,
- STATE_HEADER,
- STATE_WAIT_FOR_MSG,
- STATE_PAYLOAD
+ STATE_FIND_TRIGGER, // "Idle" state (waiting for burst)
+ STATE_HEADER, // Copy header
+ STATE_WAIT_FOR_MSG, // Null state (wait until msg from header demod)
+ STATE_HEADER_RX_SUCCESS, // Header processing
+ STATE_HEADER_RX_FAIL, // "
+ STATE_PAYLOAD // Copy payload
};
#define msg_port_id pmt::mp("header_data")
@@ -84,7 +85,9 @@ namespace gr {
d_output_symbols(output_symbols),
d_itemsize(itemsize),
d_uses_trigger_tag(!trigger_tag_key.empty()),
- d_state(STATE_IDLE)
+ d_state(STATE_FIND_TRIGGER),
+ d_payload_tag_keys(0),
+ d_payload_tag_values(0)
{
if (d_header_len < 1) {
throw std::invalid_argument("Header length must be at least 1 symbol.");
@@ -96,6 +99,7 @@ namespace gr {
set_output_multiple(d_items_per_symbol);
}
message_port_register_in(msg_port_id);
+ set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1));
}
header_payload_demux_impl::~header_payload_demux_impl()
@@ -144,7 +148,15 @@ namespace gr {
&& !exit_loop
) {
switch (d_state) {
- case STATE_IDLE:
+ case STATE_WAIT_FOR_MSG:
+ // In an ideal world, this would never be called
+ return 0;
+
+ case STATE_HEADER_RX_FAIL:
+ // TODO: Consume one item from input when copy_symbols has been optimized
+ d_state = STATE_FIND_TRIGGER;
+
+ case STATE_FIND_TRIGGER:
// 1) Search for a trigger signal on input 1 (if present)
// 2) Search for a trigger tag, make sure it's the first one
// The first trigger to be found is used!
@@ -165,29 +177,17 @@ namespace gr {
}
break;
- case STATE_WAIT_FOR_MSG:
- if (empty_p(msg_port_id)) return 0; //no message available
- // If we're in this state, nread is zero (because previous state exits loop)
- // 1) Wait for msg (blocking call)
- // 2) set d_remaining_symbols
- // 3) Write tags
- // 4) fall through to next state
- d_remaining_symbols = -1;
- // TODO MAX_SYMBOLS depends on the buffer size in the payload demod chain
- if (!parse_header_data_msg() || d_remaining_symbols > MAX_SYMBOLS) {
- if (d_remaining_symbols > MAX_SYMBOLS) {
- GR_LOG_INFO(d_logger, "Detected a packet larger than max frame size");
- }
- d_state = STATE_IDLE;
- exit_loop = true;
- break;
+ case STATE_HEADER_RX_SUCCESS:
+ // TODO: Consume the entire header from the input when copy_symbols has been optimized
+ for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) {
+ add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]);
}
d_state = STATE_PAYLOAD;
case STATE_PAYLOAD:
copy_symbol(in, out_payload, 1, nread, produced_payload);
if (d_remaining_symbols == 0) {
- d_state = STATE_IDLE;
+ d_state = STATE_FIND_TRIGGER;
exit_loop = true;
}
break;
@@ -244,36 +244,47 @@ namespace gr {
}
- bool
- header_payload_demux_impl::parse_header_data_msg()
+ void
+ header_payload_demux_impl::parse_header_data_msg(pmt::pmt_t header_data)
{
- pmt::pmt_t msg(delete_head_nowait(msg_port_id));
- if (pmt::is_integer(msg)) {
- d_remaining_symbols = pmt::to_long(msg);
- add_item_tag(1, nitems_written(1), d_len_tag_key, msg);
- } else if (pmt::is_dict(msg)) {
- pmt::pmt_t dict_items(pmt::dict_items(msg));
+ d_payload_tag_keys.clear();
+ d_payload_tag_values.clear();
+ d_state = STATE_HEADER_RX_FAIL;
+
+ if (pmt::is_integer(header_data)) {
+ d_remaining_symbols = pmt::to_long(header_data);
+ d_payload_tag_keys.push_back(d_len_tag_key);
+ d_payload_tag_values.push_back(header_data);
+ d_state = STATE_HEADER_RX_SUCCESS;
+ } else if (pmt::is_dict(header_data)) {
+ pmt::pmt_t dict_items(pmt::dict_items(header_data));
while (!pmt::is_null(dict_items)) {
pmt::pmt_t this_item(pmt::car(dict_items));
- add_item_tag(1, nitems_written(1), pmt::car(this_item), pmt::cdr(this_item));
+ d_payload_tag_keys.push_back(pmt::car(this_item));
+ d_payload_tag_values.push_back(pmt::cdr(this_item));
if (pmt::equal(pmt::car(this_item), d_len_tag_key)) {
d_remaining_symbols = pmt::to_long(pmt::cdr(this_item));
+ d_state = STATE_HEADER_RX_SUCCESS;
}
dict_items = pmt::cdr(dict_items);
}
- if (d_remaining_symbols == -1) {
- throw std::runtime_error("no length tag passed from header data");
+ if (d_state == STATE_HEADER_RX_FAIL) {
+ GR_LOG_CRIT(d_logger, "no length tag passed from header data");
}
- } else if (pmt::is_null(msg)) { // Blocking call was interrupted
- return false;
- } else if (msg == pmt::PMT_F) { // Header was invalid
- return false;
+ } else if (header_data == pmt::PMT_F || pmt::is_null(header_data)) {
+ GR_LOG_INFO(d_logger, boost::format("Parser returned %1%") % pmt::write_string(header_data));
} else {
- throw std::runtime_error("Received illegal header data");
+ GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data));
+ }
+ if (d_state == STATE_HEADER_RX_SUCCESS
+ && (d_remaining_symbols * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)
+ ) {
+ d_state = STATE_HEADER_RX_FAIL;
+ GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_remaining_symbols);
}
- return true;
}
+ // This is a inefficient design: Can only copy one symbol at once. TODO fix.
void
header_payload_demux_impl::copy_symbol(const unsigned char *&in, unsigned char *&out, int port, int &nread, int &nproduced)
{
diff --git a/gr-digital/lib/header_payload_demux_impl.h b/gr-digital/lib/header_payload_demux_impl.h
index a11430b175..9843977b17 100644
--- a/gr-digital/lib/header_payload_demux_impl.h
+++ b/gr-digital/lib/header_payload_demux_impl.h
@@ -38,14 +38,15 @@ namespace gr {
bool d_output_symbols; //!< If true, output is symbols, not items
size_t d_itemsize; //!< Bytes per item
bool d_uses_trigger_tag; //!< If a trigger tag is used
- int d_ninput_items_reqd; //!< Helper for forecast()
int d_state; //!< Current read state
int d_remaining_symbols; //!< When in payload or header state, the number of symbols still to transmit
+ std::vector<pmt::pmt_t> d_payload_tag_keys; //!< Temporary buffer for PMTs that go on the payload (keys)
+ std::vector<pmt::pmt_t> d_payload_tag_values; //!< Temporary buffer for PMTs that go on the payload (values)
// Helpers to make the state machine more readable
- //! Helper function that does the reading from the msg port
- bool parse_header_data_msg();
+ //! Message handler: Reads the result from the header demod and sets length tag (and other tags)
+ void parse_header_data_msg(pmt::pmt_t header_data);
//! Helper function that returns true if a trigger signal is detected.
// Searches input 1 (if active), then the tags. Sets \p pos to the position