diff options
Diffstat (limited to 'grc/core/Port.py')
-rw-r--r-- | grc/core/Port.py | 414 |
1 files changed, 0 insertions, 414 deletions
diff --git a/grc/core/Port.py b/grc/core/Port.py deleted file mode 100644 index 8549656c9b..0000000000 --- a/grc/core/Port.py +++ /dev/null @@ -1,414 +0,0 @@ -""" -Copyright 2008-2017 Free Software Foundation, Inc. -This file is part of GNU Radio - -GNU Radio Companion is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -GNU Radio Companion is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA -""" - -from itertools import chain - -from .Constants import DEFAULT_DOMAIN, GR_STREAM_DOMAIN, GR_MESSAGE_DOMAIN -from .Element import Element - -from . import Constants - - -class LoopError(Exception): - pass - - -def _upstream_ports(port): - if port.is_sink: - return _sources_from_virtual_sink_port(port) - else: - return _sources_from_virtual_source_port(port) - - -def _sources_from_virtual_sink_port(sink_port, _traversed=None): - """ - Resolve the source port that is connected to the given virtual sink port. - Use the get source from virtual source to recursively resolve subsequent ports. - """ - source_ports_per_virtual_connection = ( - # there can be multiple ports per virtual connection - _sources_from_virtual_source_port(c.get_source(), _traversed) # type: list - for c in sink_port.get_enabled_connections() - ) - return list(chain(*source_ports_per_virtual_connection)) # concatenate generated lists of ports - - -def _sources_from_virtual_source_port(source_port, _traversed=None): - """ - Recursively resolve source ports over the virtual connections. - Keep track of traversed sources to avoid recursive loops. - """ - _traversed = set(_traversed or []) # a new set! - if source_port in _traversed: - raise LoopError('Loop found when resolving port type') - _traversed.add(source_port) - - block = source_port.get_parent() - flow_graph = block.get_parent() - - if not block.is_virtual_source(): - return [source_port] # nothing to resolve, we're done - - stream_id = block.get_param('stream_id').get_value() - - # currently the validation does not allow multiple virtual sinks and one virtual source - # but in the future it may... - connected_virtual_sink_blocks = ( - b for b in flow_graph.get_enabled_blocks() - if b.is_virtual_sink() and b.get_param('stream_id').get_value() == stream_id - ) - source_ports_per_virtual_connection = ( - _sources_from_virtual_sink_port(b.get_sinks()[0], _traversed) # type: list - for b in connected_virtual_sink_blocks - ) - return list(chain(*source_ports_per_virtual_connection)) # concatenate generated lists of ports - - -def _downstream_ports(port): - if port.is_source: - return _sinks_from_virtual_source_port(port) - else: - return _sinks_from_virtual_sink_port(port) - - -def _sinks_from_virtual_source_port(source_port, _traversed=None): - """ - Resolve the sink port that is connected to the given virtual source port. - Use the get sink from virtual sink to recursively resolve subsequent ports. - """ - sink_ports_per_virtual_connection = ( - # there can be multiple ports per virtual connection - _sinks_from_virtual_sink_port(c.get_sink(), _traversed) # type: list - for c in source_port.get_enabled_connections() - ) - return list(chain(*sink_ports_per_virtual_connection)) # concatenate generated lists of ports - - -def _sinks_from_virtual_sink_port(sink_port, _traversed=None): - """ - Recursively resolve sink ports over the virtual connections. - Keep track of traversed sinks to avoid recursive loops. - """ - _traversed = set(_traversed or []) # a new set! - if sink_port in _traversed: - raise LoopError('Loop found when resolving port type') - _traversed.add(sink_port) - - block = sink_port.get_parent() - flow_graph = block.get_parent() - - if not block.is_virtual_sink(): - return [sink_port] - - stream_id = block.get_param('stream_id').get_value() - - connected_virtual_source_blocks = ( - b for b in flow_graph.get_enabled_blocks() - if b.is_virtual_source() and b.get_param('stream_id').get_value() == stream_id - ) - sink_ports_per_virtual_connection = ( - _sinks_from_virtual_source_port(b.get_sources()[0], _traversed) # type: list - for b in connected_virtual_source_blocks - ) - return list(chain(*sink_ports_per_virtual_connection)) # concatenate generated lists of ports - - -class Port(Element): - - is_port = True - - def __init__(self, block, n, dir): - """ - Make a new port from nested data. - - Args: - block: the parent element - n: the nested odict - dir: the direction - """ - self._n = n - if n['type'] == 'message': - n['domain'] = GR_MESSAGE_DOMAIN - if 'domain' not in n: - n['domain'] = DEFAULT_DOMAIN - elif n['domain'] == GR_MESSAGE_DOMAIN: - n['key'] = n['name'] - n['type'] = 'message' # For port color - if not n.find('key'): - n['key'] = str(next(block.port_counters[dir == 'source'])) - - # Build the port - Element.__init__(self, block) - # Grab the data - self._name = n['name'] - self._key = n['key'] - self._type = n['type'] or '' - self._domain = n['domain'] - self._hide = n.find('hide') or '' - self._dir = dir - self._hide_evaluated = False # Updated on rewrite() - - self._nports = n.find('nports') or '' - self._vlen = n.find('vlen') or '' - self._optional = n.find('optional') or '' - self._optional_evaluated = False # Updated on rewrite() - self._clones = [] # References to cloned ports (for nports > 1) - - def __str__(self): - if self.is_source: - return 'Source - {}({})'.format(self.get_name(), self.get_key()) - if self.is_sink: - return 'Sink - {}({})'.format(self.get_name(), self.get_key()) - - def get_types(self): - return Constants.TYPE_TO_SIZEOF.keys() - - def is_type_empty(self): - return not self._n['type'] or not self.get_parent().resolve_dependencies(self._n['type']) - - def validate(self): - if self.get_type() not in self.get_types(): - self.add_error_message('Type "{}" is not a possible type.'.format(self.get_type())) - platform = self.get_parent().get_parent().get_parent() - if self.get_domain() not in platform.domains: - self.add_error_message('Domain key "{}" is not registered.'.format(self.get_domain())) - if not self.get_enabled_connections() and not self.get_optional(): - self.add_error_message('Port is not connected.') - - def rewrite(self): - """ - Handle the port cloning for virtual blocks. - """ - del self._error_messages[:] - if self.is_type_empty(): - self.resolve_empty_type() - - hide = self.get_parent().resolve_dependencies(self._hide).strip().lower() - self._hide_evaluated = False if hide in ('false', 'off', '0') else bool(hide) - optional = self.get_parent().resolve_dependencies(self._optional).strip().lower() - self._optional_evaluated = False if optional in ('false', 'off', '0') else bool(optional) - - # Update domain if was deduced from (dynamic) port type - type_ = self.get_type() - if self._domain == GR_STREAM_DOMAIN and type_ == "message": - self._domain = GR_MESSAGE_DOMAIN - self._key = self._name - if self._domain == GR_MESSAGE_DOMAIN and type_ != "message": - self._domain = GR_STREAM_DOMAIN - self._key = '0' # Is rectified in rewrite() - - def resolve_virtual_source(self): - """Only used by Generator after validation is passed""" - return _upstream_ports(self) - - def resolve_empty_type(self): - def find_port(finder): - try: - return next((p for p in finder(self) if not p.is_type_empty()), None) - except LoopError as error: - self.add_error_message(str(error)) - except (StopIteration, Exception) as error: - pass - - try: - port = find_port(_upstream_ports) or find_port(_downstream_ports) - self._type = str(port.get_type()) - self._vlen = str(port.get_vlen()) - except Exception: - # Reset type and vlen - self._type = self._vlen = '' - - def get_vlen(self): - """ - Get the vector length. - If the evaluation of vlen cannot be cast to an integer, return 1. - - Returns: - the vector length or 1 - """ - vlen = self.get_parent().resolve_dependencies(self._vlen) - try: - return int(self.get_parent().get_parent().evaluate(vlen)) - except: - return 1 - - def get_nports(self): - """ - Get the number of ports. - If already blank, return a blank - If the evaluation of nports cannot be cast to a positive integer, return 1. - - Returns: - the number of ports or 1 - """ - if self._nports == '': - return '' - - nports = self.get_parent().resolve_dependencies(self._nports) - try: - return max(1, int(self.get_parent().get_parent().evaluate(nports))) - except: - return 1 - - def get_optional(self): - return self._optional_evaluated - - def get_color(self): - """ - Get the color that represents this port's type. - Codes differ for ports where the vec length is 1 or greater than 1. - - Returns: - a hex color code. - """ - try: - color = Constants.TYPE_TO_COLOR[self.get_type()] - vlen = self.get_vlen() - if vlen == 1: - return color - color_val = int(color[1:], 16) - r = (color_val >> 16) & 0xff - g = (color_val >> 8) & 0xff - b = (color_val >> 0) & 0xff - dark = (0, 0, 30, 50, 70)[min(4, vlen)] - r = max(r-dark, 0) - g = max(g-dark, 0) - b = max(b-dark, 0) - # TODO: Change this to .format() - return '#%.2x%.2x%.2x' % (r, g, b) - except: - return '#FFFFFF' - - def get_clones(self): - """ - Get the clones of this master port (nports > 1) - - Returns: - a list of ports - """ - return self._clones - - def add_clone(self): - """ - Create a clone of this (master) port and store a reference in self._clones. - - The new port name (and key for message ports) will have index 1... appended. - If this is the first clone, this (master) port will get a 0 appended to its name (and key) - - Returns: - the cloned port - """ - # Add index to master port name if there are no clones yet - if not self._clones: - self._name = self._n['name'] + '0' - # Also update key for none stream ports - if not self._key.isdigit(): - self._key = self._name - - # Prepare a copy of the odict for the clone - n = self._n.copy() - # Remove nports from the key so the copy cannot be a duplicator - if 'nports' in n: - n.pop('nports') - n['name'] = self._n['name'] + str(len(self._clones) + 1) - # Dummy value 99999 will be fixed later - n['key'] = '99999' if self._key.isdigit() else n['name'] - - # Clone - port = self.__class__(self.get_parent(), n, self._dir) - self._clones.append(port) - return port - - def remove_clone(self, port): - """ - Remove a cloned port (from the list of clones only) - Remove the index 0 of the master port name (and key9 if there are no more clones left - """ - self._clones.remove(port) - # Remove index from master port name if there are no more clones - if not self._clones: - self._name = self._n['name'] - # Also update key for none stream ports - if not self._key.isdigit(): - self._key = self._name - - def get_name(self): - number = '' - if self.get_type() == 'bus': - busses = filter(lambda a: a._dir == self._dir, self.get_parent().get_ports_gui()) - number = str(busses.index(self)) + '#' + str(len(self.get_associated_ports())) - return self._name + number - - def get_key(self): - return self._key - - @property - def is_sink(self): - return self._dir == 'sink' - - @property - def is_source(self): - return self._dir == 'source' - - def get_type(self): - return self.get_parent().resolve_dependencies(self._type) - - def get_domain(self): - return self._domain - - def get_hide(self): - return self._hide_evaluated - - def get_connections(self): - """ - Get all connections that use this port. - - Returns: - a list of connection objects - """ - connections = self.get_parent().get_parent().connections - connections = filter(lambda c: c.get_source() is self or c.get_sink() is self, connections) - return connections - - def get_enabled_connections(self): - """ - Get all enabled connections that use this port. - - Returns: - a list of connection objects - """ - return filter(lambda c: c.get_enabled(), self.get_connections()) - - def get_associated_ports(self): - if not self.get_type() == 'bus': - return [self] - else: - if self.is_source: - get_ports = self.get_parent().get_sources - bus_structure = self.get_parent().current_bus_structure['source'] - else: - get_ports = self.get_parent().get_sinks - bus_structure = self.get_parent().current_bus_structure['sink'] - - ports = [i for i in get_ports() if not i.get_type() == 'bus'] - if bus_structure: - busses = [i for i in get_ports() if i.get_type() == 'bus'] - bus_index = busses.index(self) - ports = filter(lambda a: ports.index(a) in bus_structure[bus_index], ports) - return ports |