diff options
Diffstat (limited to 'gr-digital/python/digital/qa_ofdm_serializer_vcc.py')
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_serializer_vcc.py | 120 |
1 files changed, 39 insertions, 81 deletions
diff --git a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py index 69997ce981..8a60b97882 100755 --- a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py +++ b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2012,2013 Free Software Foundation, Inc. +# Copyright 2012-2014 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -29,6 +29,7 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () + self.tsb_key = "ts_last" def tearDown (self): self.tb = None @@ -42,21 +43,12 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): expected_result = tuple(range(1, 16)) + (0, 0, 0) occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) n_syms = len(tx_symbols)/fft_len - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(n_syms) - src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,)) - serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, "", False) - sink = blocks.vector_sink_c() - self.tb.connect(src, serializer, sink) + src = blocks.vector_source_c(tx_symbols, False, fft_len) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, self.tsb_key, "", 0, "", False) + sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink) self.tb.run () - self.assertEqual(sink.data(), expected_result) - self.assertEqual(len(sink.tags()), 1) - result_tag = sink.tags()[0] - self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name) - self.assertEqual(pmt.to_long(result_tag.value), n_syms * len(occupied_carriers[0])) + self.assertEqual(sink.data()[0], expected_result) def test_001b_shifted (self): """ Same as before, but shifted, because that's the normal mode in OFDM Rx """ @@ -69,21 +61,12 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): expected_result = tuple(range(18)) occupied_carriers = ((13, 14, 15, 1, 2, 3), (-4, -2, -1, 1, 2, 4),) n_syms = len(tx_symbols)/fft_len - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(n_syms) - src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,)) - serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name) - sink = blocks.vector_sink_c() - self.tb.connect(src, serializer, sink) + src = blocks.vector_source_c(tx_symbols, False, fft_len) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, self.tsb_key) + sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink) self.tb.run () - self.assertEqual(sink.data(), expected_result) - self.assertEqual(len(sink.tags()), 1) - result_tag = sink.tags()[0] - self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name) - self.assertEqual(pmt.to_long(result_tag.value), n_syms * len(occupied_carriers[0])) + self.assertEqual(sink.data()[0], expected_result) def test_002_with_offset (self): """ Standard test, carrier offset """ @@ -96,32 +79,24 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): expected_result = tuple(range(1, 16)) + (0, 0, 0) occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) n_syms = len(tx_symbols)/fft_len - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(n_syms) offsettag = gr.tag_t() offsettag.offset = 0 offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") offsettag.value = pmt.from_long(carr_offset) - src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, offsettag)) - sink = blocks.vector_sink_c() + src = blocks.vector_source_c(tx_symbols, False, fft_len, (offsettag,)) + sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key) serializer = digital.ofdm_serializer_vcc( fft_len, occupied_carriers, - tag_name, + self.tsb_key, "", 0, "ofdm_sync_carr_offset", False ) - self.tb.connect(src, serializer, sink) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink) self.tb.run () - self.assertEqual(sink.data(), expected_result) - self.assertEqual(len(sink.tags()), 2) - for tag in sink.tags(): - if pmt.symbol_to_string(tag.key) == tag_name: - self.assertEqual(pmt.to_long(tag.value), n_syms * len(occupied_carriers[0])) + self.assertEqual(sink.data()[0], expected_result) + self.assertEqual(len(sink.tags()), 1) def test_003_connect (self): """ Connect carrier_allocator to ofdm_serializer, @@ -133,19 +108,14 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): pilot_symbols = ((1j,),(-1j,)) #tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)]) tx_data = (1, 2, 3, 4) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(tx_data)) - src = blocks.vector_source_c(tx_data, False, 1, (tag,)) + src = blocks.vector_source_c(tx_data, False, 1) alloc = digital.ofdm_carrier_allocator_cvc( fft_len, occupied_carriers, pilot_carriers, pilot_symbols, (), # No sync word - tag_name, + self.tsb_key, True # Output is shifted (default) ) serializer = digital.ofdm_serializer_vcc( @@ -155,10 +125,10 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): "", # Carrier offset key True # Input is shifted (default) ) - sink = blocks.vector_sink_c() - self.tb.connect(src, alloc, serializer, sink) + sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_data), self.tsb_key), alloc, serializer, sink) self.tb.run () - self.assertEqual(sink.data(), tx_data) + self.assertEqual(sink.data()[0], tx_data) def test_004_connect (self): """ @@ -176,33 +146,30 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): pilot_carriers = ((-3,),(3,)) pilot_symbols = ((1j,),(-1j,)) tx_data = (1, 2, 3, 4) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(tx_data)) offsettag = gr.tag_t() offsettag.offset = 0 offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") offsettag.value = pmt.from_long(carr_offset) - src = blocks.vector_source_c(tx_data, False, 1, (tag, offsettag)) + src = blocks.vector_source_c(tx_data, False, 1, (offsettag,)) alloc = digital.ofdm_carrier_allocator_cvc(fft_len, occupied_carriers, pilot_carriers, pilot_symbols, (), - tag_name) + self.tsb_key) tx_ifft = fft.fft_vcc(fft_len, False, (1.0/fft_len,)*fft_len, True) oscillator = analog.sig_source_c(1.0, analog.GR_COS_WAVE, freq_offset, 1.0) mixer = blocks.multiply_cc() rx_fft = fft.fft_vcc(fft_len, True, (), True) - sink2 = blocks.vector_sink_c(fft_len) + sink2 = blocks.tsb_vector_sink_c(vlen=fft_len, tsb_key=self.tsb_key) self.tb.connect(rx_fft, sink2) serializer = digital.ofdm_serializer_vcc( alloc, "", 0, "ofdm_sync_carr_offset", True ) - sink = blocks.vector_sink_c() + sink = blocks.tsb_vector_sink_c(tsb_key=self.tsb_key) self.tb.connect( - src, alloc, tx_ifft, + src, + blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, 1, len(tx_data), self.tsb_key), + alloc, tx_ifft, blocks.vector_to_stream(gr.sizeof_gr_complex, fft_len), (mixer, 0), blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len), @@ -210,7 +177,7 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): ) self.tb.connect(oscillator, (mixer, 1)) self.tb.run () - self.assertComplexTuplesAlmostEqual(sink.data()[-len(occupied_carriers[0]):], tx_data, places=4) + self.assertComplexTuplesAlmostEqual(sink.data()[0][-len(occupied_carriers[0]):], tx_data, places=4) def test_005_packet_len_tag (self): """ Standard test """ @@ -222,32 +189,23 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): expected_result = tuple(range(1, 16)) occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) n_syms = len(tx_symbols)/fft_len - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(n_syms) + packet_len_tsb_key = "packet_len" tag2 = gr.tag_t() tag2.offset = 0 tag2.key = pmt.string_to_symbol("packet_len") tag2.value = pmt.from_long(len(expected_result)) - src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, tag2)) - serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "packet_len", 0, "", False) - sink = blocks.vector_sink_c() - self.tb.connect(src, serializer, sink) + src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag2,)) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, self.tsb_key, packet_len_tsb_key , 0, "", False) + sink = blocks.tsb_vector_sink_c(tsb_key=packet_len_tsb_key) + self.tb.connect(src, blocks.stream_to_tagged_stream(gr.sizeof_gr_complex, fft_len, n_syms, self.tsb_key), serializer, sink) self.tb.run () - self.assertEqual(sink.data(), expected_result) - self.assertEqual(len(sink.tags()), 1) - result_tag = sink.tags()[0] - self.assertEqual(pmt.symbol_to_string(result_tag.key), "packet_len") - self.assertEqual(pmt.to_long(result_tag.value), len(expected_result)) + self.assertEqual(sink.data()[0], expected_result) def test_099 (self): """ Make sure it fails if it should """ fft_len = 16 - occupied_carriers = ((1, 3, 4, 11, 12, 112),) - tag_name = "len" - self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len, occupied_carriers, tag_name) + occupied_carriers = ((1, 3, 4, 11, 12, 112),) # Something invalid + self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len, occupied_carriers, self.tsb_key) if __name__ == '__main__': |