summaryrefslogtreecommitdiff
path: root/gr-fec/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-fec/python')
-rw-r--r--gr-fec/python/fec/CMakeLists.txt12
-rw-r--r--gr-fec/python/fec/__init__.py17
-rwxr-xr-xgr-fec/python/fec/_qa_helper.py90
-rw-r--r--gr-fec/python/fec/bercurve_generator.py95
-rw-r--r--gr-fec/python/fec/bitflip.py80
-rw-r--r--gr-fec/python/fec/capillary_threaded_decoder.py100
-rw-r--r--gr-fec/python/fec/capillary_threaded_encoder.py102
-rw-r--r--gr-fec/python/fec/extended_async_encoder.py67
-rw-r--r--gr-fec/python/fec/extended_decoder.py176
-rw-r--r--gr-fec/python/fec/extended_encoder.py74
-rw-r--r--gr-fec/python/fec/extended_tagged_decoder.py175
-rw-r--r--gr-fec/python/fec/extended_tagged_encoder.py82
-rw-r--r--gr-fec/python/fec/fec_test.py123
-rw-r--r--gr-fec/python/fec/qa_depuncture.py192
-rw-r--r--gr-fec/python/fec/qa_fecapi_cc.py195
-rw-r--r--gr-fec/python/fec/qa_fecapi_dummy.py190
-rw-r--r--gr-fec/python/fec/qa_fecapi_repetition.py161
-rw-r--r--gr-fec/python/fec/qa_puncture.py244
-rw-r--r--gr-fec/python/fec/threaded_decoder.py60
-rw-r--r--gr-fec/python/fec/threaded_encoder.py59
20 files changed, 2293 insertions, 1 deletions
diff --git a/gr-fec/python/fec/CMakeLists.txt b/gr-fec/python/fec/CMakeLists.txt
index a7eefaa0c1..032816866d 100644
--- a/gr-fec/python/fec/CMakeLists.txt
+++ b/gr-fec/python/fec/CMakeLists.txt
@@ -23,6 +23,18 @@ include(GrPython)
GR_PYTHON_INSTALL(
FILES
__init__.py
+ bitflip.py
+ extended_encoder.py
+ extended_decoder.py
+ capillary_threaded_decoder.py
+ capillary_threaded_encoder.py
+ threaded_decoder.py
+ threaded_encoder.py
+ extended_async_encoder.py
+ extended_tagged_encoder.py
+ extended_tagged_decoder.py
+ fec_test.py
+ bercurve_generator.py
DESTINATION ${GR_PYTHON_DIR}/gnuradio/fec
COMPONENT "fec_python"
)
diff --git a/gr-fec/python/fec/__init__.py b/gr-fec/python/fec/__init__.py
index bfec694739..6c82232d4f 100644
--- a/gr-fec/python/fec/__init__.py
+++ b/gr-fec/python/fec/__init__.py
@@ -1,5 +1,5 @@
#
-# Copyright 2012 Free Software Foundation, Inc.
+# Copyright 2012,2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -30,3 +30,18 @@ except ImportError:
dirname, filename = os.path.split(os.path.abspath(__file__))
__path__.append(os.path.join(dirname, "..", "..", "swig"))
from fec_swig import *
+
+from bitflip import *
+from extended_encoder import extended_encoder
+from extended_decoder import extended_decoder
+from threaded_encoder import threaded_encoder
+from threaded_decoder import threaded_decoder
+from capillary_threaded_decoder import capillary_threaded_decoder
+from capillary_threaded_encoder import capillary_threaded_encoder
+from extended_async_encoder import extended_async_encoder
+from extended_tagged_encoder import extended_tagged_encoder
+from extended_tagged_decoder import extended_tagged_decoder
+
+
+from fec_test import fec_test
+from bercurve_generator import bercurve_generator
diff --git a/gr-fec/python/fec/_qa_helper.py b/gr-fec/python/fec/_qa_helper.py
new file mode 100755
index 0000000000..8722453441
--- /dev/null
+++ b/gr-fec/python/fec/_qa_helper.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import blocks
+from gnuradio import gr
+import sys, numpy
+
+from extended_encoder import extended_encoder
+from extended_decoder import extended_decoder
+
+class map_bb(gr.sync_block):
+ def __init__(self, bitmap):
+ gr.sync_block.__init__(
+ self,
+ name = "map_bb",
+ in_sig = [numpy.int8],
+ out_sig = [numpy.int8])
+ self.bitmap = bitmap
+
+ def work(self, input_items, output_items):
+ output_items[0][:] = map(lambda x: self.bitmap[x], input_items[0])
+ return len(output_items[0])
+
+
+class _qa_helper(gr.top_block):
+
+ def __init__(self, data_size, enc, dec, threading):
+ gr.top_block.__init__(self, "_qa_helper")
+
+ self.puncpat = puncpat = '11'
+
+ self.enc = enc
+ self.dec = dec
+ self.data_size = data_size
+ self.threading = threading
+
+ self.ext_encoder = extended_encoder(enc, threading=self.threading, puncpat=self.puncpat)
+ self.ext_decoder= extended_decoder(dec, threading=self.threading, ann=None,
+ puncpat=self.puncpat, integration_period=10000)
+
+ self.src = blocks.vector_source_b(data_size*[0, 1, 2, 3, 5, 7, 9, 13, 15, 25, 31, 45, 63, 95, 127], False)
+ self.unpack = blocks.unpack_k_bits_bb(8)
+ self.map = map_bb([-1, 1])
+ self.to_float = blocks.char_to_float(1)
+ self.snk_input = blocks.vector_sink_b()
+ self.snk_output = blocks.vector_sink_b()
+
+ self.connect(self.src, self.unpack, self.ext_encoder)
+ self.connect(self.ext_encoder, self.map, self.to_float)
+ self.connect(self.to_float, self.ext_decoder)
+ self.connect(self.unpack, self.snk_input)
+ self.connect(self.ext_decoder, self.snk_output)
+
+if __name__ == '__main__':
+ frame_size = 30
+ enc = fec.dummy_encoder_make(frame_size*8)
+ #enc = fec.repetition_encoder_make(frame_size*8, 3)
+ dec = fec.dummy_decoder.make(frame_size*8)
+
+ tb = _qa_helper(10*frame_size, enc, dec, None)
+ tb.run()
+
+ errs = 0
+ for i,o in zip(tb.snk_input.data(), tb.snk_output.data()):
+ if i-o != 0:
+ errs += 1
+
+ if errs == 0:
+ print "Decoded properly"
+ else:
+ print "Problem Decoding"
diff --git a/gr-fec/python/fec/bercurve_generator.py b/gr-fec/python/fec/bercurve_generator.py
new file mode 100644
index 0000000000..e67d1e17c2
--- /dev/null
+++ b/gr-fec/python/fec/bercurve_generator.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, blocks
+import numpy
+
+from fec_test import fec_test
+
+class bercurve_generator(gr.hier_block2):
+
+ def __init__(self, encoder_list, decoder_list, esno=numpy.arange(0.0, 3.0, .25),
+ samp_rate=3200000, threading='capillary', puncpat='11', seed=0):
+ gr.hier_block2.__init__(
+ self, "ber_curve_generator",
+ gr.io_signature(0, 0, 0),
+ gr.io_signature(len(esno) * 2, len(esno) * 2, gr.sizeof_char*1))
+
+ self.esno = esno
+ self.samp_rate = samp_rate
+ self.encoder_list = encoder_list
+ self.decoder_list = decoder_list
+ self.puncpat = puncpat
+
+ self.random_gen_b_0 = blocks.vector_source_b(map(int, numpy.random.randint(0, 256, 100000)), True)
+ self.deinterleave = blocks.deinterleave(gr.sizeof_char*1)
+ self.connect(self.random_gen_b_0, self.deinterleave)
+ self.ber_generators = []
+ for i in range(0, len(esno)):
+ ber_generator_temp = fec_test(
+ generic_encoder=encoder_list[i],
+ generic_decoder=decoder_list[i],
+ esno=esno[i],
+ samp_rate=samp_rate,
+ threading=threading,
+ puncpat=puncpat,
+ seed=seed)
+ self.ber_generators.append(ber_generator_temp);
+
+ for i in range(0, len(esno)):
+ self.connect((self.deinterleave, i), (self.ber_generators[i]))
+ self.connect((self.ber_generators[i], 0), (self, i*2));
+ self.connect((self.ber_generators[i], 1), (self, i*2 + 1));
+
+ def get_esno(self):
+ return self.esno
+
+ def set_esno(self, esno):
+ self.esno = esno
+ self.ber_generator_0.set_esno(self.esno)
+
+ def get_samp_rate(self):
+ return self.samp_rate
+
+ def set_samp_rate(self, samp_rate):
+ self.samp_rate = samp_rate
+ self.ber_generator_0.set_samp_rate(self.samp_rate)
+
+ def get_encoder_list(self):
+ return self.encoder_list
+
+ def set_encoder_list(self, encoder_list):
+ self.encoder_list = encoder_list
+ self.ber_generator_0.set_generic_encoder(self.encoder_list)
+
+ def get_decoder_list(self):
+ return self.decoder_list
+
+ def set_decoder_list(self, decoder_list):
+ self.decoder_list = decoder_list
+ self.ber_generator_0.set_generic_decoder(self.decoder_list)
+
+ def get_puncpat(self):
+ return self.puncpat
+
+ def set_puncpat(self, puncpat):
+ self.puncpat = puncpat
diff --git a/gr-fec/python/fec/bitflip.py b/gr-fec/python/fec/bitflip.py
new file mode 100644
index 0000000000..235dc19a05
--- /dev/null
+++ b/gr-fec/python/fec/bitflip.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+def bitreverse(mint):
+ res = 0;
+ while mint != 0:
+ res = res << 1;
+ res += mint & 1;
+ mint = mint >> 1;
+ return res;
+
+const_lut = [2];
+specinvert_lut = [[0, 2, 1, 3]];
+
+def bitflip(mint, bitflip_lut, index, csize):
+ res = 0;
+ cnt = 0;
+ mask = (1 << const_lut[index]) - 1;
+ while (cnt < csize):
+ res += (bitflip_lut[(mint >> cnt) & (mask)]) << cnt;
+ cnt += const_lut[index];
+ return res;
+
+
+def read_bitlist(bitlist):
+ res = 0;
+ for i in range(len(bitlist)):
+ if int(bitlist[i]) == 1:
+ res += 1 << (len(bitlist) - i - 1);
+ return res;
+
+
+def read_big_bitlist(bitlist):
+ ret = []
+ for j in range(0, len(bitlist)/64):
+ res = 0;
+ for i in range(0, 64):
+ if int(bitlist[j*64+i]) == 1:
+ res += 1 << (64 - i - 1);
+ ret.append(res);
+ res = 0;
+ j = 0;
+ for i in range(len(bitlist)%64):
+ if int(bitlist[len(ret)*64+i]) == 1:
+ res += 1 << (64 - j - 1);
+ j += 1;
+ ret.append(res);
+ return ret;
+
+def generate_symmetries(symlist):
+ retlist = []
+ if len(symlist) == 1:
+ for i in range(len(symlist[0])):
+ retlist.append(symlist[0][i:] + symlist[0][0:i]);
+ invlist = symlist[0];
+ for i in range(1, len(symlist[0])/2):
+ invlist[i] = symlist[0][i + len(symlist[0])/2];
+ invlist[i + len(symlist[0])/2] = symlist[0][i];
+ for i in range(len(symlist[0])):
+ retlist.append(symlist[0][i:] + symlist[0][0:i]);
+ return retlist;
diff --git a/gr-fec/python/fec/capillary_threaded_decoder.py b/gr-fec/python/fec/capillary_threaded_decoder.py
new file mode 100644
index 0000000000..9a00cde192
--- /dev/null
+++ b/gr-fec/python/fec/capillary_threaded_decoder.py
@@ -0,0 +1,100 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, blocks
+import fec_swig as fec
+import math
+
+class capillary_threaded_decoder(gr.hier_block2):
+ def __init__(self, decoder_list_0, input_size, output_size):
+ gr.hier_block2.__init__(
+ self, "Capillary Threaded Decoder",
+ gr.io_signature(1, 1, input_size*1),
+ gr.io_signature(1, 1, output_size*1))
+
+ self.decoder_list_0 = decoder_list_0
+
+ check = math.log10(len(self.decoder_list_0)) / math.log10(2.0)
+ if(abs(check - int(check)) > 0):
+ gr.log.info("fec.capillary_threaded_decoder: number of decoders must be a power of 2.")
+ raise AttributeError
+
+ self.deinterleaves_0 = []
+ for i in range(int(math.log(len(decoder_list_0), 2))):
+ for j in range(int(math.pow(2, i))):
+ self.deinterleaves_0.append(blocks.deinterleave(input_size,
+ fec.get_decoder_input_size(decoder_list_0[0])))
+
+ self.generic_decoders_0 = []
+ for i in range(len(decoder_list_0)):
+ self.generic_decoders_0.append(fec.decoder(decoder_list_0[i], input_size, output_size))
+
+ self.interleaves_0 = []
+ for i in range(int(math.log(len(decoder_list_0), 2))):
+ for j in range(int(math.pow(2, i))):
+ self.interleaves_0.append(blocks.interleave(output_size,
+ fec.get_decoder_output_size(decoder_list_0[0])))
+
+ rootcount = 0
+ branchcount = 1
+ for i in range(int(math.log(len(decoder_list_0), 2)) - 1):
+ for j in range(int(math.pow(2, i))):
+ self.connect((self.deinterleaves_0[rootcount], 0), (self.deinterleaves_0[branchcount], 0))
+ self.connect((self.deinterleaves_0[rootcount], 1), (self.deinterleaves_0[branchcount + 1], 0))
+ rootcount += 1
+ branchcount += 2
+
+ codercount = 0
+ for i in range(len(decoder_list_0)/2):
+ self.connect((self.deinterleaves_0[rootcount], 0), (self.generic_decoders_0[codercount], 0))
+ self.connect((self.deinterleaves_0[rootcount], 1), (self.generic_decoders_0[codercount + 1], 0))
+ rootcount += 1
+ codercount += 2
+
+ rootcount = 0
+ branchcount = 1
+ for i in range(int(math.log(len(decoder_list_0), 2)) - 1):
+ for j in range(int(math.pow(2, i))):
+ self.connect((self.interleaves_0[branchcount], 0), (self.interleaves_0[rootcount], 0))
+ self.connect((self.interleaves_0[branchcount + 1], 0), (self.interleaves_0[rootcount], 1))
+ rootcount += 1
+ branchcount += 2
+
+ codercount = 0
+ for i in range(len(decoder_list_0)/2):
+ self.connect((self.generic_decoders_0[codercount], 0), (self.interleaves_0[rootcount], 0))
+ self.connect((self.generic_decoders_0[codercount + 1], 0), (self.interleaves_0[rootcount], 1))
+ rootcount += 1
+ codercount += 2
+
+ if ((len(self.decoder_list_0)) > 1):
+ self.connect((self, 0), (self.deinterleaves_0[0], 0))
+ self.connect((self.interleaves_0[0], 0), (self, 0))
+ else:
+ self.connect((self, 0), (self.generic_decoders_0[0], 0))
+ self.connect((self.generic_decoders_0[0], 0), (self, 0))
+
+ def get_decoder_list_0(self):
+ return self.decoder_list_0
+
+ def set_decoder_list_0(self, decoder_list_0):
+ self.decoder_list_0 = decoder_list_0
diff --git a/gr-fec/python/fec/capillary_threaded_encoder.py b/gr-fec/python/fec/capillary_threaded_encoder.py
new file mode 100644
index 0000000000..21d4af62ca
--- /dev/null
+++ b/gr-fec/python/fec/capillary_threaded_encoder.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, blocks
+import fec_swig as fec
+import math
+
+class capillary_threaded_encoder(gr.hier_block2):
+ def __init__(self, encoder_list_0, input_size=gr.sizeof_char, output_size=gr.sizeof_char):
+ gr.hier_block2.__init__(self, "Capillary Threaded Encoder",
+ gr.io_signature(1, 1, input_size),
+ gr.io_signature(1, 1, output_size))
+
+ self.encoder_list_0 = encoder_list_0
+
+ check = math.log10(len(self.encoder_list_0)) / math.log10(2.0)
+ if(abs(check - int(check)) > 0.0):
+ gr.log.info("fec.capillary_threaded_encoder: number of encoders must be a power of 2.")
+ raise AttributeError
+
+ self.deinterleaves_0 = [];
+ for i in range(int(math.log(len(encoder_list_0), 2))):
+ for j in range(int(math.pow(2, i))):
+ self.deinterleaves_0.append(blocks.deinterleave(input_size,
+ fec.get_encoder_input_size(encoder_list_0[0])))
+
+ self.generic_encoders_0 = [];
+ for i in range(len(encoder_list_0)):
+ self.generic_encoders_0.append(fec.encoder(encoder_list_0[i],
+ input_size, output_size))
+
+ self.interleaves_0 = [];
+ for i in range(int(math.log(len(encoder_list_0), 2))):
+ for j in range(int(math.pow(2, i))):
+ self.interleaves_0.append(blocks.interleave(output_size,
+ fec.get_encoder_output_size(encoder_list_0[0])))
+
+ rootcount = 0;
+ branchcount = 1;
+ for i in range(int(math.log(len(encoder_list_0), 2)) - 1):
+ for j in range(int(math.pow(2, i))):
+ self.connect((self.deinterleaves_0[rootcount], 0), (self.deinterleaves_0[branchcount], 0))
+ self.connect((self.deinterleaves_0[rootcount], 1), (self.deinterleaves_0[branchcount + 1], 0))
+ rootcount += 1;
+ branchcount += 2;
+
+ codercount = 0;
+ for i in range(len(encoder_list_0)/2):
+ self.connect((self.deinterleaves_0[rootcount], 0), (self.generic_encoders_0[codercount], 0))
+ self.connect((self.deinterleaves_0[rootcount], 1), (self.generic_encoders_0[codercount + 1], 0))
+ rootcount += 1;
+ codercount += 2;
+
+
+ rootcount = 0;
+ branchcount = 1;
+ for i in range(int(math.log(len(encoder_list_0), 2)) - 1):
+ for j in range(int(math.pow(2, i))):
+ self.connect((self.interleaves_0[branchcount], 0), (self.interleaves_0[rootcount], 0))
+ self.connect((self.interleaves_0[branchcount + 1], 0), (self.interleaves_0[rootcount], 1))
+ rootcount += 1;
+ branchcount += 2;
+
+
+ codercount = 0;
+ for i in range(len(encoder_list_0)/2):
+ self.connect((self.generic_encoders_0[codercount], 0), (self.interleaves_0[rootcount], 0))
+ self.connect((self.generic_encoders_0[codercount + 1], 0), (self.interleaves_0[rootcount], 1))
+ rootcount += 1;
+ codercount += 2;
+
+ if((len(self.encoder_list_0)) > 1):
+ self.connect((self, 0), (self.deinterleaves_0[0], 0))
+ self.connect((self.interleaves_0[0], 0), (self, 0))
+ else:
+ self.connect((self, 0), (self.generic_encoders_0[0], 0))
+ self.connect((self.generic_encoders_0[0], 0), (self, 0))
+
+ def get_encoder_list_0(self):
+ return self.encoder_list_0
+
+ def set_encoder_list_0(self, encoder_list_0):
+ self.encoder_list_0 = encoder_list_0
diff --git a/gr-fec/python/fec/extended_async_encoder.py b/gr-fec/python/fec/extended_async_encoder.py
new file mode 100644
index 0000000000..fffe64aeb8
--- /dev/null
+++ b/gr-fec/python/fec/extended_async_encoder.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr
+import fec_swig as fec
+from bitflip import read_bitlist
+import weakref
+
+class extended_async_encoder(gr.hier_block2):
+ def __init__(self, encoder_obj_list, puncpat=None):
+ gr.hier_block2.__init__(self, "extended_async_encoder",
+ gr.io_signature(0, 0, 0),
+ gr.io_signature(0, 0, 0))
+
+ # Set us up as a message passing block
+ self.message_port_register_hier_in('in')
+ self.message_port_register_hier_out('out')
+
+ self.puncpat=puncpat
+
+ # If it's a list of encoders, take the first one, unless it's
+ # a list of lists of encoders.
+ if(type(encoder_obj_list) == list):
+ # This block doesn't handle parallelism of > 1
+ if(type(encoder_obj_list[0]) == list):
+ gr.log.info("fec.extended_encoder: Parallelism must be 0 or 1.")
+ raise AttributeError
+
+ encoder_obj = encoder_obj_list[0]
+
+ # Otherwise, just take it as is
+ else:
+ encoder_obj = encoder_obj_list
+
+ self.encoder = fec.async_encoder(encoder_obj)
+
+ #self.puncture = None
+ #if self.puncpat != '11':
+ # self.puncture = fec.puncture_bb(len(puncpat), read_bitlist(puncpat), 0)
+
+ self.msg_connect(weakref.proxy(self), "in", self.encoder, "in")
+
+ #if(self.puncture):
+ # self.msg_connect(self.encoder, "out", self.puncture, "in")
+ # self.msg_connect(self.puncture, "out", weakref.proxy(self), "out")
+ #else:
+ # self.msg_connect(self.encoder, "out", weakref.proxy(self), "out")
+ self.msg_connect(self.encoder, "out", weakref.proxy(self), "out")
diff --git a/gr-fec/python/fec/extended_decoder.py b/gr-fec/python/fec/extended_decoder.py
new file mode 100644
index 0000000000..7e6cf452f9
--- /dev/null
+++ b/gr-fec/python/fec/extended_decoder.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, blocks
+import fec_swig as fec
+from bitflip import *
+import sys
+
+if sys.modules.has_key("gnuradio.digital"):
+ digital = sys.modules["gnuradio.digital"]
+else:
+ from gnuradio import digital
+
+from threaded_decoder import threaded_decoder
+from capillary_threaded_decoder import capillary_threaded_decoder
+
+class extended_decoder(gr.hier_block2):
+
+#solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density)
+#for i in numpy.arange(.1, .499, .01):
+ #print str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i);
+ garbletable = {
+ 0.310786835319:0.1,
+ 0.279118162802:0.11,
+ 0.252699589071:0.12,
+ 0.230318516016:0.13,
+ 0.211108735347:0.14,
+ 0.194434959095:0.15,
+ 0.179820650401:0.16,
+ 0.166901324951:0.17,
+ 0.15539341766:0.18,
+ 0.145072979886:0.19,
+ 0.135760766313:0.2,
+ 0.127311581396:0.21,
+ 0.119606529806:0.22,
+ 0.112547286766:0.23,
+ 0.106051798775:0.24,
+ 0.10005101381:0.25,
+ 0.0944863633098:0.26,
+ 0.0893078003966:0.27,
+ 0.084472254501:0.28,
+ 0.0799424008658:0.29,
+ 0.0756856701944:0.3,
+ 0.0716734425668:0.31,
+ 0.0678803831565:0.32,
+ 0.0642838867856:0.33,
+ 0.0608636049994:0.34,
+ 0.0576010337489:0.35,
+ 0.0544791422522:0.36,
+ 0.0514820241933:0.37,
+ 0.0485945507251:0.38,
+ 0.0458019998183:0.39,
+ 0.0430896262596:0.4,
+ 0.0404421166935:0.41,
+ 0.0378428350972:0.42,
+ 0.0352726843274:0.43,
+ 0.0327082350617:0.44,
+ 0.0301183562535:0.45,
+ 0.0274574540266:0.46,
+ 0.0246498236897:0.47,
+ 0.0215448131298:0.48,
+ 0.0177274208353:0.49,
+ }
+
+ def __init__(self, decoder_obj_list, threading, ann=None, puncpat='11',
+ integration_period=10000, flush=None, rotator=None):
+ gr.hier_block2.__init__(self, "extended_decoder",
+ gr.io_signature(1, 1, gr.sizeof_float),
+ gr.io_signature(1, 1, gr.sizeof_char))
+ self.blocks=[]
+ self.ann=ann
+ self.puncpat=puncpat
+ self.flush=flush
+
+ if(type(decoder_obj_list) == list):
+ if(type(decoder_obj_list[0]) == list):
+ gr.log.info("fec.extended_decoder: Parallelism must be 1.")
+ raise AttributeError
+ else:
+ # If it has parallelism of 0, force it into a list of 1
+ decoder_obj_list = [decoder_obj_list,]
+
+ message_collector_connected=False
+
+ ##anything going through the annihilator needs shifted, uchar vals
+ if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" or \
+ fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits":
+ self.blocks.append(blocks.multiply_const_ff(48.0))
+
+ if fec.get_shift(decoder_obj_list[0]) != 0.0:
+ self.blocks.append(blocks.add_const_ff(fec.get_shift(decoder_obj_list[0])))
+ elif fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits":
+ self.blocks.append(blocks.add_const_ff(128.0))
+
+ if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" or \
+ fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits":
+ self.blocks.append(blocks.float_to_uchar());
+
+ const_index = 0; #index that corresponds to mod order for specinvert purposes
+
+ if not self.flush:
+ flush = 10000;
+ else:
+ flush = self.flush;
+ if self.ann: #ann and puncpat are strings of 0s and 1s
+ cat = fec.ULLVector();
+ for i in fec.read_big_bitlist(ann):
+ cat.append(i);
+
+ synd_garble = .49
+ idx_list = self.garbletable.keys()
+ idx_list.sort()
+ for i in idx_list:
+ if 1.0/self.ann.count('1') >= i:
+ synd_garble = self.garbletable[i]
+ print 'using syndrom garble threshold ' + str(synd_garble) + 'for conv_bit_corr_bb'
+ print 'ceiling: .0335 data garble rate'
+ self.blocks.append(fec.conv_bit_corr_bb(cat, len(puncpat) - puncpat.count('0'),
+ len(ann), integration_period, flush, synd_garble))
+
+ if self.puncpat != '11':
+ self.blocks.append(fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0))
+
+ if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits":
+ self.blocks.append(blocks.uchar_to_float())
+ self.blocks.append(blocks.add_const_ff(-128.0))
+ self.blocks.append(digital.binary_slicer_fb())
+ self.blocks.append(blocks.unpacked_to_packed_bb(1,0))
+
+ if(len(decoder_obj_list) > 1):
+ if(fec.get_history(decoder_obj_list[0]) != 0):
+ gr.log.info("fec.extended_decoder: Cannot use multi-threaded parallelism on a decoder with history.")
+ raise AttributeError
+
+ if threading == 'capillary':
+ self.blocks.append(capillary_threaded_decoder(decoder_obj_list,
+ fec.get_decoder_input_item_size(decoder_obj_list[0]),
+ fec.get_decoder_output_item_size(decoder_obj_list[0])))
+
+ elif threading == 'ordinary':
+ self.blocks.append(threaded_decoder(decoder_obj_list,
+ fec.get_decoder_input_item_size(decoder_obj_list[0]),
+ fec.get_decoder_output_item_size(decoder_obj_list[0])))
+
+ else:
+ self.blocks.append(fec.decoder(decoder_obj_list[0],
+ fec.get_decoder_input_item_size(decoder_obj_list[0]),
+ fec.get_decoder_output_item_size(decoder_obj_list[0])))
+
+ if fec.get_decoder_output_conversion(decoder_obj_list[0]) == "unpack":
+ self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST));
+
+ self.connect((self, 0), (self.blocks[0], 0));
+ self.connect((self.blocks[-1], 0), (self, 0));
+
+ for i in range(len(self.blocks) - 1):
+ self.connect((self.blocks[i], 0), (self.blocks[i+1], 0));
diff --git a/gr-fec/python/fec/extended_encoder.py b/gr-fec/python/fec/extended_encoder.py
new file mode 100644
index 0000000000..50a8891ea5
--- /dev/null
+++ b/gr-fec/python/fec/extended_encoder.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, blocks
+
+import fec_swig as fec
+from threaded_encoder import threaded_encoder
+from capillary_threaded_encoder import capillary_threaded_encoder
+from bitflip import read_bitlist
+
+class extended_encoder(gr.hier_block2):
+ def __init__(self, encoder_obj_list, threading, puncpat=None):
+ gr.hier_block2.__init__(self, "extended_encoder",
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_char))
+
+ self.blocks=[]
+ self.puncpat=puncpat
+
+ if(type(encoder_obj_list) == list):
+ if(type(encoder_obj_list[0]) == list):
+ gr.log.info("fec.extended_encoder: Parallelism must be 1.")
+ raise AttributeError
+ else:
+ # If it has parallelism of 0, force it into a list of 1
+ encoder_obj_list = [encoder_obj_list,]
+
+ if fec.get_encoder_input_conversion(encoder_obj_list[0]) == "pack":
+ self.blocks.append(blocks.pack_k_bits_bb(8))
+
+ if threading == 'capillary':
+ self.blocks.append(capillary_threaded_encoder(encoder_obj_list,
+ gr.sizeof_char,
+ gr.sizeof_char))
+ elif threading == 'ordinary':
+ self.blocks.append(threaded_encoder(encoder_obj_list,
+ gr.sizeof_char,
+ gr.sizeof_char))
+ else:
+ self.blocks.append(fec.encoder(encoder_obj_list[0],
+ gr.sizeof_char,
+ gr.sizeof_char))
+
+ if self.puncpat != '11':
+ self.blocks.append(fec.puncture_bb(len(puncpat), read_bitlist(puncpat), 0))
+
+ # Connect the input to the encoder and the output to the
+ # puncture if used or the encoder if not.
+ self.connect((self, 0), (self.blocks[0], 0));
+ self.connect((self.blocks[-1], 0), (self, 0));
+
+ # If using the puncture block, add it into the flowgraph after
+ # the encoder.
+ for i in range(len(self.blocks) - 1):
+ self.connect((self.blocks[i], 0), (self.blocks[i+1], 0));
diff --git a/gr-fec/python/fec/extended_tagged_decoder.py b/gr-fec/python/fec/extended_tagged_decoder.py
new file mode 100644
index 0000000000..1865cbfbe4
--- /dev/null
+++ b/gr-fec/python/fec/extended_tagged_decoder.py
@@ -0,0 +1,175 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, blocks
+import fec_swig as fec
+from bitflip import *
+import sys
+
+if sys.modules.has_key("gnuradio.digital"):
+ digital = sys.modules["gnuradio.digital"]
+else:
+ from gnuradio import digital
+
+class extended_tagged_decoder(gr.hier_block2):
+
+#solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density)
+#for i in numpy.arange(.1, .499, .01):
+ #print str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i);
+ garbletable = {
+ 0.310786835319:0.1,
+ 0.279118162802:0.11,
+ 0.252699589071:0.12,
+ 0.230318516016:0.13,
+ 0.211108735347:0.14,
+ 0.194434959095:0.15,
+ 0.179820650401:0.16,
+ 0.166901324951:0.17,
+ 0.15539341766:0.18,
+ 0.145072979886:0.19,
+ 0.135760766313:0.2,
+ 0.127311581396:0.21,
+ 0.119606529806:0.22,
+ 0.112547286766:0.23,
+ 0.106051798775:0.24,
+ 0.10005101381:0.25,
+ 0.0944863633098:0.26,
+ 0.0893078003966:0.27,
+ 0.084472254501:0.28,
+ 0.0799424008658:0.29,
+ 0.0756856701944:0.3,
+ 0.0716734425668:0.31,
+ 0.0678803831565:0.32,
+ 0.0642838867856:0.33,
+ 0.0608636049994:0.34,
+ 0.0576010337489:0.35,
+ 0.0544791422522:0.36,
+ 0.0514820241933:0.37,
+ 0.0485945507251:0.38,
+ 0.0458019998183:0.39,
+ 0.0430896262596:0.4,
+ 0.0404421166935:0.41,
+ 0.0378428350972:0.42,
+ 0.0352726843274:0.43,
+ 0.0327082350617:0.44,
+ 0.0301183562535:0.45,
+ 0.0274574540266:0.46,
+ 0.0246498236897:0.47,
+ 0.0215448131298:0.48,
+ 0.0177274208353:0.49,
+ }
+
+ def __init__(self, decoder_obj_list, ann=None, puncpat='11',
+ integration_period=10000, flush=None, rotator=None, lentagname=None):
+ gr.hier_block2.__init__(self, "extended_decoder",
+ gr.io_signature(1, 1, gr.sizeof_float),
+ gr.io_signature(1, 1, gr.sizeof_char))
+ self.blocks=[]
+ self.ann=ann
+ self.puncpat=puncpat
+ self.flush=flush
+
+ if(type(decoder_obj_list) == list):
+ # This block doesn't handle parallelism of > 1
+ # We could just grab encoder [0][0], but we don't want to encourage this.
+ if(type(decoder_obj_list[0]) == list):
+ gr.log.info("fec.extended_tagged_decoder: Parallelism must be 1.")
+ raise AttributeError
+
+ decoder_obj = decoder_obj_list[0]
+
+ # Otherwise, just take it as is
+ else:
+ decoder_obj = decoder_obj_list
+
+ # If lentagname is None, fall back to using the non tagged
+ # stream version
+ if type(lentagname) == str:
+ if(lentagname.lower() == 'none'):
+ lentagname = None
+
+ message_collector_connected=False
+
+ ##anything going through the annihilator needs shifted, uchar vals
+ if fec.get_decoder_input_conversion(decoder_obj) == "uchar" or \
+ fec.get_decoder_input_conversion(decoder_obj) == "packed_bits":
+ self.blocks.append(blocks.multiply_const_ff(48.0))
+
+ if fec.get_shift(decoder_obj) != 0.0:
+ self.blocks.append(blocks.add_const_ff(fec.get_shift(decoder_obj)))
+ elif fec.get_decoder_input_conversion(decoder_obj) == "packed_bits":
+ self.blocks.append(blocks.add_const_ff(128.0))
+
+ if fec.get_decoder_input_conversion(decoder_obj) == "uchar" or \
+ fec.get_decoder_input_conversion(decoder_obj) == "packed_bits":
+ self.blocks.append(blocks.float_to_uchar());
+
+ const_index = 0; #index that corresponds to mod order for specinvert purposes
+
+ if not self.flush:
+ flush = 10000;
+ else:
+ flush = self.flush;
+ if self.ann: #ann and puncpat are strings of 0s and 1s
+ cat = fec.ULLVector();
+ for i in fec.read_big_bitlist(ann):
+ cat.append(i);
+
+ synd_garble = .49
+ idx_list = self.garbletable.keys()
+ idx_list.sort()
+ for i in idx_list:
+ if 1.0/self.ann.count('1') >= i:
+ synd_garble = self.garbletable[i]
+ print 'using syndrom garble threshold ' + str(synd_garble) + 'for conv_bit_corr_bb'
+ print 'ceiling: .0335 data garble rate'
+ self.blocks.append(fec.conv_bit_corr_bb(cat, len(puncpat) - puncpat.count('0'),
+ len(ann), integration_period, flush, synd_garble))
+
+ if self.puncpat != '11':
+ self.blocks.append(fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0))
+
+ if fec.get_decoder_input_conversion(decoder_obj) == "packed_bits":
+ self.blocks.append(blocks.uchar_to_float())
+ self.blocks.append(blocks.add_const_ff(-128.0))
+ self.blocks.append(digital.binary_slicer_fb())
+ self.blocks.append(blocks.unpacked_to_packed_bb(1,0))
+
+ else:
+ if(not lentagname):
+ self.blocks.append(fec.decoder(decoder_obj,
+ fec.get_decoder_input_item_size(decoder_obj),
+ fec.get_decoder_output_item_size(decoder_obj)))
+ else:
+ self.blocks.append(fec.tagged_decoder(decoder_obj,
+ fec.get_decoder_input_item_size(decoder_obj),
+ fec.get_decoder_output_item_size(decoder_obj),
+ lentagname))
+
+ if fec.get_decoder_output_conversion(decoder_obj) == "unpack":
+ self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST));
+
+ self.connect((self, 0), (self.blocks[0], 0));
+ self.connect((self.blocks[-1], 0), (self, 0));
+
+ for i in range(len(self.blocks) - 1):
+ self.connect((self.blocks[i], 0), (self.blocks[i+1], 0));
diff --git a/gr-fec/python/fec/extended_tagged_encoder.py b/gr-fec/python/fec/extended_tagged_encoder.py
new file mode 100644
index 0000000000..2f78b8e5c9
--- /dev/null
+++ b/gr-fec/python/fec/extended_tagged_encoder.py
@@ -0,0 +1,82 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, blocks
+
+import fec_swig as fec
+from bitflip import read_bitlist
+
+class extended_tagged_encoder(gr.hier_block2):
+ def __init__(self, encoder_obj_list, puncpat=None, lentagname=None):
+ gr.hier_block2.__init__(self, "extended_tagged_encoder",
+ gr.io_signature(1, 1, gr.sizeof_char),
+ gr.io_signature(1, 1, gr.sizeof_char))
+
+ self.blocks=[]
+ self.puncpat=puncpat
+
+ # If it's a list of encoders, take the first one, unless it's
+ # a list of lists of encoders.
+ if(type(encoder_obj_list) == list):
+ # This block doesn't handle parallelism of > 1
+ # We could just grab encoder [0][0], but we don't want to encourage this.
+ if(type(encoder_obj_list[0]) == list):
+ gr.log.info("fec.extended_tagged_encoder: Parallelism must be 0 or 1.")
+ raise AttributeError
+
+ encoder_obj = encoder_obj_list[0]
+
+ # Otherwise, just take it as is
+ else:
+ encoder_obj = encoder_obj_list
+
+ # If lentagname is None, fall back to using the non tagged
+ # stream version
+ if type(lentagname) == str:
+ if(lentagname.lower() == 'none'):
+ lentagname = None
+
+ if fec.get_encoder_input_conversion(encoder_obj) == "pack":
+ self.blocks.append(blocks.pack_k_bits_bb(8))
+
+ if(not lentagname):
+ self.blocks.append(fec.encoder(encoder_obj,
+ gr.sizeof_char,
+ gr.sizeof_char))
+ else:
+ self.blocks.append(fec.tagged_encoder(encoder_obj,
+ gr.sizeof_char,
+ gr.sizeof_char,
+ lentagname))
+
+ if self.puncpat != '11':
+ self.blocks.append(fec.puncture_bb(len(puncpat), read_bitlist(puncpat), 0))
+
+ # Connect the input to the encoder and the output to the
+ # puncture if used or the encoder if not.
+ self.connect((self, 0), (self.blocks[0], 0));
+ self.connect((self.blocks[-1], 0), (self, 0));
+
+ # If using the puncture block, add it into the flowgraph after
+ # the encoder.
+ for i in range(len(self.blocks) - 1):
+ self.connect((self.blocks[i], 0), (self.blocks[i+1], 0));
diff --git a/gr-fec/python/fec/fec_test.py b/gr-fec/python/fec/fec_test.py
new file mode 100644
index 0000000000..6466a0bcb4
--- /dev/null
+++ b/gr-fec/python/fec/fec_test.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio.fec.bitflip import read_bitlist
+from gnuradio import gr, blocks, analog
+import math
+import sys
+
+if sys.modules.has_key("gnuradio.digital"):
+ digital = sys.modules["gnuradio.digital"]
+else:
+ from gnuradio import digital
+
+from extended_encoder import extended_encoder
+from extended_decoder import extended_decoder
+
+class fec_test(gr.hier_block2):
+
+ def __init__(self, generic_encoder=0, generic_decoder=0, esno=0,
+ samp_rate=3200000, threading="capillary", puncpat='11',
+ seed=0):
+ gr.hier_block2.__init__(self, "fec_test",
+ gr.io_signature(1, 1, gr.sizeof_char*1),
+ gr.io_signature(2, 2, gr.sizeof_char*1))
+
+ self.generic_encoder = generic_encoder
+ self.generic_decoder = generic_decoder
+ self.esno = esno
+ self.samp_rate = samp_rate
+ self.threading = threading
+ self.puncpat = puncpat
+
+ self.map_bb = digital.map_bb(([-1, 1]))
+ self.b2f = blocks.char_to_float(1, 1)
+
+ self.unpack8 = blocks.unpack_k_bits_bb(8)
+ self.pack8 = blocks.pack_k_bits_bb(8)
+
+ self.encoder = extended_encoder(encoder_obj_list=generic_encoder,
+ threading=threading,
+ puncpat=puncpat)
+
+ self.decoder = extended_decoder(decoder_obj_list=generic_decoder,
+ threading=threading,
+ ann=None, puncpat=puncpat,
+ integration_period=10000, rotator=None)
+
+ noise = math.sqrt((10.0**(-esno/10.0))/2.0)
+ #self.fastnoise = analog.fastnoise_source_f(analog.GR_GAUSSIAN, noise, seed, 8192)
+ self.fastnoise = analog.noise_source_f(analog.GR_GAUSSIAN, noise, seed)
+ self.addnoise = blocks.add_ff(1)
+
+ # Send packed input directly to the second output
+ self.copy_packed = blocks.copy(gr.sizeof_char)
+ self.connect(self, self.copy_packed)
+ self.connect(self.copy_packed, (self, 1))
+
+ # Unpack inputl encode, convert to +/-1, add noise, decode, repack
+ self.connect(self, self.unpack8)
+ self.connect(self.unpack8, self.encoder)
+ self.connect(self.encoder, self.map_bb)
+ self.connect(self.map_bb, self.b2f)
+ self.connect(self.b2f, (self.addnoise, 0))
+ self.connect(self.fastnoise, (self.addnoise,1))
+ self.connect(self.addnoise, self.decoder)
+ self.connect(self.decoder, self.pack8)
+ self.connect(self.pack8, (self, 0))
+
+
+ def get_generic_encoder(self):
+ return self.generic_encoder
+
+ def set_generic_encoder(self, generic_encoder):
+ self.generic_encoder = generic_encoder
+
+ def get_generic_decoder(self):
+ return self.generic_decoder
+
+ def set_generic_decoder(self, generic_decoder):
+ self.generic_decoder = generic_decoder
+
+ def get_esno(self):
+ return self.esno
+
+ def set_esno(self, esno):
+ self.esno = esno
+
+ def get_samp_rate(self):
+ return self.samp_rate
+
+ def set_samp_rate(self, samp_rate):
+ self.samp_rate = samp_rate
+
+ def get_threading(self):
+ return self.threading
+
+ def set_threading(self, threading):
+ self.threading = threading
+
+ def get_puncpat(self):
+ return self.puncpat
+
+ def set_puncpat(self, puncpat):
+ self.puncpat = puncpat
diff --git a/gr-fec/python/fec/qa_depuncture.py b/gr-fec/python/fec/qa_depuncture.py
new file mode 100644
index 0000000000..5566e83a25
--- /dev/null
+++ b/gr-fec/python/fec/qa_depuncture.py
@@ -0,0 +1,192 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import fec_swig as fec
+import blocks_swig as blocks
+from collections import deque
+
+class test_depuncture (gr_unittest.TestCase):
+
+ def depuncture_setup(self):
+ p = []
+ for i in range(self.puncsize):
+ p.append(self.puncpat >> (self.puncsize - 1 - i) & 1)
+ d = deque(p)
+ d.rotate(self.delay)
+ _puncpat = list(d)
+
+ k = 0
+ self.expected = []
+ for n in range(len(self.src_data)/(self.puncsize - self.puncholes)):
+ for i in range(self.puncsize):
+ if _puncpat[i] == 1:
+ self.expected.append(self.src_data[k]);
+ k+=1
+ else:
+ self.expected.append(self.sym)
+
+ def setUp(self):
+ self.src_data = 2000*range(64)
+ self.tb = gr.top_block ()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_000(self):
+ # Test normal operation of the depuncture block
+
+ self.puncsize = 8
+ self.puncpat = 0xEF
+ self.delay = 0
+ self.sym = 0
+ self.puncholes = 1
+
+ self.depuncture_setup()
+
+ src = blocks.vector_source_b(self.src_data)
+ op = fec.depuncture_bb(self.puncsize, self.puncpat,
+ self.delay, self.sym)
+ dst = blocks.vector_sink_b()
+
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+
+ dst_data = list(dst.data())
+ for i in xrange(len(dst_data)):
+ dst_data[i] = int(dst_data[i])
+
+ self.assertEqual(self.expected, dst_data)
+
+ def test_001(self):
+ # Test normal operation of the depuncture block with a delay
+
+ self.puncsize = 8
+ self.puncpat = 0xEF
+ self.delay = 1
+ self.sym = 0
+ self.puncholes = 1
+
+ self.depuncture_setup()
+
+ src = blocks.vector_source_b(self.src_data)
+ op = fec.depuncture_bb(self.puncsize, self.puncpat,
+ self.delay, self.sym)
+ dst = blocks.vector_sink_b()
+
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+
+ dst_data = list(dst.data())
+ for i in xrange(len(dst_data)):
+ dst_data[i] = int(dst_data[i])
+
+ self.assertEqual(self.expected, dst_data)
+
+ def test_002(self):
+ # Test scenario where we have defined a puncture pattern with
+ # more bits than the puncsize.
+
+ self.puncsize = 4
+ self.puncpat = 0x5555
+ self.delay = 0
+ self.sym = 0
+ self.puncholes = 2
+
+ self.depuncture_setup()
+
+ src = blocks.vector_source_b(self.src_data)
+ op = fec.depuncture_bb(self.puncsize, self.puncpat,
+ self.delay, self.sym)
+ dst = blocks.vector_sink_b()
+
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+
+ dst_data = list(dst.data())
+ for i in xrange(len(dst_data)):
+ dst_data[i] = int(dst_data[i])
+
+ self.assertEqual(self.expected, dst_data)
+
+ def test_003(self):
+ # Test scenario where we have defined a puncture pattern with
+ # more bits than the puncsize with a delay. The python code
+ # doesn't account for this when creating self.expected, but
+ # this should be equivalent to a puncpat of the correct size.
+
+ self.puncsize = 4
+ self.puncpat0 = 0x5555 # too many bits set
+ self.puncpat1 = 0x55 # num bits = puncsize
+ self.delay = 1
+ self.sym = 0
+
+ src = blocks.vector_source_b(self.src_data)
+ op0 = fec.depuncture_bb(self.puncsize, self.puncpat0,
+ self.delay, self.sym)
+ op1 = fec.depuncture_bb(self.puncsize, self.puncpat1,
+ self.delay, self.sym)
+ dst0 = blocks.vector_sink_b()
+ dst1 = blocks.vector_sink_b()
+
+ self.tb.connect(src, op0, dst0)
+ self.tb.connect(src, op1, dst1)
+ self.tb.run()
+
+ dst_data0 = list(dst0.data())
+ for i in xrange(len(dst_data0)):
+ dst_data0[i] = int(dst_data0[i])
+
+ dst_data1 = list(dst1.data())
+ for i in xrange(len(dst_data1)):
+ dst_data1[i] = int(dst_data1[i])
+
+ self.assertEqual(dst_data1, dst_data0)
+
+ def test_004(self):
+ # Test normal operation of the depuncture block without
+ # specifying the fill symbol (defaults to 127).
+
+ self.puncsize = 8
+ self.puncpat = 0xEF
+ self.delay = 0
+ self.sym = 127
+ self.puncholes = 1
+
+ self.depuncture_setup()
+
+ src = blocks.vector_source_b(self.src_data)
+ op = fec.depuncture_bb(self.puncsize, self.puncpat,
+ self.delay)
+ dst = blocks.vector_sink_b()
+
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+
+ dst_data = list(dst.data())
+ for i in xrange(len(dst_data)):
+ dst_data[i] = int(dst_data[i])
+
+ self.assertEqual(self.expected, dst_data)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_depuncture, "test_depuncture.xml")
diff --git a/gr-fec/python/fec/qa_fecapi_cc.py b/gr-fec/python/fec/qa_fecapi_cc.py
new file mode 100644
index 0000000000..bbd500161e
--- /dev/null
+++ b/gr-fec/python/fec/qa_fecapi_cc.py
@@ -0,0 +1,195 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import fec_swig as fec
+from _qa_helper import _qa_helper
+
+from extended_encoder import extended_encoder
+from extended_decoder import extended_decoder
+
+class test_fecapi_cc(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_parallelism0_00(self):
+ frame_size = 30
+ k = 7
+ rate = 2
+ polys = [109,79]
+ enc = fec.cc_encoder_make(frame_size*8, k, rate, polys)
+ dec = fec.cc_decoder.make(frame_size*8, k, rate, polys)
+ threading = None
+ self.test = _qa_helper(4*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_out = self.test.snk_output.data()
+ data_in = self.test.snk_input.data()[0:len(data_out)]
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism0_01(self):
+ frame_size = 30
+ k = 7
+ rate = 2
+ polys = [109,79]
+ enc = fec.cc_encoder_make(frame_size*8, k, rate, polys)
+ dec = fec.cc_decoder.make(frame_size*8, k, rate, polys)
+ threading = 'ordinary'
+ self.test = _qa_helper(5*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_out = self.test.snk_output.data()
+ data_in = self.test.snk_input.data()[0:len(data_out)]
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism0_02(self):
+ frame_size = 30
+ k = 7
+ rate = 2
+ polys = [109,79]
+ enc = fec.cc_encoder_make(frame_size*8, k, rate, polys)
+ dec = fec.cc_decoder.make(frame_size*8, k, rate, polys)
+ threading = 'capillary'
+ self.test = _qa_helper(5*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_out = self.test.snk_output.data()
+ data_in = self.test.snk_input.data()[0:len(data_out)]
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_00(self):
+ frame_size = 30
+ k = 7
+ rate = 2
+ polys = [109,79]
+ enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys)), range(0,1))
+ dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys)), range(0,1))
+ threading = None
+ self.test = _qa_helper(5*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_out = self.test.snk_output.data()
+ data_in = self.test.snk_input.data()[0:len(data_out)]
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_01(self):
+ frame_size = 30
+ k = 7
+ rate = 2
+ polys = [109,79]
+ enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys)), range(0,1))
+ dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys)), range(0,1))
+ threading = 'ordinary'
+ self.test = _qa_helper(5*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_out = self.test.snk_output.data()
+ data_in = self.test.snk_input.data()[0:len(data_out)]
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_02(self):
+ frame_size = 30
+ k = 7
+ rate = 2
+ polys = [109,79]
+ enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys)), range(0,1))
+ dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys)), range(0,1))
+ threading = 'capillary'
+ self.test = _qa_helper(5*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_out = self.test.snk_output.data()
+ data_in = self.test.snk_input.data()[0:len(data_out)]
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_03(self):
+ frame_size = 30
+ k = 7
+ rate = 2
+ polys = [109,79]
+ mode = fec.CC_TERMINATED
+ enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys, mode=mode)), range(0,4))
+ dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys, mode=mode)), range(0,4))
+ threading = 'capillary'
+ self.test = _qa_helper(4*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_out = self.test.snk_output.data()
+ data_in = self.test.snk_input.data()[0:len(data_out)]
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_04(self):
+ frame_size = 30
+ k = 7
+ rate = 2
+ polys = [109,79]
+ mode = fec.CC_TRUNCATED
+ enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys, mode=mode)), range(0,4))
+ dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys, mode=mode)), range(0,4))
+ threading = 'capillary'
+ self.test = _qa_helper(4*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_out = self.test.snk_output.data()
+ data_in = self.test.snk_input.data()[0:len(data_out)]
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_05(self):
+ frame_size = 30
+ k = 7
+ rate = 2
+ polys = [109,79]
+ mode = fec.CC_TAILBITING
+ enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys, mode=mode)), range(0,4))
+ dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys, mode=mode)), range(0,4))
+ threading = 'capillary'
+ self.test = _qa_helper(4*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_out = self.test.snk_output.data()
+ data_in = self.test.snk_input.data()[0:len(data_out)]
+
+ self.assertEqual(data_in, data_out)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_fecapi_cc, "test_fecapi_cc.xml")
diff --git a/gr-fec/python/fec/qa_fecapi_dummy.py b/gr-fec/python/fec/qa_fecapi_dummy.py
new file mode 100644
index 0000000000..a07890fb84
--- /dev/null
+++ b/gr-fec/python/fec/qa_fecapi_dummy.py
@@ -0,0 +1,190 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import fec_swig as fec
+from _qa_helper import _qa_helper
+
+from extended_encoder import extended_encoder
+from extended_decoder import extended_decoder
+
+class test_fecapi_dummy(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_parallelism0_00(self):
+ frame_size = 30
+ enc = fec.dummy_encoder_make(frame_size*8)
+ dec = fec.dummy_decoder.make(frame_size*8)
+ threading = None
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism0_01(self):
+ frame_size = 30
+ enc = fec.dummy_encoder_make(frame_size*8)
+ dec = fec.dummy_decoder.make(frame_size*8)
+ threading = 'ordinary'
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism0_02(self):
+ frame_size = 30
+ enc = fec.dummy_encoder_make(frame_size*8)
+ dec = fec.dummy_decoder.make(frame_size*8)
+ threading = 'capillary'
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_00(self):
+ frame_size = 30
+ enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,1))
+ dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,1))
+ threading = None
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_01(self):
+ frame_size = 30
+ enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,1))
+ dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,1))
+ threading = 'ordinary'
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_02(self):
+ frame_size = 300
+ enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,1))
+ dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,1))
+ threading = 'capillary'
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_03(self):
+ frame_size = 30
+ dims = 10
+ enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims))
+ dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims))
+ threading = 'ordinary'
+ self.test = _qa_helper(dims*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_04(self):
+ frame_size = 30
+ dims = 16
+ enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims))
+ dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims))
+ threading = 'capillary'
+ self.test = _qa_helper(dims*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_05(self):
+ frame_size = 30
+ dims = 5
+ enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims))
+ #dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims))
+ threading = 'capillary'
+
+ self.assertRaises(AttributeError, lambda: extended_encoder(enc, threading=threading, puncpat="11"))
+
+ def test_parallelism1_06(self):
+ frame_size = 30
+ dims = 5
+ #enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims))
+ dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims))
+ threading = 'capillary'
+
+ self.assertRaises(AttributeError, lambda: extended_decoder(dec, threading=threading, puncpat="11"))
+
+ def test_parallelism2_00(self):
+ frame_size = 30
+ dims1 = 16
+ dims2 = 16
+ enc = map((lambda b: map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims1))), range(0,dims2))
+ #dec = map((lambda b: map((lambda a: fec.dummy_decoder_make(frame_size*8)), range(0,dims1))), range(0,dims2))
+ threading = 'capillary'
+
+ self.assertRaises(AttributeError, lambda: extended_encoder(enc, threading=threading, puncpat="11"))
+
+ def test_parallelism2_01(self):
+ frame_size = 30
+ dims1 = 16
+ dims2 = 16
+ dec = map((lambda b: map((lambda a: fec.dummy_decoder_make(frame_size*8)), range(0,dims1))), range(0,dims2))
+ threading = 'capillary'
+
+ self.assertRaises(AttributeError, lambda: extended_decoder(dec, threading=threading, puncpat="11"))
+
+if __name__ == '__main__':
+ gr_unittest.run(test_fecapi_dummy, "test_fecapi_dummy.xml")
diff --git a/gr-fec/python/fec/qa_fecapi_repetition.py b/gr-fec/python/fec/qa_fecapi_repetition.py
new file mode 100644
index 0000000000..7998d61bd1
--- /dev/null
+++ b/gr-fec/python/fec/qa_fecapi_repetition.py
@@ -0,0 +1,161 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import fec_swig as fec
+from _qa_helper import _qa_helper
+
+from extended_encoder import extended_encoder
+from extended_decoder import extended_decoder
+
+class test_fecapi_repetition(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_parallelism0_00(self):
+ frame_size = 30
+ rep = 3
+ enc = fec.repetition_encoder_make(frame_size*8, rep)
+ dec = fec.repetition_decoder.make(frame_size*8, rep)
+ threading = None
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism0_01(self):
+ frame_size = 30
+ rep = 3
+ enc = fec.repetition_encoder_make(frame_size*8, rep)
+ dec = fec.repetition_decoder.make(frame_size*8, rep)
+ threading = 'ordinary'
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism0_02(self):
+ frame_size = 30
+ rep = 3
+ enc = fec.repetition_encoder_make(frame_size*8, rep)
+ dec = fec.repetition_decoder.make(frame_size*8, rep)
+ threading = 'capillary'
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_00(self):
+ frame_size = 30
+ rep = 3
+ enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,1))
+ dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,1))
+ threading = None
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_01(self):
+ frame_size = 30
+ rep = 3
+ enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,1))
+ dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,1))
+ threading = 'ordinary'
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_02(self):
+ frame_size = 300
+ rep = 3
+ enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,1))
+ dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,1))
+ threading = 'capillary'
+ self.test = _qa_helper(10*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_03(self):
+ frame_size = 30
+ rep = 3
+ dims = 10
+ enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,dims))
+ dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,dims))
+ threading = 'ordinary'
+ self.test = _qa_helper(dims*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+ def test_parallelism1_04(self):
+ frame_size = 30
+ rep = 3
+ dims = 16
+ enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,dims))
+ dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,dims))
+ threading = 'capillary'
+ self.test = _qa_helper(dims*frame_size, enc, dec, threading)
+ self.tb.connect(self.test)
+ self.tb.run()
+
+ data_in = self.test.snk_input.data()
+ data_out =self.test.snk_output.data()
+
+ self.assertEqual(data_in, data_out)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_fecapi_repetition, "test_fecapi_repetition.xml")
diff --git a/gr-fec/python/fec/qa_puncture.py b/gr-fec/python/fec/qa_puncture.py
new file mode 100644
index 0000000000..fdd15c9a08
--- /dev/null
+++ b/gr-fec/python/fec/qa_puncture.py
@@ -0,0 +1,244 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import fec_swig as fec
+import blocks_swig as blocks
+from collections import deque
+
+class test_puncture (gr_unittest.TestCase):
+
+ def puncture_setup(self):
+ p = []
+ for i in range(self.puncsize):
+ p.append(self.puncpat >> (self.puncsize - 1 - i) & 1)
+ d = deque(p)
+ d.rotate(self.delay)
+ _puncpat = list(d)
+
+ self.expected = []
+ for n in range(len(self.src_data)/self.puncsize):
+ for i in range(self.puncsize):
+ if _puncpat[i] == 1:
+ self.expected.append(self.src_data[n*self.puncsize+i]);
+
+ def setUp(self):
+ self.src_data = 10000*range(64)
+ self.tb = gr.top_block()
+
+ def tearDown(self):
+ self.tb = None
+
+ def test_000(self):
+ # Test normal operation of the puncture block
+
+ self.puncsize = 8
+ self.puncpat = 0xEF
+ self.delay = 0
+
+ self.puncture_setup()
+
+ src = blocks.vector_source_b(self.src_data)
+ op = fec.puncture_bb(self.puncsize, self.puncpat, self.delay)
+ dst = blocks.vector_sink_b()
+
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+
+ dst_data = list(dst.data())
+ for i in xrange(len(dst_data)):
+ dst_data[i] = int(dst_data[i])
+
+ self.assertEqual(self.expected, dst_data)
+
+
+ def test_001(self):
+ # Test normal operation of the puncture block with a delay
+
+ self.puncsize = 8
+ self.puncpat = 0xEE
+ self.delay = 1
+
+ self.src_data = range(16)
+
+ self.puncture_setup()
+
+ src = blocks.vector_source_b(self.src_data)
+ op = fec.puncture_bb(self.puncsize, self.puncpat, self.delay)
+ dst = blocks.vector_sink_b()
+
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+
+ dst_data = list(dst.data())
+ for i in xrange(len(dst_data)):
+ dst_data[i] = int(dst_data[i])
+
+ self.assertEqual(self.expected, dst_data)
+
+
+ def test_002(self):
+ # Test scenario where we have defined a puncture pattern with
+ # more bits than the puncsize.
+
+ self.puncsize = 4
+ self.puncpat = 0x5555
+ self.delay = 0
+
+ self.puncture_setup()
+
+ src = blocks.vector_source_b(self.src_data)
+ op = fec.puncture_bb(self.puncsize, self.puncpat, self.delay)
+ dst = blocks.vector_sink_b()
+
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+
+ dst_data = list(dst.data())
+ for i in xrange(len(dst_data)):
+ dst_data[i] = int(dst_data[i])
+
+ self.assertEqual(self.expected, dst_data)
+
+ def test_003(self):
+ # Test scenario where we have defined a puncture pattern with
+ # more bits than the puncsize with a delay. The python code
+ # doesn't account for this when creating self.expected, but
+ # this should be equivalent to a puncpat of the correct size.
+
+ self.puncsize = 4
+ self.puncpat0 = 0x5555 # too many bits set
+ self.puncpat1 = 0x55 # num bits = puncsize
+ self.delay = 1
+
+ src = blocks.vector_source_b(self.src_data)
+ op0 = fec.puncture_bb(self.puncsize, self.puncpat0, self.delay)
+ op1 = fec.puncture_bb(self.puncsize, self.puncpat1, self.delay)
+ dst0 = blocks.vector_sink_b()
+ dst1 = blocks.vector_sink_b()
+
+ self.tb.connect(src, op0, dst0)
+ self.tb.connect(src, op1, dst1)
+ self.tb.run()
+
+ dst_data0 = list(dst0.data())
+ for i in xrange(len(dst_data0)):
+ dst_data0[i] = int(dst_data0[i])
+
+ dst_data1 = list(dst1.data())
+ for i in xrange(len(dst_data1)):
+ dst_data1[i] = int(dst_data1[i])
+
+ self.assertEqual(dst_data1, dst_data0)
+
+
+
+ def test_f_000(self):
+ # Test normal operation of the float puncture block
+
+ self.puncsize = 8
+ self.puncpat = 0xEF
+ self.delay = 0
+
+ self.puncture_setup()
+
+ src = blocks.vector_source_f(self.src_data)
+ op = fec.puncture_ff(self.puncsize, self.puncpat, self.delay)
+ dst = blocks.vector_sink_f()
+
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+
+ dst_data = list(dst.data())
+ self.assertEqual(self.expected, dst_data)
+
+
+ def test_f_001(self):
+ # Test normal operation of the puncture block with a delay
+
+ self.puncsize = 8
+ self.puncpat = 0xEE
+ self.delay = 1
+
+ self.src_data = range(16)
+
+ self.puncture_setup()
+
+ src = blocks.vector_source_f(self.src_data)
+ op = fec.puncture_ff(self.puncsize, self.puncpat, self.delay)
+ dst = blocks.vector_sink_f()
+
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+
+ dst_data = list(dst.data())
+ self.assertEqual(self.expected, dst_data)
+
+
+ def test_f_002(self):
+ # Test scenariou where we have defined a puncture pattern with
+ # more bits than the puncsize.
+
+ self.puncsize = 4
+ self.puncpat = 0x5555
+ self.delay = 0
+
+ self.puncture_setup()
+
+ src = blocks.vector_source_f(self.src_data)
+ op = fec.puncture_ff(self.puncsize, self.puncpat, self.delay)
+ dst = blocks.vector_sink_f()
+
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+
+ dst_data = list(dst.data())
+ self.assertEqual(self.expected, dst_data)
+
+ def test_f_003(self):
+ # Test scenariou where we have defined a puncture pattern with
+ # more bits than the puncsize with a delay. The python code
+ # doesn't account for this when creating self.expected, but
+ # this should be equivalent to a puncpat of the correct size.
+
+ self.puncsize = 4
+ self.puncpat0 = 0x5555 # too many bits set
+ self.puncpat1 = 0x55 # num bits = puncsize
+ self.delay = 1
+
+ src = blocks.vector_source_f(self.src_data)
+ op0 = fec.puncture_ff(self.puncsize, self.puncpat0, self.delay)
+ op1 = fec.puncture_ff(self.puncsize, self.puncpat1, self.delay)
+ dst0 = blocks.vector_sink_f()
+ dst1 = blocks.vector_sink_f()
+
+ self.tb.connect(src, op0, dst0)
+ self.tb.connect(src, op1, dst1)
+ self.tb.run()
+
+ dst_data0 = list(dst0.data())
+ dst_data1 = list(dst1.data())
+
+ self.assertEqual(dst_data1, dst_data0)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_puncture, "test_puncture.xml")
diff --git a/gr-fec/python/fec/threaded_decoder.py b/gr-fec/python/fec/threaded_decoder.py
new file mode 100644
index 0000000000..115ad7b5d3
--- /dev/null
+++ b/gr-fec/python/fec/threaded_decoder.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, blocks
+import fec_swig as fec
+
+class threaded_decoder(gr.hier_block2):
+ def __init__(self, decoder_list_0, input_size, output_size):
+ gr.hier_block2.__init__(
+ self, "Threaded Decoder",
+ gr.io_signature(1, 1, input_size*1),
+ gr.io_signature(1, 1, output_size*1))
+
+ self.decoder_list_0 = decoder_list_0
+
+ self.deinterleave_0 = blocks.deinterleave(input_size,
+ fec.get_decoder_input_size(decoder_list_0[0]))
+
+ self.generic_decoders_0 = []
+ for i in range(len(decoder_list_0)):
+ self.generic_decoders_0.append(fec.decoder(decoder_list_0[i],
+ input_size, output_size))
+
+ self.interleave_0 = blocks.interleave(output_size,
+ fec.get_decoder_output_size(decoder_list_0[0]))
+
+ for i in range(len(decoder_list_0)):
+ self.connect((self.deinterleave_0, i), (self.generic_decoders_0[i], 0))
+
+ for i in range(len(decoder_list_0)):
+ self.connect((self.generic_decoders_0[i], 0), (self.interleave_0, i))
+
+
+ self.connect((self, 0), (self.deinterleave_0, 0))
+ self.connect((self.interleave_0, 0), (self, 0))
+
+ def get_decoder_list_0(self):
+ return self.decoder_list_0
+
+ def set_decoder_list_0(self, decoder_list_0):
+ self.decoder_list_0 = decoder_list_0
diff --git a/gr-fec/python/fec/threaded_encoder.py b/gr-fec/python/fec/threaded_encoder.py
new file mode 100644
index 0000000000..391baa5442
--- /dev/null
+++ b/gr-fec/python/fec/threaded_encoder.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, blocks
+import fec_swig as fec
+
+class threaded_encoder(gr.hier_block2):
+ def __init__(self, encoder_list_0, input_size, output_size):
+ gr.hier_block2.__init__(
+ self, "Threaded Encoder",
+ gr.io_signature(1, 1, input_size*1),
+ gr.io_signature(1, 1, output_size*1))
+
+ self.encoder_list_0 = encoder_list_0
+
+ self.fec_deinterleave_0 = blocks.deinterleave(input_size,
+ fec.get_encoder_input_size(encoder_list_0[0]))
+
+ self.generic_encoders_0 = [];
+ for i in range(len(encoder_list_0)):
+ self.generic_encoders_0.append(fec.encoder(encoder_list_0[i],
+ input_size, output_size))
+
+ self.fec_interleave_0 = blocks.interleave(output_size,
+ fec.get_encoder_output_size(encoder_list_0[0]))
+
+ for i in range(len(encoder_list_0)):
+ self.connect((self.fec_deinterleave_0, i), (self.generic_encoders_0[i], 0))
+
+ for i in range(len(encoder_list_0)):
+ self.connect((self.generic_encoders_0[i], 0), (self.fec_interleave_0, i))
+
+ self.connect((self, 0), (self.fec_deinterleave_0, 0))
+ self.connect((self.fec_interleave_0, 0), (self, 0))
+
+ def get_encoder_list_0(self):
+ return self.encoder_list_0
+
+ def set_encoder_list_0(self, encoder_list_0):
+ self.encoder_list_0 = encoder_list_0