1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
|
import codecs
import operator
import os
import tempfile
import textwrap
import time
from mako.template import Template
from .. import Messages, blocks
from ..Constants import TOP_BLOCK_FILE_MODE
from .FlowGraphProxy import FlowGraphProxy
from ..utils import expr_utils
# Directory containing this module; the Mako template is looked up next to it.
DATA_DIR = os.path.dirname(__file__)
# Full path of the Mako template used to render the generated flow graph code.
FLOW_GRAPH_TEMPLATE = os.path.join(DATA_DIR, 'flow_graph.py.mako')
# Template object built once at import time and rendered by TopBlockGenerator.
flow_graph_template = Template(filename=FLOW_GRAPH_TEMPLATE)
class TopBlockGenerator(object):
    """Render a flow graph into Python source and write it to disk.

    The generator wraps the given flow graph in a ``FlowGraphProxy``, collects
    the imports, block constructors, callbacks and connections, and renders
    them through the module-level ``flow_graph_template``.
    """

    def __init__(self, flow_graph, file_path):
        """
        Initialize the top block generator object.

        Args:
            flow_graph: the flow graph object
            file_path: the path to write the file to
        """
        self._flow_graph = FlowGraphProxy(flow_graph)
        self._generate_options = self._flow_graph.get_option('generate_options')
        self._mode = TOP_BLOCK_FILE_MODE
        dirname = os.path.dirname(file_path)
        # Handle the case where the directory is read-only
        # In this case, use the system's temp directory
        if not os.access(dirname, os.W_OK):
            dirname = tempfile.gettempdir()
        filename = self._flow_graph.get_option('id') + '.py'
        self.file_path = os.path.join(dirname, filename)
        self._dirname = dirname

    def _warnings(self):
        """Send warnings about flow control and deprecated blocks."""
        # Materialize once so the blocks can be scanned twice.
        enabled_blocks = list(self._flow_graph.get_enabled_blocks())

        throttling_blocks = [b for b in enabled_blocks if b.flags.throttle]
        if not throttling_blocks and not self._generate_options.startswith('hb'):
            Messages.send_warning("This flow graph may not have flow control: "
                                  "no audio or RF hardware blocks found. "
                                  "Add a Misc->Throttle block to your flow "
                                  "graph to avoid CPU congestion.")
        if len(throttling_blocks) > 1:
            keys = {b.key for b in throttling_blocks}
            if len(keys) > 1 and 'blocks_throttle' in keys:
                Messages.send_warning("This flow graph contains a throttle "
                                      "block and another rate limiting block, "
                                      "e.g. a hardware source or sink. "
                                      "This is usually undesired. Consider "
                                      "removing the throttle block.")

        # Sorted so the warnings come out in a stable order between runs.
        deprecated_block_names = sorted(
            {b.name for b in enabled_blocks if b.flags.deprecated})
        for name in deprecated_block_names:
            Messages.send_warning("The block {!r} is deprecated.".format(name))

    def write(self):
        """Generate output and write it to files.

        Every (path, source) pair produced by
        ``_build_python_code_from_template`` is written as UTF-8. The main
        output file additionally gets its mode set to ``self._mode``.
        """
        self._warnings()

        for filename, data in self._build_python_code_from_template():
            with codecs.open(filename, 'w', encoding='utf-8') as fp:
                fp.write(data)
            if filename == self.file_path:
                try:
                    os.chmod(filename, self._mode)
                except OSError:
                    # Best effort only: the file is already written, so a
                    # failed mode change must not abort generation.
                    pass

    def _build_python_code_from_template(self):
        """
        Convert the flow graph to python code.

        Returns:
            a list of (file path, source code) tuples: one entry per enabled
            'epy_block' / 'epy_module' block, followed by the rendered flow
            graph itself (written to self.file_path)
        """
        output = []

        fg = self._flow_graph
        title = fg.get_option('title') or fg.get_option('id').replace('_', ' ').title()
        variables = fg.get_variables()
        parameters = fg.get_parameters()
        monitors = fg.get_monitors()

        # Embedded python blocks/modules carry their own source, which is
        # written next to the main file under the block's name.
        for block in fg.iter_enabled_blocks():
            key = block.key
            file_path = os.path.join(self._dirname, block.name + '.py')
            if key == 'epy_block':
                src = block.params['_source_code'].get_value()
                output.append((file_path, src))
            elif key == 'epy_module':
                src = block.params['source_code'].get_value()
                output.append((file_path, src))

        namespace = {
            'flow_graph': fg,
            'variables': variables,
            'parameters': parameters,
            'monitors': monitors,
            'generate_options': self._generate_options,
            'generated_time': time.ctime(),
        }
        flow_graph_code = flow_graph_template.render(
            title=title,
            imports=self._imports(),
            blocks=self._blocks(),
            callbacks=self._callbacks(),
            connections=self._connections(),
            **namespace
        )
        # strip trailing white-space
        flow_graph_code = "\n".join(line.rstrip() for line in flow_graph_code.split("\n"))
        output.append((self.file_path, flow_graph_code))

        return output

    def _imports(self):
        """Collect the import statements for the generated file.

        Returns:
            a list of source lines, sorted by import snippet, in which each
            'import ...' / 'from ...' line appears only once
        """
        fg = self._flow_graph
        # Work on a copy so the list handed out by the flow graph is not mutated.
        imports = list(fg.imports())
        seen = set()
        output = []

        need_path_hack = any(imp.endswith("# grc-generated hier_block") for imp in imports)
        if need_path_hack:
            output.insert(0, textwrap.dedent("""\
                import os
                import sys
                sys.path.append(os.environ.get('GRC_HIER_PATH', os.path.expanduser('~/.grc_gnuradio')))
            """))
            # Already part of the snippet above; do not emit them again.
            seen.add('import os')
            seen.add('import sys')

        if fg.get_option('qt_qss_theme'):
            imports.append('import os')
            imports.append('import sys')

        if fg.get_option('thread_safe_setters'):
            imports.append('import threading')

        def is_duplicate(statement):
            # Only import statements are de-duplicated; other lines of a
            # multi-line snippet may legitimately repeat.
            if statement.startswith(('import', 'from')) and statement in seen:
                return True
            seen.add(statement)
            return False

        for import_ in sorted(imports):
            lines = import_.strip().split('\n')
            if not lines[0]:
                continue
            for line in lines:
                line = line.rstrip()
                if not is_duplicate(line):
                    output.append(line)

        return output

    def _blocks(self):
        """Build the constructor ('make') code for the blocks.

        Returns:
            a list of (block, make code) tuples in the order given by
            expr_utils.sort_objects; blocks whose 'make' template renders
            empty are left out
        """
        fg = self._flow_graph
        parameters = fg.get_parameters()

        # List of blocks not including variables and imports and parameters and disabled
        def _get_block_sort_text(block):
            code = block.templates.render('make').replace(block.name, ' ')
            try:
                code += block.params['gui_hint'].get_value()  # Newer gui markup w/ qtgui
            except Exception:
                # Best effort: the gui hint is only appended when it can be read.
                pass
            return code

        candidates = [
            b for b in fg.blocks
            if b.enabled and not (b.get_bypassed() or b.is_import or b in parameters or b.key == 'options')
        ]
        candidates = expr_utils.sort_objects(candidates, operator.attrgetter('name'), _get_block_sort_text)

        blocks_make = []
        for block in candidates:
            make = block.templates.render('make')
            if not make:
                # Nothing to construct; prefixing would emit a dangling 'self.x = '.
                continue
            if not block.is_variable:
                make = 'self.' + block.name + ' = ' + make
            blocks_make.append((block, make))
        return blocks_make

    def _callbacks(self):
        """Map every variable/parameter name to the callbacks that use it.

        Returns:
            a dict of variable name -> list of callback code strings in which
            variable names have been replaced by 'self.<name>'
        """
        fg = self._flow_graph
        variables = fg.get_variables()
        parameters = fg.get_parameters()

        # List of variable names
        var_ids = [var.name for var in parameters + variables]
        replace_dict = {var_id: 'self.' + var_id for var_id in var_ids}

        callbacks_all = []
        for block in fg.iter_enabled_blocks():
            callbacks_all.extend(expr_utils.expr_replace(cb, replace_dict) for cb in block.get_callbacks())

        # Map var id to callbacks
        def uses_var_id(callback, var_id):
            # Both values are passed in explicitly: a comprehension variable
            # is not visible from the enclosing scope.
            used = expr_utils.get_variable_dependencies(callback, [var_id])
            return used and 'self.' + var_id in callback  # callback might contain var_id itself

        callbacks = {}
        for var_id in var_ids:
            callbacks[var_id] = [callback for callback in callbacks_all if uses_var_id(callback, var_id)]

        return callbacks

    def _connections(self):
        """Render the code for all enabled connections.

        Connections from virtual sources are replaced by connections from
        their resolved sources, and bypassed blocks are replaced by direct
        connections from their upstream port to their downstream ports.

        Returns:
            a list of rendered connection code strings, sorted by connection
            type, source block name and sink block name
        """
        fg = self._flow_graph
        templates = {key: Template(text)
                     for key, text in fg.parent_platform.connection_templates.items()}

        def make_port_sig(port):
            if port.parent.key in ('pad_source', 'pad_sink'):
                block = 'self'
                key = fg.get_pad_port_global_key(port)
            else:
                block = 'self.' + port.parent_block.name
                key = port.key

            if not key.isdigit():
                key = repr(key)

            return '({block}, {key})'.format(block=block, key=key)

        # Copy: the list is edited below and must not alter the flow graph's own.
        connections = list(fg.get_enabled_connections())

        # Get the virtual blocks and resolve their connections
        connection_factory = fg.parent_platform.Connection
        virtual = [c for c in connections if isinstance(c.source_block, blocks.VirtualSource)]
        for connection in virtual:
            sink = connection.sink_port
            for source in connection.source_port.resolve_virtual_source():
                resolved = connection_factory(fg.orignal_flowgraph, source, sink)
                connections.append(resolved)
            # Remove the virtual connection
            connections.remove(connection)

        # Bypassing blocks: Need to find all the enabled connections for the block using
        # the *connections* object rather than get_connections(). Create new connections
        # that bypass the selected block and remove the existing ones. This allows adjacent
        # bypassed blocks to see the newly created connections to downstream blocks,
        # allowing them to correctly construct bypass connections.
        bypassed_blocks = fg.get_bypassed_blocks()
        for block in bypassed_blocks:
            # Get the upstream connection (off of the sink ports)
            # Use *connections* not get_connections()
            source_connection = [c for c in connections if c.sink_port == block.sinks[0]]
            # The source connection should never have more than one element.
            assert (len(source_connection) == 1)

            # Get the source of the connection.
            source_port = source_connection[0].source_port

            # Snapshot the downstream connections first: *connections* is
            # modified inside the loop, and iterating it lazily would skip
            # entries after each removal.
            downstream = [c for c in connections if c.source_port == block.sources[0]]
            for sink in downstream:
                if not sink.enabled:
                    # Ignore disabled connections
                    continue
                connection = connection_factory(fg.orignal_flowgraph, source_port, sink.sink_port)
                connections.append(connection)
                # Remove this sink connection
                connections.remove(sink)
            # Remove the source connection
            connections.remove(source_connection[0])

        # List of connections where each endpoint is enabled (sorted by domains, block names)
        def by_domain_and_blocks(c):
            return c.type, c.source_block.name, c.sink_block.name

        rendered = []
        for con in sorted(connections, key=by_domain_and_blocks):
            template = templates[con.type]
            code = template.render(make_port_sig=make_port_sig, source=con.source_port, sink=con.sink_port)
            rendered.append(code)

        return rendered
|