summaryrefslogtreecommitdiff
path: root/gr-fec/python
diff options
context:
space:
mode:
authorJohnathan Corgan <johnathan@corganlabs.com>2015-12-07 16:15:31 -0800
committerJohnathan Corgan <johnathan@corganlabs.com>2015-12-07 16:15:31 -0800
commit422271f1e025bd1835e4a7ee68e2a60e23167b45 (patch)
treef49812df4a01de3d40af40e80ee9b6d62db6064c /gr-fec/python
parenta96c1dd97ecb6ca9a40522d297df27bbdeca5d7f (diff)
parent9ac7203fffb37fd456b82b737a71d4c7a1607b06 (diff)
Merge remote-tracking branch 'jdemel/polar-systematic'
Diffstat (limited to 'gr-fec/python')
-rw-r--r--gr-fec/python/fec/polar/common.py15
-rw-r--r--gr-fec/python/fec/polar/decoder.py75
-rw-r--r--gr-fec/python/fec/polar/encoder.py40
-rwxr-xr-xgr-fec/python/fec/polar/testbed.py49
-rw-r--r--gr-fec/python/fec/qa_polar_decoder_sc_systematic.py117
-rw-r--r--gr-fec/python/fec/qa_polar_encoder_systematic.py107
6 files changed, 337 insertions, 66 deletions
diff --git a/gr-fec/python/fec/polar/common.py b/gr-fec/python/fec/polar/common.py
index b4b152de61..fa5987b6d2 100644
--- a/gr-fec/python/fec/polar/common.py
+++ b/gr-fec/python/fec/polar/common.py
@@ -65,5 +65,20 @@ class PolarCommon:
def _vector_bit_reversed(self, vec, n):
return bit_reverse_vector(vec, n)
+ def _encode_efficient(self, vec):
+ n_stages = self.power
+ pos = np.arange(self.N, dtype=int)
+ for i in range(n_stages):
+ splitted = np.reshape(pos, (2 ** (i + 1), -1))
+ upper_branch = splitted[0::2].flatten()
+ lower_branch = splitted[1::2].flatten()
+ vec[upper_branch] = (vec[upper_branch] + vec[lower_branch]) % 2
+ return vec
+
+ def _encode_natural_order(self, vec):
+ # use this function. It reflects the encoding process implemented in VOLK.
+ vec = vec[self.bit_reverse_positions]
+ return self._encode_efficient(vec)
+
def info_print(self):
print "POLAR code ({0}, {1})".format(self.N, self.K)
diff --git a/gr-fec/python/fec/polar/decoder.py b/gr-fec/python/fec/polar/decoder.py
index ae62943275..8748d284f7 100644
--- a/gr-fec/python/fec/polar/decoder.py
+++ b/gr-fec/python/fec/polar/decoder.py
@@ -202,6 +202,33 @@ class PolarDecoder(PolarCommon):
data = np.packbits(data)
return data
+ def _extract_info_bits_reversed(self, y):
+ info_bit_positions_reversed = self._vector_bit_reversed(self.info_bit_position, self.power)
+ return y[info_bit_positions_reversed]
+
+ def decode_systematic(self, data):
+ if not len(data) == self.N:
+ raise ValueError("len(data)={0} is not equal to n={1}!".format(len(data), self.N))
+ # data = self._reverse_bits(data)
+ data = self._lr_sc_decoder_efficient(data)
+ data = self._encode_natural_order(data)
+ data = self._extract_info_bits_reversed(data)
+ return data
+
+
+def test_systematic_decoder():
+ ntests = 1000
+ n = 16
+ k = 8
+ frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
+ encoder = PolarEncoder(n, k, frozenbitposition)
+ decoder = PolarDecoder(n, k, frozenbitposition)
+ for i in range(ntests):
+ bits = np.random.randint(2, size=k)
+ y = encoder.encode_systematic(bits)
+ u_hat = decoder.decode_systematic(y)
+ assert (bits == u_hat).all()
+
def test_reverse_enc_dec():
n = 16
@@ -240,29 +267,31 @@ def compare_decoder_impls():
def main():
- power = 3
- n = 2 ** power
- k = 4
- frozenbits = np.zeros(n - k, dtype=int)
- frozenbitposition = np.array((0, 1, 2, 4), dtype=int)
- frozenbitposition4 = np.array((0, 1), dtype=int)
-
-
- encoder = PolarEncoder(n, k, frozenbitposition, frozenbits)
- decoder = PolarDecoder(n, k, frozenbitposition, frozenbits)
-
- bits = np.ones(k, dtype=int)
- print "bits: ", bits
- evec = encoder.encode(bits)
- print "froz: ", encoder._insert_frozen_bits(bits)
- print "evec: ", evec
-
- evec[1] = 0
- deced = decoder._lr_sc_decoder(evec)
- print 'SC decoded:', deced
-
- test_reverse_enc_dec()
- compare_decoder_impls()
+ # power = 3
+ # n = 2 ** power
+ # k = 4
+ # frozenbits = np.zeros(n - k, dtype=int)
+ # frozenbitposition = np.array((0, 1, 2, 4), dtype=int)
+ # frozenbitposition4 = np.array((0, 1), dtype=int)
+ #
+ #
+ # encoder = PolarEncoder(n, k, frozenbitposition, frozenbits)
+ # decoder = PolarDecoder(n, k, frozenbitposition, frozenbits)
+ #
+ # bits = np.ones(k, dtype=int)
+ # print "bits: ", bits
+ # evec = encoder.encode(bits)
+ # print "froz: ", encoder._insert_frozen_bits(bits)
+ # print "evec: ", evec
+ #
+ # evec[1] = 0
+ # deced = decoder._lr_sc_decoder(evec)
+ # print 'SC decoded:', deced
+ #
+ # test_reverse_enc_dec()
+ # compare_decoder_impls()
+
+ test_systematic_decoder()
if __name__ == '__main__':
diff --git a/gr-fec/python/fec/polar/encoder.py b/gr-fec/python/fec/polar/encoder.py
index 3b5eea2a94..cc8fda2d1b 100644
--- a/gr-fec/python/fec/polar/encoder.py
+++ b/gr-fec/python/fec/polar/encoder.py
@@ -41,16 +41,6 @@ class PolarEncoder(PolarCommon):
data = data.astype(dtype=int)
return data
- def _encode_efficient(self, vec):
- n_stages = int(np.log2(self.N))
- pos = np.arange(self.N, dtype=int)
- for i in range(n_stages):
- splitted = np.reshape(pos, (2 ** (i + 1), -1))
- upper_branch = splitted[0::2].flatten()
- lower_branch = splitted[1::2].flatten()
- vec[upper_branch] = (vec[upper_branch] + vec[lower_branch]) % 2
- return vec
-
def encode(self, data, is_packed=False):
if not len(data) == self.K:
raise ValueError("len(data)={0} is not equal to k={1}!".format(len(data), self.K))
@@ -64,6 +54,31 @@ class PolarEncoder(PolarCommon):
data = np.packbits(data)
return data
+ def encode_systematic(self, data):
+ if not len(data) == self.K:
+ raise ValueError("len(data)={0} is not equal to k={1}!".format(len(data), self.K))
+ if np.max(data) > 1 or np.min(data) < 0:
+ raise ValueError("can only encode bits!")
+
+ d = self._insert_frozen_bits(data)
+ d = self._encode_natural_order(d)
+ d = self._reverse_bits(d)
+ d[self.frozen_bit_position] = 0
+ d = self._encode_natural_order(d)
+ # d = self._reverse_bits(d) # for more accuracy, do another bit-reversal. or don't for computational simplicity.
+ return d
+
+
+def test_systematic_encoder(encoder, ntests, k):
+ for n in range(ntests):
+ bits = np.random.randint(2, size=k)
+ x = encoder.encode_systematic(bits)
+ x = encoder._reverse_bits(x)
+ u_hat = encoder._extract_info_bits(x)
+
+ assert (bits == u_hat).all()
+ # print((bits == u_hat).all())
+
def compare_results(encoder, ntests, k):
for n in range(ntests):
@@ -95,15 +110,16 @@ def test_encoder_impls():
ntests = 1000
n = 16
k = 8
- frozenbits = np.zeros(n - k)
+ # frozenbits = np.zeros(n - k)
# frozenbitposition8 = np.array((0, 1, 2, 4), dtype=int) # keep it!
frozenbitposition = np.array((0, 1, 2, 3, 4, 5, 8, 9), dtype=int)
- encoder = PolarEncoder(n, k, frozenbitposition, frozenbits)
+ encoder = PolarEncoder(n, k, frozenbitposition) #, frozenbits)
print 'result:', compare_results(encoder, ntests, k)
print('Test rate-1 encoder/decoder chain results')
r1_test = test_pseudo_rate_1_encoder(encoder, ntests, k)
print 'Test rate-1 encoder/decoder:', r1_test
+ test_systematic_encoder(encoder, ntests, k)
def main():
diff --git a/gr-fec/python/fec/polar/testbed.py b/gr-fec/python/fec/polar/testbed.py
index d60c83e776..3f8e814e4f 100755
--- a/gr-fec/python/fec/polar/testbed.py
+++ b/gr-fec/python/fec/polar/testbed.py
@@ -305,17 +305,19 @@ def find_decoder_subframes(frozen_mask):
print('{0:4} lock {1:4} value: {2} in sub {3}'.format(i, 2 ** (l + 1), v, t))
-def load_file(filename):
- z_params = []
- with open(filename, 'r') as f:
- for line in f:
- if 'Bhattacharyya:' in line:
- l = line.split(' ')
- l = l[10:-2]
- l = l[0][:-1]
- l = float(l)
- z_params.append(l)
- return np.array(z_params)
+def systematic_encoder_decoder_chain_test():
+ print('systematic encoder decoder chain test')
+ block_size = int(2 ** 8)
+ info_bit_size = block_size // 2
+ ntests = 100
+ frozenbitposition = cc.get_frozen_bit_indices_from_z_parameters(cc.bhattacharyya_bounds(0.0, block_size), block_size - info_bit_size)
+ encoder = PolarEncoder(block_size, info_bit_size, frozenbitposition)
+ decoder = PolarDecoder(block_size, info_bit_size, frozenbitposition)
+ for i in range(ntests):
+ bits = np.random.randint(2, size=info_bit_size)
+ y = encoder.encode_systematic(bits)
+ u_hat = decoder.decode_systematic(y)
+ assert (bits == u_hat).all()
def main():
@@ -334,27 +336,12 @@ def main():
# test_1024_rate_1_code()
# channel_analysis()
- frozen_indices = cc.get_bec_frozen_indices(m, n_frozen, 0.11)
- frozen_mask = cc.get_frozen_bit_mask(frozen_indices, m)
- find_decoder_subframes(frozen_mask)
-
- frozen_mask = np.zeros(m, dtype=int)
- frozen_mask[frozen_indices] = 1
+ # frozen_indices = cc.get_bec_frozen_indices(m, n_frozen, 0.11)
+ # frozen_mask = cc.get_frozen_bit_mask(frozen_indices, m)
+ # find_decoder_subframes(frozen_mask)
- # filename = 'channel_z-parameters.txt'
- # ido = load_file(filename)
- # ido_frozen = cc.get_frozen_bit_indices_from_z_parameters(ido, k)
- # ido_mask = np.zeros(m, dtype=int)
- # ido_mask[ido_frozen] = 1
- #
- #
- # plt.plot(ido_mask)
- # plt.plot(frozen_mask)
- # for i in range(m):
- # if not ido_mask[i] == frozen_mask[i]:
- # plt.axvline(i, color='r')
- # plt.show()
+ systematic_encoder_decoder_chain_test()
if __name__ == '__main__':
- main() \ No newline at end of file
+ main()
diff --git a/gr-fec/python/fec/qa_polar_decoder_sc_systematic.py b/gr-fec/python/fec/qa_polar_decoder_sc_systematic.py
new file mode 100644
index 0000000000..fb2381e069
--- /dev/null
+++ b/gr-fec/python/fec/qa_polar_decoder_sc_systematic.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest, blocks
+import fec_swig as fec
+
+import numpy as np
+from extended_decoder import extended_decoder
+from polar.encoder import PolarEncoder
+import polar.channel_construction as cc
+
+# import os
+# print('PID:', os.getpid())
+# raw_input('tell me smth')
+
+
+class test_polar_decoder_sc_systematic(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_001_setup(self):
+ block_size = 16
+ num_info_bits = 8
+ frozen_bit_positions = np.arange(block_size - num_info_bits)
+
+ polar_decoder = fec.polar_decoder_sc_systematic.make(block_size, num_info_bits, frozen_bit_positions)
+
+ self.assertEqual(num_info_bits, polar_decoder.get_output_size())
+ self.assertEqual(block_size, polar_decoder.get_input_size())
+ self.assertFloatTuplesAlmostEqual((float(num_info_bits) / block_size, ), (polar_decoder.rate(), ))
+ self.assertFalse(polar_decoder.set_frame_size(10))
+
+ def test_002_one_vector(self):
+ block_power = 4
+ block_size = 2 ** block_power
+ num_info_bits = block_size // 2
+ frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
+
+ bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, 1, False)
+
+ polar_decoder = fec.polar_decoder_sc_systematic.make(block_size, num_info_bits, frozen_bit_positions)
+ src = blocks.vector_source_f(gr_data, False)
+ dec_block = extended_decoder(polar_decoder, None)
+ snk = blocks.vector_sink_b(1)
+
+ self.tb.connect(src, dec_block)
+ self.tb.connect(dec_block, snk)
+ self.tb.run()
+
+ res = np.array(snk.data()).astype(dtype=int)
+ self.assertTupleEqual(tuple(res), tuple(bits))
+
+ def test_003_stream(self):
+ nframes = 3
+ block_power = 8
+ block_size = 2 ** block_power
+ num_info_bits = block_size // 2
+ frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
+
+ bits, gr_data = self.generate_test_data(block_size, num_info_bits, frozen_bit_positions, nframes, False)
+
+ polar_decoder = fec.polar_decoder_sc_systematic.make(block_size, num_info_bits, frozen_bit_positions)
+ src = blocks.vector_source_f(gr_data, False)
+ dec_block = extended_decoder(polar_decoder, None)
+ snk = blocks.vector_sink_b(1)
+
+ self.tb.connect(src, dec_block)
+ self.tb.connect(dec_block, snk)
+ self.tb.run()
+
+ res = np.array(snk.data()).astype(dtype=int)
+ self.assertTupleEqual(tuple(res), tuple(bits))
+
+ def generate_test_data(self, block_size, num_info_bits, frozen_bit_positions, nframes, onlyones):
+ frozen_bit_values = np.zeros(block_size - num_info_bits, dtype=int)
+ encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
+ bits = np.array([], dtype=int)
+ data = np.array([], dtype=int)
+ for n in range(nframes):
+ if onlyones:
+ b = np.ones(num_info_bits, dtype=int)
+ else:
+ b = np.random.randint(2, size=num_info_bits)
+ d = encoder.encode_systematic(b)
+ bits = np.append(bits, b)
+ data = np.append(data, d)
+ gr_data = 2.0 * data - 1.0
+ return bits, gr_data
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_polar_decoder_sc_systematic)
+
+
diff --git a/gr-fec/python/fec/qa_polar_encoder_systematic.py b/gr-fec/python/fec/qa_polar_encoder_systematic.py
new file mode 100644
index 0000000000..015a31b3cb
--- /dev/null
+++ b/gr-fec/python/fec/qa_polar_encoder_systematic.py
@@ -0,0 +1,107 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest, blocks
+import fec_swig as fec
+import numpy as np
+
+from extended_encoder import extended_encoder
+from polar.encoder import PolarEncoder
+import polar.channel_construction as cc
+
+# import os
+# print('PID:', os.getpid())
+# raw_input('tell me smth')
+
+
+class test_polar_encoder_systematic(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_001_setup(self):
+ block_size = 16
+ num_info_bits = 8
+ frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
+
+ polar_encoder = fec.polar_encoder_systematic.make(block_size, num_info_bits, frozen_bit_positions)
+
+ self.assertEqual(block_size, polar_encoder.get_output_size())
+ self.assertEqual(num_info_bits, polar_encoder.get_input_size())
+ self.assertFloatTuplesAlmostEqual((float(num_info_bits) / block_size, ), (polar_encoder.rate(), ))
+ self.assertFalse(polar_encoder.set_frame_size(10))
+
+ def test_002_work_function_packed(self):
+ block_size = 256
+ num_info_bits = block_size // 2
+
+ data, ref, polar_encoder = self.get_test_data(block_size, num_info_bits, 1)
+ src = blocks.vector_source_b(data, False)
+ enc_block = extended_encoder(polar_encoder, None, '11')
+ snk = blocks.vector_sink_b(1)
+
+ self.tb.connect(src, enc_block, snk)
+ self.tb.run()
+
+ res = np.array(snk.data()).astype(dtype=int)
+ self.assertTupleEqual(tuple(res), tuple(ref))
+
+ def test_004_big_input(self):
+ num_blocks = 30
+ block_size = 1024
+ num_info_bits = block_size // 8
+
+ data, ref, polar_encoder = self.get_test_data(block_size, num_info_bits, num_blocks)
+ src = blocks.vector_source_b(data, False)
+ enc_block = extended_encoder(polar_encoder, None, '11')
+ snk = blocks.vector_sink_b(1)
+
+ self.tb.connect(src, enc_block, snk)
+ self.tb.run()
+
+ res = np.array(snk.data()).astype(dtype=int)
+ self.assertTupleEqual(tuple(res), tuple(ref))
+
+ def get_test_data(self, block_size, num_info_bits, num_blocks):
+ # helper function to set up test data and together with encoder object.
+ num_frozen_bits = block_size - num_info_bits
+ frozen_bit_positions = cc.frozen_bit_positions(block_size, num_info_bits, 0.0)
+ frozen_bit_values = np.array([0] * num_frozen_bits,)
+ python_encoder = PolarEncoder(block_size, num_info_bits, frozen_bit_positions, frozen_bit_values)
+
+ data = np.array([], dtype=int)
+ ref = np.array([], dtype=int)
+ for i in range(num_blocks):
+ d = np.random.randint(2, size=num_info_bits)
+ data = np.append(data, d)
+ ref = np.append(ref, python_encoder.encode_systematic(d))
+ polar_encoder = fec.polar_encoder_systematic.make(block_size, num_info_bits, frozen_bit_positions)
+ return data, ref, polar_encoder
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_polar_encoder_systematic)
+
+