summaryrefslogtreecommitdiff
path: root/gr-blocks/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-blocks/python')
-rwxr-xr-xgr-blocks/python/blocks/qa_add_mult_div_sub.py72
-rwxr-xr-xgr-blocks/python/blocks/qa_max.py118
-rw-r--r--gr-blocks/python/blocks/qa_min.py178
-rwxr-xr-xgr-blocks/python/blocks/qa_multiply_matrix_ff.py172
4 files changed, 538 insertions, 2 deletions
diff --git a/gr-blocks/python/blocks/qa_add_mult_div_sub.py b/gr-blocks/python/blocks/qa_add_mult_div_sub.py
index 7c9b2beac0..8699b3a085 100755
--- a/gr-blocks/python/blocks/qa_add_mult_div_sub.py
+++ b/gr-blocks/python/blocks/qa_add_mult_div_sub.py
@@ -200,6 +200,78 @@ class test_add_mult_div_sub(gr_unittest.TestCase):
self.help_ii((src1_data, src2_data),
expected_result, op)
+ def test_sub_ii1(self):
+ src1_data = (1, 2, 3, 4, 5)
+ expected_result = (1, 2, 3, 4, 5)
+
+ src = blocks.vector_source_i(src1_data)
+ op = blocks.sub_ii()
+ dst = blocks.vector_sink_i()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def test_sub_ss(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (-7, 5, -1, -4, 3)
+ op = blocks.sub_ss()
+ self.help_ss((src1_data, src2_data),
+ expected_result, op)
+
+ def test_sub_ss1(self):
+ src1_data = (1, 2, 3, 4, 5)
+ expected_result = (1, 2, 3, 4, 5)
+
+ src = blocks.vector_source_s(src1_data)
+ op = blocks.sub_ss()
+ dst = blocks.vector_sink_s()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def test_sub_ff(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (-7, 5, -1, -4, 3)
+ op = blocks.sub_ff()
+ self.help_ff((src1_data, src2_data),
+ expected_result, op)
+
+ def test_sub_ff1(self):
+ src1_data = (1, 2, 3, 4, 5)
+ expected_result = (1, 2, 3, 4, 5)
+
+ src = blocks.vector_source_f(src1_data)
+ op = blocks.sub_ff()
+ dst = blocks.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def test_sub_cc(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (-7, 5, -1, -4, 3)
+ op = blocks.sub_cc()
+ self.help_cc((src1_data, src2_data),
+ expected_result, op)
+
+ def test_sub_cc1(self):
+ src1_data = (1, 2, 3, 4, 5)
+ expected_result = (1, 2, 3, 4, 5)
+
+ src = blocks.vector_source_c(src1_data)
+ op = blocks.sub_cc()
+ dst = blocks.vector_sink_c()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
def test_div_ff(self):
src1_data = ( 5, 9, -15, 1024)
src2_data = (10, 3, -5, 64)
diff --git a/gr-blocks/python/blocks/qa_max.py b/gr-blocks/python/blocks/qa_max.py
index 441582ac88..709dbee72e 100755
--- a/gr-blocks/python/blocks/qa_max.py
+++ b/gr-blocks/python/blocks/qa_max.py
@@ -46,7 +46,7 @@ class test_max(gr_unittest.TestCase):
result_data = dst.data()
self.assertEqual(expected_result, result_data)
- def test_002(self):
+ def stest_002(self):
src_data=(-100,-99,-98,-97,-96,-1)
expected_result = (float(max(src_data)),)
@@ -60,6 +60,120 @@ class test_max(gr_unittest.TestCase):
result_data = dst.data()
self.assertEqual(expected_result, result_data)
+ def stest_003(self):
+ src_data0 = (0, 2, -3, 0, 12, 0)
+ src_data1 = (1, 1, 1, 1, 1, 1)
+
+ expected_result = [float(max(x,y)) for x,y in zip(src_data0, src_data1)]
+
+ src0 = blocks.vector_source_f(src_data0)
+ src1 = blocks.vector_source_f(src_data1)
+ op = blocks.max_ff(1)
+ dst = blocks.vector_sink_f()
+
+ self.tb.connect(src0, (op, 0))
+ self.tb.connect(src1, (op, 1))
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def stest_004(self):
+ dim = 2
+ src_data0 = (0, 2, -3, 0, 12, 0)
+ src_data1 = (1, 1, 1, 1, 1, 1)
+
+ expected_data = []
+ tmp = [float(max(x,y)) for x,y in zip(src_data0, src_data1)]
+ for i in xrange(len(tmp)/dim):
+ expected_data.append(float(max(tmp[i*dim:(i+1)*dim])))
+
+ src0 = blocks.vector_source_f(src_data0)
+ s2v0 = blocks.stream_to_vector(gr.sizeof_float,dim)
+ src1 = blocks.vector_source_f(src_data1)
+ s2v1 = blocks.stream_to_vector(gr.sizeof_float,dim)
+ op = blocks.max_ff(dim)
+ dst = blocks.vector_sink_f()
+
+ self.tb.connect(src0, s2v0, (op, 0))
+ self.tb.connect(src1, s2v1, (op, 1))
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+
+ def stest_s001(self):
+ src_data = (0, 2, -3, 0, 12, 0)
+ expected_result = (max(src_data),)
+
+ src = blocks.vector_source_s(src_data)
+ s2v = blocks.stream_to_vector(gr.sizeof_short,len(src_data))
+ op = blocks.max_ss(len(src_data))
+ dst = blocks.vector_sink_s()
+
+ self.tb.connect(src, s2v, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def stest_s002(self):
+ src_data=(-100,-99,-98,-97,-96,-1)
+ expected_result = (max(src_data),)
+
+ src = blocks.vector_source_s(src_data)
+ s2v = blocks.stream_to_vector(gr.sizeof_short, len(src_data))
+ op = blocks.max_ss(len(src_data))
+ dst = blocks.vector_sink_s()
+
+ self.tb.connect(src, s2v, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+
+ def stest_s003(self):
+ src_data0 = (0, 2, -3, 0, 12, 0)
+ src_data1 = (1, 1, 1, 1, 1, 1)
+
+ expected_result = [max(x,y) for x,y in zip(src_data0, src_data1)]
+
+ src0 = blocks.vector_source_s(src_data0)
+ src1 = blocks.vector_source_s(src_data1)
+ op = blocks.max_ss(1)
+ dst = blocks.vector_sink_s()
+
+ self.tb.connect(src0, (op, 0))
+ self.tb.connect(src1, (op, 1))
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def stest_s004(self):
+ dim = 2
+ src_data0 = (0, 2, -3, 0, 12, 0)
+ src_data1 = (1, 1, 1, 1, 1, 1)
+
+ expected_data = []
+ tmp = [max(x,y) for x,y in zip(src_data0, src_data1)]
+ for i in xrange(len(tmp)/dim):
+ expected_data.append(max(tmp[i*dim:(i+1)*dim]))
+
+ src0 = blocks.vector_source_s(src_data0)
+ s2v0 = blocks.stream_to_vector(gr.sizeof_short,dim)
+ src1 = blocks.vector_source_s(src_data1)
+ s2v1 = blocks.stream_to_vector(gr.sizeof_short,dim)
+ op = blocks.max_ss(dim)
+ dst = blocks.vector_sink_s()
+
+ self.tb.connect(src0, s2v0, (op, 0))
+ self.tb.connect(src1, s2v1, (op, 1))
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+
if __name__ == '__main__':
gr_unittest.run(test_max, "test_max.xml")
-
diff --git a/gr-blocks/python/blocks/qa_min.py b/gr-blocks/python/blocks/qa_min.py
new file mode 100644
index 0000000000..642782a1a1
--- /dev/null
+++ b/gr-blocks/python/blocks/qa_min.py
@@ -0,0 +1,178 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest, blocks
+
+import math
+
+class test_min(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_001(self):
+ src_data = (0, 0.2, -0.25, 0, 12, 0)
+ expected_result = (float(min(src_data)),)
+
+ src = blocks.vector_source_f(src_data)
+ s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data))
+ op = blocks.min_ff(len(src_data))
+ dst = blocks.vector_sink_f()
+
+ self.tb.connect(src, s2v, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def stest_002(self):
+ src_data=(-100,-99,-98,-97,-96,-1)
+ expected_result = (float(min(src_data)),)
+
+ src = blocks.vector_source_f(src_data)
+ s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data))
+ op = blocks.min_ff(len(src_data))
+ dst = blocks.vector_sink_f()
+
+ self.tb.connect(src, s2v, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def stest_003(self):
+ src_data0 = (0, 2, -3, 0, 12, 0)
+ src_data1 = (1, 1, 1, 1, 1, 1)
+
+ expected_result = [float(min(x,y)) for x,y in zip(src_data0, src_data1)]
+
+ src0 = blocks.vector_source_f(src_data0)
+ src1 = blocks.vector_source_f(src_data1)
+ op = blocks.min_ff(1)
+ dst = blocks.vector_sink_f()
+
+ self.tb.connect(src0, (op, 0))
+ self.tb.connect(src1, (op, 1))
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def stest_004(self):
+ dim = 2
+ src_data0 = (0, 2, -3, 0, 12, 0)
+ src_data1 = (1, 1, 1, 1, 1, 1)
+
+ expected_data = []
+ tmp = [float(min(x,y)) for x,y in zip(src_data0, src_data1)]
+ for i in xrange(len(tmp)/dim):
+ expected_data.append(float(min(tmp[i*dim:(i+1)*dim])))
+
+ src0 = blocks.vector_source_f(src_data0)
+ s2v0 = blocks.stream_to_vector(gr.sizeof_float,dim)
+ src1 = blocks.vector_source_f(src_data1)
+ s2v1 = blocks.stream_to_vector(gr.sizeof_float,dim)
+ op = blocks.min_ff(dim)
+ dst = blocks.vector_sink_f()
+
+ self.tb.connect(src0, s2v0, (op, 0))
+ self.tb.connect(src1, s2v1, (op, 1))
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+
+ def stest_s001(self):
+ src_data = (0, 2, -3, 0, 12, 0)
+ expected_result = (min(src_data),)
+
+ src = blocks.vector_source_s(src_data)
+ s2v = blocks.stream_to_vector(gr.sizeof_short,len(src_data))
+ op = blocks.min_ss(len(src_data))
+ dst = blocks.vector_sink_s()
+
+ self.tb.connect(src, s2v, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def stest_s002(self):
+ src_data=(-100,-99,-98,-97,-96,-1)
+ expected_result = (min(src_data),)
+
+ src = blocks.vector_source_s(src_data)
+ s2v = blocks.stream_to_vector(gr.sizeof_short, len(src_data))
+ op = blocks.min_ss(len(src_data))
+ dst = blocks.vector_sink_s()
+
+ self.tb.connect(src, s2v, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+
+ def stest_s003(self):
+ src_data0 = (0, 2, -3, 0, 12, 0)
+ src_data1 = (1, 1, 1, 1, 1, 1)
+
+ expected_result = [min(x,y) for x,y in zip(src_data0, src_data1)]
+
+ src0 = blocks.vector_source_s(src_data0)
+ src1 = blocks.vector_source_s(src_data1)
+ op = blocks.min_ss(1)
+ dst = blocks.vector_sink_s()
+
+ self.tb.connect(src0, (op, 0))
+ self.tb.connect(src1, (op, 1))
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def stest_s004(self):
+ dim = 2
+ src_data0 = (0, 2, -3, 0, 12, 0)
+ src_data1 = (1, 1, 1, 1, 1, 1)
+
+ expected_data = []
+ tmp = [min(x,y) for x,y in zip(src_data0, src_data1)]
+ for i in xrange(len(tmp)/dim):
+ expected_data.append(min(tmp[i*dim:(i+1)*dim]))
+
+ src0 = blocks.vector_source_s(src_data0)
+ s2v0 = blocks.stream_to_vector(gr.sizeof_short,dim)
+ src1 = blocks.vector_source_s(src_data1)
+ s2v1 = blocks.stream_to_vector(gr.sizeof_short,dim)
+ op = blocks.min_ss(dim)
+ dst = blocks.vector_sink_s()
+
+ self.tb.connect(src0, s2v0, (op, 0))
+ self.tb.connect(src1, s2v1, (op, 1))
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_min, "test_min.xml")
diff --git a/gr-blocks/python/blocks/qa_multiply_matrix_ff.py b/gr-blocks/python/blocks/qa_multiply_matrix_ff.py
new file mode 100755
index 0000000000..fdf020552a
--- /dev/null
+++ b/gr-blocks/python/blocks/qa_multiply_matrix_ff.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import time
+import numpy
+import os
+import pmt
+from gnuradio import gr, gr_unittest
+from gnuradio import blocks
+
+class test_multiply_matrix_ff (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+ self.multiplier = None
+
+ def tearDown (self):
+ self.tb = None
+ self.multiplier = None
+
+ def run_once(self, X_in, A, tpp=gr.TPP_DONT, A2=None, tags=None, msg_A=None):
+ """ Run the test for given input-, output- and matrix values.
+ Every row from X_in is considered an input signal on a port. """
+ X_in = numpy.matrix(X_in)
+ A_matrix = numpy.matrix(A)
+ (N, M) = A_matrix.shape
+ self.assertTrue(N == X_in.shape[0])
+ # Calc expected
+ Y_out_exp = numpy.matrix(numpy.zeros((M, X_in.shape[1])))
+ self.multiplier = blocks.multiply_matrix_ff(A, tpp)
+ if A2 is not None:
+ self.multiplier.set_A(A2)
+ A = A2
+ A_matrix = numpy.matrix(A)
+ for i in xrange(N):
+ if tags is None:
+ these_tags = ()
+ else:
+ these_tags = (tags[i],)
+ self.tb.connect(blocks.vector_source_f(X_in[i].tolist()[0], tags=these_tags), (self.multiplier, i))
+ sinks = []
+ for i in xrange(M):
+ sinks.append(blocks.vector_sink_f())
+ self.tb.connect((self.multiplier, i), sinks[i])
+ # Run and check
+ self.tb.run()
+ for i in xrange(X_in.shape[1]):
+ Y_out_exp[:,i] = A_matrix * X_in[:,i]
+ Y_out = [list(x.data()) for x in sinks]
+ if tags is not None:
+ self.the_tags = []
+ for i in xrange(M):
+ self.the_tags.append(sinks[i].tags())
+ self.assertEqual(list(Y_out), Y_out_exp.tolist())
+
+
+ def test_001_t (self):
+ """ Simplest possible check: N==M, unit matrix """
+ X_in = (
+ (1, 2, 3, 4),
+ (5, 6, 7, 8),
+ )
+ A = (
+ (1, 0),
+ (0, 1),
+ )
+ self.run_once(X_in, A)
+
+ def test_002_t (self):
+ """ Switch check: N==M, flipped unit matrix """
+ X_in = (
+ (1, 2, 3, 4),
+ (5, 6, 7, 8),
+ )
+ A = (
+ (0, 1),
+ (1, 0),
+ )
+ self.run_once(X_in, A)
+
+ def test_003_t (self):
+ """ Average """
+ X_in = (
+ (1, 1, 1, 1),
+ (2, 2, 2, 2),
+ )
+ A = (
+ (0.5, 0.5),
+ (0.5, 0.5),
+ )
+ self.run_once(X_in, A)
+
+ def test_004_t (self):
+ """ Set """
+ X_in = (
+ (1, 2, 3, 4),
+ (5, 6, 7, 8),
+ )
+ A1 = (
+ (1, 0),
+ (0, 1),
+ )
+ A2 = (
+ (0, 1),
+ (1, 0),
+ )
+ self.run_once(X_in, A1, A2=A2)
+
+ def test_005_t (self):
+ """ Tags """
+ X_in = (
+ (1, 2, 3, 4),
+ (5, 6, 7, 8),
+ )
+ A = (
+ (0, 1), # Flip them round
+ (1, 0),
+ )
+ tag1 = gr.tag_t()
+ tag1.offset = 0
+ tag1.key = pmt.intern("in1")
+ tag1.value = pmt.PMT_T
+ tag2 = gr.tag_t()
+ tag2.offset = 0
+ tag2.key = pmt.intern("in2")
+ tag2.value = pmt.PMT_T
+ self.run_once(X_in, A, tpp=999, tags=(tag1, tag2))
+ self.assertTrue(pmt.equal(tag1.key, self.the_tags[1][0].key))
+ self.assertTrue(pmt.equal(tag2.key, self.the_tags[0][0].key))
+
+ #def test_006_t (self):
+ #""" Message passing """
+ #X_in = (
+ #(1, 2, 3, 4),
+ #(5, 6, 7, 8),
+ #)
+ #A1 = (
+ #(1, 0),
+ #(0, 1),
+ #)
+ #msg_A = (
+ #(0, 1),
+ #(1, 0),
+ #)
+ #self.run_once(X_in, A1, msg_A=msg_A)
+
+
+
+if __name__ == '__main__':
+ #gr_unittest.run(test_multiply_matrix_ff, "test_multiply_matrix_ff.xml")
+ gr_unittest.run(test_multiply_matrix_ff)
+