summaryrefslogtreecommitdiff
path: root/grc/core/Port.py
diff options
context:
space:
mode:
Diffstat (limited to 'grc/core/Port.py')
-rw-r--r--grc/core/Port.py414
1 files changed, 0 insertions, 414 deletions
diff --git a/grc/core/Port.py b/grc/core/Port.py
deleted file mode 100644
index 8549656c9b..0000000000
--- a/grc/core/Port.py
+++ /dev/null
@@ -1,414 +0,0 @@
-"""
-Copyright 2008-2017 Free Software Foundation, Inc.
-This file is part of GNU Radio
-
-GNU Radio Companion is free software; you can redistribute it and/or
-modify it under the terms of the GNU General Public License
-as published by the Free Software Foundation; either version 2
-of the License, or (at your option) any later version.
-
-GNU Radio Companion is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
-"""
-
-from itertools import chain
-
-from .Constants import DEFAULT_DOMAIN, GR_STREAM_DOMAIN, GR_MESSAGE_DOMAIN
-from .Element import Element
-
-from . import Constants
-
-
-class LoopError(Exception):
- pass
-
-
-def _upstream_ports(port):
- if port.is_sink:
- return _sources_from_virtual_sink_port(port)
- else:
- return _sources_from_virtual_source_port(port)
-
-
-def _sources_from_virtual_sink_port(sink_port, _traversed=None):
- """
- Resolve the source port that is connected to the given virtual sink port.
- Use the get source from virtual source to recursively resolve subsequent ports.
- """
- source_ports_per_virtual_connection = (
- # there can be multiple ports per virtual connection
- _sources_from_virtual_source_port(c.get_source(), _traversed) # type: list
- for c in sink_port.get_enabled_connections()
- )
- return list(chain(*source_ports_per_virtual_connection)) # concatenate generated lists of ports
-
-
-def _sources_from_virtual_source_port(source_port, _traversed=None):
- """
- Recursively resolve source ports over the virtual connections.
- Keep track of traversed sources to avoid recursive loops.
- """
- _traversed = set(_traversed or []) # a new set!
- if source_port in _traversed:
- raise LoopError('Loop found when resolving port type')
- _traversed.add(source_port)
-
- block = source_port.get_parent()
- flow_graph = block.get_parent()
-
- if not block.is_virtual_source():
- return [source_port] # nothing to resolve, we're done
-
- stream_id = block.get_param('stream_id').get_value()
-
- # currently the validation does not allow multiple virtual sinks and one virtual source
- # but in the future it may...
- connected_virtual_sink_blocks = (
- b for b in flow_graph.get_enabled_blocks()
- if b.is_virtual_sink() and b.get_param('stream_id').get_value() == stream_id
- )
- source_ports_per_virtual_connection = (
- _sources_from_virtual_sink_port(b.get_sinks()[0], _traversed) # type: list
- for b in connected_virtual_sink_blocks
- )
- return list(chain(*source_ports_per_virtual_connection)) # concatenate generated lists of ports
-
-
-def _downstream_ports(port):
- if port.is_source:
- return _sinks_from_virtual_source_port(port)
- else:
- return _sinks_from_virtual_sink_port(port)
-
-
-def _sinks_from_virtual_source_port(source_port, _traversed=None):
- """
- Resolve the sink port that is connected to the given virtual source port.
- Use the get sink from virtual sink to recursively resolve subsequent ports.
- """
- sink_ports_per_virtual_connection = (
- # there can be multiple ports per virtual connection
- _sinks_from_virtual_sink_port(c.get_sink(), _traversed) # type: list
- for c in source_port.get_enabled_connections()
- )
- return list(chain(*sink_ports_per_virtual_connection)) # concatenate generated lists of ports
-
-
-def _sinks_from_virtual_sink_port(sink_port, _traversed=None):
- """
- Recursively resolve sink ports over the virtual connections.
- Keep track of traversed sinks to avoid recursive loops.
- """
- _traversed = set(_traversed or []) # a new set!
- if sink_port in _traversed:
- raise LoopError('Loop found when resolving port type')
- _traversed.add(sink_port)
-
- block = sink_port.get_parent()
- flow_graph = block.get_parent()
-
- if not block.is_virtual_sink():
- return [sink_port]
-
- stream_id = block.get_param('stream_id').get_value()
-
- connected_virtual_source_blocks = (
- b for b in flow_graph.get_enabled_blocks()
- if b.is_virtual_source() and b.get_param('stream_id').get_value() == stream_id
- )
- sink_ports_per_virtual_connection = (
- _sinks_from_virtual_source_port(b.get_sources()[0], _traversed) # type: list
- for b in connected_virtual_source_blocks
- )
- return list(chain(*sink_ports_per_virtual_connection)) # concatenate generated lists of ports
-
-
-class Port(Element):
-
- is_port = True
-
- def __init__(self, block, n, dir):
- """
- Make a new port from nested data.
-
- Args:
- block: the parent element
- n: the nested odict
- dir: the direction
- """
- self._n = n
- if n['type'] == 'message':
- n['domain'] = GR_MESSAGE_DOMAIN
- if 'domain' not in n:
- n['domain'] = DEFAULT_DOMAIN
- elif n['domain'] == GR_MESSAGE_DOMAIN:
- n['key'] = n['name']
- n['type'] = 'message' # For port color
- if not n.find('key'):
- n['key'] = str(next(block.port_counters[dir == 'source']))
-
- # Build the port
- Element.__init__(self, block)
- # Grab the data
- self._name = n['name']
- self._key = n['key']
- self._type = n['type'] or ''
- self._domain = n['domain']
- self._hide = n.find('hide') or ''
- self._dir = dir
- self._hide_evaluated = False # Updated on rewrite()
-
- self._nports = n.find('nports') or ''
- self._vlen = n.find('vlen') or ''
- self._optional = n.find('optional') or ''
- self._optional_evaluated = False # Updated on rewrite()
- self._clones = [] # References to cloned ports (for nports > 1)
-
- def __str__(self):
- if self.is_source:
- return 'Source - {}({})'.format(self.get_name(), self.get_key())
- if self.is_sink:
- return 'Sink - {}({})'.format(self.get_name(), self.get_key())
-
- def get_types(self):
- return Constants.TYPE_TO_SIZEOF.keys()
-
- def is_type_empty(self):
- return not self._n['type'] or not self.get_parent().resolve_dependencies(self._n['type'])
-
- def validate(self):
- if self.get_type() not in self.get_types():
- self.add_error_message('Type "{}" is not a possible type.'.format(self.get_type()))
- platform = self.get_parent().get_parent().get_parent()
- if self.get_domain() not in platform.domains:
- self.add_error_message('Domain key "{}" is not registered.'.format(self.get_domain()))
- if not self.get_enabled_connections() and not self.get_optional():
- self.add_error_message('Port is not connected.')
-
- def rewrite(self):
- """
- Handle the port cloning for virtual blocks.
- """
- del self._error_messages[:]
- if self.is_type_empty():
- self.resolve_empty_type()
-
- hide = self.get_parent().resolve_dependencies(self._hide).strip().lower()
- self._hide_evaluated = False if hide in ('false', 'off', '0') else bool(hide)
- optional = self.get_parent().resolve_dependencies(self._optional).strip().lower()
- self._optional_evaluated = False if optional in ('false', 'off', '0') else bool(optional)
-
- # Update domain if was deduced from (dynamic) port type
- type_ = self.get_type()
- if self._domain == GR_STREAM_DOMAIN and type_ == "message":
- self._domain = GR_MESSAGE_DOMAIN
- self._key = self._name
- if self._domain == GR_MESSAGE_DOMAIN and type_ != "message":
- self._domain = GR_STREAM_DOMAIN
- self._key = '0' # Is rectified in rewrite()
-
- def resolve_virtual_source(self):
- """Only used by Generator after validation is passed"""
- return _upstream_ports(self)
-
- def resolve_empty_type(self):
- def find_port(finder):
- try:
- return next((p for p in finder(self) if not p.is_type_empty()), None)
- except LoopError as error:
- self.add_error_message(str(error))
- except (StopIteration, Exception) as error:
- pass
-
- try:
- port = find_port(_upstream_ports) or find_port(_downstream_ports)
- self._type = str(port.get_type())
- self._vlen = str(port.get_vlen())
- except Exception:
- # Reset type and vlen
- self._type = self._vlen = ''
-
- def get_vlen(self):
- """
- Get the vector length.
- If the evaluation of vlen cannot be cast to an integer, return 1.
-
- Returns:
- the vector length or 1
- """
- vlen = self.get_parent().resolve_dependencies(self._vlen)
- try:
- return int(self.get_parent().get_parent().evaluate(vlen))
- except:
- return 1
-
- def get_nports(self):
- """
- Get the number of ports.
- If already blank, return a blank
- If the evaluation of nports cannot be cast to a positive integer, return 1.
-
- Returns:
- the number of ports or 1
- """
- if self._nports == '':
- return ''
-
- nports = self.get_parent().resolve_dependencies(self._nports)
- try:
- return max(1, int(self.get_parent().get_parent().evaluate(nports)))
- except:
- return 1
-
- def get_optional(self):
- return self._optional_evaluated
-
- def get_color(self):
- """
- Get the color that represents this port's type.
- Codes differ for ports where the vec length is 1 or greater than 1.
-
- Returns:
- a hex color code.
- """
- try:
- color = Constants.TYPE_TO_COLOR[self.get_type()]
- vlen = self.get_vlen()
- if vlen == 1:
- return color
- color_val = int(color[1:], 16)
- r = (color_val >> 16) & 0xff
- g = (color_val >> 8) & 0xff
- b = (color_val >> 0) & 0xff
- dark = (0, 0, 30, 50, 70)[min(4, vlen)]
- r = max(r-dark, 0)
- g = max(g-dark, 0)
- b = max(b-dark, 0)
- # TODO: Change this to .format()
- return '#%.2x%.2x%.2x' % (r, g, b)
- except:
- return '#FFFFFF'
-
- def get_clones(self):
- """
- Get the clones of this master port (nports > 1)
-
- Returns:
- a list of ports
- """
- return self._clones
-
- def add_clone(self):
- """
- Create a clone of this (master) port and store a reference in self._clones.
-
- The new port name (and key for message ports) will have index 1... appended.
- If this is the first clone, this (master) port will get a 0 appended to its name (and key)
-
- Returns:
- the cloned port
- """
- # Add index to master port name if there are no clones yet
- if not self._clones:
- self._name = self._n['name'] + '0'
- # Also update key for none stream ports
- if not self._key.isdigit():
- self._key = self._name
-
- # Prepare a copy of the odict for the clone
- n = self._n.copy()
- # Remove nports from the key so the copy cannot be a duplicator
- if 'nports' in n:
- n.pop('nports')
- n['name'] = self._n['name'] + str(len(self._clones) + 1)
- # Dummy value 99999 will be fixed later
- n['key'] = '99999' if self._key.isdigit() else n['name']
-
- # Clone
- port = self.__class__(self.get_parent(), n, self._dir)
- self._clones.append(port)
- return port
-
- def remove_clone(self, port):
- """
- Remove a cloned port (from the list of clones only)
- Remove the index 0 of the master port name (and key9 if there are no more clones left
- """
- self._clones.remove(port)
- # Remove index from master port name if there are no more clones
- if not self._clones:
- self._name = self._n['name']
- # Also update key for none stream ports
- if not self._key.isdigit():
- self._key = self._name
-
- def get_name(self):
- number = ''
- if self.get_type() == 'bus':
- busses = filter(lambda a: a._dir == self._dir, self.get_parent().get_ports_gui())
- number = str(busses.index(self)) + '#' + str(len(self.get_associated_ports()))
- return self._name + number
-
- def get_key(self):
- return self._key
-
- @property
- def is_sink(self):
- return self._dir == 'sink'
-
- @property
- def is_source(self):
- return self._dir == 'source'
-
- def get_type(self):
- return self.get_parent().resolve_dependencies(self._type)
-
- def get_domain(self):
- return self._domain
-
- def get_hide(self):
- return self._hide_evaluated
-
- def get_connections(self):
- """
- Get all connections that use this port.
-
- Returns:
- a list of connection objects
- """
- connections = self.get_parent().get_parent().connections
- connections = filter(lambda c: c.get_source() is self or c.get_sink() is self, connections)
- return connections
-
- def get_enabled_connections(self):
- """
- Get all enabled connections that use this port.
-
- Returns:
- a list of connection objects
- """
- return filter(lambda c: c.get_enabled(), self.get_connections())
-
- def get_associated_ports(self):
- if not self.get_type() == 'bus':
- return [self]
- else:
- if self.is_source:
- get_ports = self.get_parent().get_sources
- bus_structure = self.get_parent().current_bus_structure['source']
- else:
- get_ports = self.get_parent().get_sinks
- bus_structure = self.get_parent().current_bus_structure['sink']
-
- ports = [i for i in get_ports() if not i.get_type() == 'bus']
- if bus_structure:
- busses = [i for i in get_ports() if i.get_type() == 'bus']
- bus_index = busses.index(self)
- ports = filter(lambda a: ports.index(a) in bus_structure[bus_index], ports)
- return ports