diff options
Diffstat (limited to 'gr-blocks/python')
-rwxr-xr-x | gr-blocks/python/blocks/qa_add_mult_div_sub.py | 72 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_max.py | 118 | ||||
-rw-r--r-- | gr-blocks/python/blocks/qa_min.py | 178 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_multiply_matrix_ff.py | 172 |
4 files changed, 538 insertions, 2 deletions
diff --git a/gr-blocks/python/blocks/qa_add_mult_div_sub.py b/gr-blocks/python/blocks/qa_add_mult_div_sub.py index 7c9b2beac0..8699b3a085 100755 --- a/gr-blocks/python/blocks/qa_add_mult_div_sub.py +++ b/gr-blocks/python/blocks/qa_add_mult_div_sub.py @@ -200,6 +200,78 @@ class test_add_mult_div_sub(gr_unittest.TestCase): self.help_ii((src1_data, src2_data), expected_result, op) + def test_sub_ii1(self): + src1_data = (1, 2, 3, 4, 5) + expected_result = (1, 2, 3, 4, 5) + + src = blocks.vector_source_i(src1_data) + op = blocks.sub_ii() + dst = blocks.vector_sink_i() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def test_sub_ss(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (-7, 5, -1, -4, 3) + op = blocks.sub_ss() + self.help_ss((src1_data, src2_data), + expected_result, op) + + def test_sub_ss1(self): + src1_data = (1, 2, 3, 4, 5) + expected_result = (1, 2, 3, 4, 5) + + src = blocks.vector_source_s(src1_data) + op = blocks.sub_ss() + dst = blocks.vector_sink_s() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def test_sub_ff(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (-7, 5, -1, -4, 3) + op = blocks.sub_ff() + self.help_ff((src1_data, src2_data), + expected_result, op) + + def test_sub_ff1(self): + src1_data = (1, 2, 3, 4, 5) + expected_result = (1, 2, 3, 4, 5) + + src = blocks.vector_source_f(src1_data) + op = blocks.sub_ff() + dst = blocks.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def test_sub_cc(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (-7, 5, -1, -4, 3) + op = blocks.sub_cc() + self.help_cc((src1_data, src2_data), + expected_result, op) + + def test_sub_cc1(self): + src1_data = (1, 2, 3, 4, 5) + expected_result = (1, 2, 3, 4, 5) + + src = blocks.vector_source_c(src1_data) + op = blocks.sub_cc() + dst = blocks.vector_sink_c() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + def test_div_ff(self): src1_data = ( 5, 9, -15, 1024) src2_data = (10, 3, -5, 64) diff --git a/gr-blocks/python/blocks/qa_max.py b/gr-blocks/python/blocks/qa_max.py index 441582ac88..709dbee72e 100755 --- a/gr-blocks/python/blocks/qa_max.py +++ b/gr-blocks/python/blocks/qa_max.py @@ -46,7 +46,7 @@ class test_max(gr_unittest.TestCase): result_data = dst.data() self.assertEqual(expected_result, result_data) - def test_002(self): + def stest_002(self): src_data=(-100,-99,-98,-97,-96,-1) expected_result = (float(max(src_data)),) @@ -60,6 +60,120 @@ class test_max(gr_unittest.TestCase): result_data = dst.data() self.assertEqual(expected_result, result_data) + def stest_003(self): + src_data0 = (0, 2, -3, 0, 12, 0) + src_data1 = (1, 1, 1, 1, 1, 1) + + expected_result = [float(max(x,y)) for x,y in zip(src_data0, src_data1)] + + src0 = blocks.vector_source_f(src_data0) + src1 = blocks.vector_source_f(src_data1) + op = blocks.max_ff(1) + dst = blocks.vector_sink_f() + + self.tb.connect(src0, (op, 0)) + self.tb.connect(src1, (op, 1)) + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def stest_004(self): + dim = 2 + src_data0 = (0, 2, -3, 0, 12, 0) + src_data1 = (1, 1, 1, 1, 1, 1) + + expected_data = [] + tmp = [float(max(x,y)) for x,y in zip(src_data0, src_data1)] + for i in xrange(len(tmp)/dim): + expected_data.append(float(max(tmp[i*dim:(i+1)*dim]))) + + src0 = blocks.vector_source_f(src_data0) + s2v0 = blocks.stream_to_vector(gr.sizeof_float,dim) + src1 = blocks.vector_source_f(src_data1) + s2v1 = blocks.stream_to_vector(gr.sizeof_float,dim) + op = blocks.max_ff(dim) + dst = blocks.vector_sink_f() + + self.tb.connect(src0, s2v0, (op, 0)) + self.tb.connect(src1, s2v1, (op, 1)) + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + + def stest_s001(self): + src_data = (0, 2, -3, 0, 12, 0) + expected_result = (max(src_data),) + + src = blocks.vector_source_s(src_data) + s2v = blocks.stream_to_vector(gr.sizeof_short,len(src_data)) + op = blocks.max_ss(len(src_data)) + dst = blocks.vector_sink_s() + + self.tb.connect(src, s2v, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def stest_s002(self): + src_data=(-100,-99,-98,-97,-96,-1) + expected_result = (max(src_data),) + + src = blocks.vector_source_s(src_data) + s2v = blocks.stream_to_vector(gr.sizeof_short, len(src_data)) + op = blocks.max_ss(len(src_data)) + dst = blocks.vector_sink_s() + + self.tb.connect(src, s2v, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + + def stest_s003(self): + src_data0 = (0, 2, -3, 0, 12, 0) + src_data1 = (1, 1, 1, 1, 1, 1) + + expected_result = [max(x,y) for x,y in zip(src_data0, src_data1)] + + src0 = blocks.vector_source_s(src_data0) + src1 = blocks.vector_source_s(src_data1) + op = blocks.max_ss(1) + dst = blocks.vector_sink_s() + + self.tb.connect(src0, (op, 0)) + self.tb.connect(src1, (op, 1)) + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def stest_s004(self): + dim = 2 + src_data0 = (0, 2, -3, 0, 12, 0) + src_data1 = (1, 1, 1, 1, 1, 1) + + expected_data = [] + tmp = [max(x,y) for x,y in zip(src_data0, src_data1)] + for i in xrange(len(tmp)/dim): + expected_data.append(max(tmp[i*dim:(i+1)*dim])) + + src0 = blocks.vector_source_s(src_data0) + s2v0 = blocks.stream_to_vector(gr.sizeof_short,dim) + src1 = blocks.vector_source_s(src_data1) + s2v1 = blocks.stream_to_vector(gr.sizeof_short,dim) + op = blocks.max_ss(dim) + dst = blocks.vector_sink_s() + + self.tb.connect(src0, s2v0, (op, 0)) + self.tb.connect(src1, s2v1, (op, 1)) + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + if __name__ == '__main__': gr_unittest.run(test_max, "test_max.xml") - diff --git a/gr-blocks/python/blocks/qa_min.py b/gr-blocks/python/blocks/qa_min.py new file mode 100644 index 0000000000..642782a1a1 --- /dev/null +++ b/gr-blocks/python/blocks/qa_min.py @@ -0,0 +1,178 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks + +import math + +class test_min(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001(self): + src_data = (0, 0.2, -0.25, 0, 12, 0) + expected_result = (float(min(src_data)),) + + src = blocks.vector_source_f(src_data) + s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data)) + op = blocks.min_ff(len(src_data)) + dst = blocks.vector_sink_f() + + self.tb.connect(src, s2v, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def stest_002(self): + src_data=(-100,-99,-98,-97,-96,-1) + expected_result = (float(min(src_data)),) + + src = blocks.vector_source_f(src_data) + s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data)) + op = blocks.min_ff(len(src_data)) + dst = blocks.vector_sink_f() + + self.tb.connect(src, s2v, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def stest_003(self): + src_data0 = (0, 2, -3, 0, 12, 0) + src_data1 = (1, 1, 1, 1, 1, 1) + + expected_result = [float(min(x,y)) for x,y in zip(src_data0, src_data1)] + + src0 = blocks.vector_source_f(src_data0) + src1 = blocks.vector_source_f(src_data1) + op = blocks.min_ff(1) + dst = blocks.vector_sink_f() + + self.tb.connect(src0, (op, 0)) + self.tb.connect(src1, (op, 1)) + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def stest_004(self): + dim = 2 + src_data0 = (0, 2, -3, 0, 12, 0) + src_data1 = (1, 1, 1, 1, 1, 1) + + expected_data = [] + tmp = [float(min(x,y)) for x,y in zip(src_data0, src_data1)] + for i in xrange(len(tmp)/dim): + expected_data.append(float(min(tmp[i*dim:(i+1)*dim]))) + + src0 = blocks.vector_source_f(src_data0) + s2v0 = blocks.stream_to_vector(gr.sizeof_float,dim) + src1 = blocks.vector_source_f(src_data1) + s2v1 = blocks.stream_to_vector(gr.sizeof_float,dim) + op = blocks.min_ff(dim) + dst = blocks.vector_sink_f() + + self.tb.connect(src0, s2v0, (op, 0)) + self.tb.connect(src1, s2v1, (op, 1)) + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + + def stest_s001(self): + src_data = (0, 2, -3, 0, 12, 0) + expected_result = (min(src_data),) + + src = blocks.vector_source_s(src_data) + s2v = blocks.stream_to_vector(gr.sizeof_short,len(src_data)) + op = blocks.min_ss(len(src_data)) + dst = blocks.vector_sink_s() + + self.tb.connect(src, s2v, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def stest_s002(self): + src_data=(-100,-99,-98,-97,-96,-1) + expected_result = (min(src_data),) + + src = blocks.vector_source_s(src_data) + s2v = blocks.stream_to_vector(gr.sizeof_short, len(src_data)) + op = blocks.min_ss(len(src_data)) + dst = blocks.vector_sink_s() + + self.tb.connect(src, s2v, op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + + def stest_s003(self): + src_data0 = (0, 2, -3, 0, 12, 0) + src_data1 = (1, 1, 1, 1, 1, 1) + + expected_result = [min(x,y) for x,y in zip(src_data0, src_data1)] + + src0 = blocks.vector_source_s(src_data0) + src1 = blocks.vector_source_s(src_data1) + op = blocks.min_ss(1) + dst = blocks.vector_sink_s() + + self.tb.connect(src0, (op, 0)) + self.tb.connect(src1, (op, 1)) + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def stest_s004(self): + dim = 2 + src_data0 = (0, 2, -3, 0, 12, 0) + src_data1 = (1, 1, 1, 1, 1, 1) + + expected_data = [] + tmp = [min(x,y) for x,y in zip(src_data0, src_data1)] + for i in xrange(len(tmp)/dim): + expected_data.append(min(tmp[i*dim:(i+1)*dim])) + + src0 = blocks.vector_source_s(src_data0) + s2v0 = blocks.stream_to_vector(gr.sizeof_short,dim) + src1 = blocks.vector_source_s(src_data1) + s2v1 = blocks.stream_to_vector(gr.sizeof_short,dim) + op = blocks.min_ss(dim) + dst = blocks.vector_sink_s() + + self.tb.connect(src0, s2v0, (op, 0)) + self.tb.connect(src1, s2v1, (op, 1)) + self.tb.connect(op, dst) + self.tb.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + +if __name__ == '__main__': + gr_unittest.run(test_min, "test_min.xml") diff --git a/gr-blocks/python/blocks/qa_multiply_matrix_ff.py b/gr-blocks/python/blocks/qa_multiply_matrix_ff.py new file mode 100755 index 0000000000..fdf020552a --- /dev/null +++ b/gr-blocks/python/blocks/qa_multiply_matrix_ff.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import time +import numpy +import os +import pmt +from gnuradio import gr, gr_unittest +from gnuradio import blocks + +class test_multiply_matrix_ff (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + self.multiplier = None + + def tearDown (self): + self.tb = None + self.multiplier = None + + def run_once(self, X_in, A, tpp=gr.TPP_DONT, A2=None, tags=None, msg_A=None): + """ Run the test for given input-, output- and matrix values. + Every row from X_in is considered an input signal on a port. """ + X_in = numpy.matrix(X_in) + A_matrix = numpy.matrix(A) + (N, M) = A_matrix.shape + self.assertTrue(N == X_in.shape[0]) + # Calc expected + Y_out_exp = numpy.matrix(numpy.zeros((M, X_in.shape[1]))) + self.multiplier = blocks.multiply_matrix_ff(A, tpp) + if A2 is not None: + self.multiplier.set_A(A2) + A = A2 + A_matrix = numpy.matrix(A) + for i in xrange(N): + if tags is None: + these_tags = () + else: + these_tags = (tags[i],) + self.tb.connect(blocks.vector_source_f(X_in[i].tolist()[0], tags=these_tags), (self.multiplier, i)) + sinks = [] + for i in xrange(M): + sinks.append(blocks.vector_sink_f()) + self.tb.connect((self.multiplier, i), sinks[i]) + # Run and check + self.tb.run() + for i in xrange(X_in.shape[1]): + Y_out_exp[:,i] = A_matrix * X_in[:,i] + Y_out = [list(x.data()) for x in sinks] + if tags is not None: + self.the_tags = [] + for i in xrange(M): + self.the_tags.append(sinks[i].tags()) + self.assertEqual(list(Y_out), Y_out_exp.tolist()) + + + def test_001_t (self): + """ Simplest possible check: N==M, unit matrix """ + X_in = ( + (1, 2, 3, 4), + (5, 6, 7, 8), + ) + A = ( + (1, 0), + (0, 1), + ) + self.run_once(X_in, A) + + def test_002_t (self): + """ Switch check: N==M, flipped unit matrix """ + X_in = ( + (1, 2, 3, 4), + (5, 6, 7, 8), + ) + A = ( + (0, 1), + (1, 0), + ) + self.run_once(X_in, A) + + def test_003_t (self): + """ Average """ + X_in = ( + (1, 1, 1, 1), + (2, 2, 2, 2), + ) + A = ( + (0.5, 0.5), + (0.5, 0.5), + ) + self.run_once(X_in, A) + + def test_004_t (self): + """ Set """ + X_in = ( + (1, 2, 3, 4), + (5, 6, 7, 8), + ) + A1 = ( + (1, 0), + (0, 1), + ) + A2 = ( + (0, 1), + (1, 0), + ) + self.run_once(X_in, A1, A2=A2) + + def test_005_t (self): + """ Tags """ + X_in = ( + (1, 2, 3, 4), + (5, 6, 7, 8), + ) + A = ( + (0, 1), # Flip them round + (1, 0), + ) + tag1 = gr.tag_t() + tag1.offset = 0 + tag1.key = pmt.intern("in1") + tag1.value = pmt.PMT_T + tag2 = gr.tag_t() + tag2.offset = 0 + tag2.key = pmt.intern("in2") + tag2.value = pmt.PMT_T + self.run_once(X_in, A, tpp=999, tags=(tag1, tag2)) + self.assertTrue(pmt.equal(tag1.key, self.the_tags[1][0].key)) + self.assertTrue(pmt.equal(tag2.key, self.the_tags[0][0].key)) + + #def test_006_t (self): + #""" Message passing """ + #X_in = ( + #(1, 2, 3, 4), + #(5, 6, 7, 8), + #) + #A1 = ( + #(1, 0), + #(0, 1), + #) + #msg_A = ( + #(0, 1), + #(1, 0), + #) + #self.run_once(X_in, A1, msg_A=msg_A) + + + +if __name__ == '__main__': + #gr_unittest.run(test_multiply_matrix_ff, "test_multiply_matrix_ff.xml") + gr_unittest.run(test_multiply_matrix_ff) + |