diff options
author | Martin Braun <martin.braun@ettus.com> | 2014-05-08 13:33:07 +0200 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2014-05-20 09:50:23 +0200 |
commit | 95ae36c7ad1fd228e7ca943c7d2a2b49053ae818 (patch) | |
tree | 77824a10189913dda7abcc9b4d2d05b2156beb7d | |
parent | 00b8dbe951768ce874cc5d74ffb9f0fa57d580d2 (diff) |
digital: updated some QAs to use proper tsb functions
-rwxr-xr-x | gr-digital/python/digital/qa_crc32_bb.py | 111 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py | 111 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_chanest_vcvc.py | 7 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py | 17 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py | 162 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_serializer_vcc.py | 120 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_packet_headergenerator_bb.py | 105 |
7 files changed, 265 insertions, 368 deletions
diff --git a/gr-digital/python/digital/qa_crc32_bb.py b/gr-digital/python/digital/qa_crc32_bb.py index 71ba81ded3..bca19cd418 100755 --- a/gr-digital/python/digital/qa_crc32_bb.py +++ b/gr-digital/python/digital/qa_crc32_bb.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2012,2013 Free Software Foundation, Inc. +# Copyright 2012-2014 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -26,6 +26,7 @@ class qa_crc32_bb (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () + self.tsb_key = "length" def tearDown (self): self.tb = None @@ -33,36 +34,37 @@ class qa_crc32_bb (gr_unittest.TestCase): def test_001_crc_len (self): """ Make sure the output of a CRC set is 4 bytes longer than the input. """ data = range(16) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(data)) - src = blocks.vector_source_b(data, False, 1, (tag,)) - crc = digital.crc32_bb(False, tag_name) - sink = blocks.vector_sink_b() - self.tb.connect(src, crc, sink) + src = blocks.vector_source_b(data) + crc = digital.crc32_bb(False, self.tsb_key) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc, + sink + ) self.tb.run() # Check that the packets before crc_check are 4 bytes longer that the input. - self.assertEqual(len(data)+4, len(sink.data())) + self.assertEqual(len(data)+4, len(sink.data()[0])) def test_002_crc_equal (self): """ Go through CRC set / CRC check and make sure the output is the same as the input. """ data = (0, 1, 2, 3, 4, 5, 6, 7, 8) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(data)) - src = blocks.vector_source_b(data, False, 1, (tag,)) - crc = digital.crc32_bb(False, tag_name) - crc_check = digital.crc32_bb(True, tag_name) - sink = blocks.vector_sink_b() - self.tb.connect(src, crc, crc_check, sink) + src = blocks.vector_source_b(data) + crc = digital.crc32_bb(False, self.tsb_key) + crc_check = digital.crc32_bb(True, self.tsb_key) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc, + crc_check, + sink + ) self.tb.run() # Check that the packets after crc_check are the same as input. - self.assertEqual(data, sink.data()) + self.assertEqual(data, sink.data()[0]) def test_003_crc_correct_lentag (self): tag_name = "length" @@ -88,12 +90,19 @@ class qa_crc32_bb (gr_unittest.TestCase): testtag3.offset = len(packets)-1 testtag3.key = pmt.string_to_symbol("tag3") testtag3.value = pmt.from_long(0) - src = blocks.vector_source_b(packets, False, 1, (tag1, tag2, testtag1, testtag2, testtag3)) - crc = digital.crc32_bb(False, tag_name) - sink = blocks.vector_sink_b() - self.tb.connect(src, crc, sink) + src = blocks.vector_source_b(packets, False, 1, (testtag1, testtag2, testtag3)) + crc = digital.crc32_bb(False, self.tsb_key) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, pack_len, self.tsb_key), + crc, + sink + ) self.tb.run() - self.assertEqual(len(sink.data()), 2*(pack_len+4)) + self.assertEqual(len(sink.data()), 2) + self.assertEqual(len(sink.data()[0]), (pack_len+4)) + self.assertEqual(len(sink.data()[1]), (pack_len+4)) correct_offsets = {'tag1': 1, 'tag2': 12, 'tag3': 19} tags_found = {'tag1': False, 'tag2': False, 'tag3': False} for tag in sink.tags(): @@ -101,45 +110,49 @@ class qa_crc32_bb (gr_unittest.TestCase): if key in correct_offsets.keys(): tags_found[key] = True self.assertEqual(correct_offsets[key], tag.offset) - if key == tag_name: - self.assertTrue(tag.offset == 0 or tag.offset == pack_len+4) self.assertTrue(all(tags_found.values())) def test_004_fail (self): """ Corrupt the data and make sure it fails CRC test. """ data = (0, 1, 2, 3, 4, 5, 6, 7) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(data)) - src = blocks.vector_source_b(data, False, 1, (tag,)) - crc = digital.crc32_bb(False, tag_name) - crc_check = digital.crc32_bb(True, tag_name) + src = blocks.vector_source_b(data) + crc = digital.crc32_bb(False, self.tsb_key) + crc_check = digital.crc32_bb(True, self.tsb_key) corruptor = blocks.add_const_bb(1) - sink = blocks.vector_sink_b() - self.tb.connect(src, crc, corruptor, crc_check, sink) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc, + corruptor, + crc_check, + sink + ) self.tb.run() # crc_check will drop invalid packets self.assertEqual(len(sink.data()), 0) def test_005_tag_propagation (self): """ Make sure tags on the CRC aren't lost. """ - data = (0, 1, 2, 3, 4, 5, 6, 7, 8, 2, 67, 225, 188) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(data)) + # Data with precalculated CRC + data = ( + 0, 1, 2, 3, 4, 5, 6, 7, 8, + 2, 67, 225, 188 + ) testtag = gr.tag_t() testtag.offset = len(data)-1 testtag.key = pmt.string_to_symbol('tag1') testtag.value = pmt.from_long(0) - src = blocks.vector_source_b(data, False, 1, (tag, testtag)) - crc_check = digital.crc32_bb(True, tag_name) - sink = blocks.vector_sink_b() - self.tb.connect(src, crc_check, sink) + src = blocks.vector_source_b(data, False, 1, (testtag,)) + crc_check = digital.crc32_bb(True, self.tsb_key) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc_check, + sink + ) self.tb.run() self.assertEqual([len(data)-5,], [tag.offset for tag in sink.tags() if pmt.symbol_to_string(tag.key) == 'tag1']) diff --git a/gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py b/gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py index b1732fa94a..befb15ac4a 100755 --- a/gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py +++ b/gr-digital/python/digital/qa_ofdm_carrier_allocator_cvc.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2012,2013 Free Software Foundation, Inc. +# Copyright 2012-2014 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -26,13 +26,14 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () + self.tsb_key = "ts_last" def tearDown (self): self.tb = None def test_001_t (self): """ - pretty simple (the carrier allocation is not a practical OFDM configuration!) + pretty simple (the carrier allocation here is not a practical OFDM configuration!) """ fft_len = 6 tx_symbols = (1, 2, 3) @@ -43,21 +44,21 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase): sync_word = (range(fft_len),) expected_result = tuple(sync_word[0] + [1j, 0, 0, 1, 2, 3]) # ^ DC carrier - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(tx_symbols)) - src = blocks.vector_source_c(tx_symbols, False, 1, (tag,)) + src = blocks.vector_source_c(tx_symbols, False, 1) alloc = digital.ofdm_carrier_allocator_cvc(fft_len, occupied_carriers, pilot_carriers, pilot_symbols, sync_word, - tag_name) - sink = blocks.vector_sink_c(fft_len) - self.tb.connect(src, alloc, sink) - self.tb.run () - self.assertEqual(sink.data(), expected_result) + self.tsb_key) + sink = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_symbols), self.tsb_key), + alloc, + sink + ) + self.tb.run() + self.assertEqual(sink.data()[0], expected_result) def test_001_t2 (self): """ @@ -71,21 +72,18 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase): pilot_symbols = ((1j,),) expected_result = (1j, 0, 1, 2, 3) # ^ DC carrier - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(tx_symbols)) - src = blocks.vector_source_c(tx_symbols, False, 1, (tag,)) - alloc = digital.ofdm_carrier_allocator_cvc(fft_len, - occupied_carriers, - pilot_carriers, - pilot_symbols, (), - tag_name) - sink = blocks.vector_sink_c(fft_len) - self.tb.connect(src, alloc, sink) + src = blocks.vector_source_c(tx_symbols, False, 1) + alloc = digital.ofdm_carrier_allocator_cvc( + fft_len, + occupied_carriers, + pilot_carriers, + pilot_symbols, (), + self.tsb_key + ) + sink = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_symbols), self.tsb_key), alloc, sink) self.tb.run () - self.assertEqual(sink.data(), expected_result) + self.assertEqual(sink.data()[0], expected_result) def test_002_t (self): """ @@ -97,21 +95,21 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase): occupied_carriers = ((-1, 1, 2),) pilot_carriers = ((3,),) expected_result = (1j, 0, 1, 0, 2, 3) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(tx_symbols)) - src = blocks.vector_source_c(tx_symbols, False, 1, (tag,)) + src = blocks.vector_source_c(tx_symbols, False, 1) alloc = digital.ofdm_carrier_allocator_cvc(fft_len, occupied_carriers, pilot_carriers, pilot_symbols, (), - tag_name) - sink = blocks.vector_sink_c(fft_len) - self.tb.connect(src, alloc, sink) + self.tsb_key) + sink = blocks.tsb_vector_sink_c(fft_len) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_symbols), self.tsb_key), + alloc, + sink + ) self.tb.run () - self.assertEqual(sink.data(), expected_result) + self.assertEqual(sink.data()[0], expected_result) def test_002_t (self): """ @@ -124,11 +122,6 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase): occupied_carriers = ((-1, 1, 2),) pilot_carriers = ((3,),) expected_result = sync_word + (1j, 0, 1, 0, 2, 3) + (1j, 0, 4, 0, 5, 6) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(tx_symbols)) special_tag1 = gr.tag_t() special_tag1.offset = 0 special_tag1.key = pmt.string_to_symbol("spam") @@ -139,7 +132,7 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase): special_tag2.value = pmt.to_pmt(42) src = blocks.vector_source_c( tx_symbols, False, 1, - (tag, special_tag1, special_tag2) + (special_tag1, special_tag2) ) alloc = digital.ofdm_carrier_allocator_cvc( fft_len, @@ -147,16 +140,15 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase): pilot_carriers, pilot_symbols, sync_words=(sync_word,), - len_tag_key=tag_name + len_tag_key=self.tsb_key ) - sink = blocks.vector_sink_c(fft_len) - self.tb.connect(src, alloc, sink) + sink = blocks.tsb_vector_sink_c(fft_len) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_symbols), self.tsb_key), alloc, sink) self.tb.run () - self.assertEqual(sink.data(), expected_result) + self.assertEqual(sink.data()[0], expected_result) tags = [gr.tag_to_python(x) for x in sink.tags()] tags = sorted([(x.offset, x.key, x.value) for x in tags]) tags_expected = [ - (0, 'len', 3), (0, 'spam', 23), (2, 'eggs', 42), ] @@ -180,15 +172,6 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase): 0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11, 12, 0, 0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0, 0) fft_len = 16 - tag_name = "len" - tag1 = gr.tag_t() - tag1.offset = 0 - tag1.key = pmt.string_to_symbol(tag_name) - tag1.value = pmt.from_long(len(tx_symbols)) - tag2 = gr.tag_t() - tag2.offset = len(tx_symbols) - tag2.key = pmt.string_to_symbol(tag_name) - tag2.value = pmt.from_long(len(tx_symbols)) testtag1 = gr.tag_t() testtag1.offset = 0 testtag1.key = pmt.string_to_symbol('tag1') @@ -205,18 +188,17 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase): testtag4.offset = 2*len(tx_symbols)-1 # Last OFDM symbol of packet 2 testtag4.key = pmt.string_to_symbol('tag4') testtag4.value = pmt.from_long(0) - src = blocks.vector_source_c(tx_symbols * 2, False, 1, - (tag1, tag2, testtag1, testtag2, testtag3, testtag4)) + src = blocks.vector_source_c(tx_symbols * 2, False, 1, (testtag1, testtag2, testtag3, testtag4)) alloc = digital.ofdm_carrier_allocator_cvc(fft_len, occupied_carriers, pilot_carriers, pilot_symbols, (), - tag_name, + self.tsb_key, False) - sink = blocks.vector_sink_c(fft_len) - self.tb.connect(src, alloc, sink) + sink = blocks.tsb_vector_sink_c(fft_len) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_symbols), self.tsb_key), alloc, sink) self.tb.run () - self.assertEqual(sink.data(), expected_result * 2) + self.assertEqual(sink.data()[0], expected_result) tags_found = {'tag1': False, 'tag2': False, 'tag3': False, 'tag4': False} correct_offsets = {'tag1': 0, 'tag2': 1, 'tag3': 3, 'tag4': 5} for tag in sink.tags(): @@ -224,9 +206,6 @@ class qa_digital_carrier_allocator_cvc (gr_unittest.TestCase): if key in tags_found.keys(): tags_found[key] = True self.assertEqual(correct_offsets[key], tag.offset) - if key == tag_name: - self.assertTrue(tag.offset == 0 or tag.offset == 3) - self.assertTrue(pmt.to_long(tag.value) == 3) self.assertTrue(all(tags_found.values())) diff --git a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py index c365cf8483..d63b65d018 100755 --- a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py +++ b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2012,2013 Free Software Foundation, Inc. +# Copyright 2012-2014 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -22,7 +22,6 @@ import sys import numpy import random - import numpy from gnuradio import gr, gr_unittest, blocks, analog, digital @@ -41,7 +40,7 @@ def rand_range(min_val, max_val): return random.random() * (max_val - min_val) + min_val -class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase): +class qa_ofdm_chanest_vcvc (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () @@ -284,5 +283,5 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase): if __name__ == '__main__': - gr_unittest.run(qa_ofdm_sync_eqinit_vcvc, "qa_ofdm_sync_eqinit_vcvc.xml") + gr_unittest.run(qa_ofdm_chanest_vcvc, "qa_ofdm_chanest_vcvc.xml") diff --git a/gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py b/gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py index 5cb9fae777..ecc1c426d6 100755 --- a/gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py +++ b/gr-digital/python/digital/qa_ofdm_cyclic_prefixer.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2007,2010,2011,2013 Free Software Foundation, Inc. +# Copyright 2007,2010,2011,2013,2014 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -62,27 +62,22 @@ class test_ofdm_cyclic_prefixer (gr_unittest.TestCase): " With tags and a 2-sample rolloff " fft_len = 8 cp_len = 2 - tag_name = "length" + tag_name = "ts_last" expected_result = (7.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8, # 1.0/2 7.0/2+1.0/2, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1.0/2) - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(2) tag2 = gr.tag_t() tag2.offset = 1 tag2.key = pmt.string_to_symbol("random_tag") tag2.value = pmt.from_long(42) - src = blocks.vector_source_c(range(1, fft_len+1) * 2, False, fft_len, (tag, tag2)) + src = blocks.vector_source_c(range(1, fft_len+1) * 2, False, fft_len, (tag2,)) cp = digital.ofdm_cyclic_prefixer(fft_len, fft_len + cp_len, 2, tag_name) - sink = blocks.vector_sink_c() - self.tb.connect(src, cp, sink) + sink = blocks.tsb_vector_sink_c(tsb_key=tag_name) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, 2, tag_name), cp, sink) self.tb.run() - self.assertEqual(sink.data(), expected_result) + self.assertEqual(sink.data()[0], expected_result) tags = [gr.tag_to_python(x) for x in sink.tags()] tags = sorted([(x.offset, x.key, x.value) for x in tags]) expected_tags = [ - (0, tag_name, len(expected_result)), (fft_len+cp_len, "random_tag", 42) ] self.assertEqual(tags, expected_tags) diff --git a/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py b/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py index 1cdb8ed9a4..c42fb2b907 100755 --- a/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py +++ b/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py @@ -28,6 +28,7 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () + self.tsb_key = "tsb_key" def tearDown (self): self.tb = None @@ -44,12 +45,7 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): fft_len = 8 equalizer = digital.ofdm_equalizer_static(fft_len) n_syms = 3 - len_tag_key = "frame_len" tx_data = (1,) * fft_len * n_syms - len_tag = gr.tag_t() - len_tag.offset = 0 - len_tag.key = pmt.string_to_symbol(len_tag_key) - len_tag.value = pmt.from_long(n_syms) chan_tag = gr.tag_t() chan_tag.offset = 0 chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") @@ -58,20 +54,24 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): random_tag.offset = 1 random_tag.key = pmt.string_to_symbol("foo") random_tag.value = pmt.from_long(42) - src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag, random_tag)) - eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key) - sink = blocks.vector_sink_c(fft_len) - self.tb.connect(src, eq, sink) + src = blocks.vector_source_c(tx_data, False, fft_len, (chan_tag, random_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, self.tsb_key) + sink = blocks.tsb_vector_sink_c(fft_len, tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), + eq, + sink + ) self.tb.run () # Check data - self.assertEqual(tx_data, sink.data()) + self.assertEqual(tx_data, sink.data()[0]) # Check tags tag_dict = dict() for tag in sink.tags(): ptag = gr.tag_to_python(tag) tag_dict[ptag.key] = ptag.value expected_dict = { - 'frame_len': n_syms, 'foo': 42 } self.assertEqual(tag_dict, expected_dict) @@ -83,23 +83,23 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): fft_len = 8 equalizer = digital.ofdm_equalizer_static(fft_len, symbols_skipped=1) n_syms = 3 - len_tag_key = "frame_len" tx_data = (1,) * fft_len * n_syms - len_tag = gr.tag_t() - len_tag.offset = 0 - len_tag.key = pmt.string_to_symbol(len_tag_key) - len_tag.value = pmt.from_long(n_syms) chan_tag = gr.tag_t() chan_tag.offset = 0 chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") chan_tag.value = pmt.init_c32vector(fft_len, (1,) * fft_len) - src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag)) - eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key) - sink = blocks.vector_sink_c(fft_len) - self.tb.connect(src, eq, sink) + src = blocks.vector_source_c(tx_data, False, fft_len, (chan_tag,)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, self.tsb_key) + sink = blocks.tsb_vector_sink_c(fft_len, tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), + eq, + sink + ) self.tb.run () # Check data - self.assertEqual(tx_data, sink.data()) + self.assertEqual(tx_data, sink.data()[0]) def test_001c_carrier_offset_no_cp (self): """ @@ -116,11 +116,6 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): # The rx'd signal is shifted rx_expected = (0, 0, 1, 1, 0, 1, 1, 0) * n_syms equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers) - len_tag_key = "frame_len" - len_tag = gr.tag_t() - len_tag.offset = 0 - len_tag.key = pmt.string_to_symbol(len_tag_key) - len_tag.value = pmt.from_long(n_syms) chan_tag = gr.tag_t() chan_tag.offset = 0 chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") @@ -130,13 +125,18 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): offset_tag.offset = 0 offset_tag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") offset_tag.value = pmt.from_long(carr_offset) - src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag, offset_tag)) - eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len, len_tag_key) - sink = blocks.vector_sink_c(fft_len) - self.tb.connect(src, eq, sink) + src = blocks.vector_source_c(tx_data, False, fft_len, (chan_tag, offset_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len, self.tsb_key) + sink = blocks.tsb_vector_sink_c(fft_len, tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), + eq, + sink + ) self.tb.run () # Check data - self.assertComplexTuplesAlmostEqual(rx_expected, sink.data(), places=4) + self.assertComplexTuplesAlmostEqual(rx_expected, sink.data()[0], places=4) def test_001c_carrier_offset_cp (self): """ @@ -157,11 +157,6 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): # Rx'd signal is corrected rx_expected = (0, 0, 1, 1, 0, 1, 1, 0) * n_syms equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers) - len_tag_key = "frame_len" - len_tag = gr.tag_t() - len_tag.offset = 0 - len_tag.key = pmt.string_to_symbol(len_tag_key) - len_tag.value = pmt.from_long(n_syms) chan_tag = gr.tag_t() chan_tag.offset = 0 chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") @@ -170,13 +165,18 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): offset_tag.offset = 0 offset_tag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") offset_tag.value = pmt.from_long(carr_offset) - src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag, offset_tag)) - eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len, len_tag_key) - sink = blocks.vector_sink_c(fft_len) - self.tb.connect(src, eq, sink) + src = blocks.vector_source_c(tx_data, False, fft_len, (chan_tag, offset_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len, self.tsb_key) + sink = blocks.tsb_vector_sink_c(fft_len, tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), + eq, + sink + ) self.tb.run () # Check data - self.assertComplexTuplesAlmostEqual(rx_expected, sink.data(), places=4) + self.assertComplexTuplesAlmostEqual(rx_expected, sink.data()[0], places=4) def test_002_static (self): """ @@ -211,21 +211,21 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): ] for idx in range(fft_len, 2*fft_len): channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) - len_tag_key = "frame_len" - len_tag = gr.tag_t() - len_tag.offset = 0 - len_tag.key = pmt.string_to_symbol(len_tag_key) - len_tag.value = pmt.from_long(4) chan_tag = gr.tag_t() chan_tag.offset = 0 chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len]) - src = blocks.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag)) - sink = blocks.vector_sink_c(fft_len) - eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key, True) - self.tb.connect(src, eq, sink) + src = blocks.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (chan_tag,)) + sink = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, self.tsb_key, True) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, len(tx_data)/fft_len, self.tsb_key), + eq, + sink + ) self.tb.run () - rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()[0]] # Check data self.assertEqual(tx_data, rx_data) # Check tags @@ -238,7 +238,6 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): else: tag_dict[ptag.key] = pmt.to_python(tag.value) expected_dict = { - 'frame_len': 4, 'ofdm_sync_chan_taps': channel[-fft_len:] } self.assertEqual(tag_dict, expected_dict) @@ -247,7 +246,7 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): """ Same as before, but the input stream has no tag. We specify the frame size in the constructor. We also specify a tag key, so the output stream *should* have - a length tag. + a TSB tag. """ fft_len = 8 n_syms = 4 @@ -274,22 +273,22 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) idx2 = idx+2*fft_len channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) - src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len) - # We do specify a length tag, it should then appear at the output - eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, "frame_len", False, n_syms) - sink = blocks.vector_sink_c(fft_len) - self.tb.connect(src, eq, sink) + src = blocks.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, self.tsb_key, False, n_syms) + sink = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, len(tx_data)/fft_len, self.tsb_key), + eq, + sink + ) self.tb.run () - rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()[0]] self.assertEqual(tx_data, rx_data) - # Check len tag - tags = sink.tags() - len_tag = dict() - for tag in tags: - ptag = gr.tag_to_python(tag) - if ptag.key == 'frame_len': - len_tag[ptag.key] = ptag.value - self.assertEqual(len_tag, {'frame_len': 4}) + # Check TSB Functionality + packets = sink.data() + self.assertEqual(len(packets), 1) + self.assertEqual(len(packets[0]), len(tx_data)) def test_002_static_wo_tags (self): fft_len = 8 @@ -352,27 +351,26 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) idx2 = idx+2*fft_len channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) - len_tag_key = "frame_len" - len_tag = gr.tag_t() - len_tag.offset = 0 - len_tag.key = pmt.string_to_symbol(len_tag_key) - len_tag.value = pmt.from_long(4) chan_tag = gr.tag_t() chan_tag.offset = 0 chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len]) - src = blocks.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag)) - eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key, True) - sink = blocks.vector_sink_c(fft_len) - self.tb.connect(src, eq, sink) + src = blocks.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (chan_tag,)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, self.tsb_key, True) + sink = blocks.tsb_vector_sink_c(fft_len, tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, len(tx_data)/fft_len, self.tsb_key), + eq, + sink + ) self.tb.run () - rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()[0]] self.assertEqual(tx_data, rx_data) - for tag in sink.tags(): - if pmt.symbol_to_string(tag.key) == len_tag_key: - self.assertEqual(pmt.to_long(tag.value), 4) - if pmt.symbol_to_string(tag.key) == "ofdm_sync_chan_taps": - self.assertComplexTuplesAlmostEqual(list(pmt.c32vector_elements(tag.value)), channel[-fft_len:], places=1) + self.assertEqual(len(sink.tags()), 1) + tag = sink.tags()[0] + self.assertEqual(pmt.symbol_to_string(tag.key), "ofdm_sync_chan_taps") + self.assertComplexTuplesAlmostEqual(list(pmt.c32vector_elements(tag.value)), channel[-fft_len:], places=1) if __name__ == '__main__': diff --git a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py index 69997ce981..8a60b97882 100755 --- a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py +++ b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2012,2013 Free Software Foundation, Inc. +# Copyright 2012-2014 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -29,6 +29,7 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () + self.tsb_key = "ts_last" def tearDown (self): self.tb = None @@ -42,21 +43,12 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): expected_result = tuple(range(1, 16)) + (0, 0, 0) occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) n_syms = len(tx_symbols)/fft_len - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(n_syms) - src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,)) - serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, "", False) - sink = blocks.vector_sink_c() - self.tb.connect(src, serializer, sink) + src = blocks.vector_source_c(tx_symbols, False, fft_len) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, self.tsb_key, "", 0, "", False) + sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink) self.tb.run () - self.assertEqual(sink.data(), expected_result) - self.assertEqual(len(sink.tags()), 1) - result_tag = sink.tags()[0] - self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name) - self.assertEqual(pmt.to_long(result_tag.value), n_syms * len(occupied_carriers[0])) + self.assertEqual(sink.data()[0], expected_result) def test_001b_shifted (self): """ Same as before, but shifted, because that's the normal mode in OFDM Rx """ @@ -69,21 +61,12 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): expected_result = tuple(range(18)) occupied_carriers = ((13, 14, 15, 1, 2, 3), (-4, -2, -1, 1, 2, 4),) n_syms = len(tx_symbols)/fft_len - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(n_syms) - src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,)) - serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name) - sink = blocks.vector_sink_c() - self.tb.connect(src, serializer, sink) + src = blocks.vector_source_c(tx_symbols, False, fft_len) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, self.tsb_key) + sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink) self.tb.run () - self.assertEqual(sink.data(), expected_result) - self.assertEqual(len(sink.tags()), 1) - result_tag = sink.tags()[0] - self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name) - self.assertEqual(pmt.to_long(result_tag.value), n_syms * len(occupied_carriers[0])) + self.assertEqual(sink.data()[0], expected_result) def test_002_with_offset (self): """ Standard test, carrier offset """ @@ -96,32 +79,24 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): expected_result = tuple(range(1, 16)) + (0, 0, 0) occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) n_syms = len(tx_symbols)/fft_len - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(n_syms) offsettag = gr.tag_t() offsettag.offset = 0 offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") offsettag.value = pmt.from_long(carr_offset) - src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, offsettag)) - sink = blocks.vector_sink_c() + src = blocks.vector_source_c(tx_symbols, False, fft_len, (offsettag,)) + sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key) serializer = digital.ofdm_serializer_vcc( fft_len, occupied_carriers, - tag_name, + self.tsb_key, "", 0, "ofdm_sync_carr_offset", False ) - self.tb.connect(src, serializer, sink) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink) self.tb.run () - self.assertEqual(sink.data(), expected_result) - self.assertEqual(len(sink.tags()), 2) - for tag in sink.tags(): - if pmt.symbol_to_string(tag.key) == tag_name: - self.assertEqual(pmt.to_long(tag.value), n_syms * len(occupied_carriers[0])) + self.assertEqual(sink.data()[0], expected_result) + self.assertEqual(len(sink.tags()), 1) def test_003_connect (self): """ Connect carrier_allocator to ofdm_serializer, @@ -133,19 +108,14 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): pilot_symbols = ((1j,),(-1j,)) #tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)]) tx_data = (1, 2, 3, 4) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(tx_data)) - src = blocks.vector_source_c(tx_data, False, 1, (tag,)) + src = blocks.vector_source_c(tx_data, False, 1) alloc = digital.ofdm_carrier_allocator_cvc( fft_len, occupied_carriers, pilot_carriers, pilot_symbols, (), # No sync word - tag_name, + self.tsb_key, True # Output is shifted (default) ) serializer = digital.ofdm_serializer_vcc( @@ -155,10 +125,10 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): "", # Carrier offset key True # Input is shifted (default) ) - sink = blocks.vector_sink_c() - self.tb.connect(src, alloc, serializer, sink) + sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_data), self.tsb_key), alloc, serializer, sink) self.tb.run () - self.assertEqual(sink.data(), tx_data) + self.assertEqual(sink.data()[0], tx_data) def test_004_connect (self): """ @@ -176,33 +146,30 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): pilot_carriers = ((-3,),(3,)) pilot_symbols = ((1j,),(-1j,)) tx_data = (1, 2, 3, 4) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(tx_data)) offsettag = gr.tag_t() offsettag.offset = 0 offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") offsettag.value = pmt.from_long(carr_offset) - src = blocks.vector_source_c(tx_data, False, 1, (tag, offsettag)) + src = blocks.vector_source_c(tx_data, False, 1, (offsettag,)) alloc = digital.ofdm_carrier_allocator_cvc(fft_len, occupied_carriers, pilot_carriers, pilot_symbols, (), - tag_name) + self.tsb_key) tx_ifft = fft.fft_vcc(fft_len, False, (1.0/fft_len,)*fft_len, True) oscillator = analog.sig_source_c(1.0, analog.GR_COS_WAVE, freq_offset, 1.0) mixer = blocks.multiply_cc() rx_fft = fft.fft_vcc(fft_len, True, (), True) - sink2 = blocks.vector_sink_c(fft_len) + sink2 = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key) self.tb.connect(rx_fft, sink2) serializer = digital.ofdm_serializer_vcc( alloc, "", 0, "ofdm_sync_carr_offset", True ) - sink = blocks.vector_sink_c() + sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key) self.tb.connect( - src, alloc, tx_ifft, + src, + blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_data), self.tsb_key), + alloc, tx_ifft, blocks.vector_to_stream(gr.sizeof_gr_complex, fft_len), (mixer, 0), blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len), @@ -210,7 +177,7 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): ) self.tb.connect(oscillator, (mixer, 1)) self.tb.run () - self.assertComplexTuplesAlmostEqual(sink.data()[-len(occupied_carriers[0]):], tx_data, places=4) + self.assertComplexTuplesAlmostEqual(sink.data()[0][-len(occupied_carriers[0]):], tx_data, places=4) def test_005_packet_len_tag (self): """ Standard test """ @@ -222,32 +189,23 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): expected_result = tuple(range(1, 16)) occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) n_syms = len(tx_symbols)/fft_len - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(n_syms) + packet_len_tsb_key = "packet_len" tag2 = gr.tag_t() tag2.offset = 0 tag2.key = pmt.string_to_symbol("packet_len") tag2.value = pmt.from_long(len(expected_result)) - src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, tag2)) - serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "packet_len", 0, "", False) - sink = blocks.vector_sink_c() - self.tb.connect(src, serializer, sink) + src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag2,)) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, self.tsb_key, packet_len_tsb_key , 0, "", False) + sink = blocks.tsb_vector_sink_c(tsb_key=packet_len_tsb_key) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink) self.tb.run () - self.assertEqual(sink.data(), expected_result) - self.assertEqual(len(sink.tags()), 1) - result_tag = sink.tags()[0] - self.assertEqual(pmt.symbol_to_string(result_tag.key), "packet_len") - self.assertEqual(pmt.to_long(result_tag.value), len(expected_result)) + self.assertEqual(sink.data()[0], expected_result) def test_099 (self): """ Make sure it fails if it should """ fft_len = 16 - occupied_carriers = ((1, 3, 4, 11, 12, 112),) - tag_name = "len" - self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len, occupied_carriers, tag_name) + occupied_carriers = ((1, 3, 4, 11, 12, 112),) # Something invalid + self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len, occupied_carriers, self.tsb_key) if __name__ == '__main__': diff --git a/gr-digital/python/digital/qa_packet_headergenerator_bb.py b/gr-digital/python/digital/qa_packet_headergenerator_bb.py index bec0828e4b..d2677ce220 100755 --- a/gr-digital/python/digital/qa_packet_headergenerator_bb.py +++ b/gr-digital/python/digital/qa_packet_headergenerator_bb.py @@ -1,5 +1,6 @@ #!/usr/bin/env python -# Copyright 2012 Free Software Foundation, Inc. +# +#Copyright 2012-2014 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -14,40 +15,35 @@ # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to +# along with GNU Radio; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. # from gnuradio import gr, gr_unittest, digital, blocks +from gnuradio.gr import packet_utils import pmt class qa_packet_headergenerator_bb (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () + self.tsb_key = "tsb_key" def tearDown (self): self.tb = None + def setup_data_tags(self, data): + return packet_utils.packets_to_vectors( + data, + self.tsb_key + ) + def test_001_12bits (self): - # 3 PDUs: | | | - data = (1, 2, 3, 4, 1, 2) + tuple(range(25)) - tagname = "packet_len" - tag1 = gr.tag_t() - tag1.offset = 0 - tag1.key = pmt.string_to_symbol(tagname) - tag1.value = pmt.from_long(4) - tag2 = gr.tag_t() - tag2.offset = 4 - tag2.key = pmt.string_to_symbol(tagname) - tag2.value = pmt.from_long(2) - tag3 = gr.tag_t() - tag3.offset = 6 - tag3.key = pmt.string_to_symbol(tagname) - tag3.value = pmt.from_long(25) - src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3)) - header = digital.packet_headergenerator_bb(12, tagname) + # 3 packets: | | | + data, tags = self.setup_data_tags(((1, 2, 3, 4), (1, 2), tuple(range(25)))) + src = blocks.vector_source_b(data, tags=tags) + header = digital.packet_headergenerator_bb(12, self.tsb_key) sink = blocks.vector_sink_b() self.tb.connect(src, header, sink) self.tb.run() @@ -58,25 +54,11 @@ class qa_packet_headergenerator_bb (gr_unittest.TestCase): ) self.assertEqual(sink.data(), expected_data) - def test_002_32bits (self): - # 3 PDUs: | | | | - data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4) - tagname = "packet_len" - tag1 = gr.tag_t() - tag1.offset = 0 - tag1.key = pmt.string_to_symbol(tagname) - tag1.value = pmt.from_long(4) - tag2 = gr.tag_t() - tag2.offset = 4 - tag2.key = pmt.string_to_symbol(tagname) - tag2.value = pmt.from_long(2) - tag3 = gr.tag_t() - tag3.offset = 6 - tag3.key = pmt.string_to_symbol(tagname) - tag3.value = pmt.from_long(4) - src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3)) - header = digital.packet_headergenerator_bb(32, tagname) + # 3 packets: | | | | + data, tags = self.setup_data_tags(((1, 2, 3, 4), (1, 2), (1, 2, 3, 4))) + src = blocks.vector_source_b(data, tags=tags) + header = digital.packet_headergenerator_bb(32, self.tsb_key) sink = blocks.vector_sink_b() self.tb.connect(src, header, sink) self.tb.run() @@ -88,26 +70,12 @@ class qa_packet_headergenerator_bb (gr_unittest.TestCase): ) self.assertEqual(sink.data(), expected_data) - def test_003_12bits_formatter_object (self): - # 3 PDUs: | | | | - data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4) - tagname = "packet_len" - tag1 = gr.tag_t() - tag1.offset = 0 - tag1.key = pmt.string_to_symbol(tagname) - tag1.value = pmt.from_long(4) - tag2 = gr.tag_t() - tag2.offset = 4 - tag2.key = pmt.string_to_symbol(tagname) - tag2.value = pmt.from_long(2) - tag3 = gr.tag_t() - tag3.offset = 6 - tag3.key = pmt.string_to_symbol(tagname) - tag3.value = pmt.from_long(4) - src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3)) - formatter_object = digital.packet_header_default(12, tagname) - header = digital.packet_headergenerator_bb(formatter_object.formatter(), tagname) + # 3 packets: | | | | + data, tags = self.setup_data_tags(((1, 2, 3, 4), (1, 2), (1, 2, 3, 4))) + src = blocks.vector_source_b(data, tags=tags) + formatter_object = digital.packet_header_default(12, self.tsb_key) + header = digital.packet_headergenerator_bb(formatter_object.formatter(), self.tsb_key) sink = blocks.vector_sink_b() self.tb.connect(src, header, sink) self.tb.run() @@ -120,26 +88,13 @@ class qa_packet_headergenerator_bb (gr_unittest.TestCase): def test_004_8bits_formatter_ofdm (self): occupied_carriers = ((1, 2, 3, 5, 6, 7),) - # 3 PDUs: | | | | - data = (1, 2, 3, 4, 1, 2, 1, 2, 3, 4) - tagname = "packet_len" - tag1 = gr.tag_t() - tag1.offset = 0 - tag1.key = pmt.string_to_symbol(tagname) - tag1.value = pmt.from_long(4) - tag2 = gr.tag_t() - tag2.offset = 4 - tag2.key = pmt.string_to_symbol(tagname) - tag2.value = pmt.from_long(2) - tag3 = gr.tag_t() - tag3.offset = 6 - tag3.key = pmt.string_to_symbol(tagname) - tag3.value = pmt.from_long(4) - src = blocks.vector_source_b(data, False, 1, (tag1, tag2, tag3)) - formatter_object = digital.packet_header_ofdm(occupied_carriers, 1, tagname) + # 3 packets: | | | | + data, tags = self.setup_data_tags(((1, 2, 3, 4), (1, 2), (1, 2, 3, 4))) + src = blocks.vector_source_b(data, tags=tags) + formatter_object = digital.packet_header_ofdm(occupied_carriers, 1, self.tsb_key) self.assertEqual(formatter_object.header_len(), 6) - self.assertEqual(pmt.symbol_to_string(formatter_object.len_tag_key()), tagname) - header = digital.packet_headergenerator_bb(formatter_object.formatter(), tagname) + self.assertEqual(pmt.symbol_to_string(formatter_object.len_tag_key()), self.tsb_key) + header = digital.packet_headergenerator_bb(formatter_object.formatter(), self.tsb_key) sink = blocks.vector_sink_b() self.tb.connect(src, header, sink) self.tb.run() |