diff options
author | Tom Rondeau <tom@trondeau.com> | 2014-08-08 16:44:15 -0400 |
---|---|---|
committer | Tom Rondeau <tom@trondeau.com> | 2014-08-08 16:44:15 -0400 |
commit | 4ac28002fbea740f78c2339446cca2d1f118628f (patch) | |
tree | c81ebb1aa487fdb3cc277415e68c360ef6c80b26 /gr-blocks/python | |
parent | 41d0844328800cfc84c97e3961c39b5fafab11bb (diff) | |
parent | 3d5df0ddd3aa8d5a94285b95f487747f25d4ee06 (diff) |
Merge branch 'maint'
Conflicts:
gnuradio-runtime/CMakeLists.txt
gnuradio-runtime/lib/controlport/CMakeLists.txt
Diffstat (limited to 'gr-blocks/python')
-rwxr-xr-x | gr-blocks/python/blocks/qa_cpp_py_binding.py | 172 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_cpp_py_binding_set.py | 150 | ||||
-rw-r--r-- | gr-blocks/python/blocks/qa_ctrlport_probes.py | 187 |
3 files changed, 6 insertions, 503 deletions
diff --git a/gr-blocks/python/blocks/qa_cpp_py_binding.py b/gr-blocks/python/blocks/qa_cpp_py_binding.py deleted file mode 100755 index 35e073d584..0000000000 --- a/gr-blocks/python/blocks/qa_cpp_py_binding.py +++ /dev/null @@ -1,172 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2012,2013 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. -# - -# -# This program tests mixed python and c++ ctrlport exports in a single app -# - -import Ice -import sys, time, random, numpy -from gnuradio import gr, gr_unittest, blocks - -from gnuradio.ctrlport import GNURadio -from gnuradio import ctrlport -import os - -def get1(): - return "success" - -def get2(): - return "failure" - -class inc_class: - def __init__(self): - self.val = 1 - def pp(self): - self.val = self.val+1 - return self.val - -get3 = inc_class() - -def get4(): - random.seed(0) - rv = random.random() - return rv - -def get5(): - numpy.random.seed(0) - samp_t = numpy.random.randn(24)+1j*numpy.random.randn(24); - samp_f = numpy.fft.fft(samp_t); - log_pow_f = 20*numpy.log10(numpy.abs(samp_f)) - rv = list(log_pow_f) - return rv; - -def get6(): - numpy.random.seed(0) - samp_t = numpy.random.randn(1024)+1j*numpy.random.randn(1024); - rv = list(samp_t) - return rv; - -class test_cpp_py_binding(gr_unittest.TestCase): - - def setUp(self): - self.tb = gr.top_block() - os.environ['GR_CONF_CONTROLPORT_ON'] = 'True' - - def tearDown(self): - self.tb = None - - def test_001(self): - v1 = gr.RPC_get_string("pyland", "v1", "unit_1_string", - "Python Exported String", "", "", "", - gr.DISPNULL) - v1.activate(get1) - - v2 = gr.RPC_get_string("pyland", "v2", "unit_2_string", - "Python Exported String", "", "", "", - gr.DISPNULL) - v2.activate(get2) - - v3 = gr.RPC_get_int("pyland", "v3", "unit_3_int", - "Python Exported Int", 0, 100, 1, - gr.DISPNULL) - v3.activate(get3.pp) - - v4 = gr.RPC_get_double("pyland", "time", "unit_4_time_double", - "Python Exported Double", 0, 1000, 1, - gr.DISPNULL) - v4.activate(get4) - - v5 = gr.RPC_get_vector_float("pyland", "fvec", "unit_5_float_vector", - "Python Exported Float Vector", [], [], [], - gr.DISPTIME | gr.DISPOPTCPLX) - v5.activate(get5) - - v6 = gr.RPC_get_vector_gr_complex("pyland", "cvec", "unit_6_gr_complex_vector", - "Python Exported Complex Vector", [], [], [], - gr.DISPXY | gr.DISPOPTSCATTER) - v6.activate(get6) - - # print some variables locally - val = get1() - rval = v1.get() - self.assertEqual(val, rval) - - val = get2() - rval = v2.get() - self.assertEqual(val, rval) - - val = get3.pp() - rval = v3.get() - self.assertEqual(val+1, rval) - - val = get4() - rval = v4.get() - self.assertEqual(val, rval) - - val = get5() - rval = v5.get() - self.assertComplexTuplesAlmostEqual(val, rval, 5) - - val = get6() - rval = v6.get() - self.assertComplexTuplesAlmostEqual(val, rval, 5) - - def test_002(self): - data = range(1,9) - - self.src = blocks.vector_source_c(data) - self.p1 = blocks.ctrlport_probe_c("aaa","C++ exported variable") - self.p2 = blocks.ctrlport_probe_c("bbb","C++ exported variable") - probe_name = self.p2.alias() - - self.tb.connect(self.src, self.p1) - self.tb.connect(self.src, self.p2) - self.tb.start() - - # Probes return complex values as list of floats with re, im - # Imaginary parts of this data set are 0. - expected_result = [1, 0, 2, 0, 3, 0, 4, 0, - 5, 0, 6, 0, 7, 0, 8, 0] - - # Make sure we have time for flowgraph to run - time.sleep(0.1) - - # Get available endpoint - ep = gr.rpcmanager_get().endpoints()[0] - - # Initialize a simple Ice client from endpoint - ic = Ice.initialize(sys.argv) - base = ic.stringToProxy(ep) - radio = GNURadio.ControlPortPrx.checkedCast(base) - - # Get all exported knobs - ret = radio.get([probe_name + "::bbb"]) - for name in ret.keys(): - result = ret[name].value - self.assertEqual(result, expected_result) - - self.tb.stop() - -if __name__ == '__main__': - gr_unittest.run(test_cpp_py_binding, "test_cpp_py_binding.xml") - diff --git a/gr-blocks/python/blocks/qa_cpp_py_binding_set.py b/gr-blocks/python/blocks/qa_cpp_py_binding_set.py deleted file mode 100755 index 69ed6d1d2b..0000000000 --- a/gr-blocks/python/blocks/qa_cpp_py_binding_set.py +++ /dev/null @@ -1,150 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2012,2013 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. -# - -# -# This program tests mixed python and c++ GRCP sets in a single app -# - -import Ice -import sys, time, random, numpy -from gnuradio import gr, gr_unittest, blocks - -from gnuradio.ctrlport import GNURadio -from gnuradio import ctrlport -import os - -class inc_class: - def __init__(self,val): - self.val = val; - - def _get(self): - #print "returning get (val = %s)"%(str(self.val)); - return self.val; - - def _set(self,val): - #print "updating val to %s"%(str(val)); - self.val = val; - return; - -getset1 = inc_class(10); -getset2 = inc_class(100.0); -getset3 = inc_class("test"); - -class test_cpp_py_binding_set(gr_unittest.TestCase): - def setUp(self): - self.tb = gr.top_block() - os.environ['GR_CONF_CONTROLPORT_ON'] = 'True' - - def tearDown(self): - self.tb = None - - def test_001(self): - - g1 = gr.RPC_get_int("pyland", "v1", "unit_1_int", - "Python Exported Int", 0, 100, 10, - gr.DISPNULL) - g1.activate(getset1._get) - s1 = gr.RPC_get_int("pyland", "v1", "unit_1_int", - "Python Exported Int", 0, 100, 10, - gr.DISPNULL) - s1.activate(getset1._set) - time.sleep(0.01) - - # test int variables - getset1._set(21) - val = getset1._get() - rval = g1.get() - self.assertEqual(val, rval) - - g2 = gr.RPC_get_float("pyland", "v2", "unit_2_float", - "Python Exported Float", -100, 1000.0, 100.0, - gr.DISPNULL) - g2.activate(getset2._get) - s2 = gr.RPC_get_float("pyland", "v2", "unit_2_float", - "Python Exported Float", -100, 1000.0, 100.0, - gr.DISPNULL) - s2.activate(getset2._set) - time.sleep(0.01) - - # test float variables - getset2._set(123.456) - val = getset2._get() - rval = g2.get() - self.assertAlmostEqual(val, rval, 4) - - g3 = gr.RPC_get_string("pyland", "v3", "unit_3_string", - "Python Exported String", "", "", "", - gr.DISPNULL) - g3.activate(getset3._get) - s3 = gr.RPC_get_string("pyland", "v3", "unit_3_string", - "Python Exported String", "", "", "", - gr.DISPNULL) - s3.activate(getset3._set) - time.sleep(0.01) - - # test string variables - getset3._set("third test") - val = getset3._get() - rval = g3.get() - self.assertEqual(val, rval) - - - def test_002(self): - data = range(1, 10) - - self.src = blocks.vector_source_c(data, True) - self.p = blocks.nop(gr.sizeof_gr_complex) - self.p.set_ctrlport_test(0); - probe_info = self.p.alias() - - self.tb.connect(self.src, self.p) - - # Get available endpoint - ep = gr.rpcmanager_get().endpoints()[0] - - # Initialize a simple Ice client from endpoint - ic = Ice.initialize(sys.argv) - base = ic.stringToProxy(ep) - radio = GNURadio.ControlPortPrx.checkedCast(base) - - self.tb.start() - - # Make sure we have time for flowgraph to run - time.sleep(0.1) - - # Get all exported knobs - key_name_test = probe_info+"::test" - ret = radio.get([key_name_test,]) - - ret[key_name_test].value = 10 - radio.set({key_name_test: ret[key_name_test]}) - - ret = radio.get([]) - result_test = ret[key_name_test].value - self.assertEqual(result_test, 10) - - self.tb.stop() - self.tb.wait() - -if __name__ == '__main__': - gr_unittest.run(test_cpp_py_binding_set, "test_cpp_py_binding_set.xml") - diff --git a/gr-blocks/python/blocks/qa_ctrlport_probes.py b/gr-blocks/python/blocks/qa_ctrlport_probes.py index b31934f705..91d96010fd 100644 --- a/gr-blocks/python/blocks/qa_ctrlport_probes.py +++ b/gr-blocks/python/blocks/qa_ctrlport_probes.py @@ -20,12 +20,9 @@ # Boston, MA 02110-1301, USA. # -import Ice import sys, time, random, numpy from gnuradio import gr, gr_unittest, blocks -from gnuradio.ctrlport import GNURadio -from gnuradio import ctrlport import os, struct class test_ctrlport_probes(gr_unittest.TestCase): @@ -37,193 +34,21 @@ class test_ctrlport_probes(gr_unittest.TestCase): def tearDown(self): self.tb = None - def test_001(self): - data = range(1,9) - - self.src = blocks.vector_source_c(data, True) - self.probe = blocks.ctrlport_probe2_c("samples","Complex", - len(data), gr.DISPNULL) - probe_name = self.probe.alias() - - self.tb.connect(self.src, self.probe) - self.tb.start() - - # Probes return complex values as list of floats with re, im - # Imaginary parts of this data set are 0. - expected_result = [1, 0, 2, 0, 3, 0, 4, 0, - 5, 0, 6, 0, 7, 0, 8, 0] - - # Make sure we have time for flowgraph to run - time.sleep(0.1) - - # Get available endpoint - ep = gr.rpcmanager_get().endpoints()[0] - - # Initialize a simple Ice client from endpoint - ic = Ice.initialize(sys.argv) - base = ic.stringToProxy(ep) - radio = GNURadio.ControlPortPrx.checkedCast(base) - - # Get all exported knobs - ret = radio.get([probe_name + "::samples"]) - for name in ret.keys(): - # Get data in probe, which might be offset; find the - # beginning and unwrap. - result = ret[name].value - i = result.index(1.0) - result = result[i:] + result[0:i] - self.assertEqual(expected_result, result) - - self.tb.stop() - + def xtest_001(self): + pass def test_002(self): - data = range(1,9) - - self.src = blocks.vector_source_f(data, True) - self.probe = blocks.ctrlport_probe2_f("samples","Floats", - len(data), gr.DISPNULL) - probe_name = self.probe.alias() - - self.tb.connect(self.src, self.probe) - self.tb.start() - - expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] - - # Make sure we have time for flowgraph to run - time.sleep(0.1) - - # Get available endpoint - ep = gr.rpcmanager_get().endpoints()[0] - - # Initialize a simple Ice client from endpoint - ic = Ice.initialize(sys.argv) - base = ic.stringToProxy(ep) - radio = GNURadio.ControlPortPrx.checkedCast(base) - - # Get all exported knobs - ret = radio.get([probe_name + "::samples"]) - for name in ret.keys(): - # Get data in probe, which might be offset; find the - # beginning and unwrap. - result = ret[name].value - i = result.index(1.0) - result = result[i:] + result[0:i] - self.assertEqual(expected_result, result) - - self.tb.stop() + pass def test_003(self): - data = range(1,9) - - self.src = blocks.vector_source_i(data, True) - self.probe = blocks.ctrlport_probe2_i("samples","Integers", - len(data), gr.DISPNULL) - probe_name = self.probe.alias() - - self.tb.connect(self.src, self.probe) - self.tb.start() - - expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] - - # Make sure we have time for flowgraph to run - time.sleep(0.1) - - # Get available endpoint - ep = gr.rpcmanager_get().endpoints()[0] - - # Initialize a simple Ice client from endpoint - ic = Ice.initialize(sys.argv) - base = ic.stringToProxy(ep) - radio = GNURadio.ControlPortPrx.checkedCast(base) - - # Get all exported knobs - ret = radio.get([probe_name + "::samples"]) - for name in ret.keys(): - # Get data in probe, which might be offset; find the - # beginning and unwrap. - result = ret[name].value - i = result.index(1.0) - result = result[i:] + result[0:i] - self.assertEqual(expected_result, result) - - self.tb.stop() - + pass def test_004(self): - data = range(1,9) - - self.src = blocks.vector_source_s(data, True) - self.probe = blocks.ctrlport_probe2_s("samples","Shorts", - len(data), gr.DISPNULL) - probe_name = self.probe.alias() - - self.tb.connect(self.src, self.probe) - self.tb.start() - - expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] - - # Make sure we have time for flowgraph to run - time.sleep(0.1) - - # Get available endpoint - ep = gr.rpcmanager_get().endpoints()[0] - - # Initialize a simple Ice client from endpoint - ic = Ice.initialize(sys.argv) - base = ic.stringToProxy(ep) - radio = GNURadio.ControlPortPrx.checkedCast(base) - - # Get all exported knobs - ret = radio.get([probe_name + "::samples"]) - for name in ret.keys(): - # Get data in probe, which might be offset; find the - # beginning and unwrap. - result = ret[name].value - i = result.index(1.0) - result = result[i:] + result[0:i] - self.assertEqual(expected_result, result) - - self.tb.stop() + pass def test_005(self): - data = range(1,9) - - self.src = blocks.vector_source_b(data, True) - self.probe = blocks.ctrlport_probe2_b("samples","Bytes", - len(data), gr.DISPNULL) - probe_name = self.probe.alias() - - self.tb.connect(self.src, self.probe) - self.tb.start() - - expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] - - # Make sure we have time for flowgraph to run - time.sleep(0.1) - - # Get available endpoint - ep = gr.rpcmanager_get().endpoints()[0] - - # Initialize a simple Ice client from endpoint - ic = Ice.initialize(sys.argv) - base = ic.stringToProxy(ep) - radio = GNURadio.ControlPortPrx.checkedCast(base) - - # Get all exported knobs - ret = radio.get([probe_name + "::samples"]) - for name in ret.keys(): - # Get data in probe, which might be offset; find the - # beginning and unwrap. - result = ret[name].value - result = list(struct.unpack(len(result)*'b', result)) - i = result.index(1) - result = result[i:] + result[0:i] - self.assertEqual(expected_result, result) - - self.tb.stop() + pass if __name__ == '__main__': gr_unittest.run(test_ctrlport_probes, "test_ctrlport_probes.xml") - |