#!/usr/bin/env python # # Copyright 2013,2015 Free Software Foundation, Inc. # # This file is part of GNU Radio # # GNU Radio is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3, or (at your option) # any later version. # # GNU Radio is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with GNU Radio; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. # import sys, time, random, numpy from gnuradio import gr, gr_unittest, blocks import os, struct, re from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient class test_ctrlport_probes(gr_unittest.TestCase): def setUp(self): os.environ['GR_CONF_CONTROLPORT_ON'] = 'True' self.tb = gr.top_block() def tearDown(self): self.tb = None def test_001(self): data = range(1,9) self.src = blocks.vector_source_c(data, True) self.probe = blocks.ctrlport_probe2_c("samples","Complex", len(data), gr.DISPNULL) probe_name = self.probe.alias() self.tb.connect(self.src, self.probe) self.tb.start() # Probes return complex values as list of floats with re, im # Imaginary parts of this data set are 0. expected_result = [1, 2, 3, 4, 5, 6, 7, 8] # Make sure we have time for flowgraph to run time.sleep(0.1) # Get available endpoint ep = gr.rpcmanager_get().endpoints()[0] hostname = re.search("-h (\S+|\d+\.\d+\.\d+\.\d+)", ep).group(1) portnum = re.search("-p (\d+)", ep).group(1) argv = [None, hostname, portnum] # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient radiosys = GNURadioControlPortClient(argv=argv, rpcmethod='thrift') radio = radiosys.client # Get all exported knobs ret = radio.getKnobs([probe_name + "::samples"]) for name in ret.keys(): # Get data in probe, which might be offset; find the # beginning and unwrap. result = ret[name].value i = result.index(complex(1.0, 0.0)) result = result[i:] + result[0:i] self.assertComplexTuplesAlmostEqual(expected_result, result, 4) self.tb.stop() self.tb.wait() def test_002(self): data = range(1,9) self.src = blocks.vector_source_f(data, True) self.probe = blocks.ctrlport_probe2_f("samples","Floats", len(data), gr.DISPNULL) probe_name = self.probe.alias() self.tb.connect(self.src, self.probe) self.tb.start() expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] # Make sure we have time for flowgraph to run time.sleep(0.1) # Get available endpoint ep = gr.rpcmanager_get().endpoints()[0] hostname = re.search("-h (\S+|\d+\.\d+\.\d+\.\d+)", ep).group(1) portnum = re.search("-p (\d+)", ep).group(1) argv = [None, hostname, portnum] # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient radiosys = GNURadioControlPortClient(argv=argv, rpcmethod='thrift') radio = radiosys.client # Get all exported knobs ret = radio.getKnobs([probe_name + "::samples"]) for name in ret.keys(): # Get data in probe, which might be offset; find the # beginning and unwrap. result = ret[name].value i = result.index(1.0) result = result[i:] + result[0:i] self.assertEqual(expected_result, result) self.tb.stop() self.tb.wait() def test_003(self): data = range(1,9) self.src = blocks.vector_source_i(data, True) self.probe = blocks.ctrlport_probe2_i("samples","Integers", len(data), gr.DISPNULL) probe_name = self.probe.alias() self.tb.connect(self.src, self.probe) self.tb.start() expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] # Make sure we have time for flowgraph to run time.sleep(0.1) # Get available endpoint ep = gr.rpcmanager_get().endpoints()[0] hostname = re.search("-h (\S+|\d+\.\d+\.\d+\.\d+)", ep).group(1) portnum = re.search("-p (\d+)", ep).group(1) argv = [None, hostname, portnum] # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient radiosys = GNURadioControlPortClient(argv=argv, rpcmethod='thrift') radio = radiosys.client # Get all exported knobs ret = radio.getKnobs([probe_name + "::samples"]) for name in ret.keys(): # Get data in probe, which might be offset; find the # beginning and unwrap. result = ret[name].value i = result.index(1.0) result = result[i:] + result[0:i] self.assertEqual(expected_result, result) self.tb.stop() self.tb.wait() def test_004(self): data = range(1,9) self.src = blocks.vector_source_s(data, True) self.probe = blocks.ctrlport_probe2_s("samples","Shorts", len(data), gr.DISPNULL) probe_name = self.probe.alias() self.tb.connect(self.src, self.probe) self.tb.start() expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] # Make sure we have time for flowgraph to run time.sleep(0.1) # Get available endpoint ep = gr.rpcmanager_get().endpoints()[0] hostname = re.search("-h (\S+|\d+\.\d+\.\d+\.\d+)", ep).group(1) portnum = re.search("-p (\d+)", ep).group(1) argv = [None, hostname, portnum] # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient radiosys = GNURadioControlPortClient(argv=argv, rpcmethod='thrift') radio = radiosys.client # Get all exported knobs ret = radio.getKnobs([probe_name + "::samples"]) for name in ret.keys(): # Get data in probe, which might be offset; find the # beginning and unwrap. result = ret[name].value i = result.index(1.0) result = result[i:] + result[0:i] self.assertEqual(expected_result, result) self.tb.stop() self.tb.wait() def test_005(self): data = range(1,9) self.src = blocks.vector_source_b(data, True) self.probe = blocks.ctrlport_probe2_b("samples","Bytes", len(data), gr.DISPNULL) probe_name = self.probe.alias() self.tb.connect(self.src, self.probe) self.tb.start() expected_result = [1, 2, 3, 4, 5, 6, 7, 8,] # Make sure we have time for flowgraph to run time.sleep(0.1) # Get available endpoint ep = gr.rpcmanager_get().endpoints()[0] hostname = re.search("-h (\S+|\d+\.\d+\.\d+\.\d+)", ep).group(1) portnum = re.search("-p (\d+)", ep).group(1) argv = [None, hostname, portnum] # Initialize a simple ControlPort client from endpoint from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient radiosys = GNURadioControlPortClient(argv=argv, rpcmethod='thrift') radio = radiosys.client # Get all exported knobs ret = radio.getKnobs([probe_name + "::samples"]) for name in ret.keys(): # Get data in probe, which might be offset; find the # beginning and unwrap. result = ret[name].value result = list(struct.unpack(len(result)*'b', result)) i = result.index(1) result = result[i:] + result[0:i] self.assertEqual(expected_result, result) self.tb.stop() self.tb.wait() if __name__ == '__main__': gr_unittest.run(test_ctrlport_probes, "test_ctrlport_probes.xml")