summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMark Bauer <markb5@illinois.edu>2021-06-09 15:52:54 -0400
committermormj <34754695+mormj@users.noreply.github.com>2021-06-16 06:38:49 -0400
commitc2961061793b538a8a87133755acead8088ae2ab (patch)
tree45a70498042ba308a50697c766e636d33ae24277
parentcdf17740c6b9606e41acb5d5a9a95b2ee36931b2 (diff)
applied formatter to code
Signed-off-by: Mark Bauer <markb5@illinois.edu>
-rw-r--r--gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py18
-rw-r--r--gr-fec/python/fec/extended_decoder.py244
-rw-r--r--gr-fec/python/fec/extended_tagged_decoder.py230
-rw-r--r--gr-fec/python/fec/polar/channel_construction.py19
-rw-r--r--gr-fec/python/fec/polar/channel_construction_awgn.py51
-rw-r--r--gr-fec/python/fec/polar/common.py24
-rw-r--r--gr-fec/python/fec/polar/testbed.py137
-rwxr-xr-xgr-utils/plot_tools/gr_plot_const238
-rwxr-xr-xgr-utils/plot_tools/gr_plot_iq157
9 files changed, 703 insertions, 415 deletions
diff --git a/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py b/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py
index 53e9fa6b29..4e0c622e15 100644
--- a/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py
+++ b/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py
@@ -37,11 +37,11 @@ from .Generate_LDPC_matrix_functions import (
# First, generate a regular LDPC parity check matrix. Specify
# the properties desired. For example:
-n = 200 # number of columns, corresponds to codeword length
-p = 3 # column weight
-q = 5 # row weight
+n = 200 # number of columns, corresponds to codeword length
+p = 3 # column weight
+q = 5 # row weight
-parity_check_matrix = LDPC_matrix(n_p_q = [n,p,q])
+parity_check_matrix = LDPC_matrix(n_p_q = [n, p, q])
# Richardson and Urbanke's preprocessing method requires a full rank
# matrix to start. The matrices generated by the
@@ -55,7 +55,7 @@ newH = get_full_rank_H_matrix(parity_check_matrix.H)
# Next, some preprocessing steps need to be performed as described
# Richardson and Urbanke in Modern Coding Theory, Appendix A. This
# can take a while...
-[bestH,g] = get_best_matrix(newH,100)
+[bestH, g] = get_best_matrix(newH, 100)
# Print(out some of the resulting properties.)
n = bestH.shape[1]
@@ -63,12 +63,12 @@ k = n - bestH.shape[0]
print("Parity check matrix properties:")
print("\tSize :", bestH.shape)
print("\tRank :", linalg.matrix_rank(bestH))
-print("\tRate : %.3f" % ((k*1.0) / n))
+print("\tRate : %.3f" % ((k * 1.0) / n))
print("\tn :", n, " (codeword length)")
print("\tk :", k, " (info word length)")
print("\tgap : %i" % g)
# Save the matrix to an alist file for future use:
-alist_filename = "n_%04i_k_%04i_gap_%02i.alist" % (n,k,g)
-write_alist_file(alist_filename,bestH)
-print('\nMatrix saved to alist file:', alist_filename, "\n")
+alist_filename = "n_%04i_k_%04i_gap_%02i.alist" % (n, k, g)
+write_alist_file(alist_filename, bestH)
+print("\nMatrix saved to alist file:", alist_filename, "\n")
diff --git a/gr-fec/python/fec/extended_decoder.py b/gr-fec/python/fec/extended_decoder.py
index a71759c989..9409824981 100644
--- a/gr-fec/python/fec/extended_decoder.py
+++ b/gr-fec/python/fec/extended_decoder.py
@@ -20,75 +20,90 @@ from .capillary_threaded_decoder import capillary_threaded_decoder
class extended_decoder(gr.hier_block2):
-#solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density)
-#for i in numpy.arange(.1, .499, .01):
- #print(str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i);)
+ # solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density)
+ # for i in numpy.arange(.1, .499, .01):
+ # print(str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i);)
garbletable = {
- 0.310786835319:0.1,
- 0.279118162802:0.11,
- 0.252699589071:0.12,
- 0.230318516016:0.13,
- 0.211108735347:0.14,
- 0.194434959095:0.15,
- 0.179820650401:0.16,
- 0.166901324951:0.17,
- 0.15539341766:0.18,
- 0.145072979886:0.19,
- 0.135760766313:0.2,
- 0.127311581396:0.21,
- 0.119606529806:0.22,
- 0.112547286766:0.23,
- 0.106051798775:0.24,
- 0.10005101381:0.25,
- 0.0944863633098:0.26,
- 0.0893078003966:0.27,
- 0.084472254501:0.28,
- 0.0799424008658:0.29,
- 0.0756856701944:0.3,
- 0.0716734425668:0.31,
- 0.0678803831565:0.32,
- 0.0642838867856:0.33,
- 0.0608636049994:0.34,
- 0.0576010337489:0.35,
- 0.0544791422522:0.36,
- 0.0514820241933:0.37,
- 0.0485945507251:0.38,
- 0.0458019998183:0.39,
- 0.0430896262596:0.4,
- 0.0404421166935:0.41,
- 0.0378428350972:0.42,
- 0.0352726843274:0.43,
- 0.0327082350617:0.44,
- 0.0301183562535:0.45,
- 0.0274574540266:0.46,
- 0.0246498236897:0.47,
- 0.0215448131298:0.48,
- 0.0177274208353:0.49,
+ 0.310786835319: 0.1,
+ 0.279118162802: 0.11,
+ 0.252699589071: 0.12,
+ 0.230318516016: 0.13,
+ 0.211108735347: 0.14,
+ 0.194434959095: 0.15,
+ 0.179820650401: 0.16,
+ 0.166901324951: 0.17,
+ 0.15539341766: 0.18,
+ 0.145072979886: 0.19,
+ 0.135760766313: 0.2,
+ 0.127311581396: 0.21,
+ 0.119606529806: 0.22,
+ 0.112547286766: 0.23,
+ 0.106051798775: 0.24,
+ 0.10005101381: 0.25,
+ 0.0944863633098: 0.26,
+ 0.0893078003966: 0.27,
+ 0.084472254501: 0.28,
+ 0.0799424008658: 0.29,
+ 0.0756856701944: 0.3,
+ 0.0716734425668: 0.31,
+ 0.0678803831565: 0.32,
+ 0.0642838867856: 0.33,
+ 0.0608636049994: 0.34,
+ 0.0576010337489: 0.35,
+ 0.0544791422522: 0.36,
+ 0.0514820241933: 0.37,
+ 0.0485945507251: 0.38,
+ 0.0458019998183: 0.39,
+ 0.0430896262596: 0.4,
+ 0.0404421166935: 0.41,
+ 0.0378428350972: 0.42,
+ 0.0352726843274: 0.43,
+ 0.0327082350617: 0.44,
+ 0.0301183562535: 0.45,
+ 0.0274574540266: 0.46,
+ 0.0246498236897: 0.47,
+ 0.0215448131298: 0.48,
+ 0.0177274208353: 0.49,
}
- def __init__(self, decoder_obj_list, threading, ann=None, puncpat='11',
- integration_period=10000, flush=None, rotator=None):
- gr.hier_block2.__init__(self, "extended_decoder",
- gr.io_signature(1, 1, gr.sizeof_float),
- gr.io_signature(1, 1, gr.sizeof_char))
- self.blocks=[]
- self.ann=ann
- self.puncpat=puncpat
- self.flush=flush
-
- if(type(decoder_obj_list) == list):
- if(type(decoder_obj_list[0]) == list):
+ def __init__(
+ self,
+ decoder_obj_list,
+ threading,
+ ann=None,
+ puncpat="11",
+ integration_period=10000,
+ flush=None,
+ rotator=None,
+ ):
+ gr.hier_block2.__init__(
+ self,
+ "extended_decoder",
+ gr.io_signature(1, 1, gr.sizeof_float),
+ gr.io_signature(1, 1, gr.sizeof_char),
+ )
+ self.blocks = []
+ self.ann = ann
+ self.puncpat = puncpat
+ self.flush = flush
+
+ if isinstance(decoder_obj_list, list):
+ if isinstance(decoder_obj_list[0], list):
gr.log.info("fec.extended_decoder: Parallelism must be 1.")
raise AttributeError
else:
# If it has parallelism of 0, force it into a list of 1
- decoder_obj_list = [decoder_obj_list,]
+ decoder_obj_list = [
+ decoder_obj_list,
+ ]
- message_collector_connected=False
+ message_collector_connected = False
- ##anything going through the annihilator needs shifted, uchar vals
- if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" or \
- fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits":
+ # anything going through the annihilator needs shifted, uchar vals
+ if (
+ fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar"
+ or fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits"
+ ):
self.blocks.append(blocks.multiply_const_ff(48.0))
if fec.get_shift(decoder_obj_list[0]) != 0.0:
@@ -96,66 +111,95 @@ class extended_decoder(gr.hier_block2):
elif fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits":
self.blocks.append(blocks.add_const_ff(128.0))
- if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" or \
- fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits":
- self.blocks.append(blocks.float_to_uchar());
+ if (
+ fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar"
+ or fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits"
+ ):
+ self.blocks.append(blocks.float_to_uchar())
- const_index = 0; #index that corresponds to mod order for specinvert purposes
+ const_index = 0 # index that corresponds to mod order for specinvert purposes
if not self.flush:
- flush = 10000;
+ flush = 10000
else:
- flush = self.flush;
- if self.ann: #ann and puncpat are strings of 0s and 1s
- cat = fec.ULLVector();
+ flush = self.flush
+ if self.ann: # ann and puncpat are strings of 0s and 1s
+ cat = fec.ULLVector()
for i in fec.read_big_bitlist(ann):
- cat.append(i);
+ cat.append(i)
- synd_garble = .49
- idx_list = list(self.garbletable.keys())
- idx_list.sort()
+ synd_garble = 0.49
+ idx_list = sorted(self.garbletable.keys())
for i in idx_list:
- if 1.0 / self.ann.count('1') >= i:
+ if 1.0 / self.ann.count("1") >= i:
synd_garble = self.garbletable[i]
- print('using syndrom garble threshold ' + str(synd_garble) + 'for conv_bit_corr_bb')
- print('ceiling: .0335 data garble rate')
- self.blocks.append(fec.conv_bit_corr_bb(cat, len(puncpat) - puncpat.count('0'),
- len(ann), integration_period, flush, synd_garble))
-
- if self.puncpat != '11':
- self.blocks.append(fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0))
+ print(
+ "using syndrom garble threshold "
+ + str(synd_garble)
+ + "for conv_bit_corr_bb"
+ )
+ print("ceiling: .0335 data garble rate")
+ self.blocks.append(
+ fec.conv_bit_corr_bb(
+ cat,
+ len(puncpat) - puncpat.count("0"),
+ len(ann),
+ integration_period,
+ flush,
+ synd_garble,
+ )
+ )
+
+ if self.puncpat != "11":
+ self.blocks.append(
+ fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0)
+ )
if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits":
self.blocks.append(blocks.uchar_to_float())
self.blocks.append(blocks.add_const_ff(-128.0))
self.blocks.append(digital.binary_slicer_fb())
- self.blocks.append(blocks.unpacked_to_packed_bb(1,0))
+ self.blocks.append(blocks.unpacked_to_packed_bb(1, 0))
- if(len(decoder_obj_list) > 1):
- if(fec.get_history(decoder_obj_list[0]) != 0):
- gr.log.info("fec.extended_decoder: Cannot use multi-threaded parallelism on a decoder with history.")
+ if len(decoder_obj_list) > 1:
+ if fec.get_history(decoder_obj_list[0]) != 0:
+ gr.log.info(
+ "fec.extended_decoder: Cannot use multi-threaded parallelism on a decoder with history."
+ )
raise AttributeError
- if threading == 'capillary':
- self.blocks.append(capillary_threaded_decoder(decoder_obj_list,
- fec.get_decoder_input_item_size(decoder_obj_list[0]),
- fec.get_decoder_output_item_size(decoder_obj_list[0])))
-
- elif threading == 'ordinary':
- self.blocks.append(threaded_decoder(decoder_obj_list,
- fec.get_decoder_input_item_size(decoder_obj_list[0]),
- fec.get_decoder_output_item_size(decoder_obj_list[0])))
+ if threading == "capillary":
+ self.blocks.append(
+ capillary_threaded_decoder(
+ decoder_obj_list,
+ fec.get_decoder_input_item_size(decoder_obj_list[0]),
+ fec.get_decoder_output_item_size(decoder_obj_list[0]),
+ )
+ )
+
+ elif threading == "ordinary":
+ self.blocks.append(
+ threaded_decoder(
+ decoder_obj_list,
+ fec.get_decoder_input_item_size(decoder_obj_list[0]),
+ fec.get_decoder_output_item_size(decoder_obj_list[0]),
+ )
+ )
else:
- self.blocks.append(fec.decoder(decoder_obj_list[0],
- fec.get_decoder_input_item_size(decoder_obj_list[0]),
- fec.get_decoder_output_item_size(decoder_obj_list[0])))
+ self.blocks.append(
+ fec.decoder(
+ decoder_obj_list[0],
+ fec.get_decoder_input_item_size(decoder_obj_list[0]),
+ fec.get_decoder_output_item_size(decoder_obj_list[0]),
+ )
+ )
if fec.get_decoder_output_conversion(decoder_obj_list[0]) == "unpack":
- self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST));
+ self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST))
- self.connect((self, 0), (self.blocks[0], 0));
- self.connect((self.blocks[-1], 0), (self, 0));
+ self.connect((self, 0), (self.blocks[0], 0))
+ self.connect((self.blocks[-1], 0), (self, 0))
for i in range(len(self.blocks) - 1):
- self.connect((self.blocks[i], 0), (self.blocks[i+1], 0));
+ self.connect((self.blocks[i], 0), (self.blocks[i + 1], 0))
diff --git a/gr-fec/python/fec/extended_tagged_decoder.py b/gr-fec/python/fec/extended_tagged_decoder.py
index 95bffc453a..0489d738ae 100644
--- a/gr-fec/python/fec/extended_tagged_decoder.py
+++ b/gr-fec/python/fec/extended_tagged_decoder.py
@@ -18,67 +18,79 @@ from .bitflip import read_bitlist
class extended_tagged_decoder(gr.hier_block2):
-#solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density)
-#for i in numpy.arange(.1, .499, .01):
- #print(str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i);)
+ # solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density)
+ # for i in numpy.arange(.1, .499, .01):
+ # print(str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i);)
garbletable = {
- 0.310786835319:0.1,
- 0.279118162802:0.11,
- 0.252699589071:0.12,
- 0.230318516016:0.13,
- 0.211108735347:0.14,
- 0.194434959095:0.15,
- 0.179820650401:0.16,
- 0.166901324951:0.17,
- 0.15539341766:0.18,
- 0.145072979886:0.19,
- 0.135760766313:0.2,
- 0.127311581396:0.21,
- 0.119606529806:0.22,
- 0.112547286766:0.23,
- 0.106051798775:0.24,
- 0.10005101381:0.25,
- 0.0944863633098:0.26,
- 0.0893078003966:0.27,
- 0.084472254501:0.28,
- 0.0799424008658:0.29,
- 0.0756856701944:0.3,
- 0.0716734425668:0.31,
- 0.0678803831565:0.32,
- 0.0642838867856:0.33,
- 0.0608636049994:0.34,
- 0.0576010337489:0.35,
- 0.0544791422522:0.36,
- 0.0514820241933:0.37,
- 0.0485945507251:0.38,
- 0.0458019998183:0.39,
- 0.0430896262596:0.4,
- 0.0404421166935:0.41,
- 0.0378428350972:0.42,
- 0.0352726843274:0.43,
- 0.0327082350617:0.44,
- 0.0301183562535:0.45,
- 0.0274574540266:0.46,
- 0.0246498236897:0.47,
- 0.0215448131298:0.48,
- 0.0177274208353:0.49,
+ 0.310786835319: 0.1,
+ 0.279118162802: 0.11,
+ 0.252699589071: 0.12,
+ 0.230318516016: 0.13,
+ 0.211108735347: 0.14,
+ 0.194434959095: 0.15,
+ 0.179820650401: 0.16,
+ 0.166901324951: 0.17,
+ 0.15539341766: 0.18,
+ 0.145072979886: 0.19,
+ 0.135760766313: 0.2,
+ 0.127311581396: 0.21,
+ 0.119606529806: 0.22,
+ 0.112547286766: 0.23,
+ 0.106051798775: 0.24,
+ 0.10005101381: 0.25,
+ 0.0944863633098: 0.26,
+ 0.0893078003966: 0.27,
+ 0.084472254501: 0.28,
+ 0.0799424008658: 0.29,
+ 0.0756856701944: 0.3,
+ 0.0716734425668: 0.31,
+ 0.0678803831565: 0.32,
+ 0.0642838867856: 0.33,
+ 0.0608636049994: 0.34,
+ 0.0576010337489: 0.35,
+ 0.0544791422522: 0.36,
+ 0.0514820241933: 0.37,
+ 0.0485945507251: 0.38,
+ 0.0458019998183: 0.39,
+ 0.0430896262596: 0.4,
+ 0.0404421166935: 0.41,
+ 0.0378428350972: 0.42,
+ 0.0352726843274: 0.43,
+ 0.0327082350617: 0.44,
+ 0.0301183562535: 0.45,
+ 0.0274574540266: 0.46,
+ 0.0246498236897: 0.47,
+ 0.0215448131298: 0.48,
+ 0.0177274208353: 0.49,
}
- def __init__(self, decoder_obj_list, ann=None, puncpat='11',
- integration_period=10000, flush=None, rotator=None, lentagname=None,
- mtu=1500):
- gr.hier_block2.__init__(self, "extended_decoder",
- gr.io_signature(1, 1, gr.sizeof_float),
- gr.io_signature(1, 1, gr.sizeof_char))
- self.blocks=[]
- self.ann=ann
- self.puncpat=puncpat
- self.flush=flush
-
- if(type(decoder_obj_list) == list):
+ def __init__(
+ self,
+ decoder_obj_list,
+ ann=None,
+ puncpat="11",
+ integration_period=10000,
+ flush=None,
+ rotator=None,
+ lentagname=None,
+ mtu=1500,
+ ):
+ gr.hier_block2.__init__(
+ self,
+ "extended_decoder",
+ gr.io_signature(1, 1, gr.sizeof_float),
+ gr.io_signature(1, 1, gr.sizeof_char),
+ )
+ self.blocks = []
+ self.ann = ann
+ self.puncpat = puncpat
+ self.flush = flush
+
+ if isinstance(decoder_obj_list, list):
# This block doesn't handle parallelism of > 1
- # We could just grab encoder [0][0], but we don't want to encourage this.
- if(type(decoder_obj_list[0]) == list):
+ # We could just grab encoder [0][0], but we don't want to encourage
+ # this.
+ if isinstance(decoder_obj_list[0], list):
gr.log.info("fec.extended_tagged_decoder: Parallelism must be 1.")
raise AttributeError
@@ -90,15 +102,17 @@ class extended_tagged_decoder(gr.hier_block2):
# If lentagname is None, fall back to using the non tagged
# stream version
- if type(lentagname) == str:
- if(lentagname.lower() == 'none'):
+ if isinstance(lentagname, str):
+ if lentagname.lower() == "none":
lentagname = None
- message_collector_connected=False
+ message_collector_connected = False
- ##anything going through the annihilator needs shifted, uchar vals
- if fec.get_decoder_input_conversion(decoder_obj) == "uchar" or \
- fec.get_decoder_input_conversion(decoder_obj) == "packed_bits":
+ # anything going through the annihilator needs shifted, uchar vals
+ if (
+ fec.get_decoder_input_conversion(decoder_obj) == "uchar"
+ or fec.get_decoder_input_conversion(decoder_obj) == "packed_bits"
+ ):
self.blocks.append(blocks.multiply_const_ff(48.0))
if fec.get_shift(decoder_obj) != 0.0:
@@ -106,57 +120,81 @@ class extended_tagged_decoder(gr.hier_block2):
elif fec.get_decoder_input_conversion(decoder_obj) == "packed_bits":
self.blocks.append(blocks.add_const_ff(128.0))
- if fec.get_decoder_input_conversion(decoder_obj) == "uchar" or \
- fec.get_decoder_input_conversion(decoder_obj) == "packed_bits":
- self.blocks.append(blocks.float_to_uchar());
+ if (
+ fec.get_decoder_input_conversion(decoder_obj) == "uchar"
+ or fec.get_decoder_input_conversion(decoder_obj) == "packed_bits"
+ ):
+ self.blocks.append(blocks.float_to_uchar())
- const_index = 0; #index that corresponds to mod order for specinvert purposes
+ const_index = 0 # index that corresponds to mod order for specinvert purposes
if not self.flush:
- flush = 10000;
+ flush = 10000
else:
- flush = self.flush;
- if self.ann: #ann and puncpat are strings of 0s and 1s
- cat = fec.ULLVector();
+ flush = self.flush
+ if self.ann: # ann and puncpat are strings of 0s and 1s
+ cat = fec.ULLVector()
for i in fec.read_big_bitlist(ann):
- cat.append(i);
+ cat.append(i)
- synd_garble = .49
- idx_list = list(self.garbletable.keys())
- idx_list.sort()
+ synd_garble = 0.49
+ idx_list = sorted(self.garbletable.keys())
for i in idx_list:
- if 1.0 / self.ann.count('1') >= i:
+ if 1.0 / self.ann.count("1") >= i:
synd_garble = self.garbletable[i]
- print('using syndrom garble threshold ' + str(synd_garble) + 'for conv_bit_corr_bb')
- print('ceiling: .0335 data garble rate')
- self.blocks.append(fec.conv_bit_corr_bb(cat, len(puncpat) - puncpat.count('0'),
- len(ann), integration_period, flush, synd_garble))
-
- if self.puncpat != '11':
- self.blocks.append(fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0))
+ print(
+ "using syndrom garble threshold "
+ + str(synd_garble)
+ + "for conv_bit_corr_bb"
+ )
+ print("ceiling: .0335 data garble rate")
+ self.blocks.append(
+ fec.conv_bit_corr_bb(
+ cat,
+ len(puncpat) - puncpat.count("0"),
+ len(ann),
+ integration_period,
+ flush,
+ synd_garble,
+ )
+ )
+
+ if self.puncpat != "11":
+ self.blocks.append(
+ fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0)
+ )
if fec.get_decoder_input_conversion(decoder_obj) == "packed_bits":
self.blocks.append(blocks.uchar_to_float())
self.blocks.append(blocks.add_const_ff(-128.0))
self.blocks.append(digital.binary_slicer_fb())
- self.blocks.append(blocks.unpacked_to_packed_bb(1,0))
+ self.blocks.append(blocks.unpacked_to_packed_bb(1, 0))
else:
- if(not lentagname):
- self.blocks.append(fec.decoder(decoder_obj,
- fec.get_decoder_input_item_size(decoder_obj),
- fec.get_decoder_output_item_size(decoder_obj)))
+ if not lentagname:
+ self.blocks.append(
+ fec.decoder(
+ decoder_obj,
+ fec.get_decoder_input_item_size(decoder_obj),
+ fec.get_decoder_output_item_size(decoder_obj),
+ )
+ )
else:
- self.blocks.append(fec.tagged_decoder(decoder_obj,
- fec.get_decoder_input_item_size(decoder_obj),
- fec.get_decoder_output_item_size(decoder_obj),
- lentagname, mtu))
+ self.blocks.append(
+ fec.tagged_decoder(
+ decoder_obj,
+ fec.get_decoder_input_item_size(decoder_obj),
+ fec.get_decoder_output_item_size(decoder_obj),
+ lentagname,
+ mtu,
+ )
+ )
if fec.get_decoder_output_conversion(decoder_obj) == "unpack":
- self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST));
+ self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST))
- self.connect((self, 0), (self.blocks[0], 0));
- self.connect((self.blocks[-1], 0), (self, 0));
+ self.connect((self, 0), (self.blocks[0], 0))
+ self.connect((self.blocks[-1], 0), (self, 0))
for i in range(len(self.blocks) - 1):
- self.connect((self.blocks[i], 0), (self.blocks[i+1], 0));
+ self.connect((self.blocks[i], 0), (self.blocks[i + 1], 0))
diff --git a/gr-fec/python/fec/polar/channel_construction.py b/gr-fec/python/fec/polar/channel_construction.py
index 629a7e3cfd..2ec78679a5 100644
--- a/gr-fec/python/fec/polar/channel_construction.py
+++ b/gr-fec/python/fec/polar/channel_construction.py
@@ -6,20 +6,22 @@
#
#
-'''
+"""
[0] Erdal Arikan: 'Channel Polarization: A Method for Constructing Capacity-Achieving Codes for Symmetric Binary-Input Memoryless Channels', 2009
foundational paper for polar codes.
-'''
+"""
from .channel_construction_bec import calculate_bec_channel_capacities
from .channel_construction_bec import design_snr_to_bec_eta
from .channel_construction_bec import bhattacharyya_bounds
import numpy as np
+
try:
from .channel_construction_awgn import tal_vardy_tpm_algorithm
except ImportError:
print("SciPy missing. Overwrite Tal-Vardy algorithm with BEC approximation")
+
def tal_vardy_tpm_algorithm(block_size, design_snr, mu):
return bhattacharyya_bounds(design_snr, block_size)
@@ -59,7 +61,7 @@ def get_frozen_bit_mask(frozen_indices, block_size):
def frozen_bit_positions(block_size, info_size, design_snr=0.0):
if not design_snr > -1.5917:
- print('bad value for design_nsr, must be > -1.5917! default=0.0')
+ print("bad value for design_nsr, must be > -1.5917! default=0.0")
design_snr = 0.0
eta = design_snr_to_bec_eta(design_snr)
return get_bec_frozen_indices(block_size, block_size - info_size, eta)
@@ -74,6 +76,7 @@ def generate_filename(block_size, design_snr, mu):
def default_dir():
dir_def = "~/.gnuradio/polar/"
import os
+
path = os.path.expanduser(dir_def)
try:
@@ -84,7 +87,9 @@ def default_dir():
return path
-def save_z_parameters(z_params, block_size, design_snr, mu, alt_construction_method='Tal-Vardy algorithm'):
+def save_z_parameters(
+ z_params, block_size, design_snr, mu, alt_construction_method="Tal-Vardy algorithm"
+):
path = default_dir()
filename = generate_filename(block_size, design_snr, mu)
header = Z_PARAM_FIRST_HEADER_LINE + "\n"
@@ -101,6 +106,7 @@ def load_z_parameters(block_size, design_snr, mu):
filename = generate_filename(block_size, design_snr, mu)
full_file = path + filename
import os
+
if not os.path.isfile(full_file):
z_params = tal_vardy_tpm_algorithm(block_size, design_snr, mu)
save_z_parameters(z_params, block_size, design_snr, mu)
@@ -110,7 +116,7 @@ def load_z_parameters(block_size, design_snr, mu):
def main():
np.set_printoptions(precision=3, linewidth=150)
- print('channel construction Bhattacharyya bounds by Arikan')
+ print("channel construction Bhattacharyya bounds by Arikan")
n = 10
m = 2 ** n
k = m // 2
@@ -123,10 +129,11 @@ def main():
if 0:
import matplotlib.pyplot as plt
+
plt.plot(z_params)
plt.plot(z_bounds)
plt.show()
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/gr-fec/python/fec/polar/channel_construction_awgn.py b/gr-fec/python/fec/polar/channel_construction_awgn.py
index 8a81c68076..667c17b8fc 100644
--- a/gr-fec/python/fec/polar/channel_construction_awgn.py
+++ b/gr-fec/python/fec/polar/channel_construction_awgn.py
@@ -7,20 +7,24 @@
#
-'''
+"""
Based on 2 papers:
[1] Ido Tal, Alexander Vardy: 'How To Construct Polar Codes', 2013
for an in-depth description of a widely used algorithm for channel construction.
[2] Harish Vangala, Emanuele Viterbo, Yi Hong: 'A Comparative Study of Polar Code Constructions for the AWGN Channel', 2015
for an overview of different approaches
-'''
+"""
from scipy.optimize import fsolve
from scipy.special import erfc
import numpy as np
-from .helper_functions import (bhattacharyya_parameter, bit_reverse_vector,
- power_of_2_int, show_progress_bar)
+from .helper_functions import (
+ bhattacharyya_parameter,
+ bit_reverse_vector,
+ power_of_2_int,
+ show_progress_bar,
+)
from .channel_construction_bec import bhattacharyya_bounds
@@ -45,7 +49,7 @@ def codeword_lambda(y, s):
def instantanious_capacity_callable():
- return lambda x : 1 - np.log2(1 + x) + (x * np.log2(x) / (1 + x))
+ return lambda x: 1 - np.log2(1 + x) + (x * np.log2(x) / (1 + x))
def instantanious_capacity(x):
@@ -54,11 +58,11 @@ def instantanious_capacity(x):
def q_function(x):
# Q(x) = (1 / sqrt(2 * pi) ) * integral (x to inf) exp(- x ^ 2 / 2) dx
- return .5 * erfc(x / np.sqrt(2))
+ return 0.5 * erfc(x / np.sqrt(2))
def discretize_awgn(mu, design_snr):
- '''
+ """
needed for Binary-AWGN channels.
in [1] described in Section VI
in [2] described as a function of the same name.
@@ -68,18 +72,20 @@ def discretize_awgn(mu, design_snr):
2. split into mu intervals.
3. find corresponding output alphabet values y of likelihood ratio function lambda(y) inserted into C(x)
4. Calculate probability for each value given that a '0' or '1' is was transmitted.
- '''
+ """
s = 10 ** (design_snr / 10)
a = np.zeros(mu + 1, dtype=float)
a[-1] = np.inf
for i in range(1, mu):
- a[i] = solve_capacity(1. * i / mu, s)
+ a[i] = solve_capacity(1.0 * i / mu, s)
factor = np.sqrt(2 * s)
tpm = np.zeros((2, mu))
for j in range(mu):
tpm[0][j] = q_function(factor + a[j]) - q_function(factor + a[j + 1])
- tpm[1][j] = q_function(-1. * factor + a[j]) - q_function(-1. * factor + a[j + 1])
+ tpm[1][j] = q_function(-1.0 * factor + a[j]) - q_function(
+ -1.0 * factor + a[j + 1]
+ )
tpm = tpm[::-1]
tpm[0] = tpm[0][::-1]
@@ -88,7 +94,11 @@ def discretize_awgn(mu, design_snr):
def instant_capacity_delta_callable():
- return lambda a, b: -1. * (a + b) * np.log2((a + b) / 2) + a * np.log2(a) + b * np.log2(b)
+ return (
+ lambda a, b: -1.0 * (a + b) * np.log2((a + b) / 2)
+ + a * np.log2(a)
+ + b * np.log2(b)
+ )
def capacity_delta_callable():
@@ -101,7 +111,7 @@ def quantize_to_size(tpm, mu):
calculate_delta_I = capacity_delta_callable()
L = np.shape(tpm)[1]
if not mu < L:
- print('WARNING: This channel gets too small!')
+ print("WARNING: This channel gets too small!")
# lambda works on vectors just fine. Use Numpy vector awesomeness.
delta_i_vec = calculate_delta_I(tpm[0, 0:-1], tpm[1, 0:-1], tpm[0, 1:], tpm[1, 1:])
@@ -133,8 +143,12 @@ def tal_vardy_tpm_algorithm(block_size, design_snr, mu):
channels = np.zeros((block_size, 2, mu))
channels[0] = discretize_awgn(mu, design_snr) * 2
- print('Constructing polar code with Tal-Vardy algorithm')
- print('(block_size = {0}, design SNR = {1}, mu = {2}'.format(block_size, design_snr, 2 * mu))
+ print("Constructing polar code with Tal-Vardy algorithm")
+ print(
+ "(block_size = {0}, design SNR = {1}, mu = {2}".format(
+ block_size, design_snr, 2 * mu
+ )
+ )
show_progress_bar(0, block_size)
for j in range(0, block_power):
u = 2 ** j
@@ -153,8 +167,8 @@ def tal_vardy_tpm_algorithm(block_size, design_snr, mu):
z = z[bit_reverse_vector(np.arange(block_size), block_power)]
z = upper_bound_z_params(z, block_size, design_snr)
show_progress_bar(block_size, block_size)
- print('')
- print('channel construction DONE')
+ print("")
+ print("channel construction DONE")
return z
@@ -239,7 +253,7 @@ def normalize_q(q, tpm):
def main():
- print('channel construction AWGN main')
+ print("channel construction AWGN main")
n = 8
m = 2 ** n
design_snr = 0.0
@@ -250,9 +264,10 @@ def main():
if 0:
import matplotlib.pyplot as plt
+
plt.plot(z_params)
plt.show()
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/gr-fec/python/fec/polar/common.py b/gr-fec/python/fec/polar/common.py
index 3c3410bf85..e60cc911f2 100644
--- a/gr-fec/python/fec/polar/common.py
+++ b/gr-fec/python/fec/polar/common.py
@@ -7,13 +7,12 @@
#
-
import numpy as np
from .helper_functions import bit_reverse_vector, is_power_of_two
-'''
+"""
PolarCommon holds value checks and common initializer code for both Encoder and Decoder.
-'''
+"""
class PolarCommon(object):
@@ -23,15 +22,25 @@ class PolarCommon(object):
if frozenbits is None:
frozenbits = np.zeros(n - k, dtype=np.int)
if not len(frozenbits) == n - k:
- raise ValueError("len(frozenbits)={0} is not equal to n-k={1}!".format(len(frozenbits), n - k))
+ raise ValueError(
+ "len(frozenbits)={0} is not equal to n-k={1}!".format(
+ len(frozenbits), n - k
+ )
+ )
if not frozenbits.dtype == np.int:
frozenbits = frozenbits.astype(dtype=int)
if not len(frozen_bit_position) == (n - k):
- raise ValueError("len(frozen_bit_position)={0} is not equal to n-k={1}!".format(len(frozen_bit_position), n - k))
+ raise ValueError(
+ "len(frozen_bit_position)={0} is not equal to n-k={1}!".format(
+ len(frozen_bit_position), n - k
+ )
+ )
if not frozen_bit_position.dtype == np.int:
frozen_bit_position = frozen_bit_position.astype(dtype=int)
- self.bit_reverse_positions = self._vector_bit_reversed(np.arange(n, dtype=int), int(np.log2(n)))
+ self.bit_reverse_positions = self._vector_bit_reversed(
+ np.arange(n, dtype=int), int(np.log2(n))
+ )
self.N = n
self.power = int(np.log2(self.N))
self.K = k
@@ -65,7 +74,8 @@ class PolarCommon(object):
return vec
def _encode_natural_order(self, vec):
- # use this function. It reflects the encoding process implemented in VOLK.
+ # use this function. It reflects the encoding process implemented in
+ # VOLK.
vec = vec[self.bit_reverse_positions]
return self._encode_efficient(vec)
diff --git a/gr-fec/python/fec/polar/testbed.py b/gr-fec/python/fec/polar/testbed.py
index b2350f9ef6..4e1b40a7f7 100644
--- a/gr-fec/python/fec/polar/testbed.py
+++ b/gr-fec/python/fec/polar/testbed.py
@@ -7,7 +7,6 @@
#
-
from .encoder import PolarEncoder
from .decoder import PolarDecoder
from . import channel_construction as cc
@@ -22,7 +21,9 @@ def get_frozen_bit_position():
# frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
m = 256
n_frozen = m // 2
- frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters(cc.bhattacharyya_bounds(0.0, m), n_frozen)
+ frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters(
+ cc.bhattacharyya_bounds(0.0, m), n_frozen
+ )
print(frozenbitposition)
return frozenbitposition
@@ -40,7 +41,9 @@ def test_enc_dec_chain():
encoded = encoder.encode(bits)
rx = decoder.decode(encoded)
if not is_equal(bits, rx):
- raise ValueError('Test #', i, 'failed, input and output differ', bits, '!=', rx)
+ raise ValueError(
+ "Test #", i, "failed, input and output differ", bits, "!=", rx
+ )
return
@@ -48,7 +51,9 @@ def is_equal(first, second):
if not (first == second).all():
result = first == second
for i in range(len(result)):
- print('{0:4}: {1:2} == {2:1} = {3}'.format(i, first[i], second[i], result[i]))
+ print(
+ "{0:4}: {1:2} == {2:1} = {3}".format(i, first[i], second[i], result[i])
+ )
return False
return True
@@ -62,11 +67,11 @@ def approx_value(la, lb):
def path_metric_exact(last_pm, llr, ui):
- return last_pm + np.log(1 + np.exp(-1. * llr * (1 - 2 * ui)))
+ return last_pm + np.log(1 + np.exp(-1.0 * llr * (1 - 2 * ui)))
def path_metric_approx(last_pm, llr, ui):
- if ui == int(.5 * (1 - np.sign(llr))):
+ if ui == int(0.5 * (1 - np.sign(llr))):
return last_pm
return last_pm + np.abs(llr)
@@ -97,15 +102,17 @@ def test_1024_rate_1_code():
bits = np.random.randint(2, size=k)
tx = encoder.encode(bits)
np.random.shuffle(possible_indices)
- tx[possible_indices[0:num_transitions]] = (tx[possible_indices[0:num_transitions]] + 1) % 2
+ tx[possible_indices[0:num_transitions]] = (
+ tx[possible_indices[0:num_transitions]] + 1
+ ) % 2
rx = tx
recv = decoder.decode(rx)
- channel_counter += (bits == recv)
+ channel_counter += bits == recv
print(channel_counter)
print(np.min(channel_counter), np.max(channel_counter))
- np.save('channel_counter_' + str(ntests) + '.npy', channel_counter)
+ np.save("channel_counter_" + str(ntests) + ".npy", channel_counter)
def find_good_indices(res, nindices):
@@ -121,16 +128,18 @@ def find_good_indices(res, nindices):
def channel_analysis():
ntests = 10000
- filename = 'channel_counter_' + str(ntests) + '.npy'
+ filename = "channel_counter_" + str(ntests) + ".npy"
channel_counter = np.load(filename)
print(np.min(channel_counter), np.max(channel_counter))
channel_counter[0] = np.min(channel_counter)
good_indices = find_good_indices(channel_counter, channel_counter.size // 2)
info_bit_positions = np.where(good_indices > 0)
print(info_bit_positions)
- frozen_bit_positions = np.delete(np.arange(channel_counter.size), info_bit_positions)
+ frozen_bit_positions = np.delete(
+ np.arange(channel_counter.size), info_bit_positions
+ )
print(frozen_bit_positions)
- np.save('frozen_bit_positions_n256_k128_p0.11.npy', frozen_bit_positions)
+ np.save("frozen_bit_positions_n256_k128_p0.11.npy", frozen_bit_positions)
good_indices *= 2000
good_indices += 4000
@@ -142,38 +151,38 @@ def channel_analysis():
def merge_first_stage(init_mask):
merged_frozen_mask = []
for e in range(0, len(init_mask), 2):
- v = [init_mask[e]['value'][0], init_mask[e + 1]['value'][0]]
- s = init_mask[e]['size'] * 2
- if init_mask[e]['type'] == init_mask[e + 1]['type']:
- t = init_mask[e]['type']
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ v = [init_mask[e]["value"][0], init_mask[e + 1]["value"][0]]
+ s = init_mask[e]["size"] * 2
+ if init_mask[e]["type"] == init_mask[e + 1]["type"]:
+ t = init_mask[e]["type"]
+ merged_frozen_mask.append({"value": v, "type": t, "size": s})
else:
- t = 'RPT'
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ t = "RPT"
+ merged_frozen_mask.append({"value": v, "type": t, "size": s})
return merged_frozen_mask
def merge_second_stage(init_mask):
merged_frozen_mask = []
for e in range(0, len(init_mask), 2):
- if init_mask[e]['type'] == init_mask[e + 1]['type']:
- t = init_mask[e]['type']
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT':
- t = init_mask[e + 1]['type']
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- elif init_mask[e]['type'] == 'RPT' and init_mask[e + 1]['type'] == 'ONE':
- t = 'SPC'
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ if init_mask[e]["type"] == init_mask[e + 1]["type"]:
+ t = init_mask[e]["type"]
+ v = init_mask[e]["value"]
+ v.extend(init_mask[e + 1]["value"])
+ s = init_mask[e]["size"] * 2
+ merged_frozen_mask.append({"value": v, "type": t, "size": s})
+ elif init_mask[e]["type"] == "ZERO" and init_mask[e + 1]["type"] == "RPT":
+ t = init_mask[e + 1]["type"]
+ v = init_mask[e]["value"]
+ v.extend(init_mask[e + 1]["value"])
+ s = init_mask[e]["size"] * 2
+ merged_frozen_mask.append({"value": v, "type": t, "size": s})
+ elif init_mask[e]["type"] == "RPT" and init_mask[e + 1]["type"] == "ONE":
+ t = "SPC"
+ v = init_mask[e]["value"]
+ v.extend(init_mask[e + 1]["value"])
+ s = init_mask[e]["size"] * 2
+ merged_frozen_mask.append({"value": v, "type": t, "size": s})
else:
merged_frozen_mask.append(init_mask[e])
merged_frozen_mask.append(init_mask[e + 1])
@@ -184,25 +193,27 @@ def merge_stage_n(init_mask):
merged_frozen_mask = []
n_elems = len(init_mask) - (len(init_mask) % 2)
for e in range(0, n_elems, 2):
- if init_mask[e]['size'] == init_mask[e + 1]['size']:
- if (init_mask[e]['type'] == 'ZERO' or init_mask[e]['type'] == 'ONE') and init_mask[e]['type'] == init_mask[e + 1]['type']:
- t = init_mask[e]['type']
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT':
- t = init_mask[e + 1]['type']
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- elif init_mask[e]['type'] == 'SPC' and init_mask[e + 1]['type'] == 'ONE':
- t = init_mask[e]['type']
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ if init_mask[e]["size"] == init_mask[e + 1]["size"]:
+ if (
+ init_mask[e]["type"] == "ZERO" or init_mask[e]["type"] == "ONE"
+ ) and init_mask[e]["type"] == init_mask[e + 1]["type"]:
+ t = init_mask[e]["type"]
+ v = init_mask[e]["value"]
+ v.extend(init_mask[e + 1]["value"])
+ s = init_mask[e]["size"] * 2
+ merged_frozen_mask.append({"value": v, "type": t, "size": s})
+ elif init_mask[e]["type"] == "ZERO" and init_mask[e + 1]["type"] == "RPT":
+ t = init_mask[e + 1]["type"]
+ v = init_mask[e]["value"]
+ v.extend(init_mask[e + 1]["value"])
+ s = init_mask[e]["size"] * 2
+ merged_frozen_mask.append({"value": v, "type": t, "size": s})
+ elif init_mask[e]["type"] == "SPC" and init_mask[e + 1]["type"] == "ONE":
+ t = init_mask[e]["type"]
+ v = init_mask[e]["value"]
+ v.extend(init_mask[e + 1]["value"])
+ s = init_mask[e]["size"] * 2
+ merged_frozen_mask.append({"value": v, "type": t, "size": s})
else:
merged_frozen_mask.append(init_mask[e])
merged_frozen_mask.append(init_mask[e + 1])
@@ -279,7 +290,7 @@ def find_decoder_subframes(frozen_mask):
sub_mask = mask.flatten()
lock_mask = lock.flatten()
- words = {0: 'ZERO', 1: 'ONE', 2: 'RPT', 3: 'SPC'}
+ words = {0: "ZERO", 1: "ONE", 2: "RPT", 3: "SPC"}
ll = lock_mask[0]
sub_t = sub_mask[0]
for i in range(len(frozen_mask)):
@@ -289,18 +300,20 @@ def find_decoder_subframes(frozen_mask):
# if i % 8 == 0:
# print
if not l == ll or not sub_mask[i] == sub_t:
- print('--------------------------')
+ print("--------------------------")
ll = l
sub_t = sub_mask[i]
- print('{0:4} lock {1:4} value: {2} in sub {3}'.format(i, 2 ** (l + 1), v, t))
+ print("{0:4} lock {1:4} value: {2} in sub {3}".format(i, 2 ** (l + 1), v, t))
def systematic_encoder_decoder_chain_test():
- print('systematic encoder decoder chain test')
+ print("systematic encoder decoder chain test")
block_size = int(2 ** 8)
info_bit_size = block_size // 2
ntests = 100
- frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters(cc.bhattacharyya_bounds(0.0, block_size), block_size - info_bit_size)
+ frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters(
+ cc.bhattacharyya_bounds(0.0, block_size), block_size - info_bit_size
+ )
encoder = PolarEncoder(block_size, info_bit_size, frozenbitposition)
decoder = PolarDecoder(block_size, info_bit_size, frozenbitposition)
for i in range(ntests):
@@ -333,5 +346,5 @@ def main():
systematic_encoder_decoder_chain_test()
-if __name__ == '__main__':
+if __name__ == "__main__":
main()
diff --git a/gr-utils/plot_tools/gr_plot_const b/gr-utils/plot_tools/gr_plot_const
index 463baefd76..18f781e516 100755
--- a/gr-utils/plot_tools/gr_plot_const
+++ b/gr-utils/plot_tools/gr_plot_const
@@ -18,18 +18,22 @@ try:
draw,
figtext,
figure,
+ show,
get_current_fig_manager,
rcParams,
searchsorted,
- show)
+ )
from matplotlib.font_manager import fontManager, FontProperties
except ImportError:
- print("Please install Python Matplotlib (http://matplotlib.sourceforge.net/) and \
- Python TkInter https://wiki.python.org/moin/TkInter to run this script")
+ print(
+ "Please install Python Matplotlib (http://matplotlib.sourceforge.net/) and \
+ Python TkInter https://wiki.python.org/moin/TkInter to run this script"
+ )
raise SystemExit(1)
from argparse import ArgumentParser
+
class draw_constellation:
def __init__(self, filename, options):
self.hfile = open(filename, "r")
@@ -38,55 +42,78 @@ class draw_constellation:
self.sample_rate = options.sample_rate
self.datatype = numpy.complex64
- self.sizeof_data = self.datatype().nbytes # number of bytes per sample in file
+ # number of bytes per sample in file
+ self.sizeof_data = self.datatype().nbytes
self.axis_font_size = 16
self.label_font_size = 18
self.title_font_size = 20
# Setup PLOT
- self.fig = figure(1, figsize=(16, 9), facecolor='w')
- rcParams['xtick.labelsize'] = self.axis_font_size
- rcParams['ytick.labelsize'] = self.axis_font_size
-
- self.text_file = figtext(0.10, 0.95, ("File: %s" % filename), weight="heavy", size=16)
- self.text_file_pos = figtext(0.10, 0.90, "File Position: ", weight="heavy", size=16)
- self.text_block = figtext(0.40, 0.90, ("Block Size: %d" % self.block_length),
- weight="heavy", size=16)
- self.text_sr = figtext(0.60, 0.90, ("Sample Rate: %.2f" % self.sample_rate),
- weight="heavy", size=16)
+ self.fig = figure(1, figsize=(16, 9), facecolor="w")
+ rcParams["xtick.labelsize"] = self.axis_font_size
+ rcParams["ytick.labelsize"] = self.axis_font_size
+
+ self.text_file = figtext(
+ 0.10, 0.95, ("File: %s" % filename), weight="heavy", size=16
+ )
+ self.text_file_pos = figtext(
+ 0.10, 0.90, "File Position: ", weight="heavy", size=16
+ )
+ self.text_block = figtext(
+ 0.40, 0.90, ("Block Size: %d" % self.block_length), weight="heavy", size=16
+ )
+ self.text_sr = figtext(
+ 0.60,
+ 0.90,
+ ("Sample Rate: %.2f" % self.sample_rate),
+ weight="heavy",
+ size=16,
+ )
self.make_plots()
- self.button_left_axes = self.fig.add_axes([0.45, 0.01, 0.05, 0.05], frameon=True)
+ self.button_left_axes = self.fig.add_axes(
+ [0.45, 0.01, 0.05, 0.05], frameon=True
+ )
self.button_left = Button(self.button_left_axes, "<")
self.button_left_callback = self.button_left.on_clicked(self.button_left_click)
- self.button_right_axes = self.fig.add_axes([0.50, 0.01, 0.05, 0.05], frameon=True)
+ self.button_right_axes = self.fig.add_axes(
+ [0.50, 0.01, 0.05, 0.05], frameon=True
+ )
self.button_right = Button(self.button_right_axes, ">")
- self.button_right_callback = self.button_right.on_clicked(self.button_right_click)
+ self.button_right_callback = self.button_right.on_clicked(
+ self.button_right_click
+ )
self.xlim = self.sp_iq.get_xlim()
self.manager = get_current_fig_manager()
- connect('draw_event', self.zoom)
- connect('key_press_event', self.click)
- connect('button_press_event', self.mouse_button_callback)
+ connect("draw_event", self.zoom)
+ connect("key_press_event", self.click)
+ connect("button_press_event", self.mouse_button_callback)
show()
def get_data(self):
- self.text_file_pos.set_text("File Position: %d" % (self.hfile.tell()//self.sizeof_data))
+ self.text_file_pos.set_text(
+ "File Position: %d" % (self.hfile.tell() // self.sizeof_data)
+ )
try:
- iq = numpy.fromfile(self.hfile, dtype=self.datatype, count=self.block_length)
+ iq = numpy.fromfile(
+ self.hfile, dtype=self.datatype, count=self.block_length
+ )
except MemoryError:
print("End of File")
else:
# retesting length here as newer version of numpy does not throw a MemoryError, just
# returns a zero-length array
- if(len(iq) > 0):
+ if len(iq) > 0:
self.reals = numpy.array([r.real for r in iq])
self.imags = numpy.array([i.imag for i in iq])
- self.time = numpy.array([i*(1/self.sample_rate) for i in range(len(self.reals))])
+ self.time = numpy.array(
+ [i * (1 / self.sample_rate) for i in range(len(self.reals))]
+ )
return True
else:
print("End of File")
@@ -94,34 +121,79 @@ class draw_constellation:
def make_plots(self):
# if specified on the command-line, set file pointer
- self.hfile.seek(self.sizeof_data*self.start, 1)
+ self.hfile.seek(self.sizeof_data * self.start, 1)
r = self.get_data()
# Subplot for real and imaginary parts of signal
- self.sp_iq = self.fig.add_subplot(2,1,1, position=[0.075, 0.2, 0.4, 0.6])
+ self.sp_iq = self.fig.add_subplot(2, 1, 1, position=[0.075, 0.2, 0.4, 0.6])
self.sp_iq.set_title(("I&Q"), fontsize=self.title_font_size, fontweight="bold")
- self.sp_iq.set_xlabel("Time (s)", fontsize=self.label_font_size, fontweight="bold")
- self.sp_iq.set_ylabel("Amplitude (V)", fontsize=self.label_font_size, fontweight="bold")
- self.plot_iq = self.sp_iq.plot(self.time, self.reals, 'bo-', self.time, self.imags, 'ro-')
+ self.sp_iq.set_xlabel(
+ "Time (s)", fontsize=self.label_font_size, fontweight="bold"
+ )
+ self.sp_iq.set_ylabel(
+ "Amplitude (V)", fontsize=self.label_font_size, fontweight="bold"
+ )
+ self.plot_iq = self.sp_iq.plot(
+ self.time, self.reals, "bo-", self.time, self.imags, "ro-"
+ )
# Subplot for constellation plot
- self.sp_const = self.fig.add_subplot(2,2,1, position=[0.575, 0.2, 0.4, 0.6])
- self.sp_const.set_title(("Constellation"), fontsize=self.title_font_size, fontweight="bold")
- self.sp_const.set_xlabel("Inphase", fontsize=self.label_font_size, fontweight="bold")
- self.sp_const.set_ylabel("Quadrature", fontsize=self.label_font_size, fontweight="bold")
- self.plot_const = self.sp_const.plot(self.reals, self.imags, 'bo')
-
- # Add plots to mark current location of point between time and constellation plots
+ self.sp_const = self.fig.add_subplot(2, 2, 1, position=[0.575, 0.2, 0.4, 0.6])
+ self.sp_const.set_title(
+ ("Constellation"), fontsize=self.title_font_size, fontweight="bold"
+ )
+ self.sp_const.set_xlabel(
+ "Inphase", fontsize=self.label_font_size, fontweight="bold"
+ )
+ self.sp_const.set_ylabel(
+ "Quadrature", fontsize=self.label_font_size, fontweight="bold"
+ )
+ self.plot_const = self.sp_const.plot(self.reals, self.imags, "bo")
+
+ # Add plots to mark current location of point between time and
+ # constellation plots
self.indx = 0
- self.plot_iq += self.sp_iq.plot([self.time[self.indx],], [self.reals[self.indx],], 'mo', ms=8)
- self.plot_iq += self.sp_iq.plot([self.time[self.indx],], [self.imags[self.indx],], 'mo', ms=8)
- self.plot_const += self.sp_const.plot([self.reals[self.indx],], [self.imags[self.indx],], 'mo', ms=12)
+ self.plot_iq += self.sp_iq.plot(
+ [
+ self.time[self.indx],
+ ],
+ [
+ self.reals[self.indx],
+ ],
+ "mo",
+ ms=8,
+ )
+ self.plot_iq += self.sp_iq.plot(
+ [
+ self.time[self.indx],
+ ],
+ [
+ self.imags[self.indx],
+ ],
+ "mo",
+ ms=8,
+ )
+ self.plot_const += self.sp_const.plot(
+ [
+ self.reals[self.indx],
+ ],
+ [
+ self.imags[self.indx],
+ ],
+ "mo",
+ ms=12,
+ )
# Adjust axis
- self.sp_iq.axis([self.time.min(), self.time.max(),
- 1.5*min([self.reals.min(), self.imags.min()]),
- 1.5*max([self.reals.max(), self.imags.max()])])
+ self.sp_iq.axis(
+ [
+ self.time.min(),
+ self.time.max(),
+ 1.5 * min([self.reals.min(), self.imags.min()]),
+ 1.5 * max([self.reals.max(), self.imags.max()]),
+ ]
+ )
self.sp_const.axis([-2, 2, -2, 2])
draw()
@@ -129,9 +201,14 @@ class draw_constellation:
def update_plots(self):
self.plot_iq[0].set_data([self.time, self.reals])
self.plot_iq[1].set_data([self.time, self.imags])
- self.sp_iq.axis([self.time.min(), self.time.max(),
- 1.5*min([self.reals.min(), self.imags.min()]),
- 1.5*max([self.reals.max(), self.imags.max()])])
+ self.sp_iq.axis(
+ [
+ self.time.min(),
+ self.time.max(),
+ 1.5 * min([self.reals.min(), self.imags.min()]),
+ 1.5 * max([self.reals.max(), self.imags.max()]),
+ ]
+ )
self.plot_const[0].set_data([self.reals, self.imags])
self.sp_const.axis([-2, 2, -2, 2])
@@ -140,7 +217,7 @@ class draw_constellation:
def zoom(self, event):
newxlim = numpy.array(self.sp_iq.get_xlim())
curxlim = numpy.array(self.xlim)
- if(newxlim[0] != curxlim[0] or newxlim[1] != curxlim[1]):
+ if newxlim[0] != curxlim[0] or newxlim[1] != curxlim[1]:
self.xlim = newxlim
r = self.reals[int(ceil(self.xlim[0])) : int(ceil(self.xlim[1]))]
i = self.imags[int(ceil(self.xlim[0])) : int(ceil(self.xlim[1]))]
@@ -153,21 +230,25 @@ class draw_constellation:
def click(self, event):
forward_valid_keys = [" ", "down", "right"]
backward_valid_keys = ["up", "left"]
- trace_forward_valid_keys = [">",]
- trace_backward_valid_keys = ["<",]
-
- if(find(event.key, forward_valid_keys)):
+ trace_forward_valid_keys = [
+ ">",
+ ]
+ trace_backward_valid_keys = [
+ "<",
+ ]
+
+ if find(event.key, forward_valid_keys):
self.step_forward()
- elif(find(event.key, backward_valid_keys)):
+ elif find(event.key, backward_valid_keys):
self.step_backward()
- elif(find(event.key, trace_forward_valid_keys)):
- self.indx = min(self.indx+1, len(self.time)-1)
+ elif find(event.key, trace_forward_valid_keys):
+ self.indx = min(self.indx + 1, len(self.time) - 1)
self.set_trace(self.indx)
- elif(find(event.key, trace_backward_valid_keys)):
- self.indx = max(0, self.indx-1)
+ elif find(event.key, trace_backward_valid_keys):
+ self.indx = max(0, self.indx - 1)
self.set_trace(self.indx)
def button_left_click(self, event):
@@ -178,29 +259,27 @@ class draw_constellation:
def step_forward(self):
r = self.get_data()
- if(r):
+ if r:
self.update_plots()
def step_backward(self):
# Step back in file position
- if(self.hfile.tell() >= 2*self.sizeof_data*self.block_length ):
- self.hfile.seek(-2*self.sizeof_data*self.block_length, 1)
+ if self.hfile.tell() >= 2 * self.sizeof_data * self.block_length:
+ self.hfile.seek(-2 * self.sizeof_data * self.block_length, 1)
else:
- self.hfile.seek(-self.hfile.tell(),1)
+ self.hfile.seek(-self.hfile.tell(), 1)
r = self.get_data()
- if(r):
+ if r:
self.update_plots()
-
def mouse_button_callback(self, event):
x, y = event.xdata, event.ydata
if x is not None and y is not None:
- if(event.inaxes == self.sp_iq):
+ if event.inaxes == self.sp_iq:
self.indx = searchsorted(self.time, [x])
self.set_trace(self.indx)
-
def set_trace(self, indx):
self.plot_iq[2].set_data(self.time[indx], self.reals[indx])
self.plot_iq[3].set_data(self.time[indx], self.imags[indx])
@@ -210,7 +289,7 @@ class draw_constellation:
def find(item_in, list_search):
try:
- return list_search.index(item_in) != None
+ return list_search.index(item_in) is not None
except ValueError:
return False
@@ -219,18 +298,33 @@ def main():
description = "Takes a GNU Radio complex binary file and displays the I&Q data versus time and the constellation plot (I vs. Q). You can set the block size to specify how many points to read in at a time and the start position in the file. By default, the system assumes a sample rate of 1, so in time, each sample is plotted versus the sample number. To set a true time axis, set the sample rate (-R or --sample-rate) to the sample rate used when capturing the samples."
parser = ArgumentParser(conflict_handler="resolve", description=description)
- parser.add_argument("-B", "--block", type=int, default=1000,
- help="Specify the block size [default=%(default)r]")
- parser.add_argument("-s", "--start", type=int, default=0,
- help="Specify where to start in the file [default=%(default)r]")
- parser.add_argument("-R", "--sample-rate", type=float, default=1.0,
- help="Set the sampler rate of the data [default=%(default)r]")
- parser.add_argument("file", metavar="FILE",
- help="Input file with complex samples")
+ parser.add_argument(
+ "-B",
+ "--block",
+ type=int,
+ default=1000,
+ help="Specify the block size [default=%(default)r]",
+ )
+ parser.add_argument(
+ "-s",
+ "--start",
+ type=int,
+ default=0,
+ help="Specify where to start in the file [default=%(default)r]",
+ )
+ parser.add_argument(
+ "-R",
+ "--sample-rate",
+ type=float,
+ default=1.0,
+ help="Set the sampler rate of the data [default=%(default)r]",
+ )
+ parser.add_argument("file", metavar="FILE", help="Input file with complex samples")
args = parser.parse_args()
dc = draw_constellation(args.file, args)
+
if __name__ == "__main__":
try:
main()
diff --git a/gr-utils/plot_tools/gr_plot_iq b/gr-utils/plot_tools/gr_plot_iq
index 924109e30d..ba3838c081 100755
--- a/gr-utils/plot_tools/gr_plot_iq
+++ b/gr-utils/plot_tools/gr_plot_iq
@@ -11,15 +11,27 @@
import numpy
try:
- from pylab import (Button, connect, draw, figtext, figure,
- get_current_fig_manager, plot, rcParams, show)
+ from pylab import (
+ Button,
+ connect,
+ draw,
+ figtext,
+ figure,
+ get_current_fig_manager,
+ plot,
+ rcParams,
+ show,
+ )
except ImportError:
- print("Please install Python Matplotlib (http://matplotlib.sourceforge.net/) and \
- Python TkInter https://wiki.python.org/moin/TkInter to run this script")
+ print(
+ "Please install Python Matplotlib (http://matplotlib.sourceforge.net/) and \
+ Python TkInter https://wiki.python.org/moin/TkInter to run this script"
+ )
raise SystemExit(1)
from argparse import ArgumentParser
+
class draw_iq:
def __init__(self, filename, options):
self.hfile = open(filename, "r")
@@ -28,7 +40,8 @@ class draw_iq:
self.sample_rate = options.sample_rate
self.datatype = numpy.complex64
- self.sizeof_data = self.datatype().nbytes # number of bytes per sample in file
+ # number of bytes per sample in file
+ self.sizeof_data = self.datatype().nbytes
self.axis_font_size = 16
self.label_font_size = 18
@@ -36,65 +49,103 @@ class draw_iq:
self.text_size = 22
# Setup PLOT
- self.fig = figure(1, figsize=(16, 9), facecolor='w')
- rcParams['xtick.labelsize'] = self.axis_font_size
- rcParams['ytick.labelsize'] = self.axis_font_size
-
- self.text_file = figtext(0.10, 0.94, ("File: %s" % filename), weight="heavy", size=self.text_size)
- self.text_file_pos = figtext(0.10, 0.88, "File Position: ", weight="heavy", size=self.text_size)
- self.text_block = figtext(0.40, 0.88, ("Block Size: %d" % self.block_length),
- weight="heavy", size=self.text_size)
- self.text_sr = figtext(0.60, 0.88, ("Sample Rate: %.2f" % self.sample_rate),
- weight="heavy", size=self.text_size)
+ self.fig = figure(1, figsize=(16, 9), facecolor="w")
+ rcParams["xtick.labelsize"] = self.axis_font_size
+ rcParams["ytick.labelsize"] = self.axis_font_size
+
+ self.text_file = figtext(
+ 0.10, 0.94, ("File: %s" % filename), weight="heavy", size=self.text_size
+ )
+ self.text_file_pos = figtext(
+ 0.10, 0.88, "File Position: ", weight="heavy", size=self.text_size
+ )
+ self.text_block = figtext(
+ 0.40,
+ 0.88,
+ ("Block Size: %d" % self.block_length),
+ weight="heavy",
+ size=self.text_size,
+ )
+ self.text_sr = figtext(
+ 0.60,
+ 0.88,
+ ("Sample Rate: %.2f" % self.sample_rate),
+ weight="heavy",
+ size=self.text_size,
+ )
self.make_plots()
- self.button_left_axes = self.fig.add_axes([0.45, 0.01, 0.05, 0.05], frameon=True)
+ self.button_left_axes = self.fig.add_axes(
+ [0.45, 0.01, 0.05, 0.05], frameon=True
+ )
self.button_left = Button(self.button_left_axes, "<")
self.button_left_callback = self.button_left.on_clicked(self.button_left_click)
- self.button_right_axes = self.fig.add_axes([0.50, 0.01, 0.05, 0.05], frameon=True)
+ self.button_right_axes = self.fig.add_axes(
+ [0.50, 0.01, 0.05, 0.05], frameon=True
+ )
self.button_right = Button(self.button_right_axes, ">")
- self.button_right_callback = self.button_right.on_clicked(self.button_right_click)
+ self.button_right_callback = self.button_right.on_clicked(
+ self.button_right_click
+ )
self.xlim = self.sp_iq.get_xlim()
self.manager = get_current_fig_manager()
- connect('key_press_event', self.click)
+ connect("key_press_event", self.click)
show()
def get_data(self):
- self.text_file_pos.set_text("File Position: %d" % (self.hfile.tell()//self.sizeof_data))
+ self.text_file_pos.set_text(
+ "File Position: %d" % (self.hfile.tell() // self.sizeof_data)
+ )
try:
- self.iq = numpy.fromfile(self.hfile, dtype=self.datatype, count=self.block_length)
+ self.iq = numpy.fromfile(
+ self.hfile, dtype=self.datatype, count=self.block_length
+ )
except MemoryError:
print("End of File")
else:
self.reals = numpy.array([r.real for r in self.iq])
self.imags = numpy.array([i.imag for i in self.iq])
- self.time = numpy.array([i*(1/self.sample_rate) for i in range(len(self.reals))])
+ self.time = numpy.array(
+ [i * (1 / self.sample_rate) for i in range(len(self.reals))]
+ )
def make_plots(self):
# if specified on the command-line, set file pointer
- self.hfile.seek(self.sizeof_data*self.start, 1)
+ self.hfile.seek(self.sizeof_data * self.start, 1)
self.get_data()
# Subplot for real and imaginary parts of signal
- self.sp_iq = self.fig.add_subplot(2,1,1, position=[0.075, 0.14, 0.85, 0.67])
+ self.sp_iq = self.fig.add_subplot(2, 1, 1, position=[0.075, 0.14, 0.85, 0.67])
self.sp_iq.set_title(("I&Q"), fontsize=self.title_font_size, fontweight="bold")
- self.sp_iq.set_xlabel("Time (s)", fontsize=self.label_font_size, fontweight="bold")
- self.sp_iq.set_ylabel("Amplitude (V)", fontsize=self.label_font_size, fontweight="bold")
- self.plot_iq = plot(self.time, self.reals, 'bo-', self.time, self.imags, 'ro-')
- self.sp_iq.set_ylim([1.5*min([self.reals.min(), self.imags.min()]),
- 1.5*max([self.reals.max(), self.imags.max()])])
+ self.sp_iq.set_xlabel(
+ "Time (s)", fontsize=self.label_font_size, fontweight="bold"
+ )
+ self.sp_iq.set_ylabel(
+ "Amplitude (V)", fontsize=self.label_font_size, fontweight="bold"
+ )
+ self.plot_iq = plot(self.time, self.reals, "bo-", self.time, self.imags, "ro-")
+ self.sp_iq.set_ylim(
+ [
+ 1.5 * min([self.reals.min(), self.imags.min()]),
+ 1.5 * max([self.reals.max(), self.imags.max()]),
+ ]
+ )
self.sp_iq.set_xlim(self.time.min(), self.time.max())
draw()
def update_plots(self):
self.plot_iq[0].set_data([self.time, self.reals])
self.plot_iq[1].set_data([self.time, self.imags])
- self.sp_iq.set_ylim([1.5*min([self.reals.min(), self.imags.min()]),
- 1.5*max([self.reals.max(), self.imags.max()])])
+ self.sp_iq.set_ylim(
+ [
+ 1.5 * min([self.reals.min(), self.imags.min()]),
+ 1.5 * max([self.reals.max(), self.imags.max()]),
+ ]
+ )
self.sp_iq.set_xlim(self.time.min(), self.time.max())
draw()
@@ -102,10 +153,10 @@ class draw_iq:
forward_valid_keys = [" ", "down", "right"]
backward_valid_keys = ["up", "left"]
- if(find(event.key, forward_valid_keys)):
+ if find(event.key, forward_valid_keys):
self.step_forward()
- elif(find(event.key, backward_valid_keys)):
+ elif find(event.key, backward_valid_keys):
self.step_backward()
def button_left_click(self, event):
@@ -120,37 +171,53 @@ class draw_iq:
def step_backward(self):
# Step back in file position
- if(self.hfile.tell() >= 2*self.sizeof_data*self.block_length ):
- self.hfile.seek(-2*self.sizeof_data*self.block_length, 1)
+ if self.hfile.tell() >= 2 * self.sizeof_data * self.block_length:
+ self.hfile.seek(-2 * self.sizeof_data * self.block_length, 1)
else:
- self.hfile.seek(-self.hfile.tell(),1)
+ self.hfile.seek(-self.hfile.tell(), 1)
self.get_data()
self.update_plots()
def find(item_in, list_search):
try:
- return list_search.index(item_in) != None
+ return list_search.index(item_in) is not None
except ValueError:
return False
+
def main():
description = "Takes a GNU Radio complex binary file and displays the I&Q data versus time. You can set the block size to specify how many points to read in at a time and the start position in the file. By default, the system assumes a sample rate of 1, so in time, each sample is plotted versus the sample number. To set a true time axis, set the sample rate (-R or --sample-rate) to the sample rate used when capturing the samples."
parser = ArgumentParser(conflict_handler="resolve", description=description)
- parser.add_argument("-B", "--block", type=int, default=1000,
- help="Specify the block size [default=%(default)r]")
- parser.add_argument("-s", "--start", type=int, default=0,
- help="Specify where to start in the file [default=%(default)r]")
- parser.add_argument("-R", "--sample-rate", type=float, default=1.0,
- help="Set the sampler rate of the data [default=%(default)r]")
- parser.add_argument("file", metavar="FILE",
- help="Input file with complex samples")
+ parser.add_argument(
+ "-B",
+ "--block",
+ type=int,
+ default=1000,
+ help="Specify the block size [default=%(default)r]",
+ )
+ parser.add_argument(
+ "-s",
+ "--start",
+ type=int,
+ default=0,
+ help="Specify where to start in the file [default=%(default)r]",
+ )
+ parser.add_argument(
+ "-R",
+ "--sample-rate",
+ type=float,
+ default=1.0,
+ help="Set the sampler rate of the data [default=%(default)r]",
+ )
+ parser.add_argument("file", metavar="FILE", help="Input file with complex samples")
args = parser.parse_args()
dc = draw_iq(args.file, args)
+
if __name__ == "__main__":
try:
main()