diff options
Diffstat (limited to 'gr-blocks/python/blocks')
27 files changed, 2455 insertions, 11 deletions
diff --git a/gr-blocks/python/blocks/parse_file_metadata.py b/gr-blocks/python/blocks/parse_file_metadata.py index a876f49b07..7d8d41d166 100644 --- a/gr-blocks/python/blocks/parse_file_metadata.py +++ b/gr-blocks/python/blocks/parse_file_metadata.py @@ -21,14 +21,9 @@ # import sys -from gnuradio import gr +from gnuradio import gr, blocks import pmt -try: - import blocks_swig as blocks -except ImportError: - from gnuradio import blocks - ''' sr Sample rate (samples/second) time Time as uint64(secs), double(fractional secs) diff --git a/gr-blocks/python/blocks/qa_affinity.py b/gr-blocks/python/blocks/qa_affinity.py new file mode 100644 index 0000000000..98b5c44862 --- /dev/null +++ b/gr-blocks/python/blocks/qa_affinity.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks + +class test_affinity(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_000(self): + # Just run some data through and make sure it doesn't puke. + src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + + src = blocks.vector_source_f(src_data) + snk = blocks.vector_sink_f() + + src.set_processor_affinity([0,]) + self.tb.connect(src, snk) + self.tb.run() + + a = src.processor_affinity() + + self.assertEqual((0,), a) + +if __name__ == '__main__': + gr_unittest.run(test_affinity, "test_affinity.xml") diff --git a/gr-blocks/python/blocks/qa_block_gateway.py b/gr-blocks/python/blocks/qa_block_gateway.py new file mode 100644 index 0000000000..f62726adf3 --- /dev/null +++ b/gr-blocks/python/blocks/qa_block_gateway.py @@ -0,0 +1,257 @@ +# +# Copyright 2011-2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import numpy + +import pmt + +from gnuradio import gr, gr_unittest, blocks + +class add_2_f32_1_f32(gr.sync_block): + def __init__(self): + gr.sync_block.__init__( + self, + name = "add 2 f32", + in_sig = [numpy.float32, numpy.float32], + out_sig = [numpy.float32], + ) + + def work(self, input_items, output_items): + output_items[0][:] = input_items[0] + input_items[1] + return len(output_items[0]) + +class add_2_fc32_1_fc32(gr.sync_block): + def __init__(self): + gr.sync_block.__init__( + self, + name = "add 2 fc32", + in_sig = [numpy.complex64, numpy.complex64], + out_sig = [numpy.complex64], + ) + + def work(self, input_items, output_items): + output_items[0][:] = input_items[0] + input_items[1] + return len(output_items[0]) + +class convolve(gr.sync_block): + """ + A demonstration using block history to properly perform a convolution. + """ + def __init__(self): + gr.sync_block.__init__( + self, + name = "convolve", + in_sig = [numpy.float32], + out_sig = [numpy.float32] + ) + self._taps = [1, 0, 0, 0] + self.set_history(len(self._taps)) + + def work(self, input_items, output_items): + output_items[0][:] = numpy.convolve(input_items[0], self._taps, mode='valid') + return len(output_items[0]) + +class decim2x(gr.decim_block): + def __init__(self): + gr.decim_block.__init__( + self, + name = "decim2x", + in_sig = [numpy.float32], + out_sig = [numpy.float32], + decim = 2 + ) + + def work(self, input_items, output_items): + output_items[0][:] = input_items[0][::2] + return len(output_items[0]) + +class interp2x(gr.interp_block): + def __init__(self): + gr.interp_block.__init__( + self, + name = "interp2x", + in_sig = [numpy.float32], + out_sig = [numpy.float32], + interp = 2 + ) + + def work(self, input_items, output_items): + output_items[0][1::2] = input_items[0] + output_items[0][::2] = input_items[0] + return len(output_items[0]) + +class tag_source(gr.sync_block): + def __init__(self): + gr.sync_block.__init__( + self, + name = "tag source", + in_sig = None, + out_sig = [numpy.float32], + ) + + def work(self, input_items, output_items): + num_output_items = len(output_items[0]) + + #put code here to fill the output items... + + #make a new tag on the middle element every time work is called + count = self.nitems_written(0) + num_output_items/2 + key = pmt.string_to_symbol("example_key") + value = pmt.string_to_symbol("example_value") + self.add_item_tag(0, count, key, value) + + return num_output_items + +class tag_sink(gr.sync_block): + def __init__(self): + gr.sync_block.__init__( + self, + name = "tag sink", + in_sig = [numpy.float32], + out_sig = None, + ) + self.key = None + + def work(self, input_items, output_items): + num_input_items = len(input_items[0]) + + #put code here to process the input items... + + #print all the tags received in this work call + nread = self.nitems_read(0) + tags = self.get_tags_in_range(0, nread, nread+num_input_items) + for tag in tags: + #print tag.offset + #print pmt.symbol_to_string(tag.key) + #print pmt.symbol_to_string(tag.value) + self.key = pmt.symbol_to_string(tag.key) + + return num_input_items + +class fc32_to_f32_2(gr.sync_block): + def __init__(self): + gr.sync_block.__init__( + self, + name = "fc32_to_f32_2", + in_sig = [numpy.complex64], + out_sig = [(numpy.float32, 2)], + ) + + def work(self, input_items, output_items): + output_items[0][::,0] = numpy.real(input_items[0]) + output_items[0][::,1] = numpy.imag(input_items[0]) + return len(output_items[0]) + +class vector_to_stream(gr.interp_block): + def __init__(self, itemsize, nitems_per_block): + gr.interp_block.__init__( + self, + name = "vector_to_stream", + in_sig = [(itemsize, nitems_per_block)], + out_sig = [itemsize], + interp = nitems_per_block + ) + self.block_size = nitems_per_block + + def work(self, input_items, output_items): + n = 0 + for i in xrange(len(input_items[0])): + for j in xrange(self.block_size): + output_items[0][n] = input_items[0][i][j] + n += 1 + + return len(output_items[0]) + +class test_block_gateway(gr_unittest.TestCase): + + def test_add_f32(self): + tb = gr.top_block() + src0 = blocks.vector_source_f([1, 3, 5, 7, 9], False) + src1 = blocks.vector_source_f([0, 2, 4, 6, 8], False) + adder = add_2_f32_1_f32() + sink = blocks.vector_sink_f() + tb.connect((src0, 0), (adder, 0)) + tb.connect((src1, 0), (adder, 1)) + tb.connect(adder, sink) + tb.run() + self.assertEqual(sink.data(), (1, 5, 9, 13, 17)) + + def test_add_fc32(self): + tb = gr.top_block() + src0 = blocks.vector_source_c([1, 3j, 5, 7j, 9], False) + src1 = blocks.vector_source_c([0, 2j, 4, 6j, 8], False) + adder = add_2_fc32_1_fc32() + sink = blocks.vector_sink_c() + tb.connect((src0, 0), (adder, 0)) + tb.connect((src1, 0), (adder, 1)) + tb.connect(adder, sink) + tb.run() + self.assertEqual(sink.data(), (1, 5j, 9, 13j, 17)) + + def test_convolve(self): + tb = gr.top_block() + src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False) + cv = convolve() + sink = blocks.vector_sink_f() + tb.connect(src, cv, sink) + tb.run() + self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8)) + + def test_decim2x(self): + tb = gr.top_block() + src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False) + d2x = decim2x() + sink = blocks.vector_sink_f() + tb.connect(src, d2x, sink) + tb.run() + self.assertEqual(sink.data(), (1, 3, 5, 7)) + + def test_interp2x(self): + tb = gr.top_block() + src = blocks.vector_source_f([1, 3, 5, 7, 9], False) + i2x = interp2x() + sink = blocks.vector_sink_f() + tb.connect(src, i2x, sink) + tb.run() + self.assertEqual(sink.data(), (1, 1, 3, 3, 5, 5, 7, 7, 9, 9)) + + def test_tags(self): + src = tag_source() + sink = tag_sink() + head = blocks.head(gr.sizeof_float, 50000) #should be enough items to get a tag through + tb = gr.top_block() + tb.connect(src, head, sink) + tb.run() + self.assertEqual(sink.key, "example_key") + + def test_fc32_to_f32_2(self): + tb = gr.top_block() + src = blocks.vector_source_c([1+2j, 3+4j, 5+6j, 7+8j, 9+10j], False) + convert = fc32_to_f32_2() + v2s = vector_to_stream(numpy.float32, 2) + sink = blocks.vector_sink_f() + tb.connect(src, convert, v2s, sink) + tb.run() + self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + +if __name__ == '__main__': + gr_unittest.run(test_block_gateway, "test_block_gateway.xml") + diff --git a/gr-blocks/python/blocks/qa_copy.py b/gr-blocks/python/blocks/qa_copy.py new file mode 100755 index 0000000000..20914b51e3 --- /dev/null +++ b/gr-blocks/python/blocks/qa_copy.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# +# Copyright 2009,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks + +class test_copy(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_copy(self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + expected_result = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + src = blocks.vector_source_b(src_data) + op = blocks.copy(gr.sizeof_char) + dst = blocks.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + dst_data = dst.data() + self.assertEqual(expected_result, dst_data) + + def test_copy_drop (self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + expected_result = () + src = blocks.vector_source_b(src_data) + op = blocks.copy(gr.sizeof_char) + op.set_enabled(False) + dst = blocks.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + dst_data = dst.data() + self.assertEqual(expected_result, dst_data) + +if __name__ == '__main__': + gr_unittest.run(test_copy, "test_copy.xml") diff --git a/gr-blocks/python/blocks/qa_cpp_py_binding.py b/gr-blocks/python/blocks/qa_cpp_py_binding.py new file mode 100755 index 0000000000..c15633df82 --- /dev/null +++ b/gr-blocks/python/blocks/qa_cpp_py_binding.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python +# +# Copyright 2012,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +# +# This program tests mixed python and c++ ctrlport exports in a single app +# + +import Ice +import sys, time, random, numpy +from gnuradio import gr, gr_unittest, blocks + +from gnuradio.ctrlport import GNURadio +from gnuradio import ctrlport +import os + +def get1(): + return "success" + +def get2(): + return "failure" + +class inc_class: + def __init__(self): + self.val = 1 + def pp(self): + self.val = self.val+1 + return self.val + +get3 = inc_class() + +def get4(): + random.seed(0) + rv = random.random() + return rv + +def get5(): + numpy.random.seed(0) + samp_t = numpy.random.randn(24)+1j*numpy.random.randn(24); + samp_f = numpy.fft.fft(samp_t); + log_pow_f = 20*numpy.log10(numpy.abs(samp_f)) + rv = list(log_pow_f) + return rv; + +def get6(): + numpy.random.seed(0) + samp_t = numpy.random.randn(1024)+1j*numpy.random.randn(1024); + rv = list(samp_t) + return rv; + +class test_cpp_py_binding(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + os.environ['GR_CONF_CONTROLPORT_ON'] = 'True' + + def tearDown(self): + self.tb = None + + def test_001(self): + v1 = gr.RPC_get_string("pyland", "v1", "unit_1_string", + "Python Exported String", "", "", "", + gr.DISPNULL) + v1.activate(get1) + + v2 = gr.RPC_get_string("pyland", "v2", "unit_2_string", + "Python Exported String", "", "", "", + gr.DISPNULL) + v2.activate(get2) + + v3 = gr.RPC_get_int("pyland", "v3", "unit_3_int", + "Python Exported Int", 0, 100, 1, + gr.DISPNULL) + v3.activate(get3.pp) + + v4 = gr.RPC_get_double("pyland", "time", "unit_4_time_double", + "Python Exported Double", 0, 1000, 1, + gr.DISPNULL) + v4.activate(get4) + + v5 = gr.RPC_get_vector_float("pyland", "fvec", "unit_5_float_vector", + "Python Exported Float Vector", [], [], [], + gr.DISPTIME | gr.DISPOPTCPLX) + v5.activate(get5) + + v6 = gr.RPC_get_vector_gr_complex("pyland", "cvec", "unit_6_gr_complex_vector", + "Python Exported Complex Vector", [], [], [], + gr.DISPXY | gr.DISPOPTSCATTER) + v6.activate(get6) + + # print some variables locally + val = get1() + rval = v1.get() + self.assertEqual(val, rval) + + val = get2() + rval = v2.get() + self.assertEqual(val, rval) + + val = get3.pp() + rval = v3.get() + self.assertEqual(val+1, rval) + + val = get4() + rval = v4.get() + self.assertEqual(val, rval) + + val = get5() + rval = v5.get() + self.assertComplexTuplesAlmostEqual(val, rval, 5) + + val = get6() + rval = v6.get() + self.assertComplexTuplesAlmostEqual(val, rval, 5) + + def test_002(self): + data = range(1,9) + + self.src = blocks.vector_source_c(data) + self.p1 = blocks.ctrlport_probe_c("aaa","C++ exported variable") + self.p2 = blocks.ctrlport_probe_c("bbb","C++ exported variable") + probe_name = self.p2.alias() + + self.tb.connect(self.src, self.p1) + self.tb.connect(self.src, self.p2) + self.tb.start() + + # Probes return complex values as list of floats with re, im + # Imaginary parts of this data set are 0. + expected_result = [1, 0, 2, 0, 3, 0, 4, 0, + 5, 0, 6, 0, 7, 0, 8, 0] + + # Make sure we have time for flowgraph to run + time.sleep(0.1) + + # Get available endpoint + ep = gr.rpcmanager_get().endpoints()[0] + + # Initialize a simple Ice client from endpoint + ic = Ice.initialize(sys.argv) + base = ic.stringToProxy(ep) + radio = GNURadio.ControlPortPrx.checkedCast(base) + + # Get all exported knobs + ret = radio.get([probe_name + "::bbb"]) + for name in ret.keys(): + result = ret[name].value + self.assertEqual(result, expected_result) + + self.tb.stop() + +if __name__ == '__main__': + gr_unittest.run(test_cpp_py_binding, "test_cpp_py_binding.xml") + diff --git a/gr-blocks/python/blocks/qa_cpp_py_binding_set.py b/gr-blocks/python/blocks/qa_cpp_py_binding_set.py new file mode 100755 index 0000000000..2e4198c04d --- /dev/null +++ b/gr-blocks/python/blocks/qa_cpp_py_binding_set.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python +# +# Copyright 2012,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +# +# This program tests mixed python and c++ GRCP sets in a single app +# + +import Ice +import sys, time, random, numpy +from gnuradio import gr, gr_unittest, blocks + +from gnuradio.ctrlport import GNURadio +from gnuradio import ctrlport +import os + +class inc_class: + def __init__(self,val): + self.val = val; + + def _get(self): + #print "returning get (val = %s)"%(str(self.val)); + return self.val; + + def _set(self,val): + #print "updating val to %s"%(str(val)); + self.val = val; + return; + +getset1 = inc_class(10); +getset2 = inc_class(100.0); +getset3 = inc_class("test"); + +class test_cpp_py_binding_set(gr_unittest.TestCase): + def setUp(self): + self.tb = gr.top_block() + os.environ['GR_CONF_CONTROLPORT_ON'] = 'True' + + def tearDown(self): + self.tb = None + + def test_001(self): + + g1 = gr.RPC_get_int("pyland", "v1", "unit_1_int", + "Python Exported Int", 0, 100, 10, + gr.DISPNULL) + g1.activate(getset1._get) + s1 = gr.RPC_get_int("pyland", "v1", "unit_1_int", + "Python Exported Int", 0, 100, 10, + gr.DISPNULL) + s1.activate(getset1._set) + time.sleep(0.01) + + # test int variables + getset1._set(21) + val = getset1._get() + rval = g1.get() + self.assertEqual(val, rval) + + g2 = gr.RPC_get_float("pyland", "v2", "unit_2_float", + "Python Exported Float", -100, 1000.0, 100.0, + gr.DISPNULL) + g2.activate(getset2._get) + s2 = gr.RPC_get_float("pyland", "v2", "unit_2_float", + "Python Exported Float", -100, 1000.0, 100.0, + gr.DISPNULL) + s2.activate(getset2._set) + time.sleep(0.01) + + # test float variables + getset2._set(123.456) + val = getset2._get() + rval = g2.get() + self.assertAlmostEqual(val, rval, 4) + + g3 = gr.RPC_get_string("pyland", "v3", "unit_3_string", + "Python Exported String", "", "", "", + gr.DISPNULL) + g3.activate(getset3._get) + s3 = gr.RPC_get_string("pyland", "v3", "unit_3_string", + "Python Exported String", "", "", "", + gr.DISPNULL) + s3.activate(getset3._set) + time.sleep(0.01) + + # test string variables + getset3._set("third test") + val = getset3._get() + rval = g3.get() + self.assertEqual(val, rval) + + + def test_002(self): + data = range(1, 10) + + self.src = blocks.vector_source_c(data, True) + self.p = blocks.nop(gr.sizeof_gr_complex) + self.p.set_ctrlport_test(0); + probe_info = self.p.alias() + + self.tb.connect(self.src, self.p) + + # Get available endpoint + ep = gr.rpcmanager_get().endpoints()[0] + + # Initialize a simple Ice client from endpoint + ic = Ice.initialize(sys.argv) + base = ic.stringToProxy(ep) + radio = GNURadio.ControlPortPrx.checkedCast(base) + + self.tb.start() + + # Make sure we have time for flowgraph to run + time.sleep(0.1) + + # Get all exported knobs + key_name_test = probe_info+"::test" + ret = radio.get([key_name_test,]) + + ret[key_name_test].value = 10 + radio.set({key_name_test: ret[key_name_test]}) + + ret = radio.get([]) + result_test = ret[key_name_test].value + self.assertEqual(result_test, 10) + + self.tb.stop() + self.tb.wait() + +if __name__ == '__main__': + gr_unittest.run(test_cpp_py_binding_set, "test_cpp_py_binding_set.xml") + diff --git a/gr-blocks/python/blocks/qa_endian_swap.py b/gr-blocks/python/blocks/qa_endian_swap.py new file mode 100644 index 0000000000..ea6e10fb8f --- /dev/null +++ b/gr-blocks/python/blocks/qa_endian_swap.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# +# Copyright 2011-2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import ctypes + +class test_endian_swap(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + src_data = [1,2,3,4] + expected_result = [256, 512, 768, 1024]; + + src = blocks.vector_source_s(src_data) + op = blocks.endian_swap(2) + dst = blocks.vector_sink_s() + + self.tb.connect(src, op, dst) + self.tb.run() + result_data = list(dst.data()) + + self.assertEqual(expected_result, result_data) + + def test_002(self): + + src_data = [1,2,3,4] + expected_result = [16777216, 33554432, 50331648, 67108864]; + + src = blocks.vector_source_i(src_data) + op = blocks.endian_swap(4) + dst = blocks.vector_sink_i() + + self.tb.connect(src, op, dst) + self.tb.run() + result_data = list(dst.data()) + + self.assertEqual(expected_result, result_data) + +if __name__ == '__main__': + gr_unittest.run(test_endian_swap, "test_endian_swap.xml") + diff --git a/gr-blocks/python/blocks/qa_file_source_sink.py b/gr-blocks/python/blocks/qa_file_source_sink.py new file mode 100644 index 0000000000..f35912b229 --- /dev/null +++ b/gr-blocks/python/blocks/qa_file_source_sink.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import os + +class test_file_source_sink(gr_unittest.TestCase): + + def setUp (self): + os.environ['GR_CONF_CONTROLPORT_ON'] = 'False' + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001(self): + src_data = range(1000) + expected_result = range(1000) + + filename = "tmp.32f" + src = blocks.vector_source_f(src_data) + snk = blocks.file_sink(gr.sizeof_float, filename) + snk.set_unbuffered(True) + + src2 = blocks.file_source(gr.sizeof_float, filename) + snk2 = blocks.vector_sink_f() + + self.tb.connect(src, snk) + self.tb.run() + + self.tb.disconnect(src, snk) + self.tb.connect(src2, snk2) + self.tb.run() + + os.remove(filename) + + result_data = snk2.data() + self.assertFloatTuplesAlmostEqual(expected_result, result_data) + + def test_descriptor_001(self): + src_data = range(1000) + expected_result = range(1000) + + filename = "tmp.32f" + fhandle0 = open(filename, "wb") + fd0 = fhandle0.fileno() + + src = blocks.vector_source_f(src_data) + snk = blocks.file_descriptor_sink(gr.sizeof_float, fd0) + + self.tb.connect(src, snk) + self.tb.run() + os.fsync(fd0) + fhandle0.close() + + fhandle1 = open(filename, "rb") + fd1 = fhandle1.fileno() + src2 = blocks.file_descriptor_source(gr.sizeof_float, fd1, False) + snk2 = blocks.vector_sink_f() + + self.tb.disconnect(src, snk) + self.tb.connect(src2, snk2) + self.tb.run() + os.fsync(fd1) + fhandle1.close() + + os.remove(filename) + + result_data = snk2.data() + self.assertFloatTuplesAlmostEqual(expected_result, result_data) + +if __name__ == '__main__': + gr_unittest.run(test_file_source_sink, "test_file_source_sink.xml") + diff --git a/gr-blocks/python/blocks/qa_head.py b/gr-blocks/python/blocks/qa_head.py new file mode 100755 index 0000000000..9b5bca221b --- /dev/null +++ b/gr-blocks/python/blocks/qa_head.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +# +# Copyright 2004,2007,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks + +class test_head(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_head(self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + expected_result = (1, 2, 3, 4) + src1 = blocks.vector_source_i(src_data) + op = blocks.head(gr.sizeof_int, 4) + dst1 = blocks.vector_sink_i() + self.tb.connect(src1, op) + self.tb.connect(op, dst1) + self.tb.run() + dst_data = dst1.data() + self.assertEqual(expected_result, dst_data) + +if __name__ == '__main__': + gr_unittest.run(test_head, "test_head.xml") diff --git a/gr-blocks/python/blocks/qa_hier_block2.py b/gr-blocks/python/blocks/qa_hier_block2.py new file mode 100755 index 0000000000..51f420e8eb --- /dev/null +++ b/gr-blocks/python/blocks/qa_hier_block2.py @@ -0,0 +1,397 @@ +#!/usr/bin/env python + +from gnuradio import gr, gr_unittest, blocks +import numpy + +class add_ff(gr.sync_block): + def __init__(self): + gr.sync_block.__init__( + self, + name = "add_ff", + in_sig = [numpy.float32, numpy.float32], + out_sig = [numpy.float32], + ) + + def work(self, input_items, output_items): + output_items[0][:] = input_items[0] + input_items[1] + return len(output_items[0]) + +class multiply_const_ff(gr.sync_block): + def __init__(self, k): + gr.sync_block.__init__( + self, + name = "multiply_ff", + in_sig = [numpy.float32], + out_sig = [numpy.float32], + ) + self.k = k + + def work(self, input_items, output_items): + output_items[0][:] = map(lambda x: self.k*x, input_items[0]) + return len(output_items[0]) + +class test_hier_block2(gr_unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_001_make(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + self.assertEqual("test_block", hblock.name()) + self.assertEqual(1, hblock.input_signature().max_streams()) + self.assertEqual(1, hblock.output_signature().min_streams()) + self.assertEqual(1, hblock.output_signature().max_streams()) + self.assertEqual(gr.sizeof_int, hblock.output_signature().sizeof_stream_item(0)) + + def test_002_connect_input(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + hblock.connect(hblock, nop1) + + def test_004_connect_output(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + hblock.connect(nop1, hblock) + + def test_005_connect_output_in_use(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + nop2 = blocks.nop(gr.sizeof_int) + hblock.connect(nop1, hblock) + self.assertRaises(ValueError, + lambda: hblock.connect(nop2, hblock)) + + def test_006_connect_invalid_src_port_neg(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + self.assertRaises(ValueError, + lambda: hblock.connect((hblock, -1), nop1)) + + def test_005_connect_invalid_src_port_exceeds(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + self.assertRaises(ValueError, + lambda: hblock.connect((hblock, 1), nop1)) + + def test_007_connect_invalid_dst_port_neg(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + nop2 = blocks.nop(gr.sizeof_int) + self.assertRaises(ValueError, + lambda: hblock.connect(nop1, (nop2, -1))) + + def test_008_connect_invalid_dst_port_exceeds(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.null_sink(gr.sizeof_int) + nop2 = blocks.null_sink(gr.sizeof_int) + self.assertRaises(ValueError, + lambda: hblock.connect(nop1, (nop2, 1))) + + def test_009_check_topology(self): + hblock = gr.top_block("test_block") + hblock.check_topology(0, 0) + + def test_010_run(self): + expected = (1.0, 2.0, 3.0, 4.0) + hblock = gr.top_block("test_block") + src = blocks.vector_source_f(expected, False) + sink1 = blocks.vector_sink_f() + sink2 = blocks.vector_sink_f() + hblock.connect(src, sink1) + hblock.connect(src, sink2) + hblock.run() + actual1 = sink1.data() + actual2 = sink2.data() + self.assertEquals(expected, actual1) + self.assertEquals(expected, actual2) + + def test_012_disconnect_input(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + hblock.connect(hblock, nop1) + hblock.disconnect(hblock, nop1) + + def test_013_disconnect_input_not_connected(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + nop2 = blocks.nop(gr.sizeof_int) + hblock.connect(hblock, nop1) + self.assertRaises(ValueError, + lambda: hblock.disconnect(hblock, nop2)) + + def test_014_disconnect_input_neg(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + hblock.connect(hblock, nop1) + self.assertRaises(ValueError, + lambda: hblock.disconnect((hblock, -1), nop1)) + + def test_015_disconnect_input_exceeds(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + hblock.connect(hblock, nop1) + self.assertRaises(ValueError, + lambda: hblock.disconnect((hblock, 1), nop1)) + + def test_016_disconnect_output(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + hblock.connect(nop1, hblock) + hblock.disconnect(nop1, hblock) + + def test_017_disconnect_output_not_connected(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + nop2 = blocks.nop(gr.sizeof_int) + hblock.connect(nop1, hblock) + self.assertRaises(ValueError, + lambda: hblock.disconnect(nop2, hblock)) + + def test_018_disconnect_output_neg(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + hblock.connect(hblock, nop1) + self.assertRaises(ValueError, + lambda: hblock.disconnect(nop1, (hblock, -1))) + + def test_019_disconnect_output_exceeds(self): + hblock = gr.hier_block2("test_block", + gr.io_signature(1,1,gr.sizeof_int), + gr.io_signature(1,1,gr.sizeof_int)) + nop1 = blocks.nop(gr.sizeof_int) + hblock.connect(nop1, hblock) + self.assertRaises(ValueError, + lambda: hblock.disconnect(nop1, (hblock, 1))) + + def test_020_run(self): + hblock = gr.top_block("test_block") + data = (1.0, 2.0, 3.0, 4.0) + src = blocks.vector_source_f(data, False) + dst = blocks.vector_sink_f() + hblock.connect(src, dst) + hblock.run() + self.assertEquals(data, dst.data()) + + def test_021_connect_single(self): + hblock = gr.top_block("test_block") + blk = gr.hier_block2("block", + gr.io_signature(0, 0, 0), + gr.io_signature(0, 0, 0)) + hblock.connect(blk) + + def test_022_connect_single_with_ports(self): + hblock = gr.top_block("test_block") + blk = gr.hier_block2("block", + gr.io_signature(1, 1, 1), + gr.io_signature(1, 1, 1)) + self.assertRaises(ValueError, + lambda: hblock.connect(blk)) + + def test_023_connect_single_twice(self): + hblock = gr.top_block("test_block") + blk = gr.hier_block2("block", + gr.io_signature(0, 0, 0), + gr.io_signature(0, 0, 0)) + hblock.connect(blk) + self.assertRaises(ValueError, + lambda: hblock.connect(blk)) + + def test_024_disconnect_single(self): + hblock = gr.top_block("test_block") + blk = gr.hier_block2("block", + gr.io_signature(0, 0, 0), + gr.io_signature(0, 0, 0)) + hblock.connect(blk) + hblock.disconnect(blk) + + def test_025_disconnect_single_not_connected(self): + hblock = gr.top_block("test_block") + blk = gr.hier_block2("block", + gr.io_signature(0, 0, 0), + gr.io_signature(0, 0, 0)) + self.assertRaises(ValueError, + lambda: hblock.disconnect(blk)) + + def test_026_run_single(self): + expected_data = (1.0,) + tb = gr.top_block("top_block") + hb = gr.hier_block2("block", + gr.io_signature(0, 0, 0), + gr.io_signature(0, 0, 0)) + src = blocks.vector_source_f(expected_data) + dst = blocks.vector_sink_f() + hb.connect(src, dst) + tb.connect(hb) + tb.run() + self.assertEquals(expected_data, dst.data()) + + def test_027a_internally_unconnected_input(self): + tb = gr.top_block() + hb = gr.hier_block2("block", + gr.io_signature(1, 1, 1), + gr.io_signature(1, 1, 1)) + hsrc = blocks.vector_source_b([1,]) + hb.connect(hsrc, hb) # wire output internally + src = blocks.vector_source_b([1, ]) + dst = blocks.vector_sink_b() + tb.connect(src, hb, dst) # hb's input is not connected internally + self.assertRaises(RuntimeError, + lambda: tb.run()) + + def test_027b_internally_unconnected_output(self): + tb = gr.top_block() + + hb = gr.hier_block2("block", + gr.io_signature(1, 1, 1), + gr.io_signature(1, 1, 1)) + hdst = blocks.vector_sink_b() + hb.connect(hb, hdst) # wire input internally + src = blocks.vector_source_b([1, ]) + dst = blocks.vector_sink_b() + tb.connect(src, hb, dst) # hb's output is not connected internally + self.assertRaises(RuntimeError, + lambda: tb.run()) + + def test_027c_fully_unconnected_output(self): + tb = gr.top_block() + hb = gr.hier_block2("block", + gr.io_signature(1, 1, 1), + gr.io_signature(1, 1, 1)) + hsrc = blocks.vector_sink_b() + hb.connect(hb, hsrc) # wire input internally + src = blocks.vector_source_b([1, ]) + dst = blocks.vector_sink_b() + tb.connect(src, hb) # hb's output is not connected internally or externally + self.assertRaises(RuntimeError, + lambda: tb.run()) + + def test_027d_fully_unconnected_input(self): + tb = gr.top_block() + hb = gr.hier_block2("block", + gr.io_signature(1, 1, 1), + gr.io_signature(1, 1, 1)) + hdst = blocks.vector_source_b([1,]) + hb.connect(hdst, hb) # wire output internally + dst = blocks.vector_sink_b() + tb.connect(hb, dst) # hb's input is not connected internally or externally + self.assertRaises(RuntimeError, + lambda: tb.run()) + + def test_028_singleton_reconfigure(self): + tb = gr.top_block() + hb = gr.hier_block2("block", + gr.io_signature(0, 0, 0), gr.io_signature(0, 0, 0)) + src = blocks.vector_source_b([1, ]) + dst = blocks.vector_sink_b() + hb.connect(src, dst) + tb.connect(hb) # Singleton connect + tb.lock() + tb.disconnect_all() + tb.connect(src, dst) + tb.unlock() + + def test_029_singleton_disconnect(self): + tb = gr.top_block() + src = blocks.vector_source_b([1, ]) + dst = blocks.vector_sink_b() + tb.connect(src, dst) + tb.disconnect(src) # Singleton disconnect + tb.connect(src, dst) + tb.run() + self.assertEquals(dst.data(), (1,)) + + def test_030_nested_input(self): + tb = gr.top_block() + src = blocks.vector_source_b([1,]) + hb1 = gr.hier_block2("hb1", + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(0, 0, 0)) + hb2 = gr.hier_block2("hb2", + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(0, 0, 0)) + dst = blocks.vector_sink_b() + tb.connect(src, hb1) + hb1.connect(hb1, hb2) + hb2.connect(hb2, blocks.copy(gr.sizeof_char), dst) + tb.run() + self.assertEquals(dst.data(), (1,)) + + def test_031_multiple_internal_inputs(self): + tb = gr.top_block() + src = blocks.vector_source_f([1.0,]) + hb = gr.hier_block2("hb", + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_float)) + m1 = multiply_const_ff(1.0) + m2 = multiply_const_ff(2.0) + add = add_ff() + hb.connect(hb, m1) # m1 is connected to hb external input #0 + hb.connect(hb, m2) # m2 is also connected to hb external input #0 + hb.connect(m1, (add, 0)) + hb.connect(m2, (add, 1)) + hb.connect(add, hb) # add is connected to hb external output #0 + dst = blocks.vector_sink_f() + tb.connect(src, hb, dst) + tb.run() + self.assertEquals(dst.data(), (3.0,)) + + def test_032_nested_multiple_internal_inputs(self): + tb = gr.top_block() + src = blocks.vector_source_f([1.0,]) + hb = gr.hier_block2("hb", + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_float)) + hb2 = gr.hier_block2("hb", + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_float)) + + m1 = multiply_const_ff(1.0) + m2 = multiply_const_ff(2.0) + add = add_ff() + hb2.connect(hb2, m1) # m1 is connected to hb2 external input #0 + hb2.connect(hb2, m2) # m2 is also connected to hb2 external input #0 + hb2.connect(m1, (add, 0)) + hb2.connect(m2, (add, 1)) + hb2.connect(add, hb2) # add is connected to hb2 external output #0 + hb.connect(hb, hb2, hb) # hb as hb2 as nested internal block + dst = blocks.vector_sink_f() + tb.connect(src, hb, dst) + tb.run() + self.assertEquals(dst.data(), (3.0,)) + + +if __name__ == "__main__": + gr_unittest.run(test_hier_block2, "test_hier_block2.xml") diff --git a/gr-blocks/python/blocks/qa_message_tags.py b/gr-blocks/python/blocks/qa_message_tags.py new file mode 100644 index 0000000000..0d4f77bf9f --- /dev/null +++ b/gr-blocks/python/blocks/qa_message_tags.py @@ -0,0 +1,27 @@ +import time + +from gnuradio import gr, gr_unittest, blocks + +class test_message_tags (gr_unittest.TestCase): + + def test_1 (self): + data = ('hello', 'you', 'there') + tx_msgq = gr.msg_queue() + rx_msgq = gr.msg_queue() + for d in data: + tx_msgq.insert_tail(gr.message_from_string(d)) + tx_msgq.insert_tail(gr.message(1)) # send EOF + tb = gr.top_block() + src = blocks.message_source(gr.sizeof_char, tx_msgq, "packet_length") + snk = blocks.message_sink(gr.sizeof_char, rx_msgq, False, "packet_length") + tb.connect(src, snk) + tb.start() + time.sleep(1) + tb.stop() + for d in data: + msg = rx_msgq.delete_head() + contents = msg.to_string() + self.assertEqual(d, contents) + +if __name__ == '__main__': + gr_unittest.run(test_message_tags, "test_message_tags.xml") diff --git a/gr-blocks/python/blocks/qa_null_sink_source.py b/gr-blocks/python/blocks/qa_null_sink_source.py new file mode 100644 index 0000000000..03d6ab14c1 --- /dev/null +++ b/gr-blocks/python/blocks/qa_null_sink_source.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import math + +class test_null_sink_source(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + # Just running some data through null source/sink + src = blocks.null_source(gr.sizeof_float) + hed = blocks.head(gr.sizeof_float, 100) + dst = blocks.null_sink(gr.sizeof_float) + + self.tb.connect(src, hed, dst) + self.tb.run() + +if __name__ == '__main__': + gr_unittest.run(test_null_sink_source, "test_null_sink_source.xml") + diff --git a/gr-blocks/python/blocks/qa_plateau_detector_fb.py b/gr-blocks/python/blocks/qa_plateau_detector_fb.py new file mode 100755 index 0000000000..003c4ea74d --- /dev/null +++ b/gr-blocks/python/blocks/qa_plateau_detector_fb.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# +# Copyright 2012-2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks + +class qa_plateau_detector_fb (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_t (self): + # | Spur spike 1 | Plateau | Spur spike 2 + test_signal = (0, 1, .2, .4, .6, .8, 1, 1, 1, 1, 1, .8, .6, .4, 1, 0) + expected_sig = (0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0) + # | Center of Plateau + sink = blocks.vector_sink_b() + self.tb.connect(blocks.vector_source_f(test_signal), blocks.plateau_detector_fb(5), sink) + self.tb.run () + self.assertEqual(expected_sig, sink.data()) + + +if __name__ == '__main__': + gr_unittest.run(qa_plateau_detector_fb, "qa_plateau_detector_fb.xml") diff --git a/gr-blocks/python/blocks/qa_python_message_passing.py b/gr-blocks/python/blocks/qa_python_message_passing.py new file mode 100644 index 0000000000..6eade7556b --- /dev/null +++ b/gr-blocks/python/blocks/qa_python_message_passing.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import pmt +import numpy +import time + +# Simple block to generate messages +class message_generator(gr.sync_block): + def __init__(self, msg_list, msg_interval): + gr.sync_block.__init__( + self, + name = "message generator", + in_sig = [numpy.float32], + out_sig = None + ) + self.msg_list = msg_list + self.msg_interval = msg_interval + self.msg_ctr = 0 + self.message_port_register_out(pmt.intern('out_port')) + + + def work(self, input_items, output_items): + inLen = len(input_items[0]) + while self.msg_ctr < len(self.msg_list) and \ + (self.msg_ctr * self.msg_interval) < \ + (self.nitems_read(0) + inLen): + self.message_port_pub(pmt.intern('out_port'), + self.msg_list[self.msg_ctr]) + self.msg_ctr += 1 + return inLen + +# Simple block to consume messages +class message_consumer(gr.sync_block): + def __init__(self): + gr.sync_block.__init__( + self, + name = "message consumer", + in_sig = None, + out_sig = None + ) + self.msg_list = [] + self.message_port_register_in(pmt.intern('in_port')) + self.set_msg_handler(pmt.intern('in_port'), + self.handle_msg) + + def handle_msg(self, msg): + # Create a new PMT from long value and put in list + self.msg_list.append(pmt.from_long(pmt.to_long(msg))) + +class test_python_message_passing(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_000(self): + num_msgs = 10 + msg_interval = 1000 + msg_list = [] + for i in range(num_msgs): + msg_list.append(pmt.from_long(i)) + + # Create vector source with dummy data to trigger messages + src_data = [] + for i in range(num_msgs*msg_interval): + src_data.append(float(i)) + src = blocks.vector_source_f(src_data, False) + msg_gen = message_generator(msg_list, msg_interval) + msg_cons = message_consumer() + + # Connect vector source to message gen + self.tb.connect(src, msg_gen) + + # Connect message generator to message consumer + self.tb.msg_connect(msg_gen, 'out_port', msg_cons, 'in_port') + + # Verify that the messgae port query functions work + self.assertEqual(pmt.symbol_to_string(pmt.vector_ref( + msg_gen.message_ports_out(), 0)), 'out_port') + self.assertEqual(pmt.symbol_to_string(pmt.vector_ref( + msg_cons.message_ports_in(), 0)), 'in_port') + + # Run to verify message passing + self.tb.start() + + # Wait for all messages to be sent + while msg_gen.msg_ctr < num_msgs: + time.sleep(0.5) + self.tb.stop() + self.tb.wait() + + # Verify that the message consumer got all the messages + self.assertEqual(num_msgs, len(msg_cons.msg_list)) + for i in range(num_msgs): + self.assertTrue(pmt.equal(msg_list[i], msg_cons.msg_list[i])) + +if __name__ == '__main__': + gr_unittest.run(test_python_message_passing, + 'test_python_message_passing.xml') diff --git a/gr-blocks/python/blocks/qa_repack_bits_bb.py b/gr-blocks/python/blocks/qa_repack_bits_bb.py new file mode 100755 index 0000000000..d9bbfe4fac --- /dev/null +++ b/gr-blocks/python/blocks/qa_repack_bits_bb.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import random +from gnuradio import gr, gr_unittest, blocks +import pmt + +class qa_repack_bits_bb (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_simple (self): + """ Very simple test, 2 bits -> 1 """ + src_data = (0b11, 0b01, 0b10) + expected_data = (0b1, 0b1, 0b1, 0b0, 0b0, 0b1) + k = 2 + l = 1 + src = blocks.vector_source_b(src_data, False, 1) + repack = blocks.repack_bits_bb(k, l) + sink = blocks.vector_sink_b() + self.tb.connect(src, repack, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_data) + + def test_002_three (self): + """ 8 -> 3 """ + src_data = (0b11111101, 0b11111111, 0b11111111) + expected_data = (0b101,) + (0b111,) * 7 + k = 8 + l = 3 + src = blocks.vector_source_b(src_data, False, 1) + repack = blocks.repack_bits_bb(k, l) + sink = blocks.vector_sink_b() + self.tb.connect(src, repack, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_data) + + def test_003_lots_of_bytes (self): + """ Lots and lots of bytes, multiple packer stages """ + src_data = tuple([random.randint(0, 255) for x in range(3*5*7*8 * 10)]) + src = blocks.vector_source_b(src_data, False, 1) + repack1 = blocks.repack_bits_bb(8, 3) + repack2 = blocks.repack_bits_bb(3, 5) + repack3 = blocks.repack_bits_bb(5, 7) + repack4 = blocks.repack_bits_bb(7, 8) + sink = blocks.vector_sink_b() + self.tb.connect(src, repack1, repack2, repack3, repack4, sink) + self.tb.run () + self.assertEqual(sink.data(), src_data) + + def test_004_three_with_tags (self): + """ 8 -> 3 """ + src_data = (0b11111101, 0b11111111) + expected_data = (0b101,) + (0b111,) * 4 + (0b001,) + k = 8 + l = 3 + tag_name = "len" + tag = gr.tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(src_data)) + src = blocks.vector_source_b(src_data, False, 1, (tag,)) + repack = blocks.repack_bits_bb(k, l, tag_name) + sink = blocks.vector_sink_b() + self.tb.connect(src, repack, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_data) + try: + out_tag = sink.tags()[0] + except: + self.assertFail() + self.assertEqual(out_tag.offset, 0) + self.assertEqual(pmt.symbol_to_string(out_tag.key), tag_name) + self.assertEqual(pmt.to_long(out_tag.value), len(expected_data)) + + def test_005_three_with_tags_trailing (self): + """ 3 -> 8, trailing bits """ + src_data = (0b101,) + (0b111,) * 4 + (0b001,) + expected_data = (0b11111101, 0b11111111) + k = 3 + l = 8 + tag_name = "len" + tag = gr.tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(src_data)) + src = blocks.vector_source_b(src_data, False, 1, (tag,)) + repack = blocks.repack_bits_bb(k, l, tag_name, True) + sink = blocks.vector_sink_b() + self.tb.connect(src, repack, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_data) + try: + out_tag = sink.tags()[0] + except: + self.assertFail() + self.assertEqual(out_tag.offset, 0) + self.assertEqual(pmt.symbol_to_string(out_tag.key), tag_name) + self.assertEqual(pmt.to_long(out_tag.value), len(expected_data)) + +if __name__ == '__main__': + gr_unittest.run(qa_repack_bits_bb, "qa_repack_bits_bb.xml") + diff --git a/gr-blocks/python/blocks/qa_skiphead.py b/gr-blocks/python/blocks/qa_skiphead.py new file mode 100755 index 0000000000..a9b6df40cf --- /dev/null +++ b/gr-blocks/python/blocks/qa_skiphead.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +# +# Copyright 2007,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks + +class test_skiphead(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + self.src_data = [int(x) for x in range(65536)] + + def tearDown(self): + self.tb = None + + def test_skip_0(self): + skip_cnt = 0 + expected_result = tuple(self.src_data[skip_cnt:]) + src1 = blocks.vector_source_i(self.src_data) + op = blocks.skiphead(gr.sizeof_int, skip_cnt) + dst1 = blocks.vector_sink_i() + self.tb.connect(src1, op, dst1) + self.tb.run() + dst_data = dst1.data() + self.assertEqual(expected_result, dst_data) + + def test_skip_1(self): + skip_cnt = 1 + expected_result = tuple(self.src_data[skip_cnt:]) + src1 = blocks.vector_source_i(self.src_data) + op = blocks.skiphead(gr.sizeof_int, skip_cnt) + dst1 = blocks.vector_sink_i() + self.tb.connect(src1, op, dst1) + self.tb.run() + dst_data = dst1.data() + self.assertEqual(expected_result, dst_data) + + def test_skip_1023(self): + skip_cnt = 1023 + expected_result = tuple(self.src_data[skip_cnt:]) + src1 = blocks.vector_source_i(self.src_data) + op = blocks.skiphead(gr.sizeof_int, skip_cnt) + dst1 = blocks.vector_sink_i() + self.tb.connect(src1, op, dst1) + self.tb.run() + dst_data = dst1.data() + self.assertEqual(expected_result, dst_data) + + def test_skip_6339(self): + skip_cnt = 6339 + expected_result = tuple(self.src_data[skip_cnt:]) + src1 = blocks.vector_source_i(self.src_data) + op = blocks.skiphead(gr.sizeof_int, skip_cnt) + dst1 = blocks.vector_sink_i() + self.tb.connect(src1, op, dst1) + self.tb.run() + dst_data = dst1.data() + self.assertEqual(expected_result, dst_data) + + def test_skip_12678(self): + skip_cnt = 12678 + expected_result = tuple(self.src_data[skip_cnt:]) + src1 = blocks.vector_source_i(self.src_data) + op = blocks.skiphead(gr.sizeof_int, skip_cnt) + dst1 = blocks.vector_sink_i() + self.tb.connect(src1, op, dst1) + self.tb.run() + dst_data = dst1.data() + self.assertEqual(expected_result, dst_data) + + def test_skip_all(self): + skip_cnt = len(self.src_data) + expected_result = tuple(self.src_data[skip_cnt:]) + src1 = blocks.vector_source_i(self.src_data) + op = blocks.skiphead(gr.sizeof_int, skip_cnt) + dst1 = blocks.vector_sink_i() + self.tb.connect(src1, op, dst1) + self.tb.run() + dst_data = dst1.data() + self.assertEqual(expected_result, dst_data) + + +if __name__ == '__main__': + gr_unittest.run(test_skiphead, "test_skiphead.xml") diff --git a/gr-blocks/python/blocks/qa_tag_file_sink.py b/gr-blocks/python/blocks/qa_tag_file_sink.py new file mode 100644 index 0000000000..250ad1addf --- /dev/null +++ b/gr-blocks/python/blocks/qa_tag_file_sink.py @@ -0,0 +1,71 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import os, struct + +class test_tag_file_sink(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + src_data = ( 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + trg_data = (-1, -1, 1, 1, -1, -1, 1, 1, -1, -1) + src = blocks.vector_source_i(src_data) + trg = blocks.vector_source_s(trg_data) + op = blocks.burst_tagger(gr.sizeof_int) + snk = blocks.tagged_file_sink(gr.sizeof_int, 1) + self.tb.connect(src, (op,0)) + self.tb.connect(trg, (op,1)) + self.tb.connect(op, snk) + self.tb.run() + + # Tagged file sink gets 2 burst tags at index 2 and index 5. + # Creates two new files, each with two integers in them from + # src_data at these indexes (3,4) and (7,8). + file0 = "file{0}_0_2.00000000.dat".format(snk.unique_id()) + file1 = "file{0}_1_6.00000000.dat".format(snk.unique_id()) + + # Open the files and read in the data, then remove the files + # to clean up the directory. + outfile0 = file(file0, 'rb') + outfile1 = file(file1, 'rb') + data0 = outfile0.read(8) + data1 = outfile1.read(8) + outfile0.close() + outfile1.close() + os.remove(file0) + os.remove(file1) + + # Convert the 8 bytes from the files into a tuple of 2 ints. + idata0 = struct.unpack('ii', data0) + idata1 = struct.unpack('ii', data1) + + self.assertEqual(idata0, (3, 4)) + self.assertEqual(idata1, (7, 8)) + +if __name__ == '__main__': + gr_unittest.run(test_tag_file_sink, "test_tag_file_sink.xml") diff --git a/gr-blocks/python/blocks/qa_tag_gate.py b/gr-blocks/python/blocks/qa_tag_gate.py new file mode 100755 index 0000000000..74fe5a7bcd --- /dev/null +++ b/gr-blocks/python/blocks/qa_tag_gate.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python +# +# Copyright 2013 <+YOU OR YOUR COMPANY+>. +# +# This is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import pmt + + +class qa_tag_gate (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_t (self): + tag = gr.tag_t() + tag.key = pmt.string_to_symbol('key') + tag.value = pmt.from_long(42) + tag.offset = 0 + src = blocks.vector_source_f(range(20), False, 1, (tag,)) + gate = blocks.tag_gate(gr.sizeof_float, False) + sink = blocks.vector_sink_f() + self.tb.run () + self.assertEqual(len(sink.tags()), 0) + +if __name__ == '__main__': + gr_unittest.run(qa_tag_gate, "qa_tag_gate.xml") + diff --git a/gr-blocks/python/blocks/qa_tagged_stream_mux.py b/gr-blocks/python/blocks/qa_tagged_stream_mux.py new file mode 100755 index 0000000000..675d685aaa --- /dev/null +++ b/gr-blocks/python/blocks/qa_tagged_stream_mux.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import pmt +import numpy + +def make_len_tags(tupl, key): + tags = [] + tag = gr.tag_t() + tag.key = pmt.string_to_symbol(key) + n_read = 0 + for element in tupl: + tag.offset = n_read + n_read += len(element) + tag.value = pmt.to_pmt(len(element)) + tags.append(tag) + return tags + +def make_len_tag(offset, key, value): + tag = gr.tag_t() + tag.offset = offset + tag.key = pmt.string_to_symbol(key) + tag.value = pmt.to_pmt(value) + return tag + + +class qa_tagged_stream_mux (gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_1(self): + datas = ( + 0, 1, 2, 5, 6, 10, 14, 15, 16, + 3, 4, 7, 8, 9, 11, 12, 13, 17 + ) + expected = tuple(range(18)) + + tagname = "packet_length" + len_tags_0 = ( + make_len_tag(0, tagname, 3), + make_len_tag(3, tagname, 2), + make_len_tag(5, tagname, 1), + make_len_tag(6, tagname, 3) + ) + len_tags_1 = ( + make_len_tag(0, tagname, 2), + make_len_tag(2, tagname, 3), + make_len_tag(5, tagname, 3), + make_len_tag(8, tagname, 1) + ) + test_tag_0 = gr.tag_t() + test_tag_0.key = pmt.string_to_symbol('spam') + test_tag_0.offset = 4 # On the second '1' + test_tag_0.value = pmt.to_pmt(42) + test_tag_1 = gr.tag_t() + test_tag_1.key = pmt.string_to_symbol('eggs') + test_tag_1.offset = 3 # On the first '3' of the 2nd stream + test_tag_1.value = pmt.to_pmt(23) + + src0 = blocks.vector_source_b(datas[0:9], False, 1, len_tags_0 + (test_tag_0,)) + src1 = blocks.vector_source_b(datas[9:], False, 1, len_tags_1 + (test_tag_1,)) + tagged_stream_mux = blocks.tagged_stream_mux(gr.sizeof_char, tagname) + snk = blocks.vector_sink_b() + self.tb.connect(src0, (tagged_stream_mux, 0)) + self.tb.connect(src1, (tagged_stream_mux, 1)) + self.tb.connect(tagged_stream_mux, snk) + self.tb.run() + + self.assertEqual(expected, snk.data()) + + tags = [gr.tag_to_python(x) for x in snk.tags()] + tags = sorted([(x.offset, x.key, x.value) for x in tags]) + tags_expected = [ + (0, 'packet_length', 5), + (5, 'packet_length', 5), + (6, 'spam', 42), + (8, 'eggs', 23), + (10, 'packet_length', 4), + (14, 'packet_length', 4) + ] + self.assertEqual(tags, tags_expected) + + +if __name__ == '__main__': + gr_unittest.run(qa_tagged_stream_mux, "qa_tagged_stream_mux.xml") + diff --git a/gr-blocks/python/blocks/qa_udp_source_sink.py b/gr-blocks/python/blocks/qa_udp_source_sink.py new file mode 100644 index 0000000000..b96d393a50 --- /dev/null +++ b/gr-blocks/python/blocks/qa_udp_source_sink.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python +# +# Copyright 2008,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import os + +from threading import Timer + +class test_udp_sink_source(gr_unittest.TestCase): + + def setUp(self): + os.environ['GR_CONF_CONTROLPORT_ON'] = 'False' + self.tb_snd = gr.top_block() + self.tb_rcv = gr.top_block() + + def tearDown(self): + self.tb_rcv = None + self.tb_snd = None + + def test_001(self): + # Tests calling disconnect/reconnect. + + port = 65500 + + n_data = 16 + src_data = [x for x in range(n_data)] + expected_result = tuple(src_data) + src = blocks.vector_source_s(src_data, False) + udp_snd = blocks.udp_sink(gr.sizeof_short, 'localhost', port) + self.tb_snd.connect(src, udp_snd) + + self.tb_snd.run() + udp_snd.disconnect() + + udp_snd.connect('localhost', port+1) + src.rewind() + self.tb_snd.run() + + def test_002(self): + port = 65500 + + n_data = 100 + src_data = [float(x) for x in range(n_data)] + expected_result = tuple(src_data) + src = blocks.vector_source_f(src_data, False) + udp_snd = blocks.udp_sink(gr.sizeof_float, 'localhost', port) + self.tb_snd.connect(src, udp_snd) + + udp_rcv = blocks.udp_source(gr.sizeof_float, 'localhost', port) + dst = blocks.vector_sink_f() + self.tb_rcv.connect(udp_rcv, dst) + + self.tb_rcv.start() + self.tb_snd.run() + udp_snd.disconnect() + self.timeout = False + q = Timer(2.0,self.stop_rcv) + q.start() + self.tb_rcv.wait() + q.cancel() + + result_data = dst.data() + self.assertEqual(expected_result, result_data) + self.assert_(not self.timeout) + + def test_003(self): + udp_rcv = blocks.udp_source(gr.sizeof_float, '0.0.0.0', 0, eof=False) + rcv_port = udp_rcv.get_port() + + udp_snd = blocks.udp_sink(gr.sizeof_float, '127.0.0.1', 65500) + udp_snd.connect('localhost', rcv_port) + + n_data = 16 + src_data = [float(x) for x in range(n_data)] + expected_result = tuple(src_data) + src = blocks.vector_source_f(src_data) + dst = blocks.vector_sink_f() + + self.tb_snd.connect(src, udp_snd) + self.tb_rcv.connect(udp_rcv, dst) + + self.tb_rcv.start() + self.tb_snd.run() + udp_snd.disconnect() + self.timeout = False + q = Timer(2.0,self.stop_rcv) + q.start() + self.tb_rcv.wait() + q.cancel() + + result_data = dst.data() + self.assertEqual(expected_result, result_data) + self.assert_(self.timeout) # source ignores EOF? + + def stop_rcv(self): + self.timeout = True + self.tb_rcv.stop() + #print "tb_rcv stopped by Timer" + +if __name__ == '__main__': + gr_unittest.run(test_udp_sink_source, "test_udp_sink_source.xml") + diff --git a/gr-blocks/python/blocks/qa_vco.py b/gr-blocks/python/blocks/qa_vco.py new file mode 100644 index 0000000000..4ed2a71c04 --- /dev/null +++ b/gr-blocks/python/blocks/qa_vco.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import math + +def sig_source_f(samp_rate, freq, amp, N): + t = map(lambda x: float(x)/samp_rate, xrange(N)) + y = map(lambda x: amp*math.cos(2.*math.pi*freq*x), t) + return y + +class test_vco(gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001(self): + src_data = 200*[0,] + 200*[0.5,] + 200*[1,] + expected_result = 200*[1,] + \ + sig_source_f(1, 0.125, 1, 200) + \ + sig_source_f(1, 0.25, 1, 200) + + src = blocks.vector_source_f(src_data) + op = blocks.vco_f(1, math.pi/2.0, 1) + dst = blocks.vector_sink_f() + + self.tb.connect(src, op, dst) + self.tb.run() + + result_data = dst.data() + self.assertFloatTuplesAlmostEqual(expected_result, result_data, 5) + + +if __name__ == '__main__': + gr_unittest.run(test_vco, "test_vco.xml") + diff --git a/gr-blocks/python/blocks/qa_vector_insert.py b/gr-blocks/python/blocks/qa_vector_insert.py new file mode 100755 index 0000000000..5f6e84665c --- /dev/null +++ b/gr-blocks/python/blocks/qa_vector_insert.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# +# Copyright 2012-2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import math + +class test_vector_insert(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + src_data = [float(x) for x in range(16)] + expected_result = tuple(src_data) + + period = 9177; + offset = 0; + + src = blocks.null_source(1) + head = blocks.head(1, 10000000); + ins = blocks.vector_insert_b([1], period, offset); + dst = blocks.vector_sink_b() + + self.tb.connect(src, head, ins, dst) + self.tb.run() + result_data = dst.data() + + for i in range(10000): + if(i%period == offset): + self.assertEqual(1, result_data[i]) + else: + self.assertEqual(0, result_data[i]) + +if __name__ == '__main__': + gr_unittest.run(test_vector_insert, "test_vector_insert.xml") + diff --git a/gr-blocks/python/blocks/qa_vector_map.py b/gr-blocks/python/blocks/qa_vector_map.py new file mode 100644 index 0000000000..94bdb1989b --- /dev/null +++ b/gr-blocks/python/blocks/qa_vector_map.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python +# +# Copyright 2012,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import math + +class test_vector_map(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_reversing(self): + # Chunk data in blocks of N and reverse the block contents. + N = 5 + src_data = range(0, 20) + expected_result = [] + for i in range(N-1, len(src_data), N): + for j in range(0, N): + expected_result.append(1.0*(i-j)) + mapping = [list(reversed([(0, i) for i in range(0, N)]))] + src = blocks.vector_source_f(src_data, False, N) + vmap = blocks.vector_map(gr.sizeof_float, (N, ), mapping) + dst = blocks.vector_sink_f(N) + self.tb.connect(src, vmap, dst) + self.tb.run() + result_data = list(dst.data()) + self.assertEqual(expected_result, result_data) + + def test_vector_to_streams(self): + # Split an input vector into N streams. + N = 5 + M = 20 + src_data = range(0, M) + expected_results = [] + for n in range(0, N): + expected_results.append(range(n, M, N)) + mapping = [[(0, n)] for n in range(0, N)] + src = blocks.vector_source_f(src_data, False, N) + vmap = blocks.vector_map(gr.sizeof_float, (N, ), mapping) + dsts = [blocks.vector_sink_f(1) for n in range(0, N)] + self.tb.connect(src, vmap) + for n in range(0, N): + self.tb.connect((vmap, n), dsts[n]) + self.tb.run() + for n in range(0, N): + result_data = list(dsts[n].data()) + self.assertEqual(expected_results[n], result_data) + + def test_interleaving(self): + # Takes 3 streams (a, b and c) + # Outputs 2 streams. + # First (d) is interleaving of a and b. + # Second (e) is interleaving of a and b and c. c is taken in + # chunks of 2 which are reversed. + A = (1, 2, 3, 4, 5) + B = (11, 12, 13, 14, 15) + C = (99, 98, 97, 96, 95, 94, 93, 92, 91, 90) + expected_D = (1, 11, 2, 12, 3, 13, 4, 14, 5, 15) + expected_E = (1, 11, 98, 99, 2, 12, 96, 97, 3, 13, 94, 95, + 4, 14, 92, 93, 5, 15, 90, 91) + mapping = [[(0, 0), (1, 0)], # mapping to produce D + [(0, 0), (1, 0), (2, 1), (2, 0)], # mapping to produce E + ] + srcA = blocks.vector_source_f(A, False, 1) + srcB = blocks.vector_source_f(B, False, 1) + srcC = blocks.vector_source_f(C, False, 2) + vmap = blocks.vector_map(gr.sizeof_int, (1, 1, 2), mapping) + dstD = blocks.vector_sink_f(2) + dstE = blocks.vector_sink_f(4) + self.tb.connect(srcA, (vmap, 0)) + self.tb.connect(srcB, (vmap, 1)) + self.tb.connect(srcC, (vmap, 2)) + self.tb.connect((vmap, 0), dstD) + self.tb.connect((vmap, 1), dstE) + self.tb.run() + self.assertEqual(expected_D, dstD.data()) + self.assertEqual(expected_E, dstE.data()) + +if __name__ == '__main__': + gr_unittest.run(test_vector_map, "test_vector_map.xml") + diff --git a/gr-blocks/python/blocks/qa_vector_sink_source.py b/gr-blocks/python/blocks/qa_vector_sink_source.py new file mode 100755 index 0000000000..6c9f933668 --- /dev/null +++ b/gr-blocks/python/blocks/qa_vector_sink_source.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# +# Copyright 2008,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import math + +class test_vector_sink_source(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + src_data = [float(x) for x in range(16)] + expected_result = tuple(src_data) + + src = blocks.vector_source_f(src_data) + dst = blocks.vector_sink_f() + + self.tb.connect(src, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def test_002(self): + src_data = [float(x) for x in range(16)] + expected_result = tuple(src_data) + + src = blocks.vector_source_f(src_data, False, 2) + dst = blocks.vector_sink_f(2) + + self.tb.connect(src, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def test_003(self): + src_data = [float(x) for x in range(16)] + expected_result = tuple(src_data) + self.assertRaises(RuntimeError, lambda : blocks.vector_source_f(src_data, False, 3)) + +if __name__ == '__main__': + gr_unittest.run(test_vector_sink_source, "test_vector_sink_source.xml") + diff --git a/gr-blocks/python/blocks/qa_wavfile.py b/gr-blocks/python/blocks/qa_wavfile.py new file mode 100755 index 0000000000..ce1806c5ef --- /dev/null +++ b/gr-blocks/python/blocks/qa_wavfile.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python +# +# Copyright 2008,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks + +import os +from os.path import getsize + +g_in_file = os.path.join(os.getenv("srcdir"), "test_16bit_1chunk.wav") + +class test_wavefile(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001_checkwavread(self): + wf = blocks.wavfile_source(g_in_file) + self.assertEqual(wf.sample_rate(), 8000) + + def test_002_checkwavcopy(self): + infile = g_in_file + outfile = "test_out.wav" + + wf_in = blocks.wavfile_source(infile) + wf_out = blocks.wavfile_sink(outfile, + wf_in.channels(), + wf_in.sample_rate(), + wf_in.bits_per_sample()) + self.tb.connect(wf_in, wf_out) + self.tb.run() + wf_out.close() + + self.assertEqual(getsize(infile), getsize(outfile)) + + in_f = file(infile, 'rb') + out_f = file(outfile, 'rb') + + in_data = in_f.read() + out_data = out_f.read() + out_f.close() + os.remove(outfile) + + self.assertEqual(in_data, out_data) + +if __name__ == '__main__': + gr_unittest.run(test_wavefile, "test_wavefile.xml") diff --git a/gr-blocks/python/blocks/stream_to_vector_decimator.py b/gr-blocks/python/blocks/stream_to_vector_decimator.py index c32ae6fce2..984d84b4bb 100644 --- a/gr-blocks/python/blocks/stream_to_vector_decimator.py +++ b/gr-blocks/python/blocks/stream_to_vector_decimator.py @@ -19,13 +19,9 @@ # Boston, MA 02110-1301, USA. # +import blocks_swig as blocks from gnuradio import gr -try: - from gnuradio import blocks -except ImportError: - import blocks_swig as blocks - class stream_to_vector_decimator(gr.hier_block2): """ Convert the stream to a vector, decimate the vector stream to achieve the vector rate. diff --git a/gr-blocks/python/blocks/test_16bit_1chunk.wav b/gr-blocks/python/blocks/test_16bit_1chunk.wav Binary files differnew file mode 100644 index 0000000000..0fe12a7a13 --- /dev/null +++ b/gr-blocks/python/blocks/test_16bit_1chunk.wav |