diff options
Diffstat (limited to 'grc/core/generator/top_block.py')
-rw-r--r-- | grc/core/generator/top_block.py | 285 |
1 files changed, 285 insertions, 0 deletions
diff --git a/grc/core/generator/top_block.py b/grc/core/generator/top_block.py new file mode 100644 index 0000000000..e4fed838ae --- /dev/null +++ b/grc/core/generator/top_block.py @@ -0,0 +1,285 @@ +import codecs +import operator +import os +import tempfile +import textwrap +import time + +from mako.template import Template + +from .. import Messages, blocks +from ..Constants import TOP_BLOCK_FILE_MODE +from .FlowGraphProxy import FlowGraphProxy +from ..utils import expr_utils + +DATA_DIR = os.path.dirname(__file__) +FLOW_GRAPH_TEMPLATE = os.path.join(DATA_DIR, 'flow_graph.py.mako') +flow_graph_template = Template(filename=FLOW_GRAPH_TEMPLATE) + + +class TopBlockGenerator(object): + + def __init__(self, flow_graph, file_path): + """ + Initialize the top block generator object. + + Args: + flow_graph: the flow graph object + file_path: the path to write the file to + """ + + self._flow_graph = FlowGraphProxy(flow_graph) + self._generate_options = self._flow_graph.get_option('generate_options') + + self._mode = TOP_BLOCK_FILE_MODE + dirname = os.path.dirname(file_path) + # Handle the case where the directory is read-only + # In this case, use the system's temp directory + if not os.access(dirname, os.W_OK): + dirname = tempfile.gettempdir() + filename = self._flow_graph.get_option('id') + '.py' + self.file_path = os.path.join(dirname, filename) + self._dirname = dirname + + def _warnings(self): + throttling_blocks = [b for b in self._flow_graph.get_enabled_blocks() + if b.flags.throttle] + if not throttling_blocks and not self._generate_options.startswith('hb'): + Messages.send_warning("This flow graph may not have flow control: " + "no audio or RF hardware blocks found. " + "Add a Misc->Throttle block to your flow " + "graph to avoid CPU congestion.") + if len(throttling_blocks) > 1: + keys = set([b.key for b in throttling_blocks]) + if len(keys) > 1 and 'blocks_throttle' in keys: + Messages.send_warning("This flow graph contains a throttle " + "block and another rate limiting block, " + "e.g. a hardware source or sink. " + "This is usually undesired. Consider " + "removing the throttle block.") + + deprecated_block_keys = {b.name for b in self._flow_graph.get_enabled_blocks() if b.flags.deprecated} + for key in deprecated_block_keys: + Messages.send_warning("The block {!r} is deprecated.".format(key)) + + def write(self): + """generate output and write it to files""" + self._warnings() + + for filename, data in self._build_python_code_from_template(): + with codecs.open(filename, 'w', encoding='utf-8') as fp: + fp.write(data) + if filename == self.file_path: + try: + os.chmod(filename, self._mode) + except: + pass + + def _build_python_code_from_template(self): + """ + Convert the flow graph to python code. + + Returns: + a string of python code + """ + output = [] + + fg = self._flow_graph + title = fg.get_option('title') or fg.get_option('id').replace('_', ' ').title() + variables = fg.get_variables() + parameters = fg.get_parameters() + monitors = fg.get_monitors() + + for block in fg.iter_enabled_blocks(): + key = block.key + file_path = os.path.join(self._dirname, block.name + '.py') + if key == 'epy_block': + src = block.params['_source_code'].get_value() + output.append((file_path, src)) + elif key == 'epy_module': + src = block.params['source_code'].get_value() + output.append((file_path, src)) + + namespace = { + 'flow_graph': fg, + 'variables': variables, + 'parameters': parameters, + 'monitors': monitors, + 'generate_options': self._generate_options, + 'generated_time': time.ctime(), + } + flow_graph_code = flow_graph_template.render( + title=title, + imports=self._imports(), + blocks=self._blocks(), + callbacks=self._callbacks(), + connections=self._connections(), + **namespace + ) + # strip trailing white-space + flow_graph_code = "\n".join(line.rstrip() for line in flow_graph_code.split("\n")) + output.append((self.file_path, flow_graph_code)) + + return output + + def _imports(self): + fg = self._flow_graph + imports = fg.imports() + seen = set() + output = [] + + need_path_hack = any(imp.endswith("# grc-generated hier_block") for imp in imports) + if need_path_hack: + output.insert(0, textwrap.dedent("""\ + import os + import sys + sys.path.append(os.environ.get('GRC_HIER_PATH', os.path.expanduser('~/.grc_gnuradio'))) + """)) + seen.add('import os') + seen.add('import sys') + + if fg.get_option('qt_qss_theme'): + imports.append('import os') + imports.append('import sys') + + if fg.get_option('thread_safe_setters'): + imports.append('import threading') + + def is_duplicate(l): + if l.startswith('import') or l.startswith('from') and l in seen: + return True + seen.add(line) + return False + + for import_ in sorted(imports): + lines = import_.strip().split('\n') + if not lines[0]: + continue + for line in lines: + line = line.rstrip() + if not is_duplicate(line): + output.append(line) + + return output + + def _blocks(self): + fg = self._flow_graph + parameters = fg.get_parameters() + + # List of blocks not including variables and imports and parameters and disabled + def _get_block_sort_text(block): + code = block.templates.render('make').replace(block.name, ' ') + try: + code += block.params['gui_hint'].get_value() # Newer gui markup w/ qtgui + except: + pass + return code + + blocks = [ + b for b in fg.blocks + if b.enabled and not (b.get_bypassed() or b.is_import or b in parameters or b.key == 'options') + ] + + blocks = expr_utils.sort_objects(blocks, operator.attrgetter('name'), _get_block_sort_text) + blocks_make = [] + for block in blocks: + make = block.templates.render('make') + if not block.is_variable: + make = 'self.' + block.name + ' = ' + make + if make: + blocks_make.append((block, make)) + return blocks_make + + def _callbacks(self): + fg = self._flow_graph + variables = fg.get_variables() + parameters = fg.get_parameters() + + # List of variable names + var_ids = [var.name for var in parameters + variables] + replace_dict = dict((var_id, 'self.' + var_id) for var_id in var_ids) + callbacks_all = [] + for block in fg.iter_enabled_blocks(): + callbacks_all.extend(expr_utils.expr_replace(cb, replace_dict) for cb in block.get_callbacks()) + + # Map var id to callbacks + def uses_var_id(): + used = expr_utils.get_variable_dependencies(callback, [var_id]) + return used and 'self.' + var_id in callback # callback might contain var_id itself + + callbacks = {} + for var_id in var_ids: + callbacks[var_id] = [callback for callback in callbacks_all if uses_var_id()] + + return callbacks + + def _connections(self): + fg = self._flow_graph + templates = {key: Template(text) + for key, text in fg.parent_platform.connection_templates.items()} + + def make_port_sig(port): + if port.parent.key in ('pad_source', 'pad_sink'): + block = 'self' + key = fg.get_pad_port_global_key(port) + else: + block = 'self.' + port.parent_block.name + key = port.key + + if not key.isdigit(): + key = repr(key) + + return '({block}, {key})'.format(block=block, key=key) + + connections = fg.get_enabled_connections() + + # Get the virtual blocks and resolve their connections + connection_factory = fg.parent_platform.Connection + virtual = [c for c in connections if isinstance(c.source_block, blocks.VirtualSource)] + for connection in virtual: + sink = connection.sink_port + for source in connection.source_port.resolve_virtual_source(): + resolved = connection_factory(fg.orignal_flowgraph, source, sink) + connections.append(resolved) + # Remove the virtual connection + connections.remove(connection) + + # Bypassing blocks: Need to find all the enabled connections for the block using + # the *connections* object rather than get_connections(). Create new connections + # that bypass the selected block and remove the existing ones. This allows adjacent + # bypassed blocks to see the newly created connections to downstream blocks, + # allowing them to correctly construct bypass connections. + bypassed_blocks = fg.get_bypassed_blocks() + for block in bypassed_blocks: + # Get the upstream connection (off of the sink ports) + # Use *connections* not get_connections() + source_connection = [c for c in connections if c.sink_port == block.sinks[0]] + # The source connection should never have more than one element. + assert (len(source_connection) == 1) + + # Get the source of the connection. + source_port = source_connection[0].source_port + + # Loop through all the downstream connections + for sink in (c for c in connections if c.source_port == block.sources[0]): + if not sink.enabled: + # Ignore disabled connections + continue + connection = connection_factory(fg.orignal_flowgraph, source_port, sink.sink_port) + connections.append(connection) + # Remove this sink connection + connections.remove(sink) + # Remove the source connection + connections.remove(source_connection[0]) + + # List of connections where each endpoint is enabled (sorted by domains, block names) + def by_domain_and_blocks(c): + return c.type, c.source_block.name, c.sink_block.name + + rendered = [] + for con in sorted(connections, key=by_domain_and_blocks): + template = templates[con.type] + code = template.render(make_port_sig=make_port_sig, source=con.source_port, sink=con.sink_port) + rendered.append(code) + + return rendered |