diff options
Diffstat (limited to 'gr-fec/python')
-rw-r--r-- | gr-fec/python/fec/polar/channel_construction.py | 194 | ||||
-rwxr-xr-x | gr-fec/python/fec/polar/channel_construction_bsc.py | 3 | ||||
-rw-r--r-- | gr-fec/python/fec/polar/helper_functions.py | 77 | ||||
-rwxr-xr-x | gr-fec/python/fec/polar/testbed.py | 4 | ||||
-rw-r--r-- | gr-fec/python/fec/qa_polar_decoder_sc.py | 62 | ||||
-rw-r--r-- | gr-fec/python/fec/qa_polar_encoder.py | 73 |
6 files changed, 322 insertions, 91 deletions
diff --git a/gr-fec/python/fec/polar/channel_construction.py b/gr-fec/python/fec/polar/channel_construction.py index f971ce4f61..9c38d3a7e6 100644 --- a/gr-fec/python/fec/polar/channel_construction.py +++ b/gr-fec/python/fec/polar/channel_construction.py @@ -28,6 +28,7 @@ import numpy as np from channel_construction_bec import calculate_bec_channel_capacities from channel_construction_bec import design_snr_to_bec_eta from channel_construction_bsc import tal_vardy_tpm_algorithm +from helper_functions import * import matplotlib.pyplot as plt @@ -39,7 +40,7 @@ def get_frozen_bit_indices_from_capacities(chan_caps, nfrozen): while indexes.size < nfrozen: index = np.argmin(chan_caps) indexes = np.append(indexes, index) - chan_caps[index] = 1.0 + chan_caps[index] = 2.0 # make absolutely sure value is out of range! return np.sort(indexes) @@ -109,19 +110,200 @@ def load_z_parameters(block_size, design_snr, mu): return z_params +def prepare_merger(frozen_mask): + mask = [] + for e in frozen_mask: + mask.append([e, ]) + return np.array(mask, dtype=int) + + +def merge_first_stage(init_mask): + merged_frozen_mask = [] + for e in range(0, len(init_mask), 2): + v = [init_mask[e]['value'][0], init_mask[e + 1]['value'][0]] + s = init_mask[e]['size'] * 2 + if init_mask[e]['type'] == init_mask[e + 1]['type']: + t = init_mask[e]['type'] + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + else: + t = 'RPT' + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + return merged_frozen_mask + + +def merge_second_stage(init_mask): + merged_frozen_mask = [] + for e in range(0, len(init_mask), 2): + if init_mask[e]['type'] == init_mask[e + 1]['type']: + t = init_mask[e]['type'] + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT': + t = init_mask[e + 1]['type'] + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + elif init_mask[e]['type'] == 'RPT' and init_mask[e + 1]['type'] == 'ONE': + t = 'SPC' + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + else: + merged_frozen_mask.append(init_mask[e]) + merged_frozen_mask.append(init_mask[e + 1]) + return merged_frozen_mask + + +def merge_stage_n(init_mask): + merged_frozen_mask = [] + n_elems = len(init_mask) - (len(init_mask) % 2) + for e in range(0, n_elems, 2): + if init_mask[e]['size'] == init_mask[e + 1]['size']: + if (init_mask[e]['type'] == 'ZERO' or init_mask[e]['type'] == 'ONE') and init_mask[e]['type'] == init_mask[e + 1]['type']: + t = init_mask[e]['type'] + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + elif init_mask[e]['type'] == 'ZERO' and init_mask[e + 1]['type'] == 'RPT': + t = init_mask[e + 1]['type'] + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + elif init_mask[e]['type'] == 'SPC' and init_mask[e + 1]['type'] == 'ONE': + t = init_mask[e]['type'] + v = init_mask[e]['value'] + v.extend(init_mask[e + 1]['value']) + s = init_mask[e]['size'] * 2 + merged_frozen_mask.append({'value': v, 'type': t, 'size': s}) + else: + merged_frozen_mask.append(init_mask[e]) + merged_frozen_mask.append(init_mask[e + 1]) + else: + merged_frozen_mask.append(init_mask[e]) + merged_frozen_mask.append(init_mask[e + 1]) + if n_elems < len(init_mask): + merged_frozen_mask.append(init_mask[-1]) + return merged_frozen_mask + + +def print_decode_subframes(subframes): + for e in subframes: + print(e) + + +def find_decoder_subframes(frozen_mask): + stages = power_of_2_int(len(frozen_mask)) + frame_size = 2 ** stages + + lock_mask = np.zeros(frame_size, dtype=int) + sub_mask = [] + + for e in frozen_mask: + if e == 1: + sub_mask.append(0) + else: + sub_mask.append(1) + sub_mask = np.array(sub_mask, dtype=int) + + for s in range(0, stages): + stage_size = 2 ** s + mask = np.reshape(sub_mask, (-1, stage_size)) + lock = np.reshape(lock_mask, (-1, stage_size)) + for p in range(0, (frame_size // stage_size) - 1, 2): + l0 = lock[p] + l1 = lock[p + 1] + first = mask[p] + second = mask[p + 1] + print(l0, l1) + print(first, second) + if np.all(l0 == l1): + for eq in range(2): + if np.all(first == eq) and np.all(second == eq): + mask[p].fill(eq) + mask[p + 1].fill(eq) + lock[p].fill(s) + lock[p + 1].fill(s) + + if np.all(first == 0) and np.all(second == 2): + mask[p].fill(2) + mask[p + 1].fill(2) + lock[p].fill(s) + lock[p + 1].fill(s) + + if np.all(first == 3) and np.all(second == 1): + mask[p].fill(3) + mask[p + 1].fill(3) + lock[p].fill(s) + lock[p + 1].fill(s) + + if s == 0 and np.all(first == 0) and np.all(second == 1): + mask[p].fill(2) + mask[p + 1].fill(2) + lock[p].fill(s) + lock[p + 1].fill(s) + + if s == 1 and np.all(first == 2) and np.all(second == 1): + mask[p].fill(3) + mask[p + 1].fill(3) + lock[p].fill(s) + lock[p + 1].fill(s) + + sub_mask = mask.flatten() + lock_mask = lock.flatten() + + words = {0: 'ZERO', 1: 'ONE', 2: 'RPT', 3: 'SPC'} + ll = lock_mask[0] + sub_t = sub_mask[0] + for i in range(len(frozen_mask)): + v = frozen_mask[i] + t = words[sub_mask[i]] + l = lock_mask[i] + # if i % 8 == 0: + # print + if not l == ll or not sub_mask[i] == sub_t: + print('--------------------------') + ll = l + sub_t = sub_mask[i] + print('{0:4} lock {1:4} value: {2} in sub {3}'.format(i, 2 ** (l + 1), v, t)) + + + def main(): + np.set_printoptions(precision=3, linewidth=150) print 'channel construction Bhattacharyya bounds by Arikan' - n = 8 + n = 10 m = 2 ** n k = m // 2 design_snr = -1.59 mu = 32 - # ztv = tal_vardy_tpm_algorithm(m, design_snr, mu) - z_params = load_z_parameters(m, design_snr, mu) - plt.plot(z_params) - plt.show() + # plt.plot(z_params) + # plt.show() + frozen_indices = get_frozen_bit_indices_from_z_parameters(z_params, k) + + frozen_mask = np.zeros(m, dtype=int) + frozen_mask[frozen_indices] = 1 + # frozen_mask = np.reshape(frozen_mask, (-1, 32)) + # for p in frozen_mask: + # print(p) + # if np.all(p == 1): + # print("zero rate") + # elif np.all(p == 0): + # print("ONE rate") + # elif p[0] == 1 and np.all(p[1:] == 0): + # print("SPC code") + # elif np.all(p[0:-1] == 1) and p[-1] == 0: + # print("REPETITION code") + + find_decoder_subframes(frozen_mask) + diff --git a/gr-fec/python/fec/polar/channel_construction_bsc.py b/gr-fec/python/fec/polar/channel_construction_bsc.py index fddad2e755..69acea861d 100755 --- a/gr-fec/python/fec/polar/channel_construction_bsc.py +++ b/gr-fec/python/fec/polar/channel_construction_bsc.py @@ -315,7 +315,7 @@ def normalize_q(q, tpm): def main(): print 'channel construction BSC main' - n = 8 + n = 10 m = 2 ** n k = m // 2 design_snr = 0.5 @@ -323,6 +323,7 @@ def main(): z_params = tal_vardy_tpm_algorithm(m, design_snr, mu) + print(z_params) plt.plot(z_params) plt.show() diff --git a/gr-fec/python/fec/polar/helper_functions.py b/gr-fec/python/fec/polar/helper_functions.py index e93fa9a507..72501beae3 100644 --- a/gr-fec/python/fec/polar/helper_functions.py +++ b/gr-fec/python/fec/polar/helper_functions.py @@ -20,6 +20,7 @@ import numpy as np import time, sys +import copy def power_of_2_int(num): @@ -119,24 +120,72 @@ def main(): for i in range(8): print(i, 'is power of 2: ', is_power_of_two(i)) - n = 2 ** 6 - k = n // 2 + n = 6 + m = 2 ** n + k = m // 2 eta = 0.3 - # frozen_bit_positions = get_frozen_bit_positions('.', 256, 128, 0.11) - # print(frozen_bit_positions) + pos = np.arange(m) + rev_pos = bit_reverse_vector(pos, n) + print(pos) + print(rev_pos) + + bound = 16 + num_lanes = m // bound + + + lanes = np.zeros((num_lanes, bound), dtype=int) + for i in range(0, num_lanes): + p = i * bound + part = rev_pos[p: p + bound] + lanes[i] = part + + print('reved lanes') + print(lanes) + + # SHUFFLE! + shuffle_pos = bit_reverse_vector(np.arange(bound), 4) + for i in range(num_lanes): + lane = lanes[i] + lanes[i] = lanes[i, shuffle_pos] + print('\nshuffled lanes') + print(lanes) + + # SORT HALVES + hb = bound // 2 + for i in range(num_lanes // 2): + l0 = lanes[i] + l1 = lanes[i + (num_lanes // 2)] + l0p = copy.deepcopy(l0[hb:]) + l0[hb:] = l1[0:hb] + l1[0:hb] = l0p + lanes[i] =l0 + lanes[i + (num_lanes // 2)] = l1 + print('\nsort halves') + print(lanes) + + # 'MELT' SHUFFLE INTERLEAVE! + melt_pos = np.arange(bound, dtype=int) + melt_pos = np.reshape(melt_pos, (2, -1)).T.flatten() + for i in range(num_lanes): + lanes[i] = lanes[i, melt_pos] + print('\nmelt lanes') + print(lanes) + + + + for i in range(0, m, bound): + print("\nlook at this part") + part = pos[i: i + bound] + rev = bit_reverse_vector(part, n) + sorted_rev = np.sort(rev) + print(part) + print(rev) + print(sorted_rev) + sorted_part = rev[shuffle_pos] + print(sorted_part) - print(np.arange(16)) - print bit_reverse_vector(np.arange(16), 4) - ntotal = 99 - for i in range(ntotal): - show_progress_bar(i, ntotal) - time.sleep(0.1) - - # sys.stdout.write('Hello') - # time.sleep(1) - # sys.stdout.write('\rMomy ') if __name__ == '__main__': main() diff --git a/gr-fec/python/fec/polar/testbed.py b/gr-fec/python/fec/polar/testbed.py index bdf9ae437c..c35b62099c 100755 --- a/gr-fec/python/fec/polar/testbed.py +++ b/gr-fec/python/fec/polar/testbed.py @@ -153,11 +153,11 @@ def main(): # frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int) # print frozenbitposition - # test_enc_dec_chain() + test_enc_dec_chain() # test_1024_rate_1_code() - channel_analysis() + # channel_analysis() if __name__ == '__main__': diff --git a/gr-fec/python/fec/qa_polar_decoder_sc.py b/gr-fec/python/fec/qa_polar_decoder_sc.py index 966a9f169a..1e7cd25e26 100644 --- a/gr-fec/python/fec/qa_polar_decoder_sc.py +++ b/gr-fec/python/fec/qa_polar_decoder_sc.py @@ -32,7 +32,7 @@ from extended_decoder import extended_decoder from polar.encoder import PolarEncoder from polar.decoder import PolarDecoder import polar.channel_construction as cc -# from polar.helper_functions import bit_reverse_vector +from polar.helper_functions import bit_reverse_vector # print('PID:', os.getpid()) # raw_input('tell me smth') @@ -62,22 +62,14 @@ class test_polar_decoder_sc(gr_unittest.TestCase): def test_002_one_vector(self): print "test_002_one_vector" is_packed = False - block_power = 8 + block_power = 10 block_size = 2 ** block_power num_info_bits = 2 ** (block_power - 1) num_frozen_bits = block_size - num_info_bits frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) frozen_bit_values = np.array([0] * num_frozen_bits,) - print frozen_bit_positions - - python_decoder = PolarDecoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) - bits = np.ones(num_info_bits, dtype=int) - # bits = np.random.randint(2, size=num_info_bits) - encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) - data = encoder.encode(bits) - # data = np.array([0, 1, 1, 0, 1, 0, 1, 0], dtype=int) - gr_data = 2.0 * data - 1.0 + bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, 1, True) polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed) src = blocks.vector_source_f(gr_data, False) @@ -90,16 +82,14 @@ class test_polar_decoder_sc(gr_unittest.TestCase): res = np.array(snk.data()).astype(dtype=int) - ref = python_decoder.decode(data) - - print("input:", data) + print("input:", gr_data.astype(dtype=int)) + print("ref :", bits) print("res :", res) - print("ref :", ref) - self.assertTupleEqual(tuple(res), tuple(ref)) + self.assertTupleEqual(tuple(res), tuple(bits)) def test_003_stream(self): - print "test_002_stream" + print "test_003_stream" nframes = 3 is_packed = False block_power = 8 @@ -108,23 +98,8 @@ class test_polar_decoder_sc(gr_unittest.TestCase): num_frozen_bits = block_size - num_info_bits frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) frozen_bit_values = np.array([0] * num_frozen_bits,) - print frozen_bit_positions - python_decoder = PolarDecoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) - encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) - - bits = np.array([], dtype=int) - data = np.array([], dtype=int) - for n in range(nframes): - b = np.random.randint(2, size=num_info_bits) - d = encoder.encode(b) - bits = np.append(bits, b) - data = np.append(data, d) - # bits = np.ones(num_info_bits, dtype=int) - # bits = np.random.randint(2, size=num_info_bits) - # data = encoder.encode(bits) - # data = np.array([0, 1, 1, 0, 1, 0, 1, 0], dtype=int) - gr_data = 2.0 * data - 1.0 + bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, nframes, False) polar_decoder = fec.polar_decoder_sc.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed) src = blocks.vector_source_f(gr_data, False) @@ -137,14 +112,27 @@ class test_polar_decoder_sc(gr_unittest.TestCase): res = np.array(snk.data()).astype(dtype=int) - # ref = python_decoder.decode(data) - - print("input:", data) + print("input:", gr_data.astype(dtype=int)) + print("ref :", bits) print("res :", res) - # print("ref :", ref) self.assertTupleEqual(tuple(res), tuple(bits)) + def generate_test_data(self, block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, nframes, onlyones): + encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) + bits = np.array([], dtype=int) + data = np.array([], dtype=int) + for n in range(nframes): + if onlyones: + b = np.ones(num_info_bits, dtype=int) + else: + b = np.random.randint(2, size=num_info_bits) + d = encoder.encode(b) + bits = np.append(bits, b) + data = np.append(data, d) + gr_data = 2.0 * data - 1.0 + return bits, gr_data + if __name__ == '__main__': gr_unittest.run(test_polar_decoder_sc) diff --git a/gr-fec/python/fec/qa_polar_encoder.py b/gr-fec/python/fec/qa_polar_encoder.py index b4f26e4017..90190cd719 100644 --- a/gr-fec/python/fec/qa_polar_encoder.py +++ b/gr-fec/python/fec/qa_polar_encoder.py @@ -28,6 +28,10 @@ from extended_encoder import extended_encoder from polar.encoder import PolarEncoder import polar.channel_construction as cc +# import os +# print('PID:', os.getpid()) +# raw_input('tell me smth') + class test_polar_encoder(gr_unittest.TestCase): @@ -50,54 +54,45 @@ class test_polar_encoder(gr_unittest.TestCase): self.assertFloatTuplesAlmostEqual((float(num_info_bits) / block_size, ), (polar_encoder.rate(), )) self.assertFalse(polar_encoder.set_frame_size(10)) - def test_002_work_function(self): - block_size = 256 - num_info_bits = 128 - num_frozen_bits = block_size - num_info_bits - frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) - frozen_bit_values = np.array([0] * num_frozen_bits,) - python_encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) - + def test_002_work_function_packed(self): is_packed = True - polar_encoder = fec.polar_encoder.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed) + block_size = 256 + num_info_bits = block_size // 2 - data = np.ones(num_info_bits, dtype=int) + data, ref, polar_encoder = self.get_test_data(block_size, num_info_bits, 1, is_packed) src = blocks.vector_source_b(data, False) enc_block = extended_encoder(polar_encoder, None, '11') snk = blocks.vector_sink_b(1) self.tb.connect(src, enc_block, snk) self.tb.run() - print(self.tb.edge_list()) res = np.array(snk.data()).astype(dtype=int) - penc = python_encoder.encode(data) - - print(res) - print(penc) - self.assertTupleEqual(tuple(res), tuple(penc)) + self.assertTupleEqual(tuple(res), tuple(ref)) - def test_003_big_input(self): - is_packed = True - num_blocks = 30 + def test_003_work_function_unpacked(self): + is_packed = False block_size = 256 - num_info_bits = 128 - num_frozen_bits = block_size - num_info_bits - frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) - frozen_bit_values = np.array([0] * num_frozen_bits,) - python_encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) + num_info_bits = block_size // 2 - polar_encoder = fec.polar_encoder.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed) + data, ref, polar_encoder = self.get_test_data(block_size, num_info_bits, 1, is_packed) + src = blocks.vector_source_b(data, False) + enc_block = extended_encoder(polar_encoder, None, '11') + snk = blocks.vector_sink_b(1) - data = np.array([], dtype=int) - ref = np.array([], dtype=int) + self.tb.connect(src, enc_block, snk) + self.tb.run() - for i in range(num_blocks): - d = np.random.randint(2, size=num_info_bits) - data = np.append(data, d) - ref = np.append(ref, python_encoder.encode(d)) + res = np.array(snk.data()).astype(dtype=int) + self.assertTupleEqual(tuple(res), tuple(ref)) + def test_004_big_input(self): + is_packed = False + num_blocks = 30 + block_size = 1024 + num_info_bits = block_size // 8 + data, ref, polar_encoder = self.get_test_data(block_size, num_info_bits, num_blocks, is_packed) src = blocks.vector_source_b(data, False) enc_block = extended_encoder(polar_encoder, None, '11') snk = blocks.vector_sink_b(1) @@ -111,6 +106,22 @@ class test_polar_encoder(gr_unittest.TestCase): print(ref) self.assertTupleEqual(tuple(res), tuple(ref)) + def get_test_data(self, block_size, num_info_bits, num_blocks, is_packed): + num_frozen_bits = block_size - num_info_bits + frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) + frozen_bit_values = np.array([0] * num_frozen_bits,) + python_encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) + + data = np.array([], dtype=int) + ref = np.array([], dtype=int) + for i in range(num_blocks): + d = np.random.randint(2, size=num_info_bits) + data = np.append(data, d) + ref = np.append(ref, python_encoder.encode(d)) + polar_encoder = fec.polar_encoder.make(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values, is_packed) + return data, ref, polar_encoder + + |