summaryrefslogtreecommitdiff
path: root/gr-uhd/apps/uhd_siggen_base.py
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@ettus.com>2015-06-28 00:23:16 -0700
committerMartin Braun <martin.braun@ettus.com>2015-09-24 15:51:44 -0700
commit5fe360c6ad5e6b50b00380962a1fbe0b5eaeca34 (patch)
tree997e54a313108fb4d8a6fc97d5f8d5af13ad320b /gr-uhd/apps/uhd_siggen_base.py
parent2e7800ff6f8dbf16775b8682c5297417e460fa67 (diff)
uhd: Updated uhd_siggen_gui (uses QT), added GRC siggen example
- Added uhd_app.py base class for example apps - uhd_siggen and uhd_siggen_gui now both use uhd_app - siggen now also multi-channel capabilities - siggen_gui fully QT, no more WX
Diffstat (limited to 'gr-uhd/apps/uhd_siggen_base.py')
-rw-r--r--gr-uhd/apps/uhd_siggen_base.py351
1 files changed, 121 insertions, 230 deletions
diff --git a/gr-uhd/apps/uhd_siggen_base.py b/gr-uhd/apps/uhd_siggen_base.py
index 503f49b481..31415dac41 100644
--- a/gr-uhd/apps/uhd_siggen_base.py
+++ b/gr-uhd/apps/uhd_siggen_base.py
@@ -1,6 +1,6 @@
-#!/usr/bin/env python
+#!/usr/bin/env python2
#
-# Copyright 2008,2009,2011,2012 Free Software Foundation, Inc.
+# Copyright 2008,2009,2011,2012,2015 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -19,6 +19,9 @@
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
+"""
+Provide a base flow graph for USRP signal generators.
+"""
DESC_KEY = 'desc'
SAMP_RATE_KEY = 'samp_rate'
@@ -36,44 +39,69 @@ FREQ_RANGE_KEY = 'freq_range'
GAIN_RANGE_KEY = 'gain_range'
TYPE_KEY = 'type'
-def setter(ps, key, val): ps[key] = val
+# FIXME figure out if this can be deleted
+#def setter(ps, key, val): ps[key] = val
-from gnuradio import gr, gru, uhd, eng_notation
+import sys
+import math
+import argparse
+try:
+ from uhd_app import UHDApp
+except ImportError:
+ from gnuradio.uhd.uhd_app import UHDApp
+from gnuradio import gr, gru, uhd, eng_notation, eng_arg
from gnuradio import analog
from gnuradio import blocks
from gnuradio.gr.pubsub import pubsub
from gnuradio.eng_option import eng_option
from optparse import OptionParser
-import sys
-import math
n2s = eng_notation.num_to_str
-waveforms = { analog.GR_SIN_WAVE : "Complex Sinusoid",
- analog.GR_CONST_WAVE : "Constant",
- analog.GR_GAUSSIAN : "Gaussian Noise",
- analog.GR_UNIFORM : "Uniform Noise",
- "2tone" : "Two Tone",
- "sweep" : "Sweep" }
-
-#
-# GUI-unaware GNU Radio flowgraph. This may be used either with command
-# line applications or GUI applications.
-#
-class top_block(gr.top_block, pubsub):
- def __init__(self, options, args):
+waveforms = {
+ analog.GR_CONST_WAVE : "Constant",
+ analog.GR_SIN_WAVE : "Complex Sinusoid",
+ analog.GR_GAUSSIAN : "Gaussian Noise",
+ analog.GR_UNIFORM : "Uniform Noise",
+ "2tone" : "Two Tone",
+ "sweep" : "Sweep",
+}
+
+class USRPSiggen(gr.top_block, pubsub, UHDApp):
+ """
+ GUI-unaware GNU Radio flowgraph. This may be used either with command
+ line applications or GUI applications.
+ """
+ def __init__(self, args):
gr.top_block.__init__(self)
pubsub.__init__(self)
- self._verbose = options.verbose
-
- #initialize values from options
- self._setup_usrpx(options)
- self[SAMP_RATE_KEY] = options.samp_rate
- self[TX_FREQ_KEY] = options.tx_freq
- self[AMPLITUDE_KEY] = options.amplitude
- self[WAVEFORM_FREQ_KEY] = options.waveform_freq
- self[WAVEFORM_OFFSET_KEY] = options.offset
- self[WAVEFORM2_FREQ_KEY] = options.waveform2_freq
+ UHDApp.__init__(self, args=args, prefix="UHD-SIGGEN")
+ self.extra_sink = None
+
+ # Initialize device:
+ self.setup_usrp(
+ ctor=uhd.usrp_sink,
+ args=args,
+ )
+ print("[UHD-SIGGEN] UHD Signal Generator")
+ print("[UHD-SIGGEN] UHD Version: {ver}".format(ver=uhd.get_version_string()))
+ print("[UHD-SIGGEN] Using USRP configuration:")
+ print(self.get_usrp_info_string(tx_or_rx="tx"))
+ self.usrp_description = self.get_usrp_info_string(tx_or_rx="tx", compact=True)
+
+ ### Set subscribers and publishers:
+ self.publish(SAMP_RATE_KEY, lambda: self.usrp.get_samp_rate())
+ self.publish(DESC_KEY, lambda: self.usrp_description)
+ self.publish(FREQ_RANGE_KEY, lambda: self.usrp.get_freq_range(self.channels[0]))
+ self.publish(GAIN_RANGE_KEY, lambda: self.usrp.get_gain_range(self.channels[0]))
+ self.publish(GAIN_KEY, lambda: self.usrp.get_gain(self.channels[0]))
+
+ self[SAMP_RATE_KEY] = args.samp_rate
+ self[TX_FREQ_KEY] = args.freq
+ self[AMPLITUDE_KEY] = args.amplitude
+ self[WAVEFORM_FREQ_KEY] = args.waveform_freq
+ self[WAVEFORM_OFFSET_KEY] = args.offset
+ self[WAVEFORM2_FREQ_KEY] = args.waveform2_freq
self[DSP_FREQ_KEY] = 0
self[RF_FREQ_KEY] = 0
@@ -91,84 +119,15 @@ class top_block(gr.top_block, pubsub):
AMPLITUDE_KEY, WAVEFORM_FREQ_KEY,
WAVEFORM_OFFSET_KEY, WAVEFORM2_FREQ_KEY):
self[key] = self[key]
- self[TYPE_KEY] = options.type #set type last
-
- def _setup_usrpx(self, options):
- self._u = uhd.usrp_sink(device_addr=options.args, stream_args=uhd.stream_args('fc32'))
- self._u.set_samp_rate(options.samp_rate)
-
- # Set the subdevice spec
- if(options.spec):
- self._u.set_subdev_spec(options.spec, 0)
-
- # Set the gain on the usrp from options
- if(options.gain):
- self._u.set_gain(options.gain)
-
- # Set the antenna
- if(options.antenna):
- self._u.set_antenna(options.antenna, 0)
+ self[TYPE_KEY] = args.type #set type last
- # Setup USRP Configuration value
- try:
- usrp_info = self._u.get_usrp_info()
- mboard_id = usrp_info["mboard_id"]
- mboard_serial = usrp_info["mboard_serial"]
- if mboard_serial == "":
- mboard_serial = "no serial"
- dboard_subdev_name = usrp_info["tx_subdev_name"]
- dboard_serial = usrp_info["tx_serial"]
- if dboard_serial == "":
- dboard_serial = "no serial"
- subdev = self._u.get_subdev_spec()
- antenna = self._u.get_antenna()
-
- desc_key_str = "Motherboard: %s [%s]\n" % (mboard_id, mboard_serial)
- if "B200" in mboard_id or "B210" in mboard_id:
- desc_key_str += "Daughterboard: %s\n" % dboard_subdev_name
- else:
- desc_key_str += "Daughterboard: %s [%s]\n" % (dboard_subdev_name, dboard_serial)
- desc_key_str += "Subdev: %s\n" % subdev
- desc_key_str += "Antenna: %s" % antenna
- except:
- desc_key_str = "USRP configuration output not implemented in this version"
-
- self.publish(DESC_KEY, lambda: desc_key_str)
- self.publish(FREQ_RANGE_KEY, self._u.get_freq_range)
- self.publish(GAIN_RANGE_KEY, self._u.get_gain_range)
- self.publish(GAIN_KEY, self._u.get_gain)
-
- print "UHD Signal Generator"
- print "Version: %s" % uhd.get_version_string()
- print "\nUsing USRP configuration:"
- print desc_key_str + "\n"
-
- # Direct asynchronous notifications to callback function
- if options.show_async_msg:
- self.async_msgq = gr.msg_queue(0)
- self.async_src = uhd.amsg_source("", self.async_msgq)
- self.async_rcv = gru.msgq_runner(self.async_msgq, self.async_callback)
-
- def async_callback(self, msg):
- md = self.async_src.msg_to_async_metadata_t(msg)
- print "Channel: %i Time: %f Event: %i" % (md.channel, md.time_spec.get_real_secs(), md.event_code)
-
- def _set_tx_amplitude(self, ampl):
+ def set_samp_rate(self, sr):
"""
- Sets the transmit amplitude sent to the USRP
-
- Args:
- ampl: the amplitude or None for automatic
+ When sampling rate is updated, also update the signal sources.
"""
- ampl_range = self[AMPL_RANGE_KEY]
- if ampl is None:
- ampl = (ampl_range[1] - ampl_range[0])*0.15 + ampl_range[0]
- self[AMPLITUDE_KEY] = max(ampl_range[0], min(ampl, ampl_range[1]))
-
- def set_samp_rate(self, sr):
- self._u.set_samp_rate(sr)
- sr = self._u.get_samp_rate()
-
+ self.vprint("Setting sampling rate to: {rate} Msps".format(rate=sr/1e6))
+ self.usrp.set_samp_rate(sr)
+ sr = self.usrp.get_samp_rate()
if self[TYPE_KEY] in (analog.GR_SIN_WAVE, analog.GR_CONST_WAVE):
self._src.set_sampling_freq(self[SAMP_RATE_KEY])
elif self[TYPE_KEY] == "2tone":
@@ -179,48 +138,9 @@ class top_block(gr.top_block, pubsub):
self._src2.set_sampling_freq(self[WAVEFORM_FREQ_KEY]*2*math.pi/self[SAMP_RATE_KEY])
else:
return True # Waveform not yet set
-
- if self._verbose:
- print "Set sample rate to:", sr
-
+ self.vprint("Set sample rate to: {rate} Msps".format(rate=sr/1e6))
return True
- def set_gain(self, gain):
- if gain is None:
- g = self[GAIN_RANGE_KEY]
- gain = float(g.start()+g.stop())/2
- if self._verbose:
- print "Using auto-calculated mid-point TX gain"
- self[GAIN_KEY] = gain
- return
- self._u.set_gain(gain)
- if self._verbose:
- print "Set TX gain to:", gain
-
- def set_freq(self, target_freq):
-
- if target_freq is None:
- f = self[FREQ_RANGE_KEY]
- target_freq = float(f.start()+f.stop())/2.0
- if self._verbose:
- print "Using auto-calculated mid-point frequency"
- self[TX_FREQ_KEY] = target_freq
- return
-
- tr = self._u.set_center_freq(target_freq)
- fs = "%sHz" % (n2s(target_freq),)
- if tr is not None:
- self._freq = target_freq
- self[DSP_FREQ_KEY] = tr.actual_dsp_freq
- self[RF_FREQ_KEY] = tr.actual_rf_freq
- if self._verbose:
- print "Set center frequency to", self._u.get_center_freq()
- print "Tx RF frequency: %sHz" % (n2s(tr.actual_rf_freq),)
- print "Tx DSP frequency: %sHz" % (n2s(tr.actual_dsp_freq),)
- elif self._verbose:
- print "Failed to set freq."
- return tr
-
def set_waveform_freq(self, freq):
if self[TYPE_KEY] == analog.GR_SIN_WAVE:
self._src.set_frequency(freq)
@@ -242,6 +162,7 @@ class top_block(gr.top_block, pubsub):
return True
def set_waveform(self, type):
+ self.vprint("Selecting waveform...")
self.lock()
self.disconnect_all()
if type == analog.GR_SIN_WAVE or type == analog.GR_CONST_WAVE:
@@ -258,9 +179,8 @@ class top_block(gr.top_block, pubsub):
self[WAVEFORM_FREQ_KEY],
self[AMPLITUDE_KEY]/2.0,
0)
- if(self[WAVEFORM2_FREQ_KEY] is None):
+ if self[WAVEFORM2_FREQ_KEY] is None:
self[WAVEFORM2_FREQ_KEY] = -self[WAVEFORM_FREQ_KEY]
-
self._src2 = analog.sig_source_c(self[SAMP_RATE_KEY],
analog.GR_SIN_WAVE,
self[WAVEFORM2_FREQ_KEY],
@@ -276,7 +196,6 @@ class top_block(gr.top_block, pubsub):
# will sweep from (rf_freq-waveform_freq/2) to (rf_freq+waveform_freq/2)
if self[WAVEFORM2_FREQ_KEY] is None:
self[WAVEFORM2_FREQ_KEY] = 0.1
-
self._src1 = analog.sig_source_f(self[SAMP_RATE_KEY],
analog.GR_TRI_WAVE,
self[WAVEFORM2_FREQ_KEY],
@@ -284,33 +203,33 @@ class top_block(gr.top_block, pubsub):
-0.5)
self._src2 = analog.frequency_modulator_fc(self[WAVEFORM_FREQ_KEY]*2*math.pi/self[SAMP_RATE_KEY])
self._src = blocks.multiply_const_cc(self[AMPLITUDE_KEY])
- self.connect(self._src1,self._src2,self._src)
+ self.connect(self._src1, self._src2, self._src)
else:
- raise RuntimeError("Unknown waveform type")
-
- self.connect(self._src, self._u)
+ raise RuntimeError("[UHD-SIGGEN] Unknown waveform type")
+ for c in xrange(len(self.channels)):
+ self.connect(self._src, (self.usrp, c))
+ if self.extra_sink is not None:
+ self.connect(self._src, self.extra_sink)
self.unlock()
-
- if self._verbose:
- print "Set baseband modulation to:", waveforms[type]
- if type == analog.GR_SIN_WAVE:
- print "Modulation frequency: %sHz" % (n2s(self[WAVEFORM_FREQ_KEY]),)
- print "Initial phase:", self[WAVEFORM_OFFSET_KEY]
- elif type == "2tone":
- print "Tone 1: %sHz" % (n2s(self[WAVEFORM_FREQ_KEY]),)
- print "Tone 2: %sHz" % (n2s(self[WAVEFORM2_FREQ_KEY]),)
- elif type == "sweep":
- print "Sweeping across %sHz to %sHz" % (n2s(-self[WAVEFORM_FREQ_KEY]/2.0),n2s(self[WAVEFORM_FREQ_KEY]/2.0))
- print "Sweep rate: %sHz" % (n2s(self[WAVEFORM2_FREQ_KEY]),)
- print "TX amplitude:", self[AMPLITUDE_KEY]
-
+ self.vprint("Set baseband modulation to:", waveforms[type])
+ if type == analog.GR_SIN_WAVE:
+ self.vprint("Modulation frequency: %sHz" % (n2s(self[WAVEFORM_FREQ_KEY]),))
+ self.vprint("Initial phase:", self[WAVEFORM_OFFSET_KEY])
+ elif type == "2tone":
+ self.vprint("Tone 1: %sHz" % (n2s(self[WAVEFORM_FREQ_KEY]),))
+ self.vprint("Tone 2: %sHz" % (n2s(self[WAVEFORM2_FREQ_KEY]),))
+ elif type == "sweep":
+ self.vprint("Sweeping across %sHz to %sHz" % (n2s(-self[WAVEFORM_FREQ_KEY]/2.0),n2s(self[WAVEFORM_FREQ_KEY]/2.0)))
+ self.vprint("Sweep rate: %sHz" % (n2s(self[WAVEFORM2_FREQ_KEY]),))
+ self.vprint("TX amplitude:", self[AMPLITUDE_KEY])
def set_amplitude(self, amplitude):
+ """
+ amplitude subscriber
+ """
if amplitude < 0.0 or amplitude > 1.0:
- if self._verbose:
- print "Amplitude out of range:", amplitude
+ self.vprint("Amplitude out of range:", amplitude)
return False
-
if self[TYPE_KEY] in (analog.GR_SIN_WAVE, analog.GR_CONST_WAVE, analog.GR_GAUSSIAN, analog.GR_UNIFORM):
self._src.set_amplitude(amplitude)
elif self[TYPE_KEY] == "2tone":
@@ -320,83 +239,55 @@ class top_block(gr.top_block, pubsub):
self._src.set_k(amplitude)
else:
return True # Waveform not yet set
-
- if self._verbose:
- print "Set amplitude to:", amplitude
+ self.vprint("Set amplitude to:", amplitude)
return True
-def get_options():
- usage="%prog: [options]"
- parser = OptionParser(option_class=eng_option, usage=usage)
- parser.add_option("-a", "--args", type="string", default="",
- help="UHD device address args , [default=%default]")
- parser.add_option("", "--spec", type="string", default=None,
- help="Subdevice of UHD device where appropriate")
- parser.add_option("-A", "--antenna", type="string", default=None,
- help="select Rx Antenna where appropriate")
- parser.add_option("-s", "--samp-rate", type="eng_float", default=1e6,
- help="set sample rate (bandwidth) [default=%default]")
- parser.add_option("-g", "--gain", type="eng_float", default=None,
- help="set gain in dB (default is midpoint)")
- parser.add_option("-f", "--tx-freq", type="eng_float", default=None,
- help="Set carrier frequency to FREQ [default=mid-point]",
- metavar="FREQ")
- parser.add_option("-x", "--waveform-freq", type="eng_float", default=0,
- help="Set baseband waveform frequency to FREQ [default=%default]")
- parser.add_option("-y", "--waveform2-freq", type="eng_float", default=None,
- help="Set 2nd waveform frequency to FREQ [default=%default]")
- parser.add_option("--sine", dest="type", action="store_const", const=analog.GR_SIN_WAVE,
+def setup_argparser():
+ """
+ Create argument parser for signal generator.
+ """
+ parser = UHDApp.setup_argparser(
+ description="USRP Signal Generator.",
+ tx_or_rx="Tx",
+ )
+ group = parser.add_argument_group('Siggen Arguments')
+ group.add_argument("-x", "--waveform-freq", type=eng_arg.eng_float, default=0.0,
+ help="Set baseband waveform frequency to FREQ")
+ group.add_argument("-y", "--waveform2-freq", type=eng_arg.eng_float, default=0.0,
+ help="Set 2nd waveform frequency to FREQ")
+ group.add_argument("--sine", dest="type", action="store_const", const=analog.GR_SIN_WAVE,
help="Generate a carrier modulated by a complex sine wave",
default=analog.GR_SIN_WAVE)
- parser.add_option("--const", dest="type", action="store_const", const=analog.GR_CONST_WAVE,
+ group.add_argument("--const", dest="type", action="store_const", const=analog.GR_CONST_WAVE,
help="Generate a constant carrier")
- parser.add_option("--offset", type="eng_float", default=0,
- help="Set waveform phase offset to OFFSET [default=%default]")
- parser.add_option("--gaussian", dest="type", action="store_const", const=analog.GR_GAUSSIAN,
+ group.add_argument("--offset", type=eng_arg.eng_float, default=0,
+ help="Set waveform phase offset to OFFSET", metavar="OFFSET")
+ group.add_argument("--gaussian", dest="type", action="store_const", const=analog.GR_GAUSSIAN,
help="Generate Gaussian random output")
- parser.add_option("--uniform", dest="type", action="store_const", const=analog.GR_UNIFORM,
+ group.add_argument("--uniform", dest="type", action="store_const", const=analog.GR_UNIFORM,
help="Generate Uniform random output")
- parser.add_option("--2tone", dest="type", action="store_const", const="2tone",
+ group.add_argument("--2tone", dest="type", action="store_const", const="2tone",
help="Generate Two Tone signal for IMD testing")
- parser.add_option("--sweep", dest="type", action="store_const", const="sweep",
+ group.add_argument("--sweep", dest="type", action="store_const", const="sweep",
help="Generate a swept sine wave")
- parser.add_option("", "--amplitude", type="eng_float", default=0.15,
- help="Set output amplitude to AMPL (0.0-1.0) [default=%default]",
- metavar="AMPL")
- parser.add_option("-v", "--verbose", action="store_true", default=False,
- help="Use verbose console output [default=%default]")
- parser.add_option("", "--show-async-msg", action="store_true", default=False,
- help="Show asynchronous message notifications from UHD [default=%default]")
-
- (options, args) = parser.parse_args()
-
- return (options, args)
+ return parser
-# If this script is executed, the following runs. If it is imported,
-# the below does not run.
-def test_main():
+def main():
if gr.enable_realtime_scheduling() != gr.RT_OK:
- print "Note: failed to enable realtime scheduling, continuing"
-
- # Grab command line options and create top block
+ print("Note: failed to enable realtime scheduling, continuing")
+ # Grab command line args and create top block
try:
- (options, args) = get_options()
- tb = top_block(options, args)
-
- except RuntimeError, e:
- print e
- sys.exit(1)
-
+ parser = setup_argparser()
+ args = parser.parse_args()
+ tb = USRPSiggen(args)
+ except RuntimeError as e:
+ print(e)
+ exit(1)
tb.start()
- raw_input('Press Enter to quit: ')
+ raw_input('[UHD-SIGGEN] Press Enter to quit:\n')
tb.stop()
tb.wait()
-# Make sure to create the top block (tb) within a function:
-# That code in main will allow tb to go out of scope on return,
-# which will call the decontructor on usrp and stop transmit.
-# Whats odd is that grc works fine with tb in the __main__,
-# perhaps its because the try/except clauses around tb.
if __name__ == "__main__":
- test_main()
+ main()