diff options
author | Martin Braun <martin.braun@ettus.com> | 2014-01-25 11:37:44 +0100 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2014-01-30 16:49:42 +0100 |
commit | 3e054cb93ccd2eb55ff040f9bc72532ea5b0b954 (patch) | |
tree | e04316202295ec00524c746b853d6ace1ff1d9f3 /gr-digital/python/digital/qa_header_payload_demux.py | |
parent | 19d111e2448a58e20ff5c1c80ca69751376b2544 (diff) |
digital: HPD now supports time- and other special tags, can mark rx-time of packets
Diffstat (limited to 'gr-digital/python/digital/qa_header_payload_demux.py')
-rwxr-xr-x | gr-digital/python/digital/qa_header_payload_demux.py | 71 |
1 files changed, 67 insertions, 4 deletions
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py index 89df6d645a..2bacf72f62 100755 --- a/gr-digital/python/digital/qa_header_payload_demux.py +++ b/gr-digital/python/digital/qa_header_payload_demux.py @@ -64,8 +64,11 @@ class qa_header_payload_demux (gr_unittest.TestCase): testtag4.offset = n_zeros + len(header) + 3 testtag4.key = pmt.string_to_symbol('tag4') testtag4.value = pmt.from_long(314) - - data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4)) + data_src = blocks.vector_source_f( + data_signal, + False, + tags=(testtag1, testtag2, testtag3, testtag4) + ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float @@ -196,21 +199,54 @@ class qa_header_payload_demux (gr_unittest.TestCase): """ Like test 1, but twice, plus one fail """ + ### Tx Data n_zeros = 5 header = (1, 2, 3) header_fail = (-1, -2, -4) # Contents don't really matter payload1 = tuple(range(5, 20)) payload2 = (42,) + sampling_rate = 2 data_signal = (0,) * n_zeros + header + payload1 trigger_signal = [0,] * len(data_signal) * 2 trigger_signal[n_zeros] = 1 trigger_signal[len(data_signal)] = 1 trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1 tx_signal = data_signal + header_fail + (0,) * n_zeros + header + payload2 + (0,) * 1000 - data_src = blocks.vector_source_f(tx_signal, False) + # Timing tag: This is preserved and updated: + timing_tag = gr.tag_t() + timing_tag.offset = 0 + timing_tag.key = pmt.string_to_symbol('rx_time') + timing_tag.value = pmt.to_pmt((0, 0)) + # Rx freq tags: + rx_freq_tag1 = gr.tag_t() + rx_freq_tag1.offset = 0 + rx_freq_tag1.key = pmt.string_to_symbol('rx_freq') + rx_freq_tag1.value = pmt.from_double(1.0) + rx_freq_tag2 = gr.tag_t() + rx_freq_tag2.offset = 29 + rx_freq_tag2.key = pmt.string_to_symbol('rx_freq') + rx_freq_tag2.value = pmt.from_double(1.5) + rx_freq_tag3 = gr.tag_t() + rx_freq_tag3.offset = 30 + rx_freq_tag3.key = pmt.string_to_symbol('rx_freq') + rx_freq_tag3.value = pmt.from_double(2.0) + ### Flow graph + data_src = blocks.vector_source_f( + tx_signal, False, + tags=(timing_tag, rx_freq_tag1, rx_freq_tag2, rx_freq_tag3) + ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( - len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float + header_len=len(header), + items_per_symbol=1, + guard_interval=0, + length_tag_key="frame_len", + trigger_tag_key="detect", + output_symbols=False, + itemsize=gr.sizeof_float, + timing_tag_key='rx_time', + samp_rate=sampling_rate, + special_tags=('rx_freq',), ) self.assertEqual(pmt.length(hpd.message_ports_in()), 1) header_sink = blocks.vector_sink_f() @@ -242,8 +278,35 @@ class qa_header_payload_demux (gr_unittest.TestCase): time.sleep(.2) self.tb.stop() self.tb.wait() + # Signal description: + # 0: 5 zeros + # 5: header 1 + # 8: payload 1 (length: 15) + # 23: header 2 (fail) + # 26: 5 zeros + # 31: header 3 + # 34: payload 2 (length 1) + # 35: 1000 zeros self.assertEqual(header_sink.data(), header + header_fail + header) self.assertEqual(payload_sink.data(), payload1 + payload2) + tags_payload = [gr.tag_to_python(x) for x in payload_sink.tags()] + tags_payload = sorted([(x.offset, x.key, x.value) for x in tags_payload]) + tags_expected_payload = [ + (0, 'frame_len', len(payload1)), + (len(payload1), 'frame_len', len(payload2)), + ] + tags_header = [gr.tag_to_python(x) for x in header_sink.tags()] + tags_header = sorted([(x.offset, x.key, x.value) for x in tags_header]) + tags_expected_header = [ + (0, 'rx_freq', 1.0), + (0, 'rx_time', (2, 0.5)), # Hard coded time value :( Is n_zeros/sampling_rate + (len(header), 'rx_freq', 1.0), + (len(header), 'rx_time', (11, .5)), # Hard coded time value :(. See above. + (2*len(header), 'rx_freq', 2.0), + (2*len(header), 'rx_time', (15, .5)), # Hard coded time value :(. See above. + ] + self.assertEqual(tags_header, tags_expected_header) + self.assertEqual(tags_payload, tags_expected_payload) if __name__ == '__main__': gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml") |