diff options
author | Josh Morman <jmorman@perspectalabs.com> | 2019-05-13 14:24:47 -0400 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2019-06-15 16:55:58 -0700 |
commit | 3ad584916c9fe2c1404dcd634d02eb85afd98337 (patch) | |
tree | 54480012d5a168bd3cf8d1db812d95b728d046f4 /gr-blocks/python | |
parent | 25b01217d191a419ff686bd9c59a11f4a3b7437d (diff) |
blocks: replace blks2_selector with new implementation
blks2_selector was deprecated and finally removed
This block implements the same functionality but in a way more similar
to the Copy block
fixes #2460
Diffstat (limited to 'gr-blocks/python')
-rw-r--r-- | gr-blocks/python/blocks/qa_selector.py | 223 |
1 files changed, 223 insertions, 0 deletions
diff --git a/gr-blocks/python/blocks/qa_selector.py b/gr-blocks/python/blocks/qa_selector.py new file mode 100644 index 0000000000..5bf01b593f --- /dev/null +++ b/gr-blocks/python/blocks/qa_selector.py @@ -0,0 +1,223 @@ +#!/usr/bin/env python +# +# Copyright 2019 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + + +from gnuradio import gr, gr_unittest, blocks + + +class test_selector(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_select_same(self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + expected_result = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + expected_drop = () + + num_inputs = 4; num_outputs = 4 + input_index = 1; output_index = 2 + + op = blocks.selector(gr.sizeof_char, input_index, output_index) + + src = [] + dst = [] + for ii in range(num_inputs): + src.append( blocks.vector_source_b(src_data)) + self.tb.connect(src[ii], (op,ii)) + for jj in range(num_outputs): + dst.append(blocks.vector_sink_b()) + self.tb.connect((op,jj),dst[jj]) + + + self.tb.run() + + dst_data = dst[output_index].data() + + self.assertEqual(expected_result, dst_data) + + + def test_select_input(self): + + num_inputs = 4; num_outputs = 4 + input_index = 1; output_index = 2 + + op = blocks.selector(gr.sizeof_char, input_index, output_index) + + src = [] + dst = [] + for ii in range(num_inputs): + src_data = [ii+1]*10 + src.append( blocks.vector_source_b(src_data)) + self.tb.connect(src[ii], (op,ii)) + for jj in range(num_outputs): + dst.append(blocks.vector_sink_b()) + self.tb.connect((op,jj),dst[jj]) + + + self.tb.run() + + expected_result = [input_index+1]*10 + dst_data = list(dst[output_index].data()) + + self.assertEqual(expected_result, dst_data) + + def test_dump(self): + + num_inputs = 4; num_outputs = 4 + input_index = 1; output_index = 2 + output_not_selected = 3 + + op = blocks.selector(gr.sizeof_char, input_index, output_index) + + src = [] + dst = [] + for ii in range(num_inputs): + src_data = [ii+1]*10 + src.append( blocks.vector_source_b(src_data)) + self.tb.connect(src[ii], (op,ii)) + for jj in range(num_outputs): + dst.append(blocks.vector_sink_b()) + self.tb.connect((op,jj),dst[jj]) + + + self.tb.run() + + expected_result = [] + dst_data = list(dst[output_not_selected].data()) + + self.assertEqual(expected_result, dst_data) + + def test_not_enabled (self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + expected_result = () + + num_inputs = 4; num_outputs = 4 + input_index = 1; output_index = 2 + + op = blocks.selector(gr.sizeof_char, input_index, output_index) + op.set_enabled(False) + + src = [] + dst = [] + for ii in range(num_inputs): + src.append( blocks.vector_source_b(src_data)) + self.tb.connect(src[ii], (op,ii)) + for jj in range(num_outputs): + dst.append(blocks.vector_sink_b()) + self.tb.connect((op,jj),dst[jj]) + + + self.tb.run() + + dst_data = dst[output_index].data() + self.assertEqual(expected_result, dst_data) + + + # These tests cannot be run as set_index can only be called after check_topology is called + # def test_set_indices(self): + + # num_inputs = 4; num_outputs = 4 + # input_index = 1; output_index = 2 + + # op = blocks.selector(gr.sizeof_char, 0, 0) + + + # src = [] + # dst = [] + # for ii in range(num_inputs): + # src_data = [ii+1]*10 + # src.append( blocks.vector_source_b(src_data)) + # self.tb.connect(src[ii], (op,ii)) + # for jj in range(num_outputs): + # dst.append(blocks.vector_sink_b()) + # self.tb.connect((op,jj),dst[jj]) + + # op.set_input_index(input_index) + # op.set_output_index(output_index) + + # self.tb.run() + + # expected_result = [input_index+1]*10 + # dst_data = list(dst[output_index].data()) + + # self.assertEqual(expected_result, dst_data) + + # def test_dont_set_indices(self): + + # num_inputs = 4; num_outputs = 4 + # input_index = 1; output_index = 2 + + # op = blocks.selector(gr.sizeof_char, 0, 0) + # #op.set_input_index(input_index) + # #op.set_output_index(output_index) + + # src = [] + # dst = [] + # for ii in range(num_inputs): + # src_data = [ii+1]*10 + # src.append( blocks.vector_source_b(src_data)) + # self.tb.connect(src[ii], (op,ii)) + # for jj in range(num_outputs): + # dst.append(blocks.vector_sink_b()) + # self.tb.connect((op,jj),dst[jj]) + + + # self.tb.run() + + # expected_result = [input_index+1]*10 + # dst_data = list(dst[output_index].data()) + + # self.assertNotEqual(expected_result, dst_data) + + def test_float_vector(self): + + num_inputs = 4; num_outputs = 4 + input_index = 1; output_index = 2 + + veclen = 3 + + op = blocks.selector(gr.sizeof_float*veclen, input_index, output_index) + + src = [] + dst = [] + for ii in range(num_inputs): + src_data = [float(ii)+1]*10*veclen + src.append( blocks.vector_source_f(src_data, repeat=False, vlen=veclen)) + self.tb.connect(src[ii], (op,ii)) + for jj in range(num_outputs): + dst.append(blocks.vector_sink_f(vlen=veclen)) + self.tb.connect((op,jj),dst[jj]) + + + self.tb.run() + + expected_result = [float(input_index)+1]*10*veclen + dst_data = list(dst[output_index].data()) + + self.assertEqual(expected_result, dst_data) + +if __name__ == '__main__': + gr_unittest.run(test_selector) |