summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_header_payload_demux.py
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@kit.edu>2013-06-11 19:28:13 +0200
committerJohnathan Corgan <johnathan@corganlabs.com>2013-06-14 06:38:40 -0700
commitb024982081ee4384e95d8a8958900de93c5fd064 (patch)
treec77af57c0222daea2b5d43d0f98d4eca9b89e3fb /gr-digital/python/digital/qa_header_payload_demux.py
parent8ba7be2a1b0027747d256638c50a4c7b09677ea9 (diff)
digital: HPD: fixed tag propagation, minimized calls to work()
Diffstat (limited to 'gr-digital/python/digital/qa_header_payload_demux.py')
-rwxr-xr-xgr-digital/python/digital/qa_header_payload_demux.py179
1 files changed, 176 insertions, 3 deletions
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py
index e0ade4e5fa..89df6d645a 100755
--- a/gr-digital/python/digital/qa_header_payload_demux.py
+++ b/gr-digital/python/digital/qa_header_payload_demux.py
@@ -36,15 +36,36 @@ class qa_header_payload_demux (gr_unittest.TestCase):
""" Simplest possible test: put in zeros, then header,
then payload, trigger signal, try to demux.
The return signal from the header parser is faked via _post()
+ Add in some tags for fun.
"""
- n_zeros = 100
+ n_zeros = 1
header = (1, 2, 3)
- payload = tuple(range(17))
+ payload = tuple(range(5, 20))
data_signal = (0,) * n_zeros + header + payload
trigger_signal = [0,] * len(data_signal)
trigger_signal[n_zeros] = 1
+ # This is dropped:
+ testtag1 = gr.tag_t()
+ testtag1.offset = 0
+ testtag1.key = pmt.string_to_symbol('tag1')
+ testtag1.value = pmt.from_long(0)
+ # This goes on output 0, item 0:
+ testtag2 = gr.tag_t()
+ testtag2.offset = n_zeros
+ testtag2.key = pmt.string_to_symbol('tag2')
+ testtag2.value = pmt.from_long(23)
+ # This goes on output 0, item 2:
+ testtag3 = gr.tag_t()
+ testtag3.offset = n_zeros + len(header) - 1
+ testtag3.key = pmt.string_to_symbol('tag3')
+ testtag3.value = pmt.from_long(42)
+ # This goes on output 1, item 3:
+ testtag4 = gr.tag_t()
+ testtag4.offset = n_zeros + len(header) + 3
+ testtag4.key = pmt.string_to_symbol('tag4')
+ testtag4.value = pmt.from_long(314)
- data_src = blocks.vector_source_f(data_signal, False)
+ data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4))
trigger_src = blocks.vector_source_b(trigger_signal, False)
hpd = digital.header_payload_demux(
len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
@@ -70,7 +91,159 @@ class qa_header_payload_demux (gr_unittest.TestCase):
self.assertEqual(header_sink.data(), header)
self.assertEqual(payload_sink.data(), payload)
+ ptags_header = []
+ for tag in header_sink.tags():
+ ptag = gr.tag_to_python(tag)
+ ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
+ expected_tags_header = [
+ {'key': 'tag2', 'offset': 0},
+ {'key': 'tag3', 'offset': 2},
+ ]
+ self.assertEqual(expected_tags_header, ptags_header)
+ ptags_payload = []
+ for tag in payload_sink.tags():
+ ptag = gr.tag_to_python(tag)
+ ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
+ expected_tags_payload = [
+ {'key': 'frame_len', 'offset': 0},
+ {'key': 'tag4', 'offset': 3},
+ ]
+ self.assertEqual(expected_tags_payload, ptags_payload)
+ def test_002_symbols (self):
+ """
+ Same as before, but operate on symbols
+ """
+ n_zeros = 1
+ items_per_symbol = 3
+ gi = 1
+ n_symbols = 4
+ header = (1, 2, 3)
+ payload = (1, 2, 3)
+ data_signal = (0,) * n_zeros + (0,) + header + ((0,) + payload) * n_symbols
+ trigger_signal = [0,] * len(data_signal)
+ trigger_signal[n_zeros] = 1
+ # This is dropped:
+ testtag1 = gr.tag_t()
+ testtag1.offset = 0
+ testtag1.key = pmt.string_to_symbol('tag1')
+ testtag1.value = pmt.from_long(0)
+ # This goes on output 0, item 0 (from the GI)
+ testtag2 = gr.tag_t()
+ testtag2.offset = n_zeros
+ testtag2.key = pmt.string_to_symbol('tag2')
+ testtag2.value = pmt.from_long(23)
+ # This goes on output 0, item 0 (middle of the header symbol)
+ testtag3 = gr.tag_t()
+ testtag3.offset = n_zeros + gi + 1
+ testtag3.key = pmt.string_to_symbol('tag3')
+ testtag3.value = pmt.from_long(42)
+ # This goes on output 1, item 1 (middle of the first payload symbol)
+ testtag4 = gr.tag_t()
+ testtag4.offset = n_zeros + (gi + items_per_symbol) * 2 + 1
+ testtag4.key = pmt.string_to_symbol('tag4')
+ testtag4.value = pmt.from_long(314)
+ data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4))
+ trigger_src = blocks.vector_source_b(trigger_signal, False)
+ hpd = digital.header_payload_demux(
+ len(header) / items_per_symbol, # Header length (in symbols)
+ items_per_symbol, # Items per symbols
+ gi, # Items per guard time
+ "frame_len", # Frame length tag key
+ "detect", # Trigger tag key
+ True, # Output symbols (not items)
+ gr.sizeof_float # Bytes per item
+ )
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 1)
+ header_sink = blocks.vector_sink_f(items_per_symbol)
+ payload_sink = blocks.vector_sink_f(items_per_symbol)
+ self.tb.connect(data_src, (hpd, 0))
+ self.tb.connect(trigger_src, (hpd, 1))
+ self.tb.connect((hpd, 0), header_sink)
+ self.tb.connect((hpd, 1), payload_sink)
+ self.tb.start()
+ time.sleep(.2) # Need this, otherwise, the next message is ignored
+ hpd.to_basic_block()._post(
+ pmt.intern('header_data'),
+ pmt.from_long(n_symbols)
+ )
+ while len(payload_sink.data()) < len(payload) * n_symbols:
+ time.sleep(.2)
+ self.tb.stop()
+ self.tb.wait()
+ self.assertEqual(header_sink.data(), header)
+ self.assertEqual(payload_sink.data(), payload * n_symbols)
+ ptags_header = []
+ for tag in header_sink.tags():
+ ptag = gr.tag_to_python(tag)
+ ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
+ expected_tags_header = [
+ {'key': 'tag2', 'offset': 0},
+ {'key': 'tag3', 'offset': 0},
+ ]
+ self.assertEqual(expected_tags_header, ptags_header)
+ ptags_payload = []
+ for tag in payload_sink.tags():
+ ptag = gr.tag_to_python(tag)
+ ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
+ expected_tags_payload = [
+ {'key': 'frame_len', 'offset': 0},
+ {'key': 'tag4', 'offset': 1},
+ ]
+ self.assertEqual(expected_tags_payload, ptags_payload)
+
+ def test_003_t (self):
+ """
+ Like test 1, but twice, plus one fail
+ """
+ n_zeros = 5
+ header = (1, 2, 3)
+ header_fail = (-1, -2, -4) # Contents don't really matter
+ payload1 = tuple(range(5, 20))
+ payload2 = (42,)
+ data_signal = (0,) * n_zeros + header + payload1
+ trigger_signal = [0,] * len(data_signal) * 2
+ trigger_signal[n_zeros] = 1
+ trigger_signal[len(data_signal)] = 1
+ trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1
+ tx_signal = data_signal + header_fail + (0,) * n_zeros + header + payload2 + (0,) * 1000
+ data_src = blocks.vector_source_f(tx_signal, False)
+ trigger_src = blocks.vector_source_b(trigger_signal, False)
+ hpd = digital.header_payload_demux(
+ len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
+ )
+ self.assertEqual(pmt.length(hpd.message_ports_in()), 1)
+ header_sink = blocks.vector_sink_f()
+ payload_sink = blocks.vector_sink_f()
+ self.tb.connect(data_src, (hpd, 0))
+ self.tb.connect(trigger_src, (hpd, 1))
+ self.tb.connect((hpd, 0), header_sink)
+ self.tb.connect((hpd, 1), payload_sink)
+ self.tb.start()
+ time.sleep(.2) # Need this, otherwise, the next message is ignored
+ hpd.to_basic_block()._post(
+ pmt.intern('header_data'),
+ pmt.from_long(len(payload1))
+ )
+ while len(payload_sink.data()) < len(payload1):
+ time.sleep(.2)
+ hpd.to_basic_block()._post(
+ pmt.intern('header_data'),
+ pmt.PMT_F
+ )
+ # This next command is a bit of a showstopper, but there's no condition to check upon
+ # to see if the previous msg handling is finished
+ time.sleep(.7)
+ hpd.to_basic_block()._post(
+ pmt.intern('header_data'),
+ pmt.from_long(len(payload2))
+ )
+ while len(payload_sink.data()) < len(payload1) + len(payload2):
+ time.sleep(.2)
+ self.tb.stop()
+ self.tb.wait()
+ self.assertEqual(header_sink.data(), header + header_fail + header)
+ self.assertEqual(payload_sink.data(), payload1 + payload2)
if __name__ == '__main__':
gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml")