diff options
author | Martin Braun <martin.braun@ettus.com> | 2015-06-28 00:23:16 -0700 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2015-09-24 15:51:44 -0700 |
commit | 5fe360c6ad5e6b50b00380962a1fbe0b5eaeca34 (patch) | |
tree | 997e54a313108fb4d8a6fc97d5f8d5af13ad320b /gr-uhd/apps/uhd_siggen_base.py | |
parent | 2e7800ff6f8dbf16775b8682c5297417e460fa67 (diff) |
uhd: Updated uhd_siggen_gui (uses QT), added GRC siggen example
- Added uhd_app.py base class for example apps
- uhd_siggen and uhd_siggen_gui now both use uhd_app
- siggen now also multi-channel capabilities
- siggen_gui fully QT, no more WX
Diffstat (limited to 'gr-uhd/apps/uhd_siggen_base.py')
-rw-r--r-- | gr-uhd/apps/uhd_siggen_base.py | 351 |
1 files changed, 121 insertions, 230 deletions
diff --git a/gr-uhd/apps/uhd_siggen_base.py b/gr-uhd/apps/uhd_siggen_base.py index 503f49b481..31415dac41 100644 --- a/gr-uhd/apps/uhd_siggen_base.py +++ b/gr-uhd/apps/uhd_siggen_base.py @@ -1,6 +1,6 @@ -#!/usr/bin/env python +#!/usr/bin/env python2 # -# Copyright 2008,2009,2011,2012 Free Software Foundation, Inc. +# Copyright 2008,2009,2011,2012,2015 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -19,6 +19,9 @@ # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. # +""" +Provide a base flow graph for USRP signal generators. +""" DESC_KEY = 'desc' SAMP_RATE_KEY = 'samp_rate' @@ -36,44 +39,69 @@ FREQ_RANGE_KEY = 'freq_range' GAIN_RANGE_KEY = 'gain_range' TYPE_KEY = 'type' -def setter(ps, key, val): ps[key] = val +# FIXME figure out if this can be deleted +#def setter(ps, key, val): ps[key] = val -from gnuradio import gr, gru, uhd, eng_notation +import sys +import math +import argparse +try: + from uhd_app import UHDApp +except ImportError: + from gnuradio.uhd.uhd_app import UHDApp +from gnuradio import gr, gru, uhd, eng_notation, eng_arg from gnuradio import analog from gnuradio import blocks from gnuradio.gr.pubsub import pubsub from gnuradio.eng_option import eng_option from optparse import OptionParser -import sys -import math n2s = eng_notation.num_to_str -waveforms = { analog.GR_SIN_WAVE : "Complex Sinusoid", - analog.GR_CONST_WAVE : "Constant", - analog.GR_GAUSSIAN : "Gaussian Noise", - analog.GR_UNIFORM : "Uniform Noise", - "2tone" : "Two Tone", - "sweep" : "Sweep" } - -# -# GUI-unaware GNU Radio flowgraph. This may be used either with command -# line applications or GUI applications. -# -class top_block(gr.top_block, pubsub): - def __init__(self, options, args): +waveforms = { + analog.GR_CONST_WAVE : "Constant", + analog.GR_SIN_WAVE : "Complex Sinusoid", + analog.GR_GAUSSIAN : "Gaussian Noise", + analog.GR_UNIFORM : "Uniform Noise", + "2tone" : "Two Tone", + "sweep" : "Sweep", +} + +class USRPSiggen(gr.top_block, pubsub, UHDApp): + """ + GUI-unaware GNU Radio flowgraph. This may be used either with command + line applications or GUI applications. + """ + def __init__(self, args): gr.top_block.__init__(self) pubsub.__init__(self) - self._verbose = options.verbose - - #initialize values from options - self._setup_usrpx(options) - self[SAMP_RATE_KEY] = options.samp_rate - self[TX_FREQ_KEY] = options.tx_freq - self[AMPLITUDE_KEY] = options.amplitude - self[WAVEFORM_FREQ_KEY] = options.waveform_freq - self[WAVEFORM_OFFSET_KEY] = options.offset - self[WAVEFORM2_FREQ_KEY] = options.waveform2_freq + UHDApp.__init__(self, args=args, prefix="UHD-SIGGEN") + self.extra_sink = None + + # Initialize device: + self.setup_usrp( + ctor=uhd.usrp_sink, + args=args, + ) + print("[UHD-SIGGEN] UHD Signal Generator") + print("[UHD-SIGGEN] UHD Version: {ver}".format(ver=uhd.get_version_string())) + print("[UHD-SIGGEN] Using USRP configuration:") + print(self.get_usrp_info_string(tx_or_rx="tx")) + self.usrp_description = self.get_usrp_info_string(tx_or_rx="tx", compact=True) + + ### Set subscribers and publishers: + self.publish(SAMP_RATE_KEY, lambda: self.usrp.get_samp_rate()) + self.publish(DESC_KEY, lambda: self.usrp_description) + self.publish(FREQ_RANGE_KEY, lambda: self.usrp.get_freq_range(self.channels[0])) + self.publish(GAIN_RANGE_KEY, lambda: self.usrp.get_gain_range(self.channels[0])) + self.publish(GAIN_KEY, lambda: self.usrp.get_gain(self.channels[0])) + + self[SAMP_RATE_KEY] = args.samp_rate + self[TX_FREQ_KEY] = args.freq + self[AMPLITUDE_KEY] = args.amplitude + self[WAVEFORM_FREQ_KEY] = args.waveform_freq + self[WAVEFORM_OFFSET_KEY] = args.offset + self[WAVEFORM2_FREQ_KEY] = args.waveform2_freq self[DSP_FREQ_KEY] = 0 self[RF_FREQ_KEY] = 0 @@ -91,84 +119,15 @@ class top_block(gr.top_block, pubsub): AMPLITUDE_KEY, WAVEFORM_FREQ_KEY, WAVEFORM_OFFSET_KEY, WAVEFORM2_FREQ_KEY): self[key] = self[key] - self[TYPE_KEY] = options.type #set type last - - def _setup_usrpx(self, options): - self._u = uhd.usrp_sink(device_addr=options.args, stream_args=uhd.stream_args('fc32')) - self._u.set_samp_rate(options.samp_rate) - - # Set the subdevice spec - if(options.spec): - self._u.set_subdev_spec(options.spec, 0) - - # Set the gain on the usrp from options - if(options.gain): - self._u.set_gain(options.gain) - - # Set the antenna - if(options.antenna): - self._u.set_antenna(options.antenna, 0) + self[TYPE_KEY] = args.type #set type last - # Setup USRP Configuration value - try: - usrp_info = self._u.get_usrp_info() - mboard_id = usrp_info["mboard_id"] - mboard_serial = usrp_info["mboard_serial"] - if mboard_serial == "": - mboard_serial = "no serial" - dboard_subdev_name = usrp_info["tx_subdev_name"] - dboard_serial = usrp_info["tx_serial"] - if dboard_serial == "": - dboard_serial = "no serial" - subdev = self._u.get_subdev_spec() - antenna = self._u.get_antenna() - - desc_key_str = "Motherboard: %s [%s]\n" % (mboard_id, mboard_serial) - if "B200" in mboard_id or "B210" in mboard_id: - desc_key_str += "Daughterboard: %s\n" % dboard_subdev_name - else: - desc_key_str += "Daughterboard: %s [%s]\n" % (dboard_subdev_name, dboard_serial) - desc_key_str += "Subdev: %s\n" % subdev - desc_key_str += "Antenna: %s" % antenna - except: - desc_key_str = "USRP configuration output not implemented in this version" - - self.publish(DESC_KEY, lambda: desc_key_str) - self.publish(FREQ_RANGE_KEY, self._u.get_freq_range) - self.publish(GAIN_RANGE_KEY, self._u.get_gain_range) - self.publish(GAIN_KEY, self._u.get_gain) - - print "UHD Signal Generator" - print "Version: %s" % uhd.get_version_string() - print "\nUsing USRP configuration:" - print desc_key_str + "\n" - - # Direct asynchronous notifications to callback function - if options.show_async_msg: - self.async_msgq = gr.msg_queue(0) - self.async_src = uhd.amsg_source("", self.async_msgq) - self.async_rcv = gru.msgq_runner(self.async_msgq, self.async_callback) - - def async_callback(self, msg): - md = self.async_src.msg_to_async_metadata_t(msg) - print "Channel: %i Time: %f Event: %i" % (md.channel, md.time_spec.get_real_secs(), md.event_code) - - def _set_tx_amplitude(self, ampl): + def set_samp_rate(self, sr): """ - Sets the transmit amplitude sent to the USRP - - Args: - ampl: the amplitude or None for automatic + When sampling rate is updated, also update the signal sources. """ - ampl_range = self[AMPL_RANGE_KEY] - if ampl is None: - ampl = (ampl_range[1] - ampl_range[0])*0.15 + ampl_range[0] - self[AMPLITUDE_KEY] = max(ampl_range[0], min(ampl, ampl_range[1])) - - def set_samp_rate(self, sr): - self._u.set_samp_rate(sr) - sr = self._u.get_samp_rate() - + self.vprint("Setting sampling rate to: {rate} Msps".format(rate=sr/1e6)) + self.usrp.set_samp_rate(sr) + sr = self.usrp.get_samp_rate() if self[TYPE_KEY] in (analog.GR_SIN_WAVE, analog.GR_CONST_WAVE): self._src.set_sampling_freq(self[SAMP_RATE_KEY]) elif self[TYPE_KEY] == "2tone": @@ -179,48 +138,9 @@ class top_block(gr.top_block, pubsub): self._src2.set_sampling_freq(self[WAVEFORM_FREQ_KEY]*2*math.pi/self[SAMP_RATE_KEY]) else: return True # Waveform not yet set - - if self._verbose: - print "Set sample rate to:", sr - + self.vprint("Set sample rate to: {rate} Msps".format(rate=sr/1e6)) return True - def set_gain(self, gain): - if gain is None: - g = self[GAIN_RANGE_KEY] - gain = float(g.start()+g.stop())/2 - if self._verbose: - print "Using auto-calculated mid-point TX gain" - self[GAIN_KEY] = gain - return - self._u.set_gain(gain) - if self._verbose: - print "Set TX gain to:", gain - - def set_freq(self, target_freq): - - if target_freq is None: - f = self[FREQ_RANGE_KEY] - target_freq = float(f.start()+f.stop())/2.0 - if self._verbose: - print "Using auto-calculated mid-point frequency" - self[TX_FREQ_KEY] = target_freq - return - - tr = self._u.set_center_freq(target_freq) - fs = "%sHz" % (n2s(target_freq),) - if tr is not None: - self._freq = target_freq - self[DSP_FREQ_KEY] = tr.actual_dsp_freq - self[RF_FREQ_KEY] = tr.actual_rf_freq - if self._verbose: - print "Set center frequency to", self._u.get_center_freq() - print "Tx RF frequency: %sHz" % (n2s(tr.actual_rf_freq),) - print "Tx DSP frequency: %sHz" % (n2s(tr.actual_dsp_freq),) - elif self._verbose: - print "Failed to set freq." - return tr - def set_waveform_freq(self, freq): if self[TYPE_KEY] == analog.GR_SIN_WAVE: self._src.set_frequency(freq) @@ -242,6 +162,7 @@ class top_block(gr.top_block, pubsub): return True def set_waveform(self, type): + self.vprint("Selecting waveform...") self.lock() self.disconnect_all() if type == analog.GR_SIN_WAVE or type == analog.GR_CONST_WAVE: @@ -258,9 +179,8 @@ class top_block(gr.top_block, pubsub): self[WAVEFORM_FREQ_KEY], self[AMPLITUDE_KEY]/2.0, 0) - if(self[WAVEFORM2_FREQ_KEY] is None): + if self[WAVEFORM2_FREQ_KEY] is None: self[WAVEFORM2_FREQ_KEY] = -self[WAVEFORM_FREQ_KEY] - self._src2 = analog.sig_source_c(self[SAMP_RATE_KEY], analog.GR_SIN_WAVE, self[WAVEFORM2_FREQ_KEY], @@ -276,7 +196,6 @@ class top_block(gr.top_block, pubsub): # will sweep from (rf_freq-waveform_freq/2) to (rf_freq+waveform_freq/2) if self[WAVEFORM2_FREQ_KEY] is None: self[WAVEFORM2_FREQ_KEY] = 0.1 - self._src1 = analog.sig_source_f(self[SAMP_RATE_KEY], analog.GR_TRI_WAVE, self[WAVEFORM2_FREQ_KEY], @@ -284,33 +203,33 @@ class top_block(gr.top_block, pubsub): -0.5) self._src2 = analog.frequency_modulator_fc(self[WAVEFORM_FREQ_KEY]*2*math.pi/self[SAMP_RATE_KEY]) self._src = blocks.multiply_const_cc(self[AMPLITUDE_KEY]) - self.connect(self._src1,self._src2,self._src) + self.connect(self._src1, self._src2, self._src) else: - raise RuntimeError("Unknown waveform type") - - self.connect(self._src, self._u) + raise RuntimeError("[UHD-SIGGEN] Unknown waveform type") + for c in xrange(len(self.channels)): + self.connect(self._src, (self.usrp, c)) + if self.extra_sink is not None: + self.connect(self._src, self.extra_sink) self.unlock() - - if self._verbose: - print "Set baseband modulation to:", waveforms[type] - if type == analog.GR_SIN_WAVE: - print "Modulation frequency: %sHz" % (n2s(self[WAVEFORM_FREQ_KEY]),) - print "Initial phase:", self[WAVEFORM_OFFSET_KEY] - elif type == "2tone": - print "Tone 1: %sHz" % (n2s(self[WAVEFORM_FREQ_KEY]),) - print "Tone 2: %sHz" % (n2s(self[WAVEFORM2_FREQ_KEY]),) - elif type == "sweep": - print "Sweeping across %sHz to %sHz" % (n2s(-self[WAVEFORM_FREQ_KEY]/2.0),n2s(self[WAVEFORM_FREQ_KEY]/2.0)) - print "Sweep rate: %sHz" % (n2s(self[WAVEFORM2_FREQ_KEY]),) - print "TX amplitude:", self[AMPLITUDE_KEY] - + self.vprint("Set baseband modulation to:", waveforms[type]) + if type == analog.GR_SIN_WAVE: + self.vprint("Modulation frequency: %sHz" % (n2s(self[WAVEFORM_FREQ_KEY]),)) + self.vprint("Initial phase:", self[WAVEFORM_OFFSET_KEY]) + elif type == "2tone": + self.vprint("Tone 1: %sHz" % (n2s(self[WAVEFORM_FREQ_KEY]),)) + self.vprint("Tone 2: %sHz" % (n2s(self[WAVEFORM2_FREQ_KEY]),)) + elif type == "sweep": + self.vprint("Sweeping across %sHz to %sHz" % (n2s(-self[WAVEFORM_FREQ_KEY]/2.0),n2s(self[WAVEFORM_FREQ_KEY]/2.0))) + self.vprint("Sweep rate: %sHz" % (n2s(self[WAVEFORM2_FREQ_KEY]),)) + self.vprint("TX amplitude:", self[AMPLITUDE_KEY]) def set_amplitude(self, amplitude): + """ + amplitude subscriber + """ if amplitude < 0.0 or amplitude > 1.0: - if self._verbose: - print "Amplitude out of range:", amplitude + self.vprint("Amplitude out of range:", amplitude) return False - if self[TYPE_KEY] in (analog.GR_SIN_WAVE, analog.GR_CONST_WAVE, analog.GR_GAUSSIAN, analog.GR_UNIFORM): self._src.set_amplitude(amplitude) elif self[TYPE_KEY] == "2tone": @@ -320,83 +239,55 @@ class top_block(gr.top_block, pubsub): self._src.set_k(amplitude) else: return True # Waveform not yet set - - if self._verbose: - print "Set amplitude to:", amplitude + self.vprint("Set amplitude to:", amplitude) return True -def get_options(): - usage="%prog: [options]" - parser = OptionParser(option_class=eng_option, usage=usage) - parser.add_option("-a", "--args", type="string", default="", - help="UHD device address args , [default=%default]") - parser.add_option("", "--spec", type="string", default=None, - help="Subdevice of UHD device where appropriate") - parser.add_option("-A", "--antenna", type="string", default=None, - help="select Rx Antenna where appropriate") - parser.add_option("-s", "--samp-rate", type="eng_float", default=1e6, - help="set sample rate (bandwidth) [default=%default]") - parser.add_option("-g", "--gain", type="eng_float", default=None, - help="set gain in dB (default is midpoint)") - parser.add_option("-f", "--tx-freq", type="eng_float", default=None, - help="Set carrier frequency to FREQ [default=mid-point]", - metavar="FREQ") - parser.add_option("-x", "--waveform-freq", type="eng_float", default=0, - help="Set baseband waveform frequency to FREQ [default=%default]") - parser.add_option("-y", "--waveform2-freq", type="eng_float", default=None, - help="Set 2nd waveform frequency to FREQ [default=%default]") - parser.add_option("--sine", dest="type", action="store_const", const=analog.GR_SIN_WAVE, +def setup_argparser(): + """ + Create argument parser for signal generator. + """ + parser = UHDApp.setup_argparser( + description="USRP Signal Generator.", + tx_or_rx="Tx", + ) + group = parser.add_argument_group('Siggen Arguments') + group.add_argument("-x", "--waveform-freq", type=eng_arg.eng_float, default=0.0, + help="Set baseband waveform frequency to FREQ") + group.add_argument("-y", "--waveform2-freq", type=eng_arg.eng_float, default=0.0, + help="Set 2nd waveform frequency to FREQ") + group.add_argument("--sine", dest="type", action="store_const", const=analog.GR_SIN_WAVE, help="Generate a carrier modulated by a complex sine wave", default=analog.GR_SIN_WAVE) - parser.add_option("--const", dest="type", action="store_const", const=analog.GR_CONST_WAVE, + group.add_argument("--const", dest="type", action="store_const", const=analog.GR_CONST_WAVE, help="Generate a constant carrier") - parser.add_option("--offset", type="eng_float", default=0, - help="Set waveform phase offset to OFFSET [default=%default]") - parser.add_option("--gaussian", dest="type", action="store_const", const=analog.GR_GAUSSIAN, + group.add_argument("--offset", type=eng_arg.eng_float, default=0, + help="Set waveform phase offset to OFFSET", metavar="OFFSET") + group.add_argument("--gaussian", dest="type", action="store_const", const=analog.GR_GAUSSIAN, help="Generate Gaussian random output") - parser.add_option("--uniform", dest="type", action="store_const", const=analog.GR_UNIFORM, + group.add_argument("--uniform", dest="type", action="store_const", const=analog.GR_UNIFORM, help="Generate Uniform random output") - parser.add_option("--2tone", dest="type", action="store_const", const="2tone", + group.add_argument("--2tone", dest="type", action="store_const", const="2tone", help="Generate Two Tone signal for IMD testing") - parser.add_option("--sweep", dest="type", action="store_const", const="sweep", + group.add_argument("--sweep", dest="type", action="store_const", const="sweep", help="Generate a swept sine wave") - parser.add_option("", "--amplitude", type="eng_float", default=0.15, - help="Set output amplitude to AMPL (0.0-1.0) [default=%default]", - metavar="AMPL") - parser.add_option("-v", "--verbose", action="store_true", default=False, - help="Use verbose console output [default=%default]") - parser.add_option("", "--show-async-msg", action="store_true", default=False, - help="Show asynchronous message notifications from UHD [default=%default]") - - (options, args) = parser.parse_args() - - return (options, args) + return parser -# If this script is executed, the following runs. If it is imported, -# the below does not run. -def test_main(): +def main(): if gr.enable_realtime_scheduling() != gr.RT_OK: - print "Note: failed to enable realtime scheduling, continuing" - - # Grab command line options and create top block + print("Note: failed to enable realtime scheduling, continuing") + # Grab command line args and create top block try: - (options, args) = get_options() - tb = top_block(options, args) - - except RuntimeError, e: - print e - sys.exit(1) - + parser = setup_argparser() + args = parser.parse_args() + tb = USRPSiggen(args) + except RuntimeError as e: + print(e) + exit(1) tb.start() - raw_input('Press Enter to quit: ') + raw_input('[UHD-SIGGEN] Press Enter to quit:\n') tb.stop() tb.wait() -# Make sure to create the top block (tb) within a function: -# That code in main will allow tb to go out of scope on return, -# which will call the decontructor on usrp and stop transmit. -# Whats odd is that grc works fine with tb in the __main__, -# perhaps its because the try/except clauses around tb. if __name__ == "__main__": - test_main() + main() |