diff options
Diffstat (limited to 'gr-digital/python/digital/qa_ofdm_serializer_vcc.py')
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_serializer_vcc.py | 122 |
1 files changed, 83 insertions, 39 deletions
diff --git a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py index 8a5e8f4893..fbef2b1f27 100755 --- a/gr-digital/python/digital/qa_ofdm_serializer_vcc.py +++ b/gr-digital/python/digital/qa_ofdm_serializer_vcc.py @@ -22,7 +22,7 @@ import numpy -from gnuradio import gr, gr_unittest, blocks. fft, analog, digital +from gnuradio import gr, gr_unittest, blocks, fft, analog, digital import pmt class qa_ofdm_serializer_vcc (gr_unittest.TestCase): @@ -36,7 +36,6 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): def test_001_simple (self): """ Standard test """ fft_len = 16 - tx_symbols = range(1, 16); tx_symbols = (0, 1, 1j, 2, 3, 0, 0, 0, 0, 0, 0, 4, 5, 2j, 6, 0, 0, 7, 8, 3j, 9, 0, 0, 0, 0, 0, 0, 10, 4j, 11, 12, 0, 0, 13, 1j, 14, 15, 0, 0, 0, 0, 0, 0, 0, 0, 2j, 0, 0) @@ -44,12 +43,39 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) n_syms = len(tx_symbols)/fft_len tag_name = "len" - tag = gr.gr_tag_t() + tag = gr.tag_t() + tag.offset = 0 + tag.key = pmt.string_to_symbol(tag_name) + tag.value = pmt.from_long(n_syms) + src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,)) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, "", False) + sink = blocks.vector_sink_c() + self.tb.connect(src, serializer, sink) + self.tb.run () + self.assertEqual(sink.data(), expected_result) + self.assertEqual(len(sink.tags()), 1) + result_tag = sink.tags()[0] + self.assertEqual(pmt.symbol_to_string(result_tag.key), tag_name) + self.assertEqual(pmt.to_long(result_tag.value), n_syms * len(occupied_carriers[0])) + + def test_001b_shifted (self): + """ Same as before, but shifted, because that's the normal mode in OFDM Rx """ + fft_len = 16 + tx_symbols = ( + 0, 0, 0, 0, 0, 0, 1, 2, 0, 3, 4, 5, 0, 0, 0, 0, + 0, 0, 0, 0, 6, 1j, 7, 8, 0, 9, 10, 1j, 11, 0, 0, 0, + 0, 0, 0, 0, 0, 12, 13, 14, 0, 15, 16, 17, 0, 0, 0, 0, + ) + expected_result = tuple(range(18)) + occupied_carriers = ((13, 14, 15, 1, 2, 3), (-4, -2, -1, 1, 2, 4),) + n_syms = len(tx_symbols)/fft_len + tag_name = "len" + tag = gr.tag_t() tag.offset = 0 tag.key = pmt.string_to_symbol(tag_name) tag.value = pmt.from_long(n_syms) src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag,)) - serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, False) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name) sink = blocks.vector_sink_c() self.tb.connect(src, serializer, sink) self.tb.run () @@ -71,17 +97,24 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) n_syms = len(tx_symbols)/fft_len tag_name = "len" - tag = gr.gr_tag_t() + tag = gr.tag_t() tag.offset = 0 tag.key = pmt.string_to_symbol(tag_name) tag.value = pmt.from_long(n_syms) - offsettag = gr.gr_tag_t() + offsettag = gr.tag_t() offsettag.offset = 0 offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") offsettag.value = pmt.from_long(carr_offset) src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, offsettag)) - serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "", 0, False) sink = blocks.vector_sink_c() + serializer = digital.ofdm_serializer_vcc( + fft_len, + occupied_carriers, + tag_name, + "", 0, + "ofdm_sync_carr_offset", + False + ) self.tb.connect(src, serializer, sink) self.tb.run () self.assertEqual(sink.data(), expected_result) @@ -93,29 +126,39 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): def test_003_connect (self): """ Connect carrier_allocator to ofdm_serializer, make sure output==input """ - fft_len = 32 - n_syms = 10 + fft_len = 8 + n_syms = 1 occupied_carriers = ((1, 2, 6, 7),) pilot_carriers = ((3,),(5,)) pilot_symbols = ((1j,),(-1j,)) - sync_word = (range(fft_len),) - tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)]) + #tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)]) + tx_data = (1, 2, 3, 4) tag_name = "len" - tag = gr.gr_tag_t() + tag = gr.tag_t() tag.offset = 0 tag.key = pmt.string_to_symbol(tag_name) tag.value = pmt.from_long(len(tx_data)) src = blocks.vector_source_c(tx_data, False, 1, (tag,)) - alloc = digital.ofdm_carrier_allocator_cvc(fft_len, - occupied_carriers, - pilot_carriers, - pilot_symbols, sync_word, - tag_name) - serializer = digital.ofdm_serializer_vcc(alloc) + alloc = digital.ofdm_carrier_allocator_cvc( + fft_len, + occupied_carriers, + pilot_carriers, + pilot_symbols, + (), # No sync word + tag_name, + True # Output is shifted (default) + ) + serializer = digital.ofdm_serializer_vcc( + alloc, + "", # Len tag key + 0, # Symbols skipped + "", # Carrier offset key + True # Input is shifted (default) + ) sink = blocks.vector_sink_c() self.tb.connect(src, alloc, serializer, sink) self.tb.run () - self.assertEqual(sink.data()[4:], tx_data) + self.assertEqual(sink.data(), tx_data) def test_004_connect (self): """ @@ -126,21 +169,19 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): - Frequency offset is -2 carriers """ fft_len = 8 - n_syms = 2 + n_syms = 1 carr_offset = -2 - freq_offset = 2 * numpy.pi * carr_offset / fft_len # If the sampling rate == 1 - occupied_carriers = ((1, 2, -2, -1),) - pilot_carriers = ((3,),(5,)) + freq_offset = 1.0 / fft_len * carr_offset # Normalized frequency + occupied_carriers = ((-2, -1, 1, 2),) + pilot_carriers = ((-3,),(3,)) pilot_symbols = ((1j,),(-1j,)) - sync_word = (range(fft_len),) - tx_data = tuple([numpy.random.randint(0, 10) for x in range(4 * n_syms)]) - #tx_data = (1,) * occupied_carriers[0] * n_syms + tx_data = (1, 2, 3, 4) tag_name = "len" - tag = gr.gr_tag_t() + tag = gr.tag_t() tag.offset = 0 tag.key = pmt.string_to_symbol(tag_name) tag.value = pmt.from_long(len(tx_data)) - offsettag = gr.gr_tag_t() + offsettag = gr.tag_t() offsettag.offset = 0 offsettag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") offsettag.value = pmt.from_long(carr_offset) @@ -148,13 +189,17 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): alloc = digital.ofdm_carrier_allocator_cvc(fft_len, occupied_carriers, pilot_carriers, - pilot_symbols, sync_word, + pilot_symbols, (), tag_name) - tx_ifft = fft.fft_vcc(fft_len, False, ()) - offset_sig = analog.sig_source_c(1.0, analog.GR_COS_WAVE, freq_offset, 1.0) + tx_ifft = fft.fft_vcc(fft_len, False, (1.0/fft_len,)*fft_len, True) + oscillator = analog.sig_source_c(1.0, analog.GR_COS_WAVE, freq_offset, 1.0/fft_len) mixer = blocks.multiply_cc() rx_fft = fft.fft_vcc(fft_len, True, (), True) - serializer = digital.ofdm_serializer_vcc(alloc) + sink2 = blocks.vector_sink_c(fft_len) + self.tb.connect(rx_fft, sink2) + serializer = digital.ofdm_serializer_vcc( + alloc, "", 0, "ofdm_sync_carr_offset", True + ) sink = blocks.vector_sink_c() self.tb.connect( src, alloc, tx_ifft, @@ -163,10 +208,9 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): blocks.stream_to_vector(gr.sizeof_gr_complex, fft_len), rx_fft, serializer, sink ) - self.tb.connect(offset_sig, (mixer, 1)) + self.tb.connect(oscillator, (mixer, 1)) self.tb.run () - # FIXME check this - #self.assertEqual(sink.data(), tx_data) + self.assertComplexTuplesAlmostEqual(sink.data()[-len(occupied_carriers[0]):], tx_data, places=4) def test_005_packet_len_tag (self): """ Standard test """ @@ -179,16 +223,16 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): occupied_carriers = ((1, 3, 4, 11, 12, 14), (1, 2, 4, 11, 13, 14),) n_syms = len(tx_symbols)/fft_len tag_name = "len" - tag = gr.gr_tag_t() + tag = gr.tag_t() tag.offset = 0 tag.key = pmt.string_to_symbol(tag_name) tag.value = pmt.from_long(n_syms) - tag2 = gr.gr_tag_t() + tag2 = gr.tag_t() tag2.offset = 0 tag2.key = pmt.string_to_symbol("packet_len") tag2.value = pmt.from_long(len(expected_result)) src = blocks.vector_source_c(tx_symbols, False, fft_len, (tag, tag2)) - serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "packet_len", 0, False) + serializer = digital.ofdm_serializer_vcc(fft_len, occupied_carriers, tag_name, "packet_len", 0, "", False) sink = blocks.vector_sink_c() self.tb.connect(src, serializer, sink) self.tb.run () @@ -201,7 +245,7 @@ class qa_ofdm_serializer_vcc (gr_unittest.TestCase): def test_099 (self): """ Make sure it fails if it should """ fft_len = 16 - occupied_carriers = ((1, 3, 4, 11, 12, 17),) + occupied_carriers = ((1, 3, 4, 11, 12, 112),) tag_name = "len" self.assertRaises(RuntimeError, digital.ofdm_serializer_vcc, fft_len, occupied_carriers, tag_name) |