summaryrefslogtreecommitdiff
path: root/grc/core/epy_block_io.py
blob: e089908a01d8adbd36f15472a8c353d331bb7284 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

import inspect
import collections

from gnuradio import gr
import pmt


TYPE_MAP = {
    'complex64': 'complex', 'complex': 'complex',
    'float32': 'float', 'float': 'float',
    'int32': 'int', 'uint32': 'int',
    'int16': 'short', 'uint16': 'short',
    'int8': 'byte', 'uint8': 'byte',
}

BlockIO = collections.namedtuple('BlockIO', 'name cls params sinks sources doc')


def _ports(sigs, msgs):
    ports = list()
    for i, dtype in enumerate(sigs):
        port_type = TYPE_MAP.get(dtype.name, None)
        if not port_type:
            raise ValueError("Can't map {0:!r} to GRC port type".format(dtype))
        ports.append((str(i), port_type))
    for msg_key in msgs:
        if msg_key == 'system':
            continue
        ports.append((msg_key, 'message'))
    return ports


def _blk_class(source_code):
    ns = {}
    try:
        exec source_code in ns
    except Exception as e:
        raise ValueError("Can't interpret source code: " + str(e))
    for var in ns.itervalues():
        if inspect.isclass(var)and issubclass(var, gr.gateway.gateway_block):
            return var
    raise ValueError('No python block class found in code')


def extract(cls):
    if not inspect.isclass(cls):
        cls = _blk_class(cls)

    spec = inspect.getargspec(cls.__init__)
    defaults = map(repr, spec.defaults or ())
    doc = cls.__doc__ or cls.__init__.__doc__ or ''
    cls_name = cls.__name__

    if len(defaults) + 1 != len(spec.args):
        raise ValueError("Need all __init__ arguments to have default values")

    try:
        instance = cls()
    except Exception as e:
        raise RuntimeError("Can't create an instance of your block: " + str(e))

    name = instance.name()
    params = list(zip(spec.args[1:], defaults))

    sinks = _ports(instance.in_sig(),
                   pmt.to_python(instance.message_ports_in()))
    sources = _ports(instance.out_sig(),
                     pmt.to_python(instance.message_ports_out()))

    return BlockIO(name, cls_name, params, sinks, sources, doc)


if __name__ == '__main__':
    blk_code = """
import numpy as np
from gnuradio import gr
import pmt

class blk(gr.sync_block):
    def __init__(self, param1=None, param2=None):
        "Test Docu"
        gr.sync_block.__init__(
            self,
            name='Embedded Python Block',
            in_sig = (np.float32,),
            out_sig = (np.float32,np.complex64,),
        )
        self.message_port_register_in(pmt.intern('msg_in'))
        self.message_port_register_out(pmt.intern('msg_out'))

    def work(self, inputs_items, output_items):
        return 10
    """
    print extract(blk_code)