#!/usr/bin/env python
#
# Copyright 2012,2013 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#

from PyQt5 import QtCore, Qt
from argparse import ArgumentParser
import os
import sys
import time
import struct

from gnuradio import gr, ctrlport

from gnuradio.ctrlport.GrDataPlotter import *
from gnuradio.ctrlport.GNURadioControlPortClient import GNURadioControlPortClient


def _as_bytes(data):
    """Return *data* as a bytes object so it can be fed to struct.unpack.

    Byte-stream knobs may show up either as ``bytes`` or as ``str``; a
    ``str`` is encoded one code point per byte (latin-1).  Raises
    UnicodeEncodeError if a ``str`` holds code points above 255.
    """
    if isinstance(data, str):
        return data.encode('latin-1')
    return bytes(data)


class RateDialog(Qt.QDialog):
    """Small modal dialog asking for the update delay in milliseconds."""

    def __init__(self, delay, parent=None):
        """Build the dialog, pre-filling the line edit with *delay*."""
        super(RateDialog, self).__init__(parent)
        self.gridLayout = Qt.QGridLayout(self)
        self.setWindowTitle("Update Delay (ms)")
        self.delay = Qt.QLineEdit(self)
        self.delay.setText(str(delay))
        self.buttonBox = Qt.QDialogButtonBox(
            Qt.QDialogButtonBox.Ok | Qt.QDialogButtonBox.Cancel)
        self.gridLayout.addWidget(self.delay)
        self.gridLayout.addWidget(self.buttonBox)
        self.buttonBox.accepted.connect(self.accept)
        self.buttonBox.rejected.connect(self.reject)

    def accept(self):
        """Close the dialog with result code 1."""
        self.done(1)

    def reject(self):
        """Close the dialog with result code 0."""
        self.done(0)


class MAINWindow(Qt.QMainWindow):
    """Main MDI window holding connection forms and their plot windows.

    ``conns``, ``plots`` and ``knobprops`` are parallel lists indexed by
    the connection uid handed to each MForm.
    """

    def minimumSizeHint(self):
        """Return the minimum window size (800x600)."""
        return Qt.QSize(800, 600)

    def __init__(self, radioclient):
        """Create the window and open a first connection for *radioclient*."""
        super(MAINWindow, self).__init__()
        self.updateRate = 1000
        self.conns = []
        self.plots = []
        self.knobprops = []

        self.mdiArea = Qt.QMdiArea()
        self.mdiArea.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAsNeeded)
        self.mdiArea.setVerticalScrollBarPolicy(QtCore.Qt.ScrollBarAsNeeded)
        self.setCentralWidget(self.mdiArea)

        self.mdiArea.subWindowActivated.connect(self.updateMenus)
        self.windowMapper = QtCore.QSignalMapper(self)
        self.windowMapper.mapped[Qt.QWidget].connect(self.setActiveSubWindow)

        self.createActions()
        self.createMenus()
        self.createToolBars()
        self.createStatusBar()
        self.updateMenus()

        self.setWindowTitle("GNU Radio Control Port Monitor")
        self.setUnifiedTitleAndToolBarOnMac(True)

        self.newCon(radioclient)

        icon = Qt.QIcon(ctrlport.__path__[0] + "/icon.png")
        self.setWindowIcon(icon)

        # Locally turn off ControlPort export from GR. This prevents
        # our GR-based plotters from launching their own ControlPort
        # instance (and possibly causing a port collision if one has
        # been specified).
        os.environ['GR_CONF_CONTROLPORT_ON'] = 'False'

    def setUpdateRate(self, nur):
        """Set the polling interval (ms) on this window and all live connections."""
        self.updateRate = int(nur)
        for c in self.conns:
            # A lost connection leaves an empty-list placeholder in
            # self.conns (see MForm.update); it has no timer to adjust.
            timer = getattr(c, "timer", None)
            if timer is None:
                continue
            c.updateRate = self.updateRate
            timer.setInterval(self.updateRate)

    def newCon(self, radioclient=None):
        """Open a new connection sub-window.

        With ``radioclient=None`` the MForm asks the user for host/port.
        A form that ends up without a client is discarded and NOT
        registered, so that uid (== index into conns/plots) stays aligned
        with knobprops, which MForm only appends to on success.
        """
        child = MForm(radioclient, len(self.conns), parent=self)
        if child.radioclient is None:
            return

        child.setWindowTitle(str(child.radioclient))
        self.mdiArea.addSubWindow(child)
        child.show()
        self.mdiArea.currentSubWindow().showMaximized()
        self.conns.append(child)
        self.plots.append([])

    def propertiesMenu(self, key, radio, uid):
        """Pop up the context menu of plot/property actions for knob *key*."""
        title = str(radio)

        props = radio.properties([key])

        pmin, pmax = get_minmax(props[key])

        # Use display option mask of item to set up available plot
        # types and default options.
        disp = self.knobprops[uid][key].display
        cplx = bool(disp & gr.DISPOPTCPLX | disp & gr.DISPXY)
        strip = bool(disp & gr.DISPOPTSTRIP)
        stem = bool(disp & gr.DISPOPTSTEM)
        log = bool(disp & gr.DISPOPTLOG)
        scatter = bool(disp & gr.DISPOPTSCATTER)

        def newUpdaterProxy():
            self.newUpdater(key, radio)

        def newPlotterFProxy():
            self.newPlotF(key, uid, title, pmin, pmax,
                          log, strip, stem)

        def newPlotterCProxy():
            self.newPlotC(key, uid, title, pmin, pmax,
                          log, strip, stem)

        def newPlotterConstProxy():
            self.newPlotConst(key, uid, title, pmin, pmax,
                              scatter, strip)

        def newPlotterPsdFProxy():
            self.newPlotPsdF(key, uid, title)

        def newPlotterPsdCProxy():
            self.newPlotPsdC(key, uid, title)

        def newPlotterRasterFProxy():
            self.newPlotRasterF(key, uid, title, pmin, pmax)

        def newPlotterRasterBProxy():
            self.newPlotRasterB(key, uid, title, pmin, pmax)

        menu = Qt.QMenu(self)
        menu.setTitle("Item Actions")
        menu.setTearOffEnabled(False)

        # object properties
        menu.addAction("Properties", newUpdaterProxy)

        # displays available
        if not cplx:
            menu.addAction("Plot Time", newPlotterFProxy)
            menu.addAction("Plot PSD", newPlotterPsdFProxy)
            menu.addAction("Plot Raster (real)", newPlotterRasterFProxy)
            #menu.addAction("Plot Raster (bits)", newPlotterRasterBProxy)
        else:
            menu.addAction("Plot Time", newPlotterCProxy)
            menu.addAction("Plot PSD", newPlotterPsdCProxy)
            menu.addAction("Plot Constellation", newPlotterConstProxy)

        menu.popup(Qt.QCursor.pos())

    def newUpdater(self, key, radio):
        """Show the property/updater dialog for knob *key*."""
        updater = UpdaterWindow(key, radio, None)
        updater.setWindowTitle("Updater: " + key)
        updater.setModal(False)
        updater.exec_()

    def newSub(self, e):
        """Launch the default plotter for the double-clicked tree item *e*."""
        tag = str(e.text(0))
        tree = e.treeWidget().parent()
        uid = tree.uid
        knobprop = self.knobprops[uid][tag]

        title = str(tree.radioclient)
        pmin, pmax = get_minmax(knobprop)

        disp = knobprop.display
        if disp & gr.DISPTIME:
            strip = bool(disp & gr.DISPOPTSTRIP)
            stem = bool(disp & gr.DISPOPTSTEM)
            log = bool(disp & gr.DISPOPTLOG)
            if disp & gr.DISPOPTCPLX == 0:
                self.newPlotF(tag, uid, title, pmin, pmax,
                              log, strip, stem)
            else:
                self.newPlotC(tag, uid, title, pmin, pmax,
                              log, strip, stem)

        elif disp & gr.DISPXY:
            # bool() for consistency with propertiesMenu
            scatter = bool(disp & gr.DISPOPTSCATTER)
            self.newPlotConst(tag, uid, title, pmin, pmax, scatter)

        elif disp & gr.DISPPSD:
            if disp & gr.DISPOPTCPLX == 0:
                self.newPlotPsdF(tag, uid, title)
            else:
                self.newPlotPsdC(tag, uid, title)

    def startDrag(self, e):
        """Start a drag carrying the pressed tree item's tag as MIME text."""
        drag = Qt.QDrag(self)
        mime_data = QtCore.QMimeData()

        tag = str(e.text(0))
        tree = e.treeWidget().parent()
        knobprop = self.knobprops[tree.uid][tag]
        disp = knobprop.display
        # NOTE: iscomplex is formatted into the MIME text as-is (the raw
        # mask value), so its type must not be changed here.
        iscomplex = (disp & gr.DISPOPTCPLX) or (disp & gr.DISPXY)

        if disp != gr.DISPNULL:
            data = "PlotData:::{0}:::{1}".format(tag, iscomplex)
        else:
            data = "OtherData:::{0}:::{1}".format(tag, iscomplex)

        mime_data.setText(data)
        drag.setMimeData(mime_data)

        drag.exec()

    def createPlot(self, plot, uid, title):
        """Start *plot*, register it under connection *uid* and show it."""
        plot.start()
        self.plots[uid].append(plot)

        self.mdiArea.addSubWindow(plot)
        plot.setWindowTitle("{0}: {1}".format(title, plot.name()))
        plot.qwidget().destroyed.connect(
            lambda obj=None, plot=plot: self.destroyPlot(plot=plot))

        # when the plot is updated via drag-and-drop, we need to be
        # notified of the new qwidget that's created so we can
        # properly destroy it.
        plot.plotupdated.connect(self.plotUpdated)

        plot.show()

    def plotUpdated(self, q):
        """Re-hook destroyPlot after plot *q* got a new qwidget."""
        # the plot has been updated with a new qwidget; make sure this
        # gets tied to the destroyPlot function.
        for plots in self.plots:
            for p in plots:
                if p == q:
                    q.qwidget().destroyed.connect(
                        lambda obj=None, plot=p: self.destroyPlot(plot=plot))
                    break

    def destroyPlot(self, plot):
        """Drop *plot* from every per-connection plot list."""
        for plots in self.plots:
            for p in plots:
                if p == plot:
                    plots.remove(p)
                    break

    def newPlotConst(self, tag, uid, title="", pmin=None, pmax=None,
                     scatter=False, stripchart=False):
        """Create a constellation plot for knob *tag*."""
        plot = GrDataPlotterConst(tag, 32e6, pmin, pmax, stripchart)
        plot.scatter(scatter)
        self.createPlot(plot, uid, title)

    def newPlotF(self, tag, uid, title="", pmin=None, pmax=None,
                 logy=False, stripchart=False, stem=False):
        """Create a real-valued time plot for knob *tag*."""
        plot = GrDataPlotterF(tag, 32e6, pmin, pmax, stripchart)
        plot.semilogy(logy)
        plot.stem(stem)
        self.createPlot(plot, uid, title)

    def newPlotC(self, tag, uid, title="", pmin=None, pmax=None,
                 logy=False, stripchart=False, stem=False):
        """Create a complex-valued time plot for knob *tag*."""
        plot = GrDataPlotterC(tag, 32e6, pmin, pmax, stripchart)
        plot.semilogy(logy)
        plot.stem(stem)
        self.createPlot(plot, uid, title)

    def newPlotPsdF(self, tag, uid, title="", pmin=None, pmax=None):
        """Create a real-valued PSD plot for knob *tag*."""
        plot = GrDataPlotterPsdF(tag, 32e6, pmin, pmax)
        self.createPlot(plot, uid, title)

    def newPlotPsdC(self, tag, uid, title="", pmin=None, pmax=None):
        """Create a complex-valued PSD plot for knob *tag*."""
        plot = GrDataPlotterPsdC(tag, 32e6, pmin, pmax)
        self.createPlot(plot, uid, title)

    def newPlotRasterF(self, tag, uid, title="", pmin=None, pmax=None):
        """Create a real-valued time-raster plot for knob *tag*."""
        plot = GrTimeRasterF(tag, 32e6, pmin, pmax)
        self.createPlot(plot, uid, title)

    def newPlotRasterB(self, tag, uid, title="", pmin=None, pmax=None):
        """Create a byte time-raster plot for knob *tag*."""
        plot = GrTimeRasterB(tag, 32e6, pmin, pmax)
        self.createPlot(plot, uid, title)

    def update(self, knobs, uid):
        """Feed freshly polled *knobs* to every plot of connection *uid*."""
        #sys.stderr.write("KNOB KEYS: {0}\n".format(knobs.keys()))
        for plot in self.plots[uid]:
            data = []
            for n in plot.knobnames:
                d = knobs[n].value
                # A byte stream arrives as bytes (or as a str); unpack
                # it to signed bytes and convert to floats for plotting.
                if isinstance(d, (str, bytes)) and n.startswith('probe2_b'):
                    raw = _as_bytes(d)
                    d = [float(di)
                         for di in struct.unpack(len(raw) * 'b', raw)]

                data.append(d)
            plot.update(data)
            plot.stop()
            plot.wait()
            plot.start()

    def setActiveSubWindow(self, window):
        """Activate *window* in the MDI area if it is not None."""
        if window:
            self.mdiArea.setActiveSubWindow(window)

    def createActions(self):
        """Create the QActions used by the menus and toolbars."""
        self.newConAct = Qt.QAction("&New Connection",
                                    self, shortcut=Qt.QKeySequence.New,
                                    statusTip="Open a new connection",
                                    triggered=lambda x: self.newCon(None))

        self.exitAct = Qt.QAction("E&xit", self, shortcut="Ctrl+Q",
                                  statusTip="Exit the application",
                                  triggered=Qt.qApp.closeAllWindows)

        self.closeAct = Qt.QAction("Cl&ose", self, shortcut="Ctrl+F4",
                                   statusTip="Close the active window",
                                   triggered=self.mdiArea.closeActiveSubWindow)

        self.closeAllAct = Qt.QAction("Close &All", self,
                                      statusTip="Close all the windows",
                                      triggered=self.mdiArea.closeAllSubWindows)

        self.urAct = Qt.QAction("Update Rate", self, shortcut="F5",
                                statusTip="Change Update Rate",
                                triggered=self.updateRateShow)

        qks = Qt.QKeySequence(QtCore.Qt.CTRL + QtCore.Qt.Key_T)
        self.tileAct = Qt.QAction("&Tile", self,
                                  statusTip="Tile the windows",
                                  triggered=self.mdiArea.tileSubWindows,
                                  shortcut=qks)

        qks = Qt.QKeySequence(QtCore.Qt.CTRL + QtCore.Qt.Key_C)
        self.cascadeAct = Qt.QAction("&Cascade", self,
                                     statusTip="Cascade the windows",
                                     shortcut=qks,
                                     triggered=self.mdiArea.cascadeSubWindows)

        self.nextAct = Qt.QAction("Ne&xt", self,
                                  shortcut=Qt.QKeySequence.NextChild,
                                  statusTip="Move the focus to the next window",
                                  triggered=self.mdiArea.activateNextSubWindow)

        self.previousAct = Qt.QAction("Pre&vious", self,
                                      shortcut=Qt.QKeySequence.PreviousChild,
                                      statusTip="Move the focus to the previous window",
                                      triggered=self.mdiArea.activatePreviousSubWindow)

        self.separatorAct = Qt.QAction(self)
        self.separatorAct.setSeparator(True)

        self.aboutAct = Qt.QAction("&About", self,
                                   statusTip="Show the application's About box",
                                   triggered=self.about)

        self.aboutQtAct = Qt.QAction("About &Qt", self,
                                     statusTip="Show the Qt library's About box",
                                     triggered=Qt.qApp.aboutQt)

    def createMenus(self):
        """Populate the menu bar (File, Window, Help)."""
        self.fileMenu = self.menuBar().addMenu("&File")
        self.fileMenu.addAction(self.newConAct)
        self.fileMenu.addAction(self.urAct)
        self.fileMenu.addSeparator()
        self.fileMenu.addAction(self.exitAct)

        self.windowMenu = self.menuBar().addMenu("&Window")
        self.updateWindowMenu()
        self.windowMenu.aboutToShow.connect(self.updateWindowMenu)

        self.menuBar().addSeparator()

        self.helpMenu = self.menuBar().addMenu("&Help")
        self.helpMenu.addAction(self.aboutAct)
        self.helpMenu.addAction(self.aboutQtAct)

    def updateRateShow(self):
        """Ask the user for a new update delay and apply it.

        Input that is not a non-negative number is rejected with a
        status-bar message instead of raising inside the Qt slot.
        """
        askrate = RateDialog(self.updateRate, self)
        if not askrate.exec_():
            return

        try:
            ur = int(float(str(askrate.delay.text())))
        except (ValueError, OverflowError):
            ur = -1
        if ur < 0:
            self.statusBar().showMessage(
                "Invalid update delay; keeping {0} ms".format(self.updateRate))
            return

        self.setUpdateRate(ur)

    def createToolBars(self):
        """Create the File and Window toolbars."""
        self.fileToolBar = self.addToolBar("File")
        self.fileToolBar.addAction(self.newConAct)
        self.fileToolBar.addAction(self.urAct)

        # Kept in its own attribute so the File toolbar reference is
        # not overwritten.
        self.windowToolBar = self.addToolBar("Window")
        self.windowToolBar.addAction(self.tileAct)
        self.windowToolBar.addAction(self.cascadeAct)

    def createStatusBar(self):
        """Show the initial status-bar message."""
        self.statusBar().showMessage("Ready")

    def activeMdiChild(self):
        """Return the widget of the active sub-window, or None."""
        activeSubWindow = self.mdiArea.activeSubWindow()
        if activeSubWindow:
            return activeSubWindow.widget()
        return None

    def updateMenus(self):
        """Enable window actions only while a sub-window is active."""
        hasMdiChild = (self.activeMdiChild() is not None)
        self.closeAct.setEnabled(hasMdiChild)
        self.closeAllAct.setEnabled(hasMdiChild)
        self.tileAct.setEnabled(hasMdiChild)
        self.cascadeAct.setEnabled(hasMdiChild)
        self.nextAct.setEnabled(hasMdiChild)
        self.previousAct.setEnabled(hasMdiChild)
        self.separatorAct.setVisible(hasMdiChild)

    def updateWindowMenu(self):
        """Rebuild the Window menu from the window actions."""
        self.windowMenu.clear()
        self.windowMenu.addAction(self.closeAct)
        self.windowMenu.addAction(self.closeAllAct)
        self.windowMenu.addSeparator()
        self.windowMenu.addAction(self.tileAct)
        self.windowMenu.addAction(self.cascadeAct)
        self.windowMenu.addSeparator()
        self.windowMenu.addAction(self.nextAct)
        self.windowMenu.addAction(self.previousAct)
        self.windowMenu.addAction(self.separatorAct)

    def about(self):
        """Show the About box with the license notice."""
        about_info = (
            "Copyright 2012 Free Software Foundation, Inc.\n\n"
            "This program is part of GNU Radio.\n\n"
            "GNU Radio is free software; you can redistribute it and/or "
            "modify it under the terms of the GNU General Public License "
            "as published by the Free Software Foundation; either version "
            "3, or (at your option) any later version.\n\n"
            "GNU Radio is distributed in the hope that it will be useful, "
            "but WITHOUT ANY WARRANTY; without even the implied warranty "
            "of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See "
            "the GNU General Public License for more details.\n\n"
            "You should have received a copy of the GNU General Public "
            "License along with GNU Radio; see the file COPYING. If not, "
            "write to the Free Software Foundation, Inc., 51 Franklin "
            "Street, Boston, MA 02110-1301, USA.")

        Qt.QMessageBox.about(None, "gr-ctrlport-monitor", about_info)


class ConInfoDialog(Qt.QDialog):
    """Dialog asking for the host and port of a ControlPort endpoint."""

    def __init__(self, parent=None):
        """Build the dialog with defaults localhost / 43243."""
        super(ConInfoDialog, self).__init__(parent)

        self.gridLayout = Qt.QGridLayout(self)

        self.host = Qt.QLineEdit(self)
        self.port = Qt.QLineEdit(self)
        self.host.setText("localhost")
        self.port.setText("43243")

        self.buttonBox = Qt.QDialogButtonBox(
            Qt.QDialogButtonBox.Ok | Qt.QDialogButtonBox.Cancel)

        self.gridLayout.addWidget(self.host)
        self.gridLayout.addWidget(self.port)
        self.gridLayout.addWidget(self.buttonBox)

        self.buttonBox.accepted.connect(self.accept)
        self.buttonBox.rejected.connect(self.reject)

    def accept(self):
        """Close the dialog with result code 1."""
        self.done(1)

    def reject(self):
        """Close the dialog with result code 0."""
        self.done(0)


class UpdaterWindow(Qt.QDialog):
    """Dialog showing a knob's properties and, if settable, editing it."""

    def __init__(self, key, radio, parent):
        """Build the dialog for knob *key* of ControlPort client *radio*."""
        Qt.QDialog.__init__(self, parent)

        self.key = key
        self.radio = radio

        self.resize(300, 200)
        self.layout = Qt.QVBoxLayout()

        self.props = radio.properties([key])[key]
        info = radio.printProperties(self.props)

        self.infoLabel = Qt.QLabel(info)
        self.layout.addWidget(self.infoLabel)

        # Test here to make sure that a 'set' function exists: write the
        # current value back and see whether the endpoint accepts it.
        try:
            radio.setKnobs(radio.getKnobs([key]))
            has_set = True
        except Exception:
            has_set = False

        if not has_set:
            self.cancelButton = Qt.QPushButton("Ok")
            self.cancelButton.clicked.connect(self.reject)

            self.buttonlayout = Qt.QHBoxLayout()
            self.buttonlayout.addWidget(self.cancelButton)
            self.layout.addLayout(self.buttonlayout)

        else:  # we have a set function
            self.textInput = Qt.QLineEdit()
            self.layout.addWidget(self.textInput)

            self.applyButton = Qt.QPushButton("Apply")
            self.setButton = Qt.QPushButton("OK")
            self.cancelButton = Qt.QPushButton("Cancel")

            rv = radio.getKnobs([key])
            val = rv[key].value
            if type(val) is ControlPort.complex:
                val = val.re + val.im * 1j

            self.textInput.setText(str(val))
            self.sv = rv[key]

            self.applyButton.clicked.connect(self._apply)
            self.setButton.clicked.connect(self._set)
            self.cancelButton.clicked.connect(self.reject)

            # Exact type test on purpose: bool must not count as a number.
            self.is_num = type(self.sv.value) in (float, int)
            if self.is_num:
                self.sliderlayout = Qt.QHBoxLayout()

                self.slider = Qt.QSlider(QtCore.Qt.Horizontal)

                self.sliderlayout.addWidget(
                    Qt.QLabel(str(self.props.min.value)))
                self.sliderlayout.addWidget(self.slider)
                self.sliderlayout.addWidget(
                    Qt.QLabel(str(self.props.max.value)))

                self.steps = 10000
                self.valspan = self.props.max.value - self.props.min.value

                self.slider.setRange(0, self.steps)
                self._set_slider_value(self.sv.value)

                self.slider.sliderReleased.connect(self._slide)

                self.layout.addLayout(self.sliderlayout)
            else:
                # No slider: shadow the method so _apply skips it.
                self._set_slider_value = None

            self.buttonlayout = Qt.QHBoxLayout()
            self.buttonlayout.addWidget(self.applyButton)
            self.buttonlayout.addWidget(self.setButton)
            self.buttonlayout.addWidget(self.cancelButton)
            self.layout.addLayout(self.buttonlayout)

        # set layout and go...
        self.setLayout(self.layout)

    def _set_slider_value(self, val):
        """Move the slider to the position corresponding to *val*."""
        if not self.valspan:
            # min == max: nothing to scale against, avoid dividing by 0.
            self.slider.setValue(0)
            return
        # QSlider.setValue() takes an int; passing a float is an error.
        self.slider.setValue(
            int(self.steps * (val - self.props.min.value) / self.valspan))

    def _slide(self):
        """Copy the value selected on the slider into the text field."""
        val = self.props.min.value + \
            (self.slider.value() / float(self.steps) * self.valspan)
        self.textInput.setText(str(val))

    def _apply(self):
        """Parse the text field according to the knob type and send it.

        Unsupported knob types and unparsable input are reported on
        stderr and leave the knob untouched.
        """
        kind = type(self.sv.value)
        text = str(self.textInput.text())
        try:
            if kind is str:
                val = text
            elif kind is int:
                val = int(round(float(text)))
            elif kind is float:
                val = float(text)
            elif kind is ControlPort.complex:
                t = complex(text.strip("(").strip(")").replace(" ", ""))
                val = ControlPort.complex()
                val.re = t.real
                val.im = t.imag
            else:
                sys.stderr.write(
                    "set type not supported! ({0})\n".format(kind))
                return
        except (ValueError, OverflowError) as e:
            sys.stderr.write(
                "invalid value for {0}: {1}\n".format(self.key, e))
            return

        self.sv.value = val
        self.radio.setKnobs({self.key: self.sv})
        if self._set_slider_value:
            self._set_slider_value(self.sv.value)

    def _set(self):
        """Apply the current value and close the dialog."""
        self._apply()
        self.done(0)


class MForm(Qt.QWidget):
    """Per-connection form: a knob table that is polled on a timer."""

    def update(self):
        """Poll all knobs, then refresh the table and the plots.

        When the query fails the connection is treated as lost: the
        timer is stopped and, if the parent is a MAINWindow, this form's
        sub-window and its plot windows are removed.
        """
        try:
            st = time.time()
            knobs = self.radioclient.getKnobs([])
            ft = time.time()
            latency = ft - st
            self.parent.statusBar().showMessage(
                "Current GNU Radio Control Port Query Latency: %f ms" % (latency * 1000))

        except Exception as e:
            sys.stderr.write(
                "ctrlport-monitor: lost connection ({0}).\n".format(e))
            # Stop polling a dead connection.
            self.timer.stop()
            if type(self.parent) is MAINWindow:
                # Find window of connection
                remove = []
                for p in self.parent.mdiArea.subWindowList():
                    if self.parent.conns[self.uid] == p.widget():
                        remove.append(p)

                # Find any subplot windows of connection
                for p in self.parent.mdiArea.subWindowList():
                    for plot in self.parent.plots[self.uid]:
                        if plot == p.widget():
                            remove.append(p)

                # Clean up local references to these; placeholders keep
                # the uid indices of the other connections valid.
                self.parent.conns[self.uid] = []
                self.parent.plots[self.uid] = []

                # Remove subwindows for connection and plots
                for r in remove:
                    self.parent.mdiArea.removeSubWindow(r)

                # Clean up self
                self.close()
            else:
                sys.exit(1)
            return

        # UPDATE TABLE:
        self.table.updateItems(knobs, self.knobprops)

        # UPDATE PLOTS
        self.parent.update(knobs, self.uid)

    def __init__(self, radioclient, uid=0, updateRate=2000, parent=None):
        """Create the form for *radioclient*.

        With ``radioclient=None`` the user is asked for host/port and a
        thrift client is created.  If no client is available afterwards,
        ``self.radioclient`` stays None and the form is left unbuilt.
        """
        super(MForm, self).__init__()
        self.radioclient = None
        if radioclient is None:
            askinfo = ConInfoDialog(self)
            if askinfo.exec_():
                host = str(askinfo.host.text())
                port = str(askinfo.port.text())
                try:
                    self.radioclient = GNURadioControlPortClient(
                        host, port, 'thrift').client
                    print("Connected to %s:%s" % (host, port))
                except Exception:
                    print("Cannot connect to %s:%s" % (host, port))
        else:
            self.radioclient = radioclient

        if self.radioclient is None:
            return

        self.uid = uid
        self.parent = parent

        self.horizontalLayout = Qt.QVBoxLayout(self)
        self.gridLayout = Qt.QGridLayout()

        self.knobprops = self.radioclient.properties([])
        self.parent.knobprops.append(self.knobprops)
        self.resize(775, 500)
        self.timer = QtCore.QTimer()
        self.constupdatediv = 0
        self.tableupdatediv = 0

        # make table
        self.table = GrDataPlotterValueTable(uid, self, 0, 0, 400, 200)
        sizePolicy = Qt.QSizePolicy(
            Qt.QSizePolicy.Preferred, Qt.QSizePolicy.Preferred)
        self.table.treeWidget.setSizePolicy(sizePolicy)
        self.table.treeWidget.setEditTriggers(
            Qt.QAbstractItemView.EditKeyPressed)
        self.table.treeWidget.setSortingEnabled(True)
        self.table.treeWidget.setDragEnabled(True)

        # add things to layouts
        self.horizontalLayout.addWidget(self.table.treeWidget)

        # set up timer
        self.timer.timeout.connect(self.update)
        self.updateRate = updateRate
        self.timer.start(self.updateRate)

        # set up context menu ..
        self.table.treeWidget.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
        self.table.treeWidget.customContextMenuRequested.connect(self.openMenu)

        # Set up double-click to launch default plotter
        self.table.treeWidget.itemDoubleClicked.connect(self.parent.newSub)

        # Allow drag/drop event from table item to plotter
        self.table.treeWidget.itemPressed.connect(self.parent.startDrag)

    def openMenu(self, pos):
        """Open the properties menu for the currently selected table item."""
        index = self.table.treeWidget.selectedIndexes()
        if not index:
            # Context menu requested with nothing selected.
            return
        item = self.table.treeWidget.itemFromIndex(index[0])
        itemname = str(item.text(0))
        self.parent.propertiesMenu(itemname, self.radioclient, self.uid)


def get_minmax(p):
    """Return ``(pmin, pmax)`` plot bounds derived from knob property *p*.

    Each bound is ``1.1 * float(value)``, or None when the property
    carries an empty list or a byte stream that cannot be unpacked as a
    single signed byte.
    """
    pmin = p.min.value
    pmax = p.max.value

    # Find min/max or real or imag for GNURadio::complex
    # TODO: fix complex
    if type(pmin) is ControlPort.complex:
        pmin = min(pmin.re, pmin.im)
    if type(pmax) is ControlPort.complex:
        pmax = max(pmax.re, pmax.im)

    # A byte-stream bound arrives as bytes (or as a str); unpack it as
    # one signed byte.
    try:
        if isinstance(pmin, (str, bytes)):
            pmin = struct.unpack('b', _as_bytes(pmin))[0]
        if isinstance(pmax, (str, bytes)):
            pmax = struct.unpack('b', _as_bytes(pmax))[0]
    except (struct.error, UnicodeEncodeError):
        pmin = []
        pmax = []

    pmin = None if pmin == [] else 1.1 * float(pmin)
    pmax = None if pmax == [] else 1.1 * float(pmax)

    return pmin, pmax


class MyApp(object):
    """Command-line entry point: parse arguments, connect, run the GUI."""

    def __init__(self, args):
        """Parse *args* (an argv-style list) and start the application.

        Exits with status 1 if the ControlPort client cannot be set up.
        """
        parser = ArgumentParser(description="GNU Radio Control Port Monitor")
        parser.add_argument("host", nargs="?", default="localhost",
                            help="host name or IP")
        parser.add_argument("port", help="port")
        # Fall back to sys.argv when no explicit list is given.
        args = parser.parse_args(args[1:] if args else None)

        try:
            GNURadioControlPortClient(
                args.host, args.port, 'thrift', self.run,
                Qt.QApplication(sys.argv).exec_)
        except Exception:
            print("ControlPort failed to connect. Check the config of your endpoint.")
            print("\t[ControlPort] on = True")
            print("\t[PerfCounters] on = True")
            print("\t[PerfCounters] export = True")
            sys.exit(1)

    def run(self, client):
        """Create and show the main window for *client*."""
        # Keep a reference so the window outlives this call.
        self.window = MAINWindow(client)
        self.window.show()


MyApp(sys.argv)