summaryrefslogtreecommitdiff
path: root/grc/core/utils/epy_block_io.py
blob: 823116adb95444881ac239fd72f1a25d7bbe1eb5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130

from __future__ import absolute_import

import inspect
import collections

import six
from six.moves import zip


TYPE_MAP = {
    'complex64': 'complex', 'complex': 'complex',
    'float32': 'float', 'float': 'float',
    'int32': 'int', 'uint32': 'int',
    'int16': 'short', 'uint16': 'short',
    'int8': 'byte', 'uint8': 'byte',
}

BlockIO = collections.namedtuple('BlockIO', 'name cls params sinks sources doc callbacks')


def _ports(sigs, msgs):
    ports = list()
    for i, dtype in enumerate(sigs):
        port_type = TYPE_MAP.get(dtype.base.name, None)
        if not port_type:
            raise ValueError("Can't map {0!r} to GRC port type".format(dtype))
        vlen = dtype.shape[0] if len(dtype.shape) > 0 else 1
        ports.append((str(i), port_type, vlen))
    for msg_key in msgs:
        if msg_key == 'system':
            continue
        ports.append((msg_key, 'message', 1))
    return ports


def _find_block_class(source_code, cls):
    ns = {}
    try:
        exec(source_code, ns)
    except Exception as e:
        raise ValueError("Can't interpret source code: " + str(e))
    for var in six.itervalues(ns):
        if inspect.isclass(var) and issubclass(var, cls):
            return var
    raise ValueError('No python block class found in code')


def extract(cls):
    try:
        from gnuradio import gr
        import pmt
    except ImportError:
        raise EnvironmentError("Can't import GNU Radio")

    if not inspect.isclass(cls):
        cls = _find_block_class(cls, gr.gateway.gateway_block)

    spec = inspect.getargspec(cls.__init__)
    init_args = spec.args[1:]
    defaults = [repr(arg) for arg in (spec.defaults or ())]
    doc = cls.__doc__ or cls.__init__.__doc__ or ''
    cls_name = cls.__name__

    if len(defaults) + 1 != len(spec.args):
        raise ValueError("Need all __init__ arguments to have default values")

    try:
        instance = cls()
    except Exception as e:
        raise RuntimeError("Can't create an instance of your block: " + str(e))

    name = instance.name()

    params = list(zip(init_args, defaults))

    def settable(attr):
        try:
            return callable(getattr(cls, attr).fset)  # check for a property with setter
        except AttributeError:
            return attr in instance.__dict__  # not dir() - only the instance attribs

    callbacks = [attr for attr in dir(instance) if attr in init_args and settable(attr)]

    sinks = _ports(instance.in_sig(),
                   pmt.to_python(instance.message_ports_in()))
    sources = _ports(instance.out_sig(),
                     pmt.to_python(instance.message_ports_out()))

    return BlockIO(name, cls_name, params, sinks, sources, doc, callbacks)


if __name__ == '__main__':
    blk_code = """
import numpy as np
from gnuradio import gr
import pmt

class blk(gr.sync_block):
    def __init__(self, param1=None, param2=None, param3=None):
        "Test Docu"
        gr.sync_block.__init__(
            self,
            name='Embedded Python Block',
            in_sig = (np.float32,),
            out_sig = (np.float32,np.complex64,),
        )
        self.message_port_register_in(pmt.intern('msg_in'))
        self.message_port_register_out(pmt.intern('msg_out'))
        self.param1 = param1
        self._param2 = param2
        self._param3 = param3

    @property
    def param2(self):
        return self._param2

    @property
    def param3(self):
        return self._param3

    @param3.setter
    def param3(self, value):
        self._param3 = value

    def work(self, inputs_items, output_items):
        return 10
    """
    from pprint import pprint
    pprint(dict(extract(blk_code)._asdict()))