summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_header_payload_demux.py
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2018-12-15 21:08:13 -0800
committerMartin Braun <martin.braun@ettus.com>2018-12-16 12:58:35 -0800
commit9a66a27ea04ae0ff38215b0522731a0f3f3b955c (patch)
treef07dafcfc4d625a2417102aa1c96cc771f24e9d0 /gr-digital/python/digital/qa_header_payload_demux.py
parent84252e3e09b50165bd8ebd8419d1d19e7fc51dd4 (diff)
digital: Fix some Pylint warnings in qa_header_payload_demux.py
No functional changes.
Diffstat (limited to 'gr-digital/python/digital/qa_header_payload_demux.py')
-rw-r--r--gr-digital/python/digital/qa_header_payload_demux.py163
1 files changed, 86 insertions, 77 deletions
diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py
index d77f7c689d..22b0e4ce79 100644
--- a/gr-digital/python/digital/qa_header_payload_demux.py
+++ b/gr-digital/python/digital/qa_header_payload_demux.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
-# Copyright 2012-2016 Free Software Foundation, Inc.
+# Copyright 2012-2016,2018 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -34,6 +34,7 @@ import pmt
def make_tag(key, value, offset):
+ """Create a gr.tag_t() from key, value, offset."""
tag = gr.tag_t()
tag.offset = offset
tag.key = pmt.string_to_symbol(key)
@@ -46,7 +47,7 @@ class HeaderToMessageBlock(gr.sync_block):
Helps with testing the HPD. Receives a header, stores it, posts
a predetermined message.
"""
- def __init__(self, itemsize, header_len, messages, header_is_symbol=False):
+ def __init__(self, itemsize, header_len, messages):
gr.sync_block.__init__(
self,
name="HeaderToMessageBlock",
@@ -59,9 +60,9 @@ class HeaderToMessageBlock(gr.sync_block):
self.msg_count = 0
def work(self, input_items, output_items):
- for i in range(len(input_items[0]) // self.header_len):
+ """Where the magic happens."""
+ for _ in range(len(input_items[0]) // self.header_len):
msg = self.messages[self.msg_count] or False
- #print("Sending message: {0}".format(msg))
self.message_port_pub(pmt.intern('header_data'), pmt.to_pmt(msg))
self.msg_count += 1
output_items[0][:] = input_items[0][:]
@@ -71,32 +72,34 @@ class HeaderToMessageBlock(gr.sync_block):
class qa_header_payload_demux (gr_unittest.TestCase):
def setUp (self):
+ """Runs before every test."""
self.tb = gr.top_block ()
def tearDown (self):
+ """Runs after every test."""
self.tb = None
def connect_all_blocks(self,
- data_src, trigger_src,
- hpd,
- mock_header_demod,
- payload_sink, header_sink
- ):
+ data_src, trigger_src,
+ hpd,
+ mock_header_demod,
+ payload_sink, header_sink):
"""
Connect the standard HPD test flowgraph
"""
- self.tb.connect(data_src, (hpd, 0))
+ self.tb.connect(data_src, (hpd, 0))
if trigger_src is not None:
self.tb.connect(trigger_src, (hpd, 1))
self.tb.connect((hpd, 0), mock_header_demod)
self.tb.connect(mock_header_demod, header_sink)
self.tb.msg_connect(
- mock_header_demod, 'header_data',
- hpd, 'header_data'
+ mock_header_demod, 'header_data',
+ hpd, 'header_data'
)
self.tb.connect((hpd, 1), payload_sink)
def run_tb(self, payload_sink, payload_len, header_sink, header_len, timeout=30):
+ """Execute self.tb"""
stop_time = time.time() + timeout
self.tb.start()
while len(payload_sink.data()) < payload_len and \
@@ -106,7 +109,7 @@ class qa_header_payload_demux (gr_unittest.TestCase):
self.tb.stop()
self.tb.wait()
- def test_001_t (self):
+ def test_001_t(self):
""" Simplest possible test: put in zeros, then header,
then payload, trigger signal, try to demux.
The return signal from the header parser is faked via _post()
@@ -127,23 +130,29 @@ class qa_header_payload_demux (gr_unittest.TestCase):
# This goes on output 1, item 3:
testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
data_src = blocks.vector_source_f(
- data_signal,
- False,
- tags=(testtag1, testtag2, testtag3, testtag4)
+ data_signal,
+ False,
+ tags=(testtag1, testtag2, testtag3, testtag4)
)
trigger_src = blocks.vector_source_b(trigger_signal, False)
hpd = digital.header_payload_demux(
len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
)
mock_header_demod = HeaderToMessageBlock(
- numpy.float32,
- len(header),
- [len(payload)]
+ numpy.float32,
+ len(header),
+ [len(payload)]
)
self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
payload_sink = blocks.vector_sink_f()
header_sink = blocks.vector_sink_f()
- self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink)
+ self.connect_all_blocks(
+ data_src,
+ trigger_src,
+ hpd,
+ mock_header_demod,
+ payload_sink,
+ header_sink)
self.run_tb(payload_sink, len(payload), header_sink, len(header))
self.assertEqual(header_sink.data(), header)
self.assertEqual(payload_sink.data(), payload)
@@ -152,8 +161,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
ptag = gr.tag_to_python(tag)
ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
expected_tags_header = [
- {'key': 'tag2', 'offset': 0},
- {'key': 'tag3', 'offset': 2},
+ {'key': 'tag2', 'offset': 0},
+ {'key': 'tag3', 'offset': 2},
]
self.assertEqual(expected_tags_header, ptags_header)
ptags_payload = []
@@ -161,12 +170,12 @@ class qa_header_payload_demux (gr_unittest.TestCase):
ptag = gr.tag_to_python(tag)
ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
expected_tags_payload = [
- {'key': 'frame_len', 'offset': 0},
- {'key': 'tag4', 'offset': 3},
+ {'key': 'frame_len', 'offset': 0},
+ {'key': 'tag4', 'offset': 3},
]
self.assertEqual(expected_tags_payload, ptags_payload)
- def test_001_t_tags (self):
+ def test_001_t_tags(self):
""" Like the previous test, but use a trigger tag instead of
a trigger signal.
"""
@@ -185,9 +194,9 @@ class qa_header_payload_demux (gr_unittest.TestCase):
# This goes on output 1, item 3:
testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
data_src = blocks.vector_source_f(
- data_signal,
- False,
- tags=(trigger_tag, testtag1, testtag2, testtag3, testtag4)
+ data_signal,
+ False,
+ tags=(trigger_tag, testtag1, testtag2, testtag3, testtag4)
)
hpd = digital.header_payload_demux(
len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
@@ -196,22 +205,22 @@ class qa_header_payload_demux (gr_unittest.TestCase):
header_sink = blocks.vector_sink_f()
payload_sink = blocks.vector_sink_f()
mock_header_demod = HeaderToMessageBlock(
- numpy.float32,
- len(header),
- [len(payload)]
+ numpy.float32,
+ len(header),
+ [len(payload)]
)
self.connect_all_blocks(data_src, None, hpd, mock_header_demod, payload_sink, header_sink)
self.run_tb(payload_sink, len(payload), header_sink, len(header))
# Check results
- self.assertEqual(header_sink.data(), header)
+ self.assertEqual(header_sink.data(), header)
self.assertEqual(payload_sink.data(), payload)
ptags_header = []
for tag in header_sink.tags():
ptag = gr.tag_to_python(tag)
ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
expected_tags_header = [
- {'key': 'tag2', 'offset': 0},
- {'key': 'tag3', 'offset': 2},
+ {'key': 'tag2', 'offset': 0},
+ {'key': 'tag3', 'offset': 2},
]
self.assertEqual(expected_tags_header, ptags_header)
ptags_payload = []
@@ -219,8 +228,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
ptag = gr.tag_to_python(tag)
ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
expected_tags_payload = [
- {'key': 'frame_len', 'offset': 0},
- {'key': 'tag4', 'offset': 3},
+ {'key': 'frame_len', 'offset': 0},
+ {'key': 'tag4', 'offset': 3},
]
self.assertEqual(expected_tags_payload, ptags_payload)
@@ -242,9 +251,9 @@ class qa_header_payload_demux (gr_unittest.TestCase):
# This goes on output 1, item 3:
testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
data_src = blocks.vector_source_f(
- data_signal,
- False,
- tags=(testtag1, testtag2, testtag3, testtag4)
+ data_signal,
+ False,
+ tags=(testtag1, testtag2, testtag3, testtag4)
)
trigger_src = blocks.vector_source_b(trigger_signal, False)
hpd = digital.header_payload_demux(
@@ -271,15 +280,15 @@ class qa_header_payload_demux (gr_unittest.TestCase):
self.run_tb(payload_sink, len(payload), header_sink, len(header)+2)
# Check values
# Header now is padded:
- self.assertEqual(header_sink.data(), (0,) + header + (payload[0],))
+ self.assertEqual(header_sink.data(), (0,) + header + (payload[0],))
self.assertEqual(payload_sink.data(), payload)
ptags_header = []
for tag in header_sink.tags():
ptag = gr.tag_to_python(tag)
ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
expected_tags_header = [
- {'key': 'tag2', 'offset': 1},
- {'key': 'tag3', 'offset': 3},
+ {'key': 'tag2', 'offset': 1},
+ {'key': 'tag3', 'offset': 3},
]
self.assertEqual(expected_tags_header, ptags_header)
ptags_payload = []
@@ -287,8 +296,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
ptag = gr.tag_to_python(tag)
ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
expected_tags_payload = [
- {'key': 'frame_len', 'offset': 0},
- {'key': 'tag4', 'offset': 3},
+ {'key': 'frame_len', 'offset': 0},
+ {'key': 'tag4', 'offset': 3},
]
self.assertEqual(expected_tags_payload, ptags_payload)
@@ -305,9 +314,9 @@ class qa_header_payload_demux (gr_unittest.TestCase):
# This goes on output 1, item 3 + 1 (for payload offset)
testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
data_src = blocks.vector_source_f(
- data_signal,
- False,
- tags=(testtag4,)
+ data_signal,
+ False,
+ tags=(testtag4,)
)
trigger_src = blocks.vector_source_b(trigger_signal, False)
hpd = digital.header_payload_demux(
@@ -333,8 +342,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
self.tb.start()
time.sleep(.2) # Need this, otherwise, the next message is ignored
hpd.to_basic_block()._post(
- pmt.intern('header_data'),
- pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset})
+ pmt.intern('header_data'),
+ pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset})
)
while len(payload_sink.data()) < len(payload):
time.sleep(.2)
@@ -344,22 +353,22 @@ class qa_header_payload_demux (gr_unittest.TestCase):
self.assertEqual(header_sink.data(), (0,) + header + (payload[0],))
# Payload is now offset:
self.assertEqual(
- payload_sink.data(),
- data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)]
+ payload_sink.data(),
+ data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)]
)
ptags_payload = {}
for tag in payload_sink.tags():
ptag = gr.tag_to_python(tag)
ptags_payload[ptag.key] = ptag.offset
expected_tags_payload = {
- 'frame_len': 0,
- 'payload_offset': 0,
- 'tag4': 3 - payload_offset,
+ 'frame_len': 0,
+ 'payload_offset': 0,
+ 'tag4': 3 - payload_offset,
}
self.assertEqual(expected_tags_payload, ptags_payload)
- def test_002_symbols (self):
+ def test_002_symbols(self):
"""
Same as before, but operate on symbols
"""
@@ -401,8 +410,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
self.tb.start()
time.sleep(.2) # Need this, otherwise, the next message is ignored
hpd.to_basic_block()._post(
- pmt.intern('header_data'),
- pmt.from_long(n_symbols)
+ pmt.intern('header_data'),
+ pmt.from_long(n_symbols)
)
while len(payload_sink.data()) < len(payload) * n_symbols:
time.sleep(.2)
@@ -415,8 +424,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
ptag = gr.tag_to_python(tag)
ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
expected_tags_header = [
- {'key': 'tag2', 'offset': 0},
- {'key': 'tag3', 'offset': 0},
+ {'key': 'tag2', 'offset': 0},
+ {'key': 'tag3', 'offset': 0},
]
self.assertEqual(expected_tags_header, ptags_header)
ptags_payload = []
@@ -424,12 +433,12 @@ class qa_header_payload_demux (gr_unittest.TestCase):
ptag = gr.tag_to_python(tag)
ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
expected_tags_payload = [
- {'key': 'frame_len', 'offset': 0},
- {'key': 'tag4', 'offset': 1},
+ {'key': 'frame_len', 'offset': 0},
+ {'key': 'tag4', 'offset': 1},
]
self.assertEqual(expected_tags_payload, ptags_payload)
- def test_003_t (self):
+ def test_003_t(self):
"""
Like test 1, but twice, plus one fail
"""
@@ -448,8 +457,7 @@ class qa_header_payload_demux (gr_unittest.TestCase):
print("Triggers at: {0} {1} {2}".format(
n_zeros,
len(data_signal),
- len(data_signal)+len(header_fail)+n_zeros)
- )
+ len(data_signal)+len(header_fail)+n_zeros))
tx_signal = data_signal + \
header_fail + (0,) * n_zeros + \
header + payload2 + (0,) * 1000
@@ -487,21 +495,21 @@ class qa_header_payload_demux (gr_unittest.TestCase):
self.tb.start()
time.sleep(.2) # Need this, otherwise, the next message is ignored
hpd.to_basic_block()._post(
- pmt.intern('header_data'),
- pmt.from_long(len(payload1))
+ pmt.intern('header_data'),
+ pmt.from_long(len(payload1))
)
while len(payload_sink.data()) < len(payload1):
time.sleep(.2)
hpd.to_basic_block()._post(
- pmt.intern('header_data'),
- pmt.PMT_F
+ pmt.intern('header_data'),
+ pmt.PMT_F
)
# This next command is a bit of a showstopper, but there's no condition to check upon
# to see if the previous msg handling is finished
time.sleep(.7)
hpd.to_basic_block()._post(
- pmt.intern('header_data'),
- pmt.from_long(len(payload2))
+ pmt.intern('header_data'),
+ pmt.from_long(len(payload2))
)
while len(payload_sink.data()) < len(payload1) + len(payload2):
time.sleep(.2)
@@ -552,7 +560,7 @@ class qa_header_payload_demux (gr_unittest.TestCase):
indexes = []
burst_sizes = []
total_payload_len = 0
- for burst_count in range(n_bursts):
+ for _ in range(n_bursts):
gap_size = random.randint(0, max_gap)
signal += [0] * gap_size
is_failure = random.random() < fail_rate
@@ -579,11 +587,12 @@ class qa_header_payload_demux (gr_unittest.TestCase):
trigger_tags += [make_tag('detect', True, index)]
return (trigger_signal, trigger_tags)
### Go, go, go
+ # Uncomment this if you want true randomness -- good for actual fuzzing
# The divide-by-20 means we'll usually get the same random seed
# between the first run and the XML run.
- random_seed = int(time.time() / 20)
- random.seed(random_seed)
- print("Random seed: {0}".format(random_seed))
+ # random_seed = int(time.time() / 20)
+ # random.seed(random_seed)
+ # print("Random seed: {0}".format(random_seed))
n_bursts = 400
header_len = 5
max_gap = 50
@@ -612,9 +621,9 @@ class qa_header_payload_demux (gr_unittest.TestCase):
special_tags=('rx_freq',),
)
mock_header_demod = HeaderToMessageBlock(
- numpy.float32,
- header_len,
- burst_sizes
+ numpy.float32,
+ header_len,
+ burst_sizes
)
header_sink = blocks.vector_sink_f()
payload_sink = blocks.vector_sink_f()