summaryrefslogtreecommitdiff
path: root/gr-uhd/apps/uhd_app.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-uhd/apps/uhd_app.py')
-rw-r--r--gr-uhd/apps/uhd_app.py77
1 files changed, 48 insertions, 29 deletions
diff --git a/gr-uhd/apps/uhd_app.py b/gr-uhd/apps/uhd_app.py
index 65b35ac736..55c2a71d2f 100644
--- a/gr-uhd/apps/uhd_app.py
+++ b/gr-uhd/apps/uhd_app.py
@@ -18,7 +18,7 @@ from gnuradio import eng_arg
from gnuradio import uhd
from gnuradio import gr
-COMMAND_DELAY = .2 # Seconds
+COMMAND_DELAY = .2 # Seconds
COMPACT_TPL = "{mb_id} ({mb_serial}), {db_subdev} ({subdev}, {ant}{db_serial})"
LONG_TPL = """{prefix} Motherboard: {mb_id} ({mb_serial})
@@ -30,11 +30,13 @@ LONG_TPL = """{prefix} Motherboard: {mb_id} ({mb_serial})
# PyLint can't reliably detect C++ exports in modules, so let's disable that
# pylint: disable=no-member
+
class UHDApp:
GAIN_TYPE_GAIN = 'dB'
GAIN_TYPE_POWER = 'power_dbm'
" Base class for simple UHD-based applications "
+
def __init__(self, prefix=None, args=None):
self.prefix = prefix
self.args = args
@@ -42,7 +44,7 @@ class UHDApp:
if self.args.sync == 'auto' and len(self.args.channels) > 1:
self.args.sync = 'pps'
self.antenna = None
- self.gain_range = None # Can also be power range
+ self.gain_range = None # Can also be power range
self.samp_rate = None
self.has_lo_sensor = None
self.async_msgq = None
@@ -87,8 +89,10 @@ class UHDApp:
info_pp['mb_serial'] = usrp_info['mboard_serial']
if info_pp['mb_serial'] == "":
info_pp['mb_serial'] = "no serial"
- info_pp['db_subdev'] = usrp_info["{xx}_subdev_name".format(xx=tx_or_rx)]
- info_pp['db_serial'] = ", " + usrp_info["{xx}_serial".format(xx=tx_or_rx)]
+ info_pp['db_subdev'] = usrp_info["{xx}_subdev_name".format(
+ xx=tx_or_rx)]
+ info_pp['db_serial'] = ", " + \
+ usrp_info["{xx}_serial".format(xx=tx_or_rx)]
if info_pp['db_serial'] == "":
info_pp['db_serial'] = "no serial"
info_pp['subdev'] = self.usrp.get_subdev_spec(mboard)
@@ -173,11 +177,13 @@ class UHDApp:
self.samp_rate = self.usrp.get_samp_rate()
self.vprint("Using sampling rate: {rate}".format(rate=self.samp_rate))
# Set the antenna:
- self.antenna = self.normalize_sel("channels", "antenna", len(args.channels), args.antenna)
+ self.antenna = self.normalize_sel(
+ "channels", "antenna", len(args.channels), args.antenna)
if self.antenna is not None:
for i, chan in enumerate(self.channels):
if not self.antenna[i] in self.usrp.get_antennas(i):
- print("[ERROR] {} is not a valid antenna name for this USRP device!".format(self.antenna[i]))
+ print("[ERROR] {} is not a valid antenna name for this USRP device!".format(
+ self.antenna[i]))
sys.exit(1)
self.usrp.set_antenna(self.antenna[i], i)
self.vprint("[{prefix}] Channel {chan}: Using antenna {ant}.".format(
@@ -207,14 +213,16 @@ class UHDApp:
self.has_lo_sensor = 'lo_locked' in self.usrp.get_sensor_names()
# Set LO export and LO source operation
if (args.lo_export is not None) and (args.lo_source is not None):
- self.lo_source = self.normalize_sel("channels", "lo-source", len(self.channels), args.lo_source)
- self.lo_export = self.normalize_sel("channels", "lo-export", len(self.channels), args.lo_export)
+ self.lo_source = self.normalize_sel(
+ "channels", "lo-source", len(self.channels), args.lo_source)
+ self.lo_export = self.normalize_sel(
+ "channels", "lo-export", len(self.channels), args.lo_export)
self.lo_source_channel = None
for chan, lo_source, lo_export in zip(self.channels, self.lo_source, self.lo_export):
if (lo_source == "None") or (lo_export == "None"):
continue
if lo_export == "True":
- #If channel is LO source set frequency and store response
+ # If channel is LO source set frequency and store response
self.usrp.set_lo_export_enabled(True, uhd.ALL_LOS, chan)
if lo_source == "internal":
self.lo_source_channel = chan
@@ -225,7 +233,7 @@ class UHDApp:
if getattr(args, 'lo_offset', None) is not None:
treq = uhd.tune_request(
target_freq=args.freq,
- rf_freq=args.freq+args.lo_offset,
+ rf_freq=args.freq + args.lo_offset,
rf_freq_policy=uhd.tune_request.POLICY_MANUAL,
dsp_freq=tune_resp.actual_dsp_freq,
dsp_freq_policy=uhd.tune_request.POLICY_MANUAL)
@@ -251,7 +259,8 @@ class UHDApp:
self.usrp.set_command_time(cmd_time, mb_idx)
command_time_set = True
except RuntimeError:
- sys.stderr.write('[{prefix}] [WARNING] Failed to set command times.\n'.format(prefix=self.prefix))
+ sys.stderr.write('[{prefix}] [WARNING] Failed to set command times.\n'.format(
+ prefix=self.prefix))
for i, chan in enumerate(self.channels):
self.tr = self.usrp.set_center_freq(treq, i)
if self.tr is None:
@@ -268,7 +277,8 @@ class UHDApp:
if args.show_async_msg:
self.async_msgq = gr.msg_queue(0)
self.async_src = uhd.amsg_source("", self.async_msgq)
- self.async_rcv = uhd.msgq_runner(self.async_msgq, self.async_callback)
+ self.async_rcv = uhd.msgq_runner(
+ self.async_msgq, self.async_callback)
def set_gain(self, gain):
"""
@@ -278,7 +288,8 @@ class UHDApp:
"""
if gain is None:
if self.args.verbose:
- self.vprint("Defaulting to mid-point gains:".format(prefix=self.prefix))
+ self.vprint(
+ "Defaulting to mid-point gains:".format(prefix=self.prefix))
for i, chan in enumerate(self.channels):
self.usrp.set_normalized_gain(.5, i)
if self.args.verbose:
@@ -304,7 +315,8 @@ class UHDApp:
"""
Safely tune all channels to freq.
"""
- self.vprint("Tuning all channels to {freq} MHz.".format(freq=freq / 1e6))
+ self.vprint(
+ "Tuning all channels to {freq} MHz.".format(freq=freq / 1e6))
# Set frequency (tune request takes lo_offset):
if hasattr(self.args, 'lo_offset') and self.args.lo_offset is not None:
treq = uhd.tune_request(freq, self.args.lo_offset)
@@ -316,7 +328,7 @@ class UHDApp:
if getattr(self.args, 'lo_offset', None) is not None:
treq = uhd.tune_request(
target_freq=freq,
- rf_freq=freq+self.args.lo_offset,
+ rf_freq=freq + self.args.lo_offset,
rf_freq_policy=uhd.tune_request.POLICY_MANUAL,
dsp_freq=tune_resp.actual_dsp_freq,
dsp_freq_policy=uhd.tune_request.POLICY_MANUAL)
@@ -340,7 +352,8 @@ class UHDApp:
self.usrp.set_command_time(cmd_time, mb_idx)
command_time_set = True
except RuntimeError:
- sys.stderr.write('[{prefix}] [WARNING] Failed to set command times.\n'.format(prefix=self.prefix))
+ sys.stderr.write('[{prefix}] [WARNING] Failed to set command times.\n'.format(
+ prefix=self.prefix))
for i, chan in enumerate(self.channels):
self.tr = self.usrp.set_center_freq(treq, i)
if self.tr is None:
@@ -354,16 +367,17 @@ class UHDApp:
self.vprint("Syncing channels...".format(prefix=self.prefix))
time.sleep(COMMAND_DELAY)
self.freq = self.usrp.get_center_freq(0)
- self.vprint("First channel has freq: {freq} MHz.".format(freq=self.freq / 1e6))
+ self.vprint("First channel has freq: {freq} MHz.".format(
+ freq=self.freq / 1e6))
@staticmethod
def setup_argparser(
- parser=None,
- description='USRP App',
- allow_mimo=True,
- tx_or_rx="",
- skip_freq=False,
- ):
+ parser=None,
+ description='USRP App',
+ allow_mimo=True,
+ tx_or_rx="",
+ skip_freq=False,
+ ):
"""
Create or amend an argument parser with typical USRP options.
"""
@@ -374,19 +388,22 @@ class UHDApp:
try:
return [int(x.strip()) for x in string.split(",")]
except ValueError:
- raise argparse.ArgumentTypeError("Not a comma-separated list: {string}".format(string=string))
+ raise argparse.ArgumentTypeError(
+ "Not a comma-separated list: {string}".format(string=string))
if parser is None:
parser = argparse.ArgumentParser(
description=description,
)
tx_or_rx = tx_or_rx.strip() + " "
group = parser.add_argument_group('USRP Arguments')
- group.add_argument("-a", "--args", default="", help="UHD device address args")
+ group.add_argument("-a", "--args", default="",
+ help="UHD device address args")
group.add_argument(
"--spec",
help="Subdevice(s) of UHD device where appropriate. "
"Use a comma-separated list to set different boards to different specs.")
- group.add_argument("-A", "--antenna", help="Select {xx}antenna(s) where appropriate".format(xx=tx_or_rx))
+ group.add_argument(
+ "-A", "--antenna", help="Select {xx}antenna(s) where appropriate".format(xx=tx_or_rx))
group.add_argument("-s", "--samp-rate", type=eng_arg.eng_float, default=1e6,
help="Sample rate")
group.add_argument("-g", "--gain", type=eng_arg.eng_float, default=None,
@@ -403,7 +420,7 @@ class UHDApp:
group.add_argument("--lo-offset", type=eng_arg.eng_float, default=0.0,
help="Set daughterboard LO offset to OFFSET [default=hw default]")
if allow_mimo:
- group.add_argument("-c", "--channels", default=[0,], type=cslist,
+ group.add_argument("-c", "--channels", default=[0, ], type=cslist,
help="Select {xx} Channels".format(xx=tx_or_rx))
group.add_argument(
"--lo-export",
@@ -415,8 +432,10 @@ class UHDApp:
"for each channel with a comma-separated list. None skips this channel.")
group.add_argument("--otw-format", choices=['sc16', 'sc12', 'sc8'], default='sc16',
help="Choose over-the-wire data format")
- group.add_argument("--stream-args", default="", help="Set additional stream arguments")
- group.add_argument("-v", "--verbose", action="count", help="Use verbose console output")
+ group.add_argument("--stream-args", default="",
+ help="Set additional stream arguments")
+ group.add_argument("-v", "--verbose", action="count",
+ help="Use verbose console output")
group.add_argument("--show-async-msg", action="store_true",
help="Show asynchronous message notifications from UHD")
group.add_argument("--sync", choices=('default', 'pps', 'auto'),