summaryrefslogtreecommitdiff
path: root/gr-uhd/apps/uhd_app.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-uhd/apps/uhd_app.py')
-rw-r--r--gr-uhd/apps/uhd_app.py127
1 files changed, 74 insertions, 53 deletions
diff --git a/gr-uhd/apps/uhd_app.py b/gr-uhd/apps/uhd_app.py
index 571518cb2a..f666e1e28f 100644
--- a/gr-uhd/apps/uhd_app.py
+++ b/gr-uhd/apps/uhd_app.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
#
# Copyright 2015-2016 Free Software Foundation, Inc.
#
@@ -28,7 +28,7 @@ LONG_TPL = """{prefix} Motherboard: {mb_id} ({mb_serial})
{prefix} Antenna: {ant}
"""
-class UHDApp(object):
+class UHDApp:
" Base class for simple UHD-based applications "
def __init__(self, prefix=None, args=None):
self.prefix = prefix
@@ -53,6 +53,8 @@ class UHDApp(object):
self.time_source = None
self.lo_source = None
self.lo_export = None
+ self.usrp = None
+ self.lo_source_channel = None
def vprint(self, *args):
"""
@@ -61,12 +63,7 @@ class UHDApp(object):
if self.verbose:
print("[{prefix}]".format(prefix=self.prefix), *args)
- def get_usrp_info_string(self,
- compact=False,
- tx_or_rx='rx',
- chan=0,
- mboard=0,
- ):
+ def get_usrp_info_string(self, compact=False, tx_or_rx='rx', chan=0, mboard=0):
"""
Return a nice textual description of the USRP we're using.
"""
@@ -175,7 +172,7 @@ class UHDApp(object):
for i, chan in enumerate(self.channels):
if not self.antenna[i] in self.usrp.get_antennas(i):
print("[ERROR] {} is not a valid antenna name for this USRP device!".format(self.antenna[i]))
- exit(1)
+ sys.exit(1)
self.usrp.set_antenna(self.antenna[i], i)
self.vprint("[{prefix}] Channel {chan}: Using antenna {ant}.".format(
prefix=self.prefix, chan=chan, ant=self.usrp.get_antenna(i)
@@ -203,22 +200,28 @@ class UHDApp(object):
self.usrp.set_lo_export_enabled(True, uhd.ALL_LOS, chan)
if lo_source == "internal":
self.lo_source_channel = chan
- tune_resp = self.usrp.set_center_freq(treq,chan)
- self.usrp.set_lo_source(lo_source, uhd.ALL_LOS,chan)
+ tune_resp = self.usrp.set_center_freq(treq, chan)
+ self.usrp.set_lo_source(lo_source, uhd.ALL_LOS, chan)
# Use lo source tune response to tune dsp_freq on remaining channels
if self.lo_source_channel is not None:
if getattr(args, 'lo_offset', None) is not None:
- treq = uhd.tune_request(target_freq=args.freq, rf_freq=args.freq+args.lo_offset, rf_freq_policy=uhd.tune_request.POLICY_MANUAL,
- dsp_freq=tune_resp.actual_dsp_freq,
- dsp_freq_policy=uhd.tune_request.POLICY_MANUAL)
+ treq = uhd.tune_request(
+ target_freq=args.freq,
+ rf_freq=args.freq+args.lo_offset,
+ rf_freq_policy=uhd.tune_request.POLICY_MANUAL,
+ dsp_freq=tune_resp.actual_dsp_freq,
+ dsp_freq_policy=uhd.tune_request.POLICY_MANUAL)
else:
- treq = uhd.tune_request(target_freq=args.freq, rf_freq=args.freg, rf_freq_policy=uhd.tune_request.POLICY_MANUAL,
- dsp_freq=tune_resp.actual_dsp_freq,
- dsp_freq_policy=uhd.tune_request.POLICY_MANUAL)
+ treq = uhd.tune_request(
+ target_freq=args.freq,
+ rf_freq=args.freg,
+ rf_freq_policy=uhd.tune_request.POLICY_MANUAL,
+ dsp_freq=tune_resp.actual_dsp_freq,
+ dsp_freq_policy=uhd.tune_request.POLICY_MANUAL)
for chan in args.channels:
if chan == self.lo_source_channel:
continue
- self.usrp.set_center_freq(treq,chan)
+ self.usrp.set_center_freq(treq, chan)
# Make sure tuning is synched:
command_time_set = False
if len(self.channels) > 1:
@@ -233,11 +236,11 @@ class UHDApp(object):
sys.stderr.write('[{prefix}] [WARNING] Failed to set command times.\n'.format(prefix=self.prefix))
for i, chan in enumerate(self.channels):
self.tr = self.usrp.set_center_freq(treq, i)
- if self.tr == None:
- sys.stderr.write('[{prefix}] [ERROR] Failed to set center frequency on channel {chan}\n'.format(
- prefix=self.prefix, chan=chan
- ))
- exit(1)
+ if self.tr is None:
+ sys.stderr.write(
+ '[{prefix}] [ERROR] Failed to set center frequency on channel {chan}\n'
+ .format(prefix=self.prefix, chan=chan))
+ sys.exit(1)
if command_time_set:
for mb_idx in range(self.usrp.get_num_mboards()):
self.usrp.clear_command_time(mb_idx)
@@ -262,11 +265,11 @@ class UHDApp(object):
self.usrp.set_normalized_gain(.5, i)
if self.args.verbose:
self.vprint("Channel {chan} gain: {g} dB".format(
- prefix=self.prefix, chan=chan, g=self.usrp.get_gain(i)
+ chan=chan, g=self.usrp.get_gain(i)
))
else:
self.vprint("Setting gain to {g} dB.".format(g=gain))
- for chan in range( len( self.channels ) ):
+ for chan in range(len(self.channels)):
self.usrp.set_gain(gain, chan)
self.gain = self.usrp.get_gain(0)
@@ -284,18 +287,23 @@ class UHDApp(object):
if getattr(self, 'lo_source_channel', None) is not None:
tune_resp = self.usrp.set_center_freq(treq, self.lo_source_channel)
if getattr(self.args, 'lo_offset', None) is not None:
- treq = uhd.tune_request(target_freq=freq, rf_freq=freq+self.args.lo_offset, rf_freq_policy=uhd.tune_request.POLICY_MANUAL,
- dsp_freq=tune_resp.actual_dsp_freq,
- dsp_freq_policy=uhd.tune_request.POLICY_MANUAL)
+ treq = uhd.tune_request(
+ target_freq=freq,
+ rf_freq=freq+self.args.lo_offset,
+ rf_freq_policy=uhd.tune_request.POLICY_MANUAL,
+ dsp_freq=tune_resp.actual_dsp_freq,
+ dsp_freq_policy=uhd.tune_request.POLICY_MANUAL)
else:
- treq = uhd.tune_request(target_freq=freq, rf_freq=freq, rf_freq_policy=uhd.tune_reqest.POLICY_MANUAL,
- dsp_freq=tune_resp.actual_dsp_freq,
- dsp_freq_policy=uhd.tune_request.POLICY_MANUAL)
+ treq = uhd.tune_request(
+ target_freq=freq,
+ rf_freq=freq,
+ rf_freq_policy=uhd.tune_reqest.POLICY_MANUAL,
+ dsp_freq=tune_resp.actual_dsp_freq,
+ dsp_freq_policy=uhd.tune_request.POLICY_MANUAL)
for chan in self.channels:
if chan == self.lo_source_channel:
continue
- self.usrp.set_center_freq(treq,chan)
-
+ self.usrp.set_center_freq(treq, chan)
# Make sure tuning is synched:
command_time_set = False
if len(self.channels) > 1 and not skip_sync:
@@ -306,13 +314,13 @@ class UHDApp(object):
command_time_set = True
except RuntimeError:
sys.stderr.write('[{prefix}] [WARNING] Failed to set command times.\n'.format(prefix=self.prefix))
- for i, chan in enumerate(self.channels ):
+ for i, chan in enumerate(self.channels):
self.tr = self.usrp.set_center_freq(treq, i)
- if self.tr == None:
+ if self.tr is None:
sys.stderr.write('[{prefix}] [ERROR] Failed to set center frequency on channel {chan}\n'.format(
prefix=self.prefix, chan=chan
))
- exit(1)
+ sys.exit(1)
if command_time_set:
for mb_idx in range(self.usrp.get_num_mboards()):
self.usrp.clear_command_time(mb_idx)
@@ -342,42 +350,55 @@ class UHDApp(object):
raise argparse.ArgumentTypeError("Not a comma-separated list: {string}".format(string=string))
if parser is None:
parser = argparse.ArgumentParser(
- description=description,
+ description=description,
)
tx_or_rx = tx_or_rx.strip() + " "
group = parser.add_argument_group('USRP Arguments')
group.add_argument("-a", "--args", default="", help="UHD device address args")
- group.add_argument("--spec", help="Subdevice(s) of UHD device where appropriate. Use a comma-separated list to set different boards to different specs.")
+ group.add_argument(
+ "--spec",
+ help="Subdevice(s) of UHD device where appropriate. "
+ "Use a comma-separated list to set different boards to different specs.")
group.add_argument("-A", "--antenna", help="Select {xx}antenna(s) where appropriate".format(xx=tx_or_rx))
group.add_argument("-s", "--samp-rate", type=eng_arg.eng_float, default=1e6,
- help="Sample rate")
+ help="Sample rate")
group.add_argument("-g", "--gain", type=eng_arg.eng_float, default=None,
- help="Gain (default is midpoint)")
+ help="Gain (default is midpoint)")
group.add_argument("--gain-type", choices=('db', 'normalized'), default='db',
- help="Gain Type (applies to -g)")
+ help="Gain Type (applies to -g)")
+ group.add_argument("-p", "--power-ref", type=eng_arg.eng_float, default=None,
+ help="Reference power level (in dBm). "
+ "Not supported by all devices (see UHD manual). "
+ "Will fail if not supported. Precludes --gain. ")
if not skip_freq:
group.add_argument("-f", "--freq", type=eng_arg.eng_float, default=None, required=True,
- help="Set carrier frequency to FREQ",
- metavar="FREQ")
+ help="Set carrier frequency to FREQ",
+ metavar="FREQ")
group.add_argument("--lo-offset", type=eng_arg.eng_float, default=0.0,
- help="Set daughterboard LO offset to OFFSET [default=hw default]")
+ help="Set daughterboard LO offset to OFFSET [default=hw default]")
if allow_mimo:
group.add_argument("-c", "--channels", default=[0,], type=cslist,
- help="Select {xx} Channels".format(xx=tx_or_rx))
- group.add_argument("--lo-export", help="Set TwinRX LO export {None, True, False} for each channel with a comma-separated list. None skips a channel.")
- group.add_argument("--lo-source", help="Set TwinRX LO source {None, internal, companion, external} for each channel with a comma-separated list. None skips this channel. ")
+ help="Select {xx} Channels".format(xx=tx_or_rx))
+ group.add_argument(
+ "--lo-export",
+ help="Set TwinRX LO export {None, True, False} for each channel "
+ "with a comma-separated list. None skips a channel.")
+ group.add_argument(
+ "--lo-source",
+ help="Set TwinRX LO source {None, internal, companion, external} "
+ "for each channel with a comma-separated list. None skips this channel.")
group.add_argument("--otw-format", choices=['sc16', 'sc12', 'sc8'], default='sc16',
- help="Choose over-the-wire data format")
+ help="Choose over-the-wire data format")
group.add_argument("--stream-args", default="", help="Set additional stream arguments")
group.add_argument("-m", "--amplitude", type=eng_arg.eng_float, default=0.15,
- help="Set output amplitude to AMPL (0.0-1.0)", metavar="AMPL")
+ help="Set output amplitude to AMPL (0.0-1.0)", metavar="AMPL")
group.add_argument("-v", "--verbose", action="count", help="Use verbose console output")
group.add_argument("--show-async-msg", action="store_true",
- help="Show asynchronous message notifications from UHD")
+ help="Show asynchronous message notifications from UHD")
group.add_argument("--sync", choices=('default', 'pps', 'auto'),
- default='auto', help="Set to 'pps' to sync devices to PPS")
+ default='auto', help="Set to 'pps' to sync devices to PPS")
group.add_argument("--clock-source",
- help="Set the clock source; typically 'internal', 'external' or 'gpsdo'")
+ help="Set the clock source; typically 'internal', 'external' or 'gpsdo'")
group.add_argument("--time-source",
- help="Set the time source")
+ help="Set the time source")
return parser