summaryrefslogtreecommitdiff
path: root/gr-digital
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital')
-rw-r--r--gr-digital/examples/berawgn.py34
-rw-r--r--gr-digital/examples/example_costas.py42
-rw-r--r--gr-digital/examples/example_fll.py48
-rw-r--r--gr-digital/examples/example_timing.py84
-rw-r--r--gr-digital/examples/gen_whitener.py11
-rw-r--r--gr-digital/examples/narrowband/benchmark_add_channel.py17
-rw-r--r--gr-digital/examples/narrowband/digital_bert_rx.py34
-rw-r--r--gr-digital/examples/narrowband/digital_bert_tx.py27
-rw-r--r--gr-digital/examples/narrowband/uhd_interface.py62
-rw-r--r--gr-digital/examples/ofdm/benchmark_add_channel.py17
-rw-r--r--gr-digital/examples/ofdm/receive_path.py19
-rw-r--r--gr-digital/examples/ofdm/transmit_path.py9
-rw-r--r--gr-digital/examples/ofdm/uhd_interface.py61
-rw-r--r--gr-digital/examples/run_length.py18
-rw-r--r--gr-digital/examples/snr_estimators.py57
-rw-r--r--gr-digital/python/digital/__init__.py12
-rw-r--r--gr-digital/python/digital/bpsk.py2
-rw-r--r--gr-digital/python/digital/constellation_map_generator.py2
-rw-r--r--gr-digital/python/digital/cpm.py69
-rw-r--r--gr-digital/python/digital/generic_mod_demod.py66
-rw-r--r--gr-digital/python/digital/gfsk.py85
-rw-r--r--gr-digital/python/digital/gmsk.py87
-rw-r--r--gr-digital/python/digital/modulation_utils.py14
-rw-r--r--gr-digital/python/digital/ofdm_txrx.py319
-rw-r--r--gr-digital/python/digital/psk.py26
-rw-r--r--gr-digital/python/digital/psk_constellations.py103
-rw-r--r--gr-digital/python/digital/qa_burst_shaper.py15
-rw-r--r--gr-digital/python/digital/qa_constellation.py1
-rw-r--r--gr-digital/python/digital/qa_constellation_encoder_bc.py11
-rwxr-xr-xgr-digital/python/digital/qa_decision_feedback_equalizer.py2
-rw-r--r--gr-digital/python/digital/qa_glfsr_source.py2
-rw-r--r--gr-digital/python/digital/qa_lfsr.py20
-rwxr-xr-xgr-digital/python/digital/qa_linear_equalizer.py2
-rw-r--r--gr-digital/python/digital/qa_ofdm_chanest_vcvc.py6
-rw-r--r--gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py3
-rw-r--r--gr-digital/python/digital/qa_scrambler.py72
-rw-r--r--gr-digital/python/digital/qam.py4
-rw-r--r--gr-digital/python/digital/qpsk.py9
-rw-r--r--gr-digital/python/digital/soft_dec_lut_gen.py48
-rw-r--r--gr-digital/python/digital/test_soft_decisions.py23
-rw-r--r--gr-digital/python/digital/utils/__init__.py6
-rw-r--r--gr-digital/python/digital/utils/alignment.py21
-rw-r--r--gr-digital/python/digital/utils/gray_code.py6
-rw-r--r--gr-digital/python/digital/utils/lfsr.py8
-rw-r--r--gr-digital/python/digital/utils/mod_codes.py7
-rw-r--r--gr-digital/python/digital/utils/tagged_streams.py21
46 files changed, 940 insertions, 672 deletions
diff --git a/gr-digital/examples/berawgn.py b/gr-digital/examples/berawgn.py
index 082b73d83f..31ea8403b0 100644
--- a/gr-digital/examples/berawgn.py
+++ b/gr-digital/examples/berawgn.py
@@ -21,7 +21,6 @@ magnitude below what you chose for N_BITS.
"""
-
import math
import numpy
from gnuradio import gr, digital
@@ -45,20 +44,23 @@ except ImportError:
N_BITS = 1e7
RAND_SEED = 42
+
def berawgn(EbN0):
""" Calculates theoretical bit error rate in AWGN (for BPSK and given Eb/N0) """
return 0.5 * erfc(math.sqrt(10**(float(EbN0) / 10)))
+
class BitErrors(gr.hier_block2):
""" Two inputs: true and received bits. We compare them and
add up the number of incorrect bits. Because integrate_ff()
can only add up a certain number of values, the output is
not a scalar, but a sequence of values, the sum of which is
the BER. """
+
def __init__(self, bits_per_byte):
gr.hier_block2.__init__(self, "BitErrors",
- gr.io_signature(2, 2, gr.sizeof_char),
- gr.io_signature(1, 1, gr.sizeof_int))
+ gr.io_signature(2, 2, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_int))
# Bit comparison
comp = blocks.xor_bb()
@@ -74,29 +76,32 @@ class BitErrors(gr.hier_block2):
self)
self.connect((self, 1), (comp, 1))
+
class BERAWGNSimu(gr.top_block):
" This contains the simulation flow graph "
+
def __init__(self, EbN0):
gr.top_block.__init__(self)
self.const = digital.qpsk_constellation()
# Source is N_BITS bits, non-repeated
- data = list(map(int, numpy.random.randint(0, self.const.arity(), N_BITS / self.const.bits_per_symbol())))
- src = blocks.vector_source_b(data, False)
- mod = digital.chunks_to_symbols_bc((self.const.points()), 1)
- add = blocks.add_vcc()
+ data = list(map(int, numpy.random.randint(
+ 0, self.const.arity(), N_BITS / self.const.bits_per_symbol())))
+ src = blocks.vector_source_b(data, False)
+ mod = digital.chunks_to_symbols_bc((self.const.points()), 1)
+ add = blocks.add_vcc()
noise = analog.noise_source_c(analog.GR_GAUSSIAN,
self.EbN0_to_noise_voltage(EbN0),
RAND_SEED)
demod = digital.constellation_decoder_cb(self.const.base())
- ber = BitErrors(self.const.bits_per_symbol())
- self.sink = blocks.vector_sink_f()
+ ber = BitErrors(self.const.bits_per_symbol())
+ self.sink = blocks.vector_sink_f()
self.connect(src, mod, add, demod, ber, self.sink)
self.connect(noise, (add, 1))
self.connect(src, (ber, 1))
def EbN0_to_noise_voltage(self, EbN0):
""" Converts Eb/N0 to a complex noise voltage (assuming unit symbol power) """
- return 1.0 / math.sqrt(self.const.bits_per_symbol( * 10**(float(EbN0) / 10)))
+ return 1.0 / math.sqrt(self.const.bits_per_symbol(* 10**(float(EbN0) / 10)))
def simulate_ber(EbN0):
@@ -106,16 +111,17 @@ def simulate_ber(EbN0):
fg.run()
return numpy.sum(fg.sink.data())
+
if __name__ == "__main__":
EbN0_min = 0
EbN0_max = 15
- EbN0_range = list(range(EbN0_min, EbN0_max+1))
- ber_theory = [berawgn(x) for x in EbN0_range]
+ EbN0_range = list(range(EbN0_min, EbN0_max + 1))
+ ber_theory = [berawgn(x) for x in EbN0_range]
print("Simulating...")
- ber_simu = [simulate_ber(x) for x in EbN0_range]
+ ber_simu = [simulate_ber(x) for x in EbN0_range]
f = pyplot.figure()
- s = f.add_subplot(1,1,1)
+ s = f.add_subplot(1, 1, 1)
s.semilogy(EbN0_range, ber_theory, 'g-.', label="Theoretical")
s.semilogy(EbN0_range, ber_simu, 'b-o', label="Simulated")
s.set_title('BER Simulation')
diff --git a/gr-digital/examples/example_costas.py b/gr-digital/examples/example_costas.py
index 1db3287578..b7185ba974 100644
--- a/gr-digital/examples/example_costas.py
+++ b/gr-digital/examples/example_costas.py
@@ -24,6 +24,7 @@ except ImportError:
print("Error: could not import pyplot (http://matplotlib.sourceforge.net/)")
sys.exit(1)
+
class example_costas(gr.top_block):
def __init__(self, N, sps, rolloff, ntaps, bw, noise, foffset, toffset, poffset):
gr.top_block.__init__(self)
@@ -31,8 +32,8 @@ class example_costas(gr.top_block):
rrc_taps = filter.firdes.root_raised_cosine(
sps, sps, 1.0, rolloff, ntaps)
- data = 2.0*numpy.random.randint(0, 2, N) - 1.0
- data = numpy.exp(1j*poffset) * data
+ data = 2.0 * numpy.random.randint(0, 2, N) - 1.0
+ data = numpy.exp(1j * poffset) * data
self.src = blocks.vector_source_c(data.tolist(), False)
self.rrc = filter.interp_fir_filter_ccf(sps, rrc_taps)
@@ -45,28 +46,29 @@ class example_costas(gr.top_block):
self.connect(self.src, self.rrc, self.chn, self.cst, self.vsnk_cst)
self.connect(self.rrc, self.vsnk_src)
- self.connect((self.cst,1), self.vsnk_frq)
+ self.connect((self.cst, 1), self.vsnk_frq)
+
def main():
parser = ArgumentParser(conflict_handler="resolve")
parser.add_argument("-N", "--nsamples", type=int, default=2000,
- help="Set the number of samples to process [default=%(default)r]")
+ help="Set the number of samples to process [default=%(default)r]")
parser.add_argument("-S", "--sps", type=int, default=4,
- help="Set the samples per symbol [default=%(default)r]")
+ help="Set the samples per symbol [default=%(default)r]")
parser.add_argument("-r", "--rolloff", type=eng_float, default=0.35,
- help="Set the rolloff factor [default=%(default)r]")
- parser.add_argument("-W", "--bandwidth", type=eng_float, default=2*numpy.pi/100.0,
- help="Set the loop bandwidth [default=%(default)r]")
+ help="Set the rolloff factor [default=%(default)r]")
+ parser.add_argument("-W", "--bandwidth", type=eng_float, default=2 * numpy.pi / 100.0,
+ help="Set the loop bandwidth [default=%(default)r]")
parser.add_argument("-n", "--ntaps", type=int, default=45,
- help="Set the number of taps in the filters [default=%(default)r]")
+ help="Set the number of taps in the filters [default=%(default)r]")
parser.add_argument("--noise", type=eng_float, default=0.0,
- help="Set the simulation noise voltage [default=%(default)r]")
+ help="Set the simulation noise voltage [default=%(default)r]")
parser.add_argument("-f", "--foffset", type=eng_float, default=0.0,
- help="Set the simulation's normalized frequency offset (in Hz) [default=%(default)r]")
+ help="Set the simulation's normalized frequency offset (in Hz) [default=%(default)r]")
parser.add_argument("-t", "--toffset", type=eng_float, default=1.0,
- help="Set the simulation's timing offset [default=%(default)r]")
+ help="Set the simulation's timing offset [default=%(default)r]")
parser.add_argument("-p", "--poffset", type=eng_float, default=0.707,
- help="Set the simulation's phase offset [default=%(default)r]")
+ help="Set the simulation's phase offset [default=%(default)r]")
args = parser.parse_args()
# Adjust N for the interpolation by sps
@@ -81,21 +83,21 @@ def main():
data_src = numpy.array(put.vsnk_src.data())
# Convert the FLL's LO frequency from rads/sec to Hz
- data_frq = numpy.array(put.vsnk_frq.data()) / (2.0*numpy.pi)
+ data_frq = numpy.array(put.vsnk_frq.data()) / (2.0 * numpy.pi)
# adjust this to align with the data.
- data_cst = numpy.array(3*[0,]+list(put.vsnk_cst.data()))
+ data_cst = numpy.array(3 * [0, ] + list(put.vsnk_cst.data()))
# Plot the Costas loop's LO frequency
- f1 = pyplot.figure(1, figsize=(12,10), facecolor='w')
- s1 = f1.add_subplot(2,2,1)
+ f1 = pyplot.figure(1, figsize=(12, 10), facecolor='w')
+ s1 = f1.add_subplot(2, 2, 1)
s1.plot(data_frq)
s1.set_title("Costas LO")
s1.set_xlabel("Samples")
s1.set_ylabel("Frequency (normalized Hz)")
# Plot the IQ symbols
- s3 = f1.add_subplot(2,2,2)
+ s3 = f1.add_subplot(2, 2, 2)
s3.plot(data_src.real, data_src.imag, "o")
s3.plot(data_cst.real, data_cst.imag, "rx")
s3.set_title("IQ")
@@ -105,7 +107,7 @@ def main():
s3.set_ylim([-2, 2])
# Plot the symbols in time
- s4 = f1.add_subplot(2,2,3)
+ s4 = f1.add_subplot(2, 2, 3)
s4.set_position([0.125, 0.05, 0.775, 0.4])
s4.plot(data_src.real, "o-")
s4.plot(data_cst.real, "rx-")
@@ -115,9 +117,9 @@ def main():
pyplot.show()
+
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
pass
-
diff --git a/gr-digital/examples/example_fll.py b/gr-digital/examples/example_fll.py
index 021a3c1f6f..70b5bbd749 100644
--- a/gr-digital/examples/example_fll.py
+++ b/gr-digital/examples/example_fll.py
@@ -24,6 +24,7 @@ except ImportError:
print("Error: could not from matplotlib import pyplot (http://matplotlib.sourceforge.net/)")
sys.exit(1)
+
class example_fll(gr.top_block):
def __init__(self, N, sps, rolloff, ntaps, bw, noise, foffset, toffset, poffset):
gr.top_block.__init__(self)
@@ -31,8 +32,8 @@ class example_fll(gr.top_block):
rrc_taps = filter.firdes.root_raised_cosine(
sps, sps, 1.0, rolloff, ntaps)
- data = 2.0*numpy.random.randint(0, 2, N) - 1.0
- data = numpy.exp(1j*poffset) * data
+ data = 2.0 * numpy.random.randint(0, 2, N) - 1.0
+ data = numpy.exp(1j * poffset) * data
self.src = blocks.vector_source_c(data.tolist(), False)
self.rrc = filter.interp_fir_filter_ccf(sps, rrc_taps)
@@ -47,30 +48,31 @@ class example_fll(gr.top_block):
self.connect(self.src, self.rrc, self.chn, self.fll, self.vsnk_fll)
self.connect(self.rrc, self.vsnk_src)
- self.connect((self.fll,1), self.vsnk_frq)
- self.connect((self.fll,2), self.vsnk_phs)
- self.connect((self.fll,3), self.vsnk_err)
+ self.connect((self.fll, 1), self.vsnk_frq)
+ self.connect((self.fll, 2), self.vsnk_phs)
+ self.connect((self.fll, 3), self.vsnk_err)
+
def main():
parser = ArgumentParser(conflict_handler="resolve")
parser.add_argument("-N", "--nsamples", type=int, default=2000,
- help="Set the number of samples to process [default=%(default)r]")
+ help="Set the number of samples to process [default=%(default)r]")
parser.add_argument("-S", "--sps", type=int, default=4,
- help="Set the samples per symbol [default=%(default)r]")
+ help="Set the samples per symbol [default=%(default)r]")
parser.add_argument("-r", "--rolloff", type=eng_float, default=0.35,
- help="Set the rolloff factor [default=%(default)r]")
- parser.add_argument("-W", "--bandwidth", type=eng_float, default=2*numpy.pi/100.0,
- help="Set the loop bandwidth [default=%(default)r]")
+ help="Set the rolloff factor [default=%(default)r]")
+ parser.add_argument("-W", "--bandwidth", type=eng_float, default=2 * numpy.pi / 100.0,
+ help="Set the loop bandwidth [default=%(default)r]")
parser.add_argument("-n", "--ntaps", type=int, default=45,
- help="Set the number of taps in the filters [default=%(default)r]")
+ help="Set the number of taps in the filters [default=%(default)r]")
parser.add_argument("--noise", type=eng_float, default=0.0,
- help="Set the simulation noise voltage [default=%(default)r]")
+ help="Set the simulation noise voltage [default=%(default)r]")
parser.add_argument("-f", "--foffset", type=eng_float, default=0.2,
- help="Set the simulation's normalized frequency offset (in Hz) [default=%(default)r]")
+ help="Set the simulation's normalized frequency offset (in Hz) [default=%(default)r]")
parser.add_argument("-t", "--toffset", type=eng_float, default=1.0,
- help="Set the simulation's timing offset [default=%(default)r]")
+ help="Set the simulation's timing offset [default=%(default)r]")
parser.add_argument("-p", "--poffset", type=eng_float, default=0.0,
- help="Set the simulation's phase offset [default=%(default)r]")
+ help="Set the simulation's phase offset [default=%(default)r]")
args = parser.parse_args()
# Adjust N for the interpolation by sps
@@ -86,29 +88,29 @@ def main():
data_err = numpy.array(put.vsnk_err.data())
# Convert the FLL's LO frequency from rads/sec to Hz
- data_frq = numpy.array(put.vsnk_frq.data()) / (2.0*numpy.pi)
+ data_frq = numpy.array(put.vsnk_frq.data()) / (2.0 * numpy.pi)
# adjust this to align with the data. There are 2 filters of
# ntaps long and the channel introduces another 4 sample delay.
- data_fll = numpy.array(put.vsnk_fll.data()[2*args.ntaps-4:])
+ data_fll = numpy.array(put.vsnk_fll.data()[2 * args.ntaps - 4:])
# Plot the FLL's LO frequency
- f1 = pyplot.figure(1, figsize=(12,10))
- s1 = f1.add_subplot(2,2,1)
+ f1 = pyplot.figure(1, figsize=(12, 10))
+ s1 = f1.add_subplot(2, 2, 1)
s1.plot(data_frq)
s1.set_title("FLL LO")
s1.set_xlabel("Samples")
s1.set_ylabel("Frequency (normalized Hz)")
# Plot the FLL's error
- s2 = f1.add_subplot(2,2,2)
+ s2 = f1.add_subplot(2, 2, 2)
s2.plot(data_err)
s2.set_title("FLL Error")
s2.set_xlabel("Samples")
s2.set_ylabel("FLL Loop error")
# Plot the IQ symbols
- s3 = f1.add_subplot(2,2,3)
+ s3 = f1.add_subplot(2, 2, 3)
s3.plot(data_src.real, data_src.imag, "o")
s3.plot(data_fll.real, data_fll.imag, "rx")
s3.set_title("IQ")
@@ -116,7 +118,7 @@ def main():
s3.set_ylabel("Imag part")
# Plot the symbols in time
- s4 = f1.add_subplot(2,2,4)
+ s4 = f1.add_subplot(2, 2, 4)
s4.plot(data_src.real, "o-")
s4.plot(data_fll.real, "rx-")
s4.set_title("Symbols")
@@ -125,9 +127,9 @@ def main():
pyplot.show()
+
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
pass
-
diff --git a/gr-digital/examples/example_timing.py b/gr-digital/examples/example_timing.py
index e2ff4a5649..f49576b2aa 100644
--- a/gr-digital/examples/example_timing.py
+++ b/gr-digital/examples/example_timing.py
@@ -24,6 +24,7 @@ except ImportError:
print("Error: could not from matplotlib import pyplot (http://matplotlib.sourceforge.net/)")
sys.exit(1)
+
class example_timing(gr.top_block):
def __init__(self, N, sps, rolloff, ntaps, bw, noise,
foffset, toffset, poffset, mode=0):
@@ -35,10 +36,10 @@ class example_timing(gr.top_block):
gain = bw
nfilts = 32
rrc_taps_rx = filter.firdes.root_raised_cosine(
- nfilts, sps*nfilts, 1.0, rolloff, ntaps*nfilts)
+ nfilts, sps * nfilts, 1.0, rolloff, ntaps * nfilts)
- data = 2.0*numpy.random.randint(0, 2, N) - 1.0
- data = numpy.exp(1j*poffset) * data
+ data = 2.0 * numpy.random.randint(0, 2, N) - 1.0
+ data = numpy.exp(1j * poffset) * data
self.src = blocks.vector_source_c(data.tolist(), False)
self.rrc = filter.interp_fir_filter_ccf(sps, rrc_taps)
@@ -47,25 +48,25 @@ class example_timing(gr.top_block):
if mode == 0:
self.clk = digital.pfb_clock_sync_ccf(sps, gain, rrc_taps_rx,
- nfilts, nfilts//2, 1)
+ nfilts, nfilts // 2, 1)
self.taps = self.clk.taps()
self.dtaps = self.clk.diff_taps()
- self.delay = int(numpy.ceil(((len(rrc_taps)-1)//2 +
- (len(self.taps[0])-1)//2 )//float(sps))) + 1
+ self.delay = int(numpy.ceil(((len(rrc_taps) - 1) // 2 +
+ (len(self.taps[0]) - 1) // 2) // float(sps))) + 1
self.vsnk_err = blocks.vector_sink_f()
self.vsnk_rat = blocks.vector_sink_f()
self.vsnk_phs = blocks.vector_sink_f()
- self.connect((self.clk,1), self.vsnk_err)
- self.connect((self.clk,2), self.vsnk_rat)
- self.connect((self.clk,3), self.vsnk_phs)
+ self.connect((self.clk, 1), self.vsnk_err)
+ self.connect((self.clk, 2), self.vsnk_rat)
+ self.connect((self.clk, 3), self.vsnk_phs)
- else: # mode == 1
+ else: # mode == 1
mu = 0.5
gain_mu = bw
- gain_omega = 0.25*gain_mu*gain_mu
+ gain_omega = 0.25 * gain_mu * gain_mu
omega_rel_lim = 0.02
self.clk = digital.clock_recovery_mm_cc(sps, gain_omega,
mu, gain_mu,
@@ -73,37 +74,38 @@ class example_timing(gr.top_block):
self.vsnk_err = blocks.vector_sink_f()
- self.connect((self.clk,1), self.vsnk_err)
+ self.connect((self.clk, 1), self.vsnk_err)
self.vsnk_src = blocks.vector_sink_c()
self.vsnk_clk = blocks.vector_sink_c()
- self.connect(self.src, self.rrc, self.chn, self.off, self.clk, self.vsnk_clk)
+ self.connect(self.src, self.rrc, self.chn,
+ self.off, self.clk, self.vsnk_clk)
self.connect(self.src, self.vsnk_src)
def main():
parser = ArgumentParser(conflict_handler="resolve")
parser.add_argument("-N", "--nsamples", type=int, default=2000,
- help="Set the number of samples to process [default=%(default)r]")
+ help="Set the number of samples to process [default=%(default)r]")
parser.add_argument("-S", "--sps", type=int, default=4,
- help="Set the samples per symbol [default=%(default)r]")
+ help="Set the samples per symbol [default=%(default)r]")
parser.add_argument("-r", "--rolloff", type=eng_float, default=0.35,
- help="Set the rolloff factor [default=%(default)r]")
- parser.add_argument("-W", "--bandwidth", type=eng_float, default=2*numpy.pi/100.0,
- help="Set the loop bandwidth (PFB) or gain (M&M) [default=%(default)r]")
+ help="Set the rolloff factor [default=%(default)r]")
+ parser.add_argument("-W", "--bandwidth", type=eng_float, default=2 * numpy.pi / 100.0,
+ help="Set the loop bandwidth (PFB) or gain (M&M) [default=%(default)r]")
parser.add_argument("-n", "--ntaps", type=int, default=45,
- help="Set the number of taps in the filters [default=%(default)r]")
+ help="Set the number of taps in the filters [default=%(default)r]")
parser.add_argument("--noise", type=eng_float, default=0.0,
- help="Set the simulation noise voltage [default=%(default)r]")
+ help="Set the simulation noise voltage [default=%(default)r]")
parser.add_argument("-f", "--foffset", type=eng_float, default=0.0,
- help="Set the simulation's normalized frequency offset (in Hz) [default=%(default)r]")
+ help="Set the simulation's normalized frequency offset (in Hz) [default=%(default)r]")
parser.add_argument("-t", "--toffset", type=eng_float, default=1.0,
- help="Set the simulation's timing offset [default=%(default)r]")
+ help="Set the simulation's timing offset [default=%(default)r]")
parser.add_argument("-p", "--poffset", type=eng_float, default=0.0,
- help="Set the simulation's phase offset [default=%(default)r]")
+ help="Set the simulation's phase offset [default=%(default)r]")
parser.add_argument("-M", "--mode", type=int, default=0,
- help="Set the recovery mode (0: polyphase, 1: M&M) [default=%(default)r]")
+ help="Set the recovery mode (0: polyphase, 1: M&M) [default=%(default)r]")
args = parser.parse_args()
# Adjust N for the interpolation by sps
@@ -124,10 +126,10 @@ def main():
data_rat = numpy.array(put.vsnk_rat.data()[20:])
data_phs = numpy.array(put.vsnk_phs.data()[20:])
- f1 = pyplot.figure(1, figsize=(12,10), facecolor='w')
+ f1 = pyplot.figure(1, figsize=(12, 10), facecolor='w')
# Plot the IQ symbols
- s1 = f1.add_subplot(2,2,1)
+ s1 = f1.add_subplot(2, 2, 1)
s1.plot(data_src.real, data_src.imag, "bo")
s1.plot(data_clk.real, data_clk.imag, "ro")
s1.set_title("IQ")
@@ -139,7 +141,7 @@ def main():
# Plot the symbols in time
delay = put.delay
m = len(data_clk.real)
- s2 = f1.add_subplot(2,2,2)
+ s2 = f1.add_subplot(2, 2, 2)
s2.plot(data_src.real, "bs", markersize=10, label="Input")
s2.plot(data_clk.real[delay:], "ro", label="Recovered")
s2.set_title("Symbols")
@@ -148,7 +150,7 @@ def main():
s2.legend()
# Plot the clock recovery loop's error
- s3 = f1.add_subplot(2,2,3)
+ s3 = f1.add_subplot(2, 2, 3)
s3.plot(data_err, label="Error")
s3.plot(data_rat, 'r', label="Update rate")
s3.set_title("Clock Recovery Loop Error")
@@ -158,26 +160,27 @@ def main():
s3.legend()
# Plot the clock recovery loop's error
- s4 = f1.add_subplot(2,2,4)
+ s4 = f1.add_subplot(2, 2, 4)
s4.plot(data_phs)
s4.set_title("Clock Recovery Loop Filter Phase")
s4.set_xlabel("Samples")
s4.set_ylabel("Filter Phase")
-
diff_taps = put.dtaps
ntaps = len(diff_taps[0])
nfilts = len(diff_taps)
- t = numpy.arange(0, ntaps*nfilts)
+ t = numpy.arange(0, ntaps * nfilts)
- f3 = pyplot.figure(3, figsize=(12,10), facecolor='w')
- s31 = f3.add_subplot(2,1,1)
- s32 = f3.add_subplot(2,1,2)
+ f3 = pyplot.figure(3, figsize=(12, 10), facecolor='w')
+ s31 = f3.add_subplot(2, 1, 1)
+ s32 = f3.add_subplot(2, 1, 2)
s31.set_title("Differential Filters")
s32.set_title("FFT of Differential Filters")
- for i,d in enumerate(diff_taps):
- D = 20.0*numpy.log10(1e-20+abs(numpy.fft.fftshift(numpy.fft.fft(d, 10000))))
+ for i, d in enumerate(diff_taps):
+ D = 20.0 * \
+ numpy.log10(
+ 1e-20 + abs(numpy.fft.fftshift(numpy.fft.fft(d, 10000))))
s31.plot(t[i::nfilts].real, d, "-o")
s32.plot(D)
s32.set_ylim([-120, 10])
@@ -189,10 +192,10 @@ def main():
data_err = numpy.array(put.vsnk_err.data()[20:])
- f1 = pyplot.figure(1, figsize=(12,10), facecolor='w')
+ f1 = pyplot.figure(1, figsize=(12, 10), facecolor='w')
# Plot the IQ symbols
- s1 = f1.add_subplot(2,2,1)
+ s1 = f1.add_subplot(2, 2, 1)
s1.plot(data_src.real, data_src.imag, "o")
s1.plot(data_clk.real, data_clk.imag, "ro")
s1.set_title("IQ")
@@ -202,7 +205,7 @@ def main():
s1.set_ylim([-2, 2])
# Plot the symbols in time
- s2 = f1.add_subplot(2,2,2)
+ s2 = f1.add_subplot(2, 2, 2)
s2.plot(data_src.real, "bs", markersize=10, label="Input")
s2.plot(data_clk.real, "ro", label="Recovered")
s2.set_title("Symbols")
@@ -211,7 +214,7 @@ def main():
s2.legend()
# Plot the clock recovery loop's error
- s3 = f1.add_subplot(2,2,3)
+ s3 = f1.add_subplot(2, 2, 3)
s3.plot(data_err)
s3.set_title("Clock Recovery Loop Error")
s3.set_xlabel("Samples")
@@ -219,6 +222,7 @@ def main():
pyplot.show()
+
if __name__ == "__main__":
try:
main()
diff --git a/gr-digital/examples/gen_whitener.py b/gr-digital/examples/gen_whitener.py
index a6261d021b..6798597131 100644
--- a/gr-digital/examples/gen_whitener.py
+++ b/gr-digital/examples/gen_whitener.py
@@ -1,18 +1,19 @@
#!/usr/bin/env python
#
# Copyright 2011,2013 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
-#
+#
from gnuradio import gr
from gnuradio import blocks
from argparse import ArgumentParser
import sys
+
class my_graph(gr.top_block):
def __init__(self):
@@ -25,6 +26,7 @@ class my_graph(gr.top_block):
self.dst = blocks.vector_sink_s()
self.connect(src, head, self.dst)
+
if __name__ == '__main__':
try:
tb = my_graph()
@@ -34,10 +36,9 @@ if __name__ == '__main__':
for s in tb.dst.data():
f.write("%3d, " % (s & 0xff,))
f.write("%3d, " % ((s >> 8) & 0xff,))
- i = i+2
+ i = i + 2
if i % 16 == 0:
f.write('\n')
except KeyboardInterrupt:
pass
-
diff --git a/gr-digital/examples/narrowband/benchmark_add_channel.py b/gr-digital/examples/narrowband/benchmark_add_channel.py
index 351aae29cd..725138f85d 100644
--- a/gr-digital/examples/narrowband/benchmark_add_channel.py
+++ b/gr-digital/examples/narrowband/benchmark_add_channel.py
@@ -15,7 +15,10 @@ from gnuradio import eng_notation
from gnuradio.eng_option import eng_option
from optparse import OptionParser
-import random, math, sys
+import random
+import math
+import sys
+
class my_top_block(gr.top_block):
def __init__(self, ifile, ofile, options):
@@ -24,7 +27,7 @@ class my_top_block(gr.top_block):
SNR = 10.0**(options.snr / 10.0)
frequency_offset = options.frequency_offset
time_offset = options.time_offset
- phase_offset = options.phase_offset*(math.pi / 180.0)
+ phase_offset = options.phase_offset * (math.pi / 180.0)
# calculate noise voltage from SNR
power_in_signal = abs(options.tx_amplitude)**2
@@ -35,9 +38,9 @@ class my_top_block(gr.top_block):
#self.throttle = blocks.throttle(gr.sizeof_gr_complex, options.sample_rate)
self.channel = channels.channel_model(noise_voltage, frequency_offset,
- time_offset, noise_seed=-random.randint(0,100000))
+ time_offset, noise_seed=-random.randint(0, 100000))
self.phase = blocks.multiply_const_cc(complex(math.cos(phase_offset),
- math.sin(phase_offset)))
+ math.sin(phase_offset)))
self.snk = blocks.file_sink(gr.sizeof_gr_complex, ofile)
self.connect(self.src, self.channel, self.phase, self.snk)
@@ -50,7 +53,8 @@ class my_top_block(gr.top_block):
def main():
# Create Options Parser:
usage = "benchmack_add_channel.py [options] <input file> <output file>"
- parser = OptionParser (usage=usage, option_class=eng_option, conflict_handler="resolve")
+ parser = OptionParser(
+ usage=usage, option_class=eng_option, conflict_handler="resolve")
parser.add_option("-n", "--snr", type="eng_float", default=30,
help="set the SNR of the channel in dB [default=%default]")
parser.add_option("", "--seed", action="store_true", default=False,
@@ -67,7 +71,7 @@ def main():
default=1.0,
help="tell the simulator the signal amplitude [default=%default]")
- (options, args) = parser.parse_args ()
+ (options, args) = parser.parse_args()
if len(args) != 2:
parser.print_help(sys.stderr)
@@ -86,6 +90,7 @@ def main():
tb.start() # start flow graph
tb.wait() # wait for it to finish
+
if __name__ == '__main__':
try:
main()
diff --git a/gr-digital/examples/narrowband/digital_bert_rx.py b/gr-digital/examples/narrowband/digital_bert_rx.py
index e20b798ee2..7b951c4569 100644
--- a/gr-digital/examples/narrowband/digital_bert_rx.py
+++ b/gr-digital/examples/narrowband/digital_bert_rx.py
@@ -13,7 +13,9 @@ from gnuradio import gr, eng_notation
from optparse import OptionParser
from gnuradio.eng_option import eng_option
import threading
-import sys, time, math
+import sys
+import time
+import math
from gnuradio import digital
from gnuradio import blocks
@@ -23,6 +25,7 @@ from uhd_interface import uhd_receiver
n2s = eng_notation.num_to_str
+
class status_thread(threading.Thread):
def __init__(self, tb):
threading.Thread.__init__(self)
@@ -34,14 +37,13 @@ class status_thread(threading.Thread):
def run(self):
while not self.done:
print("Freq. Offset: {0:5.0f} Hz Timing Offset: {1:10.1f} ppm Estimated SNR: {2:4.1f} dB BER: {3:g}".format(
- tb.frequency_offset(), tb.timing_offset()*1e6, tb.snr(), tb.ber()))
+ tb.frequency_offset(), tb.timing_offset() * 1e6, tb.snr(), tb.ber()))
try:
time.sleep(1.0)
except KeyboardInterrupt:
self.done = True
-
class bert_receiver(gr.hier_block2):
def __init__(self, bitrate,
constellation, samples_per_symbol,
@@ -50,7 +52,8 @@ class bert_receiver(gr.hier_block2):
verbose, log):
gr.hier_block2.__init__(self, "bert_receive",
- gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
gr.io_signature(0, 0, 0)) # Output signature
self._bitrate = bitrate
@@ -70,7 +73,8 @@ class bert_receiver(gr.hier_block2):
self.connect(self._demod.time_recov, self._snr_probe)
# Descramble BERT sequence. A channel error will create 3 incorrect bits
- self._descrambler = digital.descrambler_bb(0x8A, 0x7F, 7) # CCSDS 7-bit descrambler
+ self._descrambler = digital.descrambler_bb(
+ 0x8A, 0x7F, 7) # CCSDS 7-bit descrambler
# Measure BER by the density of 0s in the stream
self._ber = digital.probe_density_b(1.0 / self._symbol_rate)
@@ -78,7 +82,7 @@ class bert_receiver(gr.hier_block2):
self.connect(self, self._demod, self._descrambler, self._ber)
def frequency_offset(self):
- return self._demod.freq_recov.get_frequency()*self._sample_rate/(2*math.pi)
+ return self._demod.freq_recov.get_frequency() * self._sample_rate / (2 * math.pi)
def timing_offset(self):
return self._demod.time_recov.clock_rate()
@@ -87,8 +91,7 @@ class bert_receiver(gr.hier_block2):
return self._snr_probe.snr()
def ber(self):
- return (1.0-self._ber.density()) / 3.0
-
+ return (1.0 - self._ber.density()) / 3.0
class rx_psk_block(gr.top_block):
@@ -99,7 +102,8 @@ class rx_psk_block(gr.top_block):
self._demodulator_class = demod
# Get demod_kwargs
- demod_kwargs = self._demodulator_class.extract_kwargs_from_options(options)
+ demod_kwargs = self._demodulator_class.extract_kwargs_from_options(
+ options)
# demodulator
self._demodulator = self._demodulator_class(**demod_kwargs)
@@ -114,7 +118,8 @@ class rx_psk_block(gr.top_block):
options.samples_per_symbol = self._source._sps
elif(options.from_file is not None):
- self._source = blocks.file_source(gr.sizeof_gr_complex, options.from_file)
+ self._source = blocks.file_source(
+ gr.sizeof_gr_complex, options.from_file)
else:
self._source = blocks.null_source(gr.sizeof_gr_complex)
@@ -154,7 +159,7 @@ class rx_psk_block(gr.top_block):
def get_options(demods):
parser = OptionParser(option_class=eng_option, conflict_handler="resolve")
- parser.add_option("","--from-file", default=None,
+ parser.add_option("", "--from-file", default=None,
help="input file of samples to demod")
parser.add_option("-m", "--modulation", type="choice", choices=list(demods.keys()),
default='psk',
@@ -165,10 +170,11 @@ def get_options(demods):
parser.add_option("-S", "--samples-per-symbol", type="float", default=2,
help="set samples/symbol [default=%default]")
if not parser.has_option("--verbose"):
- parser.add_option("-v", "--verbose", action="store_true", default=False)
+ parser.add_option("-v", "--verbose",
+ action="store_true", default=False)
if not parser.has_option("--log"):
parser.add_option("", "--log", action="store_true", default=False,
- help="Log all parts of flow graph to files (CAUTION: lots of data)")
+ help="Log all parts of flow graph to files (CAUTION: lots of data)")
uhd_receiver.add_options(parser)
@@ -185,7 +191,7 @@ def get_options(demods):
if __name__ == "__main__":
- print ("""Warning: this example in its current shape is deprecated and
+ print("""Warning: this example in its current shape is deprecated and
will be removed or fundamentally reworked in a coming GNU Radio
release.""")
demods = digital.modulation_utils.type_1_demods()
diff --git a/gr-digital/examples/narrowband/digital_bert_tx.py b/gr-digital/examples/narrowband/digital_bert_tx.py
index fe8ad0671f..d609534271 100644
--- a/gr-digital/examples/narrowband/digital_bert_tx.py
+++ b/gr-digital/examples/narrowband/digital_bert_tx.py
@@ -21,25 +21,30 @@ from uhd_interface import uhd_transmitter
n2s = eng_notation.num_to_str
+
class bert_transmit(gr.hier_block2):
def __init__(self, constellation, samples_per_symbol,
differential, excess_bw, gray_coded,
verbose, log):
gr.hier_block2.__init__(self, "bert_transmit",
- gr.io_signature(0, 0, 0), # Output signature
- gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Input signature
+ # Output signature
+ gr.io_signature(0, 0, 0),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Input signature
# Create BERT data bit stream
- self._bits = blocks.vector_source_b([1,], True) # Infinite stream of ones
- self._scrambler = digital.scrambler_bb(0x8A, 0x7F, 7) # CCSDS 7-bit scrambler
+ self._bits = blocks.vector_source_b(
+ [1, ], True) # Infinite stream of ones
+ self._scrambler = digital.scrambler_bb(
+ 0x8A, 0x7F, 7) # CCSDS 7-bit scrambler
self._mod = digital.generic_mod(constellation, differential,
samples_per_symbol,
gray_coded, excess_bw,
verbose, log)
- self._pack = blocks.unpacked_to_packed_bb(self._mod.bits_per_symbol(), gr.GR_MSB_FIRST)
+ self._pack = blocks.unpacked_to_packed_bb(
+ self._mod.bits_per_symbol(), gr.GR_MSB_FIRST)
self.connect(self._bits, self._scrambler, self._pack, self._mod, self)
@@ -66,11 +71,11 @@ class tx_psk_block(gr.top_block):
options.samples_per_symbol = self._sink._sps
elif(options.to_file is not None):
- self._sink = blocks.file_sink(gr.sizeof_gr_complex, options.to_file)
+ self._sink = blocks.file_sink(
+ gr.sizeof_gr_complex, options.to_file)
else:
self._sink = blocks.null_sink(gr.sizeof_gr_complex)
-
self._transmitter = bert_transmit(self._modulator._constellation,
options.samples_per_symbol,
options.differential,
@@ -95,10 +100,11 @@ def get_options(mods):
help="Select modulation bit rate (default=%default)")
parser.add_option("-S", "--samples-per-symbol", type="float", default=2,
help="set samples/symbol [default=%default]")
- parser.add_option("","--to-file", default=None,
+ parser.add_option("", "--to-file", default=None,
help="Output file for modulated samples")
if not parser.has_option("--verbose"):
- parser.add_option("-v", "--verbose", action="store_true", default=False)
+ parser.add_option("-v", "--verbose",
+ action="store_true", default=False)
if not parser.has_option("--log"):
parser.add_option("", "--log", action="store_true", default=False)
@@ -114,8 +120,9 @@ def get_options(mods):
return (options, args)
+
if __name__ == "__main__":
- print ("""Warning: this example in its current shape is deprecated and
+ print("""Warning: this example in its current shape is deprecated and
will be removed or fundamentally reworked in a coming GNU Radio
release.""")
mods = digital.modulation_utils.type_1_mods()
diff --git a/gr-digital/examples/narrowband/uhd_interface.py b/gr-digital/examples/narrowband/uhd_interface.py
index 54d152cd7c..7a541296ee 100644
--- a/gr-digital/examples/narrowband/uhd_interface.py
+++ b/gr-digital/examples/narrowband/uhd_interface.py
@@ -16,6 +16,7 @@ from optparse import OptionParser
import sys
+
def add_freq_option(parser):
"""
Hackery that has the -f / --freq option set both tx_freq and rx_freq
@@ -30,14 +31,17 @@ def add_freq_option(parser):
help="set Tx and/or Rx frequency to FREQ [default=%default]",
metavar="FREQ")
+
class uhd_interface(object):
def __init__(self, istx, args, sym_rate, sps, freq=None, lo_offset=None,
gain=None, spec=None, antenna=None, clock_source=None):
if(istx):
- self.u = uhd.usrp_sink(device_addr=args, stream_args=uhd.stream_args('fc32'))
+ self.u = uhd.usrp_sink(
+ device_addr=args, stream_args=uhd.stream_args('fc32'))
else:
- self.u = uhd.usrp_source(device_addr=args, stream_args=uhd.stream_args('fc32'))
+ self.u = uhd.usrp_source(
+ device_addr=args, stream_args=uhd.stream_args('fc32'))
# Set clock source
if(clock_source):
@@ -52,7 +56,7 @@ class uhd_interface(object):
self.u.set_antenna(antenna, 0)
self._args = args
- self._ant = antenna
+ self._ant = antenna
self._spec = spec
self._gain = self.set_gain(gain)
self._lo_offset = lo_offset
@@ -69,7 +73,7 @@ class uhd_interface(object):
sps = actual_samp_rate / sym_rate
if(sps < 2):
- req_sps +=1
+ req_sps += 1
else:
actual_sps = sps
break
@@ -93,10 +97,10 @@ class uhd_interface(object):
if gain is None:
# if no gain was specified, use the mid-point in dB
g = self.u.get_gain_range()
- gain = float(g.start()+g.stop()) / 2
+ gain = float(g.start() + g.stop()) / 2
print("\nNo gain specified.")
print("Setting gain to %f (from [%f, %f])" %
- (gain, g.start(), g.stop()))
+ (gain, g.start(), g.stop()))
self.u.set_gain(gain, 0)
return gain
@@ -111,20 +115,21 @@ class uhd_interface(object):
return freq
else:
frange = self.u.get_freq_range()
- sys.stderr.write(("\nRequested frequency (%f) out or range [%f, %f]\n") % \
- (freq, frange.start(), frange.stop()))
+ sys.stderr.write(("\nRequested frequency (%f) out or range [%f, %f]\n") %
+ (freq, frange.start(), frange.stop()))
sys.exit(1)
#-------------------------------------------------------------------#
# TRANSMITTER
#-------------------------------------------------------------------#
+
class uhd_transmitter(uhd_interface, gr.hier_block2):
def __init__(self, args, sym_rate, sps, freq=None, lo_offset=None, gain=None,
spec=None, antenna=None, clock_source=None, verbose=False):
gr.hier_block2.__init__(self, "uhd_transmitter",
- gr.io_signature(1,1,gr.sizeof_gr_complex),
- gr.io_signature(0,0,0))
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
+ gr.io_signature(0, 0, 0))
# Set up the UHD interface as a transmitter
uhd_interface.__init__(self, True, args, sym_rate, sps,
@@ -153,21 +158,23 @@ class uhd_transmitter(uhd_interface, gr.hier_block2):
help="set transmit gain in dB (default is midpoint)")
parser.add_option("-C", "--clock-source", type="string", default=None,
help="select clock source (e.g. 'external') [default=%default]")
- parser.add_option("-v", "--verbose", action="store_true", default=False)
+ parser.add_option("-v", "--verbose",
+ action="store_true", default=False)
def _print_verbage(self):
"""
Prints information about the UHD transmitter
"""
print("\nUHD Transmitter:")
- print("Args: %s" % (self._args))
- print("Freq: %sHz" % (eng_notation.num_to_str(self._freq)))
- print("LO Offset: %sHz" % (eng_notation.num_to_str(self._lo_offset)) )
+ print("Args: %s" % (self._args))
+ print("Freq: %sHz" % (eng_notation.num_to_str(self._freq)))
+ print("LO Offset: %sHz" %
+ (eng_notation.num_to_str(self._lo_offset)))
print("Gain: %f dB" % (self._gain))
print("Sample Rate: %ssps" % (eng_notation.num_to_str(self._rate)))
- print("Antenna: %s" % (self._ant))
- print("Subdev Spec: %s" % (self._spec))
- print("Clock Source: %s" % (self._clock_source))
+ print("Antenna: %s" % (self._ant))
+ print("Subdev Spec: %s" % (self._spec))
+ print("Clock Source: %s" % (self._clock_source))
#-------------------------------------------------------------------#
# RECEIVER
@@ -178,8 +185,8 @@ class uhd_receiver(uhd_interface, gr.hier_block2):
def __init__(self, args, sym_rate, sps, freq=None, lo_offset=None, gain=None,
spec=None, antenna=None, clock_source=None, verbose=False):
gr.hier_block2.__init__(self, "uhd_receiver",
- gr.io_signature(0,0,0),
- gr.io_signature(1,1,gr.sizeof_gr_complex))
+ gr.io_signature(0, 0, 0),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex))
# Set up the UHD interface as a receiver
uhd_interface.__init__(self, False, args, sym_rate, sps,
@@ -209,19 +216,20 @@ class uhd_receiver(uhd_interface, gr.hier_block2):
parser.add_option("-C", "--clock-source", type="string", default=None,
help="select clock source (e.g. 'external') [default=%default]")
if not parser.has_option("--verbose"):
- parser.add_option("-v", "--verbose", action="store_true", default=False)
+ parser.add_option("-v", "--verbose",
+ action="store_true", default=False)
def _print_verbage(self):
"""
Prints information about the UHD transmitter
"""
print("\nUHD Receiver:")
- print("UHD Args: %s" % (self._args))
- print("Freq: %sHz" % (eng_notation.num_to_str(self._freq)))
- print("LO Offset: %sHz" % (eng_notation.num_to_str(self._lo_offset)) )
+ print("UHD Args: %s" % (self._args))
+ print("Freq: %sHz" % (eng_notation.num_to_str(self._freq)))
+ print("LO Offset: %sHz" %
+ (eng_notation.num_to_str(self._lo_offset)))
print("Gain: %f dB" % (self._gain))
print("Sample Rate: %ssps" % (eng_notation.num_to_str(self._rate)))
- print("Antenna: %s" % (self._ant))
- print("Spec: %s" % (self._spec))
- print("Clock Source: %s" % (self._clock_source))
-
+ print("Antenna: %s" % (self._ant))
+ print("Spec: %s" % (self._spec))
+ print("Clock Source: %s" % (self._clock_source))
diff --git a/gr-digital/examples/ofdm/benchmark_add_channel.py b/gr-digital/examples/ofdm/benchmark_add_channel.py
index 6355bcc828..fec315e8d7 100644
--- a/gr-digital/examples/ofdm/benchmark_add_channel.py
+++ b/gr-digital/examples/ofdm/benchmark_add_channel.py
@@ -15,7 +15,10 @@ from gnuradio import eng_notation
from gnuradio.eng_option import eng_option
from optparse import OptionParser
-import random, math, sys
+import random
+import math
+import sys
+
class my_top_block(gr.top_block):
def __init__(self, ifile, ofile, options):
@@ -23,7 +26,7 @@ class my_top_block(gr.top_block):
SNR = 10.0**(options.snr / 10.0)
time_offset = options.time_offset
- phase_offset = options.phase_offset*(math.pi / 180.0)
+ phase_offset = options.phase_offset * (math.pi / 180.0)
# calculate noise voltage from SNR
power_in_signal = abs(options.tx_amplitude)**2
@@ -37,9 +40,9 @@ class my_top_block(gr.top_block):
#self.throttle = blocks.throttle(gr.sizeof_gr_complex, options.sample_rate)
self.channel = channels.channel_model(noise_voltage, frequency_offset,
- time_offset, noise_seed=-random.randint(0,100000))
+ time_offset, noise_seed=-random.randint(0, 100000))
self.phase = blocks.multiply_const_cc(complex(math.cos(phase_offset),
- math.sin(phase_offset)))
+ math.sin(phase_offset)))
self.snk = blocks.file_sink(gr.sizeof_gr_complex, ofile)
self.connect(self.src, self.channel, self.phase, self.snk)
@@ -52,7 +55,8 @@ class my_top_block(gr.top_block):
def main():
# Create Options Parser:
usage = "benchmack_add_channel.py [options] <input file> <output file>"
- parser = OptionParser (usage=usage, option_class=eng_option, conflict_handler="resolve")
+ parser = OptionParser(
+ usage=usage, option_class=eng_option, conflict_handler="resolve")
parser.add_option("-n", "--snr", type="eng_float", default=30,
help="set the SNR of the channel in dB [default=%default]")
parser.add_option("", "--seed", action="store_true", default=False,
@@ -71,7 +75,7 @@ def main():
default=1.0,
help="tell the simulator the signal amplitude [default=%default]")
- (options, args) = parser.parse_args ()
+ (options, args) = parser.parse_args()
if len(args) != 2:
parser.print_help(sys.stderr)
@@ -94,6 +98,7 @@ def main():
tb.start() # start flow graph
tb.wait() # wait for it to finish
+
if __name__ == '__main__':
try:
main()
diff --git a/gr-digital/examples/ofdm/receive_path.py b/gr-digital/examples/ofdm/receive_path.py
index 90b572a08f..e46056c245 100644
--- a/gr-digital/examples/ofdm/receive_path.py
+++ b/gr-digital/examples/ofdm/receive_path.py
@@ -19,6 +19,7 @@ import sys
# receive path
# /////////////////////////////////////////////////////////////////////////////
+
class receive_path(gr.hier_block2):
def __init__(self, rx_callback, options):
@@ -26,12 +27,13 @@ class receive_path(gr.hier_block2):
gr.io_signature(1, 1, gr.sizeof_gr_complex),
gr.io_signature(0, 0, 0))
+ # make a copy so we can destructively modify
+ options = copy.copy(options)
- options = copy.copy(options) # make a copy so we can destructively modify
-
- self._verbose = options.verbose
- self._log = options.log
- self._rx_callback = rx_callback # this callback is fired when there's a packet available
+ self._verbose = options.verbose
+ self._log = options.log
+ # this callback is fired when there's a packet available
+ self._rx_callback = rx_callback
# receiver
self.ofdm_rx = digital.ofdm_demod(options,
@@ -40,7 +42,7 @@ class receive_path(gr.hier_block2):
# Carrier Sensing Blocks
alpha = 0.001
thresh = 30 # in dB, will have to adjust
- self.probe = analog.probe_avg_mag_sqrd_c(thresh,alpha)
+ self.probe = analog.probe_avg_mag_sqrd_c(thresh, alpha)
self.connect(self, self.ofdm_rx)
self.connect(self.ofdm_rx, self.probe)
@@ -53,7 +55,7 @@ class receive_path(gr.hier_block2):
"""
Return True if we think carrier is present.
"""
- #return self.probe.level() > X
+ # return self.probe.level() > X
return self.probe.unmuted()
def carrier_threshold(self):
@@ -79,7 +81,8 @@ class receive_path(gr.hier_block2):
normal.add_option("-W", "--bandwidth", type="eng_float",
default=500e3,
help="set symbol bandwidth [default=%default]")
- normal.add_option("-v", "--verbose", action="store_true", default=False)
+ normal.add_option("-v", "--verbose",
+ action="store_true", default=False)
expert.add_option("", "--log", action="store_true", default=False,
help="Log all parts of flow graph to files (CAUTION: lots of data)")
diff --git a/gr-digital/examples/ofdm/transmit_path.py b/gr-digital/examples/ofdm/transmit_path.py
index 0982bb688c..83ac6a19a4 100644
--- a/gr-digital/examples/ofdm/transmit_path.py
+++ b/gr-digital/examples/ofdm/transmit_path.py
@@ -20,6 +20,7 @@ import sys
# transmit path
# /////////////////////////////////////////////////////////////////////////////
+
class transmit_path(gr.hier_block2):
def __init__(self, options):
'''
@@ -30,10 +31,11 @@ class transmit_path(gr.hier_block2):
gr.io_signature(0, 0, 0),
gr.io_signature(1, 1, gr.sizeof_gr_complex))
- options = copy.copy(options) # make a copy so we can destructively modify
+ # make a copy so we can destructively modify
+ options = copy.copy(options)
- self._verbose = options.verbose # turn verbose mode on/off
- self._tx_amplitude = options.tx_amplitude # digital amp sent to radio
+ self._verbose = options.verbose # turn verbose mode on/off
+ self._tx_amplitude = options.tx_amplitude # digital amp sent to radio
self.ofdm_tx = digital.ofdm_mod(options,
msgq_limit=4,
@@ -87,4 +89,3 @@ class transmit_path(gr.hier_block2):
Prints information about the transmit path
"""
print("Tx amplitude %s" % (self._tx_amplitude))
-
diff --git a/gr-digital/examples/ofdm/uhd_interface.py b/gr-digital/examples/ofdm/uhd_interface.py
index c354473267..ca0f0e2598 100644
--- a/gr-digital/examples/ofdm/uhd_interface.py
+++ b/gr-digital/examples/ofdm/uhd_interface.py
@@ -16,6 +16,7 @@ from optparse import OptionParser
import sys
+
def add_freq_option(parser):
"""
Hackery that has the -f / --freq option set both tx_freq and rx_freq
@@ -30,14 +31,17 @@ def add_freq_option(parser):
help="set Tx and/or Rx frequency to FREQ [default=%default]",
metavar="FREQ")
+
class uhd_interface(object):
def __init__(self, istx, args, bandwidth, freq=None, lo_offset=None,
gain=None, spec=None, antenna=None, clock_source=None):
if(istx):
- self.u = uhd.usrp_sink(device_addr=args, stream_args=uhd.stream_args('fc32'))
+ self.u = uhd.usrp_sink(
+ device_addr=args, stream_args=uhd.stream_args('fc32'))
else:
- self.u = uhd.usrp_source(device_addr=args, stream_args=uhd.stream_args('fc32'))
+ self.u = uhd.usrp_source(
+ device_addr=args, stream_args=uhd.stream_args('fc32'))
# Set clock source to external.
if(clock_source):
@@ -52,7 +56,7 @@ class uhd_interface(object):
self.u.set_antenna(antenna, 0)
self._args = args
- self._ant = antenna
+ self._ant = antenna
self._spec = spec
self._gain = self.set_gain(gain)
self._lo_offset = lo_offset
@@ -73,10 +77,10 @@ class uhd_interface(object):
if gain is None:
# if no gain was specified, use the mid-point in dB
g = self.u.get_gain_range()
- gain = float(g.start()+g.stop()) / 2
+ gain = float(g.start() + g.stop()) / 2
print("\nNo gain specified.")
print("Setting gain to %f (from [%f, %f])" %
- (gain, g.start(), g.stop()))
+ (gain, g.start(), g.stop()))
self.u.set_gain(gain, 0)
return gain
@@ -92,20 +96,21 @@ class uhd_interface(object):
return freq
else:
frange = self.u.get_freq_range()
- sys.stderr.write(("\nRequested frequency (%f) out or range [%f, %f]\n") % \
- (freq, frange.start(), frange.stop()))
+ sys.stderr.write(("\nRequested frequency (%f) out or range [%f, %f]\n") %
+ (freq, frange.start(), frange.stop()))
sys.exit(1)
#-------------------------------------------------------------------#
# TRANSMITTER
#-------------------------------------------------------------------#
+
class uhd_transmitter(uhd_interface, gr.hier_block2):
def __init__(self, args, bandwidth, freq=None, lo_offset=None, gain=None,
spec=None, antenna=None, clock_source=None, verbose=False):
gr.hier_block2.__init__(self, "uhd_transmitter",
- gr.io_signature(1,1,gr.sizeof_gr_complex),
- gr.io_signature(0,0,0))
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
+ gr.io_signature(0, 0, 0))
# Set up the UHD interface as a transmitter
uhd_interface.__init__(self, True, args, bandwidth,
@@ -134,22 +139,23 @@ class uhd_transmitter(uhd_interface, gr.hier_block2):
help="set transmit gain in dB (default is midpoint)")
parser.add_option("-C", "--clock-source", type="string", default=None,
help="select clock source (e.g. 'external') [default=%default]")
- parser.add_option("-v", "--verbose", action="store_true", default=False)
+ parser.add_option("-v", "--verbose",
+ action="store_true", default=False)
def _print_verbage(self):
"""
Prints information about the UHD transmitter
"""
print("\nUHD Transmitter:")
- print("UHD Args: %s" % (self._args))
- print("Freq: %sHz" % (eng_notation.num_to_str(self._freq)))
- print("LO Offset: %sHz" % (eng_notation.num_to_str(self._lo_offset)))
+ print("UHD Args: %s" % (self._args))
+ print("Freq: %sHz" % (eng_notation.num_to_str(self._freq)))
+ print("LO Offset: %sHz" %
+ (eng_notation.num_to_str(self._lo_offset)))
print("Gain: %f dB" % (self._gain))
print("Sample Rate: %ssps" % (eng_notation.num_to_str(self._rate)))
- print("Antenna: %s" % (self._ant))
- print("Subdev Sec: %s" % (self._spec))
- print("Clock Source: %s" % (self._clock_source))
-
+ print("Antenna: %s" % (self._ant))
+ print("Subdev Sec: %s" % (self._spec))
+ print("Clock Source: %s" % (self._clock_source))
#-------------------------------------------------------------------#
@@ -161,8 +167,8 @@ class uhd_receiver(uhd_interface, gr.hier_block2):
def __init__(self, args, bandwidth, freq=None, lo_offset=None, gain=None,
spec=None, antenna=None, clock_source=None, verbose=False):
gr.hier_block2.__init__(self, "uhd_receiver",
- gr.io_signature(0,0,0),
- gr.io_signature(1,1,gr.sizeof_gr_complex))
+ gr.io_signature(0, 0, 0),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex))
# Set up the UHD interface as a receiver
uhd_interface.__init__(self, False, args, bandwidth,
@@ -192,19 +198,20 @@ class uhd_receiver(uhd_interface, gr.hier_block2):
parser.add_option("-C", "--clock-source", type="string", default=None,
help="select clock source (e.g. 'external') [default=%default]")
if not parser.has_option("--verbose"):
- parser.add_option("-v", "--verbose", action="store_true", default=False)
+ parser.add_option("-v", "--verbose",
+ action="store_true", default=False)
def _print_verbage(self):
"""
Prints information about the UHD transmitter
"""
print("\nUHD Receiver:")
- print("UHD Args: %s" % (self._args))
- print("Freq: %sHz" % (eng_notation.num_to_str(self._freq)))
- print("LO Offset: %sHz" % (eng_notation.num_to_str(self._lo_offset)))
+ print("UHD Args: %s" % (self._args))
+ print("Freq: %sHz" % (eng_notation.num_to_str(self._freq)))
+ print("LO Offset: %sHz" %
+ (eng_notation.num_to_str(self._lo_offset)))
print("Gain: %f dB" % (self._gain))
print("Sample Rate: %ssps" % (eng_notation.num_to_str(self._rate)))
- print("Antenna: %s" % (self._ant))
- print("Subdev Sec: %s" % (self._spec))
- print("Clock Source: %s" % (self._clock_source))
-
+ print("Antenna: %s" % (self._ant))
+ print("Subdev Sec: %s" % (self._spec))
+ print("Clock Source: %s" % (self._clock_source))
diff --git a/gr-digital/examples/run_length.py b/gr-digital/examples/run_length.py
index 0d9477012c..8d84b60533 100644
--- a/gr-digital/examples/run_length.py
+++ b/gr-digital/examples/run_length.py
@@ -12,6 +12,7 @@
from optparse import OptionParser
import sys
+
def main():
parser = OptionParser()
parser.add_option("-f", "--file", default=None,
@@ -33,7 +34,7 @@ def main():
for ch in f.read():
x = ord(ch)
bytes = bytes + 1
- for i in range(7,-1,-1):
+ for i in range(7, -1, -1):
bits = bits + 1
t = (x >> i) & 0x1
if t == current:
@@ -42,29 +43,30 @@ def main():
if count > 0:
if len(runs) < count:
for j in range(count - len(runs)):
- runs.append(0);
- runs[count-1] = runs[count-1] + 1
+ runs.append(0)
+ runs[count - 1] = runs[count - 1] + 1
- current = 1-current;
+ current = 1 - current
count = 1
# Deal with last run at EOF
if len(runs) < count and count > 0:
for j in range(count - len(runs)):
- runs.append(0);
- runs[count-1] = runs[count-1] + 1
+ runs.append(0)
+ runs[count - 1] = runs[count - 1] + 1
chk = 0
print("Bytes read: ", bytes)
print("Bits read: ", bits)
print()
for i in range(len(runs)):
- chk = chk + runs[i]*(i+1)
- print("Runs of length", i+1, ":", runs[i])
+ chk = chk + runs[i] * (i + 1)
+ print("Runs of length", i + 1, ":", runs[i])
print()
print("Sum of runs:", chk, "bits")
print()
print("Maximum run length is", len(runs), "bits")
+
if __name__ == "__main__":
main()
diff --git a/gr-digital/examples/snr_estimators.py b/gr-digital/examples/snr_estimators.py
index fbaf06a5ab..bc622bfd31 100644
--- a/gr-digital/examples/snr_estimators.py
+++ b/gr-digital/examples/snr_estimators.py
@@ -38,6 +38,7 @@ For an explination of the online algorithms, see:
http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
'''
+
def online_skewness(data):
n = 0
mean = 0
@@ -46,52 +47,57 @@ def online_skewness(data):
for n in range(len(data)):
delta = data[n] - mean
- delta_n = delta / (n+1)
+ delta_n = delta / (n + 1)
term1 = delta * delta_n * n
mean = mean + delta_n
M3 = M3 + term1 * delta_n * (n - 1) - 3 * delta_n * M2
M2 = M2 + term1
- return scipy.sqrt(len(data))*M3 / scipy.power(M2, 3.0 / 2.0);
+ return scipy.sqrt(len(data)) * M3 / scipy.power(M2, 3.0 / 2.0)
+
def snr_est_simple(signal):
s = scipy.mean(abs(signal)**2)
- n = 2*scipy.var(abs(signal))
+ n = 2 * scipy.var(abs(signal))
snr_rat = s / n
- return 10.0*scipy.log10(snr_rat), snr_rat
+ return 10.0 * scipy.log10(snr_rat), snr_rat
+
def snr_est_skew(signal):
y1 = scipy.mean(abs(signal))
y2 = scipy.mean(scipy.real(signal**2))
- y3 = (y1*y1 - y2)
+ y3 = (y1 * y1 - y2)
y4 = online_skewness(signal.real)
#y4 = stats.skew(abs(signal.real))
- skw = y4*y4 / (y2*y2*y2);
- s = y1*y1
- n = 2*(y3 + skw*s)
+ skw = y4 * y4 / (y2 * y2 * y2)
+ s = y1 * y1
+ n = 2 * (y3 + skw * s)
snr_rat = s / n
- return 10.0*scipy.log10(snr_rat), snr_rat
+ return 10.0 * scipy.log10(snr_rat), snr_rat
+
def snr_est_m2m4(signal):
M2 = scipy.mean(abs(signal)**2)
M4 = scipy.mean(abs(signal)**4)
- snr_rat = scipy.sqrt(2*M2*M2 - M4) / (M2 - scipy.sqrt(2*M2*M2 - M4))
- return 10.0*scipy.log10(snr_rat), snr_rat
+ snr_rat = scipy.sqrt(2 * M2 * M2 - M4) / \
+ (M2 - scipy.sqrt(2 * M2 * M2 - M4))
+ return 10.0 * scipy.log10(snr_rat), snr_rat
+
def snr_est_svr(signal):
N = len(signal)
ssum = 0
msum = 0
for i in range(1, N):
- ssum += (abs(signal[i])**2)*(abs(signal[i-1])**2)
+ ssum += (abs(signal[i])**2) * (abs(signal[i - 1])**2)
msum += (abs(signal[i])**4)
- savg = (1.0 / (float(N-1.0)))*ssum
- mavg = (1.0 / (float(N-1.0)))*msum
+ savg = (1.0 / (float(N - 1.0))) * ssum
+ mavg = (1.0 / (float(N - 1.0))) * msum
beta = savg / (mavg - savg)
- snr_rat = ((beta - 1) + scipy.sqrt(beta*(beta-1)))
- return 10.0*scipy.log10(snr_rat), snr_rat
+ snr_rat = ((beta - 1) + scipy.sqrt(beta * (beta - 1)))
+ return 10.0 * scipy.log10(snr_rat), snr_rat
def main():
@@ -104,7 +110,6 @@ def main():
"m2m4": snr_est_m2m4,
"svr": snr_est_svr}
-
parser = OptionParser(option_class=eng_option, conflict_handler="resolve")
parser.add_option("-N", "--nsamples", type="int", default=10000,
help="Set the number of samples to process [default=%default]")
@@ -118,13 +123,13 @@ def main():
choices=list(gr_estimators.keys()), default="simple",
help="Estimator type {0} [default=%default]".format(
list(gr_estimators.keys())))
- (options, args) = parser.parse_args ()
+ (options, args) = parser.parse_args()
N = options.nsamples
xx = scipy.random.randn(N)
xy = scipy.random.randn(N)
- bits =2*scipy.complex64(scipy.random.randint(0, 2, N)) - 1
- #bits =(2*scipy.complex64(scipy.random.randint(0, 2, N)) - 1) + \
+ bits = 2 * scipy.complex64(scipy.random.randint(0, 2, N)) - 1
+ # bits =(2*scipy.complex64(scipy.random.randint(0, 2, N)) - 1) + \
# 1j*(2*scipy.complex64(scipy.random.randint(0, 2, N)) - 1)
snr_known = list()
@@ -134,7 +139,7 @@ def main():
# when to issue an SNR tag; can be ignored in this example.
ntag = 10000
- n_cpx = xx + 1j*xy
+ n_cpx = xx + 1j * xy
py_est = py_estimators[options.type]
gr_est = gr_estimators[options.type]
@@ -142,17 +147,17 @@ def main():
SNR_min = options.snr_min
SNR_max = options.snr_max
SNR_step = options.snr_step
- SNR_dB = scipy.arange(SNR_min, SNR_max+SNR_step, SNR_step)
+ SNR_dB = scipy.arange(SNR_min, SNR_max + SNR_step, SNR_step)
for snr in SNR_dB:
SNR = 10.0**(snr / 10.0)
- scale = scipy.sqrt(2*SNR)
+ scale = scipy.sqrt(2 * SNR)
yy = bits + n_cpx / scale
print("SNR: ", snr)
Sknown = scipy.mean(yy**2)
Nknown = scipy.var(n_cpx / scale)
snr0 = Sknown / Nknown
- snr0dB = 10.0*scipy.log10(snr0)
+ snr0dB = 10.0 * scipy.log10(snr0)
snr_known.append(float(snr0dB))
snrdB, snr = py_est(yy)
@@ -169,7 +174,7 @@ def main():
snr_gr.append(gr_snr.snr())
f1 = pyplot.figure(1)
- s1 = f1.add_subplot(1,1,1)
+ s1 = f1.add_subplot(1, 1, 1)
s1.plot(SNR_dB, snr_known, "k-o", linewidth=2, label="Known")
s1.plot(SNR_dB, snr_python, "b-o", linewidth=2, label="Python")
s1.plot(SNR_dB, snr_gr, "g-o", linewidth=2, label="GNU Radio")
@@ -180,7 +185,7 @@ def main():
s1.legend()
f2 = pyplot.figure(2)
- s2 = f2.add_subplot(1,1,1)
+ s2 = f2.add_subplot(1, 1, 1)
s2.plot(yy.real, yy.imag, 'o')
pyplot.show()
diff --git a/gr-digital/python/digital/__init__.py b/gr-digital/python/digital/__init__.py
index 64281eb2bb..100766f22c 100644
--- a/gr-digital/python/digital/__init__.py
+++ b/gr-digital/python/digital/__init__.py
@@ -22,11 +22,7 @@ except ImportError:
__path__.append(os.path.join(dirname, "bindings"))
from .digital_python import *
-from gnuradio import analog # just need analog for the enum
-class gmskmod_bc(cpmmod_bc):
- def __init__(self, samples_per_sym = 2, L = 4, beta = 0.3):
- cpmmod_bc.__init__(self, analog.cpm.GAUSSIAN, 0.5, samples_per_sym, L, beta)
-
+from gnuradio import analog # just need analog for the enum
from .psk import *
from .qam import *
from .qamlike import *
@@ -42,4 +38,8 @@ from .psk_constellations import *
from .qam_constellations import *
from .constellation_map_generator import *
-from . import packet_utils
+
+class gmskmod_bc(cpmmod_bc):
+ def __init__(self, samples_per_sym=2, L=4, beta=0.3):
+ cpmmod_bc.__init__(self, analog.cpm.GAUSSIAN,
+ 0.5, samples_per_sym, L, beta)
diff --git a/gr-digital/python/digital/bpsk.py b/gr-digital/python/digital/bpsk.py
index 7d5b8cde7a..e6d507e2f7 100644
--- a/gr-digital/python/digital/bpsk.py
+++ b/gr-digital/python/digital/bpsk.py
@@ -25,6 +25,7 @@ from . import modulation_utils
# BPSK constellation
# /////////////////////////////////////////////////////////////////////////////
+
def bpsk_constellation():
return digital_python.constellation_bpsk()
@@ -32,6 +33,7 @@ def bpsk_constellation():
# DBPSK constellation
# /////////////////////////////////////////////////////////////////////////////
+
def dbpsk_constellation():
return digital_python.constellation_dbpsk()
diff --git a/gr-digital/python/digital/constellation_map_generator.py b/gr-digital/python/digital/constellation_map_generator.py
index e6e660392c..e3d6f532fd 100644
--- a/gr-digital/python/digital/constellation_map_generator.py
+++ b/gr-digital/python/digital/constellation_map_generator.py
@@ -35,7 +35,7 @@ def constellation_map_generator(basis_cpoints, basis_symbols, k, pi):
symbols = list()
for s_i in s:
tmp = 0
- for i,p in enumerate(pi):
+ for i, p in enumerate(pi):
bit = (s_i >> i) & 0x1
tmp |= bit << p
symbols.append(tmp ^ k)
diff --git a/gr-digital/python/digital/cpm.py b/gr-digital/python/digital/cpm.py
index d21ae55a87..ca32fb7036 100644
--- a/gr-digital/python/digital/cpm.py
+++ b/gr-digital/python/digital/cpm.py
@@ -29,7 +29,7 @@ _def_samples_per_symbol = 2
_def_bits_per_symbol = 1
_def_h_numerator = 1
_def_h_denominator = 2
-_def_cpm_type = 0 # 0=CPFSK, 1=GMSK, 2=RC, 3=GENERAL
+_def_cpm_type = 0 # 0=CPFSK, 1=GMSK, 2=RC, 3=GENERAL
_def_bt = 0.35
_def_symbols_per_pulse = 1
_def_generic_taps = numpy.empty(1)
@@ -80,62 +80,69 @@ class cpm_mod(gr.hier_block2):
log=_def_log):
gr.hier_block2.__init__(self, "cpm_mod",
- gr.io_signature(1, 1, gr.sizeof_char), # Input signature
- gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
self._samples_per_symbol = samples_per_symbol
self._bits_per_symbol = bits_per_symbol
self._h_numerator = h_numerator
self._h_denominator = h_denominator
self._cpm_type = cpm_type
- self._bt=bt
- if cpm_type == 0 or cpm_type == 2 or cpm_type == 3: # CPFSK, RC, Generic
+ self._bt = bt
+ if cpm_type == 0 or cpm_type == 2 or cpm_type == 3: # CPFSK, RC, Generic
self._symbols_per_pulse = symbols_per_pulse
- elif cpm_type == 1: # GMSK
+ elif cpm_type == 1: # GMSK
self._symbols_per_pulse = 4
else:
- raise TypeError("cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,))
+ raise TypeError(
+ "cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,))
- self._generic_taps=numpy.array(generic_taps)
+ self._generic_taps = numpy.array(generic_taps)
if samples_per_symbol < 2:
- raise TypeError("samples_per_symbol must be >= 2, is %r" % (samples_per_symbol,))
+ raise TypeError("samples_per_symbol must be >= 2, is %r" %
+ (samples_per_symbol,))
self.nsymbols = 2**bits_per_symbol
- self.sym_alphabet = numpy.arange(-(self.nsymbols-1),self.nsymbols,2).tolist()
-
+ self.sym_alphabet = numpy.arange(-(self.nsymbols - 1),
+ self.nsymbols, 2).tolist()
self.ntaps = int(self._symbols_per_pulse * samples_per_symbol)
sensitivity = 2 * pi * h_numerator / h_denominator / samples_per_symbol
# Unpack Bytes into bits_per_symbol groups
- self.B2s = blocks.packed_to_unpacked_bb(bits_per_symbol,gr.GR_MSB_FIRST)
-
+ self.B2s = blocks.packed_to_unpacked_bb(
+ bits_per_symbol, gr.GR_MSB_FIRST)
# Turn it into symmetric PAM data.
- self.pam = digital_python.chunks_to_symbols_bf(self.sym_alphabet,1)
+ self.pam = digital_python.chunks_to_symbols_bf(self.sym_alphabet, 1)
# Generate pulse (sum of taps = samples_per_symbol/2)
- if cpm_type == 0: # CPFSK
- self.taps= (1.0/self._symbols_per_pulse/2,) * self.ntaps
- elif cpm_type == 1: # GMSK
+ if cpm_type == 0: # CPFSK
+ self.taps = (1.0 / self._symbols_per_pulse / 2,) * self.ntaps
+ elif cpm_type == 1: # GMSK
gaussian_taps = filter.firdes.gaussian(
1.0 / 2, # gain
samples_per_symbol, # symbol_rate
bt, # bandwidth * symbol time
self.ntaps # number of taps
- )
+ )
sqwave = (1,) * samples_per_symbol # rectangular window
- self.taps = numpy.convolve(numpy.array(gaussian_taps),numpy.array(sqwave))
- elif cpm_type == 2: # Raised Cosine
+ self.taps = numpy.convolve(numpy.array(
+ gaussian_taps), numpy.array(sqwave))
+ elif cpm_type == 2: # Raised Cosine
# generalize it for arbitrary roll-off factor
- self.taps = (1-numpy.cos(2*pi*numpy.arange(0 / self.ntaps/samples_per_symbol/self._symbols_per_pulse)),(2*self._symbols_per_pulse))
- elif cpm_type == 3: # Generic CPM
+ self.taps = (1 - numpy.cos(2 * pi * numpy.arange(0 / self.ntaps /
+ samples_per_symbol / self._symbols_per_pulse)), (2 * self._symbols_per_pulse))
+ elif cpm_type == 3: # Generic CPM
self.taps = generic_taps
else:
- raise TypeError("cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,))
+ raise TypeError(
+ "cpm_type must be an integer in {0,1,2,3}, is %r" % (cpm_type,))
- self.filter = filter.pfb.arb_resampler_fff(samples_per_symbol, self.taps)
+ self.filter = filter.pfb.arb_resampler_fff(
+ samples_per_symbol, self.taps)
# FM modulation
self.fmmod = analog.frequency_modulator_fc(sensitivity)
@@ -170,19 +177,17 @@ class cpm_mod(gr.hier_block2):
def symbols_per_pulse(self):
return self._symbols_per_pulse
-
def _print_verbage(self):
print("Samples per symbol = %d" % self._samples_per_symbol)
print("Bits per symbol = %d" % self._bits_per_symbol)
- print("h = " , self._h_numerator , " / " , self._h_denominator)
- print("Symbol alphabet = " , self.sym_alphabet)
+ print("h = ", self._h_numerator, " / ", self._h_denominator)
+ print("Symbol alphabet = ", self.sym_alphabet)
print("Symbols per pulse = %d" % self._symbols_per_pulse)
- print("taps = " , self.taps)
+ print("taps = ", self.taps)
print("CPM type = %d" % self._cpm_type)
if self._cpm_type == 1:
- print("Gaussian filter BT = %.2f" % self._bt)
-
+ print("Gaussian filter BT = %.2f" % self._bt)
def _setup_logging(self):
print("Modulation logging turned on.")
@@ -209,8 +214,7 @@ class cpm_mod(gr.hier_block2):
Given command line options, create dictionary suitable for passing to __init__
"""
return modulation_utils.extract_kwargs_from_options(cpm_mod.__init__,
- ('self',), options)
-
+ ('self',), options)
# /////////////////////////////////////////////////////////////////////////////
@@ -219,7 +223,6 @@ class cpm_mod(gr.hier_block2):
#
# Not yet implemented
#
-
#
# Add these to the mod/demod registry
#
diff --git a/gr-digital/python/digital/generic_mod_demod.py b/gr-digital/python/digital/generic_mod_demod.py
index a40a7fda5f..2db9e4482a 100644
--- a/gr-digital/python/digital/generic_mod_demod.py
+++ b/gr-digital/python/digital/generic_mod_demod.py
@@ -29,17 +29,18 @@ _def_log = False
_def_truncate = False
# Frequency correction
-_def_freq_bw = 2*math.pi/100.0
+_def_freq_bw = 2 * math.pi / 100.0
# Symbol timing recovery
-_def_timing_bw = 2*math.pi/100.0
+_def_timing_bw = 2 * math.pi / 100.0
_def_timing_max_dev = 1.5
# Fine frequency / Phase correction
-_def_phase_bw = 2*math.pi/100.0
+_def_phase_bw = 2 * math.pi / 100.0
# Number of points in constellation
_def_constellation_points = 16
# Whether differential coding is used.
_def_differential = False
+
def add_common_options(parser):
"""
Sets options common to both modulator and demodulator.
@@ -55,7 +56,7 @@ def add_common_options(parser):
parser.add_option("", "--mod-code", type="choice", choices=mod_codes.codes,
default=mod_codes.NO_CODE,
help="Select modulation code from: %s [default=%%default]"
- % (', '.join(mod_codes.codes),))
+ % (', '.join(mod_codes.codes),))
parser.add_option("", "--excess-bw", type="float", default=_def_excess_bw,
help="set RRC excess bandwidth factor [default=%default]")
@@ -92,8 +93,9 @@ class generic_mod(gr.hier_block2):
truncate=_def_truncate):
gr.hier_block2.__init__(self, "generic_mod",
- gr.io_signature(1, 1, gr.sizeof_char), # Input signature
- gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
self._constellation = constellation
self._samples_per_symbol = samples_per_symbol
@@ -103,31 +105,36 @@ class generic_mod(gr.hier_block2):
self.pre_diff_code = pre_diff_code and self._constellation.apply_pre_diff_code()
if self._samples_per_symbol < 2:
- raise TypeError("sps must be >= 2, is %f" % self._samples_per_symbol)
+ raise TypeError("sps must be >= 2, is %f" %
+ self._samples_per_symbol)
- arity = pow(2,self.bits_per_symbol())
+ arity = pow(2, self.bits_per_symbol())
# turn bytes into k-bit vectors
self.bytes2chunks = \
- blocks.packed_to_unpacked_bb(self.bits_per_symbol(), gr.GR_MSB_FIRST)
+ blocks.packed_to_unpacked_bb(
+ self.bits_per_symbol(), gr.GR_MSB_FIRST)
if self.pre_diff_code:
- self.symbol_mapper = digital.map_bb(self._constellation.pre_diff_code())
+ self.symbol_mapper = digital.map_bb(
+ self._constellation.pre_diff_code())
if differential:
self.diffenc = digital.diff_encoder_bb(arity)
- self.chunks2symbols = digital.chunks_to_symbols_bc(self._constellation.points())
+ self.chunks2symbols = digital.chunks_to_symbols_bc(
+ self._constellation.points())
# pulse shaping filter
nfilts = 32
ntaps_per_filt = 11
- ntaps = nfilts * ntaps_per_filt * int(self._samples_per_symbol) # make nfilts filters of ntaps each
+ # make nfilts filters of ntaps each
+ ntaps = nfilts * ntaps_per_filt * int(self._samples_per_symbol)
self.rrc_taps = filter.firdes.root_raised_cosine(
nfilts, # gain
nfilts, # sampling rate based on 32 filters in resampler
1.0, # symbol rate
- self._excess_bw, # excess bandwidth (roll-off factor)
+ self._excess_bw, # excess bandwidth (roll-off factor)
ntaps)
self.rrc_filter = filter.pfb_arb_resampler_ccf(self._samples_per_symbol,
self.rrc_taps)
@@ -135,8 +142,10 @@ class generic_mod(gr.hier_block2):
# Remove the filter transient at the beginning of the transmission
if truncate:
fsps = float(self._samples_per_symbol)
- len_filt_delay = int((ntaps_per_filt*fsps*fsps-fsps)/2.0) # Length of delay through rrc filter
- self.skiphead = blocks.skiphead(gr.sizeof_gr_complex*1, len_filt_delay)
+ # Length of delay through rrc filter
+ len_filt_delay = int((ntaps_per_filt * fsps * fsps - fsps) / 2.0)
+ self.skiphead = blocks.skiphead(
+ gr.sizeof_gr_complex * 1, len_filt_delay)
# Connect
self._blocks = [self, self.bytes2chunks]
@@ -145,7 +154,7 @@ class generic_mod(gr.hier_block2):
if differential:
self._blocks.append(self.diffenc)
self._blocks += [self.chunks2symbols, self.rrc_filter]
-
+
if truncate:
self._blocks.append(self.skiphead)
self._blocks.append(self)
@@ -157,7 +166,6 @@ class generic_mod(gr.hier_block2):
if log:
self._setup_logging()
-
def samples_per_symbol(self):
return self._samples_per_symbol
@@ -176,8 +184,7 @@ class generic_mod(gr.hier_block2):
Given command line options, create dictionary suitable for passing to __init__
"""
return extract_kwargs_from_options_for_class(cls, options)
- extract_kwargs_from_options=classmethod(extract_kwargs_from_options)
-
+ extract_kwargs_from_options = classmethod(extract_kwargs_from_options)
def _print_verbage(self):
print("\nModulator:")
@@ -239,7 +246,8 @@ class generic_demod(gr.hier_block2):
log=_def_log):
gr.hier_block2.__init__(self, "generic_demod",
- gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
gr.io_signature(1, 1, gr.sizeof_char)) # Output signature
self._constellation = constellation
@@ -248,19 +256,20 @@ class generic_demod(gr.hier_block2):
self._phase_bw = phase_bw
self._freq_bw = freq_bw
self._timing_bw = timing_bw
- self._timing_max_dev= _def_timing_max_dev
+ self._timing_max_dev = _def_timing_max_dev
self._differential = differential
if self._samples_per_symbol < 2:
- raise TypeError("sps must be >= 2, is %d" % self._samples_per_symbol)
+ raise TypeError("sps must be >= 2, is %d" %
+ self._samples_per_symbol)
# Only apply a predifferential coding if the constellation also supports it.
self.pre_diff_code = pre_diff_code and self._constellation.apply_pre_diff_code()
- arity = pow(2,self.bits_per_symbol())
+ arity = pow(2, self.bits_per_symbol())
nfilts = 32
- ntaps = 11 * int(self._samples_per_symbol*nfilts)
+ ntaps = 11 * int(self._samples_per_symbol * nfilts)
# Automatic gain control
self.agc = analog.agc2_cc(0.6e-1, 1e-3, 1, 1)
@@ -271,11 +280,11 @@ class generic_demod(gr.hier_block2):
fll_ntaps, self._freq_bw)
# symbol timing recovery with RRC data filter
- taps = filter.firdes.root_raised_cosine(nfilts, nfilts*self._samples_per_symbol,
+ taps = filter.firdes.root_raised_cosine(nfilts, nfilts * self._samples_per_symbol,
1.0, self._excess_bw, ntaps)
self.time_recov = digital.pfb_clock_sync_ccf(self._samples_per_symbol,
self._timing_bw, taps,
- nfilts, nfilts//2, self._timing_max_dev)
+ nfilts, nfilts // 2, self._timing_max_dev)
fmin = -0.25
fmax = 0.25
@@ -318,7 +327,7 @@ class generic_demod(gr.hier_block2):
def _print_verbage(self):
print("\nDemodulator:")
- print("bits per symbol: %d" % self.bits_per_symbol())
+ print("bits per symbol: %d" % self.bits_per_symbol())
print("RRC roll-off factor: %.2f" % self._excess_bw)
print("FLL bandwidth: %.2e" % self._freq_bw)
print("Timing bandwidth: %.2e" % self._timing_bw)
@@ -381,7 +390,8 @@ class generic_demod(gr.hier_block2):
Given command line options, create dictionary suitable for passing to __init__
"""
return extract_kwargs_from_options_for_class(cls, options)
- extract_kwargs_from_options=classmethod(extract_kwargs_from_options)
+ extract_kwargs_from_options = classmethod(extract_kwargs_from_options)
+
shared_demod_args = """ samples_per_symbol: samples per baud >= 2 (float)
excess_bw: Root-raised cosine filter excess bandwidth (float)
diff --git a/gr-digital/python/digital/gfsk.py b/gr-digital/python/digital/gfsk.py
index 2cc8f69c97..63583cc1e0 100644
--- a/gr-digital/python/digital/gfsk.py
+++ b/gr-digital/python/digital/gfsk.py
@@ -70,8 +70,9 @@ class gfsk_mod(gr.hier_block2):
do_unpack=_def_do_unpack):
gr.hier_block2.__init__(self, "gfsk_mod",
- gr.io_signature(1, 1, gr.sizeof_char), # Input signature
- gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
samples_per_symbol = int(samples_per_symbol)
self._samples_per_symbol = samples_per_symbol
@@ -79,12 +80,11 @@ class gfsk_mod(gr.hier_block2):
self._differential = False
if not isinstance(samples_per_symbol, int) or samples_per_symbol < 2:
- raise TypeError("samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,))
+ raise TypeError(
+ "samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,))
ntaps = 4 * samples_per_symbol # up to 3 bits in filter at once
- #sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2
-
-
+ # sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2
# Turn it into NRZ data.
#self.nrz = digital.bytes_to_syms()
@@ -93,15 +93,17 @@ class gfsk_mod(gr.hier_block2):
# Form Gaussian filter
# Generate Gaussian response (Needs to be convolved with window below).
self.gaussian_taps = filter.firdes.gaussian(
- 1.0, # gain
- samples_per_symbol, # symbol_rate
- bt, # bandwidth * symbol time
- ntaps # number of taps
- )
+ 1.0, # gain
+ samples_per_symbol, # symbol_rate
+ bt, # bandwidth * symbol time
+ ntaps # number of taps
+ )
self.sqwave = (1,) * samples_per_symbol # rectangular window
- self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave))
- self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps)
+ self.taps = numpy.convolve(numpy.array(
+ self.gaussian_taps), numpy.array(self.sqwave))
+ self.gaussian_filter = filter.interp_fir_filter_fff(
+ samples_per_symbol, self.taps)
# FM modulation
self.fmmod = analog.frequency_modulator_fc(sensitivity)
@@ -118,22 +120,24 @@ class gfsk_mod(gr.hier_block2):
# Connect & Initialize base class
if do_unpack:
self.unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
- self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self)
+ self.connect(self, self.unpack, self.nrz,
+ self.gaussian_filter, self.fmmod, self.amp, self)
else:
- self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self.amp, self)
+ self.connect(self, self.nrz, self.gaussian_filter,
+ self.fmmod, self.amp, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@staticmethod
- def bits_per_symbol(self=None): # staticmethod that's also callable on an instance
+ # staticmethod that's also callable on an instance
+ def bits_per_symbol(self=None):
return 1
def _print_verbage(self):
print("bits per symbol = %d" % self.bits_per_symbol())
print("Gaussian filter bt = %.2f" % self._bt)
-
def _setup_logging(self):
print("Modulation logging turned on.")
self.connect(self.nrz,
@@ -197,7 +201,8 @@ class gfsk_demod(gr.hier_block2):
log=_def_log):
gr.hier_block2.__init__(self, "gfsk_demod",
- gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
gr.io_signature(1, 1, gr.sizeof_char)) # Output signature
self._samples_per_symbol = samples_per_symbol
@@ -207,17 +212,20 @@ class gfsk_demod(gr.hier_block2):
self._differential = False
if samples_per_symbol < 2:
- raise TypeError("samples_per_symbol >= 2, is %f" % samples_per_symbol)
+ raise TypeError("samples_per_symbol >= 2, is %f" %
+ samples_per_symbol)
- self._omega = samples_per_symbol*(1+self._freq_error)
+ self._omega = samples_per_symbol * (1 + self._freq_error)
if not self._gain_mu:
self._gain_mu = 0.175
- self._gain_omega = .25 * self._gain_mu * self._gain_mu # critically damped
+ self._gain_omega = .25 * self._gain_mu * \
+ self._gain_mu # critically damped
self._damping = 1.0
- self._loop_bw = -ln((self._gain_mu + self._gain_omega)/(-2.0) + 1) # critically damped
+ # critically damped
+ self._loop_bw = -ln((self._gain_mu + self._gain_omega) / (-2.0) + 1)
self._max_dev = self._omega_relative_limit * self._samples_per_symbol
# Demodulate FM
@@ -227,16 +235,16 @@ class gfsk_demod(gr.hier_block2):
# the clock recovery block tracks the symbol clock and resamples as needed.
# the output of the block is a stream of soft symbols (float)
self.clock_recovery = self.digital_symbol_sync_xx_0 = digital.symbol_sync_ff(digital.TED_MUELLER_AND_MULLER,
- self._omega,
- self._loop_bw,
- self._damping,
- 1.0, # Expected TED gain
- self._max_dev,
- 1, # Output sps
- digital.constellation_bpsk().base(),
- digital.IR_MMSE_8TAP,
- 128,
- [])
+ self._omega,
+ self._loop_bw,
+ self._damping,
+ 1.0, # Expected TED gain
+ self._max_dev,
+ 1, # Output sps
+ digital.constellation_bpsk().base(),
+ digital.IR_MMSE_8TAP,
+ 128,
+ [])
# slice the floats at 0, outputting 1 bit (the LSB of the output byte) per sample
self.slicer = digital.binary_slicer_fb()
@@ -248,7 +256,8 @@ class gfsk_demod(gr.hier_block2):
self._setup_logging()
# Connect & Initialize base class
- self.connect(self, self.fmdemod, self.clock_recovery, self.slicer, self)
+ self.connect(self, self.fmdemod,
+ self.clock_recovery, self.slicer, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@@ -262,18 +271,18 @@ class gfsk_demod(gr.hier_block2):
print("Symbol Sync M&M omega = %f" % self._omega)
print("Symbol Sync M&M gain mu = %f" % self._gain_mu)
print("M&M clock recovery mu (Unused) = %f" % self._mu)
- print("Symbol Sync M&M omega rel. limit = %f" % self._omega_relative_limit)
+ print("Symbol Sync M&M omega rel. limit = %f" %
+ self._omega_relative_limit)
print("frequency error = %f" % self._freq_error)
-
def _setup_logging(self):
print("Demodulation logging turned on.")
self.connect(self.fmdemod,
- blocks.file_sink(gr.sizeof_float, "fmdemod.dat"))
+ blocks.file_sink(gr.sizeof_float, "fmdemod.dat"))
self.connect(self.clock_recovery,
- blocks.file_sink(gr.sizeof_float, "clock_recovery.dat"))
+ blocks.file_sink(gr.sizeof_float, "clock_recovery.dat"))
self.connect(self.slicer,
- blocks.file_sink(gr.sizeof_char, "slicer.dat"))
+ blocks.file_sink(gr.sizeof_char, "slicer.dat"))
@staticmethod
def add_options(parser):
diff --git a/gr-digital/python/digital/gmsk.py b/gr-digital/python/digital/gmsk.py
index 68127119e6..106e588782 100644
--- a/gr-digital/python/digital/gmsk.py
+++ b/gr-digital/python/digital/gmsk.py
@@ -68,8 +68,9 @@ class gmsk_mod(gr.hier_block2):
do_unpack=_def_do_unpack):
gr.hier_block2.__init__(self, "gmsk_mod",
- gr.io_signature(1, 1, gr.sizeof_char), # Input signature
- gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex)) # Output signature
samples_per_symbol = int(samples_per_symbol)
self._samples_per_symbol = samples_per_symbol
@@ -77,10 +78,13 @@ class gmsk_mod(gr.hier_block2):
self._differential = False
if not isinstance(samples_per_symbol, int) or samples_per_symbol < 2:
- raise TypeError("samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,))
+ raise TypeError(
+ "samples_per_symbol must be an integer >= 2, is %r" % (samples_per_symbol,))
- ntaps = 4 * samples_per_symbol # up to 3 bits in filter at once
- sensitivity = (pi / 2) / samples_per_symbol # phase change per bit = pi / 2
+ # up to 3 bits in filter at once
+ ntaps = 4 * samples_per_symbol
+ # phase change per bit = pi / 2
+ sensitivity = (pi / 2) / samples_per_symbol
# Turn it into NRZ data.
#self.nrz = digital.bytes_to_syms()
@@ -89,15 +93,17 @@ class gmsk_mod(gr.hier_block2):
# Form Gaussian filter
# Generate Gaussian response (Needs to be convolved with window below).
self.gaussian_taps = filter.firdes.gaussian(
- 1, # gain
- samples_per_symbol, # symbol_rate
- bt, # bandwidth * symbol time
- ntaps # number of taps
- )
+ 1, # gain
+ samples_per_symbol, # symbol_rate
+ bt, # bandwidth * symbol time
+ ntaps # number of taps
+ )
self.sqwave = (1,) * samples_per_symbol # rectangular window
- self.taps = numpy.convolve(numpy.array(self.gaussian_taps),numpy.array(self.sqwave))
- self.gaussian_filter = filter.interp_fir_filter_fff(samples_per_symbol, self.taps)
+ self.taps = numpy.convolve(numpy.array(
+ self.gaussian_taps), numpy.array(self.sqwave))
+ self.gaussian_filter = filter.interp_fir_filter_fff(
+ samples_per_symbol, self.taps)
# FM modulation
self.fmmod = analog.frequency_modulator_fc(sensitivity)
@@ -111,22 +117,24 @@ class gmsk_mod(gr.hier_block2):
# Connect & Initialize base class
if do_unpack:
self.unpack = blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
- self.connect(self, self.unpack, self.nrz, self.gaussian_filter, self.fmmod, self)
+ self.connect(self, self.unpack, self.nrz,
+ self.gaussian_filter, self.fmmod, self)
else:
- self.connect(self, self.nrz, self.gaussian_filter, self.fmmod, self)
+ self.connect(self, self.nrz, self.gaussian_filter,
+ self.fmmod, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@staticmethod
- def bits_per_symbol(self=None): # staticmethod that's also callable on an instance
+ # staticmethod that's also callable on an instance
+ def bits_per_symbol(self=None):
return 1
def _print_verbage(self):
print("bits per symbol = %d" % self.bits_per_symbol())
print("Gaussian filter bt = %.2f" % self._bt)
-
def _setup_logging(self):
print("Modulation logging turned on.")
self.connect(self.nrz,
@@ -185,7 +193,8 @@ class gmsk_demod(gr.hier_block2):
log=_def_log):
gr.hier_block2.__init__(self, "gmsk_demod",
- gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input signature
+ # Input signature
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
gr.io_signature(1, 1, gr.sizeof_char)) # Output signature
self._samples_per_symbol = samples_per_symbol
@@ -195,17 +204,20 @@ class gmsk_demod(gr.hier_block2):
self._differential = False
if samples_per_symbol < 2:
- raise TypeError("samples_per_symbol >= 2, is %f" % samples_per_symbol)
+ raise TypeError("samples_per_symbol >= 2, is %f" %
+ samples_per_symbol)
- self._omega = samples_per_symbol*(1+self._freq_error)
+ self._omega = samples_per_symbol * (1 + self._freq_error)
if not self._gain_mu:
self._gain_mu = 0.175
- self._gain_omega = .25 * self._gain_mu * self._gain_mu # critically damped
+ self._gain_omega = .25 * self._gain_mu * \
+ self._gain_mu # critically damped
self._damping = 1.0
- self._loop_bw = -ln((self._gain_mu + self._gain_omega)/(-2.0) + 1) # critically damped
+ # critically damped
+ self._loop_bw = -ln((self._gain_mu + self._gain_omega) / (-2.0) + 1)
self._max_dev = self._omega_relative_limit * self._samples_per_symbol
# Demodulate FM
@@ -215,16 +227,16 @@ class gmsk_demod(gr.hier_block2):
# the clock recovery block tracks the symbol clock and resamples as needed.
# the output of the block is a stream of soft symbols (float)
self.clock_recovery = self.digital_symbol_sync_xx_0 = digital.symbol_sync_ff(digital.TED_MUELLER_AND_MULLER,
- self._omega,
- self._loop_bw,
- self._damping,
- 1.0, # Expected TED gain
- self._max_dev,
- 1, # Output sps
- digital.constellation_bpsk().base(),
- digital.IR_MMSE_8TAP,
- 128,
- [])
+ self._omega,
+ self._loop_bw,
+ self._damping,
+ 1.0, # Expected TED gain
+ self._max_dev,
+ 1, # Output sps
+ digital.constellation_bpsk().base(),
+ digital.IR_MMSE_8TAP,
+ 128,
+ [])
# slice the floats at 0, outputting 1 bit (the LSB of the output byte) per sample
self.slicer = digital.binary_slicer_fb()
@@ -236,7 +248,8 @@ class gmsk_demod(gr.hier_block2):
self._setup_logging()
# Connect & Initialize base class
- self.connect(self, self.fmdemod, self.clock_recovery, self.slicer, self)
+ self.connect(self, self.fmdemod,
+ self.clock_recovery, self.slicer, self)
def samples_per_symbol(self):
return self._samples_per_symbol
@@ -250,18 +263,18 @@ class gmsk_demod(gr.hier_block2):
print("Symbol Sync M&M omega = %f" % self._omega)
print("Symbol Sync M&M gain mu = %f" % self._gain_mu)
print("M&M clock recovery mu (Unused) = %f" % self._mu)
- print("Symbol Sync M&M omega rel. limit = %f" % self._omega_relative_limit)
+ print("Symbol Sync M&M omega rel. limit = %f" %
+ self._omega_relative_limit)
print("frequency error = %f" % self._freq_error)
-
def _setup_logging(self):
print("Demodulation logging turned on.")
self.connect(self.fmdemod,
- blocks.file_sink(gr.sizeof_float, "fmdemod.dat"))
+ blocks.file_sink(gr.sizeof_float, "fmdemod.dat"))
self.connect(self.clock_recovery,
- blocks.file_sink(gr.sizeof_float, "clock_recovery.dat"))
+ blocks.file_sink(gr.sizeof_float, "clock_recovery.dat"))
self.connect(self.slicer,
- blocks.file_sink(gr.sizeof_char, "slicer.dat"))
+ blocks.file_sink(gr.sizeof_char, "slicer.dat"))
@staticmethod
def add_options(parser):
diff --git a/gr-digital/python/digital/modulation_utils.py b/gr-digital/python/digital/modulation_utils.py
index 59809ae1d8..1d57e1c9ba 100644
--- a/gr-digital/python/digital/modulation_utils.py
+++ b/gr-digital/python/digital/modulation_utils.py
@@ -1,8 +1,8 @@
#
# Copyright 2010 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
@@ -17,9 +17,11 @@ import inspect
# Type 1 modulators accept a stream of bytes on their input and produce complex baseband output
_type_1_modulators = {}
+
def type_1_mods():
return _type_1_modulators
+
def add_type_1_mod(name, mod_class):
_type_1_modulators[name] = mod_class
@@ -29,18 +31,23 @@ def add_type_1_mod(name, mod_class):
# to resolve phase or polarity ambiguities.
_type_1_demodulators = {}
+
def type_1_demods():
return _type_1_demodulators
+
def add_type_1_demod(name, demod_class):
_type_1_demodulators[name] = demod_class
+
# Also record the constellation making functions of the modulations
_type_1_constellations = {}
+
def type_1_constellations():
return _type_1_constellations
+
def add_type_1_constellation(name, constellation):
_type_1_constellations[name] = constellation
@@ -67,7 +74,7 @@ def extract_kwargs_from_options(function, excluded_args, options):
excluded_args: function arguments that are NOT to be added to the dictionary (sequence of strings)
options: result of command argument parsing (optparse.Values)
"""
-
+
# Try this in C++ ;)
spec = inspect.getfullargspec(function)
d = {}
@@ -77,6 +84,7 @@ def extract_kwargs_from_options(function, excluded_args, options):
d[kw] = getattr(options, kw)
return d
+
def extract_kwargs_from_options_for_class(cls, options):
"""
Given command line options, create dictionary suitable for passing to __init__
diff --git a/gr-digital/python/digital/ofdm_txrx.py b/gr-digital/python/digital/ofdm_txrx.py
index 9d083c5fe2..60109ceaaa 100644
--- a/gr-digital/python/digital/ofdm_txrx.py
+++ b/gr-digital/python/digital/ofdm_txrx.py
@@ -31,15 +31,20 @@ _def_frame_length_tag_key = "frame_length"
_def_packet_length_tag_key = "packet_length"
_def_packet_num_tag_key = "packet_num"
# Data and pilot carriers are same as in 802.11a
-_def_occupied_carriers = (list(range(-26, -21)) + list(range(-20, -7)) + list(range(-6, 0)) + list(range(1, 7)) + list(range(8, 21)) + list(range(22, 27)),)
-_def_pilot_carriers=((-21, -7, 7, 21,),)
+_def_occupied_carriers = (list(range(-26, -21)) + list(range(-20, -7)) + list(
+ range(-6, 0)) + list(range(1, 7)) + list(range(8, 21)) + list(range(22, 27)),)
+_def_pilot_carriers = ((-21, -7, 7, 21,),)
_pilot_sym_scramble_seq = (
- 1,1,1,1, -1,-1,-1,1, -1,-1,-1,-1, 1,1,-1,1, -1,-1,1,1, -1,1,1,-1, 1,1,1,1, 1,1,-1,1,
- 1,1,-1,1, 1,-1,-1,1, 1,1,-1,1, -1,-1,-1,1, -1,1,-1,-1, 1,-1,-1,1, 1,1,1,1, -1,-1,1,1,
- -1,-1,1,-1, 1,-1,1,1, -1,-1,-1,1, 1,-1,-1,-1, -1,1,-1,-1, 1,-1,1,1, 1,1,-1,1, -1,1,-1,1,
- -1,-1,-1,-1, -1,1,-1,1, 1,-1,1,-1, 1,1,1,-1, -1,1,-1,-1, -1,1,1,1, -1,-1,-1,-1, -1,-1,-1
+ 1, 1, 1, 1, -1, -1, -1, 1, -1, -1, -1, -1, 1, 1, -1, 1, -
+ 1, -1, 1, 1, -1, 1, 1, -1, 1, 1, 1, 1, 1, 1, -1, 1,
+ 1, 1, -1, 1, 1, -1, -1, 1, 1, 1, -1, 1, -1, -1, -1, 1, -
+ 1, 1, -1, -1, 1, -1, -1, 1, 1, 1, 1, 1, -1, -1, 1, 1,
+ -1, -1, 1, -1, 1, -1, 1, 1, -1, -1, -1, 1, 1, -1, -1, -
+ 1, -1, 1, -1, -1, 1, -1, 1, 1, 1, 1, -1, 1, -1, 1, -1, 1,
+ -1, -1, -1, -1, -1, 1, -1, 1, 1, -1, 1, -1, 1, 1, 1, -1, -
+ 1, 1, -1, -1, -1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1
)
-_def_pilot_symbols= tuple([(x, x, x, -x) for x in _pilot_sym_scramble_seq])
+_def_pilot_symbols = tuple([(x, x, x, -x) for x in _pilot_sym_scramble_seq])
_seq_seed = 42
@@ -52,6 +57,7 @@ def _get_active_carriers(fft_len, occupied_carriers, pilot_carriers):
active_carriers.append(carrier)
return active_carriers
+
def _make_sync_word1(fft_len, occupied_carriers, pilot_carriers):
""" Creates a random sync sequence for fine frequency offset and timing
estimation. This is the first of typically two sync preamble symbols
@@ -64,31 +70,37 @@ def _make_sync_word1(fft_len, occupied_carriers, pilot_carriers):
Carrier 0 (DC carrier) is always zero. If used, carrier 1 is non-zero.
This means the sync algorithm has to check on odd carriers!
"""
- active_carriers = _get_active_carriers(fft_len, occupied_carriers, pilot_carriers)
+ active_carriers = _get_active_carriers(
+ fft_len, occupied_carriers, pilot_carriers)
numpy.random.seed(_seq_seed)
bpsk = {0: numpy.sqrt(2), 1: -numpy.sqrt(2)}
- sw1 = [bpsk[numpy.random.randint(2)] if x in active_carriers and x % 2 else 0 for x in range(fft_len)]
+ sw1 = [bpsk[numpy.random.randint(
+ 2)] if x in active_carriers and x % 2 else 0 for x in range(fft_len)]
return numpy.fft.fftshift(sw1)
+
def _make_sync_word2(fft_len, occupied_carriers, pilot_carriers):
""" Creates a random sync sequence for coarse frequency offset and channel
estimation. This is the second of typically two sync preamble symbols
for the Schmidl & Cox sync algorithm.
Symbols are always BPSK symbols.
"""
- active_carriers = _get_active_carriers(fft_len, occupied_carriers, pilot_carriers)
+ active_carriers = _get_active_carriers(
+ fft_len, occupied_carriers, pilot_carriers)
numpy.random.seed(_seq_seed)
bpsk = {0: 1, 1: -1}
- sw2 = [bpsk[numpy.random.randint(2)] if x in active_carriers else 0 for x in range(fft_len)]
+ sw2 = [bpsk[numpy.random.randint(
+ 2)] if x in active_carriers else 0 for x in range(fft_len)]
sw2[0] = 0j
return numpy.fft.fftshift(sw2)
+
def _get_constellation(bps):
""" Returns a modulator block for a given number of bits per symbol """
constellation = {
- 1: digital.constellation_bpsk(),
- 2: digital.constellation_qpsk(),
- 3: digital.constellation_8psk()
+ 1: digital.constellation_bpsk(),
+ 2: digital.constellation_qpsk(),
+ 3: digital.constellation_8psk()
}
try:
return constellation[bps]
@@ -96,6 +108,7 @@ def _get_constellation(bps):
print('Modulation not supported.')
exit(1)
+
class ofdm_tx(gr.hier_block2):
"""Hierarchical block for OFDM modulation.
@@ -122,6 +135,7 @@ class ofdm_tx(gr.hier_block2):
scramble_bits: Activates the scramblers (set this to True unless debugging)
"""
+
def __init__(self, fft_len=_def_fft_len, cp_len=_def_cp_len,
packet_length_tag_key=_def_packet_length_tag_key,
occupied_carriers=_def_occupied_carriers,
@@ -136,75 +150,82 @@ class ofdm_tx(gr.hier_block2):
scramble_bits=False
):
gr.hier_block2.__init__(self, "ofdm_tx",
- gr.io_signature(1, 1, gr.sizeof_char),
- gr.io_signature(1, 1, gr.sizeof_gr_complex))
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_gr_complex))
### Param init / sanity check ########################################
- self.fft_len = fft_len
- self.cp_len = cp_len
+ self.fft_len = fft_len
+ self.cp_len = cp_len
self.packet_length_tag_key = packet_length_tag_key
self.occupied_carriers = occupied_carriers
- self.pilot_carriers = pilot_carriers
- self.pilot_symbols = pilot_symbols
- self.bps_header = bps_header
- self.bps_payload = bps_payload
+ self.pilot_carriers = pilot_carriers
+ self.pilot_symbols = pilot_symbols
+ self.bps_header = bps_header
+ self.bps_payload = bps_payload
self.sync_word1 = sync_word1
if sync_word1 is None:
- self.sync_word1 = _make_sync_word1(fft_len, occupied_carriers, pilot_carriers)
+ self.sync_word1 = _make_sync_word1(
+ fft_len, occupied_carriers, pilot_carriers)
else:
if len(sync_word1) != self.fft_len:
- raise ValueError("Length of sync sequence(s) must be FFT length.")
- self.sync_words = [self.sync_word1,]
+ raise ValueError(
+ "Length of sync sequence(s) must be FFT length.")
+ self.sync_words = [self.sync_word1, ]
if sync_word2 is None:
- self.sync_word2 = _make_sync_word2(fft_len, occupied_carriers, pilot_carriers)
+ self.sync_word2 = _make_sync_word2(
+ fft_len, occupied_carriers, pilot_carriers)
else:
self.sync_word2 = sync_word2
if len(self.sync_word2):
if len(self.sync_word2) != fft_len:
- raise ValueError("Length of sync sequence(s) must be FFT length.")
+ raise ValueError(
+ "Length of sync sequence(s) must be FFT length.")
self.sync_word2 = list(self.sync_word2)
self.sync_words.append(self.sync_word2)
if scramble_bits:
self.scramble_seed = 0x7f
else:
- self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros
+ self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros
### Header modulation ################################################
crc = digital.crc32_bb(False, self.packet_length_tag_key)
- header_constellation = _get_constellation(bps_header)
- header_mod = digital.chunks_to_symbols_bc(header_constellation.points())
+ header_constellation = _get_constellation(bps_header)
+ header_mod = digital.chunks_to_symbols_bc(
+ header_constellation.points())
formatter_object = digital.packet_header_ofdm(
occupied_carriers=occupied_carriers, n_syms=1,
bits_per_header_sym=self.bps_header,
bits_per_payload_sym=self.bps_payload,
scramble_header=scramble_bits
)
- header_gen = digital.packet_headergenerator_bb(formatter_object.base(), self.packet_length_tag_key)
+ header_gen = digital.packet_headergenerator_bb(
+ formatter_object.base(), self.packet_length_tag_key)
header_payload_mux = blocks.tagged_stream_mux(
- itemsize=gr.sizeof_gr_complex*1,
- lengthtagname=self.packet_length_tag_key,
- tag_preserve_head_pos=1 # Head tags on the payload stream stay on the head
+ itemsize=gr.sizeof_gr_complex * 1,
+ lengthtagname=self.packet_length_tag_key,
+ tag_preserve_head_pos=1 # Head tags on the payload stream stay on the head
)
self.connect(
- self,
- crc,
- header_gen,
- header_mod,
- (header_payload_mux, 0)
+ self,
+ crc,
+ header_gen,
+ header_mod,
+ (header_payload_mux, 0)
)
if debug_log:
self.connect(header_gen, blocks.file_sink(1, 'tx-hdr.dat'))
### Payload modulation ###############################################
payload_constellation = _get_constellation(bps_payload)
- payload_mod = digital.chunks_to_symbols_bc(payload_constellation.points())
+ payload_mod = digital.chunks_to_symbols_bc(
+ payload_constellation.points())
payload_scrambler = digital.additive_scrambler_bb(
0x8a,
self.scramble_seed,
7,
- 0, # Don't reset after fixed length (let the reset tag do that)
- bits_per_byte=8, # This is before unpacking
+ 0, # Don't reset after fixed length (let the reset tag do that)
+ bits_per_byte=8, # This is before unpacking
reset_tag_key=self.packet_length_tag_key
)
payload_unpack = blocks.repack_bits_bb(
- 8, # Unpack 8 bits per byte
+ 8, # Unpack 8 bits per byte
bps_payload,
self.packet_length_tag_key
)
@@ -225,21 +246,24 @@ class ofdm_tx(gr.hier_block2):
len_tag_key=self.packet_length_tag_key
)
ffter = fft.fft_vcc(
- self.fft_len,
- False, # Inverse FFT
- (), # No window
- True # Shift
+ self.fft_len,
+ False, # Inverse FFT
+ (), # No window
+ True # Shift
)
cyclic_prefixer = digital.ofdm_cyclic_prefixer(
self.fft_len,
- self.fft_len+self.cp_len,
+ self.fft_len + self.cp_len,
rolloff,
self.packet_length_tag_key
)
- self.connect(header_payload_mux, allocator, ffter, cyclic_prefixer, self)
+ self.connect(header_payload_mux, allocator,
+ ffter, cyclic_prefixer, self)
if debug_log:
- self.connect(allocator, blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'tx-post-allocator.dat'))
- self.connect(cyclic_prefixer, blocks.file_sink(gr.sizeof_gr_complex, 'tx-signal.dat'))
+ self.connect(allocator, blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'tx-post-allocator.dat'))
+ self.connect(cyclic_prefixer, blocks.file_sink(
+ gr.sizeof_gr_complex, 'tx-signal.dat'))
class ofdm_rx(gr.hier_block2):
@@ -265,6 +289,7 @@ class ofdm_rx(gr.hier_block2):
| entirely. Also used for coarse frequency offset and
| channel estimation.
"""
+
def __init__(self, fft_len=_def_fft_len, cp_len=_def_cp_len,
frame_length_tag_key=_def_frame_length_tag_key,
packet_length_tag_key=_def_packet_length_tag_key,
@@ -280,60 +305,69 @@ class ofdm_rx(gr.hier_block2):
scramble_bits=False
):
gr.hier_block2.__init__(self, "ofdm_rx",
- gr.io_signature(1, 1, gr.sizeof_gr_complex),
- gr.io_signature(1, 1, gr.sizeof_char))
+ gr.io_signature(1, 1, gr.sizeof_gr_complex),
+ gr.io_signature(1, 1, gr.sizeof_char))
### Param init / sanity check ########################################
- self.fft_len = fft_len
- self.cp_len = cp_len
- self.frame_length_tag_key = frame_length_tag_key
- self.packet_length_tag_key = packet_length_tag_key
+ self.fft_len = fft_len
+ self.cp_len = cp_len
+ self.frame_length_tag_key = frame_length_tag_key
+ self.packet_length_tag_key = packet_length_tag_key
self.occupied_carriers = occupied_carriers
- self.bps_header = bps_header
- self.bps_payload = bps_payload
+ self.bps_header = bps_header
+ self.bps_payload = bps_payload
n_sync_words = 1
if sync_word1 is None:
- self.sync_word1 = _make_sync_word1(fft_len, occupied_carriers, pilot_carriers)
+ self.sync_word1 = _make_sync_word1(
+ fft_len, occupied_carriers, pilot_carriers)
else:
if len(sync_word1) != self.fft_len:
- raise ValueError("Length of sync sequence(s) must be FFT length.")
+ raise ValueError(
+ "Length of sync sequence(s) must be FFT length.")
self.sync_word1 = sync_word1
self.sync_word2 = ()
if sync_word2 is None:
- self.sync_word2 = _make_sync_word2(fft_len, occupied_carriers, pilot_carriers)
+ self.sync_word2 = _make_sync_word2(
+ fft_len, occupied_carriers, pilot_carriers)
n_sync_words = 2
elif len(sync_word2):
if len(sync_word2) != fft_len:
- raise ValueError("Length of sync sequence(s) must be FFT length.")
+ raise ValueError(
+ "Length of sync sequence(s) must be FFT length.")
self.sync_word2 = sync_word2
n_sync_words = 2
if scramble_bits:
self.scramble_seed = 0x7f
else:
- self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros
+ self.scramble_seed = 0x00 # We deactivate the scrambler by init'ing it with zeros
### Sync ############################################################
sync_detect = digital.ofdm_sync_sc_cfb(fft_len, cp_len)
- delay = blocks.delay(gr.sizeof_gr_complex, fft_len+cp_len)
+ delay = blocks.delay(gr.sizeof_gr_complex, fft_len + cp_len)
oscillator = analog.frequency_modulator_fc(-2.0 / fft_len)
mixer = blocks.multiply_cc()
hpd = digital.header_payload_demux(
- n_sync_words+1, # Number of OFDM symbols before payload (sync + 1 sym header)
+ # Number of OFDM symbols before payload (sync + 1 sym header)
+ n_sync_words + 1,
fft_len, cp_len, # FFT length, guard interval
- frame_length_tag_key, # Frame length tag key
+ frame_length_tag_key, # Frame length tag key
"", # We're not using trigger tags
- True # One output item is one OFDM symbol (False would output complex scalars)
+ # One output item is one OFDM symbol (False would output complex scalars)
+ True
)
self.connect(self, sync_detect)
self.connect(self, delay, (mixer, 0), (hpd, 0))
self.connect((sync_detect, 0), oscillator, (mixer, 1))
self.connect((sync_detect, 1), (hpd, 1))
if debug_log:
- self.connect((sync_detect, 0), blocks.file_sink(gr.sizeof_float, 'freq-offset.dat'))
- self.connect((sync_detect, 1), blocks.file_sink(gr.sizeof_char, 'sync-detect.dat'))
+ self.connect((sync_detect, 0), blocks.file_sink(
+ gr.sizeof_float, 'freq-offset.dat'))
+ self.connect((sync_detect, 1), blocks.file_sink(
+ gr.sizeof_char, 'sync-detect.dat'))
### Header demodulation ##############################################
- header_fft = fft.fft_vcc(self.fft_len, True, (), True)
- chanest = digital.ofdm_chanest_vcvc(self.sync_word1, self.sync_word2, 1)
+ header_fft = fft.fft_vcc(self.fft_len, True, (), True)
+ chanest = digital.ofdm_chanest_vcvc(
+ self.sync_word1, self.sync_word2, 1)
header_constellation = _get_constellation(bps_header)
- header_equalizer = digital.ofdm_equalizer_simpledfe(
+ header_equalizer = digital.ofdm_equalizer_simpledfe(
fft_len,
header_constellation.base(),
occupied_carriers,
@@ -342,93 +376,108 @@ class ofdm_rx(gr.hier_block2):
symbols_skipped=0,
)
header_eq = digital.ofdm_frame_equalizer_vcvc(
- header_equalizer.base(),
- cp_len,
- self.frame_length_tag_key,
- True,
- 1 # Header is 1 symbol long
+ header_equalizer.base(),
+ cp_len,
+ self.frame_length_tag_key,
+ True,
+ 1 # Header is 1 symbol long
)
header_serializer = digital.ofdm_serializer_vcc(
- fft_len, occupied_carriers,
- self.frame_length_tag_key
+ fft_len, occupied_carriers,
+ self.frame_length_tag_key
)
- header_demod = digital.constellation_decoder_cb(header_constellation.base())
+ header_demod = digital.constellation_decoder_cb(
+ header_constellation.base())
header_formatter = digital.packet_header_ofdm(
- occupied_carriers, 1,
- packet_length_tag_key,
- frame_length_tag_key,
- packet_num_tag_key,
- bps_header,
- bps_payload,
- scramble_header=scramble_bits
+ occupied_carriers, 1,
+ packet_length_tag_key,
+ frame_length_tag_key,
+ packet_num_tag_key,
+ bps_header,
+ bps_payload,
+ scramble_header=scramble_bits
)
- header_parser = digital.packet_headerparser_b(header_formatter.formatter())
+ header_parser = digital.packet_headerparser_b(
+ header_formatter.formatter())
self.connect(
- (hpd, 0),
- header_fft,
- chanest,
- header_eq,
- header_serializer,
- header_demod,
- header_parser
+ (hpd, 0),
+ header_fft,
+ chanest,
+ header_eq,
+ header_serializer,
+ header_demod,
+ header_parser
)
self.msg_connect(header_parser, "header_data", hpd, "header_data")
if debug_log:
- self.connect((chanest, 1), blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'channel-estimate.dat'))
- self.connect((chanest, 0), blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest.dat'))
- self.connect((chanest, 0), blocks.tag_debug(gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest'))
- self.connect(header_eq, blocks.file_sink(gr.sizeof_gr_complex * fft_len, 'post-hdr-eq.dat'))
- self.connect(header_serializer, blocks.file_sink(gr.sizeof_gr_complex, 'post-hdr-serializer.dat'))
+ self.connect((chanest, 1), blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'channel-estimate.dat'))
+ self.connect((chanest, 0), blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest.dat'))
+ self.connect((chanest, 0), blocks.tag_debug(
+ gr.sizeof_gr_complex * fft_len, 'post-hdr-chanest'))
+ self.connect(header_eq, blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'post-hdr-eq.dat'))
+ self.connect(header_serializer, blocks.file_sink(
+ gr.sizeof_gr_complex, 'post-hdr-serializer.dat'))
### Payload demod ####################################################
payload_fft = fft.fft_vcc(self.fft_len, True, (), True)
payload_constellation = _get_constellation(bps_payload)
payload_equalizer = digital.ofdm_equalizer_simpledfe(
- fft_len,
- payload_constellation.base(),
- occupied_carriers,
- pilot_carriers,
- pilot_symbols,
- symbols_skipped=1, # (that was already in the header)
- alpha=0.1
+ fft_len,
+ payload_constellation.base(),
+ occupied_carriers,
+ pilot_carriers,
+ pilot_symbols,
+ symbols_skipped=1, # (that was already in the header)
+ alpha=0.1
)
payload_eq = digital.ofdm_frame_equalizer_vcvc(
- payload_equalizer.base(),
- cp_len,
- self.frame_length_tag_key
+ payload_equalizer.base(),
+ cp_len,
+ self.frame_length_tag_key
)
payload_serializer = digital.ofdm_serializer_vcc(
- fft_len, occupied_carriers,
- self.frame_length_tag_key,
- self.packet_length_tag_key,
- 1 # Skip 1 symbol (that was already in the header)
+ fft_len, occupied_carriers,
+ self.frame_length_tag_key,
+ self.packet_length_tag_key,
+ 1 # Skip 1 symbol (that was already in the header)
)
- payload_demod = digital.constellation_decoder_cb(payload_constellation.base())
+ payload_demod = digital.constellation_decoder_cb(
+ payload_constellation.base())
self.payload_descrambler = digital.additive_scrambler_bb(
0x8a,
self.scramble_seed,
7,
- 0, # Don't reset after fixed length
- bits_per_byte=8, # This is after packing
+ 0, # Don't reset after fixed length
+ bits_per_byte=8, # This is after packing
reset_tag_key=self.packet_length_tag_key
)
- payload_pack = blocks.repack_bits_bb(bps_payload, 8, self.packet_length_tag_key, True)
+ payload_pack = blocks.repack_bits_bb(
+ bps_payload, 8, self.packet_length_tag_key, True)
self.crc = digital.crc32_bb(True, self.packet_length_tag_key)
self.connect(
- (hpd, 1),
- payload_fft,
- payload_eq,
- payload_serializer,
- payload_demod,
- payload_pack,
- self.payload_descrambler,
- self.crc,
- self
+ (hpd, 1),
+ payload_fft,
+ payload_eq,
+ payload_serializer,
+ payload_demod,
+ payload_pack,
+ self.payload_descrambler,
+ self.crc,
+ self
)
if debug_log:
- self.connect((hpd, 1), blocks.tag_debug(gr.sizeof_gr_complex*fft_len, 'post-hpd'))
- self.connect(payload_fft, blocks.file_sink(gr.sizeof_gr_complex*fft_len, 'post-payload-fft.dat'))
- self.connect(payload_eq, blocks.file_sink(gr.sizeof_gr_complex*fft_len, 'post-payload-eq.dat'))
- self.connect(payload_serializer, blocks.file_sink(gr.sizeof_gr_complex, 'post-payload-serializer.dat'))
- self.connect(payload_demod, blocks.file_sink(1, 'post-payload-demod.dat'))
- self.connect(payload_pack, blocks.file_sink(1, 'post-payload-pack.dat'))
- self.connect(self.crc, blocks.file_sink(1, 'post-payload-crc.dat'))
+ self.connect((hpd, 1), blocks.tag_debug(
+ gr.sizeof_gr_complex * fft_len, 'post-hpd'))
+ self.connect(payload_fft, blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'post-payload-fft.dat'))
+ self.connect(payload_eq, blocks.file_sink(
+ gr.sizeof_gr_complex * fft_len, 'post-payload-eq.dat'))
+ self.connect(payload_serializer, blocks.file_sink(
+ gr.sizeof_gr_complex, 'post-payload-serializer.dat'))
+ self.connect(payload_demod, blocks.file_sink(
+ 1, 'post-payload-demod.dat'))
+ self.connect(payload_pack, blocks.file_sink(
+ 1, 'post-payload-pack.dat'))
+ self.connect(self.crc, blocks.file_sink(1, 'post-payload-crc.dat'))
diff --git a/gr-digital/python/digital/psk.py b/gr-digital/python/digital/psk.py
index b04d66f75f..4866d54ede 100644
--- a/gr-digital/python/digital/psk.py
+++ b/gr-digital/python/digital/psk.py
@@ -28,6 +28,7 @@ _def_mod_code = mod_codes.GRAY_CODE
# Default use of differential encoding
_def_differential = True
+
def create_encodings(mod_code, arity, differential):
post_diff_code = None
if mod_code not in mod_codes.codes:
@@ -43,13 +44,15 @@ def create_encodings(mod_code, arity, differential):
pre_diff_code = []
post_diff_code = None
else:
- raise ValueError('That modulation code is not implemented for this constellation.')
+ raise ValueError(
+ 'That modulation code is not implemented for this constellation.')
return (pre_diff_code, post_diff_code)
# /////////////////////////////////////////////////////////////////////////////
# PSK constellation
# /////////////////////////////////////////////////////////////////////////////
+
def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code,
differential=_def_differential):
"""
@@ -57,8 +60,9 @@ def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code,
"""
k = log(m) / log(2.0)
if (k != int(k)):
- raise Exception('Number of constellation points must be a power of two.')
- points = [exp(2*pi*(0+1j)*i/m) for i in range(0,m)]
+ raise Exception(
+ 'Number of constellation points must be a power of two.')
+ points = [exp(2 * pi * (0 + 1j) * i / m) for i in range(0, m)]
pre_diff_code, post_diff_code = create_encodings(mod_code, m, differential)
if post_diff_code is not None:
inverse_post_diff_code = mod_codes.invert_code(post_diff_code)
@@ -70,6 +74,7 @@ def psk_constellation(m=_def_constellation_points, mod_code=_def_mod_code,
# PSK modulator
# /////////////////////////////////////////////////////////////////////////////
+
class psk_mod(generic_mod):
"""
Hierarchical block for RRC-filtered PSK modulation.
@@ -95,14 +100,17 @@ class psk_mod(generic_mod):
mod_code=_def_mod_code,
differential=_def_differential,
*args, **kwargs):
- constellation = psk_constellation(constellation_points, mod_code, differential)
- super(psk_mod, self).__init__(constellation, differential, *args, **kwargs)
+ constellation = psk_constellation(
+ constellation_points, mod_code, differential)
+ super(psk_mod, self).__init__(
+ constellation, differential, *args, **kwargs)
# /////////////////////////////////////////////////////////////////////////////
# PSK demodulator
#
# /////////////////////////////////////////////////////////////////////////////
+
class psk_demod(generic_demod):
"""
@@ -120,12 +128,16 @@ class psk_demod(generic_demod):
"""
# See generic_mod for additional arguments
__doc__ += shared_mod_args
+
def __init__(self, constellation_points=_def_constellation_points,
mod_code=_def_mod_code,
differential=_def_differential,
*args, **kwargs):
- constellation = psk_constellation(constellation_points, mod_code, differential)
- super(psk_demod, self).__init__(constellation, differential, *args, **kwargs)
+ constellation = psk_constellation(
+ constellation_points, mod_code, differential)
+ super(psk_demod, self).__init__(
+ constellation, differential, *args, **kwargs)
+
#
# Add these to the mod/demod registry
diff --git a/gr-digital/python/digital/psk_constellations.py b/gr-digital/python/digital/psk_constellations.py
index c3809e7ade..520eafebe2 100644
--- a/gr-digital/python/digital/psk_constellations.py
+++ b/gr-digital/python/digital/psk_constellations.py
@@ -52,6 +52,7 @@ rotations are based off of.
# BPSK Constellation Mappings
+
def psk_2_0x0():
'''
0 | 1
@@ -59,9 +60,12 @@ def psk_2_0x0():
const_points = [-1, 1]
symbols = [0, 1]
return (const_points, symbols)
+
+
psk_2 = psk_2_0x0 # Basic BPSK rotation
psk_2_0 = psk_2 # First ID for BPSK rotations
+
def psk_2_0x1():
'''
1 | 0
@@ -69,6 +73,8 @@ def psk_2_0x1():
const_points = [-1, 1]
symbols = [1, 0]
return (const_points, symbols)
+
+
psk_2_1 = psk_2_0x1
@@ -81,18 +87,23 @@ def sd_psk_2_0x0(x, Es=1):
0 | 1
'''
x_re = x.real
- dist = Es*numpy.sqrt(2)
- return [dist*x_re,]
+ dist = Es * numpy.sqrt(2)
+ return [dist * x_re, ]
+
+
sd_psk_2 = sd_psk_2_0x0 # Basic BPSK rotation
sd_psk_2_0 = sd_psk_2 # First ID for BPSK rotations
+
def sd_psk_2_0x1(x, Es=1):
'''
1 | 0
'''
- x_re = [x.real,]
- dist = Es*numpy.sqrt(2)
- return -dist*x_re
+ x_re = [x.real, ]
+ dist = Es * numpy.sqrt(2)
+ return -dist * x_re
+
+
sd_psk_2_1 = sd_psk_2_0x1
@@ -106,13 +117,16 @@ def psk_4_0x0_0_1():
| -------
| 00 | 01
'''
- const_points = [-1-1j, 1-1j,
- -1+1j, 1+1j]
+ const_points = [-1 - 1j, 1 - 1j,
+ -1 + 1j, 1 + 1j]
symbols = [0, 1, 2, 3]
return (const_points, symbols)
+
+
psk_4 = psk_4_0x0_0_1
psk_4_0 = psk_4
+
def psk_4_0x1_0_1():
'''
| 11 | 10
@@ -122,8 +136,11 @@ def psk_4_0x1_0_1():
k = 0x1
pi = [0, 1]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_1 = psk_4_0x1_0_1
+
def psk_4_0x2_0_1():
'''
| 00 | 01
@@ -133,8 +150,11 @@ def psk_4_0x2_0_1():
k = 0x2
pi = [0, 1]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_2 = psk_4_0x2_0_1
+
def psk_4_0x3_0_1():
'''
| 01 | 00
@@ -144,8 +164,11 @@ def psk_4_0x3_0_1():
k = 0x3
pi = [0, 1]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_3 = psk_4_0x3_0_1
+
def psk_4_0x0_1_0():
'''
| 01 | 11
@@ -155,8 +178,11 @@ def psk_4_0x0_1_0():
k = 0x0
pi = [1, 0]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_4 = psk_4_0x0_1_0
+
def psk_4_0x1_1_0():
'''
| 00 | 10
@@ -166,8 +192,11 @@ def psk_4_0x1_1_0():
k = 0x1
pi = [1, 0]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_5 = psk_4_0x1_1_0
+
def psk_4_0x2_1_0():
'''
| 11 | 01
@@ -177,8 +206,11 @@ def psk_4_0x2_1_0():
k = 0x2
pi = [1, 0]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
+
+
psk_4_6 = psk_4_0x2_1_0
+
def psk_4_0x3_1_0():
'''
| 10 | 00
@@ -188,9 +220,10 @@ def psk_4_0x3_1_0():
k = 0x3
pi = [1, 0]
return constellation_map_generator(psk_4()[0], psk_4()[1], k, pi)
-psk_4_7 = psk_4_0x3_1_0
+psk_4_7 = psk_4_0x3_1_0
+
############################################################
# QPSK Constellation Softbit LUT generators
@@ -204,11 +237,14 @@ def sd_psk_4_0x0_0_1(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [dist*x_im, dist*x_re]
+ dist = Es * numpy.sqrt(2)
+ return [dist * x_im, dist * x_re]
+
+
sd_psk_4 = sd_psk_4_0x0_0_1
sd_psk_4_0 = sd_psk_4
+
def sd_psk_4_0x1_0_1(x, Es=1):
'''
| 11 | 10
@@ -217,10 +253,13 @@ def sd_psk_4_0x1_0_1(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [dist*x_im, -dist*x_re]
+ dist = Es * numpy.sqrt(2)
+ return [dist * x_im, -dist * x_re]
+
+
sd_psk_4_1 = sd_psk_4_0x1_0_1
+
def sd_psk_4_0x2_0_1(x, Es=1):
'''
| 00 | 01
@@ -229,10 +268,13 @@ def sd_psk_4_0x2_0_1(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [-dist*x_im, dist*x_re]
+ dist = Es * numpy.sqrt(2)
+ return [-dist * x_im, dist * x_re]
+
+
sd_psk_4_2 = sd_psk_4_0x2_0_1
+
def sd_psk_4_0x3_0_1(x, Es=1):
'''
| 01 | 00
@@ -241,10 +283,13 @@ def sd_psk_4_0x3_0_1(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [-dist*x_im, -dist*x_re]
+ dist = Es * numpy.sqrt(2)
+ return [-dist * x_im, -dist * x_re]
+
+
sd_psk_4_3 = sd_psk_4_0x3_0_1
+
def sd_psk_4_0x0_1_0(x, Es=1):
'''
| 01 | 11
@@ -253,10 +298,13 @@ def sd_psk_4_0x0_1_0(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [dist*x_re, dist*x_im]
+ dist = Es * numpy.sqrt(2)
+ return [dist * x_re, dist * x_im]
+
+
sd_psk_4_4 = sd_psk_4_0x0_1_0
+
def sd_psk_4_0x1_1_0(x, Es=1):
'''
| 00 | 10
@@ -265,8 +313,10 @@ def sd_psk_4_0x1_1_0(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [dist*x_re, -dist*x_im]
+ dist = Es * numpy.sqrt(2)
+ return [dist * x_re, -dist * x_im]
+
+
sd_psk_4_5 = sd_psk_4_0x1_1_0
@@ -278,10 +328,13 @@ def sd_psk_4_0x2_1_0(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [-dist*x_re, dist*x_im]
+ dist = Es * numpy.sqrt(2)
+ return [-dist * x_re, dist * x_im]
+
+
sd_psk_4_6 = sd_psk_4_0x2_1_0
+
def sd_psk_4_0x3_1_0(x, Es=1):
'''
| 10 | 00
@@ -290,6 +343,8 @@ def sd_psk_4_0x3_1_0(x, Es=1):
'''
x_re = x.real
x_im = x.imag
- dist = Es*numpy.sqrt(2)
- return [-dist*x_re, -dist*x_im]
+ dist = Es * numpy.sqrt(2)
+ return [-dist * x_re, -dist * x_im]
+
+
sd_psk_4_7 = sd_psk_4_0x3_1_0
diff --git a/gr-digital/python/digital/qa_burst_shaper.py b/gr-digital/python/digital/qa_burst_shaper.py
index 40ac210ec2..b1fc793924 100644
--- a/gr-digital/python/digital/qa_burst_shaper.py
+++ b/gr-digital/python/digital/qa_burst_shaper.py
@@ -85,7 +85,8 @@ class qa_burst_shaper (gr_unittest.TestCase):
-4.0 * np.ones(5, dtype=complex)))
tags = (make_length_tag(0, length),)
expected = np.concatenate((np.zeros(prepad, dtype=complex), window[0:5],
- np.ones(length - len(window), dtype=complex),
+ np.ones(length - len(window),
+ dtype=complex),
window[5:10], np.zeros(postpad,
dtype=complex)))
etag = make_length_tag(0, length + prepad + postpad)
@@ -239,8 +240,10 @@ class qa_burst_shaper (gr_unittest.TestCase):
window = np.concatenate((-2.0 * np.ones(5), -4.0 * np.ones(5)))
tags = (make_length_tag(0, length1), make_length_tag(length1, length2))
expected = np.concatenate((np.zeros(prepad), window[0:5],
- np.ones(length1 - len(window)), window[5:10],
- np.zeros(postpad + prepad), -1.0 * window[0:5],
+ np.ones(length1 - len(window)
+ ), window[5:10],
+ np.zeros(postpad + prepad), -
+ 1.0 * window[0:5],
-1.0 * np.ones(length2 - len(window)),
-1.0 * window[5:10], np.zeros(postpad)))
etags = (make_length_tag(0, length1 + prepad + postpad),
@@ -335,8 +338,10 @@ class qa_burst_shaper (gr_unittest.TestCase):
make_tag(tag4_offset, 'body', pmt.intern('tag4')),
make_tag(tag5_offset, 'body', pmt.intern('tag5')))
expected = np.concatenate((np.zeros(prepad), window[0:5],
- np.ones(length1 - len(window)), window[5:10],
- np.zeros(postpad + prepad), -1.0 * window[0:5],
+ np.ones(length1 - len(window)
+ ), window[5:10],
+ np.zeros(postpad + prepad), -
+ 1.0 * window[0:5],
-1.0 * np.ones(length2 - len(window)),
-1.0 * window[5:10], np.zeros(postpad)))
elentag1_offset = 0
diff --git a/gr-digital/python/digital/qa_constellation.py b/gr-digital/python/digital/qa_constellation.py
index 7c9984405f..7345f782a2 100644
--- a/gr-digital/python/digital/qa_constellation.py
+++ b/gr-digital/python/digital/qa_constellation.py
@@ -230,7 +230,6 @@ class test_constellation(gr_unittest.TestCase):
data[first:],
msg=msg)
-
def test_soft_qpsk_gen(self):
prec = 8
constel, code = digital.psk_4_0()
diff --git a/gr-digital/python/digital/qa_constellation_encoder_bc.py b/gr-digital/python/digital/qa_constellation_encoder_bc.py
index 2c6d587dac..ef1991bf80 100644
--- a/gr-digital/python/digital/qa_constellation_encoder_bc.py
+++ b/gr-digital/python/digital/qa_constellation_encoder_bc.py
@@ -12,6 +12,7 @@
from gnuradio import gr, gr_unittest, digital, blocks
import numpy as np
+
class test_constellation_encoder(gr_unittest.TestCase):
def setUp(self):
@@ -23,8 +24,8 @@ class test_constellation_encoder(gr_unittest.TestCase):
def test_constellation_encoder_bc_bpsk(self):
cnst = digital.constellation_bpsk()
- src_data = (1, 1, 0, 0,
- 1, 0, 1)
+ src_data = (1, 1, 0, 0,
+ 1, 0, 1)
const_map = [-1.0, 1.0]
expected_result = [const_map[x] for x in src_data]
@@ -43,8 +44,8 @@ class test_constellation_encoder(gr_unittest.TestCase):
def test_constellation_encoder_bc_qpsk(self):
cnst = digital.constellation_qpsk()
- src_data = (3, 1, 0, 2,
- 3, 2, 1)
+ src_data = (3, 1, 0, 2,
+ 3, 2, 1)
expected_result = [cnst.points()[x] for x in src_data]
src = blocks.vector_source_b(src_data)
op = digital.constellation_encoder_bc(cnst.base())
@@ -59,7 +60,6 @@ class test_constellation_encoder(gr_unittest.TestCase):
# print "expected result", expected_result
self.assertFloatTuplesAlmostEqual(expected_result, actual_result)
-
def test_constellation_encoder_bc_qpsk_random(self):
cnst = digital.constellation_qpsk()
src_data = np.random.randint(0, 4, size=20000)
@@ -77,5 +77,6 @@ class test_constellation_encoder(gr_unittest.TestCase):
# print "expected result", expected_result
self.assertFloatTuplesAlmostEqual(expected_result, actual_result)
+
if __name__ == '__main__':
gr_unittest.run(test_constellation_encoder)
diff --git a/gr-digital/python/digital/qa_decision_feedback_equalizer.py b/gr-digital/python/digital/qa_decision_feedback_equalizer.py
index 1d4587a6f0..f915272e52 100755
--- a/gr-digital/python/digital/qa_decision_feedback_equalizer.py
+++ b/gr-digital/python/digital/qa_decision_feedback_equalizer.py
@@ -116,7 +116,7 @@ class qa_decision_feedback_equalizer(gr_unittest.TestCase):
1,
1,
alg,
- True,
+ True,
[],
'')
dst = blocks.vector_sink_c()
diff --git a/gr-digital/python/digital/qa_glfsr_source.py b/gr-digital/python/digital/qa_glfsr_source.py
index 7fde483ed3..23382605c6 100644
--- a/gr-digital/python/digital/qa_glfsr_source.py
+++ b/gr-digital/python/digital/qa_glfsr_source.py
@@ -60,7 +60,7 @@ class test_glfsr_source(gr_unittest.TestCase):
lambda: digital.glfsr_source_f(0))
self.assertRaises(RuntimeError,
lambda: digital.glfsr_source_f(65))
-
+
def test_005_correlation_f(self):
for degree in range(
1, 11): # Higher degrees take too long to correlate
diff --git a/gr-digital/python/digital/qa_lfsr.py b/gr-digital/python/digital/qa_lfsr.py
index e0db6a382b..8caa0dbada 100644
--- a/gr-digital/python/digital/qa_lfsr.py
+++ b/gr-digital/python/digital/qa_lfsr.py
@@ -14,6 +14,7 @@ import numpy as np
from gnuradio import gr, gr_unittest, digital
from gnuradio.digital.utils import lfsr_args
+
class test_lfsr(gr_unittest.TestCase):
def setUp(self):
@@ -33,19 +34,20 @@ class test_lfsr(gr_unittest.TestCase):
self.assertFloatTuplesAlmostEqual(expected_result, result_data, 5)
def test_lfsr_002(self):
- l = digital.lfsr(*lfsr_args(0b1,5,3,0))
- result_data = [l.next_bit() for _ in range(2*(2**5-1))]
-
+ l = digital.lfsr(*lfsr_args(0b1, 5, 3, 0))
+ result_data = [l.next_bit() for _ in range(2 * (2**5 - 1))]
+
expected_result = [1, 0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 0,
- 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0]*2
+ 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0] * 2
self.assertEqual(expected_result, result_data)
- seq1 = [l.next_bit() for _ in range(2**5-1)]
- seq2 = [l.next_bit() for _ in range(2**5-1)]
- self.assertEqual(seq1,seq2)
+ seq1 = [l.next_bit() for _ in range(2**5 - 1)]
+ seq2 = [l.next_bit() for _ in range(2**5 - 1)]
+ self.assertEqual(seq1, seq2)
+
+ res = (np.convolve(seq1, [1, 0, 1, 0, 0, 1]) % 2)
+ self.assertTrue(sum(res[5:-5]) == 0, "LRS not generated properly")
- res = (np.convolve(seq1,[1,0,1,0,0,1])%2)
- self.assertTrue(sum(res[5:-5])==0,"LRS not generated properly")
if __name__ == '__main__':
gr_unittest.run(test_lfsr)
diff --git a/gr-digital/python/digital/qa_linear_equalizer.py b/gr-digital/python/digital/qa_linear_equalizer.py
index 64fd84d2f3..39fea0ef49 100755
--- a/gr-digital/python/digital/qa_linear_equalizer.py
+++ b/gr-digital/python/digital/qa_linear_equalizer.py
@@ -111,7 +111,7 @@ class qa_linear_equalizer(gr_unittest.TestCase):
4,
1,
alg,
- True,
+ True,
[],
'')
dst = blocks.vector_sink_c()
diff --git a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
index a1f8cef6e9..7d5f79781c 100644
--- a/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
+++ b/gr-digital/python/digital/qa_ofdm_chanest_vcvc.py
@@ -303,7 +303,8 @@ class qa_ofdm_chanest_vcvc (gr_unittest.TestCase):
for x in range(fft_len)]
src = blocks.vector_source_c(tx_data, False, fft_len)
chan = blocks.multiply_const_vcc(channel)
- noise = blocks.vector_source_c(numpy.random.normal(0,wgn_amplitude,(len(tx_data),)), False, fft_len)
+ noise = blocks.vector_source_c(numpy.random.normal(
+ 0, wgn_amplitude, (len(tx_data),)), False, fft_len)
add = blocks.add_cc(fft_len)
chanest = digital.ofdm_chanest_vcvc(sync_sym1, sync_sym2, 1)
sink = blocks.vector_sink_c(fft_len)
@@ -325,7 +326,8 @@ class qa_ofdm_chanest_vcvc (gr_unittest.TestCase):
shifted_carrier_mask = shift_tuple(carrier_mask, carr_offset)
for i in range(fft_len):
if shifted_carrier_mask[i] and channel_est[i]:
- self.assertAlmostEqual(channel[i], channel_est[i], places=0)
+ self.assertAlmostEqual(
+ channel[i], channel_est[i], places=0)
rx_sym_est[i] = (sink.data()[i] / channel_est[i]).real
return carr_offset, list(shift_tuple(rx_sym_est, -carr_offset_hat))
bit_errors = 0
diff --git a/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py b/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py
index 254479363e..10b484e839 100644
--- a/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py
+++ b/gr-digital/python/digital/qa_ofdm_sync_sc_cfb.py
@@ -106,7 +106,8 @@ class qa_ofdm_sync_sc_cfb (gr_unittest.TestCase):
for _ in range(n_bursts):
gap = [0, ] * random.randint(0, 2 * fft_len)
tx_signal += gap + \
- make_bpsk_burst(fft_len, cp_len, fft_len * random.randint(5, 23))
+ make_bpsk_burst(fft_len, cp_len, fft_len *
+ random.randint(5, 23))
# Very loose definition of SNR here
snr = 20 # dB
sigma = 10**(-snr / 10)
diff --git a/gr-digital/python/digital/qa_scrambler.py b/gr-digital/python/digital/qa_scrambler.py
index 7d87f87b6c..b350d9e1b7 100644
--- a/gr-digital/python/digital/qa_scrambler.py
+++ b/gr-digital/python/digital/qa_scrambler.py
@@ -26,6 +26,7 @@ def additive_scramble_lfsr(mask, seed, reglen, bpb, data):
out.append(d ^ scramble_word)
return out
+
class test_scrambler(gr_unittest.TestCase):
def setUp(self):
@@ -34,33 +35,35 @@ class test_scrambler(gr_unittest.TestCase):
def tearDown(self):
self.tb = None
-
def test_lfsr_002(self):
- _a = lfsr_args(1,51,3,0)
+ _a = lfsr_args(1, 51, 3, 0)
l = digital.lfsr(*_a)
seq = [l.next_bit() for _ in range(2**10)]
- reg = np.zeros(52,np.int8)
- reg[::-1][(51,3,0),] = 1
- res = (np.convolve(seq,reg)%2)
- self.assertTrue(sum(res[52:-52])==0,"LRS not generated properly")
+ reg = np.zeros(52, np.int8)
+ reg[::-1][(51, 3, 0), ] = 1
+ res = (np.convolve(seq, reg) % 2)
+ self.assertTrue(sum(res[52:-52]) == 0, "LRS not generated properly")
def test_scrambler_descrambler_001(self):
- src_data = np.random.randint(0,2,500,dtype=np.int8)
+ src_data = np.random.randint(0, 2, 500, dtype=np.int8)
src = blocks.vector_source_b(src_data, False)
- scrambler = digital.scrambler_bb(*lfsr_args(0b1,7,2,0)) # p(x) = x^7 + x^2 + 1
- descrambler = digital.descrambler_bb(*lfsr_args(0b111,7,2,0)) # we can use any seed here, it is descrambling.
+ scrambler = digital.scrambler_bb(
+ *lfsr_args(0b1, 7, 2, 0)) # p(x) = x^7 + x^2 + 1
+ # we can use any seed here, it is descrambling.
+ descrambler = digital.descrambler_bb(*lfsr_args(0b111, 7, 2, 0))
m_tap = blocks.vector_sink_b()
dst = blocks.vector_sink_b()
self.tb.connect(src, scrambler, descrambler, dst)
self.tb.connect(scrambler, m_tap)
self.tb.run()
- self.assertEqual(src_data[:-7].tolist(), dst.data()[7:]) # skip garbage during synchronization
- self.assertEqual(tuple(np.convolve(m_tap.data(),[1,0,0,0,0,1,0,1])%2)[7:-10],
- tuple(src_data[:-10])) # manual descrambling test
+ # skip garbage during synchronization
+ self.assertEqual(src_data[:-7].tolist(), dst.data()[7:])
+ self.assertEqual(tuple(np.convolve(m_tap.data(), [1, 0, 0, 0, 0, 1, 0, 1]) % 2)[7:-10],
+ tuple(src_data[:-10])) # manual descrambling test
def test_scrambler_descrambler_002(self):
- _a = lfsr_args(0b1,51,6,0) #p(x) = x^51+x^6+1
- src_data = np.random.randint(0,2,1000,dtype=np.int8)
+ _a = lfsr_args(0b1, 51, 6, 0) # p(x) = x^51+x^6+1
+ src_data = np.random.randint(0, 2, 1000, dtype=np.int8)
src = blocks.vector_source_b(src_data, False)
scrambler = digital.scrambler_bb(*_a)
m_tap = blocks.vector_sink_b()
@@ -69,26 +72,29 @@ class test_scrambler(gr_unittest.TestCase):
self.tb.connect(src, scrambler, descrambler, dst)
self.tb.connect(scrambler, m_tap)
self.tb.run()
- self.assertTrue(np.all(src_data[:-51]==dst.data()[51:])) # skip garbage during synchronization
- reg = np.zeros(52,np.int8)
- reg[::-1][(51,6,0),] = 1
- self.assertTrue(np.all( np.convolve(m_tap.data(),reg)[51:-60]%2
- == src_data[:-60])) # manual descrambling test
+ # skip garbage during synchronization
+ self.assertTrue(np.all(src_data[:-51] == dst.data()[51:]))
+ reg = np.zeros(52, np.int8)
+ reg[::-1][(51, 6, 0), ] = 1
+ self.assertTrue(np.all(np.convolve(m_tap.data(), reg)[51:-60] % 2 ==
+ src_data[:-60])) # manual descrambling test
def test_scrambler_descrambler_003(self):
- src_data = np.random.randint(0,2,1000,dtype=np.int8)
+ src_data = np.random.randint(0, 2, 1000, dtype=np.int8)
src = blocks.vector_source_b(src_data, False)
- scrambler = digital.scrambler_bb(*lfsr_args(1,12,10,3,2,0)) # this is the product of the other two
- descrambler1 = digital.descrambler_bb(*lfsr_args(1,5,3,0))
- descrambler2 = digital.descrambler_bb(*lfsr_args(1,7,2,0))
+ # this is the product of the other two
+ scrambler = digital.scrambler_bb(*lfsr_args(1, 12, 10, 3, 2, 0))
+ descrambler1 = digital.descrambler_bb(*lfsr_args(1, 5, 3, 0))
+ descrambler2 = digital.descrambler_bb(*lfsr_args(1, 7, 2, 0))
dst = blocks.vector_sink_b()
self.tb.connect(src, scrambler, descrambler1, descrambler2, dst)
self.tb.run()
- self.assertTrue(np.all(src_data[:-12]==dst.data()[12:])) # skip garbage during synchronization
+ # skip garbage during synchronization
+ self.assertTrue(np.all(src_data[:-12] == dst.data()[12:]))
def test_additive_scrambler_001(self):
- _a = lfsr_args(1,51,3,0) #i p(x) = x^51+x^3+1, seed 0x1
- src_data = np.random.randint(0,2,1000,dtype=np.int8).tolist()
+ _a = lfsr_args(1, 51, 3, 0) # i p(x) = x^51+x^3+1, seed 0x1
+ src_data = np.random.randint(0, 2, 1000, dtype=np.int8).tolist()
src = blocks.vector_source_b(src_data, False)
scrambler = digital.additive_scrambler_bb(*_a)
descrambler = digital.additive_scrambler_bb(*_a)
@@ -98,18 +104,18 @@ class test_scrambler(gr_unittest.TestCase):
self.assertEqual(tuple(src_data), tuple(dst.data()))
def test_additive_scrambler_002(self):
- _a = lfsr_args(1,51,3,0) #i p(x) = x^51+x^3+1, seed 0x1
- src_data = [1,]*1000
+ _a = lfsr_args(1, 51, 3, 0) # i p(x) = x^51+x^3+1, seed 0x1
+ src_data = [1, ] * 1000
src = blocks.vector_source_b(src_data, False)
scrambler = digital.additive_scrambler_bb(*_a)
dst = blocks.vector_sink_b()
self.tb.connect(src, scrambler, dst)
self.tb.run()
- reg = np.zeros(52,np.int8)
- reg[::-1][(51,3,0),] = 1
- res = (np.convolve(dst.data(),reg)%2)[52:-52]
- self.assertEqual(len(res), sum(res)) # when convolved with mask,
- # after sync, only 1's would be returned.
+ reg = np.zeros(52, np.int8)
+ reg[::-1][(51, 3, 0), ] = 1
+ res = (np.convolve(dst.data(), reg) % 2)[52:-52]
+ self.assertEqual(len(res), sum(res)) # when convolved with mask,
+ # after sync, only 1's would be returned.
def test_scrambler_descrambler(self):
src_data = [1, ] * 1000
diff --git a/gr-digital/python/digital/qam.py b/gr-digital/python/digital/qam.py
index d74866b946..6e8951ef1e 100644
--- a/gr-digital/python/digital/qam.py
+++ b/gr-digital/python/digital/qam.py
@@ -243,8 +243,8 @@ def large_ampls_to_corners_mapping(side, points, width):
# use the center point.
c = ((real_x - side / 2.0 + 0.5) * width +
(imag_x - side / 2.0 + 0.5) * width * 1j)
- if (real_x >= extra_layers and real_x < side - extra_layers
- and imag_x >= extra_layers and imag_x < side - extra_layers):
+ if (real_x >= extra_layers and real_x < side - extra_layers and
+ imag_x >= extra_layers and imag_x < side - extra_layers):
# This is not an edge row/column. Find closest point.
index = find_closest_point(c, points)
else:
diff --git a/gr-digital/python/digital/qpsk.py b/gr-digital/python/digital/qpsk.py
index db97d31fc3..22587948fc 100644
--- a/gr-digital/python/digital/qpsk.py
+++ b/gr-digital/python/digital/qpsk.py
@@ -27,23 +27,28 @@ _def_mod_code = mod_codes.GRAY_CODE
# QPSK constellation
# /////////////////////////////////////////////////////////////////////////////
+
def qpsk_constellation(mod_code=_def_mod_code):
"""
Creates a QPSK constellation.
"""
if mod_code != mod_codes.GRAY_CODE:
- raise ValueError("This QPSK mod/demod works only for gray-coded constellations.")
+ raise ValueError(
+ "This QPSK mod/demod works only for gray-coded constellations.")
return digital.constellation_qpsk()
# /////////////////////////////////////////////////////////////////////////////
# DQPSK constellation
# /////////////////////////////////////////////////////////////////////////////
+
def dqpsk_constellation(mod_code=_def_mod_code):
if mod_code != mod_codes.GRAY_CODE:
- raise ValueError("The DQPSK constellation is only generated for gray_coding. But it can be used for non-grayed coded modulation if one doesn't use the pre-differential code.")
+ raise ValueError(
+ "The DQPSK constellation is only generated for gray_coding. But it can be used for non-grayed coded modulation if one doesn't use the pre-differential code.")
return digital.constellation_dqpsk()
+
#
# Add these to the mod/demod registry
#
diff --git a/gr-digital/python/digital/soft_dec_lut_gen.py b/gr-digital/python/digital/soft_dec_lut_gen.py
index 898af5320d..13c50251a5 100644
--- a/gr-digital/python/digital/soft_dec_lut_gen.py
+++ b/gr-digital/python/digital/soft_dec_lut_gen.py
@@ -11,6 +11,7 @@
import numpy
+
def soft_dec_table_generator(soft_dec_gen, prec, Es=1):
'''
| Builds a LUT that is a list of tuples. The tuple represents the
@@ -71,7 +72,7 @@ def soft_dec_table_generator(soft_dec_gen, prec, Es=1):
'''
npts = int(2.0**prec)
- maxd = Es*numpy.sqrt(2.0)/2.0
+ maxd = Es * numpy.sqrt(2.0) / 2.0
yrng = numpy.linspace(-maxd, maxd, npts)
xrng = numpy.linspace(-maxd, maxd, npts)
@@ -83,6 +84,7 @@ def soft_dec_table_generator(soft_dec_gen, prec, Es=1):
table.append(decs)
return table
+
def soft_dec_table(constel, symbols, prec, npwr=1):
'''
Similar in nature to soft_dec_table_generator above. Instead, this
@@ -120,6 +122,7 @@ def soft_dec_table(constel, symbols, prec, npwr=1):
table.append(decs)
return table
+
def calc_soft_dec_from_table(sample, table, prec, Es=1.0):
'''
Takes in a complex sample and converts it from the coordinates
@@ -144,25 +147,26 @@ def calc_soft_dec_from_table(sample, table, prec, Es=1.0):
constellation.
'''
lut_scale = 2.0**prec
- maxd = Es*numpy.sqrt(2.0)/2.0
- scale = (lut_scale) / (2.0*maxd)
+ maxd = Es * numpy.sqrt(2.0) / 2.0
+ scale = (lut_scale) / (2.0 * maxd)
- alpha = 0.99 # to keep index within bounds
+ alpha = 0.99 # to keep index within bounds
xre = sample.real
xim = sample.imag
- xre = ((maxd + min(alpha*maxd, max(-alpha*maxd, xre))) * scale)
- xim = ((maxd + min(alpha*maxd, max(-alpha*maxd, xim))) * scale)
- index = int(xre) + lut_scale*int(xim)
+ xre = ((maxd + min(alpha * maxd, max(-alpha * maxd, xre))) * scale)
+ xim = ((maxd + min(alpha * maxd, max(-alpha * maxd, xim))) * scale)
+ index = int(xre) + lut_scale * int(xim)
max_index = lut_scale**2
while(index >= max_index):
- index -= lut_scale;
+ index -= lut_scale
while(index < 0):
- index += lut_scale;
+ index += lut_scale
return table[int(index)]
+
def calc_soft_dec(sample, constel, symbols, npwr=1):
'''
This function takes in any consteallation and symbol symbol set
@@ -186,8 +190,8 @@ def calc_soft_dec(sample, constel, symbols, npwr=1):
M = len(constel)
k = int(numpy.log2(M))
- tmp = 2*k*[0]
- s = k*[0]
+ tmp = 2 * k * [0]
+ s = k * [0]
for i in range(M):
# Calculate the distance between the sample and the current
@@ -200,21 +204,21 @@ def calc_soft_dec(sample, constel, symbols, npwr=1):
for j in range(k):
# Get the bit at the jth index
- mask = 1<<j
+ mask = 1 << j
bit = (symbols[i] & mask) >> j
# If the bit is a 0, add to the probability of a zero
if(bit == 0):
- tmp[2*j+0] += d
+ tmp[2 * j + 0] += d
# else, add to the probability of a one
else:
- tmp[2*j+1] += d
+ tmp[2 * j + 1] += d
# Calculate the log-likelihood ratio for all bits based on the
# probability of ones (tmp[2*i+1]) over the probability of a zero
# (tmp[2*i+0]).
for i in range(k):
- s[k-1-i] = (numpy.log(tmp[2*i+1]) - numpy.log(tmp[2*i+0]))
+ s[k - 1 - i] = (numpy.log(tmp[2 * i + 1]) - numpy.log(tmp[2 * i + 0]))
return s
@@ -225,19 +229,19 @@ def show_table(table):
pp = ""
subi = 1
subj = 0
- for i in reversed(list(range(prec+1))):
- if(i == prec//2):
- pp += "-----" + prec*((nbits*8)+3)*"-" + "\n"
+ for i in reversed(list(range(prec + 1))):
+ if(i == prec // 2):
+ pp += "-----" + prec * ((nbits * 8) + 3) * "-" + "\n"
subi = 0
continue
- for j in range(prec+1):
- if(j == prec//2):
+ for j in range(prec + 1):
+ if(j == prec // 2):
pp += "| "
subj = 1
else:
- item = table[prec*(i-subi) + (j-subj)]
+ item = table[prec * (i - subi) + (j - subj)]
pp += "( "
- for t in range(nbits-1, -1, -1):
+ for t in range(nbits - 1, -1, -1):
pp += "{0: .4f} ".format(item[t])
pp += ") "
pp += "\n"
diff --git a/gr-digital/python/digital/test_soft_decisions.py b/gr-digital/python/digital/test_soft_decisions.py
index 59b6edc032..3350103573 100644
--- a/gr-digital/python/digital/test_soft_decisions.py
+++ b/gr-digital/python/digital/test_soft_decisions.py
@@ -9,13 +9,15 @@
#
-import numpy, sys
+import numpy
+import sys
from matplotlib import pyplot
from gnuradio import digital
from .soft_dec_lut_gen import soft_dec_table, calc_soft_dec_from_table, calc_soft_dec
from .psk_constellations import psk_4_0, psk_4_1, psk_4_2, psk_4_3, psk_4_4, psk_4_5, psk_4_6, psk_4_7, sd_psk_4_0, sd_psk_4_1, sd_psk_4_2, sd_psk_4_3, sd_psk_4_4, sd_psk_4_5, sd_psk_4_6, sd_psk_4_7
from .qam_constellations import qam_16_0, sd_qam_16_0
+
def test_qpsk(i, sample, prec):
qpsk_const_list = [psk_4_0, psk_4_1, psk_4_2, psk_4_3,
psk_4_4, psk_4_5, psk_4_6, psk_4_7]
@@ -33,7 +35,8 @@ def test_qpsk(i, sample, prec):
# Get max energy/symbol in constellation
constel = c.points()
- Es = max([numpy.sqrt(constel_i.real**2 + constel_i.imag**2) for constel_i in constel])
+ Es = max([numpy.sqrt(constel_i.real**2 + constel_i.imag**2)
+ for constel_i in constel])
#table = soft_dec_table_generator(qpsk_lut_gen, prec, Es)
table = soft_dec_table(constel, code, prec)
@@ -50,6 +53,7 @@ def test_qpsk(i, sample, prec):
return (y_python_gen_calc, y_python_table, y_python_raw_calc,
y_cpp_table, y_cpp_raw_calc, constel, code, c)
+
def test_qam16(i, sample, prec):
sample = sample / 1
qam_const_list = [qam_16_0, ]
@@ -71,7 +75,7 @@ def test_qam16(i, sample, prec):
#table = soft_dec_table_generator(qam_lut_gen, prec, Es)
table = soft_dec_table(constel, code, prec, 1)
- #c.gen_soft_dec_lut(prec)
+ # c.gen_soft_dec_lut(prec)
c.set_soft_dec_lut(table, prec)
y_python_gen_calc = qam_lut_gen(sample, Es)
@@ -83,14 +87,15 @@ def test_qam16(i, sample, prec):
return (y_python_gen_calc, y_python_table, y_python_raw_calc,
y_cpp_table, y_cpp_raw_calc, constel, code, c)
+
if __name__ == "__main__":
index = 0
prec = 8
- x_re = 2*numpy.random.random()-1
- x_im = 2*numpy.random.random()-1
- x = x_re + x_im*1j
+ x_re = 2 * numpy.random.random() - 1
+ x_im = 2 * numpy.random.random() - 1
+ x = x_re + x_im * 1j
#x = -1 + -0.j
if 1:
@@ -112,14 +117,14 @@ if __name__ == "__main__":
print("C++ Raw calc: ", (y_cpp_raw_calc))
fig = pyplot.figure(1)
- sp1 = fig.add_subplot(1,1,1)
+ sp1 = fig.add_subplot(1, 1, 1)
sp1.plot([c.real for c in constel],
[c.imag for c in constel], 'bo')
sp1.plot(x.real, x.imag, 'ro')
sp1.set_xlim([-1.5, 1.5])
sp1.set_ylim([-1.5, 1.5])
fill = int(numpy.log2(len(constel)))
- for i,c in enumerate(constel):
- sp1.text(1.2*c.real, 1.2*c.imag, bin(code[i])[2:].zfill(fill),
+ for i, c in enumerate(constel):
+ sp1.text(1.2 * c.real, 1.2 * c.imag, bin(code[i])[2:].zfill(fill),
ha='center', va='center', size=18)
pyplot.show()
diff --git a/gr-digital/python/digital/utils/__init__.py b/gr-digital/python/digital/utils/__init__.py
index 0d5aae0b79..f3a8390af8 100644
--- a/gr-digital/python/digital/utils/__init__.py
+++ b/gr-digital/python/digital/utils/__init__.py
@@ -1,11 +1,11 @@
#!/usr/bin/env python
#
# Copyright 2011 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
-#
+#
from .lfsr import lfsr_args
diff --git a/gr-digital/python/digital/utils/alignment.py b/gr-digital/python/digital/utils/alignment.py
index f0541dea56..531c39830e 100644
--- a/gr-digital/python/digital/utils/alignment.py
+++ b/gr-digital/python/digital/utils/alignment.py
@@ -1,12 +1,12 @@
#!/usr/bin/env python
#
# Copyright 2011 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
-#
+#
"""
This module contains functions for aligning sequences.
@@ -41,6 +41,7 @@ def_max_offset = 500
# The maximum number of samples to take from two sequences to check alignment.
def_num_samples = 1000
+
def compare_sequences(d1, d2, offset, sample_indices=None):
"""
Takes two binary sequences and an offset and returns the number of
@@ -49,7 +50,7 @@ def compare_sequences(d1, d2, offset, sample_indices=None):
offset -- offset of d2 relative to d1
sample_indices -- a list of indices to use for the comparison
"""
- max_index = min(len(d1), len(d2)+offset)
+ max_index = min(len(d1), len(d2) + offset)
if sample_indices is None:
sample_indices = list(range(0, max_index))
correct = 0
@@ -57,11 +58,12 @@ def compare_sequences(d1, d2, offset, sample_indices=None):
for i in sample_indices:
if i >= max_index:
break
- if d1[i] == d2[i-offset]:
+ if d1[i] == d2[i - offset]:
correct += 1
total += 1
return (correct, total)
+
def random_sample(size, num_samples=def_num_samples, seed=None):
"""
Returns a set of random integers between 0 and (size-1).
@@ -76,12 +78,13 @@ def random_sample(size, num_samples=def_num_samples, seed=None):
num_samples = num_samples / 2
indices = set([])
while len(indices) < num_samples:
- index = rndm.randint(0, size-1)
+ index = rndm.randint(0, size - 1)
indices.add(index)
indices = list(indices)
indices.sort()
return indices
+
def align_sequences(d1, d2,
num_samples=def_num_samples,
max_offset=def_max_offset,
@@ -113,7 +116,7 @@ def align_sequences(d1, d2,
int_range = [item for items in zip(pos_range, neg_range) for item in items]
for offset in int_range:
correct, compared = compare_sequences(d1, d2, offset, indices)
- frac_correct = 1.0*correct/compared
+ frac_correct = 1.0 * correct / compared
if frac_correct > max_frac_correct:
max_frac_correct = frac_correct
best_offset = offset
@@ -122,8 +125,8 @@ def align_sequences(d1, d2,
if frac_correct > correct_cutoff:
break
return max_frac_correct, best_compared, best_offset, indices
-
+
+
if __name__ == "__main__":
import doctest
doctest.testmod()
-
diff --git a/gr-digital/python/digital/utils/gray_code.py b/gr-digital/python/digital/utils/gray_code.py
index e045e9a4ac..de8ffbc93c 100644
--- a/gr-digital/python/digital/utils/gray_code.py
+++ b/gr-digital/python/digital/utils/gray_code.py
@@ -41,14 +41,14 @@ class GrayCodeGenerator(object):
else:
# if not we take advantage of the symmetry of all but the last bit
# around a power of two.
- result = self.gcs[2*self.lp2-1-self.i] + self.lp2
+ result = self.gcs[2 * self.lp2 - 1 - self.i] + self.lp2
self.gcs.append(result)
self.i += 1
if self.i == self.np2:
self.lp2 = self.i
- self.np2 = self.i*2
+ self.np2 = self.i * 2
+
_gray_code_generator = GrayCodeGenerator()
gray_code = _gray_code_generator.get_gray_code
-
diff --git a/gr-digital/python/digital/utils/lfsr.py b/gr-digital/python/digital/utils/lfsr.py
index 2b8a47cb76..44f62f0c7a 100644
--- a/gr-digital/python/digital/utils/lfsr.py
+++ b/gr-digital/python/digital/utils/lfsr.py
@@ -1,12 +1,12 @@
#!/usr/bin/env python
#
# Copyright 2020 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
-#
+#
def lfsr_args(seed, *exp):
"""
@@ -18,4 +18,4 @@ def lfsr_args(seed, *exp):
Creates an lfsr object with seed 0b11001, mask 0b1000011, K=6
"""
from functools import reduce
- return reduce(int.__xor__, map(lambda x:2**x, exp)), seed, max(exp)-1
+ return reduce(int.__xor__, map(lambda x: 2**x, exp)), seed, max(exp) - 1
diff --git a/gr-digital/python/digital/utils/mod_codes.py b/gr-digital/python/digital/utils/mod_codes.py
index bafb85e18b..f0ac9f1fb5 100644
--- a/gr-digital/python/digital/utils/mod_codes.py
+++ b/gr-digital/python/digital/utils/mod_codes.py
@@ -1,12 +1,12 @@
#!/usr/bin/env python
#
# Copyright 2011 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
-#
+#
# Constants used to represent what coding to use.
GRAY_CODE = 'gray'
@@ -15,6 +15,7 @@ NO_CODE = 'none'
codes = (GRAY_CODE, SET_PARTITION_CODE, NO_CODE)
+
def invert_code(code):
c = enumerate(code)
ic = [(b, a) for (a, b) in c]
diff --git a/gr-digital/python/digital/utils/tagged_streams.py b/gr-digital/python/digital/utils/tagged_streams.py
index 49c1058981..9876519bd0 100644
--- a/gr-digital/python/digital/utils/tagged_streams.py
+++ b/gr-digital/python/digital/utils/tagged_streams.py
@@ -13,6 +13,7 @@
from gnuradio import gr
import pmt
+
def make_lengthtags(lengths, offsets, tagname='length', vlen=1):
tags = []
assert(len(offsets) == len(lengths))
@@ -24,26 +25,31 @@ def make_lengthtags(lengths, offsets, tagname='length', vlen=1):
tags.append(tag)
return tags
+
def string_to_vector(string):
v = []
for s in string:
v.append(ord(s))
return v
+
def strings_to_vectors(strings, lengthtagname):
vs = [string_to_vector(string) for string in strings]
return packets_to_vectors(vs, lengthtagname)
+
def vector_to_string(v):
s = []
for d in v:
s.append(chr(d))
return ''.join(s)
+
def vectors_to_strings(data, tags, lengthtagname):
packets = vectors_to_packets(data, tags, lengthtagname)
return [vector_to_string(packet) for packet in packets]
+
def count_bursts(data, tags, lengthtagname, vlen=1):
lengthtags = [t for t in tags
if pmt.symbol_to_string(t.key) == lengthtagname]
@@ -53,7 +59,7 @@ def count_bursts(data, tags, lengthtagname, vlen=1):
raise ValueError(
"More than one tags with key {0} with the same offset={1}."
.format(lengthtagname, tag.offset))
- lengths[tag.offset] = pmt.to_long(tag.value)*vlen
+ lengths[tag.offset] = pmt.to_long(tag.value) * vlen
in_burst = False
in_packet = False
packet_length = None
@@ -62,7 +68,8 @@ def count_bursts(data, tags, lengthtagname, vlen=1):
for pos in range(len(data)):
if pos in lengths:
if in_packet:
- print("Got tag at pos {0} current packet_pos is {1}".format(pos, packet_pos))
+ print("Got tag at pos {0} current packet_pos is {1}".format(
+ pos, packet_pos))
raise Exception("Received packet tag while in packet.")
packet_pos = -1
packet_length = lengths[pos]
@@ -74,11 +81,12 @@ def count_bursts(data, tags, lengthtagname, vlen=1):
in_burst = False
if in_packet:
packet_pos += 1
- if packet_pos == packet_length-1:
+ if packet_pos == packet_length - 1:
in_packet = False
packet_pos = None
return burst_count
+
def vectors_to_packets(data, tags, lengthtagname, vlen=1):
lengthtags = [t for t in tags
if pmt.symbol_to_string(t.key) == lengthtagname]
@@ -88,7 +96,7 @@ def vectors_to_packets(data, tags, lengthtagname, vlen=1):
raise ValueError(
"More than one tags with key {0} with the same offset={1}."
.format(lengthtagname, tag.offset))
- lengths[tag.offset] = pmt.to_long(tag.value)*vlen
+ lengths[tag.offset] = pmt.to_long(tag.value) * vlen
if 0 not in lengths:
raise ValueError("There is no tag with key {0} and an offset of 0"
.format(lengthtagname))
@@ -102,12 +110,13 @@ def vectors_to_packets(data, tags, lengthtagname, vlen=1):
length = lengths[pos]
if length == 0:
raise ValueError("Packets cannot have zero length.")
- if pos+length > len(data):
+ if pos + length > len(data):
raise ValueError("The final packet is incomplete.")
- packets.append(data[pos: pos+length])
+ packets.append(data[pos: pos + length])
pos += length
return packets
+
def packets_to_vectors(packets, lengthtagname, vlen=1):
tags = []
data = []