diff options
author | Tom Rondeau <tom@trondeau.com> | 2015-03-03 15:45:36 -0500 |
---|---|---|
committer | Tom Rondeau <tom@trondeau.com> | 2015-04-02 15:38:57 -0700 |
commit | 3da7369d88f67258d1ed69069521a2620657b875 (patch) | |
tree | 763c673b8d13d397f00764dc01c6174b16067ad6 /gr-blocks/python/blocks/qa_ctrlport_probes.py | |
parent | cc5b9ae930b70a9ada0a9efc5a538fa357753e32 (diff) |
controlport: cleaning up; trying to handle shutdown better.
Added QA code in for testing probes and other basic connection features. Requires nthreads >= 10.
Diffstat (limited to 'gr-blocks/python/blocks/qa_ctrlport_probes.py')
-rw-r--r-- | gr-blocks/python/blocks/qa_ctrlport_probes.py | 191 |
1 files changed, 184 insertions, 7 deletions
diff --git a/gr-blocks/python/blocks/qa_ctrlport_probes.py b/gr-blocks/python/blocks/qa_ctrlport_probes.py index 91d96010fd..cb428d6dda 100644 --- a/gr-blocks/python/blocks/qa_ctrlport_probes.py +++ b/gr-blocks/python/blocks/qa_ctrlport_probes.py @@ -22,9 +22,11 @@ import sys, time, random, numpy from gnuradio import gr, gr_unittest, blocks - import os, struct +#from gnuradio import ctrlport +from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient + class test_ctrlport_probes(gr_unittest.TestCase): def setUp(self): @@ -34,21 +36,196 @@ class test_ctrlport_probes(gr_unittest.TestCase): def tearDown(self): self.tb = None - def xtest_001(self): - pass + def test_001(self): + data = range(1,9) + + self.src = blocks.vector_source_c(data, True) + self.probe = blocks.ctrlport_probe2_c("samples","Complex", + len(data), gr.DISPNULL) + probe_name = self.probe.alias() + + self.tb.connect(self.src, self.probe) + self.tb.start() + + # Probes return complex values as list of floats with re, im + # Imaginary parts of this data set are 0. + expected_result = [1, 2, 3, 4, + 5, 6, 7, 8] + + # Make sure we have time for flowgraph to run + time.sleep(0.1) + + # Get available endpoint + ep = gr.rpcmanager_get().endpoints()[0] + + # Initialize a simple ControlPort client from endpoint + from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient + radiosys = GNURadioControlPortClient(rpcmethod='thrift') + radio = radiosys.client + + # Get all exported knobs + ret = radio.getKnobs([probe_name + "::samples"]) + for name in ret.keys(): + # Get data in probe, which might be offset; find the + # beginning and unwrap. + result = ret[name].value + i = result.index(complex(1.0, 0.0)) + result = result[i:] + result[0:i] + self.assertComplexTuplesAlmostEqual(expected_result, result, 4) + + self.tb.stop() + self.tb.wait() + def test_002(self): - pass + data = range(1,9) + self.src = blocks.vector_source_f(data, True) + self.probe = blocks.ctrlport_probe2_f("samples","Floats", + len(data), gr.DISPNULL) + probe_name = self.probe.alias() + + self.tb.connect(self.src, self.probe) + self.tb.start() + + expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] + + # Make sure we have time for flowgraph to run + time.sleep(0.1) + + # Get available endpoint + ep = gr.rpcmanager_get().endpoints()[0] + + # Initialize a simple ControlPort client from endpoint + from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient + radiosys = GNURadioControlPortClient(rpcmethod='thrift') + radio = radiosys.client + + # Get all exported knobs + ret = radio.getKnobs([probe_name + "::samples"]) + for name in ret.keys(): + # Get data in probe, which might be offset; find the + # beginning and unwrap. + result = ret[name].value + i = result.index(1.0) + result = result[i:] + result[0:i] + self.assertEqual(expected_result, result) + + self.tb.stop() + self.tb.wait() def test_003(self): - pass + data = range(1,9) + + self.src = blocks.vector_source_i(data, True) + self.probe = blocks.ctrlport_probe2_i("samples","Integers", + len(data), gr.DISPNULL) + probe_name = self.probe.alias() + + self.tb.connect(self.src, self.probe) + self.tb.start() + + expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] + + # Make sure we have time for flowgraph to run + time.sleep(0.1) + + # Get available endpoint + ep = gr.rpcmanager_get().endpoints()[0] + + # Initialize a simple ControlPort client from endpoint + from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient + radiosys = GNURadioControlPortClient(rpcmethod='thrift') + radio = radiosys.client + + # Get all exported knobs + ret = radio.getKnobs([probe_name + "::samples"]) + for name in ret.keys(): + # Get data in probe, which might be offset; find the + # beginning and unwrap. + result = ret[name].value + i = result.index(1.0) + result = result[i:] + result[0:i] + self.assertEqual(expected_result, result) + + self.tb.stop() + self.tb.wait() + def test_004(self): - pass + data = range(1,9) + + self.src = blocks.vector_source_s(data, True) + self.probe = blocks.ctrlport_probe2_s("samples","Shorts", + len(data), gr.DISPNULL) + probe_name = self.probe.alias() + + self.tb.connect(self.src, self.probe) + self.tb.start() + + expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] + + # Make sure we have time for flowgraph to run + time.sleep(0.1) + + # Get available endpoint + ep = gr.rpcmanager_get().endpoints()[0] + + # Initialize a simple ControlPort client from endpoint + from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient + radiosys = GNURadioControlPortClient(rpcmethod='thrift') + radio = radiosys.client + + # Get all exported knobs + ret = radio.getKnobs([probe_name + "::samples"]) + for name in ret.keys(): + # Get data in probe, which might be offset; find the + # beginning and unwrap. + result = ret[name].value + i = result.index(1.0) + result = result[i:] + result[0:i] + self.assertEqual(expected_result, result) + + self.tb.stop() + self.tb.wait() def test_005(self): - pass + data = range(1,9) + + self.src = blocks.vector_source_b(data, True) + self.probe = blocks.ctrlport_probe2_b("samples","Bytes", + len(data), gr.DISPNULL) + probe_name = self.probe.alias() + + self.tb.connect(self.src, self.probe) + self.tb.start() + + expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] + + # Make sure we have time for flowgraph to run + time.sleep(0.1) + + # Get available endpoint + ep = gr.rpcmanager_get().endpoints()[0] + + # Initialize a simple ControlPort client from endpoint + from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient + radiosys = GNURadioControlPortClient(rpcmethod='thrift') + radio = radiosys.client + + # Get all exported knobs + ret = radio.getKnobs([probe_name + "::samples"]) + for name in ret.keys(): + # Get data in probe, which might be offset; find the + # beginning and unwrap. + result = ret[name].value + result = list(struct.unpack(len(result)*'b', result)) + i = result.index(1) + result = result[i:] + result[0:i] + self.assertEqual(expected_result, result) + + self.tb.stop() + self.tb.wait() if __name__ == '__main__': gr_unittest.run(test_ctrlport_probes, "test_ctrlport_probes.xml") |