diff options
author | Johannes Demel <ufcsy@student.kit.edu> | 2015-08-12 16:51:02 +0200 |
---|---|---|
committer | Johannes Demel <ufcsy@student.kit.edu> | 2015-09-21 10:46:59 +0200 |
commit | d9719d7da8300c8546b305dab2eff763f47d216f (patch) | |
tree | ca8062d252e6e9f2e389fa85f045a92b5218bdee /gr-fec/python | |
parent | 73d84a231c31bf8312214b5e3a0a97e15c8db98f (diff) |
polar: refarctoring and clean-up
Diffstat (limited to 'gr-fec/python')
-rw-r--r-- | gr-fec/python/fec/polar/README.md | 7 | ||||
-rw-r--r-- | gr-fec/python/fec/polar/channel_construction.py | 200 | ||||
-rw-r--r-- | gr-fec/python/fec/polar/channel_construction_bec.py | 9 | ||||
-rwxr-xr-x | gr-fec/python/fec/polar/channel_construction_bsc.py | 67 | ||||
-rw-r--r-- | gr-fec/python/fec/polar/decoder.py | 26 | ||||
-rw-r--r-- | gr-fec/python/fec/polar/encoder.py | 35 | ||||
-rw-r--r-- | gr-fec/python/fec/polar/helper_functions.py | 96 | ||||
-rwxr-xr-x | gr-fec/python/fec/polar/testbed.py | 208 | ||||
-rw-r--r-- | gr-fec/python/fec/qa_polar_decoder_sc.py | 31 | ||||
-rw-r--r-- | gr-fec/python/fec/qa_polar_decoder_sc_list.py | 46 | ||||
-rw-r--r-- | gr-fec/python/fec/qa_polar_encoder.py | 8 |
11 files changed, 292 insertions, 441 deletions
diff --git a/gr-fec/python/fec/polar/README.md b/gr-fec/python/fec/polar/README.md index 2bd00dc3de..d425e8650d 100644 --- a/gr-fec/python/fec/polar/README.md +++ b/gr-fec/python/fec/polar/README.md @@ -1,4 +1,9 @@ POLAR Code Python test functions module =========== -This folder contains all the necessary files for POLAR code testcode. It shall serve as a reference later on.
\ No newline at end of file +This directory contains all the necessary files for POLAR code testcode. +It serves as a reference for C++ implementations. + +'polar_channel_construction' exposes functionality to calculate polar channels for different sizes. +It may be used to calculate Bhattacharyya parameters once and store them in a file in '~/.gnuradio/polar'. +Frozen bit positions are recalculated on every run.
\ No newline at end of file diff --git a/gr-fec/python/fec/polar/channel_construction.py b/gr-fec/python/fec/polar/channel_construction.py index 9c38d3a7e6..bf3ff925d8 100644 --- a/gr-fec/python/fec/polar/channel_construction.py +++ b/gr-fec/python/fec/polar/channel_construction.py @@ -24,9 +24,9 @@ foundational paper for polar codes. ''' -import numpy as np from channel_construction_bec import calculate_bec_channel_capacities from channel_construction_bec import design_snr_to_bec_eta +from channel_construction_bec import bhattacharyya_bounds from channel_construction_bsc import tal_vardy_tpm_algorithm from helper_functions import * import matplotlib.pyplot as plt @@ -59,6 +59,12 @@ def get_bec_frozen_indices(nblock, kfrozen, eta): return positions +def get_frozen_bit_mask(frozen_indices, block_size): + frozen_mask = np.zeros(block_size, dtype=int) + frozen_mask[frozen_indices] = 1 + return frozen_mask + + def frozen_bit_positions(block_size, info_size, design_snr=0.0): if not design_snr > -1.5917: print('bad value for design_nsr, must be > -1.5917! default=0.0') @@ -110,201 +116,21 @@ def load_z_parameters(block_size, design_snr, mu): return z_params -def prepare_merger(frozen_mask): - mask = [] - for e in frozen_mask: - mask.append([e, ]) - return np.array(mask, dtype=int) - - -def merge_first_stage(init_mask): - merged_frozen_mask = [] - for e in range(0, len(init_mask), 2): - v = [init_mask[e]['value'][0], init_mask[e + 1]['value'][0]] - s = init_mask[e]['size'] * 2 - if init_mask[e]['type'] == init_mask[e + 1]['type']: - t = init_mask[e]['type'] - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - else: - t = 'RPT' - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - return merged_frozen_mask - - -def merge_second_stage(init_mask): - merged_frozen_mask = [] - for e in range(0, len(init_mask), 2): - if init_mask[e]['type'] == init_mask[e + 1]['type']: - t = init_mask[e]['type'] - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT': - t = init_mask[e + 1]['type'] - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - elif init_mask[e]['type'] == 'RPT' and init_mask[e + 1]['type'] == 'ONE': - t = 'SPC' - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - else: - merged_frozen_mask.append(init_mask[e]) - merged_frozen_mask.append(init_mask[e + 1]) - return merged_frozen_mask - - -def merge_stage_n(init_mask): - merged_frozen_mask = [] - n_elems = len(init_mask) - (len(init_mask) % 2) - for e in range(0, n_elems, 2): - if init_mask[e]['size'] == init_mask[e + 1]['size']: - if (init_mask[e]['type'] == 'ZERO' or init_mask[e]['type'] == 'ONE') and init_mask[e]['type'] == init_mask[e + 1]['type']: - t = init_mask[e]['type'] - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT': - t = init_mask[e + 1]['type'] - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - elif init_mask[e]['type'] == 'SPC' and init_mask[e + 1]['type'] == 'ONE': - t = init_mask[e]['type'] - v = init_mask[e]['value'] - v.extend(init_mask[e + 1]['value']) - s = init_mask[e]['size'] * 2 - merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) - else: - merged_frozen_mask.append(init_mask[e]) - merged_frozen_mask.append(init_mask[e + 1]) - else: - merged_frozen_mask.append(init_mask[e]) - merged_frozen_mask.append(init_mask[e + 1]) - if n_elems < len(init_mask): - merged_frozen_mask.append(init_mask[-1]) - return merged_frozen_mask - - -def print_decode_subframes(subframes): - for e in subframes: - print(e) - - -def find_decoder_subframes(frozen_mask): - stages = power_of_2_int(len(frozen_mask)) - frame_size = 2 ** stages - - lock_mask = np.zeros(frame_size, dtype=int) - sub_mask = [] - - for e in frozen_mask: - if e == 1: - sub_mask.append(0) - else: - sub_mask.append(1) - sub_mask = np.array(sub_mask, dtype=int) - - for s in range(0, stages): - stage_size = 2 ** s - mask = np.reshape(sub_mask, (-1, stage_size)) - lock = np.reshape(lock_mask, (-1, stage_size)) - for p in range(0, (frame_size // stage_size) - 1, 2): - l0 = lock[p] - l1 = lock[p + 1] - first = mask[p] - second = mask[p + 1] - print(l0, l1) - print(first, second) - if np.all(l0 == l1): - for eq in range(2): - if np.all(first == eq) and np.all(second == eq): - mask[p].fill(eq) - mask[p + 1].fill(eq) - lock[p].fill(s) - lock[p + 1].fill(s) - - if np.all(first == 0) and np.all(second == 2): - mask[p].fill(2) - mask[p + 1].fill(2) - lock[p].fill(s) - lock[p + 1].fill(s) - - if np.all(first == 3) and np.all(second == 1): - mask[p].fill(3) - mask[p + 1].fill(3) - lock[p].fill(s) - lock[p + 1].fill(s) - - if s == 0 and np.all(first == 0) and np.all(second == 1): - mask[p].fill(2) - mask[p + 1].fill(2) - lock[p].fill(s) - lock[p + 1].fill(s) - - if s == 1 and np.all(first == 2) and np.all(second == 1): - mask[p].fill(3) - mask[p + 1].fill(3) - lock[p].fill(s) - lock[p + 1].fill(s) - - sub_mask = mask.flatten() - lock_mask = lock.flatten() - - words = {0: 'ZERO', 1: 'ONE', 2: 'RPT', 3: 'SPC'} - ll = lock_mask[0] - sub_t = sub_mask[0] - for i in range(len(frozen_mask)): - v = frozen_mask[i] - t = words[sub_mask[i]] - l = lock_mask[i] - # if i % 8 == 0: - # print - if not l == ll or not sub_mask[i] == sub_t: - print('--------------------------') - ll = l - sub_t = sub_mask[i] - print('{0:4} lock {1:4} value: {2} in sub {3}'.format(i, 2 ** (l + 1), v, t)) - - - def main(): np.set_printoptions(precision=3, linewidth=150) print 'channel construction Bhattacharyya bounds by Arikan' n = 10 m = 2 ** n k = m // 2 - design_snr = -1.59 + design_snr = 0.0 mu = 32 z_params = load_z_parameters(m, design_snr, mu) - # plt.plot(z_params) - # plt.show() - frozen_indices = get_frozen_bit_indices_from_z_parameters(z_params, k) - - frozen_mask = np.zeros(m, dtype=int) - frozen_mask[frozen_indices] = 1 - # frozen_mask = np.reshape(frozen_mask, (-1, 32)) - # for p in frozen_mask: - # print(p) - # if np.all(p == 1): - # print("zero rate") - # elif np.all(p == 0): - # print("ONE rate") - # elif p[0] == 1 and np.all(p[1:] == 0): - # print("SPC code") - # elif np.all(p[0:-1] == 1) and p[-1] == 0: - # print("REPETITION code") - - find_decoder_subframes(frozen_mask) - - + z_bounds = bhattacharyya_bounds(design_snr, m) + print(z_params[-10:]) + plt.plot(z_params) + plt.plot(z_bounds) + plt.show() if __name__ == '__main__': diff --git a/gr-fec/python/fec/polar/channel_construction_bec.py b/gr-fec/python/fec/polar/channel_construction_bec.py index 341b290057..c57ca6517b 100644 --- a/gr-fec/python/fec/polar/channel_construction_bec.py +++ b/gr-fec/python/fec/polar/channel_construction_bec.py @@ -29,10 +29,10 @@ def bec_channel(eta): W(y|0) * W(y|1) = 0 or W(y|0) = W(y|1) transistions are 1 -> 1 or 0 -> 0 or {0, 1} -> ? (erased symbol) ''' - # looks like BSC but should be interpreted differently. - W = np.array((1 - eta, eta, 1 - eta), dtype=float) - return W + w = np.array((1 - eta, eta, 1 - eta), dtype=float) + return w + def odd_rec(iwn): return iwn ** 2 @@ -73,7 +73,7 @@ def calculate_z_parameters_one_recursion(z_params): def calculate_bec_channel_z_parameters(eta, block_size): # compare [0, Arikan] eq. 38 block_power = hf.power_of_2_int(block_size) - z_params = np.array([eta,], dtype=float) + z_params = np.array([eta, ], dtype=float) for block_size in range(block_power): z_params = calculate_z_parameters_one_recursion(z_params) return z_params @@ -110,5 +110,6 @@ def main(): print(calculate_bec_channel_z_parameters(eta, block_size)) print(calculate_bec_channel_capacities(eta, block_size)) + if __name__ == '__main__': main() diff --git a/gr-fec/python/fec/polar/channel_construction_bsc.py b/gr-fec/python/fec/polar/channel_construction_bsc.py index 69acea861d..e16813fcb7 100755 --- a/gr-fec/python/fec/polar/channel_construction_bsc.py +++ b/gr-fec/python/fec/polar/channel_construction_bsc.py @@ -53,56 +53,6 @@ def bsc_channel(p): return W -def get_Bn(n): - # this is a bit reversal matrix. - lw = int(np.log2(n)) # number of used bits - indexes = [bit_reverse(i, lw) for i in range(n)] - Bn = np.zeros((n, n), type(n)) - for i, index in enumerate(indexes): - Bn[i][index] = 1 - return Bn - - -def get_Fn(n): - # this matrix defines the actual channel combining. - if n == 1: - return np.array([1, ]) - F2 = np.array([[1, 0], [1, 1]], np.int) - nump = int(np.log2(n)) - 1 # number of Kronecker products to calculate - Fn = F2 - for i in range(nump): - Fn = np.kron(Fn, F2) - return Fn - -def get_Gn(n): - # this matrix is called generator matrix - if not is_power_of_two(n): - print "invalid input" - return None - if n == 1: - return np.array([1, ]) - Bn = get_Bn(n) - Fn = get_Fn(n) - Gn = np.dot(Bn, Fn) - return Gn - - -def mutual_information(w): - ''' - calculate mutual information I(W) - I(W) = sum over y e Y ( sum over x e X ( ... ) ) - .5 W(y|x) log frac { W(y|x) }{ .5 W(y|0) + .5 W(y|1) } - ''' - ydim, xdim = np.shape(w) - i = 0.0 - for y in range(ydim): - for x in range(xdim): - v = w[y][x] * np.log2(w[y][x] / (0.5 * w[y][0] + 0.5 * w[y][1])) - i += v - i /= 2.0 - return i - - def solver_equation(val, s): cw_lambda = codeword_lambda_callable(s) ic_lambda = instantanious_capacity_callable() @@ -315,27 +265,16 @@ def normalize_q(q, tpm): def main(): print 'channel construction BSC main' - n = 10 + n = 8 m = 2 ** n - k = m // 2 - design_snr = 0.5 - mu = 32 - + design_snr = 0.0 + mu = 16 z_params = tal_vardy_tpm_algorithm(m, design_snr, mu) print(z_params) plt.plot(z_params) plt.show() - # q = discretize_awgn(mu, design_snr) - - - # print('discretized:', np.sum(q)) - # qu = upper_convolve(q, mu) - # print('upper_convolve:', np.sum(qu)) - # q0 = lower_convolve(q, mu) - # print('lower_convolve:', np.sum(q0)) - if __name__ == '__main__': main() diff --git a/gr-fec/python/fec/polar/decoder.py b/gr-fec/python/fec/polar/decoder.py index ef7d70081f..10eef9b6ed 100644 --- a/gr-fec/python/fec/polar/decoder.py +++ b/gr-fec/python/fec/polar/decoder.py @@ -224,13 +224,9 @@ def compare_decoder_impls(): n = 8 k = 4 frozenbits = np.zeros(n - k) - # frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int) + # frozenbitposition16 = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int) frozenbitposition = np.array((0, 1, 2, 4), dtype=int) - # bits = np.ones(k, dtype=int) bits = np.random.randint(2, size=k) - # bits = np.array([0, 1, 1, 1]) - # bits = np.array([0, 1, 1, 0]) - # bits = np.array([1, 0, 1, 0]) print 'bits:', bits encoder = PolarEncoder(n, k, frozenbitposition, frozenbits) decoder = PolarDecoder(n, k, frozenbitposition, frozenbits) @@ -243,7 +239,6 @@ def compare_decoder_impls(): print (rx_st == rx_eff).all() - def main(): power = 3 n = 2 ** power @@ -257,33 +252,18 @@ def main(): decoder = PolarDecoder(n, k, frozenbitposition, frozenbits) bits = np.ones(k, dtype=int) - # bits = np.array([1, 0, 1, 0], dtype=int) print "bits: ", bits evec = encoder.encode(bits) print "froz: ", encoder._insert_frozen_bits(bits) print "evec: ", evec - # dvec = decoder.decode(evec) - # print "dec: ", dvec - # llr = decoder._llr(4, evec, np.array([0, 0, 0])) - # print "llr=", llr evec[1] = 0 deced = decoder._lr_sc_decoder(evec) print 'SC decoded:', deced - - - - # test_reverse_enc_dec() + test_reverse_enc_dec() compare_decoder_impls() - # graph_decode() - - - - - - if __name__ == '__main__': - main()
\ No newline at end of file + main() diff --git a/gr-fec/python/fec/polar/encoder.py b/gr-fec/python/fec/polar/encoder.py index 6f87a22191..3b5eea2a94 100644 --- a/gr-fec/python/fec/polar/encoder.py +++ b/gr-fec/python/fec/polar/encoder.py @@ -20,30 +20,13 @@ import numpy as np from common import PolarCommon +import helper_functions as hf class PolarEncoder(PolarCommon): def __init__(self, n, k, frozen_bit_position, frozenbits=None): PolarCommon.__init__(self, n, k, frozen_bit_position, frozenbits) - self.G = self._gn(n) - - def _gn(self, n): - # this matrix is called generator matrix - if n == 1: - return np.array([1, ]) - f = self._fn(n) - return f - - def _fn(self, n): - # this matrix defines the actual channel combining. - if n == 1: - return np.array([1, ]) - f2 = np.array([[1, 0], [1, 1]], np.int) - nump = int(np.log2(n)) - 1 # number of Kronecker products to calculate - fn = f2 - for i in range(nump): - fn = np.kron(fn, f2) - return fn + self.G = hf.get_Fn(n) def get_gn(self): return self.G @@ -59,9 +42,9 @@ class PolarEncoder(PolarCommon): return data def _encode_efficient(self, vec): - nstages = int(np.log2(self.N)) + n_stages = int(np.log2(self.N)) pos = np.arange(self.N, dtype=int) - for i in range(nstages): + for i in range(n_stages): splitted = np.reshape(pos, (2 ** (i + 1), -1)) upper_branch = splitted[0::2].flatten() lower_branch = splitted[1::2].flatten() @@ -108,7 +91,7 @@ def test_pseudo_rate_1_encoder(encoder, ntests, k): def test_encoder_impls(): - print('comparing encoder implementations, matrix vs. efficient') + print('Compare encoder implementations, matrix vs. efficient') ntests = 1000 n = 16 k = 8 @@ -120,16 +103,12 @@ def test_encoder_impls(): print('Test rate-1 encoder/decoder chain results') r1_test = test_pseudo_rate_1_encoder(encoder, ntests, k) - print 'test rate-1 encoder/decoder:', r1_test - + print 'Test rate-1 encoder/decoder:', r1_test def main(): - print "main in encoder" test_encoder_impls() - - if __name__ == '__main__': - main()
\ No newline at end of file + main() diff --git a/gr-fec/python/fec/polar/helper_functions.py b/gr-fec/python/fec/polar/helper_functions.py index 72501beae3..ca66bf4a50 100644 --- a/gr-fec/python/fec/polar/helper_functions.py +++ b/gr-fec/python/fec/polar/helper_functions.py @@ -56,6 +56,41 @@ def bit_reverse_vector(vec, n): return np.array([bit_reverse(e, n) for e in vec], dtype=vec.dtype) +def get_Bn(n): + # this is a bit reversal matrix. + lw = power_of_2_int(n) # number of used bits + indexes = [bit_reverse(i, lw) for i in range(n)] + Bn = np.zeros((n, n), type(n)) + for i, index in enumerate(indexes): + Bn[i][index] = 1 + return Bn + + +def get_Fn(n): + # this matrix defines the actual channel combining. + if n == 1: + return np.array([1, ]) + nump = power_of_2_int(n) - 1 # number of Kronecker products to calculate + F2 = np.array([[1, 0], [1, 1]], np.int) + Fn = F2 + for i in range(nump): + Fn = np.kron(Fn, F2) + return Fn + + +def get_Gn(n): + # this matrix is called generator matrix + if not is_power_of_two(n): + print "invalid input" + return None + if n == 1: + return np.array([1, ]) + Bn = get_Bn(n) + Fn = get_Fn(n) + Gn = np.dot(Bn, Fn) + return Gn + + def unpack_byte(byte, nactive): if np.amin(byte) < 0 or np.amax(byte) > 255: return None @@ -118,74 +153,17 @@ def bhattacharyya_parameter(w): def main(): print 'helper functions' - for i in range(8): + for i in range(9): print(i, 'is power of 2: ', is_power_of_two(i)) n = 6 m = 2 ** n - k = m // 2 - eta = 0.3 + pos = np.arange(m) rev_pos = bit_reverse_vector(pos, n) print(pos) print(rev_pos) - bound = 16 - num_lanes = m // bound - - - lanes = np.zeros((num_lanes, bound), dtype=int) - for i in range(0, num_lanes): - p = i * bound - part = rev_pos[p: p + bound] - lanes[i] = part - - print('reved lanes') - print(lanes) - - # SHUFFLE! - shuffle_pos = bit_reverse_vector(np.arange(bound), 4) - for i in range(num_lanes): - lane = lanes[i] - lanes[i] = lanes[i, shuffle_pos] - print('\nshuffled lanes') - print(lanes) - - # SORT HALVES - hb = bound // 2 - for i in range(num_lanes // 2): - l0 = lanes[i] - l1 = lanes[i + (num_lanes // 2)] - l0p = copy.deepcopy(l0[hb:]) - l0[hb:] = l1[0:hb] - l1[0:hb] = l0p - lanes[i] =l0 - lanes[i + (num_lanes // 2)] = l1 - print('\nsort halves') - print(lanes) - - # 'MELT' SHUFFLE INTERLEAVE! - melt_pos = np.arange(bound, dtype=int) - melt_pos = np.reshape(melt_pos, (2, -1)).T.flatten() - for i in range(num_lanes): - lanes[i] = lanes[i, melt_pos] - print('\nmelt lanes') - print(lanes) - - - - for i in range(0, m, bound): - print("\nlook at this part") - part = pos[i: i + bound] - rev = bit_reverse_vector(part, n) - sorted_rev = np.sort(rev) - print(part) - print(rev) - print(sorted_rev) - sorted_part = rev[shuffle_pos] - print(sorted_part) - - if __name__ == '__main__': main() diff --git a/gr-fec/python/fec/polar/testbed.py b/gr-fec/python/fec/polar/testbed.py index c35b62099c..d60c83e776 100755 --- a/gr-fec/python/fec/polar/testbed.py +++ b/gr-fec/python/fec/polar/testbed.py @@ -18,9 +18,11 @@ # Boston, MA 02110-1301, USA. # -import numpy as np + from encoder import PolarEncoder from decoder import PolarDecoder +import channel_construction as cc +from helper_functions import * import matplotlib.pyplot as plt @@ -28,7 +30,9 @@ import matplotlib.pyplot as plt def get_frozen_bit_position(): # frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 16, 17, 18, 20, 24), dtype=int) # frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int) - frozenbitposition = np.load('frozen_bit_positions_n256_k128_p0.11.npy').flatten() + m = 256 + n_frozen = m // 2 + frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters(cc.bhattacharyya_bounds(0.0, m), n_frozen) print(frozenbitposition) return frozenbitposition @@ -140,12 +144,185 @@ def channel_analysis(): good_indices *= 2000 good_indices += 4000 - plt.plot(channel_counter) plt.plot(good_indices) plt.show() + +def merge_first_stage(init_mask): + merged_frozen_mask = [] + for e in range(0, len(init_mask), 2): + v = [init_mask[e]['value'][0], init_mask[e + 1]['value'][0]] + s = init_mask[e]['size'] * 2 + if init_mask[e]['type'] == init_mask[e + 1]['type']: + t = init_mask[e]['type'] + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + else: + t = 'RPT' + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + return merged_frozen_mask + + +def merge_second_stage(init_mask): + merged_frozen_mask = [] + for e in range(0, len(init_mask), 2): + if init_mask[e]['type'] == init_mask[e + 1]['type']: + t = init_mask[e]['type'] + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT': + t = init_mask[e + 1]['type'] + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + elif init_mask[e]['type'] == 'RPT' and init_mask[e + 1]['type'] == 'ONE': + t = 'SPC' + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + else: + merged_frozen_mask.append(init_mask[e]) + merged_frozen_mask.append(init_mask[e + 1]) + return merged_frozen_mask + + +def merge_stage_n(init_mask): + merged_frozen_mask = [] + n_elems = len(init_mask) - (len(init_mask) % 2) + for e in range(0, n_elems, 2): + if init_mask[e]['size'] == init_mask[e + 1]['size']: + if (init_mask[e]['type'] == 'ZERO' or init_mask[e]['type'] == 'ONE') and init_mask[e]['type'] == init_mask[e + 1]['type']: + t = init_mask[e]['type'] + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT': + t = init_mask[e + 1]['type'] + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + elif init_mask[e]['type'] == 'SPC' and init_mask[e + 1]['type'] == 'ONE': + t = init_mask[e]['type'] + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + else: + merged_frozen_mask.append(init_mask[e]) + merged_frozen_mask.append(init_mask[e + 1]) + else: + merged_frozen_mask.append(init_mask[e]) + merged_frozen_mask.append(init_mask[e + 1]) + if n_elems < len(init_mask): + merged_frozen_mask.append(init_mask[-1]) + return merged_frozen_mask + + +def print_decode_subframes(subframes): + for e in subframes: + print(e) + + +def find_decoder_subframes(frozen_mask): + stages = power_of_2_int(len(frozen_mask)) + block_size = 2 ** stages + + lock_mask = np.zeros(block_size, dtype=int) + sub_mask = [] + + for e in frozen_mask: + if e == 1: + sub_mask.append(0) + else: + sub_mask.append(1) + sub_mask = np.array(sub_mask, dtype=int) + + for s in range(0, stages): + stage_size = 2 ** s + mask = np.reshape(sub_mask, (-1, stage_size)) + lock = np.reshape(lock_mask, (-1, stage_size)) + for p in range(0, (block_size // stage_size) - 1, 2): + l0 = lock[p] + l1 = lock[p + 1] + first = mask[p] + second = mask[p + 1] + print(l0, l1) + print(first, second) + if np.all(l0 == l1): + for eq in range(2): + if np.all(first == eq) and np.all(second == eq): + mask[p].fill(eq) + mask[p + 1].fill(eq) + lock[p].fill(s) + lock[p + 1].fill(s) + + if np.all(first == 0) and np.all(second == 2): + mask[p].fill(2) + mask[p + 1].fill(2) + lock[p].fill(s) + lock[p + 1].fill(s) + + if np.all(first == 3) and np.all(second == 1): + mask[p].fill(3) + mask[p + 1].fill(3) + lock[p].fill(s) + lock[p + 1].fill(s) + + if s == 0 and np.all(first == 0) and np.all(second == 1): + mask[p].fill(2) + mask[p + 1].fill(2) + lock[p].fill(s) + lock[p + 1].fill(s) + + if s == 1 and np.all(first == 2) and np.all(second == 1): + mask[p].fill(3) + mask[p + 1].fill(3) + lock[p].fill(s) + lock[p + 1].fill(s) + + sub_mask = mask.flatten() + lock_mask = lock.flatten() + + words = {0: 'ZERO', 1: 'ONE', 2: 'RPT', 3: 'SPC'} + ll = lock_mask[0] + sub_t = sub_mask[0] + for i in range(len(frozen_mask)): + v = frozen_mask[i] + t = words[sub_mask[i]] + l = lock_mask[i] + # if i % 8 == 0: + # print + if not l == ll or not sub_mask[i] == sub_t: + print('--------------------------') + ll = l + sub_t = sub_mask[i] + print('{0:4} lock {1:4} value: {2} in sub {3}'.format(i, 2 ** (l + 1), v, t)) + + +def load_file(filename): + z_params = [] + with open(filename, 'r') as f: + for line in f: + if 'Bhattacharyya:' in line: + l = line.split(' ') + l = l[10:-2] + l = l[0][:-1] + l = float(l) + z_params.append(l) + return np.array(z_params) + + def main(): + n = 8 + m = 2 ** n + k = m // 2 + n_frozen = n - k # n = 16 # k = 8 # frozenbits = np.zeros(n - k) @@ -153,12 +330,31 @@ def main(): # frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int) # print frozenbitposition - test_enc_dec_chain() - + # test_enc_dec_chain() # test_1024_rate_1_code() - # channel_analysis() + frozen_indices = cc.get_bec_frozen_indices(m, n_frozen, 0.11) + frozen_mask = cc.get_frozen_bit_mask(frozen_indices, m) + find_decoder_subframes(frozen_mask) + + frozen_mask = np.zeros(m, dtype=int) + frozen_mask[frozen_indices] = 1 + + # filename = 'channel_z-parameters.txt' + # ido = load_file(filename) + # ido_frozen = cc.get_frozen_bit_indices_from_z_parameters(ido, k) + # ido_mask = np.zeros(m, dtype=int) + # ido_mask[ido_frozen] = 1 + # + # + # plt.plot(ido_mask) + # plt.plot(frozen_mask) + # for i in range(m): + # if not ido_mask[i] == frozen_mask[i]: + # plt.axvline(i, color='r') + # plt.show() + if __name__ == '__main__': main()
\ No newline at end of file diff --git a/gr-fec/python/fec/qa_polar_decoder_sc.py b/gr-fec/python/fec/qa_polar_decoder_sc.py index 1e7cd25e26..030142d6a6 100644 --- a/gr-fec/python/fec/qa_polar_decoder_sc.py +++ b/gr-fec/python/fec/qa_polar_decoder_sc.py @@ -19,24 +19,20 @@ # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. # -from Crypto.Cipher._AES import block_size from gnuradio import gr, gr_unittest, blocks import fec_swig as fec -from _qa_helper import _qa_helper -import numpy as np -import os -from extended_encoder import extended_encoder +import numpy as np from extended_decoder import extended_decoder from polar.encoder import PolarEncoder -from polar.decoder import PolarDecoder import polar.channel_construction as cc -from polar.helper_functions import bit_reverse_vector +# import os # print('PID:', os.getpid()) # raw_input('tell me smth') + class test_polar_decoder_sc(gr_unittest.TestCase): def setUp(self): @@ -46,13 +42,12 @@ class test_polar_decoder_sc(gr_unittest.TestCase): self.tb = None def test_001_setup(self): - is_packed = False block_size = 16 num_info_bits = 8 frozen_bit_positions = np.arange(block_size - num_info_bits) frozen_bit_values = np.array([],) - polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed) + polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) self.assertEqual(num_info_bits, polar_decoder.get_output_size()) self.assertEqual(block_size, polar_decoder.get_input_size()) @@ -60,8 +55,6 @@ class test_polar_decoder_sc(gr_unittest.TestCase): self.assertFalse(polar_decoder.set_frame_size(10)) def test_002_one_vector(self): - print "test_002_one_vector" - is_packed = False block_power = 10 block_size = 2 ** block_power num_info_bits = 2 ** (block_power - 1) @@ -71,7 +64,7 @@ class test_polar_decoder_sc(gr_unittest.TestCase): bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, 1, True) - polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed) + polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) src = blocks.vector_source_f(gr_data, False) dec_block = extended_decoder(polar_decoder, None) snk = blocks.vector_sink_b(1) @@ -81,17 +74,10 @@ class test_polar_decoder_sc(gr_unittest.TestCase): self.tb.run() res = np.array(snk.data()).astype(dtype=int) - - print("input:", gr_data.astype(dtype=int)) - print("ref :", bits) - print("res :", res) - self.assertTupleEqual(tuple(res), tuple(bits)) def test_003_stream(self): - print "test_003_stream" nframes = 3 - is_packed = False block_power = 8 block_size = 2 ** block_power num_info_bits = 2 ** (block_power - 1) @@ -101,7 +87,7 @@ class test_polar_decoder_sc(gr_unittest.TestCase): bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, nframes, False) - polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed) + polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) src = blocks.vector_source_f(gr_data, False) dec_block = extended_decoder(polar_decoder, None) snk = blocks.vector_sink_b(1) @@ -111,11 +97,6 @@ class test_polar_decoder_sc(gr_unittest.TestCase): self.tb.run() res = np.array(snk.data()).astype(dtype=int) - - print("input:", gr_data.astype(dtype=int)) - print("ref :", bits) - print("res :", res) - self.assertTupleEqual(tuple(res), tuple(bits)) def generate_test_data(self, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, nframes, onlyones): diff --git a/gr-fec/python/fec/qa_polar_decoder_sc_list.py b/gr-fec/python/fec/qa_polar_decoder_sc_list.py index 3aefd0f478..6b1fe3d431 100644 --- a/gr-fec/python/fec/qa_polar_decoder_sc_list.py +++ b/gr-fec/python/fec/qa_polar_decoder_sc_list.py @@ -23,15 +23,12 @@ from gnuradio import gr, gr_unittest, blocks import fec_swig as fec import numpy as np -import os -from extended_encoder import extended_encoder from extended_decoder import extended_decoder from polar.encoder import PolarEncoder -from polar.decoder import PolarDecoder import polar.channel_construction as cc - +# import os # print('PID:', os.getpid()) # raw_input('tell me smth') @@ -45,14 +42,13 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase): self.tb = None def test_001_setup(self): - is_packed = False block_size = 16 num_info_bits = 8 max_list_size = 4 frozen_bit_positions = np.arange(block_size - num_info_bits) frozen_bit_values = np.array([],) - polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed) + polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) self.assertEqual(num_info_bits, polar_decoder.get_output_size()) self.assertEqual(block_size, polar_decoder.get_input_size()) @@ -61,7 +57,6 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase): def test_002_one_vector(self): print "test_002_one_vector" - is_packed = False expo = 6 block_size = 2 ** expo num_info_bits = 2 ** (expo - 1) @@ -69,19 +64,13 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase): num_frozen_bits = block_size - num_info_bits frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) frozen_bit_values = np.array([0] * num_frozen_bits,) - print(frozen_bit_positions) - - python_decoder = PolarDecoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) - # data = np.ones(block_size, dtype=int) bits = np.random.randint(2, size=num_info_bits) - # bits = np.ones(num_info_bits, dtype=int) encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) data = encoder.encode(bits) - # data = np.array([0, 1, 1, 0, 1, 0, 1, 0], dtype=int) gr_data = 2.0 * data - 1.0 - polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed) + polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) src = blocks.vector_source_f(gr_data, False) dec_block = extended_decoder(polar_decoder, None) snk = blocks.vector_sink_b(1) @@ -92,19 +81,16 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase): res = np.array(snk.data()).astype(dtype=int) - ref = python_decoder.decode(data) + print("\ninput -> result -> bits") + print(data) + print(res) + print(bits) - print("input:", data) - print("res :", res) - print("ref :", ref) - print("bits :", bits) - - self.assertTupleEqual(tuple(res), tuple(ref)) + self.assertTupleEqual(tuple(res), tuple(bits)) def test_003_stream(self): print "test_003_stream" nframes = 5 - is_packed = False expo = 8 block_size = 2 ** expo num_info_bits = 2 ** (expo - 1) @@ -112,11 +98,9 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase): num_frozen_bits = block_size - num_info_bits frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) frozen_bit_values = np.array([0] * num_frozen_bits,) - print(frozen_bit_positions) encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) - # data = np.ones(block_size, dtype=int) ref = np.array([], dtype=int) data = np.array([], dtype=int) for i in range(nframes): @@ -124,13 +108,9 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase): d = encoder.encode(b) data = np.append(data, d) ref = np.append(ref, b) - - # bits = np.ones(num_info_bits, dtype=int) - # data = encoder.encode(bits) - # data = np.array([0, 1, 1, 0, 1, 0, 1, 0], dtype=int) gr_data = 2.0 * data - 1.0 - polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed) + polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) src = blocks.vector_source_f(gr_data, False) dec_block = extended_decoder(polar_decoder, None) snk = blocks.vector_sink_b(1) @@ -140,17 +120,9 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase): self.tb.run() res = np.array(snk.data()).astype(dtype=int) - - - print("input:", data) - print("res :", res) - print("ref :", ref) - self.assertTupleEqual(tuple(res), tuple(ref)) - - if __name__ == '__main__': gr_unittest.run(test_polar_decoder_sc_list) diff --git a/gr-fec/python/fec/qa_polar_encoder.py b/gr-fec/python/fec/qa_polar_encoder.py index 90190cd719..22d9b11fae 100644 --- a/gr-fec/python/fec/qa_polar_encoder.py +++ b/gr-fec/python/fec/qa_polar_encoder.py @@ -101,12 +101,10 @@ class test_polar_encoder(gr_unittest.TestCase): self.tb.run() res = np.array(snk.data()).astype(dtype=int) - - print(res) - print(ref) self.assertTupleEqual(tuple(res), tuple(ref)) def get_test_data(self, block_size, num_info_bits, num_blocks, is_packed): + # helper function to set up test data and together with encoder object. num_frozen_bits = block_size - num_info_bits frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) frozen_bit_values = np.array([0] * num_frozen_bits,) @@ -122,10 +120,6 @@ class test_polar_encoder(gr_unittest.TestCase): return data, ref, polar_encoder - - - - if __name__ == '__main__': gr_unittest.run(test_polar_encoder) |