diff options
Diffstat (limited to 'gr-uhd/apps/uhd_app.py')
-rw-r--r-- | gr-uhd/apps/uhd_app.py | 77 |
1 files changed, 48 insertions, 29 deletions
diff --git a/gr-uhd/apps/uhd_app.py b/gr-uhd/apps/uhd_app.py index 65b35ac736..55c2a71d2f 100644 --- a/gr-uhd/apps/uhd_app.py +++ b/gr-uhd/apps/uhd_app.py @@ -18,7 +18,7 @@ from gnuradio import eng_arg from gnuradio import uhd from gnuradio import gr -COMMAND_DELAY = .2 # Seconds +COMMAND_DELAY = .2 # Seconds COMPACT_TPL = "{mb_id} ({mb_serial}), {db_subdev} ({subdev}, {ant}{db_serial})" LONG_TPL = """{prefix} Motherboard: {mb_id} ({mb_serial}) @@ -30,11 +30,13 @@ LONG_TPL = """{prefix} Motherboard: {mb_id} ({mb_serial}) # PyLint can't reliably detect C++ exports in modules, so let's disable that # pylint: disable=no-member + class UHDApp: GAIN_TYPE_GAIN = 'dB' GAIN_TYPE_POWER = 'power_dbm' " Base class for simple UHD-based applications " + def __init__(self, prefix=None, args=None): self.prefix = prefix self.args = args @@ -42,7 +44,7 @@ class UHDApp: if self.args.sync == 'auto' and len(self.args.channels) > 1: self.args.sync = 'pps' self.antenna = None - self.gain_range = None # Can also be power range + self.gain_range = None # Can also be power range self.samp_rate = None self.has_lo_sensor = None self.async_msgq = None @@ -87,8 +89,10 @@ class UHDApp: info_pp['mb_serial'] = usrp_info['mboard_serial'] if info_pp['mb_serial'] == "": info_pp['mb_serial'] = "no serial" - info_pp['db_subdev'] = usrp_info["{xx}_subdev_name".format(xx=tx_or_rx)] - info_pp['db_serial'] = ", " + usrp_info["{xx}_serial".format(xx=tx_or_rx)] + info_pp['db_subdev'] = usrp_info["{xx}_subdev_name".format( + xx=tx_or_rx)] + info_pp['db_serial'] = ", " + \ + usrp_info["{xx}_serial".format(xx=tx_or_rx)] if info_pp['db_serial'] == "": info_pp['db_serial'] = "no serial" info_pp['subdev'] = self.usrp.get_subdev_spec(mboard) @@ -173,11 +177,13 @@ class UHDApp: self.samp_rate = self.usrp.get_samp_rate() self.vprint("Using sampling rate: {rate}".format(rate=self.samp_rate)) # Set the antenna: - self.antenna = self.normalize_sel("channels", "antenna", len(args.channels), args.antenna) + self.antenna = self.normalize_sel( + "channels", "antenna", len(args.channels), args.antenna) if self.antenna is not None: for i, chan in enumerate(self.channels): if not self.antenna[i] in self.usrp.get_antennas(i): - print("[ERROR] {} is not a valid antenna name for this USRP device!".format(self.antenna[i])) + print("[ERROR] {} is not a valid antenna name for this USRP device!".format( + self.antenna[i])) sys.exit(1) self.usrp.set_antenna(self.antenna[i], i) self.vprint("[{prefix}] Channel {chan}: Using antenna {ant}.".format( @@ -207,14 +213,16 @@ class UHDApp: self.has_lo_sensor = 'lo_locked' in self.usrp.get_sensor_names() # Set LO export and LO source operation if (args.lo_export is not None) and (args.lo_source is not None): - self.lo_source = self.normalize_sel("channels", "lo-source", len(self.channels), args.lo_source) - self.lo_export = self.normalize_sel("channels", "lo-export", len(self.channels), args.lo_export) + self.lo_source = self.normalize_sel( + "channels", "lo-source", len(self.channels), args.lo_source) + self.lo_export = self.normalize_sel( + "channels", "lo-export", len(self.channels), args.lo_export) self.lo_source_channel = None for chan, lo_source, lo_export in zip(self.channels, self.lo_source, self.lo_export): if (lo_source == "None") or (lo_export == "None"): continue if lo_export == "True": - #If channel is LO source set frequency and store response + # If channel is LO source set frequency and store response self.usrp.set_lo_export_enabled(True, uhd.ALL_LOS, chan) if lo_source == "internal": self.lo_source_channel = chan @@ -225,7 +233,7 @@ class UHDApp: if getattr(args, 'lo_offset', None) is not None: treq = uhd.tune_request( target_freq=args.freq, - rf_freq=args.freq+args.lo_offset, + rf_freq=args.freq + args.lo_offset, rf_freq_policy=uhd.tune_request.POLICY_MANUAL, dsp_freq=tune_resp.actual_dsp_freq, dsp_freq_policy=uhd.tune_request.POLICY_MANUAL) @@ -251,7 +259,8 @@ class UHDApp: self.usrp.set_command_time(cmd_time, mb_idx) command_time_set = True except RuntimeError: - sys.stderr.write('[{prefix}] [WARNING] Failed to set command times.\n'.format(prefix=self.prefix)) + sys.stderr.write('[{prefix}] [WARNING] Failed to set command times.\n'.format( + prefix=self.prefix)) for i, chan in enumerate(self.channels): self.tr = self.usrp.set_center_freq(treq, i) if self.tr is None: @@ -268,7 +277,8 @@ class UHDApp: if args.show_async_msg: self.async_msgq = gr.msg_queue(0) self.async_src = uhd.amsg_source("", self.async_msgq) - self.async_rcv = uhd.msgq_runner(self.async_msgq, self.async_callback) + self.async_rcv = uhd.msgq_runner( + self.async_msgq, self.async_callback) def set_gain(self, gain): """ @@ -278,7 +288,8 @@ class UHDApp: """ if gain is None: if self.args.verbose: - self.vprint("Defaulting to mid-point gains:".format(prefix=self.prefix)) + self.vprint( + "Defaulting to mid-point gains:".format(prefix=self.prefix)) for i, chan in enumerate(self.channels): self.usrp.set_normalized_gain(.5, i) if self.args.verbose: @@ -304,7 +315,8 @@ class UHDApp: """ Safely tune all channels to freq. """ - self.vprint("Tuning all channels to {freq} MHz.".format(freq=freq / 1e6)) + self.vprint( + "Tuning all channels to {freq} MHz.".format(freq=freq / 1e6)) # Set frequency (tune request takes lo_offset): if hasattr(self.args, 'lo_offset') and self.args.lo_offset is not None: treq = uhd.tune_request(freq, self.args.lo_offset) @@ -316,7 +328,7 @@ class UHDApp: if getattr(self.args, 'lo_offset', None) is not None: treq = uhd.tune_request( target_freq=freq, - rf_freq=freq+self.args.lo_offset, + rf_freq=freq + self.args.lo_offset, rf_freq_policy=uhd.tune_request.POLICY_MANUAL, dsp_freq=tune_resp.actual_dsp_freq, dsp_freq_policy=uhd.tune_request.POLICY_MANUAL) @@ -340,7 +352,8 @@ class UHDApp: self.usrp.set_command_time(cmd_time, mb_idx) command_time_set = True except RuntimeError: - sys.stderr.write('[{prefix}] [WARNING] Failed to set command times.\n'.format(prefix=self.prefix)) + sys.stderr.write('[{prefix}] [WARNING] Failed to set command times.\n'.format( + prefix=self.prefix)) for i, chan in enumerate(self.channels): self.tr = self.usrp.set_center_freq(treq, i) if self.tr is None: @@ -354,16 +367,17 @@ class UHDApp: self.vprint("Syncing channels...".format(prefix=self.prefix)) time.sleep(COMMAND_DELAY) self.freq = self.usrp.get_center_freq(0) - self.vprint("First channel has freq: {freq} MHz.".format(freq=self.freq / 1e6)) + self.vprint("First channel has freq: {freq} MHz.".format( + freq=self.freq / 1e6)) @staticmethod def setup_argparser( - parser=None, - description='USRP App', - allow_mimo=True, - tx_or_rx="", - skip_freq=False, - ): + parser=None, + description='USRP App', + allow_mimo=True, + tx_or_rx="", + skip_freq=False, + ): """ Create or amend an argument parser with typical USRP options. """ @@ -374,19 +388,22 @@ class UHDApp: try: return [int(x.strip()) for x in string.split(",")] except ValueError: - raise argparse.ArgumentTypeError("Not a comma-separated list: {string}".format(string=string)) + raise argparse.ArgumentTypeError( + "Not a comma-separated list: {string}".format(string=string)) if parser is None: parser = argparse.ArgumentParser( description=description, ) tx_or_rx = tx_or_rx.strip() + " " group = parser.add_argument_group('USRP Arguments') - group.add_argument("-a", "--args", default="", help="UHD device address args") + group.add_argument("-a", "--args", default="", + help="UHD device address args") group.add_argument( "--spec", help="Subdevice(s) of UHD device where appropriate. " "Use a comma-separated list to set different boards to different specs.") - group.add_argument("-A", "--antenna", help="Select {xx}antenna(s) where appropriate".format(xx=tx_or_rx)) + group.add_argument( + "-A", "--antenna", help="Select {xx}antenna(s) where appropriate".format(xx=tx_or_rx)) group.add_argument("-s", "--samp-rate", type=eng_arg.eng_float, default=1e6, help="Sample rate") group.add_argument("-g", "--gain", type=eng_arg.eng_float, default=None, @@ -403,7 +420,7 @@ class UHDApp: group.add_argument("--lo-offset", type=eng_arg.eng_float, default=0.0, help="Set daughterboard LO offset to OFFSET [default=hw default]") if allow_mimo: - group.add_argument("-c", "--channels", default=[0,], type=cslist, + group.add_argument("-c", "--channels", default=[0, ], type=cslist, help="Select {xx} Channels".format(xx=tx_or_rx)) group.add_argument( "--lo-export", @@ -415,8 +432,10 @@ class UHDApp: "for each channel with a comma-separated list. None skips this channel.") group.add_argument("--otw-format", choices=['sc16', 'sc12', 'sc8'], default='sc16', help="Choose over-the-wire data format") - group.add_argument("--stream-args", default="", help="Set additional stream arguments") - group.add_argument("-v", "--verbose", action="count", help="Use verbose console output") + group.add_argument("--stream-args", default="", + help="Set additional stream arguments") + group.add_argument("-v", "--verbose", action="count", + help="Use verbose console output") group.add_argument("--show-async-msg", action="store_true", help="Show asynchronous message notifications from UHD") group.add_argument("--sync", choices=('default', 'pps', 'auto'), |