summaryrefslogtreecommitdiff
path: root/gr-blocks/python/blocks/qa_stream_demux.py
diff options
context:
space:
mode:
authormormj <mormjb@gmail.com>2020-10-30 10:59:50 -0400
committerMarcus Müller <marcus@hostalia.de>2020-10-30 17:52:53 +0100
commit7a0948ba85758fba1cc3858ef99bfa600dcc7416 (patch)
tree610d7f9d773a193562def6df2d4b50f1bb3b3f86 /gr-blocks/python/blocks/qa_stream_demux.py
parent12192ee7d58de95ddca35a3e93bfc172bdb5c820 (diff)
qa: run autopep8 formatting on qa python files
find ./ -iname qa*.py | xargs autopep8 --in-place -a -a mostly formats whitespace and gets rid of trailing semicolons
Diffstat (limited to 'gr-blocks/python/blocks/qa_stream_demux.py')
-rwxr-xr-xgr-blocks/python/blocks/qa_stream_demux.py266
1 files changed, 190 insertions, 76 deletions
diff --git a/gr-blocks/python/blocks/qa_stream_demux.py b/gr-blocks/python/blocks/qa_stream_demux.py
index da9aa46be0..629c35283c 100755
--- a/gr-blocks/python/blocks/qa_stream_demux.py
+++ b/gr-blocks/python/blocks/qa_stream_demux.py
@@ -26,17 +26,18 @@ from gnuradio import gr, gr_unittest, blocks
import pmt
import os
+
class qa_stream_demux(gr_unittest.TestCase):
- def setUp (self):
+ def setUp(self):
os.environ['GR_CONF_CONTROLPORT_ON'] = 'False'
- self.tb = gr.top_block ()
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
def help_stream_2ff(self, N, stream_sizes):
- v = blocks.vector_source_f(N*[1,] + N*[2,], False)
+ v = blocks.vector_source_f(N * [1, ] + N * [2, ], False)
demux = blocks.stream_demux(gr.sizeof_float, stream_sizes)
@@ -44,8 +45,8 @@ class qa_stream_demux(gr_unittest.TestCase):
dst1 = blocks.vector_sink_f()
self.tb.connect(v, demux)
- self.tb.connect((demux,0), dst0)
- self.tb.connect((demux,1), dst1)
+ self.tb.connect((demux, 0), dst0)
+ self.tb.connect((demux, 1), dst1)
self.tb.run()
return (dst0.data(), dst1.data())
@@ -61,14 +62,15 @@ class qa_stream_demux(gr_unittest.TestCase):
dst1 = blocks.vector_sink_f()
self.tb.connect(v, demux)
- self.tb.connect((demux,0), dst0)
- self.tb.connect((demux,1), dst1)
+ self.tb.connect((demux, 0), dst0)
+ self.tb.connect((demux, 1), dst1)
self.tb.run()
return (dst0.data(), dst1.data())
def help_stream_tag_propagation(self, N, stream_sizes):
- src_data = (stream_sizes[0]*[1,] + stream_sizes[1]*[2,] + stream_sizes[2]*[3,]) * N
+ src_data = (stream_sizes[0] * [1, ] + stream_sizes[1]
+ * [2, ] + stream_sizes[2] * [3, ]) * N
src = blocks.vector_source_f(src_data, False)
@@ -88,9 +90,9 @@ class qa_stream_demux(gr_unittest.TestCase):
self.tb.connect(tag_stream1, tag_stream2)
self.tb.connect(tag_stream2, tag_stream3)
self.tb.connect(tag_stream3, demux)
- self.tb.connect((demux,0), dst0)
- self.tb.connect((demux,1), dst1)
- self.tb.connect((demux,2), dst2)
+ self.tb.connect((demux, 0), dst0)
+ self.tb.connect((demux, 1), dst1)
+ self.tb.connect((demux, 2), dst2)
self.tb.run()
return (dst0, dst1, dst2)
@@ -109,25 +111,99 @@ class qa_stream_demux(gr_unittest.TestCase):
2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
- self.assertEqual (exp_data0, result_data[0])
- self.assertEqual (exp_data1, result_data[1])
+ self.assertEqual(exp_data0, result_data[0])
+ self.assertEqual(exp_data1, result_data[1])
def test_stream_ramp_2NN_ff(self):
N = 40
stream_sizes = [10, 10]
result_data = self.help_stream_ramp_2ff(N, stream_sizes)
- exp_data0 = [ 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
- 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0,
- 39.0, 38.0, 37.0, 36.0, 35.0, 34.0, 33.0, 32.0, 31.0, 30.0,
- 19.0, 18.0, 17.0, 16.0, 15.0, 14.0, 13.0, 12.0, 11.0, 10.0]
- exp_data1 = [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0,
- 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0,
- 29.0, 28.0, 27.0, 26.0, 25.0, 24.0, 23.0, 22.0, 21.0, 20.0,
- 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0]
-
- self.assertEqual (exp_data0, result_data[0])
- self.assertEqual (exp_data1, result_data[1])
+ exp_data0 = [
+ 0.0,
+ 1.0,
+ 2.0,
+ 3.0,
+ 4.0,
+ 5.0,
+ 6.0,
+ 7.0,
+ 8.0,
+ 9.0,
+ 20.0,
+ 21.0,
+ 22.0,
+ 23.0,
+ 24.0,
+ 25.0,
+ 26.0,
+ 27.0,
+ 28.0,
+ 29.0,
+ 39.0,
+ 38.0,
+ 37.0,
+ 36.0,
+ 35.0,
+ 34.0,
+ 33.0,
+ 32.0,
+ 31.0,
+ 30.0,
+ 19.0,
+ 18.0,
+ 17.0,
+ 16.0,
+ 15.0,
+ 14.0,
+ 13.0,
+ 12.0,
+ 11.0,
+ 10.0]
+ exp_data1 = [
+ 10.0,
+ 11.0,
+ 12.0,
+ 13.0,
+ 14.0,
+ 15.0,
+ 16.0,
+ 17.0,
+ 18.0,
+ 19.0,
+ 30.0,
+ 31.0,
+ 32.0,
+ 33.0,
+ 34.0,
+ 35.0,
+ 36.0,
+ 37.0,
+ 38.0,
+ 39.0,
+ 29.0,
+ 28.0,
+ 27.0,
+ 26.0,
+ 25.0,
+ 24.0,
+ 23.0,
+ 22.0,
+ 21.0,
+ 20.0,
+ 9.0,
+ 8.0,
+ 7.0,
+ 6.0,
+ 5.0,
+ 4.0,
+ 3.0,
+ 2.0,
+ 1.0,
+ 0.0]
+
+ self.assertEqual(exp_data0, result_data[0])
+ self.assertEqual(exp_data1, result_data[1])
def test_stream_2NM_ff(self):
N = 40
@@ -148,9 +224,8 @@ class qa_stream_demux(gr_unittest.TestCase):
2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
- self.assertEqual (exp_data0, result_data[0])
- self.assertEqual (exp_data1, result_data[1])
-
+ self.assertEqual(exp_data0, result_data[0])
+ self.assertEqual(exp_data1, result_data[1])
def test_stream_2MN_ff(self):
N = 37
@@ -171,8 +246,8 @@ class qa_stream_demux(gr_unittest.TestCase):
2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
2.0, 2.0, 2.0]
- self.assertEqual (exp_data0, result_data[0])
- self.assertEqual (exp_data1, result_data[1])
+ self.assertEqual(exp_data0, result_data[0])
+ self.assertEqual(exp_data1, result_data[1])
def test_stream_2N0_ff(self):
N = 30
@@ -192,8 +267,8 @@ class qa_stream_demux(gr_unittest.TestCase):
2.0, 2.0, 2.0, 2.0]
exp_data1 = []
- self.assertEqual (exp_data0, result_data[0])
- self.assertEqual (exp_data1, result_data[1])
+ self.assertEqual(exp_data0, result_data[0])
+ self.assertEqual(exp_data1, result_data[1])
def test_stream_20N_ff(self):
N = 30
@@ -211,82 +286,121 @@ class qa_stream_demux(gr_unittest.TestCase):
2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
- self.assertEqual (exp_data0, result_data[0])
- self.assertEqual (exp_data1, result_data[1])
+ self.assertEqual(exp_data0, result_data[0])
+ self.assertEqual(exp_data1, result_data[1])
def test_largeN_ff(self):
stream_sizes = [3, 8191]
- r0 = [1.0,] * stream_sizes[0]
- r1 = [2.0,] * stream_sizes[1]
+ r0 = [1.0, ] * stream_sizes[0]
+ r1 = [2.0, ] * stream_sizes[1]
v = blocks.vector_source_f(r0 + r1, repeat=False)
demux = blocks.stream_demux(gr.sizeof_float, stream_sizes)
dst0 = blocks.vector_sink_f()
dst1 = blocks.vector_sink_f()
- self.tb.connect (v, demux)
- self.tb.connect ((demux,0), dst0)
- self.tb.connect ((demux,1), dst1)
- self.tb.run ()
- self.assertEqual (r0, dst0.data())
- self.assertEqual (r1, dst1.data())
+ self.tb.connect(v, demux)
+ self.tb.connect((demux, 0), dst0)
+ self.tb.connect((demux, 1), dst1)
+ self.tb.run()
+ self.assertEqual(r0, dst0.data())
+ self.assertEqual(r1, dst1.data())
def test_tag_propagation(self):
- N = 10 # Block length
- stream_sizes = [1,2,3]
+ N = 10 # Block length
+ stream_sizes = [1, 2, 3]
- expected_result0 = N*(stream_sizes[0]*[1,])
- expected_result1 = N*(stream_sizes[1]*[2,])
- expected_result2 = N*(stream_sizes[2]*[3,])
+ expected_result0 = N * (stream_sizes[0] * [1, ])
+ expected_result1 = N * (stream_sizes[1] * [2, ])
+ expected_result2 = N * (stream_sizes[2] * [3, ])
# check the data
(result0, result1, result2) = self.help_stream_tag_propagation(N, stream_sizes)
- self.assertFloatTuplesAlmostEqual(expected_result0, result0.data(), places=6)
- self.assertFloatTuplesAlmostEqual(expected_result1, result1.data(), places=6)
- self.assertFloatTuplesAlmostEqual(expected_result2, result2.data(), places=6)
+ self.assertFloatTuplesAlmostEqual(
+ expected_result0, result0.data(), places=6)
+ self.assertFloatTuplesAlmostEqual(
+ expected_result1, result1.data(), places=6)
+ self.assertFloatTuplesAlmostEqual(
+ expected_result2, result2.data(), places=6)
# check the tags - result0
tags = result0.tags()
- expected_tag_offsets_src1 = list(range(0,stream_sizes[0]*N,stream_sizes[0]))
- expected_tag_offsets_src2 = list(range(0,stream_sizes[0]*N,stream_sizes[0]))
- expected_tag_offsets_src3 = list(range(0,stream_sizes[0]*N,stream_sizes[0]))
- tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))]
- tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))]
- tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))]
+ expected_tag_offsets_src1 = list(
+ range(0, stream_sizes[0] * N, stream_sizes[0]))
+ expected_tag_offsets_src2 = list(
+ range(0, stream_sizes[0] * N, stream_sizes[0]))
+ expected_tag_offsets_src3 = list(
+ range(0, stream_sizes[0] * N, stream_sizes[0]))
+ tags_src1 = [
+ tag for tag in tags if pmt.eq(
+ tag.key, pmt.intern('src1'))]
+ tags_src2 = [
+ tag for tag in tags if pmt.eq(
+ tag.key, pmt.intern('src2'))]
+ tags_src3 = [
+ tag for tag in tags if pmt.eq(
+ tag.key, pmt.intern('src3'))]
for i in range(len(expected_tag_offsets_src1)):
- self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset)
+ self.assertTrue(
+ expected_tag_offsets_src1[i] == tags_src1[i].offset)
for i in range(len(expected_tag_offsets_src2)):
- self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset)
+ self.assertTrue(
+ expected_tag_offsets_src2[i] == tags_src2[i].offset)
for i in range(len(expected_tag_offsets_src3)):
- self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset)
+ self.assertTrue(
+ expected_tag_offsets_src3[i] == tags_src3[i].offset)
# check the tags - result1
tags = result1.tags()
- expected_tag_offsets_src1 = list(range(0,stream_sizes[1]*N,stream_sizes[0]))
- expected_tag_offsets_src2 = list(range(1,stream_sizes[1]*N,stream_sizes[1]))
+ expected_tag_offsets_src1 = list(
+ range(0, stream_sizes[1] * N, stream_sizes[0]))
+ expected_tag_offsets_src2 = list(
+ range(1, stream_sizes[1] * N, stream_sizes[1]))
expected_tag_offsets_src3 = list()
- tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))]
- tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))]
- tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))]
+ tags_src1 = [
+ tag for tag in tags if pmt.eq(
+ tag.key, pmt.intern('src1'))]
+ tags_src2 = [
+ tag for tag in tags if pmt.eq(
+ tag.key, pmt.intern('src2'))]
+ tags_src3 = [
+ tag for tag in tags if pmt.eq(
+ tag.key, pmt.intern('src3'))]
for i in range(len(expected_tag_offsets_src1)):
- self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset)
+ self.assertTrue(
+ expected_tag_offsets_src1[i] == tags_src1[i].offset)
for i in range(len(expected_tag_offsets_src2)):
- self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset)
+ self.assertTrue(
+ expected_tag_offsets_src2[i] == tags_src2[i].offset)
for i in range(len(expected_tag_offsets_src3)):
- self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset)
+ self.assertTrue(
+ expected_tag_offsets_src3[i] == tags_src3[i].offset)
# check the tags - result2
tags = result2.tags()
- expected_tag_offsets_src1 = list(range(0,stream_sizes[2]*N,stream_sizes[0]))
- expected_tag_offsets_src2 = list(range(1,stream_sizes[2]*N,stream_sizes[2]))
- expected_tag_offsets_src3 = list(range(0,stream_sizes[2]*N,stream_sizes[2]))
- tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))]
- tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))]
- tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))]
+ expected_tag_offsets_src1 = list(
+ range(0, stream_sizes[2] * N, stream_sizes[0]))
+ expected_tag_offsets_src2 = list(
+ range(1, stream_sizes[2] * N, stream_sizes[2]))
+ expected_tag_offsets_src3 = list(
+ range(0, stream_sizes[2] * N, stream_sizes[2]))
+ tags_src1 = [
+ tag for tag in tags if pmt.eq(
+ tag.key, pmt.intern('src1'))]
+ tags_src2 = [
+ tag for tag in tags if pmt.eq(
+ tag.key, pmt.intern('src2'))]
+ tags_src3 = [
+ tag for tag in tags if pmt.eq(
+ tag.key, pmt.intern('src3'))]
for i in range(len(expected_tag_offsets_src1)):
- self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset)
+ self.assertTrue(
+ expected_tag_offsets_src1[i] == tags_src1[i].offset)
for i in range(len(expected_tag_offsets_src2)):
- self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset)
+ self.assertTrue(
+ expected_tag_offsets_src2[i] == tags_src2[i].offset)
for i in range(len(expected_tag_offsets_src3)):
- self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset)
+ self.assertTrue(
+ expected_tag_offsets_src3[i] == tags_src3[i].offset)
+
if __name__ == '__main__':
gr_unittest.run(qa_stream_demux)