summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_header_payload_demux.py
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2014-01-25 11:37:44 +0100
committerMartin Braun <martin.braun@ettus.com>2014-01-30 16:49:42 +0100
commit3e054cb93ccd2eb55ff040f9bc72532ea5b0b954 (patch)
treee04316202295ec00524c746b853d6ace1ff1d9f3 /gr-digital/python/digital/qa_header_payload_demux.py
parent19d111e2448a58e20ff5c1c80ca69751376b2544 (diff)
digital: HPD now supports time- and other special tags, can mark rx-time of packets
Diffstat (limited to 'gr-digital/python/digital/qa_header_payload_demux.py')
-rwxr-xr-xgr-digital/python/digital/qa_header_payload_demux.py71
1 files changed, 67 insertions, 4 deletions
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py
index 89df6d645a..2bacf72f62 100755
--- a/gr-digital/python/digital/qa_header_payload_demux.py
+++ b/gr-digital/python/digital/qa_header_payload_demux.py
@@ -64,8 +64,11 @@ class qa_header_payload_demux (gr_unittest.TestCase):
testtag4.offset = n_zeros + len(header) + 3
testtag4.key = pmt.string_to_symbol('tag4')
testtag4.value = pmt.from_long(314)
-
- data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4))
+ data_src = blocks.vector_source_f(
+ data_signal,
+ False,
+ tags=(testtag1, testtag2, testtag3, testtag4)
+ )
trigger_src = blocks.vector_source_b(trigger_signal, False)
hpd = digital.header_payload_demux(
len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
@@ -196,21 +199,54 @@ class qa_header_payload_demux (gr_unittest.TestCase):
"""
Like test 1, but twice, plus one fail
"""
+ ### Tx Data
n_zeros = 5
header = (1, 2, 3)
header_fail = (-1, -2, -4) # Contents don't really matter
payload1 = tuple(range(5, 20))
payload2 = (42,)
+ sampling_rate = 2
data_signal = (0,) * n_zeros + header + payload1
trigger_signal = [0,] * len(data_signal) * 2
trigger_signal[n_zeros] = 1
trigger_signal[len(data_signal)] = 1
trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1
tx_signal = data_signal + header_fail + (0,) * n_zeros + header + payload2 + (0,) * 1000
- data_src = blocks.vector_source_f(tx_signal, False)
+ # Timing tag: This is preserved and updated:
+ timing_tag = gr.tag_t()
+ timing_tag.offset = 0
+ timing_tag.key = pmt.string_to_symbol('rx_time')
+ timing_tag.value = pmt.to_pmt((0, 0))
+ # Rx freq tags:
+ rx_freq_tag1 = gr.tag_t()
+ rx_freq_tag1.offset = 0
+ rx_freq_tag1.key = pmt.string_to_symbol('rx_freq')
+ rx_freq_tag1.value = pmt.from_double(1.0)
+ rx_freq_tag2 = gr.tag_t()
+ rx_freq_tag2.offset = 29
+ rx_freq_tag2.key = pmt.string_to_symbol('rx_freq')
+ rx_freq_tag2.value = pmt.from_double(1.5)
+ rx_freq_tag3 = gr.tag_t()
+ rx_freq_tag3.offset = 30
+ rx_freq_tag3.key = pmt.string_to_symbol('rx_freq')
+ rx_freq_tag3.value = pmt.from_double(2.0)
+ ### Flow graph
+ data_src = blocks.vector_source_f(
+ tx_signal, False,
+ tags=(timing_tag, rx_freq_tag1, rx_freq_tag2, rx_freq_tag3)
+ )
trigger_src = blocks.vector_source_b(trigger_signal, False)
hpd = digital.header_payload_demux(
- len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
+ header_len=len(header),
+ items_per_symbol=1,
+ guard_interval=0,
+ length_tag_key="frame_len",
+ trigger_tag_key="detect",
+ output_symbols=False,
+ itemsize=gr.sizeof_float,
+ timing_tag_key='rx_time',
+ samp_rate=sampling_rate,
+ special_tags=('rx_freq',),
)
self.assertEqual(pmt.length(hpd.message_ports_in()), 1)
header_sink = blocks.vector_sink_f()
@@ -242,8 +278,35 @@ class qa_header_payload_demux (gr_unittest.TestCase):
time.sleep(.2)
self.tb.stop()
self.tb.wait()
+ # Signal description:
+ # 0: 5 zeros
+ # 5: header 1
+ # 8: payload 1 (length: 15)
+ # 23: header 2 (fail)
+ # 26: 5 zeros
+ # 31: header 3
+ # 34: payload 2 (length 1)
+ # 35: 1000 zeros
self.assertEqual(header_sink.data(), header + header_fail + header)
self.assertEqual(payload_sink.data(), payload1 + payload2)
+ tags_payload = [gr.tag_to_python(x) for x in payload_sink.tags()]
+ tags_payload = sorted([(x.offset, x.key, x.value) for x in tags_payload])
+ tags_expected_payload = [
+ (0, 'frame_len', len(payload1)),
+ (len(payload1), 'frame_len', len(payload2)),
+ ]
+ tags_header = [gr.tag_to_python(x) for x in header_sink.tags()]
+ tags_header = sorted([(x.offset, x.key, x.value) for x in tags_header])
+ tags_expected_header = [
+ (0, 'rx_freq', 1.0),
+ (0, 'rx_time', (2, 0.5)), # Hard coded time value :( Is n_zeros/sampling_rate
+ (len(header), 'rx_freq', 1.0),
+ (len(header), 'rx_time', (11, .5)), # Hard coded time value :(. See above.
+ (2*len(header), 'rx_freq', 2.0),
+ (2*len(header), 'rx_time', (15, .5)), # Hard coded time value :(. See above.
+ ]
+ self.assertEqual(tags_header, tags_expected_header)
+ self.assertEqual(tags_payload, tags_expected_payload)
if __name__ == '__main__':
gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml")