summaryrefslogtreecommitdiff
path: root/gr-blocks/python/qa_block_gateway.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-blocks/python/qa_block_gateway.py')
-rw-r--r--gr-blocks/python/qa_block_gateway.py256
1 files changed, 256 insertions, 0 deletions
diff --git a/gr-blocks/python/qa_block_gateway.py b/gr-blocks/python/qa_block_gateway.py
new file mode 100644
index 0000000000..20a2660cba
--- /dev/null
+++ b/gr-blocks/python/qa_block_gateway.py
@@ -0,0 +1,256 @@
+#
+# Copyright 2011-2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import pmt
+import numpy
+import blocks_swig as blocks
+
+class add_2_f32_1_f32(gr.sync_block):
+ def __init__(self):
+ gr.sync_block.__init__(
+ self,
+ name = "add 2 f32",
+ in_sig = [numpy.float32, numpy.float32],
+ out_sig = [numpy.float32],
+ )
+
+ def work(self, input_items, output_items):
+ output_items[0][:] = input_items[0] + input_items[1]
+ return len(output_items[0])
+
+class add_2_fc32_1_fc32(gr.sync_block):
+ def __init__(self):
+ gr.sync_block.__init__(
+ self,
+ name = "add 2 fc32",
+ in_sig = [numpy.complex64, numpy.complex64],
+ out_sig = [numpy.complex64],
+ )
+
+ def work(self, input_items, output_items):
+ output_items[0][:] = input_items[0] + input_items[1]
+ return len(output_items[0])
+
+class convolve(gr.sync_block):
+ """
+ A demonstration using block history to properly perform a convolution.
+ """
+ def __init__(self):
+ gr.sync_block.__init__(
+ self,
+ name = "convolve",
+ in_sig = [numpy.float32],
+ out_sig = [numpy.float32]
+ )
+ self._taps = [1, 0, 0, 0]
+ self.set_history(len(self._taps))
+
+ def work(self, input_items, output_items):
+ output_items[0][:] = numpy.convolve(input_items[0], self._taps, mode='valid')
+ return len(output_items[0])
+
+class decim2x(gr.decim_block):
+ def __init__(self):
+ gr.decim_block.__init__(
+ self,
+ name = "decim2x",
+ in_sig = [numpy.float32],
+ out_sig = [numpy.float32],
+ decim = 2
+ )
+
+ def work(self, input_items, output_items):
+ output_items[0][:] = input_items[0][::2]
+ return len(output_items[0])
+
+class interp2x(gr.interp_block):
+ def __init__(self):
+ gr.interp_block.__init__(
+ self,
+ name = "interp2x",
+ in_sig = [numpy.float32],
+ out_sig = [numpy.float32],
+ interp = 2
+ )
+
+ def work(self, input_items, output_items):
+ output_items[0][1::2] = input_items[0]
+ output_items[0][::2] = input_items[0]
+ return len(output_items[0])
+
+class tag_source(gr.sync_block):
+ def __init__(self):
+ gr.sync_block.__init__(
+ self,
+ name = "tag source",
+ in_sig = None,
+ out_sig = [numpy.float32],
+ )
+
+ def work(self, input_items, output_items):
+ num_output_items = len(output_items[0])
+
+ #put code here to fill the output items...
+
+ #make a new tag on the middle element every time work is called
+ count = self.nitems_written(0) + num_output_items/2
+ key = pmt.string_to_symbol("example_key")
+ value = pmt.string_to_symbol("example_value")
+ self.add_item_tag(0, count, key, value)
+
+ return num_output_items
+
+class tag_sink(gr.sync_block):
+ def __init__(self):
+ gr.sync_block.__init__(
+ self,
+ name = "tag sink",
+ in_sig = [numpy.float32],
+ out_sig = None,
+ )
+ self.key = None
+
+ def work(self, input_items, output_items):
+ num_input_items = len(input_items[0])
+
+ #put code here to process the input items...
+
+ #print all the tags received in this work call
+ nread = self.nitems_read(0)
+ tags = self.get_tags_in_range(0, nread, nread+num_input_items)
+ for tag in tags:
+ #print tag.offset
+ #print pmt.symbol_to_string(tag.key)
+ #print pmt.symbol_to_string(tag.value)
+ self.key = pmt.symbol_to_string(tag.key)
+
+ return num_input_items
+
+class fc32_to_f32_2(gr.sync_block):
+ def __init__(self):
+ gr.sync_block.__init__(
+ self,
+ name = "fc32_to_f32_2",
+ in_sig = [numpy.complex64],
+ out_sig = [(numpy.float32, 2)],
+ )
+
+ def work(self, input_items, output_items):
+ output_items[0][::,0] = numpy.real(input_items[0])
+ output_items[0][::,1] = numpy.imag(input_items[0])
+ return len(output_items[0])
+
+class vector_to_stream(gr.interp_block):
+ def __init__(self, itemsize, nitems_per_block):
+ gr.interp_block.__init__(
+ self,
+ name = "vector_to_stream",
+ in_sig = [(itemsize, nitems_per_block)],
+ out_sig = [itemsize],
+ interp = nitems_per_block
+ )
+ self.block_size = nitems_per_block
+
+ def work(self, input_items, output_items):
+ n = 0
+ for i in xrange(len(input_items[0])):
+ for j in xrange(self.block_size):
+ output_items[0][n] = input_items[0][i][j]
+ n += 1
+
+ return len(output_items[0])
+
+class test_block_gateway(gr_unittest.TestCase):
+
+ def test_add_f32(self):
+ tb = gr.top_block()
+ src0 = blocks.vector_source_f([1, 3, 5, 7, 9], False)
+ src1 = blocks.vector_source_f([0, 2, 4, 6, 8], False)
+ adder = add_2_f32_1_f32()
+ sink = blocks.vector_sink_f()
+ tb.connect((src0, 0), (adder, 0))
+ tb.connect((src1, 0), (adder, 1))
+ tb.connect(adder, sink)
+ tb.run()
+ self.assertEqual(sink.data(), (1, 5, 9, 13, 17))
+
+ def test_add_fc32(self):
+ tb = gr.top_block()
+ src0 = blocks.vector_source_c([1, 3j, 5, 7j, 9], False)
+ src1 = blocks.vector_source_c([0, 2j, 4, 6j, 8], False)
+ adder = add_2_fc32_1_fc32()
+ sink = blocks.vector_sink_c()
+ tb.connect((src0, 0), (adder, 0))
+ tb.connect((src1, 0), (adder, 1))
+ tb.connect(adder, sink)
+ tb.run()
+ self.assertEqual(sink.data(), (1, 5j, 9, 13j, 17))
+
+ def test_convolve(self):
+ tb = gr.top_block()
+ src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False)
+ cv = convolve()
+ sink = blocks.vector_sink_f()
+ tb.connect(src, cv, sink)
+ tb.run()
+ self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8))
+
+ def test_decim2x(self):
+ tb = gr.top_block()
+ src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False)
+ d2x = decim2x()
+ sink = blocks.vector_sink_f()
+ tb.connect(src, d2x, sink)
+ tb.run()
+ self.assertEqual(sink.data(), (1, 3, 5, 7))
+
+ def test_interp2x(self):
+ tb = gr.top_block()
+ src = blocks.vector_source_f([1, 3, 5, 7, 9], False)
+ i2x = interp2x()
+ sink = blocks.vector_sink_f()
+ tb.connect(src, i2x, sink)
+ tb.run()
+ self.assertEqual(sink.data(), (1, 1, 3, 3, 5, 5, 7, 7, 9, 9))
+
+ def test_tags(self):
+ src = tag_source()
+ sink = tag_sink()
+ head = blocks.head(gr.sizeof_float, 50000) #should be enough items to get a tag through
+ tb = gr.top_block()
+ tb.connect(src, head, sink)
+ tb.run()
+ self.assertEqual(sink.key, "example_key")
+
+ def test_fc32_to_f32_2(self):
+ tb = gr.top_block()
+ src = blocks.vector_source_c([1+2j, 3+4j, 5+6j, 7+8j, 9+10j], False)
+ convert = fc32_to_f32_2()
+ v2s = vector_to_stream(numpy.float32, 2)
+ sink = blocks.vector_sink_f()
+ tb.connect(src, convert, v2s, sink)
+ tb.run()
+ self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
+
+if __name__ == '__main__':
+ gr_unittest.run(test_block_gateway, "test_block_gateway.xml")
+