summaryrefslogtreecommitdiff
path: root/gr-blocks/python
diff options
context:
space:
mode:
authorTom Rondeau <tom@trondeau.com>2014-08-08 11:28:05 -0400
committerTom Rondeau <tom@trondeau.com>2014-08-08 16:37:43 -0400
commit3d5df0ddd3aa8d5a94285b95f487747f25d4ee06 (patch)
tree715007cafdc7ac2b6eb954562be0b3a4a4612e2e /gr-blocks/python
parent3d18f70c66c974a82c5096acc4cbd37a47b6b55c (diff)
controlport: removing use of ice for a controlport rpc.
This effectively disables the use of ControlPort for now until we build in a new middleware layer. The ControlPort API and interfaces exist but will function as nops for now.
Diffstat (limited to 'gr-blocks/python')
-rwxr-xr-xgr-blocks/python/blocks/qa_cpp_py_binding.py172
-rwxr-xr-xgr-blocks/python/blocks/qa_cpp_py_binding_set.py150
-rw-r--r--gr-blocks/python/blocks/qa_ctrlport_probes.py187
3 files changed, 6 insertions, 503 deletions
diff --git a/gr-blocks/python/blocks/qa_cpp_py_binding.py b/gr-blocks/python/blocks/qa_cpp_py_binding.py
deleted file mode 100755
index 35e073d584..0000000000
--- a/gr-blocks/python/blocks/qa_cpp_py_binding.py
+++ /dev/null
@@ -1,172 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2012,2013 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# GNU Radio is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3, or (at your option)
-# any later version.
-#
-# GNU Radio is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with GNU Radio; see the file COPYING. If not, write to
-# the Free Software Foundation, Inc., 51 Franklin Street,
-# Boston, MA 02110-1301, USA.
-#
-
-#
-# This program tests mixed python and c++ ctrlport exports in a single app
-#
-
-import Ice
-import sys, time, random, numpy
-from gnuradio import gr, gr_unittest, blocks
-
-from gnuradio.ctrlport import GNURadio
-from gnuradio import ctrlport
-import os
-
-def get1():
- return "success"
-
-def get2():
- return "failure"
-
-class inc_class:
- def __init__(self):
- self.val = 1
- def pp(self):
- self.val = self.val+1
- return self.val
-
-get3 = inc_class()
-
-def get4():
- random.seed(0)
- rv = random.random()
- return rv
-
-def get5():
- numpy.random.seed(0)
- samp_t = numpy.random.randn(24)+1j*numpy.random.randn(24);
- samp_f = numpy.fft.fft(samp_t);
- log_pow_f = 20*numpy.log10(numpy.abs(samp_f))
- rv = list(log_pow_f)
- return rv;
-
-def get6():
- numpy.random.seed(0)
- samp_t = numpy.random.randn(1024)+1j*numpy.random.randn(1024);
- rv = list(samp_t)
- return rv;
-
-class test_cpp_py_binding(gr_unittest.TestCase):
-
- def setUp(self):
- self.tb = gr.top_block()
- os.environ['GR_CONF_CONTROLPORT_ON'] = 'True'
-
- def tearDown(self):
- self.tb = None
-
- def test_001(self):
- v1 = gr.RPC_get_string("pyland", "v1", "unit_1_string",
- "Python Exported String", "", "", "",
- gr.DISPNULL)
- v1.activate(get1)
-
- v2 = gr.RPC_get_string("pyland", "v2", "unit_2_string",
- "Python Exported String", "", "", "",
- gr.DISPNULL)
- v2.activate(get2)
-
- v3 = gr.RPC_get_int("pyland", "v3", "unit_3_int",
- "Python Exported Int", 0, 100, 1,
- gr.DISPNULL)
- v3.activate(get3.pp)
-
- v4 = gr.RPC_get_double("pyland", "time", "unit_4_time_double",
- "Python Exported Double", 0, 1000, 1,
- gr.DISPNULL)
- v4.activate(get4)
-
- v5 = gr.RPC_get_vector_float("pyland", "fvec", "unit_5_float_vector",
- "Python Exported Float Vector", [], [], [],
- gr.DISPTIME | gr.DISPOPTCPLX)
- v5.activate(get5)
-
- v6 = gr.RPC_get_vector_gr_complex("pyland", "cvec", "unit_6_gr_complex_vector",
- "Python Exported Complex Vector", [], [], [],
- gr.DISPXY | gr.DISPOPTSCATTER)
- v6.activate(get6)
-
- # print some variables locally
- val = get1()
- rval = v1.get()
- self.assertEqual(val, rval)
-
- val = get2()
- rval = v2.get()
- self.assertEqual(val, rval)
-
- val = get3.pp()
- rval = v3.get()
- self.assertEqual(val+1, rval)
-
- val = get4()
- rval = v4.get()
- self.assertEqual(val, rval)
-
- val = get5()
- rval = v5.get()
- self.assertComplexTuplesAlmostEqual(val, rval, 5)
-
- val = get6()
- rval = v6.get()
- self.assertComplexTuplesAlmostEqual(val, rval, 5)
-
- def test_002(self):
- data = range(1,9)
-
- self.src = blocks.vector_source_c(data)
- self.p1 = blocks.ctrlport_probe_c("aaa","C++ exported variable")
- self.p2 = blocks.ctrlport_probe_c("bbb","C++ exported variable")
- probe_name = self.p2.alias()
-
- self.tb.connect(self.src, self.p1)
- self.tb.connect(self.src, self.p2)
- self.tb.start()
-
- # Probes return complex values as list of floats with re, im
- # Imaginary parts of this data set are 0.
- expected_result = [1, 0, 2, 0, 3, 0, 4, 0,
- 5, 0, 6, 0, 7, 0, 8, 0]
-
- # Make sure we have time for flowgraph to run
- time.sleep(0.1)
-
- # Get available endpoint
- ep = gr.rpcmanager_get().endpoints()[0]
-
- # Initialize a simple Ice client from endpoint
- ic = Ice.initialize(sys.argv)
- base = ic.stringToProxy(ep)
- radio = GNURadio.ControlPortPrx.checkedCast(base)
-
- # Get all exported knobs
- ret = radio.get([probe_name + "::bbb"])
- for name in ret.keys():
- result = ret[name].value
- self.assertEqual(result, expected_result)
-
- self.tb.stop()
-
-if __name__ == '__main__':
- gr_unittest.run(test_cpp_py_binding, "test_cpp_py_binding.xml")
-
diff --git a/gr-blocks/python/blocks/qa_cpp_py_binding_set.py b/gr-blocks/python/blocks/qa_cpp_py_binding_set.py
deleted file mode 100755
index 69ed6d1d2b..0000000000
--- a/gr-blocks/python/blocks/qa_cpp_py_binding_set.py
+++ /dev/null
@@ -1,150 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2012,2013 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# GNU Radio is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3, or (at your option)
-# any later version.
-#
-# GNU Radio is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with GNU Radio; see the file COPYING. If not, write to
-# the Free Software Foundation, Inc., 51 Franklin Street,
-# Boston, MA 02110-1301, USA.
-#
-
-#
-# This program tests mixed python and c++ GRCP sets in a single app
-#
-
-import Ice
-import sys, time, random, numpy
-from gnuradio import gr, gr_unittest, blocks
-
-from gnuradio.ctrlport import GNURadio
-from gnuradio import ctrlport
-import os
-
-class inc_class:
- def __init__(self,val):
- self.val = val;
-
- def _get(self):
- #print "returning get (val = %s)"%(str(self.val));
- return self.val;
-
- def _set(self,val):
- #print "updating val to %s"%(str(val));
- self.val = val;
- return;
-
-getset1 = inc_class(10);
-getset2 = inc_class(100.0);
-getset3 = inc_class("test");
-
-class test_cpp_py_binding_set(gr_unittest.TestCase):
- def setUp(self):
- self.tb = gr.top_block()
- os.environ['GR_CONF_CONTROLPORT_ON'] = 'True'
-
- def tearDown(self):
- self.tb = None
-
- def test_001(self):
-
- g1 = gr.RPC_get_int("pyland", "v1", "unit_1_int",
- "Python Exported Int", 0, 100, 10,
- gr.DISPNULL)
- g1.activate(getset1._get)
- s1 = gr.RPC_get_int("pyland", "v1", "unit_1_int",
- "Python Exported Int", 0, 100, 10,
- gr.DISPNULL)
- s1.activate(getset1._set)
- time.sleep(0.01)
-
- # test int variables
- getset1._set(21)
- val = getset1._get()
- rval = g1.get()
- self.assertEqual(val, rval)
-
- g2 = gr.RPC_get_float("pyland", "v2", "unit_2_float",
- "Python Exported Float", -100, 1000.0, 100.0,
- gr.DISPNULL)
- g2.activate(getset2._get)
- s2 = gr.RPC_get_float("pyland", "v2", "unit_2_float",
- "Python Exported Float", -100, 1000.0, 100.0,
- gr.DISPNULL)
- s2.activate(getset2._set)
- time.sleep(0.01)
-
- # test float variables
- getset2._set(123.456)
- val = getset2._get()
- rval = g2.get()
- self.assertAlmostEqual(val, rval, 4)
-
- g3 = gr.RPC_get_string("pyland", "v3", "unit_3_string",
- "Python Exported String", "", "", "",
- gr.DISPNULL)
- g3.activate(getset3._get)
- s3 = gr.RPC_get_string("pyland", "v3", "unit_3_string",
- "Python Exported String", "", "", "",
- gr.DISPNULL)
- s3.activate(getset3._set)
- time.sleep(0.01)
-
- # test string variables
- getset3._set("third test")
- val = getset3._get()
- rval = g3.get()
- self.assertEqual(val, rval)
-
-
- def test_002(self):
- data = range(1, 10)
-
- self.src = blocks.vector_source_c(data, True)
- self.p = blocks.nop(gr.sizeof_gr_complex)
- self.p.set_ctrlport_test(0);
- probe_info = self.p.alias()
-
- self.tb.connect(self.src, self.p)
-
- # Get available endpoint
- ep = gr.rpcmanager_get().endpoints()[0]
-
- # Initialize a simple Ice client from endpoint
- ic = Ice.initialize(sys.argv)
- base = ic.stringToProxy(ep)
- radio = GNURadio.ControlPortPrx.checkedCast(base)
-
- self.tb.start()
-
- # Make sure we have time for flowgraph to run
- time.sleep(0.1)
-
- # Get all exported knobs
- key_name_test = probe_info+"::test"
- ret = radio.get([key_name_test,])
-
- ret[key_name_test].value = 10
- radio.set({key_name_test: ret[key_name_test]})
-
- ret = radio.get([])
- result_test = ret[key_name_test].value
- self.assertEqual(result_test, 10)
-
- self.tb.stop()
- self.tb.wait()
-
-if __name__ == '__main__':
- gr_unittest.run(test_cpp_py_binding_set, "test_cpp_py_binding_set.xml")
-
diff --git a/gr-blocks/python/blocks/qa_ctrlport_probes.py b/gr-blocks/python/blocks/qa_ctrlport_probes.py
index b31934f705..91d96010fd 100644
--- a/gr-blocks/python/blocks/qa_ctrlport_probes.py
+++ b/gr-blocks/python/blocks/qa_ctrlport_probes.py
@@ -20,12 +20,9 @@
# Boston, MA 02110-1301, USA.
#
-import Ice
import sys, time, random, numpy
from gnuradio import gr, gr_unittest, blocks
-from gnuradio.ctrlport import GNURadio
-from gnuradio import ctrlport
import os, struct
class test_ctrlport_probes(gr_unittest.TestCase):
@@ -37,193 +34,21 @@ class test_ctrlport_probes(gr_unittest.TestCase):
def tearDown(self):
self.tb = None
- def test_001(self):
- data = range(1,9)
-
- self.src = blocks.vector_source_c(data, True)
- self.probe = blocks.ctrlport_probe2_c("samples","Complex",
- len(data), gr.DISPNULL)
- probe_name = self.probe.alias()
-
- self.tb.connect(self.src, self.probe)
- self.tb.start()
-
- # Probes return complex values as list of floats with re, im
- # Imaginary parts of this data set are 0.
- expected_result = [1, 0, 2, 0, 3, 0, 4, 0,
- 5, 0, 6, 0, 7, 0, 8, 0]
-
- # Make sure we have time for flowgraph to run
- time.sleep(0.1)
-
- # Get available endpoint
- ep = gr.rpcmanager_get().endpoints()[0]
-
- # Initialize a simple Ice client from endpoint
- ic = Ice.initialize(sys.argv)
- base = ic.stringToProxy(ep)
- radio = GNURadio.ControlPortPrx.checkedCast(base)
-
- # Get all exported knobs
- ret = radio.get([probe_name + "::samples"])
- for name in ret.keys():
- # Get data in probe, which might be offset; find the
- # beginning and unwrap.
- result = ret[name].value
- i = result.index(1.0)
- result = result[i:] + result[0:i]
- self.assertEqual(expected_result, result)
-
- self.tb.stop()
-
+ def xtest_001(self):
+ pass
def test_002(self):
- data = range(1,9)
-
- self.src = blocks.vector_source_f(data, True)
- self.probe = blocks.ctrlport_probe2_f("samples","Floats",
- len(data), gr.DISPNULL)
- probe_name = self.probe.alias()
-
- self.tb.connect(self.src, self.probe)
- self.tb.start()
-
- expected_result = [1, 2, 3, 4, 5, 6, 7, 8,]
-
- # Make sure we have time for flowgraph to run
- time.sleep(0.1)
-
- # Get available endpoint
- ep = gr.rpcmanager_get().endpoints()[0]
-
- # Initialize a simple Ice client from endpoint
- ic = Ice.initialize(sys.argv)
- base = ic.stringToProxy(ep)
- radio = GNURadio.ControlPortPrx.checkedCast(base)
-
- # Get all exported knobs
- ret = radio.get([probe_name + "::samples"])
- for name in ret.keys():
- # Get data in probe, which might be offset; find the
- # beginning and unwrap.
- result = ret[name].value
- i = result.index(1.0)
- result = result[i:] + result[0:i]
- self.assertEqual(expected_result, result)
-
- self.tb.stop()
+ pass
def test_003(self):
- data = range(1,9)
-
- self.src = blocks.vector_source_i(data, True)
- self.probe = blocks.ctrlport_probe2_i("samples","Integers",
- len(data), gr.DISPNULL)
- probe_name = self.probe.alias()
-
- self.tb.connect(self.src, self.probe)
- self.tb.start()
-
- expected_result = [1, 2, 3, 4, 5, 6, 7, 8,]
-
- # Make sure we have time for flowgraph to run
- time.sleep(0.1)
-
- # Get available endpoint
- ep = gr.rpcmanager_get().endpoints()[0]
-
- # Initialize a simple Ice client from endpoint
- ic = Ice.initialize(sys.argv)
- base = ic.stringToProxy(ep)
- radio = GNURadio.ControlPortPrx.checkedCast(base)
-
- # Get all exported knobs
- ret = radio.get([probe_name + "::samples"])
- for name in ret.keys():
- # Get data in probe, which might be offset; find the
- # beginning and unwrap.
- result = ret[name].value
- i = result.index(1.0)
- result = result[i:] + result[0:i]
- self.assertEqual(expected_result, result)
-
- self.tb.stop()
-
+ pass
def test_004(self):
- data = range(1,9)
-
- self.src = blocks.vector_source_s(data, True)
- self.probe = blocks.ctrlport_probe2_s("samples","Shorts",
- len(data), gr.DISPNULL)
- probe_name = self.probe.alias()
-
- self.tb.connect(self.src, self.probe)
- self.tb.start()
-
- expected_result = [1, 2, 3, 4, 5, 6, 7, 8,]
-
- # Make sure we have time for flowgraph to run
- time.sleep(0.1)
-
- # Get available endpoint
- ep = gr.rpcmanager_get().endpoints()[0]
-
- # Initialize a simple Ice client from endpoint
- ic = Ice.initialize(sys.argv)
- base = ic.stringToProxy(ep)
- radio = GNURadio.ControlPortPrx.checkedCast(base)
-
- # Get all exported knobs
- ret = radio.get([probe_name + "::samples"])
- for name in ret.keys():
- # Get data in probe, which might be offset; find the
- # beginning and unwrap.
- result = ret[name].value
- i = result.index(1.0)
- result = result[i:] + result[0:i]
- self.assertEqual(expected_result, result)
-
- self.tb.stop()
+ pass
def test_005(self):
- data = range(1,9)
-
- self.src = blocks.vector_source_b(data, True)
- self.probe = blocks.ctrlport_probe2_b("samples","Bytes",
- len(data), gr.DISPNULL)
- probe_name = self.probe.alias()
-
- self.tb.connect(self.src, self.probe)
- self.tb.start()
-
- expected_result = [1, 2, 3, 4, 5, 6, 7, 8,]
-
- # Make sure we have time for flowgraph to run
- time.sleep(0.1)
-
- # Get available endpoint
- ep = gr.rpcmanager_get().endpoints()[0]
-
- # Initialize a simple Ice client from endpoint
- ic = Ice.initialize(sys.argv)
- base = ic.stringToProxy(ep)
- radio = GNURadio.ControlPortPrx.checkedCast(base)
-
- # Get all exported knobs
- ret = radio.get([probe_name + "::samples"])
- for name in ret.keys():
- # Get data in probe, which might be offset; find the
- # beginning and unwrap.
- result = ret[name].value
- result = list(struct.unpack(len(result)*'b', result))
- i = result.index(1)
- result = result[i:] + result[0:i]
- self.assertEqual(expected_result, result)
-
- self.tb.stop()
+ pass
if __name__ == '__main__':
gr_unittest.run(test_ctrlport_probes, "test_ctrlport_probes.xml")
-