summaryrefslogtreecommitdiff
path: root/gr-blocks/python/blocks/qa_ctrlport_probes.py
diff options
context:
space:
mode:
authorTom Rondeau <tom@trondeau.com>2015-03-03 15:45:36 -0500
committerTom Rondeau <tom@trondeau.com>2015-04-02 15:38:57 -0700
commit3da7369d88f67258d1ed69069521a2620657b875 (patch)
tree763c673b8d13d397f00764dc01c6174b16067ad6 /gr-blocks/python/blocks/qa_ctrlport_probes.py
parentcc5b9ae930b70a9ada0a9efc5a538fa357753e32 (diff)
controlport: cleaning up; trying to handle shutdown better.
Added QA code in for testing probes and other basic connection features. Requires nthreads >= 10.
Diffstat (limited to 'gr-blocks/python/blocks/qa_ctrlport_probes.py')
-rw-r--r--gr-blocks/python/blocks/qa_ctrlport_probes.py191
1 files changed, 184 insertions, 7 deletions
diff --git a/gr-blocks/python/blocks/qa_ctrlport_probes.py b/gr-blocks/python/blocks/qa_ctrlport_probes.py
index 91d96010fd..cb428d6dda 100644
--- a/gr-blocks/python/blocks/qa_ctrlport_probes.py
+++ b/gr-blocks/python/blocks/qa_ctrlport_probes.py
@@ -22,9 +22,11 @@
import sys, time, random, numpy
from gnuradio import gr, gr_unittest, blocks
-
import os, struct
+#from gnuradio import ctrlport
+from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient
+
class test_ctrlport_probes(gr_unittest.TestCase):
def setUp(self):
@@ -34,21 +36,196 @@ class test_ctrlport_probes(gr_unittest.TestCase):
def tearDown(self):
self.tb = None
- def xtest_001(self):
- pass
+ def test_001(self):
+ data = range(1,9)
+
+ self.src = blocks.vector_source_c(data, True)
+ self.probe = blocks.ctrlport_probe2_c("samples","Complex",
+ len(data), gr.DISPNULL)
+ probe_name = self.probe.alias()
+
+ self.tb.connect(self.src, self.probe)
+ self.tb.start()
+
+ # Probes return complex values as list of floats with re, im
+ # Imaginary parts of this data set are 0.
+ expected_result = [1, 2, 3, 4,
+ 5, 6, 7, 8]
+
+ # Make sure we have time for flowgraph to run
+ time.sleep(0.1)
+
+ # Get available endpoint
+ ep = gr.rpcmanager_get().endpoints()[0]
+
+ # Initialize a simple ControlPort client from endpoint
+ from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient
+ radiosys = GNURadioControlPortClient(rpcmethod='thrift')
+ radio = radiosys.client
+
+ # Get all exported knobs
+ ret = radio.getKnobs([probe_name + "::samples"])
+ for name in ret.keys():
+ # Get data in probe, which might be offset; find the
+ # beginning and unwrap.
+ result = ret[name].value
+ i = result.index(complex(1.0, 0.0))
+ result = result[i:] + result[0:i]
+ self.assertComplexTuplesAlmostEqual(expected_result, result, 4)
+
+ self.tb.stop()
+ self.tb.wait()
+
def test_002(self):
- pass
+ data = range(1,9)
+ self.src = blocks.vector_source_f(data, True)
+ self.probe = blocks.ctrlport_probe2_f("samples","Floats",
+ len(data), gr.DISPNULL)
+ probe_name = self.probe.alias()
+
+ self.tb.connect(self.src, self.probe)
+ self.tb.start()
+
+ expected_result = [1, 2, 3, 4, 5, 6, 7, 8,]
+
+ # Make sure we have time for flowgraph to run
+ time.sleep(0.1)
+
+ # Get available endpoint
+ ep = gr.rpcmanager_get().endpoints()[0]
+
+ # Initialize a simple ControlPort client from endpoint
+ from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient
+ radiosys = GNURadioControlPortClient(rpcmethod='thrift')
+ radio = radiosys.client
+
+ # Get all exported knobs
+ ret = radio.getKnobs([probe_name + "::samples"])
+ for name in ret.keys():
+ # Get data in probe, which might be offset; find the
+ # beginning and unwrap.
+ result = ret[name].value
+ i = result.index(1.0)
+ result = result[i:] + result[0:i]
+ self.assertEqual(expected_result, result)
+
+ self.tb.stop()
+ self.tb.wait()
def test_003(self):
- pass
+ data = range(1,9)
+
+ self.src = blocks.vector_source_i(data, True)
+ self.probe = blocks.ctrlport_probe2_i("samples","Integers",
+ len(data), gr.DISPNULL)
+ probe_name = self.probe.alias()
+
+ self.tb.connect(self.src, self.probe)
+ self.tb.start()
+
+ expected_result = [1, 2, 3, 4, 5, 6, 7, 8,]
+
+ # Make sure we have time for flowgraph to run
+ time.sleep(0.1)
+
+ # Get available endpoint
+ ep = gr.rpcmanager_get().endpoints()[0]
+
+ # Initialize a simple ControlPort client from endpoint
+ from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient
+ radiosys = GNURadioControlPortClient(rpcmethod='thrift')
+ radio = radiosys.client
+
+ # Get all exported knobs
+ ret = radio.getKnobs([probe_name + "::samples"])
+ for name in ret.keys():
+ # Get data in probe, which might be offset; find the
+ # beginning and unwrap.
+ result = ret[name].value
+ i = result.index(1.0)
+ result = result[i:] + result[0:i]
+ self.assertEqual(expected_result, result)
+
+ self.tb.stop()
+ self.tb.wait()
+
def test_004(self):
- pass
+ data = range(1,9)
+
+ self.src = blocks.vector_source_s(data, True)
+ self.probe = blocks.ctrlport_probe2_s("samples","Shorts",
+ len(data), gr.DISPNULL)
+ probe_name = self.probe.alias()
+
+ self.tb.connect(self.src, self.probe)
+ self.tb.start()
+
+ expected_result = [1, 2, 3, 4, 5, 6, 7, 8,]
+
+ # Make sure we have time for flowgraph to run
+ time.sleep(0.1)
+
+ # Get available endpoint
+ ep = gr.rpcmanager_get().endpoints()[0]
+
+ # Initialize a simple ControlPort client from endpoint
+ from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient
+ radiosys = GNURadioControlPortClient(rpcmethod='thrift')
+ radio = radiosys.client
+
+ # Get all exported knobs
+ ret = radio.getKnobs([probe_name + "::samples"])
+ for name in ret.keys():
+ # Get data in probe, which might be offset; find the
+ # beginning and unwrap.
+ result = ret[name].value
+ i = result.index(1.0)
+ result = result[i:] + result[0:i]
+ self.assertEqual(expected_result, result)
+
+ self.tb.stop()
+ self.tb.wait()
def test_005(self):
- pass
+ data = range(1,9)
+
+ self.src = blocks.vector_source_b(data, True)
+ self.probe = blocks.ctrlport_probe2_b("samples","Bytes",
+ len(data), gr.DISPNULL)
+ probe_name = self.probe.alias()
+
+ self.tb.connect(self.src, self.probe)
+ self.tb.start()
+
+ expected_result = [1, 2, 3, 4, 5, 6, 7, 8,]
+
+ # Make sure we have time for flowgraph to run
+ time.sleep(0.1)
+
+ # Get available endpoint
+ ep = gr.rpcmanager_get().endpoints()[0]
+
+ # Initialize a simple ControlPort client from endpoint
+ from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient
+ radiosys = GNURadioControlPortClient(rpcmethod='thrift')
+ radio = radiosys.client
+
+ # Get all exported knobs
+ ret = radio.getKnobs([probe_name + "::samples"])
+ for name in ret.keys():
+ # Get data in probe, which might be offset; find the
+ # beginning and unwrap.
+ result = ret[name].value
+ result = list(struct.unpack(len(result)*'b', result))
+ i = result.index(1)
+ result = result[i:] + result[0:i]
+ self.assertEqual(expected_result, result)
+
+ self.tb.stop()
+ self.tb.wait()
if __name__ == '__main__':
gr_unittest.run(test_ctrlport_probes, "test_ctrlport_probes.xml")