1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
from __future__ import absolute_import
import inspect
import collections
import six
from six.moves import zip
TYPE_MAP = {
'complex64': 'complex', 'complex': 'complex',
'float32': 'float', 'float': 'float',
'int32': 'int', 'uint32': 'int',
'int16': 'short', 'uint16': 'short',
'int8': 'byte', 'uint8': 'byte',
}
BlockIO = collections.namedtuple('BlockIO', 'name cls params sinks sources doc callbacks')
def _ports(sigs, msgs):
ports = list()
for i, dtype in enumerate(sigs):
port_type = TYPE_MAP.get(dtype.base.name, None)
if not port_type:
raise ValueError("Can't map {0!r} to GRC port type".format(dtype))
vlen = dtype.shape[0] if len(dtype.shape) > 0 else 1
ports.append((str(i), port_type, vlen))
for msg_key in msgs:
if msg_key == 'system':
continue
ports.append((msg_key, 'message', 1))
return ports
def _find_block_class(source_code, cls):
ns = {}
try:
exec(source_code, ns)
except Exception as e:
raise ValueError("Can't interpret source code: " + str(e))
for var in six.itervalues(ns):
if inspect.isclass(var) and issubclass(var, cls):
return var
raise ValueError('No python block class found in code')
def extract(cls):
try:
from gnuradio import gr
import pmt
except ImportError:
raise EnvironmentError("Can't import GNU Radio")
if not inspect.isclass(cls):
cls = _find_block_class(cls, gr.gateway.gateway_block)
spec = inspect.getargspec(cls.__init__)
init_args = spec.args[1:]
defaults = [repr(arg) for arg in (spec.defaults or ())]
doc = cls.__doc__ or cls.__init__.__doc__ or ''
cls_name = cls.__name__
if len(defaults) + 1 != len(spec.args):
raise ValueError("Need all __init__ arguments to have default values")
try:
instance = cls()
except Exception as e:
raise RuntimeError("Can't create an instance of your block: " + str(e))
name = instance.name()
params = list(zip(init_args, defaults))
def settable(attr):
try:
return callable(getattr(cls, attr).fset) # check for a property with setter
except AttributeError:
return attr in instance.__dict__ # not dir() - only the instance attribs
callbacks = [attr for attr in dir(instance) if attr in init_args and settable(attr)]
sinks = _ports(instance.in_sig(),
pmt.to_python(instance.message_ports_in()))
sources = _ports(instance.out_sig(),
pmt.to_python(instance.message_ports_out()))
return BlockIO(name, cls_name, params, sinks, sources, doc, callbacks)
if __name__ == '__main__':
blk_code = """
import numpy as np
from gnuradio import gr
import pmt
class blk(gr.sync_block):
def __init__(self, param1=None, param2=None, param3=None):
"Test Docu"
gr.sync_block.__init__(
self,
name='Embedded Python Block',
in_sig = (np.float32,),
out_sig = (np.float32,np.complex64,),
)
self.message_port_register_in(pmt.intern('msg_in'))
self.message_port_register_out(pmt.intern('msg_out'))
self.param1 = param1
self._param2 = param2
self._param3 = param3
@property
def param2(self):
return self._param2
@property
def param3(self):
return self._param3
@param3.setter
def param3(self, value):
self._param3 = value
def work(self, inputs_items, output_items):
return 10
"""
from pprint import pprint
pprint(dict(extract(blk_code)._asdict()))
|