summaryrefslogtreecommitdiff
path: root/gr-blocks/python
diff options
context:
space:
mode:
authorMartin Braun <martin.braun@kit.edu>2013-03-15 02:12:20 -0700
committerTom Rondeau <trondeau@vt.edu>2013-03-16 23:52:19 -0400
commit7fd15b67afa5abd20c0982bdd6bcb7191831bf73 (patch)
tree3bd525f2850e662a9ccfe0a6c286ff7c20ad57e3 /gr-blocks/python
parent34216336205ba3edc0bc308a8da9ad388f3d0774 (diff)
Squash/rebased martin/ofdm-master onto trial merge branch
Conflicts: gr-blocks/include/blocks/CMakeLists.txt
Diffstat (limited to 'gr-blocks/python')
-rwxr-xr-xgr-blocks/python/qa_repack_bits_bb.py127
-rwxr-xr-xgr-blocks/python/qa_tagged_stream_mux.py111
2 files changed, 238 insertions, 0 deletions
diff --git a/gr-blocks/python/qa_repack_bits_bb.py b/gr-blocks/python/qa_repack_bits_bb.py
new file mode 100755
index 0000000000..b0c6480ec4
--- /dev/null
+++ b/gr-blocks/python/qa_repack_bits_bb.py
@@ -0,0 +1,127 @@
+#!/usr/bin/env python
+#
+# Copyright 2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import random
+from gnuradio import gr, gr_unittest
+from gruel import pmt
+import blocks_swig as blocks
+
+class qa_repack_bits_bb (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_simple (self):
+ """ Very simple test, 2 bits -> 1 """
+ src_data = (0b11, 0b01, 0b10)
+ expected_data = (0b1, 0b1, 0b1, 0b0, 0b0, 0b1)
+ k = 2
+ l = 1
+ src = gr.vector_source_b(src_data, False, 1)
+ repack = blocks.repack_bits_bb(k, l)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, repack, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), expected_data)
+
+ def test_002_three (self):
+ """ 8 -> 3 """
+ src_data = (0b11111101, 0b11111111, 0b11111111)
+ expected_data = (0b101,) + (0b111,) * 7
+ k = 8
+ l = 3
+ src = gr.vector_source_b(src_data, False, 1)
+ repack = blocks.repack_bits_bb(k, l)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, repack, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), expected_data)
+
+ def test_003_lots_of_bytes (self):
+ """ Lots and lots of bytes, multiple packer stages """
+ src_data = tuple([random.randint(0, 255) for x in range(3*5*7*8 * 10)])
+ src = gr.vector_source_b(src_data, False, 1)
+ repack1 = blocks.repack_bits_bb(8, 3)
+ repack2 = blocks.repack_bits_bb(3, 5)
+ repack3 = blocks.repack_bits_bb(5, 7)
+ repack4 = blocks.repack_bits_bb(7, 8)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, repack1, repack2, repack3, repack4, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), src_data)
+
+ def test_004_three_with_tags (self):
+ """ 8 -> 3 """
+ src_data = (0b11111101, 0b11111111)
+ expected_data = (0b101,) + (0b111,) * 4 + (0b001,)
+ k = 8
+ l = 3
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.pmt_string_to_symbol(tag_name)
+ tag.value = pmt.pmt_from_long(len(src_data))
+ src = gr.vector_source_b(src_data, False, 1, (tag,))
+ repack = blocks.repack_bits_bb(k, l, tag_name)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, repack, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), expected_data)
+ try:
+ out_tag = sink.tags()[0]
+ except:
+ self.assertFail()
+ self.assertEqual(out_tag.offset, 0)
+ self.assertEqual(pmt.pmt_symbol_to_string(out_tag.key), tag_name)
+ self.assertEqual(pmt.pmt_to_long(out_tag.value), len(expected_data))
+
+ def test_005_three_with_tags_trailing (self):
+ """ 3 -> 8, trailing bits """
+ src_data = (0b101,) + (0b111,) * 4 + (0b001,)
+ expected_data = (0b11111101, 0b11111111)
+ k = 3
+ l = 8
+ tag_name = "len"
+ tag = gr.gr_tag_t()
+ tag.offset = 0
+ tag.key = pmt.pmt_string_to_symbol(tag_name)
+ tag.value = pmt.pmt_from_long(len(src_data))
+ src = gr.vector_source_b(src_data, False, 1, (tag,))
+ repack = blocks.repack_bits_bb(k, l, tag_name, True)
+ sink = gr.vector_sink_b()
+ self.tb.connect(src, repack, sink)
+ self.tb.run ()
+ self.assertEqual(sink.data(), expected_data)
+ try:
+ out_tag = sink.tags()[0]
+ except:
+ self.assertFail()
+ self.assertEqual(out_tag.offset, 0)
+ self.assertEqual(pmt.pmt_symbol_to_string(out_tag.key), tag_name)
+ self.assertEqual(pmt.pmt_to_long(out_tag.value), len(expected_data))
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_repack_bits_bb, "qa_repack_bits_bb.xml")
+
diff --git a/gr-blocks/python/qa_tagged_stream_mux.py b/gr-blocks/python/qa_tagged_stream_mux.py
new file mode 100755
index 0000000000..501c111a67
--- /dev/null
+++ b/gr-blocks/python/qa_tagged_stream_mux.py
@@ -0,0 +1,111 @@
+#!/usr/bin/env python
+#
+# Copyright 2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+from gruel import pmt
+import blocks_swig as blocks
+import numpy
+
+def make_len_tags(tupl, key):
+ tags = []
+ tag = gr.gr_tag_t()
+ tag.key = pmt.pmt_string_to_symbol(key)
+ n_read = 0
+ for element in tupl:
+ tag.offset = n_read
+ n_read += len(element)
+ tag.value = pmt.to_pmt(len(element))
+ tags.append(tag)
+ return tags
+
+def make_len_tag(offset, key, value):
+ tag = gr.gr_tag_t()
+ tag.offset = offset
+ tag.key = pmt.pmt_string_to_symbol(key)
+ tag.value = pmt.to_pmt(value)
+ return tag
+
+
+class qa_tagged_stream_mux (gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_1(self):
+ datas = (
+ 0, 1, 2, 5, 6, 10, 14, 15, 16,
+ 3, 4, 7, 8, 9, 11, 12, 13, 17
+ )
+ expected = tuple(range(18))
+
+ tagname = "packet_length"
+ len_tags_0 = (
+ make_len_tag(0, tagname, 3),
+ make_len_tag(3, tagname, 2),
+ make_len_tag(5, tagname, 1),
+ make_len_tag(6, tagname, 3)
+ )
+ len_tags_1 = (
+ make_len_tag(0, tagname, 2),
+ make_len_tag(2, tagname, 3),
+ make_len_tag(5, tagname, 3),
+ make_len_tag(8, tagname, 1)
+ )
+ test_tag_0 = gr.gr_tag_t()
+ test_tag_0.key = pmt.pmt_string_to_symbol('spam')
+ test_tag_0.offset = 4 # On the second '1'
+ test_tag_0.value = pmt.to_pmt(42)
+ test_tag_1 = gr.gr_tag_t()
+ test_tag_1.key = pmt.pmt_string_to_symbol('eggs')
+ test_tag_1.offset = 3 # On the first '3' of the 2nd stream
+ test_tag_1.value = pmt.to_pmt(23)
+
+ src0 = gr.vector_source_b(datas[0:9], False, 1, len_tags_0 + (test_tag_0,))
+ src1 = gr.vector_source_b(datas[9:], False, 1, len_tags_1 + (test_tag_1,))
+ tagged_stream_mux = blocks.tagged_stream_mux(gr.sizeof_char, tagname)
+ snk = gr.vector_sink_b()
+ self.tb.connect(src0, (tagged_stream_mux, 0))
+ self.tb.connect(src1, (tagged_stream_mux, 1))
+ self.tb.connect(tagged_stream_mux, snk)
+ self.tb.run()
+
+ self.assertEqual(expected, snk.data())
+
+ tags = [gr.tag_to_python(x) for x in snk.tags()]
+ tags = sorted([(x.offset, x.key, x.value) for x in tags])
+ tags_expected = [
+ (0, 'packet_length', 5),
+ (5, 'packet_length', 5),
+ (6, 'spam', 42),
+ (8, 'eggs', 23),
+ (10, 'packet_length', 4),
+ (14, 'packet_length', 4)
+ ]
+ self.assertEqual(tags, tags_expected)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_tagged_stream_mux, "qa_tagged_stream_mux.xml")
+