diff options
-rw-r--r-- | gr-digital/examples/ofdm/ofdm_loopback.grc | 80 | ||||
-rw-r--r-- | gr-digital/lib/header_payload_demux_impl.cc | 246 | ||||
-rw-r--r-- | gr-digital/lib/header_payload_demux_impl.h | 24 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_header_payload_demux.py | 179 |
4 files changed, 371 insertions, 158 deletions
diff --git a/gr-digital/examples/ofdm/ofdm_loopback.grc b/gr-digital/examples/ofdm/ofdm_loopback.grc index 80dd12f2a5..a6b3b147f1 100644 --- a/gr-digital/examples/ofdm/ofdm_loopback.grc +++ b/gr-digital/examples/ofdm/ofdm_loopback.grc @@ -1,6 +1,6 @@ <?xml version='1.0' encoding='ASCII'?> <flow_graph> - <timestamp>Wed Jun 5 11:54:15 2013</timestamp> + <timestamp>Tue Jun 11 22:44:34 2013</timestamp> <block> <key>options</key> <param> @@ -385,45 +385,6 @@ </param> </block> <block> - <key>blocks_tag_debug</key> - <param> - <key>id</key> - <value>blocks_tag_debug_0</value> - </param> - <param> - <key>_enabled</key> - <value>True</value> - </param> - <param> - <key>type</key> - <value>byte</value> - </param> - <param> - <key>name</key> - <value>Rx Packets</value> - </param> - <param> - <key>num_inputs</key> - <value>1</value> - </param> - <param> - <key>vlen</key> - <value>1</value> - </param> - <param> - <key>display</key> - <value>True</value> - </param> - <param> - <key>_coordinate</key> - <value>(333, 367)</value> - </param> - <param> - <key>_rotation</key> - <value>0</value> - </param> - </block> - <block> <key>digital_ofdm_rx</key> <param> <key>id</key> @@ -845,6 +806,45 @@ <value>180</value> </param> </block> + <block> + <key>blocks_tag_debug</key> + <param> + <key>id</key> + <value>blocks_tag_debug_0</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>type</key> + <value>byte</value> + </param> + <param> + <key>name</key> + <value>Rx Packets</value> + </param> + <param> + <key>num_inputs</key> + <value>1</value> + </param> + <param> + <key>vlen</key> + <value>1</value> + </param> + <param> + <key>display</key> + <value>True</value> + </param> + <param> + <key>_coordinate</key> + <value>(333, 367)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> <connection> <source_block_id>blocks_vector_source_x_0</source_block_id> <sink_block_id>blocks_vector_to_stream_0</sink_block_id> diff --git a/gr-digital/lib/header_payload_demux_impl.cc b/gr-digital/lib/header_payload_demux_impl.cc index 23992e7ce1..7cc3ec3836 100644 --- a/gr-digital/lib/header_payload_demux_impl.cc +++ b/gr-digital/lib/header_payload_demux_impl.cc @@ -28,7 +28,6 @@ #include <gnuradio/io_signature.h> #include "header_payload_demux_impl.h" - namespace gr { namespace digital { @@ -86,6 +85,7 @@ namespace gr { d_itemsize(itemsize), d_uses_trigger_tag(!trigger_tag_key.empty()), d_state(STATE_FIND_TRIGGER), + d_curr_payload_len(0), d_payload_tag_keys(0), d_payload_tag_values(0) { @@ -95,9 +95,13 @@ namespace gr { if (d_items_per_symbol < 1 || d_gi < 0 || d_itemsize < 1) { throw std::invalid_argument("Items and symbol sizes must be at least 1."); } - if (!d_output_symbols) { + if (d_output_symbols) { + set_relative_rate(1.0 / (d_items_per_symbol + d_gi)); + } else { + set_relative_rate((double)d_items_per_symbol / (d_items_per_symbol + d_gi)); set_output_multiple(d_items_per_symbol); } + set_tag_propagation_policy(TPP_DONT); message_port_register_in(msg_port_id); set_msg_handler(msg_port_id, boost::bind(&header_payload_demux_impl::parse_header_data_msg, this, _1)); } @@ -112,7 +116,8 @@ namespace gr { int n_items_reqd = 0; if (d_state == STATE_HEADER) { n_items_reqd = d_header_len * (d_items_per_symbol + d_gi); - //} else if (d_state == STATE_HEADER) { + } else if (d_state == STATE_PAYLOAD) { + n_items_reqd = d_curr_payload_len * (d_items_per_symbol + d_gi); } else { n_items_reqd = noutput_items * (d_items_per_symbol + d_gi); if (!d_output_symbols) { @@ -126,6 +131,36 @@ namespace gr { } } + + inline bool + header_payload_demux_impl::check_items_available( + int n_symbols, + gr_vector_int &ninput_items, + int noutput_items, + int nread + ) + { + // Check there's enough items on the input + if ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[0]-nread) + || (ninput_items.size() == 2 && ((n_symbols * (d_items_per_symbol + d_gi)) > (ninput_items[1]-nread)))) { + return false; + } + + // Check there's enough space on the output buffer + if (d_output_symbols) { + if (noutput_items < n_symbols) { + return false; + } + } else { + if (noutput_items < n_symbols * d_items_per_symbol) { + return false; + } + } + + return true; + } + + int header_payload_demux_impl::general_work (int noutput_items, gr_vector_int &ninput_items, @@ -137,89 +172,77 @@ namespace gr { unsigned char *out_payload = (unsigned char *) output_items[1]; int nread = 0; - bool exit_loop = false; - int produced_hdr = 0; - int produced_payload = 0; - - while ( - nread < noutput_items - && nread < ninput_items[0] - && (ninput_items.size() == 1 || nread < ninput_items[1]) - && !exit_loop - ) { - switch (d_state) { - case STATE_WAIT_FOR_MSG: - // In an ideal world, this would never be called - return 0; - - case STATE_HEADER_RX_FAIL: - // TODO: Consume one item from input when copy_symbols has been optimized - d_state = STATE_FIND_TRIGGER; + int trigger_offset = 0; - case STATE_FIND_TRIGGER: - // 1) Search for a trigger signal on input 1 (if present) - // 2) Search for a trigger tag, make sure it's the first one - // The first trigger to be found is used! - // 3) Make sure the right number of items is skipped - // 4) If trigger found, switch to STATE_HEADER - if (find_trigger_signal(nread, noutput_items, input_items)) { - d_remaining_symbols = d_header_len; - d_state = STATE_HEADER; - in += nread * d_itemsize; - } - break; + switch (d_state) { + case STATE_WAIT_FOR_MSG: + // In an ideal world, this would never be called + return 0; - case STATE_HEADER: - copy_symbol(in, out_header, 0, nread, produced_hdr); - if (d_remaining_symbols == 0) { - d_state = STATE_WAIT_FOR_MSG; - exit_loop = true; - } - break; + case STATE_HEADER_RX_FAIL: + consume_each (1); + in += d_itemsize; + nread++; + d_state = STATE_FIND_TRIGGER; - case STATE_HEADER_RX_SUCCESS: - // TODO: Consume the entire header from the input when copy_symbols has been optimized - for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) { - add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]); - } - d_state = STATE_PAYLOAD; - - case STATE_PAYLOAD: - copy_symbol(in, out_payload, 1, nread, produced_payload); - if (d_remaining_symbols == 0) { - d_state = STATE_FIND_TRIGGER; - exit_loop = true; - } + case STATE_FIND_TRIGGER: + trigger_offset = find_trigger_signal(nread, noutput_items, input_items); + if (trigger_offset == -1) { + consume_each(noutput_items - nread); break; + } + consume_each (trigger_offset); + in += trigger_offset * d_itemsize; + d_state = STATE_HEADER; - default: - throw std::runtime_error("invalid state"); - } /* switch */ - } /* while(nread < noutput_items) */ - if (!d_output_symbols) { - produced_hdr *= d_items_per_symbol; - produced_payload *= d_items_per_symbol; - } - produce(0, produced_hdr); - produce(1, produced_payload); - consume_each (nread); + case STATE_HEADER: + if (check_items_available(d_header_len, ninput_items, noutput_items, nread)) { + copy_n_symbols(in, out_header, 0, d_header_len); + d_state = STATE_WAIT_FOR_MSG; + produce(0, d_header_len * (d_output_symbols ? 1 : d_items_per_symbol)); + } + break; + + case STATE_HEADER_RX_SUCCESS: + for (unsigned i = 0; i < d_payload_tag_keys.size(); i++) { + add_item_tag(1, nitems_written(1), d_payload_tag_keys[i], d_payload_tag_values[i]); + } + consume_each (d_header_len * (d_items_per_symbol + d_gi)); + in += d_header_len * (d_items_per_symbol + d_gi) * d_itemsize; + nread += d_header_len * (d_items_per_symbol + d_gi); + d_state = STATE_PAYLOAD; + + case STATE_PAYLOAD: + if (check_items_available(d_curr_payload_len, ninput_items, noutput_items, nread)) { + copy_n_symbols(in, out_payload, 1, d_curr_payload_len); + produce(1, d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); + consume_each (d_curr_payload_len * (d_items_per_symbol + d_gi)); + d_state = STATE_FIND_TRIGGER; + set_min_noutput_items(d_output_symbols ? 1 : (d_items_per_symbol + d_gi)); + } + break; + + default: + throw std::runtime_error("invalid state"); + } /* switch */ + return WORK_CALLED_PRODUCE; } /* general_work() */ - bool + int header_payload_demux_impl::find_trigger_signal( - int &pos, + int nread, int noutput_items, gr_vector_const_void_star &input_items) { if (input_items.size() == 2) { unsigned char *in_trigger = (unsigned char *) input_items[1]; - for (int i = 0; i < noutput_items; i++) { + in_trigger += nread; + for (int i = 0; i < noutput_items-nread; i++) { if (in_trigger[i]) { - pos = i; - return true; + return i; } } } @@ -235,12 +258,10 @@ namespace gr { } } if (tag_index != -1) { - pos = min_offset - nitems_read(0); - return true; + return min_offset - nitems_read(0); } } - pos += noutput_items; - return false; + return -1; } @@ -252,7 +273,7 @@ namespace gr { d_state = STATE_HEADER_RX_FAIL; if (pmt::is_integer(header_data)) { - d_remaining_symbols = pmt::to_long(header_data); + d_curr_payload_len = pmt::to_long(header_data); d_payload_tag_keys.push_back(d_len_tag_key); d_payload_tag_values.push_back(header_data); d_state = STATE_HEADER_RX_SUCCESS; @@ -263,7 +284,7 @@ namespace gr { d_payload_tag_keys.push_back(pmt::car(this_item)); d_payload_tag_values.push_back(pmt::cdr(this_item)); if (pmt::equal(pmt::car(this_item), d_len_tag_key)) { - d_remaining_symbols = pmt::to_long(pmt::cdr(this_item)); + d_curr_payload_len = pmt::to_long(pmt::cdr(this_item)); d_state = STATE_HEADER_RX_SUCCESS; } dict_items = pmt::cdr(dict_items); @@ -276,53 +297,64 @@ namespace gr { } else { GR_LOG_ALERT(d_logger, boost::format("Received illegal header data (%1%)") % pmt::write_string(header_data)); } - if (d_state == STATE_HEADER_RX_SUCCESS - && (d_remaining_symbols * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1) - ) { - d_state = STATE_HEADER_RX_FAIL; - GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_remaining_symbols); + if (d_state == STATE_HEADER_RX_SUCCESS) + { + if ((d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)) > max_output_buffer(1)) { + d_state = STATE_HEADER_RX_FAIL; + GR_LOG_INFO(d_logger, boost::format("Detected a packet larger than max frame size (%1% symbols)") % d_curr_payload_len); + } else { + set_min_noutput_items(d_curr_payload_len * (d_output_symbols ? 1 : d_items_per_symbol)); + } } } - // This is a inefficient design: Can only copy one symbol at once. TODO fix. + void - header_payload_demux_impl::copy_symbol(const unsigned char *&in, unsigned char *&out, int port, int &nread, int &nproduced) + header_payload_demux_impl::copy_n_symbols( + const unsigned char *in, + unsigned char *out, + int port, + int n_symbols + ) { - std::vector<tag_t> tags; - memcpy((void *) out, - (void *) (in + d_gi * d_itemsize), - d_itemsize * d_items_per_symbol - ); - // Tags on GI - get_tags_in_range(tags, 0, - nitems_read(0) + nread, - nitems_read(0) + nread + d_gi - ); - for (unsigned t = 0; t < tags.size(); t++) { - add_item_tag(port, - nitems_written(port)+nproduced, - tags[t].key, - tags[t].value + // Copy samples + if (d_gi) { + for (int i = 0; i < n_symbols; i++) { + memcpy((void *) out, (void *) (in + d_gi * d_itemsize), d_items_per_symbol * d_itemsize); + in += d_itemsize * (d_items_per_symbol + d_gi); + out += d_itemsize * d_items_per_symbol; + } + } else { + memcpy( + (void *) out, + (void *) in, + n_symbols * d_items_per_symbol * d_itemsize ); } - // Tags on symbol + // Copy tags + std::vector<tag_t> tags; get_tags_in_range( tags, 0, - nitems_read(port) + nread + d_gi, - nitems_read(port) + nread + d_gi + d_items_per_symbol + nitems_read(0), + nitems_read(0) + n_symbols * (d_items_per_symbol + d_gi) ); for (unsigned t = 0; t < tags.size(); t++) { - add_item_tag(0, - tags[t].offset - nitems_read(0)-nread + nitems_written(port)+nproduced, + int new_offset = tags[t].offset - nitems_read(0); + if (d_output_symbols) { + new_offset /= (d_items_per_symbol + d_gi); + } else if (d_gi) { + int pos_on_symbol = (new_offset % (d_items_per_symbol + d_gi)) - d_gi; + if (pos_on_symbol < 0) { + pos_on_symbol = 0; + } + new_offset = (new_offset / (d_items_per_symbol + d_gi)) + pos_on_symbol; + } + add_item_tag(port, + nitems_written(port) + new_offset, tags[t].key, tags[t].value ); } - in += d_itemsize * (d_items_per_symbol + d_gi); - out += d_items_per_symbol * d_itemsize; - nread += d_items_per_symbol + d_gi; - nproduced++; - d_remaining_symbols--; } } /* namespace digital */ diff --git a/gr-digital/lib/header_payload_demux_impl.h b/gr-digital/lib/header_payload_demux_impl.h index 9843977b17..61b5e10090 100644 --- a/gr-digital/lib/header_payload_demux_impl.h +++ b/gr-digital/lib/header_payload_demux_impl.h @@ -39,28 +39,36 @@ namespace gr { size_t d_itemsize; //!< Bytes per item bool d_uses_trigger_tag; //!< If a trigger tag is used int d_state; //!< Current read state - int d_remaining_symbols; //!< When in payload or header state, the number of symbols still to transmit + int d_curr_payload_len; //!< Length of the next payload (symbols) std::vector<pmt::pmt_t> d_payload_tag_keys; //!< Temporary buffer for PMTs that go on the payload (keys) std::vector<pmt::pmt_t> d_payload_tag_values; //!< Temporary buffer for PMTs that go on the payload (values) // Helpers to make the state machine more readable + //! Checks if there are enough items on the inputs and enough space on the output buffers to copy \p n_symbols symbols + inline bool check_items_available(int n_symbols, gr_vector_int &ninput_items, int noutput_items, int nread); + //! Message handler: Reads the result from the header demod and sets length tag (and other tags) void parse_header_data_msg(pmt::pmt_t header_data); //! Helper function that returns true if a trigger signal is detected. - // Searches input 1 (if active), then the tags. Sets \p pos to the position - // of the first tag. - bool find_trigger_signal( - int &pos, + // Searches input 1 (if active), then the tags. Returns the offset in the input buffer + // (or -1 if none is found) + int find_trigger_signal( + int nread, int noutput_items, gr_vector_const_void_star &input_items); - //! Helper function, copies one symbol from in to out and updates all pointers and counters - void copy_symbol(const unsigned char *&in, unsigned char *&out, int port, int &nread, int &nproduced); + //! Copies n symbols from in to out, makes sure tags are propagated properly + void copy_n_symbols( + const unsigned char *in, + unsigned char *out, + int port, + int n_symbols + ); - public: + public: header_payload_demux_impl( int header_len, int items_per_symbol, diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py index e0ade4e5fa..89df6d645a 100755 --- a/gr-digital/python/digital/qa_header_payload_demux.py +++ b/gr-digital/python/digital/qa_header_payload_demux.py @@ -36,15 +36,36 @@ class qa_header_payload_demux (gr_unittest.TestCase): """ Simplest possible test: put in zeros, then header, then payload, trigger signal, try to demux. The return signal from the header parser is faked via _post() + Add in some tags for fun. """ - n_zeros = 100 + n_zeros = 1 header = (1, 2, 3) - payload = tuple(range(17)) + payload = tuple(range(5, 20)) data_signal = (0,) * n_zeros + header + payload trigger_signal = [0,] * len(data_signal) trigger_signal[n_zeros] = 1 + # This is dropped: + testtag1 = gr.tag_t() + testtag1.offset = 0 + testtag1.key = pmt.string_to_symbol('tag1') + testtag1.value = pmt.from_long(0) + # This goes on output 0, item 0: + testtag2 = gr.tag_t() + testtag2.offset = n_zeros + testtag2.key = pmt.string_to_symbol('tag2') + testtag2.value = pmt.from_long(23) + # This goes on output 0, item 2: + testtag3 = gr.tag_t() + testtag3.offset = n_zeros + len(header) - 1 + testtag3.key = pmt.string_to_symbol('tag3') + testtag3.value = pmt.from_long(42) + # This goes on output 1, item 3: + testtag4 = gr.tag_t() + testtag4.offset = n_zeros + len(header) + 3 + testtag4.key = pmt.string_to_symbol('tag4') + testtag4.value = pmt.from_long(314) - data_src = blocks.vector_source_f(data_signal, False) + data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4)) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float @@ -70,7 +91,159 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.assertEqual(header_sink.data(), header) self.assertEqual(payload_sink.data(), payload) + ptags_header = [] + for tag in header_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_header = [ + {'key': 'tag2', 'offset': 0}, + {'key': 'tag3', 'offset': 2}, + ] + self.assertEqual(expected_tags_header, ptags_header) + ptags_payload = [] + for tag in payload_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_payload = [ + {'key': 'frame_len', 'offset': 0}, + {'key': 'tag4', 'offset': 3}, + ] + self.assertEqual(expected_tags_payload, ptags_payload) + def test_002_symbols (self): + """ + Same as before, but operate on symbols + """ + n_zeros = 1 + items_per_symbol = 3 + gi = 1 + n_symbols = 4 + header = (1, 2, 3) + payload = (1, 2, 3) + data_signal = (0,) * n_zeros + (0,) + header + ((0,) + payload) * n_symbols + trigger_signal = [0,] * len(data_signal) + trigger_signal[n_zeros] = 1 + # This is dropped: + testtag1 = gr.tag_t() + testtag1.offset = 0 + testtag1.key = pmt.string_to_symbol('tag1') + testtag1.value = pmt.from_long(0) + # This goes on output 0, item 0 (from the GI) + testtag2 = gr.tag_t() + testtag2.offset = n_zeros + testtag2.key = pmt.string_to_symbol('tag2') + testtag2.value = pmt.from_long(23) + # This goes on output 0, item 0 (middle of the header symbol) + testtag3 = gr.tag_t() + testtag3.offset = n_zeros + gi + 1 + testtag3.key = pmt.string_to_symbol('tag3') + testtag3.value = pmt.from_long(42) + # This goes on output 1, item 1 (middle of the first payload symbol) + testtag4 = gr.tag_t() + testtag4.offset = n_zeros + (gi + items_per_symbol) * 2 + 1 + testtag4.key = pmt.string_to_symbol('tag4') + testtag4.value = pmt.from_long(314) + data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4)) + trigger_src = blocks.vector_source_b(trigger_signal, False) + hpd = digital.header_payload_demux( + len(header) / items_per_symbol, # Header length (in symbols) + items_per_symbol, # Items per symbols + gi, # Items per guard time + "frame_len", # Frame length tag key + "detect", # Trigger tag key + True, # Output symbols (not items) + gr.sizeof_float # Bytes per item + ) + self.assertEqual(pmt.length(hpd.message_ports_in()), 1) + header_sink = blocks.vector_sink_f(items_per_symbol) + payload_sink = blocks.vector_sink_f(items_per_symbol) + self.tb.connect(data_src, (hpd, 0)) + self.tb.connect(trigger_src, (hpd, 1)) + self.tb.connect((hpd, 0), header_sink) + self.tb.connect((hpd, 1), payload_sink) + self.tb.start() + time.sleep(.2) # Need this, otherwise, the next message is ignored + hpd.to_basic_block()._post( + pmt.intern('header_data'), + pmt.from_long(n_symbols) + ) + while len(payload_sink.data()) < len(payload) * n_symbols: + time.sleep(.2) + self.tb.stop() + self.tb.wait() + self.assertEqual(header_sink.data(), header) + self.assertEqual(payload_sink.data(), payload * n_symbols) + ptags_header = [] + for tag in header_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_header = [ + {'key': 'tag2', 'offset': 0}, + {'key': 'tag3', 'offset': 0}, + ] + self.assertEqual(expected_tags_header, ptags_header) + ptags_payload = [] + for tag in payload_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_payload = [ + {'key': 'frame_len', 'offset': 0}, + {'key': 'tag4', 'offset': 1}, + ] + self.assertEqual(expected_tags_payload, ptags_payload) + + def test_003_t (self): + """ + Like test 1, but twice, plus one fail + """ + n_zeros = 5 + header = (1, 2, 3) + header_fail = (-1, -2, -4) # Contents don't really matter + payload1 = tuple(range(5, 20)) + payload2 = (42,) + data_signal = (0,) * n_zeros + header + payload1 + trigger_signal = [0,] * len(data_signal) * 2 + trigger_signal[n_zeros] = 1 + trigger_signal[len(data_signal)] = 1 + trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1 + tx_signal = data_signal + header_fail + (0,) * n_zeros + header + payload2 + (0,) * 1000 + data_src = blocks.vector_source_f(tx_signal, False) + trigger_src = blocks.vector_source_b(trigger_signal, False) + hpd = digital.header_payload_demux( + len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float + ) + self.assertEqual(pmt.length(hpd.message_ports_in()), 1) + header_sink = blocks.vector_sink_f() + payload_sink = blocks.vector_sink_f() + self.tb.connect(data_src, (hpd, 0)) + self.tb.connect(trigger_src, (hpd, 1)) + self.tb.connect((hpd, 0), header_sink) + self.tb.connect((hpd, 1), payload_sink) + self.tb.start() + time.sleep(.2) # Need this, otherwise, the next message is ignored + hpd.to_basic_block()._post( + pmt.intern('header_data'), + pmt.from_long(len(payload1)) + ) + while len(payload_sink.data()) < len(payload1): + time.sleep(.2) + hpd.to_basic_block()._post( + pmt.intern('header_data'), + pmt.PMT_F + ) + # This next command is a bit of a showstopper, but there's no condition to check upon + # to see if the previous msg handling is finished + time.sleep(.7) + hpd.to_basic_block()._post( + pmt.intern('header_data'), + pmt.from_long(len(payload2)) + ) + while len(payload_sink.data()) < len(payload1) + len(payload2): + time.sleep(.2) + self.tb.stop() + self.tb.wait() + self.assertEqual(header_sink.data(), header + header_fail + header) + self.assertEqual(payload_sink.data(), payload1 + payload2) if __name__ == '__main__': gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml") |