diff options
Diffstat (limited to 'gr-digital/python/digital/qa_linear_equalizer.py')
-rwxr-xr-x | gr-digital/python/digital/qa_linear_equalizer.py | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/gr-digital/python/digital/qa_linear_equalizer.py b/gr-digital/python/digital/qa_linear_equalizer.py new file mode 100755 index 0000000000..d7a4a597f3 --- /dev/null +++ b/gr-digital/python/digital/qa_linear_equalizer.py @@ -0,0 +1,142 @@ +#!/usr/bin/env python +# +# Copyright 2020 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# + +from gnuradio import gr, gr_unittest + +import random, numpy +from gnuradio import digital, blocks, channels + +class qa_linear_equalizer(gr_unittest.TestCase): + + def unpack_values(self, values_in, bits_per_value, bits_per_symbol): + # verify that 8 is divisible by bits_per_symbol + m = bits_per_value / bits_per_symbol + # print(m) + mask = 2**(bits_per_symbol)-1 + + if bits_per_value != m*bits_per_symbol: + print("error - bits per symbols must fit nicely into bits_per_value bit values") + return [] + + num_values = len(values_in) + num_symbols = int(num_values*( m) ) + + cur_byte = 0 + cur_bit = 0 + out = [] + for i in range(num_symbols): + s = (values_in[cur_byte] >> (bits_per_value-bits_per_symbol-cur_bit)) & mask + out.append(s) + cur_bit += bits_per_symbol + + if cur_bit >= bits_per_value: + cur_bit = 0 + cur_byte += 1 + + return out + + def map_symbols_to_constellation(self, symbols, cons): + l = list(map(lambda x: cons.points()[x], symbols)) + return l + + + def setUp(self): + random.seed(987654) + self.tb = gr.top_block() + self.num_data = num_data = 10000 + + + self.sps = sps = 4 + self.eb = eb = 0.35 + self.preamble = preamble = [0x27,0x2F,0x18,0x5D,0x5B,0x2A,0x3F,0x71,0x63,0x3C,0x17,0x0C,0x0A,0x41,0xD6,0x1F,0x4C,0x23,0x65,0x68,0xED,0x1C,0x77,0xA7,0x0E,0x0A,0x9E,0x47,0x82,0xA4,0x57,0x24,] + + self.payload_size = payload_size = 300 # bytes + self.data = data = [0]*4+[random.getrandbits(8) for i in range(payload_size)] + self.gain = gain = .001 # LMS gain + self.corr_thresh = corr_thresh = 3e6 + self.num_taps = num_taps = 16 + + + + def tearDown(self): + self.tb = None + + + def transform(self, src_data, gain, const): + SRC = blocks.vector_source_c(src_data, False) + EQU = digital.lms_dd_equalizer_cc(4, gain, 1, const.base()) + DST = blocks.vector_sink_c() + self.tb.connect(SRC, EQU, DST) + self.tb.run() + return DST.data() + + def test_001_identity(self): + # Constant modulus signal so no adjustments + const = digital.constellation_qpsk() + src_data = const.points()*1000 + + N = 100 # settling time + expected_data = src_data[N:] + result = self.transform(src_data, 0.1, const)[N:] + + N = -500 + self.assertComplexTuplesAlmostEqual(expected_data[N:], result[N:], 5) + + def test_qpsk_3tap_lms_training(self): + # set up fg + gain = 0.01 # LMS gain + num_taps = 16 + num_samp = 2000 + num_test = 500 + cons = digital.constellation_qpsk().base() + rxmod = digital.generic_mod(cons, False, self.sps, True, self.eb, False, False) + modulated_sync_word_pre = digital.modulate_vector_bc(rxmod.to_basic_block(), self.preamble+self.preamble, [1]) + modulated_sync_word = modulated_sync_word_pre[86:(512+86)] # compensate for the RRC filter delay + corr_max = numpy.abs(numpy.dot(modulated_sync_word,numpy.conj(modulated_sync_word))) + corr_calc = self.corr_thresh/(corr_max*corr_max) + preamble_symbols = self.map_symbols_to_constellation(self.unpack_values(self.preamble, 8, 2), cons) + + alg = digital.adaptive_algorithm_lms(cons, gain).base() + evm = digital.meas_evm_cc(cons, digital.evm_measurement_t_EVM_PERCENT) + leq = digital.linear_equalizer(num_taps, self.sps, alg, False, preamble_symbols, 'corr_est') + correst = digital.corr_est_cc(modulated_sync_word, self.sps, 12, corr_calc, digital.THRESHOLD_ABSOLUTE) + constmod = digital.generic_mod( + constellation=cons, + differential=False, + samples_per_symbol=4, + pre_diff_code=True, + excess_bw=0.35, + verbose=False, + log=False) + chan = channels.channel_model( + noise_voltage=0.0, + frequency_offset=0.0, + epsilon=1.0, + taps=(1.0 + 1.0j, 0.63-.22j, -.1+.07j), + noise_seed=0, + block_tags=False) + vso = blocks.vector_source_b(self.preamble+self.data, True, 1, []) + head = blocks.head(gr.sizeof_float*1, num_samp) + vsi = blocks.vector_sink_f() + + self.tb.connect(vso, constmod, chan, correst, leq, evm, head, vsi) + self.tb.run() + + # look at the last 1000 samples, should converge quickly, below 5% EVM + upper_bound = tuple(20.0*numpy.ones((num_test,))) + lower_bound = tuple(0.0*numpy.zeros((num_test,))) + output_data = vsi.data() + output_data = output_data[-num_test:] + self.assertLess(output_data, upper_bound) + self.assertGreater(output_data, lower_bound) + + +if __name__ == '__main__': + gr_unittest.run(qa_linear_equalizer) |