author    | Mark Bauer <markb5@illinois.edu>                | 2021-06-09 15:52:54 -0400
committer | mormj <34754695+mormj@users.noreply.github.com> | 2021-06-16 06:38:49 -0400
commit    | c2961061793b538a8a87133755acead8088ae2ab (patch)
tree      | 45a70498042ba308a50697c766e636d33ae24277
parent    | cdf17740c6b9606e41acb5d5a9a95b2ee36931b2 (diff)
applied formatter to code
Signed-off-by: Mark Bauer <markb5@illinois.edu>
-rw-r--r-- | gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py       |  18
-rw-r--r-- | gr-fec/python/fec/extended_decoder.py                | 244
-rw-r--r-- | gr-fec/python/fec/extended_tagged_decoder.py         | 230
-rw-r--r-- | gr-fec/python/fec/polar/channel_construction.py      |  19
-rw-r--r-- | gr-fec/python/fec/polar/channel_construction_awgn.py |  51
-rw-r--r-- | gr-fec/python/fec/polar/common.py                    |  24
-rw-r--r-- | gr-fec/python/fec/polar/testbed.py                   | 137
-rwxr-xr-x | gr-utils/plot_tools/gr_plot_const                    | 238
-rwxr-xr-x | gr-utils/plot_tools/gr_plot_iq                       | 157
9 files changed, 703 insertions, 415 deletions
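
The diffs below are whitespace, quoting, and line-wrapping changes only. The commit message does not name the formatter, so the following is just a sketch under the assumption that black produced these changes (the trailing commas, double quotes, and wrapped call signatures match its style); applying it programmatically to one of the touched files would look roughly like this:

    # Sketch only, not the committer's actual workflow: assumes the formatter was black,
    # which the commit message does not state.
    import black
    from pathlib import Path

    path = Path("gr-fec/python/fec/extended_decoder.py")  # one of the files listed above
    source = path.read_text()
    formatted = black.format_str(source, mode=black.FileMode())  # black's library entry point
    if formatted != source:
        path.write_text(formatted)

In practice the same result would come from running the formatter's command line over the gr-fec/python/fec and gr-utils/plot_tools directories and committing the output.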
diff --git a/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py b/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py index 53e9fa6b29..4e0c622e15 100644 --- a/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py +++ b/gr-fec/python/fec/LDPC/Generate_LDPC_matrix.py @@ -37,11 +37,11 @@ from .Generate_LDPC_matrix_functions import ( # First, generate a regular LDPC parity check matrix. Specify # the properties desired. For example: -n = 200 # number of columns, corresponds to codeword length -p = 3 # column weight -q = 5 # row weight +n = 200 # number of columns, corresponds to codeword length +p = 3 # column weight +q = 5 # row weight -parity_check_matrix = LDPC_matrix(n_p_q = [n,p,q]) +parity_check_matrix = LDPC_matrix(n_p_q = [n, p, q]) # Richardson and Urbanke's preprocessing method requires a full rank # matrix to start. The matrices generated by the @@ -55,7 +55,7 @@ newH = get_full_rank_H_matrix(parity_check_matrix.H) # Next, some preprocessing steps need to be performed as described # Richardson and Urbanke in Modern Coding Theory, Appendix A. This # can take a while... -[bestH,g] = get_best_matrix(newH,100) +[bestH, g] = get_best_matrix(newH, 100) # Print(out some of the resulting properties.) n = bestH.shape[1] @@ -63,12 +63,12 @@ k = n - bestH.shape[0] print("Parity check matrix properties:") print("\tSize :", bestH.shape) print("\tRank :", linalg.matrix_rank(bestH)) -print("\tRate : %.3f" % ((k*1.0) / n)) +print("\tRate : %.3f" % ((k * 1.0) / n)) print("\tn :", n, " (codeword length)") print("\tk :", k, " (info word length)") print("\tgap : %i" % g) # Save the matrix to an alist file for future use: -alist_filename = "n_%04i_k_%04i_gap_%02i.alist" % (n,k,g) -write_alist_file(alist_filename,bestH) -print('\nMatrix saved to alist file:', alist_filename, "\n") +alist_filename = "n_%04i_k_%04i_gap_%02i.alist" % (n, k, g) +write_alist_file(alist_filename, bestH) +print("\nMatrix saved to alist file:", alist_filename, "\n") diff --git a/gr-fec/python/fec/extended_decoder.py b/gr-fec/python/fec/extended_decoder.py index a71759c989..9409824981 100644 --- a/gr-fec/python/fec/extended_decoder.py +++ b/gr-fec/python/fec/extended_decoder.py @@ -20,75 +20,90 @@ from .capillary_threaded_decoder import capillary_threaded_decoder class extended_decoder(gr.hier_block2): -#solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density) -#for i in numpy.arange(.1, .499, .01): - #print(str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i);) + # solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density) + # for i in numpy.arange(.1, .499, .01): + # print(str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i);) garbletable = { - 0.310786835319:0.1, - 0.279118162802:0.11, - 0.252699589071:0.12, - 0.230318516016:0.13, - 0.211108735347:0.14, - 0.194434959095:0.15, - 0.179820650401:0.16, - 0.166901324951:0.17, - 0.15539341766:0.18, - 0.145072979886:0.19, - 0.135760766313:0.2, - 0.127311581396:0.21, - 0.119606529806:0.22, - 0.112547286766:0.23, - 0.106051798775:0.24, - 0.10005101381:0.25, - 0.0944863633098:0.26, - 0.0893078003966:0.27, - 0.084472254501:0.28, - 0.0799424008658:0.29, - 0.0756856701944:0.3, - 0.0716734425668:0.31, - 0.0678803831565:0.32, - 0.0642838867856:0.33, - 0.0608636049994:0.34, - 0.0576010337489:0.35, - 0.0544791422522:0.36, - 0.0514820241933:0.37, - 0.0485945507251:0.38, - 0.0458019998183:0.39, - 0.0430896262596:0.4, - 0.0404421166935:0.41, - 0.0378428350972:0.42, - 0.0352726843274:0.43, - 0.0327082350617:0.44, - 0.0301183562535:0.45, - 0.0274574540266:0.46, - 
0.0246498236897:0.47, - 0.0215448131298:0.48, - 0.0177274208353:0.49, + 0.310786835319: 0.1, + 0.279118162802: 0.11, + 0.252699589071: 0.12, + 0.230318516016: 0.13, + 0.211108735347: 0.14, + 0.194434959095: 0.15, + 0.179820650401: 0.16, + 0.166901324951: 0.17, + 0.15539341766: 0.18, + 0.145072979886: 0.19, + 0.135760766313: 0.2, + 0.127311581396: 0.21, + 0.119606529806: 0.22, + 0.112547286766: 0.23, + 0.106051798775: 0.24, + 0.10005101381: 0.25, + 0.0944863633098: 0.26, + 0.0893078003966: 0.27, + 0.084472254501: 0.28, + 0.0799424008658: 0.29, + 0.0756856701944: 0.3, + 0.0716734425668: 0.31, + 0.0678803831565: 0.32, + 0.0642838867856: 0.33, + 0.0608636049994: 0.34, + 0.0576010337489: 0.35, + 0.0544791422522: 0.36, + 0.0514820241933: 0.37, + 0.0485945507251: 0.38, + 0.0458019998183: 0.39, + 0.0430896262596: 0.4, + 0.0404421166935: 0.41, + 0.0378428350972: 0.42, + 0.0352726843274: 0.43, + 0.0327082350617: 0.44, + 0.0301183562535: 0.45, + 0.0274574540266: 0.46, + 0.0246498236897: 0.47, + 0.0215448131298: 0.48, + 0.0177274208353: 0.49, } - def __init__(self, decoder_obj_list, threading, ann=None, puncpat='11', - integration_period=10000, flush=None, rotator=None): - gr.hier_block2.__init__(self, "extended_decoder", - gr.io_signature(1, 1, gr.sizeof_float), - gr.io_signature(1, 1, gr.sizeof_char)) - self.blocks=[] - self.ann=ann - self.puncpat=puncpat - self.flush=flush - - if(type(decoder_obj_list) == list): - if(type(decoder_obj_list[0]) == list): + def __init__( + self, + decoder_obj_list, + threading, + ann=None, + puncpat="11", + integration_period=10000, + flush=None, + rotator=None, + ): + gr.hier_block2.__init__( + self, + "extended_decoder", + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_char), + ) + self.blocks = [] + self.ann = ann + self.puncpat = puncpat + self.flush = flush + + if isinstance(decoder_obj_list, list): + if isinstance(decoder_obj_list[0], list): gr.log.info("fec.extended_decoder: Parallelism must be 1.") raise AttributeError else: # If it has parallelism of 0, force it into a list of 1 - decoder_obj_list = [decoder_obj_list,] + decoder_obj_list = [ + decoder_obj_list, + ] - message_collector_connected=False + message_collector_connected = False - ##anything going through the annihilator needs shifted, uchar vals - if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" or \ - fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits": + # anything going through the annihilator needs shifted, uchar vals + if ( + fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" + or fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits" + ): self.blocks.append(blocks.multiply_const_ff(48.0)) if fec.get_shift(decoder_obj_list[0]) != 0.0: @@ -96,66 +111,95 @@ class extended_decoder(gr.hier_block2): elif fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits": self.blocks.append(blocks.add_const_ff(128.0)) - if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" or \ - fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits": - self.blocks.append(blocks.float_to_uchar()); + if ( + fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" + or fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits" + ): + self.blocks.append(blocks.float_to_uchar()) - const_index = 0; #index that corresponds to mod order for specinvert purposes + const_index = 0 # index that corresponds to mod order for specinvert purposes if not self.flush: - flush = 10000; + 
flush = 10000 else: - flush = self.flush; - if self.ann: #ann and puncpat are strings of 0s and 1s - cat = fec.ULLVector(); + flush = self.flush + if self.ann: # ann and puncpat are strings of 0s and 1s + cat = fec.ULLVector() for i in fec.read_big_bitlist(ann): - cat.append(i); + cat.append(i) - synd_garble = .49 - idx_list = list(self.garbletable.keys()) - idx_list.sort() + synd_garble = 0.49 + idx_list = sorted(self.garbletable.keys()) for i in idx_list: - if 1.0 / self.ann.count('1') >= i: + if 1.0 / self.ann.count("1") >= i: synd_garble = self.garbletable[i] - print('using syndrom garble threshold ' + str(synd_garble) + 'for conv_bit_corr_bb') - print('ceiling: .0335 data garble rate') - self.blocks.append(fec.conv_bit_corr_bb(cat, len(puncpat) - puncpat.count('0'), - len(ann), integration_period, flush, synd_garble)) - - if self.puncpat != '11': - self.blocks.append(fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0)) + print( + "using syndrom garble threshold " + + str(synd_garble) + + "for conv_bit_corr_bb" + ) + print("ceiling: .0335 data garble rate") + self.blocks.append( + fec.conv_bit_corr_bb( + cat, + len(puncpat) - puncpat.count("0"), + len(ann), + integration_period, + flush, + synd_garble, + ) + ) + + if self.puncpat != "11": + self.blocks.append( + fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0) + ) if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits": self.blocks.append(blocks.uchar_to_float()) self.blocks.append(blocks.add_const_ff(-128.0)) self.blocks.append(digital.binary_slicer_fb()) - self.blocks.append(blocks.unpacked_to_packed_bb(1,0)) + self.blocks.append(blocks.unpacked_to_packed_bb(1, 0)) - if(len(decoder_obj_list) > 1): - if(fec.get_history(decoder_obj_list[0]) != 0): - gr.log.info("fec.extended_decoder: Cannot use multi-threaded parallelism on a decoder with history.") + if len(decoder_obj_list) > 1: + if fec.get_history(decoder_obj_list[0]) != 0: + gr.log.info( + "fec.extended_decoder: Cannot use multi-threaded parallelism on a decoder with history." 
+ ) raise AttributeError - if threading == 'capillary': - self.blocks.append(capillary_threaded_decoder(decoder_obj_list, - fec.get_decoder_input_item_size(decoder_obj_list[0]), - fec.get_decoder_output_item_size(decoder_obj_list[0]))) - - elif threading == 'ordinary': - self.blocks.append(threaded_decoder(decoder_obj_list, - fec.get_decoder_input_item_size(decoder_obj_list[0]), - fec.get_decoder_output_item_size(decoder_obj_list[0]))) + if threading == "capillary": + self.blocks.append( + capillary_threaded_decoder( + decoder_obj_list, + fec.get_decoder_input_item_size(decoder_obj_list[0]), + fec.get_decoder_output_item_size(decoder_obj_list[0]), + ) + ) + + elif threading == "ordinary": + self.blocks.append( + threaded_decoder( + decoder_obj_list, + fec.get_decoder_input_item_size(decoder_obj_list[0]), + fec.get_decoder_output_item_size(decoder_obj_list[0]), + ) + ) else: - self.blocks.append(fec.decoder(decoder_obj_list[0], - fec.get_decoder_input_item_size(decoder_obj_list[0]), - fec.get_decoder_output_item_size(decoder_obj_list[0]))) + self.blocks.append( + fec.decoder( + decoder_obj_list[0], + fec.get_decoder_input_item_size(decoder_obj_list[0]), + fec.get_decoder_output_item_size(decoder_obj_list[0]), + ) + ) if fec.get_decoder_output_conversion(decoder_obj_list[0]) == "unpack": - self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)); + self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)) - self.connect((self, 0), (self.blocks[0], 0)); - self.connect((self.blocks[-1], 0), (self, 0)); + self.connect((self, 0), (self.blocks[0], 0)) + self.connect((self.blocks[-1], 0), (self, 0)) for i in range(len(self.blocks) - 1): - self.connect((self.blocks[i], 0), (self.blocks[i+1], 0)); + self.connect((self.blocks[i], 0), (self.blocks[i + 1], 0)) diff --git a/gr-fec/python/fec/extended_tagged_decoder.py b/gr-fec/python/fec/extended_tagged_decoder.py index 95bffc453a..0489d738ae 100644 --- a/gr-fec/python/fec/extended_tagged_decoder.py +++ b/gr-fec/python/fec/extended_tagged_decoder.py @@ -18,67 +18,79 @@ from .bitflip import read_bitlist class extended_tagged_decoder(gr.hier_block2): -#solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density) -#for i in numpy.arange(.1, .499, .01): - #print(str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i);) + # solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density) + # for i in numpy.arange(.1, .499, .01): + # print(str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i);) garbletable = { - 0.310786835319:0.1, - 0.279118162802:0.11, - 0.252699589071:0.12, - 0.230318516016:0.13, - 0.211108735347:0.14, - 0.194434959095:0.15, - 0.179820650401:0.16, - 0.166901324951:0.17, - 0.15539341766:0.18, - 0.145072979886:0.19, - 0.135760766313:0.2, - 0.127311581396:0.21, - 0.119606529806:0.22, - 0.112547286766:0.23, - 0.106051798775:0.24, - 0.10005101381:0.25, - 0.0944863633098:0.26, - 0.0893078003966:0.27, - 0.084472254501:0.28, - 0.0799424008658:0.29, - 0.0756856701944:0.3, - 0.0716734425668:0.31, - 0.0678803831565:0.32, - 0.0642838867856:0.33, - 0.0608636049994:0.34, - 0.0576010337489:0.35, - 0.0544791422522:0.36, - 0.0514820241933:0.37, - 0.0485945507251:0.38, - 0.0458019998183:0.39, - 0.0430896262596:0.4, - 0.0404421166935:0.41, - 0.0378428350972:0.42, - 0.0352726843274:0.43, - 0.0327082350617:0.44, - 0.0301183562535:0.45, - 0.0274574540266:0.46, - 0.0246498236897:0.47, - 0.0215448131298:0.48, - 0.0177274208353:0.49, + 0.310786835319: 0.1, + 0.279118162802: 0.11, + 0.252699589071: 0.12, + 
0.230318516016: 0.13, + 0.211108735347: 0.14, + 0.194434959095: 0.15, + 0.179820650401: 0.16, + 0.166901324951: 0.17, + 0.15539341766: 0.18, + 0.145072979886: 0.19, + 0.135760766313: 0.2, + 0.127311581396: 0.21, + 0.119606529806: 0.22, + 0.112547286766: 0.23, + 0.106051798775: 0.24, + 0.10005101381: 0.25, + 0.0944863633098: 0.26, + 0.0893078003966: 0.27, + 0.084472254501: 0.28, + 0.0799424008658: 0.29, + 0.0756856701944: 0.3, + 0.0716734425668: 0.31, + 0.0678803831565: 0.32, + 0.0642838867856: 0.33, + 0.0608636049994: 0.34, + 0.0576010337489: 0.35, + 0.0544791422522: 0.36, + 0.0514820241933: 0.37, + 0.0485945507251: 0.38, + 0.0458019998183: 0.39, + 0.0430896262596: 0.4, + 0.0404421166935: 0.41, + 0.0378428350972: 0.42, + 0.0352726843274: 0.43, + 0.0327082350617: 0.44, + 0.0301183562535: 0.45, + 0.0274574540266: 0.46, + 0.0246498236897: 0.47, + 0.0215448131298: 0.48, + 0.0177274208353: 0.49, } - def __init__(self, decoder_obj_list, ann=None, puncpat='11', - integration_period=10000, flush=None, rotator=None, lentagname=None, - mtu=1500): - gr.hier_block2.__init__(self, "extended_decoder", - gr.io_signature(1, 1, gr.sizeof_float), - gr.io_signature(1, 1, gr.sizeof_char)) - self.blocks=[] - self.ann=ann - self.puncpat=puncpat - self.flush=flush - - if(type(decoder_obj_list) == list): + def __init__( + self, + decoder_obj_list, + ann=None, + puncpat="11", + integration_period=10000, + flush=None, + rotator=None, + lentagname=None, + mtu=1500, + ): + gr.hier_block2.__init__( + self, + "extended_decoder", + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_char), + ) + self.blocks = [] + self.ann = ann + self.puncpat = puncpat + self.flush = flush + + if isinstance(decoder_obj_list, list): # This block doesn't handle parallelism of > 1 - # We could just grab encoder [0][0], but we don't want to encourage this. - if(type(decoder_obj_list[0]) == list): + # We could just grab encoder [0][0], but we don't want to encourage + # this. 
+ if isinstance(decoder_obj_list[0], list): gr.log.info("fec.extended_tagged_decoder: Parallelism must be 1.") raise AttributeError @@ -90,15 +102,17 @@ class extended_tagged_decoder(gr.hier_block2): # If lentagname is None, fall back to using the non tagged # stream version - if type(lentagname) == str: - if(lentagname.lower() == 'none'): + if isinstance(lentagname, str): + if lentagname.lower() == "none": lentagname = None - message_collector_connected=False + message_collector_connected = False - ##anything going through the annihilator needs shifted, uchar vals - if fec.get_decoder_input_conversion(decoder_obj) == "uchar" or \ - fec.get_decoder_input_conversion(decoder_obj) == "packed_bits": + # anything going through the annihilator needs shifted, uchar vals + if ( + fec.get_decoder_input_conversion(decoder_obj) == "uchar" + or fec.get_decoder_input_conversion(decoder_obj) == "packed_bits" + ): self.blocks.append(blocks.multiply_const_ff(48.0)) if fec.get_shift(decoder_obj) != 0.0: @@ -106,57 +120,81 @@ class extended_tagged_decoder(gr.hier_block2): elif fec.get_decoder_input_conversion(decoder_obj) == "packed_bits": self.blocks.append(blocks.add_const_ff(128.0)) - if fec.get_decoder_input_conversion(decoder_obj) == "uchar" or \ - fec.get_decoder_input_conversion(decoder_obj) == "packed_bits": - self.blocks.append(blocks.float_to_uchar()); + if ( + fec.get_decoder_input_conversion(decoder_obj) == "uchar" + or fec.get_decoder_input_conversion(decoder_obj) == "packed_bits" + ): + self.blocks.append(blocks.float_to_uchar()) - const_index = 0; #index that corresponds to mod order for specinvert purposes + const_index = 0 # index that corresponds to mod order for specinvert purposes if not self.flush: - flush = 10000; + flush = 10000 else: - flush = self.flush; - if self.ann: #ann and puncpat are strings of 0s and 1s - cat = fec.ULLVector(); + flush = self.flush + if self.ann: # ann and puncpat are strings of 0s and 1s + cat = fec.ULLVector() for i in fec.read_big_bitlist(ann): - cat.append(i); + cat.append(i) - synd_garble = .49 - idx_list = list(self.garbletable.keys()) - idx_list.sort() + synd_garble = 0.49 + idx_list = sorted(self.garbletable.keys()) for i in idx_list: - if 1.0 / self.ann.count('1') >= i: + if 1.0 / self.ann.count("1") >= i: synd_garble = self.garbletable[i] - print('using syndrom garble threshold ' + str(synd_garble) + 'for conv_bit_corr_bb') - print('ceiling: .0335 data garble rate') - self.blocks.append(fec.conv_bit_corr_bb(cat, len(puncpat) - puncpat.count('0'), - len(ann), integration_period, flush, synd_garble)) - - if self.puncpat != '11': - self.blocks.append(fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0)) + print( + "using syndrom garble threshold " + + str(synd_garble) + + "for conv_bit_corr_bb" + ) + print("ceiling: .0335 data garble rate") + self.blocks.append( + fec.conv_bit_corr_bb( + cat, + len(puncpat) - puncpat.count("0"), + len(ann), + integration_period, + flush, + synd_garble, + ) + ) + + if self.puncpat != "11": + self.blocks.append( + fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0) + ) if fec.get_decoder_input_conversion(decoder_obj) == "packed_bits": self.blocks.append(blocks.uchar_to_float()) self.blocks.append(blocks.add_const_ff(-128.0)) self.blocks.append(digital.binary_slicer_fb()) - self.blocks.append(blocks.unpacked_to_packed_bb(1,0)) + self.blocks.append(blocks.unpacked_to_packed_bb(1, 0)) else: - if(not lentagname): - self.blocks.append(fec.decoder(decoder_obj, - fec.get_decoder_input_item_size(decoder_obj), - 
fec.get_decoder_output_item_size(decoder_obj))) + if not lentagname: + self.blocks.append( + fec.decoder( + decoder_obj, + fec.get_decoder_input_item_size(decoder_obj), + fec.get_decoder_output_item_size(decoder_obj), + ) + ) else: - self.blocks.append(fec.tagged_decoder(decoder_obj, - fec.get_decoder_input_item_size(decoder_obj), - fec.get_decoder_output_item_size(decoder_obj), - lentagname, mtu)) + self.blocks.append( + fec.tagged_decoder( + decoder_obj, + fec.get_decoder_input_item_size(decoder_obj), + fec.get_decoder_output_item_size(decoder_obj), + lentagname, + mtu, + ) + ) if fec.get_decoder_output_conversion(decoder_obj) == "unpack": - self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)); + self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)) - self.connect((self, 0), (self.blocks[0], 0)); - self.connect((self.blocks[-1], 0), (self, 0)); + self.connect((self, 0), (self.blocks[0], 0)) + self.connect((self.blocks[-1], 0), (self, 0)) for i in range(len(self.blocks) - 1): - self.connect((self.blocks[i], 0), (self.blocks[i+1], 0)); + self.connect((self.blocks[i], 0), (self.blocks[i + 1], 0)) diff --git a/gr-fec/python/fec/polar/channel_construction.py b/gr-fec/python/fec/polar/channel_construction.py index 629a7e3cfd..2ec78679a5 100644 --- a/gr-fec/python/fec/polar/channel_construction.py +++ b/gr-fec/python/fec/polar/channel_construction.py @@ -6,20 +6,22 @@ # # -''' +""" [0] Erdal Arikan: 'Channel Polarization: A Method for Constructing Capacity-Achieving Codes for Symmetric Binary-Input Memoryless Channels', 2009 foundational paper for polar codes. -''' +""" from .channel_construction_bec import calculate_bec_channel_capacities from .channel_construction_bec import design_snr_to_bec_eta from .channel_construction_bec import bhattacharyya_bounds import numpy as np + try: from .channel_construction_awgn import tal_vardy_tpm_algorithm except ImportError: print("SciPy missing. Overwrite Tal-Vardy algorithm with BEC approximation") + def tal_vardy_tpm_algorithm(block_size, design_snr, mu): return bhattacharyya_bounds(design_snr, block_size) @@ -59,7 +61,7 @@ def get_frozen_bit_mask(frozen_indices, block_size): def frozen_bit_positions(block_size, info_size, design_snr=0.0): if not design_snr > -1.5917: - print('bad value for design_nsr, must be > -1.5917! default=0.0') + print("bad value for design_nsr, must be > -1.5917! 
default=0.0") design_snr = 0.0 eta = design_snr_to_bec_eta(design_snr) return get_bec_frozen_indices(block_size, block_size - info_size, eta) @@ -74,6 +76,7 @@ def generate_filename(block_size, design_snr, mu): def default_dir(): dir_def = "~/.gnuradio/polar/" import os + path = os.path.expanduser(dir_def) try: @@ -84,7 +87,9 @@ def default_dir(): return path -def save_z_parameters(z_params, block_size, design_snr, mu, alt_construction_method='Tal-Vardy algorithm'): +def save_z_parameters( + z_params, block_size, design_snr, mu, alt_construction_method="Tal-Vardy algorithm" +): path = default_dir() filename = generate_filename(block_size, design_snr, mu) header = Z_PARAM_FIRST_HEADER_LINE + "\n" @@ -101,6 +106,7 @@ def load_z_parameters(block_size, design_snr, mu): filename = generate_filename(block_size, design_snr, mu) full_file = path + filename import os + if not os.path.isfile(full_file): z_params = tal_vardy_tpm_algorithm(block_size, design_snr, mu) save_z_parameters(z_params, block_size, design_snr, mu) @@ -110,7 +116,7 @@ def load_z_parameters(block_size, design_snr, mu): def main(): np.set_printoptions(precision=3, linewidth=150) - print('channel construction Bhattacharyya bounds by Arikan') + print("channel construction Bhattacharyya bounds by Arikan") n = 10 m = 2 ** n k = m // 2 @@ -123,10 +129,11 @@ def main(): if 0: import matplotlib.pyplot as plt + plt.plot(z_params) plt.plot(z_bounds) plt.show() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/gr-fec/python/fec/polar/channel_construction_awgn.py b/gr-fec/python/fec/polar/channel_construction_awgn.py index 8a81c68076..667c17b8fc 100644 --- a/gr-fec/python/fec/polar/channel_construction_awgn.py +++ b/gr-fec/python/fec/polar/channel_construction_awgn.py @@ -7,20 +7,24 @@ # -''' +""" Based on 2 papers: [1] Ido Tal, Alexander Vardy: 'How To Construct Polar Codes', 2013 for an in-depth description of a widely used algorithm for channel construction. [2] Harish Vangala, Emanuele Viterbo, Yi Hong: 'A Comparative Study of Polar Code Constructions for the AWGN Channel', 2015 for an overview of different approaches -''' +""" from scipy.optimize import fsolve from scipy.special import erfc import numpy as np -from .helper_functions import (bhattacharyya_parameter, bit_reverse_vector, - power_of_2_int, show_progress_bar) +from .helper_functions import ( + bhattacharyya_parameter, + bit_reverse_vector, + power_of_2_int, + show_progress_bar, +) from .channel_construction_bec import bhattacharyya_bounds @@ -45,7 +49,7 @@ def codeword_lambda(y, s): def instantanious_capacity_callable(): - return lambda x : 1 - np.log2(1 + x) + (x * np.log2(x) / (1 + x)) + return lambda x: 1 - np.log2(1 + x) + (x * np.log2(x) / (1 + x)) def instantanious_capacity(x): @@ -54,11 +58,11 @@ def instantanious_capacity(x): def q_function(x): # Q(x) = (1 / sqrt(2 * pi) ) * integral (x to inf) exp(- x ^ 2 / 2) dx - return .5 * erfc(x / np.sqrt(2)) + return 0.5 * erfc(x / np.sqrt(2)) def discretize_awgn(mu, design_snr): - ''' + """ needed for Binary-AWGN channels. in [1] described in Section VI in [2] described as a function of the same name. @@ -68,18 +72,20 @@ def discretize_awgn(mu, design_snr): 2. split into mu intervals. 3. find corresponding output alphabet values y of likelihood ratio function lambda(y) inserted into C(x) 4. Calculate probability for each value given that a '0' or '1' is was transmitted. 
- ''' + """ s = 10 ** (design_snr / 10) a = np.zeros(mu + 1, dtype=float) a[-1] = np.inf for i in range(1, mu): - a[i] = solve_capacity(1. * i / mu, s) + a[i] = solve_capacity(1.0 * i / mu, s) factor = np.sqrt(2 * s) tpm = np.zeros((2, mu)) for j in range(mu): tpm[0][j] = q_function(factor + a[j]) - q_function(factor + a[j + 1]) - tpm[1][j] = q_function(-1. * factor + a[j]) - q_function(-1. * factor + a[j + 1]) + tpm[1][j] = q_function(-1.0 * factor + a[j]) - q_function( + -1.0 * factor + a[j + 1] + ) tpm = tpm[::-1] tpm[0] = tpm[0][::-1] @@ -88,7 +94,11 @@ def discretize_awgn(mu, design_snr): def instant_capacity_delta_callable(): - return lambda a, b: -1. * (a + b) * np.log2((a + b) / 2) + a * np.log2(a) + b * np.log2(b) + return ( + lambda a, b: -1.0 * (a + b) * np.log2((a + b) / 2) + + a * np.log2(a) + + b * np.log2(b) + ) def capacity_delta_callable(): @@ -101,7 +111,7 @@ def quantize_to_size(tpm, mu): calculate_delta_I = capacity_delta_callable() L = np.shape(tpm)[1] if not mu < L: - print('WARNING: This channel gets too small!') + print("WARNING: This channel gets too small!") # lambda works on vectors just fine. Use Numpy vector awesomeness. delta_i_vec = calculate_delta_I(tpm[0, 0:-1], tpm[1, 0:-1], tpm[0, 1:], tpm[1, 1:]) @@ -133,8 +143,12 @@ def tal_vardy_tpm_algorithm(block_size, design_snr, mu): channels = np.zeros((block_size, 2, mu)) channels[0] = discretize_awgn(mu, design_snr) * 2 - print('Constructing polar code with Tal-Vardy algorithm') - print('(block_size = {0}, design SNR = {1}, mu = {2}'.format(block_size, design_snr, 2 * mu)) + print("Constructing polar code with Tal-Vardy algorithm") + print( + "(block_size = {0}, design SNR = {1}, mu = {2}".format( + block_size, design_snr, 2 * mu + ) + ) show_progress_bar(0, block_size) for j in range(0, block_power): u = 2 ** j @@ -153,8 +167,8 @@ def tal_vardy_tpm_algorithm(block_size, design_snr, mu): z = z[bit_reverse_vector(np.arange(block_size), block_power)] z = upper_bound_z_params(z, block_size, design_snr) show_progress_bar(block_size, block_size) - print('') - print('channel construction DONE') + print("") + print("channel construction DONE") return z @@ -239,7 +253,7 @@ def normalize_q(q, tpm): def main(): - print('channel construction AWGN main') + print("channel construction AWGN main") n = 8 m = 2 ** n design_snr = 0.0 @@ -250,9 +264,10 @@ def main(): if 0: import matplotlib.pyplot as plt + plt.plot(z_params) plt.show() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/gr-fec/python/fec/polar/common.py b/gr-fec/python/fec/polar/common.py index 3c3410bf85..e60cc911f2 100644 --- a/gr-fec/python/fec/polar/common.py +++ b/gr-fec/python/fec/polar/common.py @@ -7,13 +7,12 @@ # - import numpy as np from .helper_functions import bit_reverse_vector, is_power_of_two -''' +""" PolarCommon holds value checks and common initializer code for both Encoder and Decoder. 
-''' +""" class PolarCommon(object): @@ -23,15 +22,25 @@ class PolarCommon(object): if frozenbits is None: frozenbits = np.zeros(n - k, dtype=np.int) if not len(frozenbits) == n - k: - raise ValueError("len(frozenbits)={0} is not equal to n-k={1}!".format(len(frozenbits), n - k)) + raise ValueError( + "len(frozenbits)={0} is not equal to n-k={1}!".format( + len(frozenbits), n - k + ) + ) if not frozenbits.dtype == np.int: frozenbits = frozenbits.astype(dtype=int) if not len(frozen_bit_position) == (n - k): - raise ValueError("len(frozen_bit_position)={0} is not equal to n-k={1}!".format(len(frozen_bit_position), n - k)) + raise ValueError( + "len(frozen_bit_position)={0} is not equal to n-k={1}!".format( + len(frozen_bit_position), n - k + ) + ) if not frozen_bit_position.dtype == np.int: frozen_bit_position = frozen_bit_position.astype(dtype=int) - self.bit_reverse_positions = self._vector_bit_reversed(np.arange(n, dtype=int), int(np.log2(n))) + self.bit_reverse_positions = self._vector_bit_reversed( + np.arange(n, dtype=int), int(np.log2(n)) + ) self.N = n self.power = int(np.log2(self.N)) self.K = k @@ -65,7 +74,8 @@ class PolarCommon(object): return vec def _encode_natural_order(self, vec): - # use this function. It reflects the encoding process implemented in VOLK. + # use this function. It reflects the encoding process implemented in + # VOLK. vec = vec[self.bit_reverse_positions] return self._encode_efficient(vec) diff --git a/gr-fec/python/fec/polar/testbed.py b/gr-fec/python/fec/polar/testbed.py index b2350f9ef6..4e1b40a7f7 100644 --- a/gr-fec/python/fec/polar/testbed.py +++ b/gr-fec/python/fec/polar/testbed.py @@ -7,7 +7,6 @@ # - from .encoder import PolarEncoder from .decoder import PolarDecoder from . import channel_construction as cc @@ -22,7 +21,9 @@ def get_frozen_bit_position(): # frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int) m = 256 n_frozen = m // 2 - frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters(cc.bhattacharyya_bounds(0.0, m), n_frozen) + frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters( + cc.bhattacharyya_bounds(0.0, m), n_frozen + ) print(frozenbitposition) return frozenbitposition @@ -40,7 +41,9 @@ def test_enc_dec_chain(): encoded = encoder.encode(bits) rx = decoder.decode(encoded) if not is_equal(bits, rx): - raise ValueError('Test #', i, 'failed, input and output differ', bits, '!=', rx) + raise ValueError( + "Test #", i, "failed, input and output differ", bits, "!=", rx + ) return @@ -48,7 +51,9 @@ def is_equal(first, second): if not (first == second).all(): result = first == second for i in range(len(result)): - print('{0:4}: {1:2} == {2:1} = {3}'.format(i, first[i], second[i], result[i])) + print( + "{0:4}: {1:2} == {2:1} = {3}".format(i, first[i], second[i], result[i]) + ) return False return True @@ -62,11 +67,11 @@ def approx_value(la, lb): def path_metric_exact(last_pm, llr, ui): - return last_pm + np.log(1 + np.exp(-1. 
* llr * (1 - 2 * ui))) + return last_pm + np.log(1 + np.exp(-1.0 * llr * (1 - 2 * ui))) def path_metric_approx(last_pm, llr, ui): - if ui == int(.5 * (1 - np.sign(llr))): + if ui == int(0.5 * (1 - np.sign(llr))): return last_pm return last_pm + np.abs(llr) @@ -97,15 +102,17 @@ def test_1024_rate_1_code(): bits = np.random.randint(2, size=k) tx = encoder.encode(bits) np.random.shuffle(possible_indices) - tx[possible_indices[0:num_transitions]] = (tx[possible_indices[0:num_transitions]] + 1) % 2 + tx[possible_indices[0:num_transitions]] = ( + tx[possible_indices[0:num_transitions]] + 1 + ) % 2 rx = tx recv = decoder.decode(rx) - channel_counter += (bits == recv) + channel_counter += bits == recv print(channel_counter) print(np.min(channel_counter), np.max(channel_counter)) - np.save('channel_counter_' + str(ntests) + '.npy', channel_counter) + np.save("channel_counter_" + str(ntests) + ".npy", channel_counter) def find_good_indices(res, nindices): @@ -121,16 +128,18 @@ def find_good_indices(res, nindices): def channel_analysis(): ntests = 10000 - filename = 'channel_counter_' + str(ntests) + '.npy' + filename = "channel_counter_" + str(ntests) + ".npy" channel_counter = np.load(filename) print(np.min(channel_counter), np.max(channel_counter)) channel_counter[0] = np.min(channel_counter) good_indices = find_good_indices(channel_counter, channel_counter.size // 2) info_bit_positions = np.where(good_indices > 0) print(info_bit_positions) - frozen_bit_positions = np.delete(np.arange(channel_counter.size), info_bit_positions) + frozen_bit_positions = np.delete( + np.arange(channel_counter.size), info_bit_positions + ) print(frozen_bit_positions) - np.save('frozen_bit_positions_n256_k128_p0.11.npy', frozen_bit_positions) + np.save("frozen_bit_positions_n256_k128_p0.11.npy", frozen_bit_positions) good_indices *= 2000 good_indices += 4000 @@ -142,38 +151,38 @@ def channel_analysis(): def merge_first_stage(init_mask): merged_frozen_mask = [] for e in range(0, len(init_mask), 2): - v = [init_mask[e]['value'][0], init_mask[e + 1]['value'][0]] - s = init_mask[e]['size'] * 2 - if init_mask[e]['type'] == init_mask[e + 1]['type']: - t = init_mask[e]['type'] - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + v = [init_mask[e]["value"][0], init_mask[e + 1]["value"][0]] + s = init_mask[e]["size"] * 2 + if init_mask[e]["type"] == init_mask[e + 1]["type"]: + t = init_mask[e]["type"] + merged_frozen_mask.append({"value": v, "type": t, "size": s}) else: - t = 'RPT' - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + t = "RPT" + merged_frozen_mask.append({"value": v, "type": t, "size": s}) return merged_frozen_mask def merge_second_stage(init_mask): merged_frozen_mask = [] for e in range(0, len(init_mask), 2): - if init_mask[e]['type'] == init_mask[e + 1]['type']: - t = init_mask[e]['type'] - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT': - t = init_mask[e + 1]['type'] - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - elif init_mask[e]['type'] == 'RPT' and init_mask[e + 1]['type'] == 'ONE': - t = 'SPC' - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + if init_mask[e]["type"] == 
init_mask[e + 1]["type"]: + t = init_mask[e]["type"] + v = init_mask[e]["value"] + v.extend(init_mask[e + 1]["value"]) + s = init_mask[e]["size"] * 2 + merged_frozen_mask.append({"value": v, "type": t, "size": s}) + elif init_mask[e]["type"] == "ZERO" and init_mask[e + 1]["type"] == "RPT": + t = init_mask[e + 1]["type"] + v = init_mask[e]["value"] + v.extend(init_mask[e + 1]["value"]) + s = init_mask[e]["size"] * 2 + merged_frozen_mask.append({"value": v, "type": t, "size": s}) + elif init_mask[e]["type"] == "RPT" and init_mask[e + 1]["type"] == "ONE": + t = "SPC" + v = init_mask[e]["value"] + v.extend(init_mask[e + 1]["value"]) + s = init_mask[e]["size"] * 2 + merged_frozen_mask.append({"value": v, "type": t, "size": s}) else: merged_frozen_mask.append(init_mask[e]) merged_frozen_mask.append(init_mask[e + 1]) @@ -184,25 +193,27 @@ def merge_stage_n(init_mask): merged_frozen_mask = [] n_elems = len(init_mask) - (len(init_mask) % 2) for e in range(0, n_elems, 2): - if init_mask[e]['size'] == init_mask[e + 1]['size']: - if (init_mask[e]['type'] == 'ZERO' or init_mask[e]['type'] == 'ONE') and init_mask[e]['type'] == init_mask[e + 1]['type']: - t = init_mask[e]['type'] - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT': - t = init_mask[e + 1]['type'] - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - elif init_mask[e]['type'] == 'SPC' and init_mask[e + 1]['type'] == 'ONE': - t = init_mask[e]['type'] - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + if init_mask[e]["size"] == init_mask[e + 1]["size"]: + if ( + init_mask[e]["type"] == "ZERO" or init_mask[e]["type"] == "ONE" + ) and init_mask[e]["type"] == init_mask[e + 1]["type"]: + t = init_mask[e]["type"] + v = init_mask[e]["value"] + v.extend(init_mask[e + 1]["value"]) + s = init_mask[e]["size"] * 2 + merged_frozen_mask.append({"value": v, "type": t, "size": s}) + elif init_mask[e]["type"] == "ZERO" and init_mask[e + 1]["type"] == "RPT": + t = init_mask[e + 1]["type"] + v = init_mask[e]["value"] + v.extend(init_mask[e + 1]["value"]) + s = init_mask[e]["size"] * 2 + merged_frozen_mask.append({"value": v, "type": t, "size": s}) + elif init_mask[e]["type"] == "SPC" and init_mask[e + 1]["type"] == "ONE": + t = init_mask[e]["type"] + v = init_mask[e]["value"] + v.extend(init_mask[e + 1]["value"]) + s = init_mask[e]["size"] * 2 + merged_frozen_mask.append({"value": v, "type": t, "size": s}) else: merged_frozen_mask.append(init_mask[e]) merged_frozen_mask.append(init_mask[e + 1]) @@ -279,7 +290,7 @@ def find_decoder_subframes(frozen_mask): sub_mask = mask.flatten() lock_mask = lock.flatten() - words = {0: 'ZERO', 1: 'ONE', 2: 'RPT', 3: 'SPC'} + words = {0: "ZERO", 1: "ONE", 2: "RPT", 3: "SPC"} ll = lock_mask[0] sub_t = sub_mask[0] for i in range(len(frozen_mask)): @@ -289,18 +300,20 @@ def find_decoder_subframes(frozen_mask): # if i % 8 == 0: # print if not l == ll or not sub_mask[i] == sub_t: - print('--------------------------') + print("--------------------------") ll = l sub_t = sub_mask[i] - print('{0:4} lock {1:4} value: {2} in sub {3}'.format(i, 2 ** (l + 1), v, t)) + print("{0:4} lock {1:4} value: {2} in sub {3}".format(i, 2 ** (l + 1), 
v, t)) def systematic_encoder_decoder_chain_test(): - print('systematic encoder decoder chain test') + print("systematic encoder decoder chain test") block_size = int(2 ** 8) info_bit_size = block_size // 2 ntests = 100 - frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters(cc.bhattacharyya_bounds(0.0, block_size), block_size - info_bit_size) + frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters( + cc.bhattacharyya_bounds(0.0, block_size), block_size - info_bit_size + ) encoder = PolarEncoder(block_size, info_bit_size, frozenbitposition) decoder = PolarDecoder(block_size, info_bit_size, frozenbitposition) for i in range(ntests): @@ -333,5 +346,5 @@ def main(): systematic_encoder_decoder_chain_test() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/gr-utils/plot_tools/gr_plot_const b/gr-utils/plot_tools/gr_plot_const index 463baefd76..18f781e516 100755 --- a/gr-utils/plot_tools/gr_plot_const +++ b/gr-utils/plot_tools/gr_plot_const @@ -18,18 +18,22 @@ try: draw, figtext, figure, + show, get_current_fig_manager, rcParams, searchsorted, - show) + ) from matplotlib.font_manager import fontManager, FontProperties except ImportError: - print("Please install Python Matplotlib (http://matplotlib.sourceforge.net/) and \ - Python TkInter https://wiki.python.org/moin/TkInter to run this script") + print( + "Please install Python Matplotlib (http://matplotlib.sourceforge.net/) and \ + Python TkInter https://wiki.python.org/moin/TkInter to run this script" + ) raise SystemExit(1) from argparse import ArgumentParser + class draw_constellation: def __init__(self, filename, options): self.hfile = open(filename, "r") @@ -38,55 +42,78 @@ class draw_constellation: self.sample_rate = options.sample_rate self.datatype = numpy.complex64 - self.sizeof_data = self.datatype().nbytes # number of bytes per sample in file + # number of bytes per sample in file + self.sizeof_data = self.datatype().nbytes self.axis_font_size = 16 self.label_font_size = 18 self.title_font_size = 20 # Setup PLOT - self.fig = figure(1, figsize=(16, 9), facecolor='w') - rcParams['xtick.labelsize'] = self.axis_font_size - rcParams['ytick.labelsize'] = self.axis_font_size - - self.text_file = figtext(0.10, 0.95, ("File: %s" % filename), weight="heavy", size=16) - self.text_file_pos = figtext(0.10, 0.90, "File Position: ", weight="heavy", size=16) - self.text_block = figtext(0.40, 0.90, ("Block Size: %d" % self.block_length), - weight="heavy", size=16) - self.text_sr = figtext(0.60, 0.90, ("Sample Rate: %.2f" % self.sample_rate), - weight="heavy", size=16) + self.fig = figure(1, figsize=(16, 9), facecolor="w") + rcParams["xtick.labelsize"] = self.axis_font_size + rcParams["ytick.labelsize"] = self.axis_font_size + + self.text_file = figtext( + 0.10, 0.95, ("File: %s" % filename), weight="heavy", size=16 + ) + self.text_file_pos = figtext( + 0.10, 0.90, "File Position: ", weight="heavy", size=16 + ) + self.text_block = figtext( + 0.40, 0.90, ("Block Size: %d" % self.block_length), weight="heavy", size=16 + ) + self.text_sr = figtext( + 0.60, + 0.90, + ("Sample Rate: %.2f" % self.sample_rate), + weight="heavy", + size=16, + ) self.make_plots() - self.button_left_axes = self.fig.add_axes([0.45, 0.01, 0.05, 0.05], frameon=True) + self.button_left_axes = self.fig.add_axes( + [0.45, 0.01, 0.05, 0.05], frameon=True + ) self.button_left = Button(self.button_left_axes, "<") self.button_left_callback = self.button_left.on_clicked(self.button_left_click) - self.button_right_axes = self.fig.add_axes([0.50, 
0.01, 0.05, 0.05], frameon=True) + self.button_right_axes = self.fig.add_axes( + [0.50, 0.01, 0.05, 0.05], frameon=True + ) self.button_right = Button(self.button_right_axes, ">") - self.button_right_callback = self.button_right.on_clicked(self.button_right_click) + self.button_right_callback = self.button_right.on_clicked( + self.button_right_click + ) self.xlim = self.sp_iq.get_xlim() self.manager = get_current_fig_manager() - connect('draw_event', self.zoom) - connect('key_press_event', self.click) - connect('button_press_event', self.mouse_button_callback) + connect("draw_event", self.zoom) + connect("key_press_event", self.click) + connect("button_press_event", self.mouse_button_callback) show() def get_data(self): - self.text_file_pos.set_text("File Position: %d" % (self.hfile.tell()//self.sizeof_data)) + self.text_file_pos.set_text( + "File Position: %d" % (self.hfile.tell() // self.sizeof_data) + ) try: - iq = numpy.fromfile(self.hfile, dtype=self.datatype, count=self.block_length) + iq = numpy.fromfile( + self.hfile, dtype=self.datatype, count=self.block_length + ) except MemoryError: print("End of File") else: # retesting length here as newer version of numpy does not throw a MemoryError, just # returns a zero-length array - if(len(iq) > 0): + if len(iq) > 0: self.reals = numpy.array([r.real for r in iq]) self.imags = numpy.array([i.imag for i in iq]) - self.time = numpy.array([i*(1/self.sample_rate) for i in range(len(self.reals))]) + self.time = numpy.array( + [i * (1 / self.sample_rate) for i in range(len(self.reals))] + ) return True else: print("End of File") @@ -94,34 +121,79 @@ class draw_constellation: def make_plots(self): # if specified on the command-line, set file pointer - self.hfile.seek(self.sizeof_data*self.start, 1) + self.hfile.seek(self.sizeof_data * self.start, 1) r = self.get_data() # Subplot for real and imaginary parts of signal - self.sp_iq = self.fig.add_subplot(2,1,1, position=[0.075, 0.2, 0.4, 0.6]) + self.sp_iq = self.fig.add_subplot(2, 1, 1, position=[0.075, 0.2, 0.4, 0.6]) self.sp_iq.set_title(("I&Q"), fontsize=self.title_font_size, fontweight="bold") - self.sp_iq.set_xlabel("Time (s)", fontsize=self.label_font_size, fontweight="bold") - self.sp_iq.set_ylabel("Amplitude (V)", fontsize=self.label_font_size, fontweight="bold") - self.plot_iq = self.sp_iq.plot(self.time, self.reals, 'bo-', self.time, self.imags, 'ro-') + self.sp_iq.set_xlabel( + "Time (s)", fontsize=self.label_font_size, fontweight="bold" + ) + self.sp_iq.set_ylabel( + "Amplitude (V)", fontsize=self.label_font_size, fontweight="bold" + ) + self.plot_iq = self.sp_iq.plot( + self.time, self.reals, "bo-", self.time, self.imags, "ro-" + ) # Subplot for constellation plot - self.sp_const = self.fig.add_subplot(2,2,1, position=[0.575, 0.2, 0.4, 0.6]) - self.sp_const.set_title(("Constellation"), fontsize=self.title_font_size, fontweight="bold") - self.sp_const.set_xlabel("Inphase", fontsize=self.label_font_size, fontweight="bold") - self.sp_const.set_ylabel("Quadrature", fontsize=self.label_font_size, fontweight="bold") - self.plot_const = self.sp_const.plot(self.reals, self.imags, 'bo') - - # Add plots to mark current location of point between time and constellation plots + self.sp_const = self.fig.add_subplot(2, 2, 1, position=[0.575, 0.2, 0.4, 0.6]) + self.sp_const.set_title( + ("Constellation"), fontsize=self.title_font_size, fontweight="bold" + ) + self.sp_const.set_xlabel( + "Inphase", fontsize=self.label_font_size, fontweight="bold" + ) + self.sp_const.set_ylabel( + "Quadrature", 
fontsize=self.label_font_size, fontweight="bold" + ) + self.plot_const = self.sp_const.plot(self.reals, self.imags, "bo") + + # Add plots to mark current location of point between time and + # constellation plots self.indx = 0 - self.plot_iq += self.sp_iq.plot([self.time[self.indx],], [self.reals[self.indx],], 'mo', ms=8) - self.plot_iq += self.sp_iq.plot([self.time[self.indx],], [self.imags[self.indx],], 'mo', ms=8) - self.plot_const += self.sp_const.plot([self.reals[self.indx],], [self.imags[self.indx],], 'mo', ms=12) + self.plot_iq += self.sp_iq.plot( + [ + self.time[self.indx], + ], + [ + self.reals[self.indx], + ], + "mo", + ms=8, + ) + self.plot_iq += self.sp_iq.plot( + [ + self.time[self.indx], + ], + [ + self.imags[self.indx], + ], + "mo", + ms=8, + ) + self.plot_const += self.sp_const.plot( + [ + self.reals[self.indx], + ], + [ + self.imags[self.indx], + ], + "mo", + ms=12, + ) # Adjust axis - self.sp_iq.axis([self.time.min(), self.time.max(), - 1.5*min([self.reals.min(), self.imags.min()]), - 1.5*max([self.reals.max(), self.imags.max()])]) + self.sp_iq.axis( + [ + self.time.min(), + self.time.max(), + 1.5 * min([self.reals.min(), self.imags.min()]), + 1.5 * max([self.reals.max(), self.imags.max()]), + ] + ) self.sp_const.axis([-2, 2, -2, 2]) draw() @@ -129,9 +201,14 @@ class draw_constellation: def update_plots(self): self.plot_iq[0].set_data([self.time, self.reals]) self.plot_iq[1].set_data([self.time, self.imags]) - self.sp_iq.axis([self.time.min(), self.time.max(), - 1.5*min([self.reals.min(), self.imags.min()]), - 1.5*max([self.reals.max(), self.imags.max()])]) + self.sp_iq.axis( + [ + self.time.min(), + self.time.max(), + 1.5 * min([self.reals.min(), self.imags.min()]), + 1.5 * max([self.reals.max(), self.imags.max()]), + ] + ) self.plot_const[0].set_data([self.reals, self.imags]) self.sp_const.axis([-2, 2, -2, 2]) @@ -140,7 +217,7 @@ class draw_constellation: def zoom(self, event): newxlim = numpy.array(self.sp_iq.get_xlim()) curxlim = numpy.array(self.xlim) - if(newxlim[0] != curxlim[0] or newxlim[1] != curxlim[1]): + if newxlim[0] != curxlim[0] or newxlim[1] != curxlim[1]: self.xlim = newxlim r = self.reals[int(ceil(self.xlim[0])) : int(ceil(self.xlim[1]))] i = self.imags[int(ceil(self.xlim[0])) : int(ceil(self.xlim[1]))] @@ -153,21 +230,25 @@ class draw_constellation: def click(self, event): forward_valid_keys = [" ", "down", "right"] backward_valid_keys = ["up", "left"] - trace_forward_valid_keys = [">",] - trace_backward_valid_keys = ["<",] - - if(find(event.key, forward_valid_keys)): + trace_forward_valid_keys = [ + ">", + ] + trace_backward_valid_keys = [ + "<", + ] + + if find(event.key, forward_valid_keys): self.step_forward() - elif(find(event.key, backward_valid_keys)): + elif find(event.key, backward_valid_keys): self.step_backward() - elif(find(event.key, trace_forward_valid_keys)): - self.indx = min(self.indx+1, len(self.time)-1) + elif find(event.key, trace_forward_valid_keys): + self.indx = min(self.indx + 1, len(self.time) - 1) self.set_trace(self.indx) - elif(find(event.key, trace_backward_valid_keys)): - self.indx = max(0, self.indx-1) + elif find(event.key, trace_backward_valid_keys): + self.indx = max(0, self.indx - 1) self.set_trace(self.indx) def button_left_click(self, event): @@ -178,29 +259,27 @@ class draw_constellation: def step_forward(self): r = self.get_data() - if(r): + if r: self.update_plots() def step_backward(self): # Step back in file position - if(self.hfile.tell() >= 2*self.sizeof_data*self.block_length ): - 
self.hfile.seek(-2*self.sizeof_data*self.block_length, 1) + if self.hfile.tell() >= 2 * self.sizeof_data * self.block_length: + self.hfile.seek(-2 * self.sizeof_data * self.block_length, 1) else: - self.hfile.seek(-self.hfile.tell(),1) + self.hfile.seek(-self.hfile.tell(), 1) r = self.get_data() - if(r): + if r: self.update_plots() - def mouse_button_callback(self, event): x, y = event.xdata, event.ydata if x is not None and y is not None: - if(event.inaxes == self.sp_iq): + if event.inaxes == self.sp_iq: self.indx = searchsorted(self.time, [x]) self.set_trace(self.indx) - def set_trace(self, indx): self.plot_iq[2].set_data(self.time[indx], self.reals[indx]) self.plot_iq[3].set_data(self.time[indx], self.imags[indx]) @@ -210,7 +289,7 @@ class draw_constellation: def find(item_in, list_search): try: - return list_search.index(item_in) != None + return list_search.index(item_in) is not None except ValueError: return False @@ -219,18 +298,33 @@ def main(): description = "Takes a GNU Radio complex binary file and displays the I&Q data versus time and the constellation plot (I vs. Q). You can set the block size to specify how many points to read in at a time and the start position in the file. By default, the system assumes a sample rate of 1, so in time, each sample is plotted versus the sample number. To set a true time axis, set the sample rate (-R or --sample-rate) to the sample rate used when capturing the samples." parser = ArgumentParser(conflict_handler="resolve", description=description) - parser.add_argument("-B", "--block", type=int, default=1000, - help="Specify the block size [default=%(default)r]") - parser.add_argument("-s", "--start", type=int, default=0, - help="Specify where to start in the file [default=%(default)r]") - parser.add_argument("-R", "--sample-rate", type=float, default=1.0, - help="Set the sampler rate of the data [default=%(default)r]") - parser.add_argument("file", metavar="FILE", - help="Input file with complex samples") + parser.add_argument( + "-B", + "--block", + type=int, + default=1000, + help="Specify the block size [default=%(default)r]", + ) + parser.add_argument( + "-s", + "--start", + type=int, + default=0, + help="Specify where to start in the file [default=%(default)r]", + ) + parser.add_argument( + "-R", + "--sample-rate", + type=float, + default=1.0, + help="Set the sampler rate of the data [default=%(default)r]", + ) + parser.add_argument("file", metavar="FILE", help="Input file with complex samples") args = parser.parse_args() dc = draw_constellation(args.file, args) + if __name__ == "__main__": try: main() diff --git a/gr-utils/plot_tools/gr_plot_iq b/gr-utils/plot_tools/gr_plot_iq index 924109e30d..ba3838c081 100755 --- a/gr-utils/plot_tools/gr_plot_iq +++ b/gr-utils/plot_tools/gr_plot_iq @@ -11,15 +11,27 @@ import numpy try: - from pylab import (Button, connect, draw, figtext, figure, - get_current_fig_manager, plot, rcParams, show) + from pylab import ( + Button, + connect, + draw, + figtext, + figure, + get_current_fig_manager, + plot, + rcParams, + show, + ) except ImportError: - print("Please install Python Matplotlib (http://matplotlib.sourceforge.net/) and \ - Python TkInter https://wiki.python.org/moin/TkInter to run this script") + print( + "Please install Python Matplotlib (http://matplotlib.sourceforge.net/) and \ + Python TkInter https://wiki.python.org/moin/TkInter to run this script" + ) raise SystemExit(1) from argparse import ArgumentParser + class draw_iq: def __init__(self, filename, options): self.hfile = open(filename, 
"r") @@ -28,7 +40,8 @@ class draw_iq: self.sample_rate = options.sample_rate self.datatype = numpy.complex64 - self.sizeof_data = self.datatype().nbytes # number of bytes per sample in file + # number of bytes per sample in file + self.sizeof_data = self.datatype().nbytes self.axis_font_size = 16 self.label_font_size = 18 @@ -36,65 +49,103 @@ class draw_iq: self.text_size = 22 # Setup PLOT - self.fig = figure(1, figsize=(16, 9), facecolor='w') - rcParams['xtick.labelsize'] = self.axis_font_size - rcParams['ytick.labelsize'] = self.axis_font_size - - self.text_file = figtext(0.10, 0.94, ("File: %s" % filename), weight="heavy", size=self.text_size) - self.text_file_pos = figtext(0.10, 0.88, "File Position: ", weight="heavy", size=self.text_size) - self.text_block = figtext(0.40, 0.88, ("Block Size: %d" % self.block_length), - weight="heavy", size=self.text_size) - self.text_sr = figtext(0.60, 0.88, ("Sample Rate: %.2f" % self.sample_rate), - weight="heavy", size=self.text_size) + self.fig = figure(1, figsize=(16, 9), facecolor="w") + rcParams["xtick.labelsize"] = self.axis_font_size + rcParams["ytick.labelsize"] = self.axis_font_size + + self.text_file = figtext( + 0.10, 0.94, ("File: %s" % filename), weight="heavy", size=self.text_size + ) + self.text_file_pos = figtext( + 0.10, 0.88, "File Position: ", weight="heavy", size=self.text_size + ) + self.text_block = figtext( + 0.40, + 0.88, + ("Block Size: %d" % self.block_length), + weight="heavy", + size=self.text_size, + ) + self.text_sr = figtext( + 0.60, + 0.88, + ("Sample Rate: %.2f" % self.sample_rate), + weight="heavy", + size=self.text_size, + ) self.make_plots() - self.button_left_axes = self.fig.add_axes([0.45, 0.01, 0.05, 0.05], frameon=True) + self.button_left_axes = self.fig.add_axes( + [0.45, 0.01, 0.05, 0.05], frameon=True + ) self.button_left = Button(self.button_left_axes, "<") self.button_left_callback = self.button_left.on_clicked(self.button_left_click) - self.button_right_axes = self.fig.add_axes([0.50, 0.01, 0.05, 0.05], frameon=True) + self.button_right_axes = self.fig.add_axes( + [0.50, 0.01, 0.05, 0.05], frameon=True + ) self.button_right = Button(self.button_right_axes, ">") - self.button_right_callback = self.button_right.on_clicked(self.button_right_click) + self.button_right_callback = self.button_right.on_clicked( + self.button_right_click + ) self.xlim = self.sp_iq.get_xlim() self.manager = get_current_fig_manager() - connect('key_press_event', self.click) + connect("key_press_event", self.click) show() def get_data(self): - self.text_file_pos.set_text("File Position: %d" % (self.hfile.tell()//self.sizeof_data)) + self.text_file_pos.set_text( + "File Position: %d" % (self.hfile.tell() // self.sizeof_data) + ) try: - self.iq = numpy.fromfile(self.hfile, dtype=self.datatype, count=self.block_length) + self.iq = numpy.fromfile( + self.hfile, dtype=self.datatype, count=self.block_length + ) except MemoryError: print("End of File") else: self.reals = numpy.array([r.real for r in self.iq]) self.imags = numpy.array([i.imag for i in self.iq]) - self.time = numpy.array([i*(1/self.sample_rate) for i in range(len(self.reals))]) + self.time = numpy.array( + [i * (1 / self.sample_rate) for i in range(len(self.reals))] + ) def make_plots(self): # if specified on the command-line, set file pointer - self.hfile.seek(self.sizeof_data*self.start, 1) + self.hfile.seek(self.sizeof_data * self.start, 1) self.get_data() # Subplot for real and imaginary parts of signal - self.sp_iq = self.fig.add_subplot(2,1,1, position=[0.075, 0.14, 
0.85, 0.67]) + self.sp_iq = self.fig.add_subplot(2, 1, 1, position=[0.075, 0.14, 0.85, 0.67]) self.sp_iq.set_title(("I&Q"), fontsize=self.title_font_size, fontweight="bold") - self.sp_iq.set_xlabel("Time (s)", fontsize=self.label_font_size, fontweight="bold") - self.sp_iq.set_ylabel("Amplitude (V)", fontsize=self.label_font_size, fontweight="bold") - self.plot_iq = plot(self.time, self.reals, 'bo-', self.time, self.imags, 'ro-') - self.sp_iq.set_ylim([1.5*min([self.reals.min(), self.imags.min()]), - 1.5*max([self.reals.max(), self.imags.max()])]) + self.sp_iq.set_xlabel( + "Time (s)", fontsize=self.label_font_size, fontweight="bold" + ) + self.sp_iq.set_ylabel( + "Amplitude (V)", fontsize=self.label_font_size, fontweight="bold" + ) + self.plot_iq = plot(self.time, self.reals, "bo-", self.time, self.imags, "ro-") + self.sp_iq.set_ylim( + [ + 1.5 * min([self.reals.min(), self.imags.min()]), + 1.5 * max([self.reals.max(), self.imags.max()]), + ] + ) self.sp_iq.set_xlim(self.time.min(), self.time.max()) draw() def update_plots(self): self.plot_iq[0].set_data([self.time, self.reals]) self.plot_iq[1].set_data([self.time, self.imags]) - self.sp_iq.set_ylim([1.5*min([self.reals.min(), self.imags.min()]), - 1.5*max([self.reals.max(), self.imags.max()])]) + self.sp_iq.set_ylim( + [ + 1.5 * min([self.reals.min(), self.imags.min()]), + 1.5 * max([self.reals.max(), self.imags.max()]), + ] + ) self.sp_iq.set_xlim(self.time.min(), self.time.max()) draw() @@ -102,10 +153,10 @@ class draw_iq: forward_valid_keys = [" ", "down", "right"] backward_valid_keys = ["up", "left"] - if(find(event.key, forward_valid_keys)): + if find(event.key, forward_valid_keys): self.step_forward() - elif(find(event.key, backward_valid_keys)): + elif find(event.key, backward_valid_keys): self.step_backward() def button_left_click(self, event): @@ -120,37 +171,53 @@ class draw_iq: def step_backward(self): # Step back in file position - if(self.hfile.tell() >= 2*self.sizeof_data*self.block_length ): - self.hfile.seek(-2*self.sizeof_data*self.block_length, 1) + if self.hfile.tell() >= 2 * self.sizeof_data * self.block_length: + self.hfile.seek(-2 * self.sizeof_data * self.block_length, 1) else: - self.hfile.seek(-self.hfile.tell(),1) + self.hfile.seek(-self.hfile.tell(), 1) self.get_data() self.update_plots() def find(item_in, list_search): try: - return list_search.index(item_in) != None + return list_search.index(item_in) is not None except ValueError: return False + def main(): description = "Takes a GNU Radio complex binary file and displays the I&Q data versus time. You can set the block size to specify how many points to read in at a time and the start position in the file. By default, the system assumes a sample rate of 1, so in time, each sample is plotted versus the sample number. To set a true time axis, set the sample rate (-R or --sample-rate) to the sample rate used when capturing the samples." 
parser = ArgumentParser(conflict_handler="resolve", description=description) - parser.add_argument("-B", "--block", type=int, default=1000, - help="Specify the block size [default=%(default)r]") - parser.add_argument("-s", "--start", type=int, default=0, - help="Specify where to start in the file [default=%(default)r]") - parser.add_argument("-R", "--sample-rate", type=float, default=1.0, - help="Set the sampler rate of the data [default=%(default)r]") - parser.add_argument("file", metavar="FILE", - help="Input file with complex samples") + parser.add_argument( + "-B", + "--block", + type=int, + default=1000, + help="Specify the block size [default=%(default)r]", + ) + parser.add_argument( + "-s", + "--start", + type=int, + default=0, + help="Specify where to start in the file [default=%(default)r]", + ) + parser.add_argument( + "-R", + "--sample-rate", + type=float, + default=1.0, + help="Set the sampler rate of the data [default=%(default)r]", + ) + parser.add_argument("file", metavar="FILE", help="Input file with complex samples") args = parser.parse_args() dc = draw_iq(args.file, args) + if __name__ == "__main__": try: main() |
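
One piece of content carried through unchanged by this formatting pass is the garbletable in extended_decoder.py and extended_tagged_decoder.py. Per its own comment, each key is the solution of log_(1-2*t)(1-2*0.0335) = 1/taps for a candidate syndrome-density threshold t; the sketch below reproduces the key/value pairs (the 0.0335 constant is inferred from the table values themselves, since the commented-out snippet in the file uses 0.035):

    # Reproduce the garbletable: key = log base (1 - 2*t) of (1 - 2*0.0335),
    # value = the syndrome-density threshold t itself.
    from math import log

    import numpy as np

    for t in np.arange(0.10, 0.499, 0.01):
        key = log(1 - 2 * 0.0335, 1 - 2 * t)
        print(f"{key}: {round(t, 2)},")

The first line printed, 0.310786...: 0.1, corresponds to the 0.310786835319: 0.1 entry of the table in the patch above.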