From d4f6b86a9bdea09c2c158b9982559a727f8c6a0b Mon Sep 17 00:00:00 2001
From: Tom Rondeau <trondeau@vt.edu>
Date: Sun, 17 Mar 2013 12:24:38 -0400
Subject: blocks: converting references to vector source/sink, null
 source/sink, nop, copy, head, skiphead, vector_map, and annotator blocks to
 use gr-blocks.

---
 gr-blocks/python/qa_block_gateway.py | 256 +++++++++++++++++++++++++++++++++++
 1 file changed, 256 insertions(+)
 create mode 100644 gr-blocks/python/qa_block_gateway.py

(limited to 'gr-blocks/python/qa_block_gateway.py')

diff --git a/gr-blocks/python/qa_block_gateway.py b/gr-blocks/python/qa_block_gateway.py
new file mode 100644
index 0000000000..20a2660cba
--- /dev/null
+++ b/gr-blocks/python/qa_block_gateway.py
@@ -0,0 +1,256 @@
+#
+# Copyright 2011-2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING.  If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import pmt
+import numpy
+import blocks_swig as blocks
+
+class add_2_f32_1_f32(gr.sync_block):
+    def __init__(self):
+        gr.sync_block.__init__(
+            self,
+            name = "add 2 f32",
+            in_sig = [numpy.float32, numpy.float32],
+            out_sig = [numpy.float32],
+        )
+
+    def work(self, input_items, output_items):
+        output_items[0][:] = input_items[0] + input_items[1]
+        return len(output_items[0])
+
+class add_2_fc32_1_fc32(gr.sync_block):
+    def __init__(self):
+        gr.sync_block.__init__(
+            self,
+            name = "add 2 fc32",
+            in_sig = [numpy.complex64, numpy.complex64],
+            out_sig = [numpy.complex64],
+        )
+
+    def work(self, input_items, output_items):
+        output_items[0][:] = input_items[0] + input_items[1]
+        return len(output_items[0])
+
+class convolve(gr.sync_block):
+    """
+    A demonstration using block history to properly perform a convolution.
+    """
+    def __init__(self):
+        gr.sync_block.__init__(
+            self,
+            name = "convolve",
+            in_sig = [numpy.float32],
+            out_sig = [numpy.float32]
+        )
+        self._taps = [1, 0, 0, 0]
+        self.set_history(len(self._taps))
+
+    def work(self, input_items, output_items):
+        output_items[0][:] = numpy.convolve(input_items[0], self._taps, mode='valid')
+        return len(output_items[0])
+
+class decim2x(gr.decim_block):
+    def __init__(self):
+        gr.decim_block.__init__(
+            self,
+            name = "decim2x",
+            in_sig = [numpy.float32],
+            out_sig = [numpy.float32],
+            decim = 2
+        )
+
+    def work(self, input_items, output_items):
+        output_items[0][:] = input_items[0][::2]
+        return len(output_items[0])
+
+class interp2x(gr.interp_block):
+    def __init__(self):
+        gr.interp_block.__init__(
+            self,
+            name = "interp2x",
+            in_sig = [numpy.float32],
+            out_sig = [numpy.float32],
+            interp = 2
+        )
+
+    def work(self, input_items, output_items):
+        output_items[0][1::2] = input_items[0]
+        output_items[0][::2] = input_items[0]
+        return len(output_items[0])
+
+class tag_source(gr.sync_block):
+    def __init__(self):
+        gr.sync_block.__init__(
+            self,
+            name = "tag source",
+            in_sig = None,
+            out_sig = [numpy.float32],
+        )
+
+    def work(self, input_items, output_items):
+        num_output_items = len(output_items[0])
+
+        #put code here to fill the output items...
+
+        #make a new tag on the middle element every time work is called
+        count = self.nitems_written(0) + num_output_items/2
+        key = pmt.string_to_symbol("example_key")
+        value = pmt.string_to_symbol("example_value")
+        self.add_item_tag(0, count, key, value)
+
+        return num_output_items
+
+class tag_sink(gr.sync_block):
+    def __init__(self):
+        gr.sync_block.__init__(
+            self,
+            name = "tag sink",
+            in_sig = [numpy.float32],
+            out_sig = None,
+        )
+        self.key = None
+
+    def work(self, input_items, output_items):
+        num_input_items = len(input_items[0])
+
+        #put code here to process the input items...
+
+        #print all the tags received in this work call
+        nread = self.nitems_read(0)
+        tags = self.get_tags_in_range(0, nread, nread+num_input_items)
+        for tag in tags:
+            #print tag.offset
+            #print pmt.symbol_to_string(tag.key)
+            #print pmt.symbol_to_string(tag.value)
+            self.key = pmt.symbol_to_string(tag.key)
+
+        return num_input_items
+
+class fc32_to_f32_2(gr.sync_block):
+    def __init__(self):
+        gr.sync_block.__init__(
+            self,
+            name = "fc32_to_f32_2",
+            in_sig = [numpy.complex64],
+            out_sig = [(numpy.float32, 2)],
+        )
+
+    def work(self, input_items, output_items):
+        output_items[0][::,0] = numpy.real(input_items[0])
+        output_items[0][::,1] = numpy.imag(input_items[0])
+        return len(output_items[0])
+
+class vector_to_stream(gr.interp_block):
+    def __init__(self, itemsize, nitems_per_block):
+        gr.interp_block.__init__(
+            self,
+            name = "vector_to_stream",
+            in_sig = [(itemsize, nitems_per_block)],
+            out_sig = [itemsize],
+            interp = nitems_per_block
+        )
+        self.block_size = nitems_per_block
+
+    def work(self, input_items, output_items):
+        n = 0
+        for i in xrange(len(input_items[0])):
+            for j in xrange(self.block_size):
+                output_items[0][n] = input_items[0][i][j]
+                n += 1
+      
+        return len(output_items[0])
+
+class test_block_gateway(gr_unittest.TestCase):
+
+    def test_add_f32(self):
+        tb = gr.top_block()
+        src0 = blocks.vector_source_f([1, 3, 5, 7, 9], False)
+        src1 = blocks.vector_source_f([0, 2, 4, 6, 8], False)
+        adder = add_2_f32_1_f32()
+        sink = blocks.vector_sink_f()
+        tb.connect((src0, 0), (adder, 0))
+        tb.connect((src1, 0), (adder, 1))
+        tb.connect(adder, sink)
+        tb.run()
+        self.assertEqual(sink.data(), (1, 5, 9, 13, 17))
+
+    def test_add_fc32(self):
+        tb = gr.top_block()
+        src0 = blocks.vector_source_c([1, 3j, 5, 7j, 9], False)
+        src1 = blocks.vector_source_c([0, 2j, 4, 6j, 8], False)
+        adder = add_2_fc32_1_fc32()
+        sink = blocks.vector_sink_c()
+        tb.connect((src0, 0), (adder, 0))
+        tb.connect((src1, 0), (adder, 1))
+        tb.connect(adder, sink)
+        tb.run()
+        self.assertEqual(sink.data(), (1, 5j, 9, 13j, 17))
+
+    def test_convolve(self):
+        tb = gr.top_block()
+        src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False)
+        cv = convolve()
+        sink = blocks.vector_sink_f()
+        tb.connect(src, cv, sink)
+        tb.run()
+        self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8))
+
+    def test_decim2x(self):
+        tb = gr.top_block()
+        src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False)
+        d2x = decim2x()
+        sink = blocks.vector_sink_f()
+        tb.connect(src, d2x, sink)
+        tb.run()
+        self.assertEqual(sink.data(), (1, 3, 5, 7))
+
+    def test_interp2x(self):
+        tb = gr.top_block()
+        src = blocks.vector_source_f([1, 3, 5, 7, 9], False)
+        i2x = interp2x()
+        sink = blocks.vector_sink_f()
+        tb.connect(src, i2x, sink)
+        tb.run()
+        self.assertEqual(sink.data(), (1, 1, 3, 3, 5, 5, 7, 7, 9, 9))
+
+    def test_tags(self):
+        src = tag_source()
+        sink = tag_sink()
+        head = blocks.head(gr.sizeof_float, 50000) #should be enough items to get a tag through
+        tb = gr.top_block()
+        tb.connect(src, head, sink)
+        tb.run()
+        self.assertEqual(sink.key, "example_key")
+
+    def test_fc32_to_f32_2(self):
+        tb = gr.top_block()
+        src = blocks.vector_source_c([1+2j, 3+4j, 5+6j, 7+8j, 9+10j], False)
+        convert = fc32_to_f32_2()
+        v2s = vector_to_stream(numpy.float32, 2)
+        sink = blocks.vector_sink_f()
+        tb.connect(src, convert, v2s, sink)
+        tb.run()
+        self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
+
+if __name__ == '__main__':
+    gr_unittest.run(test_block_gateway, "test_block_gateway.xml")
+
-- 
cgit v1.2.3