1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
#!/usr/bin/env python
#
# Copyright 2005,2007,2010,2012,2013 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#
from gnuradio import gr, gr_unittest, blocks
def calc_expected_result(src_data, n):
assert (len(src_data) % n) == 0
result = [list() for x in range(n)]
#print "len(result) =", len(result)
for i in range(len(src_data)):
(result[i % n]).append(src_data[i])
return result
class test_pipe_fittings(gr_unittest.TestCase):
def setUp(self):
self.tb = gr.top_block ()
def tearDown(self):
self.tb = None
def test_001(self):
"""
Test stream_to_streams.
"""
n = 8
src_len = n * 8
src_data = list(range(src_len))
expected_results = calc_expected_result(src_data, n)
#print "expected results: ", expected_results
src = blocks.vector_source_i(src_data)
op = blocks.stream_to_streams(gr.sizeof_int, n)
self.tb.connect(src, op)
dsts = []
for i in range(n):
dst = blocks.vector_sink_i()
self.tb.connect((op, i), (dst, 0))
dsts.append(dst)
self.tb.run()
for d in range(n):
self.assertEqual(expected_results[d], dsts[d].data())
def test_002(self):
# Test streams_to_stream (using stream_to_streams).
n = 8
src_len = n * 8
src_data = list(range(src_len))
expected_results = src_data
src = blocks.vector_source_i(src_data)
op1 = blocks.stream_to_streams(gr.sizeof_int, n)
op2 = blocks.streams_to_stream(gr.sizeof_int, n)
dst = blocks.vector_sink_i()
self.tb.connect(src, op1)
for i in range(n):
self.tb.connect((op1, i), (op2, i))
self.tb.connect(op2, dst)
self.tb.run()
self.assertEqual(expected_results, dst.data())
def test_003(self):
#Test streams_to_vector (using stream_to_streams & vector_to_stream).
n = 8
src_len = n * 8
src_data = list(range(src_len))
expected_results = src_data
src = blocks.vector_source_i(src_data)
op1 = blocks.stream_to_streams(gr.sizeof_int, n)
op2 = blocks.streams_to_vector(gr.sizeof_int, n)
op3 = blocks.vector_to_stream(gr.sizeof_int, n)
dst = blocks.vector_sink_i()
self.tb.connect(src, op1)
for i in range(n):
self.tb.connect((op1, i), (op2, i))
self.tb.connect(op2, op3, dst)
self.tb.run()
self.assertEqual(expected_results, dst.data())
def test_004(self):
#Test vector_to_streams.
n = 8
src_len = n * 8
src_data = list(range(src_len))
expected_results = src_data
src = blocks.vector_source_i(src_data)
op1 = blocks.stream_to_vector(gr.sizeof_int, n)
op2 = blocks.vector_to_streams(gr.sizeof_int, n)
op3 = blocks.streams_to_stream(gr.sizeof_int, n)
dst = blocks.vector_sink_i()
self.tb.connect(src, op1, op2)
for i in range(n):
self.tb.connect((op2, i), (op3, i))
self.tb.connect(op3, dst)
self.tb.run()
self.assertEqual(expected_results, dst.data())
if __name__ == '__main__':
gr_unittest.run(test_pipe_fittings, "test_pipe_fittings.xml")
|