summaryrefslogtreecommitdiff
path: root/gr-blocks/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-blocks/python')
-rw-r--r--gr-blocks/python/blocks/qa_block_gateway.py25
-rw-r--r--gr-blocks/python/blocks/qa_exponentiate_const_cci.py66
-rw-r--r--gr-blocks/python/blocks/qa_file_source_sink.py62
-rwxr-xr-xgr-blocks/python/blocks/qa_socket_pdu.py28
-rwxr-xr-xgr-blocks/python/blocks/qa_tag_gate.py75
-rwxr-xr-xgr-blocks/python/blocks/qa_tag_share.py73
6 files changed, 329 insertions, 0 deletions
diff --git a/gr-blocks/python/blocks/qa_block_gateway.py b/gr-blocks/python/blocks/qa_block_gateway.py
index 1e848cff04..6fdb0090ae 100644
--- a/gr-blocks/python/blocks/qa_block_gateway.py
+++ b/gr-blocks/python/blocks/qa_block_gateway.py
@@ -25,6 +25,19 @@ import pmt
from gnuradio import gr, gr_unittest, blocks
+
+class non_sync_block(gr.basic_block):
+ def __init__(self):
+ gr.basic_block.__init__(self,
+ name="non_sync_block",
+ in_sig=[numpy.float32],
+ out_sig=[numpy.float32, numpy.float32])
+ def general_work(self, input_items, output_items):
+ self.consume(0, len(input_items[0]))
+ self.produce(0,2)
+ self.produce(1,1)
+ return gr.WORK_CALLED_PRODUCE
+
class add_2_f32_1_f32(gr.sync_block):
def __init__(self):
gr.sync_block.__init__(
@@ -275,5 +288,17 @@ class test_block_gateway(gr_unittest.TestCase):
tb.run()
self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
+ def test_non_sync_block(self):
+ tb = gr.top_block ()
+ src = blocks.vector_source_f(range(1000000))
+ sinks = [blocks.vector_sink_f(), blocks.vector_sink_f()]
+ dut = non_sync_block()
+ tb.connect(src, dut)
+ tb.connect((dut,0), sinks[0])
+ tb.connect((dut,1), sinks[1])
+ tb.run ()
+ self.assertEqual(len(sinks[0].data()), 2*len(sinks[1].data()))
+
+
if __name__ == '__main__':
gr_unittest.run(test_block_gateway, "test_block_gateway.xml")
diff --git a/gr-blocks/python/blocks/qa_exponentiate_const_cci.py b/gr-blocks/python/blocks/qa_exponentiate_const_cci.py
new file mode 100644
index 0000000000..0c4b65eb68
--- /dev/null
+++ b/gr-blocks/python/blocks/qa_exponentiate_const_cci.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2017 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+from gnuradio import blocks
+import pmt
+
+class qa_exponentiate_const_cci(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_001_t(self):
+ for exponent in range(1,10):
+ in_data = (1+1j, -1, 4-1j, -3-7j)
+ out_data = (in_data[0]**exponent, in_data[1]**exponent, in_data[2]**exponent, in_data[3]**exponent)
+
+ # Test streaming input
+ source = blocks.vector_source_c(in_data, False, 1)
+ exponentiate_const_cci = blocks.exponentiate_const_cci(exponent)
+ sink = blocks.vector_sink_c(1)
+
+ self.tb.connect(source, exponentiate_const_cci, sink)
+ self.tb.run()
+
+ self.assertAlmostEqual(sink.data(), out_data)
+
+ # Test vector input
+ for vlen in [2, 4]:
+ source = blocks.vector_source_c(in_data, False, 1)
+ s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, vlen)
+ exponentiate_const_cci = blocks.exponentiate_const_cci(exponent, vlen)
+ v2s = blocks.vector_to_stream(gr.sizeof_gr_complex, vlen)
+ sink = blocks.vector_sink_c(1)
+
+ self.tb.connect(source, s2v, exponentiate_const_cci, v2s, sink)
+ self.tb.run()
+
+ self.assertAlmostEqual(sink.data(), out_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_exponentiate_const_cci, 'qa_exponentiate_const_cci.xml')
diff --git a/gr-blocks/python/blocks/qa_file_source_sink.py b/gr-blocks/python/blocks/qa_file_source_sink.py
index da1a07b347..32910cb4bc 100644
--- a/gr-blocks/python/blocks/qa_file_source_sink.py
+++ b/gr-blocks/python/blocks/qa_file_source_sink.py
@@ -23,6 +23,7 @@
from gnuradio import gr, gr_unittest, blocks
import os
import tempfile
+import pmt
class test_file_source_sink(gr_unittest.TestCase):
@@ -55,6 +56,7 @@ class test_file_source_sink(gr_unittest.TestCase):
result_data = snk2.data()
self.assertFloatTuplesAlmostEqual(expected_result, result_data)
+ self.assertEqual(len(snk2.tags()), 0)
def test_descriptor_001(self):
src_data = range(1000)
@@ -86,6 +88,7 @@ class test_file_source_sink(gr_unittest.TestCase):
result_data = snk2.data()
self.assertFloatTuplesAlmostEqual(expected_result, result_data)
+ self.assertEqual(len(snk2.tags()), 0)
def test_file_source_can_seek_after_open(self):
src_data = range(1000)
@@ -101,6 +104,65 @@ class test_file_source_sink(gr_unittest.TestCase):
source = blocks.file_source(gr.sizeof_float, temp.name)
self.assertTrue(source.seek(0, os.SEEK_SET))
+ def test_begin_tag(self):
+ src_data = range(1000)
+ expected_result = range(1000)
+
+ snk2 = blocks.vector_sink_f()
+
+ with tempfile.NamedTemporaryFile() as temp:
+ src = blocks.vector_source_f(src_data)
+ snk = blocks.file_sink(gr.sizeof_float, temp.name)
+ snk.set_unbuffered(True)
+
+ src2 = blocks.file_source(gr.sizeof_float, temp.name)
+ src2.set_begin_tag(pmt.string_to_symbol("file_begin"))
+
+ self.tb.connect(src, snk)
+ self.tb.run()
+
+ self.tb.disconnect(src, snk)
+ self.tb.connect(src2, snk2)
+ self.tb.run()
+
+ result_data = snk2.data()
+ self.assertFloatTuplesAlmostEqual(expected_result, result_data)
+ self.assertEqual(len(snk2.tags()), 1)
+
+ def test_begin_tag_repeat(self):
+ src_data = range(1000)
+ expected_result = range(1000)
+ expected_result.extend(range(1000))
+
+ snk2 = blocks.vector_sink_f()
+
+ with tempfile.NamedTemporaryFile() as temp:
+ src = blocks.vector_source_f(src_data)
+ snk = blocks.file_sink(gr.sizeof_float, temp.name)
+ snk.set_unbuffered(True)
+
+ src2 = blocks.file_source(gr.sizeof_float, temp.name, True)
+ src2.set_begin_tag(pmt.string_to_symbol("file_begin"))
+ hd = blocks.head(gr.sizeof_float, 2000)
+
+ self.tb.connect(src, snk)
+ self.tb.run()
+
+ self.tb.disconnect(src, snk)
+ self.tb.connect(src2, hd, snk2)
+ self.tb.run()
+
+ result_data = snk2.data()
+ self.assertFloatTuplesAlmostEqual(expected_result, result_data)
+ tags = snk2.tags()
+ self.assertEqual(len(tags), 2)
+ self.assertEqual(str(tags[0].key), "file_begin")
+ self.assertEqual(str(tags[0].value), "0")
+ self.assertEqual(tags[0].offset, 0)
+ self.assertEqual(str(tags[1].key), "file_begin")
+ self.assertEqual(str(tags[1].value), "1")
+ self.assertEqual(tags[1].offset, 1000)
+
if __name__ == '__main__':
gr_unittest.run(test_file_source_sink, "test_file_source_sink.xml")
diff --git a/gr-blocks/python/blocks/qa_socket_pdu.py b/gr-blocks/python/blocks/qa_socket_pdu.py
index db9f53c71e..5285585340 100755
--- a/gr-blocks/python/blocks/qa_socket_pdu.py
+++ b/gr-blocks/python/blocks/qa_socket_pdu.py
@@ -100,6 +100,34 @@ class qa_socket_pdu (gr_unittest.TestCase):
#self.tb.connect(pdu_to_ts, head, sink)
self.tb.run()
+ def test_004 (self):
+ # Test that the TCP server can stream PDUs <= the MTU size.
+ port = str(random.Random().randint(0, 30000) + 10000)
+ mtu = 10000
+ srcdata = tuple([x % 256 for x in xrange(mtu)])
+ data = pmt.init_u8vector(srcdata.__len__(), srcdata)
+ pdu_msg = pmt.cons(pmt.PMT_NIL, data)
+
+ self.pdu_source = blocks.message_strobe(pdu_msg, 500)
+ self.pdu_send = blocks.socket_pdu("TCP_SERVER", "localhost", port, mtu)
+ self.pdu_recv = blocks.socket_pdu("TCP_CLIENT", "localhost", port, mtu)
+ self.pdu_sink = blocks.message_debug()
+
+ self.tb.msg_connect(self.pdu_source, "strobe", self.pdu_send, "pdus")
+ self.tb.msg_connect(self.pdu_recv, "pdus", self.pdu_sink, "store")
+
+ self.tb.start()
+ time.sleep(1)
+ self.tb.stop()
+ self.tb.wait()
+
+ received = self.pdu_sink.get_message(0)
+ received_data = pmt.cdr(received)
+ msg_data = []
+ for i in xrange(mtu):
+ msg_data.append(pmt.u8vector_ref(received_data, i))
+ self.assertEqual(srcdata, tuple(msg_data))
+
if __name__ == '__main__':
gr_unittest.run(qa_socket_pdu, "qa_socket_pdu.xml")
diff --git a/gr-blocks/python/blocks/qa_tag_gate.py b/gr-blocks/python/blocks/qa_tag_gate.py
index acb2c68a82..7ae676562e 100755
--- a/gr-blocks/python/blocks/qa_tag_gate.py
+++ b/gr-blocks/python/blocks/qa_tag_gate.py
@@ -40,9 +40,84 @@ class qa_tag_gate (gr_unittest.TestCase):
src = blocks.vector_source_f(range(20), False, 1, (tag,))
gate = blocks.tag_gate(gr.sizeof_float, False)
sink = blocks.vector_sink_f()
+ self.tb.connect(src, gate, sink)
self.tb.run ()
self.assertEqual(len(sink.tags()), 0)
+ def test_002_t (self):
+ tags = []
+ tags.append(gr.tag_t())
+ tags[0].key = pmt.string_to_symbol('key')
+ tags[0].value = pmt.from_long(42)
+ tags[0].offset = 0
+ tags.append(gr.tag_t())
+ tags[1].key = pmt.string_to_symbol('key')
+ tags[1].value = pmt.from_long(42)
+ tags[1].offset = 5
+ tags.append(gr.tag_t())
+ tags[2].key = pmt.string_to_symbol('secondkey')
+ tags[2].value = pmt.from_long(42)
+ tags[2].offset = 6
+ src = blocks.vector_source_f(range(20), False, 1, tags)
+ gate = blocks.tag_gate(gr.sizeof_float, False)
+ gate.set_single_key("key")
+ self.assertEqual(gate.single_key(),"key")
+ sink = blocks.vector_sink_f()
+ self.tb.connect(src, gate, sink)
+ self.tb.run ()
+ self.assertEqual(len(sink.tags()), 1)
+
+ def test_003_t (self):
+ tags = []
+ tags.append(gr.tag_t())
+ tags[0].key = pmt.string_to_symbol('key')
+ tags[0].value = pmt.from_long(42)
+ tags[0].offset = 0
+ tags.append(gr.tag_t())
+ tags[1].key = pmt.string_to_symbol('key')
+ tags[1].value = pmt.from_long(42)
+ tags[1].offset = 5
+ tags.append(gr.tag_t())
+ tags[2].key = pmt.string_to_symbol('secondkey')
+ tags[2].value = pmt.from_long(42)
+ tags[2].offset = 6
+ src = blocks.vector_source_f(range(20), False, 1, tags)
+ gate = blocks.tag_gate(gr.sizeof_float, True)
+ gate.set_single_key("key")
+ sink = blocks.vector_sink_f()
+ self.tb.connect(src, gate, sink)
+ self.tb.run ()
+ self.assertEqual(len(sink.tags()), 3)
+
+ def test_004_t (self):
+ tags = []
+ tags.append(gr.tag_t())
+ tags[0].key = pmt.string_to_symbol('key')
+ tags[0].value = pmt.from_long(42)
+ tags[0].offset = 0
+ tags.append(gr.tag_t())
+ tags[1].key = pmt.string_to_symbol('key')
+ tags[1].value = pmt.from_long(42)
+ tags[1].offset = 5
+ tags.append(gr.tag_t())
+ tags[2].key = pmt.string_to_symbol('secondkey')
+ tags[2].value = pmt.from_long(42)
+ tags[2].offset = 6
+ src = blocks.vector_source_f(range(20), False, 1, tags)
+ gate = blocks.tag_gate(gr.sizeof_float, True)
+ sink = blocks.vector_sink_f()
+ self.tb.connect(src, gate, sink)
+ self.tb.run ()
+ self.assertEqual(len(sink.tags()), 3)
+
+ def test_005_t (self):
+ gate = blocks.tag_gate(gr.sizeof_float, True)
+ self.assertEqual(gate.single_key(), "")
+ gate.set_single_key("the_key")
+ self.assertEqual(gate.single_key(), "the_key")
+ gate.set_single_key("")
+ self.assertEqual(gate.single_key(), "")
+
if __name__ == '__main__':
gr_unittest.run(qa_tag_gate, "qa_tag_gate.xml")
diff --git a/gr-blocks/python/blocks/qa_tag_share.py b/gr-blocks/python/blocks/qa_tag_share.py
new file mode 100755
index 0000000000..3fff02ff66
--- /dev/null
+++ b/gr-blocks/python/blocks/qa_tag_share.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2017 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+from gnuradio import blocks
+import pmt
+
+class qa_tag_share(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_001_t(self):
+ # Constants
+ tag_key = 'in1_tag'
+ tag_value = 0
+ tag_offset = 0
+ in0_value = 1.0+1.0j
+ in1_value = 2.717
+ in0_data = (in0_value,)*10
+ in1_data = (in1_value,)*10
+ sink_data = in0_data
+
+ tag = gr.tag_t()
+ tag.key = pmt.to_pmt(tag_key)
+ tag.value = pmt.to_pmt(tag_value)
+ tag.offset = tag_offset
+
+ # Only tag Input 1 of the share block and see if it transfers
+ # to Output 0. Also verify that Input 0 stream is propagated to
+ # Output 0.
+ in0 = blocks.vector_source_c(in0_data, False, 1)
+ in1 = blocks.vector_source_f(in1_data, False, 1, (tag,))
+ tag_share = blocks.tag_share(gr.sizeof_gr_complex, gr.sizeof_float)
+ sink = blocks.vector_sink_c(1)
+
+ self.tb.connect(in0, (tag_share,0))
+ self.tb.connect(in1, (tag_share,1))
+ self.tb.connect(tag_share, sink)
+ self.tb.run()
+
+ self.assertEqual(len(sink.tags()), 1)
+ self.assertEqual(pmt.to_python(sink.tags()[0].key), tag_key)
+ self.assertEqual(pmt.to_python(sink.tags()[0].value), tag_value)
+ self.assertEqual(sink.tags()[0].offset, tag_offset)
+ self.assertEqual(sink.data(), sink_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_tag_share, 'qa_tag_share.xml')