diff options
author | Martin Braun <martin.braun@kit.edu> | 2013-06-11 19:28:13 +0200 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2013-06-14 06:38:40 -0700 |
commit | b024982081ee4384e95d8a8958900de93c5fd064 (patch) | |
tree | c77af57c0222daea2b5d43d0f98d4eca9b89e3fb /gr-digital/python/digital/qa_header_payload_demux.py | |
parent | 8ba7be2a1b0027747d256638c50a4c7b09677ea9 (diff) |
digital: HPD: fixed tag propagation, minimized calls to work()
Diffstat (limited to 'gr-digital/python/digital/qa_header_payload_demux.py')
-rwxr-xr-x | gr-digital/python/digital/qa_header_payload_demux.py | 179 |
1 files changed, 176 insertions, 3 deletions
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py index e0ade4e5fa..89df6d645a 100755 --- a/gr-digital/python/digital/qa_header_payload_demux.py +++ b/gr-digital/python/digital/qa_header_payload_demux.py @@ -36,15 +36,36 @@ class qa_header_payload_demux (gr_unittest.TestCase): """ Simplest possible test: put in zeros, then header, then payload, trigger signal, try to demux. The return signal from the header parser is faked via _post() + Add in some tags for fun. """ - n_zeros = 100 + n_zeros = 1 header = (1, 2, 3) - payload = tuple(range(17)) + payload = tuple(range(5, 20)) data_signal = (0,) * n_zeros + header + payload trigger_signal = [0,] * len(data_signal) trigger_signal[n_zeros] = 1 + # This is dropped: + testtag1 = gr.tag_t() + testtag1.offset = 0 + testtag1.key = pmt.string_to_symbol('tag1') + testtag1.value = pmt.from_long(0) + # This goes on output 0, item 0: + testtag2 = gr.tag_t() + testtag2.offset = n_zeros + testtag2.key = pmt.string_to_symbol('tag2') + testtag2.value = pmt.from_long(23) + # This goes on output 0, item 2: + testtag3 = gr.tag_t() + testtag3.offset = n_zeros + len(header) - 1 + testtag3.key = pmt.string_to_symbol('tag3') + testtag3.value = pmt.from_long(42) + # This goes on output 1, item 3: + testtag4 = gr.tag_t() + testtag4.offset = n_zeros + len(header) + 3 + testtag4.key = pmt.string_to_symbol('tag4') + testtag4.value = pmt.from_long(314) - data_src = blocks.vector_source_f(data_signal, False) + data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4)) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float @@ -70,7 +91,159 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.assertEqual(header_sink.data(), header) self.assertEqual(payload_sink.data(), payload) + ptags_header = [] + for tag in header_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_header = [ + {'key': 'tag2', 'offset': 0}, + {'key': 'tag3', 'offset': 2}, + ] + self.assertEqual(expected_tags_header, ptags_header) + ptags_payload = [] + for tag in payload_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_payload = [ + {'key': 'frame_len', 'offset': 0}, + {'key': 'tag4', 'offset': 3}, + ] + self.assertEqual(expected_tags_payload, ptags_payload) + def test_002_symbols (self): + """ + Same as before, but operate on symbols + """ + n_zeros = 1 + items_per_symbol = 3 + gi = 1 + n_symbols = 4 + header = (1, 2, 3) + payload = (1, 2, 3) + data_signal = (0,) * n_zeros + (0,) + header + ((0,) + payload) * n_symbols + trigger_signal = [0,] * len(data_signal) + trigger_signal[n_zeros] = 1 + # This is dropped: + testtag1 = gr.tag_t() + testtag1.offset = 0 + testtag1.key = pmt.string_to_symbol('tag1') + testtag1.value = pmt.from_long(0) + # This goes on output 0, item 0 (from the GI) + testtag2 = gr.tag_t() + testtag2.offset = n_zeros + testtag2.key = pmt.string_to_symbol('tag2') + testtag2.value = pmt.from_long(23) + # This goes on output 0, item 0 (middle of the header symbol) + testtag3 = gr.tag_t() + testtag3.offset = n_zeros + gi + 1 + testtag3.key = pmt.string_to_symbol('tag3') + testtag3.value = pmt.from_long(42) + # This goes on output 1, item 1 (middle of the first payload symbol) + testtag4 = gr.tag_t() + testtag4.offset = n_zeros + (gi + items_per_symbol) * 2 + 1 + testtag4.key = pmt.string_to_symbol('tag4') + testtag4.value = pmt.from_long(314) + data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4)) + trigger_src = blocks.vector_source_b(trigger_signal, False) + hpd = digital.header_payload_demux( + len(header) / items_per_symbol, # Header length (in symbols) + items_per_symbol, # Items per symbols + gi, # Items per guard time + "frame_len", # Frame length tag key + "detect", # Trigger tag key + True, # Output symbols (not items) + gr.sizeof_float # Bytes per item + ) + self.assertEqual(pmt.length(hpd.message_ports_in()), 1) + header_sink = blocks.vector_sink_f(items_per_symbol) + payload_sink = blocks.vector_sink_f(items_per_symbol) + self.tb.connect(data_src, (hpd, 0)) + self.tb.connect(trigger_src, (hpd, 1)) + self.tb.connect((hpd, 0), header_sink) + self.tb.connect((hpd, 1), payload_sink) + self.tb.start() + time.sleep(.2) # Need this, otherwise, the next message is ignored + hpd.to_basic_block()._post( + pmt.intern('header_data'), + pmt.from_long(n_symbols) + ) + while len(payload_sink.data()) < len(payload) * n_symbols: + time.sleep(.2) + self.tb.stop() + self.tb.wait() + self.assertEqual(header_sink.data(), header) + self.assertEqual(payload_sink.data(), payload * n_symbols) + ptags_header = [] + for tag in header_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_header = [ + {'key': 'tag2', 'offset': 0}, + {'key': 'tag3', 'offset': 0}, + ] + self.assertEqual(expected_tags_header, ptags_header) + ptags_payload = [] + for tag in payload_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_payload = [ + {'key': 'frame_len', 'offset': 0}, + {'key': 'tag4', 'offset': 1}, + ] + self.assertEqual(expected_tags_payload, ptags_payload) + + def test_003_t (self): + """ + Like test 1, but twice, plus one fail + """ + n_zeros = 5 + header = (1, 2, 3) + header_fail = (-1, -2, -4) # Contents don't really matter + payload1 = tuple(range(5, 20)) + payload2 = (42,) + data_signal = (0,) * n_zeros + header + payload1 + trigger_signal = [0,] * len(data_signal) * 2 + trigger_signal[n_zeros] = 1 + trigger_signal[len(data_signal)] = 1 + trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1 + tx_signal = data_signal + header_fail + (0,) * n_zeros + header + payload2 + (0,) * 1000 + data_src = blocks.vector_source_f(tx_signal, False) + trigger_src = blocks.vector_source_b(trigger_signal, False) + hpd = digital.header_payload_demux( + len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float + ) + self.assertEqual(pmt.length(hpd.message_ports_in()), 1) + header_sink = blocks.vector_sink_f() + payload_sink = blocks.vector_sink_f() + self.tb.connect(data_src, (hpd, 0)) + self.tb.connect(trigger_src, (hpd, 1)) + self.tb.connect((hpd, 0), header_sink) + self.tb.connect((hpd, 1), payload_sink) + self.tb.start() + time.sleep(.2) # Need this, otherwise, the next message is ignored + hpd.to_basic_block()._post( + pmt.intern('header_data'), + pmt.from_long(len(payload1)) + ) + while len(payload_sink.data()) < len(payload1): + time.sleep(.2) + hpd.to_basic_block()._post( + pmt.intern('header_data'), + pmt.PMT_F + ) + # This next command is a bit of a showstopper, but there's no condition to check upon + # to see if the previous msg handling is finished + time.sleep(.7) + hpd.to_basic_block()._post( + pmt.intern('header_data'), + pmt.from_long(len(payload2)) + ) + while len(payload_sink.data()) < len(payload1) + len(payload2): + time.sleep(.2) + self.tb.stop() + self.tb.wait() + self.assertEqual(header_sink.data(), header + header_fail + header) + self.assertEqual(payload_sink.data(), payload1 + payload2) if __name__ == '__main__': gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml") |