From f968b36d4ed2b194111585605f9a1b5367638fb3 Mon Sep 17 00:00:00 2001
From: Martin Braun <martin.braun@kit.edu>
Date: Fri, 15 Mar 2013 02:12:20 -0700
Subject: Squash/rebased martin/ofdm-master onto trial merge branch

Conflicts:
	gr-blocks/include/blocks/CMakeLists.txt
---
 gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py | 161 ++++++++++++++++++++++
 1 file changed, 161 insertions(+)
 create mode 100755 gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py

(limited to 'gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py')

diff --git a/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py b/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py
new file mode 100755
index 000000000..6f44fd00f
--- /dev/null
+++ b/gr-digital/python/qa_ofdm_frame_equalizer_vcvc.py
@@ -0,0 +1,161 @@
+#!/usr/bin/env python
+# Copyright 2012 Free Software Foundation, Inc.
+# 
+# This file is part of GNU Radio
+# 
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+# 
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING.  If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+# 
+
+import numpy
+from gnuradio import gr, gr_unittest
+try: import pmt
+except: from gruel import pmt
+import digital_swig as digital
+
+class qa_ofdm_frame_equalizer_vcvc (gr_unittest.TestCase):
+
+    def setUp (self):
+        self.tb = gr.top_block ()
+
+    def tearDown (self):
+        self.tb = None
+
+    def test_001_simple (self):
+        """ Very simple functionality testing """
+        fft_len = 8
+        equalizer = digital.ofdm_equalizer_static(fft_len)
+        n_syms = 3
+        len_tag_key = "frame_len"
+        tx_data = (1,) * fft_len * n_syms
+        len_tag = gr.gr_tag_t()
+        len_tag.offset = 0
+        len_tag.key = pmt.pmt_string_to_symbol(len_tag_key)
+        len_tag.value = pmt.pmt_from_long(n_syms)
+        chan_tag = gr.gr_tag_t()
+        chan_tag.offset = 0
+        chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps")
+        chan_tag.value = pmt.pmt_init_c32vector(fft_len, (1,) * fft_len)
+        src = gr.vector_source_c(tx_data, False, fft_len, (len_tag, chan_tag))
+        eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key)
+        sink = gr.vector_sink_c(fft_len)
+        self.tb.connect(src, eq, sink)
+        self.tb.run ()
+        # Check data
+        self.assertEqual(tx_data, sink.data())
+        for tag in sink.tags():
+            self.assertEqual(pmt.pmt_symbol_to_string(tag.key), len_tag_key)
+            self.assertEqual(pmt.pmt_to_long(tag.value), n_syms)
+
+    def test_002_static (self):
+        fft_len = 8
+        #           4   5  6  7   0  1  2   3
+        tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0
+                   -1, -1, 0, 2, -1, 2, 0, -1, # 8
+                   -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols)
+                   -1, -1, 1, 1, -1, 0, 2, -1] # 24
+        cnst = digital.constellation_qpsk()
+        tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data]
+        occupied_carriers = ((1, 2, 6, 7),)
+        pilot_carriers = ((), (), (1, 2, 6, 7), ())
+        pilot_symbols = (
+                [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], []
+        )
+        equalizer = digital.ofdm_equalizer_static(fft_len, occupied_carriers, pilot_carriers, pilot_symbols)
+        channel = [
+            0, 0,  1,  1, 0,  1,  1, 0,
+            0, 0,  1,  1, 0,  1,  1, 0, # These coefficients will be rotated slightly...
+            0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here!
+            0, 0, 1j, 1j, 0, 1j, 1j, 0  # ...and again here.
+        ]
+        for idx in range(fft_len, 2*fft_len):
+            channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5))
+            idx2 = idx+2*fft_len
+            channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5))
+        len_tag_key = "frame_len"
+        len_tag = gr.gr_tag_t()
+        len_tag.offset = 0
+        len_tag.key = pmt.pmt_string_to_symbol(len_tag_key)
+        len_tag.value = pmt.pmt_from_long(4)
+        chan_tag = gr.gr_tag_t()
+        chan_tag.offset = 0
+        chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps")
+        chan_tag.value = pmt.pmt_init_c32vector(fft_len, channel[:fft_len])
+        src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag))
+        eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True)
+        sink = gr.vector_sink_c(fft_len)
+        self.tb.connect(src, eq, sink)
+        self.tb.run ()
+        rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()]
+        self.assertEqual(tx_data, rx_data)
+        for tag in sink.tags():
+            if pmt.pmt_symbol_to_string(tag.key) == len_tag_key:
+                self.assertEqual(pmt.pmt_to_long(tag.value), 4)
+            if pmt.pmt_symbol_to_string(tag.key) == "ofdm_sync_chan_taps":
+                self.assertEqual(list(pmt.pmt_c32vector_elements(tag.value)), channel[-fft_len:])
+
+    def test_002_simpledfe (self):
+        fft_len = 8
+        #           4   5  6  7   0  1  2   3
+        tx_data = [-1, -1, 1, 2, -1, 3, 0, -1, # 0
+                   -1, -1, 0, 2, -1, 2, 0, -1, # 8
+                   -1, -1, 3, 0, -1, 1, 0, -1, # 16 (Pilot symbols)
+                   -1, -1, 1, 1, -1, 0, 2, -1] # 24
+        cnst = digital.constellation_qpsk()
+        tx_signal = [cnst.map_to_points_v(x)[0] if x != -1 else 0 for x in tx_data]
+        occupied_carriers = ((1, 2, 6, 7),)
+        pilot_carriers = ((), (), (1, 2, 6, 7), ())
+        pilot_symbols = (
+                [], [], [cnst.map_to_points_v(x)[0] for x in (1, 0, 3, 0)], []
+        )
+        equalizer = digital.ofdm_equalizer_simpledfe(
+            fft_len, cnst.base(), occupied_carriers, pilot_carriers, pilot_symbols, 0, 0.01
+        )
+        channel = [
+            0, 0,  1,  1, 0,  1,  1, 0,
+            0, 0,  1,  1, 0,  1,  1, 0, # These coefficients will be rotated slightly...
+            0, 0, 1j, 1j, 0, 1j, 1j, 0, # Go crazy here!
+            0, 0, 1j, 1j, 0, 1j, 1j, 0  # ...and again here.
+        ]
+        for idx in range(fft_len, 2*fft_len):
+            channel[idx] = channel[idx-fft_len] * numpy.exp(1j * .1 * numpy.pi * (numpy.random.rand()-.5))
+            idx2 = idx+2*fft_len
+            channel[idx2] = channel[idx2] * numpy.exp(1j * 0 * numpy.pi * (numpy.random.rand()-.5))
+        len_tag_key = "frame_len"
+        len_tag = gr.gr_tag_t()
+        len_tag.offset = 0
+        len_tag.key = pmt.pmt_string_to_symbol(len_tag_key)
+        len_tag.value = pmt.pmt_from_long(4)
+        chan_tag = gr.gr_tag_t()
+        chan_tag.offset = 0
+        chan_tag.key = pmt.pmt_string_to_symbol("ofdm_sync_chan_taps")
+        chan_tag.value = pmt.pmt_init_c32vector(fft_len, channel[:fft_len])
+        src = gr.vector_source_c(numpy.multiply(tx_signal, channel), False, fft_len, (len_tag, chan_tag))
+        eq = digital.ofdm_frame_equalizer_vcvc(equalizer.base(), len_tag_key, True)
+        sink = gr.vector_sink_c(fft_len)
+        self.tb.connect(src, eq, sink)
+        self.tb.run ()
+        rx_data = [cnst.decision_maker_v((x,)) if x != 0 else -1 for x in sink.data()]
+        self.assertEqual(tx_data, rx_data)
+        for tag in sink.tags():
+            if pmt.pmt_symbol_to_string(tag.key) == len_tag_key:
+                self.assertEqual(pmt.pmt_to_long(tag.value), 4)
+            if pmt.pmt_symbol_to_string(tag.key) == "ofdm_sync_chan_taps":
+                self.assertComplexTuplesAlmostEqual(list(pmt.pmt_c32vector_elements(tag.value)), channel[-fft_len:], places=1)
+
+
+if __name__ == '__main__':
+    gr_unittest.run(qa_ofdm_frame_equalizer_vcvc, "qa_ofdm_frame_equalizer_vcvc.xml")
+
-- 
cgit v1.2.3