# summary | refs | log | tree | commit | diff
# path: root/gr-blocks/python/blocks/qa_pipe_fittings.py
# blob: b239f46a391edca11b5e65a81d74a0d3e73ce39f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python
#
# Copyright 2005,2007,2010,2012,2013 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#


from gnuradio import gr, gr_unittest, blocks

def calc_expected_result(src_data, n):
    """Distribute the items of *src_data* round-robin over *n* lists.

    The item at position ``i`` is appended to list number ``i % n``, so
    each returned list holds every n-th item in its original order.

    Args:
        src_data: Sequence of items; its length is asserted to be a
            multiple of ``n``.
        n: Number of lists to produce.

    Returns:
        A list containing ``n`` lists of items.
    """
    assert (len(src_data) % n) == 0
    streams = [[] for _ in range(n)]
    for position, item in enumerate(src_data):
        streams[position % n].append(item)
    return streams


class test_pipe_fittings(gr_unittest.TestCase):
    """Flowgraph tests for the stream/vector splitting and merging blocks.

    Every test pushes the integers 0..63 through a freshly created top
    block and compares what arrives at the sink(s) with the expected data.
    """

    def setUp(self):
        # Each test builds its flowgraph in its own top block.
        self.tb = gr.top_block()

    def tearDown(self):
        # Drop the reference to the top block between tests.
        self.tb = None

    def test_001(self):
        """
        Test stream_to_streams.

        Each output port is compared against the matching list produced
        by calc_expected_result (item i is expected on port i % n).
        """
        n = 8
        src_data = list(range(n * 8))
        expected_results = calc_expected_result(src_data, n)

        source = blocks.vector_source_i(src_data)
        splitter = blocks.stream_to_streams(gr.sizeof_int, n)
        self.tb.connect(source, splitter)

        # Attach one sink to each output port of the splitter.
        sinks = []
        for port in range(n):
            sink = blocks.vector_sink_i()
            self.tb.connect((splitter, port), (sink, 0))
            sinks.append(sink)

        self.tb.run()

        for expected, sink in zip(expected_results, sinks):
            self.assertEqual(expected, sink.data())

    def test_002(self):

        # Test streams_to_stream (using stream_to_streams): splitting the
        # source into n streams and merging them again is expected to
        # give back the source data unchanged.

        n = 8
        src_data = list(range(n * 8))
        expected_results = src_data

        source = blocks.vector_source_i(src_data)
        splitter = blocks.stream_to_streams(gr.sizeof_int, n)
        merger = blocks.streams_to_stream(gr.sizeof_int, n)
        sink = blocks.vector_sink_i()

        self.tb.connect(source, splitter)
        # Wire output port k of the splitter to input port k of the merger.
        for port in range(n):
            self.tb.connect((splitter, port), (merger, port))
        self.tb.connect(merger, sink)

        self.tb.run()
        self.assertEqual(expected_results, sink.data())

    def test_003(self):

        # Test streams_to_vector (using stream_to_streams & vector_to_stream):
        # the source is split into n streams, packed into vectors and then
        # serialized again; the sink is expected to see the source data.

        n = 8
        src_data = list(range(n * 8))
        expected_results = src_data

        source = blocks.vector_source_i(src_data)
        splitter = blocks.stream_to_streams(gr.sizeof_int, n)
        packer = blocks.streams_to_vector(gr.sizeof_int, n)
        serializer = blocks.vector_to_stream(gr.sizeof_int, n)
        sink = blocks.vector_sink_i()

        self.tb.connect(source, splitter)
        # Wire output port k of the splitter to input port k of the packer.
        for port in range(n):
            self.tb.connect((splitter, port), (packer, port))
        self.tb.connect(packer, serializer, sink)

        self.tb.run()
        self.assertEqual(expected_results, sink.data())

    def test_004(self):

        # Test vector_to_streams: the source is packed into vectors, split
        # into n streams and merged again; the sink is expected to see the
        # source data.

        n = 8
        src_data = list(range(n * 8))
        expected_results = src_data

        source = blocks.vector_source_i(src_data)
        packer = blocks.stream_to_vector(gr.sizeof_int, n)
        splitter = blocks.vector_to_streams(gr.sizeof_int, n)
        merger = blocks.streams_to_stream(gr.sizeof_int, n)
        sink = blocks.vector_sink_i()

        self.tb.connect(source, packer, splitter)
        # Wire output port k of the splitter to input port k of the merger.
        for port in range(n):
            self.tb.connect((splitter, port), (merger, port))
        self.tb.connect(merger, sink)

        self.tb.run()
        self.assertEqual(expected_results, sink.data())


# Run the test case only when this file is executed as a script.
if __name__ == "__main__":
    gr_unittest.run(test_pipe_fittings, "test_pipe_fittings.xml")