diff options
author | Josh Morman <jmorman@gnuradio.org> | 2021-11-24 12:25:09 -0500 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2021-11-24 14:41:53 -0500 |
commit | 3cecf9268b15bb0e9e2fedaeb2528bd3038beee6 (patch) | |
tree | ba0bb14fa03664b4e69e0dcb944b96ae298264d8 /gr-digital/python/digital | |
parent | 310b7f0cc25edf90c0bc6754031d763119b0ae97 (diff) |
digital: pep8 formatting
Signed-off-by: Josh Morman <jmorman@gnuradio.org>
Diffstat (limited to 'gr-digital/python/digital')
31 files changed, 638 insertions, 434 deletions
diff --git a/gr-digital/python/digital/__init__.py b/gr-digital/python/digital/__init__.py index 64281eb2bb..100766f22c 100644 --- a/gr-digital/python/digital/__init__.py +++ b/gr-digital/python/digital/__init__.py @@ -22,11 +22,7 @@ except ImportError: __path__.append(os.path.join(dirname, "bindings")) from .digital_python import * -from gnuradio import analog # just need analog for the enum -class gmskmod_bc(cpmmod_bc): - def __init__(self, samples_per_sym = 2, L = 4, beta = 0.3): - cpmmod_bc.__init__(self, analog.cpm.GAUSSIAN, 0.5, samples_per_sym, L, beta) - +from gnuradio import analog # just need analog for the enum from .psk import * from .qam import * from .qamlike import * @@ -42,4 +38,8 @@ from .psk_constellations import * from .qam_constellations import * from .constellation_map_generator import * -from . import packet_utils + +class gmskmod_bc(cpmmod_bc): + def __init__(self, samples_per_sym=2, L=4, beta=0.3): + cpmmod_bc.__init__(self, analog.cpm.GAUSSIAN, + 0.5, samples_per_sym, L, beta) diff --git a/gr-digital/python/digital/bpsk.py b/gr-digital/python/digital/bpsk.py index 7d5b8cde7a..e6d507e2f7 100644 --- a/gr-digital/python/digital/bpsk.py +++ b/gr-digital/python/digital/bpsk.py @@ -25,6 +25,7 @@ from . import modulation_utils # BPSK constellation # ///////////////////////////////////////////////////////////////////////////// + def bpsk_constellation(): return digital_python.constellation_bpsk() @@ -32,6 +33,7 @@ def bpsk_constellation(): # DBPSK constellation # ///////////////////////////////////////////////////////////////////////////// + def dbpsk_constellation(): return digital_python.constellation_dbpsk() diff --git a/gr-digital/python/digital/constellation_map_generator.py b/gr-digital/python/digital/constellation_map_generator.py index e6e660392c..e3d6f532fd 100644 --- a/gr-digital/python/digital/constellation_map_generator.py +++ b/gr-digital/python/digital/constellation_map_generator.py @@ -35,7 +35,7 @@ def constellation_map_generator(basis_cpoints, basis_symbols, k, pi): symbols = list() for s_i in s: tmp = 0 - for i,p in enumerate(pi): + for i, p in enumerate(pi): bit = (s_i >> i) & 0x1 tmp |= bit << p symbols.append(tmp ^ k) diff --git a/gr-digital/python/digital/cpm.py b/gr-digital/python/digital/cpm.py index d21ae55a87..ca32fb7036 100644 --- a/gr-digital/python/digital/cpm.py +++ b/gr-digital/python/digital/cpm.py @@ -29,7 +29,7 @@ _def_samples_per_symbol = 2 _def_bits_per_symbol = 1 _def_h_numerator = 1 _def_h_denominator = 2 -_def_cpm_type = 0 # 0=CPFSK, 1=GMSK, 2=RC, 3=GENERAL +_def_cpm_type = 0 # 0=CPFSK, 1=GMSK, 2=RC, 3=GENERAL _def_bt = 0.35 _def_symbols_per_pulse = 1 _def_generic_taps = numpy.empty(1) @@ -80,62 +80,69 @@ class cpm_mod(gr.hier_block2): log=_def_log): gr.hier_block2.__init__(self, "cpm_mod", - gr.io_signature(1, 1, gr.sizeof_char), # Input signature - gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature + # Input signature + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature self._samples_per_symbol = samples_per_symbol self._bits_per_symbol = bits_per_symbol self._h_numerator = h_numerator self._h_denominator = h_denominator self._cpm_type = cpm_type - self._bt=bt - if cpm_type == 0 or cpm_type == 2 or cpm_type == 3: # CPFSK, RC, Generic + self._bt = bt + if cpm_type == 0 or cpm_type == 2 or cpm_type == 3: # CPFSK, RC, Generic self._symbols_per_pulse = symbols_per_pulse - elif cpm_type == 1: # GMSK + elif cpm_type == 1: # GMSK self._symbols_per_pulse = 4 else: - raise TypeError("cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,)) + raise TypeError( + "cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,)) - self._generic_taps=numpy.array(generic_taps) + self._generic_taps = numpy.array(generic_taps) if samples_per_symbol < 2: - raise TypeError("samples_per_symbol must be >= 2, is %r" % (samples_per_symbol,)) + raise TypeError("samples_per_symbol must be >= 2, is %r" % + (samples_per_symbol,)) self.nsymbols = 2**bits_per_symbol - self.sym_alphabet = numpy.arange(-(self.nsymbols-1),self.nsymbols,2).tolist() - + self.sym_alphabet = numpy.arange(-(self.nsymbols - 1), + self.nsymbols, 2).tolist() self.ntaps = int(self._symbols_per_pulse * samples_per_symbol) sensitivity = 2 * pi * h_numerator / h_denominator / samples_per_symbol # Unpack Bytes into bits_per_symbol groups - self.B2s = blocks.packed_to_unpacked_bb(bits_per_symbol,gr.GR_MSB_FIRST) - + self.B2s = blocks.packed_to_unpacked_bb( + bits_per_symbol, gr.GR_MSB_FIRST) # Turn it into symmetric PAM data. - self.pam = digital_python.chunks_to_symbols_bf(self.sym_alphabet,1) + self.pam = digital_python.chunks_to_symbols_bf(self.sym_alphabet, 1) # Generate pulse (sum of taps = samples_per_symbol/2) - if cpm_type == 0: # CPFSK - self.taps= (1.0/self._symbols_per_pulse/2,) * self.ntaps - elif cpm_type == 1: # GMSK + if cpm_type == 0: # CPFSK + self.taps = (1.0 / self._symbols_per_pulse / 2,) * self.ntaps + elif cpm_type == 1: # GMSK gaussian_taps = filter.firdes.gaussian( 1.0 / 2, # gain samples_per_symbol, # symbol_rate bt, # bandwidth * symbol time self.ntaps # number of taps - ) + ) sqwave = (1,) * samples_per_symbol # rectangular window - self.taps = numpy.convolve(numpy.array(gaussian_taps),numpy.array(sqwave)) - elif cpm_type == 2: # Raised Cosine + self.taps = numpy.convolve(numpy.array( + gaussian_taps), numpy.array(sqwave)) + elif cpm_type == 2: # Raised Cosine # generalize it for arbitrary roll-off factor - self.taps = (1-numpy.cos(2*pi*numpy.arange(0 / self.ntaps/samples_per_symbol/self._symbols_per_pulse)),(2*self._symbols_per_pulse)) - elif cpm_type == 3: # Generic CPM + self.taps = (1 - numpy.cos(2 * pi * numpy.arange(0 / self.ntaps / + samples_per_symbol / self._symbols_per_pulse)), (2 * self._symbols_per_pulse)) + elif cpm_type == 3: # Generic CPM self.taps = generic_taps else: - raise TypeError("cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,)) + raise TypeError( + "cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,)) - self.filter = filter.pfb.arb_resampler_fff(samples_per_symbol, self.taps) + self.filter = filter.pfb.arb_resampler_fff( + samples_per_symbol, self.taps) # FM modulation self.fmmod = analog.frequency_modulator_fc(sensitivity) @@ -170,19 +177,17 @@ class cpm_mod(gr.hier_block2): def symbols_per_pulse(self): return self._symbols_per_pulse - def _print_verbage(self): print("Samples per symbol = %d" % self._samples_per_symbol) print("Bits per symbol = %d" % self._bits_per_symbol) - print("h = " , self._h_numerator , " / " , self._h_denominator) - print("Symbol alphabet = " , self.sym_alphabet) + print("h = ", self._h_numerator, " / ", self._h_denominator) + print("Symbol alphabet = ", self.sym_alphabet) print("Symbols per pulse = %d" % self._symbols_per_pulse) - print("taps = " , self.taps) + print("taps = ", self.taps) print("CPM type = %d" % self._cpm_type) if self._cpm_type == 1: - print("Gaussian filter BT = %.2f" % self._bt) - + print("Gaussian filter BT = %.2f" % self._bt) def _setup_logging(self): print("Modulation logging turned on.") @@ -209,8 +214,7 @@ class cpm_mod(gr.hier_block2): Given command line options, create dictionary suitable for passing to __init__ """ return modulation_utils.extract_kwargs_from_options(cpm_mod.__init__, - ('self',), options) - + ('self',), options) # ///////////////////////////////////////////////////////////////////////////// @@ -219,7 +223,6 @@ class cpm_mod(gr.hier_block2): # # Not yet implemented # - # # Add these to the mod/demod registry # diff --git a/gr-digital/python/digital/generic_mod_demod.py b/gr-digital/python/digital/generic_mod_demod.py index a40a7fda5f..2db9e4482a 100644 --- a/gr-digital/python/digital/generic_mod_demod.py +++ b/gr-digital/python/digital/generic_mod_demod.py @@ -29,17 +29,18 @@ _def_log = False _def_truncate = False # Frequency correction -_def_freq_bw = 2*math.pi/100.0 +_def_freq_bw = 2 * math.pi / 100.0 # Symbol timing recovery -_def_timing_bw = 2*math.pi/100.0 +_def_timing_bw = 2 * math.pi / 100.0 _def_timing_max_dev = 1.5 # Fine frequency / Phase correction -_def_phase_bw = 2*math.pi/100.0 +_def_phase_bw = 2 * math.pi / 100.0 # Number of points in constellation _def_constellation_points = 16 # Whether differential coding is used. _def_differential = False + def add_common_options(parser): """ Sets options common to both modulator and demodulator. @@ -55,7 +56,7 @@ def add_common_options(parser): parser.add_option("", "--mod-code", type="choice", choices=mod_codes.codes, default=mod_codes.NO_CODE, help="Select modulation code from: %s [default=%%default]" - % (', '.join(mod_codes.codes),)) + % (', '.join(mod_codes.codes),)) parser.add_option("", "--excess-bw", type="float", default=_def_excess_bw, help="set RRC excess bandwidth factor [default=%default]") @@ -92,8 +93,9 @@ class generic_mod(gr.hier_block2): truncate=_def_truncate): gr.hier_block2.__init__(self, "generic_mod", - gr.io_signature(1, 1, gr.sizeof_char), # Input signature - gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature + # Input signature + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature self._constellation = constellation self._samples_per_symbol = samples_per_symbol @@ -103,31 +105,36 @@ class generic_mod(gr.hier_block2): self.pre_diff_code = pre_diff_code and self._constellation.apply_pre_diff_code() if self._samples_per_symbol < 2: - raise TypeError("sps must be >= 2, is %f" % self._samples_per_symbol) + raise TypeError("sps must be >= 2, is %f" % + self._samples_per_symbol) - arity = pow(2,self.bits_per_symbol()) + arity = pow(2, self.bits_per_symbol()) # turn bytes into k-bit vectors self.bytes2chunks = \ - blocks.packed_to_unpacked_bb(self.bits_per_symbol(), gr.GR_MSB_FIRST) + blocks.packed_to_unpacked_bb( + self.bits_per_symbol(), gr.GR_MSB_FIRST) if self.pre_diff_code: - self.symbol_mapper = digital.map_bb(self._constellation.pre_diff_code()) + self.symbol_mapper = digital.map_bb( + self._constellation.pre_diff_code()) if differential: self.diffenc = digital.diff_encoder_bb(arity) - self.chunks2symbols = digital.chunks_to_symbols_bc(self._constellation.points()) + self.chunks2symbols = digital.chunks_to_symbols_bc( + self._constellation.points()) # pulse shaping filter nfilts = 32 ntaps_per_filt = 11 - ntaps = nfilts * ntaps_per_filt * int(self._samples_per_symbol) # make nfilts filters of ntaps each + # make nfilts filters of ntaps each + ntaps = nfilts * ntaps_per_filt * int(self._samples_per_symbol) self.rrc_taps = filter.firdes.root_raised_cosine( nfilts, # gain nfilts, # sampling rate based on 32 filters in resampler 1.0, # symbol rate - self._excess_bw, # excess bandwidth (roll-off factor) + self._excess_bw, # excess bandwidth (roll-off factor) ntaps) self.rrc_filter = filter.pfb_arb_resampler_ccf(self._samples_per_symbol, self.rrc_taps) @@ -135,8 +142,10 @@ class generic_mod(gr.hier_block2): # Remove the filter transient at the beginning of the transmission if truncate: fsps = float(self._samples_per_symbol) - len_filt_delay = int((ntaps_per_filt*fsps*fsps-fsps)/2.0) # Length of delay through rrc filter - self.skiphead = blocks.skiphead(gr.sizeof_gr_complex*1, len_filt_delay) + # Length of delay through rrc filter + len_filt_delay = int((ntaps_per_filt * fsps * fsps - fsps) / 2.0) + self.skiphead = blocks.skiphead( + gr.sizeof_gr_complex * 1, len_filt_delay) # Connect self._blocks = [self, self.bytes2chunks] @@ -145,7 +154,7 @@ class generic_mod(gr.hier_block2): if differential: self._blocks.append(self.diffenc) self._blocks += [self.chunks2symbols, self.rrc_filter] - + if truncate: self._blocks.append(self.skiphead) self._blocks.append(self) @@ -157,7 +166,6 @@ class generic_mod(gr.hier_block2): if log: self._setup_logging() - def samples_per_symbol(self): return self._samples_per_symbol @@ -176,8 +184,7 @@ class generic_mod(gr.hier_block2): Given command line options, create dictionary suitable for passing to __init__ """ return extract_kwargs_from_options_for_class(cls, options) - extract_kwargs_from_options=classmethod(extract_kwargs_from_options) - + extract_kwargs_from_options = classmethod(extract_kwargs_from_options) def _print_verbage(self): print("\nModulator:") @@ -239,7 +246,8 @@ class generic_demod(gr.hier_block2): log=_def_log): gr.hier_block2.__init__(self, "generic_demod", - gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature + # Input signature + gr.io_signature(1, 1, gr.sizeof_gr_complex), gr.io_signature(1, 1, gr.sizeof_char)) # Output signature self._constellation = constellation @@ -248,19 +256,20 @@ class generic_demod(gr.hier_block2): self._phase_bw = phase_bw self._freq_bw = freq_bw self._timing_bw = timing_bw - self._timing_max_dev= _def_timing_max_dev + self._timing_max_dev = _def_timing_max_dev self._differential = differential if self._samples_per_symbol < 2: - raise TypeError("sps must be >= 2, is %d" % self._samples_per_symbol) + raise TypeError("sps must be >= 2, is %d" % + self._samples_per_symbol) # Only apply a predifferential coding if the constellation also supports it. self.pre_diff_code = pre_diff_code and self._constellation.apply_pre_diff_code() - arity = pow(2,self.bits_per_symbol()) + arity = pow(2, self.bits_per_symbol()) nfilts = 32 - ntaps = 11 * int(self._samples_per_symbol*nfilts) + ntaps = 11 * int(self._samples_per_symbol * nfilts) # Automatic gain control self.agc = analog.agc2_cc(0.6e-1, 1e-3, 1, 1) @@ -271,11 +280,11 @@ class generic_demod(gr.hier_block2): fll_ntaps, self._freq_bw) # symbol timing recovery with RRC data filter - taps = filter.firdes.root_raised_cosine(nfilts, nfilts*self._samples_per_symbol, + taps = filter.firdes.root_raised_cosine(nfilts, nfilts * self._samples_per_symbol, 1.0, self._excess_bw, ntaps) self.time_recov = digital.pfb_clock_sync_ccf(self._samples_per_symbol, self._timing_bw, taps, - nfilts, nfilts//2, self._timing_max_dev) + nfilts, nfilts // 2, self._timing_max_dev) fmin = -0.25 fmax = 0.25 @@ -318,7 +327,7 @@ class generic_demod(gr.hier_block2): def _print_verbage(self): print("\nDemodulator:") - print("bits per symbol: %d" % self.bits_per_symbol()) + print("bits per symbol: %d" % self.bits_per_symbol()) print("RRC roll-off factor: %.2f" % self._excess_bw) print("FLL bandwidth: %.2e" % self._freq_bw) print("Timing bandwidth: %.2e" % self._timing_bw) @@ -381,7 +390,8 @@ class generic_demod(gr.hier_block2): Given command line options, create dictionary suitable for passing to __init__ """ return extract_kwargs_from_options_for_class(cls, options) - extract_kwargs_from_options=classmethod(extract_kwargs_from_options) + extract_kwargs_from_options = classmethod(extract_kwargs_from_options) + shared_demod_args = """ samples_per_symbol: samples per baud >= 2 (float) excess_bw: Root-raised cosine filter excess bandwidth (float) diff --git a/gr-digital/python/digital/gfsk.py b/gr-digital/python/digital/gfsk.py index 2cc8f69c97..63583cc1e0 100644 --- a/gr-digital/python/digital/gfsk.py +++ b/gr-digital/python/digital/gfsk.py @@ -70,8 +70,9 @@ class gfsk_mod(gr.hier_block2): do_unpack=_def_do_unpack): gr.hier_block2.__init__(self, "gfsk_mod", - gr.io_signature(1, 1, gr.sizeof_char), # Input signature - gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature + # Input signature + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature samples_per_symbol = int(samples_per_symbol) self._samples_per_symbol = samples_per_symbol @@ -79,12 +80,11 @@ class gfsk_mod(gr.hier_block2): self._differential = False if not isinstance(samples_per_symbol, int) or samples_per_symbol < 2: - raise TypeError("samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,)) + raise TypeError( + "samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,)) ntaps = 4 * samples_per_symbol # up to 3 bits in filter at once - #sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2 - - + # sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2 # Turn it into NRZ data. #self.nrz = digital.bytes_to_syms() @@ -93,15 +93,17 @@ class gfsk_mod(gr.hier_block2): # Form Gaussian filter # Generate Gaussian response (Needs to be convolved with window below). self.gaussian_taps = filter.firdes.gaussian( - 1.0, # gain - samples_per_symbol, # symbol_rate - bt, # bandwidth * symbol time - ntaps # number of taps - ) + 1.0, # gain + samples_per_symbol, # symbol_rate + bt, # bandwidth * symbol time + ntaps # number of taps + ) self.sqwave = (1,) * samples_per_symbol # rectangular window - self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave)) - self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps) + self.taps = numpy.convolve(numpy.array( + self.gaussian_taps), numpy.array(self.sqwave)) + self.gaussian_filter = filter.interp_fir_filter_fff( + samples_per_symbol, self.taps) # FM modulation self.fmmod = analog.frequency_modulator_fc(sensitivity) @@ -118,22 +120,24 @@ class gfsk_mod(gr.hier_block2): # Connect & Initialize base class if do_unpack: self.unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST) - self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self) + self.connect(self, self.unpack, self.nrz, + self.gaussian_filter, self.fmmod, self.amp, self) else: - self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self) + self.connect(self, self.nrz, self.gaussian_filter, + self.fmmod, self.amp, self) def samples_per_symbol(self): return self._samples_per_symbol @staticmethod - def bits_per_symbol(self=None): # staticmethod that's also callable on an instance + # staticmethod that's also callable on an instance + def bits_per_symbol(self=None): return 1 def _print_verbage(self): print("bits per symbol = %d" % self.bits_per_symbol()) print("Gaussian filter bt = %.2f" % self._bt) - def _setup_logging(self): print("Modulation logging turned on.") self.connect(self.nrz, @@ -197,7 +201,8 @@ class gfsk_demod(gr.hier_block2): log=_def_log): gr.hier_block2.__init__(self, "gfsk_demod", - gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature + # Input signature + gr.io_signature(1, 1, gr.sizeof_gr_complex), gr.io_signature(1, 1, gr.sizeof_char)) # Output signature self._samples_per_symbol = samples_per_symbol @@ -207,17 +212,20 @@ class gfsk_demod(gr.hier_block2): self._differential = False if samples_per_symbol < 2: - raise TypeError("samples_per_symbol >= 2, is %f" % samples_per_symbol) + raise TypeError("samples_per_symbol >= 2, is %f" % + samples_per_symbol) - self._omega = samples_per_symbol*(1+self._freq_error) + self._omega = samples_per_symbol * (1 + self._freq_error) if not self._gain_mu: self._gain_mu = 0.175 - self._gain_omega = .25 * self._gain_mu * self._gain_mu # critically damped + self._gain_omega = .25 * self._gain_mu * \ + self._gain_mu # critically damped self._damping = 1.0 - self._loop_bw = -ln((self._gain_mu + self._gain_omega)/(-2.0) + 1) # critically damped + # critically damped + self._loop_bw = -ln((self._gain_mu + self._gain_omega) / (-2.0) + 1) self._max_dev = self._omega_relative_limit * self._samples_per_symbol # Demodulate FM @@ -227,16 +235,16 @@ class gfsk_demod(gr.hier_block2): # the clock recovery block tracks the symbol clock and resamples as needed. # the output of the block is a stream of soft symbols (float) self.clock_recovery = self.digital_symbol_sync_xx_0 = digital.symbol_sync_ff(digital.TED_MUELLER_AND_MULLER, - self._omega, - self._loop_bw, - self._damping, - 1.0, # Expected TED gain - self._max_dev, - 1, # Output sps - digital.constellation_bpsk().base(), - digital.IR_MMSE_8TAP, - 128, - []) + self._omega, + self._loop_bw, + self._damping, + 1.0, # Expected TED gain + self._max_dev, + 1, # Output sps + digital.constellation_bpsk().base(), + digital.IR_MMSE_8TAP, + 128, + []) # slice the floats at 0, outputting 1 bit (the LSB of the output byte) per sample self.slicer = digital.binary_slicer_fb() @@ -248,7 +256,8 @@ class gfsk_demod(gr.hier_block2): self._setup_logging() # Connect & Initialize base class - self.connect(self, self.fmdemod, self.clock_recovery, self.slicer, self) + self.connect(self, self.fmdemod, + self.clock_recovery, self.slicer, self) def samples_per_symbol(self): return self._samples_per_symbol @@ -262,18 +271,18 @@ class gfsk_demod(gr.hier_block2): print("Symbol Sync M&M omega = %f" % self._omega) print("Symbol Sync M&M gain mu = %f" % self._gain_mu) print("M&M clock recovery mu (Unused) = %f" % self._mu) - print("Symbol Sync M&M omega rel. limit = %f" % self._omega_relative_limit) + print("Symbol Sync M&M omega rel. limit = %f" % + self._omega_relative_limit) print("frequency error = %f" % self._freq_error) - def _setup_logging(self): print("Demodulation logging turned on.") self.connect(self.fmdemod, - blocks.file_sink(gr.sizeof_float, "fmdemod.dat")) + blocks.file_sink(gr.sizeof_float, "fmdemod.dat")) self.connect(self.clock_recovery, - blocks.file_sink(gr.sizeof_float, "clock_recovery.dat")) + blocks.file_sink(gr.sizeof_float, "clock_recovery.dat")) self.connect(self.slicer, - blocks.file_sink(gr.sizeof_char, "slicer.dat")) + blocks.file_sink(gr.sizeof_char, "slicer.dat")) @staticmethod def add_options(parser): diff --git a/gr-digital/python/digital/gmsk.py b/gr-digital/python/digital/gmsk.py index 68127119e6..106e588782 100644 --- a/gr-digital/python/digital/gmsk.py +++ b/gr-digital/python/digital/gmsk.py @@ -68,8 +68,9 @@ class gmsk_mod(gr.hier_block2): do_unpack=_def_do_unpack): gr.hier_block2.__init__(self, "gmsk_mod", - gr.io_signature(1, 1, gr.sizeof_char), # Input signature - gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature + # Input signature + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature samples_per_symbol = int(samples_per_symbol) self._samples_per_symbol = samples_per_symbol @@ -77,10 +78,13 @@ class gmsk_mod(gr.hier_block2): self._differential = False if not isinstance(samples_per_symbol, int) or samples_per_symbol < 2: - raise TypeError("samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,)) + raise TypeError( + "samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,)) - ntaps = 4 * samples_per_symbol # up to 3 bits in filter at once - sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2 + # up to 3 bits in filter at once + ntaps = 4 * samples_per_symbol + # phase change per bit = pi / 2 + sensitivity = (pi / 2) / samples_per_symbol # Turn it into NRZ data. #self.nrz = digital.bytes_to_syms() @@ -89,15 +93,17 @@ class gmsk_mod(gr.hier_block2): # Form Gaussian filter # Generate Gaussian response (Needs to be convolved with window below). self.gaussian_taps = filter.firdes.gaussian( - 1, # gain - samples_per_symbol, # symbol_rate - bt, # bandwidth * symbol time - ntaps # number of taps - ) + 1, # gain + samples_per_symbol, # symbol_rate + bt, # bandwidth * symbol time + ntaps # number of taps + ) self.sqwave = (1,) * samples_per_symbol # rectangular window - self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave)) - self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps) + self.taps = numpy.convolve(numpy.array( + self.gaussian_taps), numpy.array(self.sqwave)) + self.gaussian_filter = filter.interp_fir_filter_fff( + samples_per_symbol, self.taps) # FM modulation self.fmmod = analog.frequency_modulator_fc(sensitivity) @@ -111,22 +117,24 @@ class gmsk_mod(gr.hier_block2): # Connect & Initialize base class if do_unpack: self.unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST) - self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self) + self.connect(self, self.unpack, self.nrz, + self.gaussian_filter, self.fmmod, self) else: - self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self) + self.connect(self, self.nrz, self.gaussian_filter, + self.fmmod, self) def samples_per_symbol(self): return self._samples_per_symbol @staticmethod - def bits_per_symbol(self=None): # staticmethod that's also callable on an instance + # staticmethod that's also callable on an instance + def bits_per_symbol(self=None): return 1 def _print_verbage(self): print("bits per symbol = %d" % self.bits_per_symbol()) print("Gaussian filter bt = %.2f" % self._bt) - def _setup_logging(self): print("Modulation logging turned on.") self.connect(self.nrz, @@ -185,7 +193,8 @@ class gmsk_demod(gr.hier_block2): log=_def_log): gr.hier_block2.__init__(self, "gmsk_demod", - gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature + # Input signature + gr.io_signature(1, 1, gr.sizeof_gr_complex), gr.io_signature(1, 1, gr.sizeof_char)) # Output signature self._samples_per_symbol = samples_per_symbol @@ -195,17 +204,20 @@ class gmsk_demod(gr.hier_block2): self._differential = False if samples_per_symbol < 2: - raise TypeError("samples_per_symbol >= 2, is %f" % samples_per_symbol) + raise TypeError("samples_per_symbol >= 2, is %f" % + samples_per_symbol) - self._omega = samples_per_symbol*(1+self._freq_error) + self._omega = samples_per_symbol * (1 + self._freq_error) if not self._gain_mu: self._gain_mu = 0.175 - self._gain_omega = .25 * self._gain_mu * self._gain_mu # critically damped + self._gain_omega = .25 * self._gain_mu * \ + self._gain_mu # critically damped self._damping = 1.0 - self._loop_bw = -ln((self._gain_mu + self._gain_omega)/(-2.0) + 1) # critically damped + # critically damped + self._loop_bw = -ln((self._gain_mu + self._gain_omega) / (-2.0) + 1) self._max_dev = self._omega_relative_limit * self._samples_per_symbol # Demodulate FM @@ -215,16 +227,16 @@ class gmsk_demod(gr.hier_block2): # the clock recovery block tracks the symbol clock and resamples as needed. # the output of the block is a stream of soft symbols (float) self.clock_recovery = self.digital_symbol_sync_xx_0 = digital.symbol_sync_ff(digital.TED_MUELLER_AND_MULLER, - self._omega, - self._loop_bw, - self._damping, - 1.0, # Expected TED gain - self._max_dev, - 1, # Output sps - digital.constellation_bpsk().base(), - digital.IR_MMSE_8TAP, - 128, - []) + self._omega, + self._loop_bw, + self._damping, + 1.0, # Expected TED gain + self._max_dev, + 1, # Output sps + digital.constellation_bpsk().base(), + digital.IR_MMSE_8TAP, + 128, + []) # slice the floats at 0, outputting 1 bit (the LSB of the output byte) per sample self.slicer = digital.binary_slicer_fb() @@ -236,7 +248,8 @@ class gmsk_demod(gr.hier_block2): self._setup_logging() # Connect & Initialize base class - self.connect(self, self.fmdemod, self.clock_recovery, self.slicer, self) + self.connect(self, self.fmdemod, + self.clock_recovery, self.slicer, self) def samples_per_symbol(self): return self._samples_per_symbol @@ -250,18 +263,18 @@ class gmsk_demod(gr.hier_block2): print("Symbol Sync M&M omega = %f" % self._omega) print("Symbol Sync M&M gain mu = %f" % self._gain_mu) print("M&M clock recovery mu (Unused) = %f" % self._mu) - print("Symbol Sync M&M omega rel. limit = %f" % self._omega_relative_limit) + print("Symbol Sync M&M omega rel. limit = %f" % + self._omega_relative_limit) print("frequency error = %f" % self._freq_error) - def _setup_logging(self): print("Demodulation logging turned on.") self.connect(self.fmdemod, - blocks.file_sink(gr.sizeof_float, "fmdemod.dat")) + blocks.file_sink(gr.sizeof_float, "fmdemod.dat")) self.connect(self.clock_recovery, - blocks.file_sink(gr.sizeof_float, "clock_recovery.dat")) + blocks.file_sink(gr.sizeof_float, "clock_recovery.dat")) self.connect(self.slicer, - blocks.file_sink(gr.sizeof_char, "slicer.dat")) + blocks.file_sink(gr.sizeof_char, "slicer.dat")) @staticmethod def add_options(parser): diff --git a/gr-digital/python/digital/modulation_utils.py b/gr-digital/python/digital/modulation_utils.py index 59809ae1d8..1d57e1c9ba 100644 --- a/gr-digital/python/digital/modulation_utils.py +++ b/gr-digital/python/digital/modulation_utils.py @@ -1,8 +1,8 @@ # # Copyright 2010 Free Software Foundation, Inc. -# +# # This file is part of GNU Radio -# +# # SPDX-License-Identifier: GPL-3.0-or-later # @@ -17,9 +17,11 @@ import inspect # Type 1 modulators accept a stream of bytes on their input and produce complex baseband output _type_1_modulators = {} + def type_1_mods(): return _type_1_modulators + def add_type_1_mod(name, mod_class): _type_1_modulators[name] = mod_class @@ -29,18 +31,23 @@ def add_type_1_mod(name, mod_class): # to resolve phase or polarity ambiguities. _type_1_demodulators = {} + def type_1_demods(): return _type_1_demodulators + def add_type_1_demod(name, demod_class): _type_1_demodulators[name] = demod_class + # Also record the constellation making functions of the modulations _type_1_constellations = {} + def type_1_constellations(): return _type_1_constellations + def add_type_1_constellation(name, constellation): _type_1_constellations[name] = constellation @@ -67,7 +74,7 @@ def extract_kwargs_from_options(function, excluded_args, options): excluded_args: function arguments that are NOT to be added to the dictionary (sequence of strings) options: result of command argument parsing (optparse.Values) """ - + # Try this in C++ ;) spec = inspect.getfullargspec(function) d = {} @@ -77,6 +84,7 @@ def extract_kwargs_from_options(function, excluded_args, options): d[kw] = getattr(options, kw) return d + def extract_kwargs_from_options_for_class(cls, options): """ Given command line options, create dictionary suitable for passing to __init__ diff --git a/gr-digital/python/digital/ofdm_txrx.py b/gr-digital/python/digital/ofdm_txrx.py index 9d083c5fe2..60109ceaaa 100644 --- a/gr-digital/python/digital/ofdm_txrx.py +++ b/gr-digital/python/digital/ofdm_txrx.py @@ -31,15 +31,20 @@ _def_frame_length_tag_key = "frame_length" _def_packet_length_tag_key = "packet_length" _def_packet_num_tag_key = "packet_num" # Data and pilot carriers are same as in 802.11a -_def_occupied_carriers = (list(range(-26, -21)) + list(range(-20, -7)) + list(range(-6, 0)) + list(range(1, 7)) + list(range(8, 21)) + list(range(22, 27)),) -_def_pilot_carriers=((-21, -7, 7, 21,),) +_def_occupied_carriers = (list(range(-26, -21)) + list(range(-20, -7)) + list( + range(-6, 0)) + list(range(1, 7)) + list(range(8, 21)) + list(range(22, 27)),) +_def_pilot_carriers = ((-21, -7, 7, 21,),) _pilot_sym_scramble_seq = ( - 1,1,1,1, -1,-1,-1,1, -1,-1,-1,-1, 1,1,-1,1, -1,-1,1,1, -1,1,1,-1, 1,1,1,1, 1,1,-1,1, - 1,1,-1,1, 1,-1,-1,1, 1,1,-1,1, -1,-1,-1,1, -1,1,-1,-1, 1,-1,-1,1, 1,1,1,1, -1,-1,1,1, - -1,-1,1,-1, 1,-1,1,1, -1,-1,-1,1, 1,-1,-1,-1, -1,1,-1,-1, 1,-1,1,1, 1,1,-1,1, -1,1,-1,1, - -1,-1,-1,-1, -1,1,-1,1, 1,-1,1,-1, 1,1,1,-1, -1,1,-1,-1, -1,1,1,1, -1,-1,-1,-1, -1,-1,-1 + 1, 1, 1, 1, -1, -1, -1, 1, -1, -1, -1, -1, 1, 1, -1, 1, - + 1, -1, 1, 1, -1, 1, 1, -1, 1, 1, 1, 1, 1, 1, -1, 1, + 1, 1, -1, 1, 1, -1, -1, 1, 1, 1, -1, 1, -1, -1, -1, 1, - + 1, 1, -1, -1, 1, -1, -1, 1, 1, 1, 1, 1, -1, -1, 1, 1, + -1, -1, 1, -1, 1, -1, 1, 1, -1, -1, -1, 1, 1, -1, -1, - + 1, -1, 1, -1, -1, 1, -1, 1, 1, 1, 1, -1, 1, -1, 1, -1, 1, + -1, -1, -1, -1, -1, 1, -1, 1, 1, -1, 1, -1, 1, 1, 1, -1, - + 1, 1, -1, -1, -1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1 ) -_def_pilot_symbols= tuple([(x, x, x, -x) for x in _pilot_sym_scramble_seq]) +_def_pilot_symbols = tuple([(x, x, x, -x) for x in _pilot_sym_scramble_seq]) _seq_seed = 42 @@ -52,6 +57,7 @@ def _get_active_carriers(fft_len, occupied_carriers, pilot_carriers): active_carriers.append(carrier) return active_carriers + def _make_sync_word1(fft_len, occupied_carriers, pilot_carriers): """ Creates a random sync sequence for fine frequency offset and timing estimation. This is the first of typically two sync preamble symbols @@ -64,31 +70,37 @@ def _make_sync_word1(fft_len, occupied_carriers, pilot_carriers): Carrier 0 (DC carrier) is always zero. If used, carrier 1 is non-zero. This means the sync algorithm has to check on odd carriers! """ - active_carriers = _get_active_carriers(fft_len, occupied_carriers, pilot_carriers) + active_carriers = _get_active_carriers( + fft_len, occupied_carriers, pilot_carriers) numpy.random.seed(_seq_seed) bpsk = {0: numpy.sqrt(2), 1: -numpy.sqrt(2)} - sw1 = [bpsk[numpy.random.randint(2)] if x in active_carriers and x % 2 else 0 for x in range(fft_len)] + sw1 = [bpsk[numpy.random.randint( + 2)] if x in active_carriers and x % 2 else 0 for x in range(fft_len)] return numpy.fft.fftshift(sw1) + def _make_sync_word2(fft_len, occupied_carriers, pilot_carriers): """ Creates a random sync sequence for coarse frequency offset and channel estimation. This is the second of typically two sync preamble symbols for the Schmidl & Cox sync algorithm. Symbols are always BPSK symbols. """ - active_carriers = _get_active_carriers(fft_len, occupied_carriers, pilot_carriers) + active_carriers = _get_active_carriers( + fft_len, occupied_carriers, pilot_carriers) numpy.random.seed(_seq_seed) bpsk = {0: 1, 1: -1} - sw2 = [bpsk[numpy.random.randint(2)] if x in active_carriers else 0 for x in range(fft_len)] + sw2 = [bpsk[numpy.random.randint( + 2)] if x in active_carriers else 0 for x in range(fft_len)] sw2[0] = 0j return numpy.fft.fftshift(sw2) + def _get_constellation(bps): """ Returns a modulator block for a given number of bits per symbol """ constellation = { - 1: digital.constellation_bpsk(), - 2: digital.constellation_qpsk(), - 3: digital.constellation_8psk() + 1: digital.constellation_bpsk(), + 2: digital.constellation_qpsk(), + 3: digital.constellation_8psk() } try: return constellation[bps] @@ -96,6 +108,7 @@ def _get_constellation(bps): print('Modulation not supported.') exit(1) + class ofdm_tx(gr.hier_block2): """Hierarchical block for OFDM modulation. @@ -122,6 +135,7 @@ class ofdm_tx(gr.hier_block2): scramble_bits: Activates the scramblers (set this to True unless debugging) """ + def __init__(self, fft_len=_def_fft_len, cp_len=_def_cp_len, packet_length_tag_key=_def_packet_length_tag_key, occupied_carriers=_def_occupied_carriers, @@ -136,75 +150,82 @@ class ofdm_tx(gr.hier_block2): scramble_bits=False ): gr.hier_block2.__init__(self, "ofdm_tx", - gr.io_signature(1, 1, gr.sizeof_char), - gr.io_signature(1, 1, gr.sizeof_gr_complex)) + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(1, 1, gr.sizeof_gr_complex)) ### Param init / sanity check ######################################## - self.fft_len = fft_len - self.cp_len = cp_len + self.fft_len = fft_len + self.cp_len = cp_len self.packet_length_tag_key = packet_length_tag_key self.occupied_carriers = occupied_carriers - self.pilot_carriers = pilot_carriers - self.pilot_symbols = pilot_symbols - self.bps_header = bps_header - self.bps_payload = bps_payload + self.pilot_carriers = pilot_carriers + self.pilot_symbols = pilot_symbols + self.bps_header = bps_header + self.bps_payload = bps_payload self.sync_word1 = sync_word1 if sync_word1 is None: - self.sync_word1 = _make_sync_word1(fft_len, occupied_carriers, pilot_carriers) + self.sync_word1 = _make_sync_word1( + fft_len, occupied_carriers, pilot_carriers) else: if len(sync_word1) != self.fft_len: - raise ValueError("Length of sync sequence(s) must be FFT length.") - self.sync_words = [self.sync_word1,] + raise ValueError( + "Length of sync sequence(s) must be FFT length.") + self.sync_words = [self.sync_word1, ] if sync_word2 is None: - self.sync_word2 = _make_sync_word2(fft_len, occupied_carriers, pilot_carriers) + self.sync_word2 = _make_sync_word2( + fft_len, occupied_carriers, pilot_carriers) else: self.sync_word2 = sync_word2 if len(self.sync_word2): if len(self.sync_word2) != fft_len: - raise ValueError("Length of sync sequence(s) must be FFT length.") + raise ValueError( + "Length of sync sequence(s) must be FFT length.") self.sync_word2 = list(self.sync_word2) self.sync_words.append(self.sync_word2) if scramble_bits: self.scramble_seed = 0x7f else: - self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros + self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros ### Header modulation ################################################ crc = digital.crc32_bb(False, self.packet_length_tag_key) - header_constellation = _get_constellation(bps_header) - header_mod = digital.chunks_to_symbols_bc(header_constellation.points()) + header_constellation = _get_constellation(bps_header) + header_mod = digital.chunks_to_symbols_bc( + header_constellation.points()) formatter_object = digital.packet_header_ofdm( occupied_carriers=occupied_carriers, n_syms=1, bits_per_header_sym=self.bps_header, bits_per_payload_sym=self.bps_payload, scramble_header=scramble_bits ) - header_gen = digital.packet_headergenerator_bb(formatter_object.base(), self.packet_length_tag_key) + header_gen = digital.packet_headergenerator_bb( + formatter_object.base(), self.packet_length_tag_key) header_payload_mux = blocks.tagged_stream_mux( - itemsize=gr.sizeof_gr_complex*1, - lengthtagname=self.packet_length_tag_key, - tag_preserve_head_pos=1 # Head tags on the payload stream stay on the head + itemsize=gr.sizeof_gr_complex * 1, + lengthtagname=self.packet_length_tag_key, + tag_preserve_head_pos=1 # Head tags on the payload stream stay on the head ) self.connect( - self, - crc, - header_gen, - header_mod, - (header_payload_mux, 0) + self, + crc, + header_gen, + header_mod, + (header_payload_mux, 0) ) if debug_log: self.connect(header_gen, blocks.file_sink(1, 'tx-hdr.dat')) ### Payload modulation ############################################### payload_constellation = _get_constellation(bps_payload) - payload_mod = digital.chunks_to_symbols_bc(payload_constellation.points()) + payload_mod = digital.chunks_to_symbols_bc( + payload_constellation.points()) payload_scrambler = digital.additive_scrambler_bb( 0x8a, self.scramble_seed, 7, - 0, # Don't reset after fixed length (let the reset tag do that) - bits_per_byte=8, # This is before unpacking + 0, # Don't reset after fixed length (let the reset tag do that) + bits_per_byte=8, # This is before unpacking reset_tag_key=self.packet_length_tag_key ) payload_unpack = blocks.repack_bits_bb( - 8, # Unpack 8 bits per byte + 8, # Unpack 8 bits per byte bps_payload, self.packet_length_tag_key ) @@ -225,21 +246,24 @@ class ofdm_tx(gr.hier_block2): len_tag_key=self.packet_length_tag_key ) ffter = fft.fft_vcc( - self.fft_len, - False, # Inverse FFT - (), # No window - True # Shift + self.fft_len, + False, # Inverse FFT + (), # No window + True # Shift ) cyclic_prefixer = digital.ofdm_cyclic_prefixer( self.fft_len, - self.fft_len+self.cp_len, + self.fft_len + self.cp_len, rolloff, self.packet_length_tag_key ) - self.connect(header_payload_mux, allocator, ffter, cyclic_prefixer, self) + self.connect(header_payload_mux, allocator, + ffter, cyclic_prefixer, self) if debug_log: - self.connect(allocator, blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'tx-post-allocator.dat')) - self.connect(cyclic_prefixer, blocks.file_sink(gr.sizeof_gr_complex, 'tx-signal.dat')) + self.connect(allocator, blocks.file_sink( + gr.sizeof_gr_complex * fft_len, 'tx-post-allocator.dat')) + self.connect(cyclic_prefixer, blocks.file_sink( + gr.sizeof_gr_complex, 'tx-signal.dat')) class ofdm_rx(gr.hier_block2): @@ -265,6 +289,7 @@ class ofdm_rx(gr.hier_block2): | entirely. Also used for coarse frequency offset and | channel estimation. """ + def __init__(self, fft_len=_def_fft_len, cp_len=_def_cp_len, frame_length_tag_key=_def_frame_length_tag_key, packet_length_tag_key=_def_packet_length_tag_key, @@ -280,60 +305,69 @@ class ofdm_rx(gr.hier_block2): scramble_bits=False ): gr.hier_block2.__init__(self, "ofdm_rx", - gr.io_signature(1, 1, gr.sizeof_gr_complex), - gr.io_signature(1, 1, gr.sizeof_char)) + gr.io_signature(1, 1, gr.sizeof_gr_complex), + gr.io_signature(1, 1, gr.sizeof_char)) ### Param init / sanity check ######################################## - self.fft_len = fft_len - self.cp_len = cp_len - self.frame_length_tag_key = frame_length_tag_key - self.packet_length_tag_key = packet_length_tag_key + self.fft_len = fft_len + self.cp_len = cp_len + self.frame_length_tag_key = frame_length_tag_key + self.packet_length_tag_key = packet_length_tag_key self.occupied_carriers = occupied_carriers - self.bps_header = bps_header - self.bps_payload = bps_payload + self.bps_header = bps_header + self.bps_payload = bps_payload n_sync_words = 1 if sync_word1 is None: - self.sync_word1 = _make_sync_word1(fft_len, occupied_carriers, pilot_carriers) + self.sync_word1 = _make_sync_word1( + fft_len, occupied_carriers, pilot_carriers) else: if len(sync_word1) != self.fft_len: - raise ValueError("Length of sync sequence(s) must be FFT length.") + raise ValueError( + "Length of sync sequence(s) must be FFT length.") self.sync_word1 = sync_word1 self.sync_word2 = () if sync_word2 is None: - self.sync_word2 = _make_sync_word2(fft_len, occupied_carriers, pilot_carriers) + self.sync_word2 = _make_sync_word2( + fft_len, occupied_carriers, pilot_carriers) n_sync_words = 2 elif len(sync_word2): if len(sync_word2) != fft_len: - raise ValueError("Length of sync sequence(s) must be FFT length.") + raise ValueError( + "Length of sync sequence(s) must be FFT length.") self.sync_word2 = sync_word2 n_sync_words = 2 if scramble_bits: self.scramble_seed = 0x7f else: - self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros + self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros ### Sync ############################################################ sync_detect = digital.ofdm_sync_sc_cfb(fft_len, cp_len) - delay = blocks.delay(gr.sizeof_gr_complex, fft_len+cp_len) + delay = blocks.delay(gr.sizeof_gr_complex, fft_len + cp_len) oscillator = analog.frequency_modulator_fc(-2.0 / fft_len) mixer = blocks.multiply_cc() hpd = digital.header_payload_demux( - n_sync_words+1, # Number of OFDM symbols before payload (sync + 1 sym header) + # Number of OFDM symbols before payload (sync + 1 sym header) + n_sync_words + 1, fft_len, cp_len, # FFT length, guard interval - frame_length_tag_key, # Frame length tag key + frame_length_tag_key, # Frame length tag key "", # We're not using trigger tags - True # One output item is one OFDM symbol (False would output complex scalars) + # One output item is one OFDM symbol (False would output complex scalars) + True ) self.connect(self, sync_detect) self.connect(self, delay, (mixer, 0), (hpd, 0)) self.connect((sync_detect, 0), oscillator, (mixer, 1)) self.connect((sync_detect, 1), (hpd, 1)) if debug_log: - self.connect((sync_detect, 0), blocks.file_sink(gr.sizeof_float, 'freq-offset.dat')) - self.connect((sync_detect, 1), blocks.file_sink(gr.sizeof_char, 'sync-detect.dat')) + self.connect((sync_detect, 0), blocks.file_sink( + gr.sizeof_float, 'freq-offset.dat')) + self.connect((sync_detect, 1), blocks.file_sink( + gr.sizeof_char, 'sync-detect.dat')) ### Header demodulation ############################################## - header_fft = fft.fft_vcc(self.fft_len, True, (), True) - chanest = digital.ofdm_chanest_vcvc(self.sync_word1, self.sync_word2, 1) + header_fft = fft.fft_vcc(self.fft_len, True, (), True) + chanest = digital.ofdm_chanest_vcvc( + self.sync_word1, self.sync_word2, 1) header_constellation = _get_constellation(bps_header) - header_equalizer = digital.ofdm_equalizer_simpledfe( + header_equalizer = digital.ofdm_equalizer_simpledfe( fft_len, header_constellation.base(), occupied_carriers, @@ -342,93 +376,108 @@ class ofdm_rx(gr.hier_block2): symbols_skipped=0, ) header_eq = digital.ofdm_frame_equalizer_vcvc( - header_equalizer.base(), - cp_len, - self.frame_length_tag_key, - True, - 1 # Header is 1 symbol long + header_equalizer.base(), + cp_len, + self.frame_length_tag_key, + True, + 1 # Header is 1 symbol long ) header_serializer = digital.ofdm_serializer_vcc( - fft_len, occupied_carriers, - self.frame_length_tag_key + fft_len, occupied_carriers, + self.frame_length_tag_key ) - header_demod = digital.constellation_decoder_cb(header_constellation.base()) + header_demod = digital.constellation_decoder_cb( + header_constellation.base()) header_formatter = digital.packet_header_ofdm( - occupied_carriers, 1, - packet_length_tag_key, - frame_length_tag_key, - packet_num_tag_key, - bps_header, - bps_payload, - scramble_header=scramble_bits + occupied_carriers, 1, + packet_length_tag_key, + frame_length_tag_key, + packet_num_tag_key, + bps_header, + bps_payload, + scramble_header=scramble_bits ) - header_parser = digital.packet_headerparser_b(header_formatter.formatter()) + header_parser = digital.packet_headerparser_b( + header_formatter.formatter()) self.connect( - (hpd, 0), - header_fft, - chanest, - header_eq, - header_serializer, - header_demod, - header_parser + (hpd, 0), + header_fft, + chanest, + header_eq, + header_serializer, + header_demod, + header_parser ) self.msg_connect(header_parser, "header_data", hpd, "header_data") if debug_log: - self.connect((chanest, 1), blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'channel-estimate.dat')) - self.connect((chanest, 0), blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest.dat')) - self.connect((chanest, 0), blocks.tag_debug(gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest')) - self.connect(header_eq, blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'post-hdr-eq.dat')) - self.connect(header_serializer, blocks.file_sink(gr.sizeof_gr_complex, 'post-hdr-serializer.dat')) + self.connect((chanest, 1), blocks.file_sink( + gr.sizeof_gr_complex * fft_len, 'channel-estimate.dat')) + self.connect((chanest, 0), blocks.file_sink( + gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest.dat')) + self.connect((chanest, 0), blocks.tag_debug( + gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest')) + self.connect(header_eq, blocks.file_sink( + gr.sizeof_gr_complex * fft_len, 'post-hdr-eq.dat')) + self.connect(header_serializer, blocks.file_sink( + gr.sizeof_gr_complex, 'post-hdr-serializer.dat')) ### Payload demod #################################################### payload_fft = fft.fft_vcc(self.fft_len, True, (), True) payload_constellation = _get_constellation(bps_payload) payload_equalizer = digital.ofdm_equalizer_simpledfe( - fft_len, - payload_constellation.base(), - occupied_carriers, - pilot_carriers, - pilot_symbols, - symbols_skipped=1, # (that was already in the header) - alpha=0.1 + fft_len, + payload_constellation.base(), + occupied_carriers, + pilot_carriers, + pilot_symbols, + symbols_skipped=1, # (that was already in the header) + alpha=0.1 ) payload_eq = digital.ofdm_frame_equalizer_vcvc( - payload_equalizer.base(), - cp_len, - self.frame_length_tag_key + payload_equalizer.base(), + cp_len, + self.frame_length_tag_key ) payload_serializer = digital.ofdm_serializer_vcc( - fft_len, occupied_carriers, - self.frame_length_tag_key, - self.packet_length_tag_key, - 1 # Skip 1 symbol (that was already in the header) + fft_len, occupied_carriers, + self.frame_length_tag_key, + self.packet_length_tag_key, + 1 # Skip 1 symbol (that was already in the header) ) - payload_demod = digital.constellation_decoder_cb(payload_constellation.base()) + payload_demod = digital.constellation_decoder_cb( + payload_constellation.base()) self.payload_descrambler = digital.additive_scrambler_bb( 0x8a, self.scramble_seed, 7, - 0, # Don't reset after fixed length - bits_per_byte=8, # This is after packing + 0, # Don't reset after fixed length + bits_per_byte=8, # This is after packing reset_tag_key=self.packet_length_tag_key ) - payload_pack = blocks.repack_bits_bb(bps_payload, 8, self.packet_length_tag_key, True) + payload_pack = blocks.repack_bits_bb( + bps_payload, 8, self.packet_length_tag_key, True) self.crc = digital.crc32_bb(True, self.packet_length_tag_key) self.connect( - (hpd, 1), - payload_fft, - payload_eq, - payload_serializer, - payload_demod, - payload_pack, - self.payload_descrambler, - self.crc, - self + (hpd, 1), + payload_fft, + payload_eq, + payload_serializer, + payload_demod, + payload_pack, + self.payload_descrambler, + self.crc, + self ) if debug_log: - self.connect((hpd, 1), blocks.tag_debug(gr.sizeof_gr_complex*fft_len, 'post-hpd')) - self.connect(payload_fft, blocks.file_sink(gr.sizeof_gr_complex*fft_len, 'post-payload-fft.dat')) - self.connect(payload_eq, blocks.file_sink(gr.sizeof_gr_complex*fft_len, 'post-payload-eq.dat')) - self.connect(payload_serializer, blocks.file_sink(gr.sizeof_gr_complex, 'post-payload-serializer.dat')) - self.connect(payload_demod, blocks.file_sink(1, 'post-payload-demod.dat')) - self.connect(payload_pack, blocks.file_sink(1, 'post-payload-pack.dat')) - self.connect(self.crc, blocks.file_sink(1, 'post-payload-crc.dat')) + self.connect((hpd, 1), blocks.tag_debug( + gr.sizeof_gr_complex * fft_len, 'post-hpd')) + self.connect(payload_fft, blocks.file_sink( + gr.sizeof_gr_complex * fft_len, 'post-payload-fft.dat')) + self.connect(payload_eq, blocks.file_sink( + gr.sizeof_gr_complex * fft_len, 'post-payload-eq.dat')) + self.connect(payload_serializer, blocks.file_sink( + gr.sizeof_gr_complex, 'post-payload-serializer.dat')) + self.connect(payload_demod, blocks.file_sink( + 1, 'post-payload-demod.dat')) + self.connect(payload_pack, blocks.file_sink( + 1, 'post-payload-pack.dat')) + self.connect(self.crc, blocks.file_sink(1, 'post-payload-crc.dat')) diff --git a/gr-digital/python/digital/psk.py b/gr-digital/python/digital/psk.py index b04d66f75f..4866d54ede 100644 --- a/gr-digital/python/digital/psk.py +++ b/gr-digital/python/digital/psk.py @@ -28,6 +28,7 @@ _def_mod_code = mod_codes.GRAY_CODE # Default use of differential encoding _def_differential = True + def create_encodings(mod_code, arity, differential): post_diff_code = None if mod_code not in mod_codes.codes: @@ -43,13 +44,15 @@ def create_encodings(mod_code, arity, differential): pre_diff_code = [] post_diff_code = None else: - raise ValueError('That modulation code is not implemented for this constellation.') + raise ValueError( + 'That modulation code is not implemented for this constellation.') return (pre_diff_code, post_diff_code) # ///////////////////////////////////////////////////////////////////////////// # PSK constellation # ///////////////////////////////////////////////////////////////////////////// + def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code, differential=_def_differential): """ @@ -57,8 +60,9 @@ def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code, """ k = log(m) / log(2.0) if (k != int(k)): - raise Exception('Number of constellation points must be a power of two.') - points = [exp(2*pi*(0+1j)*i/m) for i in range(0,m)] + raise Exception( + 'Number of constellation points must be a power of two.') + points = [exp(2 * pi * (0 + 1j) * i / m) for i in range(0, m)] pre_diff_code, post_diff_code = create_encodings(mod_code, m, differential) if post_diff_code is not None: inverse_post_diff_code = mod_codes.invert_code(post_diff_code) @@ -70,6 +74,7 @@ def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code, # PSK modulator # ///////////////////////////////////////////////////////////////////////////// + class psk_mod(generic_mod): """ Hierarchical block for RRC-filtered PSK modulation. @@ -95,14 +100,17 @@ class psk_mod(generic_mod): mod_code=_def_mod_code, differential=_def_differential, *args, **kwargs): - constellation = psk_constellation(constellation_points, mod_code, differential) - super(psk_mod, self).__init__(constellation, differential, *args, **kwargs) + constellation = psk_constellation( + constellation_points, mod_code, differential) + super(psk_mod, self).__init__( + constellation, differential, *args, **kwargs) # ///////////////////////////////////////////////////////////////////////////// # PSK demodulator # # ///////////////////////////////////////////////////////////////////////////// + class psk_demod(generic_demod): """ @@ -120,12 +128,16 @@ class psk_demod(generic_demod): """ # See generic_mod for additional arguments __doc__ += shared_mod_args + def __init__(self, constellation_points=_def_constellation_points, mod_code=_def_mod_code, differential=_def_differential, *args, **kwargs): - constellation = psk_constellation(constellation_points, mod_code, differential) - super(psk_demod, self).__init__(constellation, differential, *args, **kwargs) + constellation = psk_constellation( + constellation_points, mod_code, differential) + super(psk_demod, self).__init__( + constellation, differential, *args, **kwargs) + # # Add these to the mod/demod registry diff --git a/gr-digital/python/digital/psk_constellations.py b/gr-digital/python/digital/psk_constellations.py index c3809e7ade..520eafebe2 100644 --- a/gr-digital/python/digital/psk_constellations.py +++ b/gr-digital/python/digital/psk_constellations.py @@ -52,6 +52,7 @@ rotations are based off of. # BPSK Constellation Mappings + def psk_2_0x0(): ''' 0 | 1 @@ -59,9 +60,12 @@ def psk_2_0x0(): const_points = [-1, 1] symbols = [0, 1] return (const_points, symbols) + + psk_2 = psk_2_0x0 # Basic BPSK rotation psk_2_0 = psk_2 # First ID for BPSK rotations + def psk_2_0x1(): ''' 1 | 0 @@ -69,6 +73,8 @@ def psk_2_0x1(): const_points = [-1, 1] symbols = [1, 0] return (const_points, symbols) + + psk_2_1 = psk_2_0x1 @@ -81,18 +87,23 @@ def sd_psk_2_0x0(x, Es=1): 0 | 1 ''' x_re = x.real - dist = Es*numpy.sqrt(2) - return [dist*x_re,] + dist = Es * numpy.sqrt(2) + return [dist * x_re, ] + + sd_psk_2 = sd_psk_2_0x0 # Basic BPSK rotation sd_psk_2_0 = sd_psk_2 # First ID for BPSK rotations + def sd_psk_2_0x1(x, Es=1): ''' 1 | 0 ''' - x_re = [x.real,] - dist = Es*numpy.sqrt(2) - return -dist*x_re + x_re = [x.real, ] + dist = Es * numpy.sqrt(2) + return -dist * x_re + + sd_psk_2_1 = sd_psk_2_0x1 @@ -106,13 +117,16 @@ def psk_4_0x0_0_1(): | ------- | 00 | 01 ''' - const_points = [-1-1j, 1-1j, - -1+1j, 1+1j] + const_points = [-1 - 1j, 1 - 1j, + -1 + 1j, 1 + 1j] symbols = [0, 1, 2, 3] return (const_points, symbols) + + psk_4 = psk_4_0x0_0_1 psk_4_0 = psk_4 + def psk_4_0x1_0_1(): ''' | 11 | 10 @@ -122,8 +136,11 @@ def psk_4_0x1_0_1(): k = 0x1 pi = [0, 1] return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi) + + psk_4_1 = psk_4_0x1_0_1 + def psk_4_0x2_0_1(): ''' | 00 | 01 @@ -133,8 +150,11 @@ def psk_4_0x2_0_1(): k = 0x2 pi = [0, 1] return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi) + + psk_4_2 = psk_4_0x2_0_1 + def psk_4_0x3_0_1(): ''' | 01 | 00 @@ -144,8 +164,11 @@ def psk_4_0x3_0_1(): k = 0x3 pi = [0, 1] return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi) + + psk_4_3 = psk_4_0x3_0_1 + def psk_4_0x0_1_0(): ''' | 01 | 11 @@ -155,8 +178,11 @@ def psk_4_0x0_1_0(): k = 0x0 pi = [1, 0] return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi) + + psk_4_4 = psk_4_0x0_1_0 + def psk_4_0x1_1_0(): ''' | 00 | 10 @@ -166,8 +192,11 @@ def psk_4_0x1_1_0(): k = 0x1 pi = [1, 0] return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi) + + psk_4_5 = psk_4_0x1_1_0 + def psk_4_0x2_1_0(): ''' | 11 | 01 @@ -177,8 +206,11 @@ def psk_4_0x2_1_0(): k = 0x2 pi = [1, 0] return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi) + + psk_4_6 = psk_4_0x2_1_0 + def psk_4_0x3_1_0(): ''' | 10 | 00 @@ -188,9 +220,10 @@ def psk_4_0x3_1_0(): k = 0x3 pi = [1, 0] return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi) -psk_4_7 = psk_4_0x3_1_0 +psk_4_7 = psk_4_0x3_1_0 + ############################################################ # QPSK Constellation Softbit LUT generators @@ -204,11 +237,14 @@ def sd_psk_4_0x0_0_1(x, Es=1): ''' x_re = x.real x_im = x.imag - dist = Es*numpy.sqrt(2) - return [dist*x_im, dist*x_re] + dist = Es * numpy.sqrt(2) + return [dist * x_im, dist * x_re] + + sd_psk_4 = sd_psk_4_0x0_0_1 sd_psk_4_0 = sd_psk_4 + def sd_psk_4_0x1_0_1(x, Es=1): ''' | 11 | 10 @@ -217,10 +253,13 @@ def sd_psk_4_0x1_0_1(x, Es=1): ''' x_re = x.real x_im = x.imag - dist = Es*numpy.sqrt(2) - return [dist*x_im, -dist*x_re] + dist = Es * numpy.sqrt(2) + return [dist * x_im, -dist * x_re] + + sd_psk_4_1 = sd_psk_4_0x1_0_1 + def sd_psk_4_0x2_0_1(x, Es=1): ''' | 00 | 01 @@ -229,10 +268,13 @@ def sd_psk_4_0x2_0_1(x, Es=1): ''' x_re = x.real x_im = x.imag - dist = Es*numpy.sqrt(2) - return [-dist*x_im, dist*x_re] + dist = Es * numpy.sqrt(2) + return [-dist * x_im, dist * x_re] + + sd_psk_4_2 = sd_psk_4_0x2_0_1 + def sd_psk_4_0x3_0_1(x, Es=1): ''' | 01 | 00 @@ -241,10 +283,13 @@ def sd_psk_4_0x3_0_1(x, Es=1): ''' x_re = x.real x_im = x.imag - dist = Es*numpy.sqrt(2) - return [-dist*x_im, -dist*x_re] + dist = Es * numpy.sqrt(2) + return [-dist * x_im, -dist * x_re] + + sd_psk_4_3 = sd_psk_4_0x3_0_1 + def sd_psk_4_0x0_1_0(x, Es=1): ''' | 01 | 11 @@ -253,10 +298,13 @@ def sd_psk_4_0x0_1_0(x, Es=1): ''' x_re = x.real x_im = x.imag - dist = Es*numpy.sqrt(2) - return [dist*x_re, dist*x_im] + dist = Es * numpy.sqrt(2) + return [dist * x_re, dist * x_im] + + sd_psk_4_4 = sd_psk_4_0x0_1_0 + def sd_psk_4_0x1_1_0(x, Es=1): ''' | 00 | 10 @@ -265,8 +313,10 @@ def sd_psk_4_0x1_1_0(x, Es=1): ''' x_re = x.real x_im = x.imag - dist = Es*numpy.sqrt(2) - return [dist*x_re, -dist*x_im] + dist = Es * numpy.sqrt(2) + return [dist * x_re, -dist * x_im] + + sd_psk_4_5 = sd_psk_4_0x1_1_0 @@ -278,10 +328,13 @@ def sd_psk_4_0x2_1_0(x, Es=1): ''' x_re = x.real x_im = x.imag - dist = Es*numpy.sqrt(2) - return [-dist*x_re, dist*x_im] + dist = Es * numpy.sqrt(2) + return [-dist * x_re, dist * x_im] + + sd_psk_4_6 = sd_psk_4_0x2_1_0 + def sd_psk_4_0x3_1_0(x, Es=1): ''' | 10 | 00 @@ -290,6 +343,8 @@ def sd_psk_4_0x3_1_0(x, Es=1): ''' x_re = x.real x_im = x.imag - dist = Es*numpy.sqrt(2) - return [-dist*x_re, -dist*x_im] + dist = Es * numpy.sqrt(2) + return [-dist * x_re, -dist * x_im] + + sd_psk_4_7 = sd_psk_4_0x3_1_0 diff --git a/gr-digital/python/digital/qa_burst_shaper.py b/gr-digital/python/digital/qa_burst_shaper.py index 40ac210ec2..b1fc793924 100644 --- a/gr-digital/python/digital/qa_burst_shaper.py +++ b/gr-digital/python/digital/qa_burst_shaper.py @@ -85,7 +85,8 @@ class qa_burst_shaper (gr_unittest.TestCase): -4.0 * np.ones(5, dtype=complex))) tags = (make_length_tag(0, length),) expected = np.concatenate((np.zeros(prepad, dtype=complex), window[0:5], - np.ones(length - len(window), dtype=complex), + np.ones(length - len(window), + dtype=complex), window[5:10], np.zeros(postpad, dtype=complex))) etag = make_length_tag(0, length + prepad + postpad) @@ -239,8 +240,10 @@ class qa_burst_shaper (gr_unittest.TestCase): window = np.concatenate((-2.0 * np.ones(5), -4.0 * np.ones(5))) tags = (make_length_tag(0, length1), make_length_tag(length1, length2)) expected = np.concatenate((np.zeros(prepad), window[0:5], - np.ones(length1 - len(window)), window[5:10], - np.zeros(postpad + prepad), -1.0 * window[0:5], + np.ones(length1 - len(window) + ), window[5:10], + np.zeros(postpad + prepad), - + 1.0 * window[0:5], -1.0 * np.ones(length2 - len(window)), -1.0 * window[5:10], np.zeros(postpad))) etags = (make_length_tag(0, length1 + prepad + postpad), @@ -335,8 +338,10 @@ class qa_burst_shaper (gr_unittest.TestCase): make_tag(tag4_offset, 'body', pmt.intern('tag4')), make_tag(tag5_offset, 'body', pmt.intern('tag5'))) expected = np.concatenate((np.zeros(prepad), window[0:5], - np.ones(length1 - len(window)), window[5:10], - np.zeros(postpad + prepad), -1.0 * window[0:5], + np.ones(length1 - len(window) + ), window[5:10], + np.zeros(postpad + prepad), - + 1.0 * window[0:5], -1.0 * np.ones(length2 - len(window)), -1.0 * window[5:10], np.zeros(postpad))) elentag1_offset = 0 diff --git a/gr-digital/python/digital/qa_constellation.py b/gr-digital/python/digital/qa_constellation.py index 7c9984405f..7345f782a2 100644 --- a/gr-digital/python/digital/qa_constellation.py +++ b/gr-digital/python/digital/qa_constellation.py @@ -230,7 +230,6 @@ class test_constellation(gr_unittest.TestCase): data[first:], msg=msg) - def test_soft_qpsk_gen(self): prec = 8 constel, code = digital.psk_4_0() diff --git a/gr-digital/python/digital/qa_constellation_encoder_bc.py b/gr-digital/python/digital/qa_constellation_encoder_bc.py index 2c6d587dac..ef1991bf80 100644 --- a/gr-digital/python/digital/qa_constellation_encoder_bc.py +++ b/gr-digital/python/digital/qa_constellation_encoder_bc.py @@ -12,6 +12,7 @@ from gnuradio import gr, gr_unittest, digital, blocks import numpy as np + class test_constellation_encoder(gr_unittest.TestCase): def setUp(self): @@ -23,8 +24,8 @@ class test_constellation_encoder(gr_unittest.TestCase): def test_constellation_encoder_bc_bpsk(self): cnst = digital.constellation_bpsk() - src_data = (1, 1, 0, 0, - 1, 0, 1) + src_data = (1, 1, 0, 0, + 1, 0, 1) const_map = [-1.0, 1.0] expected_result = [const_map[x] for x in src_data] @@ -43,8 +44,8 @@ class test_constellation_encoder(gr_unittest.TestCase): def test_constellation_encoder_bc_qpsk(self): cnst = digital.constellation_qpsk() - src_data = (3, 1, 0, 2, - 3, 2, 1) + src_data = (3, 1, 0, 2, + 3, 2, 1) expected_result = [cnst.points()[x] for x in src_data] src = blocks.vector_source_b(src_data) op = digital.constellation_encoder_bc(cnst.base()) @@ -59,7 +60,6 @@ class test_constellation_encoder(gr_unittest.TestCase): # print "expected result", expected_result self.assertFloatTuplesAlmostEqual(expected_result, actual_result) - def test_constellation_encoder_bc_qpsk_random(self): cnst = digital.constellation_qpsk() src_data = np.random.randint(0, 4, size=20000) @@ -77,5 +77,6 @@ class test_constellation_encoder(gr_unittest.TestCase): # print "expected result", expected_result self.assertFloatTuplesAlmostEqual(expected_result, actual_result) + if __name__ == '__main__': gr_unittest.run(test_constellation_encoder) diff --git a/gr-digital/python/digital/qa_decision_feedback_equalizer.py b/gr-digital/python/digital/qa_decision_feedback_equalizer.py index 1d4587a6f0..f915272e52 100755 --- a/gr-digital/python/digital/qa_decision_feedback_equalizer.py +++ b/gr-digital/python/digital/qa_decision_feedback_equalizer.py @@ -116,7 +116,7 @@ class qa_decision_feedback_equalizer(gr_unittest.TestCase): 1, 1, alg, - True, + True, [], '') dst = blocks.vector_sink_c() diff --git a/gr-digital/python/digital/qa_glfsr_source.py b/gr-digital/python/digital/qa_glfsr_source.py index 7fde483ed3..23382605c6 100644 --- a/gr-digital/python/digital/qa_glfsr_source.py +++ b/gr-digital/python/digital/qa_glfsr_source.py @@ -60,7 +60,7 @@ class test_glfsr_source(gr_unittest.TestCase): lambda: digital.glfsr_source_f(0)) self.assertRaises(RuntimeError, lambda: digital.glfsr_source_f(65)) - + def test_005_correlation_f(self): for degree in range( 1, 11): # Higher degrees take too long to correlate diff --git a/gr-digital/python/digital/qa_lfsr.py b/gr-digital/python/digital/qa_lfsr.py index e0db6a382b..8caa0dbada 100644 --- a/gr-digital/python/digital/qa_lfsr.py +++ b/gr-digital/python/digital/qa_lfsr.py @@ -14,6 +14,7 @@ import numpy as np from gnuradio import gr, gr_unittest, digital from gnuradio.digital.utils import lfsr_args + class test_lfsr(gr_unittest.TestCase): def setUp(self): @@ -33,19 +34,20 @@ class test_lfsr(gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual(expected_result, result_data, 5) def test_lfsr_002(self): - l = digital.lfsr(*lfsr_args(0b1,5,3,0)) - result_data = [l.next_bit() for _ in range(2*(2**5-1))] - + l = digital.lfsr(*lfsr_args(0b1, 5, 3, 0)) + result_data = [l.next_bit() for _ in range(2 * (2**5 - 1))] + expected_result = [1, 0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 0, - 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0]*2 + 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0] * 2 self.assertEqual(expected_result, result_data) - seq1 = [l.next_bit() for _ in range(2**5-1)] - seq2 = [l.next_bit() for _ in range(2**5-1)] - self.assertEqual(seq1,seq2) + seq1 = [l.next_bit() for _ in range(2**5 - 1)] + seq2 = [l.next_bit() for _ in range(2**5 - 1)] + self.assertEqual(seq1, seq2) + + res = (np.convolve(seq1, [1, 0, 1, 0, 0, 1]) % 2) + self.assertTrue(sum(res[5:-5]) == 0, "LRS not generated properly") - res = (np.convolve(seq1,[1,0,1,0,0,1])%2) - self.assertTrue(sum(res[5:-5])==0,"LRS not generated properly") if __name__ == '__main__': gr_unittest.run(test_lfsr) diff --git a/gr-digital/python/digital/qa_linear_equalizer.py b/gr-digital/python/digital/qa_linear_equalizer.py index 64fd84d2f3..39fea0ef49 100755 --- a/gr-digital/python/digital/qa_linear_equalizer.py +++ b/gr-digital/python/digital/qa_linear_equalizer.py @@ -111,7 +111,7 @@ class qa_linear_equalizer(gr_unittest.TestCase): 4, 1, alg, - True, + True, [], '') dst = blocks.vector_sink_c() diff --git a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py index a1f8cef6e9..7d5f79781c 100644 --- a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py +++ b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py @@ -303,7 +303,8 @@ class qa_ofdm_chanest_vcvc (gr_unittest.TestCase): for x in range(fft_len)] src = blocks.vector_source_c(tx_data, False, fft_len) chan = blocks.multiply_const_vcc(channel) - noise = blocks.vector_source_c(numpy.random.normal(0,wgn_amplitude,(len(tx_data),)), False, fft_len) + noise = blocks.vector_source_c(numpy.random.normal( + 0, wgn_amplitude, (len(tx_data),)), False, fft_len) add = blocks.add_cc(fft_len) chanest = digital.ofdm_chanest_vcvc(sync_sym1, sync_sym2, 1) sink = blocks.vector_sink_c(fft_len) @@ -325,7 +326,8 @@ class qa_ofdm_chanest_vcvc (gr_unittest.TestCase): shifted_carrier_mask = shift_tuple(carrier_mask, carr_offset) for i in range(fft_len): if shifted_carrier_mask[i] and channel_est[i]: - self.assertAlmostEqual(channel[i], channel_est[i], places=0) + self.assertAlmostEqual( + channel[i], channel_est[i], places=0) rx_sym_est[i] = (sink.data()[i] / channel_est[i]).real return carr_offset, list(shift_tuple(rx_sym_est, -carr_offset_hat)) bit_errors = 0 diff --git a/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py b/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py index 254479363e..10b484e839 100644 --- a/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py +++ b/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py @@ -106,7 +106,8 @@ class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase): for _ in range(n_bursts): gap = [0, ] * random.randint(0, 2 * fft_len) tx_signal += gap + \ - make_bpsk_burst(fft_len, cp_len, fft_len * random.randint(5, 23)) + make_bpsk_burst(fft_len, cp_len, fft_len * + random.randint(5, 23)) # Very loose definition of SNR here snr = 20 # dB sigma = 10**(-snr / 10) diff --git a/gr-digital/python/digital/qa_scrambler.py b/gr-digital/python/digital/qa_scrambler.py index 7d87f87b6c..b350d9e1b7 100644 --- a/gr-digital/python/digital/qa_scrambler.py +++ b/gr-digital/python/digital/qa_scrambler.py @@ -26,6 +26,7 @@ def additive_scramble_lfsr(mask, seed, reglen, bpb, data): out.append(d ^ scramble_word) return out + class test_scrambler(gr_unittest.TestCase): def setUp(self): @@ -34,33 +35,35 @@ class test_scrambler(gr_unittest.TestCase): def tearDown(self): self.tb = None - def test_lfsr_002(self): - _a = lfsr_args(1,51,3,0) + _a = lfsr_args(1, 51, 3, 0) l = digital.lfsr(*_a) seq = [l.next_bit() for _ in range(2**10)] - reg = np.zeros(52,np.int8) - reg[::-1][(51,3,0),] = 1 - res = (np.convolve(seq,reg)%2) - self.assertTrue(sum(res[52:-52])==0,"LRS not generated properly") + reg = np.zeros(52, np.int8) + reg[::-1][(51, 3, 0), ] = 1 + res = (np.convolve(seq, reg) % 2) + self.assertTrue(sum(res[52:-52]) == 0, "LRS not generated properly") def test_scrambler_descrambler_001(self): - src_data = np.random.randint(0,2,500,dtype=np.int8) + src_data = np.random.randint(0, 2, 500, dtype=np.int8) src = blocks.vector_source_b(src_data, False) - scrambler = digital.scrambler_bb(*lfsr_args(0b1,7,2,0)) # p(x) = x^7 + x^2 + 1 - descrambler = digital.descrambler_bb(*lfsr_args(0b111,7,2,0)) # we can use any seed here, it is descrambling. + scrambler = digital.scrambler_bb( + *lfsr_args(0b1, 7, 2, 0)) # p(x) = x^7 + x^2 + 1 + # we can use any seed here, it is descrambling. + descrambler = digital.descrambler_bb(*lfsr_args(0b111, 7, 2, 0)) m_tap = blocks.vector_sink_b() dst = blocks.vector_sink_b() self.tb.connect(src, scrambler, descrambler, dst) self.tb.connect(scrambler, m_tap) self.tb.run() - self.assertEqual(src_data[:-7].tolist(), dst.data()[7:]) # skip garbage during synchronization - self.assertEqual(tuple(np.convolve(m_tap.data(),[1,0,0,0,0,1,0,1])%2)[7:-10], - tuple(src_data[:-10])) # manual descrambling test + # skip garbage during synchronization + self.assertEqual(src_data[:-7].tolist(), dst.data()[7:]) + self.assertEqual(tuple(np.convolve(m_tap.data(), [1, 0, 0, 0, 0, 1, 0, 1]) % 2)[7:-10], + tuple(src_data[:-10])) # manual descrambling test def test_scrambler_descrambler_002(self): - _a = lfsr_args(0b1,51,6,0) #p(x) = x^51+x^6+1 - src_data = np.random.randint(0,2,1000,dtype=np.int8) + _a = lfsr_args(0b1, 51, 6, 0) # p(x) = x^51+x^6+1 + src_data = np.random.randint(0, 2, 1000, dtype=np.int8) src = blocks.vector_source_b(src_data, False) scrambler = digital.scrambler_bb(*_a) m_tap = blocks.vector_sink_b() @@ -69,26 +72,29 @@ class test_scrambler(gr_unittest.TestCase): self.tb.connect(src, scrambler, descrambler, dst) self.tb.connect(scrambler, m_tap) self.tb.run() - self.assertTrue(np.all(src_data[:-51]==dst.data()[51:])) # skip garbage during synchronization - reg = np.zeros(52,np.int8) - reg[::-1][(51,6,0),] = 1 - self.assertTrue(np.all( np.convolve(m_tap.data(),reg)[51:-60]%2 - == src_data[:-60])) # manual descrambling test + # skip garbage during synchronization + self.assertTrue(np.all(src_data[:-51] == dst.data()[51:])) + reg = np.zeros(52, np.int8) + reg[::-1][(51, 6, 0), ] = 1 + self.assertTrue(np.all(np.convolve(m_tap.data(), reg)[51:-60] % 2 == + src_data[:-60])) # manual descrambling test def test_scrambler_descrambler_003(self): - src_data = np.random.randint(0,2,1000,dtype=np.int8) + src_data = np.random.randint(0, 2, 1000, dtype=np.int8) src = blocks.vector_source_b(src_data, False) - scrambler = digital.scrambler_bb(*lfsr_args(1,12,10,3,2,0)) # this is the product of the other two - descrambler1 = digital.descrambler_bb(*lfsr_args(1,5,3,0)) - descrambler2 = digital.descrambler_bb(*lfsr_args(1,7,2,0)) + # this is the product of the other two + scrambler = digital.scrambler_bb(*lfsr_args(1, 12, 10, 3, 2, 0)) + descrambler1 = digital.descrambler_bb(*lfsr_args(1, 5, 3, 0)) + descrambler2 = digital.descrambler_bb(*lfsr_args(1, 7, 2, 0)) dst = blocks.vector_sink_b() self.tb.connect(src, scrambler, descrambler1, descrambler2, dst) self.tb.run() - self.assertTrue(np.all(src_data[:-12]==dst.data()[12:])) # skip garbage during synchronization + # skip garbage during synchronization + self.assertTrue(np.all(src_data[:-12] == dst.data()[12:])) def test_additive_scrambler_001(self): - _a = lfsr_args(1,51,3,0) #i p(x) = x^51+x^3+1, seed 0x1 - src_data = np.random.randint(0,2,1000,dtype=np.int8).tolist() + _a = lfsr_args(1, 51, 3, 0) # i p(x) = x^51+x^3+1, seed 0x1 + src_data = np.random.randint(0, 2, 1000, dtype=np.int8).tolist() src = blocks.vector_source_b(src_data, False) scrambler = digital.additive_scrambler_bb(*_a) descrambler = digital.additive_scrambler_bb(*_a) @@ -98,18 +104,18 @@ class test_scrambler(gr_unittest.TestCase): self.assertEqual(tuple(src_data), tuple(dst.data())) def test_additive_scrambler_002(self): - _a = lfsr_args(1,51,3,0) #i p(x) = x^51+x^3+1, seed 0x1 - src_data = [1,]*1000 + _a = lfsr_args(1, 51, 3, 0) # i p(x) = x^51+x^3+1, seed 0x1 + src_data = [1, ] * 1000 src = blocks.vector_source_b(src_data, False) scrambler = digital.additive_scrambler_bb(*_a) dst = blocks.vector_sink_b() self.tb.connect(src, scrambler, dst) self.tb.run() - reg = np.zeros(52,np.int8) - reg[::-1][(51,3,0),] = 1 - res = (np.convolve(dst.data(),reg)%2)[52:-52] - self.assertEqual(len(res), sum(res)) # when convolved with mask, - # after sync, only 1's would be returned. + reg = np.zeros(52, np.int8) + reg[::-1][(51, 3, 0), ] = 1 + res = (np.convolve(dst.data(), reg) % 2)[52:-52] + self.assertEqual(len(res), sum(res)) # when convolved with mask, + # after sync, only 1's would be returned. def test_scrambler_descrambler(self): src_data = [1, ] * 1000 diff --git a/gr-digital/python/digital/qam.py b/gr-digital/python/digital/qam.py index d74866b946..6e8951ef1e 100644 --- a/gr-digital/python/digital/qam.py +++ b/gr-digital/python/digital/qam.py @@ -243,8 +243,8 @@ def large_ampls_to_corners_mapping(side, points, width): # use the center point. c = ((real_x - side / 2.0 + 0.5) * width + (imag_x - side / 2.0 + 0.5) * width * 1j) - if (real_x >= extra_layers and real_x < side - extra_layers - and imag_x >= extra_layers and imag_x < side - extra_layers): + if (real_x >= extra_layers and real_x < side - extra_layers and + imag_x >= extra_layers and imag_x < side - extra_layers): # This is not an edge row/column. Find closest point. index = find_closest_point(c, points) else: diff --git a/gr-digital/python/digital/qpsk.py b/gr-digital/python/digital/qpsk.py index db97d31fc3..22587948fc 100644 --- a/gr-digital/python/digital/qpsk.py +++ b/gr-digital/python/digital/qpsk.py @@ -27,23 +27,28 @@ _def_mod_code = mod_codes.GRAY_CODE # QPSK constellation # ///////////////////////////////////////////////////////////////////////////// + def qpsk_constellation(mod_code=_def_mod_code): """ Creates a QPSK constellation. """ if mod_code != mod_codes.GRAY_CODE: - raise ValueError("This QPSK mod/demod works only for gray-coded constellations.") + raise ValueError( + "This QPSK mod/demod works only for gray-coded constellations.") return digital.constellation_qpsk() # ///////////////////////////////////////////////////////////////////////////// # DQPSK constellation # ///////////////////////////////////////////////////////////////////////////// + def dqpsk_constellation(mod_code=_def_mod_code): if mod_code != mod_codes.GRAY_CODE: - raise ValueError("The DQPSK constellation is only generated for gray_coding. But it can be used for non-grayed coded modulation if one doesn't use the pre-differential code.") + raise ValueError( + "The DQPSK constellation is only generated for gray_coding. But it can be used for non-grayed coded modulation if one doesn't use the pre-differential code.") return digital.constellation_dqpsk() + # # Add these to the mod/demod registry # diff --git a/gr-digital/python/digital/soft_dec_lut_gen.py b/gr-digital/python/digital/soft_dec_lut_gen.py index 898af5320d..13c50251a5 100644 --- a/gr-digital/python/digital/soft_dec_lut_gen.py +++ b/gr-digital/python/digital/soft_dec_lut_gen.py @@ -11,6 +11,7 @@ import numpy + def soft_dec_table_generator(soft_dec_gen, prec, Es=1): ''' | Builds a LUT that is a list of tuples. The tuple represents the @@ -71,7 +72,7 @@ def soft_dec_table_generator(soft_dec_gen, prec, Es=1): ''' npts = int(2.0**prec) - maxd = Es*numpy.sqrt(2.0)/2.0 + maxd = Es * numpy.sqrt(2.0) / 2.0 yrng = numpy.linspace(-maxd, maxd, npts) xrng = numpy.linspace(-maxd, maxd, npts) @@ -83,6 +84,7 @@ def soft_dec_table_generator(soft_dec_gen, prec, Es=1): table.append(decs) return table + def soft_dec_table(constel, symbols, prec, npwr=1): ''' Similar in nature to soft_dec_table_generator above. Instead, this @@ -120,6 +122,7 @@ def soft_dec_table(constel, symbols, prec, npwr=1): table.append(decs) return table + def calc_soft_dec_from_table(sample, table, prec, Es=1.0): ''' Takes in a complex sample and converts it from the coordinates @@ -144,25 +147,26 @@ def calc_soft_dec_from_table(sample, table, prec, Es=1.0): constellation. ''' lut_scale = 2.0**prec - maxd = Es*numpy.sqrt(2.0)/2.0 - scale = (lut_scale) / (2.0*maxd) + maxd = Es * numpy.sqrt(2.0) / 2.0 + scale = (lut_scale) / (2.0 * maxd) - alpha = 0.99 # to keep index within bounds + alpha = 0.99 # to keep index within bounds xre = sample.real xim = sample.imag - xre = ((maxd + min(alpha*maxd, max(-alpha*maxd, xre))) * scale) - xim = ((maxd + min(alpha*maxd, max(-alpha*maxd, xim))) * scale) - index = int(xre) + lut_scale*int(xim) + xre = ((maxd + min(alpha * maxd, max(-alpha * maxd, xre))) * scale) + xim = ((maxd + min(alpha * maxd, max(-alpha * maxd, xim))) * scale) + index = int(xre) + lut_scale * int(xim) max_index = lut_scale**2 while(index >= max_index): - index -= lut_scale; + index -= lut_scale while(index < 0): - index += lut_scale; + index += lut_scale return table[int(index)] + def calc_soft_dec(sample, constel, symbols, npwr=1): ''' This function takes in any consteallation and symbol symbol set @@ -186,8 +190,8 @@ def calc_soft_dec(sample, constel, symbols, npwr=1): M = len(constel) k = int(numpy.log2(M)) - tmp = 2*k*[0] - s = k*[0] + tmp = 2 * k * [0] + s = k * [0] for i in range(M): # Calculate the distance between the sample and the current @@ -200,21 +204,21 @@ def calc_soft_dec(sample, constel, symbols, npwr=1): for j in range(k): # Get the bit at the jth index - mask = 1<<j + mask = 1 << j bit = (symbols[i] & mask) >> j # If the bit is a 0, add to the probability of a zero if(bit == 0): - tmp[2*j+0] += d + tmp[2 * j + 0] += d # else, add to the probability of a one else: - tmp[2*j+1] += d + tmp[2 * j + 1] += d # Calculate the log-likelihood ratio for all bits based on the # probability of ones (tmp[2*i+1]) over the probability of a zero # (tmp[2*i+0]). for i in range(k): - s[k-1-i] = (numpy.log(tmp[2*i+1]) - numpy.log(tmp[2*i+0])) + s[k - 1 - i] = (numpy.log(tmp[2 * i + 1]) - numpy.log(tmp[2 * i + 0])) return s @@ -225,19 +229,19 @@ def show_table(table): pp = "" subi = 1 subj = 0 - for i in reversed(list(range(prec+1))): - if(i == prec//2): - pp += "-----" + prec*((nbits*8)+3)*"-" + "\n" + for i in reversed(list(range(prec + 1))): + if(i == prec // 2): + pp += "-----" + prec * ((nbits * 8) + 3) * "-" + "\n" subi = 0 continue - for j in range(prec+1): - if(j == prec//2): + for j in range(prec + 1): + if(j == prec // 2): pp += "| " subj = 1 else: - item = table[prec*(i-subi) + (j-subj)] + item = table[prec * (i - subi) + (j - subj)] pp += "( " - for t in range(nbits-1, -1, -1): + for t in range(nbits - 1, -1, -1): pp += "{0: .4f} ".format(item[t]) pp += ") " pp += "\n" diff --git a/gr-digital/python/digital/test_soft_decisions.py b/gr-digital/python/digital/test_soft_decisions.py index 59b6edc032..3350103573 100644 --- a/gr-digital/python/digital/test_soft_decisions.py +++ b/gr-digital/python/digital/test_soft_decisions.py @@ -9,13 +9,15 @@ # -import numpy, sys +import numpy +import sys from matplotlib import pyplot from gnuradio import digital from .soft_dec_lut_gen import soft_dec_table, calc_soft_dec_from_table, calc_soft_dec from .psk_constellations import psk_4_0, psk_4_1, psk_4_2, psk_4_3, psk_4_4, psk_4_5, psk_4_6, psk_4_7, sd_psk_4_0, sd_psk_4_1, sd_psk_4_2, sd_psk_4_3, sd_psk_4_4, sd_psk_4_5, sd_psk_4_6, sd_psk_4_7 from .qam_constellations import qam_16_0, sd_qam_16_0 + def test_qpsk(i, sample, prec): qpsk_const_list = [psk_4_0, psk_4_1, psk_4_2, psk_4_3, psk_4_4, psk_4_5, psk_4_6, psk_4_7] @@ -33,7 +35,8 @@ def test_qpsk(i, sample, prec): # Get max energy/symbol in constellation constel = c.points() - Es = max([numpy.sqrt(constel_i.real**2 + constel_i.imag**2) for constel_i in constel]) + Es = max([numpy.sqrt(constel_i.real**2 + constel_i.imag**2) + for constel_i in constel]) #table = soft_dec_table_generator(qpsk_lut_gen, prec, Es) table = soft_dec_table(constel, code, prec) @@ -50,6 +53,7 @@ def test_qpsk(i, sample, prec): return (y_python_gen_calc, y_python_table, y_python_raw_calc, y_cpp_table, y_cpp_raw_calc, constel, code, c) + def test_qam16(i, sample, prec): sample = sample / 1 qam_const_list = [qam_16_0, ] @@ -71,7 +75,7 @@ def test_qam16(i, sample, prec): #table = soft_dec_table_generator(qam_lut_gen, prec, Es) table = soft_dec_table(constel, code, prec, 1) - #c.gen_soft_dec_lut(prec) + # c.gen_soft_dec_lut(prec) c.set_soft_dec_lut(table, prec) y_python_gen_calc = qam_lut_gen(sample, Es) @@ -83,14 +87,15 @@ def test_qam16(i, sample, prec): return (y_python_gen_calc, y_python_table, y_python_raw_calc, y_cpp_table, y_cpp_raw_calc, constel, code, c) + if __name__ == "__main__": index = 0 prec = 8 - x_re = 2*numpy.random.random()-1 - x_im = 2*numpy.random.random()-1 - x = x_re + x_im*1j + x_re = 2 * numpy.random.random() - 1 + x_im = 2 * numpy.random.random() - 1 + x = x_re + x_im * 1j #x = -1 + -0.j if 1: @@ -112,14 +117,14 @@ if __name__ == "__main__": print("C++ Raw calc: ", (y_cpp_raw_calc)) fig = pyplot.figure(1) - sp1 = fig.add_subplot(1,1,1) + sp1 = fig.add_subplot(1, 1, 1) sp1.plot([c.real for c in constel], [c.imag for c in constel], 'bo') sp1.plot(x.real, x.imag, 'ro') sp1.set_xlim([-1.5, 1.5]) sp1.set_ylim([-1.5, 1.5]) fill = int(numpy.log2(len(constel))) - for i,c in enumerate(constel): - sp1.text(1.2*c.real, 1.2*c.imag, bin(code[i])[2:].zfill(fill), + for i, c in enumerate(constel): + sp1.text(1.2 * c.real, 1.2 * c.imag, bin(code[i])[2:].zfill(fill), ha='center', va='center', size=18) pyplot.show() diff --git a/gr-digital/python/digital/utils/__init__.py b/gr-digital/python/digital/utils/__init__.py index 0d5aae0b79..f3a8390af8 100644 --- a/gr-digital/python/digital/utils/__init__.py +++ b/gr-digital/python/digital/utils/__init__.py @@ -1,11 +1,11 @@ #!/usr/bin/env python # # Copyright 2011 Free Software Foundation, Inc. -# +# # This file is part of GNU Radio -# +# # SPDX-License-Identifier: GPL-3.0-or-later # -# +# from .lfsr import lfsr_args diff --git a/gr-digital/python/digital/utils/alignment.py b/gr-digital/python/digital/utils/alignment.py index f0541dea56..531c39830e 100644 --- a/gr-digital/python/digital/utils/alignment.py +++ b/gr-digital/python/digital/utils/alignment.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # # Copyright 2011 Free Software Foundation, Inc. -# +# # This file is part of GNU Radio -# +# # SPDX-License-Identifier: GPL-3.0-or-later # -# +# """ This module contains functions for aligning sequences. @@ -41,6 +41,7 @@ def_max_offset = 500 # The maximum number of samples to take from two sequences to check alignment. def_num_samples = 1000 + def compare_sequences(d1, d2, offset, sample_indices=None): """ Takes two binary sequences and an offset and returns the number of @@ -49,7 +50,7 @@ def compare_sequences(d1, d2, offset, sample_indices=None): offset -- offset of d2 relative to d1 sample_indices -- a list of indices to use for the comparison """ - max_index = min(len(d1), len(d2)+offset) + max_index = min(len(d1), len(d2) + offset) if sample_indices is None: sample_indices = list(range(0, max_index)) correct = 0 @@ -57,11 +58,12 @@ def compare_sequences(d1, d2, offset, sample_indices=None): for i in sample_indices: if i >= max_index: break - if d1[i] == d2[i-offset]: + if d1[i] == d2[i - offset]: correct += 1 total += 1 return (correct, total) + def random_sample(size, num_samples=def_num_samples, seed=None): """ Returns a set of random integers between 0 and (size-1). @@ -76,12 +78,13 @@ def random_sample(size, num_samples=def_num_samples, seed=None): num_samples = num_samples / 2 indices = set([]) while len(indices) < num_samples: - index = rndm.randint(0, size-1) + index = rndm.randint(0, size - 1) indices.add(index) indices = list(indices) indices.sort() return indices + def align_sequences(d1, d2, num_samples=def_num_samples, max_offset=def_max_offset, @@ -113,7 +116,7 @@ def align_sequences(d1, d2, int_range = [item for items in zip(pos_range, neg_range) for item in items] for offset in int_range: correct, compared = compare_sequences(d1, d2, offset, indices) - frac_correct = 1.0*correct/compared + frac_correct = 1.0 * correct / compared if frac_correct > max_frac_correct: max_frac_correct = frac_correct best_offset = offset @@ -122,8 +125,8 @@ def align_sequences(d1, d2, if frac_correct > correct_cutoff: break return max_frac_correct, best_compared, best_offset, indices - + + if __name__ == "__main__": import doctest doctest.testmod() - diff --git a/gr-digital/python/digital/utils/gray_code.py b/gr-digital/python/digital/utils/gray_code.py index e045e9a4ac..de8ffbc93c 100644 --- a/gr-digital/python/digital/utils/gray_code.py +++ b/gr-digital/python/digital/utils/gray_code.py @@ -41,14 +41,14 @@ class GrayCodeGenerator(object): else: # if not we take advantage of the symmetry of all but the last bit # around a power of two. - result = self.gcs[2*self.lp2-1-self.i] + self.lp2 + result = self.gcs[2 * self.lp2 - 1 - self.i] + self.lp2 self.gcs.append(result) self.i += 1 if self.i == self.np2: self.lp2 = self.i - self.np2 = self.i*2 + self.np2 = self.i * 2 + _gray_code_generator = GrayCodeGenerator() gray_code = _gray_code_generator.get_gray_code - diff --git a/gr-digital/python/digital/utils/lfsr.py b/gr-digital/python/digital/utils/lfsr.py index 2b8a47cb76..44f62f0c7a 100644 --- a/gr-digital/python/digital/utils/lfsr.py +++ b/gr-digital/python/digital/utils/lfsr.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # # Copyright 2020 Free Software Foundation, Inc. -# +# # This file is part of GNU Radio -# +# # SPDX-License-Identifier: GPL-3.0-or-later # -# +# def lfsr_args(seed, *exp): """ @@ -18,4 +18,4 @@ def lfsr_args(seed, *exp): Creates an lfsr object with seed 0b11001, mask 0b1000011, K=6 """ from functools import reduce - return reduce(int.__xor__, map(lambda x:2**x, exp)), seed, max(exp)-1 + return reduce(int.__xor__, map(lambda x: 2**x, exp)), seed, max(exp) - 1 diff --git a/gr-digital/python/digital/utils/mod_codes.py b/gr-digital/python/digital/utils/mod_codes.py index bafb85e18b..f0ac9f1fb5 100644 --- a/gr-digital/python/digital/utils/mod_codes.py +++ b/gr-digital/python/digital/utils/mod_codes.py @@ -1,12 +1,12 @@ #!/usr/bin/env python # # Copyright 2011 Free Software Foundation, Inc. -# +# # This file is part of GNU Radio -# +# # SPDX-License-Identifier: GPL-3.0-or-later # -# +# # Constants used to represent what coding to use. GRAY_CODE = 'gray' @@ -15,6 +15,7 @@ NO_CODE = 'none' codes = (GRAY_CODE, SET_PARTITION_CODE, NO_CODE) + def invert_code(code): c = enumerate(code) ic = [(b, a) for (a, b) in c] diff --git a/gr-digital/python/digital/utils/tagged_streams.py b/gr-digital/python/digital/utils/tagged_streams.py index 49c1058981..9876519bd0 100644 --- a/gr-digital/python/digital/utils/tagged_streams.py +++ b/gr-digital/python/digital/utils/tagged_streams.py @@ -13,6 +13,7 @@ from gnuradio import gr import pmt + def make_lengthtags(lengths, offsets, tagname='length', vlen=1): tags = [] assert(len(offsets) == len(lengths)) @@ -24,26 +25,31 @@ def make_lengthtags(lengths, offsets, tagname='length', vlen=1): tags.append(tag) return tags + def string_to_vector(string): v = [] for s in string: v.append(ord(s)) return v + def strings_to_vectors(strings, lengthtagname): vs = [string_to_vector(string) for string in strings] return packets_to_vectors(vs, lengthtagname) + def vector_to_string(v): s = [] for d in v: s.append(chr(d)) return ''.join(s) + def vectors_to_strings(data, tags, lengthtagname): packets = vectors_to_packets(data, tags, lengthtagname) return [vector_to_string(packet) for packet in packets] + def count_bursts(data, tags, lengthtagname, vlen=1): lengthtags = [t for t in tags if pmt.symbol_to_string(t.key) == lengthtagname] @@ -53,7 +59,7 @@ def count_bursts(data, tags, lengthtagname, vlen=1): raise ValueError( "More than one tags with key {0} with the same offset={1}." .format(lengthtagname, tag.offset)) - lengths[tag.offset] = pmt.to_long(tag.value)*vlen + lengths[tag.offset] = pmt.to_long(tag.value) * vlen in_burst = False in_packet = False packet_length = None @@ -62,7 +68,8 @@ def count_bursts(data, tags, lengthtagname, vlen=1): for pos in range(len(data)): if pos in lengths: if in_packet: - print("Got tag at pos {0} current packet_pos is {1}".format(pos, packet_pos)) + print("Got tag at pos {0} current packet_pos is {1}".format( + pos, packet_pos)) raise Exception("Received packet tag while in packet.") packet_pos = -1 packet_length = lengths[pos] @@ -74,11 +81,12 @@ def count_bursts(data, tags, lengthtagname, vlen=1): in_burst = False if in_packet: packet_pos += 1 - if packet_pos == packet_length-1: + if packet_pos == packet_length - 1: in_packet = False packet_pos = None return burst_count + def vectors_to_packets(data, tags, lengthtagname, vlen=1): lengthtags = [t for t in tags if pmt.symbol_to_string(t.key) == lengthtagname] @@ -88,7 +96,7 @@ def vectors_to_packets(data, tags, lengthtagname, vlen=1): raise ValueError( "More than one tags with key {0} with the same offset={1}." .format(lengthtagname, tag.offset)) - lengths[tag.offset] = pmt.to_long(tag.value)*vlen + lengths[tag.offset] = pmt.to_long(tag.value) * vlen if 0 not in lengths: raise ValueError("There is no tag with key {0} and an offset of 0" .format(lengthtagname)) @@ -102,12 +110,13 @@ def vectors_to_packets(data, tags, lengthtagname, vlen=1): length = lengths[pos] if length == 0: raise ValueError("Packets cannot have zero length.") - if pos+length > len(data): + if pos + length > len(data): raise ValueError("The final packet is incomplete.") - packets.append(data[pos: pos+length]) + packets.append(data[pos: pos + length]) pos += length return packets + def packets_to_vectors(packets, lengthtagname, vlen=1): tags = [] data = [] |