diff options
author | Martin Braun <martin.braun@ettus.com> | 2018-12-15 21:08:13 -0800 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2018-12-16 12:58:35 -0800 |
commit | 9a66a27ea04ae0ff38215b0522731a0f3f3b955c (patch) | |
tree | f07dafcfc4d625a2417102aa1c96cc771f24e9d0 /gr-digital/python/digital/qa_header_payload_demux.py | |
parent | 84252e3e09b50165bd8ebd8419d1d19e7fc51dd4 (diff) |
digital: Fix some Pylint warnings in qa_header_payload_demux.py
No functional changes.
Diffstat (limited to 'gr-digital/python/digital/qa_header_payload_demux.py')
-rw-r--r-- | gr-digital/python/digital/qa_header_payload_demux.py | 163 |
1 files changed, 86 insertions, 77 deletions
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py index d77f7c689d..22b0e4ce79 100644 --- a/gr-digital/python/digital/qa_header_payload_demux.py +++ b/gr-digital/python/digital/qa_header_payload_demux.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2012-2016 Free Software Foundation, Inc. +# Copyright 2012-2016,2018 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -34,6 +34,7 @@ import pmt def make_tag(key, value, offset): + """Create a gr.tag_t() from key, value, offset.""" tag = gr.tag_t() tag.offset = offset tag.key = pmt.string_to_symbol(key) @@ -46,7 +47,7 @@ class HeaderToMessageBlock(gr.sync_block): Helps with testing the HPD. Receives a header, stores it, posts a predetermined message. """ - def __init__(self, itemsize, header_len, messages, header_is_symbol=False): + def __init__(self, itemsize, header_len, messages): gr.sync_block.__init__( self, name="HeaderToMessageBlock", @@ -59,9 +60,9 @@ class HeaderToMessageBlock(gr.sync_block): self.msg_count = 0 def work(self, input_items, output_items): - for i in range(len(input_items[0]) // self.header_len): + """Where the magic happens.""" + for _ in range(len(input_items[0]) // self.header_len): msg = self.messages[self.msg_count] or False - #print("Sending message: {0}".format(msg)) self.message_port_pub(pmt.intern('header_data'), pmt.to_pmt(msg)) self.msg_count += 1 output_items[0][:] = input_items[0][:] @@ -71,32 +72,34 @@ class HeaderToMessageBlock(gr.sync_block): class qa_header_payload_demux (gr_unittest.TestCase): def setUp (self): + """Runs before every test.""" self.tb = gr.top_block () def tearDown (self): + """Runs after every test.""" self.tb = None def connect_all_blocks(self, - data_src, trigger_src, - hpd, - mock_header_demod, - payload_sink, header_sink - ): + data_src, trigger_src, + hpd, + mock_header_demod, + payload_sink, header_sink): """ Connect the standard HPD test flowgraph """ - self.tb.connect(data_src, (hpd, 0)) + self.tb.connect(data_src, (hpd, 0)) if trigger_src is not None: self.tb.connect(trigger_src, (hpd, 1)) self.tb.connect((hpd, 0), mock_header_demod) self.tb.connect(mock_header_demod, header_sink) self.tb.msg_connect( - mock_header_demod, 'header_data', - hpd, 'header_data' + mock_header_demod, 'header_data', + hpd, 'header_data' ) self.tb.connect((hpd, 1), payload_sink) def run_tb(self, payload_sink, payload_len, header_sink, header_len, timeout=30): + """Execute self.tb""" stop_time = time.time() + timeout self.tb.start() while len(payload_sink.data()) < payload_len and \ @@ -106,7 +109,7 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.tb.stop() self.tb.wait() - def test_001_t (self): + def test_001_t(self): """ Simplest possible test: put in zeros, then header, then payload, trigger signal, try to demux. The return signal from the header parser is faked via _post() @@ -127,23 +130,29 @@ class qa_header_payload_demux (gr_unittest.TestCase): # This goes on output 1, item 3: testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( - data_signal, - False, - tags=(testtag1, testtag2, testtag3, testtag4) + data_signal, + False, + tags=(testtag1, testtag2, testtag3, testtag4) ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float ) mock_header_demod = HeaderToMessageBlock( - numpy.float32, - len(header), - [len(payload)] + numpy.float32, + len(header), + [len(payload)] ) self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you payload_sink = blocks.vector_sink_f() header_sink = blocks.vector_sink_f() - self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink) + self.connect_all_blocks( + data_src, + trigger_src, + hpd, + mock_header_demod, + payload_sink, + header_sink) self.run_tb(payload_sink, len(payload), header_sink, len(header)) self.assertEqual(header_sink.data(), header) self.assertEqual(payload_sink.data(), payload) @@ -152,8 +161,8 @@ class qa_header_payload_demux (gr_unittest.TestCase): ptag = gr.tag_to_python(tag) ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_header = [ - {'key': 'tag2', 'offset': 0}, - {'key': 'tag3', 'offset': 2}, + {'key': 'tag2', 'offset': 0}, + {'key': 'tag3', 'offset': 2}, ] self.assertEqual(expected_tags_header, ptags_header) ptags_payload = [] @@ -161,12 +170,12 @@ class qa_header_payload_demux (gr_unittest.TestCase): ptag = gr.tag_to_python(tag) ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_payload = [ - {'key': 'frame_len', 'offset': 0}, - {'key': 'tag4', 'offset': 3}, + {'key': 'frame_len', 'offset': 0}, + {'key': 'tag4', 'offset': 3}, ] self.assertEqual(expected_tags_payload, ptags_payload) - def test_001_t_tags (self): + def test_001_t_tags(self): """ Like the previous test, but use a trigger tag instead of a trigger signal. """ @@ -185,9 +194,9 @@ class qa_header_payload_demux (gr_unittest.TestCase): # This goes on output 1, item 3: testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( - data_signal, - False, - tags=(trigger_tag, testtag1, testtag2, testtag3, testtag4) + data_signal, + False, + tags=(trigger_tag, testtag1, testtag2, testtag3, testtag4) ) hpd = digital.header_payload_demux( len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float @@ -196,22 +205,22 @@ class qa_header_payload_demux (gr_unittest.TestCase): header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() mock_header_demod = HeaderToMessageBlock( - numpy.float32, - len(header), - [len(payload)] + numpy.float32, + len(header), + [len(payload)] ) self.connect_all_blocks(data_src, None, hpd, mock_header_demod, payload_sink, header_sink) self.run_tb(payload_sink, len(payload), header_sink, len(header)) # Check results - self.assertEqual(header_sink.data(), header) + self.assertEqual(header_sink.data(), header) self.assertEqual(payload_sink.data(), payload) ptags_header = [] for tag in header_sink.tags(): ptag = gr.tag_to_python(tag) ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_header = [ - {'key': 'tag2', 'offset': 0}, - {'key': 'tag3', 'offset': 2}, + {'key': 'tag2', 'offset': 0}, + {'key': 'tag3', 'offset': 2}, ] self.assertEqual(expected_tags_header, ptags_header) ptags_payload = [] @@ -219,8 +228,8 @@ class qa_header_payload_demux (gr_unittest.TestCase): ptag = gr.tag_to_python(tag) ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_payload = [ - {'key': 'frame_len', 'offset': 0}, - {'key': 'tag4', 'offset': 3}, + {'key': 'frame_len', 'offset': 0}, + {'key': 'tag4', 'offset': 3}, ] self.assertEqual(expected_tags_payload, ptags_payload) @@ -242,9 +251,9 @@ class qa_header_payload_demux (gr_unittest.TestCase): # This goes on output 1, item 3: testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( - data_signal, - False, - tags=(testtag1, testtag2, testtag3, testtag4) + data_signal, + False, + tags=(testtag1, testtag2, testtag3, testtag4) ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( @@ -271,15 +280,15 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.run_tb(payload_sink, len(payload), header_sink, len(header)+2) # Check values # Header now is padded: - self.assertEqual(header_sink.data(), (0,) + header + (payload[0],)) + self.assertEqual(header_sink.data(), (0,) + header + (payload[0],)) self.assertEqual(payload_sink.data(), payload) ptags_header = [] for tag in header_sink.tags(): ptag = gr.tag_to_python(tag) ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_header = [ - {'key': 'tag2', 'offset': 1}, - {'key': 'tag3', 'offset': 3}, + {'key': 'tag2', 'offset': 1}, + {'key': 'tag3', 'offset': 3}, ] self.assertEqual(expected_tags_header, ptags_header) ptags_payload = [] @@ -287,8 +296,8 @@ class qa_header_payload_demux (gr_unittest.TestCase): ptag = gr.tag_to_python(tag) ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_payload = [ - {'key': 'frame_len', 'offset': 0}, - {'key': 'tag4', 'offset': 3}, + {'key': 'frame_len', 'offset': 0}, + {'key': 'tag4', 'offset': 3}, ] self.assertEqual(expected_tags_payload, ptags_payload) @@ -305,9 +314,9 @@ class qa_header_payload_demux (gr_unittest.TestCase): # This goes on output 1, item 3 + 1 (for payload offset) testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3) data_src = blocks.vector_source_f( - data_signal, - False, - tags=(testtag4,) + data_signal, + False, + tags=(testtag4,) ) trigger_src = blocks.vector_source_b(trigger_signal, False) hpd = digital.header_payload_demux( @@ -333,8 +342,8 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.tb.start() time.sleep(.2) # Need this, otherwise, the next message is ignored hpd.to_basic_block()._post( - pmt.intern('header_data'), - pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset}) + pmt.intern('header_data'), + pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset}) ) while len(payload_sink.data()) < len(payload): time.sleep(.2) @@ -344,22 +353,22 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.assertEqual(header_sink.data(), (0,) + header + (payload[0],)) # Payload is now offset: self.assertEqual( - payload_sink.data(), - data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)] + payload_sink.data(), + data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)] ) ptags_payload = {} for tag in payload_sink.tags(): ptag = gr.tag_to_python(tag) ptags_payload[ptag.key] = ptag.offset expected_tags_payload = { - 'frame_len': 0, - 'payload_offset': 0, - 'tag4': 3 - payload_offset, + 'frame_len': 0, + 'payload_offset': 0, + 'tag4': 3 - payload_offset, } self.assertEqual(expected_tags_payload, ptags_payload) - def test_002_symbols (self): + def test_002_symbols(self): """ Same as before, but operate on symbols """ @@ -401,8 +410,8 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.tb.start() time.sleep(.2) # Need this, otherwise, the next message is ignored hpd.to_basic_block()._post( - pmt.intern('header_data'), - pmt.from_long(n_symbols) + pmt.intern('header_data'), + pmt.from_long(n_symbols) ) while len(payload_sink.data()) < len(payload) * n_symbols: time.sleep(.2) @@ -415,8 +424,8 @@ class qa_header_payload_demux (gr_unittest.TestCase): ptag = gr.tag_to_python(tag) ptags_header.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_header = [ - {'key': 'tag2', 'offset': 0}, - {'key': 'tag3', 'offset': 0}, + {'key': 'tag2', 'offset': 0}, + {'key': 'tag3', 'offset': 0}, ] self.assertEqual(expected_tags_header, ptags_header) ptags_payload = [] @@ -424,12 +433,12 @@ class qa_header_payload_demux (gr_unittest.TestCase): ptag = gr.tag_to_python(tag) ptags_payload.append({'key': ptag.key, 'offset': ptag.offset}) expected_tags_payload = [ - {'key': 'frame_len', 'offset': 0}, - {'key': 'tag4', 'offset': 1}, + {'key': 'frame_len', 'offset': 0}, + {'key': 'tag4', 'offset': 1}, ] self.assertEqual(expected_tags_payload, ptags_payload) - def test_003_t (self): + def test_003_t(self): """ Like test 1, but twice, plus one fail """ @@ -448,8 +457,7 @@ class qa_header_payload_demux (gr_unittest.TestCase): print("Triggers at: {0} {1} {2}".format( n_zeros, len(data_signal), - len(data_signal)+len(header_fail)+n_zeros) - ) + len(data_signal)+len(header_fail)+n_zeros)) tx_signal = data_signal + \ header_fail + (0,) * n_zeros + \ header + payload2 + (0,) * 1000 @@ -487,21 +495,21 @@ class qa_header_payload_demux (gr_unittest.TestCase): self.tb.start() time.sleep(.2) # Need this, otherwise, the next message is ignored hpd.to_basic_block()._post( - pmt.intern('header_data'), - pmt.from_long(len(payload1)) + pmt.intern('header_data'), + pmt.from_long(len(payload1)) ) while len(payload_sink.data()) < len(payload1): time.sleep(.2) hpd.to_basic_block()._post( - pmt.intern('header_data'), - pmt.PMT_F + pmt.intern('header_data'), + pmt.PMT_F ) # This next command is a bit of a showstopper, but there's no condition to check upon # to see if the previous msg handling is finished time.sleep(.7) hpd.to_basic_block()._post( - pmt.intern('header_data'), - pmt.from_long(len(payload2)) + pmt.intern('header_data'), + pmt.from_long(len(payload2)) ) while len(payload_sink.data()) < len(payload1) + len(payload2): time.sleep(.2) @@ -552,7 +560,7 @@ class qa_header_payload_demux (gr_unittest.TestCase): indexes = [] burst_sizes = [] total_payload_len = 0 - for burst_count in range(n_bursts): + for _ in range(n_bursts): gap_size = random.randint(0, max_gap) signal += [0] * gap_size is_failure = random.random() < fail_rate @@ -579,11 +587,12 @@ class qa_header_payload_demux (gr_unittest.TestCase): trigger_tags += [make_tag('detect', True, index)] return (trigger_signal, trigger_tags) ### Go, go, go + # Uncomment this if you want true randomness -- good for actual fuzzing # The divide-by-20 means we'll usually get the same random seed # between the first run and the XML run. - random_seed = int(time.time() / 20) - random.seed(random_seed) - print("Random seed: {0}".format(random_seed)) + # random_seed = int(time.time() / 20) + # random.seed(random_seed) + # print("Random seed: {0}".format(random_seed)) n_bursts = 400 header_len = 5 max_gap = 50 @@ -612,9 +621,9 @@ class qa_header_payload_demux (gr_unittest.TestCase): special_tags=('rx_freq',), ) mock_header_demod = HeaderToMessageBlock( - numpy.float32, - header_len, - burst_sizes + numpy.float32, + header_len, + burst_sizes ) header_sink = blocks.vector_sink_f() payload_sink = blocks.vector_sink_f() |