diff options
Diffstat (limited to 'gr-blocks/python')
77 files changed, 1824 insertions, 1326 deletions
diff --git a/gr-blocks/python/blocks/qa_add_mult_div_sub.py b/gr-blocks/python/blocks/qa_add_mult_div_sub.py index 0e516da0c8..ffcf2f4a52 100644 --- a/gr-blocks/python/blocks/qa_add_mult_div_sub.py +++ b/gr-blocks/python/blocks/qa_add_mult_div_sub.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_add_mult_div_sub(gr_unittest.TestCase): def setUp(self): @@ -62,30 +63,30 @@ class test_add_mult_div_sub(gr_unittest.TestCase): # add_XX def test_add_ss(self): - src1_data = [1, 2, 3, 4, 5] + src1_data = [1, 2, 3, 4, 5] src2_data = [8, -3, 4, 8, 2] expected_result = [9, -1, 7, 12, 7] op = blocks.add_ss() self.help_ss((src1_data, src2_data), expected_result, op) def test_add_ii(self): - src1_data = [1, 2, 3, 4, 5] + src1_data = [1, 2, 3, 4, 5] src2_data = [8, -3, 4, 8, 2] expected_result = [9, -1, 7, 12, 7] op = blocks.add_ii() self.help_ii((src1_data, src2_data), expected_result, op) def test_add_ff(self): - src1_data = [1.0, 2.0, 3.0, 4.0, 5.0] + src1_data = [1.0, 2.0, 3.0, 4.0, 5.0] src2_data = [8.0, -3.0, 4.0, 8.0, 2.0] expected_result = [9.0, -1.0, 7.0, 12.0, 7.0] op = blocks.add_ff() self.help_ff((src1_data, src2_data), expected_result, op) def test_add_cc(self): - src1_data = [1+1j, 2+2j, 3+3j, 4+4j, 5+5j] - src2_data = [8+8j, -3-3j, 4+4j, 8+8j, 2+2j] - expected_result = [9+9j, -1-1j, 7+7j, 12+12j, 7+7j] + src1_data = [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j, 5 + 5j] + src2_data = [8 + 8j, -3 - 3j, 4 + 4j, 8 + 8j, 2 + 2j] + expected_result = [9 + 9j, -1 - 1j, 7 + 7j, 12 + 12j, 7 + 7j] op = blocks.add_cc() self.help_cc((src1_data, src2_data), expected_result, op) @@ -111,43 +112,43 @@ class test_add_mult_div_sub(gr_unittest.TestCase): def test_add_const_cc(self): src_data = [1, 2, 3, 4, 5] - expected_result = [1+5j, 2+5j, 3+5j, 4+5j, 5+5j] + expected_result = [1 + 5j, 2 + 5j, 3 + 5j, 4 + 5j, 5 + 5j] op = blocks.add_const_cc(5j) self.help_cc((src_data,), expected_result, op) # multiply_XX def test_multiply_ss(self): - src1_data = [1, 2, 3, 4, 5] + src1_data = [1, 2, 3, 4, 5] src2_data = [8, -3, 4, 8, 2] expected_result = [8, -6, 12, 32, 10] op = blocks.multiply_ss() self.help_ss((src1_data, src2_data), - expected_result, op) + expected_result, op) def test_multiply_ii(self): - src1_data = [1, 2, 3, 4, 5] + src1_data = [1, 2, 3, 4, 5] src2_data = [8, -3, 4, 8, 2] expected_result = [8, -6, 12, 32, 10] op = blocks.multiply_ii() self.help_ii((src1_data, src2_data), - expected_result, op) + expected_result, op) def test_multiply_ff(self): - src1_data = [1, 2, 3, 4, 5] + src1_data = [1, 2, 3, 4, 5] src2_data = [8, -3, 4, 8, 2] expected_result = [8, -6, 12, 32, 10] op = blocks.multiply_ff() self.help_ff((src1_data, src2_data), - expected_result, op) + expected_result, op) def test_multiply_cc(self): - src1_data = [1+1j, 2+2j, 3+3j, 4+4j, 5+5j] + src1_data = [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j, 5 + 5j] src2_data = [8, -3, 4, 8, 2] - expected_result = [8+8j, -6-6j, 12+12j, 32+32j, 10+10j] + expected_result = [8 + 8j, -6 - 6j, 12 + 12j, 32 + 32j, 10 + 10j] op = blocks.multiply_cc() self.help_cc((src1_data, src2_data), - expected_result, op) + expected_result, op) # multiply_const_XX @@ -170,24 +171,24 @@ class test_add_mult_div_sub(gr_unittest.TestCase): self.help_ff((src_data,), expected_result, op) def test_multiply_const_cc(self): - src_data = [-1-1j, 0+0j, 1+1j, 2+2j, 3+3j] - expected_result = [-5-5j, 0+0j, 5+5j, 10+10j, 15+15j] + src_data = [-1 - 1j, 0 + 0j, 1 + 1j, 2 + 2j, 3 + 3j] + expected_result = [-5 - 5j, 0 + 0j, 5 + 5j, 10 + 10j, 15 + 15j] op = blocks.multiply_const_cc(5) self.help_cc((src_data,), expected_result, op) def test_multiply_const_cc2(self): - src_data = [-1-1j, 0+0j, 1+1j, 2+2j, 3+3j] - expected_result = [-3-7j, 0+0j, 3+7j, 6+14j, 9+21j] - op = blocks.multiply_const_cc(5+2j) + src_data = [-1 - 1j, 0 + 0j, 1 + 1j, 2 + 2j, 3 + 3j] + expected_result = [-3 - 7j, 0 + 0j, 3 + 7j, 6 + 14j, 9 + 21j] + op = blocks.multiply_const_cc(5 + 2j) self.help_cc((src_data,), expected_result, op) def test_sub_ii(self): - src1_data = [1, 2, 3, 4, 5] + src1_data = [1, 2, 3, 4, 5] src2_data = [8, -3, 4, 8, 2] expected_result = [-7, 5, -1, -4, 3] op = blocks.sub_ii() self.help_ii((src1_data, src2_data), - expected_result, op) + expected_result, op) def test_sub_ii1(self): src1_data = [1, 2, 3, 4, 5] @@ -202,12 +203,12 @@ class test_add_mult_div_sub(gr_unittest.TestCase): self.assertEqual(expected_result, result_data) def test_sub_ss(self): - src1_data = [1, 2, 3, 4, 5] + src1_data = [1, 2, 3, 4, 5] src2_data = [8, -3, 4, 8, 2] expected_result = [-7, 5, -1, -4, 3] op = blocks.sub_ss() self.help_ss((src1_data, src2_data), - expected_result, op) + expected_result, op) def test_sub_ss1(self): src1_data = [1, 2, 3, 4, 5] @@ -222,12 +223,12 @@ class test_add_mult_div_sub(gr_unittest.TestCase): self.assertEqual(expected_result, result_data) def test_sub_ff(self): - src1_data = [1, 2, 3, 4, 5] + src1_data = [1, 2, 3, 4, 5] src2_data = [8, -3, 4, 8, 2] expected_result = [-7, 5, -1, -4, 3] op = blocks.sub_ff() self.help_ff((src1_data, src2_data), - expected_result, op) + expected_result, op) def test_sub_ff1(self): src1_data = [1, 2, 3, 4, 5] @@ -242,12 +243,12 @@ class test_add_mult_div_sub(gr_unittest.TestCase): self.assertEqual(expected_result, result_data) def test_sub_cc(self): - src1_data = [1, 2, 3, 4, 5] + src1_data = [1, 2, 3, 4, 5] src2_data = [8, -3, 4, 8, 2] expected_result = [-7, 5, -1, -4, 3] op = blocks.sub_cc() self.help_cc((src1_data, src2_data), - expected_result, op) + expected_result, op) def test_sub_cc1(self): src1_data = [1, 2, 3, 4, 5] @@ -262,11 +263,12 @@ class test_add_mult_div_sub(gr_unittest.TestCase): self.assertEqual(expected_result, result_data) def test_div_ff(self): - src1_data = [ 5, 9, -15, 1024] - src2_data = [10, 3, -5, 64] - expected_result = [0.5, 3, 3, 16] + src1_data = [5, 9, -15, 1024] + src2_data = [10, 3, -5, 64] + expected_result = [0.5, 3, 3, 16] op = blocks.divide_ff() self.help_ff((src1_data, src2_data), expected_result, op) + if __name__ == '__main__': gr_unittest.run(test_add_mult_div_sub) diff --git a/gr-blocks/python/blocks/qa_add_mult_v.py b/gr-blocks/python/blocks/qa_add_mult_v.py index dd872ee8ce..39f11bc4be 100644 --- a/gr-blocks/python/blocks/qa_add_mult_v.py +++ b/gr-blocks/python/blocks/qa_add_mult_v.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_add_mult_v(gr_unittest.TestCase): def setUp(self): @@ -20,7 +21,7 @@ class test_add_mult_v(gr_unittest.TestCase): self.tb = None def help_ss(self, size, src_data, exp_data, op): - for s in zip(list(range(len (src_data))), src_data): + for s in zip(list(range(len(src_data))), src_data): src = blocks.vector_source_s(s[1]) srcv = blocks.stream_to_vector(gr.sizeof_short, size) self.tb.connect(src, srcv) @@ -34,7 +35,7 @@ class test_add_mult_v(gr_unittest.TestCase): self.assertEqual(exp_data, result_data) def help_ii(self, size, src_data, exp_data, op): - for s in zip(list(range(len (src_data))), src_data): + for s in zip(list(range(len(src_data))), src_data): src = blocks.vector_source_i(s[1]) srcv = blocks.stream_to_vector(gr.sizeof_int, size) self.tb.connect(src, srcv) @@ -48,7 +49,7 @@ class test_add_mult_v(gr_unittest.TestCase): self.assertEqual(exp_data, result_data) def help_ff(self, size, src_data, exp_data, op): - for s in zip(list(range(len (src_data))), src_data): + for s in zip(list(range(len(src_data))), src_data): src = blocks.vector_source_f(s[1]) srcv = blocks.stream_to_vector(gr.sizeof_float, size) self.tb.connect(src, srcv) @@ -62,7 +63,7 @@ class test_add_mult_v(gr_unittest.TestCase): self.assertEqual(exp_data, result_data) def help_cc(self, size, src_data, exp_data, op): - for s in zip(list(range(len (src_data))), src_data): + for s in zip(list(range(len(src_data))), src_data): src = blocks.vector_source_c(s[1]) srcv = blocks.stream_to_vector(gr.sizeof_gr_complex, size) self.tb.connect(src, srcv) @@ -118,10 +119,10 @@ class test_add_mult_v(gr_unittest.TestCase): # add_vXX def test_add_vss_one(self): - src1_data = [1,] - src2_data = [2,] - src3_data = [3,] - expected_result = [6,] + src1_data = [1, ] + src2_data = [2, ] + src3_data = [3, ] + expected_result = [6, ] op = blocks.add_ss(1) self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op) @@ -134,10 +135,10 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op) def test_add_vii_one(self): - src1_data = [1,] - src2_data = [2,] - src3_data = [3,] - expected_result = [6,] + src1_data = [1, ] + src2_data = [2, ] + src3_data = [3, ] + expected_result = [6, ] op = blocks.add_ii(1) self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op) @@ -150,10 +151,10 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op) def test_add_vff_one(self): - src1_data = [1.0,] - src2_data = [2.0,] - src3_data = [3.0,] - expected_result = [6.0,] + src1_data = [1.0, ] + src2_data = [2.0, ] + src3_data = [3.0, ] + expected_result = [6.0, ] op = blocks.add_ff(1) self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op) @@ -166,27 +167,47 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op) def test_add_vcc_one(self): - src1_data = [1.0+2.0j,] - src2_data = [3.0+4.0j,] - src3_data = [5.0+6.0j,] - expected_result = [9.0+12j,] + src1_data = [1.0 + 2.0j, ] + src2_data = [3.0 + 4.0j, ] + src3_data = [5.0 + 6.0j, ] + expected_result = [9.0 + 12j, ] op = blocks.add_cc(1) self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op) def test_add_vcc_five(self): - src1_data = [1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j] - src2_data = [11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j] - src3_data = [21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j] - expected_result = [33.0+36.0j, 39.0+42.0j, 45.0+48.0j, 51.0+54.0j, 57.0+60.0j] + src1_data = [ + 1.0 + 2.0j, + 3.0 + 4.0j, + 5.0 + 6.0j, + 7.0 + 8.0j, + 9.0 + 10.0j] + src2_data = [ + 11.0 + 12.0j, + 13.0 + 14.0j, + 15.0 + 16.0j, + 17.0 + 18.0j, + 19.0 + 20.0j] + src3_data = [ + 21.0 + 22.0j, + 23.0 + 24.0j, + 25.0 + 26.0j, + 27.0 + 28.0j, + 29.0 + 30.0j] + expected_result = [ + 33.0 + 36.0j, + 39.0 + 42.0j, + 45.0 + 48.0j, + 51.0 + 54.0j, + 57.0 + 60.0j] op = blocks.add_cc(5) self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op) # add_const_vXX def test_add_const_vss_one(self): - src_data = [1,] + src_data = [1, ] op = blocks.add_const_vss((2,)) - exp_data = [3,] + exp_data = [3, ] self.help_const_ss(src_data, exp_data, op) def test_add_const_vss_five(self): @@ -196,9 +217,9 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_const_ss(src_data, exp_data, op) def test_add_const_vii_one(self): - src_data = [1,] + src_data = [1, ] op = blocks.add_const_vii((2,)) - exp_data = [3,] + exp_data = [3, ] self.help_const_ii(src_data, exp_data, op) def test_add_const_vii_five(self): @@ -208,9 +229,9 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_const_ii(src_data, exp_data, op) def test_add_const_vff_one(self): - src_data = [1.0,] + src_data = [1.0, ] op = blocks.add_const_vff((2.0,)) - exp_data = [3.0,] + exp_data = [3.0, ] self.help_const_ff(src_data, exp_data, op) def test_add_const_vff_five(self): @@ -220,24 +241,39 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_const_ff(src_data, exp_data, op) def test_add_const_vcc_one(self): - src_data = [1.0+2.0j,] - op = blocks.add_const_vcc((2.0+3.0j,)) - exp_data = [3.0+5.0j,] + src_data = [1.0 + 2.0j, ] + op = blocks.add_const_vcc((2.0 + 3.0j,)) + exp_data = [3.0 + 5.0j, ] self.help_const_cc(src_data, exp_data, op) def test_add_const_vcc_five(self): - src_data = [1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j] - op = blocks.add_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)) - exp_data = [12.0+14.0j, 16.0+18.0j, 20.0+22.0j, 24.0+26.0j, 28.0+30.0j] + src_data = [ + 1.0 + 2.0j, + 3.0 + 4.0j, + 5.0 + 6.0j, + 7.0 + 8.0j, + 9.0 + 10.0j] + op = blocks.add_const_vcc( + (11.0 + 12.0j, + 13.0 + 14.0j, + 15.0 + 16.0j, + 17.0 + 18.0j, + 19.0 + 20.0j)) + exp_data = [ + 12.0 + 14.0j, + 16.0 + 18.0j, + 20.0 + 22.0j, + 24.0 + 26.0j, + 28.0 + 30.0j] self.help_const_cc(src_data, exp_data, op) # multiply_vXX def test_multiply_vss_one(self): - src1_data = [1,] - src2_data = [2,] - src3_data = [3,] - expected_result = [6,] + src1_data = [1, ] + src2_data = [2, ] + src3_data = [3, ] + expected_result = [6, ] op = blocks.multiply_ss(1) self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op) @@ -250,10 +286,10 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op) def test_multiply_vii_one(self): - src1_data = [1,] - src2_data = [2,] - src3_data = [3,] - expected_result = [6,] + src1_data = [1, ] + src2_data = [2, ] + src3_data = [3, ] + expected_result = [6, ] op = blocks.multiply_ii(1) self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op) @@ -266,10 +302,10 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op) def test_multiply_vff_one(self): - src1_data = [1.0,] - src2_data = [2.0,] - src3_data = [3.0,] - expected_result = [6.0,] + src1_data = [1.0, ] + src2_data = [2.0, ] + src3_data = [3.0, ] + expected_result = [6.0, ] op = blocks.multiply_ff(1) self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op) @@ -282,27 +318,43 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op) def test_multiply_vcc_one(self): - src1_data = [1.0+2.0j,] - src2_data = [3.0+4.0j,] - src3_data = [5.0+6.0j,] - expected_result = [-85+20j,] + src1_data = [1.0 + 2.0j, ] + src2_data = [3.0 + 4.0j, ] + src3_data = [5.0 + 6.0j, ] + expected_result = [-85 + 20j, ] op = blocks.multiply_cc(1) self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op) def test_multiply_vcc_five(self): - src1_data = [1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j] - src2_data = [11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j] - src3_data = [21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j] - expected_result = [-1021.0+428.0j, -2647.0+1754.0j, -4945.0+3704.0j, -8011.0+6374.0j, -11941.0+9860.0j] + src1_data = [ + 1.0 + 2.0j, + 3.0 + 4.0j, + 5.0 + 6.0j, + 7.0 + 8.0j, + 9.0 + 10.0j] + src2_data = [ + 11.0 + 12.0j, + 13.0 + 14.0j, + 15.0 + 16.0j, + 17.0 + 18.0j, + 19.0 + 20.0j] + src3_data = [ + 21.0 + 22.0j, + 23.0 + 24.0j, + 25.0 + 26.0j, + 27.0 + 28.0j, + 29.0 + 30.0j] + expected_result = [-1021.0 + 428.0j, -2647.0 + 1754.0j, - \ + 4945.0 + 3704.0j, -8011.0 + 6374.0j, -11941.0 + 9860.0j] op = blocks.multiply_cc(5) self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op) # multiply_const_vXX def test_multiply_const_vss_one(self): - src_data = [2,] + src_data = [2, ] op = blocks.multiply_const_vss((3,)) - exp_data = [6,] + exp_data = [6, ] self.help_const_ss(src_data, exp_data, op) def test_multiply_const_vss_five(self): @@ -312,9 +364,9 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_const_ss(src_data, exp_data, op) def test_multiply_const_vii_one(self): - src_data = [2,] + src_data = [2, ] op = blocks.multiply_const_vii((3,)) - exp_data = [6,] + exp_data = [6, ] self.help_const_ii(src_data, exp_data, op) def test_multiply_const_vii_five(self): @@ -324,9 +376,9 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_const_ii(src_data, exp_data, op) def test_multiply_const_vff_one(self): - src_data = [2.0,] + src_data = [2.0, ] op = blocks.multiply_const_vff((3.0,)) - exp_data = [6.0,] + exp_data = [6.0, ] self.help_const_ff(src_data, exp_data, op) def test_multiply_const_vff_five(self): @@ -336,15 +388,22 @@ class test_add_mult_v(gr_unittest.TestCase): self.help_const_ff(src_data, exp_data, op) def test_multiply_const_vcc_one(self): - src_data = [1.0+2.0j,] - op = blocks.multiply_const_vcc((2.0+3.0j,)) - exp_data = [-4.0+7.0j,] + src_data = [1.0 + 2.0j, ] + op = blocks.multiply_const_vcc((2.0 + 3.0j,)) + exp_data = [-4.0 + 7.0j, ] self.help_const_cc(src_data, exp_data, op) def test_multiply_const_vcc_five(self): - src_data = [1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j] - op = blocks.multiply_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)) - exp_data = [-13.0+34.0j, -17.0+94.0j, -21.0+170.0j, -25.0+262.0j, -29.0+370.0j] + src_data = [ + 1.0 + 2.0j, + 3.0 + 4.0j, + 5.0 + 6.0j, + 7.0 + 8.0j, + 9.0 + 10.0j] + op = blocks.multiply_const_vcc( + (11.0 + 12.0j, 13.0 + 14.0j, 15.0 + 16.0j, 17.0 + 18.0j, 19.0 + 20.0j)) + exp_data = [-13.0 + 34.0j, -17.0 + 94.0j, - + 21.0 + 170.0j, -25.0 + 262.0j, -29.0 + 370.0j] self.help_const_cc(src_data, exp_data, op) diff --git a/gr-blocks/python/blocks/qa_affinity.py b/gr-blocks/python/blocks/qa_affinity.py index 35eb6c0ed8..8399df53e9 100644 --- a/gr-blocks/python/blocks/qa_affinity.py +++ b/gr-blocks/python/blocks/qa_affinity.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_affinity(gr_unittest.TestCase): def setUp(self): @@ -26,13 +27,14 @@ class test_affinity(gr_unittest.TestCase): src = blocks.vector_source_f(src_data) snk = blocks.vector_sink_f() - src.set_processor_affinity([0,]) + src.set_processor_affinity([0, ]) self.tb.connect(src, snk) self.tb.run() a = src.processor_affinity() - self.assertEqual([0,], a) + self.assertEqual([0, ], a) + if __name__ == '__main__': gr_unittest.run(test_affinity) diff --git a/gr-blocks/python/blocks/qa_argmax.py b/gr-blocks/python/blocks/qa_argmax.py index 3aac837c62..70797b330b 100644 --- a/gr-blocks/python/blocks/qa_argmax.py +++ b/gr-blocks/python/blocks/qa_argmax.py @@ -13,6 +13,7 @@ from gnuradio import gr, gr_unittest, blocks import math + class test_arg_max(gr_unittest.TestCase): def setUp(self): @@ -24,9 +25,9 @@ class test_arg_max(gr_unittest.TestCase): def test_001(self): tb = self.tb - src1_data = [0,0.2,-0.3,0,12,0] - src2_data = [0,0.0,3.0,0,10,0] - src3_data = [0,0.0,3.0,0,1,0] + src1_data = [0, 0.2, -0.3, 0, 12, 0] + src2_data = [0, 0.0, 3.0, 0, 10, 0] + src3_data = [0, 0.0, 3.0, 0, 1, 0] src1 = blocks.vector_source_f(src1_data) s2v1 = blocks.stream_to_vector(gr.sizeof_float, len(src1_data)) @@ -48,15 +49,15 @@ class test_arg_max(gr_unittest.TestCase): tb.connect(s2v2, (argmax, 1)) tb.connect(s2v3, (argmax, 2)) - tb.connect((argmax,0), dst1) - tb.connect((argmax,1), dst2) + tb.connect((argmax, 0), dst1) + tb.connect((argmax, 1), dst2) tb.run() index = dst1.data() source = dst2.data() - self.assertEqual(index, [4,]) - self.assertEqual(source, [0,]) + self.assertEqual(index, [4, ]) + self.assertEqual(source, [0, ]) + if __name__ == '__main__': gr_unittest.run(test_arg_max) - diff --git a/gr-blocks/python/blocks/qa_block_behavior.py b/gr-blocks/python/blocks/qa_block_behavior.py index a4bec822b0..b1c7f5a0c4 100644 --- a/gr-blocks/python/blocks/qa_block_behavior.py +++ b/gr-blocks/python/blocks/qa_block_behavior.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_block_behavior(gr_unittest.TestCase): def setUp(self): @@ -30,7 +31,7 @@ class test_block_behavior(gr_unittest.TestCase): ''' src = blocks.null_source(gr.sizeof_float) - op = blocks.head(gr.sizeof_float, 100) + op = blocks.head(gr.sizeof_float, 100) snk = blocks.null_sink(gr.sizeof_float) maxn_pre = op.max_noutput_items() @@ -49,7 +50,7 @@ class test_block_behavior(gr_unittest.TestCase): ''' src = blocks.null_source(gr.sizeof_float) - op = blocks.head(gr.sizeof_float, 100) + op = blocks.head(gr.sizeof_float, 100) snk = blocks.null_sink(gr.sizeof_float) op.set_max_noutput_items(1024) @@ -64,5 +65,6 @@ class test_block_behavior(gr_unittest.TestCase): self.assertEqual(maxn_pre, 1024) self.assertEqual(maxn_post, 1024) + if __name__ == '__main__': gr_unittest.run(test_block_behavior) diff --git a/gr-blocks/python/blocks/qa_block_gateway.py b/gr-blocks/python/blocks/qa_block_gateway.py index f78ea27076..2f8071dfdd 100644 --- a/gr-blocks/python/blocks/qa_block_gateway.py +++ b/gr-blocks/python/blocks/qa_block_gateway.py @@ -18,81 +18,89 @@ from gnuradio import gr, gr_unittest, blocks class non_sync_block(gr.basic_block): def __init__(self): gr.basic_block.__init__(self, - name="non_sync_block", - in_sig=[numpy.float32], - out_sig=[numpy.float32, numpy.float32]) + name="non_sync_block", + in_sig=[numpy.float32], + out_sig=[numpy.float32, numpy.float32]) + def general_work(self, input_items, output_items): self.consume(0, len(input_items[0])) - self.produce(0,2) - self.produce(1,1) + self.produce(0, 2) + self.produce(1, 1) return gr.WORK_CALLED_PRODUCE + class add_2_f32_1_f32(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, - name = "add 2 f32", - in_sig = [numpy.float32, numpy.float32], - out_sig = [numpy.float32], + name="add 2 f32", + in_sig=[numpy.float32, numpy.float32], + out_sig=[numpy.float32], ) def work(self, input_items, output_items): output_items[0][:] = input_items[0] + input_items[1] return len(output_items[0]) + class add_2_fc32_1_fc32(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, - name = "add 2 fc32", - in_sig = [numpy.complex64, numpy.complex64], - out_sig = [numpy.complex64], + name="add 2 fc32", + in_sig=[numpy.complex64, numpy.complex64], + out_sig=[numpy.complex64], ) def work(self, input_items, output_items): output_items[0][:] = input_items[0] + input_items[1] return len(output_items[0]) + class convolve(gr.sync_block): """ A demonstration using block history to properly perform a convolution. """ + def __init__(self): gr.sync_block.__init__( self, - name = "convolve", - in_sig = [numpy.float32], - out_sig = [numpy.float32] + name="convolve", + in_sig=[numpy.float32], + out_sig=[numpy.float32] ) self._taps = [1, 0, 0, 0] self.set_history(len(self._taps)) def work(self, input_items, output_items): - output_items[0][:] = numpy.convolve(input_items[0], self._taps, mode='valid') + output_items[0][:] = numpy.convolve( + input_items[0], self._taps, mode='valid') return len(output_items[0]) + class decim2x(gr.decim_block): def __init__(self): gr.decim_block.__init__( self, - name = "decim2x", - in_sig = [numpy.float32], - out_sig = [numpy.float32], - decim = 2 + name="decim2x", + in_sig=[numpy.float32], + out_sig=[numpy.float32], + decim=2 ) def work(self, input_items, output_items): output_items[0][:] = input_items[0][::2] return len(output_items[0]) + class interp2x(gr.interp_block): def __init__(self): gr.interp_block.__init__( self, - name = "interp2x", - in_sig = [numpy.float32], - out_sig = [numpy.float32], - interp = 2 + name="interp2x", + in_sig=[numpy.float32], + out_sig=[numpy.float32], + interp=2 ) def work(self, input_items, output_items): @@ -100,21 +108,22 @@ class interp2x(gr.interp_block): output_items[0][::2] = input_items[0] return len(output_items[0]) + class tag_source(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, - name = "tag source", - in_sig = None, - out_sig = [numpy.float32], + name="tag source", + in_sig=None, + out_sig=[numpy.float32], ) def work(self, input_items, output_items): num_output_items = len(output_items[0]) - #put code here to fill the output items... + # put code here to fill the output items... - #make a new tag on the middle element every time work is called + # make a new tag on the middle element every time work is called count = self.nitems_written(0) + num_output_items // 2 key = pmt.string_to_symbol("example_key") value = pmt.string_to_symbol("example_value") @@ -122,37 +131,39 @@ class tag_source(gr.sync_block): return num_output_items + class tag_sink(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, - name = "tag sink", - in_sig = [numpy.float32], - out_sig = None, + name="tag sink", + in_sig=[numpy.float32], + out_sig=None, ) self.key = None def work(self, input_items, output_items): num_input_items = len(input_items[0]) - #put code here to process the input items... + # put code here to process the input items... - #print all the tags received in this work call + # print all the tags received in this work call nread = self.nitems_read(0) - tags = self.get_tags_in_range(0, nread, nread+num_input_items) + tags = self.get_tags_in_range(0, nread, nread + num_input_items) for tag in tags: - #print tag.offset - #print pmt.symbol_to_string(tag.key) - #print pmt.symbol_to_string(tag.value) + # print tag.offset + # print pmt.symbol_to_string(tag.key) + # print pmt.symbol_to_string(tag.value) self.key = pmt.symbol_to_string(tag.key) return num_input_items + class tag_sink_win(gr.sync_block): def __init__(self): - gr.sync_block.__init__(self, name = "tag sink", - in_sig = [numpy.float32], - out_sig = None) + gr.sync_block.__init__(self, name="tag sink", + in_sig=[numpy.float32], + out_sig=None) self.key = None def work(self, input_items, output_items): @@ -162,28 +173,30 @@ class tag_sink_win(gr.sync_block): self.key = pmt.symbol_to_string(tag.key) return num_input_items + class fc32_to_f32_2(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, - name = "fc32_to_f32_2", - in_sig = [numpy.complex64], - out_sig = [(numpy.float32, 2)], + name="fc32_to_f32_2", + in_sig=[numpy.complex64], + out_sig=[(numpy.float32, 2)], ) def work(self, input_items, output_items): - output_items[0][::,0] = numpy.real(input_items[0]) - output_items[0][::,1] = numpy.imag(input_items[0]) + output_items[0][::, 0] = numpy.real(input_items[0]) + output_items[0][::, 1] = numpy.imag(input_items[0]) return len(output_items[0]) + class vector_to_stream(gr.interp_block): def __init__(self, itemsize, nitems_per_block): gr.interp_block.__init__( self, - name = "vector_to_stream", - in_sig = [(itemsize, nitems_per_block)], - out_sig = [itemsize], - interp = nitems_per_block + name="vector_to_stream", + in_sig=[(itemsize, nitems_per_block)], + out_sig=[itemsize], + interp=nitems_per_block ) self.block_size = nitems_per_block @@ -196,6 +209,7 @@ class vector_to_stream(gr.interp_block): return len(output_items[0]) + class test_block_gateway(gr_unittest.TestCase): def test_add_f32(self): @@ -253,7 +267,8 @@ class test_block_gateway(gr_unittest.TestCase): def test_tags(self): src = tag_source() sink = tag_sink() - head = blocks.head(gr.sizeof_float, 50000) #should be enough items to get a tag through + # should be enough items to get a tag through + head = blocks.head(gr.sizeof_float, 50000) tb = gr.top_block() tb.connect(src, head, sink) tb.run() @@ -262,7 +277,8 @@ class test_block_gateway(gr_unittest.TestCase): def test_tags_win(self): src = tag_source() sink = tag_sink_win() - head = blocks.head(gr.sizeof_float, 50000) #should be enough items to get a tag through + # should be enough items to get a tag through + head = blocks.head(gr.sizeof_float, 50000) tb = gr.top_block() tb.connect(src, head, sink) tb.run() @@ -270,7 +286,8 @@ class test_block_gateway(gr_unittest.TestCase): def test_fc32_to_f32_2(self): tb = gr.top_block() - src = blocks.vector_source_c([1+2j, 3+4j, 5+6j, 7+8j, 9+10j], False) + src = blocks.vector_source_c( + [1 + 2j, 3 + 4j, 5 + 6j, 7 + 8j, 9 + 10j], False) convert = fc32_to_f32_2() v2s = vector_to_stream(numpy.float32, 2) sink = blocks.vector_sink_f() @@ -279,15 +296,15 @@ class test_block_gateway(gr_unittest.TestCase): self.assertEqual(sink.data(), [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) def test_non_sync_block(self): - tb = gr.top_block () + tb = gr.top_block() src = blocks.vector_source_f(range(1000000)) sinks = [blocks.vector_sink_f(), blocks.vector_sink_f()] dut = non_sync_block() tb.connect(src, dut) - tb.connect((dut,0), sinks[0]) - tb.connect((dut,1), sinks[1]) - tb.run () - self.assertEqual(len(sinks[0].data()), 2*len(sinks[1].data())) + tb.connect((dut, 0), sinks[0]) + tb.connect((dut, 1), sinks[1]) + tb.run() + self.assertEqual(len(sinks[0].data()), 2 * len(sinks[1].data())) if __name__ == '__main__': diff --git a/gr-blocks/python/blocks/qa_boolean_operators.py b/gr-blocks/python/blocks/qa_boolean_operators.py index e5c199d283..384924d398 100644 --- a/gr-blocks/python/blocks/qa_boolean_operators.py +++ b/gr-blocks/python/blocks/qa_boolean_operators.py @@ -11,172 +11,169 @@ from gnuradio import gr, gr_unittest, blocks + class test_boolean_operators (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def help_ss (self, src_data, exp_data, op): - for s in zip (list(range(len (src_data))), src_data): - src = blocks.vector_source_s (s[1]) - self.tb.connect (src, (op, s[0])) - dst = blocks.vector_sink_s () - self.tb.connect (op, dst) - self.tb.run () - result_data = dst.data () - self.assertEqual (exp_data, result_data) - - def help_bb (self, src_data, exp_data, op): - for s in zip (list(range(len (src_data))), src_data): - src = blocks.vector_source_b (s[1]) - self.tb.connect (src, (op, s[0])) - dst = blocks.vector_sink_b () - self.tb.connect (op, dst) - self.tb.run () - result_data = dst.data () - self.assertEqual (exp_data, result_data) - - def help_ii (self, src_data, exp_data, op): - for s in zip (list(range(len (src_data))), src_data): - src = blocks.vector_source_i (s[1]) - self.tb.connect (src, (op, s[0])) - dst = blocks.vector_sink_i () - self.tb.connect (op, dst) - self.tb.run () - result_data = dst.data () - self.assertEqual (exp_data, result_data) - - def test_xor_ss (self): - src1_data = [1, 2, 3, 0x5004, 0x1150] - src2_data = [8, 2, 1 , 0x0508, 0x1105] - expected_result = [9, 0, 2, 0x550C, 0x0055] - op = blocks.xor_ss () - self.help_ss ((src1_data, src2_data), - expected_result, op) - - def test_xor_bb (self): - src1_data = [1, 2, 3, 4, 0x50] - src2_data = [8, 2, 1 , 8, 0x05] - expected_result = [9, 0, 2, 0xC, 0x55] - op = blocks.xor_bb () - self.help_bb ((src1_data, src2_data), - expected_result, op) - - - def test_xor_ii (self): - src1_data = [1, 2, 3, 0x5000004, 0x11000050] - src2_data = [8, 2, 1 , 0x0500008, 0x11000005] - expected_result = [9, 0, 2, 0x550000C, 0x00000055] - op = blocks.xor_ii () - self.help_ii ((src1_data, src2_data), - expected_result, op) - - - def test_and_ss (self): - src1_data = [1, 2, 3, 0x5004, 0x1150] - src2_data = [8, 2, 1 , 0x0508, 0x1105] - expected_result = [0, 2, 1, 0x0000, 0x1100] - op = blocks.and_ss () - self.help_ss ((src1_data, src2_data), - expected_result, op) - - def test_and_bb (self): - src1_data = [1, 2, 2, 3, 0x04, 0x50] - src2_data = [8, 2, 2, 1, 0x08, 0x05] - src3_data = [8, 2, 1, 1, 0x08, 0x05] - expected_result = [0, 2, 0, 1, 0x00, 0x00] - op = blocks.and_bb () - self.help_bb ((src1_data, src2_data, src3_data), - expected_result, op) - - def test_and_ii (self): - src1_data = [1, 2, 3, 0x50005004, 0x11001150] - src2_data = [8, 2, 1 , 0x05000508, 0x11001105] - expected_result = [0, 2, 1, 0x00000000, 0x11001100] - op = blocks.and_ii () - self.help_ii ((src1_data, src2_data), - expected_result, op) - - def test_and_const_ss (self): - src_data = [1, 2, 3, 0x5004, 0x1150] - expected_result = [0, 2, 2, 0x5000, 0x1100] + def help_ss(self, src_data, exp_data, op): + for s in zip(list(range(len(src_data))), src_data): + src = blocks.vector_source_s(s[1]) + self.tb.connect(src, (op, s[0])) + dst = blocks.vector_sink_s() + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_bb(self, src_data, exp_data, op): + for s in zip(list(range(len(src_data))), src_data): + src = blocks.vector_source_b(s[1]) + self.tb.connect(src, (op, s[0])) + dst = blocks.vector_sink_b() + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_ii(self, src_data, exp_data, op): + for s in zip(list(range(len(src_data))), src_data): + src = blocks.vector_source_i(s[1]) + self.tb.connect(src, (op, s[0])) + dst = blocks.vector_sink_i() + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def test_xor_ss(self): + src1_data = [1, 2, 3, 0x5004, 0x1150] + src2_data = [8, 2, 1, 0x0508, 0x1105] + expected_result = [9, 0, 2, 0x550C, 0x0055] + op = blocks.xor_ss() + self.help_ss((src1_data, src2_data), + expected_result, op) + + def test_xor_bb(self): + src1_data = [1, 2, 3, 4, 0x50] + src2_data = [8, 2, 1, 8, 0x05] + expected_result = [9, 0, 2, 0xC, 0x55] + op = blocks.xor_bb() + self.help_bb((src1_data, src2_data), + expected_result, op) + + def test_xor_ii(self): + src1_data = [1, 2, 3, 0x5000004, 0x11000050] + src2_data = [8, 2, 1, 0x0500008, 0x11000005] + expected_result = [9, 0, 2, 0x550000C, 0x00000055] + op = blocks.xor_ii() + self.help_ii((src1_data, src2_data), + expected_result, op) + + def test_and_ss(self): + src1_data = [1, 2, 3, 0x5004, 0x1150] + src2_data = [8, 2, 1, 0x0508, 0x1105] + expected_result = [0, 2, 1, 0x0000, 0x1100] + op = blocks.and_ss() + self.help_ss((src1_data, src2_data), + expected_result, op) + + def test_and_bb(self): + src1_data = [1, 2, 2, 3, 0x04, 0x50] + src2_data = [8, 2, 2, 1, 0x08, 0x05] + src3_data = [8, 2, 1, 1, 0x08, 0x05] + expected_result = [0, 2, 0, 1, 0x00, 0x00] + op = blocks.and_bb() + self.help_bb((src1_data, src2_data, src3_data), + expected_result, op) + + def test_and_ii(self): + src1_data = [1, 2, 3, 0x50005004, 0x11001150] + src2_data = [8, 2, 1, 0x05000508, 0x11001105] + expected_result = [0, 2, 1, 0x00000000, 0x11001100] + op = blocks.and_ii() + self.help_ii((src1_data, src2_data), + expected_result, op) + + def test_and_const_ss(self): + src_data = [1, 2, 3, 0x5004, 0x1150] + expected_result = [0, 2, 2, 0x5000, 0x1100] src = blocks.vector_source_s(src_data) - op = blocks.and_const_ss (0x55AA) + op = blocks.and_const_ss(0x55AA) dst = blocks.vector_sink_s() self.tb.connect(src, op, dst) self.tb.run() self.assertEqual(dst.data(), expected_result) - def test_and_const_bb (self): - src_data = [1, 2, 3, 0x50, 0x11] - expected_result = [0, 2, 2, 0x00, 0x00] + def test_and_const_bb(self): + src_data = [1, 2, 3, 0x50, 0x11] + expected_result = [0, 2, 2, 0x00, 0x00] src = blocks.vector_source_b(src_data) - op = blocks.and_const_bb (0xAA) + op = blocks.and_const_bb(0xAA) dst = blocks.vector_sink_b() self.tb.connect(src, op, dst) self.tb.run() self.assertEqual(dst.data(), expected_result) - - def test_and_const_ii (self): - src_data = [1, 2, 3, 0x5004, 0x1150] - expected_result = [0, 2, 2, 0x5000, 0x1100] + def test_and_const_ii(self): + src_data = [1, 2, 3, 0x5004, 0x1150] + expected_result = [0, 2, 2, 0x5000, 0x1100] src = blocks.vector_source_i(src_data) - op = blocks.and_const_ii (0x55AA) + op = blocks.and_const_ii(0x55AA) dst = blocks.vector_sink_i() self.tb.connect(src, op, dst) self.tb.run() self.assertEqual(dst.data(), expected_result) - - def test_or_ss (self): - src1_data = [1, 2, 3, 0x5004, 0x1150] - src2_data = [8, 2, 1 , 0x0508, 0x1105] - expected_result = [9, 2, 3, 0x550C, 0x1155] - op = blocks.or_ss () - self.help_ss ((src1_data, src2_data), - expected_result, op) - - def test_or_bb (self): - src1_data = [1, 2, 2, 3, 0x04, 0x50] - src2_data = [8, 2, 2, 1 , 0x08, 0x05] - src3_data = [8, 2, 1, 1 , 0x08, 0x05] - expected_result = [9, 2, 3, 3, 0x0C, 0x55] - op = blocks.or_bb () - self.help_bb ((src1_data, src2_data, src3_data), - expected_result, op) - - def test_or_ii (self): - src1_data = [1, 2, 3, 0x50005004, 0x11001150] - src2_data = [8, 2, 1 , 0x05000508, 0x11001105] - expected_result = [9, 2, 3, 0x5500550C, 0x11001155] - op = blocks.or_ii () - self.help_ii ((src1_data, src2_data), - expected_result, op) - - def test_not_ss (self): - src1_data = [1, 2, 3, 0x5004, 0x1150] - expected_result = [~1, ~2, ~3, ~0x5004, ~0x1150] - op = blocks.not_ss () - self.help_ss ((((src1_data),)), - expected_result, op) - - def test_not_bb (self): - src1_data = [1, 2, 2, 3, 0x04, 0x50] - expected_result = [0xFE, 0xFD, 0xFD, 0xFC, 0xFB, 0xAF] - op = blocks.not_bb () - self.help_bb (((src1_data), ), - expected_result, op) - - def test_not_ii (self): - src1_data = [1, 2, 3, 0x50005004, 0x11001150] - expected_result = [~1 , ~2, ~3, ~0x50005004, ~0x11001150] - op = blocks.not_ii () - self.help_ii (((src1_data),), - expected_result, op) + def test_or_ss(self): + src1_data = [1, 2, 3, 0x5004, 0x1150] + src2_data = [8, 2, 1, 0x0508, 0x1105] + expected_result = [9, 2, 3, 0x550C, 0x1155] + op = blocks.or_ss() + self.help_ss((src1_data, src2_data), + expected_result, op) + + def test_or_bb(self): + src1_data = [1, 2, 2, 3, 0x04, 0x50] + src2_data = [8, 2, 2, 1, 0x08, 0x05] + src3_data = [8, 2, 1, 1, 0x08, 0x05] + expected_result = [9, 2, 3, 3, 0x0C, 0x55] + op = blocks.or_bb() + self.help_bb((src1_data, src2_data, src3_data), + expected_result, op) + + def test_or_ii(self): + src1_data = [1, 2, 3, 0x50005004, 0x11001150] + src2_data = [8, 2, 1, 0x05000508, 0x11001105] + expected_result = [9, 2, 3, 0x5500550C, 0x11001155] + op = blocks.or_ii() + self.help_ii((src1_data, src2_data), + expected_result, op) + + def test_not_ss(self): + src1_data = [1, 2, 3, 0x5004, 0x1150] + expected_result = [~1, ~2, ~3, ~0x5004, ~0x1150] + op = blocks.not_ss() + self.help_ss((((src1_data),)), + expected_result, op) + + def test_not_bb(self): + src1_data = [1, 2, 2, 3, 0x04, 0x50] + expected_result = [0xFE, 0xFD, 0xFD, 0xFC, 0xFB, 0xAF] + op = blocks.not_bb() + self.help_bb(((src1_data), ), + expected_result, op) + + def test_not_ii(self): + src1_data = [1, 2, 3, 0x50005004, 0x11001150] + expected_result = [~1, ~2, ~3, ~0x50005004, ~0x11001150] + op = blocks.not_ii() + self.help_ii(((src1_data),), + expected_result, op) if __name__ == '__main__': diff --git a/gr-blocks/python/blocks/qa_burst_tagger.py b/gr-blocks/python/blocks/qa_burst_tagger.py index 707e0d7a8c..c70fb283f0 100644 --- a/gr-blocks/python/blocks/qa_burst_tagger.py +++ b/gr-blocks/python/blocks/qa_burst_tagger.py @@ -13,6 +13,7 @@ from gnuradio import gr, gr_unittest, blocks import pmt + class test_burst_tagger(gr_unittest.TestCase): def setUp(self): @@ -22,14 +23,14 @@ class test_burst_tagger(gr_unittest.TestCase): self.tb = None def test_001(self): - src_data = ( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - trg_data = (-1, -1, 1, 1, -1, -1, 1, 1, -1, -1) + src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + trg_data = (-1, -1, 1, 1, -1, -1, 1, 1, -1, -1) src = blocks.vector_source_i(src_data) trg = blocks.vector_source_s(trg_data) - op = blocks.burst_tagger(gr.sizeof_int) + op = blocks.burst_tagger(gr.sizeof_int) snk = blocks.tag_debug(gr.sizeof_int, "burst tagger QA") - self.tb.connect(src, (op,0)) - self.tb.connect(trg, (op,1)) + self.tb.connect(src, (op, 0)) + self.tb.connect(trg, (op, 1)) self.tb.connect(op, snk) self.tb.run() @@ -39,10 +40,11 @@ class test_burst_tagger(gr_unittest.TestCase): self.assertEqual(6, x[2].offset) self.assertEqual(8, x[3].offset) - self.assertEqual(True, pmt.to_bool(x[0].value)) + self.assertEqual(True, pmt.to_bool(x[0].value)) self.assertEqual(False, pmt.to_bool(x[1].value)) - self.assertEqual(True, pmt.to_bool(x[2].value)) + self.assertEqual(True, pmt.to_bool(x[2].value)) self.assertEqual(False, pmt.to_bool(x[3].value)) + if __name__ == '__main__': gr_unittest.run(test_burst_tagger) diff --git a/gr-blocks/python/blocks/qa_conjugate.py b/gr-blocks/python/blocks/qa_conjugate.py index ad7b76b8f0..a67db1da58 100644 --- a/gr-blocks/python/blocks/qa_conjugate.py +++ b/gr-blocks/python/blocks/qa_conjugate.py @@ -11,32 +11,34 @@ from gnuradio import gr, gr_unittest, blocks + class test_conjugate (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_000 (self): - src_data = [-2-2j, -1-1j, -2+2j, -1+1j, - 2-2j, 1-1j, 2+2j, 1+1j, - 0+0j] + def test_000(self): + src_data = [-2 - 2j, -1 - 1j, -2 + 2j, -1 + 1j, + 2 - 2j, 1 - 1j, 2 + 2j, 1 + 1j, + 0 + 0j] - exp_data = [-2+2j, -1+1j, -2-2j, -1-1j, - 2+2j, 1+1j, 2-2j, 1-1j, - 0-0j] + exp_data = [-2 + 2j, -1 + 1j, -2 - 2j, -1 - 1j, + 2 + 2j, 1 + 1j, 2 - 2j, 1 - 1j, + 0 - 0j] src = blocks.vector_source_c(src_data) - op = blocks.conjugate_cc () - dst = blocks.vector_sink_c () + op = blocks.conjugate_cc() + dst = blocks.vector_sink_c() self.tb.connect(src, op) self.tb.connect(op, dst) self.tb.run() - result_data = dst.data () - self.assertEqual (exp_data, result_data) + result_data = dst.data() + self.assertEqual(exp_data, result_data) + if __name__ == '__main__': gr_unittest.run(test_conjugate) diff --git a/gr-blocks/python/blocks/qa_copy.py b/gr-blocks/python/blocks/qa_copy.py index 7bbbba9f7c..9ae4624699 100644 --- a/gr-blocks/python/blocks/qa_copy.py +++ b/gr-blocks/python/blocks/qa_copy.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_copy(gr_unittest.TestCase): def setUp(self): @@ -30,7 +31,7 @@ class test_copy(gr_unittest.TestCase): dst_data = dst.data() self.assertEqual(expected_result, dst_data) - def test_copy_drop (self): + def test_copy_drop(self): src_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] expected_result = [] src = blocks.vector_source_b(src_data) @@ -42,5 +43,6 @@ class test_copy(gr_unittest.TestCase): dst_data = dst.data() self.assertEqual(expected_result, dst_data) + if __name__ == '__main__': gr_unittest.run(test_copy) diff --git a/gr-blocks/python/blocks/qa_cpp_py_binding.py b/gr-blocks/python/blocks/qa_cpp_py_binding.py index 42c4f3995e..8352f67379 100644 --- a/gr-blocks/python/blocks/qa_cpp_py_binding.py +++ b/gr-blocks/python/blocks/qa_cpp_py_binding.py @@ -13,46 +13,59 @@ # -import sys, time, random, numpy, re +import sys +import time +import random +import numpy +import re from gnuradio import gr, gr_unittest, blocks from gnuradio.ctrlport import GNURadio from gnuradio import ctrlport import os + def get1(): return "success" + def get2(): return "failure" + class inc_class(object): def __init__(self): self.val = 1 + def pp(self): - self.val = self.val+1 + self.val = self.val + 1 return self.val + get3 = inc_class() + def get4(): random.seed(0) rv = random.random() return rv + def get5(): numpy.random.seed(0) - samp_t = numpy.random.randn(24)+1j*numpy.random.randn(24); - samp_f = numpy.fft.fft(samp_t); - log_pow_f = 20*numpy.log10(numpy.abs(samp_f)) + samp_t = numpy.random.randn(24) + 1j * numpy.random.randn(24) + samp_f = numpy.fft.fft(samp_t) + log_pow_f = 20 * numpy.log10(numpy.abs(samp_f)) rv = list(log_pow_f) - return rv; + return rv + def get6(): numpy.random.seed(0) - samp_t = numpy.random.randn(1024)+1j*numpy.random.randn(1024); + samp_t = numpy.random.randn(1024) + 1j * numpy.random.randn(1024) rv = list(samp_t) - return rv; + return rv + class test_cpp_py_binding(gr_unittest.TestCase): @@ -84,14 +97,26 @@ class test_cpp_py_binding(gr_unittest.TestCase): gr.DISPNULL) v4.activate(get4) - v5 = gr.RPC_get_vector_float("pyland", "fvec", "unit_5_float_vector", - "Python Exported Float Vector", [], [], [], - gr.DISPTIME | gr.DISPOPTCPLX) + v5 = gr.RPC_get_vector_float( + "pyland", + "fvec", + "unit_5_float_vector", + "Python Exported Float Vector", + [], + [], + [], + gr.DISPTIME | gr.DISPOPTCPLX) v5.activate(get5) - v6 = gr.RPC_get_vector_gr_complex("pyland", "cvec", "unit_6_gr_complex_vector", - "Python Exported Complex Vector", [], [], [], - gr.DISPXY | gr.DISPOPTSCATTER) + v6 = gr.RPC_get_vector_gr_complex( + "pyland", + "cvec", + "unit_6_gr_complex_vector", + "Python Exported Complex Vector", + [], + [], + [], + gr.DISPXY | gr.DISPOPTSCATTER) v6.activate(get6) # print some variables locally @@ -105,7 +130,7 @@ class test_cpp_py_binding(gr_unittest.TestCase): val = get3.pp() rval = v3.get() - self.assertEqual(val+1, rval) + self.assertEqual(val + 1, rval) val = get4() rval = v4.get() @@ -120,11 +145,11 @@ class test_cpp_py_binding(gr_unittest.TestCase): self.assertComplexTuplesAlmostEqual(val, rval, 5) def test_002(self): - data = list(range(1,9)) + data = list(range(1, 9)) self.src = blocks.vector_source_c(data) - self.p1 = blocks.ctrlport_probe_c("aaa","C++ exported variable") - self.p2 = blocks.ctrlport_probe_c("bbb","C++ exported variable") + self.p1 = blocks.ctrlport_probe_c("aaa", "C++ exported variable") + self.p2 = blocks.ctrlport_probe_c("bbb", "C++ exported variable") probe_name = self.p2.alias() self.tb.connect(self.src, self.p1) @@ -146,7 +171,8 @@ class test_cpp_py_binding(gr_unittest.TestCase): # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient - radiosys = GNURadioControlPortClient(hostname, portnum, rpcmethod='thrift') + radiosys = GNURadioControlPortClient( + hostname, portnum, rpcmethod='thrift') radio = radiosys.client # Get all exported knobs @@ -157,5 +183,6 @@ class test_cpp_py_binding(gr_unittest.TestCase): self.tb.stop() + if __name__ == '__main__': gr_unittest.run(test_cpp_py_binding) diff --git a/gr-blocks/python/blocks/qa_cpp_py_binding_set.py b/gr-blocks/python/blocks/qa_cpp_py_binding_set.py index 6377d05b95..c404669cc3 100644 --- a/gr-blocks/python/blocks/qa_cpp_py_binding_set.py +++ b/gr-blocks/python/blocks/qa_cpp_py_binding_set.py @@ -13,29 +13,36 @@ # -import sys, time, random, numpy, re +import sys +import time +import random +import numpy +import re from gnuradio import gr, gr_unittest, blocks from gnuradio.ctrlport import GNURadio from gnuradio import ctrlport import os + class inc_class(object): - def __init__(self,val): - self.val = val; + def __init__(self, val): + self.val = val def _get(self): - #print "returning get (val = %s)"%(str(self.val)); - return self.val; + # print "returning get (val = %s)"%(str(self.val)); + return self.val + + def _set(self, val): + # print "updating val to %s"%(str(val)); + self.val = val + return + - def _set(self,val): - #print "updating val to %s"%(str(val)); - self.val = val; - return; +getset1 = inc_class(10) +getset2 = inc_class(100.0) +getset3 = inc_class("test") -getset1 = inc_class(10); -getset2 = inc_class(100.0); -getset3 = inc_class("test"); class test_cpp_py_binding_set(gr_unittest.TestCase): def setUp(self): @@ -48,12 +55,12 @@ class test_cpp_py_binding_set(gr_unittest.TestCase): def test_001(self): g1 = gr.RPC_get_int("pyland", "v1", "unit_1_int", - "Python Exported Int", 0, 100, 10, - gr.DISPNULL) + "Python Exported Int", 0, 100, 10, + gr.DISPNULL) g1.activate(getset1._get) s1 = gr.RPC_get_int("pyland", "v1", "unit_1_int", - "Python Exported Int", 0, 100, 10, - gr.DISPNULL) + "Python Exported Int", 0, 100, 10, + gr.DISPNULL) s1.activate(getset1._set) time.sleep(0.01) @@ -95,13 +102,12 @@ class test_cpp_py_binding_set(gr_unittest.TestCase): rval = g3.get() self.assertEqual(val, rval) - def test_002(self): data = list(range(1, 10)) self.src = blocks.vector_source_c(data, True) self.p = blocks.nop(gr.sizeof_gr_complex) - self.p.set_ctrlport_test(0); + self.p.set_ctrlport_test(0) probe_info = self.p.alias() self.tb.connect(self.src, self.p) @@ -113,7 +119,8 @@ class test_cpp_py_binding_set(gr_unittest.TestCase): # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient - radiosys = GNURadioControlPortClient(hostname, portnum, rpcmethod='thrift') + radiosys = GNURadioControlPortClient( + hostname, portnum, rpcmethod='thrift') radio = radiosys.client self.tb.start() @@ -122,8 +129,8 @@ class test_cpp_py_binding_set(gr_unittest.TestCase): time.sleep(0.1) # Get all exported knobs - key_name_test = probe_info+"::test" - ret = radio.getKnobs([key_name_test,]) + key_name_test = probe_info + "::test" + ret = radio.getKnobs([key_name_test, ]) ret[key_name_test].value = 10 radio.setKnobs({key_name_test: ret[key_name_test]}) @@ -135,5 +142,6 @@ class test_cpp_py_binding_set(gr_unittest.TestCase): self.tb.stop() self.tb.wait() + if __name__ == '__main__': gr_unittest.run(test_cpp_py_binding_set) diff --git a/gr-blocks/python/blocks/qa_ctrlport_probes.py b/gr-blocks/python/blocks/qa_ctrlport_probes.py index edaabcfc34..33b5a02e9b 100644 --- a/gr-blocks/python/blocks/qa_ctrlport_probes.py +++ b/gr-blocks/python/blocks/qa_ctrlport_probes.py @@ -9,12 +9,18 @@ # -import sys, time, random, numpy +import sys +import time +import random +import numpy from gnuradio import gr, gr_unittest, blocks -import os, struct, re +import os +import struct +import re from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient + class test_ctrlport_probes(gr_unittest.TestCase): def setUp(self): @@ -25,17 +31,16 @@ class test_ctrlport_probes(gr_unittest.TestCase): self.tb = None def test_001(self): - data = list(range(1,9)) + data = list(range(1, 9)) self.src = blocks.vector_source_c(data, True) - self.probe = blocks.ctrlport_probe2_c("samples","Complex", + self.probe = blocks.ctrlport_probe2_c("samples", "Complex", len(data), gr.DISPNULL) probe_name = self.probe.alias() self.tb.connect(self.src, self.probe) self.tb.start() - # Probes return complex values as list of floats with re, im # Imaginary parts of this data set are 0. expected_result = [1, 2, 3, 4, @@ -51,7 +56,8 @@ class test_ctrlport_probes(gr_unittest.TestCase): # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient - radiosys = GNURadioControlPortClient(hostname, portnum, rpcmethod='thrift') + radiosys = GNURadioControlPortClient( + hostname, portnum, rpcmethod='thrift') radio = radiosys.client # Get all exported knobs @@ -67,19 +73,18 @@ class test_ctrlport_probes(gr_unittest.TestCase): self.tb.stop() self.tb.wait() - def test_002(self): - data = list(range(1,9)) + data = list(range(1, 9)) self.src = blocks.vector_source_f(data, True) - self.probe = blocks.ctrlport_probe2_f("samples","Floats", + self.probe = blocks.ctrlport_probe2_f("samples", "Floats", len(data), gr.DISPNULL) probe_name = self.probe.alias() self.tb.connect(self.src, self.probe) self.tb.start() - expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] + expected_result = [1, 2, 3, 4, 5, 6, 7, 8, ] # Make sure we have time for flowgraph to run time.sleep(0.1) @@ -91,7 +96,8 @@ class test_ctrlport_probes(gr_unittest.TestCase): # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient - radiosys = GNURadioControlPortClient(hostname, portnum, rpcmethod='thrift') + radiosys = GNURadioControlPortClient( + hostname, portnum, rpcmethod='thrift') radio = radiosys.client # Get all exported knobs @@ -108,17 +114,17 @@ class test_ctrlport_probes(gr_unittest.TestCase): self.tb.wait() def test_003(self): - data = list(range(1,9)) + data = list(range(1, 9)) self.src = blocks.vector_source_i(data, True) - self.probe = blocks.ctrlport_probe2_i("samples","Integers", + self.probe = blocks.ctrlport_probe2_i("samples", "Integers", len(data), gr.DISPNULL) probe_name = self.probe.alias() self.tb.connect(self.src, self.probe) self.tb.start() - expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] + expected_result = [1, 2, 3, 4, 5, 6, 7, 8, ] # Make sure we have time for flowgraph to run time.sleep(0.1) @@ -130,7 +136,8 @@ class test_ctrlport_probes(gr_unittest.TestCase): # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient - radiosys = GNURadioControlPortClient(hostname, portnum, rpcmethod='thrift') + radiosys = GNURadioControlPortClient( + hostname, portnum, rpcmethod='thrift') radio = radiosys.client # Get all exported knobs @@ -146,19 +153,18 @@ class test_ctrlport_probes(gr_unittest.TestCase): self.tb.stop() self.tb.wait() - def test_004(self): - data = list(range(1,9)) + data = list(range(1, 9)) self.src = blocks.vector_source_s(data, True) - self.probe = blocks.ctrlport_probe2_s("samples","Shorts", + self.probe = blocks.ctrlport_probe2_s("samples", "Shorts", len(data), gr.DISPNULL) probe_name = self.probe.alias() self.tb.connect(self.src, self.probe) self.tb.start() - expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] + expected_result = [1, 2, 3, 4, 5, 6, 7, 8, ] # Make sure we have time for flowgraph to run time.sleep(0.1) @@ -170,7 +176,8 @@ class test_ctrlport_probes(gr_unittest.TestCase): # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient - radiosys = GNURadioControlPortClient(hostname, portnum, rpcmethod='thrift') + radiosys = GNURadioControlPortClient( + hostname, portnum, rpcmethod='thrift') radio = radiosys.client # Get all exported knobs @@ -187,17 +194,17 @@ class test_ctrlport_probes(gr_unittest.TestCase): self.tb.wait() def test_005(self): - data = list(range(1,9)) + data = list(range(1, 9)) self.src = blocks.vector_source_b(data, True) - self.probe = blocks.ctrlport_probe2_b("samples","Bytes", + self.probe = blocks.ctrlport_probe2_b("samples", "Bytes", len(data), gr.DISPNULL) probe_name = self.probe.alias() self.tb.connect(self.src, self.probe) self.tb.start() - expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] + expected_result = [1, 2, 3, 4, 5, 6, 7, 8, ] # Make sure we have time for flowgraph to run time.sleep(0.1) @@ -209,7 +216,8 @@ class test_ctrlport_probes(gr_unittest.TestCase): # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient - radiosys = GNURadioControlPortClient(hostname, portnum, rpcmethod='thrift') + radiosys = GNURadioControlPortClient( + hostname, portnum, rpcmethod='thrift') radio = radiosys.client # Get all exported knobs @@ -218,7 +226,7 @@ class test_ctrlport_probes(gr_unittest.TestCase): # Get data in probe, which might be offset; find the # beginning and unwrap. result = ret[name].value - result = list(struct.unpack(len(result)*'b', result)) + result = list(struct.unpack(len(result) * 'b', result)) i = result.index(1) result = result[i:] + result[0:i] self.assertEqual(expected_result, result) @@ -226,5 +234,6 @@ class test_ctrlport_probes(gr_unittest.TestCase): self.tb.stop() self.tb.wait() + if __name__ == '__main__': gr_unittest.run(test_ctrlport_probes) diff --git a/gr-blocks/python/blocks/qa_delay.py b/gr-blocks/python/blocks/qa_delay.py index dfef2fcbec..08b3289ec5 100644 --- a/gr-blocks/python/blocks/qa_delay.py +++ b/gr-blocks/python/blocks/qa_delay.py @@ -12,6 +12,7 @@ from gnuradio import gr, gr_unittest, blocks import pmt + class test_delay(gr_unittest.TestCase): def setUp(self): @@ -24,7 +25,7 @@ class test_delay(gr_unittest.TestCase): delta_t = 0 tb = self.tb src_data = [float(x) for x in range(0, 100)] - expected_result = list(delta_t*[0.0] + src_data) + expected_result = list(delta_t * [0.0] + src_data) src = blocks.vector_source_f(src_data) op = blocks.delay(gr.sizeof_float, delta_t) @@ -39,7 +40,7 @@ class test_delay(gr_unittest.TestCase): delta_t = 10 tb = self.tb src_data = [float(x) for x in range(0, 100)] - expected_result = list(delta_t*[0.0] + src_data) + expected_result = list(delta_t * [0.0] + src_data) src = blocks.vector_source_f(src_data) op = blocks.delay(gr.sizeof_float, delta_t) @@ -54,9 +55,13 @@ class test_delay(gr_unittest.TestCase): tb = self.tb vector_sink = blocks.vector_sink_f(1) ref_sink = blocks.vector_sink_f(1) - tags_strobe = blocks.tags_strobe(gr.sizeof_float*1, pmt.intern("TEST"), 100, pmt.intern("strobe")) - head = blocks.head(gr.sizeof_float*1, 10**5) - delay = blocks.delay(gr.sizeof_float*1, 100) + tags_strobe = blocks.tags_strobe( + gr.sizeof_float * 1, + pmt.intern("TEST"), + 100, + pmt.intern("strobe")) + head = blocks.head(gr.sizeof_float * 1, 10**5) + delay = blocks.delay(gr.sizeof_float * 1, 100) tb.connect((delay, 0), (head, 0)) tb.connect((head, 0), (vector_sink, 0)) tb.connect((tags_strobe, 0), (delay, 0)) diff --git a/gr-blocks/python/blocks/qa_endian_swap.py b/gr-blocks/python/blocks/qa_endian_swap.py index 52d13c0bad..949183fd35 100644 --- a/gr-blocks/python/blocks/qa_endian_swap.py +++ b/gr-blocks/python/blocks/qa_endian_swap.py @@ -12,6 +12,7 @@ from gnuradio import gr, gr_unittest, blocks import ctypes + class test_endian_swap(gr_unittest.TestCase): def setUp(self): @@ -21,8 +22,8 @@ class test_endian_swap(gr_unittest.TestCase): self.tb = None def test_001(self): - src_data = [1,2,3,4] - expected_result = [256, 512, 768, 1024]; + src_data = [1, 2, 3, 4] + expected_result = [256, 512, 768, 1024] src = blocks.vector_source_s(src_data) op = blocks.endian_swap(2) @@ -36,8 +37,8 @@ class test_endian_swap(gr_unittest.TestCase): def test_002(self): - src_data = [1,2,3,4] - expected_result = [16777216, 33554432, 50331648, 67108864]; + src_data = [1, 2, 3, 4] + expected_result = [16777216, 33554432, 50331648, 67108864] src = blocks.vector_source_i(src_data) op = blocks.endian_swap(4) @@ -49,6 +50,6 @@ class test_endian_swap(gr_unittest.TestCase): self.assertEqual(expected_result, result_data) + if __name__ == '__main__': gr_unittest.run(test_endian_swap) - diff --git a/gr-blocks/python/blocks/qa_exponentiate_const_cci.py b/gr-blocks/python/blocks/qa_exponentiate_const_cci.py index d37e076975..0c1412949c 100644 --- a/gr-blocks/python/blocks/qa_exponentiate_const_cci.py +++ b/gr-blocks/python/blocks/qa_exponentiate_const_cci.py @@ -1,18 +1,19 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# +# # Copyright 2017 Free Software Foundation, Inc. -# +# # This file is part of GNU Radio -# +# # SPDX-License-Identifier: GPL-3.0-or-later # -# +# from gnuradio import gr, gr_unittest from gnuradio import blocks import pmt + class qa_exponentiate_const_cci(gr_unittest.TestCase): def setUp(self): @@ -22,9 +23,13 @@ class qa_exponentiate_const_cci(gr_unittest.TestCase): self.tb = None def test_001_t(self): - for exponent in range(1,10): - in_data = [1+1j, -1, 4-1j, -3-7j] - out_data = [in_data[0]**exponent, in_data[1]**exponent, in_data[2]**exponent, in_data[3]**exponent] + for exponent in range(1, 10): + in_data = [1 + 1j, -1, 4 - 1j, -3 - 7j] + out_data = [ + in_data[0]**exponent, + in_data[1]**exponent, + in_data[2]**exponent, + in_data[3]**exponent] # Test streaming input source = blocks.vector_source_c(in_data, False, 1) @@ -40,7 +45,8 @@ class qa_exponentiate_const_cci(gr_unittest.TestCase): for vlen in [2, 4]: source = blocks.vector_source_c(in_data, False, 1) s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, vlen) - exponentiate_const_cci = blocks.exponentiate_const_cci(exponent, vlen) + exponentiate_const_cci = blocks.exponentiate_const_cci( + exponent, vlen) v2s = blocks.vector_to_stream(gr.sizeof_gr_complex, vlen) sink = blocks.vector_sink_c(1) diff --git a/gr-blocks/python/blocks/qa_file_descriptor_source_sink.py b/gr-blocks/python/blocks/qa_file_descriptor_source_sink.py index 230737802f..bd45b2b2e4 100644 --- a/gr-blocks/python/blocks/qa_file_descriptor_source_sink.py +++ b/gr-blocks/python/blocks/qa_file_descriptor_source_sink.py @@ -13,13 +13,14 @@ import os import tempfile import pmt + class test_file_descriptor_source_sink(gr_unittest.TestCase): - def setUp (self): + def setUp(self): os.environ['GR_CONF_CONTROLPORT_ON'] = 'False' - self.tb = gr.top_block () + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_file_descriptor(self): @@ -54,5 +55,6 @@ class test_file_descriptor_source_sink(gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual(expected_result, result_data) self.assertEqual(len(snk2.tags()), 0) + if __name__ == '__main__': gr_unittest.run(test_file_descriptor_source_sink) diff --git a/gr-blocks/python/blocks/qa_file_metadata.py b/gr-blocks/python/blocks/qa_file_metadata.py index 671966b493..aff8aabaa7 100644 --- a/gr-blocks/python/blocks/qa_file_metadata.py +++ b/gr-blocks/python/blocks/qa_file_metadata.py @@ -9,19 +9,22 @@ # -import os, math +import os +import math from gnuradio import gr, gr_unittest, blocks import pmt import parse_file_metadata + def sig_source_c(samp_rate, freq, amp, N): t = [float(x) / samp_rate for x in range(N)] - y = [amp*math.cos(2.*math.pi*freq*x) + \ - 1j*amp*math.sin(2.*math.pi*freq*x) for x in t] + y = [amp * math.cos(2. * math.pi * freq * x) + + 1j * amp * math.sin(2. * math.pi * freq * x) for x in t] return y + class test_file_metadata(gr_unittest.TestCase): def setUp(self): @@ -43,7 +46,7 @@ class test_file_metadata(gr_unittest.TestCase): extras = pmt.dict_add(extras, key, val) data = sig_source_c(samp_rate, 1000, 1, N) - src = blocks.vector_source_c(data) + src = blocks.vector_source_c(data) fsnk = blocks.file_meta_sink(gr.sizeof_gr_complex, outfile, samp_rate, 1, blocks.GR_FILE_FLOAT, True, @@ -81,7 +84,6 @@ class test_file_metadata(gr_unittest.TestCase): self.assertEqual(info['rx_rate'], samp_rate) self.assertEqual(pmt.to_double(extra_info['samp_rate']), samp_rate) - # Test file metadata source src.rewind() fsrc = blocks.file_meta_source(outfile, False) @@ -94,7 +96,7 @@ class test_file_metadata(gr_unittest.TestCase): self.tb.connect(src, ssnk) self.tb.run() - fsrc.close() + fsrc.close() # Test to make sure tags with 'samp_rate' and 'rx_rate' keys # were generated and received correctly. tags = tsnk.current_tags() @@ -122,7 +124,7 @@ class test_file_metadata(gr_unittest.TestCase): extras = pmt.dict_add(extras, key, val) data = sig_source_c(samp_rate, 1000, 1, N) - src = blocks.vector_source_c(data) + src = blocks.vector_source_c(data) fsnk = blocks.file_meta_sink(gr.sizeof_gr_complex, outfile, samp_rate, 1, blocks.GR_FILE_FLOAT, True, @@ -162,7 +164,6 @@ class test_file_metadata(gr_unittest.TestCase): self.assertEqual(info['rx_rate'], samp_rate) self.assertEqual(pmt.to_double(extra_info['samp_rate']), samp_rate) - # Test file metadata source src.rewind() fsrc = blocks.file_meta_source(outfile, False, detached, outfile_hdr) @@ -191,5 +192,6 @@ class test_file_metadata(gr_unittest.TestCase): os.remove(outfile) os.remove(outfile_hdr) + if __name__ == '__main__': gr_unittest.run(test_file_metadata) diff --git a/gr-blocks/python/blocks/qa_file_sink.py b/gr-blocks/python/blocks/qa_file_sink.py index 7a0ad4f90a..ac169dbb6a 100644 --- a/gr-blocks/python/blocks/qa_file_sink.py +++ b/gr-blocks/python/blocks/qa_file_sink.py @@ -13,13 +13,14 @@ import tempfile import array from gnuradio import gr, gr_unittest, blocks + class test_file_sink(gr_unittest.TestCase): - def setUp (self): + def setUp(self): os.environ['GR_CONF_CONTROLPORT_ON'] = 'False' - self.tb = gr.top_block () + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_file_sink(self): @@ -43,5 +44,6 @@ class test_file_sink(gr_unittest.TestCase): result_data.fromfile(datafile, len(data)) self.assertFloatTuplesAlmostEqual(expected_result, result_data) + if __name__ == '__main__': gr_unittest.run(test_file_sink) diff --git a/gr-blocks/python/blocks/qa_file_source.py b/gr-blocks/python/blocks/qa_file_source.py index 9b282f3111..38de4f458e 100644 --- a/gr-blocks/python/blocks/qa_file_source.py +++ b/gr-blocks/python/blocks/qa_file_source.py @@ -14,6 +14,7 @@ import array import pmt from gnuradio import gr, gr_unittest, blocks + class test_file_source(gr_unittest.TestCase): @classmethod @@ -31,10 +32,10 @@ class test_file_source(gr_unittest.TestCase): del cls._datafilename del cls._datafile - def setUp (self): + def setUp(self): self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_file_source(self): @@ -60,7 +61,10 @@ class test_file_source(gr_unittest.TestCase): def test_file_source_with_offset(self): expected_result = self._vector[100:] - src = blocks.file_source(gr.sizeof_float, self._datafilename, offset=100) + src = blocks.file_source( + gr.sizeof_float, + self._datafilename, + offset=100) snk = blocks.vector_sink_f() self.tb.connect(src, snk) @@ -71,9 +75,13 @@ class test_file_source(gr_unittest.TestCase): self.assertEqual(len(snk.tags()), 0) def test_source_with_offset_and_len(self): - expected_result = self._vector[100:100+600] + expected_result = self._vector[100:100 + 600] - src = blocks.file_source(gr.sizeof_float, self._datafilename, offset=100, len=600) + src = blocks.file_source( + gr.sizeof_float, + self._datafilename, + offset=100, + len=600) snk = blocks.vector_sink_f() self.tb.connect(src, snk) self.tb.run() @@ -86,7 +94,7 @@ class test_file_source(gr_unittest.TestCase): src = blocks.file_source(gr.sizeof_float, self._datafilename) self.assertTrue(src.seek(0, os.SEEK_SET)) - self.assertTrue(src.seek(len(self._vector)-1, os.SEEK_SET)) + self.assertTrue(src.seek(len(self._vector) - 1, os.SEEK_SET)) # Seek past end of file - this will also log a warning self.assertFalse(src.seek(len(self._vector), os.SEEK_SET)) # Negative seek - this will also log a warning @@ -102,7 +110,6 @@ class test_file_source(gr_unittest.TestCase): # Seek past end of file - this will also log a warning self.assertFalse(src.seek(len(self._vector), os.SEEK_CUR)) - def test_begin_tag(self): expected_result = self._vector @@ -137,5 +144,6 @@ class test_file_source(gr_unittest.TestCase): self.assertEqual(str(tags[1].value), "1") self.assertEqual(tags[1].offset, 1000) + if __name__ == '__main__': gr_unittest.run(test_file_source) diff --git a/gr-blocks/python/blocks/qa_head.py b/gr-blocks/python/blocks/qa_head.py index c2868758e6..2999745721 100644 --- a/gr-blocks/python/blocks/qa_head.py +++ b/gr-blocks/python/blocks/qa_head.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_head(gr_unittest.TestCase): def setUp(self): @@ -31,5 +32,6 @@ class test_head(gr_unittest.TestCase): dst_data = dst1.data() self.assertEqual(expected_result, dst_data) + if __name__ == '__main__': gr_unittest.run(test_head) diff --git a/gr-blocks/python/blocks/qa_hier_block2.py b/gr-blocks/python/blocks/qa_hier_block2.py index 141561e2ce..418fb85810 100644 --- a/gr-blocks/python/blocks/qa_hier_block2.py +++ b/gr-blocks/python/blocks/qa_hier_block2.py @@ -12,104 +12,108 @@ class add_ff(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, - name = "add_ff", - in_sig = [numpy.float32, numpy.float32], - out_sig = [numpy.float32], + name="add_ff", + in_sig=[numpy.float32, numpy.float32], + out_sig=[numpy.float32], ) def work(self, input_items, output_items): output_items[0][:] = input_items[0] + input_items[1] return len(output_items[0]) + class multiply_const_ff(gr.sync_block): def __init__(self, k): gr.sync_block.__init__( self, - name = "multiply_ff", - in_sig = [numpy.float32], - out_sig = [numpy.float32], + name="multiply_ff", + in_sig=[numpy.float32], + out_sig=[numpy.float32], ) self.k = k def work(self, input_items, output_items): - output_items[0][:] = [self.k*x for x in input_items[0]] + output_items[0][:] = [self.k * x for x in input_items[0]] return len(output_items[0]) + class test_hier_block2(gr_unittest.TestCase): def setUp(self): pass def tearDown(self): - pass + pass def test_001_make(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) self.assertEqual("test_block", hblock.name()) self.assertEqual(1, hblock.input_signature().max_streams()) self.assertEqual(1, hblock.output_signature().min_streams()) self.assertEqual(1, hblock.output_signature().max_streams()) - self.assertEqual(gr.sizeof_int, hblock.output_signature().sizeof_stream_item(0)) + self.assertEqual( + gr.sizeof_int, + hblock.output_signature().sizeof_stream_item(0)) def test_002_connect_input(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) hblock.connect(hblock, nop1) def test_004_connect_output(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) hblock.connect(nop1, hblock) def test_005_connect_output_in_use(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) nop2 = blocks.nop(gr.sizeof_int) hblock.connect(nop1, hblock) self.assertRaises(ValueError, - lambda: hblock.connect(nop2, hblock)) + lambda: hblock.connect(nop2, hblock)) def test_006_connect_invalid_src_port_neg(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) self.assertRaises(ValueError, - lambda: hblock.connect((hblock, -1), nop1)) + lambda: hblock.connect((hblock, -1), nop1)) def test_005_connect_invalid_src_port_exceeds(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) self.assertRaises(ValueError, - lambda: hblock.connect((hblock, 1), nop1)) + lambda: hblock.connect((hblock, 1), nop1)) def test_007_connect_invalid_dst_port_neg(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) nop2 = blocks.nop(gr.sizeof_int) self.assertRaises(ValueError, - lambda: hblock.connect(nop1, (nop2, -1))) + lambda: hblock.connect(nop1, (nop2, -1))) def test_008_connect_invalid_dst_port_exceeds(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.null_sink(gr.sizeof_int) nop2 = blocks.null_sink(gr.sizeof_int) self.assertRaises(ValueError, - lambda: hblock.connect(nop1, (nop2, 1))) + lambda: hblock.connect(nop1, (nop2, 1))) def test_009_check_topology(self): hblock = gr.top_block("test_block") @@ -131,75 +135,75 @@ class test_hier_block2(gr_unittest.TestCase): def test_012_disconnect_input(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) hblock.connect(hblock, nop1) hblock.disconnect(hblock, nop1) def test_013_disconnect_input_not_connected(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) nop2 = blocks.nop(gr.sizeof_int) hblock.connect(hblock, nop1) self.assertRaises(ValueError, - lambda: hblock.disconnect(hblock, nop2)) + lambda: hblock.disconnect(hblock, nop2)) def test_014_disconnect_input_neg(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) hblock.connect(hblock, nop1) self.assertRaises(ValueError, - lambda: hblock.disconnect((hblock, -1), nop1)) + lambda: hblock.disconnect((hblock, -1), nop1)) def test_015_disconnect_input_exceeds(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) hblock.connect(hblock, nop1) self.assertRaises(ValueError, - lambda: hblock.disconnect((hblock, 1), nop1)) + lambda: hblock.disconnect((hblock, 1), nop1)) def test_016_disconnect_output(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) hblock.connect(nop1, hblock) hblock.disconnect(nop1, hblock) def test_017_disconnect_output_not_connected(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) nop2 = blocks.nop(gr.sizeof_int) hblock.connect(nop1, hblock) self.assertRaises(ValueError, - lambda: hblock.disconnect(nop2, hblock)) + lambda: hblock.disconnect(nop2, hblock)) def test_018_disconnect_output_neg(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) hblock.connect(hblock, nop1) self.assertRaises(ValueError, - lambda: hblock.disconnect(nop1, (hblock, -1))) + lambda: hblock.disconnect(nop1, (hblock, -1))) def test_019_disconnect_output_exceeds(self): hblock = gr.hier_block2("test_block", - gr.io_signature(1,1,gr.sizeof_int), - gr.io_signature(1,1,gr.sizeof_int)) + gr.io_signature(1, 1, gr.sizeof_int), + gr.io_signature(1, 1, gr.sizeof_int)) nop1 = blocks.nop(gr.sizeof_int) hblock.connect(nop1, hblock) self.assertRaises(ValueError, - lambda: hblock.disconnect(nop1, (hblock, 1))) + lambda: hblock.disconnect(nop1, (hblock, 1))) def test_020_run(self): hblock = gr.top_block("test_block") @@ -251,7 +255,7 @@ class test_hier_block2(gr_unittest.TestCase): lambda: hblock.disconnect(blk)) def test_026_run_single(self): - expected_data = [1.0,] + expected_data = [1.0, ] tb = gr.top_block("top_block") hb = gr.hier_block2("block", gr.io_signature(0, 0, 0), @@ -268,11 +272,11 @@ class test_hier_block2(gr_unittest.TestCase): hb = gr.hier_block2("block", gr.io_signature(1, 1, 1), gr.io_signature(1, 1, 1)) - hsrc = blocks.vector_source_b([1,]) - hb.connect(hsrc, hb) # wire output internally + hsrc = blocks.vector_source_b([1, ]) + hb.connect(hsrc, hb) # wire output internally src = blocks.vector_source_b([1, ]) dst = blocks.vector_sink_b() - tb.connect(src, hb, dst) # hb's input is not connected internally + tb.connect(src, hb, dst) # hb's input is not connected internally self.assertRaises(RuntimeError, lambda: tb.run()) @@ -283,10 +287,10 @@ class test_hier_block2(gr_unittest.TestCase): gr.io_signature(1, 1, 1), gr.io_signature(1, 1, 1)) hdst = blocks.vector_sink_b() - hb.connect(hb, hdst) # wire input internally + hb.connect(hb, hdst) # wire input internally src = blocks.vector_source_b([1, ]) dst = blocks.vector_sink_b() - tb.connect(src, hb, dst) # hb's output is not connected internally + tb.connect(src, hb, dst) # hb's output is not connected internally self.assertRaises(RuntimeError, lambda: tb.run()) @@ -296,10 +300,11 @@ class test_hier_block2(gr_unittest.TestCase): gr.io_signature(1, 1, 1), gr.io_signature(1, 1, 1)) hsrc = blocks.vector_sink_b() - hb.connect(hb, hsrc) # wire input internally + hb.connect(hb, hsrc) # wire input internally src = blocks.vector_source_b([1, ]) dst = blocks.vector_sink_b() - tb.connect(src, hb) # hb's output is not connected internally or externally + # hb's output is not connected internally or externally + tb.connect(src, hb) self.assertRaises(RuntimeError, lambda: tb.run()) @@ -308,10 +313,11 @@ class test_hier_block2(gr_unittest.TestCase): hb = gr.hier_block2("block", gr.io_signature(1, 1, 1), gr.io_signature(1, 1, 1)) - hdst = blocks.vector_source_b([1,]) - hb.connect(hdst, hb) # wire output internally + hdst = blocks.vector_source_b([1, ]) + hb.connect(hdst, hb) # wire output internally dst = blocks.vector_sink_b() - tb.connect(hb, dst) # hb's input is not connected internally or externally + # hb's input is not connected internally or externally + tb.connect(hb, dst) self.assertRaises(RuntimeError, lambda: tb.run()) @@ -322,7 +328,7 @@ class test_hier_block2(gr_unittest.TestCase): src = blocks.vector_source_b([1, ]) dst = blocks.vector_sink_b() hb.connect(src, dst) - tb.connect(hb) # Singleton connect + tb.connect(hb) # Singleton connect tb.lock() tb.disconnect_all() tb.connect(src, dst) @@ -336,11 +342,11 @@ class test_hier_block2(gr_unittest.TestCase): tb.disconnect(src) # Singleton disconnect tb.connect(src, dst) tb.run() - self.assertEqual(dst.data(), [1,]) + self.assertEqual(dst.data(), [1, ]) def test_030_nested_input(self): tb = gr.top_block() - src = blocks.vector_source_b([1,]) + src = blocks.vector_source_b([1, ]) hb1 = gr.hier_block2("hb1", gr.io_signature(1, 1, gr.sizeof_char), gr.io_signature(0, 0, 0)) @@ -352,11 +358,11 @@ class test_hier_block2(gr_unittest.TestCase): hb1.connect(hb1, hb2) hb2.connect(hb2, blocks.copy(gr.sizeof_char), dst) tb.run() - self.assertEqual(dst.data(), [1,]) + self.assertEqual(dst.data(), [1, ]) def test_031_multiple_internal_inputs(self): tb = gr.top_block() - src = blocks.vector_source_f([1.0,]) + src = blocks.vector_source_f([1.0, ]) hb = gr.hier_block2("hb", gr.io_signature(1, 1, gr.sizeof_float), gr.io_signature(1, 1, gr.sizeof_float)) @@ -371,23 +377,24 @@ class test_hier_block2(gr_unittest.TestCase): dst = blocks.vector_sink_f() tb.connect(src, hb, dst) tb.run() - self.assertEqual(dst.data(), [3.0,]) + self.assertEqual(dst.data(), [3.0, ]) def test_032_nested_multiple_internal_inputs(self): tb = gr.top_block() - src = blocks.vector_source_f([1.0,]) + src = blocks.vector_source_f([1.0, ]) hb = gr.hier_block2("hb", gr.io_signature(1, 1, gr.sizeof_float), gr.io_signature(1, 1, gr.sizeof_float)) hb2 = gr.hier_block2("hb", - gr.io_signature(1, 1, gr.sizeof_float), - gr.io_signature(1, 1, gr.sizeof_float)) + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_float)) m1 = multiply_const_ff(1.0) m2 = multiply_const_ff(2.0) add = add_ff() hb2.connect(hb2, m1) # m1 is connected to hb2 external input #0 - hb2.connect(hb2, m2) # m2 is also connected to hb2 external input #0 + # m2 is also connected to hb2 external input #0 + hb2.connect(hb2, m2) hb2.connect(m1, (add, 0)) hb2.connect(m2, (add, 1)) hb2.connect(add, hb2) # add is connected to hb2 external output #0 @@ -395,8 +402,7 @@ class test_hier_block2(gr_unittest.TestCase): dst = blocks.vector_sink_f() tb.connect(src, hb, dst) tb.run() - self.assertEqual(dst.data(), [3.0,]) - + self.assertEqual(dst.data(), [3.0, ]) def test_033a_set_affinity(self): expected = [1.0, 2.0, 3.0, 4.0] @@ -404,7 +410,7 @@ class test_hier_block2(gr_unittest.TestCase): src = blocks.vector_source_f(expected, False) snk = blocks.vector_sink_f() hblock.connect(src, snk) - hblock.set_processor_affinity([0,]) + hblock.set_processor_affinity([0, ]) hblock.run() actual = snk.data() self.assertEqual(expected, actual) @@ -415,7 +421,7 @@ class test_hier_block2(gr_unittest.TestCase): src = blocks.vector_source_f(expected, False) snk = blocks.vector_sink_f() hblock.connect(src, snk) - hblock.set_processor_affinity([0,]) + hblock.set_processor_affinity([0, ]) hblock.unset_processor_affinity() hblock.run() actual = snk.data() @@ -427,9 +433,9 @@ class test_hier_block2(gr_unittest.TestCase): src = blocks.vector_source_f(expected, False) snk = blocks.vector_sink_f() hblock.connect(src, snk) - hblock.set_processor_affinity([0,]) + hblock.set_processor_affinity([0, ]) procs = hblock.processor_affinity() - self.assertEqual([0,], procs) + self.assertEqual([0, ], procs) def test_34a_lock_unlock(self): hblock = gr.top_block("test_block") @@ -438,7 +444,7 @@ class test_hier_block2(gr_unittest.TestCase): hier = multiply_const_ff(0.5) sink = blocks.null_sink(gr.sizeof_float) hblock.connect(src, throttle, hier, sink) - hblock.set_processor_affinity([0,]) + hblock.set_processor_affinity([0, ]) hblock.start() time.sleep(1) hblock.lock() @@ -452,7 +458,8 @@ class test_hier_block2(gr_unittest.TestCase): throttle = blocks.throttle(gr.sizeof_float, 32000) sink = blocks.null_sink(gr.sizeof_float) hblock.connect(src, throttle, sink) - hblock.set_processor_affinity([0,]) + hblock.set_processor_affinity([0, ]) + def thread_01(hblock, cls): cls.test_34b_val = 10 hblock.lock() @@ -469,5 +476,6 @@ class test_hier_block2(gr_unittest.TestCase): hblock.wait() self.assertEqual(40, self.test_34b_val) + if __name__ == "__main__": gr_unittest.run(test_hier_block2) diff --git a/gr-blocks/python/blocks/qa_hier_block2_message_connections.py b/gr-blocks/python/blocks/qa_hier_block2_message_connections.py index ee9745c69a..9029866f8d 100644 --- a/gr-blocks/python/blocks/qa_hier_block2_message_connections.py +++ b/gr-blocks/python/blocks/qa_hier_block2_message_connections.py @@ -19,9 +19,9 @@ class block_with_message_output(gr.basic_block): def __init__(self): gr.basic_block.__init__(self, - "block_with_message_output", - in_sig=None, - out_sig=None) + "block_with_message_output", + in_sig=None, + out_sig=None) self.message_port_register_out(pmt.intern("test")) @@ -29,9 +29,9 @@ class block_with_message_input(gr.basic_block): def __init__(self): gr.basic_block.__init__(self, - "block_with_message_input", - in_sig=None, - out_sig=None) + "block_with_message_input", + in_sig=None, + out_sig=None) self.message_port_register_in(pmt.intern("test")) @@ -39,9 +39,9 @@ class hier_block_with_message_output(gr.hier_block2): def __init__(self): gr.hier_block2.__init__(self, - "hier_block_with_message_output", - gr.io_signature(0, 0, 0), # Input signature - gr.io_signature(0, 0, 0)) # Output signature + "hier_block_with_message_output", + gr.io_signature(0, 0, 0), # Input signature + gr.io_signature(0, 0, 0)) # Output signature self.message_port_register_hier_out("test") self.block = block_with_message_output() self.msg_connect(self.block, "test", self, "test") @@ -51,9 +51,9 @@ class hier_block_with_message_input(gr.hier_block2): def __init__(self): gr.hier_block2.__init__(self, - "hier_block_with_message_output", - gr.io_signature(0, 0, 0), # Input signature - gr.io_signature(0, 0, 0)) # Output signature + "hier_block_with_message_output", + gr.io_signature(0, 0, 0), # Input signature + gr.io_signature(0, 0, 0)) # Output signature self.message_port_register_hier_in("test") self.block = block_with_message_input() self.msg_connect(self, "test", self.block, "test") @@ -63,9 +63,9 @@ class hier_block_with_message_inout(gr.hier_block2): def __init__(self): gr.hier_block2.__init__(self, - "hier_block_with_message_inout", - gr.io_signature(0, 0, 0), # Input signature - gr.io_signature(0, 0, 0)) # Output signature + "hier_block_with_message_inout", + gr.io_signature(0, 0, 0), # Input signature + gr.io_signature(0, 0, 0)) # Output signature self.message_port_register_hier_in("test") self.message_port_register_hier_out("test") self.input = block_with_message_input() @@ -88,7 +88,7 @@ class test_hier_block2_message_connections(gr_unittest.TestCase): self.tb.wait() def assert_has_subscription(self, sender, send_port, receiver, - receive_port): + receive_port): """assert that the given sender block has a subscription for the given receiver block on the appropriate send and receive ports @@ -188,5 +188,6 @@ class test_hier_block2_message_connections(gr_unittest.TestCase): self.tb.msg_disconnect(hier, "test", x, "test") self.assert_has_num_subscriptions(hier.block, "test", 0) + if __name__ == '__main__': gr_unittest.run(test_hier_block2_message_connections) diff --git a/gr-blocks/python/blocks/qa_integrate.py b/gr-blocks/python/blocks/qa_integrate.py index 34ac12ee9a..27311ba14f 100644 --- a/gr-blocks/python/blocks/qa_integrate.py +++ b/gr-blocks/python/blocks/qa_integrate.py @@ -11,12 +11,13 @@ from gnuradio import gr, gr_unittest, blocks + class test_integrate (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_000_ss(self): @@ -50,8 +51,14 @@ class test_integrate (gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual(dst_data, dst.data(), 6) def test_003_cc(self): - src_data = [1.0+1.0j, 2.0+2.0j, 3.0+3.0j, 4.0+4.0j, 5.0+5.0j, 6.0+6.0j] - dst_data = [6.0+6.0j, 15.0+15.0j] + src_data = [ + 1.0 + 1.0j, + 2.0 + 2.0j, + 3.0 + 3.0j, + 4.0 + 4.0j, + 5.0 + 5.0j, + 6.0 + 6.0j] + dst_data = [6.0 + 6.0j, 15.0 + 15.0j] src = blocks.vector_source_c(src_data) itg = blocks.integrate_cc(3) dst = blocks.vector_sink_c() @@ -71,8 +78,14 @@ class test_integrate (gr_unittest.TestCase): self.assertEqual(dst_data, dst.data()) def test_003_cc_vec(self): - src_data = [1.0+1.0j, 2.0+2.0j, 3.0+3.0j, 4.0+4.0j, 5.0+5.0j, 6.0+6.0j] - dst_data = [9.0+9.0j, 12.0+12.0j] + src_data = [ + 1.0 + 1.0j, + 2.0 + 2.0j, + 3.0 + 3.0j, + 4.0 + 4.0j, + 5.0 + 5.0j, + 6.0 + 6.0j] + dst_data = [9.0 + 9.0j, 12.0 + 12.0j] vlen = 2 src = blocks.vector_source_c(src_data, False, vlen) itg = blocks.integrate_cc(3, vlen) @@ -81,5 +94,6 @@ class test_integrate (gr_unittest.TestCase): self.tb.run() self.assertComplexTuplesAlmostEqual(dst_data, dst.data(), 6) + if __name__ == '__main__': gr_unittest.run(test_integrate) diff --git a/gr-blocks/python/blocks/qa_interleave.py b/gr-blocks/python/blocks/qa_interleave.py index 939fa65876..028c9dc8e4 100644 --- a/gr-blocks/python/blocks/qa_interleave.py +++ b/gr-blocks/python/blocks/qa_interleave.py @@ -11,126 +11,128 @@ from gnuradio import gr, gr_unittest, blocks + class test_interleave (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_int_001 (self): + def test_int_001(self): lenx = 64 - src0 = blocks.vector_source_f (list(range(0, lenx, 4))) - src1 = blocks.vector_source_f (list(range(1, lenx, 4))) - src2 = blocks.vector_source_f (list(range(2, lenx, 4))) - src3 = blocks.vector_source_f (list(range(3, lenx, 4))) - op = blocks.interleave (gr.sizeof_float) - dst = blocks.vector_sink_f () - - self.tb.connect (src0, (op, 0)) - self.tb.connect (src1, (op, 1)) - self.tb.connect (src2, (op, 2)) - self.tb.connect (src3, (op, 3)) - self.tb.connect (op, dst) - self.tb.run () - expected_result = tuple (range (lenx)) - result_data = dst.data () - self.assertFloatTuplesAlmostEqual (expected_result, result_data) - - def test_int_002 (self): + src0 = blocks.vector_source_f(list(range(0, lenx, 4))) + src1 = blocks.vector_source_f(list(range(1, lenx, 4))) + src2 = blocks.vector_source_f(list(range(2, lenx, 4))) + src3 = blocks.vector_source_f(list(range(3, lenx, 4))) + op = blocks.interleave(gr.sizeof_float) + dst = blocks.vector_sink_f() + + self.tb.connect(src0, (op, 0)) + self.tb.connect(src1, (op, 1)) + self.tb.connect(src2, (op, 2)) + self.tb.connect(src3, (op, 3)) + self.tb.connect(op, dst) + self.tb.run() + expected_result = tuple(range(lenx)) + result_data = dst.data() + self.assertFloatTuplesAlmostEqual(expected_result, result_data) + + def test_int_002(self): blksize = 4 lenx = 64 - plusup_big = lambda a: a + (blksize * 4) - plusup_little = lambda a: a + blksize - a_vec = list(range(0,blksize)) - for i in range(0,(lenx // (4 * blksize)) - 1): + def plusup_big(a): return a + (blksize * 4) + def plusup_little(a): return a + blksize + a_vec = list(range(0, blksize)) + for i in range(0, (lenx // (4 * blksize)) - 1): a_vec += list(map(plusup_big, a_vec[len(a_vec) - blksize:])) b_vec = list(map(plusup_little, a_vec)) c_vec = list(map(plusup_little, b_vec)) d_vec = list(map(plusup_little, c_vec)) - src0 = blocks.vector_source_f (a_vec) - src1 = blocks.vector_source_f (b_vec) - src2 = blocks.vector_source_f (c_vec) - src3 = blocks.vector_source_f (d_vec) - op = blocks.interleave (gr.sizeof_float, blksize) - dst = blocks.vector_sink_f () - - self.tb.connect (src0, (op, 0)) - self.tb.connect (src1, (op, 1)) - self.tb.connect (src2, (op, 2)) - self.tb.connect (src3, (op, 3)) - self.tb.connect (op, dst) - self.tb.run () - expected_result = tuple (range (lenx)) - result_data = dst.data () - - self.assertFloatTuplesAlmostEqual (expected_result, result_data) - - def test_deint_001 (self): + src0 = blocks.vector_source_f(a_vec) + src1 = blocks.vector_source_f(b_vec) + src2 = blocks.vector_source_f(c_vec) + src3 = blocks.vector_source_f(d_vec) + op = blocks.interleave(gr.sizeof_float, blksize) + dst = blocks.vector_sink_f() + + self.tb.connect(src0, (op, 0)) + self.tb.connect(src1, (op, 1)) + self.tb.connect(src2, (op, 2)) + self.tb.connect(src3, (op, 3)) + self.tb.connect(op, dst) + self.tb.run() + expected_result = tuple(range(lenx)) + result_data = dst.data() + + self.assertFloatTuplesAlmostEqual(expected_result, result_data) + + def test_deint_001(self): lenx = 64 - src = blocks.vector_source_f (list(range(lenx))) - op = blocks.deinterleave (gr.sizeof_float) - dst0 = blocks.vector_sink_f () - dst1 = blocks.vector_sink_f () - dst2 = blocks.vector_sink_f () - dst3 = blocks.vector_sink_f () - - self.tb.connect (src, op) - self.tb.connect ((op, 0), dst0) - self.tb.connect ((op, 1), dst1) - self.tb.connect ((op, 2), dst2) - self.tb.connect ((op, 3), dst3) - self.tb.run () - - expected_result0 = tuple (range (0, lenx, 4)) - expected_result1 = tuple (range (1, lenx, 4)) - expected_result2 = tuple (range (2, lenx, 4)) - expected_result3 = tuple (range (3, lenx, 4)) - - self.assertFloatTuplesAlmostEqual (expected_result0, dst0.data ()) - self.assertFloatTuplesAlmostEqual (expected_result1, dst1.data ()) - self.assertFloatTuplesAlmostEqual (expected_result2, dst2.data ()) - self.assertFloatTuplesAlmostEqual (expected_result3, dst3.data ()) - - def test_deint_002 (self): + src = blocks.vector_source_f(list(range(lenx))) + op = blocks.deinterleave(gr.sizeof_float) + dst0 = blocks.vector_sink_f() + dst1 = blocks.vector_sink_f() + dst2 = blocks.vector_sink_f() + dst3 = blocks.vector_sink_f() + + self.tb.connect(src, op) + self.tb.connect((op, 0), dst0) + self.tb.connect((op, 1), dst1) + self.tb.connect((op, 2), dst2) + self.tb.connect((op, 3), dst3) + self.tb.run() + + expected_result0 = tuple(range(0, lenx, 4)) + expected_result1 = tuple(range(1, lenx, 4)) + expected_result2 = tuple(range(2, lenx, 4)) + expected_result3 = tuple(range(3, lenx, 4)) + + self.assertFloatTuplesAlmostEqual(expected_result0, dst0.data()) + self.assertFloatTuplesAlmostEqual(expected_result1, dst1.data()) + self.assertFloatTuplesAlmostEqual(expected_result2, dst2.data()) + self.assertFloatTuplesAlmostEqual(expected_result3, dst3.data()) + + def test_deint_002(self): blksize = 4 lenx = 64 - src = blocks.vector_source_f (list(range(lenx))) - op = blocks.deinterleave (gr.sizeof_float, blksize) - dst0 = blocks.vector_sink_f () - dst1 = blocks.vector_sink_f () - dst2 = blocks.vector_sink_f () - dst3 = blocks.vector_sink_f () - - self.tb.connect (src, op) - self.tb.connect ((op, 0), dst0) - self.tb.connect ((op, 1), dst1) - self.tb.connect ((op, 2), dst2) - self.tb.connect ((op, 3), dst3) - self.tb.run () - - plusup_big = lambda a: a + (blksize * 4) - plusup_little = lambda a: a + blksize - a_vec = list(range(0,blksize)) - for i in range(0,(lenx // (4 * blksize)) - 1): + src = blocks.vector_source_f(list(range(lenx))) + op = blocks.deinterleave(gr.sizeof_float, blksize) + dst0 = blocks.vector_sink_f() + dst1 = blocks.vector_sink_f() + dst2 = blocks.vector_sink_f() + dst3 = blocks.vector_sink_f() + + self.tb.connect(src, op) + self.tb.connect((op, 0), dst0) + self.tb.connect((op, 1), dst1) + self.tb.connect((op, 2), dst2) + self.tb.connect((op, 3), dst3) + self.tb.run() + + def plusup_big(a): return a + (blksize * 4) + def plusup_little(a): return a + blksize + a_vec = list(range(0, blksize)) + for i in range(0, (lenx // (4 * blksize)) - 1): a_vec += list(map(plusup_big, a_vec[len(a_vec) - blksize:])) b_vec = list(map(plusup_little, a_vec)) c_vec = list(map(plusup_little, b_vec)) d_vec = list(map(plusup_little, c_vec)) - expected_result0 = tuple (a_vec) - expected_result1 = tuple (b_vec) - expected_result2 = tuple (c_vec) - expected_result3 = tuple (d_vec) + expected_result0 = tuple(a_vec) + expected_result1 = tuple(b_vec) + expected_result2 = tuple(c_vec) + expected_result3 = tuple(d_vec) + + self.assertFloatTuplesAlmostEqual(expected_result0, dst0.data()) + self.assertFloatTuplesAlmostEqual(expected_result1, dst1.data()) + self.assertFloatTuplesAlmostEqual(expected_result2, dst2.data()) + self.assertFloatTuplesAlmostEqual(expected_result3, dst3.data()) - self.assertFloatTuplesAlmostEqual (expected_result0, dst0.data ()) - self.assertFloatTuplesAlmostEqual (expected_result1, dst1.data ()) - self.assertFloatTuplesAlmostEqual (expected_result2, dst2.data ()) - self.assertFloatTuplesAlmostEqual (expected_result3, dst3.data ()) if __name__ == '__main__': gr_unittest.run(test_interleave) diff --git a/gr-blocks/python/blocks/qa_keep_m_in_n.py b/gr-blocks/python/blocks/qa_keep_m_in_n.py index 17298cd1fa..b055303e5c 100644 --- a/gr-blocks/python/blocks/qa_keep_m_in_n.py +++ b/gr-blocks/python/blocks/qa_keep_m_in_n.py @@ -61,8 +61,8 @@ class test_keep_m_in_n(gr_unittest.TestCase): self.assertEqual( sorted( list(range(i, 100, 5)) + - list(range((i+1) % 5, 100, 5)) + - list(range((i+2) % 5, 100, 5)) + list(range((i + 1) % 5, 100, 5)) + + list(range((i + 2) % 5, 100, 5)) ), list(snk[i].data()) ) @@ -82,7 +82,8 @@ class test_keep_m_in_n(gr_unittest.TestCase): with self.assertRaises(RuntimeError) as cm: blocks.keep_m_in_n(8, 2, 5, -1) - self.assertEqual(str(cm.exception), 'keep_m_in_n: offset (-1) must be >= 0') + self.assertEqual(str(cm.exception), + 'keep_m_in_n: offset (-1) must be >= 0') with self.assertRaises(RuntimeError) as cm: blocks.keep_m_in_n(8, 2, 5, 5) diff --git a/gr-blocks/python/blocks/qa_keep_one_in_n.py b/gr-blocks/python/blocks/qa_keep_one_in_n.py index d37195963f..613fc8ed14 100644 --- a/gr-blocks/python/blocks/qa_keep_one_in_n.py +++ b/gr-blocks/python/blocks/qa_keep_one_in_n.py @@ -11,12 +11,13 @@ from gnuradio import gr, gr_unittest, blocks + class test_keep_one_in_n(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_001(self): diff --git a/gr-blocks/python/blocks/qa_logger.py b/gr-blocks/python/blocks/qa_logger.py index 697d657c79..f3714afbb3 100644 --- a/gr-blocks/python/blocks/qa_logger.py +++ b/gr-blocks/python/blocks/qa_logger.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_logger (gr_unittest.TestCase): def setUp(self): @@ -45,9 +46,9 @@ class test_logger (gr_unittest.TestCase): # Make sure exception is throw on bogus data self.assertRaises(RuntimeError, ns.set_log_level, "11") - def test_log_level_for_tb(self): - # Test the python API for getting and setting log levels for a top_block + # Test the python API for getting and setting log levels for a + # top_block nsrc = blocks.null_source(4) nsnk = blocks.null_sink(4) # Set all log levels to a known state @@ -64,10 +65,12 @@ class test_logger (gr_unittest.TestCase): self.assertEqual(nsnk.log_level(), "alert") def test_log_level_for_hier_block(self): - # Test the python API for getting and setting log levels for hier blocks + # Test the python API for getting and setting log levels for hier + # blocks nsrc = blocks.null_source(4) nsnk = blocks.null_sink(4) - b = blocks.stream_to_vector_decimator(4, 1, 1, 1) # just a random hier block that exists + b = blocks.stream_to_vector_decimator( + 4, 1, 1, 1) # just a random hier block that exists tb = gr.top_block() tb.connect(nsrc, b, nsnk) tb.set_log_level("debug") @@ -81,5 +84,6 @@ class test_logger (gr_unittest.TestCase): self.assertEqual(nsnk.log_level(), "alert") self.assertEqual(b.one_in_n.log_level(), "alert") + if __name__ == '__main__': gr_unittest.run(test_logger) diff --git a/gr-blocks/python/blocks/qa_max.py b/gr-blocks/python/blocks/qa_max.py index 31c949853c..86250b7d4f 100644 --- a/gr-blocks/python/blocks/qa_max.py +++ b/gr-blocks/python/blocks/qa_max.py @@ -13,6 +13,7 @@ from gnuradio import gr, gr_unittest, blocks import math + class test_max(gr_unittest.TestCase): def setUp(self): @@ -22,8 +23,8 @@ class test_max(gr_unittest.TestCase): self.tb = None def test_001(self): - src_data = [0,0.2,-0.3,0,12,0] - expected_result = [float(max(src_data)),] + src_data = [0, 0.2, -0.3, 0, 12, 0] + expected_result = [float(max(src_data)), ] src = blocks.vector_source_f(src_data) s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data)) @@ -36,8 +37,8 @@ class test_max(gr_unittest.TestCase): self.assertEqual(expected_result, result_data) def stest_002(self): - src_data=[-100,-99,-98,-97,-96,-1] - expected_result = [float(max(src_data)),] + src_data = [-100, -99, -98, -97, -96, -1] + expected_result = [float(max(src_data)), ] src = blocks.vector_source_f(src_data) s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data)) @@ -51,9 +52,10 @@ class test_max(gr_unittest.TestCase): def stest_003(self): src_data0 = [0, 2, -3, 0, 12, 0] - src_data1 = [1, 1, 1, 1, 1, 1] + src_data1 = [1, 1, 1, 1, 1, 1] - expected_result = [float(max(x,y)) for x,y in zip(src_data0, src_data1)] + expected_result = [float(max(x, y)) + for x, y in zip(src_data0, src_data1)] src0 = blocks.vector_source_f(src_data0) src1 = blocks.vector_source_f(src_data1) @@ -70,17 +72,17 @@ class test_max(gr_unittest.TestCase): def stest_004(self): dim = 2 src_data0 = [0, 2, -3, 0, 12, 0] - src_data1 = [1, 1, 1, 1, 1, 1] + src_data1 = [1, 1, 1, 1, 1, 1] expected_data = [] - tmp = [float(max(x,y)) for x,y in zip(src_data0, src_data1)] + tmp = [float(max(x, y)) for x, y in zip(src_data0, src_data1)] for i in range(len(tmp) / dim): - expected_data.append(float(max(tmp[i*dim:(i+1)*dim]))) + expected_data.append(float(max(tmp[i * dim:(i + 1) * dim]))) src0 = blocks.vector_source_f(src_data0) - s2v0 = blocks.stream_to_vector(gr.sizeof_float,dim) + s2v0 = blocks.stream_to_vector(gr.sizeof_float, dim) src1 = blocks.vector_source_f(src_data1) - s2v1 = blocks.stream_to_vector(gr.sizeof_float,dim) + s2v1 = blocks.stream_to_vector(gr.sizeof_float, dim) op = blocks.max_ff(dim) dst = blocks.vector_sink_f() @@ -91,13 +93,12 @@ class test_max(gr_unittest.TestCase): result_data = dst.data() self.assertEqual(expected_result, result_data) - def stest_s001(self): src_data = [0, 2, -3, 0, 12, 0] - expected_result = [max(src_data),] + expected_result = [max(src_data), ] src = blocks.vector_source_s(src_data) - s2v = blocks.stream_to_vector(gr.sizeof_short,len(src_data)) + s2v = blocks.stream_to_vector(gr.sizeof_short, len(src_data)) op = blocks.max_ss(len(src_data)) dst = blocks.vector_sink_s() @@ -107,8 +108,8 @@ class test_max(gr_unittest.TestCase): self.assertEqual(expected_result, result_data) def stest_s002(self): - src_data=[-100,-99,-98,-97,-96,-1] - expected_result = [max(src_data),] + src_data = [-100, -99, -98, -97, -96, -1] + expected_result = [max(src_data), ] src = blocks.vector_source_s(src_data) s2v = blocks.stream_to_vector(gr.sizeof_short, len(src_data)) @@ -120,12 +121,11 @@ class test_max(gr_unittest.TestCase): result_data = dst.data() self.assertEqual(expected_result, result_data) - def stest_s003(self): src_data0 = [0, 2, -3, 0, 12, 0] - src_data1 = [1, 1, 1, 1, 1, 1] + src_data1 = [1, 1, 1, 1, 1, 1] - expected_result = [max(x,y) for x,y in zip(src_data0, src_data1)] + expected_result = [max(x, y) for x, y in zip(src_data0, src_data1)] src0 = blocks.vector_source_s(src_data0) src1 = blocks.vector_source_s(src_data1) @@ -142,17 +142,17 @@ class test_max(gr_unittest.TestCase): def stest_s004(self): dim = 2 src_data0 = [0, 2, -3, 0, 12, 0] - src_data1 = [1, 1, 1, 1, 1, 1] + src_data1 = [1, 1, 1, 1, 1, 1] expected_data = [] - tmp = [max(x,y) for x,y in zip(src_data0, src_data1)] + tmp = [max(x, y) for x, y in zip(src_data0, src_data1)] for i in range(len(tmp) / dim): - expected_data.append(max(tmp[i*dim:(i+1)*dim])) + expected_data.append(max(tmp[i * dim:(i + 1) * dim])) src0 = blocks.vector_source_s(src_data0) - s2v0 = blocks.stream_to_vector(gr.sizeof_short,dim) + s2v0 = blocks.stream_to_vector(gr.sizeof_short, dim) src1 = blocks.vector_source_s(src_data1) - s2v1 = blocks.stream_to_vector(gr.sizeof_short,dim) + s2v1 = blocks.stream_to_vector(gr.sizeof_short, dim) op = blocks.max_ss(dim) dst = blocks.vector_sink_s() diff --git a/gr-blocks/python/blocks/qa_message.py b/gr-blocks/python/blocks/qa_message.py index af31d7256c..1df6857490 100644 --- a/gr-blocks/python/blocks/qa_message.py +++ b/gr-blocks/python/blocks/qa_message.py @@ -56,7 +56,7 @@ class test_message(gr_unittest.TestCase): msg = gr.message_from_string(s) self.assertEquals(s.encode('utf8'), msg.to_string()) - ## msg_queue was removed from API in 3.8 + # msg_queue was removed from API in 3.8 # def test_200(self): # self.leak_check(self.body_200) @@ -88,7 +88,7 @@ class test_message(gr_unittest.TestCase): msg = gr.message(666) def test_300(self): - input_data = [0,1,2,3,4,5,6,7,8,9] + input_data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] src = blocks.vector_source_b(input_data) dst = blocks.vector_sink_b() tb = gr.top_block() diff --git a/gr-blocks/python/blocks/qa_min.py b/gr-blocks/python/blocks/qa_min.py index 1cd8f31e82..3ae46257a3 100644 --- a/gr-blocks/python/blocks/qa_min.py +++ b/gr-blocks/python/blocks/qa_min.py @@ -13,6 +13,7 @@ from gnuradio import gr, gr_unittest, blocks import math + class test_min(gr_unittest.TestCase): def setUp(self): @@ -23,7 +24,7 @@ class test_min(gr_unittest.TestCase): def test_001(self): src_data = [0, 0.2, -0.25, 0, 12, 0] - expected_result = [float(min(src_data)),] + expected_result = [float(min(src_data)), ] src = blocks.vector_source_f(src_data) s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data)) @@ -36,8 +37,8 @@ class test_min(gr_unittest.TestCase): self.assertEqual(expected_result, result_data) def stest_002(self): - src_data=[-100,-99,-98,-97,-96,-1] - expected_result = [float(min(src_data)),] + src_data = [-100, -99, -98, -97, -96, -1] + expected_result = [float(min(src_data)), ] src = blocks.vector_source_f(src_data) s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data)) @@ -51,9 +52,10 @@ class test_min(gr_unittest.TestCase): def stest_003(self): src_data0 = [0, 2, -3, 0, 12, 0] - src_data1 = [1, 1, 1, 1, 1, 1] + src_data1 = [1, 1, 1, 1, 1, 1] - expected_result = [float(min(x,y)) for x,y in zip(src_data0, src_data1)] + expected_result = [float(min(x, y)) + for x, y in zip(src_data0, src_data1)] src0 = blocks.vector_source_f(src_data0) src1 = blocks.vector_source_f(src_data1) @@ -70,17 +72,17 @@ class test_min(gr_unittest.TestCase): def stest_004(self): dim = 2 src_data0 = [0, 2, -3, 0, 12, 0] - src_data1 = [1, 1, 1, 1, 1, 1] + src_data1 = [1, 1, 1, 1, 1, 1] expected_data = [] - tmp = [float(min(x,y)) for x,y in zip(src_data0, src_data1)] + tmp = [float(min(x, y)) for x, y in zip(src_data0, src_data1)] for i in range(len(tmp) / dim): - expected_data.append(float(min(tmp[i*dim:(i+1)*dim]))) + expected_data.append(float(min(tmp[i * dim:(i + 1) * dim]))) src0 = blocks.vector_source_f(src_data0) - s2v0 = blocks.stream_to_vector(gr.sizeof_float,dim) + s2v0 = blocks.stream_to_vector(gr.sizeof_float, dim) src1 = blocks.vector_source_f(src_data1) - s2v1 = blocks.stream_to_vector(gr.sizeof_float,dim) + s2v1 = blocks.stream_to_vector(gr.sizeof_float, dim) op = blocks.min_ff(dim) dst = blocks.vector_sink_f() @@ -91,13 +93,12 @@ class test_min(gr_unittest.TestCase): result_data = dst.data() self.assertEqual(expected_result, result_data) - def stest_s001(self): src_data = [0, 2, -3, 0, 12, 0] - expected_result = [min(src_data),] + expected_result = [min(src_data), ] src = blocks.vector_source_s(src_data) - s2v = blocks.stream_to_vector(gr.sizeof_short,len(src_data)) + s2v = blocks.stream_to_vector(gr.sizeof_short, len(src_data)) op = blocks.min_ss(len(src_data)) dst = blocks.vector_sink_s() @@ -107,8 +108,8 @@ class test_min(gr_unittest.TestCase): self.assertEqual(expected_result, result_data) def stest_s002(self): - src_data=[-100,-99,-98,-97,-96,-1] - expected_result = [min(src_data),] + src_data = [-100, -99, -98, -97, -96, -1] + expected_result = [min(src_data), ] src = blocks.vector_source_s(src_data) s2v = blocks.stream_to_vector(gr.sizeof_short, len(src_data)) @@ -120,12 +121,11 @@ class test_min(gr_unittest.TestCase): result_data = dst.data() self.assertEqual(expected_result, result_data) - def stest_s003(self): src_data0 = [0, 2, -3, 0, 12, 0] - src_data1 = [1, 1, 1, 1, 1, 1] + src_data1 = [1, 1, 1, 1, 1, 1] - expected_result = [min(x,y) for x,y in zip(src_data0, src_data1)] + expected_result = [min(x, y) for x, y in zip(src_data0, src_data1)] src0 = blocks.vector_source_s(src_data0) src1 = blocks.vector_source_s(src_data1) @@ -142,17 +142,17 @@ class test_min(gr_unittest.TestCase): def stest_s004(self): dim = 2 src_data0 = [0, 2, -3, 0, 12, 0] - src_data1 = [1, 1, 1, 1, 1, 1] + src_data1 = [1, 1, 1, 1, 1, 1] expected_data = [] - tmp = [min(x,y) for x,y in zip(src_data0, src_data1)] + tmp = [min(x, y) for x, y in zip(src_data0, src_data1)] for i in range(len(tmp) / dim): - expected_data.append(min(tmp[i*dim:(i+1)*dim])) + expected_data.append(min(tmp[i * dim:(i + 1) * dim])) src0 = blocks.vector_source_s(src_data0) - s2v0 = blocks.stream_to_vector(gr.sizeof_short,dim) + s2v0 = blocks.stream_to_vector(gr.sizeof_short, dim) src1 = blocks.vector_source_s(src_data1) - s2v1 = blocks.stream_to_vector(gr.sizeof_short,dim) + s2v1 = blocks.stream_to_vector(gr.sizeof_short, dim) op = blocks.min_ss(dim) dst = blocks.vector_sink_s() @@ -163,5 +163,6 @@ class test_min(gr_unittest.TestCase): result_data = dst.data() self.assertEqual(expected_result, result_data) + if __name__ == '__main__': gr_unittest.run(test_min) diff --git a/gr-blocks/python/blocks/qa_moving_average.py b/gr-blocks/python/blocks/qa_moving_average.py index 1691eac71e..8f4169d05c 100644 --- a/gr-blocks/python/blocks/qa_moving_average.py +++ b/gr-blocks/python/blocks/qa_moving_average.py @@ -11,27 +11,31 @@ from gnuradio import gr, gr_unittest, blocks -import math, random +import math +import random + def make_random_complex_tuple(L, scale=1): result = [] for x in range(L): - result.append(scale*complex(2*random.random()-1, - 2*random.random()-1)) + result.append(scale * complex(2 * random.random() - 1, + 2 * random.random() - 1)) return tuple(result) + def make_random_float_tuple(L, scale=1): result = [] for x in range(L): - result.append(scale*(2*random.random()-1)) + result.append(scale * (2 * random.random() - 1)) return tuple(result) + class test_moving_average(gr_unittest.TestCase): def assertListAlmostEqual(self, list1, list2, tol): self.assertEqual(len(list1), len(list2)) for a, b in zip(list1, list2): self.assertAlmostEqual(a, b, tol) - + def setUp(self): random.seed(0) self.tb = gr.top_block() @@ -48,10 +52,10 @@ class test_moving_average(gr_unittest.TestCase): N = 10000 data = make_random_float_tuple(N, 1) - expected_result = N*[0,] + expected_result = N * [0, ] src = blocks.vector_source_f(data, False) - op = blocks.moving_average_ff(100, 0.001) + op = blocks.moving_average_ff(100, 0.001) dst = blocks.vector_sink_f() tb.connect(src, op) @@ -68,10 +72,10 @@ class test_moving_average(gr_unittest.TestCase): N = 10000 data = make_random_complex_tuple(N, 1) - expected_result = N*[0,] + expected_result = N * [0, ] src = blocks.vector_source_c(data, False) - op = blocks.moving_average_cc(100, 0.001) + op = blocks.moving_average_cc(100, 0.001) dst = blocks.vector_sink_c() tb.connect(src, op) @@ -83,20 +87,21 @@ class test_moving_average(gr_unittest.TestCase): # make sure result is close to zero self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1) - # This tests implement own moving average to verify correct behaviour of the block + # This tests implement own moving average to verify correct behaviour of + # the block def test_vector_int(self): tb = self.tb vlen = 5 - N = 10*vlen + N = 10 * vlen data = make_random_float_tuple(N, 2**10) - data = [int(d*1000) for d in data] + data = [int(d * 1000) for d in data] src = blocks.vector_source_i(data, False) one_to_many = blocks.stream_to_streams(gr.sizeof_int, vlen) one_to_vector = blocks.stream_to_vector(gr.sizeof_int, vlen) many_to_vector = blocks.streams_to_vector(gr.sizeof_int, vlen) - isolated = [ blocks.moving_average_ii(100, 1) for i in range(vlen)] + isolated = [blocks.moving_average_ii(100, 1) for i in range(vlen)] dut = blocks.moving_average_ii(100, 1, vlen=vlen) dut_dst = blocks.vector_sink_i(vlen=vlen) ref_dst = blocks.vector_sink_i(vlen=vlen) @@ -105,7 +110,7 @@ class test_moving_average(gr_unittest.TestCase): tb.connect(src, one_to_vector, dut, dut_dst) tb.connect(many_to_vector, ref_dst) for idx, single in enumerate(isolated): - tb.connect((one_to_many,idx), single, (many_to_vector,idx)) + tb.connect((one_to_many, idx), single, (many_to_vector, idx)) tb.run() @@ -119,13 +124,13 @@ class test_moving_average(gr_unittest.TestCase): tb = self.tb vlen = 5 - N = 10*vlen + N = 10 * vlen data = make_random_complex_tuple(N, 2**10) src = blocks.vector_source_c(data, False) one_to_many = blocks.stream_to_streams(gr.sizeof_gr_complex, vlen) one_to_vector = blocks.stream_to_vector(gr.sizeof_gr_complex, vlen) many_to_vector = blocks.streams_to_vector(gr.sizeof_gr_complex, vlen) - isolated = [ blocks.moving_average_cc(100, 1) for i in range(vlen)] + isolated = [blocks.moving_average_cc(100, 1) for i in range(vlen)] dut = blocks.moving_average_cc(100, 1, vlen=vlen) dut_dst = blocks.vector_sink_c(vlen=vlen) ref_dst = blocks.vector_sink_c(vlen=vlen) @@ -134,7 +139,7 @@ class test_moving_average(gr_unittest.TestCase): tb.connect(src, one_to_vector, dut, dut_dst) tb.connect(many_to_vector, ref_dst) for idx, single in enumerate(isolated): - tb.connect((one_to_many,idx), single, (many_to_vector,idx)) + tb.connect((one_to_many, idx), single, (many_to_vector, idx)) tb.run() @@ -152,26 +157,28 @@ class test_moving_average(gr_unittest.TestCase): data = make_random_complex_tuple(N, 1) # generate random data # pythonic MA filter - data_padded = (history-1)*[complex(0.0, 0.0)]+list(data) # history + data_padded = (history - 1) * \ + [complex(0.0, 0.0)] + list(data) # history expected_result = [] - moving_sum = sum(data_padded[:history-1]) + moving_sum = sum(data_padded[:history - 1]) for i in range(N): - moving_sum += data_padded[i+history-1] + moving_sum += data_padded[i + history - 1] expected_result.append(moving_sum) moving_sum -= data_padded[i] src = blocks.vector_source_c(data, False) - op = blocks.moving_average_cc(history, 1) + op = blocks.moving_average_cc(history, 1) dst = blocks.vector_sink_c() - + tb.connect(src, op) tb.connect(op, dst) tb.run() - + dst_data = dst.data() # make sure result is close to zero self.assertListAlmostEqual(expected_result, dst_data, 4) + if __name__ == '__main__': gr_unittest.run(test_moving_average) diff --git a/gr-blocks/python/blocks/qa_multiply_conjugate.py b/gr-blocks/python/blocks/qa_multiply_conjugate.py index 3590a703ae..50d48ec672 100644 --- a/gr-blocks/python/blocks/qa_multiply_conjugate.py +++ b/gr-blocks/python/blocks/qa_multiply_conjugate.py @@ -11,36 +11,38 @@ from gnuradio import gr, gr_unittest, blocks + class test_multiply_conjugate (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_000 (self): - src_data0 = [-2-2j, -1-1j, -2+2j, -1+1j, - 2-2j, 1-1j, 2+2j, 1+1j, - 0+0j] - src_data1 = [-3-3j, -4-4j, -3+3j, -4+4j, - 3-3j, 4-4j, 3+3j, 4+4j, - 0+0j] - - exp_data = [12+0j, 8+0j, 12+0j, 8+0j, - 12+0j, 8+0j, 12+0j, 8+0j, - 0+0j] + def test_000(self): + src_data0 = [-2 - 2j, -1 - 1j, -2 + 2j, -1 + 1j, + 2 - 2j, 1 - 1j, 2 + 2j, 1 + 1j, + 0 + 0j] + src_data1 = [-3 - 3j, -4 - 4j, -3 + 3j, -4 + 4j, + 3 - 3j, 4 - 4j, 3 + 3j, 4 + 4j, + 0 + 0j] + + exp_data = [12 + 0j, 8 + 0j, 12 + 0j, 8 + 0j, + 12 + 0j, 8 + 0j, 12 + 0j, 8 + 0j, + 0 + 0j] src0 = blocks.vector_source_c(src_data0) src1 = blocks.vector_source_c(src_data1) - op = blocks.multiply_conjugate_cc () - dst = blocks.vector_sink_c () + op = blocks.multiply_conjugate_cc() + dst = blocks.vector_sink_c() - self.tb.connect(src0, (op,0)) - self.tb.connect(src1, (op,1)) + self.tb.connect(src0, (op, 0)) + self.tb.connect(src1, (op, 1)) self.tb.connect(op, dst) self.tb.run() - result_data = dst.data () - self.assertEqual (exp_data, result_data) + result_data = dst.data() + self.assertEqual(exp_data, result_data) + if __name__ == '__main__': gr_unittest.run(test_multiply_conjugate) diff --git a/gr-blocks/python/blocks/qa_multiply_matrix_xx.py b/gr-blocks/python/blocks/qa_multiply_matrix_xx.py index 3b877d9d27..c908bf52e9 100644 --- a/gr-blocks/python/blocks/qa_multiply_matrix_xx.py +++ b/gr-blocks/python/blocks/qa_multiply_matrix_xx.py @@ -1,13 +1,13 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -# +# # Copyright 2014 Free Software Foundation, Inc. -# +# # This file is part of GNU Radio -# +# # SPDX-License-Identifier: GPL-3.0-or-later # -# +# import time @@ -19,36 +19,37 @@ from gnuradio import blocks BLOCK_LOOKUP = { 'float': { - 'src': blocks.vector_source_f, + 'src': blocks.vector_source_f, 'sink': blocks.vector_sink_f, 'mult': blocks.multiply_matrix_ff, }, 'complex': { - 'src': blocks.vector_source_c, + 'src': blocks.vector_source_c, 'sink': blocks.vector_sink_c, 'mult': blocks.multiply_matrix_cc, }, } + class test_multiply_matrix_xx (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() self.multiplier = None - def tearDown (self): + def tearDown(self): self.tb = None self.multiplier = None def run_once(self, - X_in, - A, - tpp=gr.TPP_DONT, - A2=None, - tags=None, - msg_A=None, - datatype='float', - ): + X_in, + A, + tpp=gr.TPP_DONT, + A2=None, + tags=None, + msg_A=None, + datatype='float', + ): """ Run the test for given input-, output- and matrix values. Every row from X_in is considered an input signal on a port. """ X_in = numpy.matrix(X_in) @@ -68,9 +69,11 @@ class test_multiply_matrix_xx (gr_unittest.TestCase): else: these_tags = (tags[i],) self.tb.connect( - BLOCK_LOOKUP[datatype]['src'](X_in[i].tolist()[0], tags=these_tags), - (self.multiplier, i) - ) + BLOCK_LOOKUP[datatype]['src']( + X_in[i].tolist()[0], + tags=these_tags), + (self.multiplier, + i)) sinks = [] for i in range(M): sinks.append(BLOCK_LOOKUP[datatype]['sink']()) @@ -78,7 +81,7 @@ class test_multiply_matrix_xx (gr_unittest.TestCase): # Run and check self.tb.run() for i in range(X_in.shape[1]): - Y_out_exp[:,i] = A_matrix * X_in[:,i] + Y_out_exp[:, i] = A_matrix * X_in[:, i] Y_out = [list(x.data()) for x in sinks] if tags is not None: self.the_tags = [] @@ -86,8 +89,7 @@ class test_multiply_matrix_xx (gr_unittest.TestCase): self.the_tags.append(sinks[i].tags()) self.assertEqual(list(Y_out), Y_out_exp.tolist()) - - def test_001_t (self): + def test_001_t(self): """ Simplest possible check: N==M, unit matrix """ X_in = ( (1, 2, 3, 4), @@ -99,7 +101,7 @@ class test_multiply_matrix_xx (gr_unittest.TestCase): ) self.run_once(X_in, A) - def test_001_t_complex (self): + def test_001_t_complex(self): """ Simplest possible check: N==M, unit matrix """ X_in = ( (1, 2, 3, 4), @@ -111,7 +113,7 @@ class test_multiply_matrix_xx (gr_unittest.TestCase): ) self.run_once(X_in, A, datatype='complex') - def test_002_t (self): + def test_002_t(self): """ Switch check: N==M, flipped unit matrix """ X_in = ( (1, 2, 3, 4), @@ -123,7 +125,7 @@ class test_multiply_matrix_xx (gr_unittest.TestCase): ) self.run_once(X_in, A) - def test_003_t (self): + def test_003_t(self): """ Average """ X_in = ( (1, 1, 1, 1), @@ -135,7 +137,7 @@ class test_multiply_matrix_xx (gr_unittest.TestCase): ) self.run_once(X_in, A) - def test_004_t (self): + def test_004_t(self): """ Set """ X_in = ( (1, 2, 3, 4), @@ -151,14 +153,14 @@ class test_multiply_matrix_xx (gr_unittest.TestCase): ) self.run_once(X_in, A1, A2=A2) - def test_005_t (self): + def test_005_t(self): """ Tags """ X_in = ( (1, 2, 3, 4), (5, 6, 7, 8), ) A = ( - (0, 1), # Flip them round + (0, 1), # Flip them round (1, 0), ) tag1 = gr.tag_t() @@ -173,24 +175,22 @@ class test_multiply_matrix_xx (gr_unittest.TestCase): self.assertTrue(pmt.equal(tag1.key, self.the_tags[0][0].key)) self.assertTrue(pmt.equal(tag2.key, self.the_tags[1][0].key)) - #def test_006_t (self): + # def test_006_t (self): #""" Message passing """ - #X_in = ( - #(1, 2, 3, 4), - #(5, 6, 7, 8), - #) - #A1 = ( - #(1, 0), - #(0, 1), - #) - #msg_A = ( - #(0, 1), - #(1, 0), - #) + # X_in = ( + #(1, 2, 3, 4), + #(5, 6, 7, 8), + # ) + # A1 = ( + #(1, 0), + #(0, 1), + # ) + # msg_A = ( + #(0, 1), + #(1, 0), + # ) #self.run_once(X_in, A1, msg_A=msg_A) - if __name__ == '__main__': gr_unittest.run(test_multiply_matrix_xx) - diff --git a/gr-blocks/python/blocks/qa_mute.py b/gr-blocks/python/blocks/qa_mute.py index ae789f1a13..e5aaf76798 100644 --- a/gr-blocks/python/blocks/qa_mute.py +++ b/gr-blocks/python/blocks/qa_mute.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_mute(gr_unittest.TestCase): def setUp(self): @@ -62,16 +63,17 @@ class test_mute(gr_unittest.TestCase): self.help_ii((src_data,), expected_result, op) def test_unmute_cc(self): - src_data = [1+5j, 2+5j, 3+5j, 4+5j, 5+5j] - expected_result = [1+5j, 2+5j, 3+5j, 4+5j, 5+5j] + src_data = [1 + 5j, 2 + 5j, 3 + 5j, 4 + 5j, 5 + 5j] + expected_result = [1 + 5j, 2 + 5j, 3 + 5j, 4 + 5j, 5 + 5j] op = blocks.mute_cc(False) self.help_cc((src_data,), expected_result, op) def test_unmute_cc(self): - src_data = [1+5j, 2+5j, 3+5j, 4+5j, 5+5j] - expected_result =[0+0j, 0+0j, 0+0j, 0+0j, 0+0j] + src_data = [1 + 5j, 2 + 5j, 3 + 5j, 4 + 5j, 5 + 5j] + expected_result = [0 + 0j, 0 + 0j, 0 + 0j, 0 + 0j, 0 + 0j] op = blocks.mute_cc(True) self.help_cc((src_data,), expected_result, op) + if __name__ == '__main__': gr_unittest.run(test_mute) diff --git a/gr-blocks/python/blocks/qa_nlog10.py b/gr-blocks/python/blocks/qa_nlog10.py index 92a5e22ee7..e6fb3b942b 100644 --- a/gr-blocks/python/blocks/qa_nlog10.py +++ b/gr-blocks/python/blocks/qa_nlog10.py @@ -11,12 +11,13 @@ from gnuradio import gr, gr_unittest, blocks + class test_nlog10(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_001(self): @@ -25,12 +26,11 @@ class test_nlog10(gr_unittest.TestCase): src = blocks.vector_source_f(src_data) op = blocks.nlog10_ff(10) dst = blocks.vector_sink_f() - self.tb.connect (src, op, dst) + self.tb.connect(src, op, dst) self.tb.run() result_data = dst.data() - self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5) + self.assertFloatTuplesAlmostEqual(expected_result, result_data, 5) if __name__ == '__main__': gr_unittest.run(test_nlog10) - diff --git a/gr-blocks/python/blocks/qa_null_sink_source.py b/gr-blocks/python/blocks/qa_null_sink_source.py index d903eb9c0c..b8acdfc027 100644 --- a/gr-blocks/python/blocks/qa_null_sink_source.py +++ b/gr-blocks/python/blocks/qa_null_sink_source.py @@ -12,6 +12,7 @@ from gnuradio import gr, gr_unittest, blocks import math + class test_null_sink_source(gr_unittest.TestCase): def setUp(self): @@ -29,6 +30,6 @@ class test_null_sink_source(gr_unittest.TestCase): self.tb.connect(src, hed, dst) self.tb.run() + if __name__ == '__main__': gr_unittest.run(test_null_sink_source) - diff --git a/gr-blocks/python/blocks/qa_pack_k_bits.py b/gr-blocks/python/blocks/qa_pack_k_bits.py index 435325cd40..b38f9c519d 100644 --- a/gr-blocks/python/blocks/qa_pack_k_bits.py +++ b/gr-blocks/python/blocks/qa_pack_k_bits.py @@ -13,6 +13,7 @@ import random from gnuradio import gr, gr_unittest, blocks + class test_pack(gr_unittest.TestCase): def setUp(self): @@ -23,9 +24,9 @@ class test_pack(gr_unittest.TestCase): self.tb = None def test_001(self): - src_data = [1,0,1,1,0,1,1,0] - expected_results = [1,0,1,1,0,1,1,0] - src = blocks.vector_source_b(src_data,False) + src_data = [1, 0, 1, 1, 0, 1, 1, 0] + expected_results = [1, 0, 1, 1, 0, 1, 1, 0] + src = blocks.vector_source_b(src_data, False) op = blocks.pack_k_bits_bb(1) dst = blocks.vector_sink_b() self.tb.connect(src, op, dst) @@ -33,9 +34,9 @@ class test_pack(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) def test_002(self): - src_data = [1,0,1,1,0,0,0,1] - expected_results = [ 2, 3, 0, 1] - src = blocks.vector_source_b(src_data,False) + src_data = [1, 0, 1, 1, 0, 0, 0, 1] + expected_results = [2, 3, 0, 1] + src = blocks.vector_source_b(src_data, False) op = blocks.pack_k_bits_bb(2) dst = blocks.vector_sink_b() self.tb.connect(src, op, dst) @@ -44,15 +45,15 @@ class test_pack(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) def test_003(self): - src_data = expected_results = [random.randint(0,3) for x in range(10)]; - src = blocks.vector_source_b( src_data ) + src_data = expected_results = [random.randint(0, 3) for x in range(10)] + src = blocks.vector_source_b(src_data) pack = blocks.pack_k_bits_bb(2) unpack = blocks.unpack_k_bits_bb(2) snk = blocks.vector_sink_b() - self.tb.connect(src,unpack,pack,snk) + self.tb.connect(src, unpack, pack, snk) self.tb.run() self.assertEqual(list(expected_results), list(snk.data())) -if __name__ == '__main__': - gr_unittest.run(test_pack) +if __name__ == '__main__': + gr_unittest.run(test_pack) diff --git a/gr-blocks/python/blocks/qa_packed_to_unpacked.py b/gr-blocks/python/blocks/qa_packed_to_unpacked.py index 5965b6c067..b3bc0d93e4 100644 --- a/gr-blocks/python/blocks/qa_packed_to_unpacked.py +++ b/gr-blocks/python/blocks/qa_packed_to_unpacked.py @@ -13,18 +13,19 @@ from gnuradio import gr, gr_unittest, blocks import random + class test_packing(gr_unittest.TestCase): def setUp(self): random.seed(0) - self.tb = gr.top_block () + self.tb = gr.top_block() def tearDown(self): self.tb = None def test_001(self): - src_data = [0x80,] - expected_results = [1,0,0,0,0,0,0,0] + src_data = [0x80, ] + expected_results = [1, 0, 0, 0, 0, 0, 0, 0] src = blocks.vector_source_b(src_data, False) op = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST) dst = blocks.vector_sink_b() @@ -36,8 +37,8 @@ class test_packing(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) def test_002(self): - src_data = [0x80,] - expected_results = [0,0,0,0,0,0,0,1] + src_data = [0x80, ] + expected_results = [0, 0, 0, 0, 0, 0, 0, 1] src = blocks.vector_source_b(src_data, False) op = blocks.packed_to_unpacked_bb(1, gr.GR_LSB_FIRST) dst = blocks.vector_sink_b() @@ -49,7 +50,7 @@ class test_packing(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) def test_003(self): - src_data = [0x11,] + src_data = [0x11, ] expected_results = [4, 2] src = blocks.vector_source_b(src_data, False) op = blocks.packed_to_unpacked_bb(3, gr.GR_LSB_FIRST) @@ -62,7 +63,7 @@ class test_packing(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) def test_004(self): - src_data = [0x11,] + src_data = [0x11, ] expected_results = [0, 4] src = blocks.vector_source_b(src_data, False) op = blocks.packed_to_unpacked_bb(3, gr.GR_MSB_FIRST) @@ -75,7 +76,7 @@ class test_packing(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) def test_005(self): - src_data = [1,0,0,0,0,0,1,0,0,1,0,1,1,0,1,0] + src_data = [1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0] expected_results = [0x82, 0x5a] src = blocks.vector_source_b(src_data, False) op = blocks.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST) @@ -88,7 +89,7 @@ class test_packing(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) def test_006(self): - src_data = [0,1,0,0,0,0,0,1,0,1,0,1,1,0,1,0] + src_data = [0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 1, 0] expected_results = [0x82, 0x5a] src = blocks.vector_source_b(src_data, False) op = blocks.unpacked_to_packed_bb(1, gr.GR_LSB_FIRST) @@ -101,8 +102,8 @@ class test_packing(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) def test_007(self): - src_data = [4, 2, 0,0,0] - expected_results = [0x11,] + src_data = [4, 2, 0, 0, 0] + expected_results = [0x11, ] src = blocks.vector_source_b(src_data, False) op = blocks.unpacked_to_packed_bb(3, gr.GR_LSB_FIRST) dst = blocks.vector_sink_b() @@ -114,9 +115,9 @@ class test_packing(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) def test_008(self): - src_data = [0, 4, 2,0,0] - expected_results = [0x11,] - src = blocks.vector_source_b(src_data,False) + src_data = [0, 4, 2, 0, 0] + expected_results = [0x11, ] + src = blocks.vector_source_b(src_data, False) op = blocks.unpacked_to_packed_bb(3, gr.GR_MSB_FIRST) dst = blocks.vector_sink_b() @@ -130,7 +131,7 @@ class test_packing(gr_unittest.TestCase): random.seed(0) src_data = [] for i in range(202): - src_data.append((random.randint(0,255))) + src_data.append((random.randint(0, 255))) src_data = src_data expected_results = src_data @@ -149,7 +150,7 @@ class test_packing(gr_unittest.TestCase): random.seed(0) src_data = [] for i in range(56): - src_data.append((random.randint(0,255))) + src_data.append((random.randint(0, 255))) src_data = src_data expected_results = src_data src = blocks.vector_source_b(tuple(src_data), False) @@ -167,10 +168,10 @@ class test_packing(gr_unittest.TestCase): random.seed(0) src_data = [] for i in range(56): - src_data.append((random.randint(0,255))) + src_data.append((random.randint(0, 255))) src_data = src_data expected_results = src_data - src = blocks.vector_source_b(tuple(src_data),False) + src = blocks.vector_source_b(tuple(src_data), False) op1 = blocks.packed_to_unpacked_bb(7, gr.GR_LSB_FIRST) op2 = blocks.unpacked_to_packed_bb(7, gr.GR_LSB_FIRST) dst = blocks.vector_sink_b() @@ -181,13 +182,13 @@ class test_packing(gr_unittest.TestCase): self.assertEqual(expected_results[0:201], dst.data()) - # tests on shorts + def test_100a(self): random.seed(0) src_data = [] for i in range(100): - src_data.append((random.randint(-2**15,2**15-1))) + src_data.append((random.randint(-2**15, 2**15 - 1))) src_data = src_data expected_results = src_data src = blocks.vector_source_s(tuple(src_data), False) @@ -205,7 +206,7 @@ class test_packing(gr_unittest.TestCase): random.seed(0) src_data = [] for i in range(100): - src_data.append((random.randint(-2**15,2**15-1))) + src_data.append((random.randint(-2**15, 2**15 - 1))) src_data = src_data expected_results = src_data src = blocks.vector_source_s(tuple(src_data), False) @@ -223,7 +224,7 @@ class test_packing(gr_unittest.TestCase): random.seed(0) src_data = [] for i in range(100): - src_data.append((random.randint(-2**15,2**15-1))) + src_data.append((random.randint(-2**15, 2**15 - 1))) src_data = src_data expected_results = src_data src = blocks.vector_source_s(tuple(src_data), False) @@ -241,7 +242,7 @@ class test_packing(gr_unittest.TestCase): random.seed(0) src_data = [] for i in range(100): - src_data.append((random.randint(-2**15,2**15-1))) + src_data.append((random.randint(-2**15, 2**15 - 1))) src_data = src_data expected_results = src_data src = blocks.vector_source_s(tuple(src_data), False) @@ -255,13 +256,13 @@ class test_packing(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) - # tests on ints + def test_200a(self): random.seed(0) src_data = [] for i in range(100): - src_data.append((random.randint(-2**31,2**31-1))) + src_data.append((random.randint(-2**31, 2**31 - 1))) src_data = src_data expected_results = src_data src = blocks.vector_source_i(tuple(src_data), False) @@ -279,7 +280,7 @@ class test_packing(gr_unittest.TestCase): random.seed(0) src_data = [] for i in range(100): - src_data.append((random.randint(-2**31,2**31-1))) + src_data.append((random.randint(-2**31, 2**31 - 1))) src_data = src_data expected_results = src_data src = blocks.vector_source_i(tuple(src_data), False) @@ -297,7 +298,7 @@ class test_packing(gr_unittest.TestCase): random.seed(0) src_data = [] for i in range(100): - src_data.append((random.randint(-2**31,2**31-1))) + src_data.append((random.randint(-2**31, 2**31 - 1))) src_data = src_data expected_results = src_data src = blocks.vector_source_i(tuple(src_data), False) @@ -315,7 +316,7 @@ class test_packing(gr_unittest.TestCase): random.seed(0) src_data = [] for i in range(100): - src_data.append((random.randint(-2**31,2**31-1))) + src_data.append((random.randint(-2**31, 2**31 - 1))) src_data = src_data expected_results = src_data src = blocks.vector_source_i(tuple(src_data), False) @@ -329,6 +330,6 @@ class test_packing(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) -if __name__ == '__main__': - gr_unittest.run(test_packing) +if __name__ == '__main__': + gr_unittest.run(test_packing) diff --git a/gr-blocks/python/blocks/qa_patterned_interleaver.py b/gr-blocks/python/blocks/qa_patterned_interleaver.py index 02790eaafc..bc032c5203 100644 --- a/gr-blocks/python/blocks/qa_patterned_interleaver.py +++ b/gr-blocks/python/blocks/qa_patterned_interleaver.py @@ -13,30 +13,32 @@ from gnuradio import gr, gr_unittest, blocks import math + class test_patterned_interleaver (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_000(self): - dst_data = [0,0,1,2,0,2,1,0]; - src0 = blocks.vector_source_f(200*[0]) - src1 = blocks.vector_source_f(200*[1]) - src2 = blocks.vector_source_f(200*[2]) + dst_data = [0, 0, 1, 2, 0, 2, 1, 0] + src0 = blocks.vector_source_f(200 * [0]) + src1 = blocks.vector_source_f(200 * [1]) + src2 = blocks.vector_source_f(200 * [2]) itg = blocks.patterned_interleaver(gr.sizeof_float, dst_data) dst = blocks.vector_sink_f() - head = blocks.head(gr.sizeof_float, 8); + head = blocks.head(gr.sizeof_float, 8) - self.tb.connect( src0, (itg,0) ); - self.tb.connect( src1, (itg,1) ); - self.tb.connect( src2, (itg,2) ); - self.tb.connect( itg, head, dst ); + self.tb.connect(src0, (itg, 0)) + self.tb.connect(src1, (itg, 1)) + self.tb.connect(src2, (itg, 2)) + self.tb.connect(itg, head, dst) self.tb.run() self.assertEqual(list(dst_data), list(dst.data())) + if __name__ == '__main__': gr_unittest.run(test_patterned_interleaver) diff --git a/gr-blocks/python/blocks/qa_pdu.py b/gr-blocks/python/blocks/qa_pdu.py index 97d9854c55..e3d0aad8e2 100644 --- a/gr-blocks/python/blocks/qa_pdu.py +++ b/gr-blocks/python/blocks/qa_pdu.py @@ -14,6 +14,7 @@ import time from gnuradio import gr, gr_unittest, blocks import pmt + class test_pdu(gr_unittest.TestCase): def setUp(self): @@ -37,7 +38,8 @@ class test_pdu(gr_unittest.TestCase): # Test that the right number of ports exist. pi = snk3.message_ports_in() po = snk3.message_ports_out() - self.assertEqual(pmt.length(pi), 1) #system port is defined automatically + # system port is defined automatically + self.assertEqual(pmt.length(pi), 1) self.assertEqual(pmt.length(po), 1) self.tb.connect(src, snk) @@ -47,12 +49,13 @@ class test_pdu(gr_unittest.TestCase): # make our reference and message pmts port = pmt.intern("pdus") - msg = pmt.cons( pmt.PMT_NIL, pmt.make_u8vector(16, 0xFF)) + msg = pmt.cons(pmt.PMT_NIL, pmt.make_u8vector(16, 0xFF)) # post the message src.to_basic_block()._post(port, msg) - src.to_basic_block()._post(pmt.intern("system"), - pmt.cons(pmt.intern("done"), pmt.from_long(1))) + src.to_basic_block()._post( + pmt.intern("system"), pmt.cons( + pmt.intern("done"), pmt.from_long(1))) self.tb.start() self.tb.wait() @@ -64,19 +67,19 @@ class test_pdu(gr_unittest.TestCase): # Convert the message PMT as a pair into its vector result_msg = dbg.get_message(0) msg_vec = pmt.cdr(result_msg) - #pmt.print(msg_vec) + # pmt.print(msg_vec) # Convert the PMT vector into a Python list msg_data = [] for i in range(16): msg_data.append(pmt.u8vector_ref(msg_vec, i)) - actual_data = 16*[0xFF,] + actual_data = 16 * [0xFF, ] self.assertEqual(actual_data, list(result_data)) self.assertEqual(actual_data, msg_data) def test_001(self): - #Test the overflow buffer in pdu_to_tagged_stream + # Test the overflow buffer in pdu_to_tagged_stream src_data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0] src = blocks.pdu_to_tagged_stream(blocks.float_t) snk = blocks.vector_sink_f() @@ -84,16 +87,16 @@ class test_pdu(gr_unittest.TestCase): self.tb.connect(src, snk) port = pmt.intern("pdus") - msg = pmt.cons( pmt.PMT_NIL, pmt.init_f32vector(10, src_data)) + msg = pmt.cons(pmt.PMT_NIL, pmt.init_f32vector(10, src_data)) src.to_basic_block()._post(port, msg) - src.to_basic_block()._post(pmt.intern("system"), - pmt.cons(pmt.intern("done"), pmt.from_long(1))) + src.to_basic_block()._post( + pmt.intern("system"), pmt.cons( + pmt.intern("done"), pmt.from_long(1))) self.tb.start() self.tb.wait() - self.assertEqual(src_data, list(snk.data()) ) - + self.assertEqual(src_data, list(snk.data())) def test_002_tags_plus_data(self): packet_len = 16 @@ -103,11 +106,15 @@ class test_pdu(gr_unittest.TestCase): tag1.key = pmt.string_to_symbol('spam') tag1.value = pmt.from_long(23) tag2 = gr.tag_t() - tag2.offset = 10 # Must be < packet_len + tag2.offset = 10 # Must be < packet_len tag2.key = pmt.string_to_symbol('eggs') tag2.value = pmt.from_long(42) src = blocks.vector_source_f(src_data, tags=(tag1, tag2)) - s2ts = blocks.stream_to_tagged_stream(gr.sizeof_float, vlen=1, packet_len=packet_len, len_tag_key="packet_len") + s2ts = blocks.stream_to_tagged_stream( + gr.sizeof_float, + vlen=1, + packet_len=packet_len, + len_tag_key="packet_len") ts2pdu = blocks.tagged_stream_to_pdu(blocks.float_t, "packet_len") dbg = blocks.message_debug() self.tb.connect(src, s2ts, ts2pdu) @@ -116,7 +123,7 @@ class test_pdu(gr_unittest.TestCase): self.tb.wait() result_msg = dbg.get_message(0) metadata = pmt.to_python(pmt.car(result_msg)) - vector = pmt.f32vector_elements(pmt.cdr(result_msg)) + vector = pmt.f32vector_elements(pmt.cdr(result_msg)) self.assertEqual(metadata, {'eggs': 42, 'spam': 23}) self.assertFloatTuplesAlmostEqual(tuple(vector), src_data) diff --git a/gr-blocks/python/blocks/qa_peak_detector.py b/gr-blocks/python/blocks/qa_peak_detector.py index 297e6cc040..1fe4663e6b 100644 --- a/gr-blocks/python/blocks/qa_peak_detector.py +++ b/gr-blocks/python/blocks/qa_peak_detector.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_peak_detector(gr_unittest.TestCase): def setUp(self): @@ -82,5 +83,6 @@ class test_peak_detector(gr_unittest.TestCase): self.assertEqual(expected_result, dst_data) + if __name__ == '__main__': gr_unittest.run(test_peak_detector) diff --git a/gr-blocks/python/blocks/qa_peak_detector2.py b/gr-blocks/python/blocks/qa_peak_detector2.py index fbbca1d56d..ff6323bb0e 100644 --- a/gr-blocks/python/blocks/qa_peak_detector2.py +++ b/gr-blocks/python/blocks/qa_peak_detector2.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_peak_detector2(gr_unittest.TestCase): def setUp(self): @@ -20,16 +21,15 @@ class test_peak_detector2(gr_unittest.TestCase): self.tb = None def test_peak1(self): - #print "\n\nTEST 1" + # print "\n\nTEST 1" tb = self.tb - n=10 + n = 10 data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, - 9, 8, 7, 6, 5, 4, 3, 2, 1, 0,]+n*[0,] + 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, ] + n * [0, ] expected_result = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]+n*[0,] - + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + n * [0, ] src = blocks.vector_source_f(data, False) regen = blocks.peak_detector2_fb(7.0, 25, 0.001) @@ -44,19 +44,19 @@ class test_peak_detector2(gr_unittest.TestCase): self.assertEqual(expected_result, dst_data) def test_peak2(self): - #print "\n\nTEST 2" + # print "\n\nTEST 2" tb = self.tb - n=10 + n = 10 data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, - 9, 8, 7, 6, 5, 4, 3, 2, 1, 0,]+n*[0,] + 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, ] + n * [0, ] expected_result = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]+n*[0,] - + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + n * [0, ] src = blocks.vector_source_f(data, False) - regen = blocks.peak_detector2_fb(7.0, 1000, 0.001) # called with a LONG window + regen = blocks.peak_detector2_fb( + 7.0, 1000, 0.001) # called with a LONG window dst = blocks.vector_sink_b() tb.connect(src, regen) @@ -65,20 +65,19 @@ class test_peak_detector2(gr_unittest.TestCase): dst_data = dst.data() - # here we know that the block will terminate prematurely, so we compare only part of the expected_result + # here we know that the block will terminate prematurely, so we compare + # only part of the expected_result self.assertEqual(expected_result[0:len(dst_data)], dst_data) - def test_peak3(self): - #print "\n\nTEST 3" + # print "\n\nTEST 3" tb = self.tb l = 8100 m = 100 n = 10 - data = l*[0,]+ [10,]+ m*[0,]+[100,]+ n*[0,] - expected_result = l*[0,]+ [0,]+ m*[0,]+[1,]+ n*[0,] - + data = l * [0, ] + [10, ] + m * [0, ] + [100, ] + n * [0, ] + expected_result = l * [0, ] + [0, ] + m * [0, ] + [1, ] + n * [0, ] src = blocks.vector_source_f(data, False) regen = blocks.peak_detector2_fb(7.0, 105, 0.001) @@ -92,17 +91,15 @@ class test_peak_detector2(gr_unittest.TestCase): self.assertEqual(expected_result, dst_data) - def test_peak4(self): - #print "\n\nTEST 4" + # print "\n\nTEST 4" tb = self.tb l = 8100 m = 100 n = 10 - data = l*[0,]+ [10,]+ m*[0,]+[100,]+ n*[0,] - expected_result = l*[0,]+ [0,]+ m*[0,]+[1,]+ n*[0,] - + data = l * [0, ] + [10, ] + m * [0, ] + [100, ] + n * [0, ] + expected_result = l * [0, ] + [0, ] + m * [0, ] + [1, ] + n * [0, ] src = blocks.vector_source_f(data, False) regen = blocks.peak_detector2_fb(7.0, 150, 0.001) @@ -114,7 +111,8 @@ class test_peak_detector2(gr_unittest.TestCase): dst_data = dst.data() - # here we know that the block will terminate prematurely, so we compare only part of the expected_result + # here we know that the block will terminate prematurely, so we compare + # only part of the expected_result self.assertEqual(expected_result[0:len(dst_data)], dst_data) diff --git a/gr-blocks/python/blocks/qa_pipe_fittings.py b/gr-blocks/python/blocks/qa_pipe_fittings.py index 6c0d799f40..db91b45a7e 100644 --- a/gr-blocks/python/blocks/qa_pipe_fittings.py +++ b/gr-blocks/python/blocks/qa_pipe_fittings.py @@ -11,10 +11,11 @@ from gnuradio import gr, gr_unittest, blocks + def calc_expected_result(src_data, n): assert (len(src_data) % n) == 0 result = [list() for x in range(n)] - #print "len(result) =", len(result) + # print "len(result) =", len(result) for i in range(len(src_data)): (result[i % n]).append(src_data[i]) return result @@ -23,7 +24,7 @@ def calc_expected_result(src_data, n): class test_pipe_fittings(gr_unittest.TestCase): def setUp(self): - self.tb = gr.top_block () + self.tb = gr.top_block() def tearDown(self): self.tb = None @@ -37,7 +38,7 @@ class test_pipe_fittings(gr_unittest.TestCase): src_data = list(range(src_len)) expected_results = calc_expected_result(src_data, n) - #print "expected results: ", expected_results + # print "expected results: ", expected_results src = blocks.vector_source_i(src_data) op = blocks.stream_to_streams(gr.sizeof_int, n) self.tb.connect(src, op) @@ -77,7 +78,7 @@ class test_pipe_fittings(gr_unittest.TestCase): def test_003(self): - #Test streams_to_vector (using stream_to_streams & vector_to_stream). + # Test streams_to_vector (using stream_to_streams & vector_to_stream). n = 8 src_len = n * 8 @@ -100,7 +101,7 @@ class test_pipe_fittings(gr_unittest.TestCase): def test_004(self): - #Test vector_to_streams. + # Test vector_to_streams. n = 8 src_len = n * 8 diff --git a/gr-blocks/python/blocks/qa_plateau_detector_fb.py b/gr-blocks/python/blocks/qa_plateau_detector_fb.py index 6fcabdb8b2..c14022caf8 100644 --- a/gr-blocks/python/blocks/qa_plateau_detector_fb.py +++ b/gr-blocks/python/blocks/qa_plateau_detector_fb.py @@ -11,22 +11,24 @@ from gnuradio import gr, gr_unittest, blocks + class qa_plateau_detector_fb (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_001_t (self): + def test_001_t(self): # | Spur spike 1 | Plateau | Spur spike 2 - test_signal = [0, 1, .2, .4, .6, .8, 1, 1, 1, 1, 1, .8, .6, .4, 1, 0] - expected_sig = [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0] + test_signal = [0, 1, .2, .4, .6, .8, 1, 1, 1, 1, 1, .8, .6, .4, 1, 0] + expected_sig = [0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0] # | Center of Plateau sink = blocks.vector_sink_b() - self.tb.connect(blocks.vector_source_f(test_signal), blocks.plateau_detector_fb(5), sink) - self.tb.run () + self.tb.connect(blocks.vector_source_f(test_signal), + blocks.plateau_detector_fb(5), sink) + self.tb.run() self.assertEqual(expected_sig, sink.data()) diff --git a/gr-blocks/python/blocks/qa_probe_signal.py b/gr-blocks/python/blocks/qa_probe_signal.py index 64df060441..712a3519fe 100644 --- a/gr-blocks/python/blocks/qa_probe_signal.py +++ b/gr-blocks/python/blocks/qa_probe_signal.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_probe_signal(gr_unittest.TestCase): def setUp(self): @@ -35,7 +36,7 @@ class test_probe_signal(gr_unittest.TestCase): def test_002(self): vector_length = 10 repeats = 10 - value = [0.5+i for i in range(0, vector_length)] + value = [0.5 + i for i in range(0, vector_length)] src_data = value * repeats src = blocks.vector_source_f(src_data) @@ -48,5 +49,6 @@ class test_probe_signal(gr_unittest.TestCase): self.assertEqual(len(output), vector_length) self.assertAlmostEqual(value[3], output[3], places=6) + if __name__ == '__main__': gr_unittest.run(test_probe_signal) diff --git a/gr-blocks/python/blocks/qa_python_message_passing.py b/gr-blocks/python/blocks/qa_python_message_passing.py index e5b0bc6d9f..d36145793a 100644 --- a/gr-blocks/python/blocks/qa_python_message_passing.py +++ b/gr-blocks/python/blocks/qa_python_message_passing.py @@ -15,20 +15,21 @@ import numpy import time # Simple block to generate messages + + class message_generator(gr.sync_block): def __init__(self, msg_list, msg_interval): gr.sync_block.__init__( self, - name = "message generator", - in_sig = [numpy.float32], - out_sig = None + name="message generator", + in_sig=[numpy.float32], + out_sig=None ) self.msg_list = msg_list self.msg_interval = msg_interval self.msg_ctr = 0 self.message_port_register_out(pmt.intern('out_port')) - def work(self, input_items, output_items): inLen = len(input_items[0]) while self.msg_ctr < len(self.msg_list) and \ @@ -40,13 +41,15 @@ class message_generator(gr.sync_block): return inLen # Simple block to consume messages + + class message_consumer(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, - name = "message consumer", - in_sig = None, - out_sig = None + name="message consumer", + in_sig=None, + out_sig=None ) self.msg_list = [] self.message_port_register_in(pmt.intern('in_port')) @@ -57,6 +60,7 @@ class message_consumer(gr.sync_block): # Create a new PMT from long value and put in list self.msg_list.append(pmt.from_long(pmt.to_long(msg))) + class test_python_message_passing(gr_unittest.TestCase): def setUp(self): @@ -74,7 +78,7 @@ class test_python_message_passing(gr_unittest.TestCase): # Create vector source with dummy data to trigger messages src_data = [] - for i in range(num_msgs*msg_interval): + for i in range(num_msgs * msg_interval): src_data.append(float(i)) src = blocks.vector_source_f(src_data, False) msg_gen = message_generator(msg_list, msg_interval) @@ -87,8 +91,13 @@ class test_python_message_passing(gr_unittest.TestCase): self.tb.msg_connect(msg_gen, 'out_port', msg_cons, 'in_port') # Verify that the messgae port query functions work - self.assertEqual(pmt.to_python(msg_gen.message_ports_out())[0], 'out_port') - self.assertEqual('in_port' in pmt.to_python(msg_cons.message_ports_in()), True) + self.assertEqual( + pmt.to_python( + msg_gen.message_ports_out())[0], + 'out_port') + self.assertEqual( + 'in_port' in pmt.to_python( + msg_cons.message_ports_in()), True) # Run to verify message passing self.tb.run() @@ -98,5 +107,6 @@ class test_python_message_passing(gr_unittest.TestCase): for i in range(num_msgs): self.assertTrue(pmt.equal(msg_list[i], msg_cons.msg_list[i])) + if __name__ == '__main__': gr_unittest.run(test_python_message_passing) diff --git a/gr-blocks/python/blocks/qa_regenerate.py b/gr-blocks/python/blocks/qa_regenerate.py index f5e92c2dc1..d8146abd71 100644 --- a/gr-blocks/python/blocks/qa_regenerate.py +++ b/gr-blocks/python/blocks/qa_regenerate.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_regenerate(gr_unittest.TestCase): def setUp(self): @@ -26,10 +27,50 @@ class test_regenerate(gr_unittest.TestCase): 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] - expected_result = [0, 0, 0, - 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, - 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] - + expected_result = [ + 0, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 1, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0] src = blocks.vector_source_b(data, False) regen = blocks.regenerate_bb(5, 2) @@ -46,20 +87,20 @@ class test_regenerate(gr_unittest.TestCase): def test_regen2(self): tb = self.tb - data = 200*[0,] + data = 200 * [0, ] data[9] = 1 data[99] = 1 - expected_result = 200*[0,] - expected_result[9] = 1 - expected_result[19] = 1 - expected_result[29] = 1 - expected_result[39] = 1 + expected_result = 200 * [0, ] + expected_result[9] = 1 + expected_result[19] = 1 + expected_result[29] = 1 + expected_result[39] = 1 - expected_result[99] = 1 - expected_result[109] = 1 - expected_result[119] = 1 - expected_result[129] = 1 + expected_result[99] = 1 + expected_result[109] = 1 + expected_result[119] = 1 + expected_result[129] = 1 src = blocks.vector_source_b(data, False) regen = blocks.regenerate_bb(10, 3) @@ -67,7 +108,7 @@ class test_regenerate(gr_unittest.TestCase): tb.connect(src, regen) tb.connect(regen, dst) - tb.run () + tb.run() dst_data = dst.data() diff --git a/gr-blocks/python/blocks/qa_repack_bits_bb.py b/gr-blocks/python/blocks/qa_repack_bits_bb.py index 129a6795fd..8f41116a2a 100644 --- a/gr-blocks/python/blocks/qa_repack_bits_bb.py +++ b/gr-blocks/python/blocks/qa_repack_bits_bb.py @@ -13,17 +13,18 @@ import random from gnuradio import gr, gr_unittest, blocks import pmt + class qa_repack_bits_bb (gr_unittest.TestCase): - def setUp (self): + def setUp(self): random.seed(0) - self.tb = gr.top_block () + self.tb = gr.top_block() self.tsb_key = "length" - def tearDown (self): + def tearDown(self): self.tb = None - def test_001_simple (self): + def test_001_simple(self): """ Very simple test, 2 bits -> 1 """ src_data = [0b11, 0b01, 0b10] expected_data = [0b1, 0b1, 0b1, 0b0, 0b0, 0b1] @@ -33,10 +34,10 @@ class qa_repack_bits_bb (gr_unittest.TestCase): repack = blocks.repack_bits_bb(k, l) sink = blocks.vector_sink_b() self.tb.connect(src, repack, sink) - self.tb.run () + self.tb.run() self.assertEqual(sink.data(), expected_data) - def test_001_simple_msb (self): + def test_001_simple_msb(self): """ Very simple test, 2 bits -> 1 with MSB set """ src_data = [0b11, 0b01, 0b10] expected_data = [0b1, 0b1, 0b0, 0b1, 0b1, 0b0] @@ -46,51 +47,51 @@ class qa_repack_bits_bb (gr_unittest.TestCase): repack = blocks.repack_bits_bb(k, l, "", False, gr.GR_MSB_FIRST) sink = blocks.vector_sink_b() self.tb.connect(src, repack, sink) - self.tb.run () + self.tb.run() self.assertEqual(sink.data(), expected_data) - def test_002_three (self): + def test_002_three(self): """ 8 -> 3 """ src_data = [0b11111101, 0b11111111, 0b11111111] - expected_data = [0b101,] + [0b111,] * 7 + expected_data = [0b101, ] + [0b111, ] * 7 k = 8 l = 3 src = blocks.vector_source_b(src_data, False, 1) repack = blocks.repack_bits_bb(k, l) sink = blocks.vector_sink_b() self.tb.connect(src, repack, sink) - self.tb.run () + self.tb.run() self.assertEqual(sink.data(), expected_data) - def test_002_three (self): + def test_002_three(self): """ 8 -> 3 """ src_data = [0b11111101, 0b11111111, 0b11111111] - expected_data = [0b101,] + [0b111,] * 7 + expected_data = [0b101, ] + [0b111, ] * 7 k = 8 l = 3 src = blocks.vector_source_b(src_data, False, 1) repack = blocks.repack_bits_bb(k, l) sink = blocks.vector_sink_b() self.tb.connect(src, repack, sink) - self.tb.run () + self.tb.run() self.assertEqual(sink.data(), expected_data) - def test_002_three_msb (self): + def test_002_three_msb(self): """ 8 -> 3 """ src_data = [0b11111101, 0b11111111, 0b11111111] - expected_data = [0b111,] + [0b111,] + [0b011,] + [0b111,] * 5 + expected_data = [0b111, ] + [0b111, ] + [0b011, ] + [0b111, ] * 5 k = 8 l = 3 src = blocks.vector_source_b(src_data, False, 1) repack = blocks.repack_bits_bb(k, l, "", False, gr.GR_MSB_FIRST) sink = blocks.vector_sink_b() self.tb.connect(src, repack, sink) - self.tb.run () + self.tb.run() self.assertEqual(sink.data(), expected_data) - def test_003_lots_of_bytes (self): + def test_003_lots_of_bytes(self): """ Lots and lots of bytes, multiple packer stages """ - src_data = [random.randint(0, 255) for x in range(3*5*7*8 * 10)] + src_data = [random.randint(0, 255) for x in range(3 * 5 * 7 * 8 * 10)] src = blocks.vector_source_b(src_data, False, 1) repack1 = blocks.repack_bits_bb(8, 3) repack2 = blocks.repack_bits_bb(3, 5) @@ -98,12 +99,12 @@ class qa_repack_bits_bb (gr_unittest.TestCase): repack4 = blocks.repack_bits_bb(7, 8) sink = blocks.vector_sink_b() self.tb.connect(src, repack1, repack2, repack3, repack4, sink) - self.tb.run () + self.tb.run() self.assertEqual(sink.data(), src_data) - def test_003_lots_of_bytes_msb (self): + def test_003_lots_of_bytes_msb(self): """ Lots and lots of bytes, multiple packer stages """ - src_data = [random.randint(0, 255) for x in range(3*5*7*8 * 10)] + src_data = [random.randint(0, 255) for x in range(3 * 5 * 7 * 8 * 10)] src = blocks.vector_source_b(src_data, False, 1) repack1 = blocks.repack_bits_bb(8, 3, "", False, gr.GR_MSB_FIRST) repack2 = blocks.repack_bits_bb(3, 5, "", False, gr.GR_MSB_FIRST) @@ -111,13 +112,13 @@ class qa_repack_bits_bb (gr_unittest.TestCase): repack4 = blocks.repack_bits_bb(7, 8, "", False, gr.GR_MSB_FIRST) sink = blocks.vector_sink_b() self.tb.connect(src, repack1, repack2, repack3, repack4, sink) - self.tb.run () + self.tb.run() self.assertEqual(sink.data(), src_data) - def test_004_three_with_tags (self): + def test_004_three_with_tags(self): """ 8 -> 3 """ src_data = [0b11111101, 0b11111111] - expected_data = [0b101,] + [0b111,] * 4 + [0b001,] + expected_data = [0b101, ] + [0b111, ] * 4 + [0b001, ] k = 8 l = 3 src = blocks.vector_source_b(src_data, False, 1) @@ -125,17 +126,20 @@ class qa_repack_bits_bb (gr_unittest.TestCase): sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) self.tb.connect( src, - blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(src_data), self.tsb_key), + blocks.stream_to_tagged_stream( + gr.sizeof_char, + 1, + len(src_data), + self.tsb_key), repack, - sink - ) - self.tb.run () + sink) + self.tb.run() self.assertEqual(len(sink.data()), 1) self.assertEqual(sink.data()[0], expected_data) - def test_005_three_with_tags_trailing (self): + def test_005_three_with_tags_trailing(self): """ 3 -> 8, trailing bits """ - src_data = [0b101,] + [0b111,] * 4 + [0b001,] + src_data = [0b101, ] + [0b111, ] * 4 + [0b001, ] expected_data = [0b11111101, 0b11111111] k = 3 l = 8 @@ -144,13 +148,17 @@ class qa_repack_bits_bb (gr_unittest.TestCase): sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) self.tb.connect( src, - blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(src_data), self.tsb_key), + blocks.stream_to_tagged_stream( + gr.sizeof_char, + 1, + len(src_data), + self.tsb_key), repack, - sink - ) - self.tb.run () + sink) + self.tb.run() self.assertEqual(len(sink.data()), 1) self.assertEqual(sink.data()[0], expected_data) + if __name__ == '__main__': gr_unittest.run(qa_repack_bits_bb) diff --git a/gr-blocks/python/blocks/qa_repeat.py b/gr-blocks/python/blocks/qa_repeat.py index 1cdd788392..e54aba1037 100644 --- a/gr-blocks/python/blocks/qa_repeat.py +++ b/gr-blocks/python/blocks/qa_repeat.py @@ -11,19 +11,20 @@ from gnuradio import gr, gr_unittest, blocks + class test_repeat (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_001_float(self): - src_data = [n*1.0 for n in range(100)]; + src_data = [n * 1.0 for n in range(100)] dst_data = [] for n in range(100): - dst_data += [1.0*n, 1.0*n, 1.0*n] + dst_data += [1.0 * n, 1.0 * n, 1.0 * n] src = blocks.vector_source_f(src_data) rpt = blocks.repeat(gr.sizeof_float, 3) @@ -32,5 +33,6 @@ class test_repeat (gr_unittest.TestCase): self.tb.run() self.assertFloatTuplesAlmostEqual(dst_data, dst.data(), 6) + if __name__ == '__main__': gr_unittest.run(test_repeat) diff --git a/gr-blocks/python/blocks/qa_rms.py b/gr-blocks/python/blocks/qa_rms.py index 378bedd300..174fdaa153 100644 --- a/gr-blocks/python/blocks/qa_rms.py +++ b/gr-blocks/python/blocks/qa_rms.py @@ -13,17 +13,20 @@ from gnuradio import gr, gr_unittest, blocks import math + def sig_source_f(samp_rate, freq, amp, N): t = [float(x) / samp_rate for x in range(N)] - y = [amp*math.cos(2.*math.pi*freq*x) for x in t] + y = [amp * math.cos(2. * math.pi * freq * x) for x in t] return y + def sig_source_c(samp_rate, freq, amp, N): t = [float(x) / samp_rate for x in range(N)] - y = [amp*math.cos(2.*math.pi*freq*x) + \ - 1j*amp*math.sin(2.*math.pi*freq*x) for x in t] + y = [amp * math.cos(2. * math.pi * freq * x) + + 1j * amp * math.sin(2. * math.pi * freq * x) for x in t] return y + class test_rms(gr_unittest.TestCase): def setUp(self): @@ -68,5 +71,6 @@ class test_rms(gr_unittest.TestCase): dst_data = dst.data() self.assertAlmostEqual(dst_data[-1], expected_data, 4) + if __name__ == '__main__': gr_unittest.run(test_rms) diff --git a/gr-blocks/python/blocks/qa_sample_and_hold.py b/gr-blocks/python/blocks/qa_sample_and_hold.py index c44915c3bf..59d43ccbf2 100644 --- a/gr-blocks/python/blocks/qa_sample_and_hold.py +++ b/gr-blocks/python/blocks/qa_sample_and_hold.py @@ -12,6 +12,7 @@ import time from gnuradio import gr, gr_unittest, blocks + class test_sample_and_hold(gr_unittest.TestCase): def setUp(self): @@ -21,22 +22,24 @@ class test_sample_and_hold(gr_unittest.TestCase): self.tb = None def test_001(self): - src_data = 10*[0,1,2,3,4,5,6,7,8,9,8,7,6,5,4,3,2,1] - ctrl_data = 10*[1,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0,0,0] - expected_result = 10*(0,0,0,0,4,5,6,7,8,9,9,9,9,9,9,9,9,9) + src_data = 10 * [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 8, 7, 6, 5, 4, 3, 2, 1] + ctrl_data = 10 * [1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0] + expected_result = 10 * (0, 0, 0, 0, 4, 5, 6, 7, + 8, 9, 9, 9, 9, 9, 9, 9, 9, 9) - src = blocks.vector_source_f(src_data) + src = blocks.vector_source_f(src_data) ctrl = blocks.vector_source_b(ctrl_data) op = blocks.sample_and_hold_ff() dst = blocks.vector_sink_f() - self.tb.connect(src, (op,0)) - self.tb.connect(ctrl, (op,1)) + self.tb.connect(src, (op, 0)) + self.tb.connect(ctrl, (op, 1)) self.tb.connect(op, dst) self.tb.run() result = dst.data() self.assertFloatTuplesAlmostEqual(expected_result, result, places=6) + if __name__ == '__main__': gr_unittest.run(test_sample_and_hold) diff --git a/gr-blocks/python/blocks/qa_selector.py b/gr-blocks/python/blocks/qa_selector.py index c36cfefad5..e19ed22fb1 100644 --- a/gr-blocks/python/blocks/qa_selector.py +++ b/gr-blocks/python/blocks/qa_selector.py @@ -25,57 +25,60 @@ class test_selector(gr_unittest.TestCase): expected_result = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] expected_drop = () - num_inputs = 4; num_outputs = 4 - input_index = 1; output_index = 2 + num_inputs = 4 + num_outputs = 4 + input_index = 1 + output_index = 2 op = blocks.selector(gr.sizeof_char, input_index, output_index) src = [] dst = [] for ii in range(num_inputs): - src.append( blocks.vector_source_b(src_data)) - self.tb.connect(src[ii], (op,ii)) + src.append(blocks.vector_source_b(src_data)) + self.tb.connect(src[ii], (op, ii)) for jj in range(num_outputs): dst.append(blocks.vector_sink_b()) - self.tb.connect((op,jj),dst[jj]) + self.tb.connect((op, jj), dst[jj]) - self.tb.run() dst_data = dst[output_index].data() self.assertEqual(expected_result, dst_data) - def test_select_input(self): - num_inputs = 4; num_outputs = 4 - input_index = 1; output_index = 2 + num_inputs = 4 + num_outputs = 4 + input_index = 1 + output_index = 2 op = blocks.selector(gr.sizeof_char, input_index, output_index) src = [] dst = [] for ii in range(num_inputs): - src_data = [ii+1]*10 - src.append( blocks.vector_source_b(src_data)) - self.tb.connect(src[ii], (op,ii)) + src_data = [ii + 1] * 10 + src.append(blocks.vector_source_b(src_data)) + self.tb.connect(src[ii], (op, ii)) for jj in range(num_outputs): dst.append(blocks.vector_sink_b()) - self.tb.connect((op,jj),dst[jj]) + self.tb.connect((op, jj), dst[jj]) - self.tb.run() - expected_result = [input_index+1]*10 + expected_result = [input_index + 1] * 10 dst_data = list(dst[output_index].data()) self.assertEqual(expected_result, dst_data) - + def test_dump(self): - num_inputs = 4; num_outputs = 4 - input_index = 1; output_index = 2 + num_inputs = 4 + num_outputs = 4 + input_index = 1 + output_index = 2 output_not_selected = 3 op = blocks.selector(gr.sizeof_char, input_index, output_index) @@ -83,14 +86,13 @@ class test_selector(gr_unittest.TestCase): src = [] dst = [] for ii in range(num_inputs): - src_data = [ii+1]*10 - src.append( blocks.vector_source_b(src_data)) - self.tb.connect(src[ii], (op,ii)) + src_data = [ii + 1] * 10 + src.append(blocks.vector_source_b(src_data)) + self.tb.connect(src[ii], (op, ii)) for jj in range(num_outputs): dst.append(blocks.vector_sink_b()) - self.tb.connect((op,jj),dst[jj]) + self.tb.connect((op, jj), dst[jj]) - self.tb.run() expected_result = [] @@ -98,12 +100,14 @@ class test_selector(gr_unittest.TestCase): self.assertEqual(expected_result, dst_data) - def test_not_enabled (self): + def test_not_enabled(self): src_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] expected_result = [] - num_inputs = 4; num_outputs = 4 - input_index = 1; output_index = 2 + num_inputs = 4 + num_outputs = 4 + input_index = 1 + output_index = 2 op = blocks.selector(gr.sizeof_char, input_index, output_index) op.set_enabled(False) @@ -111,19 +115,17 @@ class test_selector(gr_unittest.TestCase): src = [] dst = [] for ii in range(num_inputs): - src.append( blocks.vector_source_b(src_data)) - self.tb.connect(src[ii], (op,ii)) + src.append(blocks.vector_source_b(src_data)) + self.tb.connect(src[ii], (op, ii)) for jj in range(num_outputs): dst.append(blocks.vector_sink_b()) - self.tb.connect((op,jj),dst[jj]) + self.tb.connect((op, jj), dst[jj]) - self.tb.run() dst_data = dst[output_index].data() self.assertEqual(expected_result, dst_data) - # These tests cannot be run as set_index can only be called after check_topology is called # def test_set_indices(self): @@ -132,7 +134,6 @@ class test_selector(gr_unittest.TestCase): # op = blocks.selector(gr.sizeof_char, 0, 0) - # src = [] # dst = [] # for ii in range(num_inputs): @@ -145,7 +146,7 @@ class test_selector(gr_unittest.TestCase): # op.set_input_index(input_index) # op.set_output_index(output_index) - + # self.tb.run() # expected_result = [input_index+1]*10 @@ -172,7 +173,6 @@ class test_selector(gr_unittest.TestCase): # dst.append(blocks.vector_sink_b()) # self.tb.connect((op,jj),dst[jj]) - # self.tb.run() # expected_result = [input_index+1]*10 @@ -182,30 +182,39 @@ class test_selector(gr_unittest.TestCase): def test_float_vector(self): - num_inputs = 4; num_outputs = 4 - input_index = 1; output_index = 2 + num_inputs = 4 + num_outputs = 4 + input_index = 1 + output_index = 2 veclen = 3 - op = blocks.selector(gr.sizeof_float*veclen, input_index, output_index) + op = blocks.selector( + gr.sizeof_float * veclen, + input_index, + output_index) src = [] dst = [] for ii in range(num_inputs): - src_data = [float(ii)+1]*10*veclen - src.append( blocks.vector_source_f(src_data, repeat=False, vlen=veclen)) - self.tb.connect(src[ii], (op,ii)) + src_data = [float(ii) + 1] * 10 * veclen + src.append( + blocks.vector_source_f( + src_data, + repeat=False, + vlen=veclen)) + self.tb.connect(src[ii], (op, ii)) for jj in range(num_outputs): dst.append(blocks.vector_sink_f(vlen=veclen)) - self.tb.connect((op,jj),dst[jj]) + self.tb.connect((op, jj), dst[jj]) - self.tb.run() - expected_result = [float(input_index)+1]*10*veclen + expected_result = [float(input_index) + 1] * 10 * veclen dst_data = list(dst[output_index].data()) self.assertEqual(expected_result, dst_data) + if __name__ == '__main__': gr_unittest.run(test_selector) diff --git a/gr-blocks/python/blocks/qa_skiphead.py b/gr-blocks/python/blocks/qa_skiphead.py index 842a36ca0e..1c643cfbd7 100644 --- a/gr-blocks/python/blocks/qa_skiphead.py +++ b/gr-blocks/python/blocks/qa_skiphead.py @@ -120,5 +120,6 @@ class test_skiphead(gr_unittest.TestCase): self.assertEqual(pmt.to_python( dst_tags[0].value), "qux", "Tag value is incorrect") + if __name__ == '__main__': gr_unittest.run(test_skiphead) diff --git a/gr-blocks/python/blocks/qa_socket_pdu.py b/gr-blocks/python/blocks/qa_socket_pdu.py index b591716d7e..16e118dafb 100644 --- a/gr-blocks/python/blocks/qa_socket_pdu.py +++ b/gr-blocks/python/blocks/qa_socket_pdu.py @@ -14,16 +14,17 @@ import random import pmt import time + class qa_socket_pdu (gr_unittest.TestCase): - def setUp (self): + def setUp(self): random.seed(0) - self.tb = gr.top_block () + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_001 (self): + def test_001(self): # Test that blocks can be created and destroyed without hanging port = str(random.Random().randint(0, 30000) + 10000) self.pdu_send = blocks.socket_pdu("UDP_CLIENT", "localhost", port) @@ -31,7 +32,7 @@ class qa_socket_pdu (gr_unittest.TestCase): self.pdu_send = None self.pdu_recv = None - def test_002 (self): + def test_002(self): # Send a PDU through a pair of UDP sockets port = str(random.Random().randint(0, 30000) + 10000) srcdata = (0x64, 0x6f, 0x67, 0x65) @@ -47,7 +48,7 @@ class qa_socket_pdu (gr_unittest.TestCase): self.tb.msg_connect(self.pdu_source, "strobe", self.pdu_send, "pdus") self.tb.msg_connect(self.pdu_recv, "pdus", self.dbg, "store") - self.tb.start () + self.tb.start() time.sleep(1) self.tb.stop() self.tb.wait() @@ -61,10 +62,26 @@ class qa_socket_pdu (gr_unittest.TestCase): msg_data.append(pmt.u8vector_ref(received_data, i)) self.assertEqual(srcdata, tuple(msg_data)) - def test_003 (self): + def test_003(self): # Test that block stops when interacting with streaming interface port = str(random.Random().randint(0, 30000) + 10000) - srcdata = (0x73, 0x75, 0x63, 0x68, 0x74, 0x65, 0x73, 0x74, 0x76, 0x65, 0x72, 0x79, 0x70, 0x61, 0x73, 0x73) + srcdata = ( + 0x73, + 0x75, + 0x63, + 0x68, + 0x74, + 0x65, + 0x73, + 0x74, + 0x76, + 0x65, + 0x72, + 0x79, + 0x70, + 0x61, + 0x73, + 0x73) tag_dict = {"offset": 0} tag_dict["key"] = pmt.intern("len") tag_dict["value"] = pmt.from_long(8) @@ -90,7 +107,7 @@ class qa_socket_pdu (gr_unittest.TestCase): #self.tb.connect(pdu_to_ts, head, sink) self.tb.run() - def test_004 (self): + def test_004(self): # Test that the TCP server can stream PDUs <= the MTU size. port = str(random.Random().randint(0, 30000) + 10000) mtu = 10000 @@ -118,6 +135,6 @@ class qa_socket_pdu (gr_unittest.TestCase): msg_data.append(pmt.u8vector_ref(received_data, i)) self.assertEqual(srcdata, tuple(msg_data)) + if __name__ == '__main__': gr_unittest.run(qa_socket_pdu) - diff --git a/gr-blocks/python/blocks/qa_stream_demux.py b/gr-blocks/python/blocks/qa_stream_demux.py index da9aa46be0..629c35283c 100755 --- a/gr-blocks/python/blocks/qa_stream_demux.py +++ b/gr-blocks/python/blocks/qa_stream_demux.py @@ -26,17 +26,18 @@ from gnuradio import gr, gr_unittest, blocks import pmt import os + class qa_stream_demux(gr_unittest.TestCase): - def setUp (self): + def setUp(self): os.environ['GR_CONF_CONTROLPORT_ON'] = 'False' - self.tb = gr.top_block () + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def help_stream_2ff(self, N, stream_sizes): - v = blocks.vector_source_f(N*[1,] + N*[2,], False) + v = blocks.vector_source_f(N * [1, ] + N * [2, ], False) demux = blocks.stream_demux(gr.sizeof_float, stream_sizes) @@ -44,8 +45,8 @@ class qa_stream_demux(gr_unittest.TestCase): dst1 = blocks.vector_sink_f() self.tb.connect(v, demux) - self.tb.connect((demux,0), dst0) - self.tb.connect((demux,1), dst1) + self.tb.connect((demux, 0), dst0) + self.tb.connect((demux, 1), dst1) self.tb.run() return (dst0.data(), dst1.data()) @@ -61,14 +62,15 @@ class qa_stream_demux(gr_unittest.TestCase): dst1 = blocks.vector_sink_f() self.tb.connect(v, demux) - self.tb.connect((demux,0), dst0) - self.tb.connect((demux,1), dst1) + self.tb.connect((demux, 0), dst0) + self.tb.connect((demux, 1), dst1) self.tb.run() return (dst0.data(), dst1.data()) def help_stream_tag_propagation(self, N, stream_sizes): - src_data = (stream_sizes[0]*[1,] + stream_sizes[1]*[2,] + stream_sizes[2]*[3,]) * N + src_data = (stream_sizes[0] * [1, ] + stream_sizes[1] + * [2, ] + stream_sizes[2] * [3, ]) * N src = blocks.vector_source_f(src_data, False) @@ -88,9 +90,9 @@ class qa_stream_demux(gr_unittest.TestCase): self.tb.connect(tag_stream1, tag_stream2) self.tb.connect(tag_stream2, tag_stream3) self.tb.connect(tag_stream3, demux) - self.tb.connect((demux,0), dst0) - self.tb.connect((demux,1), dst1) - self.tb.connect((demux,2), dst2) + self.tb.connect((demux, 0), dst0) + self.tb.connect((demux, 1), dst1) + self.tb.connect((demux, 2), dst2) self.tb.run() return (dst0, dst1, dst2) @@ -109,25 +111,99 @@ class qa_stream_demux(gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_stream_ramp_2NN_ff(self): N = 40 stream_sizes = [10, 10] result_data = self.help_stream_ramp_2ff(N, stream_sizes) - exp_data0 = [ 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, - 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, - 39.0, 38.0, 37.0, 36.0, 35.0, 34.0, 33.0, 32.0, 31.0, 30.0, - 19.0, 18.0, 17.0, 16.0, 15.0, 14.0, 13.0, 12.0, 11.0, 10.0] - exp_data1 = [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, - 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, - 29.0, 28.0, 27.0, 26.0, 25.0, 24.0, 23.0, 22.0, 21.0, 20.0, - 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0] - - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) + exp_data0 = [ + 0.0, + 1.0, + 2.0, + 3.0, + 4.0, + 5.0, + 6.0, + 7.0, + 8.0, + 9.0, + 20.0, + 21.0, + 22.0, + 23.0, + 24.0, + 25.0, + 26.0, + 27.0, + 28.0, + 29.0, + 39.0, + 38.0, + 37.0, + 36.0, + 35.0, + 34.0, + 33.0, + 32.0, + 31.0, + 30.0, + 19.0, + 18.0, + 17.0, + 16.0, + 15.0, + 14.0, + 13.0, + 12.0, + 11.0, + 10.0] + exp_data1 = [ + 10.0, + 11.0, + 12.0, + 13.0, + 14.0, + 15.0, + 16.0, + 17.0, + 18.0, + 19.0, + 30.0, + 31.0, + 32.0, + 33.0, + 34.0, + 35.0, + 36.0, + 37.0, + 38.0, + 39.0, + 29.0, + 28.0, + 27.0, + 26.0, + 25.0, + 24.0, + 23.0, + 22.0, + 21.0, + 20.0, + 9.0, + 8.0, + 7.0, + 6.0, + 5.0, + 4.0, + 3.0, + 2.0, + 1.0, + 0.0] + + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_stream_2NM_ff(self): N = 40 @@ -148,9 +224,8 @@ class qa_stream_demux(gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) - + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_stream_2MN_ff(self): N = 37 @@ -171,8 +246,8 @@ class qa_stream_demux(gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_stream_2N0_ff(self): N = 30 @@ -192,8 +267,8 @@ class qa_stream_demux(gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0] exp_data1 = [] - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_stream_20N_ff(self): N = 30 @@ -211,82 +286,121 @@ class qa_stream_demux(gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data0, result_data[0]) - self.assertEqual (exp_data1, result_data[1]) + self.assertEqual(exp_data0, result_data[0]) + self.assertEqual(exp_data1, result_data[1]) def test_largeN_ff(self): stream_sizes = [3, 8191] - r0 = [1.0,] * stream_sizes[0] - r1 = [2.0,] * stream_sizes[1] + r0 = [1.0, ] * stream_sizes[0] + r1 = [2.0, ] * stream_sizes[1] v = blocks.vector_source_f(r0 + r1, repeat=False) demux = blocks.stream_demux(gr.sizeof_float, stream_sizes) dst0 = blocks.vector_sink_f() dst1 = blocks.vector_sink_f() - self.tb.connect (v, demux) - self.tb.connect ((demux,0), dst0) - self.tb.connect ((demux,1), dst1) - self.tb.run () - self.assertEqual (r0, dst0.data()) - self.assertEqual (r1, dst1.data()) + self.tb.connect(v, demux) + self.tb.connect((demux, 0), dst0) + self.tb.connect((demux, 1), dst1) + self.tb.run() + self.assertEqual(r0, dst0.data()) + self.assertEqual(r1, dst1.data()) def test_tag_propagation(self): - N = 10 # Block length - stream_sizes = [1,2,3] + N = 10 # Block length + stream_sizes = [1, 2, 3] - expected_result0 = N*(stream_sizes[0]*[1,]) - expected_result1 = N*(stream_sizes[1]*[2,]) - expected_result2 = N*(stream_sizes[2]*[3,]) + expected_result0 = N * (stream_sizes[0] * [1, ]) + expected_result1 = N * (stream_sizes[1] * [2, ]) + expected_result2 = N * (stream_sizes[2] * [3, ]) # check the data (result0, result1, result2) = self.help_stream_tag_propagation(N, stream_sizes) - self.assertFloatTuplesAlmostEqual(expected_result0, result0.data(), places=6) - self.assertFloatTuplesAlmostEqual(expected_result1, result1.data(), places=6) - self.assertFloatTuplesAlmostEqual(expected_result2, result2.data(), places=6) + self.assertFloatTuplesAlmostEqual( + expected_result0, result0.data(), places=6) + self.assertFloatTuplesAlmostEqual( + expected_result1, result1.data(), places=6) + self.assertFloatTuplesAlmostEqual( + expected_result2, result2.data(), places=6) # check the tags - result0 tags = result0.tags() - expected_tag_offsets_src1 = list(range(0,stream_sizes[0]*N,stream_sizes[0])) - expected_tag_offsets_src2 = list(range(0,stream_sizes[0]*N,stream_sizes[0])) - expected_tag_offsets_src3 = list(range(0,stream_sizes[0]*N,stream_sizes[0])) - tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))] - tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))] - tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))] + expected_tag_offsets_src1 = list( + range(0, stream_sizes[0] * N, stream_sizes[0])) + expected_tag_offsets_src2 = list( + range(0, stream_sizes[0] * N, stream_sizes[0])) + expected_tag_offsets_src3 = list( + range(0, stream_sizes[0] * N, stream_sizes[0])) + tags_src1 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src1'))] + tags_src2 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src2'))] + tags_src3 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src3'))] for i in range(len(expected_tag_offsets_src1)): - self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset) + self.assertTrue( + expected_tag_offsets_src1[i] == tags_src1[i].offset) for i in range(len(expected_tag_offsets_src2)): - self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset) + self.assertTrue( + expected_tag_offsets_src2[i] == tags_src2[i].offset) for i in range(len(expected_tag_offsets_src3)): - self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset) + self.assertTrue( + expected_tag_offsets_src3[i] == tags_src3[i].offset) # check the tags - result1 tags = result1.tags() - expected_tag_offsets_src1 = list(range(0,stream_sizes[1]*N,stream_sizes[0])) - expected_tag_offsets_src2 = list(range(1,stream_sizes[1]*N,stream_sizes[1])) + expected_tag_offsets_src1 = list( + range(0, stream_sizes[1] * N, stream_sizes[0])) + expected_tag_offsets_src2 = list( + range(1, stream_sizes[1] * N, stream_sizes[1])) expected_tag_offsets_src3 = list() - tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))] - tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))] - tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))] + tags_src1 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src1'))] + tags_src2 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src2'))] + tags_src3 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src3'))] for i in range(len(expected_tag_offsets_src1)): - self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset) + self.assertTrue( + expected_tag_offsets_src1[i] == tags_src1[i].offset) for i in range(len(expected_tag_offsets_src2)): - self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset) + self.assertTrue( + expected_tag_offsets_src2[i] == tags_src2[i].offset) for i in range(len(expected_tag_offsets_src3)): - self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset) + self.assertTrue( + expected_tag_offsets_src3[i] == tags_src3[i].offset) # check the tags - result2 tags = result2.tags() - expected_tag_offsets_src1 = list(range(0,stream_sizes[2]*N,stream_sizes[0])) - expected_tag_offsets_src2 = list(range(1,stream_sizes[2]*N,stream_sizes[2])) - expected_tag_offsets_src3 = list(range(0,stream_sizes[2]*N,stream_sizes[2])) - tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))] - tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))] - tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))] + expected_tag_offsets_src1 = list( + range(0, stream_sizes[2] * N, stream_sizes[0])) + expected_tag_offsets_src2 = list( + range(1, stream_sizes[2] * N, stream_sizes[2])) + expected_tag_offsets_src3 = list( + range(0, stream_sizes[2] * N, stream_sizes[2])) + tags_src1 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src1'))] + tags_src2 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src2'))] + tags_src3 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src3'))] for i in range(len(expected_tag_offsets_src1)): - self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset) + self.assertTrue( + expected_tag_offsets_src1[i] == tags_src1[i].offset) for i in range(len(expected_tag_offsets_src2)): - self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset) + self.assertTrue( + expected_tag_offsets_src2[i] == tags_src2[i].offset) for i in range(len(expected_tag_offsets_src3)): - self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset) + self.assertTrue( + expected_tag_offsets_src3[i] == tags_src3[i].offset) + if __name__ == '__main__': gr_unittest.run(qa_stream_demux) diff --git a/gr-blocks/python/blocks/qa_stream_mux.py b/gr-blocks/python/blocks/qa_stream_mux.py index 20d1376eac..4c935245db 100644 --- a/gr-blocks/python/blocks/qa_stream_mux.py +++ b/gr-blocks/python/blocks/qa_stream_mux.py @@ -13,29 +13,30 @@ from gnuradio import gr, gr_unittest, blocks import pmt import os + class test_stream_mux (gr_unittest.TestCase): - def setUp (self): + def setUp(self): os.environ['GR_CONF_CONTROLPORT_ON'] = 'False' - self.tb = gr.top_block () + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def help_stream_2ff(self, N, stream_sizes): - v0 = blocks.vector_source_f(N*[1,], False) - v1 = blocks.vector_source_f(N*[2,], False) + v0 = blocks.vector_source_f(N * [1, ], False) + v1 = blocks.vector_source_f(N * [2, ], False) mux = blocks.stream_mux(gr.sizeof_float, stream_sizes) - dst = blocks.vector_sink_f () + dst = blocks.vector_sink_f() - self.tb.connect (v0, (mux,0)) - self.tb.connect (v1, (mux,1)) - self.tb.connect (mux, dst) - self.tb.run () + self.tb.connect(v0, (mux, 0)) + self.tb.connect(v1, (mux, 1)) + self.tb.connect(mux, dst) + self.tb.run() - return dst.data () + return dst.data() def help_stream_ramp_2ff(self, N, stream_sizes): r1 = list(range(N)) @@ -47,19 +48,19 @@ class test_stream_mux (gr_unittest.TestCase): mux = blocks.stream_mux(gr.sizeof_float, stream_sizes) - dst = blocks.vector_sink_f () + dst = blocks.vector_sink_f() - self.tb.connect (v0, (mux,0)) - self.tb.connect (v1, (mux,1)) - self.tb.connect (mux, dst) - self.tb.run () + self.tb.connect(v0, (mux, 0)) + self.tb.connect(v1, (mux, 1)) + self.tb.connect(mux, dst) + self.tb.run() - return dst.data () + return dst.data() def help_stream_tag_propagation(self, N, stream_sizes): - src_data1 = stream_sizes[0]*N*[1,] - src_data2 = stream_sizes[1]*N*[2,] - src_data3 = stream_sizes[2]*N*[3,] + src_data1 = stream_sizes[0] * N * [1, ] + src_data2 = stream_sizes[1] * N * [2, ] + src_data3 = stream_sizes[2] * N * [3, ] # stream_mux scheme (3,2,4) src1 = blocks.vector_source_f(src_data1) src2 = blocks.vector_source_f(src_data2) @@ -77,14 +78,13 @@ class test_stream_mux (gr_unittest.TestCase): self.tb.connect(src1, tag_stream1) self.tb.connect(src2, tag_stream2) self.tb.connect(src3, tag_stream3) - self.tb.connect(tag_stream1, (mux,0)) - self.tb.connect(tag_stream2, (mux,1)) - self.tb.connect(tag_stream3, (mux,2)) + self.tb.connect(tag_stream1, (mux, 0)) + self.tb.connect(tag_stream2, (mux, 1)) + self.tb.connect(tag_stream3, (mux, 2)) self.tb.connect(mux, dst) self.tb.run() - return (dst.data (), dst.tags ()) - + return (dst.data(), dst.tags()) def test_stream_2NN_ff(self): N = 40 @@ -99,22 +99,22 @@ class test_stream_mux (gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data, result_data) + self.assertEqual(exp_data, result_data) def test_stream_ramp_2NN_ff(self): N = 40 stream_sizes = [10, 10] result_data = self.help_stream_ramp_2ff(N, stream_sizes) - exp_data = [ 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + exp_data = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 39.0, 38.0, 37.0, 36.0, 35.0, 34.0, 33.0, 32.0, 31.0, 30.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 29.0, 28.0, 27.0, 26.0, 25.0, 24.0, 23.0, 22.0, 21.0, 20.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, 19.0, 18.0, 17.0, 16.0, 15.0, 14.0, 13.0, 12.0, 11.0, 10.0, 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, - 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0] - self.assertEqual (exp_data, result_data) + 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0] + self.assertEqual(exp_data, result_data) def test_stream_2NM_ff(self): N = 40 @@ -134,8 +134,7 @@ class test_stream_mux (gr_unittest.TestCase): 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data, result_data) - + self.assertEqual(exp_data, result_data) def test_stream_2MN_ff(self): N = 37 @@ -155,7 +154,7 @@ class test_stream_mux (gr_unittest.TestCase): 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0] - self.assertEqual (exp_data, result_data) + self.assertEqual(exp_data, result_data) def test_stream_2N0_ff(self): N = 30 @@ -170,7 +169,7 @@ class test_stream_mux (gr_unittest.TestCase): 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0] - self.assertEqual (exp_data, result_data) + self.assertEqual(exp_data, result_data) def test_stream_20N_ff(self): N = 30 @@ -184,49 +183,59 @@ class test_stream_mux (gr_unittest.TestCase): 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] - self.assertEqual (exp_data, result_data) + self.assertEqual(exp_data, result_data) def test_largeN_ff(self): stream_sizes = [3, 8191] - r1 = [1,] * stream_sizes[0] - r2 = [2,] * stream_sizes[1] + r1 = [1, ] * stream_sizes[0] + r2 = [2, ] * stream_sizes[1] v0 = blocks.vector_source_f(r1, repeat=False) v1 = blocks.vector_source_f(r2, repeat=False) mux = blocks.stream_mux(gr.sizeof_float, stream_sizes) - dst = blocks.vector_sink_f () - self.tb.connect (v0, (mux,0)) - self.tb.connect (v1, (mux,1)) - self.tb.connect (mux, dst) - self.tb.run () - self.assertEqual (r1 + r2, dst.data()) + dst = blocks.vector_sink_f() + self.tb.connect(v0, (mux, 0)) + self.tb.connect(v1, (mux, 1)) + self.tb.connect(mux, dst) + self.tb.run() + self.assertEqual(r1 + r2, dst.data()) def test_tag_propagation(self): - N = 10 # Block length - stream_sizes = [1,2,3] + N = 10 # Block length + stream_sizes = [1, 2, 3] - expected_result = N*(stream_sizes[0]*[1,] - +stream_sizes[1]*[2,] - +stream_sizes[2]*[3,]) + expected_result = N * (stream_sizes[0] * [1, ] + + stream_sizes[1] * [2, ] + + stream_sizes[2] * [3, ]) # check the data (result, tags) = self.help_stream_tag_propagation(N, stream_sizes) self.assertFloatTuplesAlmostEqual(expected_result, result, places=6) # check the tags - expected_tag_offsets_src1 = [sum(stream_sizes)*i for i in range(N)] + expected_tag_offsets_src1 = [sum(stream_sizes) * i for i in range(N)] expected_tag_offsets_src2 = [stream_sizes[0] - +sum(stream_sizes)*i for i in range(N)] - expected_tag_offsets_src3 = [stream_sizes[0]+stream_sizes[1] - +sum(stream_sizes)*i for i in range(N)] - tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))] - tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))] - tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))] + + sum(stream_sizes) * i for i in range(N)] + expected_tag_offsets_src3 = [stream_sizes[0] + stream_sizes[1] + + sum(stream_sizes) * i for i in range(N)] + tags_src1 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src1'))] + tags_src2 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src2'))] + tags_src3 = [ + tag for tag in tags if pmt.eq( + tag.key, pmt.intern('src3'))] for i in range(len(expected_tag_offsets_src1)): - self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset) + self.assertTrue( + expected_tag_offsets_src1[i] == tags_src1[i].offset) for i in range(len(expected_tag_offsets_src2)): - self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset) + self.assertTrue( + expected_tag_offsets_src2[i] == tags_src2[i].offset) for i in range(len(expected_tag_offsets_src3)): - self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset) + self.assertTrue( + expected_tag_offsets_src3[i] == tags_src3[i].offset) + if __name__ == '__main__': gr_unittest.run(test_stream_mux) diff --git a/gr-blocks/python/blocks/qa_stream_to_tagged_stream.py b/gr-blocks/python/blocks/qa_stream_to_tagged_stream.py index 6059c00114..4c8170a202 100644 --- a/gr-blocks/python/blocks/qa_stream_to_tagged_stream.py +++ b/gr-blocks/python/blocks/qa_stream_to_tagged_stream.py @@ -13,30 +13,32 @@ from gnuradio import gr, gr_unittest from gnuradio import blocks + class qa_stream_to_tagged_stream (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_001_t (self): + def test_001_t(self): src_data = [1, ] * 50 packet_len = 10 len_tag_key = 'packet_len' src = blocks.vector_source_f(src_data, False, 1) - tagger = blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len, len_tag_key) + tagger = blocks.stream_to_tagged_stream( + gr.sizeof_float, 1, packet_len, len_tag_key) sink = blocks.vector_sink_f() self.tb.connect(src, tagger, sink) - self.tb.run () + self.tb.run() self.assertEqual(sink.data(), src_data) tags = [gr.tag_to_python(x) for x in sink.tags()] tags = sorted([(x.offset, x.key, x.value) for x in tags]) - expected_tags = [(int(pos), 'packet_len', packet_len) for pos in range(0, 50, 10) ] + expected_tags = [(int(pos), 'packet_len', packet_len) + for pos in range(0, 50, 10)] self.assertEqual(tags, expected_tags) if __name__ == '__main__': gr_unittest.run(qa_stream_to_tagged_stream) - diff --git a/gr-blocks/python/blocks/qa_stretch.py b/gr-blocks/python/blocks/qa_stretch.py index 0a3ca0a105..e48a47901b 100644 --- a/gr-blocks/python/blocks/qa_stretch.py +++ b/gr-blocks/python/blocks/qa_stretch.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_stretch(gr_unittest.TestCase): def setUp(self): @@ -22,12 +23,12 @@ class test_stretch(gr_unittest.TestCase): def test_stretch_01(self): tb = self.tb - data = 10*[1,] + data = 10 * [1, ] data0 = [x / 20.0 for x in data] data1 = [x / 10.0 for x in data] - expected_result0 = 10*[0.05,] - expected_result1 = 10*[0.1,] + expected_result0 = 10 * [0.05, ] + expected_result1 = 10 * [0.1, ] src0 = blocks.vector_source_f(data0, False) src1 = blocks.vector_source_f(data1, False) @@ -37,12 +38,12 @@ class test_stretch(gr_unittest.TestCase): dst0 = blocks.vector_sink_f() dst1 = blocks.vector_sink_f() - tb.connect(src0, (inter,0)) - tb.connect(src1, (inter,1)) + tb.connect(src0, (inter, 0)) + tb.connect(src1, (inter, 1)) tb.connect(inter, op) tb.connect(op, deinter) - tb.connect((deinter,0), dst0) - tb.connect((deinter,1), dst1) + tb.connect((deinter, 0), dst0) + tb.connect((deinter, 1), dst1) tb.run() dst0_data = dst0.data() @@ -51,5 +52,6 @@ class test_stretch(gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual(expected_result0, dst0_data, 4) self.assertFloatTuplesAlmostEqual(expected_result1, dst1_data, 4) + if __name__ == '__main__': gr_unittest.run(test_stretch) diff --git a/gr-blocks/python/blocks/qa_tag_debug.py b/gr-blocks/python/blocks/qa_tag_debug.py index 960d005644..e6b1146bc6 100644 --- a/gr-blocks/python/blocks/qa_tag_debug.py +++ b/gr-blocks/python/blocks/qa_tag_debug.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_tag_debug(gr_unittest.TestCase): def setUp(self): @@ -28,5 +29,6 @@ class test_tag_debug(gr_unittest.TestCase): self.tb.run() x = op.current_tags() + if __name__ == '__main__': gr_unittest.run(test_tag_debug) diff --git a/gr-blocks/python/blocks/qa_tag_file_sink.py b/gr-blocks/python/blocks/qa_tag_file_sink.py index 2edcbb75c7..255672a40b 100644 --- a/gr-blocks/python/blocks/qa_tag_file_sink.py +++ b/gr-blocks/python/blocks/qa_tag_file_sink.py @@ -10,7 +10,9 @@ from gnuradio import gr, gr_unittest, blocks -import os, struct +import os +import struct + class test_tag_file_sink(gr_unittest.TestCase): @@ -21,14 +23,14 @@ class test_tag_file_sink(gr_unittest.TestCase): self.tb = None def test_001(self): - src_data = [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - trg_data = [-1, -1, 1, 1, -1, -1, 1, 1, -1, -1] + src_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + trg_data = [-1, -1, 1, 1, -1, -1, 1, 1, -1, -1] src = blocks.vector_source_i(src_data) trg = blocks.vector_source_s(trg_data) - op = blocks.burst_tagger(gr.sizeof_int) + op = blocks.burst_tagger(gr.sizeof_int) snk = blocks.tagged_file_sink(gr.sizeof_int, 1) - self.tb.connect(src, (op,0)) - self.tb.connect(trg, (op,1)) + self.tb.connect(src, (op, 0)) + self.tb.connect(trg, (op, 1)) self.tb.connect(op, snk) self.tb.run() @@ -56,5 +58,6 @@ class test_tag_file_sink(gr_unittest.TestCase): self.assertEqual(idata0, (3, 4)) self.assertEqual(idata1, (7, 8)) + if __name__ == '__main__': gr_unittest.run(test_tag_file_sink) diff --git a/gr-blocks/python/blocks/qa_tag_gate.py b/gr-blocks/python/blocks/qa_tag_gate.py index 0597ca32f9..179d27cb68 100644 --- a/gr-blocks/python/blocks/qa_tag_gate.py +++ b/gr-blocks/python/blocks/qa_tag_gate.py @@ -15,13 +15,13 @@ import pmt class qa_tag_gate (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_001_t (self): + def test_001_t(self): tag = gr.tag_t() tag.key = pmt.string_to_symbol('key') tag.value = pmt.from_long(42) @@ -30,10 +30,10 @@ class qa_tag_gate (gr_unittest.TestCase): gate = blocks.tag_gate(gr.sizeof_float, False) sink = blocks.vector_sink_f() self.tb.connect(src, gate, sink) - self.tb.run () + self.tb.run() self.assertEqual(len(sink.tags()), 0) - def test_002_t (self): + def test_002_t(self): tags = [] tags.append(gr.tag_t()) tags[0].key = pmt.string_to_symbol('key') @@ -50,13 +50,13 @@ class qa_tag_gate (gr_unittest.TestCase): src = blocks.vector_source_f(range(20), False, 1, tags) gate = blocks.tag_gate(gr.sizeof_float, False) gate.set_single_key("key") - self.assertEqual(gate.single_key(),"key") + self.assertEqual(gate.single_key(), "key") sink = blocks.vector_sink_f() self.tb.connect(src, gate, sink) - self.tb.run () + self.tb.run() self.assertEqual(len(sink.tags()), 1) - def test_003_t (self): + def test_003_t(self): tags = [] tags.append(gr.tag_t()) tags[0].key = pmt.string_to_symbol('key') @@ -75,10 +75,10 @@ class qa_tag_gate (gr_unittest.TestCase): gate.set_single_key("key") sink = blocks.vector_sink_f() self.tb.connect(src, gate, sink) - self.tb.run () + self.tb.run() self.assertEqual(len(sink.tags()), 3) - def test_004_t (self): + def test_004_t(self): tags = [] tags.append(gr.tag_t()) tags[0].key = pmt.string_to_symbol('key') @@ -96,10 +96,10 @@ class qa_tag_gate (gr_unittest.TestCase): gate = blocks.tag_gate(gr.sizeof_float, True) sink = blocks.vector_sink_f() self.tb.connect(src, gate, sink) - self.tb.run () + self.tb.run() self.assertEqual(len(sink.tags()), 3) - def test_005_t (self): + def test_005_t(self): gate = blocks.tag_gate(gr.sizeof_float, True) self.assertEqual(gate.single_key(), "") gate.set_single_key("the_key") @@ -107,6 +107,6 @@ class qa_tag_gate (gr_unittest.TestCase): gate.set_single_key("") self.assertEqual(gate.single_key(), "") + if __name__ == '__main__': gr_unittest.run(qa_tag_gate) - diff --git a/gr-blocks/python/blocks/qa_tag_share.py b/gr-blocks/python/blocks/qa_tag_share.py index 077fa8dee3..3fdb2651c2 100755 --- a/gr-blocks/python/blocks/qa_tag_share.py +++ b/gr-blocks/python/blocks/qa_tag_share.py @@ -13,6 +13,7 @@ from gnuradio import gr, gr_unittest from gnuradio import blocks import pmt + class qa_tag_share(gr_unittest.TestCase): def setUp(self): @@ -26,10 +27,10 @@ class qa_tag_share(gr_unittest.TestCase): tag_key = 'in1_tag' tag_value = 0 tag_offset = 0 - in0_value = 1.0+1.0j + in0_value = 1.0 + 1.0j in1_value = 2.717 - in0_data = [in0_value,]*10 - in1_data = [in1_value,]*10 + in0_data = [in0_value, ] * 10 + in1_data = [in1_value, ] * 10 sink_data = in0_data tag = gr.tag_t() @@ -45,8 +46,8 @@ class qa_tag_share(gr_unittest.TestCase): tag_share = blocks.tag_share(gr.sizeof_gr_complex, gr.sizeof_float) sink = blocks.vector_sink_c(1) - self.tb.connect(in0, (tag_share,0)) - self.tb.connect(in1, (tag_share,1)) + self.tb.connect(in0, (tag_share, 0)) + self.tb.connect(in1, (tag_share, 1)) self.tb.connect(tag_share, sink) self.tb.run() diff --git a/gr-blocks/python/blocks/qa_tagged_stream_mux.py b/gr-blocks/python/blocks/qa_tagged_stream_mux.py index 739ffd7bcb..ff95069ab7 100644 --- a/gr-blocks/python/blocks/qa_tagged_stream_mux.py +++ b/gr-blocks/python/blocks/qa_tagged_stream_mux.py @@ -14,6 +14,7 @@ import pmt from gnuradio import gr, gr_unittest, blocks from gnuradio.gr import packet_utils + def make_tag(key, value, offset, srcid=None): tag = gr.tag_t() tag.key = pmt.string_to_symbol(key) @@ -23,6 +24,7 @@ def make_tag(key, value, offset, srcid=None): tag.srcid = pmt.to_pmt(srcid) return tag + class qa_tagged_stream_mux (gr_unittest.TestCase): def setUp(self): @@ -34,25 +36,27 @@ class qa_tagged_stream_mux (gr_unittest.TestCase): def setup_data_tags(self, data): return packet_utils.packets_to_vectors( - data, - self.tsb_key + data, + self.tsb_key ) def test_1(self): packets0 = ( - (0, 1, 2), (5, 6), (10,), (14, 15, 16,) + (0, 1, 2), (5, 6), (10,), (14, 15, 16,) ) packets1 = ( - (3, 4), (7, 8, 9), (11, 12, 13), (17,) + (3, 4), (7, 8, 9), (11, 12, 13), (17,) ) - expected = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13], [14, 15, 16, 17]] + expected = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], + [10, 11, 12, 13], [14, 15, 16, 17]] data0, tags0 = self.setup_data_tags(packets0) data1, tags1 = self.setup_data_tags(packets1) tags0.append(make_tag('spam', 42, 4)) tags1.append(make_tag('eggs', 23, 3)) src0 = blocks.vector_source_b(data0, tags=tags0) src1 = blocks.vector_source_b(data1, tags=tags1) - tagged_stream_mux = blocks.tagged_stream_mux(gr.sizeof_char, self.tsb_key) + tagged_stream_mux = blocks.tagged_stream_mux( + gr.sizeof_char, self.tsb_key) snk = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) self.tb.connect(src0, (tagged_stream_mux, 0)) self.tb.connect(src1, (tagged_stream_mux, 1)) @@ -63,8 +67,8 @@ class qa_tagged_stream_mux (gr_unittest.TestCase): tags = [gr.tag_to_python(x) for x in snk.tags()] tags = sorted([(x.offset, x.key, x.value) for x in tags]) tags_expected = [ - (6, 'spam', 42), - (8, 'eggs', 23), + (6, 'spam', 42), + (8, 'eggs', 23), ] self.assertEqual(tags, tags_expected) @@ -77,21 +81,20 @@ class qa_tagged_stream_mux (gr_unittest.TestCase): packet_len_1 = 3 data1 = list(range(packet_len_1)) mux = blocks.tagged_stream_mux( - gr.sizeof_float, - self.tsb_key, - 1 # Mark port 1 as carrying special tags on the head position + gr.sizeof_float, + self.tsb_key, + 1 # Mark port 1 as carrying special tags on the head position ) sink = blocks.tsb_vector_sink_f(tsb_key=self.tsb_key) + self.tb.connect(blocks.vector_source_f(data0), blocks.stream_to_tagged_stream( + gr.sizeof_float, 1, packet_len_0, self.tsb_key), (mux, 0)) self.tb.connect( - blocks.vector_source_f(data0), - blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_0, self.tsb_key), - (mux, 0) - ) - self.tb.connect( - blocks.vector_source_f(list(range(packet_len_1)), tags=(make_tag('spam', 'eggs', 0),)), - blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_1, self.tsb_key), - (mux, 1) - ) + blocks.vector_source_f( + list( + range(packet_len_1)), tags=( + make_tag( + 'spam', 'eggs', 0),)), blocks.stream_to_tagged_stream( + gr.sizeof_float, 1, packet_len_1, self.tsb_key), (mux, 1)) self.tb.connect(mux, sink) self.tb.run() self.assertEqual(len(sink.data()), 1) @@ -105,4 +108,3 @@ class qa_tagged_stream_mux (gr_unittest.TestCase): if __name__ == '__main__': gr_unittest.run(qa_tagged_stream_mux) - diff --git a/gr-blocks/python/blocks/qa_tags_strobe.py b/gr-blocks/python/blocks/qa_tags_strobe.py index a31a97bc5c..fa6285427a 100644 --- a/gr-blocks/python/blocks/qa_tags_strobe.py +++ b/gr-blocks/python/blocks/qa_tags_strobe.py @@ -13,6 +13,7 @@ from gnuradio import gr, gr_unittest, blocks import pmt import math + class test_tags_strobe(gr_unittest.TestCase): def setUp(self): @@ -81,5 +82,6 @@ class test_tags_strobe(gr_unittest.TestCase): self.assertEqual(tag.offset, n_expected) n_expected += nsamps + if __name__ == '__main__': gr_unittest.run(test_tags_strobe) diff --git a/gr-blocks/python/blocks/qa_tcp_server_sink.py b/gr-blocks/python/blocks/qa_tcp_server_sink.py index 00593f5574..64bcbf0c2e 100644 --- a/gr-blocks/python/blocks/qa_tcp_server_sink.py +++ b/gr-blocks/python/blocks/qa_tcp_server_sink.py @@ -17,6 +17,7 @@ from time import sleep from threading import Timer from multiprocessing import Process + class test_tcp_sink(gr_unittest.TestCase): def setUp(self): @@ -32,7 +33,7 @@ class test_tcp_sink(gr_unittest.TestCase): dst = blocks.vector_sink_s() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) for t in (0, 0.2): -# wait until server listens + # wait until server listens sleep(t) try: sock.connect((self.addr, self.port)) @@ -42,7 +43,9 @@ class test_tcp_sink(gr_unittest.TestCase): continue break fd = os.dup(sock.fileno()) - self.tb_rcv.connect(blocks.file_descriptor_source(self.itemsize, fd), dst) + self.tb_rcv.connect( + blocks.file_descriptor_source( + self.itemsize, fd), dst) self.tb_rcv.run() self.assertEqual(self.data, dst.data()) @@ -53,12 +56,14 @@ class test_tcp_sink(gr_unittest.TestCase): n_data = 16 self.data = tuple([x for x in range(n_data)]) -# tcp_server_sink blocks until client does not connect, start client process first +# tcp_server_sink blocks until client does not connect, start client +# process first p = Process(target=self._tcp_client) p.start() src = blocks.vector_source_s(self.data, False) - tcp_snd = blocks.tcp_server_sink(self.itemsize, self.addr, self.port, False) + tcp_snd = blocks.tcp_server_sink( + self.itemsize, self.addr, self.port, False) self.tb_snd.connect(src, tcp_snd) self.tb_snd.run() @@ -69,8 +74,8 @@ class test_tcp_sink(gr_unittest.TestCase): def stop_rcv(self): self.timeout = True self.tb_rcv.stop() - #print "tb_rcv stopped by Timer" + # print "tb_rcv stopped by Timer" + if __name__ == '__main__': gr_unittest.run(test_tcp_sink) - diff --git a/gr-blocks/python/blocks/qa_threshold.py b/gr-blocks/python/blocks/qa_threshold.py index 8b9eb7b071..e3064b1ad3 100644 --- a/gr-blocks/python/blocks/qa_threshold.py +++ b/gr-blocks/python/blocks/qa_threshold.py @@ -11,6 +11,7 @@ from gnuradio import gr, gr_unittest, blocks + class test_threshold(gr_unittest.TestCase): def setUp(self): @@ -38,5 +39,6 @@ class test_threshold(gr_unittest.TestCase): self.assertEqual(expected_result, dst_data) + if __name__ == '__main__': gr_unittest.run(test_threshold) diff --git a/gr-blocks/python/blocks/qa_transcendental.py b/gr-blocks/python/blocks/qa_transcendental.py index deb0d56cc2..95649d0388 100644 --- a/gr-blocks/python/blocks/qa_transcendental.py +++ b/gr-blocks/python/blocks/qa_transcendental.py @@ -13,6 +13,7 @@ from gnuradio import gr, gr_unittest, blocks import math + class test_transcendental(gr_unittest.TestCase): def setUp(self): @@ -24,8 +25,8 @@ class test_transcendental(gr_unittest.TestCase): def test_01(self): tb = self.tb - data = 100*[0,] - expected_result = 100*[1,] + data = 100 * [0, ] + expected_result = 100 * [1, ] src = blocks.vector_source_f(data, False) op = blocks.transcendental("cos", "float") @@ -42,8 +43,8 @@ class test_transcendental(gr_unittest.TestCase): def test_02(self): tb = self.tb - data = 100*[3,] - expected_result = 100*[math.log10(3),] + data = 100 * [3, ] + expected_result = 100 * [math.log10(3), ] src = blocks.vector_source_f(data, False) op = blocks.transcendental("log10", "float") @@ -60,8 +61,8 @@ class test_transcendental(gr_unittest.TestCase): def test_03(self): tb = self.tb - data = 100*[3,] - expected_result = 100*[math.tanh(3),] + data = 100 * [3, ] + expected_result = 100 * [math.tanh(3), ] src = blocks.vector_source_f(data, False) op = blocks.transcendental("tanh", "float") @@ -75,5 +76,6 @@ class test_transcendental(gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5) + if __name__ == '__main__': gr_unittest.run(test_transcendental) diff --git a/gr-blocks/python/blocks/qa_tsb_vector_sink_X.py b/gr-blocks/python/blocks/qa_tsb_vector_sink_X.py index 291a5054b4..fd382c134c 100644 --- a/gr-blocks/python/blocks/qa_tsb_vector_sink_X.py +++ b/gr-blocks/python/blocks/qa_tsb_vector_sink_X.py @@ -14,16 +14,17 @@ import pmt from gnuradio import gr, gr_unittest from gnuradio import blocks + class qa_tsb_vector_sink (gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() self.tsb_key = "tsb" - def tearDown (self): + def tearDown(self): self.tb = None - def test_001_t (self): + def test_001_t(self): packet_len = 4 data = list(range(2 * packet_len)) tag = gr.tag_t() @@ -33,10 +34,13 @@ class qa_tsb_vector_sink (gr_unittest.TestCase): src = blocks.vector_source_f(data, tags=(tag,)) sink = blocks.tsb_vector_sink_f(tsb_key=self.tsb_key) self.tb.connect( - src, - blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len, self.tsb_key), - sink - ) + src, + blocks.stream_to_tagged_stream( + gr.sizeof_float, + 1, + packet_len, + self.tsb_key), + sink) self.tb.run() self.assertEqual([data[0:packet_len], data[packet_len:]], sink.data()) self.assertEqual(len(sink.tags()), 1) diff --git a/gr-blocks/python/blocks/qa_type_conversions.py b/gr-blocks/python/blocks/qa_type_conversions.py index 7f56bdc177..38e1c6c064 100644 --- a/gr-blocks/python/blocks/qa_type_conversions.py +++ b/gr-blocks/python/blocks/qa_type_conversions.py @@ -13,6 +13,7 @@ from gnuradio import gr, gr_unittest, blocks from math import sqrt, atan2 + class test_type_conversions(gr_unittest.TestCase): def setUp(self): @@ -52,7 +53,7 @@ class test_type_conversions(gr_unittest.TestCase): self.assertEqual(expected_data, dst.data()) def test_complex_to_interleaved_char(self): - src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + src_data = (1 + 2j, 3 + 4j, 5 + 6j, 7 + 8j, 9 + 10j) expected_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] src = blocks.vector_source_c(src_data) op = blocks.complex_to_interleaved_char() @@ -62,7 +63,7 @@ class test_type_conversions(gr_unittest.TestCase): self.assertEqual(expected_data, dst.data()) def test_complex_to_interleaved_short(self): - src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + src_data = (1 + 2j, 3 + 4j, 5 + 6j, 7 + 8j, 9 + 10j) expected_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] src = blocks.vector_source_c(src_data) op = blocks.complex_to_interleaved_short() @@ -72,7 +73,7 @@ class test_type_conversions(gr_unittest.TestCase): self.assertEqual(expected_data, dst.data()) def test_complex_to_float_1(self): - src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + src_data = (1 + 2j, 3 + 4j, 5 + 6j, 7 + 8j, 9 + 10j) expected_data = [1.0, 3.0, 5.0, 7.0, 9.0] src = blocks.vector_source_c(src_data) op = blocks.complex_to_float() @@ -82,7 +83,7 @@ class test_type_conversions(gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) def test_complex_to_float_2(self): - src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + src_data = (1 + 2j, 3 + 4j, 5 + 6j, 7 + 8j, 9 + 10j) expected_data1 = [1.0, 3.0, 5.0, 7.0, 9.0] expected_data2 = [2.0, 4.0, 6.0, 8.0, 10.0] src = blocks.vector_source_c(src_data) @@ -97,7 +98,7 @@ class test_type_conversions(gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual(expected_data2, dst2.data()) def test_complex_to_real(self): - src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + src_data = (1 + 2j, 3 + 4j, 5 + 6j, 7 + 8j, 9 + 10j) expected_data = [1.0, 3.0, 5.0, 7.0, 9.0] src = blocks.vector_source_c(src_data) op = blocks.complex_to_real() @@ -107,7 +108,7 @@ class test_type_conversions(gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) def test_complex_to_imag(self): - src_data = (1+2j, 3+4j, 5+6j, 7+8j, 9+10j) + src_data = (1 + 2j, 3 + 4j, 5 + 6j, 7 + 8j, 9 + 10j) expected_data = [2.0, 4.0, 6.0, 8.0, 10.0] src = blocks.vector_source_c(src_data) op = blocks.complex_to_imag() @@ -117,7 +118,7 @@ class test_type_conversions(gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) def test_complex_to_mag(self): - src_data = (1+2j, 3-4j, 5+6j, 7-8j, -9+10j) + src_data = (1 + 2j, 3 - 4j, 5 + 6j, 7 - 8j, -9 + 10j) expected_data = [sqrt(5), sqrt(25), sqrt(61), sqrt(113), sqrt(181)] src = blocks.vector_source_c(src_data) op = blocks.complex_to_mag() @@ -127,7 +128,7 @@ class test_type_conversions(gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual(expected_data, dst.data(), 5) def test_complex_to_mag_squared(self): - src_data = (1+2j, 3-4j, 5+6j, 7-8j, -9+10j) + src_data = (1 + 2j, 3 - 4j, 5 + 6j, 7 - 8j, -9 + 10j) expected_data = [5.0, 25.0, 61.0, 113.0, 181.0] src = blocks.vector_source_c(src_data) op = blocks.complex_to_mag_squared() @@ -137,8 +138,9 @@ class test_type_conversions(gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual(expected_data, dst.data()) def test_complex_to_arg(self): - src_data = (1+2j, 3-4j, 5+6j, 7-8j, -9+10j) - expected_data = [atan2(2, 1), atan2(-4,3), atan2(6, 5), atan2(-8, 7), atan2(10,-9)] + src_data = (1 + 2j, 3 - 4j, 5 + 6j, 7 - 8j, -9 + 10j) + expected_data = [atan2(2, 1), atan2(-4, 3), + atan2(6, 5), atan2(-8, 7), atan2(10, -9)] src = blocks.vector_source_c(src_data) op = blocks.complex_to_arg() dst = blocks.vector_sink_f() @@ -168,7 +170,7 @@ class test_type_conversions(gr_unittest.TestCase): def test_float_to_complex_1(self): src_data = (1.0, 3.0, 5.0, 7.0, 9.0) - expected_data = [1+0j, 3+0j, 5+0j, 7+0j, 9+0j] + expected_data = [1 + 0j, 3 + 0j, 5 + 0j, 7 + 0j, 9 + 0j] src = blocks.vector_source_f(src_data) op = blocks.float_to_complex() dst = blocks.vector_sink_c() @@ -179,7 +181,7 @@ class test_type_conversions(gr_unittest.TestCase): def test_float_to_complex_2(self): src1_data = (1.0, 3.0, 5.0, 7.0, 9.0) src2_data = (2.0, 4.0, 6.0, 8.0, 10.0) - expected_data = [1+2j, 3+4j, 5+6j, 7+8j, 9+10j] + expected_data = [1 + 2j, 3 + 4j, 5 + 6j, 7 + 8j, 9 + 10j] src1 = blocks.vector_source_f(src1_data) src2 = blocks.vector_source_f(src2_data) op = blocks.float_to_complex() @@ -262,7 +264,7 @@ class test_type_conversions(gr_unittest.TestCase): def test_interleaved_short_to_complex(self): src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - expected_data = [1+2j, 3+4j, 5+6j, 7+8j, 9+10j] + expected_data = [1 + 2j, 3 + 4j, 5 + 6j, 7 + 8j, 9 + 10j] src = blocks.vector_source_s(src_data) op = blocks.interleaved_short_to_complex() dst = blocks.vector_sink_c() diff --git a/gr-blocks/python/blocks/qa_udp_source_sink.py b/gr-blocks/python/blocks/qa_udp_source_sink.py index 45e53991d1..2e48d0bfae 100644 --- a/gr-blocks/python/blocks/qa_udp_source_sink.py +++ b/gr-blocks/python/blocks/qa_udp_source_sink.py @@ -20,7 +20,7 @@ from threading import Timer, Thread def recv_data(sock, result): while True: - data = sock.recv(4*1000) + data = sock.recv(4 * 1000) if len(data) == 0: break real_data = numpy.frombuffer(data, dtype=numpy.float32) @@ -53,12 +53,10 @@ class test_udp_sink_source(gr_unittest.TestCase): self.tb_snd.run() udp_snd.disconnect() - - udp_snd.connect('localhost', port+1) + udp_snd.connect('localhost', port + 1) src.rewind() self.tb_snd.run() - def test_sink_001(self): port = 65520 @@ -78,7 +76,6 @@ class test_udp_sink_source(gr_unittest.TestCase): udp_snd = blocks.udp_sink(gr.sizeof_float, '127.0.0.1', port) self.tb_snd.connect(src, udp_snd) - self.tb_snd.run() udp_snd.disconnect() t.join() @@ -112,8 +109,6 @@ class test_udp_sink_source(gr_unittest.TestCase): self.assertEqual(expected_result, recv_data) - - def test_003(self): port = 65530 @@ -137,7 +132,7 @@ class test_udp_sink_source(gr_unittest.TestCase): self.tb_snd.run() udp_snd.disconnect() self.timeout = False - q = Timer(2.0,self.stop_rcv) + q = Timer(2.0, self.stop_rcv) q.start() self.tb_rcv.wait() q.cancel() @@ -149,8 +144,8 @@ class test_udp_sink_source(gr_unittest.TestCase): def stop_rcv(self): self.timeout = True self.tb_rcv.stop() - #print "tb_rcv stopped by Timer" + # print "tb_rcv stopped by Timer" + if __name__ == '__main__': gr_unittest.run(test_udp_sink_source) - diff --git a/gr-blocks/python/blocks/qa_unpack_k_bits.py b/gr-blocks/python/blocks/qa_unpack_k_bits.py index aea57a65f6..6f41240830 100644 --- a/gr-blocks/python/blocks/qa_unpack_k_bits.py +++ b/gr-blocks/python/blocks/qa_unpack_k_bits.py @@ -13,19 +13,20 @@ from gnuradio import gr, gr_unittest, blocks import random + class test_unpack(gr_unittest.TestCase): def setUp(self): random.seed(0) - self.tb = gr.top_block () + self.tb = gr.top_block() def tearDown(self): self.tb = None def test_001(self): - src_data = [1,0,1,1,0,1,1,0] - expected_results = [1,0,1,1,0,1,1,0] - src = blocks.vector_source_b(src_data,False) + src_data = [1, 0, 1, 1, 0, 1, 1, 0] + expected_results = [1, 0, 1, 1, 0, 1, 1, 0] + src = blocks.vector_source_b(src_data, False) op = blocks.unpack_k_bits_bb(1) dst = blocks.vector_sink_b() self.tb.connect(src, op, dst) @@ -33,15 +34,15 @@ class test_unpack(gr_unittest.TestCase): self.assertEqual(expected_results, dst.data()) def test_002(self): - src_data = [ 2, 3, 0, 1] - expected_results = [1,0,1,1,0,0,0,1] - src = blocks.vector_source_b(src_data,False) + src_data = [2, 3, 0, 1] + expected_results = [1, 0, 1, 1, 0, 0, 0, 1] + src = blocks.vector_source_b(src_data, False) op = blocks.unpack_k_bits_bb(2) dst = blocks.vector_sink_b() self.tb.connect(src, op, dst) self.tb.run() self.assertEqual(expected_results, dst.data()) -if __name__ == '__main__': - gr_unittest.run(test_unpack) +if __name__ == '__main__': + gr_unittest.run(test_unpack) diff --git a/gr-blocks/python/blocks/qa_vco.py b/gr-blocks/python/blocks/qa_vco.py index 354eb7b42c..7ebdedc8e2 100644 --- a/gr-blocks/python/blocks/qa_vco.py +++ b/gr-blocks/python/blocks/qa_vco.py @@ -12,28 +12,31 @@ from gnuradio import gr, gr_unittest, blocks import math + def sig_source_f(samp_rate, freq, amp, N): t = [float(x) / samp_rate for x in range(N)] - y = [amp*math.cos(2.*math.pi*freq*x) for x in t] + y = [amp * math.cos(2. * math.pi * freq * x) for x in t] return y + def sig_source_c(samp_rate, freq, amp, N): t = [float(x) / samp_rate for x in range(N)] - y = [math.cos(2.*math.pi*freq*x) + \ - 1j*math.sin(2.*math.pi*freq*x) for x in t] + y = [math.cos(2. * math.pi * freq * x) + + 1j * math.sin(2. * math.pi * freq * x) for x in t] return y + class test_vco(gr_unittest.TestCase): - def setUp (self): - self.tb = gr.top_block () + def setUp(self): + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None def test_001(self): - src_data = 200*[0,] + 200*[0.5,] + 200*[1,] - expected_result = 200*[1,] + \ + src_data = 200 * [0, ] + 200 * [0.5, ] + 200 * [1, ] + expected_result = 200 * [1, ] + \ sig_source_f(1, 0.125, 1, 200) + \ sig_source_f(1, 0.25, 1, 200) @@ -47,10 +50,9 @@ class test_vco(gr_unittest.TestCase): result_data = dst.data() self.assertFloatTuplesAlmostEqual(expected_result, result_data, 5) - def test_002(self): - src_data = 200*[0,] + 200*[0.5,] + 200*[1,] - expected_result = 200*[1,] + \ + src_data = 200 * [0, ] + 200 * [0.5, ] + 200 * [1, ] + expected_result = 200 * [1, ] + \ sig_source_c(1, 0.125, 1, 200) + \ sig_source_c(1, 0.25, 1, 200) @@ -67,4 +69,3 @@ class test_vco(gr_unittest.TestCase): if __name__ == '__main__': gr_unittest.run(test_vco) - diff --git a/gr-blocks/python/blocks/qa_vector_insert.py b/gr-blocks/python/blocks/qa_vector_insert.py index 4c25c1538a..d0be59a81e 100644 --- a/gr-blocks/python/blocks/qa_vector_insert.py +++ b/gr-blocks/python/blocks/qa_vector_insert.py @@ -12,6 +12,7 @@ from gnuradio import gr, gr_unittest, blocks import math + class test_vector_insert(gr_unittest.TestCase): def setUp(self): @@ -24,12 +25,12 @@ class test_vector_insert(gr_unittest.TestCase): src_data = [float(x) for x in range(16)] expected_result = tuple(src_data) - period = 9177; - offset = 0; + period = 9177 + offset = 0 src = blocks.null_source(1) - head = blocks.head(1, 10000000); - ins = blocks.vector_insert_b([1], period, offset); + head = blocks.head(1, 10000000) + ins = blocks.vector_insert_b([1], period, offset) dst = blocks.vector_sink_b() self.tb.connect(src, head, ins, dst) @@ -37,7 +38,7 @@ class test_vector_insert(gr_unittest.TestCase): result_data = dst.data() for i in range(10000): - if(i%period == offset): + if(i % period == offset): self.assertEqual(1, result_data[i]) else: self.assertEqual(0, result_data[i]) @@ -45,10 +46,11 @@ class test_vector_insert(gr_unittest.TestCase): def test_002(self): # insert tags and check their propagation, zero offset period = 11000 offset = 0 - insert = [1.0,] * 1000 + insert = [1.0, ] * 1000 src = blocks.null_source(gr.sizeof_float) - s2ts = blocks.stream_to_tagged_stream(gr.sizeof_float, 1, period-len(insert), "packet") + s2ts = blocks.stream_to_tagged_stream( + gr.sizeof_float, 1, period - len(insert), "packet") head = blocks.head(gr.sizeof_float, 1000000) ins = blocks.vector_insert_f(insert, period, offset) dst = blocks.vector_sink_f() @@ -65,10 +67,11 @@ class test_vector_insert(gr_unittest.TestCase): def test_003(self): # insert tags and check their propagation, non-zero offset period = 11000 offset = 1000 - insert = [1.0,] * 1000 + insert = [1.0, ] * 1000 src = blocks.null_source(gr.sizeof_float) - s2ts = blocks.stream_to_tagged_stream(gr.sizeof_float, 1, period-len(insert), "packet") + s2ts = blocks.stream_to_tagged_stream( + gr.sizeof_float, 1, period - len(insert), "packet") head = blocks.head(gr.sizeof_float, 1000000) ins = blocks.vector_insert_f(insert, period, offset) dst = blocks.vector_sink_f() @@ -86,10 +89,11 @@ class test_vector_insert(gr_unittest.TestCase): period = 11000 offset = 1000 packetlen = 2000 - insert = [1.0,] * 1000 + insert = [1.0, ] * 1000 src = blocks.null_source(gr.sizeof_float) - s2ts = blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packetlen, "packet") + s2ts = blocks.stream_to_tagged_stream( + gr.sizeof_float, 1, packetlen, "packet") head = blocks.head(gr.sizeof_float, 1000000) ins = blocks.vector_insert_f(insert, period, offset) dst = blocks.vector_sink_f() @@ -97,12 +101,25 @@ class test_vector_insert(gr_unittest.TestCase): self.tb.connect(src, s2ts, head, ins, dst) self.tb.run() - expected_result = (0, 2000, 4000, 6000, 8000, 11000, 13000, 15000, 17000, 19000, 22000, 24000, 26000) + expected_result = ( + 0, + 2000, + 4000, + 6000, + 8000, + 11000, + 13000, + 15000, + 17000, + 19000, + 22000, + 24000, + 26000) tags = dst.tags() offsets = [tag.offset for tag in tags] for i in range(len(expected_result)): self.assertTrue(expected_result[i] == offsets[i]) + if __name__ == '__main__': gr_unittest.run(test_vector_insert) - diff --git a/gr-blocks/python/blocks/qa_vector_map.py b/gr-blocks/python/blocks/qa_vector_map.py index e2001537eb..a0c9b6dc3e 100644 --- a/gr-blocks/python/blocks/qa_vector_map.py +++ b/gr-blocks/python/blocks/qa_vector_map.py @@ -12,6 +12,7 @@ from gnuradio import gr, gr_unittest, blocks import math + class test_vector_map(gr_unittest.TestCase): def setUp(self): @@ -25,9 +26,9 @@ class test_vector_map(gr_unittest.TestCase): N = 5 src_data = list(range(0, 20)) expected_result = [] - for i in range(N-1, len(src_data), N): + for i in range(N - 1, len(src_data), N): for j in range(0, N): - expected_result.append(1.0*(i-j)) + expected_result.append(1.0 * (i - j)) mapping = [list(reversed([(0, i) for i in range(0, N)]))] src = blocks.vector_source_f(src_data, False, N) vmap = blocks.vector_map(gr.sizeof_float, (N, ), mapping) @@ -69,13 +70,13 @@ class test_vector_map(gr_unittest.TestCase): expected_D = [1, 11, 2, 12, 3, 13, 4, 14, 5, 15] expected_E = [1, 11, 98, 99, 2, 12, 96, 97, 3, 13, 94, 95, 4, 14, 92, 93, 5, 15, 90, 91] - mapping = [[(0, 0), (1, 0)], # mapping to produce D - [(0, 0), (1, 0), (2, 1), (2, 0)], # mapping to produce E + mapping = [[(0, 0), (1, 0)], # mapping to produce D + [(0, 0), (1, 0), (2, 1), (2, 0)], # mapping to produce E ] srcA = blocks.vector_source_f(A, False, 1) srcB = blocks.vector_source_f(B, False, 1) srcC = blocks.vector_source_f(C, False, 2) - vmap = blocks.vector_map(gr.sizeof_int, (1, 1, 2), mapping) + vmap = blocks.vector_map(gr.sizeof_int, (1, 1, 2), mapping) dstD = blocks.vector_sink_f(2) dstE = blocks.vector_sink_f(4) self.tb.connect(srcA, (vmap, 0)) @@ -87,6 +88,6 @@ class test_vector_map(gr_unittest.TestCase): self.assertEqual(expected_D, dstD.data()) self.assertEqual(expected_E, dstE.data()) + if __name__ == '__main__': gr_unittest.run(test_vector_map) - diff --git a/gr-blocks/python/blocks/qa_vector_sink_source.py b/gr-blocks/python/blocks/qa_vector_sink_source.py index 6a4806e660..9fe604c066 100644 --- a/gr-blocks/python/blocks/qa_vector_sink_source.py +++ b/gr-blocks/python/blocks/qa_vector_sink_source.py @@ -13,6 +13,7 @@ from gnuradio import gr, gr_unittest, blocks import pmt import math + def make_tag(key, value, offset, srcid=None): tag = gr.tag_t() tag.key = pmt.string_to_symbol(key) @@ -22,9 +23,11 @@ def make_tag(key, value, offset, srcid=None): tag.srcid = pmt.to_pmt(srcid) return tag + def compare_tags(a, b): return a.offset == b.offset and pmt.equal(a.key, b.key) and \ - pmt.equal(a.value, b.value) and pmt.equal(a.srcid, b.srcid) + pmt.equal(a.value, b.value) and pmt.equal(a.srcid, b.srcid) + class test_vector_sink_source(gr_unittest.TestCase): @@ -64,7 +67,9 @@ class test_vector_sink_source(gr_unittest.TestCase): # Test that we can only make vectors (the I/O type) if the input # vector has sufficient size src_data = [float(x) for x in range(16)] - self.assertRaises(ValueError, lambda : blocks.vector_source_f(src_data, False, 3)) + self.assertRaises( + ValueError, lambda: blocks.vector_source_f( + src_data, False, 3)) def test_004(self): # Test sending and receiving tagged streams @@ -91,10 +96,10 @@ class test_vector_sink_source(gr_unittest.TestCase): expected_result = src_data + src_data src_tags = [make_tag('key', 'val', 0, 'src')] expected_tags = [make_tag('key', 'val', 0, 'src'), - make_tag('key', 'val', length, 'src')] + make_tag('key', 'val', length, 'src')] src = blocks.vector_source_f(src_data, repeat=True, tags=src_tags) - head = blocks.head(gr.sizeof_float, 2*length) + head = blocks.head(gr.sizeof_float, 2 * length) dst = blocks.vector_sink_f() self.tb.connect(src, head, dst) @@ -111,7 +116,7 @@ class test_vector_sink_source(gr_unittest.TestCase): src_data = [float(x) for x in range(16)] expected_result = src_data - src = blocks.vector_source_f((3,1,4)) + src = blocks.vector_source_f((3, 1, 4)) dst = blocks.vector_sink_f() src.set_data(src_data) @@ -138,4 +143,3 @@ class test_vector_sink_source(gr_unittest.TestCase): if __name__ == '__main__': gr_unittest.run(test_vector_sink_source) - diff --git a/gr-blocks/python/blocks/qa_wavfile.py b/gr-blocks/python/blocks/qa_wavfile.py index 0b08ce3290..c62cd703c4 100644 --- a/gr-blocks/python/blocks/qa_wavfile.py +++ b/gr-blocks/python/blocks/qa_wavfile.py @@ -15,10 +15,13 @@ import os from os.path import getsize g_in_file = os.path.join(os.getenv("srcdir"), "test_16bit_1chunk.wav") -g_in_file_normal = os.path.join(os.getenv("srcdir"), "test_16bit_1chunk_normal.wav") +g_in_file_normal = os.path.join( + os.getenv("srcdir"), + "test_16bit_1chunk_normal.wav") g_extra_header_offset = 36 g_extra_header_len = 22 + class test_wavefile(gr_unittest.TestCase): def setUp(self): @@ -32,10 +35,10 @@ class test_wavefile(gr_unittest.TestCase): self.assertEqual(wf.sample_rate(), 8000) def test_002_checkwavcopy(self): - infile = g_in_file + infile = g_in_file outfile = "test_out.wav" - wf_in = blocks.wavfile_source(infile) + wf_in = blocks.wavfile_source(infile) wf_out = blocks.wavfile_sink(outfile, wf_in.channels(), wf_in.sample_rate(), @@ -52,11 +55,14 @@ class test_wavefile(gr_unittest.TestCase): pass with wave.open(outfile, 'rb') as f: pass - except: + except BaseException: raise AssertionError('Invalid WAV file') # we're losing all extra header chunks - self.assertEqual(getsize(infile) - g_extra_header_len, getsize(outfile)) + self.assertEqual( + getsize(infile) - + g_extra_header_len, + getsize(outfile)) with open(infile, 'rb') as f: in_data = bytearray(f.read()) @@ -70,11 +76,12 @@ class test_wavefile(gr_unittest.TestCase): out_data[4:8] = b'\x00\x00\x00\x00' # cut extra header chunks from input file - self.assertEqual(in_data[:g_extra_header_offset] + \ - in_data[g_extra_header_offset + g_extra_header_len:], out_data) + self.assertEqual(in_data[:g_extra_header_offset] + + in_data[g_extra_header_offset + + g_extra_header_len:], out_data) def test_003_checkwav_append_copy(self): - infile = g_in_file_normal + infile = g_in_file_normal outfile = "test_out_append.wav" # 1. Copy input to output @@ -82,7 +89,7 @@ class test_wavefile(gr_unittest.TestCase): copyfile(infile, outfile) # 2. append copy - wf_in = blocks.wavfile_source(infile) + wf_in = blocks.wavfile_source(infile) wf_out = blocks.wavfile_sink(outfile, wf_in.channels(), wf_in.sample_rate(), @@ -94,7 +101,7 @@ class test_wavefile(gr_unittest.TestCase): wf_out.close() # 3. append halved copy - wf_in = blocks.wavfile_source(infile) + wf_in = blocks.wavfile_source(infile) halver = blocks.multiply_const_ff(0.5) wf_out = blocks.wavfile_sink(outfile, wf_in.channels(), @@ -117,22 +124,22 @@ class test_wavefile(gr_unittest.TestCase): with wave.open(outfile, 'rb') as w_out: out_params = w_out.getparams() data_out = wav_read_frames(w_out) - except: + except BaseException: raise AssertionError('Invalid WAV file') # Params must be equal except in size: - expected_params = in_params._replace(nframes=3*in_params.nframes) + expected_params = in_params._replace(nframes=3 * in_params.nframes) self.assertEqual(out_params, expected_params) # Part 1 self.assertEqual(data_in, data_out[:len(data_in)]) # Part 2 - self.assertEqual(data_in, data_out[len(data_in):2*len(data_in)]) + self.assertEqual(data_in, data_out[len(data_in):2 * len(data_in)]) # Part 3 - data_in_halved = [int(round(d/2)) for d in data_in] - self.assertEqual(data_in_halved, data_out[2*len(data_in):]) + data_in_halved = [int(round(d / 2)) for d in data_in] + self.assertEqual(data_in_halved, data_out[2 * len(data_in):]) os.remove(outfile) @@ -140,14 +147,21 @@ class test_wavefile(gr_unittest.TestCase): outfile = "no_file.wav" with self.assertRaisesRegex(RuntimeError, "Can't open WAV file."): - blocks.wavfile_sink(outfile, 1, 44100, blocks.FORMAT_WAV, blocks.FORMAT_PCM_16, True) + blocks.wavfile_sink( + outfile, + 1, + 44100, + blocks.FORMAT_WAV, + blocks.FORMAT_PCM_16, + True) os.remove(outfile) + def wav_read_frames(w): import struct # grouper from itertools recipes. - grouper = lambda iterable, n: list(zip(* ([iter(iterable)] * n) )) + def grouper(iterable, n): return list(zip(* ([iter(iterable)] * n))) assert w.getsampwidth() == 2 # Assume 16 bits return [ struct.unpack('<h', bytes(frame_g))[0] |