From 9a66a27ea04ae0ff38215b0522731a0f3f3b955c Mon Sep 17 00:00:00 2001
From: Martin Braun <martin.braun@ettus.com>
Date: Sat, 15 Dec 2018 21:08:13 -0800
Subject: digital: Fix some Pylint warnings in qa_header_payload_demux.py

No functional changes.
---
 .../python/digital/qa_header_payload_demux.py      | 163 +++++++++++----------
 1 file changed, 86 insertions(+), 77 deletions(-)

(limited to 'gr-digital/python/digital/qa_header_payload_demux.py')

diff --git a/gr-digital/python/digital/qa_header_payload_demux.py b/gr-digital/python/digital/qa_header_payload_demux.py
index d77f7c689d..22b0e4ce79 100644
--- a/gr-digital/python/digital/qa_header_payload_demux.py
+++ b/gr-digital/python/digital/qa_header_payload_demux.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-# Copyright 2012-2016 Free Software Foundation, Inc.
+# Copyright 2012-2016,2018 Free Software Foundation, Inc.
 #
 # This file is part of GNU Radio
 #
@@ -34,6 +34,7 @@ import pmt
 
 
 def make_tag(key, value, offset):
+    """Create a gr.tag_t() from key, value, offset."""
     tag = gr.tag_t()
     tag.offset = offset
     tag.key = pmt.string_to_symbol(key)
@@ -46,7 +47,7 @@ class HeaderToMessageBlock(gr.sync_block):
     Helps with testing the HPD. Receives a header, stores it, posts
     a predetermined message.
     """
-    def __init__(self, itemsize, header_len, messages, header_is_symbol=False):
+    def __init__(self, itemsize, header_len, messages):
         gr.sync_block.__init__(
             self,
             name="HeaderToMessageBlock",
@@ -59,9 +60,9 @@ class HeaderToMessageBlock(gr.sync_block):
         self.msg_count = 0
 
     def work(self, input_items, output_items):
-        for i in range(len(input_items[0]) // self.header_len):
+        """Where the magic happens."""
+        for _ in range(len(input_items[0]) // self.header_len):
             msg = self.messages[self.msg_count] or False
-            #print("Sending message: {0}".format(msg))
             self.message_port_pub(pmt.intern('header_data'), pmt.to_pmt(msg))
             self.msg_count += 1
         output_items[0][:] = input_items[0][:]
@@ -71,32 +72,34 @@ class HeaderToMessageBlock(gr.sync_block):
 class qa_header_payload_demux (gr_unittest.TestCase):
 
     def setUp (self):
+        """Runs before every test."""
         self.tb = gr.top_block ()
 
     def tearDown (self):
+        """Runs after every test."""
         self.tb = None
 
     def connect_all_blocks(self,
-            data_src, trigger_src,
-            hpd,
-            mock_header_demod,
-            payload_sink, header_sink
-    ):
+                           data_src, trigger_src,
+                           hpd,
+                           mock_header_demod,
+                           payload_sink, header_sink):
         """
         Connect the standard HPD test flowgraph
         """
-        self.tb.connect(data_src,    (hpd, 0))
+        self.tb.connect(data_src, (hpd, 0))
         if trigger_src is not None:
             self.tb.connect(trigger_src, (hpd, 1))
         self.tb.connect((hpd, 0), mock_header_demod)
         self.tb.connect(mock_header_demod, header_sink)
         self.tb.msg_connect(
-                mock_header_demod, 'header_data',
-                hpd, 'header_data'
+            mock_header_demod, 'header_data',
+            hpd, 'header_data'
         )
         self.tb.connect((hpd, 1), payload_sink)
 
     def run_tb(self, payload_sink, payload_len, header_sink, header_len, timeout=30):
+        """Execute self.tb"""
         stop_time = time.time() + timeout
         self.tb.start()
         while len(payload_sink.data()) < payload_len and \
@@ -106,7 +109,7 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         self.tb.stop()
         self.tb.wait()
 
-    def test_001_t (self):
+    def test_001_t(self):
         """ Simplest possible test: put in zeros, then header,
         then payload, trigger signal, try to demux.
         The return signal from the header parser is faked via _post()
@@ -127,23 +130,29 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         # This goes on output 1, item 3:
         testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
         data_src = blocks.vector_source_f(
-                data_signal,
-                False,
-                tags=(testtag1, testtag2, testtag3, testtag4)
+            data_signal,
+            False,
+            tags=(testtag1, testtag2, testtag3, testtag4)
         )
         trigger_src = blocks.vector_source_b(trigger_signal, False)
         hpd = digital.header_payload_demux(
             len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
         )
         mock_header_demod = HeaderToMessageBlock(
-                numpy.float32,
-                len(header),
-                [len(payload)]
+            numpy.float32,
+            len(header),
+            [len(payload)]
         )
         self.assertEqual(pmt.length(hpd.message_ports_in()), 2) #extra system port defined for you
         payload_sink = blocks.vector_sink_f()
         header_sink = blocks.vector_sink_f()
-        self.connect_all_blocks(data_src, trigger_src, hpd, mock_header_demod, payload_sink, header_sink)
+        self.connect_all_blocks(
+            data_src,
+            trigger_src,
+            hpd,
+            mock_header_demod,
+            payload_sink,
+            header_sink)
         self.run_tb(payload_sink, len(payload), header_sink, len(header))
         self.assertEqual(header_sink.data(), header)
         self.assertEqual(payload_sink.data(), payload)
@@ -152,8 +161,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
             ptag = gr.tag_to_python(tag)
             ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
         expected_tags_header = [
-                {'key': 'tag2', 'offset': 0},
-                {'key': 'tag3', 'offset': 2},
+            {'key': 'tag2', 'offset': 0},
+            {'key': 'tag3', 'offset': 2},
         ]
         self.assertEqual(expected_tags_header, ptags_header)
         ptags_payload = []
@@ -161,12 +170,12 @@ class qa_header_payload_demux (gr_unittest.TestCase):
             ptag = gr.tag_to_python(tag)
             ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
         expected_tags_payload = [
-                {'key': 'frame_len', 'offset': 0},
-                {'key': 'tag4', 'offset': 3},
+            {'key': 'frame_len', 'offset': 0},
+            {'key': 'tag4', 'offset': 3},
         ]
         self.assertEqual(expected_tags_payload, ptags_payload)
 
-    def test_001_t_tags (self):
+    def test_001_t_tags(self):
         """ Like the previous test, but use a trigger tag instead of
         a trigger signal.
         """
@@ -185,9 +194,9 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         # This goes on output 1, item 3:
         testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
         data_src = blocks.vector_source_f(
-                data_signal,
-                False,
-                tags=(trigger_tag, testtag1, testtag2, testtag3, testtag4)
+            data_signal,
+            False,
+            tags=(trigger_tag, testtag1, testtag2, testtag3, testtag4)
         )
         hpd = digital.header_payload_demux(
             len(header), 1, 0, "frame_len", "detect", False, gr.sizeof_float
@@ -196,22 +205,22 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         header_sink = blocks.vector_sink_f()
         payload_sink = blocks.vector_sink_f()
         mock_header_demod = HeaderToMessageBlock(
-                numpy.float32,
-                len(header),
-                [len(payload)]
+            numpy.float32,
+            len(header),
+            [len(payload)]
         )
         self.connect_all_blocks(data_src, None, hpd, mock_header_demod, payload_sink, header_sink)
         self.run_tb(payload_sink, len(payload), header_sink, len(header))
         # Check results
-        self.assertEqual(header_sink.data(),  header)
+        self.assertEqual(header_sink.data(), header)
         self.assertEqual(payload_sink.data(), payload)
         ptags_header = []
         for tag in header_sink.tags():
             ptag = gr.tag_to_python(tag)
             ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
         expected_tags_header = [
-                {'key': 'tag2', 'offset': 0},
-                {'key': 'tag3', 'offset': 2},
+            {'key': 'tag2', 'offset': 0},
+            {'key': 'tag3', 'offset': 2},
         ]
         self.assertEqual(expected_tags_header, ptags_header)
         ptags_payload = []
@@ -219,8 +228,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
             ptag = gr.tag_to_python(tag)
             ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
         expected_tags_payload = [
-                {'key': 'frame_len', 'offset': 0},
-                {'key': 'tag4', 'offset': 3},
+            {'key': 'frame_len', 'offset': 0},
+            {'key': 'tag4', 'offset': 3},
         ]
         self.assertEqual(expected_tags_payload, ptags_payload)
 
@@ -242,9 +251,9 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         # This goes on output 1, item 3:
         testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
         data_src = blocks.vector_source_f(
-                data_signal,
-                False,
-                tags=(testtag1, testtag2, testtag3, testtag4)
+            data_signal,
+            False,
+            tags=(testtag1, testtag2, testtag3, testtag4)
         )
         trigger_src = blocks.vector_source_b(trigger_signal, False)
         hpd = digital.header_payload_demux(
@@ -271,15 +280,15 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         self.run_tb(payload_sink, len(payload), header_sink, len(header)+2)
         # Check values
         # Header now is padded:
-        self.assertEqual(header_sink.data(),  (0,) + header + (payload[0],))
+        self.assertEqual(header_sink.data(), (0,) + header + (payload[0],))
         self.assertEqual(payload_sink.data(), payload)
         ptags_header = []
         for tag in header_sink.tags():
             ptag = gr.tag_to_python(tag)
             ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
         expected_tags_header = [
-                {'key': 'tag2', 'offset': 1},
-                {'key': 'tag3', 'offset': 3},
+            {'key': 'tag2', 'offset': 1},
+            {'key': 'tag3', 'offset': 3},
         ]
         self.assertEqual(expected_tags_header, ptags_header)
         ptags_payload = []
@@ -287,8 +296,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
             ptag = gr.tag_to_python(tag)
             ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
         expected_tags_payload = [
-                {'key': 'frame_len', 'offset': 0},
-                {'key': 'tag4', 'offset': 3},
+            {'key': 'frame_len', 'offset': 0},
+            {'key': 'tag4', 'offset': 3},
         ]
         self.assertEqual(expected_tags_payload, ptags_payload)
 
@@ -305,9 +314,9 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         # This goes on output 1, item 3 + 1 (for payload offset)
         testtag4 = make_tag('tag4', 314, n_zeros + len(header) + 3)
         data_src = blocks.vector_source_f(
-                data_signal,
-                False,
-                tags=(testtag4,)
+            data_signal,
+            False,
+            tags=(testtag4,)
         )
         trigger_src = blocks.vector_source_b(trigger_signal, False)
         hpd = digital.header_payload_demux(
@@ -333,8 +342,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         self.tb.start()
         time.sleep(.2) # Need this, otherwise, the next message is ignored
         hpd.to_basic_block()._post(
-                pmt.intern('header_data'),
-                pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset})
+            pmt.intern('header_data'),
+            pmt.to_pmt({'frame_len': len(payload), 'payload_offset': payload_offset})
         )
         while len(payload_sink.data()) < len(payload):
             time.sleep(.2)
@@ -344,22 +353,22 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         self.assertEqual(header_sink.data(),  (0,) + header + (payload[0],))
         # Payload is now offset:
         self.assertEqual(
-                payload_sink.data(),
-                data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)]
+            payload_sink.data(),
+            data_signal[n_zeros + len(header) + payload_offset:n_zeros + len(header) + payload_offset + len(payload)]
         )
         ptags_payload = {}
         for tag in payload_sink.tags():
             ptag = gr.tag_to_python(tag)
             ptags_payload[ptag.key] = ptag.offset
         expected_tags_payload = {
-                'frame_len': 0,
-                'payload_offset': 0,
-                'tag4': 3 - payload_offset,
+            'frame_len': 0,
+            'payload_offset': 0,
+            'tag4': 3 - payload_offset,
         }
         self.assertEqual(expected_tags_payload, ptags_payload)
 
 
-    def test_002_symbols (self):
+    def test_002_symbols(self):
         """
         Same as before, but operate on symbols
         """
@@ -401,8 +410,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         self.tb.start()
         time.sleep(.2) # Need this, otherwise, the next message is ignored
         hpd.to_basic_block()._post(
-                pmt.intern('header_data'),
-                pmt.from_long(n_symbols)
+            pmt.intern('header_data'),
+            pmt.from_long(n_symbols)
         )
         while len(payload_sink.data()) < len(payload) * n_symbols:
             time.sleep(.2)
@@ -415,8 +424,8 @@ class qa_header_payload_demux (gr_unittest.TestCase):
             ptag = gr.tag_to_python(tag)
             ptags_header.append({'key': ptag.key, 'offset': ptag.offset})
         expected_tags_header = [
-                {'key': 'tag2', 'offset': 0},
-                {'key': 'tag3', 'offset': 0},
+            {'key': 'tag2', 'offset': 0},
+            {'key': 'tag3', 'offset': 0},
         ]
         self.assertEqual(expected_tags_header, ptags_header)
         ptags_payload = []
@@ -424,12 +433,12 @@ class qa_header_payload_demux (gr_unittest.TestCase):
             ptag = gr.tag_to_python(tag)
             ptags_payload.append({'key': ptag.key, 'offset': ptag.offset})
         expected_tags_payload = [
-                {'key': 'frame_len', 'offset': 0},
-                {'key': 'tag4', 'offset': 1},
+            {'key': 'frame_len', 'offset': 0},
+            {'key': 'tag4', 'offset': 1},
         ]
         self.assertEqual(expected_tags_payload, ptags_payload)
 
-    def test_003_t (self):
+    def test_003_t(self):
         """
         Like test 1, but twice, plus one fail
         """
@@ -448,8 +457,7 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         print("Triggers at: {0} {1} {2}".format(
             n_zeros,
             len(data_signal),
-            len(data_signal)+len(header_fail)+n_zeros)
-        )
+            len(data_signal)+len(header_fail)+n_zeros))
         tx_signal = data_signal + \
                 header_fail + (0,) * n_zeros + \
                 header + payload2 + (0,) * 1000
@@ -487,21 +495,21 @@ class qa_header_payload_demux (gr_unittest.TestCase):
         self.tb.start()
         time.sleep(.2) # Need this, otherwise, the next message is ignored
         hpd.to_basic_block()._post(
-                pmt.intern('header_data'),
-                pmt.from_long(len(payload1))
+            pmt.intern('header_data'),
+            pmt.from_long(len(payload1))
         )
         while len(payload_sink.data()) < len(payload1):
             time.sleep(.2)
         hpd.to_basic_block()._post(
-                pmt.intern('header_data'),
-                pmt.PMT_F
+            pmt.intern('header_data'),
+            pmt.PMT_F
         )
         # This next command is a bit of a showstopper, but there's no condition to check upon
         # to see if the previous msg handling is finished
         time.sleep(.7)
         hpd.to_basic_block()._post(
-                pmt.intern('header_data'),
-                pmt.from_long(len(payload2))
+            pmt.intern('header_data'),
+            pmt.from_long(len(payload2))
         )
         while len(payload_sink.data()) < len(payload1) + len(payload2):
             time.sleep(.2)
@@ -552,7 +560,7 @@ class qa_header_payload_demux (gr_unittest.TestCase):
             indexes = []
             burst_sizes = []
             total_payload_len = 0
-            for burst_count in range(n_bursts):
+            for _ in range(n_bursts):
                 gap_size = random.randint(0, max_gap)
                 signal += [0] * gap_size
                 is_failure = random.random() < fail_rate
@@ -579,11 +587,12 @@ class qa_header_payload_demux (gr_unittest.TestCase):
                     trigger_tags += [make_tag('detect', True, index)]
             return (trigger_signal, trigger_tags)
         ### Go, go, go
+        # Uncomment this if you want true randomness -- good for actual fuzzing
         # The divide-by-20 means we'll usually get the same random seed
         # between the first run and the XML run.
-        random_seed = int(time.time() / 20)
-        random.seed(random_seed)
-        print("Random seed: {0}".format(random_seed))
+        # random_seed = int(time.time() / 20)
+        # random.seed(random_seed)
+        # print("Random seed: {0}".format(random_seed))
         n_bursts = 400
         header_len = 5
         max_gap = 50
@@ -612,9 +621,9 @@ class qa_header_payload_demux (gr_unittest.TestCase):
             special_tags=('rx_freq',),
         )
         mock_header_demod = HeaderToMessageBlock(
-                numpy.float32,
-                header_len,
-                burst_sizes
+            numpy.float32,
+            header_len,
+            burst_sizes
         )
         header_sink = blocks.vector_sink_f()
         payload_sink = blocks.vector_sink_f()
-- 
cgit v1.2.3