diff options
Diffstat (limited to 'gr-digital/python/digital')
-rw-r--r-- | gr-digital/python/digital/CMakeLists.txt | 2 | ||||
-rw-r--r-- | gr-digital/python/digital/qa_correlate_and_sync.py | 104 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_mpsk_receiver.py | 146 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_ofdm_insert_preamble.py | 178 |
4 files changed, 0 insertions, 430 deletions
diff --git a/gr-digital/python/digital/CMakeLists.txt b/gr-digital/python/digital/CMakeLists.txt index 091bd883b3..8f88948a2e 100644 --- a/gr-digital/python/digital/CMakeLists.txt +++ b/gr-digital/python/digital/CMakeLists.txt @@ -51,7 +51,6 @@ GR_PYTHON_INSTALL( qpsk.py soft_dec_lut_gen.py DESTINATION ${GR_PYTHON_DIR}/gnuradio/digital - COMPONENT "digital_python" ) GR_PYTHON_INSTALL( @@ -62,7 +61,6 @@ GR_PYTHON_INSTALL( utils/alignment.py utils/tagged_streams.py DESTINATION ${GR_PYTHON_DIR}/gnuradio/digital/utils - COMPONENT "digital_python" ) ######################################################################## diff --git a/gr-digital/python/digital/qa_correlate_and_sync.py b/gr-digital/python/digital/qa_correlate_and_sync.py deleted file mode 100644 index 3297368617..0000000000 --- a/gr-digital/python/digital/qa_correlate_and_sync.py +++ /dev/null @@ -1,104 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2013 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. -# -import math - -import pmt -from gnuradio import gr, gr_unittest, digital, blocks, filter - -def make_parabolic_pulse_shape(sps, N=1, scale=1): - # Make L taps on each side. - L = int(float(N)/2 * sps) - n_taps = 2 * L + 1 - taps = [] - for tap_i in xrange(n_taps): - x = float(tap_i - L)/sps/N - if abs(x) > 1: - taps.append(0) - else: - taps.append(1-x*x) - return [tap*scale for tap in taps] - -class test_correlate_and_sync(gr_unittest.TestCase): - - def test_001(self): - # We're using a really simple preamble so that the correlation - # is straight forward. - preamble = [0, 0, 0, 1, 0, 0, 0] - # Our pulse shape has this width (in units of symbols). - pulse_width = 1.5 - # The number of filters to use for resampling. - n_filters = 12 - sps = 3 - data = [0]*10 + preamble + [0]*40 - src = blocks.vector_source_c(data) - - # We want to generate taps with a sampling rate of sps=n_filters for resampling - # purposes. - pulse_shape = make_parabolic_pulse_shape(sps=n_filters, N=0.5, scale=35) - - # Create our resampling filter to generate the data for the correlator. - shape = filter.pfb_arb_resampler_ccf(sps, pulse_shape, n_filters) - # Generate the correlator block itself. - correlator = digital.correlate_and_sync_cc(preamble, pulse_shape, sps, n_filters) - - # Connect it all up and go. - snk = blocks.vector_sink_c() - null = blocks.null_sink(gr.sizeof_gr_complex) - tb = gr.top_block() - tb.connect(src, shape, correlator, snk) - tb.connect((correlator, 1), null) - tb.run() - - # Look at the tags. Retrieve the timing offset. - data = snk.data() - offset = None - timing_error = None - for tag in snk.tags(): - key = pmt.symbol_to_string(tag.key) - if key == "time_est": - offset = tag.offset - timing_error = pmt.to_double(tag.value) - if offset is None: - raise ValueError("No tags found.") - # Detect where the middle of the preamble is. - # Assume we have only one peak and that it is symmetric. - sum_id = 0 - sum_d = 0 - for i, d in enumerate(data): - sum_id += i*abs(d) - sum_d += abs(d) - data_i = sum_id/sum_d - if offset is not None: - diff = data_i-offset - remainder = -(diff%sps) - if remainder < -sps/2.0: - remainder += sps - tol = 0.2 - difference = timing_error - remainder - difference = difference % sps - if abs(difference) >= tol: - print("Tag gives timing estimate of {0}. QA calculates it as {1}. Tolerance is {2}".format(timing_error, remainder, tol)) - self.assertTrue(abs(difference) < tol) - - -if __name__ == '__main__': - gr_unittest.run(test_correlate_and_sync, "test_correlate_and_sync.xml") diff --git a/gr-digital/python/digital/qa_mpsk_receiver.py b/gr-digital/python/digital/qa_mpsk_receiver.py deleted file mode 100755 index df7519dcf6..0000000000 --- a/gr-digital/python/digital/qa_mpsk_receiver.py +++ /dev/null @@ -1,146 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2011-2013 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. -# - -import random -import cmath -import time - -from gnuradio import gr, gr_unittest, digital, filter, blocks - -class test_mpsk_receiver(gr_unittest.TestCase): - - def setUp(self): - self.tb = gr.top_block() - - def tearDown(self): - self.tb = None - - def test01(self): - # Test BPSK sync - M = 2 - theta = 0 - loop_bw = cmath.pi/100.0 - fmin = -0.5 - fmax = 0.5 - mu = 0.5 - gain_mu = 0.01 - omega = 2 - gain_omega = 0.001 - omega_rel = 0.001 - - self.test = digital.mpsk_receiver_cc(M, theta, loop_bw, - fmin, fmax, mu, gain_mu, - omega, gain_omega, - omega_rel) - - data = 10000*[complex(1,0), complex(-1,0)] - #data = [2*random.randint(0,1)-1 for x in xrange(10000)] - self.src = blocks.vector_source_c(data, False) - self.snk = blocks.vector_sink_c() - - # pulse shaping interpolation filter - nfilts = 32 - excess_bw = 0.35 - ntaps = 11 * int(omega*nfilts) - rrc_taps0 = filter.firdes.root_raised_cosine( - nfilts, nfilts, 1.0, excess_bw, ntaps) - rrc_taps1 = filter.firdes.root_raised_cosine( - 1, omega, 1.0, excess_bw, 11*omega) - self.rrc0 = filter.pfb_arb_resampler_ccf(omega, rrc_taps0) - self.rrc1 = filter.fir_filter_ccf(1, rrc_taps1) - - self.tb.connect(self.src, self.rrc0, self.rrc1, self.test, self.snk) - self.tb.run() - - expected_result = [-0.5*d for d in data] - dst_data = self.snk.data() - - # Only Ncmp samples after Nstrt samples - Nstrt = 9000 - Ncmp = 1000 - expected_result = expected_result[Nstrt:Nstrt+Ncmp] - dst_data = dst_data[Nstrt:Nstrt+Ncmp] - - #for e,d in zip(expected_result, dst_data): - # print "{0:+.02f} {1:+.02f}".format(e, d) - - self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1) - - - def test02(self): - # Test QPSK sync - M = 4 - theta = 0 - loop_bw = cmath.pi/100.0 - fmin = -0.5 - fmax = 0.5 - mu = 0.5 - gain_mu = 0.01 - omega = 2 - gain_omega = 0.001 - omega_rel = 0.001 - - self.test = digital.mpsk_receiver_cc(M, theta, loop_bw, - fmin, fmax, mu, gain_mu, - omega, gain_omega, - omega_rel) - - data = 10000*[complex( 0.707, 0.707), - complex(-0.707, 0.707), - complex(-0.707, -0.707), - complex( 0.707, -0.707)] - data = [0.5*d for d in data] - self.src = blocks.vector_source_c(data, False) - self.snk = blocks.vector_sink_c() - - # pulse shaping interpolation filter - nfilts = 32 - excess_bw = 0.35 - ntaps = 11 * int(omega*nfilts) - rrc_taps0 = filter.firdes.root_raised_cosine( - nfilts, nfilts, 1.0, excess_bw, ntaps) - rrc_taps1 = filter.firdes.root_raised_cosine( - 1, omega, 1.0, excess_bw, 11*omega) - self.rrc0 = filter.pfb_arb_resampler_ccf(omega, rrc_taps0) - self.rrc1 = filter.fir_filter_ccf(1, rrc_taps1) - - self.tb.connect(self.src, self.rrc0, self.rrc1, self.test, self.snk) - self.tb.run() - - expected_result = 10000*[complex(-0.5, +0.0), complex(+0.0, -0.5), - complex(+0.5, +0.0), complex(+0.0, +0.5)] - - dst_data = self.snk.data() - - # Only Ncmp samples after Nstrt samples - Nstrt = 9000 - Ncmp = 1000 - expected_result = expected_result[Nstrt:Nstrt+Ncmp] - dst_data = dst_data[Nstrt:Nstrt+Ncmp] - - #for e,d in zip(expected_result, dst_data): - # print "{0:+.02f} {1:+.02f}".format(e, d) - - self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1) - -if __name__ == '__main__': - gr_unittest.run(test_mpsk_receiver, "test_mpsk_receiver.xml") diff --git a/gr-digital/python/digital/qa_ofdm_insert_preamble.py b/gr-digital/python/digital/qa_ofdm_insert_preamble.py deleted file mode 100755 index 4edd54c8c6..0000000000 --- a/gr-digital/python/digital/qa_ofdm_insert_preamble.py +++ /dev/null @@ -1,178 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2007,2010-2013 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. -# - -from pprint import pprint - -from gnuradio import gr, gr_unittest, digital, blocks - -class test_ofdm_insert_preamble(gr_unittest.TestCase): - - def setUp(self): - self.tb = gr.top_block() - - def tearDown(self): - self.tb = None - - def helper(self, v0, v1, fft_length, preamble): - tb = self.tb - src0 = blocks.vector_source_c(v0) - src1 = blocks.vector_source_b(v1) - - s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, fft_length) - - # print "len(v) = %d" % (len(v)) - - op = digital.ofdm_insert_preamble(fft_length, preamble) - - v2s = blocks.vector_to_stream(gr.sizeof_gr_complex, fft_length) - dst0 = blocks.vector_sink_c() - dst1 = blocks.vector_sink_b() - - tb.connect(src0, s2v, (op, 0)) - tb.connect(src1, (op, 1)) - tb.connect((op, 0), v2s, dst0) - tb.connect((op, 1), dst1) - - tb.run() - r0 = dst0.data() - r0v = [] - for i in range(len(r0)//fft_length): - r0v.append(r0[i*fft_length:(i+1)*fft_length]) - - r1 = dst1.data() - self.assertEqual(len(r0v), len(r1)) - return (r1, r0v) - - def check_match(self, actual, expected_list): - lst = [] - map(lambda x: lst.append(x), expected_list) - self.assertEqual(actual, lst) - - - # ---------------------------------------------------------------- - - def test_000(self): - # no preamble, 1 symbol payloads - - preamble = () - fft_length = 8 - npayloads = 8 - v = [] - p = [] - for i in range(npayloads): - t = fft_length*[(i + i*1j)] - p.append(tuple(t)) - v += t - - p = tuple(p) - - r = self.helper(v, npayloads*[1], fft_length, preamble) - # pprint(r) - - self.assertEqual(r[0], tuple(npayloads*[1])) - self.check_match(r[1], (p[0],p[1],p[2],p[3],p[4],p[5],p[6],p[7])) - - - def test_001(self): - # 1 symbol preamble, 1 symbol payloads - preamble = ((100, 101, 102, 103, 104, 105, 106, 107),) - p0 = preamble[0] - fft_length = 8 - npayloads = 8 - v = [] - p = [] - for i in range(npayloads): - t = fft_length*[(i + i*1j)] - p.append(tuple(t)) - v += t - - r = self.helper(v, npayloads*[1], fft_length, preamble) - - self.assertEqual(r[0], tuple(npayloads*[1, 0])) - self.check_match(r[1], (p0, p[0], - p0, p[1], - p0, p[2], - p0, p[3], - p0, p[4], - p0, p[5], - p0, p[6], - p0, p[7])) - - def test_002(self): - # 2 symbol preamble, 1 symbol payloads - preamble = ((100, 101, 102, 103, 104, 105, 106, 107), - (200, 201, 202, 203, 204, 205, 206, 207)) - p0 = preamble[0] - p1 = preamble[1] - - fft_length = 8 - npayloads = 8 - v = [] - p = [] - for i in range(npayloads): - t = fft_length*[(i + i*1j)] - p.append(tuple(t)) - v += t - - r = self.helper(v, npayloads*[1], fft_length, preamble) - - self.assertEqual(r[0], tuple(npayloads*[1, 0, 0])) - self.check_match(r[1], (p0, p1, p[0], - p0, p1, p[1], - p0, p1, p[2], - p0, p1, p[3], - p0, p1, p[4], - p0, p1, p[5], - p0, p1, p[6], - p0, p1, p[7])) - - - def xtest_003_preamble(self): - # 2 symbol preamble, 2 symbol payloads - preamble = ((100, 101, 102, 103, 104, 105, 106, 107), - (200, 201, 202, 203, 204, 205, 206, 207)) - p0 = preamble[0] - p1 = preamble[1] - - fft_length = 8 - npayloads = 8 - v = [] - p = [] - for i in range(npayloads * 2): - t = fft_length*[(i + i*1j)] - p.append(tuple(t)) - v += t - - r = self.helper(v, npayloads*[1, 0], fft_length, preamble) - - self.assertEqual(r[0], tuple(npayloads*[1, 0, 0, 0])) - self.check_match(r[1], (p0, p1, p[0], p[1], - p0, p1, p[2], p[3], - p0, p1, p[4], p[5], - p0, p1, p[6], p[7], - p0, p1, p[8], p[9], - p0, p1, p[10], p[11], - p0, p1, p[12], p[13], - p0, p1, p[14], p[15])) - -if __name__ == '__main__': - gr_unittest.run(test_ofdm_insert_preamble, "test_ofdm_insert_preamble.xml") |