# # Copyright 2011-2013 Free Software Foundation, Inc. # # This file is part of GNU Radio # # GNU Radio is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3, or (at your option) # any later version. # # GNU Radio is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with GNU Radio; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. # from gnuradio import gr, gr_unittest import pmt import numpy import blocks_swig as blocks class add_2_f32_1_f32(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, name = "add 2 f32", in_sig = [numpy.float32, numpy.float32], out_sig = [numpy.float32], ) def work(self, input_items, output_items): output_items[0][:] = input_items[0] + input_items[1] return len(output_items[0]) class add_2_fc32_1_fc32(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, name = "add 2 fc32", in_sig = [numpy.complex64, numpy.complex64], out_sig = [numpy.complex64], ) def work(self, input_items, output_items): output_items[0][:] = input_items[0] + input_items[1] return len(output_items[0]) class convolve(gr.sync_block): """ A demonstration using block history to properly perform a convolution. """ def __init__(self): gr.sync_block.__init__( self, name = "convolve", in_sig = [numpy.float32], out_sig = [numpy.float32] ) self._taps = [1, 0, 0, 0] self.set_history(len(self._taps)) def work(self, input_items, output_items): output_items[0][:] = numpy.convolve(input_items[0], self._taps, mode='valid') return len(output_items[0]) class decim2x(gr.decim_block): def __init__(self): gr.decim_block.__init__( self, name = "decim2x", in_sig = [numpy.float32], out_sig = [numpy.float32], decim = 2 ) def work(self, input_items, output_items): output_items[0][:] = input_items[0][::2] return len(output_items[0]) class interp2x(gr.interp_block): def __init__(self): gr.interp_block.__init__( self, name = "interp2x", in_sig = [numpy.float32], out_sig = [numpy.float32], interp = 2 ) def work(self, input_items, output_items): output_items[0][1::2] = input_items[0] output_items[0][::2] = input_items[0] return len(output_items[0]) class tag_source(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, name = "tag source", in_sig = None, out_sig = [numpy.float32], ) def work(self, input_items, output_items): num_output_items = len(output_items[0]) #put code here to fill the output items... #make a new tag on the middle element every time work is called count = self.nitems_written(0) + num_output_items/2 key = pmt.string_to_symbol("example_key") value = pmt.string_to_symbol("example_value") self.add_item_tag(0, count, key, value) return num_output_items class tag_sink(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, name = "tag sink", in_sig = [numpy.float32], out_sig = None, ) self.key = None def work(self, input_items, output_items): num_input_items = len(input_items[0]) #put code here to process the input items... #print all the tags received in this work call nread = self.nitems_read(0) tags = self.get_tags_in_range(0, nread, nread+num_input_items) for tag in tags: #print tag.offset #print pmt.symbol_to_string(tag.key) #print pmt.symbol_to_string(tag.value) self.key = pmt.symbol_to_string(tag.key) return num_input_items class fc32_to_f32_2(gr.sync_block): def __init__(self): gr.sync_block.__init__( self, name = "fc32_to_f32_2", in_sig = [numpy.complex64], out_sig = [(numpy.float32, 2)], ) def work(self, input_items, output_items): output_items[0][::,0] = numpy.real(input_items[0]) output_items[0][::,1] = numpy.imag(input_items[0]) return len(output_items[0]) class vector_to_stream(gr.interp_block): def __init__(self, itemsize, nitems_per_block): gr.interp_block.__init__( self, name = "vector_to_stream", in_sig = [(itemsize, nitems_per_block)], out_sig = [itemsize], interp = nitems_per_block ) self.block_size = nitems_per_block def work(self, input_items, output_items): n = 0 for i in xrange(len(input_items[0])): for j in xrange(self.block_size): output_items[0][n] = input_items[0][i][j] n += 1 return len(output_items[0]) class test_block_gateway(gr_unittest.TestCase): def test_add_f32(self): tb = gr.top_block() src0 = blocks.vector_source_f([1, 3, 5, 7, 9], False) src1 = blocks.vector_source_f([0, 2, 4, 6, 8], False) adder = add_2_f32_1_f32() sink = blocks.vector_sink_f() tb.connect((src0, 0), (adder, 0)) tb.connect((src1, 0), (adder, 1)) tb.connect(adder, sink) tb.run() self.assertEqual(sink.data(), (1, 5, 9, 13, 17)) def test_add_fc32(self): tb = gr.top_block() src0 = blocks.vector_source_c([1, 3j, 5, 7j, 9], False) src1 = blocks.vector_source_c([0, 2j, 4, 6j, 8], False) adder = add_2_fc32_1_fc32() sink = blocks.vector_sink_c() tb.connect((src0, 0), (adder, 0)) tb.connect((src1, 0), (adder, 1)) tb.connect(adder, sink) tb.run() self.assertEqual(sink.data(), (1, 5j, 9, 13j, 17)) def test_convolve(self): tb = gr.top_block() src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False) cv = convolve() sink = blocks.vector_sink_f() tb.connect(src, cv, sink) tb.run() self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8)) def test_decim2x(self): tb = gr.top_block() src = blocks.vector_source_f([1, 2, 3, 4, 5, 6, 7, 8], False) d2x = decim2x() sink = blocks.vector_sink_f() tb.connect(src, d2x, sink) tb.run() self.assertEqual(sink.data(), (1, 3, 5, 7)) def test_interp2x(self): tb = gr.top_block() src = blocks.vector_source_f([1, 3, 5, 7, 9], False) i2x = interp2x() sink = blocks.vector_sink_f() tb.connect(src, i2x, sink) tb.run() self.assertEqual(sink.data(), (1, 1, 3, 3, 5, 5, 7, 7, 9, 9)) def test_tags(self): src = tag_source() sink = tag_sink() head = blocks.head(gr.sizeof_float, 50000) #should be enough items to get a tag through tb = gr.top_block() tb.connect(src, head, sink) tb.run() self.assertEqual(sink.key, "example_key") def test_fc32_to_f32_2(self): tb = gr.top_block() src = blocks.vector_source_c([1+2j, 3+4j, 5+6j, 7+8j, 9+10j], False) convert = fc32_to_f32_2() v2s = vector_to_stream(numpy.float32, 2) sink = blocks.vector_sink_f() tb.connect(src, convert, v2s, sink) tb.run() self.assertEqual(sink.data(), (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) if __name__ == '__main__': gr_unittest.run(test_block_gateway, "test_block_gateway.xml")