diff options
Diffstat (limited to 'gr-digital/python/digital/qa_ofdm_chanest_vcvc.py')
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_chanest_vcvc.py | 23 |
1 files changed, 15 insertions, 8 deletions
diff --git a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py index c9e2bacd0d..e6b331b2a8 100755 --- a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py +++ b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py @@ -20,6 +20,7 @@ # import sys +import numpy import random import numpy @@ -60,11 +61,11 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase): tx_data = shift_tuple(sync_symbol1, carr_offset) + \ shift_tuple(sync_symbol2, carr_offset) + \ shift_tuple(data_symbol, carr_offset) - tag1 = gr.gr_tag_t() + tag1 = gr.tag_t() tag1.offset = 0 tag1.key = pmt.string_to_symbol("test_tag_1") tag1.value = pmt.from_long(23) - tag2 = gr.gr_tag_t() + tag2 = gr.tag_t() tag2.offset = 2 tag2.key = pmt.string_to_symbol("test_tag_2") tag2.value = pmt.from_long(42) @@ -128,7 +129,9 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase): chan = blocks.multiply_const_vcc(channel) chanest = digital.ofdm_chanest_vcvc(sync_symbol1, sync_symbol2, 1) sink = blocks.vector_sink_c(fft_len) + sink_chanest = blocks.vector_sink_c(fft_len) self.tb.connect(src, chan, chanest, sink) + self.tb.connect((chanest, 1), sink_chanest) self.tb.run() tags = sink.tags() self.assertEqual(shift_tuple(sink.data(), -carr_offset), tuple(numpy.multiply(data_symbol, channel))) @@ -137,6 +140,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase): self.assertEqual(pmt.to_long(tag.value), carr_offset) if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps': self.assertEqual(pmt.c32vector_elements(tag.value), channel) + self.assertEqual(sink_chanest.data(), channel) def test_004_channel_no_carroffset_1sym (self): """ Add a channel, check if it's correctly estimated. @@ -146,13 +150,17 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase): sync_symbol = (0, 0, 0, 1, 0, 1, 0, -1, 0, 1, 0, -1, 0, 1, 0, 0) data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0) tx_data = sync_symbol + data_symbol - channel = (0, 0, 0, 2, 2, 2, 2.5, 3, 2.5, 2, 2.5, 3, 2, 1, 1, 0) + channel = (0, 0, 0, 2, 2, 2, 2, 3, 3, 2.5, 2.5, -3, -3, 1j, 1j, 0) + #channel = (0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0) src = blocks.vector_source_c(tx_data, False, fft_len) chan = blocks.multiply_const_vcc(channel) chanest = digital.ofdm_chanest_vcvc(sync_symbol, (), 1) sink = blocks.vector_sink_c(fft_len) + sink_chanest = blocks.vector_sink_c(fft_len) self.tb.connect(src, chan, chanest, sink) + self.tb.connect((chanest, 1), sink_chanest) self.tb.run() + self.assertEqual(sink_chanest.data(), channel) tags = sink.tags() for tag in tags: if pmt.symbol_to_string(tag.key) == 'ofdm_sync_carr_offset': @@ -193,6 +201,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase): data_symbol = (0, 0, 0, 1, -1, 1, -1, 1, 0, 1, -1, -1, -1, 1, 0, 0) # Channel 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 # Shifted (0, 0, 0, 0, 0, 1j, -1, 1, -1j, 1j, 0, 1, -1j, -1, -1j, 1) + chanest_exp = (0, 0, 0, 5, 6, 7, 8, 9, 0, 11, 12, 13, 14, 15, 0, 0) tx_data = shift_tuple(sync_symbol1, carr_offset) + \ shift_tuple(sync_symbol2, carr_offset) + \ shift_tuple(data_symbol, carr_offset) @@ -210,9 +219,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase): self.assertEqual(pmt.to_long(tag.value), carr_offset) if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps': chan_est = pmt.c32vector_elements(tag.value) - for i in range(fft_len): - if shift_tuple(sync_symbol2, carr_offset)[i]: # Only here the channel can be estimated - self.assertEqual(chan_est[i], channel[i]) + self.assertEqual(chan_est, chanest_exp) self.assertEqual(sink.data(), tuple(numpy.multiply(shift_tuple(data_symbol, carr_offset), channel))) @@ -226,7 +233,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase): wgn_amplitude = 0.05 min_chan_ampl = 0.1 max_chan_ampl = 5 - n_iter = 20 + n_iter = 20 # The more the accurater def run_flow_graph(sync_sym1, sync_sym2, data_sym): top_block = gr.top_block() carr_offset = random.randint(-max_offset/2, max_offset/2) * 2 @@ -252,7 +259,7 @@ class qa_ofdm_sync_eqinit_vcvc (gr_unittest.TestCase): carr_offset_hat = pmt.to_long(tag.value) self.assertEqual(carr_offset, carr_offset_hat) if pmt.symbol_to_string(tag.key) == 'ofdm_sync_chan_taps': - channel_est = pmt.c32vector_elements(tag.value) + channel_est = shift_tuple(pmt.c32vector_elements(tag.value), carr_offset) shifted_carrier_mask = shift_tuple(carrier_mask, carr_offset) for i in range(fft_len): if shifted_carrier_mask[i] and channel_est[i]: |