summaryrefslogtreecommitdiff
path: root/grc/python/Block.py
diff options
context:
space:
mode:
Diffstat (limited to 'grc/python/Block.py')
-rw-r--r--grc/python/Block.py324
1 files changed, 0 insertions, 324 deletions
diff --git a/grc/python/Block.py b/grc/python/Block.py
deleted file mode 100644
index aaf65fbddf..0000000000
--- a/grc/python/Block.py
+++ /dev/null
@@ -1,324 +0,0 @@
-"""
-Copyright 2008-2011 Free Software Foundation, Inc.
-This file is part of GNU Radio
-
-GNU Radio Companion is free software; you can redistribute it and/or
-modify it under the terms of the GNU General Public License
-as published by the Free Software Foundation; either version 2
-of the License, or (at your option) any later version.
-
-GNU Radio Companion is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program; if not, write to the Free Software
-Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
-"""
-
-import collections
-import itertools
-
-from .base.Constants import BLOCK_FLAG_NEED_QT_GUI, BLOCK_FLAG_NEED_WX_GUI
-from .base.odict import odict
-from .base.Block import Block as _Block
-
-from . import epy_block_io
-from .FlowGraph import _variable_matcher
-
-
-class Block(_Block):
-
- def __init__(self, flow_graph, n):
- """
- Make a new block from nested data.
-
- Args:
- flow: graph the parent element
- n: the nested odict
-
- Returns:
- block a new block
- """
- #grab the data
- self._doc = (n.find('doc') or '').strip('\n').replace('\\\n', '')
- self._imports = map(lambda i: i.strip(), n.findall('import'))
- self._make = n.find('make')
- self._var_make = n.find('var_make')
- self._checks = n.findall('check')
- self._callbacks = n.findall('callback')
- self._bus_structure_source = n.find('bus_structure_source') or ''
- self._bus_structure_sink = n.find('bus_structure_sink') or ''
- self.port_counters = [itertools.count(), itertools.count()]
- #build the block
- _Block.__init__(
- self,
- flow_graph=flow_graph,
- n=n,
- )
-
- self._epy_source_hash = -1 # for epy blocks
- self._epy_reload_error = None
-
- def get_bus_structure(self, direction):
- if direction == 'source':
- bus_structure = self._bus_structure_source;
- else:
- bus_structure = self._bus_structure_sink;
-
- bus_structure = self.resolve_dependencies(bus_structure);
-
- if not bus_structure: return ''
- try:
- clean_bus_structure = self.get_parent().evaluate(bus_structure)
- return clean_bus_structure
-
- except: return ''
-
- def validate(self):
- """
- Validate this block.
- Call the base class validate.
- Evaluate the checks: each check must evaluate to True.
- """
- _Block.validate(self)
- #evaluate the checks
- for check in self._checks:
- check_res = self.resolve_dependencies(check)
- try:
- if not self.get_parent().evaluate(check_res):
- self.add_error_message('Check "%s" failed.'%check)
- except: self.add_error_message('Check "%s" did not evaluate.'%check)
- # for variables check the value (only if var_value is used
- if _variable_matcher.match(self.get_key()) and self._var_value != '$value':
- value = self._var_value
- try:
- value = self.get_var_value()
- self.get_parent().evaluate(value)
- except Exception as err:
- self.add_error_message('Value "%s" cannot be evaluated:\n%s' % (value, err))
-
- # check if this is a GUI block and matches the selected generate option
- current_generate_option = self.get_parent().get_option('generate_options')
-
- def check_generate_mode(label, flag, valid_options):
- block_requires_mode = (
- flag in self.get_flags() or
- self.get_name().upper().startswith(label)
- )
- if block_requires_mode and current_generate_option not in valid_options:
- self.add_error_message("Can't generate this block in mode " +
- repr(current_generate_option))
-
- check_generate_mode('WX GUI', BLOCK_FLAG_NEED_WX_GUI, ('wx_gui',))
- check_generate_mode('QT GUI', BLOCK_FLAG_NEED_QT_GUI, ('qt_gui', 'hb_qt_gui'))
- if self._epy_reload_error:
- self.get_param('_source_code').add_error_message(str(self._epy_reload_error))
-
- def rewrite(self):
- """
- Add and remove ports to adjust for the nports.
- """
- _Block.rewrite(self)
- # Check and run any custom rewrite function for this block
- getattr(self, 'rewrite_' + self._key, lambda: None)()
-
- # adjust nports, disconnect hidden ports
- for ports in (self.get_sources(), self.get_sinks()):
- for i, master_port in enumerate(ports):
- nports = master_port.get_nports() or 1
- num_ports = 1 + len(master_port.get_clones())
- if master_port.get_hide():
- for connection in master_port.get_connections():
- self.get_parent().remove_element(connection)
- if not nports and num_ports == 1: # not a master port and no left-over clones
- continue
- # remove excess cloned ports
- for port in master_port.get_clones()[nports-1:]:
- # remove excess connections
- for connection in port.get_connections():
- self.get_parent().remove_element(connection)
- master_port.remove_clone(port)
- ports.remove(port)
- # add more cloned ports
- for j in range(num_ports, nports):
- port = master_port.add_clone()
- ports.insert(ports.index(master_port) + j, port)
-
- self.back_ofthe_bus(ports)
- # renumber non-message/-msg ports
- domain_specific_port_index = collections.defaultdict(int)
- for port in filter(lambda p: p.get_key().isdigit(), ports):
- domain = port.get_domain()
- port._key = str(domain_specific_port_index[domain])
- domain_specific_port_index[domain] += 1
-
- def port_controller_modify(self, direction):
- """
- Change the port controller.
-
- Args:
- direction: +1 or -1
-
- Returns:
- true for change
- """
- changed = False
- #concat the nports string from the private nports settings of all ports
- nports_str = ' '.join([port._nports for port in self.get_ports()])
- #modify all params whose keys appear in the nports string
- for param in self.get_params():
- if param.is_enum() or param.get_key() not in nports_str: continue
- #try to increment the port controller by direction
- try:
- value = param.get_evaluated()
- value = value + direction
- if 0 < value:
- param.set_value(value)
- changed = True
- except: pass
- return changed
-
- def get_doc(self):
- platform = self.get_parent().get_parent()
- documentation = platform.block_docstrings.get(self._key, {})
- from_xml = self._doc.strip()
- if from_xml:
- documentation[''] = from_xml
- return documentation
-
- def get_category(self):
- return _Block.get_category(self)
-
- def get_imports(self, raw=False):
- """
- Resolve all import statements.
- Split each import statement at newlines.
- Combine all import statments into a list.
- Filter empty imports.
-
- Returns:
- a list of import statements
- """
- if raw:
- return self._imports
- return filter(lambda i: i, sum(map(lambda i: self.resolve_dependencies(i).split('\n'), self._imports), []))
-
- def get_make(self, raw=False):
- if raw:
- return self._make
- return self.resolve_dependencies(self._make)
-
- def get_var_make(self):
- return self.resolve_dependencies(self._var_make)
-
- def get_var_value(self):
- return self.resolve_dependencies(self._var_value)
-
- def get_callbacks(self):
- """
- Get a list of function callbacks for this block.
-
- Returns:
- a list of strings
- """
- def make_callback(callback):
- callback = self.resolve_dependencies(callback)
- if 'self.' in callback: return callback
- return 'self.%s.%s'%(self.get_id(), callback)
- return map(make_callback, self._callbacks)
-
- def is_virtual_sink(self):
- return self.get_key() == 'virtual_sink'
-
- def is_virtual_source(self):
- return self.get_key() == 'virtual_source'
-
- ###########################################################################
- # Custom rewrite functions
- ###########################################################################
-
- def rewrite_epy_block(self):
- flowgraph = self.get_parent()
- platform = flowgraph.get_parent()
- param_blk = self.get_param('_io_cache')
- param_src = self.get_param('_source_code')
-
- src = param_src.get_value()
- src_hash = hash(src)
- if src_hash == self._epy_source_hash:
- return
-
- try:
- blk_io = epy_block_io.extract(src)
-
- except Exception as e:
- self._epy_reload_error = ValueError(str(e))
- try: # load last working block io
- blk_io = epy_block_io.BlockIO(*eval(param_blk.get_value()))
- except:
- return
- else:
- self._epy_reload_error = None # clear previous errors
- param_blk.set_value(repr(tuple(blk_io)))
-
- # print "Rewriting embedded python block {!r}".format(self.get_id())
-
- self._epy_source_hash = src_hash
- self._name = blk_io.name or blk_io.cls
- self._doc = blk_io.doc
- self._imports[0] = 'from {} import {}'.format(self.get_id(), blk_io.cls)
- self._make = '{}({})'.format(blk_io.cls, ', '.join(
- '{0}=${0}'.format(key) for key, _ in blk_io.params))
-
- params = {}
- for param in list(self._params):
- if hasattr(param, '__epy_param__'):
- params[param.get_key()] = param
- self._params.remove(param)
-
- for key, value in blk_io.params:
- if key in params:
- param = params[key]
- if not param.value_is_default():
- param.set_value(value)
- else:
- name = key.replace('_', ' ').title()
- n = odict(dict(name=name, key=key, type='raw', value=value))
- param = platform.Param(block=self, n=n)
- setattr(param, '__epy_param__', True)
- self._params.append(param)
-
- def update_ports(label, ports, port_specs, direction):
- ports_to_remove = list(ports)
- iter_ports = iter(ports)
- ports_new = []
- port_current = next(iter_ports, None)
- for key, port_type in port_specs:
- reuse_port = (
- port_current is not None and
- port_current.get_type() == port_type and
- (key.isdigit() or port_current.get_key() == key)
- )
- if reuse_port:
- ports_to_remove.remove(port_current)
- port, port_current = port_current, next(iter_ports, None)
- else:
- n = odict(dict(name=label + str(key), type=port_type, key=key))
- if port_type == 'message':
- n['name'] = key
- n['optional'] = '1'
- port = platform.Port(block=self, n=n, dir=direction)
- ports_new.append(port)
- # replace old port list with new one
- del ports[:]
- ports.extend(ports_new)
- # remove excess port connections
- for port in ports_to_remove:
- for connection in port.get_connections():
- flowgraph.remove_element(connection)
-
- update_ports('in', self.get_sinks(), blk_io.sinks, 'sink')
- update_ports('out', self.get_sources(), blk_io.sources, 'source')
- _Block.rewrite(self)