diff options
Diffstat (limited to 'gr-blocks/python')
-rw-r--r-- | gr-blocks/python/blocks/qa_block_gateway.py | 25 | ||||
-rw-r--r-- | gr-blocks/python/blocks/qa_exponentiate_const_cci.py | 66 | ||||
-rw-r--r-- | gr-blocks/python/blocks/qa_file_source_sink.py | 62 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_socket_pdu.py | 28 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_tag_gate.py | 75 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_tag_share.py | 73 |
6 files changed, 329 insertions, 0 deletions
diff --git a/gr-blocks/python/blocks/qa_block_gateway.py b/gr-blocks/python/blocks/qa_block_gateway.py index 1e848cff04..6fdb0090ae 100644 --- a/gr-blocks/python/blocks/qa_block_gateway.py +++ b/gr-blocks/python/blocks/qa_block_gateway.py @@ -25,6 +25,19 @@ import pmt from gnuradio import gr, gr_unittest, blocks + +class non_sync_block(gr.basic_block): + def __init__(self): + gr.basic_block.__init__(self, + name="non_sync_block", + in_sig=[numpy.float32], + out_sig=[numpy.float32, numpy.float32]) + def general_work(self, input_items, output_items): + self.consume(0, len(input_items[0])) + self.produce(0,2) + self.produce(1,1) + return gr.WORK_CALLED_PRODUCE + class add_2_f32_1_f32(gr.sync_block): def __init__(self): gr.sync_block.__init__( @@ -275,5 +288,17 @@ class test_block_gateway(gr_unittest.TestCase): tb.run() self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + def test_non_sync_block(self): + tb = gr.top_block () + src = blocks.vector_source_f(range(1000000)) + sinks = [blocks.vector_sink_f(), blocks.vector_sink_f()] + dut = non_sync_block() + tb.connect(src, dut) + tb.connect((dut,0), sinks[0]) + tb.connect((dut,1), sinks[1]) + tb.run () + self.assertEqual(len(sinks[0].data()), 2*len(sinks[1].data())) + + if __name__ == '__main__': gr_unittest.run(test_block_gateway, "test_block_gateway.xml") diff --git a/gr-blocks/python/blocks/qa_exponentiate_const_cci.py b/gr-blocks/python/blocks/qa_exponentiate_const_cci.py new file mode 100644 index 0000000000..0c4b65eb68 --- /dev/null +++ b/gr-blocks/python/blocks/qa_exponentiate_const_cci.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2017 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +from gnuradio import blocks +import pmt + +class qa_exponentiate_const_cci(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001_t(self): + for exponent in range(1,10): + in_data = (1+1j, -1, 4-1j, -3-7j) + out_data = (in_data[0]**exponent, in_data[1]**exponent, in_data[2]**exponent, in_data[3]**exponent) + + # Test streaming input + source = blocks.vector_source_c(in_data, False, 1) + exponentiate_const_cci = blocks.exponentiate_const_cci(exponent) + sink = blocks.vector_sink_c(1) + + self.tb.connect(source, exponentiate_const_cci, sink) + self.tb.run() + + self.assertAlmostEqual(sink.data(), out_data) + + # Test vector input + for vlen in [2, 4]: + source = blocks.vector_source_c(in_data, False, 1) + s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, vlen) + exponentiate_const_cci = blocks.exponentiate_const_cci(exponent, vlen) + v2s = blocks.vector_to_stream(gr.sizeof_gr_complex, vlen) + sink = blocks.vector_sink_c(1) + + self.tb.connect(source, s2v, exponentiate_const_cci, v2s, sink) + self.tb.run() + + self.assertAlmostEqual(sink.data(), out_data) + + +if __name__ == '__main__': + gr_unittest.run(qa_exponentiate_const_cci, 'qa_exponentiate_const_cci.xml') diff --git a/gr-blocks/python/blocks/qa_file_source_sink.py b/gr-blocks/python/blocks/qa_file_source_sink.py index da1a07b347..32910cb4bc 100644 --- a/gr-blocks/python/blocks/qa_file_source_sink.py +++ b/gr-blocks/python/blocks/qa_file_source_sink.py @@ -23,6 +23,7 @@ from gnuradio import gr, gr_unittest, blocks import os import tempfile +import pmt class test_file_source_sink(gr_unittest.TestCase): @@ -55,6 +56,7 @@ class test_file_source_sink(gr_unittest.TestCase): result_data = snk2.data() self.assertFloatTuplesAlmostEqual(expected_result, result_data) + self.assertEqual(len(snk2.tags()), 0) def test_descriptor_001(self): src_data = range(1000) @@ -86,6 +88,7 @@ class test_file_source_sink(gr_unittest.TestCase): result_data = snk2.data() self.assertFloatTuplesAlmostEqual(expected_result, result_data) + self.assertEqual(len(snk2.tags()), 0) def test_file_source_can_seek_after_open(self): src_data = range(1000) @@ -101,6 +104,65 @@ class test_file_source_sink(gr_unittest.TestCase): source = blocks.file_source(gr.sizeof_float, temp.name) self.assertTrue(source.seek(0, os.SEEK_SET)) + def test_begin_tag(self): + src_data = range(1000) + expected_result = range(1000) + + snk2 = blocks.vector_sink_f() + + with tempfile.NamedTemporaryFile() as temp: + src = blocks.vector_source_f(src_data) + snk = blocks.file_sink(gr.sizeof_float, temp.name) + snk.set_unbuffered(True) + + src2 = blocks.file_source(gr.sizeof_float, temp.name) + src2.set_begin_tag(pmt.string_to_symbol("file_begin")) + + self.tb.connect(src, snk) + self.tb.run() + + self.tb.disconnect(src, snk) + self.tb.connect(src2, snk2) + self.tb.run() + + result_data = snk2.data() + self.assertFloatTuplesAlmostEqual(expected_result, result_data) + self.assertEqual(len(snk2.tags()), 1) + + def test_begin_tag_repeat(self): + src_data = range(1000) + expected_result = range(1000) + expected_result.extend(range(1000)) + + snk2 = blocks.vector_sink_f() + + with tempfile.NamedTemporaryFile() as temp: + src = blocks.vector_source_f(src_data) + snk = blocks.file_sink(gr.sizeof_float, temp.name) + snk.set_unbuffered(True) + + src2 = blocks.file_source(gr.sizeof_float, temp.name, True) + src2.set_begin_tag(pmt.string_to_symbol("file_begin")) + hd = blocks.head(gr.sizeof_float, 2000) + + self.tb.connect(src, snk) + self.tb.run() + + self.tb.disconnect(src, snk) + self.tb.connect(src2, hd, snk2) + self.tb.run() + + result_data = snk2.data() + self.assertFloatTuplesAlmostEqual(expected_result, result_data) + tags = snk2.tags() + self.assertEqual(len(tags), 2) + self.assertEqual(str(tags[0].key), "file_begin") + self.assertEqual(str(tags[0].value), "0") + self.assertEqual(tags[0].offset, 0) + self.assertEqual(str(tags[1].key), "file_begin") + self.assertEqual(str(tags[1].value), "1") + self.assertEqual(tags[1].offset, 1000) + if __name__ == '__main__': gr_unittest.run(test_file_source_sink, "test_file_source_sink.xml") diff --git a/gr-blocks/python/blocks/qa_socket_pdu.py b/gr-blocks/python/blocks/qa_socket_pdu.py index db9f53c71e..5285585340 100755 --- a/gr-blocks/python/blocks/qa_socket_pdu.py +++ b/gr-blocks/python/blocks/qa_socket_pdu.py @@ -100,6 +100,34 @@ class qa_socket_pdu (gr_unittest.TestCase): #self.tb.connect(pdu_to_ts, head, sink) self.tb.run() + def test_004 (self): + # Test that the TCP server can stream PDUs <= the MTU size. + port = str(random.Random().randint(0, 30000) + 10000) + mtu = 10000 + srcdata = tuple([x % 256 for x in xrange(mtu)]) + data = pmt.init_u8vector(srcdata.__len__(), srcdata) + pdu_msg = pmt.cons(pmt.PMT_NIL, data) + + self.pdu_source = blocks.message_strobe(pdu_msg, 500) + self.pdu_send = blocks.socket_pdu("TCP_SERVER", "localhost", port, mtu) + self.pdu_recv = blocks.socket_pdu("TCP_CLIENT", "localhost", port, mtu) + self.pdu_sink = blocks.message_debug() + + self.tb.msg_connect(self.pdu_source, "strobe", self.pdu_send, "pdus") + self.tb.msg_connect(self.pdu_recv, "pdus", self.pdu_sink, "store") + + self.tb.start() + time.sleep(1) + self.tb.stop() + self.tb.wait() + + received = self.pdu_sink.get_message(0) + received_data = pmt.cdr(received) + msg_data = [] + for i in xrange(mtu): + msg_data.append(pmt.u8vector_ref(received_data, i)) + self.assertEqual(srcdata, tuple(msg_data)) + if __name__ == '__main__': gr_unittest.run(qa_socket_pdu, "qa_socket_pdu.xml") diff --git a/gr-blocks/python/blocks/qa_tag_gate.py b/gr-blocks/python/blocks/qa_tag_gate.py index acb2c68a82..7ae676562e 100755 --- a/gr-blocks/python/blocks/qa_tag_gate.py +++ b/gr-blocks/python/blocks/qa_tag_gate.py @@ -40,9 +40,84 @@ class qa_tag_gate (gr_unittest.TestCase): src = blocks.vector_source_f(range(20), False, 1, (tag,)) gate = blocks.tag_gate(gr.sizeof_float, False) sink = blocks.vector_sink_f() + self.tb.connect(src, gate, sink) self.tb.run () self.assertEqual(len(sink.tags()), 0) + def test_002_t (self): + tags = [] + tags.append(gr.tag_t()) + tags[0].key = pmt.string_to_symbol('key') + tags[0].value = pmt.from_long(42) + tags[0].offset = 0 + tags.append(gr.tag_t()) + tags[1].key = pmt.string_to_symbol('key') + tags[1].value = pmt.from_long(42) + tags[1].offset = 5 + tags.append(gr.tag_t()) + tags[2].key = pmt.string_to_symbol('secondkey') + tags[2].value = pmt.from_long(42) + tags[2].offset = 6 + src = blocks.vector_source_f(range(20), False, 1, tags) + gate = blocks.tag_gate(gr.sizeof_float, False) + gate.set_single_key("key") + self.assertEqual(gate.single_key(),"key") + sink = blocks.vector_sink_f() + self.tb.connect(src, gate, sink) + self.tb.run () + self.assertEqual(len(sink.tags()), 1) + + def test_003_t (self): + tags = [] + tags.append(gr.tag_t()) + tags[0].key = pmt.string_to_symbol('key') + tags[0].value = pmt.from_long(42) + tags[0].offset = 0 + tags.append(gr.tag_t()) + tags[1].key = pmt.string_to_symbol('key') + tags[1].value = pmt.from_long(42) + tags[1].offset = 5 + tags.append(gr.tag_t()) + tags[2].key = pmt.string_to_symbol('secondkey') + tags[2].value = pmt.from_long(42) + tags[2].offset = 6 + src = blocks.vector_source_f(range(20), False, 1, tags) + gate = blocks.tag_gate(gr.sizeof_float, True) + gate.set_single_key("key") + sink = blocks.vector_sink_f() + self.tb.connect(src, gate, sink) + self.tb.run () + self.assertEqual(len(sink.tags()), 3) + + def test_004_t (self): + tags = [] + tags.append(gr.tag_t()) + tags[0].key = pmt.string_to_symbol('key') + tags[0].value = pmt.from_long(42) + tags[0].offset = 0 + tags.append(gr.tag_t()) + tags[1].key = pmt.string_to_symbol('key') + tags[1].value = pmt.from_long(42) + tags[1].offset = 5 + tags.append(gr.tag_t()) + tags[2].key = pmt.string_to_symbol('secondkey') + tags[2].value = pmt.from_long(42) + tags[2].offset = 6 + src = blocks.vector_source_f(range(20), False, 1, tags) + gate = blocks.tag_gate(gr.sizeof_float, True) + sink = blocks.vector_sink_f() + self.tb.connect(src, gate, sink) + self.tb.run () + self.assertEqual(len(sink.tags()), 3) + + def test_005_t (self): + gate = blocks.tag_gate(gr.sizeof_float, True) + self.assertEqual(gate.single_key(), "") + gate.set_single_key("the_key") + self.assertEqual(gate.single_key(), "the_key") + gate.set_single_key("") + self.assertEqual(gate.single_key(), "") + if __name__ == '__main__': gr_unittest.run(qa_tag_gate, "qa_tag_gate.xml") diff --git a/gr-blocks/python/blocks/qa_tag_share.py b/gr-blocks/python/blocks/qa_tag_share.py new file mode 100755 index 0000000000..3fff02ff66 --- /dev/null +++ b/gr-blocks/python/blocks/qa_tag_share.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2017 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +from gnuradio import blocks +import pmt + +class qa_tag_share(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001_t(self): + # Constants + tag_key = 'in1_tag' + tag_value = 0 + tag_offset = 0 + in0_value = 1.0+1.0j + in1_value = 2.717 + in0_data = (in0_value,)*10 + in1_data = (in1_value,)*10 + sink_data = in0_data + + tag = gr.tag_t() + tag.key = pmt.to_pmt(tag_key) + tag.value = pmt.to_pmt(tag_value) + tag.offset = tag_offset + + # Only tag Input 1 of the share block and see if it transfers + # to Output 0. Also verify that Input 0 stream is propagated to + # Output 0. + in0 = blocks.vector_source_c(in0_data, False, 1) + in1 = blocks.vector_source_f(in1_data, False, 1, (tag,)) + tag_share = blocks.tag_share(gr.sizeof_gr_complex, gr.sizeof_float) + sink = blocks.vector_sink_c(1) + + self.tb.connect(in0, (tag_share,0)) + self.tb.connect(in1, (tag_share,1)) + self.tb.connect(tag_share, sink) + self.tb.run() + + self.assertEqual(len(sink.tags()), 1) + self.assertEqual(pmt.to_python(sink.tags()[0].key), tag_key) + self.assertEqual(pmt.to_python(sink.tags()[0].value), tag_value) + self.assertEqual(sink.tags()[0].offset, tag_offset) + self.assertEqual(sink.data(), sink_data) + + +if __name__ == '__main__': + gr_unittest.run(qa_tag_share, 'qa_tag_share.xml') |