diff options
Diffstat (limited to 'gr-blocks/python')
-rw-r--r-- | gr-blocks/python/qa_argmax.py | 73 | ||||
-rwxr-xr-x | gr-blocks/python/qa_max.py | 65 | ||||
-rwxr-xr-x | gr-blocks/python/qa_mute.py | 89 | ||||
-rwxr-xr-x | gr-blocks/python/qa_pack_k_bits.py | 68 | ||||
-rwxr-xr-x | gr-blocks/python/qa_pdu.py | 92 | ||||
-rw-r--r-- | gr-blocks/python/qa_probe_signal.py | 65 | ||||
-rw-r--r-- | gr-blocks/python/qa_sample_and_hold.py | 54 | ||||
-rwxr-xr-x | gr-blocks/python/qa_unpack_k_bits.py | 57 |
8 files changed, 563 insertions, 0 deletions
diff --git a/gr-blocks/python/qa_argmax.py b/gr-blocks/python/qa_argmax.py new file mode 100644 index 0000000000..ec82b71cd4 --- /dev/null +++ b/gr-blocks/python/qa_argmax.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python +# +# Copyright 2007,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import math + +class test_arg_max(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + tb = self.tb + + src1_data = (0,0.2,-0.3,0,12,0) + src2_data = (0,0.0,3.0,0,10,0) + src3_data = (0,0.0,3.0,0,1,0) + + src1 = gr.vector_source_f(src1_data) + s2v1 = blocks.stream_to_vector(gr.sizeof_float, len(src1_data)) + tb.connect(src1, s2v1) + + src2 = gr.vector_source_f(src2_data) + s2v2 = blocks.stream_to_vector(gr.sizeof_float, len(src1_data)) + tb.connect(src2, s2v2) + + src3 = gr.vector_source_f(src3_data) + s2v3 = blocks.stream_to_vector(gr.sizeof_float, len(src1_data)) + tb.connect(src3, s2v3) + + dst1 = gr.vector_sink_s() + dst2 = gr.vector_sink_s() + argmax = blocks.argmax_fs(len(src1_data)) + + tb.connect(s2v1, (argmax, 0)) + tb.connect(s2v2, (argmax, 1)) + tb.connect(s2v3, (argmax, 2)) + + tb.connect((argmax,0), dst1) + tb.connect((argmax,1), dst2) + + tb.run() + index = dst1.data() + source = dst2.data() + self.assertEqual(index, (4,)) + self.assertEqual(source, (0,)) + +if __name__ == '__main__': + gr_unittest.run(test_arg_max, "test_arg_max.xml") + diff --git a/gr-blocks/python/qa_max.py b/gr-blocks/python/qa_max.py new file mode 100755 index 0000000000..4af70bd4be --- /dev/null +++ b/gr-blocks/python/qa_max.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# +# Copyright 2007,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import math + +class test_max(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + src_data = (0,0.2,-0.3,0,12,0) + expected_result = (float(max(src_data)),) + + src = gr.vector_source_f(src_data) + s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data)) + op = blocks.max_ff(len(src_data)) + dst = gr.vector_sink_f() + + self.tb.connect(src, s2v, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def test_002(self): + src_data=(-100,-99,-98,-97,-96,-1) + expected_result = (float(max(src_data)),) + + src = gr.vector_source_f(src_data) + s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data)) + op = blocks.max_ff(len(src_data)) + dst = gr.vector_sink_f() + + self.tb.connect(src, s2v, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + +if __name__ == '__main__': + gr_unittest.run(test_max, "test_max.xml") + diff --git a/gr-blocks/python/qa_mute.py b/gr-blocks/python/qa_mute.py new file mode 100755 index 0000000000..96c57b2ed1 --- /dev/null +++ b/gr-blocks/python/qa_mute.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +# +# Copyright 2004,2005,2007,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks + +class test_mute(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def help_ii(self, src_data, exp_data, op): + for s in zip(range(len(src_data)), src_data): + src = gr.vector_source_i(s[1]) + self.tb.connect(src, (op, s[0])) + dst = gr.vector_sink_i() + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_ff(self, src_data, exp_data, op): + for s in zip(range(len(src_data)), src_data): + src = gr.vector_source_f(s[1]) + self.tb.connect(src, (op, s[0])) + dst = gr.vector_sink_f() + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_cc(self, src_data, exp_data, op): + for s in zip(range(len(src_data)), src_data): + src = gr.vector_source_c(s[1]) + self.tb.connect(src, (op, s[0])) + dst = gr.vector_sink_c() + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def test_unmute_ii(self): + src_data = (1, 2, 3, 4, 5) + expected_result = (1, 2, 3, 4, 5) + op = blocks.mute_ii(False) + self.help_ii((src_data,), expected_result, op) + + def test_mute_ii(self): + src_data = (1, 2, 3, 4, 5) + expected_result = (0, 0, 0, 0, 0) + op = blocks.mute_ii(True) + self.help_ii((src_data,), expected_result, op) + + def test_unmute_cc(self): + src_data = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j) + expected_result = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j) + op = blocks.mute_cc(False) + self.help_cc((src_data,), expected_result, op) + + def test_unmute_cc(self): + src_data = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j) + expected_result =(0+0j, 0+0j, 0+0j, 0+0j, 0+0j) + op = blocks.mute_cc(True) + self.help_cc((src_data,), expected_result, op) + +if __name__ == '__main__': + gr_unittest.run(test_mute, "test_mute.xml") diff --git a/gr-blocks/python/qa_pack_k_bits.py b/gr-blocks/python/qa_pack_k_bits.py new file mode 100755 index 0000000000..cd55d2f200 --- /dev/null +++ b/gr-blocks/python/qa_pack_k_bits.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python +# +# Copyright 2006,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import random + +class test_pack(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + src_data = (1,0,1,1,0,1,1,0) + expected_results = (1,0,1,1,0,1,1,0) + src = gr.vector_source_b(src_data,False) + op = blocks.pack_k_bits_bb(1) + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_results, dst.data()) + + def test_002(self): + src_data = (1,0,1,1,0,0,0,1) + expected_results = ( 2, 3, 0, 1) + src = gr.vector_source_b(src_data,False) + op = blocks.pack_k_bits_bb(2) + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + #self.assertEqual(expected_results, dst.data()) + self.assertEqual(expected_results, dst.data()) + + def test_003(self): + src_data = expected_results = map(lambda x: random.randint(0,3), range(10)); + src = gr.vector_source_b( src_data ); + pack = blocks.pack_k_bits_bb(2); + unpack = blocks.unpack_k_bits_bb(2); + snk = gr.vector_sink_b(); + self.tb.connect(src,unpack,pack,snk); + self.tb.run() + self.assertEqual(list(expected_results), list(snk.data())); + +if __name__ == '__main__': + gr_unittest.run(test_pack, "test_pack.xml") + diff --git a/gr-blocks/python/qa_pdu.py b/gr-blocks/python/qa_pdu.py new file mode 100755 index 0000000000..dc50b7c330 --- /dev/null +++ b/gr-blocks/python/qa_pdu.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import pmt +import time + +class test_pdu(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_000(self): + # Just run some data through and make sure it doesn't puke. + src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + + src = blocks.pdu_to_tagged_stream(blocks.byte_t) + snk3 = blocks.tagged_stream_to_pdu(blocks.byte_t) + snk2 = gr.vector_sink_b() + snk = gr.tag_debug(1, "test") + snk.set_display(False) + + dbg = gr.message_debug() + + # Test that the right number of ports exist. + pi = snk3.message_ports_in() + po = snk3.message_ports_out() + self.assertEqual(pmt.pmt_length(pi), 0) + self.assertEqual(pmt.pmt_length(po), 1) + + time.sleep(0.1) + self.tb.connect(src, snk) + self.tb.connect(src, snk2) + self.tb.connect(src, snk3) + self.tb.msg_connect(snk3, "pdus", dbg, "store") + self.tb.start() + + # make our reference and message pmts + port = pmt.pmt_intern("pdus") + msg = pmt.pmt_cons( pmt.PMT_NIL, pmt.pmt_make_u8vector(16, 0xFF)) + + # post the message + src.to_basic_block()._post(port, msg) # eww, what's that smell? + + while dbg.num_messages() < 1: + time.sleep(0.5) + self.tb.stop() + self.tb.wait() + + # Get the vector of data from the vector sink + result_data = snk2.data() + + # Get the vector of data from the message sink + # Convert the message PMT as a pair into its vector + result_msg = dbg.get_message(0) + msg_vec = pmt.pmt_cdr(result_msg) + #pmt.pmt_print(msg_vec) + + # Convert the PMT vector into a Python list + msg_data = [] + for i in xrange(16): + msg_data.append(pmt.pmt_u8vector_ref(msg_vec, i)) + + actual_data = 16*[0xFF,] + self.assertEqual(actual_data, list(result_data)) + self.assertEqual(actual_data, msg_data) + +if __name__ == '__main__': + gr_unittest.run(test_pdu, "test_pdu.xml") diff --git a/gr-blocks/python/qa_probe_signal.py b/gr-blocks/python/qa_probe_signal.py new file mode 100644 index 0000000000..a420df71e5 --- /dev/null +++ b/gr-blocks/python/qa_probe_signal.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# +# Copyright 2012-2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import time +from gnuradio import gr, gr_unittest +import blocks_swig as blocks + +class test_probe_signal(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + value = 12.3 + repeats = 100 + src_data = [value] * repeats + + src = gr.vector_source_f(src_data) + dst = blocks.probe_signal_f() + + self.tb.connect(src, dst) + self.tb.run() + output = dst.level() + self.assertAlmostEqual(value, output, places=6) + + def test_002(self): + vector_length = 10 + repeats = 10 + value = [0.5+i for i in range(0, vector_length)] + src_data = value * repeats + + src = gr.vector_source_f(src_data) + s2v = blocks.stream_to_vector(gr.sizeof_float, vector_length) + dst = blocks.probe_signal_vf(vector_length) + + self.tb.connect(src, s2v, dst) + self.tb.run() + output = dst.level() + self.assertEqual(len(output), vector_length) + self.assertAlmostEqual(value[3], output[3], places=6) + +if __name__ == '__main__': + gr_unittest.run(test_probe_signal, "test_probe_signal.xml") diff --git a/gr-blocks/python/qa_sample_and_hold.py b/gr-blocks/python/qa_sample_and_hold.py new file mode 100644 index 0000000000..59628090d1 --- /dev/null +++ b/gr-blocks/python/qa_sample_and_hold.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import time +from gnuradio import gr, gr_unittest +import blocks_swig as blocks + +class test_sample_and_hold(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + src_data = 10*[0,1,2,3,4,5,6,7,8,9,8,7,6,5,4,3,2,1] + ctrl_data = 10*[1,0,0,0,1,1,1,1,1,1,0,0,0,0,0,0,0,0] + expected_result = 10*(0,0,0,0,4,5,6,7,8,9,9,9,9,9,9,9,9,9) + + src = gr.vector_source_f(src_data) + ctrl = gr.vector_source_b(ctrl_data) + op = blocks.sample_and_hold_ff() + dst = gr.vector_sink_f() + + self.tb.connect(src, (op,0)) + self.tb.connect(ctrl, (op,1)) + self.tb.connect(op, dst) + self.tb.run() + + result = dst.data() + self.assertFloatTuplesAlmostEqual(expected_result, result, places=6) + +if __name__ == '__main__': + gr_unittest.run(test_sample_and_hold, "test_sample_and_hold.xml") diff --git a/gr-blocks/python/qa_unpack_k_bits.py b/gr-blocks/python/qa_unpack_k_bits.py new file mode 100755 index 0000000000..e038d5a03a --- /dev/null +++ b/gr-blocks/python/qa_unpack_k_bits.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# +# Copyright 2006,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import random + +class test_unpack(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block () + + def tearDown(self): + self.tb = None + + def test_001(self): + src_data = (1,0,1,1,0,1,1,0) + expected_results = (1,0,1,1,0,1,1,0) + src = gr.vector_source_b(src_data,False) + op = blocks.unpack_k_bits_bb(1) + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_results, dst.data()) + + def test_002(self): + src_data = ( 2, 3, 0, 1) + expected_results = (1,0,1,1,0,0,0,1) + src = gr.vector_source_b(src_data,False) + op = blocks.unpack_k_bits_bb(2) + dst = gr.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + self.assertEqual(expected_results, dst.data()) + +if __name__ == '__main__': + gr_unittest.run(test_unpack, "test_unpack.xml") + |