diff options
Diffstat (limited to 'gr-blocks/python')
-rwxr-xr-x | gr-blocks/python/qa_message.py | 149 | ||||
-rw-r--r-- | gr-blocks/python/qa_moving_average.py | 91 | ||||
-rw-r--r-- | gr-blocks/python/qa_peak_detector.py | 98 | ||||
-rw-r--r-- | gr-blocks/python/qa_peak_detector2.py | 1 |
4 files changed, 338 insertions, 1 deletions
diff --git a/gr-blocks/python/qa_message.py b/gr-blocks/python/qa_message.py new file mode 100755 index 0000000000..551fdd6259 --- /dev/null +++ b/gr-blocks/python/qa_message.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python +# +# Copyright 2004,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import pmt +import time + +def all_counts(): + return (gr.block_ncurrently_allocated(), + gr.block_detail_ncurrently_allocated(), + gr.buffer_ncurrently_allocated(), + gr.buffer_reader_ncurrently_allocated(), + gr.message_ncurrently_allocated()) + + +class test_message(gr_unittest.TestCase): + + def setUp(self): + self.msgq = gr.msg_queue() + + def tearDown(self): + self.msgq = None + + def leak_check(self, fct): + begin = all_counts() + fct() + # tear down early so we can check for leaks + self.tearDown() + end = all_counts() + self.assertEqual(begin, end) + + def test_100(self): + msg = gr.message(0, 1.5, 2.3) + self.assertEquals(0, msg.type()) + self.assertAlmostEqual(1.5, msg.arg1()) + self.assertAlmostEqual(2.3, msg.arg2()) + self.assertEquals(0, msg.length()) + + def test_101(self): + s = 'This is a test' + msg = gr.message_from_string(s) + self.assertEquals(s, msg.to_string()) + + def test_200(self): + self.leak_check(self.body_200) + + def body_200(self): + self.msgq.insert_tail(gr.message(0)) + self.assertEquals(1, self.msgq.count()) + self.msgq.insert_tail(gr.message(1)) + self.assertEquals(2, self.msgq.count()) + msg0 = self.msgq.delete_head() + self.assertEquals(0, msg0.type()) + msg1 = self.msgq.delete_head() + self.assertEquals(1, msg1.type()) + self.assertEquals(0, self.msgq.count()) + + def test_201(self): + self.leak_check(self.body_201) + + def body_201(self): + self.msgq.insert_tail(gr.message(0)) + self.assertEquals(1, self.msgq.count()) + self.msgq.insert_tail(gr.message(1)) + self.assertEquals(2, self.msgq.count()) + + def test_202(self): + self.leak_check(self.body_202) + + def body_202(self): + # global msg + msg = gr.message(666) + + def test_300(self): + input_data = (0,1,2,3,4,5,6,7,8,9) + src = gr.vector_source_b(input_data) + dst = gr.vector_sink_b() + tb = gr.top_block() + tb.connect(src, dst) + tb.run() + self.assertEquals(input_data, dst.data()) + + def test_301(self): + # Use itemsize, limit constructor + src = blocks.message_source(gr.sizeof_char) + dst = gr.vector_sink_b() + tb = gr.top_block() + tb.connect(src, dst) + src.msgq().insert_tail(gr.message_from_string('01234')) + src.msgq().insert_tail(gr.message_from_string('5')) + src.msgq().insert_tail(gr.message_from_string('')) + src.msgq().insert_tail(gr.message_from_string('6789')) + src.msgq().insert_tail(gr.message(1)) # send EOF + tb.run() + self.assertEquals(tuple(map(ord, '0123456789')), dst.data()) + + def test_302(self): + # Use itemsize, msgq constructor + msgq = gr.msg_queue() + src = blocks.message_source(gr.sizeof_char, msgq) + dst = gr.vector_sink_b() + tb = gr.top_block() + tb.connect(src, dst) + src.msgq().insert_tail(gr.message_from_string('01234')) + src.msgq().insert_tail(gr.message_from_string('5')) + src.msgq().insert_tail(gr.message_from_string('')) + src.msgq().insert_tail(gr.message_from_string('6789')) + src.msgq().insert_tail(gr.message(1)) # send EOF + tb.run() + self.assertEquals(tuple(map(ord, '0123456789')), dst.data()) + + def test_debug_401(self): + msg = pmt.pmt_intern("TESTING") + src = blocks.message_strobe(msg, 500) + snk = blocks.message_debug() + + tb = gr.top_block() + tb.msg_connect(src, "strobe", snk, "store") + tb.start() + time.sleep(1) + tb.stop() + tb.wait() + + rec_msg = snk.get_message(0) + self.assertTrue(pmt.pmt_eqv(rec_msg, msg)) + + +if __name__ == '__main__': + gr_unittest.run(test_message, "test_message.xml") diff --git a/gr-blocks/python/qa_moving_average.py b/gr-blocks/python/qa_moving_average.py new file mode 100644 index 0000000000..169b4746c2 --- /dev/null +++ b/gr-blocks/python/qa_moving_average.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import math, random + +def make_random_complex_tuple(L, scale=1): + result = [] + for x in range(L): + result.append(scale*complex(2*random.random()-1, + 2*random.random()-1)) + return tuple(result) + +def make_random_float_tuple(L, scale=1): + result = [] + for x in range(L): + result.append(scale*(2*random.random()-1)) + return tuple(result) + +class test_moving_average(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_01(self): + tb = self.tb + + N = 10000 + seed = 0 + data = make_random_float_tuple(N, 1) + expected_result = N*[0,] + + src = gr.vector_source_f(data, False) + op = blocks.moving_average_ff(100, 0.001) + dst = gr.vector_sink_f() + + tb.connect(src, op) + tb.connect(op, dst) + tb.run() + + dst_data = dst.data() + + # make sure result is close to zero + self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 1) + + def test_02(self): + tb = self.tb + + N = 10000 + seed = 0 + data = make_random_complex_tuple(N, 1) + expected_result = N*[0,] + + src = gr.vector_source_c(data, False) + op = blocks.moving_average_cc(100, 0.001) + dst = gr.vector_sink_c() + + tb.connect(src, op) + tb.connect(op, dst) + tb.run() + + dst_data = dst.data() + + # make sure result is close to zero + self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1) + +if __name__ == '__main__': + gr_unittest.run(test_moving_average, "test_moving_average.xml") diff --git a/gr-blocks/python/qa_peak_detector.py b/gr-blocks/python/qa_peak_detector.py new file mode 100644 index 0000000000..c3ff2548c4 --- /dev/null +++ b/gr-blocks/python/qa_peak_detector.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks + +class test_peak_detector(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_01(self): + tb = self.tb + + data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 9, 8, 7, 6, 5, 4, 3, 2, 1, 0] + + expected_result = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + + src = gr.vector_source_f(data, False) + regen = blocks.peak_detector_fb() + dst = gr.vector_sink_b() + + tb.connect(src, regen) + tb.connect(regen, dst) + tb.run() + + dst_data = dst.data() + + self.assertEqual(expected_result, dst_data) + + def test_02(self): + tb = self.tb + + data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 9, 8, 7, 6, 5, 4, 3, 2, 1, 0] + + expected_result = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + + src = gr.vector_source_i(data, False) + regen = blocks.peak_detector_ib() + dst = gr.vector_sink_b() + + tb.connect(src, regen) + tb.connect(regen, dst) + tb.run() + + dst_data = dst.data() + + self.assertEqual(expected_result, dst_data) + + def test_03(self): + tb = self.tb + + data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 9, 8, 7, 6, 5, 4, 3, 2, 1, 0] + + expected_result = (0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) + + src = gr.vector_source_s(data, False) + regen = blocks.peak_detector_sb() + dst = gr.vector_sink_b() + + tb.connect(src, regen) + tb.connect(regen, dst) + tb.run() + + dst_data = dst.data() + + self.assertEqual(expected_result, dst_data) + +if __name__ == '__main__': + gr_unittest.run(test_peak_detector, "test_peak_detector.xml") diff --git a/gr-blocks/python/qa_peak_detector2.py b/gr-blocks/python/qa_peak_detector2.py index 4b864e4d70..b2d8e318dd 100644 --- a/gr-blocks/python/qa_peak_detector2.py +++ b/gr-blocks/python/qa_peak_detector2.py @@ -50,7 +50,6 @@ class test_peak_detector2(gr_unittest.TestCase): tb.run() dst_data = dst.data() - print dst_data self.assertEqual(expected_result, dst_data) |