summaryrefslogtreecommitdiff
path: root/gr-digital/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python')
-rw-r--r--gr-digital/python/CMakeLists.txt2
-rw-r--r--gr-digital/python/bpsk.py10
-rw-r--r--gr-digital/python/cpm.py4
-rw-r--r--gr-digital/python/crc.py6
-rw-r--r--gr-digital/python/generic_mod_demod.py15
-rw-r--r--gr-digital/python/gfsk.py13
-rw-r--r--gr-digital/python/gmsk.py14
-rw-r--r--gr-digital/python/ofdm.py30
-rw-r--r--gr-digital/python/ofdm_receiver.py13
-rw-r--r--gr-digital/python/ofdm_sync_ml.py11
-rw-r--r--gr-digital/python/ofdm_sync_pn.py15
-rw-r--r--gr-digital/python/ofdm_sync_pnac.py9
-rw-r--r--gr-digital/python/pkt.py4
-rwxr-xr-xgr-digital/python/qa_binary_slicer_fb.py30
-rwxr-xr-xgr-digital/python/qa_bytes_to_syms.py51
-rwxr-xr-xgr-digital/python/qa_chunks_to_symbols.py6
-rwxr-xr-xgr-digital/python/qa_clock_recovery_mm.py50
-rwxr-xr-xgr-digital/python/qa_cma_equalizer.py10
-rwxr-xr-xgr-digital/python/qa_constellation.py36
-rwxr-xr-xgr-digital/python/qa_constellation_decoder_cb.py52
-rwxr-xr-xgr-digital/python/qa_constellation_receiver.py11
-rwxr-xr-xgr-digital/python/qa_correlate_access_code.py37
-rwxr-xr-xgr-digital/python/qa_costas_loop_cc.py39
-rwxr-xr-xgr-digital/python/qa_cpm.py12
-rwxr-xr-xgr-digital/python/qa_crc32.py26
-rwxr-xr-xgr-digital/python/qa_diff_encoder.py8
-rwxr-xr-xgr-digital/python/qa_diff_phasor_cc.py26
-rwxr-xr-xgr-digital/python/qa_digital.py8
-rwxr-xr-xgr-digital/python/qa_fll_band_edge.py17
-rwxr-xr-xgr-digital/python/qa_framer_sink.py8
-rwxr-xr-xgr-digital/python/qa_glfsr_source.py6
-rwxr-xr-xgr-digital/python/qa_lms_equalizer.py12
-rwxr-xr-xgr-digital/python/qa_map.py10
-rwxr-xr-xgr-digital/python/qa_mpsk_receiver.py105
-rwxr-xr-xgr-digital/python/qa_mpsk_snr_est.py63
-rwxr-xr-xgr-digital/python/qa_ofdm_insert_preamble.py16
-rwxr-xr-xgr-digital/python/qa_pfb_clock_sync.py31
-rwxr-xr-xgr-digital/python/qa_pn_correlator_cc.py2
-rwxr-xr-xgr-digital/python/qa_probe_density.py24
-rwxr-xr-xgr-digital/python/qa_scrambler.py2
-rwxr-xr-xgr-digital/python/qa_simple_framer.py27
-rw-r--r--gr-digital/python/qam.py6
-rw-r--r--gr-digital/python/qpsk.py23
43 files changed, 460 insertions, 440 deletions
diff --git a/gr-digital/python/CMakeLists.txt b/gr-digital/python/CMakeLists.txt
index 6a9f102955..fdb5acd819 100644
--- a/gr-digital/python/CMakeLists.txt
+++ b/gr-digital/python/CMakeLists.txt
@@ -71,6 +71,8 @@ foreach(py_qa_test_file ${py_qa_test_files})
${CMAKE_BINARY_DIR}/gnuradio-core/src/lib/swig
${CMAKE_BINARY_DIR}/gr-digital/python
${CMAKE_BINARY_DIR}/gr-digital/swig
+ ${CMAKE_BINARY_DIR}/gr-filter/python
+ ${CMAKE_BINARY_DIR}/gr-filter/swig
)
set(GR_TEST_TARGET_DEPS volk gruel gnuradio-core gnuradio-digital)
GR_ADD_TEST(${py_qa_test_name} ${PYTHON_EXECUTABLE} ${PYTHON_DASH_B} ${py_qa_test_file})
diff --git a/gr-digital/python/bpsk.py b/gr-digital/python/bpsk.py
index bee6cb0034..57cf2534f4 100644
--- a/gr-digital/python/bpsk.py
+++ b/gr-digital/python/bpsk.py
@@ -116,9 +116,8 @@ class dbpsk_mod(bpsk_mod):
__doc__ += shared_mod_args
def __init__(self, mod_code=None, *args, **kwargs):
- super(dbpsk_mod, self).__init__(differential=True,
- *args, **kwargs)
-#dbpsk_mod.__doc__ += shared_mod_args
+
+ super(dbpsk_mod, self).__init__(*args, **kwargs)
# /////////////////////////////////////////////////////////////////////////////
# DBPSK demodulator
@@ -139,9 +138,8 @@ class dbpsk_demod(bpsk_demod):
__doc__ += shared_demod_args
def __init__(self, mod_code=None, *args, **kwargs):
- super(dbpsk_demod, self).__init__(differential=True,
- *args, **kwargs)
-#dbpsk_demod.__doc__ += shared_demod_args
+
+ super(dbpsk_demod, self).__init__(*args, **kwargs)
#
# Add these to the mod/demod registry
diff --git a/gr-digital/python/cpm.py b/gr-digital/python/cpm.py
index 194eb71a81..a2c9f2f0e0 100644
--- a/gr-digital/python/cpm.py
+++ b/gr-digital/python/cpm.py
@@ -24,7 +24,7 @@
# See gnuradio-examples/python/digital for examples
-from gnuradio import gr, blks2
+from gnuradio import gr, filter
from math import pi
import numpy
@@ -142,7 +142,7 @@ class cpm_mod(gr.hier_block2):
else:
raise TypeError, ("cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,))
- self.filter = blks2.pfb_arb_resampler_fff(samples_per_symbol, self.taps)
+ self.filter = filter.pfb.arb_resampler_fff(samples_per_symbol, self.taps)
# FM modulation
self.fmmod = gr.frequency_modulator_fc(sensitivity)
diff --git a/gr-digital/python/crc.py b/gr-digital/python/crc.py
index 198ab059f5..e228faaa98 100644
--- a/gr-digital/python/crc.py
+++ b/gr-digital/python/crc.py
@@ -20,11 +20,11 @@
#
from gnuradio import gru
-import digital_swig
+import digital_swig as digital
import struct
def gen_and_append_crc32(s):
- crc = digital_swig.crc32(s)
+ crc = digital.crc32(s)
return s + struct.pack(">I", gru.hexint(crc) & 0xFFFFFFFF)
def check_crc32(s):
@@ -32,7 +32,7 @@ def check_crc32(s):
return (False, '')
msg = s[:-4]
#print "msg = '%s'" % (msg,)
- actual = digital_swig.crc32(msg)
+ actual = digital.crc32(msg)
(expected,) = struct.unpack(">I", s[-4:])
# print "actual =", hex(actual), "expected =", hex(expected)
return (actual == expected, msg)
diff --git a/gr-digital/python/generic_mod_demod.py b/gr-digital/python/generic_mod_demod.py
index 6f27092429..855249dc63 100644
--- a/gr-digital/python/generic_mod_demod.py
+++ b/gr-digital/python/generic_mod_demod.py
@@ -31,6 +31,11 @@ from utils import mod_codes
import digital_swig as digital
import math
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
# default values (used in __init__ and add_options)
_def_samples_per_symbol = 2
_def_excess_bw = 0.35
@@ -102,7 +107,7 @@ class generic_mod(gr.hier_block2):
gr.io_signature(1, 1, gr.sizeof_char), # Input signature
gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
- self._constellation = constellation.base()
+ self._constellation = constellation
self._samples_per_symbol = samples_per_symbol
self._excess_bw = excess_bw
self._differential = differential
@@ -135,8 +140,8 @@ class generic_mod(gr.hier_block2):
1.0, # symbol rate
self._excess_bw, # excess bandwidth (roll-off factor)
ntaps)
- self.rrc_filter = gr.pfb_arb_resampler_ccf(self._samples_per_symbol,
- self.rrc_taps)
+ self.rrc_filter = filter.pfb_arb_resampler_ccf(self._samples_per_symbol,
+ self.rrc_taps)
# Connect
blocks = [self, self.bytes2chunks]
@@ -238,7 +243,7 @@ class generic_demod(gr.hier_block2):
gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
gr.io_signature(1, 1, gr.sizeof_char)) # Output signature
- self._constellation = constellation.base()
+ self._constellation = constellation
self._samples_per_symbol = samples_per_symbol
self._excess_bw = excess_bw
self._phase_bw = phase_bw
@@ -276,7 +281,7 @@ class generic_demod(gr.hier_block2):
fmin = -0.25
fmax = 0.25
self.receiver = digital.constellation_receiver_cb(
- self._constellation, self._phase_bw,
+ self._constellation.base(), self._phase_bw,
fmin, fmax)
# Do differential decoding based on phase change of symbols
diff --git a/gr-digital/python/gfsk.py b/gr-digital/python/gfsk.py
index aa602d8b8d..09f12ebc30 100644
--- a/gr-digital/python/gfsk.py
+++ b/gr-digital/python/gfsk.py
@@ -32,6 +32,11 @@ import numpy
from pprint import pprint
import inspect
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
# default values (used in __init__ and add_options)
_def_samples_per_symbol = 2
_def_sensitivity = 1
@@ -91,7 +96,9 @@ class gfsk_mod(gr.hier_block2):
#sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2
# Turn it into NRZ data.
- self.nrz = gr.bytes_to_syms()
+ #self.nrz = digital.bytes_to_syms()
+ self.unpack = gr.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
+ self.nrz = digital.chunks_to_symbols_bf([-1, 1])
# Form Gaussian filter
# Generate Gaussian response (Needs to be convolved with window below).
@@ -104,7 +111,7 @@ class gfsk_mod(gr.hier_block2):
self.sqwave = (1,) * samples_per_symbol # rectangular window
self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave))
- self.gaussian_filter = gr.interp_fir_filter_fff(samples_per_symbol, self.taps)
+ self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps)
# FM modulation
self.fmmod = gr.frequency_modulator_fc(sensitivity)
@@ -119,7 +126,7 @@ class gfsk_mod(gr.hier_block2):
self._setup_logging()
# Connect & Initialize base class
- self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self)
+ self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self)
def samples_per_symbol(self):
return self._samples_per_symbol
diff --git a/gr-digital/python/gmsk.py b/gr-digital/python/gmsk.py
index d7b547d012..e7853dd0af 100644
--- a/gr-digital/python/gmsk.py
+++ b/gr-digital/python/gmsk.py
@@ -32,6 +32,12 @@ import numpy
from pprint import pprint
import inspect
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
+
# default values (used in __init__ and add_options)
_def_samples_per_symbol = 2
_def_bt = 0.35
@@ -89,7 +95,9 @@ class gmsk_mod(gr.hier_block2):
sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2
# Turn it into NRZ data.
- self.nrz = gr.bytes_to_syms()
+ #self.nrz = digital.bytes_to_syms()
+ self.unpack = gr.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
+ self.nrz = digital.chunks_to_symbols_bf([-1, 1], 1)
# Form Gaussian filter
# Generate Gaussian response (Needs to be convolved with window below).
@@ -102,7 +110,7 @@ class gmsk_mod(gr.hier_block2):
self.sqwave = (1,) * samples_per_symbol # rectangular window
self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave))
- self.gaussian_filter = gr.interp_fir_filter_fff(samples_per_symbol, self.taps)
+ self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps)
# FM modulation
self.fmmod = gr.frequency_modulator_fc(sensitivity)
@@ -114,7 +122,7 @@ class gmsk_mod(gr.hier_block2):
self._setup_logging()
# Connect & Initialize base class
- self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self)
+ self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self)
def samples_per_symbol(self):
return self._samples_per_symbol
diff --git a/gr-digital/python/ofdm.py b/gr-digital/python/ofdm.py
index 4c53ad0108..4113a552eb 100644
--- a/gr-digital/python/ofdm.py
+++ b/gr-digital/python/ofdm.py
@@ -21,8 +21,8 @@
#
import math
-from gnuradio import gr
-import digital_swig
+from gnuradio import gr, fft
+import digital_swig as digital
import ofdm_packet_utils
from ofdm_receiver import ofdm_receiver
import gnuradio.gr.gr_threading as _threading
@@ -97,16 +97,16 @@ class ofdm_mod(gr.hier_block2):
constel = qam.qam_constellation(arity)
rotated_const = map(lambda pt: pt * rot, constel.points())
#print rotated_const
- self._pkt_input = digital_swig.ofdm_mapper_bcv(rotated_const,
- msgq_limit,
- options.occupied_tones,
- options.fft_length)
+ self._pkt_input = digital.ofdm_mapper_bcv(rotated_const,
+ msgq_limit,
+ options.occupied_tones,
+ options.fft_length)
- self.preambles = digital_swig.ofdm_insert_preamble(self._fft_length,
- padded_preambles)
- self.ifft = gr.fft_vcc(self._fft_length, False, win, True)
- self.cp_adder = digital_swig.ofdm_cyclic_prefixer(self._fft_length,
- symbol_length)
+ self.preambles = digital.ofdm_insert_preamble(self._fft_length,
+ padded_preambles)
+ self.ifft = fft.fft_vcc(self._fft_length, False, win, True)
+ self.cp_adder = digital.ofdm_cyclic_prefixer(self._fft_length,
+ symbol_length)
self.scale = gr.multiply_const_cc(1.0 / math.sqrt(self._fft_length))
self.connect((self._pkt_input, 0), (self.preambles, 0))
@@ -240,10 +240,10 @@ class ofdm_demod(gr.hier_block2):
phgain = 0.25
frgain = phgain*phgain / 4.0
- self.ofdm_demod = digital_swig.ofdm_frame_sink(rotated_const, range(arity),
- self._rcvd_pktq,
- self._occupied_tones,
- phgain, frgain)
+ self.ofdm_demod = digital.ofdm_frame_sink(rotated_const, range(arity),
+ self._rcvd_pktq,
+ self._occupied_tones,
+ phgain, frgain)
self.connect(self, self.ofdm_recv)
self.connect((self.ofdm_recv, 0), (self.ofdm_demod, 0))
diff --git a/gr-digital/python/ofdm_receiver.py b/gr-digital/python/ofdm_receiver.py
index 33d9b66268..1dc3cdf7cd 100644
--- a/gr-digital/python/ofdm_receiver.py
+++ b/gr-digital/python/ofdm_receiver.py
@@ -24,12 +24,17 @@ import math
from numpy import fft
from gnuradio import gr
-import digital_swig
+import digital_swig as digital
from ofdm_sync_pn import ofdm_sync_pn
from ofdm_sync_fixed import ofdm_sync_fixed
from ofdm_sync_pnac import ofdm_sync_pnac
from ofdm_sync_ml import ofdm_sync_ml
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
class ofdm_receiver(gr.hier_block2):
"""
Performs receiver synchronization on OFDM symbols.
@@ -67,7 +72,7 @@ class ofdm_receiver(gr.hier_block2):
bw+tb, # midpoint of trans. band
tb, # width of trans. band
gr.firdes.WIN_HAMMING) # filter type
- self.chan_filt = gr.fft_filter_ccc(1, chan_coeffs)
+ self.chan_filt = filter.fft_filter_ccc(1, chan_coeffs)
win = [1 for i in range(fft_length)]
@@ -116,9 +121,9 @@ class ofdm_receiver(gr.hier_block2):
self.nco = gr.frequency_modulator_fc(nco_sensitivity) # generate a signal proportional to frequency error of sync block
self.sigmix = gr.multiply_cc()
- self.sampler = digital_swig.ofdm_sampler(fft_length, fft_length+cp_length)
+ self.sampler = digital.ofdm_sampler(fft_length, fft_length+cp_length)
self.fft_demod = gr.fft_vcc(fft_length, True, win, True)
- self.ofdm_frame_acq = digital_swig.ofdm_frame_acquisition(occupied_tones,
+ self.ofdm_frame_acq = digital.ofdm_frame_acquisition(occupied_tones,
fft_length,
cp_length, ks[0])
diff --git a/gr-digital/python/ofdm_sync_ml.py b/gr-digital/python/ofdm_sync_ml.py
index 7c75d7f1d4..f732fdf29a 100644
--- a/gr-digital/python/ofdm_sync_ml.py
+++ b/gr-digital/python/ofdm_sync_ml.py
@@ -23,6 +23,11 @@
import math
from gnuradio import gr
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
class ofdm_sync_ml(gr.hier_block2):
def __init__(self, fft_length, cp_length, snr, kstime, logging):
''' Maximum Likelihood OFDM synchronizer:
@@ -57,7 +62,7 @@ class ofdm_sync_ml(gr.hier_block2):
self.adder = gr.add_ff()
moving_sum_taps = [rho/2 for i in range(cp_length)]
- self.moving_sum_filter = gr.fir_filter_fff(1,moving_sum_taps)
+ self.moving_sum_filter = filter.fir_filter_fff(1,moving_sum_taps)
self.connect(self.input,self.magsqrd1)
self.connect(self.delay,self.magsqrd2)
@@ -71,7 +76,7 @@ class ofdm_sync_ml(gr.hier_block2):
self.mixer = gr.multiply_cc();
movingsum2_taps = [1.0 for i in range(cp_length)]
- self.movingsum2 = gr.fir_filter_ccf(1,movingsum2_taps)
+ self.movingsum2 = filter.fir_filter_ccf(1,movingsum2_taps)
# Correlator data handler
self.c2mag = gr.complex_to_mag()
@@ -115,7 +120,7 @@ class ofdm_sync_ml(gr.hier_block2):
# to readjust the timing in the middle of the packet or we ruin the equalizer settings.
kstime = [k.conjugate() for k in kstime]
kstime.reverse()
- self.kscorr = gr.fir_filter_ccc(1, kstime)
+ self.kscorr = filter.fir_filter_ccc(1, kstime)
self.corrmag = gr.complex_to_mag_squared()
self.div = gr.divide_ff()
diff --git a/gr-digital/python/ofdm_sync_pn.py b/gr-digital/python/ofdm_sync_pn.py
index 05b1de2e19..8307a8ee14 100644
--- a/gr-digital/python/ofdm_sync_pn.py
+++ b/gr-digital/python/ofdm_sync_pn.py
@@ -24,6 +24,11 @@ import math
from numpy import fft
from gnuradio import gr
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
class ofdm_sync_pn(gr.hier_block2):
def __init__(self, fft_length, cp_length, logging=False):
"""
@@ -51,19 +56,19 @@ class ofdm_sync_pn(gr.hier_block2):
# Create a moving sum filter for the corr output
if 1:
moving_sum_taps = [1.0 for i in range(fft_length//2)]
- self.moving_sum_filter = gr.fir_filter_ccf(1,moving_sum_taps)
+ self.moving_sum_filter = filter.fir_filter_ccf(1,moving_sum_taps)
else:
moving_sum_taps = [complex(1.0,0.0) for i in range(fft_length//2)]
- self.moving_sum_filter = gr.fft_filter_ccc(1,moving_sum_taps)
+ self.moving_sum_filter = filter.fft_filter_ccc(1,moving_sum_taps)
# Create a moving sum filter for the input
self.inputmag2 = gr.complex_to_mag_squared()
movingsum2_taps = [1.0 for i in range(fft_length//2)]
if 1:
- self.inputmovingsum = gr.fir_filter_fff(1,movingsum2_taps)
+ self.inputmovingsum = filter.fir_filter_fff(1,movingsum2_taps)
else:
- self.inputmovingsum = gr.fft_filter_fff(1,movingsum2_taps)
+ self.inputmovingsum = filter.fft_filter_fff(1,movingsum2_taps)
self.square = gr.multiply_ff()
self.normalize = gr.divide_ff()
@@ -100,7 +105,7 @@ class ofdm_sync_pn(gr.hier_block2):
# Create a moving sum filter for the corr output
matched_filter_taps = [1.0/cp_length for i in range(cp_length)]
- self.matched_filter = gr.fir_filter_fff(1,matched_filter_taps)
+ self.matched_filter = filter.fir_filter_fff(1,matched_filter_taps)
self.connect(self.normalize, self.matched_filter)
self.connect(self.matched_filter, self.sub1, self.pk_detect)
diff --git a/gr-digital/python/ofdm_sync_pnac.py b/gr-digital/python/ofdm_sync_pnac.py
index 10a1259641..a5edc272a8 100644
--- a/gr-digital/python/ofdm_sync_pnac.py
+++ b/gr-digital/python/ofdm_sync_pnac.py
@@ -24,6 +24,11 @@ import math
from numpy import fft
from gnuradio import gr
+try:
+ from gnuradio import filter
+except ImportError:
+ import filter_swig as filter
+
class ofdm_sync_pnac(gr.hier_block2):
def __init__(self, fft_length, cp_length, kstime, logging=False):
"""
@@ -59,7 +64,7 @@ class ofdm_sync_pnac(gr.hier_block2):
# cross-correlate with the known symbol
kstime = [k.conjugate() for k in kstime[0:fft_length//2]]
kstime.reverse()
- self.crosscorr_filter = gr.fir_filter_ccc(1, kstime)
+ self.crosscorr_filter = filter.fir_filter_ccc(1, kstime)
# Create a delay line
self.delay = gr.delay(gr.sizeof_gr_complex, fft_length/2)
@@ -71,7 +76,7 @@ class ofdm_sync_pnac(gr.hier_block2):
# Create a moving sum filter for the input
self.mag = gr.complex_to_mag_squared()
movingsum_taps = (fft_length//1)*[1.0,]
- self.power = gr.fir_filter_fff(1,movingsum_taps)
+ self.power = filter.fir_filter_fff(1,movingsum_taps)
# Get magnitude (peaks) and angle (phase/freq error)
self.c2mag = gr.complex_to_mag_squared()
diff --git a/gr-digital/python/pkt.py b/gr-digital/python/pkt.py
index 283a150d2c..d084c3ff0f 100644
--- a/gr-digital/python/pkt.py
+++ b/gr-digital/python/pkt.py
@@ -23,7 +23,7 @@ from math import pi
from gnuradio import gr
import gnuradio.gr.gr_threading as _threading
import packet_utils
-import digital_swig
+import digital_swig as digital
# /////////////////////////////////////////////////////////////////////////////
@@ -136,7 +136,7 @@ class demod_pkts(gr.hier_block2):
threshold = 12 # FIXME raise exception
self._rcvd_pktq = gr.msg_queue() # holds packets from the PHY
- self.correlator = digital_swig.correlate_access_code_bb(access_code, threshold)
+ self.correlator = digital.correlate_access_code_bb(access_code, threshold)
self.framer_sink = digital.framer_sink_1(self._rcvd_pktq)
self.connect(self, self._demodulator, self.correlator, self.framer_sink)
diff --git a/gr-digital/python/qa_binary_slicer_fb.py b/gr-digital/python/qa_binary_slicer_fb.py
index 60d92c5d19..22f7da73ff 100755
--- a/gr-digital/python/qa_binary_slicer_fb.py
+++ b/gr-digital/python/qa_binary_slicer_fb.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2011 Free Software Foundation, Inc.
+# Copyright 2011,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -21,33 +21,33 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
import math, random
-class test_binary_slicer_fb (gr_unittest.TestCase):
+class test_binary_slicer_fb(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_binary_slicer_fb (self):
+ def test_binary_slicer_fb(self):
expected_result = ( 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1)
src_data = (-1, 1, -1, -1, 1, 1, -1, -1, -1, 1, 1, 1, -1, 1, 1, 1, 1)
src_data = [s + (1 - random.random()) for s in src_data] # add some noise
- src = gr.vector_source_f (src_data)
- op = digital_swig.binary_slicer_fb ()
- dst = gr.vector_sink_b ()
+ src = gr.vector_source_f(src_data)
+ op = digital.binary_slicer_fb()
+ dst = gr.vector_sink_b()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run () # run the graph and wait for it to finish
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run() # run the graph and wait for it to finish
- actual_result = dst.data () # fetch the contents of the sink
+ actual_result = dst.data() # fetch the contents of the sink
#print "actual result", actual_result
#print "expected result", expected_result
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result)
if __name__ == '__main__':
diff --git a/gr-digital/python/qa_bytes_to_syms.py b/gr-digital/python/qa_bytes_to_syms.py
deleted file mode 100755
index 75475a95b2..0000000000
--- a/gr-digital/python/qa_bytes_to_syms.py
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2004,2007,2010,2012 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# GNU Radio is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3, or (at your option)
-# any later version.
-#
-# GNU Radio is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with GNU Radio; see the file COPYING. If not, write to
-# the Free Software Foundation, Inc., 51 Franklin Street,
-# Boston, MA 02110-1301, USA.
-#
-
-from gnuradio import gr, gr_unittest
-import digital_swig as digital
-import math
-
-class test_bytes_to_syms (gr_unittest.TestCase):
-
- def setUp (self):
- self.tb = gr.top_block ()
-
- def tearDown (self):
- self.tb = None
-
- def test_bytes_to_syms_001 (self):
- src_data = (0x01, 0x80, 0x03)
- expected_result = (-1, -1, -1, -1, -1, -1, -1, +1,
- +1, -1, -1, -1, -1, -1, -1, -1,
- -1, -1, -1, -1, -1, -1, +1, +1)
- src = gr.vector_source_b (src_data)
- op = digital.bytes_to_syms ()
- dst = gr.vector_sink_f ()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run ()
- result_data = dst.data ()
- self.assertEqual (expected_result, result_data)
-
-if __name__ == '__main__':
- gr_unittest.run(test_bytes_to_syms, "test_bytes_to_syms.xml")
-
diff --git a/gr-digital/python/qa_chunks_to_symbols.py b/gr-digital/python/qa_chunks_to_symbols.py
index 63af10d8ff..5ffe425132 100755
--- a/gr-digital/python/qa_chunks_to_symbols.py
+++ b/gr-digital/python/qa_chunks_to_symbols.py
@@ -25,10 +25,10 @@ import digital_swig as digital
class test_chunks_to_symbols(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
def test_bc_001(self):
diff --git a/gr-digital/python/qa_clock_recovery_mm.py b/gr-digital/python/qa_clock_recovery_mm.py
index f4c345b034..e904cf4c21 100755
--- a/gr-digital/python/qa_clock_recovery_mm.py
+++ b/gr-digital/python/qa_clock_recovery_mm.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2011 Free Software Foundation, Inc.
+# Copyright 2011,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -21,18 +21,18 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
import random, cmath
class test_clock_recovery_mm(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
# Test complex/complex version
omega = 2
gain_omega = 0.001
@@ -40,9 +40,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
gain_mu = 0.01
omega_rel_lim = 0.001
- self.test = digital_swig.clock_recovery_mm_cc(omega, gain_omega,
- mu, gain_mu,
- omega_rel_lim)
+ self.test = digital.clock_recovery_mm_cc(omega, gain_omega,
+ mu, gain_mu,
+ omega_rel_lim)
data = 100*[complex(1, 1),]
self.src = gr.vector_source_c(data, False)
@@ -64,10 +64,10 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
#print expected_result
#print dst_data
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 5)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 5)
- def test02 (self):
+ def test02(self):
# Test float/float version
omega = 2
gain_omega = 0.01
@@ -75,9 +75,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
gain_mu = 0.01
omega_rel_lim = 0.001
- self.test = digital_swig.clock_recovery_mm_ff(omega, gain_omega,
- mu, gain_mu,
- omega_rel_lim)
+ self.test = digital.clock_recovery_mm_ff(omega, gain_omega,
+ mu, gain_mu,
+ omega_rel_lim)
data = 100*[1,]
self.src = gr.vector_source_f(data, False)
@@ -99,10 +99,10 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
#print expected_result
#print dst_data
- self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 5)
+ self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 5)
- def test03 (self):
+ def test03(self):
# Test complex/complex version with varying input
omega = 2
gain_omega = 0.01
@@ -110,9 +110,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
gain_mu = 0.1
omega_rel_lim = 0.0001
- self.test = digital_swig.clock_recovery_mm_cc(omega, gain_omega,
- mu, gain_mu,
- omega_rel_lim)
+ self.test = digital.clock_recovery_mm_cc(omega, gain_omega,
+ mu, gain_mu,
+ omega_rel_lim)
data = 1000*[complex(1, 1), complex(1, 1), complex(-1, -1), complex(-1, -1)]
self.src = gr.vector_source_c(data, False)
@@ -134,10 +134,10 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
#print expected_result
#print dst_data
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
- def test04 (self):
+ def test04(self):
# Test float/float version
omega = 2
gain_omega = 0.01
@@ -145,9 +145,9 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
gain_mu = 0.1
omega_rel_lim = 0.001
- self.test = digital_swig.clock_recovery_mm_ff(omega, gain_omega,
- mu, gain_mu,
- omega_rel_lim)
+ self.test = digital.clock_recovery_mm_ff(omega, gain_omega,
+ mu, gain_mu,
+ omega_rel_lim)
data = 1000*[1, 1, -1, -1]
self.src = gr.vector_source_f(data, False)
@@ -169,7 +169,7 @@ class test_clock_recovery_mm(gr_unittest.TestCase):
#print expected_result
#print dst_data
- self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 1)
if __name__ == '__main__':
diff --git a/gr-digital/python/qa_cma_equalizer.py b/gr-digital/python/qa_cma_equalizer.py
index 75fb0f05ed..2af1505c1c 100755
--- a/gr-digital/python/qa_cma_equalizer.py
+++ b/gr-digital/python/qa_cma_equalizer.py
@@ -21,7 +21,7 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
class test_cma_equalizer_fir(gr_unittest.TestCase):
@@ -33,7 +33,7 @@ class test_cma_equalizer_fir(gr_unittest.TestCase):
def transform(self, src_data):
SRC = gr.vector_source_c(src_data, False)
- EQU = digital_swig.cma_equalizer_cc(4, 1.0, .001, 1)
+ EQU = digital.cma_equalizer_cc(4, 1.0, .001, 1)
DST = gr.vector_sink_c()
self.tb.connect(SRC, EQU, DST)
self.tb.run()
@@ -44,7 +44,11 @@ class test_cma_equalizer_fir(gr_unittest.TestCase):
src_data = (1+0j, 0+1j, -1+0j, 0-1j)*1000
expected_data = src_data
result = self.transform(src_data)
- self.assertComplexTuplesAlmostEqual(expected_data, result)
+
+ # only test last N samples to allow for settling. Also adjust
+ # for a 1 sample delay in the filter.
+ N = -500
+ self.assertComplexTuplesAlmostEqual(expected_data[N:-1], result[N+1:])
if __name__ == "__main__":
gr_unittest.run(test_cma_equalizer_fir, "test_cma_equalizer_fir.xml")
diff --git a/gr-digital/python/qa_constellation.py b/gr-digital/python/qa_constellation.py
index e0b5b3888e..750337a119 100755
--- a/gr-digital/python/qa_constellation.py
+++ b/gr-digital/python/qa_constellation.py
@@ -25,7 +25,7 @@ from cmath import exp, pi, log
from gnuradio import gr, gr_unittest, blks2
from utils import mod_codes
-import digital_swig
+import digital_swig as digital
# import from local folder
import psk
@@ -50,7 +50,7 @@ def twod_constell():
(-1+0j), (0-1j))
rot_sym = 2
dim = 2
- return digital_swig.constellation_calcdist(points, [], rot_sym, dim)
+ return digital.constellation_calcdist(points, [], rot_sym, dim)
def threed_constell():
oned_points = ((1+0j), (0+1j), (-1+0j), (0-1j))
@@ -62,7 +62,7 @@ def threed_constell():
points += [oned_points[ia], oned_points[ib], oned_points[ic]]
rot_sym = 4
dim = 3
- return digital_swig.constellation_calcdist(points, [], rot_sym, dim)
+ return digital.constellation_calcdist(points, [], rot_sym, dim)
tested_constellation_info = (
(psk.psk_constellation,
@@ -85,10 +85,10 @@ tested_constellation_info = (
'mod_code': tested_mod_codes,
'differential': (False,)},
False, None),
- (digital_swig.constellation_bpsk, {}, True, None),
- (digital_swig.constellation_qpsk, {}, False, None),
- (digital_swig.constellation_dqpsk, {}, True, None),
- (digital_swig.constellation_8psk, {}, False, None),
+ (digital.constellation_bpsk, {}, True, None),
+ (digital.constellation_qpsk, {}, False, None),
+ (digital.constellation_dqpsk, {}, True, None),
+ (digital.constellation_8psk, {}, False, None),
(twod_constell, {}, True, None),
(threed_constell, {}, True, None),
)
@@ -123,7 +123,7 @@ def tested_constellations():
break
-class test_constellation (gr_unittest.TestCase):
+class test_constellation(gr_unittest.TestCase):
src_length = 256
@@ -151,7 +151,7 @@ class test_constellation (gr_unittest.TestCase):
data = dst.data()
# Don't worry about cut off data for now.
first = constellation.bits_per_symbol()
- self.assertEqual (self.src_data[first:len(data)], data[first:])
+ self.assertEqual(self.src_data[first:len(data)], data[first:])
class mod_demod(gr.hier_block2):
@@ -173,8 +173,7 @@ class mod_demod(gr.hier_block2):
self.blocks = [self]
# We expect a stream of unpacked bits.
# First step is to pack them.
- self.blocks.append(
- gr.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST))
+ self.blocks.append(gr.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST))
# Second step we unpack them such that we have k bits in each byte where
# each constellation symbol hold k bits.
self.blocks.append(
@@ -183,13 +182,13 @@ class mod_demod(gr.hier_block2):
# Apply any pre-differential coding
# Gray-coding is done here if we're also using differential coding.
if self.constellation.apply_pre_diff_code():
- self.blocks.append(digital_swig.map_bb(self.constellation.pre_diff_code()))
+ self.blocks.append(digital.map_bb(self.constellation.pre_diff_code()))
# Differential encoding.
if self.differential:
- self.blocks.append(digital_swig.diff_encoder_bb(arity))
+ self.blocks.append(digital.diff_encoder_bb(arity))
# Convert to constellation symbols.
- self.blocks.append(digital_swig.chunks_to_symbols_bc(self.constellation.points(),
- self.constellation.dimensionality()))
+ self.blocks.append(digital.chunks_to_symbols_bc(self.constellation.points(),
+ self.constellation.dimensionality()))
# CHANNEL
# Channel just consists of a rotation to check differential coding.
if rotation is not None:
@@ -197,13 +196,13 @@ class mod_demod(gr.hier_block2):
# RX
# Convert the constellation symbols back to binary values.
- self.blocks.append(digital_swig.constellation_decoder_cb(self.constellation.base()))
+ self.blocks.append(digital.constellation_decoder_cb(self.constellation.base()))
# Differential decoding.
if self.differential:
- self.blocks.append(digital_swig.diff_decoder_bb(arity))
+ self.blocks.append(digital.diff_decoder_bb(arity))
# Decode any pre-differential coding.
if self.constellation.apply_pre_diff_code():
- self.blocks.append(digital_swig.map_bb(
+ self.blocks.append(digital.map_bb(
mod_codes.invert_code(self.constellation.pre_diff_code())))
# unpack the k bit vector into a stream of bits
self.blocks.append(gr.unpack_k_bits_bb(
@@ -214,7 +213,6 @@ class mod_demod(gr.hier_block2):
self.blocks.append(self)
self.connect(*self.blocks)
-
if __name__ == '__main__':
gr_unittest.run(test_constellation, "test_constellation.xml")
diff --git a/gr-digital/python/qa_constellation_decoder_cb.py b/gr-digital/python/qa_constellation_decoder_cb.py
index 5401a07fc0..6a93b6e743 100755
--- a/gr-digital/python/qa_constellation_decoder_cb.py
+++ b/gr-digital/python/qa_constellation_decoder_cb.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2004,2007,2010,2011 Free Software Foundation, Inc.
+# Copyright 2004,2007,2010-2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -21,54 +21,54 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
import math
-class test_constellation_decoder (gr_unittest.TestCase):
+class test_constellation_decoder(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_constellation_decoder_cb_bpsk (self):
- cnst = digital_swig.constellation_bpsk()
+ def test_constellation_decoder_cb_bpsk(self):
+ cnst = digital.constellation_bpsk()
src_data = (0.5 + 0.5j, 0.1 - 1.2j, -0.8 - 0.1j, -0.45 + 0.8j,
0.8 + 1.0j, -0.5 + 0.1j, 0.1 - 1.2j)
expected_result = ( 1, 1, 0, 0,
1, 0, 1)
- src = gr.vector_source_c (src_data)
- op = digital_swig.constellation_decoder_cb (cnst.base())
- dst = gr.vector_sink_b ()
+ src = gr.vector_source_c(src_data)
+ op = digital.constellation_decoder_cb(cnst.base())
+ dst = gr.vector_sink_b()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run () # run the graph and wait for it to finish
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run() # run the graph and wait for it to finish
- actual_result = dst.data () # fetch the contents of the sink
+ actual_result = dst.data() # fetch the contents of the sink
#print "actual result", actual_result
#print "expected result", expected_result
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result)
- def test_constellation_decoder_cb_qpsk (self):
- cnst = digital_swig.constellation_qpsk()
+ def _test_constellation_decoder_cb_qpsk(self):
+ cnst = digital.constellation_qpsk()
src_data = (0.5 + 0.5j, 0.1 - 1.2j, -0.8 - 0.1j, -0.45 + 0.8j,
0.8 + 1.0j, -0.5 + 0.1j, 0.1 - 1.2j)
expected_result = ( 3, 1, 0, 2,
3, 2, 1)
- src = gr.vector_source_c (src_data)
- op = digital_swig.constellation_decoder_cb (cnst.base())
- dst = gr.vector_sink_b ()
+ src = gr.vector_source_c(src_data)
+ op = digital_swig.constellation_decoder_cb(cnst.base())
+ dst = gr.vector_sink_b()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run () # run the graph and wait for it to finish
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run() # run the graph and wait for it to finish
- actual_result = dst.data () # fetch the contents of the sink
+ actual_result = dst.data() # fetch the contents of the sink
#print "actual result", actual_result
#print "expected result", expected_result
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result)
if __name__ == '__main__':
diff --git a/gr-digital/python/qa_constellation_receiver.py b/gr-digital/python/qa_constellation_receiver.py
index 8c2d2da0c5..871df2da28 100755
--- a/gr-digital/python/qa_constellation_receiver.py
+++ b/gr-digital/python/qa_constellation_receiver.py
@@ -24,7 +24,8 @@ import random
from gnuradio import gr, blks2, gr_unittest
from utils import mod_codes, alignment
-import digital_swig, packet_utils
+import packet_utils
+import filter_swig as filter
from generic_mod_demod import generic_mod, generic_demod
from qa_constellation import tested_constellations, twod_constell
@@ -51,7 +52,7 @@ FREQ_BW = 2*math.pi/100.0
PHASE_BW = 2*math.pi/100.0
-class test_constellation_receiver (gr_unittest.TestCase):
+class test_constellation_receiver(gr_unittest.TestCase):
# We ignore the first half of the output data since often it takes
# a while for the receiver to lock on.
@@ -104,7 +105,7 @@ class test_constellation_receiver (gr_unittest.TestCase):
self.assertTrue(correct > REQ_CORRECT)
-class rec_test_tb (gr.top_block):
+class rec_test_tb(gr.top_block):
"""
Takes a constellation an runs a generic modulation, channel,
and generic demodulation.
@@ -130,9 +131,9 @@ class rec_test_tb (gr.top_block):
mod = generic_mod(constellation, differential=differential)
# Channel
if freq_offset:
- channel = gr.channel_model(NOISE_VOLTAGE, FREQUENCY_OFFSET, TIMING_OFFSET)
+ channel = filter.channel_model(NOISE_VOLTAGE, FREQUENCY_OFFSET, TIMING_OFFSET)
else:
- channel = gr.channel_model(NOISE_VOLTAGE, 0, TIMING_OFFSET)
+ channel = filter.channel_model(NOISE_VOLTAGE, 0, TIMING_OFFSET)
# Receiver Blocks
if freq_offset:
demod = generic_demod(constellation, differential=differential,
diff --git a/gr-digital/python/qa_correlate_access_code.py b/gr-digital/python/qa_correlate_access_code.py
index 96246dcfb9..5a5f2209f7 100755
--- a/gr-digital/python/qa_correlate_access_code.py
+++ b/gr-digital/python/qa_correlate_access_code.py
@@ -52,13 +52,13 @@ class test_correlate_access_code(gr_unittest.TestCase):
# 0 0 0 1 0 0 0 1
src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7
expected_result = pad + (1, 0, 1, 1, 3, 1, 0, 1, 1, 2) + (0,) * 6
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.correlate_access_code_bb("1011", 0)
- dst = gr.vector_sink_b ()
- self.tb.connect (src, op, dst)
- self.tb.run ()
- result_data = dst.data ()
- self.assertEqual (expected_result, result_data)
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
def test_002(self):
@@ -69,13 +69,13 @@ class test_correlate_access_code(gr_unittest.TestCase):
#print access_code
src_data = code + (1, 0, 1, 1) + pad
expected_result = pad + code + (3, 0, 1, 1)
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.correlate_access_code_bb(access_code, 0)
- dst = gr.vector_sink_b ()
- self.tb.connect (src, op, dst)
- self.tb.run ()
- result_data = dst.data ()
- self.assertEqual (expected_result, result_data)
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
def test_003(self):
code = tuple(string_to_1_0_list(default_access_code))
@@ -85,14 +85,13 @@ class test_correlate_access_code(gr_unittest.TestCase):
#print access_code
src_data = code + (1, 0, 1, 1) + pad
expected_result = code + (1, 0, 1, 1) + pad
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.correlate_access_code_tag_bb(access_code, 0, "test")
- dst = gr.vector_sink_b ()
- self.tb.connect (src, op, dst)
- self.tb.run ()
- result_data = dst.data ()
- self.assertEqual (expected_result, result_data)
-
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
if __name__ == '__main__':
gr_unittest.run(test_correlate_access_code, "test_correlate_access_code.xml")
diff --git a/gr-digital/python/qa_costas_loop_cc.py b/gr-digital/python/qa_costas_loop_cc.py
index 75fdbc2f84..365eda736a 100755
--- a/gr-digital/python/qa_costas_loop_cc.py
+++ b/gr-digital/python/qa_costas_loop_cc.py
@@ -21,22 +21,23 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig, psk
+import digital_swig as digital
+import psk
import random, cmath
class test_costas_loop_cc(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
# test basic functionality by setting all gains to 0
natfreq = 0.0
order = 2
- self.test = digital_swig.costas_loop_cc(natfreq, order)
+ self.test = digital.costas_loop_cc(natfreq, order)
data = 100*[complex(1,0),]
self.src = gr.vector_source_c(data, False)
@@ -47,13 +48,13 @@ class test_costas_loop_cc(gr_unittest.TestCase):
expected_result = data
dst_data = self.snk.data()
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 5)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 5)
- def test02 (self):
+ def test02(self):
# Make sure it doesn't diverge given perfect data
natfreq = 0.25
order = 2
- self.test = digital_swig.costas_loop_cc(natfreq, order)
+ self.test = digital.costas_loop_cc(natfreq, order)
data = [complex(2*random.randint(0,1)-1, 0) for i in xrange(100)]
self.src = gr.vector_source_c(data, False)
@@ -65,13 +66,13 @@ class test_costas_loop_cc(gr_unittest.TestCase):
expected_result = data
dst_data = self.snk.data()
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 5)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 5)
- def test03 (self):
+ def test03(self):
# BPSK Convergence test with static rotation
natfreq = 0.25
order = 2
- self.test = digital_swig.costas_loop_cc(natfreq, order)
+ self.test = digital.costas_loop_cc(natfreq, order)
rot = cmath.exp(0.2j) # some small rotation
data = [complex(2*random.randint(0,1)-1, 0) for i in xrange(100)]
@@ -90,13 +91,13 @@ class test_costas_loop_cc(gr_unittest.TestCase):
# generously compare results; the loop will converge near to, but
# not exactly on, the target data
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 2)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 2)
- def test04 (self):
+ def test04(self):
# QPSK Convergence test with static rotation
natfreq = 0.25
order = 4
- self.test = digital_swig.costas_loop_cc(natfreq, order)
+ self.test = digital.costas_loop_cc(natfreq, order)
rot = cmath.exp(0.2j) # some small rotation
data = [complex(2*random.randint(0,1)-1, 2*random.randint(0,1)-1)
@@ -116,13 +117,13 @@ class test_costas_loop_cc(gr_unittest.TestCase):
# generously compare results; the loop will converge near to, but
# not exactly on, the target data
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 2)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 2)
- def test05 (self):
+ def test05(self):
# 8PSK Convergence test with static rotation
natfreq = 0.25
order = 8
- self.test = digital_swig.costas_loop_cc(natfreq, order)
+ self.test = digital.costas_loop_cc(natfreq, order)
rot = cmath.exp(-cmath.pi/8.0j) # rotate to match Costas rotation
const = psk.psk_constellation(order)
@@ -145,7 +146,7 @@ class test_costas_loop_cc(gr_unittest.TestCase):
# generously compare results; the loop will converge near to, but
# not exactly on, the target data
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 2)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 2)
if __name__ == '__main__':
gr_unittest.run(test_costas_loop_cc, "test_costas_loop_cc.xml")
diff --git a/gr-digital/python/qa_cpm.py b/gr-digital/python/qa_cpm.py
index 12a84c76c2..2221d16b6f 100755
--- a/gr-digital/python/qa_cpm.py
+++ b/gr-digital/python/qa_cpm.py
@@ -21,15 +21,15 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
import numpy
class test_cpm(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
def do_check_phase_shift(self, type, name):
@@ -37,7 +37,7 @@ class test_cpm(gr_unittest.TestCase):
L = 1
in_bits = (1,) * 20
src = gr.vector_source_b(in_bits, False)
- cpm = digital_swig.cpmmod_bc(type, 0.5, sps, L)
+ cpm = digital.cpmmod_bc(type, 0.5, sps, L)
arg = gr.complex_to_arg()
sink = gr.vector_sink_f()
@@ -68,7 +68,7 @@ class test_cpm(gr_unittest.TestCase):
bt = 0.3
in_bits = (1,) * 20
src = gr.vector_source_b(in_bits, False)
- gmsk = digital_swig.gmskmod_bc(sps, bt, L)
+ gmsk = digital.gmskmod_bc(sps, L, bt)
arg = gr.complex_to_arg()
sink = gr.vector_sink_f()
diff --git a/gr-digital/python/qa_crc32.py b/gr-digital/python/qa_crc32.py
index f86813f3f3..cd4006b1d3 100755
--- a/gr-digital/python/qa_crc32.py
+++ b/gr-digital/python/qa_crc32.py
@@ -21,40 +21,40 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
import random, cmath
class test_crc32(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
data = 100*"0"
expected_result = 2943744955
- result = digital_swig.crc32(data)
+ result = digital.crc32(data)
#print hex(result)
- self.assertEqual (expected_result, result)
+ self.assertEqual(expected_result, result)
- def test02 (self):
+ def test02(self):
data = 100*"1"
expected_result = 2326594156
- result = digital_swig.crc32(data)
+ result = digital.crc32(data)
#print hex(result)
- self.assertEqual (expected_result, result)
+ self.assertEqual(expected_result, result)
- def test03 (self):
+ def test03(self):
data = 10*"0123456789"
expected_result = 3774345973
- result = digital_swig.crc32(data)
+ result = digital.crc32(data)
#print hex(result)
- self.assertEqual (expected_result, result)
+ self.assertEqual(expected_result, result)
if __name__ == '__main__':
gr_unittest.run(test_crc32, "test_crc32.xml")
diff --git a/gr-digital/python/qa_diff_encoder.py b/gr-digital/python/qa_diff_encoder.py
index e4f5470af5..c28f4dbdf8 100755
--- a/gr-digital/python/qa_diff_encoder.py
+++ b/gr-digital/python/qa_diff_encoder.py
@@ -32,12 +32,12 @@ def make_random_int_tuple(L, min, max):
return tuple(result)
-class test_diff_encoder (gr_unittest.TestCase):
+class test_diff_encoder(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
def test_diff_encdec_000(self):
diff --git a/gr-digital/python/qa_diff_phasor_cc.py b/gr-digital/python/qa_diff_phasor_cc.py
index 3e7617fe47..833158d0a8 100755
--- a/gr-digital/python/qa_diff_phasor_cc.py
+++ b/gr-digital/python/qa_diff_phasor_cc.py
@@ -24,25 +24,25 @@ from gnuradio import gr, gr_unittest
import digital_swig as digital
import math
-class test_diff_phasor (gr_unittest.TestCase):
+class test_diff_phasor(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_diff_phasor_cc (self):
+ def test_diff_phasor_cc(self):
src_data = (0+0j, 1+0j, -1+0j, 3+4j, -3-4j, -3+4j)
expected_result = (0+0j, 0+0j, -1+0j, -3-4j, -25+0j, -7-24j)
- src = gr.vector_source_c (src_data)
- op = digital.diff_phasor_cc ()
- dst = gr.vector_sink_c ()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run () # run the graph and wait for it to finish
- actual_result = dst.data () # fetch the contents of the sink
- self.assertComplexTuplesAlmostEqual (expected_result, actual_result)
+ src = gr.vector_source_c(src_data)
+ op = digital.diff_phasor_cc()
+ dst = gr.vector_sink_c()
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run() # run the graph and wait for it to finish
+ actual_result = dst.data() # fetch the contents of the sink
+ self.assertComplexTuplesAlmostEqual(expected_result, actual_result)
if __name__ == '__main__':
gr_unittest.run(test_diff_phasor, "test_diff_phasor.xml")
diff --git a/gr-digital/python/qa_digital.py b/gr-digital/python/qa_digital.py
index 97e35da568..6f54f14208 100755
--- a/gr-digital/python/qa_digital.py
+++ b/gr-digital/python/qa_digital.py
@@ -21,14 +21,14 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
class test_digital(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
if __name__ == '__main__':
diff --git a/gr-digital/python/qa_fll_band_edge.py b/gr-digital/python/qa_fll_band_edge.py
index 9e4ca079b7..a4269931f5 100755
--- a/gr-digital/python/qa_fll_band_edge.py
+++ b/gr-digital/python/qa_fll_band_edge.py
@@ -21,18 +21,19 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
+import filter_swig as filter
import random, math
class test_fll_band_edge_cc(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
sps = 4
rolloff = 0.35
bw = 2*math.pi/100.0
@@ -49,14 +50,14 @@ class test_fll_band_edge_cc(gr_unittest.TestCase):
random.seed(0)
data = [2.0*random.randint(0, 2) - 1.0 for i in xrange(200)]
self.src = gr.vector_source_c(data, False)
- self.rrc = gr.interp_fir_filter_ccf(sps, rrc_taps)
+ self.rrc = filter.interp_fir_filter_ccf(sps, rrc_taps)
# Mix symbols with a complex sinusoid to spin them
self.nco = gr.sig_source_c(1, gr.GR_SIN_WAVE, foffset, 1)
self.mix = gr.multiply_cc()
# FLL will despin the symbols to an arbitrary phase
- self.fll = digital_swig.fll_band_edge_cc(sps, rolloff, ntaps, bw)
+ self.fll = digital.fll_band_edge_cc(sps, rolloff, ntaps, bw)
# Create sinks for all outputs of the FLL
# we will only care about the freq and error outputs
@@ -78,7 +79,7 @@ class test_fll_band_edge_cc(gr_unittest.TestCase):
dst_data = self.vsnk_frq.data()[N:]
expected_result = len(dst_data)* [-0.20,]
- self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 4)
+ self.assertFloatTuplesAlmostEqual(expected_result, dst_data, 4)
if __name__ == '__main__':
gr_unittest.run(test_fll_band_edge_cc, "test_fll_band_edge_cc.xml")
diff --git a/gr-digital/python/qa_framer_sink.py b/gr-digital/python/qa_framer_sink.py
index bccc86dc78..e717e6ae05 100755
--- a/gr-digital/python/qa_framer_sink.py
+++ b/gr-digital/python/qa_framer_sink.py
@@ -63,11 +63,11 @@ class test_framker_sink(gr_unittest.TestCase):
self.tb.connect(src, correlator, framer_sink)
self.tb.connect(correlator, vsnk)
- self.tb.run ()
+ self.tb.run()
result_data = rcvd_pktq.delete_head()
result_data = result_data.to_string()
- self.assertEqual (expected_data, result_data)
+ self.assertEqual(expected_data, result_data)
def test_002(self):
@@ -87,11 +87,11 @@ class test_framker_sink(gr_unittest.TestCase):
self.tb.connect(src, correlator, framer_sink)
self.tb.connect(correlator, vsnk)
- self.tb.run ()
+ self.tb.run()
result_data = rcvd_pktq.delete_head()
result_data = result_data.to_string()
- self.assertEqual (expected_data, result_data)
+ self.assertEqual(expected_data, result_data)
if __name__ == '__main__':
gr_unittest.run(test_framker_sink, "test_framker_sink.xml")
diff --git a/gr-digital/python/qa_glfsr_source.py b/gr-digital/python/qa_glfsr_source.py
index 7d02037335..c5adab3023 100755
--- a/gr-digital/python/qa_glfsr_source.py
+++ b/gr-digital/python/qa_glfsr_source.py
@@ -25,10 +25,10 @@ import digital_swig as digital
class test_glfsr_source(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
def test_000_make_b(self):
diff --git a/gr-digital/python/qa_lms_equalizer.py b/gr-digital/python/qa_lms_equalizer.py
index 025c785aa4..7bde258e7f 100755
--- a/gr-digital/python/qa_lms_equalizer.py
+++ b/gr-digital/python/qa_lms_equalizer.py
@@ -21,7 +21,7 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
+import digital_swig as digital
class test_lms_dd_equalizer(gr_unittest.TestCase):
@@ -33,7 +33,7 @@ class test_lms_dd_equalizer(gr_unittest.TestCase):
def transform(self, src_data, gain, const):
SRC = gr.vector_source_c(src_data, False)
- EQU = digital_swig.lms_dd_equalizer_cc(4, gain, 1, const.base())
+ EQU = digital.lms_dd_equalizer_cc(4, gain, 1, const.base())
DST = gr.vector_sink_c()
self.tb.connect(SRC, EQU, DST)
self.tb.run()
@@ -41,13 +41,17 @@ class test_lms_dd_equalizer(gr_unittest.TestCase):
def test_001_identity(self):
# Constant modulus signal so no adjustments
- const = digital_swig.constellation_qpsk()
+ const = digital.constellation_qpsk()
src_data = const.points()*1000
N = 100 # settling time
expected_data = src_data[N:]
result = self.transform(src_data, 0.1, const)[N:]
- self.assertComplexTuplesAlmostEqual(expected_data, result, 5)
+
+ # only test last N samples to allow for settling. Also adjust
+ # for a 1 sample delay in the filter.
+ N = -500
+ self.assertComplexTuplesAlmostEqual(expected_data[N:-1], result[N+1:])
if __name__ == "__main__":
gr_unittest.run(test_lms_dd_equalizer, "test_lms_dd_equalizer.xml")
diff --git a/gr-digital/python/qa_map.py b/gr-digital/python/qa_map.py
index 3ad99a2c12..0fd7c479a1 100755
--- a/gr-digital/python/qa_map.py
+++ b/gr-digital/python/qa_map.py
@@ -34,14 +34,14 @@ class test_map(gr_unittest.TestCase):
def helper(self, symbols):
src_data = [0, 1, 2, 3, 0, 1, 2, 3]
expected_data = map(lambda x: symbols[x], src_data)
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.map_bb(symbols)
- dst = gr.vector_sink_b ()
- self.tb.connect (src, op, dst)
- self.tb.run ()
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
result_data = list(dst.data())
- self.assertEqual (expected_data, result_data)
+ self.assertEqual(expected_data, result_data)
def test_001(self):
symbols = [0, 0, 0, 0]
diff --git a/gr-digital/python/qa_mpsk_receiver.py b/gr-digital/python/qa_mpsk_receiver.py
index e1f16ee671..bde8895e76 100755
--- a/gr-digital/python/qa_mpsk_receiver.py
+++ b/gr-digital/python/qa_mpsk_receiver.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2011 Free Software Foundation, Inc.
+# Copyright 2011,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -21,101 +21,128 @@
#
from gnuradio import gr, gr_unittest
-import digital_swig
-import random, cmath
+import digital_swig as digital
+import filter_swig as filter
+import random, cmath, time
class test_mpsk_receiver(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
# Test BPSK sync
M = 2
theta = 0
loop_bw = cmath.pi/100.0
fmin = -0.5
fmax = 0.5
- mu = 0.25
+ mu = 0.5
gain_mu = 0.01
omega = 2
gain_omega = 0.001
omega_rel = 0.001
- self.test = digital_swig.mpsk_receiver_cc(M, theta, loop_bw,
- fmin, fmax, mu, gain_mu,
- omega, gain_omega,
- omega_rel)
+ self.test = digital.mpsk_receiver_cc(M, theta, loop_bw,
+ fmin, fmax, mu, gain_mu,
+ omega, gain_omega,
+ omega_rel)
- data = 1000*[complex(1,0), complex(1,0), complex(-1,0), complex(-1,0)]
+ data = 10000*[complex(1,0), complex(-1,0)]
+ #data = [2*random.randint(0,1)-1 for x in xrange(10000)]
self.src = gr.vector_source_c(data, False)
self.snk = gr.vector_sink_c()
- self.tb.connect(self.src, self.test, self.snk)
+ # pulse shaping interpolation filter
+ nfilts = 32
+ excess_bw = 0.35
+ ntaps = 11 * int(omega*nfilts)
+ rrc_taps0 = filter.firdes.root_raised_cosine(
+ nfilts, nfilts, 1.0, excess_bw, ntaps)
+ rrc_taps1 = filter.firdes.root_raised_cosine(
+ 1, omega, 1.0, excess_bw, 11*omega)
+ self.rrc0 = filter.pfb_arb_resampler_ccf(omega, rrc_taps0)
+ self.rrc1 = filter.fir_filter_ccf(1, rrc_taps1)
+
+ self.tb.connect(self.src, self.rrc0, self.rrc1, self.test, self.snk)
self.tb.run()
- expected_result = 1000*[complex(-0.5,0), complex(0.5,0)]
+ expected_result = [0.5*d for d in data]
dst_data = self.snk.data()
# Only compare last Ncmp samples
- Ncmp = 100
+ Ncmp = 1000
len_e = len(expected_result)
len_d = len(dst_data)
- expected_result = expected_result[len_e - Ncmp:]
+ expected_result = expected_result[len_e - Ncmp-1:-1]
dst_data = dst_data[len_d - Ncmp:]
-
+
#for e,d in zip(expected_result, dst_data):
- # print e, d
+ # print "{0:+.02f} {1:+.02f}".format(e, d)
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
- def test02 (self):
+ def test02(self):
# Test QPSK sync
M = 4
theta = 0
- loop_bw = 2*cmath.pi/100.0
+ loop_bw = cmath.pi/100.0
fmin = -0.5
fmax = 0.5
- mu = 0.25
+ mu = 0.5
gain_mu = 0.01
omega = 2
gain_omega = 0.001
omega_rel = 0.001
- self.test = digital_swig.mpsk_receiver_cc(M, theta, loop_bw,
- fmin, fmax, mu, gain_mu,
- omega, gain_omega,
- omega_rel)
+ self.test = digital.mpsk_receiver_cc(M, theta, loop_bw,
+ fmin, fmax, mu, gain_mu,
+ omega, gain_omega,
+ omega_rel)
- data = 1000*[complex( 0.707, 0.707), complex( 0.707, 0.707),
- complex(-0.707, 0.707), complex(-0.707, 0.707),
- complex(-0.707, -0.707), complex(-0.707, -0.707),
- complex( 0.707, -0.707), complex( 0.707, -0.707)]
+ data = 10000*[complex( 0.707, 0.707),
+ complex(-0.707, 0.707),
+ complex(-0.707, -0.707),
+ complex( 0.707, -0.707)]
+ data = [0.5*d for d in data]
self.src = gr.vector_source_c(data, False)
self.snk = gr.vector_sink_c()
- self.tb.connect(self.src, self.test, self.snk)
+ # pulse shaping interpolation filter
+ nfilts = 32
+ excess_bw = 0.35
+ ntaps = 11 * int(omega*nfilts)
+ rrc_taps0 = filter.firdes.root_raised_cosine(
+ nfilts, nfilts, 1.0, excess_bw, ntaps)
+ rrc_taps1 = filter.firdes.root_raised_cosine(
+ 1, omega, 1.0, excess_bw, 11*omega)
+ self.rrc0 = filter.pfb_arb_resampler_ccf(omega, rrc_taps0)
+ self.rrc1 = filter.fir_filter_ccf(1, rrc_taps1)
+
+ self.tb.connect(self.src, self.rrc0, self.rrc1, self.test, self.snk)
self.tb.run()
- expected_result = 1000*[complex(0, -1.0), complex(1.0, 0),
- complex(0, 1.0), complex(-1.0, 0)]
- dst_data = self.snk.data()
+ expected_result = 10000*[complex(-0.5, +0.0), complex(+0.0, -0.5),
+ complex(+0.5, +0.0), complex(+0.0, +0.5)]
+
+ # get data after a settling period
+ dst_data = self.snk.data()[200:]
# Only compare last Ncmp samples
- Ncmp = 100
+ Ncmp = 1000
len_e = len(expected_result)
len_d = len(dst_data)
- expected_result = expected_result[len_e - Ncmp:]
+ expected_result = expected_result[len_e - Ncmp - 1:-1]
dst_data = dst_data[len_d - Ncmp:]
#for e,d in zip(expected_result, dst_data):
- # print e, d
+ # print "{0:+.02f} {1:+.02f}".format(e, d)
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
if __name__ == '__main__':
gr_unittest.run(test_mpsk_receiver, "test_mpsk_receiver.xml")
diff --git a/gr-digital/python/qa_mpsk_snr_est.py b/gr-digital/python/qa_mpsk_snr_est.py
index d392567bfd..c976bf21a8 100755
--- a/gr-digital/python/qa_mpsk_snr_est.py
+++ b/gr-digital/python/qa_mpsk_snr_est.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2011 Free Software Foundation, Inc.
+# Copyright 2011,2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -29,94 +29,93 @@ def get_cplx():
def get_n_cplx():
return complex(random.random()-0.5, random.random()-0.5)
-class test_mpsk_snr_est (gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+class test_mpsk_snr_est(gr_unittest.TestCase):
+ def setUp(self):
+ self.tb = gr.top_block()
random.seed(0) # make repeatable
N = 10000
self._noise = [get_n_cplx() for i in xrange(N)]
self._bits = [get_cplx() for i in xrange(N)]
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def mpsk_snr_est_setup (self, op):
+ def mpsk_snr_est_setup(self, op):
result = []
for i in xrange(1,6):
src_data = [b+(i*n) for b,n in zip(self._bits, self._noise)]
- src = gr.vector_source_c (src_data)
- dst = gr.null_sink (gr.sizeof_gr_complex)
+ src = gr.vector_source_c(src_data)
+ dst = gr.null_sink(gr.sizeof_gr_complex)
- tb = gr.top_block ()
- tb.connect (src, op)
- tb.connect (op, dst)
- tb.run () # run the graph and wait for it to finish
+ tb = gr.top_block()
+ tb.connect(src, op)
+ tb.connect(op, dst)
+ tb.run() # run the graph and wait for it to finish
result.append(op.snr())
return result
- def test_mpsk_snr_est_simple (self):
+ def test_mpsk_snr_est_simple(self):
expected_result = [11.48, 5.91, 3.30, 2.08, 1.46]
N = 10000
alpha = 0.001
- op = digital.mpsk_snr_est_cc (digital.SNR_EST_SIMPLE, N, alpha)
+ op = digital.mpsk_snr_est_cc(digital.SNR_EST_SIMPLE, N, alpha)
actual_result = self.mpsk_snr_est_setup(op)
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2)
- def test_mpsk_snr_est_skew (self):
+ def test_mpsk_snr_est_skew(self):
expected_result = [11.48, 5.91, 3.30, 2.08, 1.46]
N = 10000
alpha = 0.001
- op = digital.mpsk_snr_est_cc (digital.SNR_EST_SKEW, N, alpha)
+ op = digital.mpsk_snr_est_cc(digital.SNR_EST_SKEW, N, alpha)
actual_result = self.mpsk_snr_est_setup(op)
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2)
- def test_mpsk_snr_est_m2m4 (self):
+ def test_mpsk_snr_est_m2m4(self):
expected_result = [11.02, 6.20, 4.98, 5.16, 5.66]
N = 10000
alpha = 0.001
- op = digital.mpsk_snr_est_cc (digital.SNR_EST_M2M4, N, alpha)
+ op = digital.mpsk_snr_est_cc(digital.SNR_EST_M2M4, N, alpha)
actual_result = self.mpsk_snr_est_setup(op)
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2)
- def test_mpsk_snr_est_svn (self):
+ def test_mpsk_snr_est_svn(self):
expected_result = [10.90, 6.00, 4.76, 4.97, 5.49]
N = 10000
alpha = 0.001
- op = digital.mpsk_snr_est_cc (digital.SNR_EST_SVR, N, alpha)
+ op = digital.mpsk_snr_est_cc(digital.SNR_EST_SVR, N, alpha)
actual_result = self.mpsk_snr_est_setup(op)
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2)
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2)
- def test_probe_mpsk_snr_est_m2m4 (self):
+ def test_probe_mpsk_snr_est_m2m4(self):
expected_result = [11.02, 6.20, 4.98, 5.16, 5.66]
actual_result = []
for i in xrange(1,6):
src_data = [b+(i*n) for b,n in zip(self._bits, self._noise)]
- src = gr.vector_source_c (src_data)
+ src = gr.vector_source_c(src_data)
N = 10000
alpha = 0.001
- op = digital.probe_mpsk_snr_est_c (digital.SNR_EST_M2M4, N, alpha)
+ op = digital.probe_mpsk_snr_est_c(digital.SNR_EST_M2M4, N, alpha)
- tb = gr.top_block ()
- tb.connect (src, op)
- tb.run () # run the graph and wait for it to finish
+ tb = gr.top_block()
+ tb.connect(src, op)
+ tb.run() # run the graph and wait for it to finish
actual_result.append(op.snr())
- self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 2)
-
+ self.assertFloatTuplesAlmostEqual(expected_result, actual_result, 2)
if __name__ == '__main__':
# Test various SNR estimators; we're not using a Gaussian
diff --git a/gr-digital/python/qa_ofdm_insert_preamble.py b/gr-digital/python/qa_ofdm_insert_preamble.py
index c45893fa38..d084796644 100755
--- a/gr-digital/python/qa_ofdm_insert_preamble.py
+++ b/gr-digital/python/qa_ofdm_insert_preamble.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2007,2010,2011 Free Software Foundation, Inc.
+# Copyright 2007,2010-2012 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -22,14 +22,14 @@
from gnuradio import gr, gr_unittest
from pprint import pprint
-import digital_swig
+import digital_swig as digital
-class test_ofdm_insert_preamble (gr_unittest.TestCase):
+class test_ofdm_insert_preamble(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
def helper(self, v0, v1, fft_length, preamble):
@@ -41,7 +41,7 @@ class test_ofdm_insert_preamble (gr_unittest.TestCase):
# print "len(v) = %d" % (len(v))
- op = digital_swig.ofdm_insert_preamble(fft_length, preamble)
+ op = digital.ofdm_insert_preamble(fft_length, preamble)
v2s = gr.vector_to_stream(gr.sizeof_gr_complex, fft_length)
dst0 = gr.vector_sink_c()
@@ -105,7 +105,6 @@ class test_ofdm_insert_preamble (gr_unittest.TestCase):
p.append(tuple(t))
v += t
-
r = self.helper(v, npayloads*[1], fft_length, preamble)
self.assertEqual(r[0], tuple(npayloads*[1, 0]))
@@ -175,6 +174,5 @@ class test_ofdm_insert_preamble (gr_unittest.TestCase):
p0, p1, p[12], p[13],
p0, p1, p[14], p[15]))
-
if __name__ == '__main__':
gr_unittest.run(test_ofdm_insert_preamble, "test_ofdm_insert_preamble.xml")
diff --git a/gr-digital/python/qa_pfb_clock_sync.py b/gr-digital/python/qa_pfb_clock_sync.py
index 06c8a60ba7..44419264f7 100755
--- a/gr-digital/python/qa_pfb_clock_sync.py
+++ b/gr-digital/python/qa_pfb_clock_sync.py
@@ -21,18 +21,19 @@
#
from gnuradio import gr, gr_unittest
+import filter_swig as filter
import digital_swig as digital
import random, cmath
class test_pfb_clock_sync(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test01 (self):
+ def test01(self):
# Test BPSK sync
excess_bw = 0.35
@@ -52,7 +53,7 @@ class test_pfb_clock_sync(gr_unittest.TestCase):
max_rate_deviation,
osps)
- data = 1000*[complex(1,0), complex(-1,0)]
+ data = 10000*[complex(1,0), complex(-1,0)]
self.src = gr.vector_source_c(data, False)
# pulse shaping interpolation filter
@@ -62,18 +63,18 @@ class test_pfb_clock_sync(gr_unittest.TestCase):
1.0, # symbol rate
excess_bw, # excess bandwidth (roll-off factor)
ntaps)
- self.rrc_filter = gr.pfb_arb_resampler_ccf(sps, rrc_taps)
+ self.rrc_filter = filter.pfb_arb_resampler_ccf(sps, rrc_taps)
self.snk = gr.vector_sink_c()
self.tb.connect(self.src, self.rrc_filter, self.test, self.snk)
self.tb.run()
- expected_result = 1000*[complex(-1,0), complex(1,0)]
+ expected_result = 10000*[complex(-1,0), complex(1,0)]
dst_data = self.snk.data()
# Only compare last Ncmp samples
- Ncmp = 100
+ Ncmp = 1000
len_e = len(expected_result)
len_d = len(dst_data)
expected_result = expected_result[len_e - Ncmp:]
@@ -82,10 +83,10 @@ class test_pfb_clock_sync(gr_unittest.TestCase):
#for e,d in zip(expected_result, dst_data):
# print e, d
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
- def test02 (self):
+ def test02(self):
# Test real BPSK sync
excess_bw = 0.35
@@ -105,7 +106,7 @@ class test_pfb_clock_sync(gr_unittest.TestCase):
max_rate_deviation,
osps)
- data = 1000*[1, -1]
+ data = 10000*[1, -1]
self.src = gr.vector_source_f(data, False)
# pulse shaping interpolation filter
@@ -115,18 +116,18 @@ class test_pfb_clock_sync(gr_unittest.TestCase):
1.0, # symbol rate
excess_bw, # excess bandwidth (roll-off factor)
ntaps)
- self.rrc_filter = gr.pfb_arb_resampler_fff(sps, rrc_taps)
+ self.rrc_filter = filter.pfb_arb_resampler_fff(sps, rrc_taps)
self.snk = gr.vector_sink_f()
self.tb.connect(self.src, self.rrc_filter, self.test, self.snk)
self.tb.run()
- expected_result = 1000*[-1, 1]
+ expected_result = 10000*[-1, 1]
dst_data = self.snk.data()
# Only compare last Ncmp samples
- Ncmp = 100
+ Ncmp = 1000
len_e = len(expected_result)
len_d = len(dst_data)
expected_result = expected_result[len_e - Ncmp:]
@@ -135,7 +136,7 @@ class test_pfb_clock_sync(gr_unittest.TestCase):
#for e,d in zip(expected_result, dst_data):
# print e, d
- self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 1)
+ self.assertComplexTuplesAlmostEqual(expected_result, dst_data, 1)
if __name__ == '__main__':
diff --git a/gr-digital/python/qa_pn_correlator_cc.py b/gr-digital/python/qa_pn_correlator_cc.py
index 377bef5feb..4e81bf6662 100755
--- a/gr-digital/python/qa_pn_correlator_cc.py
+++ b/gr-digital/python/qa_pn_correlator_cc.py
@@ -26,7 +26,7 @@ import digital_swig as digital
class test_pn_correlator_cc(gr_unittest.TestCase):
def setUp(self):
- self.tb = gr.top_block ()
+ self.tb = gr.top_block()
def tearDown(self):
self.tb = None
diff --git a/gr-digital/python/qa_probe_density.py b/gr-digital/python/qa_probe_density.py
index c5b7e0e7c2..f42f00a7f7 100755
--- a/gr-digital/python/qa_probe_density.py
+++ b/gr-digital/python/qa_probe_density.py
@@ -34,37 +34,37 @@ class test_probe_density(gr_unittest.TestCase):
def test_001(self):
src_data = [0, 1, 0, 1]
expected_data = 1
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.probe_density_b(1)
- self.tb.connect (src, op)
- self.tb.run ()
+ self.tb.connect(src, op)
+ self.tb.run()
result_data = op.density()
- self.assertEqual (expected_data, result_data)
+ self.assertEqual(expected_data, result_data)
def test_002(self):
src_data = [1, 1, 1, 1]
expected_data = 1
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.probe_density_b(0.01)
- self.tb.connect (src, op)
- self.tb.run ()
+ self.tb.connect(src, op)
+ self.tb.run()
result_data = op.density()
- self.assertEqual (expected_data, result_data)
+ self.assertEqual(expected_data, result_data)
def test_003(self):
src_data = [0, 1, 0, 1, 0, 1, 0, 1, 0, 1]
expected_data = 0.95243
- src = gr.vector_source_b (src_data)
+ src = gr.vector_source_b(src_data)
op = digital.probe_density_b(0.01)
- self.tb.connect (src, op)
- self.tb.run ()
+ self.tb.connect(src, op)
+ self.tb.run()
result_data = op.density()
print result_data
- self.assertAlmostEqual (expected_data, result_data, 5)
+ self.assertAlmostEqual(expected_data, result_data, 5)
if __name__ == '__main__':
gr_unittest.run(test_probe_density, "test_probe_density.xml")
diff --git a/gr-digital/python/qa_scrambler.py b/gr-digital/python/qa_scrambler.py
index f5bd612429..3127a7c1e6 100755
--- a/gr-digital/python/qa_scrambler.py
+++ b/gr-digital/python/qa_scrambler.py
@@ -25,7 +25,7 @@ import digital_swig as digital
class test_scrambler(gr_unittest.TestCase):
- def setUp (self):
+ def setUp(self):
self.tb = gr.top_block()
def tearDown(self):
diff --git a/gr-digital/python/qa_simple_framer.py b/gr-digital/python/qa_simple_framer.py
index 09b2d329b2..f8c894da28 100755
--- a/gr-digital/python/qa_simple_framer.py
+++ b/gr-digital/python/qa_simple_framer.py
@@ -24,15 +24,15 @@ from gnuradio import gr, gr_unittest
import digital_swig as digital
import math
-class test_simple_framer (gr_unittest.TestCase):
+class test_simple_framer(gr_unittest.TestCase):
- def setUp (self):
- self.tb = gr.top_block ()
+ def setUp(self):
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_simple_framer_001 (self):
+ def test_simple_framer_001(self):
src_data = (0x00, 0x11, 0x22, 0x33,
0x44, 0x55, 0x66, 0x77,
0x88, 0x99, 0xaa, 0xbb,
@@ -44,15 +44,14 @@ class test_simple_framer (gr_unittest.TestCase):
0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x02, 0x88, 0x99, 0xaa, 0xbb, 0x55,
0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x03, 0xcc, 0xdd, 0xee, 0xff, 0x55)
- src = gr.vector_source_b (src_data)
- op = digital.simple_framer (4)
- dst = gr.vector_sink_b ()
- self.tb.connect (src, op)
- self.tb.connect (op, dst)
- self.tb.run ()
- result_data = dst.data ()
- self.assertEqual (expected_result, result_data)
-
+ src = gr.vector_source_b(src_data)
+ op = digital.simple_framer(4)
+ dst = gr.vector_sink_b()
+ self.tb.connect(src, op)
+ self.tb.connect(op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
if __name__ == '__main__':
gr_unittest.run(test_simple_framer, "test_simple_framer.xml")
diff --git a/gr-digital/python/qam.py b/gr-digital/python/qam.py
index aafa5725d9..8584c59c6f 100644
--- a/gr-digital/python/qam.py
+++ b/gr-digital/python/qam.py
@@ -31,7 +31,7 @@ from generic_mod_demod import shared_mod_args, shared_demod_args
from utils.gray_code import gray_code
from utils import mod_codes
import modulation_utils
-import digital_swig
+import digital_swig as digital
# Default number of points in constellation.
_def_constellation_points = 16
@@ -172,8 +172,8 @@ def qam_constellation(constellation_points=_def_constellation_points,
pre_diff_code = range(0, m/2) + range(3*m/4, m) + range(m/2, 3*m/4)
else:
pre_diff_code = []
- constellation = digital_swig.constellation_rect(points, pre_diff_code, 4,
- side, side, width, width)
+ constellation = digital.constellation_rect(points, pre_diff_code, 4,
+ side, side, width, width)
return constellation
# /////////////////////////////////////////////////////////////////////////////
diff --git a/gr-digital/python/qpsk.py b/gr-digital/python/qpsk.py
index 60f791af7f..859d981367 100644
--- a/gr-digital/python/qpsk.py
+++ b/gr-digital/python/qpsk.py
@@ -29,7 +29,7 @@ from gnuradio import gr
from gnuradio.digital.generic_mod_demod import generic_mod, generic_demod
from gnuradio.digital.generic_mod_demod import shared_mod_args, shared_demod_args
from utils import mod_codes
-import digital_swig
+import digital_swig as digital
import modulation_utils
# The default encoding (e.g. gray-code, set-partition)
@@ -45,7 +45,7 @@ def qpsk_constellation(mod_code=_def_mod_code):
"""
if mod_code != mod_codes.GRAY_CODE:
raise ValueError("This QPSK mod/demod works only for gray-coded constellations.")
- return digital_swig.constellation_qpsk()
+ return digital.constellation_qpsk()
# /////////////////////////////////////////////////////////////////////////////
# QPSK modulator
@@ -68,12 +68,12 @@ class qpsk_mod(generic_mod):
def __init__(self, mod_code=_def_mod_code, differential=False, *args, **kwargs):
pre_diff_code = True
if not differential:
- constellation = digital_swig.constellation_qpsk()
+ constellation = digital.constellation_qpsk()
if mod_code != mod_codes.GRAY_CODE:
raise ValueError("This QPSK mod/demod works only for gray-coded constellations.")
else:
- constellation = digital_swig.constellation_dqpsk()
- if mod_code not in (mod_codes.GRAY_CODE or mod_codes.NO_CODE):
+ constellation = digital.constellation_dqpsk()
+ if mod_code not in set([mod_codes.GRAY_CODE, mod_codes.NO_CODE]):
raise ValueError("That mod_code is not supported for DQPSK mod/demod.")
if mod_code == mod_codes.NO_CODE:
pre_diff_code = False
@@ -106,12 +106,12 @@ class qpsk_demod(generic_demod):
*args, **kwargs):
pre_diff_code = True
if not differential:
- constellation = digital_swig.constellation_qpsk()
+ constellation = digital.constellation_qpsk()
if mod_code != mod_codes.GRAY_CODE:
raise ValueError("This QPSK mod/demod works only for gray-coded constellations.")
else:
- constellation = digital_swig.constellation_dqpsk()
- if mod_code not in (mod_codes.GRAY_CODE or mod_codes.NO_CODE):
+ constellation = digital.constellation_dqpsk()
+ if mod_code not in set([mod_codes.GRAY_CODE, mod_codes.NO_CODE]):
raise ValueError("That mod_code is not supported for DQPSK mod/demod.")
if mod_code == mod_codes.NO_CODE:
pre_diff_code = False
@@ -129,7 +129,7 @@ class qpsk_demod(generic_demod):
def dqpsk_constellation(mod_code=_def_mod_code):
if mod_code != mod_codes.GRAY_CODE:
raise ValueError("The DQPSK constellation is only generated for gray_coding. But it can be used for non-grayed coded modulation if one doesn't use the pre-differential code.")
- return digital_swig.constellation_dqpsk()
+ return digital.constellation_dqpsk()
# /////////////////////////////////////////////////////////////////////////////
# DQPSK modulator
@@ -149,7 +149,7 @@ class dqpsk_mod(qpsk_mod):
__doc__ += shared_mod_args
def __init__(self, mod_code=_def_mod_code, *args, **kwargs):
- super(dqpsk_mod, self).__init__(mod_code, differential=True,
+ super(dqpsk_mod, self).__init__(mod_code,
*args, **kwargs)
# /////////////////////////////////////////////////////////////////////////////
@@ -171,7 +171,7 @@ class dqpsk_demod(qpsk_demod):
__doc__ += shared_demod_args
def __init__(self, mod_code=_def_mod_code, *args, **kwargs):
- super(dqpsk_demod, self).__init__(mod_code, differential=True,
+ super(dqpsk_demod, self).__init__(mod_code,
*args, **kwargs)
#
@@ -183,4 +183,3 @@ modulation_utils.add_type_1_constellation('qpsk', qpsk_constellation)
modulation_utils.add_type_1_mod('dqpsk', dqpsk_mod)
modulation_utils.add_type_1_demod('dqpsk', dqpsk_demod)
modulation_utils.add_type_1_constellation('dqpsk', dqpsk_constellation)
-