import codecs import operator import os import tempfile import textwrap import time from mako.template import Template from .. import Messages, blocks from ..Constants import TOP_BLOCK_FILE_MODE from .FlowGraphProxy import FlowGraphProxy from ..utils import expr_utils DATA_DIR = os.path.dirname(__file__) FLOW_GRAPH_TEMPLATE = os.path.join(DATA_DIR, 'flow_graph.py.mako') flow_graph_template = Template(filename=FLOW_GRAPH_TEMPLATE) class TopBlockGenerator(object): def __init__(self, flow_graph, file_path): """ Initialize the top block generator object. Args: flow_graph: the flow graph object file_path: the path to write the file to """ self._flow_graph = FlowGraphProxy(flow_graph) self._generate_options = self._flow_graph.get_option('generate_options') self._mode = TOP_BLOCK_FILE_MODE dirname = os.path.dirname(file_path) # Handle the case where the directory is read-only # In this case, use the system's temp directory if not os.access(dirname, os.W_OK): dirname = tempfile.gettempdir() filename = self._flow_graph.get_option('id') + '.py' self.file_path = os.path.join(dirname, filename) self._dirname = dirname def _warnings(self): throttling_blocks = [b for b in self._flow_graph.get_enabled_blocks() if b.flags.throttle] if not throttling_blocks and not self._generate_options.startswith('hb'): Messages.send_warning("This flow graph may not have flow control: " "no audio or RF hardware blocks found. " "Add a Misc->Throttle block to your flow " "graph to avoid CPU congestion.") if len(throttling_blocks) > 1: keys = set([b.key for b in throttling_blocks]) if len(keys) > 1 and 'blocks_throttle' in keys: Messages.send_warning("This flow graph contains a throttle " "block and another rate limiting block, " "e.g. a hardware source or sink. " "This is usually undesired. Consider " "removing the throttle block.") deprecated_block_keys = {b.name for b in self._flow_graph.get_enabled_blocks() if b.flags.deprecated} for key in deprecated_block_keys: Messages.send_warning("The block {!r} is deprecated.".format(key)) def write(self): """generate output and write it to files""" self._warnings() for filename, data in self._build_python_code_from_template(): with codecs.open(filename, 'w', encoding='utf-8') as fp: fp.write(data) if filename == self.file_path: try: os.chmod(filename, self._mode) except: pass def _build_python_code_from_template(self): """ Convert the flow graph to python code. Returns: a string of python code """ output = [] fg = self._flow_graph title = fg.get_option('title') or fg.get_option('id').replace('_', ' ').title() variables = fg.get_variables() parameters = fg.get_parameters() monitors = fg.get_monitors() for block in fg.iter_enabled_blocks(): key = block.key file_path = os.path.join(self._dirname, block.name + '.py') if key == 'epy_block': src = block.params['_source_code'].get_value() output.append((file_path, src)) elif key == 'epy_module': src = block.params['source_code'].get_value() output.append((file_path, src)) namespace = { 'flow_graph': fg, 'variables': variables, 'parameters': parameters, 'monitors': monitors, 'generate_options': self._generate_options, 'generated_time': time.ctime(), } flow_graph_code = flow_graph_template.render( title=title, imports=self._imports(), blocks=self._blocks(), callbacks=self._callbacks(), connections=self._connections(), **namespace ) # strip trailing white-space flow_graph_code = "\n".join(line.rstrip() for line in flow_graph_code.split("\n")) output.append((self.file_path, flow_graph_code)) return output def _imports(self): fg = self._flow_graph imports = fg.imports() seen = set() output = [] need_path_hack = any(imp.endswith("# grc-generated hier_block") for imp in imports) if need_path_hack: output.insert(0, textwrap.dedent("""\ import os import sys sys.path.append(os.environ.get('GRC_HIER_PATH', os.path.expanduser('~/.grc_gnuradio'))) """)) seen.add('import os') seen.add('import sys') if fg.get_option('qt_qss_theme'): imports.append('import os') imports.append('import sys') if fg.get_option('thread_safe_setters'): imports.append('import threading') def is_duplicate(l): if l.startswith('import') or l.startswith('from') and l in seen: return True seen.add(line) return False for import_ in sorted(imports): lines = import_.strip().split('\n') if not lines[0]: continue for line in lines: line = line.rstrip() if not is_duplicate(line): output.append(line) return output def _blocks(self): fg = self._flow_graph parameters = fg.get_parameters() # List of blocks not including variables and imports and parameters and disabled def _get_block_sort_text(block): code = block.templates.render('make').replace(block.name, ' ') try: code += block.params['gui_hint'].get_value() # Newer gui markup w/ qtgui except: pass return code blocks = [ b for b in fg.blocks if b.enabled and not (b.get_bypassed() or b.is_import or b in parameters or b.key == 'options') ] blocks = expr_utils.sort_objects(blocks, operator.attrgetter('name'), _get_block_sort_text) blocks_make = [] for block in blocks: make = block.templates.render('make') if not block.is_variable: make = 'self.' + block.name + ' = ' + make if make: blocks_make.append((block, make)) return blocks_make def _callbacks(self): fg = self._flow_graph variables = fg.get_variables() parameters = fg.get_parameters() # List of variable names var_ids = [var.name for var in parameters + variables] replace_dict = dict((var_id, 'self.' + var_id) for var_id in var_ids) callbacks_all = [] for block in fg.iter_enabled_blocks(): callbacks_all.extend(expr_utils.expr_replace(cb, replace_dict) for cb in block.get_callbacks()) # Map var id to callbacks def uses_var_id(): used = expr_utils.get_variable_dependencies(callback, [var_id]) return used and 'self.' + var_id in callback # callback might contain var_id itself callbacks = {} for var_id in var_ids: callbacks[var_id] = [callback for callback in callbacks_all if uses_var_id()] return callbacks def _connections(self): fg = self._flow_graph templates = {key: Template(text) for key, text in fg.parent_platform.connection_templates.items()} def make_port_sig(port): if port.parent.key in ('pad_source', 'pad_sink'): block = 'self' key = fg.get_pad_port_global_key(port) else: block = 'self.' + port.parent_block.name key = port.key if not key.isdigit(): key = repr(key) return '({block}, {key})'.format(block=block, key=key) connections = fg.get_enabled_connections() # Get the virtual blocks and resolve their connections connection_factory = fg.parent_platform.Connection virtual = [c for c in connections if isinstance(c.source_block, blocks.VirtualSource)] for connection in virtual: sink = connection.sink_port for source in connection.source_port.resolve_virtual_source(): resolved = connection_factory(fg.orignal_flowgraph, source, sink) connections.append(resolved) # Remove the virtual connection connections.remove(connection) # Bypassing blocks: Need to find all the enabled connections for the block using # the *connections* object rather than get_connections(). Create new connections # that bypass the selected block and remove the existing ones. This allows adjacent # bypassed blocks to see the newly created connections to downstream blocks, # allowing them to correctly construct bypass connections. bypassed_blocks = fg.get_bypassed_blocks() for block in bypassed_blocks: # Get the upstream connection (off of the sink ports) # Use *connections* not get_connections() source_connection = [c for c in connections if c.sink_port == block.sinks[0]] # The source connection should never have more than one element. assert (len(source_connection) == 1) # Get the source of the connection. source_port = source_connection[0].source_port # Loop through all the downstream connections for sink in (c for c in connections if c.source_port == block.sources[0]): if not sink.enabled: # Ignore disabled connections continue connection = connection_factory(fg.orignal_flowgraph, source_port, sink.sink_port) connections.append(connection) # Remove this sink connection connections.remove(sink) # Remove the source connection connections.remove(source_connection[0]) # List of connections where each endpoint is enabled (sorted by domains, block names) def by_domain_and_blocks(c): return c.type, c.source_block.name, c.sink_block.name rendered = [] for con in sorted(connections, key=by_domain_and_blocks): template = templates[con.type] code = template.render(make_port_sig=make_port_sig, source=con.source_port, sink=con.sink_port) rendered.append(code) return rendered