diff options
author | Johnathan Corgan <johnathan@corganlabs.com> | 2015-12-07 16:15:31 -0800 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2015-12-07 16:15:31 -0800 |
commit | 422271f1e025bd1835e4a7ee68e2a60e23167b45 (patch) | |
tree | f49812df4a01de3d40af40e80ee9b6d62db6064c /gr-fec/python | |
parent | a96c1dd97ecb6ca9a40522d297df27bbdeca5d7f (diff) | |
parent | 9ac7203fffb37fd456b82b737a71d4c7a1607b06 (diff) |
Merge remote-tracking branch 'jdemel/polar-systematic'
Diffstat (limited to 'gr-fec/python')
-rw-r--r-- | gr-fec/python/fec/polar/common.py | 15 | ||||
-rw-r--r-- | gr-fec/python/fec/polar/decoder.py | 75 | ||||
-rw-r--r-- | gr-fec/python/fec/polar/encoder.py | 40 | ||||
-rwxr-xr-x | gr-fec/python/fec/polar/testbed.py | 49 | ||||
-rw-r--r-- | gr-fec/python/fec/qa_polar_decoder_sc_systematic.py | 117 | ||||
-rw-r--r-- | gr-fec/python/fec/qa_polar_encoder_systematic.py | 107 |
6 files changed, 337 insertions, 66 deletions
diff --git a/gr-fec/python/fec/polar/common.py b/gr-fec/python/fec/polar/common.py index b4b152de61..fa5987b6d2 100644 --- a/gr-fec/python/fec/polar/common.py +++ b/gr-fec/python/fec/polar/common.py @@ -65,5 +65,20 @@ class PolarCommon: def _vector_bit_reversed(self, vec, n): return bit_reverse_vector(vec, n) + def _encode_efficient(self, vec): + n_stages = self.power + pos = np.arange(self.N, dtype=int) + for i in range(n_stages): + splitted = np.reshape(pos, (2 ** (i + 1), -1)) + upper_branch = splitted[0::2].flatten() + lower_branch = splitted[1::2].flatten() + vec[upper_branch] = (vec[upper_branch] + vec[lower_branch]) % 2 + return vec + + def _encode_natural_order(self, vec): + # use this function. It reflects the encoding process implemented in VOLK. + vec = vec[self.bit_reverse_positions] + return self._encode_efficient(vec) + def info_print(self): print "POLAR code ({0}, {1})".format(self.N, self.K) diff --git a/gr-fec/python/fec/polar/decoder.py b/gr-fec/python/fec/polar/decoder.py index ae62943275..8748d284f7 100644 --- a/gr-fec/python/fec/polar/decoder.py +++ b/gr-fec/python/fec/polar/decoder.py @@ -202,6 +202,33 @@ class PolarDecoder(PolarCommon): data = np.packbits(data) return data + def _extract_info_bits_reversed(self, y): + info_bit_positions_reversed = self._vector_bit_reversed(self.info_bit_position, self.power) + return y[info_bit_positions_reversed] + + def decode_systematic(self, data): + if not len(data) == self.N: + raise ValueError("len(data)={0} is not equal to n={1}!".format(len(data), self.N)) + # data = self._reverse_bits(data) + data = self._lr_sc_decoder_efficient(data) + data = self._encode_natural_order(data) + data = self._extract_info_bits_reversed(data) + return data + + +def test_systematic_decoder(): + ntests = 1000 + n = 16 + k = 8 + frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int) + encoder = PolarEncoder(n, k, frozenbitposition) + decoder = PolarDecoder(n, k, frozenbitposition) + for i in range(ntests): + bits = np.random.randint(2, size=k) + y = encoder.encode_systematic(bits) + u_hat = decoder.decode_systematic(y) + assert (bits == u_hat).all() + def test_reverse_enc_dec(): n = 16 @@ -240,29 +267,31 @@ def compare_decoder_impls(): def main(): - power = 3 - n = 2 ** power - k = 4 - frozenbits = np.zeros(n - k, dtype=int) - frozenbitposition = np.array((0, 1, 2, 4), dtype=int) - frozenbitposition4 = np.array((0, 1), dtype=int) - - - encoder = PolarEncoder(n, k, frozenbitposition, frozenbits) - decoder = PolarDecoder(n, k, frozenbitposition, frozenbits) - - bits = np.ones(k, dtype=int) - print "bits: ", bits - evec = encoder.encode(bits) - print "froz: ", encoder._insert_frozen_bits(bits) - print "evec: ", evec - - evec[1] = 0 - deced = decoder._lr_sc_decoder(evec) - print 'SC decoded:', deced - - test_reverse_enc_dec() - compare_decoder_impls() + # power = 3 + # n = 2 ** power + # k = 4 + # frozenbits = np.zeros(n - k, dtype=int) + # frozenbitposition = np.array((0, 1, 2, 4), dtype=int) + # frozenbitposition4 = np.array((0, 1), dtype=int) + # + # + # encoder = PolarEncoder(n, k, frozenbitposition, frozenbits) + # decoder = PolarDecoder(n, k, frozenbitposition, frozenbits) + # + # bits = np.ones(k, dtype=int) + # print "bits: ", bits + # evec = encoder.encode(bits) + # print "froz: ", encoder._insert_frozen_bits(bits) + # print "evec: ", evec + # + # evec[1] = 0 + # deced = decoder._lr_sc_decoder(evec) + # print 'SC decoded:', deced + # + # test_reverse_enc_dec() + # compare_decoder_impls() + + test_systematic_decoder() if __name__ == '__main__': diff --git a/gr-fec/python/fec/polar/encoder.py b/gr-fec/python/fec/polar/encoder.py index 3b5eea2a94..cc8fda2d1b 100644 --- a/gr-fec/python/fec/polar/encoder.py +++ b/gr-fec/python/fec/polar/encoder.py @@ -41,16 +41,6 @@ class PolarEncoder(PolarCommon): data = data.astype(dtype=int) return data - def _encode_efficient(self, vec): - n_stages = int(np.log2(self.N)) - pos = np.arange(self.N, dtype=int) - for i in range(n_stages): - splitted = np.reshape(pos, (2 ** (i + 1), -1)) - upper_branch = splitted[0::2].flatten() - lower_branch = splitted[1::2].flatten() - vec[upper_branch] = (vec[upper_branch] + vec[lower_branch]) % 2 - return vec - def encode(self, data, is_packed=False): if not len(data) == self.K: raise ValueError("len(data)={0} is not equal to k={1}!".format(len(data), self.K)) @@ -64,6 +54,31 @@ class PolarEncoder(PolarCommon): data = np.packbits(data) return data + def encode_systematic(self, data): + if not len(data) == self.K: + raise ValueError("len(data)={0} is not equal to k={1}!".format(len(data), self.K)) + if np.max(data) > 1 or np.min(data) < 0: + raise ValueError("can only encode bits!") + + d = self._insert_frozen_bits(data) + d = self._encode_natural_order(d) + d = self._reverse_bits(d) + d[self.frozen_bit_position] = 0 + d = self._encode_natural_order(d) + # d = self._reverse_bits(d) # for more accuracy, do another bit-reversal. or don't for computational simplicity. + return d + + +def test_systematic_encoder(encoder, ntests, k): + for n in range(ntests): + bits = np.random.randint(2, size=k) + x = encoder.encode_systematic(bits) + x = encoder._reverse_bits(x) + u_hat = encoder._extract_info_bits(x) + + assert (bits == u_hat).all() + # print((bits == u_hat).all()) + def compare_results(encoder, ntests, k): for n in range(ntests): @@ -95,15 +110,16 @@ def test_encoder_impls(): ntests = 1000 n = 16 k = 8 - frozenbits = np.zeros(n - k) + # frozenbits = np.zeros(n - k) # frozenbitposition8 = np.array((0, 1, 2, 4), dtype=int) # keep it! frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int) - encoder = PolarEncoder(n, k, frozenbitposition, frozenbits) + encoder = PolarEncoder(n, k, frozenbitposition) #, frozenbits) print 'result:', compare_results(encoder, ntests, k) print('Test rate-1 encoder/decoder chain results') r1_test = test_pseudo_rate_1_encoder(encoder, ntests, k) print 'Test rate-1 encoder/decoder:', r1_test + test_systematic_encoder(encoder, ntests, k) def main(): diff --git a/gr-fec/python/fec/polar/testbed.py b/gr-fec/python/fec/polar/testbed.py index d60c83e776..3f8e814e4f 100755 --- a/gr-fec/python/fec/polar/testbed.py +++ b/gr-fec/python/fec/polar/testbed.py @@ -305,17 +305,19 @@ def find_decoder_subframes(frozen_mask): print('{0:4} lock {1:4} value: {2} in sub {3}'.format(i, 2 ** (l + 1), v, t)) -def load_file(filename): - z_params = [] - with open(filename, 'r') as f: - for line in f: - if 'Bhattacharyya:' in line: - l = line.split(' ') - l = l[10:-2] - l = l[0][:-1] - l = float(l) - z_params.append(l) - return np.array(z_params) +def systematic_encoder_decoder_chain_test(): + print('systematic encoder decoder chain test') + block_size = int(2 ** 8) + info_bit_size = block_size // 2 + ntests = 100 + frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters(cc.bhattacharyya_bounds(0.0, block_size), block_size - info_bit_size) + encoder = PolarEncoder(block_size, info_bit_size, frozenbitposition) + decoder = PolarDecoder(block_size, info_bit_size, frozenbitposition) + for i in range(ntests): + bits = np.random.randint(2, size=info_bit_size) + y = encoder.encode_systematic(bits) + u_hat = decoder.decode_systematic(y) + assert (bits == u_hat).all() def main(): @@ -334,27 +336,12 @@ def main(): # test_1024_rate_1_code() # channel_analysis() - frozen_indices = cc.get_bec_frozen_indices(m, n_frozen, 0.11) - frozen_mask = cc.get_frozen_bit_mask(frozen_indices, m) - find_decoder_subframes(frozen_mask) - - frozen_mask = np.zeros(m, dtype=int) - frozen_mask[frozen_indices] = 1 + # frozen_indices = cc.get_bec_frozen_indices(m, n_frozen, 0.11) + # frozen_mask = cc.get_frozen_bit_mask(frozen_indices, m) + # find_decoder_subframes(frozen_mask) - # filename = 'channel_z-parameters.txt' - # ido = load_file(filename) - # ido_frozen = cc.get_frozen_bit_indices_from_z_parameters(ido, k) - # ido_mask = np.zeros(m, dtype=int) - # ido_mask[ido_frozen] = 1 - # - # - # plt.plot(ido_mask) - # plt.plot(frozen_mask) - # for i in range(m): - # if not ido_mask[i] == frozen_mask[i]: - # plt.axvline(i, color='r') - # plt.show() + systematic_encoder_decoder_chain_test() if __name__ == '__main__': - main()
\ No newline at end of file + main() diff --git a/gr-fec/python/fec/qa_polar_decoder_sc_systematic.py b/gr-fec/python/fec/qa_polar_decoder_sc_systematic.py new file mode 100644 index 0000000000..fb2381e069 --- /dev/null +++ b/gr-fec/python/fec/qa_polar_decoder_sc_systematic.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# +# Copyright 2015 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import fec_swig as fec + +import numpy as np +from extended_decoder import extended_decoder +from polar.encoder import PolarEncoder +import polar.channel_construction as cc + +# import os +# print('PID:', os.getpid()) +# raw_input('tell me smth') + + +class test_polar_decoder_sc_systematic(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001_setup(self): + block_size = 16 + num_info_bits = 8 + frozen_bit_positions = np.arange(block_size - num_info_bits) + + polar_decoder = fec.polar_decoder_sc_systematic.make(block_size, num_info_bits, frozen_bit_positions) + + self.assertEqual(num_info_bits, polar_decoder.get_output_size()) + self.assertEqual(block_size, polar_decoder.get_input_size()) + self.assertFloatTuplesAlmostEqual((float(num_info_bits) / block_size, ), (polar_decoder.rate(), )) + self.assertFalse(polar_decoder.set_frame_size(10)) + + def test_002_one_vector(self): + block_power = 4 + block_size = 2 ** block_power + num_info_bits = block_size // 2 + frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) + + bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, 1, False) + + polar_decoder = fec.polar_decoder_sc_systematic.make(block_size, num_info_bits, frozen_bit_positions) + src = blocks.vector_source_f(gr_data, False) + dec_block = extended_decoder(polar_decoder, None) + snk = blocks.vector_sink_b(1) + + self.tb.connect(src, dec_block) + self.tb.connect(dec_block, snk) + self.tb.run() + + res = np.array(snk.data()).astype(dtype=int) + self.assertTupleEqual(tuple(res), tuple(bits)) + + def test_003_stream(self): + nframes = 3 + block_power = 8 + block_size = 2 ** block_power + num_info_bits = block_size // 2 + frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) + + bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, nframes, False) + + polar_decoder = fec.polar_decoder_sc_systematic.make(block_size, num_info_bits, frozen_bit_positions) + src = blocks.vector_source_f(gr_data, False) + dec_block = extended_decoder(polar_decoder, None) + snk = blocks.vector_sink_b(1) + + self.tb.connect(src, dec_block) + self.tb.connect(dec_block, snk) + self.tb.run() + + res = np.array(snk.data()).astype(dtype=int) + self.assertTupleEqual(tuple(res), tuple(bits)) + + def generate_test_data(self, block_size, num_info_bits, frozen_bit_positions, nframes, onlyones): + frozen_bit_values = np.zeros(block_size - num_info_bits, dtype=int) + encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) + bits = np.array([], dtype=int) + data = np.array([], dtype=int) + for n in range(nframes): + if onlyones: + b = np.ones(num_info_bits, dtype=int) + else: + b = np.random.randint(2, size=num_info_bits) + d = encoder.encode_systematic(b) + bits = np.append(bits, b) + data = np.append(data, d) + gr_data = 2.0 * data - 1.0 + return bits, gr_data + + +if __name__ == '__main__': + gr_unittest.run(test_polar_decoder_sc_systematic) + + diff --git a/gr-fec/python/fec/qa_polar_encoder_systematic.py b/gr-fec/python/fec/qa_polar_encoder_systematic.py new file mode 100644 index 0000000000..015a31b3cb --- /dev/null +++ b/gr-fec/python/fec/qa_polar_encoder_systematic.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python +# +# Copyright 2015 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest, blocks +import fec_swig as fec +import numpy as np + +from extended_encoder import extended_encoder +from polar.encoder import PolarEncoder +import polar.channel_construction as cc + +# import os +# print('PID:', os.getpid()) +# raw_input('tell me smth') + + +class test_polar_encoder_systematic(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_001_setup(self): + block_size = 16 + num_info_bits = 8 + frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) + + polar_encoder = fec.polar_encoder_systematic.make(block_size, num_info_bits, frozen_bit_positions) + + self.assertEqual(block_size, polar_encoder.get_output_size()) + self.assertEqual(num_info_bits, polar_encoder.get_input_size()) + self.assertFloatTuplesAlmostEqual((float(num_info_bits) / block_size, ), (polar_encoder.rate(), )) + self.assertFalse(polar_encoder.set_frame_size(10)) + + def test_002_work_function_packed(self): + block_size = 256 + num_info_bits = block_size // 2 + + data, ref, polar_encoder = self.get_test_data(block_size, num_info_bits, 1) + src = blocks.vector_source_b(data, False) + enc_block = extended_encoder(polar_encoder, None, '11') + snk = blocks.vector_sink_b(1) + + self.tb.connect(src, enc_block, snk) + self.tb.run() + + res = np.array(snk.data()).astype(dtype=int) + self.assertTupleEqual(tuple(res), tuple(ref)) + + def test_004_big_input(self): + num_blocks = 30 + block_size = 1024 + num_info_bits = block_size // 8 + + data, ref, polar_encoder = self.get_test_data(block_size, num_info_bits, num_blocks) + src = blocks.vector_source_b(data, False) + enc_block = extended_encoder(polar_encoder, None, '11') + snk = blocks.vector_sink_b(1) + + self.tb.connect(src, enc_block, snk) + self.tb.run() + + res = np.array(snk.data()).astype(dtype=int) + self.assertTupleEqual(tuple(res), tuple(ref)) + + def get_test_data(self, block_size, num_info_bits, num_blocks): + # helper function to set up test data and together with encoder object. + num_frozen_bits = block_size - num_info_bits + frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0) + frozen_bit_values = np.array([0] * num_frozen_bits,) + python_encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values) + + data = np.array([], dtype=int) + ref = np.array([], dtype=int) + for i in range(num_blocks): + d = np.random.randint(2, size=num_info_bits) + data = np.append(data, d) + ref = np.append(ref, python_encoder.encode_systematic(d)) + polar_encoder = fec.polar_encoder_systematic.make(block_size, num_info_bits, frozen_bit_positions) + return data, ref, polar_encoder + + +if __name__ == '__main__': + gr_unittest.run(test_polar_encoder_systematic) + + |