diff options
Diffstat (limited to 'gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py')
-rwxr-xr-x | gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py | 161 |
1 files changed, 161 insertions, 0 deletions
diff --git a/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py b/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py new file mode 100755 index 0000000000..6f44fd00fb --- /dev/null +++ b/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import numpy +from gnuradio import gr, gr_unittest +try: import pmt +except: from gruel import pmt +import digital_swig as digital + +class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_simple (self): + """ Very simple functionality testing """ + fft_len = 8 + equalizer = digital.ofdm_equalizer_static(fft_len) + n_syms = 3 + len_tag_key = "frame_len" + tx_data = (1,) * fft_len * n_syms + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.pmt_string_to_symbol(len_tag_key) + len_tag.value = pmt.pmt_from_long(n_syms) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.pmt_init_c32vector(fft_len, (1,) * fft_len) + src = gr.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key) + sink = gr.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + # Check data + self.assertEqual(tx_data, sink.data()) + for tag in sink.tags(): + self.assertEqual(pmt.pmt_symbol_to_string(tag.key), len_tag_key) + self.assertEqual(pmt.pmt_to_long(tag.value), n_syms) + + def test_002_static (self): + fft_len = 8 + # 4 5 6 7 0 1 2 3 + tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0 + -1, -1, 0, 2, -1, 2, 0, -1, # 8 + -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols) + -1, -1, 1, 1, -1, 0, 2, -1] # 24 + cnst = digital.constellation_qpsk() + tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data] + occupied_carriers = ((1, 2, 6, 7),) + pilot_carriers = ((), (), (1, 2, 6, 7), ()) + pilot_symbols = ( + [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], [] + ) + equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers, pilot_carriers, pilot_symbols) + channel = [ + 0, 0, 1, 1, 0, 1, 1, 0, + 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly... + 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here! + 0, 0, 1j, 1j, 0, 1j, 1j, 0 # ...and again here. + ] + for idx in range(fft_len, 2*fft_len): + channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) + idx2 = idx+2*fft_len + channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) + len_tag_key = "frame_len" + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.pmt_string_to_symbol(len_tag_key) + len_tag.value = pmt.pmt_from_long(4) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.pmt_init_c32vector(fft_len, channel[:fft_len]) + src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True) + sink = gr.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + self.assertEqual(tx_data, rx_data) + for tag in sink.tags(): + if pmt.pmt_symbol_to_string(tag.key) == len_tag_key: + self.assertEqual(pmt.pmt_to_long(tag.value), 4) + if pmt.pmt_symbol_to_string(tag.key) == "ofdm_sync_chan_taps": + self.assertEqual(list(pmt.pmt_c32vector_elements(tag.value)), channel[-fft_len:]) + + def test_002_simpledfe (self): + fft_len = 8 + # 4 5 6 7 0 1 2 3 + tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0 + -1, -1, 0, 2, -1, 2, 0, -1, # 8 + -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols) + -1, -1, 1, 1, -1, 0, 2, -1] # 24 + cnst = digital.constellation_qpsk() + tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data] + occupied_carriers = ((1, 2, 6, 7),) + pilot_carriers = ((), (), (1, 2, 6, 7), ()) + pilot_symbols = ( + [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], [] + ) + equalizer = digital.ofdm_equalizer_simpledfe( + fft_len, cnst.base(), occupied_carriers, pilot_carriers, pilot_symbols, 0, 0.01 + ) + channel = [ + 0, 0, 1, 1, 0, 1, 1, 0, + 0, 0, 1, 1, 0, 1, 1, 0, # These coefficients will be rotated slightly... + 0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here! + 0, 0, 1j, 1j, 0, 1j, 1j, 0 # ...and again here. + ] + for idx in range(fft_len, 2*fft_len): + channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5)) + idx2 = idx+2*fft_len + channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5)) + len_tag_key = "frame_len" + len_tag = gr.gr_tag_t() + len_tag.offset = 0 + len_tag.key = pmt.pmt_string_to_symbol(len_tag_key) + len_tag.value = pmt.pmt_from_long(4) + chan_tag = gr.gr_tag_t() + chan_tag.offset = 0 + chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps") + chan_tag.value = pmt.pmt_init_c32vector(fft_len, channel[:fft_len]) + src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag)) + eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True) + sink = gr.vector_sink_c(fft_len) + self.tb.connect(src, eq, sink) + self.tb.run () + rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()] + self.assertEqual(tx_data, rx_data) + for tag in sink.tags(): + if pmt.pmt_symbol_to_string(tag.key) == len_tag_key: + self.assertEqual(pmt.pmt_to_long(tag.value), 4) + if pmt.pmt_symbol_to_string(tag.key) == "ofdm_sync_chan_taps": + self.assertComplexTuplesAlmostEqual(list(pmt.pmt_c32vector_elements(tag.value)), channel[-fft_len:], places=1) + + +if __name__ == '__main__': + gr_unittest.run(qa_ofdm_frame_equalizer_vcvc, "qa_ofdm_frame_equalizer_vcvc.xml") + |