diff options
Diffstat (limited to 'gr-digital/python/digital/qa_header_payload_demux.py')
-rwxr-xr-x | gr-digital/python/digital/qa_header_payload_demux.py | 451 |
1 files changed, 341 insertions, 110 deletions
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py index 8006d4442e..f36d71067c 100755 --- a/gr-digital/python/digital/qa_header_payload_demux.py +++ b/gr-digital/python/digital/qa_header_payload_demux.py @@ -1,29 +1,69 @@ #!/usr/bin/env python -# Copyright 2012 Free Software Foundation, Inc. -# +# Copyright 2012-2016 Free Software Foundation, Inc. +# # This file is part of GNU Radio -# +# # GNU Radio is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3, or (at your option) # any later version. -# +# # GNU Radio is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# +# # You should have received a copy of the GNU General Public License # along with GNU Radio; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. -# +# +from __future__ import print_function import time - -from gnuradio import gr, gr_unittest, digital, blocks +import random +import numpy +from gnuradio import gr +from gnuradio import gr_unittest +from gnuradio import digital +from gnuradio import blocks import pmt +def make_tag(key, value, offset): + tag = gr.tag_t() + tag.offset = offset + tag.key = pmt.string_to_symbol(key) + tag.value = pmt.to_pmt(value) + return tag + + +class HeaderToMessageBlock(gr.sync_block): + """ + Helps with testing the HPD. Receives a header, stores it, posts + a predetermined message. + """ + def __init__(self, itemsize, header_len, messages, header_is_symbol=False): + gr.sync_block.__init__( + self, + name="HeaderToMessageBlock", + in_sig=[itemsize], + out_sig=[itemsize], + ) + self.header_len = header_len + self.message_port_register_out(pmt.intern('header_data')) + self.messages = messages + self.msg_count = 0 + + def work(self, input_items, output_items): + for i in xrange(len(input_items[0])/self.header_len): + msg = self.messages[self.msg_count] or False + #print("Sending message: {0}".format(msg)) + self.message_port_pub(pmt.intern('header_data'), pmt.to_pmt(msg)) + self.msg_count += 1 + output_items[0][:] = input_items[0][:] + return len(input_items[0]) + + class qa_header_payload_demux (gr_unittest.TestCase): def setUp (self): @@ -32,6 +72,36 @@ class qa_header_payload_demux (gr_unittest.TestCase): def tearDown (self): self.tb = None + def connect_all_blocks(self, + data_src, trigger_src, + hpd, + mock_header_demod, + payload_sink, header_sink + ): + """ + Connect the standard HPD test flowgraph + """ + self.tb.connect(data_src, (hpd, 0)) + if trigger_src is not None: + self.tb.connect(trigger_src, (hpd, 1)) + self.tb.connect((hpd, 0), mock_header_demod) + self.tb.connect(mock_header_demod, header_sink) + self.tb.msg_connect( + mock_header_demod, 'header_data', + hpd, 'header_data' + ) + self.tb.connect((hpd, 1), payload_sink) + + def run_tb(self, payload_sink, payload_len, header_sink, header_len, timeout=30): + stop_time = time.time() + timeout + self.tb.start() + while len(payload_sink.data()) < payload_len and \ + len(header_sink.data()) < header_len and \ + time.time() < stop_time: + time.sleep(.2) + self.tb.stop() + self.tb.wait() + def test_001_t (self): """ Simplest possible test: put in zeros, then header, then payload, trigger signal, try to demux. @@ -45,25 +115,13 @@ class qa_header_payload_demux (gr_unittest.TestCase): trigger_signal = [0,] * len(data_signal) trigger_signal[n_zeros] = 1 # This is dropped: - testtag1 = gr.tag_t() - testtag1.offset = 0 - testtag1.key = pmt.string_to_symbol('tag1') - testtag1.value = pmt.from_long(0) + testtag1 = make_tag('tag1', 0, 0) # This goes on output 0, item 0: - testtag2 = gr.tag_t() - testtag2.offset = n_zeros - testtag2.key = pmt.string_to_symbol('tag2') - testtag2.value = pmt.from_long(23) + testtag2 = make_tag('tag2', 23, n_zeros) # This goes on output 0, item 2: - testtag3 = gr.tag_t() - testtag3.offset = n_zeros + len(header) - 1 - testtag3.key = pmt.string_to_symbol('tag3') - testtag3.value = pmt.from_long(42) + testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1) # This goes on output 1, item 3: - testtag4 = gr.tag_t() - testtag4.offset = n_zeros + len(header) + 3 - testtag4.key = pmt.string_to_symbol('tag4') - testtag4.value = pmt.from_long(314) + testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( data_signal, False, @@ -73,26 +131,17 @@ class qa_header_payload_demux (gr_unittest.TestCase): hpd = digital.header_payload_demux( len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float ) + mock_header_demod = HeaderToMessageBlock( + numpy.float32, + len(header), + [len(payload)] + ) self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you - header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() - - self.tb.connect(data_src, (hpd, 0)) - self.tb.connect(trigger_src, (hpd, 1)) - self.tb.connect((hpd, 0), header_sink) - self.tb.connect((hpd, 1), payload_sink) - self.tb.start() - time.sleep(.2) # Need this, otherwise, the next message is ignored - hpd.to_basic_block()._post( - pmt.intern('header_data'), - pmt.from_long(len(payload)) - ) - while len(payload_sink.data()) < len(payload): - time.sleep(.2) - self.tb.stop() - self.tb.wait() - - self.assertEqual(header_sink.data(), header) + header_sink = blocks.vector_sink_f() + self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) + self.run_tb(payload_sink, len(payload), header_sink, len(header)) + self.assertEqual(header_sink.data(), header) self.assertEqual(payload_sink.data(), payload) ptags_header = [] for tag in header_sink.tags(): @@ -122,30 +171,15 @@ class qa_header_payload_demux (gr_unittest.TestCase): payload = tuple(range(5, 20)) data_signal = (0,) * n_zeros + header + payload # Trigger tag - trigger_tag = gr.tag_t() - trigger_tag.offset = n_zeros - trigger_tag.key = pmt.string_to_symbol('detect') - trigger_tag.value = pmt.PMT_T + trigger_tag = make_tag('detect', True, n_zeros) # This is dropped: - testtag1 = gr.tag_t() - testtag1.offset = 0 - testtag1.key = pmt.string_to_symbol('tag1') - testtag1.value = pmt.from_long(0) + testtag1 = make_tag('tag1', 0, 0) # This goes on output 0, item 0: - testtag2 = gr.tag_t() - testtag2.offset = n_zeros - testtag2.key = pmt.string_to_symbol('tag2') - testtag2.value = pmt.from_long(23) + testtag2 = make_tag('tag2', 23, n_zeros) # This goes on output 0, item 2: - testtag3 = gr.tag_t() - testtag3.offset = n_zeros + len(header) - 1 - testtag3.key = pmt.string_to_symbol('tag3') - testtag3.value = pmt.from_long(42) + testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1) # This goes on output 1, item 3: - testtag4 = gr.tag_t() - testtag4.offset = n_zeros + len(header) + 3 - testtag4.key = pmt.string_to_symbol('tag4') - testtag4.value = pmt.from_long(314) + testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( data_signal, False, @@ -157,21 +191,14 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() - - self.tb.connect(data_src, (hpd, 0)) - self.tb.connect((hpd, 0), header_sink) - self.tb.connect((hpd, 1), payload_sink) - self.tb.start() - time.sleep(.2) # Need this, otherwise, the next message is ignored - hpd.to_basic_block()._post( - pmt.intern('header_data'), - pmt.from_long(len(payload)) + mock_header_demod = HeaderToMessageBlock( + numpy.float32, + len(header), + [len(payload)] ) - while len(payload_sink.data()) < len(payload): - time.sleep(.2) - self.tb.stop() - self.tb.wait() - + self.connect_all_blocks(data_src, None, hpd, mock_header_demod, payload_sink, header_sink) + self.run_tb(payload_sink, len(payload), header_sink, len(header)) + # Check results self.assertEqual(header_sink.data(), header) self.assertEqual(payload_sink.data(), payload) ptags_header = [] @@ -193,8 +220,143 @@ class qa_header_payload_demux (gr_unittest.TestCase): ] self.assertEqual(expected_tags_payload, ptags_payload) + def test_001_headerpadding (self): + """ Like test 1, but with header padding. """ + n_zeros = 3 + header = (1, 2, 3) + header_padding = 1 + payload = tuple(range(5, 20)) + data_signal = (0,) * n_zeros + header + payload + trigger_signal = [0,] * len(data_signal) + trigger_signal[n_zeros] = 1 + # This is dropped: + testtag1 = make_tag('tag1', 0, 0) + # This goes on output 0, item 0: + testtag2 = make_tag('tag2', 23, n_zeros) + # This goes on output 0, item 2: + testtag3 = make_tag('tag3', 42, n_zeros + len(header) - 1) + # This goes on output 1, item 3: + testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) + data_src = blocks.vector_source_f( + data_signal, + False, + tags=(testtag1, testtag2, testtag3, testtag4) + ) + trigger_src = blocks.vector_source_b(trigger_signal, False) + hpd = digital.header_payload_demux( + len(header), + 1, # Items per symbol + 0, # Guard interval + "frame_len", # TSB tag key + "detect", # Trigger tag key + False, # No symbols please + gr.sizeof_float, # Item size + "", # Timing tag key + 1.0, # Samp rate + (), # No special tags + header_padding + ) + mock_header_demod = HeaderToMessageBlock( + numpy.float32, + len(header), + [len(payload)] + ) + header_sink = blocks.vector_sink_f() + payload_sink = blocks.vector_sink_f() + self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) + self.run_tb(payload_sink, len(payload), header_sink, len(header)+2) + # Check values + # Header now is padded: + self.assertEqual(header_sink.data(), (0,) + header + (payload[0],)) + self.assertEqual(payload_sink.data(), payload) + ptags_header = [] + for tag in header_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_header = [ + {'key': 'tag2', 'offset': 1}, + {'key': 'tag3', 'offset': 3}, + ] + self.assertEqual(expected_tags_header, ptags_header) + ptags_payload = [] + for tag in payload_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) + expected_tags_payload = [ + {'key': 'frame_len', 'offset': 0}, + {'key': 'tag4', 'offset': 3}, + ] + self.assertEqual(expected_tags_payload, ptags_payload) + + def test_001_headerpadding_payload_offset (self): + """ Like test 1, but with header padding + payload offset. """ + n_zeros = 3 + header = (1, 2, 3) + header_padding = 1 + payload_offset = -1 + payload = tuple(range(5, 20)) + data_signal = (0,) * n_zeros + header + payload + (0,) * 100 + trigger_signal = [0,] * len(data_signal) + trigger_signal[n_zeros] = 1 + # This goes on output 1, item 3 + 1 (for payload offset) + testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) + data_src = blocks.vector_source_f( + data_signal, + False, + tags=(testtag4,) + ) + trigger_src = blocks.vector_source_b(trigger_signal, False) + hpd = digital.header_payload_demux( + len(header), + 1, # Items per symbol + 0, # Guard interval + "frame_len", # TSB tag key + "detect", # Trigger tag key + False, # No symbols please + gr.sizeof_float, # Item size + "", # Timing tag key + 1.0, # Samp rate + (), # No special tags + header_padding + ) + self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you + header_sink = blocks.vector_sink_f() + payload_sink = blocks.vector_sink_f() + self.tb.connect(data_src, (hpd, 0)) + self.tb.connect(trigger_src, (hpd, 1)) + self.tb.connect((hpd, 0), header_sink) + self.tb.connect((hpd, 1), payload_sink) + self.tb.start() + time.sleep(.2) # Need this, otherwise, the next message is ignored + hpd.to_basic_block()._post( + pmt.intern('header_data'), + pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset}) + ) + while len(payload_sink.data()) < len(payload): + time.sleep(.2) + self.tb.stop() + self.tb.wait() + # Header is now padded: + self.assertEqual(header_sink.data(), (0,) + header + (payload[0],)) + # Payload is now offset: + self.assertEqual( + payload_sink.data(), + data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)] + ) + ptags_payload = {} + for tag in payload_sink.tags(): + ptag = gr.tag_to_python(tag) + ptags_payload[ptag.key] = ptag.offset + expected_tags_payload = { + 'frame_len': 0, + 'payload_offset': 0, + 'tag4': 3 - payload_offset, + } + self.assertEqual(expected_tags_payload, ptags_payload) + + def test_002_symbols (self): - """ + """ Same as before, but operate on symbols """ n_zeros = 1 @@ -207,25 +369,13 @@ class qa_header_payload_demux (gr_unittest.TestCase): trigger_signal = [0,] * len(data_signal) trigger_signal[n_zeros] = 1 # This is dropped: - testtag1 = gr.tag_t() - testtag1.offset = 0 - testtag1.key = pmt.string_to_symbol('tag1') - testtag1.value = pmt.from_long(0) + testtag1 = make_tag('tag1', 0, 0) # This goes on output 0, item 0 (from the GI) - testtag2 = gr.tag_t() - testtag2.offset = n_zeros - testtag2.key = pmt.string_to_symbol('tag2') - testtag2.value = pmt.from_long(23) + testtag2 = make_tag('tag2', 23, n_zeros) # This goes on output 0, item 0 (middle of the header symbol) - testtag3 = gr.tag_t() - testtag3.offset = n_zeros + gi + 1 - testtag3.key = pmt.string_to_symbol('tag3') - testtag3.value = pmt.from_long(42) + testtag3 = make_tag('tag3', 42, n_zeros + gi + 1) # This goes on output 1, item 1 (middle of the first payload symbol) - testtag4 = gr.tag_t() - testtag4.offset = n_zeros + (gi + items_per_symbol) * 2 + 1 - testtag4.key = pmt.string_to_symbol('tag4') - testtag4.value = pmt.from_long(314) + testtag4 = make_tag('tag4', 314, n_zeros + (gi + items_per_symbol) * 2 + 1) data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4)) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( @@ -291,25 +441,20 @@ class qa_header_payload_demux (gr_unittest.TestCase): trigger_signal[n_zeros] = 1 trigger_signal[len(data_signal)] = 1 trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1 - tx_signal = data_signal + header_fail + (0,) * n_zeros + header + payload2 + (0,) * 1000 + print("Triggers at: {0} {1} {2}".format( + n_zeros, + len(data_signal), + len(data_signal)+len(header_fail)+n_zeros) + ) + tx_signal = data_signal + \ + header_fail + (0,) * n_zeros + \ + header + payload2 + (0,) * 1000 # Timing tag: This is preserved and updated: - timing_tag = gr.tag_t() - timing_tag.offset = 0 - timing_tag.key = pmt.string_to_symbol('rx_time') - timing_tag.value = pmt.to_pmt((0, 0)) + timing_tag = make_tag('rx_time', (0, 0), 0) # Rx freq tags: - rx_freq_tag1 = gr.tag_t() - rx_freq_tag1.offset = 0 - rx_freq_tag1.key = pmt.string_to_symbol('rx_freq') - rx_freq_tag1.value = pmt.from_double(1.0) - rx_freq_tag2 = gr.tag_t() - rx_freq_tag2.offset = 29 - rx_freq_tag2.key = pmt.string_to_symbol('rx_freq') - rx_freq_tag2.value = pmt.from_double(1.5) - rx_freq_tag3 = gr.tag_t() - rx_freq_tag3.offset = 30 - rx_freq_tag3.key = pmt.string_to_symbol('rx_freq') - rx_freq_tag3.value = pmt.from_double(2.0) + rx_freq_tag1 = make_tag('rx_freq', 1.0, 0) + rx_freq_tag2 = make_tag('rx_freq', 1.5, 29) + rx_freq_tag3 = make_tag('rx_freq', 2.0, 30) ### Flow graph data_src = blocks.vector_source_f( tx_signal, False, @@ -388,6 +533,92 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.assertEqual(tags_header, tags_expected_header) self.assertEqual(tags_payload, tags_expected_payload) + def test_004_fuzz(self): + """ + Long random test + """ + def create_signal( + n_bursts, + header_len, + max_gap, + max_burstsize, + fail_rate, + ): + signal = [] + indexes = [] + burst_sizes = [] + total_payload_len = 0 + for burst_count in xrange(n_bursts): + gap_size = random.randint(0, max_gap) + signal += [0] * gap_size + is_failure = random.random() < fail_rate + if not is_failure: + burst_size = random.randint(0, max_burstsize) + else: + burst_size = 0 + total_payload_len += burst_size + indexes += [len(signal)] + signal += [1] * header_len + signal += [2] * burst_size + burst_sizes += [burst_size] + return (signal, indexes, total_payload_len, burst_sizes) + def indexes_to_triggers(indexes, signal_len): + """ + Convert indexes to a mix of trigger signals and tags + """ + trigger_signal = [0] * signal_len + trigger_tags = [] + for index in indexes: + if random.random() > 0.5: + trigger_signal[index] = 1 + else: + trigger_tags += [make_tag('detect', True, index)] + return (trigger_signal, trigger_tags) + ### Go, go, go + # The divide-by-20 means we'll usually get the same random seed + # between the first run and the XML run. + random_seed = int(time.time()/20) + random.seed(random_seed) + print("Random seed: {0}".format(random_seed)) + n_bursts = 400 + header_len = 5 + max_gap = 50 + max_burstsize = 100 + fail_rate = 0.05 + signal, indexes, total_payload_len, burst_sizes = create_signal( + n_bursts, header_len, max_gap, max_burstsize, fail_rate + ) + trigger_signal, trigger_tags = indexes_to_triggers(indexes, len(signal)) + # Flow graph + data_src = blocks.vector_source_f( + signal, False, + tags=trigger_tags + ) + trigger_src = blocks.vector_source_b(trigger_signal, False) + hpd = digital.header_payload_demux( + header_len=header_len, + items_per_symbol=1, + guard_interval=0, + length_tag_key="frame_len", + trigger_tag_key="detect", + output_symbols=False, + itemsize=gr.sizeof_float, + timing_tag_key='rx_time', + samp_rate=1.0, + special_tags=('rx_freq',), + ) + mock_header_demod = HeaderToMessageBlock( + numpy.float32, + header_len, + burst_sizes + ) + header_sink = blocks.vector_sink_f() + payload_sink = blocks.vector_sink_f() + self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) + self.run_tb(payload_sink, total_payload_len, header_sink, header_len*n_bursts) + self.assertEqual(header_sink.data(), tuple([1]*header_len*n_bursts)) + self.assertEqual(payload_sink.data(), tuple([2]*total_payload_len)) + if __name__ == '__main__': gr_unittest.run(qa_header_payload_demux, "qa_header_payload_demux.xml") |