diff options
Diffstat (limited to 'grc/core/generator')
-rw-r--r-- | grc/core/generator/FlowGraphProxy.py | 54 | ||||
-rw-r--r-- | grc/core/generator/Generator.py | 354 | ||||
-rw-r--r-- | grc/core/generator/flow_graph.py.mako | 352 | ||||
-rw-r--r-- | grc/core/generator/flow_graph.tmpl | 420 | ||||
-rw-r--r-- | grc/core/generator/hier_block.py | 196 | ||||
-rw-r--r-- | grc/core/generator/top_block.py | 285 |
6 files changed, 881 insertions, 780 deletions
diff --git a/grc/core/generator/FlowGraphProxy.py b/grc/core/generator/FlowGraphProxy.py index 23ccf95c4b..f438fa0d39 100644 --- a/grc/core/generator/FlowGraphProxy.py +++ b/grc/core/generator/FlowGraphProxy.py @@ -66,14 +66,15 @@ class FlowGraphProxy(object): # TODO: move this in a refactored Generator self.get_pad_sinks() if direction in ('source', 'out') else [] ports = [] for pad in pads: + type_param = pad.params['type'] master = { - 'label': str(pad.get_param('label').get_evaluated()), - 'type': str(pad.get_param('type').get_evaluated()), - 'vlen': str(pad.get_param('vlen').get_value()), - 'size': pad.get_param('type').opt_value('size'), - 'optional': bool(pad.get_param('optional').get_evaluated()), + 'label': str(pad.params['label'].get_evaluated()), + 'type': str(pad.params['type'].get_evaluated()), + 'vlen': str(pad.params['vlen'].get_value()), + 'size': type_param.options.attributes[type_param.get_value()]['size'], + 'optional': bool(pad.params['optional'].get_evaluated()), } - num_ports = pad.get_param('num_streams').get_evaluated() + num_ports = pad.params['num_streams'].get_evaluated() if num_ports > 1: for i in range(num_ports): clone = master.copy() @@ -91,7 +92,7 @@ class FlowGraphProxy(object): # TODO: move this in a refactored Generator a list of pad source blocks in this flow graph """ pads = [b for b in self.get_enabled_blocks() if b.key == 'pad_source'] - return sorted(pads, lambda x, y: cmp(x.get_id(), y.get_id())) + return sorted(pads, key=lambda x: x.name) def get_pad_sinks(self): """ @@ -101,7 +102,7 @@ class FlowGraphProxy(object): # TODO: move this in a refactored Generator a list of pad sink blocks in this flow graph """ pads = [b for b in self.get_enabled_blocks() if b.key == 'pad_sink'] - return sorted(pads, lambda x, y: cmp(x.get_id(), y.get_id())) + return sorted(pads, key=lambda x: x.name) def get_pad_port_global_key(self, port): """ @@ -116,15 +117,46 @@ class FlowGraphProxy(object): # TODO: move this in a refactored Generator for pad in pads: # using the block param 'type' instead of the port domain here # to emphasize that hier block generation is domain agnostic - is_message_pad = pad.get_param('type').get_evaluated() == "message" + is_message_pad = pad.params['type'].get_evaluated() == "message" if port.parent == pad: if is_message_pad: - key = pad.get_param('label').get_value() + key = pad.params['label'].get_value() else: key = str(key_offset + int(port.key)) return key else: # assuming we have either only sources or sinks if not is_message_pad: - key_offset += len(pad.get_ports()) + key_offset += len(pad.sinks) + len(pad.sources) return -1 + + +def get_hier_block_io(flow_graph, direction, domain=None): + """ + Get a list of io ports for this flow graph. + + Returns a list of dicts with: type, label, vlen, size, optional + """ + pads = flow_graph.get_pad_sources() if direction in ('sink', 'in') else \ + flow_graph.get_pad_sinks() if direction in ('source', 'out') else [] + ports = [] + for pad in pads: + type_param = pad.params['type'] + master = { + 'label': str(pad.params['label'].get_evaluated()), + 'type': str(pad.params['type'].get_evaluated()), + 'vlen': str(pad.params['vlen'].get_value()), + 'size': type_param.options.attributes[type_param.get_value()]['size'], + 'optional': bool(pad.params['optional'].get_evaluated()), + } + num_ports = pad.params['num_streams'].get_evaluated() + if num_ports > 1: + for i in range(num_ports): + clone = master.copy() + clone['label'] += str(i) + ports.append(clone) + else: + ports.append(master) + if domain is not None: + ports = [p for p in ports if p.domain == domain] + return ports diff --git a/grc/core/generator/Generator.py b/grc/core/generator/Generator.py index 316ed5014d..62dc26b8a8 100644 --- a/grc/core/generator/Generator.py +++ b/grc/core/generator/Generator.py @@ -18,25 +18,16 @@ from __future__ import absolute_import -import codecs import os -import tempfile -import operator -import collections -from Cheetah.Template import Template -import six +from mako.template import Template -from .FlowGraphProxy import FlowGraphProxy -from .. import ParseXML, Messages -from ..Constants import ( - TOP_BLOCK_FILE_MODE, BLOCK_FLAG_NEED_QT_GUI, - HIER_BLOCK_FILE_MODE, BLOCK_DTD -) -from ..utils import expr_utils +from .hier_block import HierBlockGenerator, QtHierBlockGenerator +from .top_block import TopBlockGenerator DATA_DIR = os.path.dirname(__file__) -FLOW_GRAPH_TEMPLATE = os.path.join(DATA_DIR, 'flow_graph.tmpl') +FLOW_GRAPH_TEMPLATE = os.path.join(DATA_DIR, 'flow_graph.py.mako') +flow_graph_template = Template(filename=FLOW_GRAPH_TEMPLATE) class Generator(object): @@ -64,338 +55,3 @@ class Generator(object): def __getattr__(self, item): """get all other attrib from actual generator object""" return getattr(self._generator, item) - - -class TopBlockGenerator(object): - - def __init__(self, flow_graph, file_path): - """ - Initialize the top block generator object. - - Args: - flow_graph: the flow graph object - file_path: the path to write the file to - """ - self._flow_graph = FlowGraphProxy(flow_graph) - self._generate_options = self._flow_graph.get_option('generate_options') - self._mode = TOP_BLOCK_FILE_MODE - dirname = os.path.dirname(file_path) - # Handle the case where the directory is read-only - # In this case, use the system's temp directory - if not os.access(dirname, os.W_OK): - dirname = tempfile.gettempdir() - filename = self._flow_graph.get_option('id') + '.py' - self.file_path = os.path.join(dirname, filename) - self._dirname = dirname - - def write(self): - """generate output and write it to files""" - # Do throttle warning - throttling_blocks = [b for b in self._flow_graph.get_enabled_blocks() - if b.is_throtteling] - if not throttling_blocks and not self._generate_options.startswith('hb'): - Messages.send_warning("This flow graph may not have flow control: " - "no audio or RF hardware blocks found. " - "Add a Misc->Throttle block to your flow " - "graph to avoid CPU congestion.") - if len(throttling_blocks) > 1: - keys = set([b.key for b in throttling_blocks]) - if len(keys) > 1 and 'blocks_throttle' in keys: - Messages.send_warning("This flow graph contains a throttle " - "block and another rate limiting block, " - "e.g. a hardware source or sink. " - "This is usually undesired. Consider " - "removing the throttle block.") - # Generate - for filename, data in self._build_python_code_from_template(): - with codecs.open(filename, 'w', encoding='utf-8') as fp: - fp.write(data) - if filename == self.file_path: - try: - os.chmod(filename, self._mode) - except: - pass - - def _build_python_code_from_template(self): - """ - Convert the flow graph to python code. - - Returns: - a string of python code - """ - output = list() - - fg = self._flow_graph - title = fg.get_option('title') or fg.get_option('id').replace('_', ' ').title() - imports = fg.get_imports() - variables = fg.get_variables() - parameters = fg.get_parameters() - monitors = fg.get_monitors() - - # List of blocks not including variables and imports and parameters and disabled - def _get_block_sort_text(block): - code = block.get_make().replace(block.get_id(), ' ') - try: - code += block.get_param('gui_hint').get_value() # Newer gui markup w/ qtgui - except: - pass - return code - - blocks_all = expr_utils.sort_objects( - [b for b in fg.blocks if b.enabled and not b.get_bypassed()], - operator.methodcaller('get_id'), _get_block_sort_text - ) - deprecated_block_keys = set(b.name for b in blocks_all if b.is_deprecated) - for key in deprecated_block_keys: - Messages.send_warning("The block {!r} is deprecated.".format(key)) - - # List of regular blocks (all blocks minus the special ones) - blocks = [b for b in blocks_all if b not in imports and b not in parameters] - - for block in blocks: - key = block.key - file_path = os.path.join(self._dirname, block.get_id() + '.py') - if key == 'epy_block': - src = block.get_param('_source_code').get_value() - output.append((file_path, src)) - elif key == 'epy_module': - src = block.get_param('source_code').get_value() - output.append((file_path, src)) - - # Filter out bus and virtual sink connections - connections = [con for con in fg.get_enabled_connections() - if not (con.is_bus() or con.sink_block.is_virtual_sink())] - - # Get the virtual blocks and resolve their connections - connection_factory = fg.parent_platform.Connection - virtual = [c for c in connections if c.source_block.is_virtual_source()] - for connection in virtual: - sink = connection.sink_port - for source in connection.source_port.resolve_virtual_source(): - resolved = connection_factory(fg.orignal_flowgraph, source, sink) - connections.append(resolved) - # Remove the virtual connection - connections.remove(connection) - - # Bypassing blocks: Need to find all the enabled connections for the block using - # the *connections* object rather than get_connections(). Create new connections - # that bypass the selected block and remove the existing ones. This allows adjacent - # bypassed blocks to see the newly created connections to downstream blocks, - # allowing them to correctly construct bypass connections. - bypassed_blocks = fg.get_bypassed_blocks() - for block in bypassed_blocks: - # Get the upstream connection (off of the sink ports) - # Use *connections* not get_connections() - source_connection = [c for c in connections if c.sink_port == block.sinks[0]] - # The source connection should never have more than one element. - assert (len(source_connection) == 1) - - # Get the source of the connection. - source_port = source_connection[0].source_port - - # Loop through all the downstream connections - for sink in (c for c in connections if c.source_port == block.sources[0]): - if not sink.enabled: - # Ignore disabled connections - continue - connection = connection_factory(fg.orignal_flowgraph, source_port, sink.sink_port) - connections.append(connection) - # Remove this sink connection - connections.remove(sink) - # Remove the source connection - connections.remove(source_connection[0]) - - # List of connections where each endpoint is enabled (sorted by domains, block names) - connections.sort(key=lambda c: ( - c.source_port.domain, c.sink_port.domain, - c.source_block.get_id(), c.sink_block.get_id() - )) - - connection_templates = fg.parent.connection_templates - - # List of variable names - var_ids = [var.get_id() for var in parameters + variables] - replace_dict = dict((var_id, 'self.' + var_id) for var_id in var_ids) - callbacks_all = [] - for block in blocks_all: - callbacks_all.extend(expr_utils.expr_replace(cb, replace_dict) for cb in block.get_callbacks()) - - # Map var id to callbacks - def uses_var_id(): - used = expr_utils.get_variable_dependencies(callback, [var_id]) - return used and 'self.' + var_id in callback # callback might contain var_id itself - - callbacks = {} - for var_id in var_ids: - callbacks[var_id] = [callback for callback in callbacks_all if uses_var_id()] - - # Load the namespace - namespace = { - 'title': title, - 'imports': imports, - 'flow_graph': fg, - 'variables': variables, - 'parameters': parameters, - 'monitors': monitors, - 'blocks': blocks, - 'connections': connections, - 'connection_templates': connection_templates, - 'generate_options': self._generate_options, - 'callbacks': callbacks, - } - # Build the template - t = Template(open(FLOW_GRAPH_TEMPLATE, 'r').read(), namespace) - output.append((self.file_path, "\n".join(line.rstrip() for line in str(t).split("\n")))) - return output - - -class HierBlockGenerator(TopBlockGenerator): - """Extends the top block generator to also generate a block XML file""" - - def __init__(self, flow_graph, file_path): - """ - Initialize the hier block generator object. - - Args: - flow_graph: the flow graph object - file_path: where to write the py file (the xml goes into HIER_BLOCK_LIB_DIR) - """ - TopBlockGenerator.__init__(self, flow_graph, file_path) - platform = flow_graph.parent - - hier_block_lib_dir = platform.config.hier_block_lib_dir - if not os.path.exists(hier_block_lib_dir): - os.mkdir(hier_block_lib_dir) - - self._mode = HIER_BLOCK_FILE_MODE - self.file_path = os.path.join(hier_block_lib_dir, self._flow_graph.get_option('id') + '.py') - self.file_path_xml = self.file_path + '.xml' - - def write(self): - """generate output and write it to files""" - TopBlockGenerator.write(self) - ParseXML.to_file(self._build_block_n_from_flow_graph_io(), self.file_path_xml) - ParseXML.validate_dtd(self.file_path_xml, BLOCK_DTD) - try: - os.chmod(self.file_path_xml, self._mode) - except: - pass - - def _build_block_n_from_flow_graph_io(self): - """ - Generate a block XML nested data from the flow graph IO - - Returns: - a xml node tree - """ - # Extract info from the flow graph - block_key = self._flow_graph.get_option('id') - parameters = self._flow_graph.get_parameters() - - def var_or_value(name): - if name in (p.get_id() for p in parameters): - return "$" + name - return name - - # Build the nested data - block_n = collections.OrderedDict() - block_n['name'] = self._flow_graph.get_option('title') or \ - self._flow_graph.get_option('id').replace('_', ' ').title() - block_n['key'] = block_key - block_n['category'] = self._flow_graph.get_option('category') - block_n['import'] = "from {0} import {0} # grc-generated hier_block".format( - self._flow_graph.get_option('id')) - # Make data - if parameters: - block_n['make'] = '{cls}(\n {kwargs},\n)'.format( - cls=block_key, - kwargs=',\n '.join( - '{key}=${key}'.format(key=param.get_id()) for param in parameters - ), - ) - else: - block_n['make'] = '{cls}()'.format(cls=block_key) - # Callback data - block_n['callback'] = [ - 'set_{key}(${key})'.format(key=param.get_id()) for param in parameters - ] - - # Parameters - block_n['param'] = list() - for param in parameters: - param_n = collections.OrderedDict() - param_n['name'] = param.get_param('label').get_value() or param.get_id() - param_n['key'] = param.get_id() - param_n['value'] = param.get_param('value').get_value() - param_n['type'] = 'raw' - param_n['hide'] = param.get_param('hide').get_value() - block_n['param'].append(param_n) - - # Bus stuff - if self._flow_graph.get_bussink(): - block_n['bus_sink'] = '1' - if self._flow_graph.get_bussrc(): - block_n['bus_source'] = '1' - - # Sink/source ports - for direction in ('sink', 'source'): - block_n[direction] = list() - for port in self._flow_graph.get_hier_block_io(direction): - port_n = collections.OrderedDict() - port_n['name'] = port['label'] - port_n['type'] = port['type'] - if port['type'] != "message": - port_n['vlen'] = var_or_value(port['vlen']) - if port['optional']: - port_n['optional'] = '1' - block_n[direction].append(port_n) - - # More bus stuff - bus_struct_sink = self._flow_graph.get_bus_structure_sink() - if bus_struct_sink: - block_n['bus_structure_sink'] = bus_struct_sink[0].get_param('struct').get_value() - bus_struct_src = self._flow_graph.get_bus_structure_src() - if bus_struct_src: - block_n['bus_structure_source'] = bus_struct_src[0].get_param('struct').get_value() - - # Documentation - block_n['doc'] = "\n".join(field for field in ( - self._flow_graph.get_option('author'), - self._flow_graph.get_option('description'), - self.file_path - ) if field) - block_n['grc_source'] = str(self._flow_graph.grc_file_path) - - n = {'block': block_n} - return n - - -class QtHierBlockGenerator(HierBlockGenerator): - - def _build_block_n_from_flow_graph_io(self): - n = HierBlockGenerator._build_block_n_from_flow_graph_io(self) - block_n = collections.OrderedDict() - - # insert flags after category - for key, value in six.iteritems(n['block']): - block_n[key] = value - if key == 'category': - block_n['flags'] = BLOCK_FLAG_NEED_QT_GUI - - if not block_n['name'].upper().startswith('QT GUI'): - block_n['name'] = 'QT GUI ' + block_n['name'] - - gui_hint_param = collections.OrderedDict() - gui_hint_param['name'] = 'GUI Hint' - gui_hint_param['key'] = 'gui_hint' - gui_hint_param['value'] = '' - gui_hint_param['type'] = 'gui_hint' - gui_hint_param['hide'] = 'part' - block_n['param'].append(gui_hint_param) - - block_n['make'] += ( - "\n#set $win = 'self.%s' % $id" - "\n${gui_hint()($win)}" - ) - - return {'block': block_n} diff --git a/grc/core/generator/flow_graph.py.mako b/grc/core/generator/flow_graph.py.mako new file mode 100644 index 0000000000..484441f00f --- /dev/null +++ b/grc/core/generator/flow_graph.py.mako @@ -0,0 +1,352 @@ +% if not generate_options.startswith('hb'): +#!/usr/bin/env python2 +% endif +# -*- coding: utf-8 -*- +<%def name="indent(code)">${ '\n '.join(str(code).splitlines()) }</%def> +""" +GNU Radio Python Flow Graph + +Title: ${title} +% if flow_graph.get_option('author'): +Author: ${flow_graph.get_option('author')} +% endif +% if flow_graph.get_option('description'): +Description: ${flow_graph.get_option('description')} +% endif +Generated: ${ generated_time } +""" + +% if generate_options == 'qt_gui': +from distutils.version import StrictVersion + +if __name__ == '__main__': + import ctypes + import sys + if sys.platform.startswith('linux'): + try: + x11 = ctypes.cdll.LoadLibrary('libX11.so') + x11.XInitThreads() + except: + print "Warning: failed to XInitThreads()" + +% endif +######################################################## +##Create Imports +######################################################## +% for imp in imports: +##${imp.replace(" # grc-generated hier_block", "")} +${imp} +% endfor +######################################################## +##Create Class +## Write the class declaration for a top or hier block. +## The parameter names are the arguments to __init__. +## Setup the IO signature (hier block only). +######################################################## +<% + class_name = flow_graph.get_option('id') + param_str = ', '.join(['self'] + ['%s=%s'%(param.name, param.templates.render('make')) for param in parameters]) +%>\ +% if generate_options == 'qt_gui': +from gnuradio import qtgui + +class ${class_name}(gr.top_block, Qt.QWidget): + + def __init__(${param_str}): + gr.top_block.__init__(self, "${title}") + Qt.QWidget.__init__(self) + self.setWindowTitle("${title}") + qtgui.util.check_set_qss() + try: + self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc')) + except: + pass + self.top_scroll_layout = Qt.QVBoxLayout() + self.setLayout(self.top_scroll_layout) + self.top_scroll = Qt.QScrollArea() + self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame) + self.top_scroll_layout.addWidget(self.top_scroll) + self.top_scroll.setWidgetResizable(True) + self.top_widget = Qt.QWidget() + self.top_scroll.setWidget(self.top_widget) + self.top_layout = Qt.QVBoxLayout(self.top_widget) + self.top_grid_layout = Qt.QGridLayout() + self.top_layout.addLayout(self.top_grid_layout) + + self.settings = Qt.QSettings("GNU Radio", "${class_name}") + + try: + if StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): + self.restoreGeometry(self.settings.value("geometry").toByteArray()) + else: + self.restoreGeometry(self.settings.value("geometry")) + except: + pass +% elif generate_options == 'no_gui': + +class ${class_name}(gr.top_block): + + def __init__(${param_str}): + gr.top_block.__init__(self, "${title}") +% elif generate_options.startswith('hb'): + <% in_sigs = flow_graph.get_hier_block_stream_io('in') %> + <% out_sigs = flow_graph.get_hier_block_stream_io('out') %> + + +% if generate_options == 'hb_qt_gui': +class ${class_name}(gr.hier_block2, Qt.QWidget): +% else: +class ${class_name}(gr.hier_block2): +% endif +<%def name="make_io_sig(io_sigs)"> + <% size_strs = ['%s*%s'%(io_sig['size'], io_sig['vlen']) for io_sig in io_sigs] %> + % if len(io_sigs) == 0: +gr.io_signature(0, 0, 0)\ + #elif len(${io_sigs}) == 1 +gr.io_signature(1, 1, ${size_strs[0]}) + % else: +gr.io_signaturev(${len(io_sigs)}, ${len(io_sigs)}, [${', '.join(ize_strs)}]) + % endif +</%def> + + def __init__(${param_str}): + gr.hier_block2.__init__( + self, "${ title }", + ${make_io_sig(in_sigs)}, + ${make_io_sig(out_sigs)}, + ) + % for pad in flow_graph.get_hier_block_message_io('in'): + self.message_port_register_hier_in("${ pad['label'] }") + % endfor + % for pad in flow_graph.get_hier_block_message_io('out'): + self.message_port_register_hier_out("${ pad['label'] }") + % endfor + % if generate_options == 'hb_qt_gui': + + Qt.QWidget.__init__(self) + self.top_layout = Qt.QVBoxLayout() + self.top_grid_layout = Qt.QGridLayout() + self.top_layout.addLayout(self.top_grid_layout) + self.setLayout(self.top_layout) + % endif +% endif +% if flow_graph.get_option('thread_safe_setters'): + + self._lock = threading.RLock() +% endif +######################################################## +##Create Parameters +## Set the parameter to a property of self. +######################################################## +% if parameters: + + ${'##################################################'} + # Parameters + ${'##################################################'} +% endif +% for param in parameters: + ${indent(param.get_var_make())} +% endfor +######################################################## +##Create Variables +######################################################## +% if variables: + + ${'##################################################'} + # Variables + ${'##################################################'} +% endif +% for var in variables: + ${indent(var.templates.render('var_make'))} +% endfor + % if blocks: + + ${'##################################################'} + # Blocks + ${'##################################################'} + % endif + % for blk, blk_make in blocks: + ${ indent(blk_make.strip('\n')) } +## % if 'alias' in blk.params and blk.params['alias'].get_evaluated(): +## (self.${blk.name}).set_block_alias("${blk.params['alias'].get_evaluated()}") +## % endif +## % if 'affinity' in blk.params and blk.params['affinity'].get_evaluated(): +## (self.${blk.name}).set_processor_affinity(${blk.params['affinity'].get_evaluated()}) +## % endif +## % if len(blk.sources) > 0 and 'minoutbuf' in blk.params and int(blk.params['minoutbuf'].get_evaluated()) > 0: +## (self.${blk.name}).set_min_output_buffer(${blk.params['minoutbuf'].get_evaluated()}) +## % endif +## % if len(blk.sources) > 0 and 'maxoutbuf' in blk.params and int(blk.params['maxoutbuf'].get_evaluated()) > 0: +## (self.${blk.name}).set_max_output_buffer(${blk.params['maxoutbuf'].get_evaluated()}) +## % endif + % endfor + % if connections: + + ${'##################################################'} + # Connections + ${'##################################################'} + % for connection in connections: + ${ connection.rstrip() } + % endfor + % endif +######################################################## +## QT sink close method reimplementation +######################################################## +% if generate_options == 'qt_gui': + + def closeEvent(self, event): + self.settings = Qt.QSettings("GNU Radio", "${class_name}") + self.settings.setValue("geometry", self.saveGeometry()) + event.accept() + % if flow_graph.get_option('qt_qss_theme'): + + def setStyleSheetFromFile(self, filename): + try: + if not os.path.exists(filename): + filename = os.path.join( + gr.prefix(), "share", "gnuradio", "themes", filename) + with open(filename) as ss: + self.setStyleSheet(ss.read()) + except Exception as e: + print >> sys.stderr, e + % endif +% endif +## +## +## +## Create Callbacks +## Write a set method for this variable that calls the callbacks +######################################################## + % for var in parameters + variables: + + def get_${ var.name }(self): + return self.${ var.name } + + def set_${ var.name }(self, ${ var.name }): + % if flow_graph.get_option('thread_safe_setters'): + with self._lock: + self.${ var.name } = ${ var.name } + % for callback in callbacks[var.name]: + ${ indent(callback) } + % endfor + % else: + self.${ var.name } = ${ var.name } + % for callback in callbacks[var.name]: + ${ indent(callback) } + % endfor + % endif + % endfor +######################################################## +##Create Main +## For top block code, generate a main routine. +## Instantiate the top block and run as gui or cli. +######################################################## +<%def name="make_default(type_, param)"> + % if type_ == 'eng_float': +eng_notation.num_to_str(${param.templates.render('make')}) + % else: +${param.templates.render('make')} + % endif +</%def>\ +% if not generate_options.startswith('hb'): +<% params_eq_list = list() %> +% if parameters: + +<% arg_parser_args = '' %>\ +def argument_parser(): + % if flow_graph.get_option('description'): + <% + arg_parser_args = 'description=description' + %>description = ${repr(flow_graph.get_option('description'))} + % endif + parser = ArgumentParser(${arg_parser_args}) + % for param in parameters: +<% + switches = ['"--{}"'.format(param.name.replace('_', '-'))] + short_id = param.params['short_id'].get_value() + if short_id: + switches.insert(0, '"-{}"'.format(short_id)) + + type_ = param.params['type'].get_value() + if type_: + params_eq_list.append('%s=options.%s' % (param.name, param.name)) + + default = param.templates.render('make') + if type_ == 'eng_float': + default = eng_notation.num_to_str(default) + # FIXME: + if type_ == 'string': + type_ = 'str' + %>\ + % if type_: + parser.add_argument( + ${ ', '.join(switches) }, dest="${param.name}", type=${type_}, default=${ default }, + help="Set ${param.params['label'].get_evaluated() or param.name} [default=%(default)r]") + % endif + % endfor + return parser +% endif + + +def main(top_block_cls=${class_name}, options=None): + % if parameters: + if options is None: + options = argument_parser().parse_args() + % endif + % if flow_graph.get_option('realtime_scheduling'): + if gr.enable_realtime_scheduling() != gr.RT_OK: + print "Error: failed to enable real-time scheduling." + % endif + % if generate_options == 'qt_gui': + + if StrictVersion("4.5.0") <= StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): + style = gr.prefs().get_string('qtgui', 'style', 'raster') + Qt.QApplication.setGraphicsSystem(style) + qapp = Qt.QApplication(sys.argv) + + tb = top_block_cls(${ ', '.join(params_eq_list) }) + % if flow_graph.get_option('run'): + tb.start(${flow_graph.get_option('max_nouts') or ''}) + % endif + % if flow_graph.get_option('qt_qss_theme'): + tb.setStyleSheetFromFile(${ flow_graph.get_option('qt_qss_theme') }) + % endif + tb.show() + + def quitting(): + tb.stop() + tb.wait() + qapp.aboutToQuit.connect(quitting) + % for m in monitors: + if 'en' in m.params: + if m.params['en'].get_value(): + (tb.${m.name}).start() + else: + sys.stderr.write("Monitor '{0}' does not have an enable ('en') parameter.".format("tb.${m.name}")) + % endfor + qapp.exec_() + % elif generate_options == 'no_gui': + tb = top_block_cls(${ ', '.join(params_eq_list) }) + % if flow_graph.get_option('run_options') == 'prompt': + tb.start(${ flow_graph.get_option('max_nouts') or '' }) + % for m in monitors: + (tb.${m.name}).start() + % endfor + try: + raw_input('Press Enter to quit: ') + except EOFError: + pass + tb.stop() + % elif flow_graph.get_option('run_options') == 'run': + tb.start(${flow_graph.get_option('max_nouts') or ''}) + % endif + % for m in monitors: + (tb.${m.name}).start() + % endfor + tb.wait() + % endif + + +if __name__ == '__main__': + main() +% endif diff --git a/grc/core/generator/flow_graph.tmpl b/grc/core/generator/flow_graph.tmpl deleted file mode 100644 index 202362c925..0000000000 --- a/grc/core/generator/flow_graph.tmpl +++ /dev/null @@ -1,420 +0,0 @@ -#if not $generate_options.startswith('hb') -#!/usr/bin/env python2 -#end if -# -*- coding: utf-8 -*- -######################################################## -##Cheetah template - gnuradio_python -## -##@param imports the import statements -##@param flow_graph the flow_graph -##@param variables the variable blocks -##@param parameters the parameter blocks -##@param blocks the signal blocks -##@param connections the connections -##@param generate_options the type of flow graph -##@param callbacks variable id map to callback strings -######################################################## -#def indent($code) -#set $code = '\n '.join(str($code).splitlines()) -$code#slurp -#end def -#import time -#set $DIVIDER = '#'*50 -$DIVIDER -# GNU Radio Python Flow Graph -# Title: $title -#if $flow_graph.get_option('author') -# Author: $flow_graph.get_option('author') -#end if -#if $flow_graph.get_option('description') -# Description: $flow_graph.get_option('description') -#end if -# Generated: $time.ctime() -$DIVIDER -#if $flow_graph.get_option('thread_safe_setters') -import threading -#end if - -#if $generate_options == 'qt_gui' -from distutils.version import StrictVersion -#end if - -## Call XInitThreads as the _very_ first thing. -## After some Qt import, it's too late -#if $generate_options == 'qt_gui' -if __name__ == '__main__': - import ctypes - import sys - if sys.platform.startswith('linux'): - try: - x11 = ctypes.cdll.LoadLibrary('libX11.so') - x11.XInitThreads() - except: - print "Warning: failed to XInitThreads()" - -#end if -# -######################################################## -##Create Imports -######################################################## -#if $flow_graph.get_option('qt_qss_theme') -#set imports = $sorted(set($imports + ["import os", "import sys"])) -#end if -#if any(imp.endswith("# grc-generated hier_block") for imp in $imports) -import os -import sys -#set imports = $filter(lambda i: i not in ("import os", "import sys"), $imports) -sys.path.append(os.environ.get('GRC_HIER_PATH', os.path.expanduser('~/.grc_gnuradio'))) - -#end if -#for $imp in $imports -##$(imp.replace(" # grc-generated hier_block", "")) -$imp -#end for -######################################################## -##Create Class -## Write the class declaration for a top or hier block. -## The parameter names are the arguments to __init__. -## Setup the IO signature (hier block only). -######################################################## -#set $class_name = $flow_graph.get_option('id') -#set $param_str = ', '.join(['self'] + ['%s=%s'%(param.get_id(), param.get_make()) for param in $parameters]) -#if $generate_options == 'qt_gui' -from gnuradio import qtgui - -class $(class_name)(gr.top_block, Qt.QWidget): - - def __init__($param_str): - gr.top_block.__init__(self, "$title") - Qt.QWidget.__init__(self) - self.setWindowTitle("$title") - qtgui.util.check_set_qss() - try: - self.setWindowIcon(Qt.QIcon.fromTheme('gnuradio-grc')) - except: - pass - self.top_scroll_layout = Qt.QVBoxLayout() - self.setLayout(self.top_scroll_layout) - self.top_scroll = Qt.QScrollArea() - self.top_scroll.setFrameStyle(Qt.QFrame.NoFrame) - self.top_scroll_layout.addWidget(self.top_scroll) - self.top_scroll.setWidgetResizable(True) - self.top_widget = Qt.QWidget() - self.top_scroll.setWidget(self.top_widget) - self.top_layout = Qt.QVBoxLayout(self.top_widget) - self.top_grid_layout = Qt.QGridLayout() - self.top_layout.addLayout(self.top_grid_layout) - - self.settings = Qt.QSettings("GNU Radio", "$class_name") - - try: - if StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): - self.restoreGeometry(self.settings.value("geometry").toByteArray()) - else: - self.restoreGeometry(self.settings.value("geometry")) - except: - pass -#elif $generate_options == 'no_gui' - - -class $(class_name)(gr.top_block): - - def __init__($param_str): - gr.top_block.__init__(self, "$title") -#elif $generate_options.startswith('hb') - #set $in_sigs = $flow_graph.get_hier_block_stream_io('in') - #set $out_sigs = $flow_graph.get_hier_block_stream_io('out') - - -#if $generate_options == 'hb_qt_gui' -class $(class_name)(gr.hier_block2, Qt.QWidget): -#else -class $(class_name)(gr.hier_block2): -#end if -#def make_io_sig($io_sigs) - #set $size_strs = ['%s*%s'%(io_sig['size'], io_sig['vlen']) for io_sig in $io_sigs] - #if len($io_sigs) == 0 -gr.io_signature(0, 0, 0)#slurp - #elif len($io_sigs) == 1 -gr.io_signature(1, 1, $size_strs[0])#slurp - #else -gr.io_signaturev($(len($io_sigs)), $(len($io_sigs)), [$(', '.join($size_strs))])#slurp - #end if -#end def - - def __init__($param_str): - gr.hier_block2.__init__( - self, "$title", - $make_io_sig($in_sigs), - $make_io_sig($out_sigs), - ) - #for $pad in $flow_graph.get_hier_block_message_io('in') - self.message_port_register_hier_in("$pad['label']") - #end for - #for $pad in $flow_graph.get_hier_block_message_io('out') - self.message_port_register_hier_out("$pad['label']") - #end for - #if $generate_options == 'hb_qt_gui' - - Qt.QWidget.__init__(self) - self.top_layout = Qt.QVBoxLayout() - self.top_grid_layout = Qt.QGridLayout() - self.top_layout.addLayout(self.top_grid_layout) - self.setLayout(self.top_layout) - #end if -#end if -#if $flow_graph.get_option('thread_safe_setters') - - self._lock = threading.RLock() -#end if -######################################################## -##Create Parameters -## Set the parameter to a property of self. -######################################################## -#if $parameters - - $DIVIDER - # Parameters - $DIVIDER -#end if -#for $param in $parameters - $indent($param.get_var_make()) -#end for -######################################################## -##Create Variables -######################################################## -#if $variables - - $DIVIDER - # Variables - $DIVIDER -#end if -#for $var in $variables - $indent($var.get_var_make()) -#end for -######################################################## -##Create Blocks -######################################################## -#if $blocks - - $DIVIDER - # Blocks - $DIVIDER -#end if -#for $blk in filter(lambda b: b.get_make(), $blocks) - #if $blk in $variables - $indent($blk.get_make()) - #else - self.$blk.get_id() = $indent($blk.get_make()) - #if 'alias' in $blk.params and $blk.params['alias'].get_evaluated() - (self.$blk.get_id()).set_block_alias("$blk.params['alias'].get_evaluated()") - #end if - #if 'affinity' in $blk.params and $blk.params['affinity'].get_evaluated() - (self.$blk.get_id()).set_processor_affinity($blk.params['affinity'].get_evaluated()) - #end if - #if len($blk.sources) > 0 and 'minoutbuf' in $blk.params and int($blk.params['minoutbuf'].get_evaluated()) > 0 - (self.$blk.get_id()).set_min_output_buffer($blk.params['minoutbuf'].get_evaluated()) - #end if - #if len($blk.sources) > 0 and 'maxoutbuf' in $blk.params and int($blk.params['maxoutbuf'].get_evaluated()) > 0 - (self.$blk.get_id()).set_max_output_buffer($blk.params['maxoutbuf'].get_evaluated()) - #end if - #end if -#end for -######################################################## -##Create Connections -## The port name should be the id of the parent block. -## However, port names for IO pads should be self. -######################################################## -#def make_port_sig($port) - #if $port.parent.key in ('pad_source', 'pad_sink') - #set block = 'self' - #set key = $flow_graph.get_pad_port_global_key($port) - #else - #set block = 'self.' + $port.parent.get_id() - #set key = $port.key - #end if - #if not $key.isdigit() - #set key = repr($key) - #end if -($block, $key)#slurp -#end def -#if $connections - - $DIVIDER - # Connections - $DIVIDER -#end if -#for $con in $connections - #set global $source = $con.source_port - #set global $sink = $con.sink_port - #include source=$connection_templates[($source.domain, $sink.domain)] - -#end for -######################################################## -## QT sink close method reimplementation -######################################################## -#if $generate_options == 'qt_gui' - - def closeEvent(self, event): - self.settings = Qt.QSettings("GNU Radio", "$class_name") - self.settings.setValue("geometry", self.saveGeometry()) - event.accept() - #if $flow_graph.get_option('qt_qss_theme') - - def setStyleSheetFromFile(self, filename): - try: - if not os.path.exists(filename): - filename = os.path.join( - gr.prefix(), "share", "gnuradio", "themes", filename) - with open(filename) as ss: - self.setStyleSheet(ss.read()) - except Exception as e: - print >> sys.stderr, e - #end if -#end if -######################################################## -##Create Callbacks -## Write a set method for this variable that calls the callbacks -######################################################## -#for $var in $parameters + $variables - - #set $id = $var.get_id() - def get_$(id)(self): - return self.$id - - def set_$(id)(self, $id): - #if $flow_graph.get_option('thread_safe_setters') - with self._lock: - self.$id = $id - #for $callback in $callbacks[$id] - $indent($callback) - #end for - #else - self.$id = $id - #for $callback in $callbacks[$id] - $indent($callback) - #end for - #end if -#end for -######################################################## -##Create Main -## For top block code, generate a main routine. -## Instantiate the top block and run as gui or cli. -######################################################## -#def make_default($type, $param) - #if $type == 'eng_float' -eng_notation.num_to_str($param.get_make())#slurp - #else -$param.get_make()#slurp - #end if -#end def -#def make_short_id($param) - #set $short_id = $param.get_param('short_id').get_evaluated() - #if $short_id - #set $short_id = '-' + $short_id - #end if -$short_id#slurp -#end def -#if not $generate_options.startswith('hb') -#set $params_eq_list = list() -#if $parameters - - -def argument_parser(): - #set $arg_parser_args = '' - #if $flow_graph.get_option('description') - #set $arg_parser_args = 'description=description' - description = $repr($flow_graph.get_option('description')) - #end if - parser = ArgumentParser($arg_parser_args) - #for $param in $parameters - #set $type = $param.get_param('type').get_value() - #if $type - #silent $params_eq_list.append('%s=options.%s'%($param.get_id(), $param.get_id())) - parser.add_argument( - #if $make_short_id($param) - "$make_short_id($param)", #slurp - #end if - "--$param.get_id().replace('_', '-')", dest="$param.get_id()", type=$type, default=$make_default($type, $param), - help="Set $($param.get_param('label').get_evaluated() or $param.get_id()) [default=%(default)r]") - #end if - #end for - return parser -#end if - - -def main(top_block_cls=$(class_name), options=None): - #if $parameters - if options is None: - options = argument_parser().parse_args() - #end if - #if $flow_graph.get_option('realtime_scheduling') - if gr.enable_realtime_scheduling() != gr.RT_OK: - print "Error: failed to enable real-time scheduling." - #end if - - #if $generate_options == 'qt_gui' - if StrictVersion("4.5.0") <= StrictVersion(Qt.qVersion()) < StrictVersion("5.0.0"): - style = gr.prefs().get_string('qtgui', 'style', 'raster') - Qt.QApplication.setGraphicsSystem(style) - qapp = Qt.QApplication(sys.argv) - - tb = top_block_cls($(', '.join($params_eq_list))) - #if $flow_graph.get_option('run') - #if $flow_graph.get_option('max_nouts') - tb.start($flow_graph.get_option('max_nouts')) - #else - tb.start() - #end if - #end if - #if $flow_graph.get_option('qt_qss_theme') - tb.setStyleSheetFromFile($repr($flow_graph.get_option('qt_qss_theme'))) - #end if - tb.show() - - def quitting(): - tb.stop() - tb.wait() - qapp.aboutToQuit.connect(quitting) - #for $m in $monitors - if 'en' in $m.params: - if $m.params['en'].get_value(): - (tb.$m.get_id()).start() - else: - sys.stderr.write("Monitor '{0}' does not have an enable ('en') parameter.".format("tb.$m.get_id()")) - #end for - qapp.exec_() - #elif $generate_options == 'no_gui' - tb = top_block_cls($(', '.join($params_eq_list))) - #set $run_options = $flow_graph.get_option('run_options') - #if $run_options == 'prompt' - #if $flow_graph.get_option('max_nouts') - tb.start($flow_graph.get_option('max_nouts')) - #else - tb.start() - #end if - #for $m in $monitors - (tb.$m.get_id()).start() - #end for - try: - raw_input('Press Enter to quit: ') - except EOFError: - pass - tb.stop() - #elif $run_options == 'run' - #if $flow_graph.get_option('max_nouts') - tb.start($flow_graph.get_option('max_nouts')) - #else - tb.start() - #end if - #end if - #for $m in $monitors - (tb.$m.get_id()).start() - #end for - tb.wait() - #end if - - -if __name__ == '__main__': - main() -#end if diff --git a/grc/core/generator/hier_block.py b/grc/core/generator/hier_block.py new file mode 100644 index 0000000000..ab362e0663 --- /dev/null +++ b/grc/core/generator/hier_block.py @@ -0,0 +1,196 @@ +import collections +import os + +import six + +from .top_block import TopBlockGenerator + +from .. import ParseXML, Constants + + +class HierBlockGenerator(TopBlockGenerator): + """Extends the top block generator to also generate a block XML file""" + + def __init__(self, flow_graph, file_path): + """ + Initialize the hier block generator object. + + Args: + flow_graph: the flow graph object + file_path: where to write the py file (the xml goes into HIER_BLOCK_LIB_DIR) + """ + TopBlockGenerator.__init__(self, flow_graph, file_path) + platform = flow_graph.parent + + hier_block_lib_dir = platform.config.hier_block_lib_dir + if not os.path.exists(hier_block_lib_dir): + os.mkdir(hier_block_lib_dir) + + self._mode = Constants.HIER_BLOCK_FILE_MODE + self.file_path = os.path.join(hier_block_lib_dir, self._flow_graph.get_option('id') + '.py') + self.file_path_xml = self.file_path + '.xml' + + def write(self): + """generate output and write it to files""" + TopBlockGenerator.write(self) + ParseXML.to_file(self._build_block_n_from_flow_graph_io(), self.file_path_xml) + ParseXML.validate_dtd(self.file_path_xml, Constants.BLOCK_DTD) + try: + os.chmod(self.file_path_xml, self._mode) + except: + pass + + def _build_block_n_from_flow_graph_io(self): + """ + Generate a block XML nested data from the flow graph IO + + Returns: + a xml node tree + """ + # Extract info from the flow graph + block_id = self._flow_graph.get_option('id') + parameters = self._flow_graph.get_parameters() + + def var_or_value(name): + if name in (p.name for p in parameters): + return "${" + name + " }" + return name + + # Build the nested data + data = collections.OrderedDict() + data['id'] = block_id + data['label'] = ( + self._flow_graph.get_option('title') or + self._flow_graph.get_option('id').replace('_', ' ').title() + ) + data['category'] = self._flow_graph.get_option('category') + + # Parameters + data['parameters'] = [] + for param_block in parameters: + p = collections.OrderedDict() + p['id'] = param_block.name + p['label'] = param_block.params['label'].get_value() or param_block.name + p['dtype'] = 'raw' + p['value'] = param_block.params['value'].get_value() + p['hide'] = param_block.params['hide'].get_value() + data['param'].append(p) + + # Ports + for direction in ('inputs', 'outputs'): + data[direction] = [] + for port in get_hier_block_io(self._flow_graph, direction): + p = collections.OrderedDict() + if port.domain == Constants.GR_MESSAGE_DOMAIN: + p['id'] = port.id + p['label'] = port.label + if port.domain != Constants.DEFAULT_DOMAIN: + p['domain'] = port.domain + p['dtype'] = port.dtype + if port.domain != Constants.GR_MESSAGE_DOMAIN: + p['vlen'] = var_or_value(port.vlen) + if port.optional: + p['optional'] = True + data[direction].append(p) + + t = data['templates'] = collections.OrderedDict() + + t['import'] = "from {0} import {0} # grc-generated hier_block".format( + self._flow_graph.get_option('id')) + # Make data + if parameters: + t['make'] = '{cls}(\n {kwargs},\n)'.format( + cls=block_id, + kwargs=',\n '.join( + '{key}=${key}'.format(key=param.name) for param in parameters + ), + ) + else: + t['make'] = '{cls}()'.format(cls=block_id) + # Callback data + t['callback'] = [ + 'set_{key}(${key})'.format(key=param_block.name) for param_block in parameters + ] + + + # Documentation + data['doc'] = "\n".join(field for field in ( + self._flow_graph.get_option('author'), + self._flow_graph.get_option('description'), + self.file_path + ) if field) + data['grc_source'] = str(self._flow_graph.grc_file_path) + + n = {'block': data} + return n + + +class QtHierBlockGenerator(HierBlockGenerator): + + def _build_block_n_from_flow_graph_io(self): + n = HierBlockGenerator._build_block_n_from_flow_graph_io(self) + block_n = collections.OrderedDict() + + # insert flags after category + for key, value in six.iteritems(n['block']): + block_n[key] = value + if key == 'category': + block_n['flags'] = 'need_qt_gui' + + if not block_n['name'].upper().startswith('QT GUI'): + block_n['name'] = 'QT GUI ' + block_n['name'] + + gui_hint_param = collections.OrderedDict() + gui_hint_param['name'] = 'GUI Hint' + gui_hint_param['key'] = 'gui_hint' + gui_hint_param['value'] = '' + gui_hint_param['type'] = 'gui_hint' + gui_hint_param['hide'] = 'part' + block_n['param'].append(gui_hint_param) + + block_n['make'] += ( + "\n#set $win = 'self.%s' % $id" + "\n${gui_hint()($win)}" + ) + + return {'block': block_n} + + +def get_hier_block_io(flow_graph, direction, domain=None): + """ + Get a list of io ports for this flow graph. + + Returns a list of dicts with: type, label, vlen, size, optional + """ + pads = flow_graph.get_pad_sources() if direction == 'inputs' else flow_graph.get_pad_sinks() + + ports = [] + for pad in pads: + for port in (pad.sources if direction == 'inputs' else pad.sinks): + if domain and port.domain != domain: + continue + yield port + + type_param = pad.params['type'] + master = { + 'label': str(pad.params['label'].get_evaluated()), + 'type': str(pad.params['type'].get_evaluated()), + 'vlen': str(pad.params['vlen'].get_value()), + 'size': type_param.options.attributes[type_param.get_value()]['size'], + 'optional': bool(pad.params['optional'].get_evaluated()), + } + if domain and pad. + + num_ports = pad.params['num_streams'].get_evaluated() + if num_ports <= 1: + yield master + else: + for i in range(num_ports): + clone = master.copy() + clone['label'] += str(i) + ports.append(clone) + else: + ports.append(master) + if domain is not None: + ports = [p for p in ports if p.domain == domain] + return ports diff --git a/grc/core/generator/top_block.py b/grc/core/generator/top_block.py new file mode 100644 index 0000000000..e4fed838ae --- /dev/null +++ b/grc/core/generator/top_block.py @@ -0,0 +1,285 @@ +import codecs +import operator +import os +import tempfile +import textwrap +import time + +from mako.template import Template + +from .. import Messages, blocks +from ..Constants import TOP_BLOCK_FILE_MODE +from .FlowGraphProxy import FlowGraphProxy +from ..utils import expr_utils + +DATA_DIR = os.path.dirname(__file__) +FLOW_GRAPH_TEMPLATE = os.path.join(DATA_DIR, 'flow_graph.py.mako') +flow_graph_template = Template(filename=FLOW_GRAPH_TEMPLATE) + + +class TopBlockGenerator(object): + + def __init__(self, flow_graph, file_path): + """ + Initialize the top block generator object. + + Args: + flow_graph: the flow graph object + file_path: the path to write the file to + """ + + self._flow_graph = FlowGraphProxy(flow_graph) + self._generate_options = self._flow_graph.get_option('generate_options') + + self._mode = TOP_BLOCK_FILE_MODE + dirname = os.path.dirname(file_path) + # Handle the case where the directory is read-only + # In this case, use the system's temp directory + if not os.access(dirname, os.W_OK): + dirname = tempfile.gettempdir() + filename = self._flow_graph.get_option('id') + '.py' + self.file_path = os.path.join(dirname, filename) + self._dirname = dirname + + def _warnings(self): + throttling_blocks = [b for b in self._flow_graph.get_enabled_blocks() + if b.flags.throttle] + if not throttling_blocks and not self._generate_options.startswith('hb'): + Messages.send_warning("This flow graph may not have flow control: " + "no audio or RF hardware blocks found. " + "Add a Misc->Throttle block to your flow " + "graph to avoid CPU congestion.") + if len(throttling_blocks) > 1: + keys = set([b.key for b in throttling_blocks]) + if len(keys) > 1 and 'blocks_throttle' in keys: + Messages.send_warning("This flow graph contains a throttle " + "block and another rate limiting block, " + "e.g. a hardware source or sink. " + "This is usually undesired. Consider " + "removing the throttle block.") + + deprecated_block_keys = {b.name for b in self._flow_graph.get_enabled_blocks() if b.flags.deprecated} + for key in deprecated_block_keys: + Messages.send_warning("The block {!r} is deprecated.".format(key)) + + def write(self): + """generate output and write it to files""" + self._warnings() + + for filename, data in self._build_python_code_from_template(): + with codecs.open(filename, 'w', encoding='utf-8') as fp: + fp.write(data) + if filename == self.file_path: + try: + os.chmod(filename, self._mode) + except: + pass + + def _build_python_code_from_template(self): + """ + Convert the flow graph to python code. + + Returns: + a string of python code + """ + output = [] + + fg = self._flow_graph + title = fg.get_option('title') or fg.get_option('id').replace('_', ' ').title() + variables = fg.get_variables() + parameters = fg.get_parameters() + monitors = fg.get_monitors() + + for block in fg.iter_enabled_blocks(): + key = block.key + file_path = os.path.join(self._dirname, block.name + '.py') + if key == 'epy_block': + src = block.params['_source_code'].get_value() + output.append((file_path, src)) + elif key == 'epy_module': + src = block.params['source_code'].get_value() + output.append((file_path, src)) + + namespace = { + 'flow_graph': fg, + 'variables': variables, + 'parameters': parameters, + 'monitors': monitors, + 'generate_options': self._generate_options, + 'generated_time': time.ctime(), + } + flow_graph_code = flow_graph_template.render( + title=title, + imports=self._imports(), + blocks=self._blocks(), + callbacks=self._callbacks(), + connections=self._connections(), + **namespace + ) + # strip trailing white-space + flow_graph_code = "\n".join(line.rstrip() for line in flow_graph_code.split("\n")) + output.append((self.file_path, flow_graph_code)) + + return output + + def _imports(self): + fg = self._flow_graph + imports = fg.imports() + seen = set() + output = [] + + need_path_hack = any(imp.endswith("# grc-generated hier_block") for imp in imports) + if need_path_hack: + output.insert(0, textwrap.dedent("""\ + import os + import sys + sys.path.append(os.environ.get('GRC_HIER_PATH', os.path.expanduser('~/.grc_gnuradio'))) + """)) + seen.add('import os') + seen.add('import sys') + + if fg.get_option('qt_qss_theme'): + imports.append('import os') + imports.append('import sys') + + if fg.get_option('thread_safe_setters'): + imports.append('import threading') + + def is_duplicate(l): + if l.startswith('import') or l.startswith('from') and l in seen: + return True + seen.add(line) + return False + + for import_ in sorted(imports): + lines = import_.strip().split('\n') + if not lines[0]: + continue + for line in lines: + line = line.rstrip() + if not is_duplicate(line): + output.append(line) + + return output + + def _blocks(self): + fg = self._flow_graph + parameters = fg.get_parameters() + + # List of blocks not including variables and imports and parameters and disabled + def _get_block_sort_text(block): + code = block.templates.render('make').replace(block.name, ' ') + try: + code += block.params['gui_hint'].get_value() # Newer gui markup w/ qtgui + except: + pass + return code + + blocks = [ + b for b in fg.blocks + if b.enabled and not (b.get_bypassed() or b.is_import or b in parameters or b.key == 'options') + ] + + blocks = expr_utils.sort_objects(blocks, operator.attrgetter('name'), _get_block_sort_text) + blocks_make = [] + for block in blocks: + make = block.templates.render('make') + if not block.is_variable: + make = 'self.' + block.name + ' = ' + make + if make: + blocks_make.append((block, make)) + return blocks_make + + def _callbacks(self): + fg = self._flow_graph + variables = fg.get_variables() + parameters = fg.get_parameters() + + # List of variable names + var_ids = [var.name for var in parameters + variables] + replace_dict = dict((var_id, 'self.' + var_id) for var_id in var_ids) + callbacks_all = [] + for block in fg.iter_enabled_blocks(): + callbacks_all.extend(expr_utils.expr_replace(cb, replace_dict) for cb in block.get_callbacks()) + + # Map var id to callbacks + def uses_var_id(): + used = expr_utils.get_variable_dependencies(callback, [var_id]) + return used and 'self.' + var_id in callback # callback might contain var_id itself + + callbacks = {} + for var_id in var_ids: + callbacks[var_id] = [callback for callback in callbacks_all if uses_var_id()] + + return callbacks + + def _connections(self): + fg = self._flow_graph + templates = {key: Template(text) + for key, text in fg.parent_platform.connection_templates.items()} + + def make_port_sig(port): + if port.parent.key in ('pad_source', 'pad_sink'): + block = 'self' + key = fg.get_pad_port_global_key(port) + else: + block = 'self.' + port.parent_block.name + key = port.key + + if not key.isdigit(): + key = repr(key) + + return '({block}, {key})'.format(block=block, key=key) + + connections = fg.get_enabled_connections() + + # Get the virtual blocks and resolve their connections + connection_factory = fg.parent_platform.Connection + virtual = [c for c in connections if isinstance(c.source_block, blocks.VirtualSource)] + for connection in virtual: + sink = connection.sink_port + for source in connection.source_port.resolve_virtual_source(): + resolved = connection_factory(fg.orignal_flowgraph, source, sink) + connections.append(resolved) + # Remove the virtual connection + connections.remove(connection) + + # Bypassing blocks: Need to find all the enabled connections for the block using + # the *connections* object rather than get_connections(). Create new connections + # that bypass the selected block and remove the existing ones. This allows adjacent + # bypassed blocks to see the newly created connections to downstream blocks, + # allowing them to correctly construct bypass connections. + bypassed_blocks = fg.get_bypassed_blocks() + for block in bypassed_blocks: + # Get the upstream connection (off of the sink ports) + # Use *connections* not get_connections() + source_connection = [c for c in connections if c.sink_port == block.sinks[0]] + # The source connection should never have more than one element. + assert (len(source_connection) == 1) + + # Get the source of the connection. + source_port = source_connection[0].source_port + + # Loop through all the downstream connections + for sink in (c for c in connections if c.source_port == block.sources[0]): + if not sink.enabled: + # Ignore disabled connections + continue + connection = connection_factory(fg.orignal_flowgraph, source_port, sink.sink_port) + connections.append(connection) + # Remove this sink connection + connections.remove(sink) + # Remove the source connection + connections.remove(source_connection[0]) + + # List of connections where each endpoint is enabled (sorted by domains, block names) + def by_domain_and_blocks(c): + return c.type, c.source_block.name, c.sink_block.name + + rendered = [] + for con in sorted(connections, key=by_domain_and_blocks): + template = templates[con.type] + code = template.render(make_port_sig=make_port_sig, source=con.source_port, sink=con.sink_port) + rendered.append(code) + + return rendered |