diff options
author | Ben Reynwar <ben@reynwar.net> | 2013-06-04 10:59:29 -0700 |
---|---|---|
committer | Ben Reynwar <ben@reynwar.net> | 2013-06-04 11:55:00 -0700 |
commit | 48f3452530b29627ce57089b65363776dbe9a179 (patch) | |
tree | a2005c23b427415b9d86bcf256c49b3480f61427 /gr-blocks/python/qa_block_gateway.py | |
parent | ba0a9afedcd5451f2032004e9d583ed78d74ece2 (diff) |
uninstalled import: Updatings blocks, fec, uhd, and filter so that uninstalled import works with recent changes.
Diffstat (limited to 'gr-blocks/python/qa_block_gateway.py')
-rw-r--r-- | gr-blocks/python/qa_block_gateway.py | 256 |
1 files changed, 0 insertions, 256 deletions
diff --git a/gr-blocks/python/qa_block_gateway.py b/gr-blocks/python/qa_block_gateway.py deleted file mode 100644 index 20a2660cba..0000000000 --- a/gr-blocks/python/qa_block_gateway.py +++ /dev/null @@ -1,256 +0,0 @@ -# -# Copyright 2011-2013 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. -# - -from gnuradio import gr, gr_unittest -import pmt -import numpy -import blocks_swig as blocks - -class add_2_f32_1_f32(gr.sync_block): - def __init__(self): - gr.sync_block.__init__( - self, - name = "add 2 f32", - in_sig = [numpy.float32, numpy.float32], - out_sig = [numpy.float32], - ) - - def work(self, input_items, output_items): - output_items[0][:] = input_items[0] + input_items[1] - return len(output_items[0]) - -class add_2_fc32_1_fc32(gr.sync_block): - def __init__(self): - gr.sync_block.__init__( - self, - name = "add 2 fc32", - in_sig = [numpy.complex64, numpy.complex64], - out_sig = [numpy.complex64], - ) - - def work(self, input_items, output_items): - output_items[0][:] = input_items[0] + input_items[1] - return len(output_items[0]) - -class convolve(gr.sync_block): - """ - A demonstration using block history to properly perform a convolution. - """ - def __init__(self): - gr.sync_block.__init__( - self, - name = "convolve", - in_sig = [numpy.float32], - out_sig = [numpy.float32] - ) - self._taps = [1, 0, 0, 0] - self.set_history(len(self._taps)) - - def work(self, input_items, output_items): - output_items[0][:] = numpy.convolve(input_items[0], self._taps, mode='valid') - return len(output_items[0]) - -class decim2x(gr.decim_block): - def __init__(self): - gr.decim_block.__init__( - self, - name = "decim2x", - in_sig = [numpy.float32], - out_sig = [numpy.float32], - decim = 2 - ) - - def work(self, input_items, output_items): - output_items[0][:] = input_items[0][::2] - return len(output_items[0]) - -class interp2x(gr.interp_block): - def __init__(self): - gr.interp_block.__init__( - self, - name = "interp2x", - in_sig = [numpy.float32], - out_sig = [numpy.float32], - interp = 2 - ) - - def work(self, input_items, output_items): - output_items[0][1::2] = input_items[0] - output_items[0][::2] = input_items[0] - return len(output_items[0]) - -class tag_source(gr.sync_block): - def __init__(self): - gr.sync_block.__init__( - self, - name = "tag source", - in_sig = None, - out_sig = [numpy.float32], - ) - - def work(self, input_items, output_items): - num_output_items = len(output_items[0]) - - #put code here to fill the output items... - - #make a new tag on the middle element every time work is called - count = self.nitems_written(0) + num_output_items/2 - key = pmt.string_to_symbol("example_key") - value = pmt.string_to_symbol("example_value") - self.add_item_tag(0, count, key, value) - - return num_output_items - -class tag_sink(gr.sync_block): - def __init__(self): - gr.sync_block.__init__( - self, - name = "tag sink", - in_sig = [numpy.float32], - out_sig = None, - ) - self.key = None - - def work(self, input_items, output_items): - num_input_items = len(input_items[0]) - - #put code here to process the input items... - - #print all the tags received in this work call - nread = self.nitems_read(0) - tags = self.get_tags_in_range(0, nread, nread+num_input_items) - for tag in tags: - #print tag.offset - #print pmt.symbol_to_string(tag.key) - #print pmt.symbol_to_string(tag.value) - self.key = pmt.symbol_to_string(tag.key) - - return num_input_items - -class fc32_to_f32_2(gr.sync_block): - def __init__(self): - gr.sync_block.__init__( - self, - name = "fc32_to_f32_2", - in_sig = [numpy.complex64], - out_sig = [(numpy.float32, 2)], - ) - - def work(self, input_items, output_items): - output_items[0][::,0] = numpy.real(input_items[0]) - output_items[0][::,1] = numpy.imag(input_items[0]) - return len(output_items[0]) - -class vector_to_stream(gr.interp_block): - def __init__(self, itemsize, nitems_per_block): - gr.interp_block.__init__( - self, - name = "vector_to_stream", - in_sig = [(itemsize, nitems_per_block)], - out_sig = [itemsize], - interp = nitems_per_block - ) - self.block_size = nitems_per_block - - def work(self, input_items, output_items): - n = 0 - for i in xrange(len(input_items[0])): - for j in xrange(self.block_size): - output_items[0][n] = input_items[0][i][j] - n += 1 - - return len(output_items[0]) - -class test_block_gateway(gr_unittest.TestCase): - - def test_add_f32(self): - tb = gr.top_block() - src0 = blocks.vector_source_f([1, 3, 5, 7, 9], False) - src1 = blocks.vector_source_f([0, 2, 4, 6, 8], False) - adder = add_2_f32_1_f32() - sink = blocks.vector_sink_f() - tb.connect((src0, 0), (adder, 0)) - tb.connect((src1, 0), (adder, 1)) - tb.connect(adder, sink) - tb.run() - self.assertEqual(sink.data(), (1, 5, 9, 13, 17)) - - def test_add_fc32(self): - tb = gr.top_block() - src0 = blocks.vector_source_c([1, 3j, 5, 7j, 9], False) - src1 = blocks.vector_source_c([0, 2j, 4, 6j, 8], False) - adder = add_2_fc32_1_fc32() - sink = blocks.vector_sink_c() - tb.connect((src0, 0), (adder, 0)) - tb.connect((src1, 0), (adder, 1)) - tb.connect(adder, sink) - tb.run() - self.assertEqual(sink.data(), (1, 5j, 9, 13j, 17)) - - def test_convolve(self): - tb = gr.top_block() - src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False) - cv = convolve() - sink = blocks.vector_sink_f() - tb.connect(src, cv, sink) - tb.run() - self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8)) - - def test_decim2x(self): - tb = gr.top_block() - src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False) - d2x = decim2x() - sink = blocks.vector_sink_f() - tb.connect(src, d2x, sink) - tb.run() - self.assertEqual(sink.data(), (1, 3, 5, 7)) - - def test_interp2x(self): - tb = gr.top_block() - src = blocks.vector_source_f([1, 3, 5, 7, 9], False) - i2x = interp2x() - sink = blocks.vector_sink_f() - tb.connect(src, i2x, sink) - tb.run() - self.assertEqual(sink.data(), (1, 1, 3, 3, 5, 5, 7, 7, 9, 9)) - - def test_tags(self): - src = tag_source() - sink = tag_sink() - head = blocks.head(gr.sizeof_float, 50000) #should be enough items to get a tag through - tb = gr.top_block() - tb.connect(src, head, sink) - tb.run() - self.assertEqual(sink.key, "example_key") - - def test_fc32_to_f32_2(self): - tb = gr.top_block() - src = blocks.vector_source_c([1+2j, 3+4j, 5+6j, 7+8j, 9+10j], False) - convert = fc32_to_f32_2() - v2s = vector_to_stream(numpy.float32, 2) - sink = blocks.vector_sink_f() - tb.connect(src, convert, v2s, sink) - tb.run() - self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) - -if __name__ == '__main__': - gr_unittest.run(test_block_gateway, "test_block_gateway.xml") - |