diff options
Diffstat (limited to 'gr-digital/python/digital/qa_header_payload_demux.py')
-rw-r--r-- | gr-digital/python/digital/qa_header_payload_demux.py | 209 |
1 files changed, 126 insertions, 83 deletions
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py index edee14c0c1..76be0905f0 100644 --- a/gr-digital/python/digital/qa_header_payload_demux.py +++ b/gr-digital/python/digital/qa_header_payload_demux.py @@ -33,6 +33,7 @@ class HeaderToMessageBlock(gr.sync_block): Helps with testing the HPD. Receives a header, stores it, posts a predetermined message. """ + def __init__(self, itemsize, header_len, messages): gr.sync_block.__init__( self, @@ -57,12 +58,12 @@ class HeaderToMessageBlock(gr.sync_block): class qa_header_payload_demux (gr_unittest.TestCase): - def setUp (self): + def setUp(self): """Runs before every test.""" - self.tb = gr.top_block () + self.tb = gr.top_block() random.seed(0) - def tearDown (self): + def tearDown(self): """Runs after every test.""" self.tb = None @@ -85,11 +86,17 @@ class qa_header_payload_demux (gr_unittest.TestCase): ) self.tb.connect((hpd, 1), payload_sink) - def run_tb(self, payload_sink, payload_len, header_sink, header_len, timeout=30): + def run_tb( + self, + payload_sink, + payload_len, + header_sink, + header_len, + timeout=30): """Execute self.tb""" stop_time = time.time() + timeout self.tb.start() - while (len(payload_sink.data()) < payload_len or \ + while (len(payload_sink.data()) < payload_len or len(header_sink.data()) < header_len) and \ time.time() < stop_time: time.sleep(.2) @@ -106,7 +113,7 @@ class qa_header_payload_demux (gr_unittest.TestCase): header = (1, 2, 3) payload = tuple(range(5, 20)) data_signal = (0,) * n_zeros + header + payload - trigger_signal = [0,] * len(data_signal) + trigger_signal = [0, ] * len(data_signal) trigger_signal[n_zeros] = 1 # This is dropped: testtag1 = make_tag('tag1', 0, 0) @@ -130,7 +137,8 @@ class qa_header_payload_demux (gr_unittest.TestCase): len(header), [len(payload)] ) - self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you + # extra system port defined for you + self.assertEqual(pmt.length(hpd.message_ports_in()), 2) payload_sink = blocks.vector_sink_f() header_sink = blocks.vector_sink_f() self.connect_all_blocks( @@ -188,7 +196,8 @@ class qa_header_payload_demux (gr_unittest.TestCase): hpd = digital.header_payload_demux( len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float ) - self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you + # extra system port defined for you + self.assertEqual(pmt.length(hpd.message_ports_in()), 2) header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() mock_header_demod = HeaderToMessageBlock( @@ -196,7 +205,13 @@ class qa_header_payload_demux (gr_unittest.TestCase): len(header), [len(payload)] ) - self.connect_all_blocks(data_src, None, hpd, mock_header_demod, payload_sink, header_sink) + self.connect_all_blocks( + data_src, + None, + hpd, + mock_header_demod, + payload_sink, + header_sink) self.run_tb(payload_sink, len(payload), header_sink, len(header)) # Check results self.assertEqual(header_sink.data(), list(header)) @@ -220,13 +235,13 @@ class qa_header_payload_demux (gr_unittest.TestCase): ] self.assertEqual(expected_tags_payload, ptags_payload) - def test_001_headerpadding (self): + def test_001_headerpadding(self): """ Like test 1, but with header padding. """ n_zeros = 3 header = [1, 2, 3] header_padding = 1 payload = list(range(5, 20)) - data_signal = [0,] * n_zeros + header + payload + data_signal = [0, ] * n_zeros + header + payload trigger_signal = [0] * len(data_signal) trigger_signal[n_zeros] = 1 # This is dropped: @@ -245,15 +260,15 @@ class qa_header_payload_demux (gr_unittest.TestCase): trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( len(header), - 1, # Items per symbol - 0, # Guard interval - "frame_len", # TSB tag key - "detect", # Trigger tag key - False, # No symbols please - gr.sizeof_float, # Item size - "", # Timing tag key - 1.0, # Samp rate - (), # No special tags + 1, # Items per symbol + 0, # Guard interval + "frame_len", # TSB tag key + "detect", # Trigger tag key + False, # No symbols please + gr.sizeof_float, # Item size + "", # Timing tag key + 1.0, # Samp rate + (), # No special tags header_padding ) mock_header_demod = HeaderToMessageBlock( @@ -263,11 +278,17 @@ class qa_header_payload_demux (gr_unittest.TestCase): ) header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() - self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) - self.run_tb(payload_sink, len(payload), header_sink, len(header)+2) + self.connect_all_blocks( + data_src, + trigger_src, + hpd, + mock_header_demod, + payload_sink, + header_sink) + self.run_tb(payload_sink, len(payload), header_sink, len(header) + 2) # Check values # Header now is padded: - self.assertEqual(header_sink.data(), [0,] + header + [payload[0],]) + self.assertEqual(header_sink.data(), [0, ] + header + [payload[0], ]) self.assertEqual(payload_sink.data(), payload) ptags_header = [] for tag in header_sink.tags(): @@ -288,14 +309,14 @@ class qa_header_payload_demux (gr_unittest.TestCase): ] self.assertEqual(expected_tags_payload, ptags_payload) - def test_001_headerpadding_payload_offset (self): + def test_001_headerpadding_payload_offset(self): """ Like test 1, but with header padding + payload offset. """ n_zeros = 3 header = [1, 2, 3] header_padding = 1 payload_offset = -1 payload = list(range(5, 20)) - data_signal = [0,] * n_zeros + header + payload + [0,] * 100 + data_signal = [0, ] * n_zeros + header + payload + [0, ] * 100 trigger_signal = [0] * len(data_signal) trigger_signal[n_zeros] = 1 # This goes on output 1, item 3 + 1 (for payload offset) @@ -308,41 +329,38 @@ class qa_header_payload_demux (gr_unittest.TestCase): trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( len(header), - 1, # Items per symbol - 0, # Guard interval - "frame_len", # TSB tag key - "detect", # Trigger tag key - False, # No symbols please - gr.sizeof_float, # Item size - "", # Timing tag key - 1.0, # Samp rate - (), # No special tags + 1, # Items per symbol + 0, # Guard interval + "frame_len", # TSB tag key + "detect", # Trigger tag key + False, # No symbols please + gr.sizeof_float, # Item size + "", # Timing tag key + 1.0, # Samp rate + (), # No special tags header_padding ) - self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you + # extra system port defined for you + self.assertEqual(pmt.length(hpd.message_ports_in()), 2) header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() - self.tb.connect(data_src, (hpd, 0)) + self.tb.connect(data_src, (hpd, 0)) self.tb.connect(trigger_src, (hpd, 1)) self.tb.connect((hpd, 0), header_sink) self.tb.connect((hpd, 1), payload_sink) self.tb.start() - time.sleep(.2) # Need this, otherwise, the next message is ignored - hpd.to_basic_block()._post( - pmt.intern('header_data'), - pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset}) - ) + time.sleep(.2) # Need this, otherwise, the next message is ignored + hpd.to_basic_block()._post(pmt.intern('header_data'), pmt.to_pmt( + {'frame_len': len(payload), 'payload_offset': payload_offset})) while len(payload_sink.data()) < len(payload): time.sleep(.2) self.tb.stop() self.tb.wait() # Header is now padded: - self.assertEqual(header_sink.data(), [0,] + header + [payload[0],]) + self.assertEqual(header_sink.data(), [0, ] + header + [payload[0], ]) # Payload is now offset: - self.assertEqual( - payload_sink.data(), - data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)] - ) + self.assertEqual(payload_sink.data(), data_signal[n_zeros + len( + header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)]) ptags_payload = {} for tag in payload_sink.tags(): ptag = gr.tag_to_python(tag) @@ -354,7 +372,6 @@ class qa_header_payload_demux (gr_unittest.TestCase): } self.assertEqual(expected_tags_payload, ptags_payload) - def test_002_symbols(self): """ Same as before, but operate on symbols @@ -365,8 +382,9 @@ class qa_header_payload_demux (gr_unittest.TestCase): n_symbols = 4 header = (1, 2, 3) payload = (1, 2, 3) - data_signal = (0,) * n_zeros + (0,) + header + ((0,) + payload) * n_symbols - trigger_signal = [0,] * len(data_signal) + data_signal = (0,) * n_zeros + (0,) + header + \ + ((0,) + payload) * n_symbols + trigger_signal = [0, ] * len(data_signal) trigger_signal[n_zeros] = 1 # This is dropped: testtag1 = make_tag('tag1', 0, 0) @@ -375,11 +393,14 @@ class qa_header_payload_demux (gr_unittest.TestCase): # This goes on output 0, item 0 (middle of the header symbol) testtag3 = make_tag('tag3', 42, n_zeros + gi + 1) # This goes on output 1, item 1 (middle of the first payload symbol) - testtag4 = make_tag('tag4', 314, n_zeros + (gi + items_per_symbol) * 2 + 1) - data_src = blocks.vector_source_f(data_signal, False, tags=(testtag1, testtag2, testtag3, testtag4)) + testtag4 = make_tag('tag4', 314, n_zeros + + (gi + items_per_symbol) * 2 + 1) + data_src = blocks.vector_source_f( + data_signal, False, tags=( + testtag1, testtag2, testtag3, testtag4)) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( - len(header) // items_per_symbol, # Header length (in symbols) + len(header) // items_per_symbol, # Header length (in symbols) items_per_symbol, # Items per symbols gi, # Items per guard time "frame_len", # Frame length tag key @@ -387,15 +408,16 @@ class qa_header_payload_demux (gr_unittest.TestCase): True, # Output symbols (not items) gr.sizeof_float # Bytes per item ) - self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you + # extra system port defined for you + self.assertEqual(pmt.length(hpd.message_ports_in()), 2) header_sink = blocks.vector_sink_f(items_per_symbol) payload_sink = blocks.vector_sink_f(items_per_symbol) - self.tb.connect(data_src, (hpd, 0)) + self.tb.connect(data_src, (hpd, 0)) self.tb.connect(trigger_src, (hpd, 1)) self.tb.connect((hpd, 0), header_sink) self.tb.connect((hpd, 1), payload_sink) self.tb.start() - time.sleep(.2) # Need this, otherwise, the next message is ignored + time.sleep(.2) # Need this, otherwise, the next message is ignored hpd.to_basic_block()._post( pmt.intern('header_data'), pmt.from_long(n_symbols) @@ -404,7 +426,7 @@ class qa_header_payload_demux (gr_unittest.TestCase): time.sleep(.2) self.tb.stop() self.tb.wait() - self.assertEqual(header_sink.data(), header) + self.assertEqual(header_sink.data(), header) self.assertEqual(payload_sink.data(), payload * n_symbols) ptags_header = [] for tag in header_sink.tags(): @@ -429,32 +451,32 @@ class qa_header_payload_demux (gr_unittest.TestCase): """ Like test 1, but twice, plus one fail """ - ### Tx Data + # Tx Data n_zeros = 5 header = [1, 2, 3] - header_fail = [-1, -2, -4] # Contents don't really matter + header_fail = [-1, -2, -4] # Contents don't really matter payload1 = list(range(5, 20)) - payload2 = [42,] + payload2 = [42, ] sampling_rate = 2 - data_signal = [0,] * n_zeros + header + payload1 - trigger_signal = [0,] * len(data_signal) * 2 + data_signal = [0, ] * n_zeros + header + payload1 + trigger_signal = [0, ] * len(data_signal) * 2 trigger_signal[n_zeros] = 1 trigger_signal[len(data_signal)] = 1 - trigger_signal[len(data_signal)+len(header_fail)+n_zeros] = 1 + trigger_signal[len(data_signal) + len(header_fail) + n_zeros] = 1 print("Triggers at: {0} {1} {2}".format( n_zeros, len(data_signal), - len(data_signal)+len(header_fail)+n_zeros)) + len(data_signal) + len(header_fail) + n_zeros)) tx_signal = data_signal + \ - header_fail + [0,] * n_zeros + \ - header + payload2 + [0,] * 1000 + header_fail + [0, ] * n_zeros + \ + header + payload2 + [0, ] * 1000 # Timing tag: This is preserved and updated: timing_tag = make_tag('rx_time', (0, 0), 0) # Rx freq tags: rx_freq_tag1 = make_tag('rx_freq', 1.0, 0) rx_freq_tag2 = make_tag('rx_freq', 1.5, 29) rx_freq_tag3 = make_tag('rx_freq', 2.0, 30) - ### Flow graph + # Flow graph data_src = blocks.vector_source_f( tx_signal, False, tags=(timing_tag, rx_freq_tag1, rx_freq_tag2, rx_freq_tag3) @@ -472,15 +494,16 @@ class qa_header_payload_demux (gr_unittest.TestCase): samp_rate=sampling_rate, special_tags=('rx_freq',), ) - self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you + # extra system port defined for you + self.assertEqual(pmt.length(hpd.message_ports_in()), 2) header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() - self.tb.connect(data_src, (hpd, 0)) + self.tb.connect(data_src, (hpd, 0)) self.tb.connect(trigger_src, (hpd, 1)) self.tb.connect((hpd, 0), header_sink) self.tb.connect((hpd, 1), payload_sink) self.tb.start() - time.sleep(.2) # Need this, otherwise, the next message is ignored + time.sleep(.2) # Need this, otherwise, the next message is ignored hpd.to_basic_block()._post( pmt.intern('header_data'), pmt.from_long(len(payload1)) @@ -511,23 +534,29 @@ class qa_header_payload_demux (gr_unittest.TestCase): # 31: header 3 # 34: payload 2 (length 1) # 35: 1000 zeros - self.assertEqual(header_sink.data(), list(header + header_fail + header)) + self.assertEqual( + header_sink.data(), list( + header + header_fail + header)) self.assertEqual(payload_sink.data(), payload1 + payload2) tags_payload = [gr.tag_to_python(x) for x in payload_sink.tags()] - tags_payload = sorted([(x.offset, x.key, x.value) for x in tags_payload]) + tags_payload = sorted([(x.offset, x.key, x.value) + for x in tags_payload]) tags_expected_payload = [ - (0, 'frame_len', len(payload1)), + (0, 'frame_len', len(payload1)), (len(payload1), 'frame_len', len(payload2)), ] tags_header = [gr.tag_to_python(x) for x in header_sink.tags()] tags_header = sorted([(x.offset, x.key, x.value) for x in tags_header]) tags_expected_header = [ - (0, 'rx_freq', 1.0), - (0, 'rx_time', (2, 0.5)), # Hard coded time value :( Is n_zeros/sampling_rate - (len(header), 'rx_freq', 1.0), - (len(header), 'rx_time', (11, .5)), # Hard coded time value :(. See above. - (2*len(header), 'rx_freq', 2.0), - (2*len(header), 'rx_time', (15, .5)), # Hard coded time value :(. See above. + (0, 'rx_freq', 1.0), + # Hard coded time value :( Is n_zeros/sampling_rate + (0, 'rx_time', (2, 0.5)), + (len(header), 'rx_freq', 1.0), + # Hard coded time value :(. See above. + (len(header), 'rx_time', (11, .5)), + (2 * len(header), 'rx_freq', 2.0), + # Hard coded time value :(. See above. + (2 * len(header), 'rx_time', (15, .5)), ] self.assertEqual(tags_header, tags_expected_header) self.assertEqual(tags_payload, tags_expected_payload) @@ -561,6 +590,7 @@ class qa_header_payload_demux (gr_unittest.TestCase): signal += [2] * burst_size burst_sizes += [burst_size] return (signal, indexes, total_payload_len, burst_sizes) + def indexes_to_triggers(indexes, signal_len): """ Convert indexes to a mix of trigger signals and tags @@ -588,7 +618,8 @@ class qa_header_payload_demux (gr_unittest.TestCase): signal, indexes, total_payload_len, burst_sizes = create_signal( n_bursts, header_len, max_gap, max_burstsize, fail_rate ) - trigger_signal, trigger_tags = indexes_to_triggers(indexes, len(signal)) + trigger_signal, trigger_tags = indexes_to_triggers( + indexes, len(signal)) # Flow graph data_src = blocks.vector_source_f( signal, False, @@ -614,10 +645,22 @@ class qa_header_payload_demux (gr_unittest.TestCase): ) header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() - self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) - self.run_tb(payload_sink, total_payload_len, header_sink, header_len*n_bursts) - self.assertEqual(header_sink.data(), list([1]*header_len*n_bursts)) - self.assertEqual(payload_sink.data(), list([2]*total_payload_len)) + self.connect_all_blocks( + data_src, + trigger_src, + hpd, + mock_header_demod, + payload_sink, + header_sink) + self.run_tb( + payload_sink, + total_payload_len, + header_sink, + header_len * + n_bursts) + self.assertEqual(header_sink.data(), list([1] * header_len * n_bursts)) + self.assertEqual(payload_sink.data(), list([2] * total_payload_len)) + if __name__ == '__main__': gr_unittest.run(qa_header_payload_demux) |