1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
#
# Copyright 2019 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#
""" Module to get io_signature of the header block """
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals
import re
import itertools
import logging
import string
from ..core import Constants
LOGGER = logging.getLogger(__name__)
def _drop_call_prefix(call, signature):
    """
    Return the argument text that follows the first *signature* in *call*.

    The caller guarantees that *signature* occurs in *call*. The text is cut
    after the signature name instead of using ``str.lstrip(signature + ' (')``:
    lstrip() treats its argument as a set of characters, so it also ate
    leading characters of the first argument (e.g. the ``2`` in
    ``make2(2, 2, ...)`` or the ``a`` in ``make(a_len, ...)``).
    """
    _, _, arguments = call.partition(signature)
    return arguments.lstrip(' (')


def _assign_stream_fields(port, io_elements):
    """
    Move the arguments of one signature call from *io_elements* into *port*.

    *port* is one of the 'input'/'output' dicts with its 'signature' entry
    already filled in. The entries consumed from the front of *io_elements*
    are deleted so that a following call starts at the next signature's
    arguments. Nothing happens when the signature kind is not recognised
    (including None).

    Raises IndexError if *io_elements* holds fewer entries than the
    signature kind needs.
    """
    signature = port['signature']
    # Compare by value: identity of equal strings is an implementation detail.
    if signature == Constants.MAKE:
        item_keys = ['sizeof_stream_item']
    elif signature == Constants.MAKE2:
        item_keys = ['sizeof_stream_item1', 'sizeof_stream_item2']
    elif signature == Constants.MAKE3:
        item_keys = ['sizeof_stream_item1',
                     'sizeof_stream_item2',
                     'sizeof_stream_item3']
    elif signature == Constants.MAKEV:
        item_keys = ['sizeof_stream_items']
    else:
        return
    keys = ['min_streams', 'max_streams'] + item_keys
    for offset, key in enumerate(keys):
        port[key] = io_elements[offset]
    del io_elements[0:len(keys)]


def io_signature(impl_file):
    """
    Parse the input and output io_signature of a block from *impl_file*.

    Only the first two lines containing Constants.IO_SIGNATURE are examined.
    The first fragment containing Constants.MAKE is treated as the input
    signature and the second one as the output signature.

    :param impl_file: path of the implementation file to read
    :returns: dict with an 'input' and an 'output' entry; each holds
              'signature', 'min_streams', 'max_streams' and the
              'sizeof_stream_item*' fields that belong to the signature kind
    :raises IndexError: if fewer than two signature calls are found, or a
                        call has fewer arguments than its kind requires
    """
    parsed_io = {
        "input": {
            "signature": None
        },
        "output": {
            "signature": None
        }
    }
    with open(impl_file, 'r') as impl:
        io_lines = [line for line in impl if Constants.IO_SIGNATURE in line]
    # Only the first two matching lines are taken into account.
    io_lines = io_lines[:2]

    # Cut every line at the io_signature marker and keep the pieces that
    # contain a make* call. A new list is built instead of removing entries
    # from the list that is being iterated, which skipped elements.
    io_func = []
    for line in io_lines:
        for fragment in line.strip().split(Constants.IO_SIGNATURE):
            fragment = fragment.strip()
            if not fragment or all(char in string.punctuation
                                   for char in fragment):
                continue
            if Constants.MAKE in fragment:
                io_func.append(fragment.rstrip(Constants.STRIP_SYMBOLS))

    if len(io_func) < 2:
        # Same exception type the bare io_func[0] / io_func[1] lookups
        # produced, but with a message that names the file.
        raise IndexError(
            "expected an input and an output io_signature call in {}, "
            "found {}".format(impl_file, len(io_func)))

    # The first entry of SIGNATURE_LIST that occurs in a call wins.
    # NOTE(review): this presumably relies on SIGNATURE_LIST listing the
    # longer names before the plain make name - confirm in Constants.
    for signature in Constants.SIGNATURE_LIST:
        for position, direction in enumerate(('input', 'output')):
            if (signature in io_func[position]
                    and parsed_io[direction]['signature'] is None):
                parsed_io[direction]['signature'] = signature
                io_func[position] = _drop_call_prefix(io_func[position],
                                                      signature)

    # Flatten the arguments of all calls into one list, in source order.
    io_elements = []
    for call in io_func:
        for argument in call.split(','):
            argument = argument.lstrip(' (').rstrip(' )')
            # The rstrip above also removed the closing bracket of an
            # argument such as sizeof(float); put one back.
            if Constants.OPEN_BRACKET in argument:
                argument = argument + Constants.CLOSE_BRACKET
            io_elements.append(argument)

    # Input arguments come first; each call consumes what it used.
    _assign_stream_fields(parsed_io['input'], io_elements)
    _assign_stream_fields(parsed_io['output'], io_elements)
    return parsed_io
def _port_names(lines, marker):
    """
    Extract the message port names from source *lines* containing *marker*.

    For a line with a double quote the first non-empty double-quoted string
    is the port name. For a line without one, the text between the first
    '(' and the last ')' is used instead.
    """
    names = []
    for line in lines:
        # NOTE(review): str.strip(marker) removes any leading/trailing
        # characters that occur in marker, not the marker as a word. It is
        # kept so the extracted text stays what it was before.
        text = line.strip().strip(marker)
        if '"' in text:
            # re.search instead of re.findall(...)[0]: a line with a single,
            # unbalanced quote has no match and must not raise IndexError.
            quoted = re.search(r'"([^"]*)"', text)
            if quoted and quoted.group(1):
                names.append(quoted.group(1))
        else:
            names.append(text[text.find('(') + 1:text.rfind(')')])
            # Existing behaviour, preserved: after an unquoted port
            # expression everything collected so far is merged into one
            # string. NOTE(review): the original nesting of this step is
            # not visible in the source as received - confirm the intent.
            names[:] = [''.join(names)]
    return names


def message_port(impl_file):
    """
    Parse the message ports registered in an implementation file.

    Lines containing Constants.MESSAGE_INPUT yield input ports and lines
    containing Constants.MESSAGE_OUTPUT yield output ports.

    :param impl_file: path of the implementation file to read
    :returns: dict with the keys 'input' and 'output', each a list of the
              port names found
    """
    input_lines = []
    output_lines = []
    with open(impl_file, 'r') as impl:
        for line in impl:
            if Constants.MESSAGE_INPUT in line:
                input_lines.append(line)
            if Constants.MESSAGE_OUTPUT in line:
                output_lines.append(line)
    return {
        "input": _port_names(input_lines, Constants.MESSAGE_INPUT),
        "output": _port_names(output_lines, Constants.MESSAGE_OUTPUT),
    }
|