diff options
author | mormj <mormjb@gmail.com> | 2020-10-30 10:59:50 -0400 |
---|---|---|
committer | Marcus Müller <marcus@hostalia.de> | 2020-10-30 17:52:53 +0100 |
commit | 7a0948ba85758fba1cc3858ef99bfa600dcc7416 (patch) | |
tree | 610d7f9d773a193562def6df2d4b50f1bb3b3f86 /gr-blocks/python/blocks/qa_stream_demux.py | |
parent | 12192ee7d58de95ddca35a3e93bfc172bdb5c820 (diff) |
qa: run autopep8 formatting on qa python files
find ./ -iname qa*.py | xargs autopep8 --in-place -a -a
mostly formats whitespace and gets rid of trailing semicolons
Diffstat (limited to 'gr-blocks/python/blocks/qa_stream_demux.py')
-rwxr-xr-x | gr-blocks/python/blocks/qa_stream_demux.py | 266 |
1 files changed, 190 insertions, 76 deletions
diff --git a/gr-blocks/python/blocks/qa_stream_demux.py b/gr-blocks/python/blocks/qa_stream_demux.py index da9aa46be0..629c35283c 100755 --- a/gr-blocks/python/blocks/qa_stream_demux.py +++ b/gr-blocks/python/blocks/qa_stream_demux.py @@ -26,17 +26,18 @@ from gnuradio import gr, gr_unittest, blocks import pmt import os + class qa_stream_demux(gr_unittest.TestCase): - def setUp (self): + def setUp(self): os.environ['GR_CONF_CONTROLPORT_ON'] = 'False' - self.tb = gr.top_block () + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def help_stream_2ff(self, N, stream_sizes): - v = blocks.vector_source_f(N*[1,] + N*[2,], False) + v = blocks.vector_source_f(N * [1, ] + N * [2, ], False) demux = blocks.stream_demux(gr.sizeof_float, stream_sizes) @@ -44,8 +45,8 @@ class qa_stream_demux(gr_unittest.TestCase): dst1 = blocks.vector_sink_f() self.tb.connect(v, demux) - self.tb.connect((demux,0), dst0) - self.tb.connect((demux,1), dst1) + self.tb.connect((demux, 0), dst0) + self.tb.connect((demux, 1), dst1) self.tb.run() return (dst0.data(), dst1.data()) @@ -61,14 +62,15 @@ class qa_stream_demux(gr_unittest.TestCase): dst1 = blocks.vector_sink_f() self.tb.connect(v, demux) - self.tb.connect((demux,0), dst0) - self.tb.connect((demux,1), dst1) + self.tb.connect((demux, 0), dst0) + self.tb.connect((demux, 1), dst1) self.tb.run() return (dst0.data(), dst1.data()) def help_stream_tag_propagation(self, N, stream_sizes): - src_data = (stream_sizes[0]*[1,] + stream_sizes[1]*[2,] + stream_sizes[2]*[3,]) * N + src_data = (stream_sizes[0] * [1, ] + stream_sizes[1] + * [2, ] + stream_sizes[2] * [3, ]) * N src = blocks.vector_source_f(src_data, False) @@ -88,9 +90,9 @@ class qa_stream_demux(gr_unittest.TestCase): self.tb.connect(tag_stream1, tag_stream2) self.tb.connect(tag_stream2, tag_stream3) self.tb.connect(tag_stream3, demux) - self.tb.connect((demux,0), dst0) - self.tb.connect((demux,1), dst1) - self.tb.connect((demux,2), dst2) + self.tb.connect((demux, 0), dst0) + self.tb.connect((demux, 1), dst1) + self.tb.connect((demux, 2), dst2) self.tb.run() return (dst0, dst1, dst2) @@ -109,25 +111,99 @@ class qa_stream_demux(gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_stream_ramp_2NN_ff(self): N = 40 stream_sizes = [10, 10] result_data = self.help_stream_ramp_2ff(N, stream_sizes) - exp_data0 = [ 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, - 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, - 39.0, 38.0, 37.0, 36.0, 35.0, 34.0, 33.0, 32.0, 31.0, 30.0, - 19.0, 18.0, 17.0, 16.0, 15.0, 14.0, 13.0, 12.0, 11.0, 10.0] - exp_data1 = [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, - 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, - 29.0, 28.0, 27.0, 26.0, 25.0, 24.0, 23.0, 22.0, 21.0, 20.0, - 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0] - - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) + exp_data0 = [ + 0.0, + 1.0, + 2.0, + 3.0, + 4.0, + 5.0, + 6.0, + 7.0, + 8.0, + 9.0, + 20.0, + 21.0, + 22.0, + 23.0, + 24.0, + 25.0, + 26.0, + 27.0, + 28.0, + 29.0, + 39.0, + 38.0, + 37.0, + 36.0, + 35.0, + 34.0, + 33.0, + 32.0, + 31.0, + 30.0, + 19.0, + 18.0, + 17.0, + 16.0, + 15.0, + 14.0, + 13.0, + 12.0, + 11.0, + 10.0] + exp_data1 = [ + 10.0, + 11.0, + 12.0, + 13.0, + 14.0, + 15.0, + 16.0, + 17.0, + 18.0, + 19.0, + 30.0, + 31.0, + 32.0, + 33.0, + 34.0, + 35.0, + 36.0, + 37.0, + 38.0, + 39.0, + 29.0, + 28.0, + 27.0, + 26.0, + 25.0, + 24.0, + 23.0, + 22.0, + 21.0, + 20.0, + 9.0, + 8.0, + 7.0, + 6.0, + 5.0, + 4.0, + 3.0, + 2.0, + 1.0, + 0.0] + + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_stream_2NM_ff(self): N = 40 @@ -148,9 +224,8 @@ class qa_stream_demux(gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) - + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_stream_2MN_ff(self): N = 37 @@ -171,8 +246,8 @@ class qa_stream_demux(gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_stream_2N0_ff(self): N = 30 @@ -192,8 +267,8 @@ class qa_stream_demux(gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0] exp_data1 = [] - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_stream_20N_ff(self): N = 30 @@ -211,82 +286,121 @@ class qa_stream_demux(gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_largeN_ff(self): stream_sizes = [3, 8191] - r0 = [1.0,] * stream_sizes[0] - r1 = [2.0,] * stream_sizes[1] + r0 = [1.0, ] * stream_sizes[0] + r1 = [2.0, ] * stream_sizes[1] v = blocks.vector_source_f(r0 + r1, repeat=False) demux = blocks.stream_demux(gr.sizeof_float, stream_sizes) dst0 = blocks.vector_sink_f() dst1 = blocks.vector_sink_f() - self.tb.connect (v, demux) - self.tb.connect ((demux,0), dst0) - self.tb.connect ((demux,1), dst1) - self.tb.run () - self.assertEqual (r0, dst0.data()) - self.assertEqual (r1, dst1.data()) + self.tb.connect(v, demux) + self.tb.connect((demux, 0), dst0) + self.tb.connect((demux, 1), dst1) + self.tb.run() + self.assertEqual(r0, dst0.data()) + self.assertEqual(r1, dst1.data()) def test_tag_propagation(self): - N = 10 # Block length - stream_sizes = [1,2,3] + N = 10 # Block length + stream_sizes = [1, 2, 3] - expected_result0 = N*(stream_sizes[0]*[1,]) - expected_result1 = N*(stream_sizes[1]*[2,]) - expected_result2 = N*(stream_sizes[2]*[3,]) + expected_result0 = N * (stream_sizes[0] * [1, ]) + expected_result1 = N * (stream_sizes[1] * [2, ]) + expected_result2 = N * (stream_sizes[2] * [3, ]) # check the data (result0, result1, result2) = self.help_stream_tag_propagation(N, stream_sizes) - self.assertFloatTuplesAlmostEqual(expected_result0, result0.data(), places=6) - self.assertFloatTuplesAlmostEqual(expected_result1, result1.data(), places=6) - self.assertFloatTuplesAlmostEqual(expected_result2, result2.data(), places=6) + self.assertFloatTuplesAlmostEqual( + expected_result0, result0.data(), places=6) + self.assertFloatTuplesAlmostEqual( + expected_result1, result1.data(), places=6) + self.assertFloatTuplesAlmostEqual( + expected_result2, result2.data(), places=6) # check the tags - result0 tags = result0.tags() - expected_tag_offsets_src1 = list(range(0,stream_sizes[0]*N,stream_sizes[0])) - expected_tag_offsets_src2 = list(range(0,stream_sizes[0]*N,stream_sizes[0])) - expected_tag_offsets_src3 = list(range(0,stream_sizes[0]*N,stream_sizes[0])) - tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))] - tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))] - tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))] + expected_tag_offsets_src1 = list( + range(0, stream_sizes[0] * N, stream_sizes[0])) + expected_tag_offsets_src2 = list( + range(0, stream_sizes[0] * N, stream_sizes[0])) + expected_tag_offsets_src3 = list( + range(0, stream_sizes[0] * N, stream_sizes[0])) + tags_src1 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src1'))] + tags_src2 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src2'))] + tags_src3 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src3'))] for i in range(len(expected_tag_offsets_src1)): - self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset) + self.assertTrue( + expected_tag_offsets_src1[i] == tags_src1[i].offset) for i in range(len(expected_tag_offsets_src2)): - self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset) + self.assertTrue( + expected_tag_offsets_src2[i] == tags_src2[i].offset) for i in range(len(expected_tag_offsets_src3)): - self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset) + self.assertTrue( + expected_tag_offsets_src3[i] == tags_src3[i].offset) # check the tags - result1 tags = result1.tags() - expected_tag_offsets_src1 = list(range(0,stream_sizes[1]*N,stream_sizes[0])) - expected_tag_offsets_src2 = list(range(1,stream_sizes[1]*N,stream_sizes[1])) + expected_tag_offsets_src1 = list( + range(0, stream_sizes[1] * N, stream_sizes[0])) + expected_tag_offsets_src2 = list( + range(1, stream_sizes[1] * N, stream_sizes[1])) expected_tag_offsets_src3 = list() - tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))] - tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))] - tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))] + tags_src1 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src1'))] + tags_src2 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src2'))] + tags_src3 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src3'))] for i in range(len(expected_tag_offsets_src1)): - self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset) + self.assertTrue( + expected_tag_offsets_src1[i] == tags_src1[i].offset) for i in range(len(expected_tag_offsets_src2)): - self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset) + self.assertTrue( + expected_tag_offsets_src2[i] == tags_src2[i].offset) for i in range(len(expected_tag_offsets_src3)): - self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset) + self.assertTrue( + expected_tag_offsets_src3[i] == tags_src3[i].offset) # check the tags - result2 tags = result2.tags() - expected_tag_offsets_src1 = list(range(0,stream_sizes[2]*N,stream_sizes[0])) - expected_tag_offsets_src2 = list(range(1,stream_sizes[2]*N,stream_sizes[2])) - expected_tag_offsets_src3 = list(range(0,stream_sizes[2]*N,stream_sizes[2])) - tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))] - tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))] - tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))] + expected_tag_offsets_src1 = list( + range(0, stream_sizes[2] * N, stream_sizes[0])) + expected_tag_offsets_src2 = list( + range(1, stream_sizes[2] * N, stream_sizes[2])) + expected_tag_offsets_src3 = list( + range(0, stream_sizes[2] * N, stream_sizes[2])) + tags_src1 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src1'))] + tags_src2 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src2'))] + tags_src3 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src3'))] for i in range(len(expected_tag_offsets_src1)): - self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset) + self.assertTrue( + expected_tag_offsets_src1[i] == tags_src1[i].offset) for i in range(len(expected_tag_offsets_src2)): - self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset) + self.assertTrue( + expected_tag_offsets_src2[i] == tags_src2[i].offset) for i in range(len(expected_tag_offsets_src3)): - self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset) + self.assertTrue( + expected_tag_offsets_src3[i] == tags_src3[i].offset) + if __name__ == '__main__': gr_unittest.run(qa_stream_demux) |