summaryrefslogtreecommitdiff
path: root/grc/core/Port.py
diff options
context:
space:
mode:
Diffstat (limited to 'grc/core/Port.py')
-rw-r--r--grc/core/Port.py281
1 files changed, 130 insertions, 151 deletions
diff --git a/grc/core/Port.py b/grc/core/Port.py
index 88601dc87a..41388df409 100644
--- a/grc/core/Port.py
+++ b/grc/core/Port.py
@@ -17,8 +17,11 @@ along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
"""
-from .Constants import DEFAULT_DOMAIN, GR_STREAM_DOMAIN, GR_MESSAGE_DOMAIN
-from .Element import Element
+from __future__ import absolute_import
+
+from six.moves import filter
+
+from .Element import Element, lazy_property
from . import Constants
@@ -30,7 +33,7 @@ def _get_source_from_virtual_sink_port(vsp):
"""
try:
return _get_source_from_virtual_source_port(
- vsp.get_enabled_connections()[0].get_source())
+ vsp.get_enabled_connections()[0].source_port)
except:
raise Exception('Could not resolve source for virtual sink port {}'.format(vsp))
@@ -40,20 +43,20 @@ def _get_source_from_virtual_source_port(vsp, traversed=[]):
Recursively resolve source ports over the virtual connections.
Keep track of traversed sources to avoid recursive loops.
"""
- if not vsp.get_parent().is_virtual_source():
+ if not vsp.parent.is_virtual_source():
return vsp
if vsp in traversed:
raise Exception('Loop found when resolving virtual source {}'.format(vsp))
try:
return _get_source_from_virtual_source_port(
_get_source_from_virtual_sink_port(
- filter( # Get all virtual sinks with a matching stream id
- lambda vs: vs.get_param('stream_id').get_value() == vsp.get_parent().get_param('stream_id').get_value(),
- filter( # Get all enabled blocks that are also virtual sinks
+ list(filter( # Get all virtual sinks with a matching stream id
+ lambda vs: vs.get_param('stream_id').get_value() == vsp.parent.get_param('stream_id').get_value(),
+ list(filter( # Get all enabled blocks that are also virtual sinks
lambda b: b.is_virtual_sink(),
- vsp.get_parent().get_parent().get_enabled_blocks(),
- ),
- )[0].get_sinks()[0]
+ vsp.parent.parent.get_enabled_blocks(),
+ )),
+ ))[0].sinks[0]
), traversed + [vsp],
)
except:
@@ -68,7 +71,7 @@ def _get_sink_from_virtual_source_port(vsp):
try:
# Could have many connections, but use first
return _get_sink_from_virtual_sink_port(
- vsp.get_enabled_connections()[0].get_sink())
+ vsp.get_enabled_connections()[0].sink_port)
except:
raise Exception('Could not resolve source for virtual source port {}'.format(vsp))
@@ -78,7 +81,7 @@ def _get_sink_from_virtual_sink_port(vsp, traversed=[]):
Recursively resolve sink ports over the virtual connections.
Keep track of traversed sinks to avoid recursive loops.
"""
- if not vsp.get_parent().is_virtual_sink():
+ if not vsp.parent.is_virtual_sink():
return vsp
if vsp in traversed:
raise Exception('Loop found when resolving virtual sink {}'.format(vsp))
@@ -86,12 +89,12 @@ def _get_sink_from_virtual_sink_port(vsp, traversed=[]):
return _get_sink_from_virtual_sink_port(
_get_sink_from_virtual_source_port(
filter( # Get all virtual source with a matching stream id
- lambda vs: vs.get_param('stream_id').get_value() == vsp.get_parent().get_param('stream_id').get_value(),
- filter( # Get all enabled blocks that are also virtual sinks
+ lambda vs: vs.get_param('stream_id').get_value() == vsp.parent.get_param('stream_id').get_value(),
+ list(filter( # Get all enabled blocks that are also virtual sinks
lambda b: b.is_virtual_source(),
- vsp.get_parent().get_parent().get_enabled_blocks(),
- ),
- )[0].get_sources()[0]
+ vsp.parent.parent.get_enabled_blocks(),
+ )),
+ )[0].sources[0]
), traversed + [vsp],
)
except:
@@ -101,8 +104,9 @@ def _get_sink_from_virtual_sink_port(vsp, traversed=[]):
class Port(Element):
is_port = True
+ is_clone = False
- def __init__(self, block, n, dir):
+ def __init__(self, parent, direction, **n):
"""
Make a new port from nested data.
@@ -113,50 +117,44 @@ class Port(Element):
"""
self._n = n
if n['type'] == 'message':
- n['domain'] = GR_MESSAGE_DOMAIN
+ n['domain'] = Constants.GR_MESSAGE_DOMAIN
+
if 'domain' not in n:
- n['domain'] = DEFAULT_DOMAIN
- elif n['domain'] == GR_MESSAGE_DOMAIN:
+ n['domain'] = Constants.DEFAULT_DOMAIN
+ elif n['domain'] == Constants.GR_MESSAGE_DOMAIN:
n['key'] = n['name']
n['type'] = 'message' # For port color
- if not n.find('key'):
- n['key'] = str(next(block.port_counters[dir == 'source']))
# Build the port
- Element.__init__(self, block)
+ Element.__init__(self, parent)
# Grab the data
- self._name = n['name']
- self._key = n['key']
- self._type = n['type'] or ''
- self._domain = n['domain']
- self._hide = n.find('hide') or ''
- self._dir = dir
+ self.name = n['name']
+ self.key = n['key']
+ self.domain = n.get('domain')
+ self._type = n.get('type', '')
+ self.inherit_type = not self._type
+ self._hide = n.get('hide', '')
+ self._dir = direction
self._hide_evaluated = False # Updated on rewrite()
- self._nports = n.find('nports') or ''
- self._vlen = n.find('vlen') or ''
- self._optional = bool(n.find('optional'))
- self._clones = [] # References to cloned ports (for nports > 1)
+ self._nports = n.get('nports', '')
+ self._vlen = n.get('vlen', '')
+ self._optional = bool(n.get('optional'))
+ self.clones = [] # References to cloned ports (for nports > 1)
def __str__(self):
if self.is_source:
- return 'Source - {}({})'.format(self.get_name(), self.get_key())
+ return 'Source - {}({})'.format(self.name, self.key)
if self.is_sink:
- return 'Sink - {}({})'.format(self.get_name(), self.get_key())
-
- def get_types(self):
- return Constants.TYPE_TO_SIZEOF.keys()
-
- def is_type_empty(self):
- return not self._n['type']
+ return 'Sink - {}({})'.format(self.name, self.key)
def validate(self):
Element.validate(self)
- if self.get_type() not in self.get_types():
+ if self.get_type() not in Constants.TYPE_TO_SIZEOF.keys():
self.add_error_message('Type "{}" is not a possible type.'.format(self.get_type()))
- platform = self.get_parent().get_parent().get_parent()
- if self.get_domain() not in platform.domains:
- self.add_error_message('Domain key "{}" is not registered.'.format(self.get_domain()))
+ platform = self.parent_platform
+ if self.domain not in platform.domains:
+ self.add_error_message('Domain key "{}" is not registered.'.format(self.domain))
if not self.get_enabled_connections() and not self.get_optional():
self.add_error_message('Port is not connected.')
@@ -164,7 +162,7 @@ class Port(Element):
"""
Handle the port cloning for virtual blocks.
"""
- if self.is_type_empty():
+ if self.inherit_type:
try:
# Clone type and vlen
source = self.resolve_empty_type()
@@ -176,44 +174,44 @@ class Port(Element):
self._vlen = ''
Element.rewrite(self)
- hide = self.get_parent().resolve_dependencies(self._hide).strip().lower()
+ hide = self.parent.resolve_dependencies(self._hide).strip().lower()
self._hide_evaluated = False if hide in ('false', 'off', '0') else bool(hide)
# Update domain if was deduced from (dynamic) port type
type_ = self.get_type()
- if self._domain == GR_STREAM_DOMAIN and type_ == "message":
- self._domain = GR_MESSAGE_DOMAIN
- self._key = self._name
- if self._domain == GR_MESSAGE_DOMAIN and type_ != "message":
- self._domain = GR_STREAM_DOMAIN
- self._key = '0' # Is rectified in rewrite()
+ if self.domain == Constants.GR_STREAM_DOMAIN and type_ == "message":
+ self.domain = Constants.GR_MESSAGE_DOMAIN
+ self.key = self.name
+ if self.domain == Constants.GR_MESSAGE_DOMAIN and type_ != "message":
+ self.domain = Constants.GR_STREAM_DOMAIN
+ self.key = '0' # Is rectified in rewrite()
def resolve_virtual_source(self):
- if self.get_parent().is_virtual_sink():
+ if self.parent.is_virtual_sink():
return _get_source_from_virtual_sink_port(self)
- if self.get_parent().is_virtual_source():
+ if self.parent.is_virtual_source():
return _get_source_from_virtual_source_port(self)
def resolve_empty_type(self):
if self.is_sink:
try:
src = _get_source_from_virtual_sink_port(self)
- if not src.is_type_empty():
+ if not src.inherit_type:
return src
except:
pass
sink = _get_sink_from_virtual_sink_port(self)
- if not sink.is_type_empty():
+ if not sink.inherit_type:
return sink
if self.is_source:
try:
src = _get_source_from_virtual_source_port(self)
- if not src.is_type_empty():
+ if not src.inherit_type:
return src
except:
pass
sink = _get_sink_from_virtual_source_port(self)
- if not sink.is_type_empty():
+ if not sink.inherit_type:
return sink
def get_vlen(self):
@@ -224,9 +222,9 @@ class Port(Element):
Returns:
the vector length or 1
"""
- vlen = self.get_parent().resolve_dependencies(self._vlen)
+ vlen = self.parent.resolve_dependencies(self._vlen)
try:
- return int(self.get_parent().get_parent().evaluate(vlen))
+ return max(1, int(self.parent_flowgraph.evaluate(vlen)))
except:
return 1
@@ -240,52 +238,17 @@ class Port(Element):
the number of ports or 1
"""
if self._nports == '':
- return ''
+ return 1
- nports = self.get_parent().resolve_dependencies(self._nports)
+ nports = self.parent.resolve_dependencies(self._nports)
try:
- return max(1, int(self.get_parent().get_parent().evaluate(nports)))
+ return max(1, int(self.parent_flowgraph.evaluate(nports)))
except:
return 1
def get_optional(self):
return bool(self._optional)
- def get_color(self):
- """
- Get the color that represents this port's type.
- Codes differ for ports where the vec length is 1 or greater than 1.
-
- Returns:
- a hex color code.
- """
- try:
- color = Constants.TYPE_TO_COLOR[self.get_type()]
- vlen = self.get_vlen()
- if vlen == 1:
- return color
- color_val = int(color[1:], 16)
- r = (color_val >> 16) & 0xff
- g = (color_val >> 8) & 0xff
- b = (color_val >> 0) & 0xff
- dark = (0, 0, 30, 50, 70)[min(4, vlen)]
- r = max(r-dark, 0)
- g = max(g-dark, 0)
- b = max(b-dark, 0)
- # TODO: Change this to .format()
- return '#%.2x%.2x%.2x' % (r, g, b)
- except:
- return '#FFFFFF'
-
- def get_clones(self):
- """
- Get the clones of this master port (nports > 1)
-
- Returns:
- a list of ports
- """
- return self._clones
-
def add_clone(self):
"""
Create a clone of this (master) port and store a reference in self._clones.
@@ -297,24 +260,23 @@ class Port(Element):
the cloned port
"""
# Add index to master port name if there are no clones yet
- if not self._clones:
- self._name = self._n['name'] + '0'
+ if not self.clones:
+ self.name = self._n['name'] + '0'
# Also update key for none stream ports
- if not self._key.isdigit():
- self._key = self._name
-
- # Prepare a copy of the odict for the clone
- n = self._n.copy()
- # Remove nports from the key so the copy cannot be a duplicator
- if 'nports' in n:
- n.pop('nports')
- n['name'] = self._n['name'] + str(len(self._clones) + 1)
+ if not self.key.isdigit():
+ self.key = self.name
+
+ name = self._n['name'] + str(len(self.clones) + 1)
# Dummy value 99999 will be fixed later
- n['key'] = '99999' if self._key.isdigit() else n['name']
+ key = '99999' if self.key.isdigit() else name
# Clone
- port = self.__class__(self.get_parent(), n, self._dir)
- self._clones.append(port)
+ port_factory = self.parent_platform.get_new_port
+ port = port_factory(self.parent, direction=self._dir,
+ name=name, key=key,
+ master=self, cls_key='clone')
+
+ self.clones.append(port)
return port
def remove_clone(self, port):
@@ -322,37 +284,24 @@ class Port(Element):
Remove a cloned port (from the list of clones only)
Remove the index 0 of the master port name (and key9 if there are no more clones left
"""
- self._clones.remove(port)
+ self.clones.remove(port)
# Remove index from master port name if there are no more clones
- if not self._clones:
- self._name = self._n['name']
+ if not self.clones:
+ self.name = self._n['name']
# Also update key for none stream ports
- if not self._key.isdigit():
- self._key = self._name
-
- def get_name(self):
- number = ''
- if self.get_type() == 'bus':
- busses = filter(lambda a: a._dir == self._dir, self.get_parent().get_ports_gui())
- number = str(busses.index(self)) + '#' + str(len(self.get_associated_ports()))
- return self._name + number
-
- def get_key(self):
- return self._key
+ if not self.key.isdigit():
+ self.key = self.name
- @property
+ @lazy_property
def is_sink(self):
return self._dir == 'sink'
- @property
+ @lazy_property
def is_source(self):
return self._dir == 'source'
def get_type(self):
- return self.get_parent().resolve_dependencies(self._type)
-
- def get_domain(self):
- return self._domain
+ return self.parent_block.resolve_dependencies(self._type)
def get_hide(self):
return self._hide_evaluated
@@ -364,8 +313,8 @@ class Port(Element):
Returns:
a list of connection objects
"""
- connections = self.get_parent().get_parent().connections
- connections = filter(lambda c: c.get_source() is self or c.get_sink() is self, connections)
+ connections = self.parent_flowgraph.connections
+ connections = [c for c in connections if c.source_port is self or c.sink_port is self]
return connections
def get_enabled_connections(self):
@@ -375,22 +324,52 @@ class Port(Element):
Returns:
a list of connection objects
"""
- return filter(lambda c: c.get_enabled(), self.get_connections())
+ return [c for c in self.get_connections() if c.enabled]
def get_associated_ports(self):
if not self.get_type() == 'bus':
return [self]
+
+ block = self.parent_block
+ if self.is_source:
+ block_ports = block.sources
+ bus_structure = block.current_bus_structure['source']
else:
- if self.is_source:
- get_ports = self.get_parent().get_sources
- bus_structure = self.get_parent().current_bus_structure['source']
- else:
- get_ports = self.get_parent().get_sinks
- bus_structure = self.get_parent().current_bus_structure['sink']
-
- ports = [i for i in get_ports() if not i.get_type() == 'bus']
- if bus_structure:
- busses = [i for i in get_ports() if i.get_type() == 'bus']
- bus_index = busses.index(self)
- ports = filter(lambda a: ports.index(a) in bus_structure[bus_index], ports)
- return ports
+ block_ports = block.sinks
+ bus_structure = block.current_bus_structure['sink']
+
+ ports = [i for i in block_ports if not i.get_type() == 'bus']
+ if bus_structure:
+ bus_index = [i for i in block_ports if i.get_type() == 'bus'].index(self)
+ ports = [p for i, p in enumerate(ports) if i in bus_structure[bus_index]]
+ return ports
+
+
+class PortClone(Port):
+
+ is_clone = True
+
+ def __init__(self, parent, direction, master, name, key):
+ """
+ Make a new port from nested data.
+
+ Args:
+ block: the parent element
+ n: the nested odict
+ dir: the direction
+ """
+ Element.__init__(self, parent)
+ self.master = master
+ self.name = name
+ self._key = key
+ self._nports = '1'
+
+ def __getattr__(self, item):
+ return getattr(self.master, item)
+
+ def add_clone(self):
+ raise NotImplementedError()
+
+ def remove_clone(self, port):
+ raise NotImplementedError()
+