summaryrefslogtreecommitdiff
path: root/grc/python/Block.py
diff options
context:
space:
mode:
Diffstat (limited to 'grc/python/Block.py')
-rw-r--r--grc/python/Block.py376
1 files changed, 184 insertions, 192 deletions
diff --git a/grc/python/Block.py b/grc/python/Block.py
index d365c43319..e13b26c12f 100644
--- a/grc/python/Block.py
+++ b/grc/python/Block.py
@@ -23,195 +23,187 @@ import extract_docs
class Block(_Block, _GUIBlock):
- def is_virtual_sink(self): return self.get_key() == 'virtual_sink'
- def is_virtual_source(self): return self.get_key() == 'virtual_source'
-
- ##for make source to keep track of indexes
- _source_count = 0
- ##for make sink to keep track of indexes
- _sink_count = 0
-
-
-
- def __init__(self, flow_graph, n):
- """
- Make a new block from nested data.
-
- Args:
- flow: graph the parent element
- n: the nested odict
-
- Returns:
- block a new block
- """
- #grab the data
- self._doc = n.find('doc') or ''
- self._imports = map(lambda i: i.strip(), n.findall('import'))
- self._make = n.find('make')
- self._var_make = n.find('var_make')
- self._checks = n.findall('check')
- self._callbacks = n.findall('callback')
- self._throttle = n.find('throttle') or ''
- self._bus_structure_source = n.find('bus_structure_source') or ''
- self._bus_structure_sink = n.find('bus_structure_sink') or ''
- #build the block
- _Block.__init__(
- self,
- flow_graph=flow_graph,
- n=n,
- )
- _GUIBlock.__init__(self)
-
- def get_bus_structure(self, direction):
- if direction == 'source':
- bus_structure = self._bus_structure_source;
- else:
- bus_structure = self._bus_structure_sink;
-
- bus_structure = self.resolve_dependencies(bus_structure);
-
- if not bus_structure: return ''
- try:
- clean_bus_structure = self.get_parent().evaluate(bus_structure)
- return clean_bus_structure
-
- except: return ''
-
- def throttle(self): return bool(self._throttle)
-
- def validate(self):
- """
- Validate this block.
- Call the base class validate.
- Evaluate the checks: each check must evaluate to True.
- """
- _Block.validate(self)
- #evaluate the checks
- for check in self._checks:
- check_res = self.resolve_dependencies(check)
- try:
- if not self.get_parent().evaluate(check_res):
- self.add_error_message('Check "%s" failed.'%check)
- except: self.add_error_message('Check "%s" did not evaluate.'%check)
-
- def rewrite(self):
- """
- Add and remove ports to adjust for the nports.
- """
- _Block.rewrite(self)
-
- def rectify(ports):
- #restore integer contiguity after insertion
- #rectify the port names with the index
- self.back_ofthe_bus(ports);
- for i, port in enumerate(ports):
- port._key = str(i)
- port._name = port._n['name']
- if len(ports) > 1 and not port._type == 'bus': port._name += str(i)
-
- def insert_port(get_ports, get_port, key):
- prev_port = get_port(str(int(key)-1))
- get_ports().insert(
- get_ports().index(prev_port)+1,
- prev_port.copy(new_key=key),
- )
- rectify(get_ports())
-
- def remove_port(get_ports, get_port, key):
- port = get_port(key)
- for connection in port.get_connections():
- self.get_parent().remove_element(connection)
- get_ports().remove(port)
- rectify(get_ports())
-
- #adjust nports
- for get_ports, get_port in (
- (self.get_sources, self.get_source),
- (self.get_sinks, self.get_sink),
- ):
- master_ports = filter(lambda p: p.get_nports(), get_ports())
- for i, master_port in enumerate(master_ports):
- nports = master_port.get_nports()
- index_first = get_ports().index(master_port)
- try: index_last = get_ports().index(master_ports[i+1])
- except IndexError: index_last = len(get_ports())
- num_ports = index_last - index_first
- #do nothing if nports is already num ports
- if nports == num_ports: continue
- #remove excess ports and connections
- if nports < num_ports:
- for key in reversed(map(str, range(index_first+nports, index_first+num_ports))):
- remove_port(get_ports, get_port, key);
-
-
- continue
- #add more ports
- if nports > num_ports:
- for key in map(str, range(index_first+num_ports, index_first+nports)):
- insert_port(get_ports, get_port, key)
-
-
- continue
-
-
- def port_controller_modify(self, direction):
- """
- Change the port controller.
-
- Args:
- direction: +1 or -1
-
- Returns:
- true for change
- """
- changed = False
- #concat the nports string from the private nports settings of all ports
- nports_str = ' '.join([port._nports for port in self.get_ports()])
- #modify all params whose keys appear in the nports string
- for param in self.get_params():
- if param.is_enum() or param.get_key() not in nports_str: continue
- #try to increment the port controller by direction
- try:
- value = param.get_evaluated()
- value = value + direction
- if 0 < value:
- param.set_value(value)
- changed = True
- except: pass
- return changed
-
- def get_doc(self):
- doc = self._doc.strip('\n').replace('\\\n', '')
- #merge custom doc with doxygen docs
- return '\n'.join([doc, extract_docs.extract(self.get_key())]).strip('\n')
-
- def get_category(self):
- return _Block.get_category(self)
-
- def get_imports(self):
- """
- Resolve all import statements.
- Split each import statement at newlines.
- Combine all import statments into a list.
- Filter empty imports.
-
- Returns:
- a list of import statements
- """
- return filter(lambda i: i, sum(map(lambda i: self.resolve_dependencies(i).split('\n'), self._imports), []))
-
- def get_make(self): return self.resolve_dependencies(self._make)
- def get_var_make(self): return self.resolve_dependencies(self._var_make)
-
- def get_callbacks(self):
- """
- Get a list of function callbacks for this block.
-
- Returns:
- a list of strings
- """
- def make_callback(callback):
- callback = self.resolve_dependencies(callback)
- if 'self.' in callback: return callback
- return 'self.%s.%s'%(self.get_id(), callback)
- return map(make_callback, self._callbacks)
+ def is_virtual_sink(self): return self.get_key() == 'virtual_sink'
+ def is_virtual_source(self): return self.get_key() == 'virtual_source'
+
+ ##for make source to keep track of indexes
+ _source_count = 0
+ ##for make sink to keep track of indexes
+ _sink_count = 0
+
+ def __init__(self, flow_graph, n):
+ """
+ Make a new block from nested data.
+
+ Args:
+ flow: graph the parent element
+ n: the nested odict
+
+ Returns:
+ block a new block
+ """
+ #grab the data
+ self._doc = n.find('doc') or ''
+ self._imports = map(lambda i: i.strip(), n.findall('import'))
+ self._make = n.find('make')
+ self._var_make = n.find('var_make')
+ self._checks = n.findall('check')
+ self._callbacks = n.findall('callback')
+ self._throttle = n.find('throttle') or ''
+ self._bus_structure_source = n.find('bus_structure_source') or ''
+ self._bus_structure_sink = n.find('bus_structure_sink') or ''
+ #build the block
+ _Block.__init__(
+ self,
+ flow_graph=flow_graph,
+ n=n,
+ )
+ _GUIBlock.__init__(self)
+
+ def get_bus_structure(self, direction):
+ if direction == 'source':
+ bus_structure = self._bus_structure_source;
+ else:
+ bus_structure = self._bus_structure_sink;
+
+ bus_structure = self.resolve_dependencies(bus_structure);
+
+ if not bus_structure: return ''
+ try:
+ clean_bus_structure = self.get_parent().evaluate(bus_structure)
+ return clean_bus_structure
+
+ except: return ''
+ def throttle(self): return bool(self._throttle)
+
+ def validate(self):
+ """
+ Validate this block.
+ Call the base class validate.
+ Evaluate the checks: each check must evaluate to True.
+ """
+ _Block.validate(self)
+ #evaluate the checks
+ for check in self._checks:
+ check_res = self.resolve_dependencies(check)
+ try:
+ if not self.get_parent().evaluate(check_res):
+ self.add_error_message('Check "%s" failed.'%check)
+ except: self.add_error_message('Check "%s" did not evaluate.'%check)
+
+ def rewrite(self):
+ """
+ Add and remove ports to adjust for the nports.
+ """
+ _Block.rewrite(self)
+
+ def rectify(ports):
+ #restore integer contiguity after insertion
+ #rectify the port names with the index
+ self.back_ofthe_bus(ports);
+ for i, port in enumerate(ports):
+ port._key = str(i)
+ port._name = port._n['name']
+ if len(ports) > 1 and not port._type == 'bus': port._name += str(i)
+
+ def insert_port(get_ports, get_port, key):
+ prev_port = get_port(str(int(key)-1))
+ get_ports().insert(
+ get_ports().index(prev_port)+1,
+ prev_port.copy(new_key=key),
+ )
+ rectify(get_ports())
+
+ def remove_port(get_ports, get_port, key):
+ port = get_port(key)
+ for connection in port.get_connections():
+ self.get_parent().remove_element(connection)
+ get_ports().remove(port)
+ rectify(get_ports())
+
+ #adjust nports
+ for get_ports, get_port in (
+ (self.get_sources, self.get_source),
+ (self.get_sinks, self.get_sink),
+ ):
+ master_ports = filter(lambda p: p.get_nports(), get_ports())
+ for i, master_port in enumerate(master_ports):
+ nports = master_port.get_nports()
+ index_first = get_ports().index(master_port)
+ try: index_last = get_ports().index(master_ports[i+1])
+ except IndexError: index_last = len(get_ports())
+ num_ports = index_last - index_first
+ #do nothing if nports is already num ports
+ if nports == num_ports: continue
+ #remove excess ports and connections
+ if nports < num_ports:
+ for key in reversed(map(str, range(index_first+nports, index_first+num_ports))):
+ remove_port(get_ports, get_port, key);
+ continue
+ #add more ports
+ if nports > num_ports:
+ for key in map(str, range(index_first+num_ports, index_first+nports)):
+ insert_port(get_ports, get_port, key)
+ continue
+
+ def port_controller_modify(self, direction):
+ """
+ Change the port controller.
+
+ Args:
+ direction: +1 or -1
+
+ Returns:
+ true for change
+ """
+ changed = False
+ #concat the nports string from the private nports settings of all ports
+ nports_str = ' '.join([port._nports for port in self.get_ports()])
+ #modify all params whose keys appear in the nports string
+ for param in self.get_params():
+ if param.is_enum() or param.get_key() not in nports_str: continue
+ #try to increment the port controller by direction
+ try:
+ value = param.get_evaluated()
+ value = value + direction
+ if 0 < value:
+ param.set_value(value)
+ changed = True
+ except: pass
+ return changed
+
+ def get_doc(self):
+ doc = self._doc.strip('\n').replace('\\\n', '')
+ #merge custom doc with doxygen docs
+ return '\n'.join([doc, extract_docs.extract(self.get_key())]).strip('\n')
+
+ def get_category(self):
+ return _Block.get_category(self)
+
+ def get_imports(self):
+ """
+ Resolve all import statements.
+ Split each import statement at newlines.
+ Combine all import statments into a list.
+ Filter empty imports.
+
+ Returns:
+ a list of import statements
+ """
+ return filter(lambda i: i, sum(map(lambda i: self.resolve_dependencies(i).split('\n'), self._imports), []))
+
+ def get_make(self): return self.resolve_dependencies(self._make)
+ def get_var_make(self): return self.resolve_dependencies(self._var_make)
+
+ def get_callbacks(self):
+ """
+ Get a list of function callbacks for this block.
+
+ Returns:
+ a list of strings
+ """
+ def make_callback(callback):
+ callback = self.resolve_dependencies(callback)
+ if 'self.' in callback: return callback
+ return 'self.%s.%s'%(self.get_id(), callback)
+ return map(make_callback, self._callbacks)