summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@kit.edu>2013-06-11 19:28:13 +0200
committerJohnathan Corgan <johnathan@corganlabs.com>2013-06-14 06:38:40 -0700
commitb024982081ee4384e95d8a8958900de93c5fd064 (patch)
treec77af57c0222daea2b5d43d0f98d4eca9b89e3fb
parent8ba7be2a1b0027747d256638c50a4c7b09677ea9 (diff)
digital: HPD: fixed tag propagation, minimized calls to work()
-rw-r--r--gr-digital/examples/ofdm/ofdm_loopback.grc80
-rw-r--r--gr-digital/lib/header_payload_demux_impl.cc246
-rw-r--r--gr-digital/lib/header_payload_demux_impl.h24
-rwxr-xr-xgr-digital/python/digital/qa_header_payload_demux.py179
4 files changed, 371 insertions, 158 deletions
diff --git a/gr-digital/examples/ofdm/ofdm_loopback.grc b/gr-digital/examples/ofdm/ofdm_loopback.grc
index 80dd12f2a5..a6b3b147f1 100644
--- a/gr-digital/examples/ofdm/ofdm_loopback.grc
+++ b/gr-digital/examples/ofdm/ofdm_loopback.grc
@@ -1,6 +1,6 @@
<?xml version='1.0' encoding='ASCII'?>
<flow_graph>
- <timestamp>Wed Jun 5 11:54:15 2013</timestamp>
+ <timestamp>Tue Jun 11 22:44:34 2013</timestamp>
<block>
<key>options</key>
<param>
@@ -385,45 +385,6 @@
</param>
</block>
<block>
- <key>blocks_tag_debug</key>
- <param>
- <key>id</key>
- <value>blocks_tag_debug_0</value>
- </param>
- <param>
- <key>_enabled</key>
- <value>True</value>
- </param>
- <param>
- <key>type</key>
- <value>byte</value>
- </param>
- <param>
- <key>name</key>
- <value>Rx Packets</value>
- </param>
- <param>
- <key>num_inputs</key>
- <value>1</value>
- </param>
- <param>
- <key>vlen</key>
- <value>1</value>
- </param>
- <param>
- <key>display</key>
- <value>True</value>
- </param>
- <param>
- <key>_coordinate</key>
- <value>(333, 367)</value>
- </param>
- <param>
- <key>_rotation</key>
- <value>0</value>
- </param>
- </block>
- <block>
<key>digital_ofdm_rx</key>
<param>
<key>id</key>
@@ -845,6 +806,45 @@
<value>180</value>
</param>
</block>
+ <block>
+ <key>blocks_tag_debug</key>
+ <param>
+ <key>id</key>
+ <value>blocks_tag_debug_0</value>
+ </param>
+ <param>
+ <key>_enabled</key>
+ <value>True</value>
+ </param>
+ <param>
+ <key>type</key>
+ <value>byte</value>
+ </param>
+ <param>
+ <key>name</key>
+ <value>Rx Packets</value>
+ </param>
+ <param>
+ <key>num_inputs</key>
+ <value>1</value>
+ </param>
+ <param>
+ <key>vlen</key>
+ <value>1</value>
+ </param>
+ <param>
+ <key>display</key>
+ <value>True</value>
+ </param>
+ <param>
+ <key>_coordinate</key>
+ <value>(333, 367)</value>
+ </param>
+ <param>
+ <key>_rotation</key>
+ <value>0</value>
+ </param>
+ </block>
<connection>
<source_block_id>blocks_vector_source_x_0</source_block_id>
<sink_block_id>blocks_vector_to_stream_0</sink_block_id>
diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc
index 23992e7ce1..7cc3ec3836 100644
--- a/gr-digital/lib/header_payload_demux_impl.cc
+++ b/gr-digital/lib/header_payload_demux_impl.cc
@@ -28,7 +28,6 @@
#include <gnuradio/io_signature.h>
#include "header_payload_demux_impl.h"
-
namespace gr {
namespace digital {
@@ -86,6 +85,7 @@ namespace gr {
d_itemsize(itemsize),
d_uses_trigger_tag(!trigger_tag_key.empty()),
d_state(STATE_FIND_TRIGGER),
+ d_curr_payload_len(0),
d_payload_tag_keys(0),
d_payload_tag_values(0)
{
@@ -95,9 +95,13 @@ namespace gr {
if (d_items_per_symbol < 1 || d_gi < 0 || d_itemsize < 1) {
throw std::invalid_argument("Items and symbol sizes must be at least 1.");
}
- if (!d_output_symbols) {
+ if (d_output_symbols) {
+ set_relative_rate(1.0 / (d_items_per_symbol + d_gi));
+ } else {
+ set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi));
set_output_multiple(d_items_per_symbol);
}
+ set_tag_propagation_policy(TPP_DONT);
message_port_register_in(msg_port_id);
set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1));
}
@@ -112,7 +116,8 @@ namespace gr {
int n_items_reqd = 0;
if (d_state == STATE_HEADER) {
n_items_reqd = d_header_len * (d_items_per_symbol + d_gi);
- //} else if (d_state == STATE_HEADER) {
+ } else if (d_state == STATE_PAYLOAD) {
+ n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi);
} else {
n_items_reqd = noutput_items * (d_items_per_symbol + d_gi);
if (!d_output_symbols) {
@@ -126,6 +131,36 @@ namespace gr {
}
}
+
+ inline bool
+ header_payload_demux_impl::check_items_available(
+ int n_symbols,
+ gr_vector_int &ninput_items,
+ int noutput_items,
+ int nread
+ )
+ {
+ // Check there's enough items on the input
+ if ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[0]-nread)
+ || (ninput_items.size() == 2 && ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[1]-nread)))) {
+ return false;
+ }
+
+ // Check there's enough space on the output buffer
+ if (d_output_symbols) {
+ if (noutput_items < n_symbols) {
+ return false;
+ }
+ } else {
+ if (noutput_items < n_symbols * d_items_per_symbol) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+
int
header_payload_demux_impl::general_work (int noutput_items,
gr_vector_int &ninput_items,
@@ -137,89 +172,77 @@ namespace gr {
unsigned char *out_payload = (unsigned char *) output_items[1];
int nread = 0;
- bool exit_loop = false;
- int produced_hdr = 0;
- int produced_payload = 0;
-
- while (
- nread < noutput_items
- && nread < ninput_items[0]
- && (ninput_items.size() == 1 || nread < ninput_items[1])
- && !exit_loop
- ) {
- switch (d_state) {
- case STATE_WAIT_FOR_MSG:
- // In an ideal world, this would never be called
- return 0;
-
- case STATE_HEADER_RX_FAIL:
- // TODO: Consume one item from input when copy_symbols has been optimized
- d_state = STATE_FIND_TRIGGER;
+ int trigger_offset = 0;
- case STATE_FIND_TRIGGER:
- // 1) Search for a trigger signal on input 1 (if present)
- // 2) Search for a trigger tag, make sure it's the first one
- // The first trigger to be found is used!
- // 3) Make sure the right number of items is skipped
- // 4) If trigger found, switch to STATE_HEADER
- if (find_trigger_signal(nread, noutput_items, input_items)) {
- d_remaining_symbols = d_header_len;
- d_state = STATE_HEADER;
- in += nread * d_itemsize;
- }
- break;
+ switch (d_state) {
+ case STATE_WAIT_FOR_MSG:
+ // In an ideal world, this would never be called
+ return 0;
- case STATE_HEADER:
- copy_symbol(in, out_header, 0, nread, produced_hdr);
- if (d_remaining_symbols == 0) {
- d_state = STATE_WAIT_FOR_MSG;
- exit_loop = true;
- }
- break;
+ case STATE_HEADER_RX_FAIL:
+ consume_each (1);
+ in += d_itemsize;
+ nread++;
+ d_state = STATE_FIND_TRIGGER;
- case STATE_HEADER_RX_SUCCESS:
- // TODO: Consume the entire header from the input when copy_symbols has been optimized
- for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) {
- add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]);
- }
- d_state = STATE_PAYLOAD;
-
- case STATE_PAYLOAD:
- copy_symbol(in, out_payload, 1, nread, produced_payload);
- if (d_remaining_symbols == 0) {
- d_state = STATE_FIND_TRIGGER;
- exit_loop = true;
- }
+ case STATE_FIND_TRIGGER:
+ trigger_offset = find_trigger_signal(nread, noutput_items, input_items);
+ if (trigger_offset == -1) {
+ consume_each(noutput_items - nread);
break;
+ }
+ consume_each (trigger_offset);
+ in += trigger_offset * d_itemsize;
+ d_state = STATE_HEADER;
- default:
- throw std::runtime_error("invalid state");
- } /* switch */
- } /* while(nread < noutput_items) */
- if (!d_output_symbols) {
- produced_hdr *= d_items_per_symbol;
- produced_payload *= d_items_per_symbol;
- }
- produce(0, produced_hdr);
- produce(1, produced_payload);
- consume_each (nread);
+ case STATE_HEADER:
+ if (check_items_available(d_header_len, ninput_items, noutput_items, nread)) {
+ copy_n_symbols(in, out_header, 0, d_header_len);
+ d_state = STATE_WAIT_FOR_MSG;
+ produce(0, d_header_len * (d_output_symbols ? 1 : d_items_per_symbol));
+ }
+ break;
+
+ case STATE_HEADER_RX_SUCCESS:
+ for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) {
+ add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]);
+ }
+ consume_each (d_header_len * (d_items_per_symbol + d_gi));
+ in += d_header_len * (d_items_per_symbol + d_gi) * d_itemsize;
+ nread += d_header_len * (d_items_per_symbol + d_gi);
+ d_state = STATE_PAYLOAD;
+
+ case STATE_PAYLOAD:
+ if (check_items_available(d_curr_payload_len, ninput_items, noutput_items, nread)) {
+ copy_n_symbols(in, out_payload, 1, d_curr_payload_len);
+ produce(1, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
+ consume_each (d_curr_payload_len * (d_items_per_symbol + d_gi));
+ d_state = STATE_FIND_TRIGGER;
+ set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi));
+ }
+ break;
+
+ default:
+ throw std::runtime_error("invalid state");
+ } /* switch */
+
return WORK_CALLED_PRODUCE;
} /* general_work() */
- bool
+ int
header_payload_demux_impl::find_trigger_signal(
- int &pos,
+ int nread,
int noutput_items,
gr_vector_const_void_star &input_items)
{
if (input_items.size() == 2) {
unsigned char *in_trigger = (unsigned char *) input_items[1];
- for (int i = 0; i < noutput_items; i++) {
+ in_trigger += nread;
+ for (int i = 0; i < noutput_items-nread; i++) {
if (in_trigger[i]) {
- pos = i;
- return true;
+ return i;
}
}
}
@@ -235,12 +258,10 @@ namespace gr {
}
}
if (tag_index != -1) {
- pos = min_offset - nitems_read(0);
- return true;
+ return min_offset - nitems_read(0);
}
}
- pos += noutput_items;
- return false;
+ return -1;
}
@@ -252,7 +273,7 @@ namespace gr {
d_state = STATE_HEADER_RX_FAIL;
if (pmt::is_integer(header_data)) {
- d_remaining_symbols = pmt::to_long(header_data);
+ d_curr_payload_len = pmt::to_long(header_data);
d_payload_tag_keys.push_back(d_len_tag_key);
d_payload_tag_values.push_back(header_data);
d_state = STATE_HEADER_RX_SUCCESS;
@@ -263,7 +284,7 @@ namespace gr {
d_payload_tag_keys.push_back(pmt::car(this_item));
d_payload_tag_values.push_back(pmt::cdr(this_item));
if (pmt::equal(pmt::car(this_item), d_len_tag_key)) {
- d_remaining_symbols = pmt::to_long(pmt::cdr(this_item));
+ d_curr_payload_len = pmt::to_long(pmt::cdr(this_item));
d_state = STATE_HEADER_RX_SUCCESS;
}
dict_items = pmt::cdr(dict_items);
@@ -276,53 +297,64 @@ namespace gr {
} else {
GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data));
}
- if (d_state == STATE_HEADER_RX_SUCCESS
- && (d_remaining_symbols * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)
- ) {
- d_state = STATE_HEADER_RX_FAIL;
- GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_remaining_symbols);
+ if (d_state == STATE_HEADER_RX_SUCCESS)
+ {
+ if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)) {
+ d_state = STATE_HEADER_RX_FAIL;
+ GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len);
+ } else {
+ set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol));
+ }
}
}
- // This is a inefficient design: Can only copy one symbol at once. TODO fix.
+
void
- header_payload_demux_impl::copy_symbol(const unsigned char *&in, unsigned char *&out, int port, int &nread, int &nproduced)
+ header_payload_demux_impl::copy_n_symbols(
+ const unsigned char *in,
+ unsigned char *out,
+ int port,
+ int n_symbols
+ )
{
- std::vector<tag_t> tags;
- memcpy((void *) out,
- (void *) (in + d_gi * d_itemsize),
- d_itemsize * d_items_per_symbol
- );
- // Tags on GI
- get_tags_in_range(tags, 0,
- nitems_read(0) + nread,
- nitems_read(0) + nread + d_gi
- );
- for (unsigned t = 0; t < tags.size(); t++) {
- add_item_tag(port,
- nitems_written(port)+nproduced,
- tags[t].key,
- tags[t].value
+ // Copy samples
+ if (d_gi) {
+ for (int i = 0; i < n_symbols; i++) {
+ memcpy((void *) out, (void *) (in + d_gi * d_itemsize), d_items_per_symbol * d_itemsize);
+ in += d_itemsize * (d_items_per_symbol + d_gi);
+ out += d_itemsize * d_items_per_symbol;
+ }
+ } else {
+ memcpy(
+ (void *) out,
+ (void *) in,
+ n_symbols * d_items_per_symbol * d_itemsize
);
}
- // Tags on symbol
+ // Copy tags
+ std::vector<tag_t> tags;
get_tags_in_range(
tags, 0,
- nitems_read(port) + nread + d_gi,
- nitems_read(port) + nread + d_gi + d_items_per_symbol
+ nitems_read(0),
+ nitems_read(0) + n_symbols * (d_items_per_symbol + d_gi)
);
for (unsigned t = 0; t < tags.size(); t++) {
- add_item_tag(0,
- tags[t].offset - nitems_read(0)-nread + nitems_written(port)+nproduced,
+ int new_offset = tags[t].offset - nitems_read(0);
+ if (d_output_symbols) {
+ new_offset /= (d_items_per_symbol + d_gi);
+ } else if (d_gi) {
+ int pos_on_symbol = (new_offset % (d_items_per_symbol + d_gi)) - d_gi;
+ if (pos_on_symbol < 0) {
+ pos_on_symbol = 0;
+ }
+ new_offset = (new_offset / (d_items_per_symbol + d_gi)) + pos_on_symbol;
+ }
+ add_item_tag(port,
+ nitems_written(port) + new_offset,
tags[t].key,
tags[t].value
);
}
- in += d_itemsize * (d_items_per_symbol + d_gi);
- out += d_items_per_symbol * d_itemsize;
- nread += d_items_per_symbol + d_gi;
- nproduced++;
- d_remaining_symbols--;
}
} /* namespace digital */
diff --git a/gr-digital/lib/header_payload_demux_impl.h b/gr-digital/lib/header_payload_demux_impl.h
index 9843977b17..61b5e10090 100644
--- a/gr-digital/lib/header_payload_demux_impl.h
+++ b/gr-digital/lib/header_payload_demux_impl.h
@@ -39,28 +39,36 @@ namespace gr {
size_t d_itemsize; //!< Bytes per item
bool d_uses_trigger_tag; //!< If a trigger tag is used
int d_state; //!< Current read state
- int d_remaining_symbols; //!< When in payload or header state, the number of symbols still to transmit
+ int d_curr_payload_len; //!< Length of the next payload (symbols)
std::vector<pmt::pmt_t> d_payload_tag_keys; //!< Temporary buffer for PMTs that go on the payload (keys)
std::vector<pmt::pmt_t> d_payload_tag_values; //!< Temporary buffer for PMTs that go on the payload (values)
// Helpers to make the state machine more readable
+ //! Checks if there are enough items on the inputs and enough space on the output buffers to copy \p n_symbols symbols
+ inline bool check_items_available(int n_symbols, gr_vector_int &ninput_items, int noutput_items, int nread);
+
//! Message handler: Reads the result from the header demod and sets length tag (and other tags)
void parse_header_data_msg(pmt::pmt_t header_data);
//! Helper function that returns true if a trigger signal is detected.
- // Searches input 1 (if active), then the tags. Sets \p pos to the position
- // of the first tag.
- bool find_trigger_signal(
- int &pos,
+ // Searches input 1 (if active), then the tags. Returns the offset in the input buffer
+ // (or -1 if none is found)
+ int find_trigger_signal(
+ int nread,
int noutput_items,
gr_vector_const_void_star &input_items);
- //! Helper function, copies one symbol from in to out and updates all pointers and counters
- void copy_symbol(const unsigned char *&in, unsigned char *&out, int port, int &nread, int &nproduced);
+ //! Copies n symbols from in to out, makes sure tags are propagated properly
+ void copy_n_symbols(
+ const unsigned char *in,
+ unsigned char *out,
+ int port,
+ int n_symbols
+ );
- public:
+ public:
header_payload_demux_impl(
int header_len,
int items_per_symbol,
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py
index e0ade4e5fa..89df6d645a 100755
--- a/gr-digital/python/digital/qa_header_payload_demux.py
+++ b/gr-digital/python/digital/qa_header_payload_demux.py
@@ -36,15 +36,36 @@ class qa_header_payload_demux (gr_unittest.TestCase):
""" Simplest possible test: put in zeros, then header,
then payload, trigger signal, try to demux.
The return signal from the header parser is faked via _post()
+ Add in some tags for fun.
"""
- n_zeros = 100
+ n_zeros = 1
header = (1, 2, 3)
- payload = tuple(range(17))
+ payload = tuple(range(5, 20))
data_signal = (0,) * n_zeros + header + payload
trigger_signal = [0,] * len(data_signal)
trigger_signal[n_zeros] = 1
+ # This is dropped:
+ testtag1 = gr.tag_t()
+ testtag1.offset = 0
+ testtag1.key = pmt.string_to_symbol('tag1')
+ testtag1.value = pmt.from_long(0)
+ # This goes on output 0, item 0:
+ testtag2 = gr.tag_t()
+ testtag2.offset = n_zeros
+ testtag2.key = pmt.string_to_symbol('tag2')
+ testtag2.value = pmt.from_long(23)
+ # This goes on output 0, item 2:
+ testtag3 = gr.tag_t()
+ testtag3.offset = n_zeros + len(header) - 1
+ testtag3.key = pmt.string_to_symbol('tag3')
+ testtag3.value = pmt.from_long(42)
+ # This goes on output 1, item 3:
+ testtag4 = gr.tag_t()
+ testtag4.offset = n_zeros + len(header) + 3
+ testtag4.key = pmt.string_to_symbol('tag4')
+ testtag4.value = pmt.from_long(314)
- data_src = blocks.vector_source_f(data_signal, False)
+ data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4))
trigger_src = blocks.vector_source_b(trigger_signal, False)
hpd = digital.header_payload_demux(
len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
@@ -70,7 +91,159 @@ class qa_header_payload_demux (gr_unittest.TestCase):
self.assertEqual(header_sink.data(), header)
self.assertEqual(payload_sink.data(), payload)
+ ptags_header = []
+ for tag in header_sink.tags():
+ ptag = gr.tag_to_python(tag)
+ ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
+ expected_tags_header = [
+ {'key': 'tag2', 'offset': 0},
+ {'key': 'tag3', 'offset': 2},
+ ]
+ self.assertEqual(expected_tags_header, ptags_header)
+ ptags_payload = []
+ for tag in payload_sink.tags():
+ ptag = gr.tag_to_python(tag)
+ ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
+ expected_tags_payload = [
+ {'key': 'frame_len', 'offset': 0},
+ {'key': 'tag4', 'offset': 3},
+ ]
+ self.assertEqual(expected_tags_payload, ptags_payload)
+ def test_002_symbols (self):
+ """
+ Same as before, but operate on symbols
+ """
+ n_zeros = 1
+ items_per_symbol = 3
+ gi = 1
+ n_symbols = 4
+ header = (1, 2, 3)
+ payload = (1, 2, 3)
+ data_signal = (0,) * n_zeros + (0,) + header + ((0,) + payload) * n_symbols
+ trigger_signal = [0,] * len(data_signal)
+ trigger_signal[n_zeros] = 1
+ # This is dropped:
+ testtag1 = gr.tag_t()
+ testtag1.offset = 0
+ testtag1.key = pmt.string_to_symbol('tag1')
+ testtag1.value = pmt.from_long(0)
+ # This goes on output 0, item 0 (from the GI)
+ testtag2 = gr.tag_t()
+ testtag2.offset = n_zeros
+ testtag2.key = pmt.string_to_symbol('tag2')
+ testtag2.value = pmt.from_long(23)
+ # This goes on output 0, item 0 (middle of the header symbol)
+ testtag3 = gr.tag_t()
+ testtag3.offset = n_zeros + gi + 1
+ testtag3.key = pmt.string_to_symbol('tag3')
+ testtag3.value = pmt.from_long(42)
+ # This goes on output 1, item 1 (middle of the first payload symbol)
+ testtag4 = gr.tag_t()
+ testtag4.offset = n_zeros + (gi + items_per_symbol) * 2 + 1
+ testtag4.key = pmt.string_to_symbol('tag4')
+ testtag4.value = pmt.from_long(314)
+ data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4))
+ trigger_src = blocks.vector_source_b(trigger_signal, False)
+ hpd = digital.header_payload_demux(
+ len(header) / items_per_symbol, # Header length (in symbols)
+ items_per_symbol, # Items per symbols
+ gi, # Items per guard time
+ "frame_len", # Frame length tag key
+ "detect", # Trigger tag key
+ True, # Output symbols (not items)
+ gr.sizeof_float # Bytes per item
+ )
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 1)
+ header_sink = blocks.vector_sink_f(items_per_symbol)
+ payload_sink = blocks.vector_sink_f(items_per_symbol)
+ self.tb.connect(data_src, (hpd, 0))
+ self.tb.connect(trigger_src, (hpd, 1))
+ self.tb.connect((hpd, 0), header_sink)
+ self.tb.connect((hpd, 1), payload_sink)
+ self.tb.start()
+ time.sleep(.2) # Need this, otherwise, the next message is ignored
+ hpd.to_basic_block()._post(
+ pmt.intern('header_data'),
+ pmt.from_long(n_symbols)
+ )
+ while len(payload_sink.data()) < len(payload) * n_symbols:
+ time.sleep(.2)
+ self.tb.stop()
+ self.tb.wait()
+ self.assertEqual(header_sink.data(), header)
+ self.assertEqual(payload_sink.data(), payload * n_symbols)
+ ptags_header = []
+ for tag in header_sink.tags():
+ ptag = gr.tag_to_python(tag)
+ ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
+ expected_tags_header = [
+ {'key': 'tag2', 'offset': 0},
+ {'key': 'tag3', 'offset': 0},
+ ]
+ self.assertEqual(expected_tags_header, ptags_header)
+ ptags_payload = []
+ for tag in payload_sink.tags():
+ ptag = gr.tag_to_python(tag)
+ ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
+ expected_tags_payload = [
+ {'key': 'frame_len', 'offset': 0},
+ {'key': 'tag4', 'offset': 1},
+ ]
+ self.assertEqual(expected_tags_payload, ptags_payload)
+
+ def test_003_t (self):
+ """
+ Like test 1, but twice, plus one fail
+ """
+ n_zeros = 5
+ header = (1, 2, 3)
+ header_fail = (-1, -2, -4) # Contents don't really matter
+ payload1 = tuple(range(5, 20))
+ payload2 = (42,)
+ data_signal = (0,) * n_zeros + header + payload1
+ trigger_signal = [0,] * len(data_signal) * 2
+ trigger_signal[n_zeros] = 1
+ trigger_signal[len(data_signal)] = 1
+ trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1
+ tx_signal = data_signal + header_fail + (0,) * n_zeros + header + payload2 + (0,) * 1000
+ data_src = blocks.vector_source_f(tx_signal, False)
+ trigger_src = blocks.vector_source_b(trigger_signal, False)
+ hpd = digital.header_payload_demux(
+ len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
+ )
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 1)
+ header_sink = blocks.vector_sink_f()
+ payload_sink = blocks.vector_sink_f()
+ self.tb.connect(data_src, (hpd, 0))
+ self.tb.connect(trigger_src, (hpd, 1))
+ self.tb.connect((hpd, 0), header_sink)
+ self.tb.connect((hpd, 1), payload_sink)
+ self.tb.start()
+ time.sleep(.2) # Need this, otherwise, the next message is ignored
+ hpd.to_basic_block()._post(
+ pmt.intern('header_data'),
+ pmt.from_long(len(payload1))
+ )
+ while len(payload_sink.data()) < len(payload1):
+ time.sleep(.2)
+ hpd.to_basic_block()._post(
+ pmt.intern('header_data'),
+ pmt.PMT_F
+ )
+ # This next command is a bit of a showstopper, but there's no condition to check upon
+ # to see if the previous msg handling is finished
+ time.sleep(.7)
+ hpd.to_basic_block()._post(
+ pmt.intern('header_data'),
+ pmt.from_long(len(payload2))
+ )
+ while len(payload_sink.data()) < len(payload1) + len(payload2):
+ time.sleep(.2)
+ self.tb.stop()
+ self.tb.wait()
+ self.assertEqual(header_sink.data(), header + header_fail + header)
+ self.assertEqual(payload_sink.data(), payload1 + payload2)
if __name__ == '__main__':
gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml")