path: root/gr-digital/python/digital/qa_linear_equalizer.py
author: mormj <34754695+mormj@users.noreply.github.com> 2020-04-09 07:38:25 -0400
committer: GitHub <noreply@github.com> 2020-04-09 07:38:25 -0400
commit: 3c1750c481a578bd831fce933502fde88444eb54 (patch)
tree: b2e3baf85a1e0605fdf4b0bfcf3e4d6280dfe439 /gr-digital/python/digital/qa_linear_equalizer.py
parent: 7bc8622dda145164c97b73b1c8d4aaf3e0a8bf16 (diff)
digital: restructure equalizers and add dfe (#3306)
* digital: restructure equalizers and add dfe

  This commit restructures the linear equalizer to take a separately
  specifiable adaptive algorithm. Generally this works the same as the
  previous LMS and CMA decision-directed equalizers, but it also adds the
  ability to equalize using training sequences. A Decision Feedback
  Equalizer structure is added as well.

* digital: more const in equalizers
* digital: equalizers - more safety based on review
* digital: dfe - use deque instead of vector for decision_history
* digital: equalizers, further cleanup
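As context for the test below, the restructured API pairs an equalizer block
with a standalone adaptive-algorithm object. A minimal sketch of that
composition, using only calls exercised by this test; the commented
decision_feedback_equalizer line is an assumption based on the commit
description, not something taken from this file:

    from gnuradio import digital

    cons = digital.constellation_qpsk().base()
    # the adaptive algorithm is now its own object, passed to the equalizer
    alg = digital.adaptive_algorithm_lms(cons, 0.01).base()

    # placeholder training sequence, for illustration only
    training_symbols = list(cons.points()) * 4

    # linear equalizer driven by the LMS algorithm, trained from the symbols
    # that follow a 'corr_est' tag; the False flag is passed unexplained in
    # this test and appears to control adaptation after training
    leq = digital.linear_equalizer(16, 4, alg, False, training_symbols, 'corr_est')

    # the new DFE presumably takes forward/feedback tap counts plus the same
    # algorithm object (constructor assumed, not shown in this test):
    # dfe = digital.decision_feedback_equalizer(15, 2, 4, alg, False,
    #                                           training_symbols, 'corr_est')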
Diffstat (limited to 'gr-digital/python/digital/qa_linear_equalizer.py')
-rwxr-xr-x  gr-digital/python/digital/qa_linear_equalizer.py  142
1 file changed, 142 insertions(+), 0 deletions(-)
diff --git a/gr-digital/python/digital/qa_linear_equalizer.py b/gr-digital/python/digital/qa_linear_equalizer.py
new file mode 100755
index 0000000000..d7a4a597f3
--- /dev/null
+++ b/gr-digital/python/digital/qa_linear_equalizer.py
@@ -0,0 +1,142 @@
+#!/usr/bin/env python
+#
+# Copyright 2020 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+#
+
+from gnuradio import gr, gr_unittest
+from gnuradio import digital, blocks, channels
+
+import random
+import numpy
+
+class qa_linear_equalizer(gr_unittest.TestCase):
+
+    def unpack_values(self, values_in, bits_per_value, bits_per_symbol):
+        # bits_per_value must be evenly divisible by bits_per_symbol
+        if bits_per_value % bits_per_symbol != 0:
+            print("error - bits_per_symbol must evenly divide bits_per_value")
+            return []
+        m = bits_per_value // bits_per_symbol  # symbols packed per value
+        mask = 2**bits_per_symbol - 1
+
+        num_values = len(values_in)
+        num_symbols = num_values * m
+
+        # extract bits_per_symbol bits at a time, MSB first
+        cur_byte = 0
+        cur_bit = 0
+        out = []
+        for i in range(num_symbols):
+            s = (values_in[cur_byte] >> (bits_per_value - bits_per_symbol - cur_bit)) & mask
+            out.append(s)
+            cur_bit += bits_per_symbol
+
+            if cur_bit >= bits_per_value:
+                cur_bit = 0
+                cur_byte += 1
+
+        return out
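+
+    # e.g., for the first preamble byte: unpack_values([0x27], 8, 2)
+    # returns [0, 2, 1, 3], since 0x27 = 0b00_10_01_11 read two bits
+    # at a time, MSB first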
+
+    def map_symbols_to_constellation(self, symbols, cons):
+        return [cons.points()[x] for x in symbols]
+
+
+    def setUp(self):
+        random.seed(987654)
+        self.tb = gr.top_block()
+        self.num_data = num_data = 10000
+
+        self.sps = sps = 4
+        self.eb = eb = 0.35
+        self.preamble = preamble = [
+            0x27, 0x2F, 0x18, 0x5D, 0x5B, 0x2A, 0x3F, 0x71,
+            0x63, 0x3C, 0x17, 0x0C, 0x0A, 0x41, 0xD6, 0x1F,
+            0x4C, 0x23, 0x65, 0x68, 0xED, 0x1C, 0x77, 0xA7,
+            0x0E, 0x0A, 0x9E, 0x47, 0x82, 0xA4, 0x57, 0x24]
+
+        self.payload_size = payload_size = 300  # bytes
+        self.data = data = [0] * 4 + [random.getrandbits(8) for i in range(payload_size)]
+        self.gain = gain = 0.001  # LMS gain
+        self.corr_thresh = corr_thresh = 3e6
+        self.num_taps = num_taps = 16
+
+    def tearDown(self):
+        self.tb = None
+
+
+    def transform(self, src_data, gain, const):
+        SRC = blocks.vector_source_c(src_data, False)
+        EQU = digital.lms_dd_equalizer_cc(4, gain, 1, const.base())
+        DST = blocks.vector_sink_c()
+        self.tb.connect(SRC, EQU, DST)
+        self.tb.run()
+        return DST.data()
+
+    def test_001_identity(self):
+        # Constant modulus signal so no adjustments should be needed
+        const = digital.constellation_qpsk()
+        src_data = const.points() * 1000
+
+        N = 100  # settling time
+        expected_data = src_data[N:]
+        result = self.transform(src_data, 0.1, const)[N:]
+
+        # compare only the last 500 samples, well past convergence
+        M = -500
+        self.assertComplexTuplesAlmostEqual(expected_data[M:], result[M:], 5)
+
+    def test_qpsk_3tap_lms_training(self):
+        # set up fg
+        gain = 0.01  # LMS gain
+        num_taps = 16
+        num_samp = 2000
+        num_test = 500
+        cons = digital.constellation_qpsk().base()
+        rxmod = digital.generic_mod(cons, False, self.sps, True, self.eb, False, False)
+        modulated_sync_word_pre = digital.modulate_vector_bc(
+            rxmod.to_basic_block(), self.preamble + self.preamble, [1])
+        modulated_sync_word = modulated_sync_word_pre[86:(512 + 86)]  # compensate for the RRC filter delay
+        corr_max = numpy.abs(numpy.dot(modulated_sync_word, numpy.conj(modulated_sync_word)))
+        corr_calc = self.corr_thresh / (corr_max * corr_max)
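+        # corr_calc normalizes the fixed corr_thresh by the squared
+        # autocorrelation peak (the energy) of the modulated preamble, so the
+        # detection threshold passed to corr_est_cc below tracks the
+        # preamble's actual energy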
+        preamble_symbols = self.map_symbols_to_constellation(
+            self.unpack_values(self.preamble, 8, 2), cons)
+
+        alg = digital.adaptive_algorithm_lms(cons, gain).base()
+        evm = digital.meas_evm_cc(cons, digital.evm_measurement_t_EVM_PERCENT)
+        leq = digital.linear_equalizer(num_taps, self.sps, alg, False,
+                                       preamble_symbols, 'corr_est')
+        correst = digital.corr_est_cc(modulated_sync_word, self.sps, 12,
+                                      corr_calc, digital.THRESHOLD_ABSOLUTE)
+        constmod = digital.generic_mod(
+            constellation=cons,
+            differential=False,
+            samples_per_symbol=4,
+            pre_diff_code=True,
+            excess_bw=0.35,
+            verbose=False,
+            log=False)
+        chan = channels.channel_model(
+            noise_voltage=0.0,
+            frequency_offset=0.0,
+            epsilon=1.0,
+            taps=(1.0 + 1.0j, 0.63 - 0.22j, -0.1 + 0.07j),
+            noise_seed=0,
+            block_tags=False)
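+        # the three channel taps above give this test its name; the 16-tap
+        # linear equalizer should have enough span to approximately invert
+        # this multipath channel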
+        vso = blocks.vector_source_b(self.preamble + self.data, True, 1, [])
+        head = blocks.head(gr.sizeof_float * 1, num_samp)
+        vsi = blocks.vector_sink_f()
+
+        self.tb.connect(vso, constmod, chan, correst, leq, evm, head, vsi)
+        self.tb.run()
+
+        # look at the last 500 samples; the equalizer should converge quickly
+        # and keep EVM below 20%
+        output_data = vsi.data()
+        output_data = output_data[-num_test:]
+        # bound each sample individually (comparing whole tuples with
+        # assertLess would only be a lexicographic comparison)
+        self.assertLess(max(output_data), 20.0)
+        self.assertGreater(min(output_data), 0.0)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_linear_equalizer)