summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/digital/qa_ofdm_chanest_vcvc.py')
-rwxr-xr-xgr-digital/python/digital/qa_ofdm_chanest_vcvc.py23
1 files changed, 15 insertions, 8 deletions
diff --git a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
index c9e2bacd0d..e6b331b2a8 100755
--- a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
+++ b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
@@ -20,6 +20,7 @@
#
import sys
+import numpy
import random
import numpy
@@ -60,11 +61,11 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
tx_data = shift_tuple(sync_symbol1, carr_offset) + \
shift_tuple(sync_symbol2, carr_offset) + \
shift_tuple(data_symbol, carr_offset)
- tag1 = gr.gr_tag_t()
+ tag1 = gr.tag_t()
tag1.offset = 0
tag1.key = pmt.string_to_symbol("test_tag_1")
tag1.value = pmt.from_long(23)
- tag2 = gr.gr_tag_t()
+ tag2 = gr.tag_t()
tag2.offset = 2
tag2.key = pmt.string_to_symbol("test_tag_2")
tag2.value = pmt.from_long(42)
@@ -128,7 +129,9 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
chan = blocks.multiply_const_vcc(channel)
chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1)
sink = blocks.vector_sink_c(fft_len)
+ sink_chanest = blocks.vector_sink_c(fft_len)
self.tb.connect(src, chan, chanest, sink)
+ self.tb.connect((chanest, 1), sink_chanest)
self.tb.run()
tags = sink.tags()
self.assertEqual(shift_tuple(sink.data(), -carr_offset), tuple(numpy.multiply(data_symbol, channel)))
@@ -137,6 +140,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
self.assertEqual(pmt.to_long(tag.value), carr_offset)
if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
self.assertEqual(pmt.c32vector_elements(tag.value), channel)
+ self.assertEqual(sink_chanest.data(), channel)
def test_004_channel_no_carroffset_1sym (self):
""" Add a channel, check if it's correctly estimated.
@@ -146,13 +150,17 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
sync_symbol = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0)
data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0)
tx_data = sync_symbol + data_symbol
- channel = (0, 0, 0, 2, 2, 2, 2.5, 3, 2.5, 2, 2.5, 3, 2, 1, 1, 0)
+ channel = (0, 0, 0, 2, 2, 2, 2, 3, 3, 2.5, 2.5, -3, -3, 1j, 1j, 0)
+ #channel = (0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0)
src = blocks.vector_source_c(tx_data, False, fft_len)
chan = blocks.multiply_const_vcc(channel)
chanest = digital.ofdm_chanest_vcvc(sync_symbol, (), 1)
sink = blocks.vector_sink_c(fft_len)
+ sink_chanest = blocks.vector_sink_c(fft_len)
self.tb.connect(src, chan, chanest, sink)
+ self.tb.connect((chanest, 1), sink_chanest)
self.tb.run()
+ self.assertEqual(sink_chanest.data(), channel)
tags = sink.tags()
for tag in tags:
if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset':
@@ -193,6 +201,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0)
# Channel 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
# Shifted (0, 0, 0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1)
+ chanest_exp = (0, 0, 0, 5, 6, 7, 8, 9, 0, 11, 12, 13, 14, 15, 0, 0)
tx_data = shift_tuple(sync_symbol1, carr_offset) + \
shift_tuple(sync_symbol2, carr_offset) + \
shift_tuple(data_symbol, carr_offset)
@@ -210,9 +219,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
self.assertEqual(pmt.to_long(tag.value), carr_offset)
if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
chan_est = pmt.c32vector_elements(tag.value)
- for i in range(fft_len):
- if shift_tuple(sync_symbol2, carr_offset)[i]: # Only here the channel can be estimated
- self.assertEqual(chan_est[i], channel[i])
+ self.assertEqual(chan_est, chanest_exp)
self.assertEqual(sink.data(), tuple(numpy.multiply(shift_tuple(data_symbol, carr_offset), channel)))
@@ -226,7 +233,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
wgn_amplitude = 0.05
min_chan_ampl = 0.1
max_chan_ampl = 5
- n_iter = 20
+ n_iter = 20 # The more the accurater
def run_flow_graph(sync_sym1, sync_sym2, data_sym):
top_block = gr.top_block()
carr_offset = random.randint(-max_offset/2, max_offset/2) * 2
@@ -252,7 +259,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase):
carr_offset_hat = pmt.to_long(tag.value)
self.assertEqual(carr_offset, carr_offset_hat)
if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps':
- channel_est = pmt.c32vector_elements(tag.value)
+ channel_est = shift_tuple(pmt.c32vector_elements(tag.value), carr_offset)
shifted_carrier_mask = shift_tuple(carrier_mask, carr_offset)
for i in range(fft_len):
if shifted_carrier_mask[i] and channel_est[i]: