diff options
author | Tom Rondeau <trondeau@vt.edu> | 2013-03-16 10:29:20 -0400 |
---|---|---|
committer | Tom Rondeau <trondeau@vt.edu> | 2013-03-16 10:29:20 -0400 |
commit | 57720572843fe89aaf6edc02a50859780b349b6c (patch) | |
tree | 3d36e7f8c3c4194ab6ee238213a67ed181876e14 /gr-blocks/python | |
parent | 3d38e82e7d87dfc01132b199e33b53add8a706fa (diff) | |
parent | 4412cb10077cb40659ab5bc9c4dc814ca88a380b (diff) |
Merge branch 'next' of gnuradio.org:gnuradio into next
Diffstat (limited to 'gr-blocks/python')
-rw-r--r-- | gr-blocks/python/qa_message_tags.py | 27 | ||||
-rwxr-xr-x | gr-blocks/python/qa_repack_bits_bb.py | 127 | ||||
-rwxr-xr-x | gr-blocks/python/qa_tagged_stream_mux.py | 111 |
3 files changed, 265 insertions, 0 deletions
diff --git a/gr-blocks/python/qa_message_tags.py b/gr-blocks/python/qa_message_tags.py new file mode 100644 index 0000000000..0ab857b1aa --- /dev/null +++ b/gr-blocks/python/qa_message_tags.py @@ -0,0 +1,27 @@ +import time + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks + +class test_message_tags (gr_unittest.TestCase): + + def test_1 (self): + data = ('hello', 'you', 'there') + tx_msgq = gr.msg_queue () + rx_msgq = gr.msg_queue () + for d in data: + tx_msgq.insert_tail(gr.message_from_string(d)) + tb = gr.top_block() + src = blocks.message_source(gr.sizeof_char, tx_msgq, "packet_length") + snk = blocks.message_sink(gr.sizeof_char, rx_msgq, False, "packet_length") + tb.connect(src, snk) + tb.start() + time.sleep(1) + tb.stop() + for d in data: + msg = rx_msgq.delete_head() + contents = msg.to_string() + self.assertEqual(d, contents) + +if __name__ == '__main__': + gr_unittest.run(test_message_tags, "test_message_tags.xml") diff --git a/gr-blocks/python/qa_repack_bits_bb.py b/gr-blocks/python/qa_repack_bits_bb.py new file mode 100755 index 0000000000..3f88df4a68 --- /dev/null +++ b/gr-blocks/python/qa_repack_bits_bb.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import random +from gnuradio import gr, gr_unittest +import pmt +import blocks_swig as blocks + +class qa_repack_bits_bb (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_simple (self): + """ Very simple test, 2 bits -> 1 """ + src_data = (0b11, 0b01, 0b10) + expected_data = (0b1, 0b1, 0b1, 0b0, 0b0, 0b1) + k = 2 + l = 1 + src = gr.vector_source_b(src_data, False, 1) + repack = blocks.repack_bits_bb(k, l) + sink = gr.vector_sink_b() + self.tb.connect(src, repack, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_data) + + def test_002_three (self): + """ 8 -> 3 """ + src_data = (0b11111101, 0b11111111, 0b11111111) + expected_data = (0b101,) + (0b111,) * 7 + k = 8 + l = 3 + src = gr.vector_source_b(src_data, False, 1) + repack = blocks.repack_bits_bb(k, l) + sink = gr.vector_sink_b() + self.tb.connect(src, repack, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_data) + + def test_003_lots_of_bytes (self): + """ Lots and lots of bytes, multiple packer stages """ + src_data = tuple([random.randint(0, 255) for x in range(3*5*7*8 * 10)]) + src = gr.vector_source_b(src_data, False, 1) + repack1 = blocks.repack_bits_bb(8, 3) + repack2 = blocks.repack_bits_bb(3, 5) + repack3 = blocks.repack_bits_bb(5, 7) + repack4 = blocks.repack_bits_bb(7, 8) + sink = gr.vector_sink_b() + self.tb.connect(src, repack1, repack2, repack3, repack4, sink) + self.tb.run () + self.assertEqual(sink.data(), src_data) + + def test_004_three_with_tags (self): + """ 8 -> 3 """ + src_data = (0b11111101, 0b11111111) + expected_data = (0b101,) + (0b111,) * 4 + (0b001,) + k = 8 + l = 3 + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(src_data)) + src = gr.vector_source_b(src_data, False, 1, (tag,)) + repack = blocks.repack_bits_bb(k, l, tag_name) + sink = gr.vector_sink_b() + self.tb.connect(src, repack, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_data) + try: + out_tag = sink.tags()[0] + except: + self.assertFail() + self.assertEqual(out_tag.offset, 0) + self.assertEqual(pmt.symbol_to_string(out_tag.key), tag_name) + self.assertEqual(pmt.to_long(out_tag.value), len(expected_data)) + + def test_005_three_with_tags_trailing (self): + """ 3 -> 8, trailing bits """ + src_data = (0b101,) + (0b111,) * 4 + (0b001,) + expected_data = (0b11111101, 0b11111111) + k = 3 + l = 8 + tag_name = "len" + tag = gr.gr_tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(len(src_data)) + src = gr.vector_source_b(src_data, False, 1, (tag,)) + repack = blocks.repack_bits_bb(k, l, tag_name, True) + sink = gr.vector_sink_b() + self.tb.connect(src, repack, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_data) + try: + out_tag = sink.tags()[0] + except: + self.assertFail() + self.assertEqual(out_tag.offset, 0) + self.assertEqual(pmt.symbol_to_string(out_tag.key), tag_name) + self.assertEqual(pmt.to_long(out_tag.value), len(expected_data)) + +if __name__ == '__main__': + gr_unittest.run(qa_repack_bits_bb, "qa_repack_bits_bb.xml") + diff --git a/gr-blocks/python/qa_tagged_stream_mux.py b/gr-blocks/python/qa_tagged_stream_mux.py new file mode 100755 index 0000000000..e39f8cac29 --- /dev/null +++ b/gr-blocks/python/qa_tagged_stream_mux.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import pmt +import blocks_swig as blocks +import numpy + +def make_len_tags(tupl, key): + tags = [] + tag = gr.gr_tag_t() + tag.key = pmt.string_to_symbol(key) + n_read = 0 + for element in tupl: + tag.offset = n_read + n_read += len(element) + tag.value = pmt.to_pmt(len(element)) + tags.append(tag) + return tags + +def make_len_tag(offset, key, value): + tag = gr.gr_tag_t() + tag.offset = offset + tag.key = pmt.string_to_symbol(key) + tag.value = pmt.to_pmt(value) + return tag + + +class qa_tagged_stream_mux (gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_1(self): + datas = ( + 0, 1, 2, 5, 6, 10, 14, 15, 16, + 3, 4, 7, 8, 9, 11, 12, 13, 17 + ) + expected = tuple(range(18)) + + tagname = "packet_length" + len_tags_0 = ( + make_len_tag(0, tagname, 3), + make_len_tag(3, tagname, 2), + make_len_tag(5, tagname, 1), + make_len_tag(6, tagname, 3) + ) + len_tags_1 = ( + make_len_tag(0, tagname, 2), + make_len_tag(2, tagname, 3), + make_len_tag(5, tagname, 3), + make_len_tag(8, tagname, 1) + ) + test_tag_0 = gr.gr_tag_t() + test_tag_0.key = pmt.string_to_symbol('spam') + test_tag_0.offset = 4 # On the second '1' + test_tag_0.value = pmt.to_pmt(42) + test_tag_1 = gr.gr_tag_t() + test_tag_1.key = pmt.string_to_symbol('eggs') + test_tag_1.offset = 3 # On the first '3' of the 2nd stream + test_tag_1.value = pmt.to_pmt(23) + + src0 = gr.vector_source_b(datas[0:9], False, 1, len_tags_0 + (test_tag_0,)) + src1 = gr.vector_source_b(datas[9:], False, 1, len_tags_1 + (test_tag_1,)) + tagged_stream_mux = blocks.tagged_stream_mux(gr.sizeof_char, tagname) + snk = gr.vector_sink_b() + self.tb.connect(src0, (tagged_stream_mux, 0)) + self.tb.connect(src1, (tagged_stream_mux, 1)) + self.tb.connect(tagged_stream_mux, snk) + self.tb.run() + + self.assertEqual(expected, snk.data()) + + tags = [gr.tag_to_python(x) for x in snk.tags()] + tags = sorted([(x.offset, x.key, x.value) for x in tags]) + tags_expected = [ + (0, 'packet_length', 5), + (5, 'packet_length', 5), + (6, 'spam', 42), + (8, 'eggs', 23), + (10, 'packet_length', 4), + (14, 'packet_length', 4) + ] + self.assertEqual(tags, tags_expected) + + +if __name__ == '__main__': + gr_unittest.run(qa_tagged_stream_mux, "qa_tagged_stream_mux.xml") + |