summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital
diff options
context:
space:
mode:
authorJosh Morman <jmorman@gnuradio.org>2021-11-24 12:25:09 -0500
committermormj <34754695+mormj@users.noreply.github.com>2021-11-24 14:41:53 -0500
commit3cecf9268b15bb0e9e2fedaeb2528bd3038beee6 (patch)
treeba0bb14fa03664b4e69e0dcb944b96ae298264d8 /gr-digital/python/digital
parent310b7f0cc25edf90c0bc6754031d763119b0ae97 (diff)
digital: pep8 formatting
Signed-off-by: Josh Morman <jmorman@gnuradio.org>
Diffstat (limited to 'gr-digital/python/digital')
-rw-r--r--gr-digital/python/digital/__init__.py12
-rw-r--r--gr-digital/python/digital/bpsk.py2
-rw-r--r--gr-digital/python/digital/constellation_map_generator.py2
-rw-r--r--gr-digital/python/digital/cpm.py69
-rw-r--r--gr-digital/python/digital/generic_mod_demod.py66
-rw-r--r--gr-digital/python/digital/gfsk.py85
-rw-r--r--gr-digital/python/digital/gmsk.py87
-rw-r--r--gr-digital/python/digital/modulation_utils.py14
-rw-r--r--gr-digital/python/digital/ofdm_txrx.py319
-rw-r--r--gr-digital/python/digital/psk.py26
-rw-r--r--gr-digital/python/digital/psk_constellations.py103
-rw-r--r--gr-digital/python/digital/qa_burst_shaper.py15
-rw-r--r--gr-digital/python/digital/qa_constellation.py1
-rw-r--r--gr-digital/python/digital/qa_constellation_encoder_bc.py11
-rwxr-xr-xgr-digital/python/digital/qa_decision_feedback_equalizer.py2
-rw-r--r--gr-digital/python/digital/qa_glfsr_source.py2
-rw-r--r--gr-digital/python/digital/qa_lfsr.py20
-rwxr-xr-xgr-digital/python/digital/qa_linear_equalizer.py2
-rw-r--r--gr-digital/python/digital/qa_ofdm_chanest_vcvc.py6
-rw-r--r--gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py3
-rw-r--r--gr-digital/python/digital/qa_scrambler.py72
-rw-r--r--gr-digital/python/digital/qam.py4
-rw-r--r--gr-digital/python/digital/qpsk.py9
-rw-r--r--gr-digital/python/digital/soft_dec_lut_gen.py48
-rw-r--r--gr-digital/python/digital/test_soft_decisions.py23
-rw-r--r--gr-digital/python/digital/utils/__init__.py6
-rw-r--r--gr-digital/python/digital/utils/alignment.py21
-rw-r--r--gr-digital/python/digital/utils/gray_code.py6
-rw-r--r--gr-digital/python/digital/utils/lfsr.py8
-rw-r--r--gr-digital/python/digital/utils/mod_codes.py7
-rw-r--r--gr-digital/python/digital/utils/tagged_streams.py21
31 files changed, 638 insertions, 434 deletions
diff --git a/gr-digital/python/digital/__init__.py b/gr-digital/python/digital/__init__.py
index 64281eb2bb..100766f22c 100644
--- a/gr-digital/python/digital/__init__.py
+++ b/gr-digital/python/digital/__init__.py
@@ -22,11 +22,7 @@ except ImportError:
__path__.append(os.path.join(dirname, "bindings"))
from .digital_python import *
-from gnuradio import analog # just need analog for the enum
-class gmskmod_bc(cpmmod_bc):
- def __init__(self, samples_per_sym = 2, L = 4, beta = 0.3):
- cpmmod_bc.__init__(self, analog.cpm.GAUSSIAN, 0.5, samples_per_sym, L, beta)
-
+from gnuradio import analog # just need analog for the enum
from .psk import *
from .qam import *
from .qamlike import *
@@ -42,4 +38,8 @@ from .psk_constellations import *
from .qam_constellations import *
from .constellation_map_generator import *
-from . import packet_utils
+
+class gmskmod_bc(cpmmod_bc):
+ def __init__(self, samples_per_sym=2, L=4, beta=0.3):
+ cpmmod_bc.__init__(self, analog.cpm.GAUSSIAN,
+ 0.5, samples_per_sym, L, beta)
diff --git a/gr-digital/python/digital/bpsk.py b/gr-digital/python/digital/bpsk.py
index 7d5b8cde7a..e6d507e2f7 100644
--- a/gr-digital/python/digital/bpsk.py
+++ b/gr-digital/python/digital/bpsk.py
@@ -25,6 +25,7 @@ from . import modulation_utils
# BPSK constellation
# /////////////////////////////////////////////////////////////////////////////
+
def bpsk_constellation():
return digital_python.constellation_bpsk()
@@ -32,6 +33,7 @@ def bpsk_constellation():
# DBPSK constellation
# /////////////////////////////////////////////////////////////////////////////
+
def dbpsk_constellation():
return digital_python.constellation_dbpsk()
diff --git a/gr-digital/python/digital/constellation_map_generator.py b/gr-digital/python/digital/constellation_map_generator.py
index e6e660392c..e3d6f532fd 100644
--- a/gr-digital/python/digital/constellation_map_generator.py
+++ b/gr-digital/python/digital/constellation_map_generator.py
@@ -35,7 +35,7 @@ def constellation_map_generator(basis_cpoints, basis_symbols, k, pi):
symbols = list()
for s_i in s:
tmp = 0
- for i,p in enumerate(pi):
+ for i, p in enumerate(pi):
bit = (s_i >> i) & 0x1
tmp |= bit << p
symbols.append(tmp ^ k)
diff --git a/gr-digital/python/digital/cpm.py b/gr-digital/python/digital/cpm.py
index d21ae55a87..ca32fb7036 100644
--- a/gr-digital/python/digital/cpm.py
+++ b/gr-digital/python/digital/cpm.py
@@ -29,7 +29,7 @@ _def_samples_per_symbol = 2
_def_bits_per_symbol = 1
_def_h_numerator = 1
_def_h_denominator = 2
-_def_cpm_type = 0 # 0=CPFSK, 1=GMSK, 2=RC, 3=GENERAL
+_def_cpm_type = 0 # 0=CPFSK, 1=GMSK, 2=RC, 3=GENERAL
_def_bt = 0.35
_def_symbols_per_pulse = 1
_def_generic_taps = numpy.empty(1)
@@ -80,62 +80,69 @@ class cpm_mod(gr.hier_block2):
log=_def_log):
gr.hier_block2.__init__(self, "cpm_mod",
- gr.io_signature(1, 1, gr.sizeof_char), # Input signature
- gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
self._samples_per_symbol = samples_per_symbol
self._bits_per_symbol = bits_per_symbol
self._h_numerator = h_numerator
self._h_denominator = h_denominator
self._cpm_type = cpm_type
- self._bt=bt
- if cpm_type == 0 or cpm_type == 2 or cpm_type == 3: # CPFSK, RC, Generic
+ self._bt = bt
+ if cpm_type == 0 or cpm_type == 2 or cpm_type == 3: # CPFSK, RC, Generic
self._symbols_per_pulse = symbols_per_pulse
- elif cpm_type == 1: # GMSK
+ elif cpm_type == 1: # GMSK
self._symbols_per_pulse = 4
else:
- raise TypeError("cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,))
+ raise TypeError(
+ "cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,))
- self._generic_taps=numpy.array(generic_taps)
+ self._generic_taps = numpy.array(generic_taps)
if samples_per_symbol < 2:
- raise TypeError("samples_per_symbol must be >= 2, is %r" % (samples_per_symbol,))
+ raise TypeError("samples_per_symbol must be >= 2, is %r" %
+ (samples_per_symbol,))
self.nsymbols = 2**bits_per_symbol
- self.sym_alphabet = numpy.arange(-(self.nsymbols-1),self.nsymbols,2).tolist()
-
+ self.sym_alphabet = numpy.arange(-(self.nsymbols - 1),
+ self.nsymbols, 2).tolist()
self.ntaps = int(self._symbols_per_pulse * samples_per_symbol)
sensitivity = 2 * pi * h_numerator / h_denominator / samples_per_symbol
# Unpack Bytes into bits_per_symbol groups
- self.B2s = blocks.packed_to_unpacked_bb(bits_per_symbol,gr.GR_MSB_FIRST)
-
+ self.B2s = blocks.packed_to_unpacked_bb(
+ bits_per_symbol, gr.GR_MSB_FIRST)
# Turn it into symmetric PAM data.
- self.pam = digital_python.chunks_to_symbols_bf(self.sym_alphabet,1)
+ self.pam = digital_python.chunks_to_symbols_bf(self.sym_alphabet, 1)
# Generate pulse (sum of taps = samples_per_symbol/2)
- if cpm_type == 0: # CPFSK
- self.taps= (1.0/self._symbols_per_pulse/2,) * self.ntaps
- elif cpm_type == 1: # GMSK
+ if cpm_type == 0: # CPFSK
+ self.taps = (1.0 / self._symbols_per_pulse / 2,) * self.ntaps
+ elif cpm_type == 1: # GMSK
gaussian_taps = filter.firdes.gaussian(
1.0 / 2, # gain
samples_per_symbol, # symbol_rate
bt, # bandwidth * symbol time
self.ntaps # number of taps
- )
+ )
sqwave = (1,) * samples_per_symbol # rectangular window
- self.taps = numpy.convolve(numpy.array(gaussian_taps),numpy.array(sqwave))
- elif cpm_type == 2: # Raised Cosine
+ self.taps = numpy.convolve(numpy.array(
+ gaussian_taps), numpy.array(sqwave))
+ elif cpm_type == 2: # Raised Cosine
# generalize it for arbitrary roll-off factor
- self.taps = (1-numpy.cos(2*pi*numpy.arange(0 / self.ntaps/samples_per_symbol/self._symbols_per_pulse)),(2*self._symbols_per_pulse))
- elif cpm_type == 3: # Generic CPM
+ self.taps = (1 - numpy.cos(2 * pi * numpy.arange(0 / self.ntaps /
+ samples_per_symbol / self._symbols_per_pulse)), (2 * self._symbols_per_pulse))
+ elif cpm_type == 3: # Generic CPM
self.taps = generic_taps
else:
- raise TypeError("cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,))
+ raise TypeError(
+ "cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,))
- self.filter = filter.pfb.arb_resampler_fff(samples_per_symbol, self.taps)
+ self.filter = filter.pfb.arb_resampler_fff(
+ samples_per_symbol, self.taps)
# FM modulation
self.fmmod = analog.frequency_modulator_fc(sensitivity)
@@ -170,19 +177,17 @@ class cpm_mod(gr.hier_block2):
def symbols_per_pulse(self):
return self._symbols_per_pulse
-
def _print_verbage(self):
print("Samples per symbol = %d" % self._samples_per_symbol)
print("Bits per symbol = %d" % self._bits_per_symbol)
- print("h = " , self._h_numerator , " / " , self._h_denominator)
- print("Symbol alphabet = " , self.sym_alphabet)
+ print("h = ", self._h_numerator, " / ", self._h_denominator)
+ print("Symbol alphabet = ", self.sym_alphabet)
print("Symbols per pulse = %d" % self._symbols_per_pulse)
- print("taps = " , self.taps)
+ print("taps = ", self.taps)
print("CPM type = %d" % self._cpm_type)
if self._cpm_type == 1:
- print("Gaussian filter BT = %.2f" % self._bt)
-
+ print("Gaussian filter BT = %.2f" % self._bt)
def _setup_logging(self):
print("Modulation logging turned on.")
@@ -209,8 +214,7 @@ class cpm_mod(gr.hier_block2):
Given command line options, create dictionary suitable for passing to __init__
"""
return modulation_utils.extract_kwargs_from_options(cpm_mod.__init__,
- ('self',), options)
-
+ ('self',), options)
# /////////////////////////////////////////////////////////////////////////////
@@ -219,7 +223,6 @@ class cpm_mod(gr.hier_block2):
#
# Not yet implemented
#
-
#
# Add these to the mod/demod registry
#
diff --git a/gr-digital/python/digital/generic_mod_demod.py b/gr-digital/python/digital/generic_mod_demod.py
index a40a7fda5f..2db9e4482a 100644
--- a/gr-digital/python/digital/generic_mod_demod.py
+++ b/gr-digital/python/digital/generic_mod_demod.py
@@ -29,17 +29,18 @@ _def_log = False
_def_truncate = False
# Frequency correction
-_def_freq_bw = 2*math.pi/100.0
+_def_freq_bw = 2 * math.pi / 100.0
# Symbol timing recovery
-_def_timing_bw = 2*math.pi/100.0
+_def_timing_bw = 2 * math.pi / 100.0
_def_timing_max_dev = 1.5
# Fine frequency / Phase correction
-_def_phase_bw = 2*math.pi/100.0
+_def_phase_bw = 2 * math.pi / 100.0
# Number of points in constellation
_def_constellation_points = 16
# Whether differential coding is used.
_def_differential = False
+
def add_common_options(parser):
"""
Sets options common to both modulator and demodulator.
@@ -55,7 +56,7 @@ def add_common_options(parser):
parser.add_option("", "--mod-code", type="choice", choices=mod_codes.codes,
default=mod_codes.NO_CODE,
help="Select modulation code from: %s [default=%%default]"
- % (', '.join(mod_codes.codes),))
+ % (', '.join(mod_codes.codes),))
parser.add_option("", "--excess-bw", type="float", default=_def_excess_bw,
help="set RRC excess bandwidth factor [default=%default]")
@@ -92,8 +93,9 @@ class generic_mod(gr.hier_block2):
truncate=_def_truncate):
gr.hier_block2.__init__(self, "generic_mod",
- gr.io_signature(1, 1, gr.sizeof_char), # Input signature
- gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
self._constellation = constellation
self._samples_per_symbol = samples_per_symbol
@@ -103,31 +105,36 @@ class generic_mod(gr.hier_block2):
self.pre_diff_code = pre_diff_code and self._constellation.apply_pre_diff_code()
if self._samples_per_symbol < 2:
- raise TypeError("sps must be >= 2, is %f" % self._samples_per_symbol)
+ raise TypeError("sps must be >= 2, is %f" %
+ self._samples_per_symbol)
- arity = pow(2,self.bits_per_symbol())
+ arity = pow(2, self.bits_per_symbol())
# turn bytes into k-bit vectors
self.bytes2chunks = \
- blocks.packed_to_unpacked_bb(self.bits_per_symbol(), gr.GR_MSB_FIRST)
+ blocks.packed_to_unpacked_bb(
+ self.bits_per_symbol(), gr.GR_MSB_FIRST)
if self.pre_diff_code:
- self.symbol_mapper = digital.map_bb(self._constellation.pre_diff_code())
+ self.symbol_mapper = digital.map_bb(
+ self._constellation.pre_diff_code())
if differential:
self.diffenc = digital.diff_encoder_bb(arity)
- self.chunks2symbols = digital.chunks_to_symbols_bc(self._constellation.points())
+ self.chunks2symbols = digital.chunks_to_symbols_bc(
+ self._constellation.points())
# pulse shaping filter
nfilts = 32
ntaps_per_filt = 11
- ntaps = nfilts * ntaps_per_filt * int(self._samples_per_symbol) # make nfilts filters of ntaps each
+ # make nfilts filters of ntaps each
+ ntaps = nfilts * ntaps_per_filt * int(self._samples_per_symbol)
self.rrc_taps = filter.firdes.root_raised_cosine(
nfilts, # gain
nfilts, # sampling rate based on 32 filters in resampler
1.0, # symbol rate
- self._excess_bw, # excess bandwidth (roll-off factor)
+ self._excess_bw, # excess bandwidth (roll-off factor)
ntaps)
self.rrc_filter = filter.pfb_arb_resampler_ccf(self._samples_per_symbol,
self.rrc_taps)
@@ -135,8 +142,10 @@ class generic_mod(gr.hier_block2):
# Remove the filter transient at the beginning of the transmission
if truncate:
fsps = float(self._samples_per_symbol)
- len_filt_delay = int((ntaps_per_filt*fsps*fsps-fsps)/2.0) # Length of delay through rrc filter
- self.skiphead = blocks.skiphead(gr.sizeof_gr_complex*1, len_filt_delay)
+ # Length of delay through rrc filter
+ len_filt_delay = int((ntaps_per_filt * fsps * fsps - fsps) / 2.0)
+ self.skiphead = blocks.skiphead(
+ gr.sizeof_gr_complex * 1, len_filt_delay)
# Connect
self._blocks = [self, self.bytes2chunks]
@@ -145,7 +154,7 @@ class generic_mod(gr.hier_block2):
if differential:
self._blocks.append(self.diffenc)
self._blocks += [self.chunks2symbols, self.rrc_filter]
-
+
if truncate:
self._blocks.append(self.skiphead)
self._blocks.append(self)
@@ -157,7 +166,6 @@ class generic_mod(gr.hier_block2):
if log:
self._setup_logging()
-
def samples_per_symbol(self):
return self._samples_per_symbol
@@ -176,8 +184,7 @@ class generic_mod(gr.hier_block2):
Given command line options, create dictionary suitable for passing to __init__
"""
return extract_kwargs_from_options_for_class(cls, options)
- extract_kwargs_from_options=classmethod(extract_kwargs_from_options)
-
+ extract_kwargs_from_options = classmethod(extract_kwargs_from_options)
def _print_verbage(self):
print("\nModulator:")
@@ -239,7 +246,8 @@ class generic_demod(gr.hier_block2):
log=_def_log):
gr.hier_block2.__init__(self, "generic_demod",
- gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
gr.io_signature(1, 1, gr.sizeof_char)) # Output signature
self._constellation = constellation
@@ -248,19 +256,20 @@ class generic_demod(gr.hier_block2):
self._phase_bw = phase_bw
self._freq_bw = freq_bw
self._timing_bw = timing_bw
- self._timing_max_dev= _def_timing_max_dev
+ self._timing_max_dev = _def_timing_max_dev
self._differential = differential
if self._samples_per_symbol < 2:
- raise TypeError("sps must be >= 2, is %d" % self._samples_per_symbol)
+ raise TypeError("sps must be >= 2, is %d" %
+ self._samples_per_symbol)
# Only apply a predifferential coding if the constellation also supports it.
self.pre_diff_code = pre_diff_code and self._constellation.apply_pre_diff_code()
- arity = pow(2,self.bits_per_symbol())
+ arity = pow(2, self.bits_per_symbol())
nfilts = 32
- ntaps = 11 * int(self._samples_per_symbol*nfilts)
+ ntaps = 11 * int(self._samples_per_symbol * nfilts)
# Automatic gain control
self.agc = analog.agc2_cc(0.6e-1, 1e-3, 1, 1)
@@ -271,11 +280,11 @@ class generic_demod(gr.hier_block2):
fll_ntaps, self._freq_bw)
# symbol timing recovery with RRC data filter
- taps = filter.firdes.root_raised_cosine(nfilts, nfilts*self._samples_per_symbol,
+ taps = filter.firdes.root_raised_cosine(nfilts, nfilts * self._samples_per_symbol,
1.0, self._excess_bw, ntaps)
self.time_recov = digital.pfb_clock_sync_ccf(self._samples_per_symbol,
self._timing_bw, taps,
- nfilts, nfilts//2, self._timing_max_dev)
+ nfilts, nfilts // 2, self._timing_max_dev)
fmin = -0.25
fmax = 0.25
@@ -318,7 +327,7 @@ class generic_demod(gr.hier_block2):
def _print_verbage(self):
print("\nDemodulator:")
- print("bits per symbol: %d" % self.bits_per_symbol())
+ print("bits per symbol: %d" % self.bits_per_symbol())
print("RRC roll-off factor: %.2f" % self._excess_bw)
print("FLL bandwidth: %.2e" % self._freq_bw)
print("Timing bandwidth: %.2e" % self._timing_bw)
@@ -381,7 +390,8 @@ class generic_demod(gr.hier_block2):
Given command line options, create dictionary suitable for passing to __init__
"""
return extract_kwargs_from_options_for_class(cls, options)
- extract_kwargs_from_options=classmethod(extract_kwargs_from_options)
+ extract_kwargs_from_options = classmethod(extract_kwargs_from_options)
+
shared_demod_args = """ samples_per_symbol: samples per baud >= 2 (float)
excess_bw: Root-raised cosine filter excess bandwidth (float)
diff --git a/gr-digital/python/digital/gfsk.py b/gr-digital/python/digital/gfsk.py
index 2cc8f69c97..63583cc1e0 100644
--- a/gr-digital/python/digital/gfsk.py
+++ b/gr-digital/python/digital/gfsk.py
@@ -70,8 +70,9 @@ class gfsk_mod(gr.hier_block2):
do_unpack=_def_do_unpack):
gr.hier_block2.__init__(self, "gfsk_mod",
- gr.io_signature(1, 1, gr.sizeof_char), # Input signature
- gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
samples_per_symbol = int(samples_per_symbol)
self._samples_per_symbol = samples_per_symbol
@@ -79,12 +80,11 @@ class gfsk_mod(gr.hier_block2):
self._differential = False
if not isinstance(samples_per_symbol, int) or samples_per_symbol < 2:
- raise TypeError("samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,))
+ raise TypeError(
+ "samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,))
ntaps = 4 * samples_per_symbol # up to 3 bits in filter at once
- #sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2
-
-
+ # sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2
# Turn it into NRZ data.
#self.nrz = digital.bytes_to_syms()
@@ -93,15 +93,17 @@ class gfsk_mod(gr.hier_block2):
# Form Gaussian filter
# Generate Gaussian response (Needs to be convolved with window below).
self.gaussian_taps = filter.firdes.gaussian(
- 1.0, # gain
- samples_per_symbol, # symbol_rate
- bt, # bandwidth * symbol time
- ntaps # number of taps
- )
+ 1.0, # gain
+ samples_per_symbol, # symbol_rate
+ bt, # bandwidth * symbol time
+ ntaps # number of taps
+ )
self.sqwave = (1,) * samples_per_symbol # rectangular window
- self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave))
- self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps)
+ self.taps = numpy.convolve(numpy.array(
+ self.gaussian_taps), numpy.array(self.sqwave))
+ self.gaussian_filter = filter.interp_fir_filter_fff(
+ samples_per_symbol, self.taps)
# FM modulation
self.fmmod = analog.frequency_modulator_fc(sensitivity)
@@ -118,22 +120,24 @@ class gfsk_mod(gr.hier_block2):
# Connect & Initialize base class
if do_unpack:
self.unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
- self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self)
+ self.connect(self, self.unpack, self.nrz,
+ self.gaussian_filter, self.fmmod, self.amp, self)
else:
- self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self)
+ self.connect(self, self.nrz, self.gaussian_filter,
+ self.fmmod, self.amp, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@staticmethod
- def bits_per_symbol(self=None): # staticmethod that's also callable on an instance
+ # staticmethod that's also callable on an instance
+ def bits_per_symbol(self=None):
return 1
def _print_verbage(self):
print("bits per symbol = %d" % self.bits_per_symbol())
print("Gaussian filter bt = %.2f" % self._bt)
-
def _setup_logging(self):
print("Modulation logging turned on.")
self.connect(self.nrz,
@@ -197,7 +201,8 @@ class gfsk_demod(gr.hier_block2):
log=_def_log):
gr.hier_block2.__init__(self, "gfsk_demod",
- gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
gr.io_signature(1, 1, gr.sizeof_char)) # Output signature
self._samples_per_symbol = samples_per_symbol
@@ -207,17 +212,20 @@ class gfsk_demod(gr.hier_block2):
self._differential = False
if samples_per_symbol < 2:
- raise TypeError("samples_per_symbol >= 2, is %f" % samples_per_symbol)
+ raise TypeError("samples_per_symbol >= 2, is %f" %
+ samples_per_symbol)
- self._omega = samples_per_symbol*(1+self._freq_error)
+ self._omega = samples_per_symbol * (1 + self._freq_error)
if not self._gain_mu:
self._gain_mu = 0.175
- self._gain_omega = .25 * self._gain_mu * self._gain_mu # critically damped
+ self._gain_omega = .25 * self._gain_mu * \
+ self._gain_mu # critically damped
self._damping = 1.0
- self._loop_bw = -ln((self._gain_mu + self._gain_omega)/(-2.0) + 1) # critically damped
+ # critically damped
+ self._loop_bw = -ln((self._gain_mu + self._gain_omega) / (-2.0) + 1)
self._max_dev = self._omega_relative_limit * self._samples_per_symbol
# Demodulate FM
@@ -227,16 +235,16 @@ class gfsk_demod(gr.hier_block2):
# the clock recovery block tracks the symbol clock and resamples as needed.
# the output of the block is a stream of soft symbols (float)
self.clock_recovery = self.digital_symbol_sync_xx_0 = digital.symbol_sync_ff(digital.TED_MUELLER_AND_MULLER,
- self._omega,
- self._loop_bw,
- self._damping,
- 1.0, # Expected TED gain
- self._max_dev,
- 1, # Output sps
- digital.constellation_bpsk().base(),
- digital.IR_MMSE_8TAP,
- 128,
- [])
+ self._omega,
+ self._loop_bw,
+ self._damping,
+ 1.0, # Expected TED gain
+ self._max_dev,
+ 1, # Output sps
+ digital.constellation_bpsk().base(),
+ digital.IR_MMSE_8TAP,
+ 128,
+ [])
# slice the floats at 0, outputting 1 bit (the LSB of the output byte) per sample
self.slicer = digital.binary_slicer_fb()
@@ -248,7 +256,8 @@ class gfsk_demod(gr.hier_block2):
self._setup_logging()
# Connect & Initialize base class
- self.connect(self, self.fmdemod, self.clock_recovery, self.slicer, self)
+ self.connect(self, self.fmdemod,
+ self.clock_recovery, self.slicer, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@@ -262,18 +271,18 @@ class gfsk_demod(gr.hier_block2):
print("Symbol Sync M&M omega = %f" % self._omega)
print("Symbol Sync M&M gain mu = %f" % self._gain_mu)
print("M&M clock recovery mu (Unused) = %f" % self._mu)
- print("Symbol Sync M&M omega rel. limit = %f" % self._omega_relative_limit)
+ print("Symbol Sync M&M omega rel. limit = %f" %
+ self._omega_relative_limit)
print("frequency error = %f" % self._freq_error)
-
def _setup_logging(self):
print("Demodulation logging turned on.")
self.connect(self.fmdemod,
- blocks.file_sink(gr.sizeof_float, "fmdemod.dat"))
+ blocks.file_sink(gr.sizeof_float, "fmdemod.dat"))
self.connect(self.clock_recovery,
- blocks.file_sink(gr.sizeof_float, "clock_recovery.dat"))
+ blocks.file_sink(gr.sizeof_float, "clock_recovery.dat"))
self.connect(self.slicer,
- blocks.file_sink(gr.sizeof_char, "slicer.dat"))
+ blocks.file_sink(gr.sizeof_char, "slicer.dat"))
@staticmethod
def add_options(parser):
diff --git a/gr-digital/python/digital/gmsk.py b/gr-digital/python/digital/gmsk.py
index 68127119e6..106e588782 100644
--- a/gr-digital/python/digital/gmsk.py
+++ b/gr-digital/python/digital/gmsk.py
@@ -68,8 +68,9 @@ class gmsk_mod(gr.hier_block2):
do_unpack=_def_do_unpack):
gr.hier_block2.__init__(self, "gmsk_mod",
- gr.io_signature(1, 1, gr.sizeof_char), # Input signature
- gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
samples_per_symbol = int(samples_per_symbol)
self._samples_per_symbol = samples_per_symbol
@@ -77,10 +78,13 @@ class gmsk_mod(gr.hier_block2):
self._differential = False
if not isinstance(samples_per_symbol, int) or samples_per_symbol < 2:
- raise TypeError("samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,))
+ raise TypeError(
+ "samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,))
- ntaps = 4 * samples_per_symbol # up to 3 bits in filter at once
- sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2
+ # up to 3 bits in filter at once
+ ntaps = 4 * samples_per_symbol
+ # phase change per bit = pi / 2
+ sensitivity = (pi / 2) / samples_per_symbol
# Turn it into NRZ data.
#self.nrz = digital.bytes_to_syms()
@@ -89,15 +93,17 @@ class gmsk_mod(gr.hier_block2):
# Form Gaussian filter
# Generate Gaussian response (Needs to be convolved with window below).
self.gaussian_taps = filter.firdes.gaussian(
- 1, # gain
- samples_per_symbol, # symbol_rate
- bt, # bandwidth * symbol time
- ntaps # number of taps
- )
+ 1, # gain
+ samples_per_symbol, # symbol_rate
+ bt, # bandwidth * symbol time
+ ntaps # number of taps
+ )
self.sqwave = (1,) * samples_per_symbol # rectangular window
- self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave))
- self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps)
+ self.taps = numpy.convolve(numpy.array(
+ self.gaussian_taps), numpy.array(self.sqwave))
+ self.gaussian_filter = filter.interp_fir_filter_fff(
+ samples_per_symbol, self.taps)
# FM modulation
self.fmmod = analog.frequency_modulator_fc(sensitivity)
@@ -111,22 +117,24 @@ class gmsk_mod(gr.hier_block2):
# Connect & Initialize base class
if do_unpack:
self.unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
- self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self)
+ self.connect(self, self.unpack, self.nrz,
+ self.gaussian_filter, self.fmmod, self)
else:
- self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self)
+ self.connect(self, self.nrz, self.gaussian_filter,
+ self.fmmod, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@staticmethod
- def bits_per_symbol(self=None): # staticmethod that's also callable on an instance
+ # staticmethod that's also callable on an instance
+ def bits_per_symbol(self=None):
return 1
def _print_verbage(self):
print("bits per symbol = %d" % self.bits_per_symbol())
print("Gaussian filter bt = %.2f" % self._bt)
-
def _setup_logging(self):
print("Modulation logging turned on.")
self.connect(self.nrz,
@@ -185,7 +193,8 @@ class gmsk_demod(gr.hier_block2):
log=_def_log):
gr.hier_block2.__init__(self, "gmsk_demod",
- gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
gr.io_signature(1, 1, gr.sizeof_char)) # Output signature
self._samples_per_symbol = samples_per_symbol
@@ -195,17 +204,20 @@ class gmsk_demod(gr.hier_block2):
self._differential = False
if samples_per_symbol < 2:
- raise TypeError("samples_per_symbol >= 2, is %f" % samples_per_symbol)
+ raise TypeError("samples_per_symbol >= 2, is %f" %
+ samples_per_symbol)
- self._omega = samples_per_symbol*(1+self._freq_error)
+ self._omega = samples_per_symbol * (1 + self._freq_error)
if not self._gain_mu:
self._gain_mu = 0.175
- self._gain_omega = .25 * self._gain_mu * self._gain_mu # critically damped
+ self._gain_omega = .25 * self._gain_mu * \
+ self._gain_mu # critically damped
self._damping = 1.0
- self._loop_bw = -ln((self._gain_mu + self._gain_omega)/(-2.0) + 1) # critically damped
+ # critically damped
+ self._loop_bw = -ln((self._gain_mu + self._gain_omega) / (-2.0) + 1)
self._max_dev = self._omega_relative_limit * self._samples_per_symbol
# Demodulate FM
@@ -215,16 +227,16 @@ class gmsk_demod(gr.hier_block2):
# the clock recovery block tracks the symbol clock and resamples as needed.
# the output of the block is a stream of soft symbols (float)
self.clock_recovery = self.digital_symbol_sync_xx_0 = digital.symbol_sync_ff(digital.TED_MUELLER_AND_MULLER,
- self._omega,
- self._loop_bw,
- self._damping,
- 1.0, # Expected TED gain
- self._max_dev,
- 1, # Output sps
- digital.constellation_bpsk().base(),
- digital.IR_MMSE_8TAP,
- 128,
- [])
+ self._omega,
+ self._loop_bw,
+ self._damping,
+ 1.0, # Expected TED gain
+ self._max_dev,
+ 1, # Output sps
+ digital.constellation_bpsk().base(),
+ digital.IR_MMSE_8TAP,
+ 128,
+ [])
# slice the floats at 0, outputting 1 bit (the LSB of the output byte) per sample
self.slicer = digital.binary_slicer_fb()
@@ -236,7 +248,8 @@ class gmsk_demod(gr.hier_block2):
self._setup_logging()
# Connect & Initialize base class
- self.connect(self, self.fmdemod, self.clock_recovery, self.slicer, self)
+ self.connect(self, self.fmdemod,
+ self.clock_recovery, self.slicer, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@@ -250,18 +263,18 @@ class gmsk_demod(gr.hier_block2):
print("Symbol Sync M&M omega = %f" % self._omega)
print("Symbol Sync M&M gain mu = %f" % self._gain_mu)
print("M&M clock recovery mu (Unused) = %f" % self._mu)
- print("Symbol Sync M&M omega rel. limit = %f" % self._omega_relative_limit)
+ print("Symbol Sync M&M omega rel. limit = %f" %
+ self._omega_relative_limit)
print("frequency error = %f" % self._freq_error)
-
def _setup_logging(self):
print("Demodulation logging turned on.")
self.connect(self.fmdemod,
- blocks.file_sink(gr.sizeof_float, "fmdemod.dat"))
+ blocks.file_sink(gr.sizeof_float, "fmdemod.dat"))
self.connect(self.clock_recovery,
- blocks.file_sink(gr.sizeof_float, "clock_recovery.dat"))
+ blocks.file_sink(gr.sizeof_float, "clock_recovery.dat"))
self.connect(self.slicer,
- blocks.file_sink(gr.sizeof_char, "slicer.dat"))
+ blocks.file_sink(gr.sizeof_char, "slicer.dat"))
@staticmethod
def add_options(parser):
diff --git a/gr-digital/python/digital/modulation_utils.py b/gr-digital/python/digital/modulation_utils.py
index 59809ae1d8..1d57e1c9ba 100644
--- a/gr-digital/python/digital/modulation_utils.py
+++ b/gr-digital/python/digital/modulation_utils.py
@@ -1,8 +1,8 @@
#
# Copyright 2010 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
@@ -17,9 +17,11 @@ import inspect
# Type 1 modulators accept a stream of bytes on their input and produce complex baseband output
_type_1_modulators = {}
+
def type_1_mods():
return _type_1_modulators
+
def add_type_1_mod(name, mod_class):
_type_1_modulators[name] = mod_class
@@ -29,18 +31,23 @@ def add_type_1_mod(name, mod_class):
# to resolve phase or polarity ambiguities.
_type_1_demodulators = {}
+
def type_1_demods():
return _type_1_demodulators
+
def add_type_1_demod(name, demod_class):
_type_1_demodulators[name] = demod_class
+
# Also record the constellation making functions of the modulations
_type_1_constellations = {}
+
def type_1_constellations():
return _type_1_constellations
+
def add_type_1_constellation(name, constellation):
_type_1_constellations[name] = constellation
@@ -67,7 +74,7 @@ def extract_kwargs_from_options(function, excluded_args, options):
excluded_args: function arguments that are NOT to be added to the dictionary (sequence of strings)
options: result of command argument parsing (optparse.Values)
"""
-
+
# Try this in C++ ;)
spec = inspect.getfullargspec(function)
d = {}
@@ -77,6 +84,7 @@ def extract_kwargs_from_options(function, excluded_args, options):
d[kw] = getattr(options, kw)
return d
+
def extract_kwargs_from_options_for_class(cls, options):
"""
Given command line options, create dictionary suitable for passing to __init__
diff --git a/gr-digital/python/digital/ofdm_txrx.py b/gr-digital/python/digital/ofdm_txrx.py
index 9d083c5fe2..60109ceaaa 100644
--- a/gr-digital/python/digital/ofdm_txrx.py
+++ b/gr-digital/python/digital/ofdm_txrx.py
@@ -31,15 +31,20 @@ _def_frame_length_tag_key = "frame_length"
_def_packet_length_tag_key = "packet_length"
_def_packet_num_tag_key = "packet_num"
# Data and pilot carriers are same as in 802.11a
-_def_occupied_carriers = (list(range(-26, -21)) + list(range(-20, -7)) + list(range(-6, 0)) + list(range(1, 7)) + list(range(8, 21)) + list(range(22, 27)),)
-_def_pilot_carriers=((-21, -7, 7, 21,),)
+_def_occupied_carriers = (list(range(-26, -21)) + list(range(-20, -7)) + list(
+ range(-6, 0)) + list(range(1, 7)) + list(range(8, 21)) + list(range(22, 27)),)
+_def_pilot_carriers = ((-21, -7, 7, 21,),)
_pilot_sym_scramble_seq = (
- 1,1,1,1, -1,-1,-1,1, -1,-1,-1,-1, 1,1,-1,1, -1,-1,1,1, -1,1,1,-1, 1,1,1,1, 1,1,-1,1,
- 1,1,-1,1, 1,-1,-1,1, 1,1,-1,1, -1,-1,-1,1, -1,1,-1,-1, 1,-1,-1,1, 1,1,1,1, -1,-1,1,1,
- -1,-1,1,-1, 1,-1,1,1, -1,-1,-1,1, 1,-1,-1,-1, -1,1,-1,-1, 1,-1,1,1, 1,1,-1,1, -1,1,-1,1,
- -1,-1,-1,-1, -1,1,-1,1, 1,-1,1,-1, 1,1,1,-1, -1,1,-1,-1, -1,1,1,1, -1,-1,-1,-1, -1,-1,-1
+ 1, 1, 1, 1, -1, -1, -1, 1, -1, -1, -1, -1, 1, 1, -1, 1, -
+ 1, -1, 1, 1, -1, 1, 1, -1, 1, 1, 1, 1, 1, 1, -1, 1,
+ 1, 1, -1, 1, 1, -1, -1, 1, 1, 1, -1, 1, -1, -1, -1, 1, -
+ 1, 1, -1, -1, 1, -1, -1, 1, 1, 1, 1, 1, -1, -1, 1, 1,
+ -1, -1, 1, -1, 1, -1, 1, 1, -1, -1, -1, 1, 1, -1, -1, -
+ 1, -1, 1, -1, -1, 1, -1, 1, 1, 1, 1, -1, 1, -1, 1, -1, 1,
+ -1, -1, -1, -1, -1, 1, -1, 1, 1, -1, 1, -1, 1, 1, 1, -1, -
+ 1, 1, -1, -1, -1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1
)
-_def_pilot_symbols= tuple([(x, x, x, -x) for x in _pilot_sym_scramble_seq])
+_def_pilot_symbols = tuple([(x, x, x, -x) for x in _pilot_sym_scramble_seq])
_seq_seed = 42
@@ -52,6 +57,7 @@ def _get_active_carriers(fft_len, occupied_carriers, pilot_carriers):
active_carriers.append(carrier)
return active_carriers
+
def _make_sync_word1(fft_len, occupied_carriers, pilot_carriers):
""" Creates a random sync sequence for fine frequency offset and timing
estimation. This is the first of typically two sync preamble symbols
@@ -64,31 +70,37 @@ def _make_sync_word1(fft_len, occupied_carriers, pilot_carriers):
Carrier 0 (DC carrier) is always zero. If used, carrier 1 is non-zero.
This means the sync algorithm has to check on odd carriers!
"""
- active_carriers = _get_active_carriers(fft_len, occupied_carriers, pilot_carriers)
+ active_carriers = _get_active_carriers(
+ fft_len, occupied_carriers, pilot_carriers)
numpy.random.seed(_seq_seed)
bpsk = {0: numpy.sqrt(2), 1: -numpy.sqrt(2)}
- sw1 = [bpsk[numpy.random.randint(2)] if x in active_carriers and x % 2 else 0 for x in range(fft_len)]
+ sw1 = [bpsk[numpy.random.randint(
+ 2)] if x in active_carriers and x % 2 else 0 for x in range(fft_len)]
return numpy.fft.fftshift(sw1)
+
def _make_sync_word2(fft_len, occupied_carriers, pilot_carriers):
""" Creates a random sync sequence for coarse frequency offset and channel
estimation. This is the second of typically two sync preamble symbols
for the Schmidl & Cox sync algorithm.
Symbols are always BPSK symbols.
"""
- active_carriers = _get_active_carriers(fft_len, occupied_carriers, pilot_carriers)
+ active_carriers = _get_active_carriers(
+ fft_len, occupied_carriers, pilot_carriers)
numpy.random.seed(_seq_seed)
bpsk = {0: 1, 1: -1}
- sw2 = [bpsk[numpy.random.randint(2)] if x in active_carriers else 0 for x in range(fft_len)]
+ sw2 = [bpsk[numpy.random.randint(
+ 2)] if x in active_carriers else 0 for x in range(fft_len)]
sw2[0] = 0j
return numpy.fft.fftshift(sw2)
+
def _get_constellation(bps):
""" Returns a modulator block for a given number of bits per symbol """
constellation = {
- 1: digital.constellation_bpsk(),
- 2: digital.constellation_qpsk(),
- 3: digital.constellation_8psk()
+ 1: digital.constellation_bpsk(),
+ 2: digital.constellation_qpsk(),
+ 3: digital.constellation_8psk()
}
try:
return constellation[bps]
@@ -96,6 +108,7 @@ def _get_constellation(bps):
print('Modulation not supported.')
exit(1)
+
class ofdm_tx(gr.hier_block2):
"""Hierarchical block for OFDM modulation.
@@ -122,6 +135,7 @@ class ofdm_tx(gr.hier_block2):
scramble_bits: Activates the scramblers (set this to True unless debugging)
"""
+
def __init__(self, fft_len=_def_fft_len, cp_len=_def_cp_len,
packet_length_tag_key=_def_packet_length_tag_key,
occupied_carriers=_def_occupied_carriers,
@@ -136,75 +150,82 @@ class ofdm_tx(gr.hier_block2):
scramble_bits=False
):
gr.hier_block2.__init__(self, "ofdm_tx",
- gr.io_signature(1, 1, gr.sizeof_char),
- gr.io_signature(1, 1, gr.sizeof_gr_complex))
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex))
### Param init / sanity check ########################################
- self.fft_len = fft_len
- self.cp_len = cp_len
+ self.fft_len = fft_len
+ self.cp_len = cp_len
self.packet_length_tag_key = packet_length_tag_key
self.occupied_carriers = occupied_carriers
- self.pilot_carriers = pilot_carriers
- self.pilot_symbols = pilot_symbols
- self.bps_header = bps_header
- self.bps_payload = bps_payload
+ self.pilot_carriers = pilot_carriers
+ self.pilot_symbols = pilot_symbols
+ self.bps_header = bps_header
+ self.bps_payload = bps_payload
self.sync_word1 = sync_word1
if sync_word1 is None:
- self.sync_word1 = _make_sync_word1(fft_len, occupied_carriers, pilot_carriers)
+ self.sync_word1 = _make_sync_word1(
+ fft_len, occupied_carriers, pilot_carriers)
else:
if len(sync_word1) != self.fft_len:
- raise ValueError("Length of sync sequence(s) must be FFT length.")
- self.sync_words = [self.sync_word1,]
+ raise ValueError(
+ "Length of sync sequence(s) must be FFT length.")
+ self.sync_words = [self.sync_word1, ]
if sync_word2 is None:
- self.sync_word2 = _make_sync_word2(fft_len, occupied_carriers, pilot_carriers)
+ self.sync_word2 = _make_sync_word2(
+ fft_len, occupied_carriers, pilot_carriers)
else:
self.sync_word2 = sync_word2
if len(self.sync_word2):
if len(self.sync_word2) != fft_len:
- raise ValueError("Length of sync sequence(s) must be FFT length.")
+ raise ValueError(
+ "Length of sync sequence(s) must be FFT length.")
self.sync_word2 = list(self.sync_word2)
self.sync_words.append(self.sync_word2)
if scramble_bits:
self.scramble_seed = 0x7f
else:
- self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros
+ self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros
### Header modulation ################################################
crc = digital.crc32_bb(False, self.packet_length_tag_key)
- header_constellation = _get_constellation(bps_header)
- header_mod = digital.chunks_to_symbols_bc(header_constellation.points())
+ header_constellation = _get_constellation(bps_header)
+ header_mod = digital.chunks_to_symbols_bc(
+ header_constellation.points())
formatter_object = digital.packet_header_ofdm(
occupied_carriers=occupied_carriers, n_syms=1,
bits_per_header_sym=self.bps_header,
bits_per_payload_sym=self.bps_payload,
scramble_header=scramble_bits
)
- header_gen = digital.packet_headergenerator_bb(formatter_object.base(), self.packet_length_tag_key)
+ header_gen = digital.packet_headergenerator_bb(
+ formatter_object.base(), self.packet_length_tag_key)
header_payload_mux = blocks.tagged_stream_mux(
- itemsize=gr.sizeof_gr_complex*1,
- lengthtagname=self.packet_length_tag_key,
- tag_preserve_head_pos=1 # Head tags on the payload stream stay on the head
+ itemsize=gr.sizeof_gr_complex * 1,
+ lengthtagname=self.packet_length_tag_key,
+ tag_preserve_head_pos=1 # Head tags on the payload stream stay on the head
)
self.connect(
- self,
- crc,
- header_gen,
- header_mod,
- (header_payload_mux, 0)
+ self,
+ crc,
+ header_gen,
+ header_mod,
+ (header_payload_mux, 0)
)
if debug_log:
self.connect(header_gen, blocks.file_sink(1, 'tx-hdr.dat'))
### Payload modulation ###############################################
payload_constellation = _get_constellation(bps_payload)
- payload_mod = digital.chunks_to_symbols_bc(payload_constellation.points())
+ payload_mod = digital.chunks_to_symbols_bc(
+ payload_constellation.points())
payload_scrambler = digital.additive_scrambler_bb(
0x8a,
self.scramble_seed,
7,
- 0, # Don't reset after fixed length (let the reset tag do that)
- bits_per_byte=8, # This is before unpacking
+ 0, # Don't reset after fixed length (let the reset tag do that)
+ bits_per_byte=8, # This is before unpacking
reset_tag_key=self.packet_length_tag_key
)
payload_unpack = blocks.repack_bits_bb(
- 8, # Unpack 8 bits per byte
+ 8, # Unpack 8 bits per byte
bps_payload,
self.packet_length_tag_key
)
@@ -225,21 +246,24 @@ class ofdm_tx(gr.hier_block2):
len_tag_key=self.packet_length_tag_key
)
ffter = fft.fft_vcc(
- self.fft_len,
- False, # Inverse FFT
- (), # No window
- True # Shift
+ self.fft_len,
+ False, # Inverse FFT
+ (), # No window
+ True # Shift
)
cyclic_prefixer = digital.ofdm_cyclic_prefixer(
self.fft_len,
- self.fft_len+self.cp_len,
+ self.fft_len + self.cp_len,
rolloff,
self.packet_length_tag_key
)
- self.connect(header_payload_mux, allocator, ffter, cyclic_prefixer, self)
+ self.connect(header_payload_mux, allocator,
+ ffter, cyclic_prefixer, self)
if debug_log:
- self.connect(allocator, blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'tx-post-allocator.dat'))
- self.connect(cyclic_prefixer, blocks.file_sink(gr.sizeof_gr_complex, 'tx-signal.dat'))
+ self.connect(allocator, blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'tx-post-allocator.dat'))
+ self.connect(cyclic_prefixer, blocks.file_sink(
+ gr.sizeof_gr_complex, 'tx-signal.dat'))
class ofdm_rx(gr.hier_block2):
@@ -265,6 +289,7 @@ class ofdm_rx(gr.hier_block2):
| entirely. Also used for coarse frequency offset and
| channel estimation.
"""
+
def __init__(self, fft_len=_def_fft_len, cp_len=_def_cp_len,
frame_length_tag_key=_def_frame_length_tag_key,
packet_length_tag_key=_def_packet_length_tag_key,
@@ -280,60 +305,69 @@ class ofdm_rx(gr.hier_block2):
scramble_bits=False
):
gr.hier_block2.__init__(self, "ofdm_rx",
- gr.io_signature(1, 1, gr.sizeof_gr_complex),
- gr.io_signature(1, 1, gr.sizeof_char))
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
+ gr.io_signature(1, 1, gr.sizeof_char))
### Param init / sanity check ########################################
- self.fft_len = fft_len
- self.cp_len = cp_len
- self.frame_length_tag_key = frame_length_tag_key
- self.packet_length_tag_key = packet_length_tag_key
+ self.fft_len = fft_len
+ self.cp_len = cp_len
+ self.frame_length_tag_key = frame_length_tag_key
+ self.packet_length_tag_key = packet_length_tag_key
self.occupied_carriers = occupied_carriers
- self.bps_header = bps_header
- self.bps_payload = bps_payload
+ self.bps_header = bps_header
+ self.bps_payload = bps_payload
n_sync_words = 1
if sync_word1 is None:
- self.sync_word1 = _make_sync_word1(fft_len, occupied_carriers, pilot_carriers)
+ self.sync_word1 = _make_sync_word1(
+ fft_len, occupied_carriers, pilot_carriers)
else:
if len(sync_word1) != self.fft_len:
- raise ValueError("Length of sync sequence(s) must be FFT length.")
+ raise ValueError(
+ "Length of sync sequence(s) must be FFT length.")
self.sync_word1 = sync_word1
self.sync_word2 = ()
if sync_word2 is None:
- self.sync_word2 = _make_sync_word2(fft_len, occupied_carriers, pilot_carriers)
+ self.sync_word2 = _make_sync_word2(
+ fft_len, occupied_carriers, pilot_carriers)
n_sync_words = 2
elif len(sync_word2):
if len(sync_word2) != fft_len:
- raise ValueError("Length of sync sequence(s) must be FFT length.")
+ raise ValueError(
+ "Length of sync sequence(s) must be FFT length.")
self.sync_word2 = sync_word2
n_sync_words = 2
if scramble_bits:
self.scramble_seed = 0x7f
else:
- self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros
+ self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros
### Sync ############################################################
sync_detect = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
- delay = blocks.delay(gr.sizeof_gr_complex, fft_len+cp_len)
+ delay = blocks.delay(gr.sizeof_gr_complex, fft_len + cp_len)
oscillator = analog.frequency_modulator_fc(-2.0 / fft_len)
mixer = blocks.multiply_cc()
hpd = digital.header_payload_demux(
- n_sync_words+1, # Number of OFDM symbols before payload (sync + 1 sym header)
+ # Number of OFDM symbols before payload (sync + 1 sym header)
+ n_sync_words + 1,
fft_len, cp_len, # FFT length, guard interval
- frame_length_tag_key, # Frame length tag key
+ frame_length_tag_key, # Frame length tag key
"", # We're not using trigger tags
- True # One output item is one OFDM symbol (False would output complex scalars)
+ # One output item is one OFDM symbol (False would output complex scalars)
+ True
)
self.connect(self, sync_detect)
self.connect(self, delay, (mixer, 0), (hpd, 0))
self.connect((sync_detect, 0), oscillator, (mixer, 1))
self.connect((sync_detect, 1), (hpd, 1))
if debug_log:
- self.connect((sync_detect, 0), blocks.file_sink(gr.sizeof_float, 'freq-offset.dat'))
- self.connect((sync_detect, 1), blocks.file_sink(gr.sizeof_char, 'sync-detect.dat'))
+ self.connect((sync_detect, 0), blocks.file_sink(
+ gr.sizeof_float, 'freq-offset.dat'))
+ self.connect((sync_detect, 1), blocks.file_sink(
+ gr.sizeof_char, 'sync-detect.dat'))
### Header demodulation ##############################################
- header_fft = fft.fft_vcc(self.fft_len, True, (), True)
- chanest = digital.ofdm_chanest_vcvc(self.sync_word1, self.sync_word2, 1)
+ header_fft = fft.fft_vcc(self.fft_len, True, (), True)
+ chanest = digital.ofdm_chanest_vcvc(
+ self.sync_word1, self.sync_word2, 1)
header_constellation = _get_constellation(bps_header)
- header_equalizer = digital.ofdm_equalizer_simpledfe(
+ header_equalizer = digital.ofdm_equalizer_simpledfe(
fft_len,
header_constellation.base(),
occupied_carriers,
@@ -342,93 +376,108 @@ class ofdm_rx(gr.hier_block2):
symbols_skipped=0,
)
header_eq = digital.ofdm_frame_equalizer_vcvc(
- header_equalizer.base(),
- cp_len,
- self.frame_length_tag_key,
- True,
- 1 # Header is 1 symbol long
+ header_equalizer.base(),
+ cp_len,
+ self.frame_length_tag_key,
+ True,
+ 1 # Header is 1 symbol long
)
header_serializer = digital.ofdm_serializer_vcc(
- fft_len, occupied_carriers,
- self.frame_length_tag_key
+ fft_len, occupied_carriers,
+ self.frame_length_tag_key
)
- header_demod = digital.constellation_decoder_cb(header_constellation.base())
+ header_demod = digital.constellation_decoder_cb(
+ header_constellation.base())
header_formatter = digital.packet_header_ofdm(
- occupied_carriers, 1,
- packet_length_tag_key,
- frame_length_tag_key,
- packet_num_tag_key,
- bps_header,
- bps_payload,
- scramble_header=scramble_bits
+ occupied_carriers, 1,
+ packet_length_tag_key,
+ frame_length_tag_key,
+ packet_num_tag_key,
+ bps_header,
+ bps_payload,
+ scramble_header=scramble_bits
)
- header_parser = digital.packet_headerparser_b(header_formatter.formatter())
+ header_parser = digital.packet_headerparser_b(
+ header_formatter.formatter())
self.connect(
- (hpd, 0),
- header_fft,
- chanest,
- header_eq,
- header_serializer,
- header_demod,
- header_parser
+ (hpd, 0),
+ header_fft,
+ chanest,
+ header_eq,
+ header_serializer,
+ header_demod,
+ header_parser
)
self.msg_connect(header_parser, "header_data", hpd, "header_data")
if debug_log:
- self.connect((chanest, 1), blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'channel-estimate.dat'))
- self.connect((chanest, 0), blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest.dat'))
- self.connect((chanest, 0), blocks.tag_debug(gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest'))
- self.connect(header_eq, blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'post-hdr-eq.dat'))
- self.connect(header_serializer, blocks.file_sink(gr.sizeof_gr_complex, 'post-hdr-serializer.dat'))
+ self.connect((chanest, 1), blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'channel-estimate.dat'))
+ self.connect((chanest, 0), blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest.dat'))
+ self.connect((chanest, 0), blocks.tag_debug(
+ gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest'))
+ self.connect(header_eq, blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'post-hdr-eq.dat'))
+ self.connect(header_serializer, blocks.file_sink(
+ gr.sizeof_gr_complex, 'post-hdr-serializer.dat'))
### Payload demod ####################################################
payload_fft = fft.fft_vcc(self.fft_len, True, (), True)
payload_constellation = _get_constellation(bps_payload)
payload_equalizer = digital.ofdm_equalizer_simpledfe(
- fft_len,
- payload_constellation.base(),
- occupied_carriers,
- pilot_carriers,
- pilot_symbols,
- symbols_skipped=1, # (that was already in the header)
- alpha=0.1
+ fft_len,
+ payload_constellation.base(),
+ occupied_carriers,
+ pilot_carriers,
+ pilot_symbols,
+ symbols_skipped=1, # (that was already in the header)
+ alpha=0.1
)
payload_eq = digital.ofdm_frame_equalizer_vcvc(
- payload_equalizer.base(),
- cp_len,
- self.frame_length_tag_key
+ payload_equalizer.base(),
+ cp_len,
+ self.frame_length_tag_key
)
payload_serializer = digital.ofdm_serializer_vcc(
- fft_len, occupied_carriers,
- self.frame_length_tag_key,
- self.packet_length_tag_key,
- 1 # Skip 1 symbol (that was already in the header)
+ fft_len, occupied_carriers,
+ self.frame_length_tag_key,
+ self.packet_length_tag_key,
+ 1 # Skip 1 symbol (that was already in the header)
)
- payload_demod = digital.constellation_decoder_cb(payload_constellation.base())
+ payload_demod = digital.constellation_decoder_cb(
+ payload_constellation.base())
self.payload_descrambler = digital.additive_scrambler_bb(
0x8a,
self.scramble_seed,
7,
- 0, # Don't reset after fixed length
- bits_per_byte=8, # This is after packing
+ 0, # Don't reset after fixed length
+ bits_per_byte=8, # This is after packing
reset_tag_key=self.packet_length_tag_key
)
- payload_pack = blocks.repack_bits_bb(bps_payload, 8, self.packet_length_tag_key, True)
+ payload_pack = blocks.repack_bits_bb(
+ bps_payload, 8, self.packet_length_tag_key, True)
self.crc = digital.crc32_bb(True, self.packet_length_tag_key)
self.connect(
- (hpd, 1),
- payload_fft,
- payload_eq,
- payload_serializer,
- payload_demod,
- payload_pack,
- self.payload_descrambler,
- self.crc,
- self
+ (hpd, 1),
+ payload_fft,
+ payload_eq,
+ payload_serializer,
+ payload_demod,
+ payload_pack,
+ self.payload_descrambler,
+ self.crc,
+ self
)
if debug_log:
- self.connect((hpd, 1), blocks.tag_debug(gr.sizeof_gr_complex*fft_len, 'post-hpd'))
- self.connect(payload_fft, blocks.file_sink(gr.sizeof_gr_complex*fft_len, 'post-payload-fft.dat'))
- self.connect(payload_eq, blocks.file_sink(gr.sizeof_gr_complex*fft_len, 'post-payload-eq.dat'))
- self.connect(payload_serializer, blocks.file_sink(gr.sizeof_gr_complex, 'post-payload-serializer.dat'))
- self.connect(payload_demod, blocks.file_sink(1, 'post-payload-demod.dat'))
- self.connect(payload_pack, blocks.file_sink(1, 'post-payload-pack.dat'))
- self.connect(self.crc, blocks.file_sink(1, 'post-payload-crc.dat'))
+ self.connect((hpd, 1), blocks.tag_debug(
+ gr.sizeof_gr_complex * fft_len, 'post-hpd'))
+ self.connect(payload_fft, blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'post-payload-fft.dat'))
+ self.connect(payload_eq, blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'post-payload-eq.dat'))
+ self.connect(payload_serializer, blocks.file_sink(
+ gr.sizeof_gr_complex, 'post-payload-serializer.dat'))
+ self.connect(payload_demod, blocks.file_sink(
+ 1, 'post-payload-demod.dat'))
+ self.connect(payload_pack, blocks.file_sink(
+ 1, 'post-payload-pack.dat'))
+ self.connect(self.crc, blocks.file_sink(1, 'post-payload-crc.dat'))
diff --git a/gr-digital/python/digital/psk.py b/gr-digital/python/digital/psk.py
index b04d66f75f..4866d54ede 100644
--- a/gr-digital/python/digital/psk.py
+++ b/gr-digital/python/digital/psk.py
@@ -28,6 +28,7 @@ _def_mod_code = mod_codes.GRAY_CODE
# Default use of differential encoding
_def_differential = True
+
def create_encodings(mod_code, arity, differential):
post_diff_code = None
if mod_code not in mod_codes.codes:
@@ -43,13 +44,15 @@ def create_encodings(mod_code, arity, differential):
pre_diff_code = []
post_diff_code = None
else:
- raise ValueError('That modulation code is not implemented for this constellation.')
+ raise ValueError(
+ 'That modulation code is not implemented for this constellation.')
return (pre_diff_code, post_diff_code)
# /////////////////////////////////////////////////////////////////////////////
# PSK constellation
# /////////////////////////////////////////////////////////////////////////////
+
def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code,
differential=_def_differential):
"""
@@ -57,8 +60,9 @@ def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code,
"""
k = log(m) / log(2.0)
if (k != int(k)):
- raise Exception('Number of constellation points must be a power of two.')
- points = [exp(2*pi*(0+1j)*i/m) for i in range(0,m)]
+ raise Exception(
+ 'Number of constellation points must be a power of two.')
+ points = [exp(2 * pi * (0 + 1j) * i / m) for i in range(0, m)]
pre_diff_code, post_diff_code = create_encodings(mod_code, m, differential)
if post_diff_code is not None:
inverse_post_diff_code = mod_codes.invert_code(post_diff_code)
@@ -70,6 +74,7 @@ def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code,
# PSK modulator
# /////////////////////////////////////////////////////////////////////////////
+
class psk_mod(generic_mod):
"""
Hierarchical block for RRC-filtered PSK modulation.
@@ -95,14 +100,17 @@ class psk_mod(generic_mod):
mod_code=_def_mod_code,
differential=_def_differential,
*args, **kwargs):
- constellation = psk_constellation(constellation_points, mod_code, differential)
- super(psk_mod, self).__init__(constellation, differential, *args, **kwargs)
+ constellation = psk_constellation(
+ constellation_points, mod_code, differential)
+ super(psk_mod, self).__init__(
+ constellation, differential, *args, **kwargs)
# /////////////////////////////////////////////////////////////////////////////
# PSK demodulator
#
# /////////////////////////////////////////////////////////////////////////////
+
class psk_demod(generic_demod):
"""
@@ -120,12 +128,16 @@ class psk_demod(generic_demod):
"""
# See generic_mod for additional arguments
__doc__ += shared_mod_args
+
def __init__(self, constellation_points=_def_constellation_points,
mod_code=_def_mod_code,
differential=_def_differential,
*args, **kwargs):
- constellation = psk_constellation(constellation_points, mod_code, differential)
- super(psk_demod, self).__init__(constellation, differential, *args, **kwargs)
+ constellation = psk_constellation(
+ constellation_points, mod_code, differential)
+ super(psk_demod, self).__init__(
+ constellation, differential, *args, **kwargs)
+
#
# Add these to the mod/demod registry
diff --git a/gr-digital/python/digital/psk_constellations.py b/gr-digital/python/digital/psk_constellations.py
index c3809e7ade..520eafebe2 100644
--- a/gr-digital/python/digital/psk_constellations.py
+++ b/gr-digital/python/digital/psk_constellations.py
@@ -52,6 +52,7 @@ rotations are based off of.
# BPSK Constellation Mappings
+
def psk_2_0x0():
'''
0 | 1
@@ -59,9 +60,12 @@ def psk_2_0x0():
const_points = [-1, 1]
symbols = [0, 1]
return (const_points, symbols)
+
+
psk_2 = psk_2_0x0 # Basic BPSK rotation
psk_2_0 = psk_2 # First ID for BPSK rotations
+
def psk_2_0x1():
'''
1 | 0
@@ -69,6 +73,8 @@ def psk_2_0x1():
const_points = [-1, 1]
symbols = [1, 0]
return (const_points, symbols)
+
+
psk_2_1 = psk_2_0x1
@@ -81,18 +87,23 @@ def sd_psk_2_0x0(x, Es=1):
0 | 1
'''
x_re = x.real
- dist = Es*numpy.sqrt(2)
- return [dist*x_re,]
+ dist = Es * numpy.sqrt(2)
+ return [dist * x_re, ]
+
+
sd_psk_2 = sd_psk_2_0x0 # Basic BPSK rotation
sd_psk_2_0 = sd_psk_2 # First ID for BPSK rotations
+
def sd_psk_2_0x1(x, Es=1):
'''
1 | 0
'''
- x_re = [x.real,]
- dist = Es*numpy.sqrt(2)
- return -dist*x_re
+ x_re = [x.real, ]
+ dist = Es * numpy.sqrt(2)
+ return -dist * x_re
+
+
sd_psk_2_1 = sd_psk_2_0x1
@@ -106,13 +117,16 @@ def psk_4_0x0_0_1():
| -------
| 00 | 01
'''
- const_points = [-1-1j, 1-1j,
- -1+1j, 1+1j]
+ const_points = [-1 - 1j, 1 - 1j,
+ -1 + 1j, 1 + 1j]
symbols = [0, 1, 2, 3]
return (const_points, symbols)
+
+
psk_4 = psk_4_0x0_0_1
psk_4_0 = psk_4
+
def psk_4_0x1_0_1():
'''
| 11 | 10
@@ -122,8 +136,11 @@ def psk_4_0x1_0_1():
k = 0x1
pi = [0, 1]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_1 = psk_4_0x1_0_1
+
def psk_4_0x2_0_1():
'''
| 00 | 01
@@ -133,8 +150,11 @@ def psk_4_0x2_0_1():
k = 0x2
pi = [0, 1]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_2 = psk_4_0x2_0_1
+
def psk_4_0x3_0_1():
'''
| 01 | 00
@@ -144,8 +164,11 @@ def psk_4_0x3_0_1():
k = 0x3
pi = [0, 1]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_3 = psk_4_0x3_0_1
+
def psk_4_0x0_1_0():
'''
| 01 | 11
@@ -155,8 +178,11 @@ def psk_4_0x0_1_0():
k = 0x0
pi = [1, 0]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_4 = psk_4_0x0_1_0
+
def psk_4_0x1_1_0():
'''
| 00 | 10
@@ -166,8 +192,11 @@ def psk_4_0x1_1_0():
k = 0x1
pi = [1, 0]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_5 = psk_4_0x1_1_0
+
def psk_4_0x2_1_0():
'''
| 11 | 01
@@ -177,8 +206,11 @@ def psk_4_0x2_1_0():
k = 0x2
pi = [1, 0]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_6 = psk_4_0x2_1_0
+
def psk_4_0x3_1_0():
'''
| 10 | 00
@@ -188,9 +220,10 @@ def psk_4_0x3_1_0():
k = 0x3
pi = [1, 0]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
-psk_4_7 = psk_4_0x3_1_0
+psk_4_7 = psk_4_0x3_1_0
+
############################################################
# QPSK Constellation Softbit LUT generators
@@ -204,11 +237,14 @@ def sd_psk_4_0x0_0_1(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [dist*x_im, dist*x_re]
+ dist = Es * numpy.sqrt(2)
+ return [dist * x_im, dist * x_re]
+
+
sd_psk_4 = sd_psk_4_0x0_0_1
sd_psk_4_0 = sd_psk_4
+
def sd_psk_4_0x1_0_1(x, Es=1):
'''
| 11 | 10
@@ -217,10 +253,13 @@ def sd_psk_4_0x1_0_1(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [dist*x_im, -dist*x_re]
+ dist = Es * numpy.sqrt(2)
+ return [dist * x_im, -dist * x_re]
+
+
sd_psk_4_1 = sd_psk_4_0x1_0_1
+
def sd_psk_4_0x2_0_1(x, Es=1):
'''
| 00 | 01
@@ -229,10 +268,13 @@ def sd_psk_4_0x2_0_1(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [-dist*x_im, dist*x_re]
+ dist = Es * numpy.sqrt(2)
+ return [-dist * x_im, dist * x_re]
+
+
sd_psk_4_2 = sd_psk_4_0x2_0_1
+
def sd_psk_4_0x3_0_1(x, Es=1):
'''
| 01 | 00
@@ -241,10 +283,13 @@ def sd_psk_4_0x3_0_1(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [-dist*x_im, -dist*x_re]
+ dist = Es * numpy.sqrt(2)
+ return [-dist * x_im, -dist * x_re]
+
+
sd_psk_4_3 = sd_psk_4_0x3_0_1
+
def sd_psk_4_0x0_1_0(x, Es=1):
'''
| 01 | 11
@@ -253,10 +298,13 @@ def sd_psk_4_0x0_1_0(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [dist*x_re, dist*x_im]
+ dist = Es * numpy.sqrt(2)
+ return [dist * x_re, dist * x_im]
+
+
sd_psk_4_4 = sd_psk_4_0x0_1_0
+
def sd_psk_4_0x1_1_0(x, Es=1):
'''
| 00 | 10
@@ -265,8 +313,10 @@ def sd_psk_4_0x1_1_0(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [dist*x_re, -dist*x_im]
+ dist = Es * numpy.sqrt(2)
+ return [dist * x_re, -dist * x_im]
+
+
sd_psk_4_5 = sd_psk_4_0x1_1_0
@@ -278,10 +328,13 @@ def sd_psk_4_0x2_1_0(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [-dist*x_re, dist*x_im]
+ dist = Es * numpy.sqrt(2)
+ return [-dist * x_re, dist * x_im]
+
+
sd_psk_4_6 = sd_psk_4_0x2_1_0
+
def sd_psk_4_0x3_1_0(x, Es=1):
'''
| 10 | 00
@@ -290,6 +343,8 @@ def sd_psk_4_0x3_1_0(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [-dist*x_re, -dist*x_im]
+ dist = Es * numpy.sqrt(2)
+ return [-dist * x_re, -dist * x_im]
+
+
sd_psk_4_7 = sd_psk_4_0x3_1_0
diff --git a/gr-digital/python/digital/qa_burst_shaper.py b/gr-digital/python/digital/qa_burst_shaper.py
index 40ac210ec2..b1fc793924 100644
--- a/gr-digital/python/digital/qa_burst_shaper.py
+++ b/gr-digital/python/digital/qa_burst_shaper.py
@@ -85,7 +85,8 @@ class qa_burst_shaper (gr_unittest.TestCase):
-4.0 * np.ones(5, dtype=complex)))
tags = (make_length_tag(0, length),)
expected = np.concatenate((np.zeros(prepad, dtype=complex), window[0:5],
- np.ones(length - len(window), dtype=complex),
+ np.ones(length - len(window),
+ dtype=complex),
window[5:10], np.zeros(postpad,
dtype=complex)))
etag = make_length_tag(0, length + prepad + postpad)
@@ -239,8 +240,10 @@ class qa_burst_shaper (gr_unittest.TestCase):
window = np.concatenate((-2.0 * np.ones(5), -4.0 * np.ones(5)))
tags = (make_length_tag(0, length1), make_length_tag(length1, length2))
expected = np.concatenate((np.zeros(prepad), window[0:5],
- np.ones(length1 - len(window)), window[5:10],
- np.zeros(postpad + prepad), -1.0 * window[0:5],
+ np.ones(length1 - len(window)
+ ), window[5:10],
+ np.zeros(postpad + prepad), -
+ 1.0 * window[0:5],
-1.0 * np.ones(length2 - len(window)),
-1.0 * window[5:10], np.zeros(postpad)))
etags = (make_length_tag(0, length1 + prepad + postpad),
@@ -335,8 +338,10 @@ class qa_burst_shaper (gr_unittest.TestCase):
make_tag(tag4_offset, 'body', pmt.intern('tag4')),
make_tag(tag5_offset, 'body', pmt.intern('tag5')))
expected = np.concatenate((np.zeros(prepad), window[0:5],
- np.ones(length1 - len(window)), window[5:10],
- np.zeros(postpad + prepad), -1.0 * window[0:5],
+ np.ones(length1 - len(window)
+ ), window[5:10],
+ np.zeros(postpad + prepad), -
+ 1.0 * window[0:5],
-1.0 * np.ones(length2 - len(window)),
-1.0 * window[5:10], np.zeros(postpad)))
elentag1_offset = 0
diff --git a/gr-digital/python/digital/qa_constellation.py b/gr-digital/python/digital/qa_constellation.py
index 7c9984405f..7345f782a2 100644
--- a/gr-digital/python/digital/qa_constellation.py
+++ b/gr-digital/python/digital/qa_constellation.py
@@ -230,7 +230,6 @@ class test_constellation(gr_unittest.TestCase):
data[first:],
msg=msg)
-
def test_soft_qpsk_gen(self):
prec = 8
constel, code = digital.psk_4_0()
diff --git a/gr-digital/python/digital/qa_constellation_encoder_bc.py b/gr-digital/python/digital/qa_constellation_encoder_bc.py
index 2c6d587dac..ef1991bf80 100644
--- a/gr-digital/python/digital/qa_constellation_encoder_bc.py
+++ b/gr-digital/python/digital/qa_constellation_encoder_bc.py
@@ -12,6 +12,7 @@
from gnuradio import gr, gr_unittest, digital, blocks
import numpy as np
+
class test_constellation_encoder(gr_unittest.TestCase):
def setUp(self):
@@ -23,8 +24,8 @@ class test_constellation_encoder(gr_unittest.TestCase):
def test_constellation_encoder_bc_bpsk(self):
cnst = digital.constellation_bpsk()
- src_data = (1, 1, 0, 0,
- 1, 0, 1)
+ src_data = (1, 1, 0, 0,
+ 1, 0, 1)
const_map = [-1.0, 1.0]
expected_result = [const_map[x] for x in src_data]
@@ -43,8 +44,8 @@ class test_constellation_encoder(gr_unittest.TestCase):
def test_constellation_encoder_bc_qpsk(self):
cnst = digital.constellation_qpsk()
- src_data = (3, 1, 0, 2,
- 3, 2, 1)
+ src_data = (3, 1, 0, 2,
+ 3, 2, 1)
expected_result = [cnst.points()[x] for x in src_data]
src = blocks.vector_source_b(src_data)
op = digital.constellation_encoder_bc(cnst.base())
@@ -59,7 +60,6 @@ class test_constellation_encoder(gr_unittest.TestCase):
# print "expected result", expected_result
self.assertFloatTuplesAlmostEqual(expected_result, actual_result)
-
def test_constellation_encoder_bc_qpsk_random(self):
cnst = digital.constellation_qpsk()
src_data = np.random.randint(0, 4, size=20000)
@@ -77,5 +77,6 @@ class test_constellation_encoder(gr_unittest.TestCase):
# print "expected result", expected_result
self.assertFloatTuplesAlmostEqual(expected_result, actual_result)
+
if __name__ == '__main__':
gr_unittest.run(test_constellation_encoder)
diff --git a/gr-digital/python/digital/qa_decision_feedback_equalizer.py b/gr-digital/python/digital/qa_decision_feedback_equalizer.py
index 1d4587a6f0..f915272e52 100755
--- a/gr-digital/python/digital/qa_decision_feedback_equalizer.py
+++ b/gr-digital/python/digital/qa_decision_feedback_equalizer.py
@@ -116,7 +116,7 @@ class qa_decision_feedback_equalizer(gr_unittest.TestCase):
1,
1,
alg,
- True,
+ True,
[],
'')
dst = blocks.vector_sink_c()
diff --git a/gr-digital/python/digital/qa_glfsr_source.py b/gr-digital/python/digital/qa_glfsr_source.py
index 7fde483ed3..23382605c6 100644
--- a/gr-digital/python/digital/qa_glfsr_source.py
+++ b/gr-digital/python/digital/qa_glfsr_source.py
@@ -60,7 +60,7 @@ class test_glfsr_source(gr_unittest.TestCase):
lambda: digital.glfsr_source_f(0))
self.assertRaises(RuntimeError,
lambda: digital.glfsr_source_f(65))
-
+
def test_005_correlation_f(self):
for degree in range(
1, 11): # Higher degrees take too long to correlate
diff --git a/gr-digital/python/digital/qa_lfsr.py b/gr-digital/python/digital/qa_lfsr.py
index e0db6a382b..8caa0dbada 100644
--- a/gr-digital/python/digital/qa_lfsr.py
+++ b/gr-digital/python/digital/qa_lfsr.py
@@ -14,6 +14,7 @@ import numpy as np
from gnuradio import gr, gr_unittest, digital
from gnuradio.digital.utils import lfsr_args
+
class test_lfsr(gr_unittest.TestCase):
def setUp(self):
@@ -33,19 +34,20 @@ class test_lfsr(gr_unittest.TestCase):
self.assertFloatTuplesAlmostEqual(expected_result, result_data, 5)
def test_lfsr_002(self):
- l = digital.lfsr(*lfsr_args(0b1,5,3,0))
- result_data = [l.next_bit() for _ in range(2*(2**5-1))]
-
+ l = digital.lfsr(*lfsr_args(0b1, 5, 3, 0))
+ result_data = [l.next_bit() for _ in range(2 * (2**5 - 1))]
+
expected_result = [1, 0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 0,
- 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0]*2
+ 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0] * 2
self.assertEqual(expected_result, result_data)
- seq1 = [l.next_bit() for _ in range(2**5-1)]
- seq2 = [l.next_bit() for _ in range(2**5-1)]
- self.assertEqual(seq1,seq2)
+ seq1 = [l.next_bit() for _ in range(2**5 - 1)]
+ seq2 = [l.next_bit() for _ in range(2**5 - 1)]
+ self.assertEqual(seq1, seq2)
+
+ res = (np.convolve(seq1, [1, 0, 1, 0, 0, 1]) % 2)
+ self.assertTrue(sum(res[5:-5]) == 0, "LRS not generated properly")
- res = (np.convolve(seq1,[1,0,1,0,0,1])%2)
- self.assertTrue(sum(res[5:-5])==0,"LRS not generated properly")
if __name__ == '__main__':
gr_unittest.run(test_lfsr)
diff --git a/gr-digital/python/digital/qa_linear_equalizer.py b/gr-digital/python/digital/qa_linear_equalizer.py
index 64fd84d2f3..39fea0ef49 100755
--- a/gr-digital/python/digital/qa_linear_equalizer.py
+++ b/gr-digital/python/digital/qa_linear_equalizer.py
@@ -111,7 +111,7 @@ class qa_linear_equalizer(gr_unittest.TestCase):
4,
1,
alg,
- True,
+ True,
[],
'')
dst = blocks.vector_sink_c()
diff --git a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
index a1f8cef6e9..7d5f79781c 100644
--- a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
+++ b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
@@ -303,7 +303,8 @@ class qa_ofdm_chanest_vcvc (gr_unittest.TestCase):
for x in range(fft_len)]
src = blocks.vector_source_c(tx_data, False, fft_len)
chan = blocks.multiply_const_vcc(channel)
- noise = blocks.vector_source_c(numpy.random.normal(0,wgn_amplitude,(len(tx_data),)), False, fft_len)
+ noise = blocks.vector_source_c(numpy.random.normal(
+ 0, wgn_amplitude, (len(tx_data),)), False, fft_len)
add = blocks.add_cc(fft_len)
chanest = digital.ofdm_chanest_vcvc(sync_sym1, sync_sym2, 1)
sink = blocks.vector_sink_c(fft_len)
@@ -325,7 +326,8 @@ class qa_ofdm_chanest_vcvc (gr_unittest.TestCase):
shifted_carrier_mask = shift_tuple(carrier_mask, carr_offset)
for i in range(fft_len):
if shifted_carrier_mask[i] and channel_est[i]:
- self.assertAlmostEqual(channel[i], channel_est[i], places=0)
+ self.assertAlmostEqual(
+ channel[i], channel_est[i], places=0)
rx_sym_est[i] = (sink.data()[i] / channel_est[i]).real
return carr_offset, list(shift_tuple(rx_sym_est, -carr_offset_hat))
bit_errors = 0
diff --git a/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py b/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py
index 254479363e..10b484e839 100644
--- a/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py
+++ b/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py
@@ -106,7 +106,8 @@ class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
for _ in range(n_bursts):
gap = [0, ] * random.randint(0, 2 * fft_len)
tx_signal += gap + \
- make_bpsk_burst(fft_len, cp_len, fft_len * random.randint(5, 23))
+ make_bpsk_burst(fft_len, cp_len, fft_len *
+ random.randint(5, 23))
# Very loose definition of SNR here
snr = 20 # dB
sigma = 10**(-snr / 10)
diff --git a/gr-digital/python/digital/qa_scrambler.py b/gr-digital/python/digital/qa_scrambler.py
index 7d87f87b6c..b350d9e1b7 100644
--- a/gr-digital/python/digital/qa_scrambler.py
+++ b/gr-digital/python/digital/qa_scrambler.py
@@ -26,6 +26,7 @@ def additive_scramble_lfsr(mask, seed, reglen, bpb, data):
out.append(d ^ scramble_word)
return out
+
class test_scrambler(gr_unittest.TestCase):
def setUp(self):
@@ -34,33 +35,35 @@ class test_scrambler(gr_unittest.TestCase):
def tearDown(self):
self.tb = None
-
def test_lfsr_002(self):
- _a = lfsr_args(1,51,3,0)
+ _a = lfsr_args(1, 51, 3, 0)
l = digital.lfsr(*_a)
seq = [l.next_bit() for _ in range(2**10)]
- reg = np.zeros(52,np.int8)
- reg[::-1][(51,3,0),] = 1
- res = (np.convolve(seq,reg)%2)
- self.assertTrue(sum(res[52:-52])==0,"LRS not generated properly")
+ reg = np.zeros(52, np.int8)
+ reg[::-1][(51, 3, 0), ] = 1
+ res = (np.convolve(seq, reg) % 2)
+ self.assertTrue(sum(res[52:-52]) == 0, "LRS not generated properly")
def test_scrambler_descrambler_001(self):
- src_data = np.random.randint(0,2,500,dtype=np.int8)
+ src_data = np.random.randint(0, 2, 500, dtype=np.int8)
src = blocks.vector_source_b(src_data, False)
- scrambler = digital.scrambler_bb(*lfsr_args(0b1,7,2,0)) # p(x) = x^7 + x^2 + 1
- descrambler = digital.descrambler_bb(*lfsr_args(0b111,7,2,0)) # we can use any seed here, it is descrambling.
+ scrambler = digital.scrambler_bb(
+ *lfsr_args(0b1, 7, 2, 0)) # p(x) = x^7 + x^2 + 1
+ # we can use any seed here, it is descrambling.
+ descrambler = digital.descrambler_bb(*lfsr_args(0b111, 7, 2, 0))
m_tap = blocks.vector_sink_b()
dst = blocks.vector_sink_b()
self.tb.connect(src, scrambler, descrambler, dst)
self.tb.connect(scrambler, m_tap)
self.tb.run()
- self.assertEqual(src_data[:-7].tolist(), dst.data()[7:]) # skip garbage during synchronization
- self.assertEqual(tuple(np.convolve(m_tap.data(),[1,0,0,0,0,1,0,1])%2)[7:-10],
- tuple(src_data[:-10])) # manual descrambling test
+ # skip garbage during synchronization
+ self.assertEqual(src_data[:-7].tolist(), dst.data()[7:])
+ self.assertEqual(tuple(np.convolve(m_tap.data(), [1, 0, 0, 0, 0, 1, 0, 1]) % 2)[7:-10],
+ tuple(src_data[:-10])) # manual descrambling test
def test_scrambler_descrambler_002(self):
- _a = lfsr_args(0b1,51,6,0) #p(x) = x^51+x^6+1
- src_data = np.random.randint(0,2,1000,dtype=np.int8)
+ _a = lfsr_args(0b1, 51, 6, 0) # p(x) = x^51+x^6+1
+ src_data = np.random.randint(0, 2, 1000, dtype=np.int8)
src = blocks.vector_source_b(src_data, False)
scrambler = digital.scrambler_bb(*_a)
m_tap = blocks.vector_sink_b()
@@ -69,26 +72,29 @@ class test_scrambler(gr_unittest.TestCase):
self.tb.connect(src, scrambler, descrambler, dst)
self.tb.connect(scrambler, m_tap)
self.tb.run()
- self.assertTrue(np.all(src_data[:-51]==dst.data()[51:])) # skip garbage during synchronization
- reg = np.zeros(52,np.int8)
- reg[::-1][(51,6,0),] = 1
- self.assertTrue(np.all( np.convolve(m_tap.data(),reg)[51:-60]%2
- == src_data[:-60])) # manual descrambling test
+ # skip garbage during synchronization
+ self.assertTrue(np.all(src_data[:-51] == dst.data()[51:]))
+ reg = np.zeros(52, np.int8)
+ reg[::-1][(51, 6, 0), ] = 1
+ self.assertTrue(np.all(np.convolve(m_tap.data(), reg)[51:-60] % 2 ==
+ src_data[:-60])) # manual descrambling test
def test_scrambler_descrambler_003(self):
- src_data = np.random.randint(0,2,1000,dtype=np.int8)
+ src_data = np.random.randint(0, 2, 1000, dtype=np.int8)
src = blocks.vector_source_b(src_data, False)
- scrambler = digital.scrambler_bb(*lfsr_args(1,12,10,3,2,0)) # this is the product of the other two
- descrambler1 = digital.descrambler_bb(*lfsr_args(1,5,3,0))
- descrambler2 = digital.descrambler_bb(*lfsr_args(1,7,2,0))
+ # this is the product of the other two
+ scrambler = digital.scrambler_bb(*lfsr_args(1, 12, 10, 3, 2, 0))
+ descrambler1 = digital.descrambler_bb(*lfsr_args(1, 5, 3, 0))
+ descrambler2 = digital.descrambler_bb(*lfsr_args(1, 7, 2, 0))
dst = blocks.vector_sink_b()
self.tb.connect(src, scrambler, descrambler1, descrambler2, dst)
self.tb.run()
- self.assertTrue(np.all(src_data[:-12]==dst.data()[12:])) # skip garbage during synchronization
+ # skip garbage during synchronization
+ self.assertTrue(np.all(src_data[:-12] == dst.data()[12:]))
def test_additive_scrambler_001(self):
- _a = lfsr_args(1,51,3,0) #i p(x) = x^51+x^3+1, seed 0x1
- src_data = np.random.randint(0,2,1000,dtype=np.int8).tolist()
+ _a = lfsr_args(1, 51, 3, 0) # i p(x) = x^51+x^3+1, seed 0x1
+ src_data = np.random.randint(0, 2, 1000, dtype=np.int8).tolist()
src = blocks.vector_source_b(src_data, False)
scrambler = digital.additive_scrambler_bb(*_a)
descrambler = digital.additive_scrambler_bb(*_a)
@@ -98,18 +104,18 @@ class test_scrambler(gr_unittest.TestCase):
self.assertEqual(tuple(src_data), tuple(dst.data()))
def test_additive_scrambler_002(self):
- _a = lfsr_args(1,51,3,0) #i p(x) = x^51+x^3+1, seed 0x1
- src_data = [1,]*1000
+ _a = lfsr_args(1, 51, 3, 0) # i p(x) = x^51+x^3+1, seed 0x1
+ src_data = [1, ] * 1000
src = blocks.vector_source_b(src_data, False)
scrambler = digital.additive_scrambler_bb(*_a)
dst = blocks.vector_sink_b()
self.tb.connect(src, scrambler, dst)
self.tb.run()
- reg = np.zeros(52,np.int8)
- reg[::-1][(51,3,0),] = 1
- res = (np.convolve(dst.data(),reg)%2)[52:-52]
- self.assertEqual(len(res), sum(res)) # when convolved with mask,
- # after sync, only 1's would be returned.
+ reg = np.zeros(52, np.int8)
+ reg[::-1][(51, 3, 0), ] = 1
+ res = (np.convolve(dst.data(), reg) % 2)[52:-52]
+ self.assertEqual(len(res), sum(res)) # when convolved with mask,
+ # after sync, only 1's would be returned.
def test_scrambler_descrambler(self):
src_data = [1, ] * 1000
diff --git a/gr-digital/python/digital/qam.py b/gr-digital/python/digital/qam.py
index d74866b946..6e8951ef1e 100644
--- a/gr-digital/python/digital/qam.py
+++ b/gr-digital/python/digital/qam.py
@@ -243,8 +243,8 @@ def large_ampls_to_corners_mapping(side, points, width):
# use the center point.
c = ((real_x - side / 2.0 + 0.5) * width +
(imag_x - side / 2.0 + 0.5) * width * 1j)
- if (real_x >= extra_layers and real_x < side - extra_layers
- and imag_x >= extra_layers and imag_x < side - extra_layers):
+ if (real_x >= extra_layers and real_x < side - extra_layers and
+ imag_x >= extra_layers and imag_x < side - extra_layers):
# This is not an edge row/column. Find closest point.
index = find_closest_point(c, points)
else:
diff --git a/gr-digital/python/digital/qpsk.py b/gr-digital/python/digital/qpsk.py
index db97d31fc3..22587948fc 100644
--- a/gr-digital/python/digital/qpsk.py
+++ b/gr-digital/python/digital/qpsk.py
@@ -27,23 +27,28 @@ _def_mod_code = mod_codes.GRAY_CODE
# QPSK constellation
# /////////////////////////////////////////////////////////////////////////////
+
def qpsk_constellation(mod_code=_def_mod_code):
"""
Creates a QPSK constellation.
"""
if mod_code != mod_codes.GRAY_CODE:
- raise ValueError("This QPSK mod/demod works only for gray-coded constellations.")
+ raise ValueError(
+ "This QPSK mod/demod works only for gray-coded constellations.")
return digital.constellation_qpsk()
# /////////////////////////////////////////////////////////////////////////////
# DQPSK constellation
# /////////////////////////////////////////////////////////////////////////////
+
def dqpsk_constellation(mod_code=_def_mod_code):
if mod_code != mod_codes.GRAY_CODE:
- raise ValueError("The DQPSK constellation is only generated for gray_coding. But it can be used for non-grayed coded modulation if one doesn't use the pre-differential code.")
+ raise ValueError(
+ "The DQPSK constellation is only generated for gray_coding. But it can be used for non-grayed coded modulation if one doesn't use the pre-differential code.")
return digital.constellation_dqpsk()
+
#
# Add these to the mod/demod registry
#
diff --git a/gr-digital/python/digital/soft_dec_lut_gen.py b/gr-digital/python/digital/soft_dec_lut_gen.py
index 898af5320d..13c50251a5 100644
--- a/gr-digital/python/digital/soft_dec_lut_gen.py
+++ b/gr-digital/python/digital/soft_dec_lut_gen.py
@@ -11,6 +11,7 @@
import numpy
+
def soft_dec_table_generator(soft_dec_gen, prec, Es=1):
'''
| Builds a LUT that is a list of tuples. The tuple represents the
@@ -71,7 +72,7 @@ def soft_dec_table_generator(soft_dec_gen, prec, Es=1):
'''
npts = int(2.0**prec)
- maxd = Es*numpy.sqrt(2.0)/2.0
+ maxd = Es * numpy.sqrt(2.0) / 2.0
yrng = numpy.linspace(-maxd, maxd, npts)
xrng = numpy.linspace(-maxd, maxd, npts)
@@ -83,6 +84,7 @@ def soft_dec_table_generator(soft_dec_gen, prec, Es=1):
table.append(decs)
return table
+
def soft_dec_table(constel, symbols, prec, npwr=1):
'''
Similar in nature to soft_dec_table_generator above. Instead, this
@@ -120,6 +122,7 @@ def soft_dec_table(constel, symbols, prec, npwr=1):
table.append(decs)
return table
+
def calc_soft_dec_from_table(sample, table, prec, Es=1.0):
'''
Takes in a complex sample and converts it from the coordinates
@@ -144,25 +147,26 @@ def calc_soft_dec_from_table(sample, table, prec, Es=1.0):
constellation.
'''
lut_scale = 2.0**prec
- maxd = Es*numpy.sqrt(2.0)/2.0
- scale = (lut_scale) / (2.0*maxd)
+ maxd = Es * numpy.sqrt(2.0) / 2.0
+ scale = (lut_scale) / (2.0 * maxd)
- alpha = 0.99 # to keep index within bounds
+ alpha = 0.99 # to keep index within bounds
xre = sample.real
xim = sample.imag
- xre = ((maxd + min(alpha*maxd, max(-alpha*maxd, xre))) * scale)
- xim = ((maxd + min(alpha*maxd, max(-alpha*maxd, xim))) * scale)
- index = int(xre) + lut_scale*int(xim)
+ xre = ((maxd + min(alpha * maxd, max(-alpha * maxd, xre))) * scale)
+ xim = ((maxd + min(alpha * maxd, max(-alpha * maxd, xim))) * scale)
+ index = int(xre) + lut_scale * int(xim)
max_index = lut_scale**2
while(index >= max_index):
- index -= lut_scale;
+ index -= lut_scale
while(index < 0):
- index += lut_scale;
+ index += lut_scale
return table[int(index)]
+
def calc_soft_dec(sample, constel, symbols, npwr=1):
'''
This function takes in any consteallation and symbol symbol set
@@ -186,8 +190,8 @@ def calc_soft_dec(sample, constel, symbols, npwr=1):
M = len(constel)
k = int(numpy.log2(M))
- tmp = 2*k*[0]
- s = k*[0]
+ tmp = 2 * k * [0]
+ s = k * [0]
for i in range(M):
# Calculate the distance between the sample and the current
@@ -200,21 +204,21 @@ def calc_soft_dec(sample, constel, symbols, npwr=1):
for j in range(k):
# Get the bit at the jth index
- mask = 1<<j
+ mask = 1 << j
bit = (symbols[i] & mask) >> j
# If the bit is a 0, add to the probability of a zero
if(bit == 0):
- tmp[2*j+0] += d
+ tmp[2 * j + 0] += d
# else, add to the probability of a one
else:
- tmp[2*j+1] += d
+ tmp[2 * j + 1] += d
# Calculate the log-likelihood ratio for all bits based on the
# probability of ones (tmp[2*i+1]) over the probability of a zero
# (tmp[2*i+0]).
for i in range(k):
- s[k-1-i] = (numpy.log(tmp[2*i+1]) - numpy.log(tmp[2*i+0]))
+ s[k - 1 - i] = (numpy.log(tmp[2 * i + 1]) - numpy.log(tmp[2 * i + 0]))
return s
@@ -225,19 +229,19 @@ def show_table(table):
pp = ""
subi = 1
subj = 0
- for i in reversed(list(range(prec+1))):
- if(i == prec//2):
- pp += "-----" + prec*((nbits*8)+3)*"-" + "\n"
+ for i in reversed(list(range(prec + 1))):
+ if(i == prec // 2):
+ pp += "-----" + prec * ((nbits * 8) + 3) * "-" + "\n"
subi = 0
continue
- for j in range(prec+1):
- if(j == prec//2):
+ for j in range(prec + 1):
+ if(j == prec // 2):
pp += "| "
subj = 1
else:
- item = table[prec*(i-subi) + (j-subj)]
+ item = table[prec * (i - subi) + (j - subj)]
pp += "( "
- for t in range(nbits-1, -1, -1):
+ for t in range(nbits - 1, -1, -1):
pp += "{0: .4f} ".format(item[t])
pp += ") "
pp += "\n"
diff --git a/gr-digital/python/digital/test_soft_decisions.py b/gr-digital/python/digital/test_soft_decisions.py
index 59b6edc032..3350103573 100644
--- a/gr-digital/python/digital/test_soft_decisions.py
+++ b/gr-digital/python/digital/test_soft_decisions.py
@@ -9,13 +9,15 @@
#
-import numpy, sys
+import numpy
+import sys
from matplotlib import pyplot
from gnuradio import digital
from .soft_dec_lut_gen import soft_dec_table, calc_soft_dec_from_table, calc_soft_dec
from .psk_constellations import psk_4_0, psk_4_1, psk_4_2, psk_4_3, psk_4_4, psk_4_5, psk_4_6, psk_4_7, sd_psk_4_0, sd_psk_4_1, sd_psk_4_2, sd_psk_4_3, sd_psk_4_4, sd_psk_4_5, sd_psk_4_6, sd_psk_4_7
from .qam_constellations import qam_16_0, sd_qam_16_0
+
def test_qpsk(i, sample, prec):
qpsk_const_list = [psk_4_0, psk_4_1, psk_4_2, psk_4_3,
psk_4_4, psk_4_5, psk_4_6, psk_4_7]
@@ -33,7 +35,8 @@ def test_qpsk(i, sample, prec):
# Get max energy/symbol in constellation
constel = c.points()
- Es = max([numpy.sqrt(constel_i.real**2 + constel_i.imag**2) for constel_i in constel])
+ Es = max([numpy.sqrt(constel_i.real**2 + constel_i.imag**2)
+ for constel_i in constel])
#table = soft_dec_table_generator(qpsk_lut_gen, prec, Es)
table = soft_dec_table(constel, code, prec)
@@ -50,6 +53,7 @@ def test_qpsk(i, sample, prec):
return (y_python_gen_calc, y_python_table, y_python_raw_calc,
y_cpp_table, y_cpp_raw_calc, constel, code, c)
+
def test_qam16(i, sample, prec):
sample = sample / 1
qam_const_list = [qam_16_0, ]
@@ -71,7 +75,7 @@ def test_qam16(i, sample, prec):
#table = soft_dec_table_generator(qam_lut_gen, prec, Es)
table = soft_dec_table(constel, code, prec, 1)
- #c.gen_soft_dec_lut(prec)
+ # c.gen_soft_dec_lut(prec)
c.set_soft_dec_lut(table, prec)
y_python_gen_calc = qam_lut_gen(sample, Es)
@@ -83,14 +87,15 @@ def test_qam16(i, sample, prec):
return (y_python_gen_calc, y_python_table, y_python_raw_calc,
y_cpp_table, y_cpp_raw_calc, constel, code, c)
+
if __name__ == "__main__":
index = 0
prec = 8
- x_re = 2*numpy.random.random()-1
- x_im = 2*numpy.random.random()-1
- x = x_re + x_im*1j
+ x_re = 2 * numpy.random.random() - 1
+ x_im = 2 * numpy.random.random() - 1
+ x = x_re + x_im * 1j
#x = -1 + -0.j
if 1:
@@ -112,14 +117,14 @@ if __name__ == "__main__":
print("C++ Raw calc: ", (y_cpp_raw_calc))
fig = pyplot.figure(1)
- sp1 = fig.add_subplot(1,1,1)
+ sp1 = fig.add_subplot(1, 1, 1)
sp1.plot([c.real for c in constel],
[c.imag for c in constel], 'bo')
sp1.plot(x.real, x.imag, 'ro')
sp1.set_xlim([-1.5, 1.5])
sp1.set_ylim([-1.5, 1.5])
fill = int(numpy.log2(len(constel)))
- for i,c in enumerate(constel):
- sp1.text(1.2*c.real, 1.2*c.imag, bin(code[i])[2:].zfill(fill),
+ for i, c in enumerate(constel):
+ sp1.text(1.2 * c.real, 1.2 * c.imag, bin(code[i])[2:].zfill(fill),
ha='center', va='center', size=18)
pyplot.show()
diff --git a/gr-digital/python/digital/utils/__init__.py b/gr-digital/python/digital/utils/__init__.py
index 0d5aae0b79..f3a8390af8 100644
--- a/gr-digital/python/digital/utils/__init__.py
+++ b/gr-digital/python/digital/utils/__init__.py
@@ -1,11 +1,11 @@
#!/usr/bin/env python
#
# Copyright 2011 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
-#
+#
from .lfsr import lfsr_args
diff --git a/gr-digital/python/digital/utils/alignment.py b/gr-digital/python/digital/utils/alignment.py
index f0541dea56..531c39830e 100644
--- a/gr-digital/python/digital/utils/alignment.py
+++ b/gr-digital/python/digital/utils/alignment.py
@@ -1,12 +1,12 @@
#!/usr/bin/env python
#
# Copyright 2011 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
-#
+#
"""
This module contains functions for aligning sequences.
@@ -41,6 +41,7 @@ def_max_offset = 500
# The maximum number of samples to take from two sequences to check alignment.
def_num_samples = 1000
+
def compare_sequences(d1, d2, offset, sample_indices=None):
"""
Takes two binary sequences and an offset and returns the number of
@@ -49,7 +50,7 @@ def compare_sequences(d1, d2, offset, sample_indices=None):
offset -- offset of d2 relative to d1
sample_indices -- a list of indices to use for the comparison
"""
- max_index = min(len(d1), len(d2)+offset)
+ max_index = min(len(d1), len(d2) + offset)
if sample_indices is None:
sample_indices = list(range(0, max_index))
correct = 0
@@ -57,11 +58,12 @@ def compare_sequences(d1, d2, offset, sample_indices=None):
for i in sample_indices:
if i >= max_index:
break
- if d1[i] == d2[i-offset]:
+ if d1[i] == d2[i - offset]:
correct += 1
total += 1
return (correct, total)
+
def random_sample(size, num_samples=def_num_samples, seed=None):
"""
Returns a set of random integers between 0 and (size-1).
@@ -76,12 +78,13 @@ def random_sample(size, num_samples=def_num_samples, seed=None):
num_samples = num_samples / 2
indices = set([])
while len(indices) < num_samples:
- index = rndm.randint(0, size-1)
+ index = rndm.randint(0, size - 1)
indices.add(index)
indices = list(indices)
indices.sort()
return indices
+
def align_sequences(d1, d2,
num_samples=def_num_samples,
max_offset=def_max_offset,
@@ -113,7 +116,7 @@ def align_sequences(d1, d2,
int_range = [item for items in zip(pos_range, neg_range) for item in items]
for offset in int_range:
correct, compared = compare_sequences(d1, d2, offset, indices)
- frac_correct = 1.0*correct/compared
+ frac_correct = 1.0 * correct / compared
if frac_correct > max_frac_correct:
max_frac_correct = frac_correct
best_offset = offset
@@ -122,8 +125,8 @@ def align_sequences(d1, d2,
if frac_correct > correct_cutoff:
break
return max_frac_correct, best_compared, best_offset, indices
-
+
+
if __name__ == "__main__":
import doctest
doctest.testmod()
-
diff --git a/gr-digital/python/digital/utils/gray_code.py b/gr-digital/python/digital/utils/gray_code.py
index e045e9a4ac..de8ffbc93c 100644
--- a/gr-digital/python/digital/utils/gray_code.py
+++ b/gr-digital/python/digital/utils/gray_code.py
@@ -41,14 +41,14 @@ class GrayCodeGenerator(object):
else:
# if not we take advantage of the symmetry of all but the last bit
# around a power of two.
- result = self.gcs[2*self.lp2-1-self.i] + self.lp2
+ result = self.gcs[2 * self.lp2 - 1 - self.i] + self.lp2
self.gcs.append(result)
self.i += 1
if self.i == self.np2:
self.lp2 = self.i
- self.np2 = self.i*2
+ self.np2 = self.i * 2
+
_gray_code_generator = GrayCodeGenerator()
gray_code = _gray_code_generator.get_gray_code
-
diff --git a/gr-digital/python/digital/utils/lfsr.py b/gr-digital/python/digital/utils/lfsr.py
index 2b8a47cb76..44f62f0c7a 100644
--- a/gr-digital/python/digital/utils/lfsr.py
+++ b/gr-digital/python/digital/utils/lfsr.py
@@ -1,12 +1,12 @@
#!/usr/bin/env python
#
# Copyright 2020 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
-#
+#
def lfsr_args(seed, *exp):
"""
@@ -18,4 +18,4 @@ def lfsr_args(seed, *exp):
Creates an lfsr object with seed 0b11001, mask 0b1000011, K=6
"""
from functools import reduce
- return reduce(int.__xor__, map(lambda x:2**x, exp)), seed, max(exp)-1
+ return reduce(int.__xor__, map(lambda x: 2**x, exp)), seed, max(exp) - 1
diff --git a/gr-digital/python/digital/utils/mod_codes.py b/gr-digital/python/digital/utils/mod_codes.py
index bafb85e18b..f0ac9f1fb5 100644
--- a/gr-digital/python/digital/utils/mod_codes.py
+++ b/gr-digital/python/digital/utils/mod_codes.py
@@ -1,12 +1,12 @@
#!/usr/bin/env python
#
# Copyright 2011 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
-#
+#
# Constants used to represent what coding to use.
GRAY_CODE = 'gray'
@@ -15,6 +15,7 @@ NO_CODE = 'none'
codes = (GRAY_CODE, SET_PARTITION_CODE, NO_CODE)
+
def invert_code(code):
c = enumerate(code)
ic = [(b, a) for (a, b) in c]
diff --git a/gr-digital/python/digital/utils/tagged_streams.py b/gr-digital/python/digital/utils/tagged_streams.py
index 49c1058981..9876519bd0 100644
--- a/gr-digital/python/digital/utils/tagged_streams.py
+++ b/gr-digital/python/digital/utils/tagged_streams.py
@@ -13,6 +13,7 @@
from gnuradio import gr
import pmt
+
def make_lengthtags(lengths, offsets, tagname='length', vlen=1):
tags = []
assert(len(offsets) == len(lengths))
@@ -24,26 +25,31 @@ def make_lengthtags(lengths, offsets, tagname='length', vlen=1):
tags.append(tag)
return tags
+
def string_to_vector(string):
v = []
for s in string:
v.append(ord(s))
return v
+
def strings_to_vectors(strings, lengthtagname):
vs = [string_to_vector(string) for string in strings]
return packets_to_vectors(vs, lengthtagname)
+
def vector_to_string(v):
s = []
for d in v:
s.append(chr(d))
return ''.join(s)
+
def vectors_to_strings(data, tags, lengthtagname):
packets = vectors_to_packets(data, tags, lengthtagname)
return [vector_to_string(packet) for packet in packets]
+
def count_bursts(data, tags, lengthtagname, vlen=1):
lengthtags = [t for t in tags
if pmt.symbol_to_string(t.key) == lengthtagname]
@@ -53,7 +59,7 @@ def count_bursts(data, tags, lengthtagname, vlen=1):
raise ValueError(
"More than one tags with key {0} with the same offset={1}."
.format(lengthtagname, tag.offset))
- lengths[tag.offset] = pmt.to_long(tag.value)*vlen
+ lengths[tag.offset] = pmt.to_long(tag.value) * vlen
in_burst = False
in_packet = False
packet_length = None
@@ -62,7 +68,8 @@ def count_bursts(data, tags, lengthtagname, vlen=1):
for pos in range(len(data)):
if pos in lengths:
if in_packet:
- print("Got tag at pos {0} current packet_pos is {1}".format(pos, packet_pos))
+ print("Got tag at pos {0} current packet_pos is {1}".format(
+ pos, packet_pos))
raise Exception("Received packet tag while in packet.")
packet_pos = -1
packet_length = lengths[pos]
@@ -74,11 +81,12 @@ def count_bursts(data, tags, lengthtagname, vlen=1):
in_burst = False
if in_packet:
packet_pos += 1
- if packet_pos == packet_length-1:
+ if packet_pos == packet_length - 1:
in_packet = False
packet_pos = None
return burst_count
+
def vectors_to_packets(data, tags, lengthtagname, vlen=1):
lengthtags = [t for t in tags
if pmt.symbol_to_string(t.key) == lengthtagname]
@@ -88,7 +96,7 @@ def vectors_to_packets(data, tags, lengthtagname, vlen=1):
raise ValueError(
"More than one tags with key {0} with the same offset={1}."
.format(lengthtagname, tag.offset))
- lengths[tag.offset] = pmt.to_long(tag.value)*vlen
+ lengths[tag.offset] = pmt.to_long(tag.value) * vlen
if 0 not in lengths:
raise ValueError("There is no tag with key {0} and an offset of 0"
.format(lengthtagname))
@@ -102,12 +110,13 @@ def vectors_to_packets(data, tags, lengthtagname, vlen=1):
length = lengths[pos]
if length == 0:
raise ValueError("Packets cannot have zero length.")
- if pos+length > len(data):
+ if pos + length > len(data):
raise ValueError("The final packet is incomplete.")
- packets.append(data[pos: pos+length])
+ packets.append(data[pos: pos + length])
pos += length
return packets
+
def packets_to_vectors(packets, lengthtagname, vlen=1):
tags = []
data = []