summaryrefslogtreecommitdiff
path: root/gr-fec/python
diff options
context:
space:
mode:
authorJohannes Demel <ufcsy@student.kit.edu>2015-08-12 16:51:02 +0200
committerJohannes Demel <ufcsy@student.kit.edu>2015-09-21 10:46:59 +0200
commitd9719d7da8300c8546b305dab2eff763f47d216f (patch)
treeca8062d252e6e9f2e389fa85f045a92b5218bdee /gr-fec/python
parent73d84a231c31bf8312214b5e3a0a97e15c8db98f (diff)
polar: refarctoring and clean-up
Diffstat (limited to 'gr-fec/python')
-rw-r--r--gr-fec/python/fec/polar/README.md7
-rw-r--r--gr-fec/python/fec/polar/channel_construction.py200
-rw-r--r--gr-fec/python/fec/polar/channel_construction_bec.py9
-rwxr-xr-xgr-fec/python/fec/polar/channel_construction_bsc.py67
-rw-r--r--gr-fec/python/fec/polar/decoder.py26
-rw-r--r--gr-fec/python/fec/polar/encoder.py35
-rw-r--r--gr-fec/python/fec/polar/helper_functions.py96
-rwxr-xr-xgr-fec/python/fec/polar/testbed.py208
-rw-r--r--gr-fec/python/fec/qa_polar_decoder_sc.py31
-rw-r--r--gr-fec/python/fec/qa_polar_decoder_sc_list.py46
-rw-r--r--gr-fec/python/fec/qa_polar_encoder.py8
11 files changed, 292 insertions, 441 deletions
diff --git a/gr-fec/python/fec/polar/README.md b/gr-fec/python/fec/polar/README.md
index 2bd00dc3de..d425e8650d 100644
--- a/gr-fec/python/fec/polar/README.md
+++ b/gr-fec/python/fec/polar/README.md
@@ -1,4 +1,9 @@
POLAR Code Python test functions module
===========
-This folder contains all the necessary files for POLAR code testcode. It shall serve as a reference later on. \ No newline at end of file
+This directory contains all the necessary files for POLAR code testcode.
+It serves as a reference for C++ implementations.
+
+'polar_channel_construction' exposes functionality to calculate polar channels for different sizes.
+It may be used to calculate Bhattacharyya parameters once and store them in a file in '~/.gnuradio/polar'.
+Frozen bit positions are recalculated on every run. \ No newline at end of file
diff --git a/gr-fec/python/fec/polar/channel_construction.py b/gr-fec/python/fec/polar/channel_construction.py
index 9c38d3a7e6..bf3ff925d8 100644
--- a/gr-fec/python/fec/polar/channel_construction.py
+++ b/gr-fec/python/fec/polar/channel_construction.py
@@ -24,9 +24,9 @@ foundational paper for polar codes.
'''
-import numpy as np
from channel_construction_bec import calculate_bec_channel_capacities
from channel_construction_bec import design_snr_to_bec_eta
+from channel_construction_bec import bhattacharyya_bounds
from channel_construction_bsc import tal_vardy_tpm_algorithm
from helper_functions import *
import matplotlib.pyplot as plt
@@ -59,6 +59,12 @@ def get_bec_frozen_indices(nblock, kfrozen, eta):
return positions
+def get_frozen_bit_mask(frozen_indices, block_size):
+ frozen_mask = np.zeros(block_size, dtype=int)
+ frozen_mask[frozen_indices] = 1
+ return frozen_mask
+
+
def frozen_bit_positions(block_size, info_size, design_snr=0.0):
if not design_snr > -1.5917:
print('bad value for design_nsr, must be > -1.5917! default=0.0')
@@ -110,201 +116,21 @@ def load_z_parameters(block_size, design_snr, mu):
return z_params
-def prepare_merger(frozen_mask):
- mask = []
- for e in frozen_mask:
- mask.append([e, ])
- return np.array(mask, dtype=int)
-
-
-def merge_first_stage(init_mask):
- merged_frozen_mask = []
- for e in range(0, len(init_mask), 2):
- v = [init_mask[e]['value'][0], init_mask[e + 1]['value'][0]]
- s = init_mask[e]['size'] * 2
- if init_mask[e]['type'] == init_mask[e + 1]['type']:
- t = init_mask[e]['type']
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- else:
- t = 'RPT'
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- return merged_frozen_mask
-
-
-def merge_second_stage(init_mask):
- merged_frozen_mask = []
- for e in range(0, len(init_mask), 2):
- if init_mask[e]['type'] == init_mask[e + 1]['type']:
- t = init_mask[e]['type']
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT':
- t = init_mask[e + 1]['type']
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- elif init_mask[e]['type'] == 'RPT' and init_mask[e + 1]['type'] == 'ONE':
- t = 'SPC'
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- else:
- merged_frozen_mask.append(init_mask[e])
- merged_frozen_mask.append(init_mask[e + 1])
- return merged_frozen_mask
-
-
-def merge_stage_n(init_mask):
- merged_frozen_mask = []
- n_elems = len(init_mask) - (len(init_mask) % 2)
- for e in range(0, n_elems, 2):
- if init_mask[e]['size'] == init_mask[e + 1]['size']:
- if (init_mask[e]['type'] == 'ZERO' or init_mask[e]['type'] == 'ONE') and init_mask[e]['type'] == init_mask[e + 1]['type']:
- t = init_mask[e]['type']
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT':
- t = init_mask[e + 1]['type']
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- elif init_mask[e]['type'] == 'SPC' and init_mask[e + 1]['type'] == 'ONE':
- t = init_mask[e]['type']
- v = init_mask[e]['value']
- v.extend(init_mask[e + 1]['value'])
- s = init_mask[e]['size'] * 2
- merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
- else:
- merged_frozen_mask.append(init_mask[e])
- merged_frozen_mask.append(init_mask[e + 1])
- else:
- merged_frozen_mask.append(init_mask[e])
- merged_frozen_mask.append(init_mask[e + 1])
- if n_elems < len(init_mask):
- merged_frozen_mask.append(init_mask[-1])
- return merged_frozen_mask
-
-
-def print_decode_subframes(subframes):
- for e in subframes:
- print(e)
-
-
-def find_decoder_subframes(frozen_mask):
- stages = power_of_2_int(len(frozen_mask))
- frame_size = 2 ** stages
-
- lock_mask = np.zeros(frame_size, dtype=int)
- sub_mask = []
-
- for e in frozen_mask:
- if e == 1:
- sub_mask.append(0)
- else:
- sub_mask.append(1)
- sub_mask = np.array(sub_mask, dtype=int)
-
- for s in range(0, stages):
- stage_size = 2 ** s
- mask = np.reshape(sub_mask, (-1, stage_size))
- lock = np.reshape(lock_mask, (-1, stage_size))
- for p in range(0, (frame_size // stage_size) - 1, 2):
- l0 = lock[p]
- l1 = lock[p + 1]
- first = mask[p]
- second = mask[p + 1]
- print(l0, l1)
- print(first, second)
- if np.all(l0 == l1):
- for eq in range(2):
- if np.all(first == eq) and np.all(second == eq):
- mask[p].fill(eq)
- mask[p + 1].fill(eq)
- lock[p].fill(s)
- lock[p + 1].fill(s)
-
- if np.all(first == 0) and np.all(second == 2):
- mask[p].fill(2)
- mask[p + 1].fill(2)
- lock[p].fill(s)
- lock[p + 1].fill(s)
-
- if np.all(first == 3) and np.all(second == 1):
- mask[p].fill(3)
- mask[p + 1].fill(3)
- lock[p].fill(s)
- lock[p + 1].fill(s)
-
- if s == 0 and np.all(first == 0) and np.all(second == 1):
- mask[p].fill(2)
- mask[p + 1].fill(2)
- lock[p].fill(s)
- lock[p + 1].fill(s)
-
- if s == 1 and np.all(first == 2) and np.all(second == 1):
- mask[p].fill(3)
- mask[p + 1].fill(3)
- lock[p].fill(s)
- lock[p + 1].fill(s)
-
- sub_mask = mask.flatten()
- lock_mask = lock.flatten()
-
- words = {0: 'ZERO', 1: 'ONE', 2: 'RPT', 3: 'SPC'}
- ll = lock_mask[0]
- sub_t = sub_mask[0]
- for i in range(len(frozen_mask)):
- v = frozen_mask[i]
- t = words[sub_mask[i]]
- l = lock_mask[i]
- # if i % 8 == 0:
- # print
- if not l == ll or not sub_mask[i] == sub_t:
- print('--------------------------')
- ll = l
- sub_t = sub_mask[i]
- print('{0:4} lock {1:4} value: {2} in sub {3}'.format(i, 2 ** (l + 1), v, t))
-
-
-
def main():
np.set_printoptions(precision=3, linewidth=150)
print 'channel construction Bhattacharyya bounds by Arikan'
n = 10
m = 2 ** n
k = m // 2
- design_snr = -1.59
+ design_snr = 0.0
mu = 32
z_params = load_z_parameters(m, design_snr, mu)
- # plt.plot(z_params)
- # plt.show()
- frozen_indices = get_frozen_bit_indices_from_z_parameters(z_params, k)
-
- frozen_mask = np.zeros(m, dtype=int)
- frozen_mask[frozen_indices] = 1
- # frozen_mask = np.reshape(frozen_mask, (-1, 32))
- # for p in frozen_mask:
- # print(p)
- # if np.all(p == 1):
- # print("zero rate")
- # elif np.all(p == 0):
- # print("ONE rate")
- # elif p[0] == 1 and np.all(p[1:] == 0):
- # print("SPC code")
- # elif np.all(p[0:-1] == 1) and p[-1] == 0:
- # print("REPETITION code")
-
- find_decoder_subframes(frozen_mask)
-
-
+ z_bounds = bhattacharyya_bounds(design_snr, m)
+ print(z_params[-10:])
+ plt.plot(z_params)
+ plt.plot(z_bounds)
+ plt.show()
if __name__ == '__main__':
diff --git a/gr-fec/python/fec/polar/channel_construction_bec.py b/gr-fec/python/fec/polar/channel_construction_bec.py
index 341b290057..c57ca6517b 100644
--- a/gr-fec/python/fec/polar/channel_construction_bec.py
+++ b/gr-fec/python/fec/polar/channel_construction_bec.py
@@ -29,10 +29,10 @@ def bec_channel(eta):
W(y|0) * W(y|1) = 0 or W(y|0) = W(y|1)
transistions are 1 -> 1 or 0 -> 0 or {0, 1} -> ? (erased symbol)
'''
-
# looks like BSC but should be interpreted differently.
- W = np.array((1 - eta, eta, 1 - eta), dtype=float)
- return W
+ w = np.array((1 - eta, eta, 1 - eta), dtype=float)
+ return w
+
def odd_rec(iwn):
return iwn ** 2
@@ -73,7 +73,7 @@ def calculate_z_parameters_one_recursion(z_params):
def calculate_bec_channel_z_parameters(eta, block_size):
# compare [0, Arikan] eq. 38
block_power = hf.power_of_2_int(block_size)
- z_params = np.array([eta,], dtype=float)
+ z_params = np.array([eta, ], dtype=float)
for block_size in range(block_power):
z_params = calculate_z_parameters_one_recursion(z_params)
return z_params
@@ -110,5 +110,6 @@ def main():
print(calculate_bec_channel_z_parameters(eta, block_size))
print(calculate_bec_channel_capacities(eta, block_size))
+
if __name__ == '__main__':
main()
diff --git a/gr-fec/python/fec/polar/channel_construction_bsc.py b/gr-fec/python/fec/polar/channel_construction_bsc.py
index 69acea861d..e16813fcb7 100755
--- a/gr-fec/python/fec/polar/channel_construction_bsc.py
+++ b/gr-fec/python/fec/polar/channel_construction_bsc.py
@@ -53,56 +53,6 @@ def bsc_channel(p):
return W
-def get_Bn(n):
- # this is a bit reversal matrix.
- lw = int(np.log2(n)) # number of used bits
- indexes = [bit_reverse(i, lw) for i in range(n)]
- Bn = np.zeros((n, n), type(n))
- for i, index in enumerate(indexes):
- Bn[i][index] = 1
- return Bn
-
-
-def get_Fn(n):
- # this matrix defines the actual channel combining.
- if n == 1:
- return np.array([1, ])
- F2 = np.array([[1, 0], [1, 1]], np.int)
- nump = int(np.log2(n)) - 1 # number of Kronecker products to calculate
- Fn = F2
- for i in range(nump):
- Fn = np.kron(Fn, F2)
- return Fn
-
-def get_Gn(n):
- # this matrix is called generator matrix
- if not is_power_of_two(n):
- print "invalid input"
- return None
- if n == 1:
- return np.array([1, ])
- Bn = get_Bn(n)
- Fn = get_Fn(n)
- Gn = np.dot(Bn, Fn)
- return Gn
-
-
-def mutual_information(w):
- '''
- calculate mutual information I(W)
- I(W) = sum over y e Y ( sum over x e X ( ... ) )
- .5 W(y|x) log frac { W(y|x) }{ .5 W(y|0) + .5 W(y|1) }
- '''
- ydim, xdim = np.shape(w)
- i = 0.0
- for y in range(ydim):
- for x in range(xdim):
- v = w[y][x] * np.log2(w[y][x] / (0.5 * w[y][0] + 0.5 * w[y][1]))
- i += v
- i /= 2.0
- return i
-
-
def solver_equation(val, s):
cw_lambda = codeword_lambda_callable(s)
ic_lambda = instantanious_capacity_callable()
@@ -315,27 +265,16 @@ def normalize_q(q, tpm):
def main():
print 'channel construction BSC main'
- n = 10
+ n = 8
m = 2 ** n
- k = m // 2
- design_snr = 0.5
- mu = 32
-
+ design_snr = 0.0
+ mu = 16
z_params = tal_vardy_tpm_algorithm(m, design_snr, mu)
print(z_params)
plt.plot(z_params)
plt.show()
- # q = discretize_awgn(mu, design_snr)
-
-
- # print('discretized:', np.sum(q))
- # qu = upper_convolve(q, mu)
- # print('upper_convolve:', np.sum(qu))
- # q0 = lower_convolve(q, mu)
- # print('lower_convolve:', np.sum(q0))
-
if __name__ == '__main__':
main()
diff --git a/gr-fec/python/fec/polar/decoder.py b/gr-fec/python/fec/polar/decoder.py
index ef7d70081f..10eef9b6ed 100644
--- a/gr-fec/python/fec/polar/decoder.py
+++ b/gr-fec/python/fec/polar/decoder.py
@@ -224,13 +224,9 @@ def compare_decoder_impls():
n = 8
k = 4
frozenbits = np.zeros(n - k)
- # frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
+ # frozenbitposition16 = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
frozenbitposition = np.array((0, 1, 2, 4), dtype=int)
- # bits = np.ones(k, dtype=int)
bits = np.random.randint(2, size=k)
- # bits = np.array([0, 1, 1, 1])
- # bits = np.array([0, 1, 1, 0])
- # bits = np.array([1, 0, 1, 0])
print 'bits:', bits
encoder = PolarEncoder(n, k, frozenbitposition, frozenbits)
decoder = PolarDecoder(n, k, frozenbitposition, frozenbits)
@@ -243,7 +239,6 @@ def compare_decoder_impls():
print (rx_st == rx_eff).all()
-
def main():
power = 3
n = 2 ** power
@@ -257,33 +252,18 @@ def main():
decoder = PolarDecoder(n, k, frozenbitposition, frozenbits)
bits = np.ones(k, dtype=int)
- # bits = np.array([1, 0, 1, 0], dtype=int)
print "bits: ", bits
evec = encoder.encode(bits)
print "froz: ", encoder._insert_frozen_bits(bits)
print "evec: ", evec
- # dvec = decoder.decode(evec)
- # print "dec: ", dvec
- # llr = decoder._llr(4, evec, np.array([0, 0, 0]))
- # print "llr=", llr
evec[1] = 0
deced = decoder._lr_sc_decoder(evec)
print 'SC decoded:', deced
-
-
-
- # test_reverse_enc_dec()
+ test_reverse_enc_dec()
compare_decoder_impls()
- # graph_decode()
-
-
-
-
-
-
if __name__ == '__main__':
- main() \ No newline at end of file
+ main()
diff --git a/gr-fec/python/fec/polar/encoder.py b/gr-fec/python/fec/polar/encoder.py
index 6f87a22191..3b5eea2a94 100644
--- a/gr-fec/python/fec/polar/encoder.py
+++ b/gr-fec/python/fec/polar/encoder.py
@@ -20,30 +20,13 @@
import numpy as np
from common import PolarCommon
+import helper_functions as hf
class PolarEncoder(PolarCommon):
def __init__(self, n, k, frozen_bit_position, frozenbits=None):
PolarCommon.__init__(self, n, k, frozen_bit_position, frozenbits)
- self.G = self._gn(n)
-
- def _gn(self, n):
- # this matrix is called generator matrix
- if n == 1:
- return np.array([1, ])
- f = self._fn(n)
- return f
-
- def _fn(self, n):
- # this matrix defines the actual channel combining.
- if n == 1:
- return np.array([1, ])
- f2 = np.array([[1, 0], [1, 1]], np.int)
- nump = int(np.log2(n)) - 1 # number of Kronecker products to calculate
- fn = f2
- for i in range(nump):
- fn = np.kron(fn, f2)
- return fn
+ self.G = hf.get_Fn(n)
def get_gn(self):
return self.G
@@ -59,9 +42,9 @@ class PolarEncoder(PolarCommon):
return data
def _encode_efficient(self, vec):
- nstages = int(np.log2(self.N))
+ n_stages = int(np.log2(self.N))
pos = np.arange(self.N, dtype=int)
- for i in range(nstages):
+ for i in range(n_stages):
splitted = np.reshape(pos, (2 ** (i + 1), -1))
upper_branch = splitted[0::2].flatten()
lower_branch = splitted[1::2].flatten()
@@ -108,7 +91,7 @@ def test_pseudo_rate_1_encoder(encoder, ntests, k):
def test_encoder_impls():
- print('comparing encoder implementations, matrix vs. efficient')
+ print('Compare encoder implementations, matrix vs. efficient')
ntests = 1000
n = 16
k = 8
@@ -120,16 +103,12 @@ def test_encoder_impls():
print('Test rate-1 encoder/decoder chain results')
r1_test = test_pseudo_rate_1_encoder(encoder, ntests, k)
- print 'test rate-1 encoder/decoder:', r1_test
-
+ print 'Test rate-1 encoder/decoder:', r1_test
def main():
- print "main in encoder"
test_encoder_impls()
-
-
if __name__ == '__main__':
- main() \ No newline at end of file
+ main()
diff --git a/gr-fec/python/fec/polar/helper_functions.py b/gr-fec/python/fec/polar/helper_functions.py
index 72501beae3..ca66bf4a50 100644
--- a/gr-fec/python/fec/polar/helper_functions.py
+++ b/gr-fec/python/fec/polar/helper_functions.py
@@ -56,6 +56,41 @@ def bit_reverse_vector(vec, n):
return np.array([bit_reverse(e, n) for e in vec], dtype=vec.dtype)
+def get_Bn(n):
+ # this is a bit reversal matrix.
+ lw = power_of_2_int(n) # number of used bits
+ indexes = [bit_reverse(i, lw) for i in range(n)]
+ Bn = np.zeros((n, n), type(n))
+ for i, index in enumerate(indexes):
+ Bn[i][index] = 1
+ return Bn
+
+
+def get_Fn(n):
+ # this matrix defines the actual channel combining.
+ if n == 1:
+ return np.array([1, ])
+ nump = power_of_2_int(n) - 1 # number of Kronecker products to calculate
+ F2 = np.array([[1, 0], [1, 1]], np.int)
+ Fn = F2
+ for i in range(nump):
+ Fn = np.kron(Fn, F2)
+ return Fn
+
+
+def get_Gn(n):
+ # this matrix is called generator matrix
+ if not is_power_of_two(n):
+ print "invalid input"
+ return None
+ if n == 1:
+ return np.array([1, ])
+ Bn = get_Bn(n)
+ Fn = get_Fn(n)
+ Gn = np.dot(Bn, Fn)
+ return Gn
+
+
def unpack_byte(byte, nactive):
if np.amin(byte) < 0 or np.amax(byte) > 255:
return None
@@ -118,74 +153,17 @@ def bhattacharyya_parameter(w):
def main():
print 'helper functions'
- for i in range(8):
+ for i in range(9):
print(i, 'is power of 2: ', is_power_of_two(i))
n = 6
m = 2 ** n
- k = m // 2
- eta = 0.3
+
pos = np.arange(m)
rev_pos = bit_reverse_vector(pos, n)
print(pos)
print(rev_pos)
- bound = 16
- num_lanes = m // bound
-
-
- lanes = np.zeros((num_lanes, bound), dtype=int)
- for i in range(0, num_lanes):
- p = i * bound
- part = rev_pos[p: p + bound]
- lanes[i] = part
-
- print('reved lanes')
- print(lanes)
-
- # SHUFFLE!
- shuffle_pos = bit_reverse_vector(np.arange(bound), 4)
- for i in range(num_lanes):
- lane = lanes[i]
- lanes[i] = lanes[i, shuffle_pos]
- print('\nshuffled lanes')
- print(lanes)
-
- # SORT HALVES
- hb = bound // 2
- for i in range(num_lanes // 2):
- l0 = lanes[i]
- l1 = lanes[i + (num_lanes // 2)]
- l0p = copy.deepcopy(l0[hb:])
- l0[hb:] = l1[0:hb]
- l1[0:hb] = l0p
- lanes[i] =l0
- lanes[i + (num_lanes // 2)] = l1
- print('\nsort halves')
- print(lanes)
-
- # 'MELT' SHUFFLE INTERLEAVE!
- melt_pos = np.arange(bound, dtype=int)
- melt_pos = np.reshape(melt_pos, (2, -1)).T.flatten()
- for i in range(num_lanes):
- lanes[i] = lanes[i, melt_pos]
- print('\nmelt lanes')
- print(lanes)
-
-
-
- for i in range(0, m, bound):
- print("\nlook at this part")
- part = pos[i: i + bound]
- rev = bit_reverse_vector(part, n)
- sorted_rev = np.sort(rev)
- print(part)
- print(rev)
- print(sorted_rev)
- sorted_part = rev[shuffle_pos]
- print(sorted_part)
-
-
if __name__ == '__main__':
main()
diff --git a/gr-fec/python/fec/polar/testbed.py b/gr-fec/python/fec/polar/testbed.py
index c35b62099c..d60c83e776 100755
--- a/gr-fec/python/fec/polar/testbed.py
+++ b/gr-fec/python/fec/polar/testbed.py
@@ -18,9 +18,11 @@
# Boston, MA 02110-1301, USA.
#
-import numpy as np
+
from encoder import PolarEncoder
from decoder import PolarDecoder
+import channel_construction as cc
+from helper_functions import *
import matplotlib.pyplot as plt
@@ -28,7 +30,9 @@ import matplotlib.pyplot as plt
def get_frozen_bit_position():
# frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 6, 8, 9, 10, 12, 16, 17, 18, 20, 24), dtype=int)
# frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
- frozenbitposition = np.load('frozen_bit_positions_n256_k128_p0.11.npy').flatten()
+ m = 256
+ n_frozen = m // 2
+ frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters(cc.bhattacharyya_bounds(0.0, m), n_frozen)
print(frozenbitposition)
return frozenbitposition
@@ -140,12 +144,185 @@ def channel_analysis():
good_indices *= 2000
good_indices += 4000
-
plt.plot(channel_counter)
plt.plot(good_indices)
plt.show()
+
+def merge_first_stage(init_mask):
+ merged_frozen_mask = []
+ for e in range(0, len(init_mask), 2):
+ v = [init_mask[e]['value'][0], init_mask[e + 1]['value'][0]]
+ s = init_mask[e]['size'] * 2
+ if init_mask[e]['type'] == init_mask[e + 1]['type']:
+ t = init_mask[e]['type']
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ else:
+ t = 'RPT'
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ return merged_frozen_mask
+
+
+def merge_second_stage(init_mask):
+ merged_frozen_mask = []
+ for e in range(0, len(init_mask), 2):
+ if init_mask[e]['type'] == init_mask[e + 1]['type']:
+ t = init_mask[e]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT':
+ t = init_mask[e + 1]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'RPT' and init_mask[e + 1]['type'] == 'ONE':
+ t = 'SPC'
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ else:
+ merged_frozen_mask.append(init_mask[e])
+ merged_frozen_mask.append(init_mask[e + 1])
+ return merged_frozen_mask
+
+
+def merge_stage_n(init_mask):
+ merged_frozen_mask = []
+ n_elems = len(init_mask) - (len(init_mask) % 2)
+ for e in range(0, n_elems, 2):
+ if init_mask[e]['size'] == init_mask[e + 1]['size']:
+ if (init_mask[e]['type'] == 'ZERO' or init_mask[e]['type'] == 'ONE') and init_mask[e]['type'] == init_mask[e + 1]['type']:
+ t = init_mask[e]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT':
+ t = init_mask[e + 1]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'SPC' and init_mask[e + 1]['type'] == 'ONE':
+ t = init_mask[e]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ else:
+ merged_frozen_mask.append(init_mask[e])
+ merged_frozen_mask.append(init_mask[e + 1])
+ else:
+ merged_frozen_mask.append(init_mask[e])
+ merged_frozen_mask.append(init_mask[e + 1])
+ if n_elems < len(init_mask):
+ merged_frozen_mask.append(init_mask[-1])
+ return merged_frozen_mask
+
+
+def print_decode_subframes(subframes):
+ for e in subframes:
+ print(e)
+
+
+def find_decoder_subframes(frozen_mask):
+ stages = power_of_2_int(len(frozen_mask))
+ block_size = 2 ** stages
+
+ lock_mask = np.zeros(block_size, dtype=int)
+ sub_mask = []
+
+ for e in frozen_mask:
+ if e == 1:
+ sub_mask.append(0)
+ else:
+ sub_mask.append(1)
+ sub_mask = np.array(sub_mask, dtype=int)
+
+ for s in range(0, stages):
+ stage_size = 2 ** s
+ mask = np.reshape(sub_mask, (-1, stage_size))
+ lock = np.reshape(lock_mask, (-1, stage_size))
+ for p in range(0, (block_size // stage_size) - 1, 2):
+ l0 = lock[p]
+ l1 = lock[p + 1]
+ first = mask[p]
+ second = mask[p + 1]
+ print(l0, l1)
+ print(first, second)
+ if np.all(l0 == l1):
+ for eq in range(2):
+ if np.all(first == eq) and np.all(second == eq):
+ mask[p].fill(eq)
+ mask[p + 1].fill(eq)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+
+ if np.all(first == 0) and np.all(second == 2):
+ mask[p].fill(2)
+ mask[p + 1].fill(2)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+
+ if np.all(first == 3) and np.all(second == 1):
+ mask[p].fill(3)
+ mask[p + 1].fill(3)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+
+ if s == 0 and np.all(first == 0) and np.all(second == 1):
+ mask[p].fill(2)
+ mask[p + 1].fill(2)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+
+ if s == 1 and np.all(first == 2) and np.all(second == 1):
+ mask[p].fill(3)
+ mask[p + 1].fill(3)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+
+ sub_mask = mask.flatten()
+ lock_mask = lock.flatten()
+
+ words = {0: 'ZERO', 1: 'ONE', 2: 'RPT', 3: 'SPC'}
+ ll = lock_mask[0]
+ sub_t = sub_mask[0]
+ for i in range(len(frozen_mask)):
+ v = frozen_mask[i]
+ t = words[sub_mask[i]]
+ l = lock_mask[i]
+ # if i % 8 == 0:
+ # print
+ if not l == ll or not sub_mask[i] == sub_t:
+ print('--------------------------')
+ ll = l
+ sub_t = sub_mask[i]
+ print('{0:4} lock {1:4} value: {2} in sub {3}'.format(i, 2 ** (l + 1), v, t))
+
+
+def load_file(filename):
+ z_params = []
+ with open(filename, 'r') as f:
+ for line in f:
+ if 'Bhattacharyya:' in line:
+ l = line.split(' ')
+ l = l[10:-2]
+ l = l[0][:-1]
+ l = float(l)
+ z_params.append(l)
+ return np.array(z_params)
+
+
def main():
+ n = 8
+ m = 2 ** n
+ k = m // 2
+ n_frozen = n - k
# n = 16
# k = 8
# frozenbits = np.zeros(n - k)
@@ -153,12 +330,31 @@ def main():
# frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
# print frozenbitposition
- test_enc_dec_chain()
-
+ # test_enc_dec_chain()
# test_1024_rate_1_code()
-
# channel_analysis()
+ frozen_indices = cc.get_bec_frozen_indices(m, n_frozen, 0.11)
+ frozen_mask = cc.get_frozen_bit_mask(frozen_indices, m)
+ find_decoder_subframes(frozen_mask)
+
+ frozen_mask = np.zeros(m, dtype=int)
+ frozen_mask[frozen_indices] = 1
+
+ # filename = 'channel_z-parameters.txt'
+ # ido = load_file(filename)
+ # ido_frozen = cc.get_frozen_bit_indices_from_z_parameters(ido, k)
+ # ido_mask = np.zeros(m, dtype=int)
+ # ido_mask[ido_frozen] = 1
+ #
+ #
+ # plt.plot(ido_mask)
+ # plt.plot(frozen_mask)
+ # for i in range(m):
+ # if not ido_mask[i] == frozen_mask[i]:
+ # plt.axvline(i, color='r')
+ # plt.show()
+
if __name__ == '__main__':
main() \ No newline at end of file
diff --git a/gr-fec/python/fec/qa_polar_decoder_sc.py b/gr-fec/python/fec/qa_polar_decoder_sc.py
index 1e7cd25e26..030142d6a6 100644
--- a/gr-fec/python/fec/qa_polar_decoder_sc.py
+++ b/gr-fec/python/fec/qa_polar_decoder_sc.py
@@ -19,24 +19,20 @@
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
-from Crypto.Cipher._AES import block_size
from gnuradio import gr, gr_unittest, blocks
import fec_swig as fec
-from _qa_helper import _qa_helper
-import numpy as np
-import os
-from extended_encoder import extended_encoder
+import numpy as np
from extended_decoder import extended_decoder
from polar.encoder import PolarEncoder
-from polar.decoder import PolarDecoder
import polar.channel_construction as cc
-from polar.helper_functions import bit_reverse_vector
+# import os
# print('PID:', os.getpid())
# raw_input('tell me smth')
+
class test_polar_decoder_sc(gr_unittest.TestCase):
def setUp(self):
@@ -46,13 +42,12 @@ class test_polar_decoder_sc(gr_unittest.TestCase):
self.tb = None
def test_001_setup(self):
- is_packed = False
block_size = 16
num_info_bits = 8
frozen_bit_positions = np.arange(block_size - num_info_bits)
frozen_bit_values = np.array([],)
- polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed)
+ polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
self.assertEqual(num_info_bits, polar_decoder.get_output_size())
self.assertEqual(block_size, polar_decoder.get_input_size())
@@ -60,8 +55,6 @@ class test_polar_decoder_sc(gr_unittest.TestCase):
self.assertFalse(polar_decoder.set_frame_size(10))
def test_002_one_vector(self):
- print "test_002_one_vector"
- is_packed = False
block_power = 10
block_size = 2 ** block_power
num_info_bits = 2 ** (block_power - 1)
@@ -71,7 +64,7 @@ class test_polar_decoder_sc(gr_unittest.TestCase):
bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, 1, True)
- polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed)
+ polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
src = blocks.vector_source_f(gr_data, False)
dec_block = extended_decoder(polar_decoder, None)
snk = blocks.vector_sink_b(1)
@@ -81,17 +74,10 @@ class test_polar_decoder_sc(gr_unittest.TestCase):
self.tb.run()
res = np.array(snk.data()).astype(dtype=int)
-
- print("input:", gr_data.astype(dtype=int))
- print("ref :", bits)
- print("res :", res)
-
self.assertTupleEqual(tuple(res), tuple(bits))
def test_003_stream(self):
- print "test_003_stream"
nframes = 3
- is_packed = False
block_power = 8
block_size = 2 ** block_power
num_info_bits = 2 ** (block_power - 1)
@@ -101,7 +87,7 @@ class test_polar_decoder_sc(gr_unittest.TestCase):
bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, nframes, False)
- polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed)
+ polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
src = blocks.vector_source_f(gr_data, False)
dec_block = extended_decoder(polar_decoder, None)
snk = blocks.vector_sink_b(1)
@@ -111,11 +97,6 @@ class test_polar_decoder_sc(gr_unittest.TestCase):
self.tb.run()
res = np.array(snk.data()).astype(dtype=int)
-
- print("input:", gr_data.astype(dtype=int))
- print("ref :", bits)
- print("res :", res)
-
self.assertTupleEqual(tuple(res), tuple(bits))
def generate_test_data(self, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, nframes, onlyones):
diff --git a/gr-fec/python/fec/qa_polar_decoder_sc_list.py b/gr-fec/python/fec/qa_polar_decoder_sc_list.py
index 3aefd0f478..6b1fe3d431 100644
--- a/gr-fec/python/fec/qa_polar_decoder_sc_list.py
+++ b/gr-fec/python/fec/qa_polar_decoder_sc_list.py
@@ -23,15 +23,12 @@
from gnuradio import gr, gr_unittest, blocks
import fec_swig as fec
import numpy as np
-import os
-from extended_encoder import extended_encoder
from extended_decoder import extended_decoder
from polar.encoder import PolarEncoder
-from polar.decoder import PolarDecoder
import polar.channel_construction as cc
-
+# import os
# print('PID:', os.getpid())
# raw_input('tell me smth')
@@ -45,14 +42,13 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase):
self.tb = None
def test_001_setup(self):
- is_packed = False
block_size = 16
num_info_bits = 8
max_list_size = 4
frozen_bit_positions = np.arange(block_size - num_info_bits)
frozen_bit_values = np.array([],)
- polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed)
+ polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
self.assertEqual(num_info_bits, polar_decoder.get_output_size())
self.assertEqual(block_size, polar_decoder.get_input_size())
@@ -61,7 +57,6 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase):
def test_002_one_vector(self):
print "test_002_one_vector"
- is_packed = False
expo = 6
block_size = 2 ** expo
num_info_bits = 2 ** (expo - 1)
@@ -69,19 +64,13 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase):
num_frozen_bits = block_size - num_info_bits
frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
frozen_bit_values = np.array([0] * num_frozen_bits,)
- print(frozen_bit_positions)
-
- python_decoder = PolarDecoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
- # data = np.ones(block_size, dtype=int)
bits = np.random.randint(2, size=num_info_bits)
- # bits = np.ones(num_info_bits, dtype=int)
encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
data = encoder.encode(bits)
- # data = np.array([0, 1, 1, 0, 1, 0, 1, 0], dtype=int)
gr_data = 2.0 * data - 1.0
- polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed)
+ polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
src = blocks.vector_source_f(gr_data, False)
dec_block = extended_decoder(polar_decoder, None)
snk = blocks.vector_sink_b(1)
@@ -92,19 +81,16 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase):
res = np.array(snk.data()).astype(dtype=int)
- ref = python_decoder.decode(data)
+ print("\ninput -> result -> bits")
+ print(data)
+ print(res)
+ print(bits)
- print("input:", data)
- print("res :", res)
- print("ref :", ref)
- print("bits :", bits)
-
- self.assertTupleEqual(tuple(res), tuple(ref))
+ self.assertTupleEqual(tuple(res), tuple(bits))
def test_003_stream(self):
print "test_003_stream"
nframes = 5
- is_packed = False
expo = 8
block_size = 2 ** expo
num_info_bits = 2 ** (expo - 1)
@@ -112,11 +98,9 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase):
num_frozen_bits = block_size - num_info_bits
frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
frozen_bit_values = np.array([0] * num_frozen_bits,)
- print(frozen_bit_positions)
encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
- # data = np.ones(block_size, dtype=int)
ref = np.array([], dtype=int)
data = np.array([], dtype=int)
for i in range(nframes):
@@ -124,13 +108,9 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase):
d = encoder.encode(b)
data = np.append(data, d)
ref = np.append(ref, b)
-
- # bits = np.ones(num_info_bits, dtype=int)
- # data = encoder.encode(bits)
- # data = np.array([0, 1, 1, 0, 1, 0, 1, 0], dtype=int)
gr_data = 2.0 * data - 1.0
- polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed)
+ polar_decoder = fec.polar_decoder_sc_list.make(max_list_size, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
src = blocks.vector_source_f(gr_data, False)
dec_block = extended_decoder(polar_decoder, None)
snk = blocks.vector_sink_b(1)
@@ -140,17 +120,9 @@ class test_polar_decoder_sc_list(gr_unittest.TestCase):
self.tb.run()
res = np.array(snk.data()).astype(dtype=int)
-
-
- print("input:", data)
- print("res :", res)
- print("ref :", ref)
-
self.assertTupleEqual(tuple(res), tuple(ref))
-
-
if __name__ == '__main__':
gr_unittest.run(test_polar_decoder_sc_list)
diff --git a/gr-fec/python/fec/qa_polar_encoder.py b/gr-fec/python/fec/qa_polar_encoder.py
index 90190cd719..22d9b11fae 100644
--- a/gr-fec/python/fec/qa_polar_encoder.py
+++ b/gr-fec/python/fec/qa_polar_encoder.py
@@ -101,12 +101,10 @@ class test_polar_encoder(gr_unittest.TestCase):
self.tb.run()
res = np.array(snk.data()).astype(dtype=int)
-
- print(res)
- print(ref)
self.assertTupleEqual(tuple(res), tuple(ref))
def get_test_data(self, block_size, num_info_bits, num_blocks, is_packed):
+ # helper function to set up test data and together with encoder object.
num_frozen_bits = block_size - num_info_bits
frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
frozen_bit_values = np.array([0] * num_frozen_bits,)
@@ -122,10 +120,6 @@ class test_polar_encoder(gr_unittest.TestCase):
return data, ref, polar_encoder
-
-
-
-
if __name__ == '__main__':
gr_unittest.run(test_polar_encoder)