diff options
Diffstat (limited to 'gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py')
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py | 231 |
1 files changed, 209 insertions, 22 deletions
diff --git a/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py b/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py index 3b3b896b83..1cdb8ed9a4 100755 --- a/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py +++ b/gr-digital/python/digital/qa_ofdm_frame_equalizer_vcvc.py @@ -33,32 +33,156 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): self.tb = None def test_001_simple (self): - """ Very simple functionality testing """ + """ Very simple functionality testing: + - static equalizer + - init channel state with all ones + - transmit all ones + - make sure we rx all ones + - Tag check: put in frame length tag and one other random tag, + make sure they're propagated + """ fft_len = 8 equalizer = digital.ofdm_equalizer_static(fft_len) n_syms = 3 len_tag_key = "frame_len" tx_data = (1,) * fft_len * n_syms - len_tag = gr.gr_tag_t() + len_tag = gr.tag_t() len_tag.offset = 0 len_tag.key = pmt.string_to_symbol(len_tag_key) len_tag.value = pmt.from_long(n_syms) - chan_tag = gr.gr_tag_t() + chan_tag = gr.tag_t() chan_tag.offset = 0 chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") chan_tag.value = pmt.init_c32vector(fft_len, (1,) * fft_len) - src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag)) - eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key) + random_tag = gr.tag_t() + random_tag.offset = 1 + random_tag.key = pmt.string_to_symbol("foo") + random_tag.value = pmt.from_long(42) + src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag, random_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key) sink = blocks.vector_sink_c(fft_len) self.tb.connect(src, eq, sink) self.tb.run () # Check data self.assertEqual(tx_data, sink.data()) + # Check tags + tag_dict = dict() for tag in sink.tags(): - self.assertEqual(pmt.symbol_to_string(tag.key), len_tag_key) - self.assertEqual(pmt.to_long(tag.value), n_syms) + ptag = gr.tag_to_python(tag) + tag_dict[ptag.key] = ptag.value + expected_dict = { + 'frame_len': n_syms, + 'foo': 42 + } + self.assertEqual(tag_dict, expected_dict) + + def test_001b_simple_skip_nothing (self): + """ + Same as before, but put a skip-header in there + """ + fft_len = 8 + equalizer = digital.ofdm_equalizer_static(fft_len, symbols_skipped=1) + n_syms = 3 + len_tag_key = "frame_len" + tx_data = (1,) * fft_len * n_syms + len_tag = gr.tag_t() + len_tag.offset = 0 + len_tag.key = pmt.string_to_symbol(len_tag_key) + len_tag.value = pmt.from_long(n_syms) + chan_tag = gr.tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.init_c32vector(fft_len, (1,) * fft_len) + src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + # Check data + self.assertEqual(tx_data, sink.data()) + + def test_001c_carrier_offset_no_cp (self): + """ + Same as before, but put a carrier offset in there + """ + fft_len = 8 + cp_len = 0 + n_syms = 1 + carr_offset = 1 + occupied_carriers = ((-2, -1, 1, 2),) + tx_data = ( + 0, 0, 0, -1j, -1j, 0, -1j, -1j, + ) + # The rx'd signal is shifted + rx_expected = (0, 0, 1, 1, 0, 1, 1, 0) * n_syms + equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers) + len_tag_key = "frame_len" + len_tag = gr.tag_t() + len_tag.offset = 0 + len_tag.key = pmt.string_to_symbol(len_tag_key) + len_tag.value = pmt.from_long(n_syms) + chan_tag = gr.tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") + # Note: this is shifted to the correct position! + chan_tag.value = pmt.init_c32vector(fft_len, (0, 0, -1j, -1j, 0, -1j, -1j, 0)) + offset_tag = gr.tag_t() + offset_tag.offset = 0 + offset_tag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") + offset_tag.value = pmt.from_long(carr_offset) + src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag, offset_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len, len_tag_key) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + # Check data + self.assertComplexTuplesAlmostEqual(rx_expected, sink.data(), places=4) + + def test_001c_carrier_offset_cp (self): + """ + Same as before, but put a carrier offset in there and a CP + """ + fft_len = 8 + cp_len = 2 + n_syms = 3 + # cp_len/fft_len == 1/4, therefore, the phase is rotated by + # carr_offset * \pi/2 in every symbol + occupied_carriers = ((-2, -1, 1, 2),) + carr_offset = -1 + tx_data = ( + 0,-1j,-1j, 0,-1j,-1j, 0, 0, + 0, -1, -1, 0, -1, -1, 0, 0, + 0, 1j, 1j, 0, 1j, 1j, 0, 0, + ) + # Rx'd signal is corrected + rx_expected = (0, 0, 1, 1, 0, 1, 1, 0) * n_syms + equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers) + len_tag_key = "frame_len" + len_tag = gr.tag_t() + len_tag.offset = 0 + len_tag.key = pmt.string_to_symbol(len_tag_key) + len_tag.value = pmt.from_long(n_syms) + chan_tag = gr.tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.init_c32vector(fft_len, (0, 0, 1, 1, 0, 1, 1, 0)) + offset_tag = gr.tag_t() + offset_tag.offset = 0 + offset_tag.key = pmt.string_to_symbol("ofdm_sync_carr_offset") + offset_tag.value = pmt.from_long(carr_offset) + src = blocks.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag, offset_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len, len_tag_key) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + # Check data + self.assertComplexTuplesAlmostEqual(rx_expected, sink.data(), places=4) def test_002_static (self): + """ + - Add a simple channel + - Make symbols QPSK + """ fft_len = 8 # 4 5 6 7 0 1 2 3 tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0 @@ -75,35 +199,97 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers, pilot_carriers, pilot_symbols) channel = [ 0, 0, 1, 1, 0, 1, 1, 0, - 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly... + 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly (but less than \pi/2) 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here! - 0, 0, 1j, 1j, 0, 1j, 1j, 0 # ...and again here. + 0, 0, 1j, 1j, 0, 1j, 1j, 0 + ] + channel = [ + 0, 0, 1, 1, 0, 1, 1, 0, + 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly (but less than \pi/2) + 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here! + 0, 0, 1j, 1j, 0, 1j, 1j, 0 ] for idx in range(fft_len, 2*fft_len): channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) - idx2 = idx+2*fft_len - channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) len_tag_key = "frame_len" - len_tag = gr.gr_tag_t() + len_tag = gr.tag_t() len_tag.offset = 0 len_tag.key = pmt.string_to_symbol(len_tag_key) len_tag.value = pmt.from_long(4) - chan_tag = gr.gr_tag_t() + chan_tag = gr.tag_t() chan_tag.offset = 0 chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len]) src = blocks.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag)) - eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True) sink = blocks.vector_sink_c(fft_len) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key, True) self.tb.connect(src, eq, sink) self.tb.run () rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + # Check data self.assertEqual(tx_data, rx_data) + # Check tags + tag_dict = dict() for tag in sink.tags(): - if pmt.symbol_to_string(tag.key) == len_tag_key: - self.assertEqual(pmt.to_long(tag.value), 4) - if pmt.symbol_to_string(tag.key) == "ofdm_sync_chan_taps": - self.assertEqual(list(pmt.c32vector_elements(tag.value)), channel[-fft_len:]) + ptag = gr.tag_to_python(tag) + tag_dict[ptag.key] = ptag.value + if ptag.key == 'ofdm_sync_chan_taps': + tag_dict[ptag.key] = list(pmt.c32vector_elements(tag.value)) + else: + tag_dict[ptag.key] = pmt.to_python(tag.value) + expected_dict = { + 'frame_len': 4, + 'ofdm_sync_chan_taps': channel[-fft_len:] + } + self.assertEqual(tag_dict, expected_dict) + + def test_002_static_wo_tags (self): + """ Same as before, but the input stream has no tag. + We specify the frame size in the constructor. + We also specify a tag key, so the output stream *should* have + a length tag. + """ + fft_len = 8 + n_syms = 4 + # 4 5 6 7 0 1 2 3 + tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0 + -1, -1, 0, 2, -1, 2, 0, -1, # 8 + -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols) + -1, -1, 1, 1, -1, 0, 2, -1] # 24 + cnst = digital.constellation_qpsk() + tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data] + occupied_carriers = ((1, 2, 6, 7),) + pilot_carriers = ((), (), (1, 2, 6, 7), ()) + pilot_symbols = ( + [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], [] + ) + equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers, pilot_carriers, pilot_symbols) + channel = [ + 0, 0, 1, 1, 0, 1, 1, 0, + 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly (below)... + 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here! + 0, 0, 1j, 1j, 0, 1j, 1j, 0 # ...and again here. + ] + for idx in range(fft_len, 2*fft_len): + channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) + idx2 = idx+2*fft_len + channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) + src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len) + # We do specify a length tag, it should then appear at the output + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, "frame_len", False, n_syms) + sink = blocks.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + self.assertEqual(tx_data, rx_data) + # Check len tag + tags = sink.tags() + len_tag = dict() + for tag in tags: + ptag = gr.tag_to_python(tag) + if ptag.key == 'frame_len': + len_tag[ptag.key] = ptag.value + self.assertEqual(len_tag, {'frame_len': 4}) def test_002_static_wo_tags (self): fft_len = 8 @@ -131,14 +317,15 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): idx2 = idx+2*fft_len channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) src = blocks.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len) - eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), "", False, 4) sink = blocks.vector_sink_c(fft_len) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, "", False, 4) self.tb.connect(src, eq, sink) self.tb.run () rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] self.assertEqual(tx_data, rx_data) def test_002_simpledfe (self): + """ Use the simple DFE equalizer. """ fft_len = 8 # 4 5 6 7 0 1 2 3 tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0 @@ -166,16 +353,16 @@ class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): idx2 = idx+2*fft_len channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) len_tag_key = "frame_len" - len_tag = gr.gr_tag_t() + len_tag = gr.tag_t() len_tag.offset = 0 len_tag.key = pmt.string_to_symbol(len_tag_key) len_tag.value = pmt.from_long(4) - chan_tag = gr.gr_tag_t() + chan_tag = gr.tag_t() chan_tag.offset = 0 chan_tag.key = pmt.string_to_symbol("ofdm_sync_chan_taps") chan_tag.value = pmt.init_c32vector(fft_len, channel[:fft_len]) src = blocks.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag)) - eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key, True) sink = blocks.vector_sink_c(fft_len) self.tb.connect(src, eq, sink) self.tb.run () |