diff options
Diffstat (limited to 'grc/python')
-rw-r--r-- | grc/python/Block.py | 323 | ||||
-rw-r--r-- | grc/python/CMakeLists.txt | 44 | ||||
-rw-r--r-- | grc/python/Connection.py | 45 | ||||
-rw-r--r-- | grc/python/Constants.py | 140 | ||||
-rw-r--r-- | grc/python/FlowGraph.py | 338 | ||||
-rw-r--r-- | grc/python/Generator.py | 447 | ||||
-rw-r--r-- | grc/python/Param.py | 433 | ||||
-rw-r--r-- | grc/python/Platform.py | 174 | ||||
-rw-r--r-- | grc/python/Port.py | 268 | ||||
-rw-r--r-- | grc/python/__init__.py | 1 | ||||
-rw-r--r-- | grc/python/block.dtd | 69 | ||||
-rw-r--r-- | grc/python/default_flow_graph.grc | 43 | ||||
-rw-r--r-- | grc/python/epy_block_io.py | 95 | ||||
-rw-r--r-- | grc/python/expr_utils.py | 177 | ||||
-rw-r--r-- | grc/python/extract_docs.py | 296 | ||||
-rw-r--r-- | grc/python/flow_graph.tmpl | 439 |
16 files changed, 0 insertions, 3332 deletions
diff --git a/grc/python/Block.py b/grc/python/Block.py deleted file mode 100644 index 782893fd8f..0000000000 --- a/grc/python/Block.py +++ /dev/null @@ -1,323 +0,0 @@ -""" -Copyright 2008-2011 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" - -import itertools -import collections - -from .. base.Constants import BLOCK_FLAG_NEED_QT_GUI, BLOCK_FLAG_NEED_WX_GUI -from .. base.odict import odict - -from .. base.Block import Block as _Block -from .. gui.Block import Block as _GUIBlock - -from . FlowGraph import _variable_matcher -from . import epy_block_io - - -class Block(_Block, _GUIBlock): - - def __init__(self, flow_graph, n): - """ - Make a new block from nested data. - - Args: - flow: graph the parent element - n: the nested odict - - Returns: - block a new block - """ - #grab the data - self._doc = (n.find('doc') or '').strip('\n').replace('\\\n', '') - self._imports = map(lambda i: i.strip(), n.findall('import')) - self._make = n.find('make') - self._var_make = n.find('var_make') - self._checks = n.findall('check') - self._callbacks = n.findall('callback') - self._bus_structure_source = n.find('bus_structure_source') or '' - self._bus_structure_sink = n.find('bus_structure_sink') or '' - self.port_counters = [itertools.count(), itertools.count()] - #build the block - _Block.__init__( - self, - flow_graph=flow_graph, - n=n, - ) - _GUIBlock.__init__(self) - - self._epy_source_hash = -1 # for epy blocks - self._epy_reload_error = None - - def get_bus_structure(self, direction): - if direction == 'source': - bus_structure = self._bus_structure_source; - else: - bus_structure = self._bus_structure_sink; - - bus_structure = self.resolve_dependencies(bus_structure); - - if not bus_structure: return '' - try: - clean_bus_structure = self.get_parent().evaluate(bus_structure) - return clean_bus_structure - - except: return '' - - def validate(self): - """ - Validate this block. - Call the base class validate. - Evaluate the checks: each check must evaluate to True. - """ - _Block.validate(self) - #evaluate the checks - for check in self._checks: - check_res = self.resolve_dependencies(check) - try: - if not self.get_parent().evaluate(check_res): - self.add_error_message('Check "%s" failed.'%check) - except: self.add_error_message('Check "%s" did not evaluate.'%check) - # for variables check the value (only if var_value is used - if _variable_matcher.match(self.get_key()) and self._var_value != '$value': - value = self._var_value - try: - value = self.get_var_value() - self.get_parent().evaluate(value) - except Exception as err: - self.add_error_message('Value "%s" cannot be evaluated:\n%s' % (value, err)) - - # check if this is a GUI block and matches the selected generate option - current_generate_option = self.get_parent().get_option('generate_options') - - def check_generate_mode(label, flag, valid_options): - block_requires_mode = ( - flag in self.get_flags() or - self.get_name().upper().startswith(label) - ) - if block_requires_mode and current_generate_option not in valid_options: - self.add_error_message("Can't generate this block in mode " + - repr(current_generate_option)) - - check_generate_mode('WX GUI', BLOCK_FLAG_NEED_WX_GUI, ('wx_gui',)) - check_generate_mode('QT GUI', BLOCK_FLAG_NEED_QT_GUI, ('qt_gui', 'hb_qt_gui')) - if self._epy_reload_error: - self.get_param('_source_code').add_error_message(str(self._epy_reload_error)) - - def rewrite(self): - """ - Add and remove ports to adjust for the nports. - """ - _Block.rewrite(self) - # Check and run any custom rewrite function for this block - getattr(self, 'rewrite_' + self._key, lambda: None)() - - # adjust nports, disconnect hidden ports - for ports in (self.get_sources(), self.get_sinks()): - for i, master_port in enumerate(ports): - nports = master_port.get_nports() or 1 - num_ports = 1 + len(master_port.get_clones()) - if master_port.get_hide(): - for connection in master_port.get_connections(): - self.get_parent().remove_element(connection) - if not nports and num_ports == 1: # not a master port and no left-over clones - continue - # remove excess cloned ports - for port in master_port.get_clones()[nports-1:]: - # remove excess connections - for connection in port.get_connections(): - self.get_parent().remove_element(connection) - master_port.remove_clone(port) - ports.remove(port) - # add more cloned ports - for j in range(num_ports, nports): - port = master_port.add_clone() - ports.insert(ports.index(master_port) + j, port) - - self.back_ofthe_bus(ports) - # renumber non-message/-msg ports - domain_specific_port_index = collections.defaultdict(int) - for port in filter(lambda p: p.get_key().isdigit(), ports): - domain = port.get_domain() - port._key = str(domain_specific_port_index[domain]) - domain_specific_port_index[domain] += 1 - - def port_controller_modify(self, direction): - """ - Change the port controller. - - Args: - direction: +1 or -1 - - Returns: - true for change - """ - changed = False - #concat the nports string from the private nports settings of all ports - nports_str = ' '.join([port._nports for port in self.get_ports()]) - #modify all params whose keys appear in the nports string - for param in self.get_params(): - if param.is_enum() or param.get_key() not in nports_str: continue - #try to increment the port controller by direction - try: - value = param.get_evaluated() - value = value + direction - if 0 < value: - param.set_value(value) - changed = True - except: pass - return changed - - def get_doc(self): - platform = self.get_parent().get_parent() - extracted_docs = platform.block_docstrings.get(self._key, '') - return (self._doc + '\n\n' + extracted_docs).strip() - - def get_category(self): - return _Block.get_category(self) - - def get_imports(self, raw=False): - """ - Resolve all import statements. - Split each import statement at newlines. - Combine all import statments into a list. - Filter empty imports. - - Returns: - a list of import statements - """ - if raw: - return self._imports - return filter(lambda i: i, sum(map(lambda i: self.resolve_dependencies(i).split('\n'), self._imports), [])) - - def get_make(self, raw=False): - if raw: - return self._make - return self.resolve_dependencies(self._make) - - def get_var_make(self): - return self.resolve_dependencies(self._var_make) - - def get_var_value(self): - return self.resolve_dependencies(self._var_value) - - def get_callbacks(self): - """ - Get a list of function callbacks for this block. - - Returns: - a list of strings - """ - def make_callback(callback): - callback = self.resolve_dependencies(callback) - if 'self.' in callback: return callback - return 'self.%s.%s'%(self.get_id(), callback) - return map(make_callback, self._callbacks) - - def is_virtual_sink(self): - return self.get_key() == 'virtual_sink' - - def is_virtual_source(self): - return self.get_key() == 'virtual_source' - - ########################################################################### - # Custom rewrite functions - ########################################################################### - - def rewrite_epy_block(self): - flowgraph = self.get_parent() - platform = flowgraph.get_parent() - param_blk = self.get_param('_io_cache') - param_src = self.get_param('_source_code') - doc_end_tag = 'Block Documentation:' - - src = param_src.get_value() - src_hash = hash((self.get_id(), src)) - if src_hash == self._epy_source_hash: - return - - try: - blk_io = epy_block_io.extract(src) - - except Exception as e: - self._epy_reload_error = ValueError(str(e)) - try: # load last working block io - blk_io = epy_block_io.BlockIO(*eval(param_blk.get_value())) - except: - return - else: - self._epy_reload_error = None # clear previous errors - param_blk.set_value(repr(tuple(blk_io))) - - # print "Rewriting embedded python block {!r}".format(self.get_id()) - self._epy_source_hash = src_hash - self._name = blk_io.name or blk_io.cls - self._doc = self._doc.split(doc_end_tag)[0] + doc_end_tag + '\n' + blk_io.doc - self._imports[0] = 'import ' + self.get_id() - self._make = '{0}.{1}({2})'.format(self.get_id(), blk_io.cls, ', '.join( - '{0}=${0}'.format(key) for key, _ in blk_io.params)) - - params = {} - for param in list(self._params): - if hasattr(param, '__epy_param__'): - params[param.get_key()] = param - self._params.remove(param) - - for key, value in blk_io.params: - try: - param = params[key] - param.set_default(value) - except KeyError: # need to make a new param - name = key.replace('_', ' ').title() - n = odict(dict(name=name, key=key, type='raw', value=value)) - param = platform.Param(block=self, n=n) - setattr(param, '__epy_param__', True) - self._params.append(param) - - def update_ports(label, ports, port_specs, direction): - ports_to_remove = list(ports) - iter_ports = iter(ports) - ports_new = [] - port_current = next(iter_ports, None) - for key, port_type in port_specs: - reuse_port = ( - port_current is not None and - port_current.get_type() == port_type and - (key.isdigit() or port_current.get_key() == key) - ) - if reuse_port: - ports_to_remove.remove(port_current) - port, port_current = port_current, next(iter_ports, None) - else: - n = odict(dict(name=label + str(key), type=port_type, key=key)) - if port_type == 'message': - n['name'] = key - n['optional'] = '1' - port = platform.Port(block=self, n=n, dir=direction) - ports_new.append(port) - # replace old port list with new one - del ports[:] - ports.extend(ports_new) - # remove excess port connections - for port in ports_to_remove: - for connection in port.get_connections(): - flowgraph.remove_element(connection) - - update_ports('in', self.get_sinks(), blk_io.sinks, 'sink') - update_ports('out', self.get_sources(), blk_io.sources, 'source') - _Block.rewrite(self) diff --git a/grc/python/CMakeLists.txt b/grc/python/CMakeLists.txt deleted file mode 100644 index 3f9e273146..0000000000 --- a/grc/python/CMakeLists.txt +++ /dev/null @@ -1,44 +0,0 @@ -# Copyright 2011 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. - -######################################################################## -GR_PYTHON_INSTALL(FILES - expr_utils.py - extract_docs.py - epy_block_io.py - Block.py - Connection.py - Constants.py - FlowGraph.py - Generator.py - Param.py - Platform.py - Port.py - __init__.py - DESTINATION ${GR_PYTHON_DIR}/gnuradio/grc/python - COMPONENT "grc" -) - -install(FILES - block.dtd - default_flow_graph.grc - flow_graph.tmpl - DESTINATION ${GR_PYTHON_DIR}/gnuradio/grc/python - COMPONENT "grc" -) diff --git a/grc/python/Connection.py b/grc/python/Connection.py deleted file mode 100644 index 822876a0ae..0000000000 --- a/grc/python/Connection.py +++ /dev/null @@ -1,45 +0,0 @@ -""" -Copyright 2008-2011 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" - -import Constants -from .. base.Connection import Connection as _Connection -from .. gui.Connection import Connection as _GUIConnection - -class Connection(_Connection, _GUIConnection): - - def __init__(self, **kwargs): - _Connection.__init__(self, **kwargs) - _GUIConnection.__init__(self) - - def is_msg(self): - return self.get_source().get_type() == self.get_sink().get_type() == 'msg' - - def is_bus(self): - return self.get_source().get_type() == self.get_sink().get_type() == 'bus' - - def validate(self): - """ - Validate the connections. - The ports must match in io size. - """ - _Connection.validate(self) - source_size = Constants.TYPE_TO_SIZEOF[self.get_source().get_type()] * self.get_source().get_vlen() - sink_size = Constants.TYPE_TO_SIZEOF[self.get_sink().get_type()] * self.get_sink().get_vlen() - if source_size != sink_size: - self.add_error_message('Source IO size "%s" does not match sink IO size "%s".'%(source_size, sink_size)) diff --git a/grc/python/Constants.py b/grc/python/Constants.py deleted file mode 100644 index b7a370cad7..0000000000 --- a/grc/python/Constants.py +++ /dev/null @@ -1,140 +0,0 @@ -""" -Copyright 2008-2011 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" - -import os -from os.path import expanduser -import numpy -import stat -from gnuradio import gr - -_gr_prefs = gr.prefs() - -# setup paths -PATH_SEP = {'/': ':', '\\': ';'}[os.path.sep] - -HIER_BLOCKS_LIB_DIR = os.environ.get('GRC_HIER_PATH', expanduser('~/.grc_gnuradio')) - -PREFS_FILE = os.environ.get('GRC_PREFS_PATH', expanduser('~/.gnuradio/grc.conf')) -PREFS_FILE_OLD = os.environ.get('GRC_PREFS_PATH', expanduser('~/.grc')) - -BLOCKS_DIRS = filter( # filter blank strings - lambda x: x, PATH_SEP.join([ - os.environ.get('GRC_BLOCKS_PATH', ''), - _gr_prefs.get_string('grc', 'local_blocks_path', ''), - _gr_prefs.get_string('grc', 'global_blocks_path', ''), - ]).split(PATH_SEP), -) + [HIER_BLOCKS_LIB_DIR] - -# user settings -XTERM_EXECUTABLE = _gr_prefs.get_string('grc', 'xterm_executable', 'xterm') - -# file creation modes -TOP_BLOCK_FILE_MODE = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IROTH -HIER_BLOCK_FILE_MODE = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH - -# data files -DATA_DIR = os.path.dirname(__file__) -FLOW_GRAPH_TEMPLATE = os.path.join(DATA_DIR, 'flow_graph.tmpl') -BLOCK_DTD = os.path.join(DATA_DIR, 'block.dtd') -DEFAULT_FLOW_GRAPH = os.path.join(DATA_DIR, 'default_flow_graph.grc') - -#define types, native python + numpy -VECTOR_TYPES = (tuple, list, set, numpy.ndarray) -COMPLEX_TYPES = [complex, numpy.complex, numpy.complex64, numpy.complex128] -REAL_TYPES = [float, numpy.float, numpy.float32, numpy.float64] -INT_TYPES = [int, long, numpy.int, numpy.int8, numpy.int16, numpy.int32, numpy.uint64, - numpy.uint, numpy.uint8, numpy.uint16, numpy.uint32, numpy.uint64] -#cast to tuple for isinstance, concat subtypes -COMPLEX_TYPES = tuple(COMPLEX_TYPES + REAL_TYPES + INT_TYPES) -REAL_TYPES = tuple(REAL_TYPES + INT_TYPES) -INT_TYPES = tuple(INT_TYPES) - -# Updating colors. Using the standard color pallette from: -# http://www.google.com/design/spec/style/color.html#color-color-palette -# Most are based on the main, primary color standard. Some are within -# that color's spectrum when it was deemed necessary. -GRC_COLOR_BROWN = '#795548' -GRC_COLOR_BLUE = '#2196F3' -GRC_COLOR_LIGHT_GREEN = '#8BC34A' -GRC_COLOR_GREEN = '#4CAF50' -GRC_COLOR_AMBER = '#FFC107' -GRC_COLOR_PURPLE = '#9C27B0' -GRC_COLOR_CYAN = '#00BCD4' -GRC_COLOR_GR_ORANGE = '#FF6905' -GRC_COLOR_ORANGE = '#F57C00' -GRC_COLOR_LIME = '#CDDC39' -GRC_COLOR_TEAL = '#009688' -GRC_COLOR_YELLOW = '#FFEB3B' -GRC_COLOR_PINK = '#F50057' -GRC_COLOR_LIGHT_PURPLE = '#E040FB' -GRC_COLOR_DARK_GREY = '#72706F' -GRC_COLOR_GREY = '#BDBDBD' -GRC_COLOR_WHITE = '#FFFFFF' - - -CORE_TYPES = ( # name, key, sizeof, color - ('Complex Float 64', 'fc64', 16, GRC_COLOR_BROWN), - ('Complex Float 32', 'fc32', 8, GRC_COLOR_BLUE), - ('Complex Integer 64', 'sc64', 16, GRC_COLOR_LIGHT_GREEN), - ('Complex Integer 32', 'sc32', 8, GRC_COLOR_GREEN), - ('Complex Integer 16', 'sc16', 4, GRC_COLOR_AMBER), - ('Complex Integer 8', 'sc8', 2, GRC_COLOR_PURPLE), - ('Float 64', 'f64', 8, GRC_COLOR_CYAN), - ('Float 32', 'f32', 4, GRC_COLOR_ORANGE), - ('Integer 64', 's64', 8, GRC_COLOR_LIME), - ('Integer 32', 's32', 4, GRC_COLOR_TEAL), - ('Integer 16', 's16', 2, GRC_COLOR_YELLOW), - ('Integer 8', 's8', 1, GRC_COLOR_LIGHT_PURPLE), - ('Message Queue', 'msg', 0, GRC_COLOR_DARK_GREY), - ('Async Message', 'message', 0, GRC_COLOR_GREY), - ('Bus Connection', 'bus', 0, GRC_COLOR_WHITE), - ('Wildcard', '', 0, GRC_COLOR_WHITE), -) - -ALIAS_TYPES = { - 'complex' : (8, GRC_COLOR_BLUE), - 'float' : (4, GRC_COLOR_ORANGE), - 'int' : (4, GRC_COLOR_TEAL), - 'short' : (2, GRC_COLOR_YELLOW), - 'byte' : (1, GRC_COLOR_LIGHT_PURPLE), -} - -TYPE_TO_COLOR = dict() -TYPE_TO_SIZEOF = dict() -for name, key, sizeof, color in CORE_TYPES: - TYPE_TO_COLOR[key] = color - TYPE_TO_SIZEOF[key] = sizeof -for key, (sizeof, color) in ALIAS_TYPES.iteritems(): - TYPE_TO_COLOR[key] = color - TYPE_TO_SIZEOF[key] = sizeof - -#coloring -COMPLEX_COLOR_SPEC = '#3399FF' -FLOAT_COLOR_SPEC = '#FF8C69' -INT_COLOR_SPEC = '#00FF99' -SHORT_COLOR_SPEC = '#FFFF66' -BYTE_COLOR_SPEC = '#FF66FF' -COMPLEX_VECTOR_COLOR_SPEC = '#3399AA' -FLOAT_VECTOR_COLOR_SPEC = '#CC8C69' -INT_VECTOR_COLOR_SPEC = '#00CC99' -SHORT_VECTOR_COLOR_SPEC = '#CCCC33' -BYTE_VECTOR_COLOR_SPEC = '#CC66CC' -ID_COLOR_SPEC = '#DDDDDD' -WILDCARD_COLOR_SPEC = '#FFFFFF' -MSG_COLOR_SPEC = '#777777' diff --git a/grc/python/FlowGraph.py b/grc/python/FlowGraph.py deleted file mode 100644 index b2a1d27859..0000000000 --- a/grc/python/FlowGraph.py +++ /dev/null @@ -1,338 +0,0 @@ -""" -Copyright 2008-2011 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" -import re -import imp -from operator import methodcaller - -from . import expr_utils -from .. base.FlowGraph import FlowGraph as _FlowGraph -from .. gui.FlowGraph import FlowGraph as _GUIFlowGraph - -_variable_matcher = re.compile('^(variable\w*)$') -_parameter_matcher = re.compile('^(parameter)$') -_monitors_searcher = re.compile('(ctrlport_monitor)') -_bussink_searcher = re.compile('^(bus_sink)$') -_bussrc_searcher = re.compile('^(bus_source)$') -_bus_struct_sink_searcher = re.compile('^(bus_structure_sink)$') -_bus_struct_src_searcher = re.compile('^(bus_structure_source)$') - - -class FlowGraph(_FlowGraph, _GUIFlowGraph): - - def __init__(self, **kwargs): - self.grc_file_path = '' - _FlowGraph.__init__(self, **kwargs) - _GUIFlowGraph.__init__(self) - self.n = {} - self.n_hash = -1 - self._renew_eval_ns = True - self._eval_cache = {} - - def _eval(self, code, namespace, namespace_hash): - """ - Evaluate the code with the given namespace. - - Args: - code: a string with python code - namespace: a dict representing the namespace - namespace_hash: a unique hash for the namespace - - Returns: - the resultant object - """ - if not code: raise Exception, 'Cannot evaluate empty statement.' - my_hash = hash(code) ^ namespace_hash - #cache if does not exist - if not self._eval_cache.has_key(my_hash): - self._eval_cache[my_hash] = eval(code, namespace, namespace) - #return from cache - return self._eval_cache[my_hash] - - def get_hier_block_stream_io(self, direction): - """ - Get a list of stream io signatures for this flow graph. - - Args: - direction: a string of 'in' or 'out' - - Returns: - a list of dicts with: type, label, vlen, size, optional - """ - return filter(lambda p: p['type'] != "message", - self.get_hier_block_io(direction)) - - def get_hier_block_message_io(self, direction): - """ - Get a list of message io signatures for this flow graph. - - Args: - direction: a string of 'in' or 'out' - - Returns: - a list of dicts with: type, label, vlen, size, optional - """ - return filter(lambda p: p['type'] == "message", - self.get_hier_block_io(direction)) - - def get_hier_block_io(self, direction): - """ - Get a list of io ports for this flow graph. - - Args: - direction: a string of 'in' or 'out' - - Returns: - a list of dicts with: type, label, vlen, size, optional - """ - pads = self.get_pad_sources() if direction in ('sink', 'in') else \ - self.get_pad_sinks() if direction in ('source', 'out') else [] - ports = [] - for pad in pads: - master = { - 'label': str(pad.get_param('label').get_evaluated()), - 'type': str(pad.get_param('type').get_evaluated()), - 'vlen': str(pad.get_param('vlen').get_value()), - 'size': pad.get_param('type').get_opt('size'), - 'optional': bool(pad.get_param('optional').get_evaluated()), - } - num_ports = pad.get_param('num_streams').get_evaluated() - if num_ports > 1: - for i in xrange(num_ports): - clone = master.copy() - clone['label'] += str(i) - ports.append(clone) - else: - ports.append(master) - return ports - - def get_pad_sources(self): - """ - Get a list of pad source blocks sorted by id order. - - Returns: - a list of pad source blocks in this flow graph - """ - pads = filter(lambda b: b.get_key() == 'pad_source', self.get_enabled_blocks()) - return sorted(pads, lambda x, y: cmp(x.get_id(), y.get_id())) - - def get_pad_sinks(self): - """ - Get a list of pad sink blocks sorted by id order. - - Returns: - a list of pad sink blocks in this flow graph - """ - pads = filter(lambda b: b.get_key() == 'pad_sink', self.get_enabled_blocks()) - return sorted(pads, lambda x, y: cmp(x.get_id(), y.get_id())) - - def get_pad_port_global_key(self, port): - """ - Get the key for a port of a pad source/sink to use in connect() - This takes into account that pad blocks may have multiple ports - - Returns: - the key (str) - """ - key_offset = 0 - pads = self.get_pad_sources() if port.is_source() else self.get_pad_sinks() - for pad in pads: - # using the block param 'type' instead of the port domain here - # to emphasize that hier block generation is domain agnostic - is_message_pad = pad.get_param('type').get_evaluated() == "message" - if port.get_parent() == pad: - if is_message_pad: - key = pad.get_param('label').get_value() - else: - key = str(key_offset + int(port.get_key())) - return key - else: - # assuming we have either only sources or sinks - if not is_message_pad: - key_offset += len(pad.get_ports()) - return -1 - - def get_imports(self): - """ - Get a set of all import statments in this flow graph namespace. - - Returns: - a set of import statements - """ - imports = sum([block.get_imports() for block in self.get_enabled_blocks()], []) - imports = sorted(set(imports)) - return imports - - def get_variables(self): - """ - Get a list of all variables in this flow graph namespace. - Exclude paramterized variables. - - Returns: - a sorted list of variable blocks in order of dependency (indep -> dep) - """ - variables = filter(lambda b: _variable_matcher.match(b.get_key()), self.iter_enabled_blocks()) - return expr_utils.sort_objects(variables, methodcaller('get_id'), methodcaller('get_var_make')) - - def get_parameters(self): - """ - Get a list of all paramterized variables in this flow graph namespace. - - Returns: - a list of paramterized variables - """ - parameters = filter(lambda b: _parameter_matcher.match(b.get_key()), self.iter_enabled_blocks()) - return parameters - - def get_monitors(self): - """ - Get a list of all ControlPort monitors - """ - monitors = filter(lambda b: _monitors_searcher.search(b.get_key()), - self.iter_enabled_blocks()) - return monitors - - def get_python_modules(self): - """Iterate over custom code block ID and Source""" - for block in self.iter_enabled_blocks(): - if block.get_key() == 'epy_module': - yield block.get_id(), block.get_param('source_code').get_value() - - def get_bussink(self): - bussink = filter(lambda b: _bussink_searcher.search(b.get_key()), self.get_enabled_blocks()) - - for i in bussink: - for j in i.get_params(): - if j.get_name() == 'On/Off' and j.get_value() == 'on': - return True; - - return False - - def get_bussrc(self): - bussrc = filter(lambda b: _bussrc_searcher.search(b.get_key()), self.get_enabled_blocks()) - - for i in bussrc: - for j in i.get_params(): - if j.get_name() == 'On/Off' and j.get_value() == 'on': - return True; - - return False - - def get_bus_structure_sink(self): - bussink = filter(lambda b: _bus_struct_sink_searcher.search(b.get_key()), self.get_enabled_blocks()) - - return bussink - - def get_bus_structure_src(self): - bussrc = filter(lambda b: _bus_struct_src_searcher.search(b.get_key()), self.get_enabled_blocks()) - - return bussrc - - def rewrite(self): - """ - Flag the namespace to be renewed. - """ - def reconnect_bus_blocks(): - for block in self.get_blocks(): - - if 'bus' in map(lambda a: a.get_type(), block.get_sources_gui()): - - - for i in range(len(block.get_sources_gui())): - if len(block.get_sources_gui()[i].get_connections()) > 0: - source = block.get_sources_gui()[i] - sink = [] - - for j in range(len(source.get_connections())): - sink.append(source.get_connections()[j].get_sink()); - - - for elt in source.get_connections(): - self.remove_element(elt); - for j in sink: - self.connect(source, j); - self._renew_eval_ns = True - _FlowGraph.rewrite(self); - reconnect_bus_blocks(); - - def evaluate(self, expr): - """ - Evaluate the expression. - - Args: - expr: the string expression - @throw Exception bad expression - - Returns: - the evaluated data - """ - if self._renew_eval_ns: - self._renew_eval_ns = False - #reload namespace - n = dict() - #load imports - for code in self.get_imports(): - try: exec code in n - except: pass - - for id, code in self.get_python_modules(): - try: - module = imp.new_module(id) - exec code in module.__dict__ - n[id] = module - except: - pass - - #load parameters - np = dict() - for parameter in self.get_parameters(): - try: - e = eval(parameter.get_param('value').to_code(), n, n) - np[parameter.get_id()] = e - except: pass - n.update(np) #merge param namespace - #load variables - for variable in self.get_variables(): - try: - e = eval(variable.get_var_value(), n, n) - n[variable.get_id()] = e - except: pass - #make namespace public - self.n = n - self.n_hash = hash(str(n)) - #evaluate - e = self._eval(expr, self.n, self.n_hash) - return e - - def get_new_block(self, key): - """Try to auto-generate the block from file if missing""" - block = _FlowGraph.get_new_block(self, key) - if not block: - platform = self.get_parent() - # we're before the initial fg rewrite(), so no evaluated values! - # --> use raw value instead - path_param = self._options_block.get_param('hier_block_src_path') - file_path = platform.find_file_in_paths( - filename=key + '.' + platform.get_key(), - paths=path_param.get_value(), - cwd=self.grc_file_path - ) - if file_path: # grc file found. load and get block - platform.load_and_generate_flow_graph(file_path) - block = _FlowGraph.get_new_block(self, key) # can be None - return block diff --git a/grc/python/Generator.py b/grc/python/Generator.py deleted file mode 100644 index 56e3a6e78f..0000000000 --- a/grc/python/Generator.py +++ /dev/null @@ -1,447 +0,0 @@ -""" -Copyright 2008-2011 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" - -import os -import sys -import subprocess -import tempfile -import shlex -import codecs -import re # for shlex_quote -from distutils.spawn import find_executable - -from Cheetah.Template import Template - -from .. gui import Messages -from .. base import ParseXML -from .. base import odict -from .. base.Constants import BLOCK_FLAG_NEED_QT_GUI - -from . Constants import TOP_BLOCK_FILE_MODE, FLOW_GRAPH_TEMPLATE, \ - XTERM_EXECUTABLE, HIER_BLOCK_FILE_MODE, HIER_BLOCKS_LIB_DIR, BLOCK_DTD -from . import expr_utils - - -class Generator(object): - """Adaptor for various generators (uses generate_options)""" - - def __init__(self, flow_graph, file_path): - """ - Initialize the generator object. - Determine the file to generate. - - Args: - flow_graph: the flow graph object - file_path: the path to the grc file - """ - self._generate_options = flow_graph.get_option('generate_options') - if self._generate_options == 'hb': - generator_cls = HierBlockGenerator - elif self._generate_options == 'hb_qt_gui': - generator_cls = QtHierBlockGenerator - else: - generator_cls = TopBlockGenerator - - self._generator = generator_cls(flow_graph, file_path) - - def get_generate_options(self): - return self._generate_options - - def __getattr__(self, item): - """get all other attrib from actual generator object""" - return getattr(self._generator, item) - - -class TopBlockGenerator(object): - - def __init__(self, flow_graph, file_path): - """ - Initialize the top block generator object. - - Args: - flow_graph: the flow graph object - file_path: the path to write the file to - """ - self._flow_graph = flow_graph - self._generate_options = self._flow_graph.get_option('generate_options') - self._mode = TOP_BLOCK_FILE_MODE - dirname = self._dirname = os.path.dirname(file_path) - # handle the case where the directory is read-only - # in this case, use the system's temp directory - if not os.access(dirname, os.W_OK): - dirname = tempfile.gettempdir() - filename = self._flow_graph.get_option('id') + '.py' - self._file_path = os.path.join(dirname, filename) - - def get_file_path(self): - return self._file_path - - def write(self): - """generate output and write it to files""" - # do throttle warning - throttling_blocks = filter(lambda b: b.throtteling(), self._flow_graph.get_enabled_blocks()) - if not throttling_blocks and not self._generate_options.startswith('hb'): - Messages.send_warning("This flow graph may not have flow control: " - "no audio or RF hardware blocks found. " - "Add a Misc->Throttle block to your flow " - "graph to avoid CPU congestion.") - if len(throttling_blocks) > 1: - keys = set(map(lambda b: b.get_key(), throttling_blocks)) - if len(keys) > 1 and 'blocks_throttle' in keys: - Messages.send_warning("This flow graph contains a throttle " - "block and another rate limiting block, " - "e.g. a hardware source or sink. " - "This is usually undesired. Consider " - "removing the throttle block.") - # generate - for filename, data in self._build_python_code_from_template(): - with codecs.open(filename, 'w', encoding='utf-8') as fp: - fp.write(data) - if filename == self.get_file_path(): - try: - os.chmod(filename, self._mode) - except: - pass - - def get_popen(self): - """ - Execute this python flow graph. - - Returns: - a popen object - """ - run_command = self._flow_graph.get_option('run_command') - try: - run_command = run_command.format( - python=shlex_quote(sys.executable), - filename=shlex_quote(self.get_file_path())) - run_command_args = shlex.split(run_command) - except Exception as e: - raise ValueError("Can't parse run command {!r}: {}".format(run_command, e)) - - # when in no gui mode on linux, use a graphical terminal (looks nice) - xterm_executable = find_executable(XTERM_EXECUTABLE) - if self._generate_options == 'no_gui' and xterm_executable: - run_command_args = [xterm_executable, '-e', run_command] - - # this does not reproduce a shell executable command string, if a graphical - # terminal is used. Passing run_command though shlex_quote would do it but - # it looks really ugly and confusing in the console panel. - Messages.send_start_exec(' '.join(run_command_args)) - - return subprocess.Popen( - args=run_command_args, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - shell=False, universal_newlines=True - ) - - def _build_python_code_from_template(self): - """ - Convert the flow graph to python code. - - Returns: - a string of python code - """ - output = list() - - fg = self._flow_graph - title = fg.get_option('title') or fg.get_option('id').replace('_', ' ').title() - imports = fg.get_imports() - variables = fg.get_variables() - parameters = fg.get_parameters() - monitors = fg.get_monitors() - - # list of blocks not including variables and imports and parameters and disabled - def _get_block_sort_text(block): - code = block.get_make().replace(block.get_id(), ' ') - try: - code += block.get_param('notebook').get_value() # older gui markup w/ wxgui - except: - pass - try: - code += block.get_param('gui_hint').get_value() # newer gui markup w/ qtgui - except: - pass - return code - - blocks = expr_utils.sort_objects( - filter(lambda b: b.get_enabled() and not b.get_bypassed(), fg.iter_blocks()), - lambda b: b.get_id(), _get_block_sort_text - ) - # List of regular blocks (all blocks minus the special ones) - blocks = filter(lambda b: b not in (imports + parameters), blocks) - - for block in blocks: - key = block.get_key() - file_path = os.path.join(self._dirname, block.get_id() + '.py') - if key == 'epy_block': - src = block.get_param('_source_code').get_value() - output.append((file_path, src)) - elif key == 'epy_module': - src = block.get_param('source_code').get_value() - output.append((file_path, src)) - - # Filter out virtual sink connections - cf = lambda c: not (c.is_bus() or c.is_msg() or c.get_sink().get_parent().is_virtual_sink()) - connections = filter(cf, fg.get_enabled_connections()) - - # Get the virtual blocks and resolve their connections - virtual = filter(lambda c: c.get_source().get_parent().is_virtual_source(), connections) - for connection in virtual: - source = connection.get_source().resolve_virtual_source() - sink = connection.get_sink() - resolved = fg.get_parent().Connection(flow_graph=fg, porta=source, portb=sink) - connections.append(resolved) - # Remove the virtual connection - connections.remove(connection) - - # Bypassing blocks: Need to find all the enabled connections for the block using - # the *connections* object rather than get_connections(). Create new connections - # that bypass the selected block and remove the existing ones. This allows adjacent - # bypassed blocks to see the newly created connections to downstream blocks, - # allowing them to correctly construct bypass connections. - bypassed_blocks = fg.get_bypassed_blocks() - for block in bypassed_blocks: - # Get the upstream connection (off of the sink ports) - # Use *connections* not get_connections() - get_source_connection = lambda c: c.get_sink() == block.get_sinks()[0] - source_connection = filter(get_source_connection, connections) - # The source connection should never have more than one element. - assert (len(source_connection) == 1) - - # Get the source of the connection. - source_port = source_connection[0].get_source() - - # Loop through all the downstream connections - get_sink_connections = lambda c: c.get_source() == block.get_sources()[0] - for sink in filter(get_sink_connections, connections): - if not sink.get_enabled(): - # Ignore disabled connections - continue - sink_port = sink.get_sink() - connection = fg.get_parent().Connection(flow_graph=fg, porta=source_port, portb=sink_port) - connections.append(connection) - # Remove this sink connection - connections.remove(sink) - # Remove the source connection - connections.remove(source_connection[0]) - - # List of connections where each endpoint is enabled (sorted by domains, block names) - connections.sort(key=lambda c: ( - c.get_source().get_domain(), c.get_sink().get_domain(), - c.get_source().get_parent().get_id(), c.get_sink().get_parent().get_id() - )) - - connection_templates = fg.get_parent().get_connection_templates() - msgs = filter(lambda c: c.is_msg(), fg.get_enabled_connections()) - # list of variable names - var_ids = [var.get_id() for var in parameters + variables] - # prepend self. - replace_dict = dict([(var_id, 'self.%s' % var_id) for var_id in var_ids]) - # list of callbacks - callbacks = [ - expr_utils.expr_replace(cb, replace_dict) - for cb in sum([block.get_callbacks() for block in fg.get_enabled_blocks()], []) - ] - # map var id to callbacks - var_id2cbs = dict([ - (var_id, filter(lambda c: expr_utils.get_variable_dependencies(c, [var_id]), callbacks)) - for var_id in var_ids - ]) - # load the namespace - namespace = { - 'title': title, - 'imports': imports, - 'flow_graph': fg, - 'variables': variables, - 'parameters': parameters, - 'monitors': monitors, - 'blocks': blocks, - 'connections': connections, - 'connection_templates': connection_templates, - 'msgs': msgs, - 'generate_options': self._generate_options, - 'var_id2cbs': var_id2cbs, - } - # build the template - t = Template(open(FLOW_GRAPH_TEMPLATE, 'r').read(), namespace) - output.append((self.get_file_path(), str(t))) - return output - - -class HierBlockGenerator(TopBlockGenerator): - """Extends the top block generator to also generate a block XML file""" - - def __init__(self, flow_graph, file_path): - """ - Initialize the hier block generator object. - - Args: - flow_graph: the flow graph object - file_path: where to write the py file (the xml goes into HIER_BLOCK_LIB_DIR) - """ - TopBlockGenerator.__init__(self, flow_graph, file_path) - self._mode = HIER_BLOCK_FILE_MODE - self._file_path = os.path.join(HIER_BLOCKS_LIB_DIR, - self._flow_graph.get_option('id') + '.py') - self._file_path_xml = self._file_path + '.xml' - - def get_file_path_xml(self): - return self._file_path_xml - - def write(self): - """generate output and write it to files""" - TopBlockGenerator.write(self) - ParseXML.to_file(self._build_block_n_from_flow_graph_io(), self.get_file_path_xml()) - ParseXML.validate_dtd(self.get_file_path_xml(), BLOCK_DTD) - try: - os.chmod(self.get_file_path_xml(), self._mode) - except: - pass - - def _build_block_n_from_flow_graph_io(self): - """ - Generate a block XML nested data from the flow graph IO - - Returns: - a xml node tree - """ - # extract info from the flow graph - block_key = self._flow_graph.get_option('id') - parameters = self._flow_graph.get_parameters() - - def var_or_value(name): - if name in map(lambda p: p.get_id(), parameters): - return "$"+name - return name - - # build the nested data - block_n = odict() - block_n['name'] = self._flow_graph.get_option('title') or \ - self._flow_graph.get_option('id').replace('_', ' ').title() - block_n['key'] = block_key - block_n['category'] = self._flow_graph.get_option('category') - block_n['import'] = "from {0} import {0} # grc-generated hier_block".format( - self._flow_graph.get_option('id')) - # make data - if parameters: - block_n['make'] = '{cls}(\n {kwargs},\n)'.format( - cls=block_key, - kwargs=',\n '.join( - '{key}=${key}'.format(key=param.get_id()) for param in parameters - ), - ) - else: - block_n['make'] = '{cls}()'.format(cls=block_key) - # callback data - block_n['callback'] = [ - 'set_{key}(${key})'.format(key=param.get_id()) for param in parameters - ] - - # Parameters - block_n['param'] = list() - for param in parameters: - param_n = odict() - param_n['name'] = param.get_param('label').get_value() or param.get_id() - param_n['key'] = param.get_id() - param_n['value'] = param.get_param('value').get_value() - param_n['type'] = 'raw' - block_n['param'].append(param_n) - - # bus stuff - if self._flow_graph.get_bussink(): - block_n['bus_sink'] = '1' - if self._flow_graph.get_bussrc(): - block_n['bus_source'] = '1' - - # sink/source ports - for direction in ('sink', 'source'): - block_n[direction] = list() - for port in self._flow_graph.get_hier_block_io(direction): - port_n = odict() - port_n['name'] = port['label'] - port_n['type'] = port['type'] - if port['type'] != "message": - port_n['vlen'] = var_or_value(port['vlen']) - if port['optional']: - port_n['optional'] = '1' - block_n[direction].append(port_n) - - # more bus stuff - bus_struct_sink = self._flow_graph.get_bus_structure_sink() - if bus_struct_sink: - block_n['bus_structure_sink'] = bus_struct_sink[0].get_param('struct').get_value() - bus_struct_src = self._flow_graph.get_bus_structure_src() - if bus_struct_src: - block_n['bus_structure_source'] = bus_struct_src[0].get_param('struct').get_value() - - # documentation - block_n['doc'] = "\n".join(field for field in ( - self._flow_graph.get_option('author'), - self._flow_graph.get_option('description'), - self.get_file_path() - ) if field) - block_n['grc_source'] = str(self._flow_graph.grc_file_path) - - n = {'block': block_n} - return n - - -class QtHierBlockGenerator(HierBlockGenerator): - - def _build_block_n_from_flow_graph_io(self): - n = HierBlockGenerator._build_block_n_from_flow_graph_io(self) - block_n = n['block'] - - if not block_n['name'].upper().startswith('QT GUI'): - block_n['name'] = 'QT GUI ' + block_n['name'] - - block_n.insert_after('category', 'flags', BLOCK_FLAG_NEED_QT_GUI) - - gui_hint_param = odict() - gui_hint_param['name'] = 'GUI Hint' - gui_hint_param['key'] = 'gui_hint' - gui_hint_param['value'] = '' - gui_hint_param['type'] = 'gui_hint' - gui_hint_param['hide'] = 'part' - block_n['param'].append(gui_hint_param) - - block_n['make'] += ( - "\n#set $win = 'self.%s' % $id" - "\n${gui_hint()($win)}" - ) - return n - - -########################################################### -# back-port from python3 -########################################################### -_find_unsafe = re.compile(r'[^\w@%+=:,./-]').search - - -def shlex_quote(s): - """Return a shell-escaped version of the string *s*.""" - if not s: - return "''" - if _find_unsafe(s) is None: - return s - - # use single quotes, and put single quotes into double quotes - # the string $'b is then quoted as '$'"'"'b' - return "'" + s.replace("'", "'\"'\"'") + "'" diff --git a/grc/python/Param.py b/grc/python/Param.py deleted file mode 100644 index e60f613f00..0000000000 --- a/grc/python/Param.py +++ /dev/null @@ -1,433 +0,0 @@ -""" -Copyright 2008-2011 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" - -import ast -import re - -from gnuradio import gr - -from .. base.Param import Param as _Param -from .. gui.Param import Param as _GUIParam - -import Constants -from Constants import VECTOR_TYPES, COMPLEX_TYPES, REAL_TYPES, INT_TYPES - -from gnuradio import eng_notation - -_check_id_matcher = re.compile('^[a-z|A-Z]\w*$') -_show_id_matcher = re.compile('^(variable\w*|parameter|options|notebook|epy_module)$') - - -#blacklist certain ids, its not complete, but should help -import __builtin__ -ID_BLACKLIST = ['self', 'options', 'gr', 'blks2', 'wxgui', 'wx', 'math', 'forms', 'firdes'] + \ - filter(lambda x: not x.startswith('_'), dir(gr.top_block())) + dir(__builtin__) - - -def num_to_str(num): - """ Display logic for numbers """ - if isinstance(num, COMPLEX_TYPES): - num = complex(num) #cast to python complex - if num == 0: return '0' #value is zero - elif num.imag == 0: return '%s'%eng_notation.num_to_str(num.real) #value is real - elif num.real == 0: return '%sj'%eng_notation.num_to_str(num.imag) #value is imaginary - elif num.imag < 0: return '%s-%sj'%(eng_notation.num_to_str(num.real), eng_notation.num_to_str(abs(num.imag))) - else: return '%s+%sj'%(eng_notation.num_to_str(num.real), eng_notation.num_to_str(num.imag)) - else: return str(num) - - -class Param(_Param, _GUIParam): - - def __init__(self, **kwargs): - _Param.__init__(self, **kwargs) - _GUIParam.__init__(self) - self._init = False - self._hostage_cells = list() - - def get_types(self): return ( - 'raw', 'enum', - 'complex', 'real', 'float', 'int', - 'complex_vector', 'real_vector', 'float_vector', 'int_vector', - 'hex', 'string', 'bool', - 'file_open', 'file_save', '_multiline', '_multiline_python_external', - 'id', 'stream_id', - 'grid_pos', 'notebook', 'gui_hint', - 'import', - ) - - def __repr__(self): - """ - Get the repr (nice string format) for this param. - - Returns: - the string representation - """ - ################################################## - # truncate helper method - ################################################## - def _truncate(string, style=0): - max_len = max(27 - len(self.get_name()), 3) - if len(string) > max_len: - if style < 0: #front truncate - string = '...' + string[3-max_len:] - elif style == 0: #center truncate - string = string[:max_len/2 -3] + '...' + string[-max_len/2:] - elif style > 0: #rear truncate - string = string[:max_len-3] + '...' - return string - ################################################## - # simple conditions - ################################################## - if not self.is_valid(): return _truncate(self.get_value()) - if self.get_value() in self.get_option_keys(): return self.get_option(self.get_value()).get_name() - - ################################################## - # split up formatting by type - ################################################## - truncate = 0 #default center truncate - e = self.get_evaluated() - t = self.get_type() - if isinstance(e, bool): return str(e) - elif isinstance(e, COMPLEX_TYPES): dt_str = num_to_str(e) - elif isinstance(e, VECTOR_TYPES): #vector types - if len(e) > 8: - dt_str = self.get_value() #large vectors use code - truncate = 1 - else: dt_str = ', '.join(map(num_to_str, e)) #small vectors use eval - elif t in ('file_open', 'file_save'): - dt_str = self.get_value() - truncate = -1 - else: dt_str = str(e) #other types - ################################################## - # done - ################################################## - return _truncate(dt_str, truncate) - - def get_color(self): - """ - Get the color that represents this param's type. - - Returns: - a hex color code. - """ - try: - return { - #number types - 'complex': Constants.COMPLEX_COLOR_SPEC, - 'real': Constants.FLOAT_COLOR_SPEC, - 'float': Constants.FLOAT_COLOR_SPEC, - 'int': Constants.INT_COLOR_SPEC, - #vector types - 'complex_vector': Constants.COMPLEX_VECTOR_COLOR_SPEC, - 'real_vector': Constants.FLOAT_VECTOR_COLOR_SPEC, - 'float_vector': Constants.FLOAT_VECTOR_COLOR_SPEC, - 'int_vector': Constants.INT_VECTOR_COLOR_SPEC, - #special - 'bool': Constants.INT_COLOR_SPEC, - 'hex': Constants.INT_COLOR_SPEC, - 'string': Constants.BYTE_VECTOR_COLOR_SPEC, - 'id': Constants.ID_COLOR_SPEC, - 'stream_id': Constants.ID_COLOR_SPEC, - 'grid_pos': Constants.INT_VECTOR_COLOR_SPEC, - 'notebook': Constants.INT_VECTOR_COLOR_SPEC, - 'raw': Constants.WILDCARD_COLOR_SPEC, - }[self.get_type()] - except: return _Param.get_color(self) - - def get_hide(self): - """ - Get the hide value from the base class. - Hide the ID parameter for most blocks. Exceptions below. - If the parameter controls a port type, vlen, or nports, return part. - If the parameter is an empty grid position, return part. - These parameters are redundant to display in the flow graph view. - - Returns: - hide the hide property string - """ - hide = _Param.get_hide(self) - if hide: return hide - #hide ID in non variable blocks - if self.get_key() == 'id' and not _show_id_matcher.match(self.get_parent().get_key()): return 'part' - #hide port controllers for type and nports - if self.get_key() in ' '.join(map( - lambda p: ' '.join([p._type, p._nports]), self.get_parent().get_ports()) - ): return 'part' - #hide port controllers for vlen, when == 1 - if self.get_key() in ' '.join(map( - lambda p: p._vlen, self.get_parent().get_ports()) - ): - try: - if int(self.get_evaluated()) == 1: return 'part' - except: pass - #hide empty grid positions - if self.get_key() in ('grid_pos', 'notebook') and not self.get_value(): return 'part' - return hide - - def validate(self): - """ - Validate the param. - A test evaluation is performed - """ - _Param.validate(self) #checks type - self._evaluated = None - try: self._evaluated = self.evaluate() - except Exception, e: self.add_error_message(str(e)) - - def get_evaluated(self): return self._evaluated - - def evaluate(self): - """ - Evaluate the value. - - Returns: - evaluated type - """ - self._init = True - self._lisitify_flag = False - self._stringify_flag = False - self._hostage_cells = list() - t = self.get_type() - v = self.get_value() - ######################### - # Enum Type - ######################### - if self.is_enum(): return v - ######################### - # Numeric Types - ######################### - elif t in ('raw', 'complex', 'real', 'float', 'int', 'hex', 'bool'): - #raise exception if python cannot evaluate this value - try: e = self.get_parent().get_parent().evaluate(v) - except Exception, e: raise Exception, 'Value "%s" cannot be evaluated:\n%s'%(v, e) - #raise an exception if the data is invalid - if t == 'raw': return e - elif t == 'complex': - if not isinstance(e, COMPLEX_TYPES): - raise Exception, 'Expression "%s" is invalid for type complex.'%str(e) - return e - elif t == 'real' or t == 'float': - if not isinstance(e, REAL_TYPES): - raise Exception, 'Expression "%s" is invalid for type float.'%str(e) - return e - elif t == 'int': - if not isinstance(e, INT_TYPES): - raise Exception, 'Expression "%s" is invalid for type integer.'%str(e) - return e - elif t == 'hex': return hex(e) - elif t == 'bool': - if not isinstance(e, bool): - raise Exception, 'Expression "%s" is invalid for type bool.'%str(e) - return e - else: raise TypeError, 'Type "%s" not handled'%t - ######################### - # Numeric Vector Types - ######################### - elif t in ('complex_vector', 'real_vector', 'float_vector', 'int_vector'): - if not v: v = '()' #turn a blank string into an empty list, so it will eval - #raise exception if python cannot evaluate this value - try: e = self.get_parent().get_parent().evaluate(v) - except Exception, e: raise Exception, 'Value "%s" cannot be evaluated:\n%s'%(v, e) - #raise an exception if the data is invalid - if t == 'complex_vector': - if not isinstance(e, VECTOR_TYPES): - self._lisitify_flag = True - e = [e] - if not all([isinstance(ei, COMPLEX_TYPES) for ei in e]): - raise Exception, 'Expression "%s" is invalid for type complex vector.'%str(e) - return e - elif t == 'real_vector' or t == 'float_vector': - if not isinstance(e, VECTOR_TYPES): - self._lisitify_flag = True - e = [e] - if not all([isinstance(ei, REAL_TYPES) for ei in e]): - raise Exception, 'Expression "%s" is invalid for type float vector.'%str(e) - return e - elif t == 'int_vector': - if not isinstance(e, VECTOR_TYPES): - self._lisitify_flag = True - e = [e] - if not all([isinstance(ei, INT_TYPES) for ei in e]): - raise Exception, 'Expression "%s" is invalid for type integer vector.'%str(e) - return e - ######################### - # String Types - ######################### - elif t in ('string', 'file_open', 'file_save', '_multiline', '_multiline_python_external'): - #do not check if file/directory exists, that is a runtime issue - try: - e = self.get_parent().get_parent().evaluate(v) - if not isinstance(e, str): - raise Exception() - except: - self._stringify_flag = True - e = str(v) - if t == '_multiline_python_external': - ast.parse(e) # raises SyntaxError - return e - ######################### - # Unique ID Type - ######################### - elif t == 'id': - #can python use this as a variable? - if not _check_id_matcher.match(v): - raise Exception, 'ID "%s" must begin with a letter and may contain letters, numbers, and underscores.'%v - ids = [param.get_value() for param in self.get_all_params(t)] - if ids.count(v) > 1: #id should only appear once, or zero times if block is disabled - raise Exception, 'ID "%s" is not unique.'%v - if v in ID_BLACKLIST: - raise Exception, 'ID "%s" is blacklisted.'%v - return v - ######################### - # Stream ID Type - ######################### - elif t == 'stream_id': - #get a list of all stream ids used in the virtual sinks - ids = [param.get_value() for param in filter( - lambda p: p.get_parent().is_virtual_sink(), - self.get_all_params(t), - )] - #check that the virtual sink's stream id is unique - if self.get_parent().is_virtual_sink(): - if ids.count(v) > 1: #id should only appear once, or zero times if block is disabled - raise Exception, 'Stream ID "%s" is not unique.'%v - #check that the virtual source's steam id is found - if self.get_parent().is_virtual_source(): - if v not in ids: - raise Exception, 'Stream ID "%s" is not found.'%v - return v - ######################### - # GUI Position/Hint - ######################### - elif t == 'gui_hint': - if ':' in v: tab, pos = v.split(':') - elif '@' in v: tab, pos = v, '' - else: tab, pos = '', v - - if '@' in tab: tab, index = tab.split('@') - else: index = '?' - - widget_str = ({ - (True, True): 'self.%(tab)s_grid_layout_%(index)s.addWidget(%(widget)s, %(pos)s)', - (True, False): 'self.%(tab)s_layout_%(index)s.addWidget(%(widget)s)', - (False, True): 'self.top_grid_layout.addWidget(%(widget)s, %(pos)s)', - (False, False): 'self.top_layout.addWidget(%(widget)s)', - }[bool(tab), bool(pos)])%{'tab': tab, 'index': index, 'widget': '%s', 'pos': pos} - - # FIXME: Move replace(...) into the make template of the qtgui blocks and return a string here - class GuiHint(object): - def __init__(self, ws): - self._ws = ws - - def __call__(self, w): - return (self._ws.replace('addWidget', 'addLayout') if 'layout' in w else self._ws) % w - - def __str__(self): - return self._ws - return GuiHint(widget_str) - ######################### - # Grid Position Type - ######################### - elif t == 'grid_pos': - if not v: return '' #allow for empty grid pos - e = self.get_parent().get_parent().evaluate(v) - if not isinstance(e, (list, tuple)) or len(e) != 4 or not all([isinstance(ei, int) for ei in e]): - raise Exception, 'A grid position must be a list of 4 integers.' - row, col, row_span, col_span = e - #check row, col - if row < 0 or col < 0: - raise Exception, 'Row and column must be non-negative.' - #check row span, col span - if row_span <= 0 or col_span <= 0: - raise Exception, 'Row and column span must be greater than zero.' - #get hostage cell parent - try: my_parent = self.get_parent().get_param('notebook').evaluate() - except: my_parent = '' - #calculate hostage cells - for r in range(row_span): - for c in range(col_span): - self._hostage_cells.append((my_parent, (row+r, col+c))) - #avoid collisions - params = filter(lambda p: p is not self, self.get_all_params('grid_pos')) - for param in params: - for parent, cell in param._hostage_cells: - if (parent, cell) in self._hostage_cells: - raise Exception, 'Another graphical element is using parent "%s", cell "%s".'%(str(parent), str(cell)) - return e - ######################### - # Notebook Page Type - ######################### - elif t == 'notebook': - if not v: return '' #allow for empty notebook - #get a list of all notebooks - notebook_blocks = filter(lambda b: b.get_key() == 'notebook', self.get_parent().get_parent().get_enabled_blocks()) - #check for notebook param syntax - try: notebook_id, page_index = map(str.strip, v.split(',')) - except: raise Exception, 'Bad notebook page format.' - #check that the notebook id is valid - try: notebook_block = filter(lambda b: b.get_id() == notebook_id, notebook_blocks)[0] - except: raise Exception, 'Notebook id "%s" is not an existing notebook id.'%notebook_id - #check that page index exists - if int(page_index) not in range(len(notebook_block.get_param('labels').evaluate())): - raise Exception, 'Page index "%s" is not a valid index number.'%page_index - return notebook_id, page_index - ######################### - # Import Type - ######################### - elif t == 'import': - n = dict() #new namespace - try: exec v in n - except ImportError: raise Exception, 'Import "%s" failed.'%v - except Exception: raise Exception, 'Bad import syntax: "%s".'%v - return filter(lambda k: str(k) != '__builtins__', n.keys()) - ######################### - else: raise TypeError, 'Type "%s" not handled'%t - - def to_code(self): - """ - Convert the value to code. - For string and list types, check the init flag, call evaluate(). - This ensures that evaluate() was called to set the xxxify_flags. - - Returns: - a string representing the code - """ - v = self.get_value() - t = self.get_type() - if t in ('string', 'file_open', 'file_save', '_multiline', '_multiline_python_external'): # string types - if not self._init: self.evaluate() - if self._stringify_flag: return '"%s"'%v.replace('"', '\"') - else: return v - elif t in ('complex_vector', 'real_vector', 'float_vector', 'int_vector'): #vector types - if not self._init: self.evaluate() - if self._lisitify_flag: return '(%s, )'%v - else: return '(%s)'%v - else: return v - - def get_all_params(self, type): - """ - Get all the params from the flowgraph that have the given type. - - Args: - type: the specified type - - Returns: - a list of params - """ - return sum([filter(lambda p: p.get_type() == type, block.get_params()) for block in self.get_parent().get_parent().get_enabled_blocks()], []) diff --git a/grc/python/Platform.py b/grc/python/Platform.py deleted file mode 100644 index 2a0bbf9a9e..0000000000 --- a/grc/python/Platform.py +++ /dev/null @@ -1,174 +0,0 @@ -""" -Copyright 2008-2016 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" - -import os -import sys - -from gnuradio import gr - -from .. base.Platform import Platform as _Platform -from .. gui.Platform import Platform as _GUIPlatform -from .. gui import Messages - -from . import extract_docs -from .FlowGraph import FlowGraph as _FlowGraph -from .Connection import Connection as _Connection -from .Block import Block as _Block -from .Port import Port as _Port -from .Param import Param as _Param -from .Generator import Generator -from .Constants import ( - HIER_BLOCKS_LIB_DIR, BLOCK_DTD, DEFAULT_FLOW_GRAPH, BLOCKS_DIRS, - PREFS_FILE, PREFS_FILE_OLD, CORE_TYPES -) - -COLORS = [(name, color) for name, key, sizeof, color in CORE_TYPES] - - -class Platform(_Platform, _GUIPlatform): - def __init__(self): - """ - Make a platform for gnuradio. - """ - # ensure hier and conf directories - if not os.path.exists(HIER_BLOCKS_LIB_DIR): - os.mkdir(HIER_BLOCKS_LIB_DIR) - if not os.path.exists(os.path.dirname(PREFS_FILE)): - os.mkdir(os.path.dirname(PREFS_FILE)) - - self.block_docstrings = block_docstrings = dict() - self.block_docstrings_loaded_callback = lambda: None - - def setter(key, docs): - block_docstrings[key] = '\n\n'.join( - '--- {0} ---\n{1}\n'.format(b, d.replace('\n\n', '\n')) - for b, d in docs.iteritems() if d is not None - ) - - self._docstring_extractor = extract_docs.SubprocessLoader( - callback_query_result=setter, - callback_finished=lambda: self.block_docstrings_loaded_callback() - ) - - # init - _Platform.__init__( - self, - name='GNU Radio Companion', - version=(gr.version(), gr.major_version(), gr.api_version(), gr.minor_version()), - key='grc', - license=__doc__.strip(), - website='http://gnuradio.org/', - block_paths=BLOCKS_DIRS, - block_dtd=BLOCK_DTD, - default_flow_graph=DEFAULT_FLOW_GRAPH, - generator=Generator, - colors=COLORS, - ) - self._move_old_pref_file() - _GUIPlatform.__init__( - self, - prefs_file=PREFS_FILE - ) - self._auto_hier_block_generate_chain = set() - - @staticmethod - def _move_old_pref_file(): - if PREFS_FILE == PREFS_FILE_OLD: - return # prefs file overridden with env var - if os.path.exists(PREFS_FILE_OLD) and not os.path.exists(PREFS_FILE): - try: - import shutil - shutil.move(PREFS_FILE_OLD, PREFS_FILE) - except Exception as e: - print >> sys.stderr, e - - def load_blocks(self): - self._docstring_extractor.start() - _Platform.load_blocks(self) - self._docstring_extractor.finish() - # self._docstring_extractor.wait() - - def load_block_xml(self, xml_file): - block = _Platform.load_block_xml(self, xml_file) - self._docstring_extractor.query( - block.get_key(), - block.get_imports(raw=True), - block.get_make(raw=True) - ) - return block - - @staticmethod - def find_file_in_paths(filename, paths, cwd): - """Checks the provided paths relative to cwd for a certain filename""" - if not os.path.isdir(cwd): - cwd = os.path.dirname(cwd) - if isinstance(paths, str): - paths = (p for p in paths.split(':') if p) - - for path in paths: - path = os.path.expanduser(path) - if not os.path.isabs(path): - path = os.path.normpath(os.path.join(cwd, path)) - file_path = os.path.join(path, filename) - if os.path.exists(os.path.normpath(file_path)): - return file_path - - def load_and_generate_flow_graph(self, file_path): - """Loads a flowgraph from file and generates it""" - Messages.set_indent(len(self._auto_hier_block_generate_chain)) - Messages.send('>>> Loading: %r\n' % file_path) - if file_path in self._auto_hier_block_generate_chain: - Messages.send(' >>> Warning: cyclic hier_block dependency\n') - return False - self._auto_hier_block_generate_chain.add(file_path) - try: - flow_graph = self.get_new_flow_graph() - flow_graph.grc_file_path = file_path - # other, nested higiter_blocks might be auto-loaded here - flow_graph.import_data(self.parse_flow_graph(file_path)) - flow_graph.rewrite() - flow_graph.validate() - if not flow_graph.is_valid(): - raise Exception('Flowgraph invalid') - except Exception as e: - Messages.send('>>> Load Error: %r: %s\n' % (file_path, str(e))) - return False - finally: - self._auto_hier_block_generate_chain.discard(file_path) - Messages.set_indent(len(self._auto_hier_block_generate_chain)) - - try: - Messages.send('>>> Generating: %r\n' % file_path) - generator = self.get_generator()(flow_graph, file_path) - generator.write() - except Exception as e: - Messages.send('>>> Generate Error: %r: %s\n' % (file_path, str(e))) - return False - - self.load_block_xml(generator.get_file_path_xml()) - return True - - ############################################## - # Constructors - ############################################## - FlowGraph = _FlowGraph - Connection = _Connection - Block = _Block - Port = _Port - Param = _Param diff --git a/grc/python/Port.py b/grc/python/Port.py deleted file mode 100644 index 249d7aed71..0000000000 --- a/grc/python/Port.py +++ /dev/null @@ -1,268 +0,0 @@ -""" -Copyright 2008-2012 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" - -from .. base.Port import Port as _Port -from .. base.Constants import DEFAULT_DOMAIN, GR_MESSAGE_DOMAIN -from .. gui.Port import Port as _GUIPort -import Constants - - -def _get_source_from_virtual_sink_port(vsp): - """ - Resolve the source port that is connected to the given virtual sink port. - Use the get source from virtual source to recursively resolve subsequent ports. - """ - try: return _get_source_from_virtual_source_port( - vsp.get_enabled_connections()[0].get_source()) - except: raise Exception, 'Could not resolve source for virtual sink port %s'%vsp - -def _get_source_from_virtual_source_port(vsp, traversed=[]): - """ - Recursively resolve source ports over the virtual connections. - Keep track of traversed sources to avoid recursive loops. - """ - if not vsp.get_parent().is_virtual_source(): return vsp - if vsp in traversed: raise Exception, 'Loop found when resolving virtual source %s'%vsp - try: return _get_source_from_virtual_source_port( - _get_source_from_virtual_sink_port( - filter(#get all virtual sinks with a matching stream id - lambda vs: vs.get_param('stream_id').get_value() == vsp.get_parent().get_param('stream_id').get_value(), - filter(#get all enabled blocks that are also virtual sinks - lambda b: b.is_virtual_sink(), - vsp.get_parent().get_parent().get_enabled_blocks(), - ), - )[0].get_sinks()[0] - ), traversed + [vsp], - ) - except: raise Exception, 'Could not resolve source for virtual source port %s'%vsp - -def _get_sink_from_virtual_source_port(vsp): - """ - Resolve the sink port that is connected to the given virtual source port. - Use the get sink from virtual sink to recursively resolve subsequent ports. - """ - try: return _get_sink_from_virtual_sink_port( - vsp.get_enabled_connections()[0].get_sink()) # Could have many connections, but use first - except: raise Exception, 'Could not resolve source for virtual source port %s'%vsp - -def _get_sink_from_virtual_sink_port(vsp, traversed=[]): - """ - Recursively resolve sink ports over the virtual connections. - Keep track of traversed sinks to avoid recursive loops. - """ - if not vsp.get_parent().is_virtual_sink(): return vsp - if vsp in traversed: raise Exception, 'Loop found when resolving virtual sink %s'%vsp - try: return _get_sink_from_virtual_sink_port( - _get_sink_from_virtual_source_port( - filter(#get all virtual source with a matching stream id - lambda vs: vs.get_param('stream_id').get_value() == vsp.get_parent().get_param('stream_id').get_value(), - filter(#get all enabled blocks that are also virtual sinks - lambda b: b.is_virtual_source(), - vsp.get_parent().get_parent().get_enabled_blocks(), - ), - )[0].get_sources()[0] - ), traversed + [vsp], - ) - except: raise Exception, 'Could not resolve source for virtual sink port %s'%vsp - -class Port(_Port, _GUIPort): - - def __init__(self, block, n, dir): - """ - Make a new port from nested data. - - Args: - block: the parent element - n: the nested odict - dir: the direction - """ - self._n = n - if n['type'] == 'message': - n['domain'] = GR_MESSAGE_DOMAIN - if 'domain' not in n: - n['domain'] = DEFAULT_DOMAIN - elif n['domain'] == GR_MESSAGE_DOMAIN: - n['key'] = n['name'] - n['type'] = 'message' # for port color - if n['type'] == 'msg': - n['key'] = 'msg' - if not n.find('key'): - n['key'] = str(next(block.port_counters[dir == 'source'])) - # build the port - _Port.__init__( - self, - block=block, - n=n, - dir=dir, - ) - _GUIPort.__init__(self) - self._nports = n.find('nports') or '' - self._vlen = n.find('vlen') or '' - self._optional = bool(n.find('optional')) - self._clones = [] # references to cloned ports (for nports > 1) - - def get_types(self): return Constants.TYPE_TO_SIZEOF.keys() - - def is_type_empty(self): return not self._n['type'] - - def validate(self): - _Port.validate(self) - if not self.get_enabled_connections() and not self.get_optional(): - self.add_error_message('Port is not connected.') - #message port logic - if self.get_type() == 'msg': - if self.get_nports(): - self.add_error_message('A port of type "msg" cannot have "nports" set.') - if self.get_vlen() != 1: - self.add_error_message('A port of type "msg" must have a "vlen" of 1.') - - def rewrite(self): - """ - Handle the port cloning for virtual blocks. - """ - if self.is_type_empty(): - try: #clone type and vlen - source = self.resolve_empty_type() - self._type = str(source.get_type()) - self._vlen = str(source.get_vlen()) - except: #reset type and vlen - self._type = '' - self._vlen = '' - _Port.rewrite(self) - - def resolve_virtual_source(self): - if self.get_parent().is_virtual_sink(): return _get_source_from_virtual_sink_port(self) - if self.get_parent().is_virtual_source(): return _get_source_from_virtual_source_port(self) - - def resolve_empty_type(self): - if self.is_sink(): - try: - src = _get_source_from_virtual_sink_port(self) - if not src.is_type_empty(): return src - except: pass - sink = _get_sink_from_virtual_sink_port(self) - if not sink.is_type_empty(): return sink - if self.is_source(): - try: - src = _get_source_from_virtual_source_port(self) - if not src.is_type_empty(): return src - except: pass - sink = _get_sink_from_virtual_source_port(self) - if not sink.is_type_empty(): return sink - - def get_vlen(self): - """ - Get the vector length. - If the evaluation of vlen cannot be cast to an integer, return 1. - - Returns: - the vector length or 1 - """ - vlen = self.get_parent().resolve_dependencies(self._vlen) - try: return int(self.get_parent().get_parent().evaluate(vlen)) - except: return 1 - - def get_nports(self): - """ - Get the number of ports. - If already blank, return a blank - If the evaluation of nports cannot be cast to a positive integer, return 1. - - Returns: - the number of ports or 1 - """ - if self._nports == '': return '' - - nports = self.get_parent().resolve_dependencies(self._nports) - try: - return max(1, int(self.get_parent().get_parent().evaluate(nports))) - except: - return 1 - - def get_optional(self): return bool(self._optional) - - def get_color(self): - """ - Get the color that represents this port's type. - Codes differ for ports where the vec length is 1 or greater than 1. - - Returns: - a hex color code. - """ - try: - color = Constants.TYPE_TO_COLOR[self.get_type()] - vlen = self.get_vlen() - if vlen == 1: return color - color_val = int(color[1:], 16) - r = (color_val >> 16) & 0xff - g = (color_val >> 8) & 0xff - b = (color_val >> 0) & 0xff - dark = (0, 0, 30, 50, 70)[min(4, vlen)] - r = max(r-dark, 0) - g = max(g-dark, 0) - b = max(b-dark, 0) - return '#%.2x%.2x%.2x'%(r, g, b) - except: return _Port.get_color(self) - - def get_clones(self): - """ - Get the clones of this master port (nports > 1) - - Returns: - a list of ports - """ - return self._clones - - def add_clone(self): - """ - Create a clone of this (master) port and store a reference in self._clones. - - The new port name (and key for message ports) will have index 1... appended. - If this is the first clone, this (master) port will get a 0 appended to its name (and key) - - Returns: - the cloned port - """ - # add index to master port name if there are no clones yet - if not self._clones: - self._name = self._n['name'] + '0' - if not self._key.isdigit(): # also update key for none stream ports - self._key = self._name - - # Prepare a copy of the odict for the clone - n = self._n.copy() - if 'nports' in n: n.pop('nports') # remove nports from the key so the copy cannot be a duplicator - n['name'] = self._n['name'] + str(len(self._clones) + 1) - n['key'] = '99999' if self._key.isdigit() else n['name'] # dummy value 99999 will be fixed later - - port = self.__class__(self.get_parent(), n, self._dir) # clone - self._clones.append(port) - return port - - def remove_clone(self, port): - """ - Remove a cloned port (from the list of clones only) - Remove the index 0 of the master port name (and key9 if there are no more clones left - """ - self._clones.remove(port) - # remove index from master port name if there are no more clones - if not self._clones: - self._name = self._n['name'] - if not self._key.isdigit(): # also update key for none stream ports - self._key = self._name diff --git a/grc/python/__init__.py b/grc/python/__init__.py deleted file mode 100644 index 8b13789179..0000000000 --- a/grc/python/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/grc/python/block.dtd b/grc/python/block.dtd deleted file mode 100644 index 145f4d8610..0000000000 --- a/grc/python/block.dtd +++ /dev/null @@ -1,69 +0,0 @@ -<!-- -Copyright 2008 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA ---> -<!-- - gnuradio_python.blocks.dtd - Josh Blum - The document type definition for blocks. - --> -<!-- - Top level element. - A block contains a name, ...parameters list, and list of IO ports. - --> -<!ELEMENT block (name, key, category?, throttle?, flags?, import*, var_make?, var_value?, - make, callback*, param_tab_order?, param*, bus_sink?, bus_source?, check*, - sink*, source*, bus_structure_sink?, bus_structure_source?, doc?, grc_source?)> -<!-- - Sub level elements. - --> -<!ELEMENT param_tab_order (tab+)> -<!ELEMENT param (base_key?, name, key, value?, type?, hide?, option*, tab?)> -<!ELEMENT option (name, key, opt*)> -<!ELEMENT sink (name, type, vlen?, domain?, nports?, optional?, hide?)> -<!ELEMENT source (name, type, vlen?, domain?, nports?, optional?, hide?)> -<!-- - Bottom level elements. - Character data only. - --> -<!ELEMENT category (#PCDATA)> -<!ELEMENT import (#PCDATA)> -<!ELEMENT doc (#PCDATA)> -<!ELEMENT grc_source (#PCDATA)> -<!ELEMENT tab (#PCDATA)> -<!ELEMENT name (#PCDATA)> -<!ELEMENT base_key (#PCDATA)> -<!ELEMENT key (#PCDATA)> -<!ELEMENT check (#PCDATA)> -<!ELEMENT bus_sink (#PCDATA)> -<!ELEMENT bus_source (#PCDATA)> -<!ELEMENT opt (#PCDATA)> -<!ELEMENT type (#PCDATA)> -<!ELEMENT hide (#PCDATA)> -<!ELEMENT vlen (#PCDATA)> -<!ELEMENT domain (#PCDATA)> -<!ELEMENT nports (#PCDATA)> -<!ELEMENT bus_structure_sink (#PCDATA)> -<!ELEMENT bus_structure_source (#PCDATA)> -<!ELEMENT var_make (#PCDATA)> -<!ELEMENT var_value (#PCDATA)> -<!ELEMENT make (#PCDATA)> -<!ELEMENT value (#PCDATA)> -<!ELEMENT callback (#PCDATA)> -<!ELEMENT optional (#PCDATA)> -<!ELEMENT throttle (#PCDATA)> -<!ELEMENT flags (#PCDATA)> diff --git a/grc/python/default_flow_graph.grc b/grc/python/default_flow_graph.grc deleted file mode 100644 index 059509d34b..0000000000 --- a/grc/python/default_flow_graph.grc +++ /dev/null @@ -1,43 +0,0 @@ -<?xml version="1.0"?> -<!-- -################################################### -##Default Flow Graph: -## include an options block and a variable for sample rate -################################################### - --> -<flow_graph> - <block> - <key>options</key> - <param> - <key>id</key> - <value>top_block</value> - </param> - <param> - <key>_coordinate</key> - <value>(8, 8)</value> - </param> - <param> - <key>_rotation</key> - <value>0</value> - </param> - </block> - <block> - <key>variable</key> - <param> - <key>id</key> - <value>samp_rate</value> - </param> - <param> - <key>value</key> - <value>32000</value> - </param> - <param> - <key>_coordinate</key> - <value>(8, 160)</value> - </param> - <param> - <key>_rotation</key> - <value>0</value> - </param> - </block> -</flow_graph> diff --git a/grc/python/epy_block_io.py b/grc/python/epy_block_io.py deleted file mode 100644 index e089908a01..0000000000 --- a/grc/python/epy_block_io.py +++ /dev/null @@ -1,95 +0,0 @@ - -import inspect -import collections - -from gnuradio import gr -import pmt - - -TYPE_MAP = { - 'complex64': 'complex', 'complex': 'complex', - 'float32': 'float', 'float': 'float', - 'int32': 'int', 'uint32': 'int', - 'int16': 'short', 'uint16': 'short', - 'int8': 'byte', 'uint8': 'byte', -} - -BlockIO = collections.namedtuple('BlockIO', 'name cls params sinks sources doc') - - -def _ports(sigs, msgs): - ports = list() - for i, dtype in enumerate(sigs): - port_type = TYPE_MAP.get(dtype.name, None) - if not port_type: - raise ValueError("Can't map {0:!r} to GRC port type".format(dtype)) - ports.append((str(i), port_type)) - for msg_key in msgs: - if msg_key == 'system': - continue - ports.append((msg_key, 'message')) - return ports - - -def _blk_class(source_code): - ns = {} - try: - exec source_code in ns - except Exception as e: - raise ValueError("Can't interpret source code: " + str(e)) - for var in ns.itervalues(): - if inspect.isclass(var)and issubclass(var, gr.gateway.gateway_block): - return var - raise ValueError('No python block class found in code') - - -def extract(cls): - if not inspect.isclass(cls): - cls = _blk_class(cls) - - spec = inspect.getargspec(cls.__init__) - defaults = map(repr, spec.defaults or ()) - doc = cls.__doc__ or cls.__init__.__doc__ or '' - cls_name = cls.__name__ - - if len(defaults) + 1 != len(spec.args): - raise ValueError("Need all __init__ arguments to have default values") - - try: - instance = cls() - except Exception as e: - raise RuntimeError("Can't create an instance of your block: " + str(e)) - - name = instance.name() - params = list(zip(spec.args[1:], defaults)) - - sinks = _ports(instance.in_sig(), - pmt.to_python(instance.message_ports_in())) - sources = _ports(instance.out_sig(), - pmt.to_python(instance.message_ports_out())) - - return BlockIO(name, cls_name, params, sinks, sources, doc) - - -if __name__ == '__main__': - blk_code = """ -import numpy as np -from gnuradio import gr -import pmt - -class blk(gr.sync_block): - def __init__(self, param1=None, param2=None): - "Test Docu" - gr.sync_block.__init__( - self, - name='Embedded Python Block', - in_sig = (np.float32,), - out_sig = (np.float32,np.complex64,), - ) - self.message_port_register_in(pmt.intern('msg_in')) - self.message_port_register_out(pmt.intern('msg_out')) - - def work(self, inputs_items, output_items): - return 10 - """ - print extract(blk_code) diff --git a/grc/python/expr_utils.py b/grc/python/expr_utils.py deleted file mode 100644 index 9e0b2a4a0a..0000000000 --- a/grc/python/expr_utils.py +++ /dev/null @@ -1,177 +0,0 @@ -""" -Copyright 2008-2011 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" - -import string -VAR_CHARS = string.letters + string.digits + '_' - -class graph(object): - """ - Simple graph structure held in a dictionary. - """ - - def __init__(self): self._graph = dict() - - def __str__(self): return str(self._graph) - - def add_node(self, node_key): - if self._graph.has_key(node_key): return - self._graph[node_key] = set() - - def remove_node(self, node_key): - if not self._graph.has_key(node_key): return - for edges in self._graph.values(): - if node_key in edges: edges.remove(node_key) - self._graph.pop(node_key) - - def add_edge(self, src_node_key, dest_node_key): - self._graph[src_node_key].add(dest_node_key) - - def remove_edge(self, src_node_key, dest_node_key): - self._graph[src_node_key].remove(dest_node_key) - - def get_nodes(self): return self._graph.keys() - - def get_edges(self, node_key): return self._graph[node_key] - -def expr_split(expr): - """ - Split up an expression by non alphanumeric characters, including underscore. - Leave strings in-tact. - #TODO ignore escaped quotes, use raw strings. - - Args: - expr: an expression string - - Returns: - a list of string tokens that form expr - """ - toks = list() - tok = '' - quote = '' - for char in expr: - if quote or char in VAR_CHARS: - if char == quote: quote = '' - tok += char - elif char in ("'", '"'): - toks.append(tok) - tok = char - quote = char - else: - toks.append(tok) - toks.append(char) - tok = '' - toks.append(tok) - return filter(lambda t: t, toks) - -def expr_replace(expr, replace_dict): - """ - Search for vars in the expression and add the prepend. - - Args: - expr: an expression string - replace_dict: a dict of find:replace - - Returns: - a new expression with the prepend - """ - expr_splits = expr_split(expr) - for i, es in enumerate(expr_splits): - if es in replace_dict.keys(): - expr_splits[i] = replace_dict[es] - return ''.join(expr_splits) - -def get_variable_dependencies(expr, vars): - """ - Return a set of variables used in this expression. - - Args: - expr: an expression string - vars: a list of variable names - - Returns: - a subset of vars used in the expression - """ - expr_toks = expr_split(expr) - return set(filter(lambda v: v in expr_toks, vars)) - -def get_graph(exprs): - """ - Get a graph representing the variable dependencies - - Args: - exprs: a mapping of variable name to expression - - Returns: - a graph of variable deps - """ - vars = exprs.keys() - #get dependencies for each expression, load into graph - var_graph = graph() - for var in vars: var_graph.add_node(var) - for var, expr in exprs.iteritems(): - for dep in get_variable_dependencies(expr, vars): - if dep != var: var_graph.add_edge(dep, var) - return var_graph - -def sort_variables(exprs): - """ - Get a list of variables in order of dependencies. - - Args: - exprs: a mapping of variable name to expression - - Returns: - a list of variable names - @throws Exception circular dependencies - """ - var_graph = get_graph(exprs) - sorted_vars = list() - #determine dependency order - while var_graph.get_nodes(): - #get a list of nodes with no edges - indep_vars = filter(lambda var: not var_graph.get_edges(var), var_graph.get_nodes()) - if not indep_vars: raise Exception('circular dependency caught in sort_variables') - #add the indep vars to the end of the list - sorted_vars.extend(sorted(indep_vars)) - #remove each edge-less node from the graph - for var in indep_vars: var_graph.remove_node(var) - return reversed(sorted_vars) - -def sort_objects(objects, get_id, get_expr): - """ - Sort a list of objects according to their expressions. - - Args: - objects: the list of objects to sort - get_id: the function to extract an id from the object - get_expr: the function to extract an expression from the object - - Returns: - a list of sorted objects - """ - id2obj = dict([(get_id(obj), obj) for obj in objects]) - #map obj id to expression code - id2expr = dict([(get_id(obj), get_expr(obj)) for obj in objects]) - #sort according to dependency - sorted_ids = sort_variables(id2expr) - #return list of sorted objects - return [id2obj[id] for id in sorted_ids] - -if __name__ == '__main__': - for i in sort_variables({'x':'1', 'y':'x+1', 'a':'x+y', 'b':'y+1', 'c':'a+b+x+y'}): print i diff --git a/grc/python/extract_docs.py b/grc/python/extract_docs.py deleted file mode 100644 index d8dc4f4e8f..0000000000 --- a/grc/python/extract_docs.py +++ /dev/null @@ -1,296 +0,0 @@ -""" -Copyright 2008-2011 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" - -import sys -import re -import subprocess -import threading -import json -import Queue -import random -import itertools - - -############################################################################### -# The docstring extraction -############################################################################### - -def docstring_guess_from_key(key): - """Extract the documentation from the python __doc__ strings - - By guessing module and constructor names from key - - Args: - key: the block key - - Returns: - a dict (block_name --> doc string) - """ - doc_strings = dict() - - in_tree = [key.partition('_')[::2] + ( - lambda package: getattr(__import__('gnuradio.' + package), package), - )] - - key_parts = key.split('_') - oot = [ - ('_'.join(key_parts[:i]), '_'.join(key_parts[i:]), __import__) - for i in range(1, len(key_parts)) - ] - - for module_name, init_name, importer in itertools.chain(in_tree, oot): - if not module_name or not init_name: - continue - try: - module = importer(module_name) - break - except ImportError: - continue - else: - return doc_strings - - pattern = re.compile( - '^' + init_name.replace('_', '_*').replace('x', r'\w') + r'\w*$' - ) - for match in filter(pattern.match, dir(module)): - try: - doc_strings[match] = getattr(module, match).__doc__.strip() - except AttributeError: - continue - - return doc_strings - - -def docstring_from_make(key, imports, make): - """Extract the documentation from the python __doc__ strings - - By importing it and checking a truncated make - - Args: - key: the block key - imports: a list of import statements (string) to execute - make: block constructor template - - Returns: - a list of tuples (block_name, doc string) - """ - - try: - blk_cls = make.partition('(')[0].strip() - if '$' in blk_cls: - raise ValueError('Not an identifier') - - ns = dict() - for _import in imports: - exec(_import.strip(), ns) - blk = eval(blk_cls, ns) - - doc_strings = {key: blk.__doc__} - - except (ImportError, AttributeError, SyntaxError, ValueError): - doc_strings = docstring_guess_from_key(key) - - return doc_strings - - -############################################################################### -# Manage docstring extraction in separate process -############################################################################### - -class SubprocessLoader(object): - """Start and manage docstring extraction process - - Manages subprocess and handles RPC. - """ - BOOTSTRAP = "import runpy; runpy.run_path({!r}, run_name='__worker__')" - AUTH_CODE = random.random() # sort out unwanted output of worker process - RESTART = 5 # number of worker restarts before giving up - DONE = object() # sentinel value to signal end-of-queue - - def __init__(self, callback_query_result, callback_finished=None): - self.callback_query_result = callback_query_result - self.callback_finished = callback_finished or (lambda: None) - - self._queue = Queue.Queue() - self._thread = None - self._worker = None - self._shutdown = threading.Event() - self._last_cmd = None - - def start(self): - """Start the worker process handler thread""" - if self._thread is not None: - return - self._shutdown.clear() - thread = self._thread = threading.Thread(target=self.run_worker) - thread.daemon = True - thread.start() - - def run_worker(self): - """Read docstring back from worker stdout and execute callback.""" - for _ in range(self.RESTART): - if self._shutdown.is_set(): - break - try: - self._worker = subprocess.Popen( - args=(sys.executable, '-uc', self.BOOTSTRAP.format(__file__)), - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - self._handle_worker() - - except (OSError, IOError): - msg = "Warning: restarting the docstring loader" - cmd, args = self._last_cmd - if cmd == 'query': - msg += " (crashed while loading {0!r})".format(args[0]) - print >> sys.stderr, msg - continue # restart - else: - break # normal termination, return - finally: - self._worker.terminate() - else: - print >> sys.stderr, "Warning: docstring loader crashed too often" - self._thread = None - self._worker = None - self.callback_finished() - - def _handle_worker(self): - """Send commands and responses back from worker.""" - assert '1' == self._worker.stdout.read(1) - for cmd, args in iter(self._queue.get, self.DONE): - self._last_cmd = cmd, args - self._send(cmd, args) - cmd, args = self._receive() - self._handle_response(cmd, args) - - def _send(self, cmd, args): - """send a command to worker's stdin""" - fd = self._worker.stdin - json.dump((self.AUTH_CODE, cmd, args), fd) - fd.write('\n'.encode()) - - def _receive(self): - """receive response from worker's stdout""" - for line in iter(self._worker.stdout.readline, ''): - try: - key, cmd, args = json.loads(line, encoding='utf-8') - if key != self.AUTH_CODE: - raise ValueError('Got wrong auth code') - return cmd, args - except ValueError: - continue # ignore invalid output from worker - else: - raise IOError("Can't read worker response") - - def _handle_response(self, cmd, args): - """Handle response from worker, call the callback""" - if cmd == 'result': - key, docs = args - self.callback_query_result(key, docs) - elif cmd == 'error': - print args - else: - print >> sys.stderr, "Unknown response:", cmd, args - - def query(self, key, imports=None, make=None): - """request docstring extraction for a certain key""" - if self._thread is None: - self.start() - if imports and make: - self._queue.put(('query', (key, imports, make))) - else: - self._queue.put(('query_key_only', (key,))) - - def finish(self): - """signal end of requests""" - self._queue.put(self.DONE) - - def wait(self): - """Wait for the handler thread to die""" - if self._thread: - self._thread.join() - - def terminate(self): - """Terminate the worker and wait""" - self._shutdown.set() - try: - self._worker.terminate() - self.wait() - except (OSError, AttributeError): - pass - - -############################################################################### -# Main worker entry point -############################################################################### - -def worker_main(): - """Main entry point for the docstring extraction process. - - Manages RPC with main process through. - Runs a docstring extraction for each key it read on stdin. - """ - def send(cmd, args): - json.dump((code, cmd, args), sys.stdout) - sys.stdout.write('\n'.encode()) - - sys.stdout.write('1') - for line in iter(sys.stdin.readline, ''): - code, cmd, args = json.loads(line, encoding='utf-8') - try: - if cmd == 'query': - key, imports, make = args - send('result', (key, docstring_from_make(key, imports, make))) - elif cmd == 'query_key_only': - key, = args - send('result', (key, docstring_guess_from_key(key))) - elif cmd == 'exit': - break - except Exception as e: - send('error', repr(e)) - - -if __name__ == '__worker__': - worker_main() - -elif __name__ == '__main__': - def callback(key, docs): - print key - for match, doc in docs.iteritems(): - print '-->', match - print doc.strip() - print - print - - r = SubprocessLoader(callback) - - # r.query('analog_feedforward_agc_cc') - # r.query('uhd_source') - r.query('expr_utils_graph') - r.query('blocks_add_cc') - r.query('blocks_add_cc', ['import gnuradio.blocks'], 'gnuradio.blocks.add_cc(') - # r.query('analog_feedforward_agc_cc') - # r.query('uhd_source') - # r.query('uhd_source') - # r.query('analog_feedforward_agc_cc') - r.finish() - # r.terminate() - r.wait() diff --git a/grc/python/flow_graph.tmpl b/grc/python/flow_graph.tmpl deleted file mode 100644 index bd8025b676..0000000000 --- a/grc/python/flow_graph.tmpl +++ /dev/null @@ -1,439 +0,0 @@ -#if not $generate_options.startswith('hb') -#!/usr/bin/env python2 -#end if -# -*- coding: utf-8 -*- -######################################################## -##Cheetah template - gnuradio_python -## -##@param imports the import statements -##@param flow_graph the flow_graph -##@param variables the variable blocks -##@param parameters the parameter blocks -##@param blocks the signal blocks -##@param connections the connections -##@param msgs the msg type connections -##@param generate_options the type of flow graph -##@param var_id2cbs variable id map to callback strings -######################################################## -#def indent($code) -#set $code = '\n '.join(str($code).splitlines()) -$code#slurp -#end def -#import time -#set $DIVIDER = '#'*50 -$DIVIDER -# GNU Radio Python Flow Graph -# Title: $title -#if $flow_graph.get_option('author') -# Author: $flow_graph.get_option('author') -#end if -#if $flow_graph.get_option('description') -# Description: $flow_graph.get_option('description') -#end if -# Generated: $time.ctime() -$DIVIDER -#if $flow_graph.get_option('thread_safe_setters') -import threading -#end if - -## Call XInitThreads as the _very_ first thing. -## After some Qt import, it's too late -#if $generate_options in ('wx_gui', 'qt_gui') -if __name__ == '__main__': - import ctypes - import sys - if sys.platform.startswith('linux'): - try: - x11 = ctypes.cdll.LoadLibrary('libX11.so') - x11.XInitThreads() - except: - print "Warning: failed to XInitThreads()" - -#end if -# -######################################################## -##Create Imports -######################################################## -#if $flow_graph.get_option('qt_qss_theme') -#set imports = $sorted(set($imports + ["import os", "import sys"])) -#end if -#if any(imp.endswith("# grc-generated hier_block") for imp in $imports) -import os -import sys -#set imports = $filter(lambda i: i not in ("import os", "import sys"), $imports) -sys.path.append(os.environ.get('GRC_HIER_PATH', os.path.expanduser('~/.grc_gnuradio'))) - -#end if -#for $imp in $imports -##$(imp.replace(" # grc-generated hier_block", "")) -$imp -#end for -######################################################## -##Create Class -## Write the class declaration for a top or hier block. -## The parameter names are the arguments to __init__. -## Determine the absolute icon path (wx gui only). -## Setup the IO signature (hier block only). -######################################################## -#set $class_name = $flow_graph.get_option('id') -#set $param_str = ', '.join(['self'] + ['%s=%s'%(param.get_id(), param.get_make()) for param in $parameters]) -#if $generate_options == 'wx_gui' - #import gtk - #set $icon = gtk.IconTheme().lookup_icon('gnuradio-grc', 32, 0) - - -class $(class_name)(grc_wxgui.top_block_gui): - - def __init__($param_str): - grc_wxgui.top_block_gui.__init__(self, title="$title") - #if $icon - _icon_path = "$icon.get_filename()" - self.SetIcon(wx.Icon(_icon_path, wx.BITMAP_TYPE_ANY)) - #end if -#elif $generate_options == 'qt_gui' - - -class $(class_name)(gr.top_block, Qt.QWidget): - - def __init__($param_str): - gr.top_block.__init__(self, "$title") - Qt.QWidget.__init__(self) - self.setWindowTitle("$title") - try: - self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc')) - except: - pass - self.top_scroll_layout = Qt.QVBoxLayout() - self.setLayout(self.top_scroll_layout) - self.top_scroll = Qt.QScrollArea() - self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame) - self.top_scroll_layout.addWidget(self.top_scroll) - self.top_scroll.setWidgetResizable(True) - self.top_widget = Qt.QWidget() - self.top_scroll.setWidget(self.top_widget) - self.top_layout = Qt.QVBoxLayout(self.top_widget) - self.top_grid_layout = Qt.QGridLayout() - self.top_layout.addLayout(self.top_grid_layout) - - self.settings = Qt.QSettings("GNU Radio", "$class_name") - self.restoreGeometry(self.settings.value("geometry").toByteArray()) -#elif $generate_options == 'no_gui' - - -class $(class_name)(gr.top_block): - - def __init__($param_str): - gr.top_block.__init__(self, "$title") -#elif $generate_options.startswith('hb') - #set $in_sigs = $flow_graph.get_hier_block_stream_io('in') - #set $out_sigs = $flow_graph.get_hier_block_stream_io('out') - - -#if $generate_options == 'hb_qt_gui' -class $(class_name)(gr.hier_block2, Qt.QWidget): -#else -class $(class_name)(gr.hier_block2): -#end if -#def make_io_sig($io_sigs) - #set $size_strs = ['%s*%s'%(io_sig['size'], io_sig['vlen']) for io_sig in $io_sigs] - #if len($io_sigs) == 0 -gr.io_signature(0, 0, 0)#slurp - #elif len($io_sigs) == 1 -gr.io_signature(1, 1, $size_strs[0])#slurp - #else -gr.io_signaturev($(len($io_sigs)), $(len($io_sigs)), [$(', '.join($size_strs))])#slurp - #end if -#end def - - def __init__($param_str): - gr.hier_block2.__init__( - self, "$title", - $make_io_sig($in_sigs), - $make_io_sig($out_sigs), - ) - #for $pad in $flow_graph.get_hier_block_message_io('in') - self.message_port_register_hier_in("$pad['label']") - #end for - #for $pad in $flow_graph.get_hier_block_message_io('out') - self.message_port_register_hier_out("$pad['label']") - #end for - #if $generate_options == 'hb_qt_gui' - - Qt.QWidget.__init__(self) - self.top_layout = Qt.QVBoxLayout() - self.top_grid_layout = Qt.QGridLayout() - self.top_layout.addLayout(self.top_grid_layout) - self.setLayout(self.top_layout) - #end if -#end if -#if $flow_graph.get_option('thread_safe_setters') - - self._lock = threading.RLock() -#end if -######################################################## -##Create Parameters -## Set the parameter to a property of self. -######################################################## -#if $parameters - - $DIVIDER - # Parameters - $DIVIDER -#end if -#for $param in $parameters - $indent($param.get_var_make()) -#end for -######################################################## -##Create Variables -######################################################## -#if $variables - - $DIVIDER - # Variables - $DIVIDER -#end if -#for $var in $variables - $indent($var.get_var_make()) -#end for -######################################################## -##Create Message Queues -######################################################## -#if $msgs - - $DIVIDER - # Message Queues - $DIVIDER -#end if -#for $msg in $msgs - $(msg.get_source().get_parent().get_id())_msgq_out = $(msg.get_sink().get_parent().get_id())_msgq_in = gr.msg_queue(2) -#end for -######################################################## -##Create Blocks -######################################################## -#if $blocks - - $DIVIDER - # Blocks - $DIVIDER -#end if -#for $blk in filter(lambda b: b.get_make(), $blocks) - #if $blk in $variables - $indent($blk.get_make()) - #else - self.$blk.get_id() = $indent($blk.get_make()) - #if $blk.has_param('alias') and $blk.get_param('alias').get_evaluated() - (self.$blk.get_id()).set_block_alias("$blk.get_param('alias').get_evaluated()") - #end if - #if $blk.has_param('affinity') and $blk.get_param('affinity').get_evaluated() - (self.$blk.get_id()).set_processor_affinity($blk.get_param('affinity').get_evaluated()) - #end if - #if (len($blk.get_sources())>0) and $blk.has_param('minoutbuf') and (int($blk.get_param('minoutbuf').get_evaluated()) > 0) - (self.$blk.get_id()).set_min_output_buffer($blk.get_param('minoutbuf').get_evaluated()) - #end if - #if (len($blk.get_sources())>0) and $blk.has_param('maxoutbuf') and (int($blk.get_param('maxoutbuf').get_evaluated()) > 0) - (self.$blk.get_id()).set_max_output_buffer($blk.get_param('maxoutbuf').get_evaluated()) - #end if - #end if -#end for -######################################################## -##Create Connections -## The port name should be the id of the parent block. -## However, port names for IO pads should be self. -######################################################## -#def make_port_sig($port) - #if $port.get_parent().get_key() in ('pad_source', 'pad_sink') - #set block = 'self' - #set key = $flow_graph.get_pad_port_global_key($port) - #else - #set block = 'self.' + $port.get_parent().get_id() - #set key = $port.get_key() - #end if - #if not $key.isdigit() - #set key = repr($key) - #end if -($block, $key)#slurp -#end def -#if $connections - - $DIVIDER - # Connections - $DIVIDER -#end if -#for $con in $connections - #set global $source = $con.get_source() - #set global $sink = $con.get_sink() - #include source=$connection_templates[($source.get_domain(), $sink.get_domain())] - -#end for -######################################################## -## QT sink close method reimplementation -######################################################## -#if $generate_options == 'qt_gui' - - def closeEvent(self, event): - self.settings = Qt.QSettings("GNU Radio", "$class_name") - self.settings.setValue("geometry", self.saveGeometry()) - event.accept() - - #if $flow_graph.get_option('qt_qss_theme') - def setStyleSheetFromFile(self, filename): - try: - if not os.path.exists(filename): - filename = os.path.join( - gr.prefix(), "share", "gnuradio", "themes", filename) - with open(filename) as ss: - self.setStyleSheet(ss.read()) - except Exception as e: - print >> sys.stderr, e - #end if -#end if -######################################################## -##Create Callbacks -## Write a set method for this variable that calls the callbacks -######################################################## -#for $var in $parameters + $variables - - #set $id = $var.get_id() - def get_$(id)(self): - return self.$id - - def set_$(id)(self, $id): - #if $flow_graph.get_option('thread_safe_setters') - with self._lock: - self.$id = $id - #for $callback in $var_id2cbs[$id] - $indent($callback) - #end for - #else - self.$id = $id - #for $callback in $var_id2cbs[$id] - $indent($callback) - #end for - #end if -#end for -######################################################## -##Create Main -## For top block code, generate a main routine. -## Instantiate the top block and run as gui or cli. -######################################################## -#def make_default($type, $param) - #if $type == 'eng_float' -eng_notation.num_to_str($param.get_make())#slurp - #else -$param.get_make()#slurp - #end if -#end def -#def make_short_id($param) - #set $short_id = $param.get_param('short_id').get_evaluated() - #if $short_id - #set $short_id = '-' + $short_id - #end if -$short_id#slurp -#end def -#if not $generate_options.startswith('hb') -#set $params_eq_list = list() -#if $parameters - - -def argument_parser(): - parser = OptionParser(option_class=eng_option, usage="%prog: [options]") - #for $param in $parameters - #set $type = $param.get_param('type').get_value() - #if $type - #silent $params_eq_list.append('%s=options.%s'%($param.get_id(), $param.get_id())) - parser.add_option( - "$make_short_id($param)", "--$param.get_id().replace('_', '-')", dest="$param.get_id()", type="$type", default=$make_default($type, $param), - help="Set $($param.get_param('label').get_evaluated() or $param.get_id()) [default=%default]") - #end if - #end for - return parser -#end if - - -def main(top_block_cls=$(class_name), options=None): - #if $parameters - if options is None: - options, _ = argument_parser().parse_args() - #end if - #if $flow_graph.get_option('realtime_scheduling') - if gr.enable_realtime_scheduling() != gr.RT_OK: - print "Error: failed to enable real-time scheduling." - #end if - - #if $generate_options == 'wx_gui' - tb = top_block_cls($(', '.join($params_eq_list))) - #if $flow_graph.get_option('max_nouts') - tb.Run($flow_graph.get_option('run'), $flow_graph.get_option('max_nouts')) - #else - tb.Start($flow_graph.get_option('run')) - #for $m in $monitors - (tb.$m.get_id()).start() - #end for - tb.Wait() - #end if - #elif $generate_options == 'qt_gui' - from distutils.version import StrictVersion - if StrictVersion(Qt.qVersion()) >= StrictVersion("4.5.0"): - style = gr.prefs().get_string('qtgui', 'style', 'raster') - Qt.QApplication.setGraphicsSystem(style) - qapp = Qt.QApplication(sys.argv) - - tb = top_block_cls($(', '.join($params_eq_list))) - #if $flow_graph.get_option('run') - #if $flow_graph.get_option('max_nouts') - tb.start($flow_graph.get_option('max_nouts')) - #else - tb.start() - #end if - #end if - #if $flow_graph.get_option('qt_qss_theme') - tb.setStyleSheetFromFile($repr($flow_graph.get_option('qt_qss_theme'))) - #end if - tb.show() - - def quitting(): - tb.stop() - tb.wait() - qapp.connect(qapp, Qt.SIGNAL("aboutToQuit()"), quitting) - #for $m in $monitors - if $m.has_param('en'): - if $m.get_param('en').get_value(): - (tb.$m.get_id()).start() - else: - sys.stderr.write("Monitor '{0}' does not have an enable ('en') parameter.".format("tb.$m.get_id()")) - #end for - qapp.exec_() - #elif $generate_options == 'no_gui' - tb = top_block_cls($(', '.join($params_eq_list))) - #set $run_options = $flow_graph.get_option('run_options') - #if $run_options == 'prompt' - #if $flow_graph.get_option('max_nouts') - tb.start($flow_graph.get_option('max_nouts')) - #else - tb.start() - #end if - #for $m in $monitors - (tb.$m.get_id()).start() - #end for - try: - raw_input('Press Enter to quit: ') - except EOFError: - pass - tb.stop() - #elif $run_options == 'run' - #if $flow_graph.get_option('max_nouts') - tb.start($flow_graph.get_option('max_nouts')) - #else - tb.start() - #end if - #end if - #for $m in $monitors - (tb.$m.get_id()).start() - #end for - tb.wait() - #end if - - -if __name__ == '__main__': - main() -#end if |