summaryrefslogtreecommitdiff
path: root/gr-digital/python/qa_ofdm_chanest_vcvc.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/qa_ofdm_chanest_vcvc.py')
-rwxr-xr-xgr-digital/python/qa_ofdm_chanest_vcvc.py26
1 files changed, 17 insertions, 9 deletions
diff --git a/gr-digital/python/qa_ofdm_chanest_vcvc.py b/gr-digital/python/qa_ofdm_chanest_vcvc.py
index b11bb4c556..ae11c0534c 100755
--- a/gr-digital/python/qa_ofdm_chanest_vcvc.py
+++ b/gr-digital/python/qa_ofdm_chanest_vcvc.py
@@ -19,14 +19,16 @@
# Boston, MA 02110-1301, USA.
#
+import sys
+import numpy
+import random
from gnuradio import gr, gr_unittest
import pmt
import blocks_swig as blocks
import analog_swig as analog
import digital_swig as digital
-import sys
-import numpy
-import random
+import blocks_swig as blocks
+from ofdm_txrx import ofdm_tx
def shift_tuple(vec, N):
""" Shifts a vector by N elements. Fills up with zeros. """
@@ -129,7 +131,9 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
chan = blocks.multiply_const_vcc(channel)
chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1)
sink = blocks.vector_sink_c(fft_len)
+ sink_chanest = blocks.vector_sink_c(fft_len)
self.tb.connect(src, chan, chanest, sink)
+ self.tb.connect((chanest, 1), sink_chanest)
self.tb.run()
tags = sink.tags()
self.assertEqual(shift_tuple(sink.data(), -carr_offset), tuple(numpy.multiply(data_symbol, channel)))
@@ -138,6 +142,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
self.assertEqual(pmt.to_long(tag.value), carr_offset)
if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
self.assertEqual(pmt.c32vector_elements(tag.value), channel)
+ self.assertEqual(sink_chanest.data(), channel)
def test_004_channel_no_carroffset_1sym (self):
""" Add a channel, check if it's correctly estimated.
@@ -147,13 +152,17 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
sync_symbol = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0)
data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0)
tx_data = sync_symbol + data_symbol
- channel = (0, 0, 0, 2, 2, 2, 2.5, 3, 2.5, 2, 2.5, 3, 2, 1, 1, 0)
+ channel = (0, 0, 0, 2, 2, 2, 2, 3, 3, 2.5, 2.5, -3, -3, 1j, 1j, 0)
+ #channel = (0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0)
src = blocks.vector_source_c(tx_data, False, fft_len)
chan = blocks.multiply_const_vcc(channel)
chanest = digital.ofdm_chanest_vcvc(sync_symbol, (), 1)
sink = blocks.vector_sink_c(fft_len)
+ sink_chanest = blocks.vector_sink_c(fft_len)
self.tb.connect(src, chan, chanest, sink)
+ self.tb.connect((chanest, 1), sink_chanest)
self.tb.run()
+ self.assertEqual(sink_chanest.data(), channel)
tags = sink.tags()
for tag in tags:
if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
@@ -194,6 +203,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0)
# Channel 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Shifted (0, 0, 0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1)
+ chanest_exp = (0, 0, 0, 5, 6, 7, 8, 9, 0, 11, 12, 13, 14, 15, 0, 0)
tx_data = shift_tuple(sync_symbol1, carr_offset) + \
shift_tuple(sync_symbol2, carr_offset) + \
shift_tuple(data_symbol, carr_offset)
@@ -211,9 +221,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
self.assertEqual(pmt.to_long(tag.value), carr_offset)
if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
chan_est = pmt.c32vector_elements(tag.value)
- for i in range(fft_len):
- if shift_tuple(sync_symbol2, carr_offset)[i]: # Only here the channel can be estimated
- self.assertEqual(chan_est[i], channel[i])
+ self.assertEqual(chan_est, chanest_exp)
self.assertEqual(sink.data(), tuple(numpy.multiply(shift_tuple(data_symbol, carr_offset), channel)))
@@ -227,7 +235,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
wgn_amplitude = 0.05
min_chan_ampl = 0.1
max_chan_ampl = 5
- n_iter = 20
+ n_iter = 20 # The more the accurater
def run_flow_graph(sync_sym1, sync_sym2, data_sym):
top_block = gr.top_block()
carr_offset = random.randint(-max_offset/2, max_offset/2) * 2
@@ -253,7 +261,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
carr_offset_hat = pmt.to_long(tag.value)
self.assertEqual(carr_offset, carr_offset_hat)
if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
- channel_est = pmt.c32vector_elements(tag.value)
+ channel_est = shift_tuple(pmt.c32vector_elements(tag.value), carr_offset)
shifted_carrier_mask = shift_tuple(carrier_mask, carr_offset)
for i in range(fft_len):
if shifted_carrier_mask[i] and channel_est[i]: