diff options
author | mormj <mormjb@gmail.com> | 2020-10-30 10:59:50 -0400 |
---|---|---|
committer | Marcus Müller <marcus@hostalia.de> | 2020-10-30 17:52:53 +0100 |
commit | 7a0948ba85758fba1cc3858ef99bfa600dcc7416 (patch) | |
tree | 610d7f9d773a193562def6df2d4b50f1bb3b3f86 /gr-blocks/python/blocks/qa_stream_mux.py | |
parent | 12192ee7d58de95ddca35a3e93bfc172bdb5c820 (diff) |
qa: run autopep8 formatting on qa python files
find ./ -iname qa*.py | xargs autopep8 --in-place -a -a
mostly formats whitespace and gets rid of trailing semicolons
Diffstat (limited to 'gr-blocks/python/blocks/qa_stream_mux.py')
-rw-r--r-- | gr-blocks/python/blocks/qa_stream_mux.py | 123 |
1 files changed, 66 insertions, 57 deletions
diff --git a/gr-blocks/python/blocks/qa_stream_mux.py b/gr-blocks/python/blocks/qa_stream_mux.py index 20d1376eac..4c935245db 100644 --- a/gr-blocks/python/blocks/qa_stream_mux.py +++ b/gr-blocks/python/blocks/qa_stream_mux.py @@ -13,29 +13,30 @@ from gnuradio import gr, gr_unittest, blocks import pmt import os + class test_stream_mux (gr_unittest.TestCase): - def setUp (self): + def setUp(self): os.environ['GR_CONF_CONTROLPORT_ON'] = 'False' - self.tb = gr.top_block () + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def help_stream_2ff(self, N, stream_sizes): - v0 = blocks.vector_source_f(N*[1,], False) - v1 = blocks.vector_source_f(N*[2,], False) + v0 = blocks.vector_source_f(N * [1, ], False) + v1 = blocks.vector_source_f(N * [2, ], False) mux = blocks.stream_mux(gr.sizeof_float, stream_sizes) - dst = blocks.vector_sink_f () + dst = blocks.vector_sink_f() - self.tb.connect (v0, (mux,0)) - self.tb.connect (v1, (mux,1)) - self.tb.connect (mux, dst) - self.tb.run () + self.tb.connect(v0, (mux, 0)) + self.tb.connect(v1, (mux, 1)) + self.tb.connect(mux, dst) + self.tb.run() - return dst.data () + return dst.data() def help_stream_ramp_2ff(self, N, stream_sizes): r1 = list(range(N)) @@ -47,19 +48,19 @@ class test_stream_mux (gr_unittest.TestCase): mux = blocks.stream_mux(gr.sizeof_float, stream_sizes) - dst = blocks.vector_sink_f () + dst = blocks.vector_sink_f() - self.tb.connect (v0, (mux,0)) - self.tb.connect (v1, (mux,1)) - self.tb.connect (mux, dst) - self.tb.run () + self.tb.connect(v0, (mux, 0)) + self.tb.connect(v1, (mux, 1)) + self.tb.connect(mux, dst) + self.tb.run() - return dst.data () + return dst.data() def help_stream_tag_propagation(self, N, stream_sizes): - src_data1 = stream_sizes[0]*N*[1,] - src_data2 = stream_sizes[1]*N*[2,] - src_data3 = stream_sizes[2]*N*[3,] + src_data1 = stream_sizes[0] * N * [1, ] + src_data2 = stream_sizes[1] * N * [2, ] + src_data3 = stream_sizes[2] * N * [3, ] # stream_mux scheme (3,2,4) src1 = blocks.vector_source_f(src_data1) src2 = blocks.vector_source_f(src_data2) @@ -77,14 +78,13 @@ class test_stream_mux (gr_unittest.TestCase): self.tb.connect(src1, tag_stream1) self.tb.connect(src2, tag_stream2) self.tb.connect(src3, tag_stream3) - self.tb.connect(tag_stream1, (mux,0)) - self.tb.connect(tag_stream2, (mux,1)) - self.tb.connect(tag_stream3, (mux,2)) + self.tb.connect(tag_stream1, (mux, 0)) + self.tb.connect(tag_stream2, (mux, 1)) + self.tb.connect(tag_stream3, (mux, 2)) self.tb.connect(mux, dst) self.tb.run() - return (dst.data (), dst.tags ()) - + return (dst.data(), dst.tags()) def test_stream_2NN_ff(self): N = 40 @@ -99,22 +99,22 @@ class test_stream_mux (gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data, result_data) + self.assertEqual(exp_data, result_data) def test_stream_ramp_2NN_ff(self): N = 40 stream_sizes = [10, 10] result_data = self.help_stream_ramp_2ff(N, stream_sizes) - exp_data = [ 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + exp_data = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 39.0, 38.0, 37.0, 36.0, 35.0, 34.0, 33.0, 32.0, 31.0, 30.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 29.0, 28.0, 27.0, 26.0, 25.0, 24.0, 23.0, 22.0, 21.0, 20.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, 19.0, 18.0, 17.0, 16.0, 15.0, 14.0, 13.0, 12.0, 11.0, 10.0, 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, - 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0] - self.assertEqual (exp_data, result_data) + 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0] + self.assertEqual(exp_data, result_data) def test_stream_2NM_ff(self): N = 40 @@ -134,8 +134,7 @@ class test_stream_mux (gr_unittest.TestCase): 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data, result_data) - + self.assertEqual(exp_data, result_data) def test_stream_2MN_ff(self): N = 37 @@ -155,7 +154,7 @@ class test_stream_mux (gr_unittest.TestCase): 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0] - self.assertEqual (exp_data, result_data) + self.assertEqual(exp_data, result_data) def test_stream_2N0_ff(self): N = 30 @@ -170,7 +169,7 @@ class test_stream_mux (gr_unittest.TestCase): 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0] - self.assertEqual (exp_data, result_data) + self.assertEqual(exp_data, result_data) def test_stream_20N_ff(self): N = 30 @@ -184,49 +183,59 @@ class test_stream_mux (gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data, result_data) + self.assertEqual(exp_data, result_data) def test_largeN_ff(self): stream_sizes = [3, 8191] - r1 = [1,] * stream_sizes[0] - r2 = [2,] * stream_sizes[1] + r1 = [1, ] * stream_sizes[0] + r2 = [2, ] * stream_sizes[1] v0 = blocks.vector_source_f(r1, repeat=False) v1 = blocks.vector_source_f(r2, repeat=False) mux = blocks.stream_mux(gr.sizeof_float, stream_sizes) - dst = blocks.vector_sink_f () - self.tb.connect (v0, (mux,0)) - self.tb.connect (v1, (mux,1)) - self.tb.connect (mux, dst) - self.tb.run () - self.assertEqual (r1 + r2, dst.data()) + dst = blocks.vector_sink_f() + self.tb.connect(v0, (mux, 0)) + self.tb.connect(v1, (mux, 1)) + self.tb.connect(mux, dst) + self.tb.run() + self.assertEqual(r1 + r2, dst.data()) def test_tag_propagation(self): - N = 10 # Block length - stream_sizes = [1,2,3] + N = 10 # Block length + stream_sizes = [1, 2, 3] - expected_result = N*(stream_sizes[0]*[1,] - +stream_sizes[1]*[2,] - +stream_sizes[2]*[3,]) + expected_result = N * (stream_sizes[0] * [1, ] + + stream_sizes[1] * [2, ] + + stream_sizes[2] * [3, ]) # check the data (result, tags) = self.help_stream_tag_propagation(N, stream_sizes) self.assertFloatTuplesAlmostEqual(expected_result, result, places=6) # check the tags - expected_tag_offsets_src1 = [sum(stream_sizes)*i for i in range(N)] + expected_tag_offsets_src1 = [sum(stream_sizes) * i for i in range(N)] expected_tag_offsets_src2 = [stream_sizes[0] - +sum(stream_sizes)*i for i in range(N)] - expected_tag_offsets_src3 = [stream_sizes[0]+stream_sizes[1] - +sum(stream_sizes)*i for i in range(N)] - tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))] - tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))] - tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))] + + sum(stream_sizes) * i for i in range(N)] + expected_tag_offsets_src3 = [stream_sizes[0] + stream_sizes[1] + + sum(stream_sizes) * i for i in range(N)] + tags_src1 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src1'))] + tags_src2 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src2'))] + tags_src3 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src3'))] for i in range(len(expected_tag_offsets_src1)): - self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset) + self.assertTrue( + expected_tag_offsets_src1[i] == tags_src1[i].offset) for i in range(len(expected_tag_offsets_src2)): - self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset) + self.assertTrue( + expected_tag_offsets_src2[i] == tags_src2[i].offset) for i in range(len(expected_tag_offsets_src3)): - self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset) + self.assertTrue( + expected_tag_offsets_src3[i] == tags_src3[i].offset) + if __name__ == '__main__': gr_unittest.run(test_stream_mux) |