path: root/gr-fec/python/fec/polar/
diff options
Diffstat (limited to 'gr-fec/python/fec/polar/')
1 files changed, 360 insertions, 0 deletions
diff --git a/gr-fec/python/fec/polar/ b/gr-fec/python/fec/polar/
new file mode 100755
index 0000000000..d60c83e776
--- /dev/null
+++ b/gr-fec/python/fec/polar/
@@ -0,0 +1,360 @@
+#!/usr/bin/env python
+# Copyright 2015 Free Software Foundation, Inc.
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+from encoder import PolarEncoder
+from decoder import PolarDecoder
+import channel_construction as cc
+from helper_functions import *
+import matplotlib.pyplot as plt
+def get_frozen_bit_position():
+ # frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 16, 17, 18, 20, 24), dtype=int)
+ # frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
+ m = 256
+ n_frozen = m // 2
+ frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters(cc.bhattacharyya_bounds(0.0, m), n_frozen)
+ print(frozenbitposition)
+ return frozenbitposition
+def test_enc_dec_chain():
+ ntests = 100
+ n = 256
+ k = n // 2
+ frozenbits = np.zeros(n - k)
+ frozenbitposition = get_frozen_bit_position()
+ for i in range(ntests):
+ bits = np.random.randint(2, size=k)
+ encoder = PolarEncoder(n, k, frozenbitposition, frozenbits)
+ decoder = PolarDecoder(n, k, frozenbitposition, frozenbits)
+ encoded = encoder.encode(bits)
+ rx = decoder.decode(encoded)
+ if not is_equal(bits, rx):
+ raise ValueError('Test #', i, 'failed, input and output differ', bits, '!=', rx)
+ return
+def is_equal(first, second):
+ if not (first == second).all():
+ result = first == second
+ for i in range(len(result)):
+ print '{0:4}: {1:2} == {2:1} = {3}'.format(i, first[i], second[i], result[i])
+ return False
+ return True
+def exact_value(la, lb):
+ return np.log((np.exp(la + lb) + 1) / (np.exp(la) + np.exp(lb)))
+def approx_value(la, lb):
+ return np.sign(la) * np.sign(lb) * np.minimum(np.abs(la), np.abs(lb))
+def path_metric_exact(last_pm, llr, ui):
+ return last_pm + np.log(1 + np.exp(-1. * llr * (1 - 2 * ui)))
+def path_metric_approx(last_pm, llr, ui):
+ if ui == int(.5 * (1 - np.sign(llr))):
+ return last_pm
+ return last_pm + np.abs(llr)
+def calculate_path_metric_vector(metric, llrs, us):
+ res = np.zeros(llrs.size)
+ res[0] = metric(0, llrs[0], us[0])
+ for i in range(1, llrs.size):
+ res[i] = metric(res[i - 1], llrs[i], us[i])
+ return res
+def test_1024_rate_1_code():
+ # effectively a Monte-Carlo simulation for channel polarization.
+ ntests = 10000
+ n = 256
+ k = n
+ transition_prob = 0.11
+ num_transitions = int(k * transition_prob)
+ frozenbits = np.zeros(n - k)
+ frozenbitposition = np.array((), dtype=int)
+ encoder = PolarEncoder(n, k, frozenbitposition, frozenbits)
+ decoder = PolarDecoder(n, k, frozenbitposition, frozenbits)
+ channel_counter = np.zeros(k)
+ possible_indices = np.arange(n, dtype=int)
+ for i in range(ntests):
+ bits = np.random.randint(2, size=k)
+ tx = encoder.encode(bits)
+ np.random.shuffle(possible_indices)
+ tx[possible_indices[0:num_transitions]] = (tx[possible_indices[0:num_transitions]] + 1) % 2
+ rx = tx
+ recv = decoder.decode(rx)
+ channel_counter += (bits == recv)
+ print channel_counter
+ print(np.min(channel_counter), np.max(channel_counter))
+'channel_counter_' + str(ntests) + '.npy', channel_counter)
+def find_good_indices(res, nindices):
+ channel_counter = np.copy(res)
+ good_indices = np.zeros(channel_counter.size)
+ for i in range(nindices):
+ idx = np.argmax(channel_counter)
+ good_indices[idx] = 1
+ channel_counter[idx] = 0
+ return good_indices
+def channel_analysis():
+ ntests = 10000
+ filename = 'channel_counter_' + str(ntests) + '.npy'
+ channel_counter = np.load(filename)
+ print(np.min(channel_counter), np.max(channel_counter))
+ channel_counter[0] = np.min(channel_counter)
+ good_indices = find_good_indices(channel_counter, channel_counter.size // 2)
+ info_bit_positions = np.where(good_indices > 0)
+ print(info_bit_positions)
+ frozen_bit_positions = np.delete(np.arange(channel_counter.size), info_bit_positions)
+ print(frozen_bit_positions)
+'frozen_bit_positions_n256_k128_p0.11.npy', frozen_bit_positions)
+ good_indices *= 2000
+ good_indices += 4000
+ plt.plot(channel_counter)
+ plt.plot(good_indices)
+def merge_first_stage(init_mask):
+ merged_frozen_mask = []
+ for e in range(0, len(init_mask), 2):
+ v = [init_mask[e]['value'][0], init_mask[e + 1]['value'][0]]
+ s = init_mask[e]['size'] * 2
+ if init_mask[e]['type'] == init_mask[e + 1]['type']:
+ t = init_mask[e]['type']
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ else:
+ t = 'RPT'
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ return merged_frozen_mask
+def merge_second_stage(init_mask):
+ merged_frozen_mask = []
+ for e in range(0, len(init_mask), 2):
+ if init_mask[e]['type'] == init_mask[e + 1]['type']:
+ t = init_mask[e]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT':
+ t = init_mask[e + 1]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'RPT' and init_mask[e + 1]['type'] == 'ONE':
+ t = 'SPC'
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ else:
+ merged_frozen_mask.append(init_mask[e])
+ merged_frozen_mask.append(init_mask[e + 1])
+ return merged_frozen_mask
+def merge_stage_n(init_mask):
+ merged_frozen_mask = []
+ n_elems = len(init_mask) - (len(init_mask) % 2)
+ for e in range(0, n_elems, 2):
+ if init_mask[e]['size'] == init_mask[e + 1]['size']:
+ if (init_mask[e]['type'] == 'ZERO' or init_mask[e]['type'] == 'ONE') and init_mask[e]['type'] == init_mask[e + 1]['type']:
+ t = init_mask[e]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT':
+ t = init_mask[e + 1]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'SPC' and init_mask[e + 1]['type'] == 'ONE':
+ t = init_mask[e]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ else:
+ merged_frozen_mask.append(init_mask[e])
+ merged_frozen_mask.append(init_mask[e + 1])
+ else:
+ merged_frozen_mask.append(init_mask[e])
+ merged_frozen_mask.append(init_mask[e + 1])
+ if n_elems < len(init_mask):
+ merged_frozen_mask.append(init_mask[-1])
+ return merged_frozen_mask
+def print_decode_subframes(subframes):
+ for e in subframes:
+ print(e)
+def find_decoder_subframes(frozen_mask):
+ stages = power_of_2_int(len(frozen_mask))
+ block_size = 2 ** stages
+ lock_mask = np.zeros(block_size, dtype=int)
+ sub_mask = []
+ for e in frozen_mask:
+ if e == 1:
+ sub_mask.append(0)
+ else:
+ sub_mask.append(1)
+ sub_mask = np.array(sub_mask, dtype=int)
+ for s in range(0, stages):
+ stage_size = 2 ** s
+ mask = np.reshape(sub_mask, (-1, stage_size))
+ lock = np.reshape(lock_mask, (-1, stage_size))
+ for p in range(0, (block_size // stage_size) - 1, 2):
+ l0 = lock[p]
+ l1 = lock[p + 1]
+ first = mask[p]
+ second = mask[p + 1]
+ print(l0, l1)
+ print(first, second)
+ if np.all(l0 == l1):
+ for eq in range(2):
+ if np.all(first == eq) and np.all(second == eq):
+ mask[p].fill(eq)
+ mask[p + 1].fill(eq)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+ if np.all(first == 0) and np.all(second == 2):
+ mask[p].fill(2)
+ mask[p + 1].fill(2)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+ if np.all(first == 3) and np.all(second == 1):
+ mask[p].fill(3)
+ mask[p + 1].fill(3)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+ if s == 0 and np.all(first == 0) and np.all(second == 1):
+ mask[p].fill(2)
+ mask[p + 1].fill(2)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+ if s == 1 and np.all(first == 2) and np.all(second == 1):
+ mask[p].fill(3)
+ mask[p + 1].fill(3)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+ sub_mask = mask.flatten()
+ lock_mask = lock.flatten()
+ words = {0: 'ZERO', 1: 'ONE', 2: 'RPT', 3: 'SPC'}
+ ll = lock_mask[0]
+ sub_t = sub_mask[0]
+ for i in range(len(frozen_mask)):
+ v = frozen_mask[i]
+ t = words[sub_mask[i]]
+ l = lock_mask[i]
+ # if i % 8 == 0:
+ # print
+ if not l == ll or not sub_mask[i] == sub_t:
+ print('--------------------------')
+ ll = l
+ sub_t = sub_mask[i]
+ print('{0:4} lock {1:4} value: {2} in sub {3}'.format(i, 2 ** (l + 1), v, t))
+def load_file(filename):
+ z_params = []
+ with open(filename, 'r') as f:
+ for line in f:
+ if 'Bhattacharyya:' in line:
+ l = line.split(' ')
+ l = l[10:-2]
+ l = l[0][:-1]
+ l = float(l)
+ z_params.append(l)
+ return np.array(z_params)
+def main():
+ n = 8
+ m = 2 ** n
+ k = m // 2
+ n_frozen = n - k
+ # n = 16
+ # k = 8
+ # frozenbits = np.zeros(n - k)
+ # frozenbitposition8 = np.array((0, 1, 2, 4), dtype=int)
+ # frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
+ # print frozenbitposition
+ # test_enc_dec_chain()
+ # test_1024_rate_1_code()
+ # channel_analysis()
+ frozen_indices = cc.get_bec_frozen_indices(m, n_frozen, 0.11)
+ frozen_mask = cc.get_frozen_bit_mask(frozen_indices, m)
+ find_decoder_subframes(frozen_mask)
+ frozen_mask = np.zeros(m, dtype=int)
+ frozen_mask[frozen_indices] = 1
+ # filename = 'channel_z-parameters.txt'
+ # ido = load_file(filename)
+ # ido_frozen = cc.get_frozen_bit_indices_from_z_parameters(ido, k)
+ # ido_mask = np.zeros(m, dtype=int)
+ # ido_mask[ido_frozen] = 1
+ #
+ #
+ # plt.plot(ido_mask)
+ # plt.plot(frozen_mask)
+ # for i in range(m):
+ # if not ido_mask[i] == frozen_mask[i]:
+ # plt.axvline(i, color='r')
+ #
+if __name__ == '__main__':
+ main() \ No newline at end of file