summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/digital')
-rw-r--r--gr-digital/python/digital/CMakeLists.txt2
-rw-r--r--gr-digital/python/digital/qa_correlate_and_sync.py104
-rwxr-xr-xgr-digital/python/digital/qa_mpsk_receiver.py146
-rwxr-xr-xgr-digital/python/digital/qa_ofdm_insert_preamble.py178
4 files changed, 0 insertions, 430 deletions
diff --git a/gr-digital/python/digital/CMakeLists.txt b/gr-digital/python/digital/CMakeLists.txt
index 091bd883b3..8f88948a2e 100644
--- a/gr-digital/python/digital/CMakeLists.txt
+++ b/gr-digital/python/digital/CMakeLists.txt
@@ -51,7 +51,6 @@ GR_PYTHON_INSTALL(
qpsk.py
soft_dec_lut_gen.py
DESTINATION ${GR_PYTHON_DIR}/gnuradio/digital
- COMPONENT "digital_python"
)
GR_PYTHON_INSTALL(
@@ -62,7 +61,6 @@ GR_PYTHON_INSTALL(
utils/alignment.py
utils/tagged_streams.py
DESTINATION ${GR_PYTHON_DIR}/gnuradio/digital/utils
- COMPONENT "digital_python"
)
########################################################################
diff --git a/gr-digital/python/digital/qa_correlate_and_sync.py b/gr-digital/python/digital/qa_correlate_and_sync.py
deleted file mode 100644
index 3297368617..0000000000
--- a/gr-digital/python/digital/qa_correlate_and_sync.py
+++ /dev/null
@@ -1,104 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2013 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# GNU Radio is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3, or (at your option)
-# any later version.
-#
-# GNU Radio is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with GNU Radio; see the file COPYING. If not, write to
-# the Free Software Foundation, Inc., 51 Franklin Street,
-# Boston, MA 02110-1301, USA.
-#
-import math
-
-import pmt
-from gnuradio import gr, gr_unittest, digital, blocks, filter
-
-def make_parabolic_pulse_shape(sps, N=1, scale=1):
- # Make L taps on each side.
- L = int(float(N)/2 * sps)
- n_taps = 2 * L + 1
- taps = []
- for tap_i in xrange(n_taps):
- x = float(tap_i - L)/sps/N
- if abs(x) > 1:
- taps.append(0)
- else:
- taps.append(1-x*x)
- return [tap*scale for tap in taps]
-
-class test_correlate_and_sync(gr_unittest.TestCase):
-
- def test_001(self):
- # We're using a really simple preamble so that the correlation
- # is straight forward.
- preamble = [0, 0, 0, 1, 0, 0, 0]
- # Our pulse shape has this width (in units of symbols).
- pulse_width = 1.5
- # The number of filters to use for resampling.
- n_filters = 12
- sps = 3
- data = [0]*10 + preamble + [0]*40
- src = blocks.vector_source_c(data)
-
- # We want to generate taps with a sampling rate of sps=n_filters for resampling
- # purposes.
- pulse_shape = make_parabolic_pulse_shape(sps=n_filters, N=0.5, scale=35)
-
- # Create our resampling filter to generate the data for the correlator.
- shape = filter.pfb_arb_resampler_ccf(sps, pulse_shape, n_filters)
- # Generate the correlator block itself.
- correlator = digital.correlate_and_sync_cc(preamble, pulse_shape, sps, n_filters)
-
- # Connect it all up and go.
- snk = blocks.vector_sink_c()
- null = blocks.null_sink(gr.sizeof_gr_complex)
- tb = gr.top_block()
- tb.connect(src, shape, correlator, snk)
- tb.connect((correlator, 1), null)
- tb.run()
-
- # Look at the tags. Retrieve the timing offset.
- data = snk.data()
- offset = None
- timing_error = None
- for tag in snk.tags():
- key = pmt.symbol_to_string(tag.key)
- if key == "time_est":
- offset = tag.offset
- timing_error = pmt.to_double(tag.value)
- if offset is None:
- raise ValueError("No tags found.")
- # Detect where the middle of the preamble is.
- # Assume we have only one peak and that it is symmetric.
- sum_id = 0
- sum_d = 0
- for i, d in enumerate(data):
- sum_id += i*abs(d)
- sum_d += abs(d)
- data_i = sum_id/sum_d
- if offset is not None:
- diff = data_i-offset
- remainder = -(diff%sps)
- if remainder < -sps/2.0:
- remainder += sps
- tol = 0.2
- difference = timing_error - remainder
- difference = difference % sps
- if abs(difference) >= tol:
- print("Tag gives timing estimate of {0}. QA calculates it as {1}. Tolerance is {2}".format(timing_error, remainder, tol))
- self.assertTrue(abs(difference) < tol)
-
-
-if __name__ == '__main__':
- gr_unittest.run(test_correlate_and_sync, "test_correlate_and_sync.xml")
diff --git a/gr-digital/python/digital/qa_mpsk_receiver.py b/gr-digital/python/digital/qa_mpsk_receiver.py
deleted file mode 100755
index df7519dcf6..0000000000
--- a/gr-digital/python/digital/qa_mpsk_receiver.py
+++ /dev/null
@@ -1,146 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2011-2013 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# GNU Radio is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3, or (at your option)
-# any later version.
-#
-# GNU Radio is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with GNU Radio; see the file COPYING. If not, write to
-# the Free Software Foundation, Inc., 51 Franklin Street,
-# Boston, MA 02110-1301, USA.
-#
-
-import random
-import cmath
-import time
-
-from gnuradio import gr, gr_unittest, digital, filter, blocks
-
-class test_mpsk_receiver(gr_unittest.TestCase):
-
- def setUp(self):
- self.tb = gr.top_block()
-
- def tearDown(self):
- self.tb = None
-
- def test01(self):
- # Test BPSK sync
- M = 2
- theta = 0
- loop_bw = cmath.pi/100.0
- fmin = -0.5
- fmax = 0.5
- mu = 0.5
- gain_mu = 0.01
- omega = 2
- gain_omega = 0.001
- omega_rel = 0.001
-
- self.test = digital.mpsk_receiver_cc(M, theta, loop_bw,
- fmin, fmax, mu, gain_mu,
- omega, gain_omega,
- omega_rel)
-
- data = 10000*[complex(1,0), complex(-1,0)]
- #data = [2*random.randint(0,1)-1 for x in xrange(10000)]
- self.src = blocks.vector_source_c(data, False)
- self.snk = blocks.vector_sink_c()
-
- # pulse shaping interpolation filter
- nfilts = 32
- excess_bw = 0.35
- ntaps = 11 * int(omega*nfilts)
- rrc_taps0 = filter.firdes.root_raised_cosine(
- nfilts, nfilts, 1.0, excess_bw, ntaps)
- rrc_taps1 = filter.firdes.root_raised_cosine(
- 1, omega, 1.0, excess_bw, 11*omega)
- self.rrc0 = filter.pfb_arb_resampler_ccf(omega, rrc_taps0)
- self.rrc1 = filter.fir_filter_ccf(1, rrc_taps1)
-
- self.tb.connect(self.src, self.rrc0, self.rrc1, self.test, self.snk)
- self.tb.run()
-
- expected_result = [-0.5*d for d in data]
- dst_data = self.snk.data()
-
- # Only Ncmp samples after Nstrt samples
- Nstrt = 9000
- Ncmp = 1000
- expected_result = expected_result[Nstrt:Nstrt+Ncmp]
- dst_data = dst_data[Nstrt:Nstrt+Ncmp]
-
- #for e,d in zip(expected_result, dst_data):
- # print "{0:+.02f} {1:+.02f}".format(e, d)
-
- self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
-
-
- def test02(self):
- # Test QPSK sync
- M = 4
- theta = 0
- loop_bw = cmath.pi/100.0
- fmin = -0.5
- fmax = 0.5
- mu = 0.5
- gain_mu = 0.01
- omega = 2
- gain_omega = 0.001
- omega_rel = 0.001
-
- self.test = digital.mpsk_receiver_cc(M, theta, loop_bw,
- fmin, fmax, mu, gain_mu,
- omega, gain_omega,
- omega_rel)
-
- data = 10000*[complex( 0.707, 0.707),
- complex(-0.707, 0.707),
- complex(-0.707, -0.707),
- complex( 0.707, -0.707)]
- data = [0.5*d for d in data]
- self.src = blocks.vector_source_c(data, False)
- self.snk = blocks.vector_sink_c()
-
- # pulse shaping interpolation filter
- nfilts = 32
- excess_bw = 0.35
- ntaps = 11 * int(omega*nfilts)
- rrc_taps0 = filter.firdes.root_raised_cosine(
- nfilts, nfilts, 1.0, excess_bw, ntaps)
- rrc_taps1 = filter.firdes.root_raised_cosine(
- 1, omega, 1.0, excess_bw, 11*omega)
- self.rrc0 = filter.pfb_arb_resampler_ccf(omega, rrc_taps0)
- self.rrc1 = filter.fir_filter_ccf(1, rrc_taps1)
-
- self.tb.connect(self.src, self.rrc0, self.rrc1, self.test, self.snk)
- self.tb.run()
-
- expected_result = 10000*[complex(-0.5, +0.0), complex(+0.0, -0.5),
- complex(+0.5, +0.0), complex(+0.0, +0.5)]
-
- dst_data = self.snk.data()
-
- # Only Ncmp samples after Nstrt samples
- Nstrt = 9000
- Ncmp = 1000
- expected_result = expected_result[Nstrt:Nstrt+Ncmp]
- dst_data = dst_data[Nstrt:Nstrt+Ncmp]
-
- #for e,d in zip(expected_result, dst_data):
- # print "{0:+.02f} {1:+.02f}".format(e, d)
-
- self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
-
-if __name__ == '__main__':
- gr_unittest.run(test_mpsk_receiver, "test_mpsk_receiver.xml")
diff --git a/gr-digital/python/digital/qa_ofdm_insert_preamble.py b/gr-digital/python/digital/qa_ofdm_insert_preamble.py
deleted file mode 100755
index 4edd54c8c6..0000000000
--- a/gr-digital/python/digital/qa_ofdm_insert_preamble.py
+++ /dev/null
@@ -1,178 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2007,2010-2013 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# GNU Radio is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3, or (at your option)
-# any later version.
-#
-# GNU Radio is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with GNU Radio; see the file COPYING. If not, write to
-# the Free Software Foundation, Inc., 51 Franklin Street,
-# Boston, MA 02110-1301, USA.
-#
-
-from pprint import pprint
-
-from gnuradio import gr, gr_unittest, digital, blocks
-
-class test_ofdm_insert_preamble(gr_unittest.TestCase):
-
- def setUp(self):
- self.tb = gr.top_block()
-
- def tearDown(self):
- self.tb = None
-
- def helper(self, v0, v1, fft_length, preamble):
- tb = self.tb
- src0 = blocks.vector_source_c(v0)
- src1 = blocks.vector_source_b(v1)
-
- s2v = blocks.stream_to_vector(gr.sizeof_gr_complex, fft_length)
-
- # print "len(v) = %d" % (len(v))
-
- op = digital.ofdm_insert_preamble(fft_length, preamble)
-
- v2s = blocks.vector_to_stream(gr.sizeof_gr_complex, fft_length)
- dst0 = blocks.vector_sink_c()
- dst1 = blocks.vector_sink_b()
-
- tb.connect(src0, s2v, (op, 0))
- tb.connect(src1, (op, 1))
- tb.connect((op, 0), v2s, dst0)
- tb.connect((op, 1), dst1)
-
- tb.run()
- r0 = dst0.data()
- r0v = []
- for i in range(len(r0)//fft_length):
- r0v.append(r0[i*fft_length:(i+1)*fft_length])
-
- r1 = dst1.data()
- self.assertEqual(len(r0v), len(r1))
- return (r1, r0v)
-
- def check_match(self, actual, expected_list):
- lst = []
- map(lambda x: lst.append(x), expected_list)
- self.assertEqual(actual, lst)
-
-
- # ----------------------------------------------------------------
-
- def test_000(self):
- # no preamble, 1 symbol payloads
-
- preamble = ()
- fft_length = 8
- npayloads = 8
- v = []
- p = []
- for i in range(npayloads):
- t = fft_length*[(i + i*1j)]
- p.append(tuple(t))
- v += t
-
- p = tuple(p)
-
- r = self.helper(v, npayloads*[1], fft_length, preamble)
- # pprint(r)
-
- self.assertEqual(r[0], tuple(npayloads*[1]))
- self.check_match(r[1], (p[0],p[1],p[2],p[3],p[4],p[5],p[6],p[7]))
-
-
- def test_001(self):
- # 1 symbol preamble, 1 symbol payloads
- preamble = ((100, 101, 102, 103, 104, 105, 106, 107),)
- p0 = preamble[0]
- fft_length = 8
- npayloads = 8
- v = []
- p = []
- for i in range(npayloads):
- t = fft_length*[(i + i*1j)]
- p.append(tuple(t))
- v += t
-
- r = self.helper(v, npayloads*[1], fft_length, preamble)
-
- self.assertEqual(r[0], tuple(npayloads*[1, 0]))
- self.check_match(r[1], (p0, p[0],
- p0, p[1],
- p0, p[2],
- p0, p[3],
- p0, p[4],
- p0, p[5],
- p0, p[6],
- p0, p[7]))
-
- def test_002(self):
- # 2 symbol preamble, 1 symbol payloads
- preamble = ((100, 101, 102, 103, 104, 105, 106, 107),
- (200, 201, 202, 203, 204, 205, 206, 207))
- p0 = preamble[0]
- p1 = preamble[1]
-
- fft_length = 8
- npayloads = 8
- v = []
- p = []
- for i in range(npayloads):
- t = fft_length*[(i + i*1j)]
- p.append(tuple(t))
- v += t
-
- r = self.helper(v, npayloads*[1], fft_length, preamble)
-
- self.assertEqual(r[0], tuple(npayloads*[1, 0, 0]))
- self.check_match(r[1], (p0, p1, p[0],
- p0, p1, p[1],
- p0, p1, p[2],
- p0, p1, p[3],
- p0, p1, p[4],
- p0, p1, p[5],
- p0, p1, p[6],
- p0, p1, p[7]))
-
-
- def xtest_003_preamble(self):
- # 2 symbol preamble, 2 symbol payloads
- preamble = ((100, 101, 102, 103, 104, 105, 106, 107),
- (200, 201, 202, 203, 204, 205, 206, 207))
- p0 = preamble[0]
- p1 = preamble[1]
-
- fft_length = 8
- npayloads = 8
- v = []
- p = []
- for i in range(npayloads * 2):
- t = fft_length*[(i + i*1j)]
- p.append(tuple(t))
- v += t
-
- r = self.helper(v, npayloads*[1, 0], fft_length, preamble)
-
- self.assertEqual(r[0], tuple(npayloads*[1, 0, 0, 0]))
- self.check_match(r[1], (p0, p1, p[0], p[1],
- p0, p1, p[2], p[3],
- p0, p1, p[4], p[5],
- p0, p1, p[6], p[7],
- p0, p1, p[8], p[9],
- p0, p1, p[10], p[11],
- p0, p1, p[12], p[13],
- p0, p1, p[14], p[15]))
-
-if __name__ == '__main__':
- gr_unittest.run(test_ofdm_insert_preamble, "test_ofdm_insert_preamble.xml")