summaryrefslogtreecommitdiff
path: root/gr-fec/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-fec/python')
-rw-r--r--gr-fec/python/fec/polar/channel_construction.py194
-rwxr-xr-xgr-fec/python/fec/polar/channel_construction_bsc.py3
-rw-r--r--gr-fec/python/fec/polar/helper_functions.py77
-rwxr-xr-xgr-fec/python/fec/polar/testbed.py4
-rw-r--r--gr-fec/python/fec/qa_polar_decoder_sc.py62
-rw-r--r--gr-fec/python/fec/qa_polar_encoder.py73
6 files changed, 322 insertions, 91 deletions
diff --git a/gr-fec/python/fec/polar/channel_construction.py b/gr-fec/python/fec/polar/channel_construction.py
index f971ce4f61..9c38d3a7e6 100644
--- a/gr-fec/python/fec/polar/channel_construction.py
+++ b/gr-fec/python/fec/polar/channel_construction.py
@@ -28,6 +28,7 @@ import numpy as np
from channel_construction_bec import calculate_bec_channel_capacities
from channel_construction_bec import design_snr_to_bec_eta
from channel_construction_bsc import tal_vardy_tpm_algorithm
+from helper_functions import *
import matplotlib.pyplot as plt
@@ -39,7 +40,7 @@ def get_frozen_bit_indices_from_capacities(chan_caps, nfrozen):
while indexes.size < nfrozen:
index = np.argmin(chan_caps)
indexes = np.append(indexes, index)
- chan_caps[index] = 1.0
+ chan_caps[index] = 2.0 # make absolutely sure value is out of range!
return np.sort(indexes)
@@ -109,19 +110,200 @@ def load_z_parameters(block_size, design_snr, mu):
return z_params
+def prepare_merger(frozen_mask):
+ mask = []
+ for e in frozen_mask:
+ mask.append([e, ])
+ return np.array(mask, dtype=int)
+
+
+def merge_first_stage(init_mask):
+ merged_frozen_mask = []
+ for e in range(0, len(init_mask), 2):
+ v = [init_mask[e]['value'][0], init_mask[e + 1]['value'][0]]
+ s = init_mask[e]['size'] * 2
+ if init_mask[e]['type'] == init_mask[e + 1]['type']:
+ t = init_mask[e]['type']
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ else:
+ t = 'RPT'
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ return merged_frozen_mask
+
+
+def merge_second_stage(init_mask):
+ merged_frozen_mask = []
+ for e in range(0, len(init_mask), 2):
+ if init_mask[e]['type'] == init_mask[e + 1]['type']:
+ t = init_mask[e]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT':
+ t = init_mask[e + 1]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'RPT' and init_mask[e + 1]['type'] == 'ONE':
+ t = 'SPC'
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ else:
+ merged_frozen_mask.append(init_mask[e])
+ merged_frozen_mask.append(init_mask[e + 1])
+ return merged_frozen_mask
+
+
+def merge_stage_n(init_mask):
+ merged_frozen_mask = []
+ n_elems = len(init_mask) - (len(init_mask) % 2)
+ for e in range(0, n_elems, 2):
+ if init_mask[e]['size'] == init_mask[e + 1]['size']:
+ if (init_mask[e]['type'] == 'ZERO' or init_mask[e]['type'] == 'ONE') and init_mask[e]['type'] == init_mask[e + 1]['type']:
+ t = init_mask[e]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT':
+ t = init_mask[e + 1]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ elif init_mask[e]['type'] == 'SPC' and init_mask[e + 1]['type'] == 'ONE':
+ t = init_mask[e]['type']
+ v = init_mask[e]['value']
+ v.extend(init_mask[e + 1]['value'])
+ s = init_mask[e]['size'] * 2
+ merged_frozen_mask.append({'value': v, 'type': t, 'size': s})
+ else:
+ merged_frozen_mask.append(init_mask[e])
+ merged_frozen_mask.append(init_mask[e + 1])
+ else:
+ merged_frozen_mask.append(init_mask[e])
+ merged_frozen_mask.append(init_mask[e + 1])
+ if n_elems < len(init_mask):
+ merged_frozen_mask.append(init_mask[-1])
+ return merged_frozen_mask
+
+
+def print_decode_subframes(subframes):
+ for e in subframes:
+ print(e)
+
+
+def find_decoder_subframes(frozen_mask):
+ stages = power_of_2_int(len(frozen_mask))
+ frame_size = 2 ** stages
+
+ lock_mask = np.zeros(frame_size, dtype=int)
+ sub_mask = []
+
+ for e in frozen_mask:
+ if e == 1:
+ sub_mask.append(0)
+ else:
+ sub_mask.append(1)
+ sub_mask = np.array(sub_mask, dtype=int)
+
+ for s in range(0, stages):
+ stage_size = 2 ** s
+ mask = np.reshape(sub_mask, (-1, stage_size))
+ lock = np.reshape(lock_mask, (-1, stage_size))
+ for p in range(0, (frame_size // stage_size) - 1, 2):
+ l0 = lock[p]
+ l1 = lock[p + 1]
+ first = mask[p]
+ second = mask[p + 1]
+ print(l0, l1)
+ print(first, second)
+ if np.all(l0 == l1):
+ for eq in range(2):
+ if np.all(first == eq) and np.all(second == eq):
+ mask[p].fill(eq)
+ mask[p + 1].fill(eq)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+
+ if np.all(first == 0) and np.all(second == 2):
+ mask[p].fill(2)
+ mask[p + 1].fill(2)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+
+ if np.all(first == 3) and np.all(second == 1):
+ mask[p].fill(3)
+ mask[p + 1].fill(3)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+
+ if s == 0 and np.all(first == 0) and np.all(second == 1):
+ mask[p].fill(2)
+ mask[p + 1].fill(2)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+
+ if s == 1 and np.all(first == 2) and np.all(second == 1):
+ mask[p].fill(3)
+ mask[p + 1].fill(3)
+ lock[p].fill(s)
+ lock[p + 1].fill(s)
+
+ sub_mask = mask.flatten()
+ lock_mask = lock.flatten()
+
+ words = {0: 'ZERO', 1: 'ONE', 2: 'RPT', 3: 'SPC'}
+ ll = lock_mask[0]
+ sub_t = sub_mask[0]
+ for i in range(len(frozen_mask)):
+ v = frozen_mask[i]
+ t = words[sub_mask[i]]
+ l = lock_mask[i]
+ # if i % 8 == 0:
+ # print
+ if not l == ll or not sub_mask[i] == sub_t:
+ print('--------------------------')
+ ll = l
+ sub_t = sub_mask[i]
+ print('{0:4} lock {1:4} value: {2} in sub {3}'.format(i, 2 ** (l + 1), v, t))
+
+
+
def main():
+ np.set_printoptions(precision=3, linewidth=150)
print 'channel construction Bhattacharyya bounds by Arikan'
- n = 8
+ n = 10
m = 2 ** n
k = m // 2
design_snr = -1.59
mu = 32
- # ztv = tal_vardy_tpm_algorithm(m, design_snr, mu)
-
z_params = load_z_parameters(m, design_snr, mu)
- plt.plot(z_params)
- plt.show()
+ # plt.plot(z_params)
+ # plt.show()
+ frozen_indices = get_frozen_bit_indices_from_z_parameters(z_params, k)
+
+ frozen_mask = np.zeros(m, dtype=int)
+ frozen_mask[frozen_indices] = 1
+ # frozen_mask = np.reshape(frozen_mask, (-1, 32))
+ # for p in frozen_mask:
+ # print(p)
+ # if np.all(p == 1):
+ # print("zero rate")
+ # elif np.all(p == 0):
+ # print("ONE rate")
+ # elif p[0] == 1 and np.all(p[1:] == 0):
+ # print("SPC code")
+ # elif np.all(p[0:-1] == 1) and p[-1] == 0:
+ # print("REPETITION code")
+
+ find_decoder_subframes(frozen_mask)
+
diff --git a/gr-fec/python/fec/polar/channel_construction_bsc.py b/gr-fec/python/fec/polar/channel_construction_bsc.py
index fddad2e755..69acea861d 100755
--- a/gr-fec/python/fec/polar/channel_construction_bsc.py
+++ b/gr-fec/python/fec/polar/channel_construction_bsc.py
@@ -315,7 +315,7 @@ def normalize_q(q, tpm):
def main():
print 'channel construction BSC main'
- n = 8
+ n = 10
m = 2 ** n
k = m // 2
design_snr = 0.5
@@ -323,6 +323,7 @@ def main():
z_params = tal_vardy_tpm_algorithm(m, design_snr, mu)
+ print(z_params)
plt.plot(z_params)
plt.show()
diff --git a/gr-fec/python/fec/polar/helper_functions.py b/gr-fec/python/fec/polar/helper_functions.py
index e93fa9a507..72501beae3 100644
--- a/gr-fec/python/fec/polar/helper_functions.py
+++ b/gr-fec/python/fec/polar/helper_functions.py
@@ -20,6 +20,7 @@
import numpy as np
import time, sys
+import copy
def power_of_2_int(num):
@@ -119,24 +120,72 @@ def main():
for i in range(8):
print(i, 'is power of 2: ', is_power_of_two(i))
- n = 2 ** 6
- k = n // 2
+ n = 6
+ m = 2 ** n
+ k = m // 2
eta = 0.3
- # frozen_bit_positions = get_frozen_bit_positions('.', 256, 128, 0.11)
- # print(frozen_bit_positions)
+ pos = np.arange(m)
+ rev_pos = bit_reverse_vector(pos, n)
+ print(pos)
+ print(rev_pos)
+
+ bound = 16
+ num_lanes = m // bound
+
+
+ lanes = np.zeros((num_lanes, bound), dtype=int)
+ for i in range(0, num_lanes):
+ p = i * bound
+ part = rev_pos[p: p + bound]
+ lanes[i] = part
+
+ print('reved lanes')
+ print(lanes)
+
+ # SHUFFLE!
+ shuffle_pos = bit_reverse_vector(np.arange(bound), 4)
+ for i in range(num_lanes):
+ lane = lanes[i]
+ lanes[i] = lanes[i, shuffle_pos]
+ print('\nshuffled lanes')
+ print(lanes)
+
+ # SORT HALVES
+ hb = bound // 2
+ for i in range(num_lanes // 2):
+ l0 = lanes[i]
+ l1 = lanes[i + (num_lanes // 2)]
+ l0p = copy.deepcopy(l0[hb:])
+ l0[hb:] = l1[0:hb]
+ l1[0:hb] = l0p
+ lanes[i] =l0
+ lanes[i + (num_lanes // 2)] = l1
+ print('\nsort halves')
+ print(lanes)
+
+ # 'MELT' SHUFFLE INTERLEAVE!
+ melt_pos = np.arange(bound, dtype=int)
+ melt_pos = np.reshape(melt_pos, (2, -1)).T.flatten()
+ for i in range(num_lanes):
+ lanes[i] = lanes[i, melt_pos]
+ print('\nmelt lanes')
+ print(lanes)
+
+
+
+ for i in range(0, m, bound):
+ print("\nlook at this part")
+ part = pos[i: i + bound]
+ rev = bit_reverse_vector(part, n)
+ sorted_rev = np.sort(rev)
+ print(part)
+ print(rev)
+ print(sorted_rev)
+ sorted_part = rev[shuffle_pos]
+ print(sorted_part)
- print(np.arange(16))
- print bit_reverse_vector(np.arange(16), 4)
- ntotal = 99
- for i in range(ntotal):
- show_progress_bar(i, ntotal)
- time.sleep(0.1)
-
- # sys.stdout.write('Hello')
- # time.sleep(1)
- # sys.stdout.write('\rMomy ')
if __name__ == '__main__':
main()
diff --git a/gr-fec/python/fec/polar/testbed.py b/gr-fec/python/fec/polar/testbed.py
index bdf9ae437c..c35b62099c 100755
--- a/gr-fec/python/fec/polar/testbed.py
+++ b/gr-fec/python/fec/polar/testbed.py
@@ -153,11 +153,11 @@ def main():
# frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
# print frozenbitposition
- # test_enc_dec_chain()
+ test_enc_dec_chain()
# test_1024_rate_1_code()
- channel_analysis()
+ # channel_analysis()
if __name__ == '__main__':
diff --git a/gr-fec/python/fec/qa_polar_decoder_sc.py b/gr-fec/python/fec/qa_polar_decoder_sc.py
index 966a9f169a..1e7cd25e26 100644
--- a/gr-fec/python/fec/qa_polar_decoder_sc.py
+++ b/gr-fec/python/fec/qa_polar_decoder_sc.py
@@ -32,7 +32,7 @@ from extended_decoder import extended_decoder
from polar.encoder import PolarEncoder
from polar.decoder import PolarDecoder
import polar.channel_construction as cc
-# from polar.helper_functions import bit_reverse_vector
+from polar.helper_functions import bit_reverse_vector
# print('PID:', os.getpid())
# raw_input('tell me smth')
@@ -62,22 +62,14 @@ class test_polar_decoder_sc(gr_unittest.TestCase):
def test_002_one_vector(self):
print "test_002_one_vector"
is_packed = False
- block_power = 8
+ block_power = 10
block_size = 2 ** block_power
num_info_bits = 2 ** (block_power - 1)
num_frozen_bits = block_size - num_info_bits
frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
frozen_bit_values = np.array([0] * num_frozen_bits,)
- print frozen_bit_positions
-
- python_decoder = PolarDecoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
- bits = np.ones(num_info_bits, dtype=int)
- # bits = np.random.randint(2, size=num_info_bits)
- encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
- data = encoder.encode(bits)
- # data = np.array([0, 1, 1, 0, 1, 0, 1, 0], dtype=int)
- gr_data = 2.0 * data - 1.0
+ bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, 1, True)
polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed)
src = blocks.vector_source_f(gr_data, False)
@@ -90,16 +82,14 @@ class test_polar_decoder_sc(gr_unittest.TestCase):
res = np.array(snk.data()).astype(dtype=int)
- ref = python_decoder.decode(data)
-
- print("input:", data)
+ print("input:", gr_data.astype(dtype=int))
+ print("ref :", bits)
print("res :", res)
- print("ref :", ref)
- self.assertTupleEqual(tuple(res), tuple(ref))
+ self.assertTupleEqual(tuple(res), tuple(bits))
def test_003_stream(self):
- print "test_002_stream"
+ print "test_003_stream"
nframes = 3
is_packed = False
block_power = 8
@@ -108,23 +98,8 @@ class test_polar_decoder_sc(gr_unittest.TestCase):
num_frozen_bits = block_size - num_info_bits
frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
frozen_bit_values = np.array([0] * num_frozen_bits,)
- print frozen_bit_positions
- python_decoder = PolarDecoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
- encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
-
- bits = np.array([], dtype=int)
- data = np.array([], dtype=int)
- for n in range(nframes):
- b = np.random.randint(2, size=num_info_bits)
- d = encoder.encode(b)
- bits = np.append(bits, b)
- data = np.append(data, d)
- # bits = np.ones(num_info_bits, dtype=int)
- # bits = np.random.randint(2, size=num_info_bits)
- # data = encoder.encode(bits)
- # data = np.array([0, 1, 1, 0, 1, 0, 1, 0], dtype=int)
- gr_data = 2.0 * data - 1.0
+ bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, nframes, False)
polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed)
src = blocks.vector_source_f(gr_data, False)
@@ -137,14 +112,27 @@ class test_polar_decoder_sc(gr_unittest.TestCase):
res = np.array(snk.data()).astype(dtype=int)
- # ref = python_decoder.decode(data)
-
- print("input:", data)
+ print("input:", gr_data.astype(dtype=int))
+ print("ref :", bits)
print("res :", res)
- # print("ref :", ref)
self.assertTupleEqual(tuple(res), tuple(bits))
+ def generate_test_data(self, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, nframes, onlyones):
+ encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
+ bits = np.array([], dtype=int)
+ data = np.array([], dtype=int)
+ for n in range(nframes):
+ if onlyones:
+ b = np.ones(num_info_bits, dtype=int)
+ else:
+ b = np.random.randint(2, size=num_info_bits)
+ d = encoder.encode(b)
+ bits = np.append(bits, b)
+ data = np.append(data, d)
+ gr_data = 2.0 * data - 1.0
+ return bits, gr_data
+
if __name__ == '__main__':
gr_unittest.run(test_polar_decoder_sc)
diff --git a/gr-fec/python/fec/qa_polar_encoder.py b/gr-fec/python/fec/qa_polar_encoder.py
index b4f26e4017..90190cd719 100644
--- a/gr-fec/python/fec/qa_polar_encoder.py
+++ b/gr-fec/python/fec/qa_polar_encoder.py
@@ -28,6 +28,10 @@ from extended_encoder import extended_encoder
from polar.encoder import PolarEncoder
import polar.channel_construction as cc
+# import os
+# print('PID:', os.getpid())
+# raw_input('tell me smth')
+
class test_polar_encoder(gr_unittest.TestCase):
@@ -50,54 +54,45 @@ class test_polar_encoder(gr_unittest.TestCase):
self.assertFloatTuplesAlmostEqual((float(num_info_bits) / block_size, ), (polar_encoder.rate(), ))
self.assertFalse(polar_encoder.set_frame_size(10))
- def test_002_work_function(self):
- block_size = 256
- num_info_bits = 128
- num_frozen_bits = block_size - num_info_bits
- frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
- frozen_bit_values = np.array([0] * num_frozen_bits,)
- python_encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
-
+ def test_002_work_function_packed(self):
is_packed = True
- polar_encoder = fec.polar_encoder.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed)
+ block_size = 256
+ num_info_bits = block_size // 2
- data = np.ones(num_info_bits, dtype=int)
+ data, ref, polar_encoder = self.get_test_data(block_size, num_info_bits, 1, is_packed)
src = blocks.vector_source_b(data, False)
enc_block = extended_encoder(polar_encoder, None, '11')
snk = blocks.vector_sink_b(1)
self.tb.connect(src, enc_block, snk)
self.tb.run()
- print(self.tb.edge_list())
res = np.array(snk.data()).astype(dtype=int)
- penc = python_encoder.encode(data)
-
- print(res)
- print(penc)
- self.assertTupleEqual(tuple(res), tuple(penc))
+ self.assertTupleEqual(tuple(res), tuple(ref))
- def test_003_big_input(self):
- is_packed = True
- num_blocks = 30
+ def test_003_work_function_unpacked(self):
+ is_packed = False
block_size = 256
- num_info_bits = 128
- num_frozen_bits = block_size - num_info_bits
- frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
- frozen_bit_values = np.array([0] * num_frozen_bits,)
- python_encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
+ num_info_bits = block_size // 2
- polar_encoder = fec.polar_encoder.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed)
+ data, ref, polar_encoder = self.get_test_data(block_size, num_info_bits, 1, is_packed)
+ src = blocks.vector_source_b(data, False)
+ enc_block = extended_encoder(polar_encoder, None, '11')
+ snk = blocks.vector_sink_b(1)
- data = np.array([], dtype=int)
- ref = np.array([], dtype=int)
+ self.tb.connect(src, enc_block, snk)
+ self.tb.run()
- for i in range(num_blocks):
- d = np.random.randint(2, size=num_info_bits)
- data = np.append(data, d)
- ref = np.append(ref, python_encoder.encode(d))
+ res = np.array(snk.data()).astype(dtype=int)
+ self.assertTupleEqual(tuple(res), tuple(ref))
+ def test_004_big_input(self):
+ is_packed = False
+ num_blocks = 30
+ block_size = 1024
+ num_info_bits = block_size // 8
+ data, ref, polar_encoder = self.get_test_data(block_size, num_info_bits, num_blocks, is_packed)
src = blocks.vector_source_b(data, False)
enc_block = extended_encoder(polar_encoder, None, '11')
snk = blocks.vector_sink_b(1)
@@ -111,6 +106,22 @@ class test_polar_encoder(gr_unittest.TestCase):
print(ref)
self.assertTupleEqual(tuple(res), tuple(ref))
+ def get_test_data(self, block_size, num_info_bits, num_blocks, is_packed):
+ num_frozen_bits = block_size - num_info_bits
+ frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
+ frozen_bit_values = np.array([0] * num_frozen_bits,)
+ python_encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
+
+ data = np.array([], dtype=int)
+ ref = np.array([], dtype=int)
+ for i in range(num_blocks):
+ d = np.random.randint(2, size=num_info_bits)
+ data = np.append(data, d)
+ ref = np.append(ref, python_encoder.encode(d))
+ polar_encoder = fec.polar_encoder.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed)
+ return data, ref, polar_encoder
+
+