diff options
Diffstat (limited to 'gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py')
-rwxr-xr-x | gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py | 349 |
1 files changed, 349 insertions, 0 deletions
diff --git a/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py b/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py new file mode 100755 index 0000000000..36ae9af29e --- /dev/null +++ b/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py @@ -0,0 +1,349 @@ +#!/usr/bin/env python +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import numpy +from gnuradio import gr, gr_unittest +try: import pmt +except: from gruel import pmt +import digital_swig as digital +import blocks_swig as blocks + +class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_simple (self): + """ Very simple functionality testing: + - static equalizer + - init channel state with all ones + - transmit all ones + - make sure we rx all ones + - Tag check: put in frame length tag and one other random tag, + make sure they're propagated + """ + fft_len = 8 + equalizer = digital.ofdm_equalizer_static(fft_len) + n_syms = 3 + len_tag_key = "frame_len" + tx_data = (1,) * fft_len * n_syms + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.pmt_string_to_symbol(len_tag_key) + len_tag.value = pmt.pmt_from_long(n_syms) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.pmt_init_c32vector(fft_len, (1,) * fft_len) + random_tag = gr.gr_tag_t() + random_tag.offset = 1 + random_tag.key = pmt.pmt_string_to_symbol("foo") + random_tag.value = pmt.pmt_from_long(42) + src = gr.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag, random_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key) + sink = gr.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + # Check data + self.assertEqual(tx_data, sink.data()) + # Check tags + tag_dict = dict() + for tag in sink.tags(): + ptag = gr.tag_to_python(tag) + tag_dict[ptag.key] = ptag.value + expected_dict = { + 'frame_len': n_syms, + 'foo': 42 + } + self.assertEqual(tag_dict, expected_dict) + + def test_001b_simple_skip_nothing (self): + """ + Same as before, but put a skip-header in there + """ + fft_len = 8 + equalizer = digital.ofdm_equalizer_static(fft_len, symbols_skipped=1) + n_syms = 3 + len_tag_key = "frame_len" + tx_data = (1,) * fft_len * n_syms + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.pmt_string_to_symbol(len_tag_key) + len_tag.value = pmt.pmt_from_long(n_syms) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.pmt_init_c32vector(fft_len, (1,) * fft_len) + src = gr.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key) + sink = gr.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + # Check data + self.assertEqual(tx_data, sink.data()) + + def test_001c_carrier_offset_no_cp (self): + """ + Same as before, but put a carrier offset in there + """ + fft_len = 8 + cp_len = 0 + n_syms = 1 + carr_offset = 1 + occupied_carriers = ((-2, -1, 1, 2),) + tx_data = ( + 0, 0, 0, -1j, -1j, 0, -1j, -1j, + ) + # The rx'd signal is shifted + rx_expected = (0, 0, 1, 1, 0, 1, 1, 0) * n_syms + equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers) + len_tag_key = "frame_len" + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.pmt_string_to_symbol(len_tag_key) + len_tag.value = pmt.pmt_from_long(n_syms) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps") + # Note: this is shifted to the correct position! + chan_tag.value = pmt.pmt_init_c32vector(fft_len, (0, 0, -1j, -1j, 0, -1j, -1j, 0)) + offset_tag = gr.gr_tag_t() + offset_tag.offset = 0 + offset_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_carr_offset") + offset_tag.value = pmt.pmt_from_long(carr_offset) + src = gr.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag, offset_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len, len_tag_key) + sink = gr.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + # Check data + self.assertComplexTuplesAlmostEqual(rx_expected, sink.data(), places=4) + + def test_001c_carrier_offset_cp (self): + """ + Same as before, but put a carrier offset in there and a CP + """ + fft_len = 8 + cp_len = 2 + n_syms = 3 + # cp_len/fft_len == 1/4, therefore, the phase is rotated by + # carr_offset * \pi/2 in every symbol + occupied_carriers = ((-2, -1, 1, 2),) + carr_offset = -1 + tx_data = ( + 0,-1j,-1j, 0,-1j,-1j, 0, 0, + 0, -1, -1, 0, -1, -1, 0, 0, + 0, 1j, 1j, 0, 1j, 1j, 0, 0, + ) + # Rx'd signal is corrected + rx_expected = (0, 0, 1, 1, 0, 1, 1, 0) * n_syms + equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers) + len_tag_key = "frame_len" + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.pmt_string_to_symbol(len_tag_key) + len_tag.value = pmt.pmt_from_long(n_syms) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.pmt_init_c32vector(fft_len, (0, 0, 1, 1, 0, 1, 1, 0)) + offset_tag = gr.gr_tag_t() + offset_tag.offset = 0 + offset_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_carr_offset") + offset_tag.value = pmt.pmt_from_long(carr_offset) + src = gr.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag, offset_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), cp_len, len_tag_key) + sink = gr.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + # Check data + self.assertComplexTuplesAlmostEqual(rx_expected, sink.data(), places=4) + + def test_002_static (self): + """ + - Add a simple channel + - Make symbols QPSK + """ + fft_len = 8 + # 4 5 6 7 0 1 2 3 + tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0 + -1, -1, 0, 2, -1, 2, 0, -1, # 8 + -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols) + -1, -1, 1, 1, -1, 0, 2, -1] # 24 + cnst = digital.constellation_qpsk() + tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data] + occupied_carriers = ((1, 2, 6, 7),) + pilot_carriers = ((), (), (1, 2, 6, 7), ()) + pilot_symbols = ( + [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], [] + ) + equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers, pilot_carriers, pilot_symbols) + channel = [ + 0, 0, 1, 1, 0, 1, 1, 0, + 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly (but less than \pi/2) + 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here! + 0, 0, 1j, 1j, 0, 1j, 1j, 0 + ] + channel = [ + 0, 0, 1, 1, 0, 1, 1, 0, + 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly (but less than \pi/2) + 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here! + 0, 0, 1j, 1j, 0, 1j, 1j, 0 + ] + for idx in range(fft_len, 2*fft_len): + channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) + len_tag_key = "frame_len" + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.pmt_string_to_symbol(len_tag_key) + len_tag.value = pmt.pmt_from_long(4) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.pmt_init_c32vector(fft_len, channel[:fft_len]) + src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key, True) + sink = gr.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + # Check data + self.assertEqual(tx_data, rx_data) + # Check tags + tag_dict = dict() + for tag in sink.tags(): + ptag = gr.tag_to_python(tag) + tag_dict[ptag.key] = ptag.value + if ptag.key == 'ofdm_sync_chan_taps': + tag_dict[ptag.key] = list(pmt.pmt_c32vector_elements(tag.value)) + else: + tag_dict[ptag.key] = pmt.to_python(tag.value) + expected_dict = { + 'frame_len': 4, + 'ofdm_sync_chan_taps': channel[-fft_len:] + } + self.assertEqual(tag_dict, expected_dict) + + def test_002_static_wo_tags (self): + """ Same as before, but the input stream has no tag. + We specify the frame size in the constructor. + We also specify a tag key, so the output stream *should* have + a length tag. + """ + fft_len = 8 + n_syms = 4 + # 4 5 6 7 0 1 2 3 + tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0 + -1, -1, 0, 2, -1, 2, 0, -1, # 8 + -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols) + -1, -1, 1, 1, -1, 0, 2, -1] # 24 + cnst = digital.constellation_qpsk() + tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data] + occupied_carriers = ((1, 2, 6, 7),) + pilot_carriers = ((), (), (1, 2, 6, 7), ()) + pilot_symbols = ( + [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], [] + ) + equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers, pilot_carriers, pilot_symbols) + channel = [ + 0, 0, 1, 1, 0, 1, 1, 0, + 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly (below)... + 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here! + 0, 0, 1j, 1j, 0, 1j, 1j, 0 # ...and again here. + ] + for idx in range(fft_len, 2*fft_len): + channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) + idx2 = idx+2*fft_len + channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) + src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len) + # We do specify a length tag, it should then appear at the output + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, "frame_len", False, n_syms) + sink = gr.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + self.assertEqual(tx_data, rx_data) + # Check len tag + tags = sink.tags() + len_tag = dict() + for tag in tags: + ptag = gr.tag_to_python(tag) + if ptag.key == 'frame_len': + len_tag[ptag.key] = ptag.value + self.assertEqual(len_tag, {'frame_len': 4}) + + def test_002_simpledfe (self): + """ Use the simple DFE equalizer. """ + fft_len = 8 + # 4 5 6 7 0 1 2 3 + tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0 + -1, -1, 0, 2, -1, 2, 0, -1, # 8 + -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols) + -1, -1, 1, 1, -1, 0, 2, -1] # 24 + cnst = digital.constellation_qpsk() + tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data] + occupied_carriers = ((1, 2, 6, 7),) + pilot_carriers = ((), (), (1, 2, 6, 7), ()) + pilot_symbols = ( + [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], [] + ) + equalizer = digital.ofdm_equalizer_simpledfe( + fft_len, cnst.base(), occupied_carriers, pilot_carriers, pilot_symbols, 0, 0.01 + ) + channel = [ + 0, 0, 1, 1, 0, 1, 1, 0, + 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly... + 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here! + 0, 0, 1j, 1j, 0, 1j, 1j, 0 # ...and again here. + ] + for idx in range(fft_len, 2*fft_len): + channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) + idx2 = idx+2*fft_len + channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) + len_tag_key = "frame_len" + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.pmt_string_to_symbol(len_tag_key) + len_tag.value = pmt.pmt_from_long(4) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.pmt_init_c32vector(fft_len, channel[:fft_len]) + src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), 0, len_tag_key, True) + sink = gr.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + self.assertEqual(tx_data, rx_data) + for tag in sink.tags(): + if pmt.pmt_symbol_to_string(tag.key) == len_tag_key: + self.assertEqual(pmt.pmt_to_long(tag.value), 4) + if pmt.pmt_symbol_to_string(tag.key) == "ofdm_sync_chan_taps": + self.assertComplexTuplesAlmostEqual(list(pmt.pmt_c32vector_elements(tag.value)), channel[-fft_len:], places=1) + + +if __name__ == '__main__': + gr_unittest.run(qa_ofdm_frame_equalizer_vcvc, "qa_ofdm_frame_equalizer_vcvc.xml") + |