summaryrefslogtreecommitdiff
path: root/gr-fec/python/fec/polar
diff options
context:
space:
mode:
Diffstat (limited to 'gr-fec/python/fec/polar')
-rw-r--r--gr-fec/python/fec/polar/__init__.py18
-rw-r--r--gr-fec/python/fec/polar/channel_construction.py3
-rw-r--r--gr-fec/python/fec/polar/channel_construction_awgn.py18
-rw-r--r--gr-fec/python/fec/polar/channel_construction_bec.py5
-rw-r--r--gr-fec/python/fec/polar/common.py3
-rw-r--r--gr-fec/python/fec/polar/decoder.py25
-rw-r--r--gr-fec/python/fec/polar/encoder.py8
-rw-r--r--gr-fec/python/fec/polar/helper_functions.py9
-rw-r--r--gr-fec/python/fec/polar/polar_channel_construction12
-rw-r--r--gr-fec/python/fec/polar/testbed.py9
10 files changed, 67 insertions, 43 deletions
diff --git a/gr-fec/python/fec/polar/__init__.py b/gr-fec/python/fec/polar/__init__.py
index ce4e506334..26698752c8 100644
--- a/gr-fec/python/fec/polar/__init__.py
+++ b/gr-fec/python/fec/polar/__init__.py
@@ -17,19 +17,25 @@ from .helper_functions import is_power_of_two
CHANNEL_TYPE_AWGN = 'AWGN'
CHANNEL_TYPE_BEC = 'BEC'
+
def get_z_params(is_prototype, channel, block_size, design_snr, mu):
- print('POLAR code channel construction called with parameters channel={0}, blocksize={1}, design SNR={2}, mu={3}'.format(channel, block_size, design_snr, mu))
+ print('POLAR code channel construction called with parameters channel={0}, blocksize={1}, design SNR={2}, mu={3}'.format(
+ channel, block_size, design_snr, mu))
if not (channel == 'AWGN' or channel == 'BEC'):
- raise ValueError("channel is {0}, but only BEC and AWGN are supported!".format(channel))
+ raise ValueError(
+ "channel is {0}, but only BEC and AWGN are supported!".format(channel))
if not is_power_of_two(block_size):
- raise ValueError("block size={0} is not a power of 2!".format(block_size))
+ raise ValueError(
+ "block size={0} is not a power of 2!".format(block_size))
if design_snr < -1.5917:
- raise ValueError("design SNR={0} < -1.5917. MUST be greater!".format(design_snr))
+ raise ValueError(
+ "design SNR={0} < -1.5917. MUST be greater!".format(design_snr))
if not mu > 0:
raise ValueError("mu={0} < 1. MUST be > 1!".format(mu))
if not is_prototype and channel == 'AWGN':
z_params = cc.load_z_parameters(block_size, design_snr, mu)
- print('Read Z-parameter file: {0}'.format(cc.default_dir() + cc.generate_filename(block_size, design_snr, mu)))
+ print('Read Z-parameter file: {0}'.format(cc.default_dir() +
+ cc.generate_filename(block_size, design_snr, mu)))
return z_params
return bhattacharyya_bounds(design_snr, block_size)
@@ -48,5 +54,5 @@ def load_frozen_bits_info(is_prototype, channel, block_size, num_info_bits, desi
'design_snr': design_snr,
'channel': channel,
'mu': mu,
- }
+ }
return data_set
diff --git a/gr-fec/python/fec/polar/channel_construction.py b/gr-fec/python/fec/polar/channel_construction.py
index 2ec78679a5..643bfb7609 100644
--- a/gr-fec/python/fec/polar/channel_construction.py
+++ b/gr-fec/python/fec/polar/channel_construction.py
@@ -69,7 +69,8 @@ def frozen_bit_positions(block_size, info_size, design_snr=0.0):
def generate_filename(block_size, design_snr, mu):
filename = "polar_code_z_parameters_N" + str(int(block_size))
- filename += "_SNR" + str(float(design_snr)) + "_MU" + str(int(mu)) + ".polar"
+ filename += "_SNR" + str(float(design_snr)) + \
+ "_MU" + str(int(mu)) + ".polar"
return filename
diff --git a/gr-fec/python/fec/polar/channel_construction_awgn.py b/gr-fec/python/fec/polar/channel_construction_awgn.py
index 667c17b8fc..5a90262007 100644
--- a/gr-fec/python/fec/polar/channel_construction_awgn.py
+++ b/gr-fec/python/fec/polar/channel_construction_awgn.py
@@ -95,9 +95,9 @@ def discretize_awgn(mu, design_snr):
def instant_capacity_delta_callable():
return (
- lambda a, b: -1.0 * (a + b) * np.log2((a + b) / 2)
- + a * np.log2(a)
- + b * np.log2(b)
+ lambda a, b: -1.0 * (a + b) * np.log2((a + b) / 2) +
+ a * np.log2(a) +
+ b * np.log2(b)
)
@@ -114,16 +114,19 @@ def quantize_to_size(tpm, mu):
print("WARNING: This channel gets too small!")
# lambda works on vectors just fine. Use Numpy vector awesomeness.
- delta_i_vec = calculate_delta_I(tpm[0, 0:-1], tpm[1, 0:-1], tpm[0, 1:], tpm[1, 1:])
+ delta_i_vec = calculate_delta_I(
+ tpm[0, 0:-1], tpm[1, 0:-1], tpm[0, 1:], tpm[1, 1:])
for i in range(L - mu):
d = np.argmin(delta_i_vec)
ap = tpm[0, d] + tpm[0, d + 1]
bp = tpm[1, d] + tpm[1, d + 1]
if d > 0:
- delta_i_vec[d - 1] = calculate_delta_I(tpm[0, d - 1], tpm[1, d - 1], ap, bp)
+ delta_i_vec[d -
+ 1] = calculate_delta_I(tpm[0, d - 1], tpm[1, d - 1], ap, bp)
if d < delta_i_vec.size - 1:
- delta_i_vec[d + 1] = calculate_delta_I(ap, bp, tpm[0, d + 1], tpm[1, d + 1])
+ delta_i_vec[d +
+ 1] = calculate_delta_I(ap, bp, tpm[0, d + 1], tpm[1, d + 1])
delta_i_vec = np.delete(delta_i_vec, d)
tpm = np.delete(tpm, d, axis=1)
tpm[0, d] = ap
@@ -174,7 +177,8 @@ def tal_vardy_tpm_algorithm(block_size, design_snr, mu):
def merge_lr_based(q, mu):
lrs = q[0] / q[1]
- vals, indices, inv_indices = np.unique(lrs, return_index=True, return_inverse=True)
+ vals, indices, inv_indices = np.unique(
+ lrs, return_index=True, return_inverse=True)
# compare [1] (20). Ordering of representatives according to LRs.
temp = np.zeros((2, len(indices)), dtype=float)
if vals.size < mu:
diff --git a/gr-fec/python/fec/polar/channel_construction_bec.py b/gr-fec/python/fec/polar/channel_construction_bec.py
index dd0f386d81..edbfbacf45 100644
--- a/gr-fec/python/fec/polar/channel_construction_bec.py
+++ b/gr-fec/python/fec/polar/channel_construction_bec.py
@@ -41,7 +41,7 @@ def calc_one_recursion(iw0):
def calculate_bec_channel_capacities_loop(initial_channel, block_power):
- # compare [0, Arikan] eq. 6
+ # compare [0, Arikan] eq. 6
iw = np.array([initial_channel, ], dtype=float)
for i in range(block_power):
iw = calc_one_recursion(iw)
@@ -136,7 +136,7 @@ def plot_channel_capacities(capacity, save_file=None):
def plot_average_channel_distance(save_file=None):
- eta = 0.5 # design_snr_to_bec_eta(-1.5917)
+ eta = 0.5 # design_snr_to_bec_eta(-1.5917)
powers = np.arange(4, 26)
try:
@@ -219,5 +219,6 @@ def main():
# plot_average_channel_distance()
calculate_bec_channel_z_parameters(eta, block_size)
+
if __name__ == '__main__':
main()
diff --git a/gr-fec/python/fec/polar/common.py b/gr-fec/python/fec/polar/common.py
index e60cc911f2..178544464a 100644
--- a/gr-fec/python/fec/polar/common.py
+++ b/gr-fec/python/fec/polar/common.py
@@ -46,7 +46,8 @@ class PolarCommon(object):
self.K = k
self.frozenbits = frozenbits
self.frozen_bit_position = frozen_bit_position
- self.info_bit_position = np.delete(np.arange(self.N), self.frozen_bit_position)
+ self.info_bit_position = np.delete(
+ np.arange(self.N), self.frozen_bit_position)
def _insert_frozen_bits(self, u):
prototype = np.empty(self.N, dtype=int)
diff --git a/gr-fec/python/fec/polar/decoder.py b/gr-fec/python/fec/polar/decoder.py
index 8cbda51017..d245f06e40 100644
--- a/gr-fec/python/fec/polar/decoder.py
+++ b/gr-fec/python/fec/polar/decoder.py
@@ -19,8 +19,10 @@ class PolarDecoder(PolarCommon):
def __init__(self, n, k, frozen_bit_position, frozenbits=None):
PolarCommon.__init__(self, n, k, frozen_bit_position, frozenbits)
- self.error_probability = 0.1 # this is kind of a dummy value. usually chosen individually.
- self.lrs = ((1 - self.error_probability) / self.error_probability, self.error_probability / (1 - self.error_probability))
+ # this is kind of a dummy value. usually chosen individually.
+ self.error_probability = 0.1
+ self.lrs = ((1 - self.error_probability) / self.error_probability,
+ self.error_probability / (1 - self.error_probability))
self.llrs = np.log(self.lrs)
def _llr_bit(self, bit):
@@ -77,8 +79,8 @@ class PolarDecoder(PolarCommon):
def _calculate_lrs(self, y, u):
ue = self._get_even_indices_values(u)
uo = self._get_odd_indices_values(u)
- ya = y[0:y.size//2]
- yb = y[(y.size//2):]
+ ya = y[0:y.size // 2]
+ yb = y[(y.size // 2):]
la = self._lr_decision_element(ya, (ue + uo) % 2)
lb = self._lr_decision_element(yb, ue)
return la, lb
@@ -140,7 +142,8 @@ class PolarDecoder(PolarCommon):
for i in range(self.N):
graph[i][self.power] = self._llr_bit(y[i])
decode_order = self._vector_bit_reversed(np.arange(self.N), self.power)
- decode_order = np.delete(decode_order, np.where(decode_order >= self.N // 2))
+ decode_order = np.delete(
+ decode_order, np.where(decode_order >= self.N // 2))
u = np.array([], dtype=int)
for pos in decode_order:
graph = self._butterfly(pos, 0, graph, u)
@@ -171,7 +174,8 @@ class PolarDecoder(PolarCommon):
# activate right side butterflies
u_even = self._get_even_indices_values(u)
u_odd = self._get_odd_indices_values(u)
- graph = self._butterfly(bf_entry_row, stage + 1, graph, (u_even + u_odd) % 2)
+ graph = self._butterfly(bf_entry_row, stage + 1,
+ graph, (u_even + u_odd) % 2)
lower_right = bf_entry_row + self.N // (2 ** (stage + 1))
graph = self._butterfly(lower_right, stage + 1, graph, u_even)
@@ -182,7 +186,8 @@ class PolarDecoder(PolarCommon):
def decode(self, data, is_packed=False):
if not len(data) == self.N:
- raise ValueError("len(data)={0} is not equal to n={1}!".format(len(data), self.N))
+ raise ValueError(
+ "len(data)={0} is not equal to n={1}!".format(len(data), self.N))
if is_packed:
data = np.unpackbits(data)
data = self._lr_sc_decoder_efficient(data)
@@ -192,12 +197,14 @@ class PolarDecoder(PolarCommon):
return data
def _extract_info_bits_reversed(self, y):
- info_bit_positions_reversed = self._vector_bit_reversed(self.info_bit_position, self.power)
+ info_bit_positions_reversed = self._vector_bit_reversed(
+ self.info_bit_position, self.power)
return y[info_bit_positions_reversed]
def decode_systematic(self, data):
if not len(data) == self.N:
- raise ValueError("len(data)={0} is not equal to n={1}!".format(len(data), self.N))
+ raise ValueError(
+ "len(data)={0} is not equal to n={1}!".format(len(data), self.N))
# data = self._reverse_bits(data)
data = self._lr_sc_decoder_efficient(data)
data = self._encode_natural_order(data)
diff --git a/gr-fec/python/fec/polar/encoder.py b/gr-fec/python/fec/polar/encoder.py
index d6279b1af5..273d6cc9cf 100644
--- a/gr-fec/python/fec/polar/encoder.py
+++ b/gr-fec/python/fec/polar/encoder.py
@@ -32,7 +32,8 @@ class PolarEncoder(PolarCommon):
def encode(self, data, is_packed=False):
if not len(data) == self.K:
- raise ValueError("len(data)={0} is not equal to k={1}!".format(len(data), self.K))
+ raise ValueError(
+ "len(data)={0} is not equal to k={1}!".format(len(data), self.K))
if is_packed:
data = np.unpackbits(data)
if np.max(data) > 1 or np.min(data) < 0:
@@ -45,7 +46,8 @@ class PolarEncoder(PolarCommon):
def encode_systematic(self, data):
if not len(data) == self.K:
- raise ValueError("len(data)={0} is not equal to k={1}!".format(len(data), self.K))
+ raise ValueError(
+ "len(data)={0} is not equal to k={1}!".format(len(data), self.K))
if np.max(data) > 1 or np.min(data) < 0:
raise ValueError("can only encode bits!")
@@ -102,7 +104,7 @@ def test_encoder_impls():
# frozenbits = np.zeros(n - k)
# frozenbitposition8 = np.array((0, 1, 2, 4), dtype=int) # keep it!
frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
- encoder = PolarEncoder(n, k, frozenbitposition) #, frozenbits)
+ encoder = PolarEncoder(n, k, frozenbitposition) # , frozenbits)
print('result:', compare_results(encoder, ntests, k))
print('Test rate-1 encoder/decoder chain results')
diff --git a/gr-fec/python/fec/polar/helper_functions.py b/gr-fec/python/fec/polar/helper_functions.py
index 357bccd5b2..bcddfb83c9 100644
--- a/gr-fec/python/fec/polar/helper_functions.py
+++ b/gr-fec/python/fec/polar/helper_functions.py
@@ -8,7 +8,8 @@
import numpy as np
-import time, sys
+import time
+import sys
import copy
@@ -125,8 +126,8 @@ def show_progress_bar(ndone, ntotal):
percentage = 100. * fract
ndone_chars = int(nchars * fract)
nundone_chars = nchars - ndone_chars
- sys.stdout.write('\r[{0}{1}] {2:5.2f}% ({3} / {4})'.format('=' * ndone_chars, ' ' * nundone_chars, percentage, ndone, ntotal))
-
+ sys.stdout.write('\r[{0}{1}] {2:5.2f}% ({3} / {4})'.format('=' *
+ ndone_chars, ' ' * nundone_chars, percentage, ndone, ntotal))
def mutual_information(w):
@@ -173,7 +174,6 @@ def main():
n = 6
m = 2 ** n
-
pos = np.arange(m)
rev_pos = bit_reverse_vector(pos, n)
print(pos)
@@ -190,6 +190,5 @@ def main():
print(a)
-
if __name__ == '__main__':
main()
diff --git a/gr-fec/python/fec/polar/polar_channel_construction b/gr-fec/python/fec/polar/polar_channel_construction
index 8ea5753d48..6f1879acf7 100644
--- a/gr-fec/python/fec/polar/polar_channel_construction
+++ b/gr-fec/python/fec/polar/polar_channel_construction
@@ -18,14 +18,14 @@ def setup_parser():
override this and call the parent function. """
parser = ArgumentParser()
parser.add_argument("-c", "--channel", choices=('BEC', 'AWGN'),
- help="specify channel, currently BEC or AWGN (default='BEC')",
- default='BEC')
+ help="specify channel, currently BEC or AWGN (default='BEC')",
+ default='BEC')
parser.add_argument("-b", "--blocksize", type=int, dest="block_size",
- help="specify block size of polar code (default=16)", default=16)
+ help="specify block size of polar code (default=16)", default=16)
parser.add_argument("-s", "--design-snr", type=float, dest="design_snr",
- help="specify design SNR of polar code (default=0.0)", default=0.0)
+ help="specify design SNR of polar code (default=0.0)", default=0.0)
parser.add_argument("-k", "--mu", type=int,
- help="specify block size of polar code (default=2)", default=2)
+ help="specify block size of polar code (default=2)", default=2)
return parser
@@ -37,7 +37,7 @@ def main():
args = parser.parse_args()
z_params = cc.get_z_params(False, args.channel, args.block_size,
- args.design_snr, args.mu)
+ args.design_snr, args.mu)
print(z_params)
diff --git a/gr-fec/python/fec/polar/testbed.py b/gr-fec/python/fec/polar/testbed.py
index 4e1b40a7f7..7ce9f57bb0 100644
--- a/gr-fec/python/fec/polar/testbed.py
+++ b/gr-fec/python/fec/polar/testbed.py
@@ -52,7 +52,8 @@ def is_equal(first, second):
result = first == second
for i in range(len(result)):
print(
- "{0:4}: {1:2} == {2:1} = {3}".format(i, first[i], second[i], result[i])
+ "{0:4}: {1:2} == {2:1} = {3}".format(
+ i, first[i], second[i], result[i])
)
return False
return True
@@ -132,7 +133,8 @@ def channel_analysis():
channel_counter = np.load(filename)
print(np.min(channel_counter), np.max(channel_counter))
channel_counter[0] = np.min(channel_counter)
- good_indices = find_good_indices(channel_counter, channel_counter.size // 2)
+ good_indices = find_good_indices(
+ channel_counter, channel_counter.size // 2)
info_bit_positions = np.where(good_indices > 0)
print(info_bit_positions)
frozen_bit_positions = np.delete(
@@ -303,7 +305,8 @@ def find_decoder_subframes(frozen_mask):
print("--------------------------")
ll = l
sub_t = sub_mask[i]
- print("{0:4} lock {1:4} value: {2} in sub {3}".format(i, 2 ** (l + 1), v, t))
+ print("{0:4} lock {1:4} value: {2} in sub {3}".format(
+ i, 2 ** (l + 1), v, t))
def systematic_encoder_decoder_chain_test():