diff options
author | Martin Braun <martin.braun@ettus.com> | 2014-05-08 13:34:04 +0200 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2014-05-20 09:50:22 +0200 |
commit | 00b8dbe951768ce874cc5d74ffb9f0fa57d580d2 (patch) | |
tree | 5ceca63813c4c51ec72c957fa77e0c97f7447997 /gr-blocks/python | |
parent | 979060c0e7edac88ebcb56b4c44409c0c5c8ea19 (diff) |
blocks: updated some QAs to use proper tsb functions
Diffstat (limited to 'gr-blocks/python')
-rwxr-xr-x | gr-blocks/python/blocks/qa_repack_bits_bb.py | 57 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_tagged_stream_mux.py | 123 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_tsb_vector_sink_X.py | 4 |
3 files changed, 74 insertions, 110 deletions
diff --git a/gr-blocks/python/blocks/qa_repack_bits_bb.py b/gr-blocks/python/blocks/qa_repack_bits_bb.py index d9bbfe4fac..10880b196a 100755 --- a/gr-blocks/python/blocks/qa_repack_bits_bb.py +++ b/gr-blocks/python/blocks/qa_repack_bits_bb.py @@ -28,6 +28,7 @@ class qa_repack_bits_bb (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () + self.tsb_key = "length" def tearDown (self): self.tb = None @@ -77,24 +78,18 @@ class qa_repack_bits_bb (gr_unittest.TestCase): expected_data = (0b101,) + (0b111,) * 4 + (0b001,) k = 8 l = 3 - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(src_data)) - src = blocks.vector_source_b(src_data, False, 1, (tag,)) - repack = blocks.repack_bits_bb(k, l, tag_name) - sink = blocks.vector_sink_b() - self.tb.connect(src, repack, sink) + src = blocks.vector_source_b(src_data, False, 1) + repack = blocks.repack_bits_bb(k, l, self.tsb_key) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(src_data), self.tsb_key), + repack, + sink + ) self.tb.run () - self.assertEqual(sink.data(), expected_data) - try: - out_tag = sink.tags()[0] - except: - self.assertFail() - self.assertEqual(out_tag.offset, 0) - self.assertEqual(pmt.symbol_to_string(out_tag.key), tag_name) - self.assertEqual(pmt.to_long(out_tag.value), len(expected_data)) + self.assertEqual(len(sink.data()), 1) + self.assertEqual(sink.data()[0], expected_data) def test_005_three_with_tags_trailing (self): """ 3 -> 8, trailing bits """ @@ -102,24 +97,18 @@ class qa_repack_bits_bb (gr_unittest.TestCase): expected_data = (0b11111101, 0b11111111) k = 3 l = 8 - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(src_data)) - src = blocks.vector_source_b(src_data, False, 1, (tag,)) - repack = blocks.repack_bits_bb(k, l, tag_name, True) - sink = blocks.vector_sink_b() - self.tb.connect(src, repack, sink) + src = blocks.vector_source_b(src_data, False, 1) + repack = blocks.repack_bits_bb(k, l, self.tsb_key, True) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(src_data), self.tsb_key), + repack, + sink + ) self.tb.run () - self.assertEqual(sink.data(), expected_data) - try: - out_tag = sink.tags()[0] - except: - self.assertFail() - self.assertEqual(out_tag.offset, 0) - self.assertEqual(pmt.symbol_to_string(out_tag.key), tag_name) - self.assertEqual(pmt.to_long(out_tag.value), len(expected_data)) + self.assertEqual(len(sink.data()), 1) + self.assertEqual(sink.data()[0], expected_data) if __name__ == '__main__': gr_unittest.run(qa_repack_bits_bb, "qa_repack_bits_bb.xml") diff --git a/gr-blocks/python/blocks/qa_tagged_stream_mux.py b/gr-blocks/python/blocks/qa_tagged_stream_mux.py index 749fda3c32..6f1c1c538a 100755 --- a/gr-blocks/python/blocks/qa_tagged_stream_mux.py +++ b/gr-blocks/python/blocks/qa_tagged_stream_mux.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2013 Free Software Foundation, Inc. +# Copyright 2013-2014 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -20,87 +20,62 @@ # Boston, MA 02110-1301, USA. # -from gnuradio import gr, gr_unittest, blocks -import pmt import numpy +import pmt +from gnuradio import gr, gr_unittest, blocks +from gnuradio.gr import packet_utils -def make_len_tags(tupl, key): - tags = [] - tag = gr.tag_t() - tag.key = pmt.string_to_symbol(key) - n_read = 0 - for element in tupl: - tag.offset = n_read - n_read += len(element) - tag.value = pmt.to_pmt(len(element)) - tags.append(tag) - return tags - -def make_len_tag(offset, key, value): +def make_tag(key, value, offset, srcid=None): tag = gr.tag_t() - tag.offset = offset tag.key = pmt.string_to_symbol(key) tag.value = pmt.to_pmt(value) + tag.offset = offset + if srcid is not None: + tag.srcid = pmt.to_pmt(srcid) return tag - class qa_tagged_stream_mux (gr_unittest.TestCase): def setUp(self): self.tb = gr.top_block() + self.tsb_key = "tsb_key" def tearDown(self): self.tb = None - def test_1(self): - datas = ( - 0, 1, 2, 5, 6, 10, 14, 15, 16, - 3, 4, 7, 8, 9, 11, 12, 13, 17 + def setup_data_tags(self, data): + return packet_utils.packets_to_vectors( + data, + self.tsb_key ) - expected = tuple(range(18)) - tagname = "packet_length" - len_tags_0 = ( - make_len_tag(0, tagname, 3), - make_len_tag(3, tagname, 2), - make_len_tag(5, tagname, 1), - make_len_tag(6, tagname, 3) + def test_1(self): + packets0 = ( + (0, 1, 2), (5, 6), (10,), (14, 15, 16,) ) - len_tags_1 = ( - make_len_tag(0, tagname, 2), - make_len_tag(2, tagname, 3), - make_len_tag(5, tagname, 3), - make_len_tag(8, tagname, 1) + packets1 = ( + (3, 4), (7, 8, 9), (11, 12, 13), (17,) ) - test_tag_0 = gr.tag_t() - test_tag_0.key = pmt.string_to_symbol('spam') - test_tag_0.offset = 4 # On the second '1' - test_tag_0.value = pmt.to_pmt(42) - test_tag_1 = gr.tag_t() - test_tag_1.key = pmt.string_to_symbol('eggs') - test_tag_1.offset = 3 # On the first '3' of the 2nd stream - test_tag_1.value = pmt.to_pmt(23) - - src0 = blocks.vector_source_b(datas[0:9], False, 1, len_tags_0 + (test_tag_0,)) - src1 = blocks.vector_source_b(datas[9:], False, 1, len_tags_1 + (test_tag_1,)) - tagged_stream_mux = blocks.tagged_stream_mux(gr.sizeof_char, tagname) - snk = blocks.vector_sink_b() + expected = ((0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12, 13), (14, 15, 16, 17)) + data0, tags0 = self.setup_data_tags(packets0) + data1, tags1 = self.setup_data_tags(packets1) + tags0.append(make_tag('spam', 42, 4)) + tags1.append(make_tag('eggs', 23, 3)) + src0 = blocks.vector_source_b(data0, tags=tags0) + src1 = blocks.vector_source_b(data1, tags=tags1) + tagged_stream_mux = blocks.tagged_stream_mux(gr.sizeof_char, self.tsb_key) + snk = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) self.tb.connect(src0, (tagged_stream_mux, 0)) self.tb.connect(src1, (tagged_stream_mux, 1)) self.tb.connect(tagged_stream_mux, snk) self.tb.run() - + # Check self.assertEqual(expected, snk.data()) - tags = [gr.tag_to_python(x) for x in snk.tags()] tags = sorted([(x.offset, x.key, x.value) for x in tags]) tags_expected = [ - (0, 'packet_length', 5), - (5, 'packet_length', 5), (6, 'spam', 42), (8, 'eggs', 23), - (10, 'packet_length', 4), - (14, 'packet_length', 4) ] self.assertEqual(tags, tags_expected) @@ -108,35 +83,35 @@ class qa_tagged_stream_mux (gr_unittest.TestCase): """ Test the 'preserve head position' function. This will add a 'special' tag to item 0 on stream 1. It should be on item 0 of the output stream. """ - special_tag = gr.tag_t() - special_tag.key = pmt.string_to_symbol('spam') - special_tag.offset = 0 - special_tag.value = pmt.to_pmt('eggs') - len_tag_key = "length" - packet_len_1 = 5 - packet_len_2 = 3 - mux = blocks.tagged_stream_mux(gr.sizeof_float, len_tag_key, 1) - sink = blocks.vector_sink_f() + packet_len_0 = 5 + data0 = range(packet_len_0) + packet_len_1 = 3 + data1 = range(packet_len_1) + mux = blocks.tagged_stream_mux( + gr.sizeof_float, + self.tsb_key, + 1 # Mark port 1 as carrying special tags on the head position + ) + sink = blocks.tsb_vector_sink_f(tsb_key=self.tsb_key) self.tb.connect( - blocks.vector_source_f(range(packet_len_1)), - blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_1, len_tag_key), + blocks.vector_source_f(data0), + blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_0, self.tsb_key), (mux, 0) ) self.tb.connect( - blocks.vector_source_f(range(packet_len_2), False, 1, (special_tag,)), - blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_2, len_tag_key), + blocks.vector_source_f(range(packet_len_1), tags=(make_tag('spam', 'eggs', 0),)), + blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_1, self.tsb_key), (mux, 1) ) self.tb.connect(mux, sink) self.tb.run() - self.assertEqual(sink.data(), tuple(range(packet_len_1) + range(packet_len_2))) - tags = [gr.tag_to_python(x) for x in sink.tags()] - tags = sorted([(x.offset, x.key, x.value) for x in tags]) - tags_expected = [ - (0, 'length', packet_len_1 + packet_len_2), - (0, 'spam', 'eggs'), - ] - self.assertEqual(tags, tags_expected) + self.assertEqual(len(sink.data()), 1) + self.assertEqual(sink.data()[0], tuple(data0 + data1)) + self.assertEqual(len(sink.tags()), 1) + tag = gr.tag_to_python(sink.tags()[0]) + tag = (tag.offset, tag.key, tag.value) + tag_expected = (0, 'spam', 'eggs') + self.assertEqual(tag, tag_expected) if __name__ == '__main__': diff --git a/gr-blocks/python/blocks/qa_tsb_vector_sink_X.py b/gr-blocks/python/blocks/qa_tsb_vector_sink_X.py index fbc9c9fca3..aa7154e6a5 100755 --- a/gr-blocks/python/blocks/qa_tsb_vector_sink_X.py +++ b/gr-blocks/python/blocks/qa_tsb_vector_sink_X.py @@ -29,7 +29,7 @@ class qa_tsb_vector_sink (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () - self.tsb_key = "ts_last" + self.tsb_key = "tsb" def tearDown (self): self.tb = None @@ -42,7 +42,7 @@ class qa_tsb_vector_sink (gr_unittest.TestCase): tag.offset = 5 tag.value = pmt.intern("bar") src = blocks.vector_source_f(data, tags=(tag,)) - sink = blocks.tsb_vector_sink_f() + sink = blocks.tsb_vector_sink_f(tsb_key=self.tsb_key) self.tb.connect( src, blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len, self.tsb_key), |