diff options
Diffstat (limited to 'gr-fec/python')
20 files changed, 2293 insertions, 1 deletions
diff --git a/gr-fec/python/fec/CMakeLists.txt b/gr-fec/python/fec/CMakeLists.txt index a7eefaa0c1..032816866d 100644 --- a/gr-fec/python/fec/CMakeLists.txt +++ b/gr-fec/python/fec/CMakeLists.txt @@ -23,6 +23,18 @@ include(GrPython) GR_PYTHON_INSTALL( FILES __init__.py + bitflip.py + extended_encoder.py + extended_decoder.py + capillary_threaded_decoder.py + capillary_threaded_encoder.py + threaded_decoder.py + threaded_encoder.py + extended_async_encoder.py + extended_tagged_encoder.py + extended_tagged_decoder.py + fec_test.py + bercurve_generator.py DESTINATION ${GR_PYTHON_DIR}/gnuradio/fec COMPONENT "fec_python" ) diff --git a/gr-fec/python/fec/__init__.py b/gr-fec/python/fec/__init__.py index bfec694739..6c82232d4f 100644 --- a/gr-fec/python/fec/__init__.py +++ b/gr-fec/python/fec/__init__.py @@ -1,5 +1,5 @@ # -# Copyright 2012 Free Software Foundation, Inc. +# Copyright 2012,2014 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -30,3 +30,18 @@ except ImportError: dirname, filename = os.path.split(os.path.abspath(__file__)) __path__.append(os.path.join(dirname, "..", "..", "swig")) from fec_swig import * + +from bitflip import * +from extended_encoder import extended_encoder +from extended_decoder import extended_decoder +from threaded_encoder import threaded_encoder +from threaded_decoder import threaded_decoder +from capillary_threaded_decoder import capillary_threaded_decoder +from capillary_threaded_encoder import capillary_threaded_encoder +from extended_async_encoder import extended_async_encoder +from extended_tagged_encoder import extended_tagged_encoder +from extended_tagged_decoder import extended_tagged_decoder + + +from fec_test import fec_test +from bercurve_generator import bercurve_generator diff --git a/gr-fec/python/fec/_qa_helper.py b/gr-fec/python/fec/_qa_helper.py new file mode 100755 index 0000000000..8722453441 --- /dev/null +++ b/gr-fec/python/fec/_qa_helper.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import blocks +from gnuradio import gr +import sys, numpy + +from extended_encoder import extended_encoder +from extended_decoder import extended_decoder + +class map_bb(gr.sync_block): + def __init__(self, bitmap): + gr.sync_block.__init__( + self, + name = "map_bb", + in_sig = [numpy.int8], + out_sig = [numpy.int8]) + self.bitmap = bitmap + + def work(self, input_items, output_items): + output_items[0][:] = map(lambda x: self.bitmap[x], input_items[0]) + return len(output_items[0]) + + +class _qa_helper(gr.top_block): + + def __init__(self, data_size, enc, dec, threading): + gr.top_block.__init__(self, "_qa_helper") + + self.puncpat = puncpat = '11' + + self.enc = enc + self.dec = dec + self.data_size = data_size + self.threading = threading + + self.ext_encoder = extended_encoder(enc, threading=self.threading, puncpat=self.puncpat) + self.ext_decoder= extended_decoder(dec, threading=self.threading, ann=None, + puncpat=self.puncpat, integration_period=10000) + + self.src = blocks.vector_source_b(data_size*[0, 1, 2, 3, 5, 7, 9, 13, 15, 25, 31, 45, 63, 95, 127], False) + self.unpack = blocks.unpack_k_bits_bb(8) + self.map = map_bb([-1, 1]) + self.to_float = blocks.char_to_float(1) + self.snk_input = blocks.vector_sink_b() + self.snk_output = blocks.vector_sink_b() + + self.connect(self.src, self.unpack, self.ext_encoder) + self.connect(self.ext_encoder, self.map, self.to_float) + self.connect(self.to_float, self.ext_decoder) + self.connect(self.unpack, self.snk_input) + self.connect(self.ext_decoder, self.snk_output) + +if __name__ == '__main__': + frame_size = 30 + enc = fec.dummy_encoder_make(frame_size*8) + #enc = fec.repetition_encoder_make(frame_size*8, 3) + dec = fec.dummy_decoder.make(frame_size*8) + + tb = _qa_helper(10*frame_size, enc, dec, None) + tb.run() + + errs = 0 + for i,o in zip(tb.snk_input.data(), tb.snk_output.data()): + if i-o != 0: + errs += 1 + + if errs == 0: + print "Decoded properly" + else: + print "Problem Decoding" diff --git a/gr-fec/python/fec/bercurve_generator.py b/gr-fec/python/fec/bercurve_generator.py new file mode 100644 index 0000000000..e67d1e17c2 --- /dev/null +++ b/gr-fec/python/fec/bercurve_generator.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, blocks +import numpy + +from fec_test import fec_test + +class bercurve_generator(gr.hier_block2): + + def __init__(self, encoder_list, decoder_list, esno=numpy.arange(0.0, 3.0, .25), + samp_rate=3200000, threading='capillary', puncpat='11', seed=0): + gr.hier_block2.__init__( + self, "ber_curve_generator", + gr.io_signature(0, 0, 0), + gr.io_signature(len(esno) * 2, len(esno) * 2, gr.sizeof_char*1)) + + self.esno = esno + self.samp_rate = samp_rate + self.encoder_list = encoder_list + self.decoder_list = decoder_list + self.puncpat = puncpat + + self.random_gen_b_0 = blocks.vector_source_b(map(int, numpy.random.randint(0, 256, 100000)), True) + self.deinterleave = blocks.deinterleave(gr.sizeof_char*1) + self.connect(self.random_gen_b_0, self.deinterleave) + self.ber_generators = [] + for i in range(0, len(esno)): + ber_generator_temp = fec_test( + generic_encoder=encoder_list[i], + generic_decoder=decoder_list[i], + esno=esno[i], + samp_rate=samp_rate, + threading=threading, + puncpat=puncpat, + seed=seed) + self.ber_generators.append(ber_generator_temp); + + for i in range(0, len(esno)): + self.connect((self.deinterleave, i), (self.ber_generators[i])) + self.connect((self.ber_generators[i], 0), (self, i*2)); + self.connect((self.ber_generators[i], 1), (self, i*2 + 1)); + + def get_esno(self): + return self.esno + + def set_esno(self, esno): + self.esno = esno + self.ber_generator_0.set_esno(self.esno) + + def get_samp_rate(self): + return self.samp_rate + + def set_samp_rate(self, samp_rate): + self.samp_rate = samp_rate + self.ber_generator_0.set_samp_rate(self.samp_rate) + + def get_encoder_list(self): + return self.encoder_list + + def set_encoder_list(self, encoder_list): + self.encoder_list = encoder_list + self.ber_generator_0.set_generic_encoder(self.encoder_list) + + def get_decoder_list(self): + return self.decoder_list + + def set_decoder_list(self, decoder_list): + self.decoder_list = decoder_list + self.ber_generator_0.set_generic_decoder(self.decoder_list) + + def get_puncpat(self): + return self.puncpat + + def set_puncpat(self, puncpat): + self.puncpat = puncpat diff --git a/gr-fec/python/fec/bitflip.py b/gr-fec/python/fec/bitflip.py new file mode 100644 index 0000000000..235dc19a05 --- /dev/null +++ b/gr-fec/python/fec/bitflip.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +def bitreverse(mint): + res = 0; + while mint != 0: + res = res << 1; + res += mint & 1; + mint = mint >> 1; + return res; + +const_lut = [2]; +specinvert_lut = [[0, 2, 1, 3]]; + +def bitflip(mint, bitflip_lut, index, csize): + res = 0; + cnt = 0; + mask = (1 << const_lut[index]) - 1; + while (cnt < csize): + res += (bitflip_lut[(mint >> cnt) & (mask)]) << cnt; + cnt += const_lut[index]; + return res; + + +def read_bitlist(bitlist): + res = 0; + for i in range(len(bitlist)): + if int(bitlist[i]) == 1: + res += 1 << (len(bitlist) - i - 1); + return res; + + +def read_big_bitlist(bitlist): + ret = [] + for j in range(0, len(bitlist)/64): + res = 0; + for i in range(0, 64): + if int(bitlist[j*64+i]) == 1: + res += 1 << (64 - i - 1); + ret.append(res); + res = 0; + j = 0; + for i in range(len(bitlist)%64): + if int(bitlist[len(ret)*64+i]) == 1: + res += 1 << (64 - j - 1); + j += 1; + ret.append(res); + return ret; + +def generate_symmetries(symlist): + retlist = [] + if len(symlist) == 1: + for i in range(len(symlist[0])): + retlist.append(symlist[0][i:] + symlist[0][0:i]); + invlist = symlist[0]; + for i in range(1, len(symlist[0])/2): + invlist[i] = symlist[0][i + len(symlist[0])/2]; + invlist[i + len(symlist[0])/2] = symlist[0][i]; + for i in range(len(symlist[0])): + retlist.append(symlist[0][i:] + symlist[0][0:i]); + return retlist; diff --git a/gr-fec/python/fec/capillary_threaded_decoder.py b/gr-fec/python/fec/capillary_threaded_decoder.py new file mode 100644 index 0000000000..9a00cde192 --- /dev/null +++ b/gr-fec/python/fec/capillary_threaded_decoder.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, blocks +import fec_swig as fec +import math + +class capillary_threaded_decoder(gr.hier_block2): + def __init__(self, decoder_list_0, input_size, output_size): + gr.hier_block2.__init__( + self, "Capillary Threaded Decoder", + gr.io_signature(1, 1, input_size*1), + gr.io_signature(1, 1, output_size*1)) + + self.decoder_list_0 = decoder_list_0 + + check = math.log10(len(self.decoder_list_0)) / math.log10(2.0) + if(abs(check - int(check)) > 0): + gr.log.info("fec.capillary_threaded_decoder: number of decoders must be a power of 2.") + raise AttributeError + + self.deinterleaves_0 = [] + for i in range(int(math.log(len(decoder_list_0), 2))): + for j in range(int(math.pow(2, i))): + self.deinterleaves_0.append(blocks.deinterleave(input_size, + fec.get_decoder_input_size(decoder_list_0[0]))) + + self.generic_decoders_0 = [] + for i in range(len(decoder_list_0)): + self.generic_decoders_0.append(fec.decoder(decoder_list_0[i], input_size, output_size)) + + self.interleaves_0 = [] + for i in range(int(math.log(len(decoder_list_0), 2))): + for j in range(int(math.pow(2, i))): + self.interleaves_0.append(blocks.interleave(output_size, + fec.get_decoder_output_size(decoder_list_0[0]))) + + rootcount = 0 + branchcount = 1 + for i in range(int(math.log(len(decoder_list_0), 2)) - 1): + for j in range(int(math.pow(2, i))): + self.connect((self.deinterleaves_0[rootcount], 0), (self.deinterleaves_0[branchcount], 0)) + self.connect((self.deinterleaves_0[rootcount], 1), (self.deinterleaves_0[branchcount + 1], 0)) + rootcount += 1 + branchcount += 2 + + codercount = 0 + for i in range(len(decoder_list_0)/2): + self.connect((self.deinterleaves_0[rootcount], 0), (self.generic_decoders_0[codercount], 0)) + self.connect((self.deinterleaves_0[rootcount], 1), (self.generic_decoders_0[codercount + 1], 0)) + rootcount += 1 + codercount += 2 + + rootcount = 0 + branchcount = 1 + for i in range(int(math.log(len(decoder_list_0), 2)) - 1): + for j in range(int(math.pow(2, i))): + self.connect((self.interleaves_0[branchcount], 0), (self.interleaves_0[rootcount], 0)) + self.connect((self.interleaves_0[branchcount + 1], 0), (self.interleaves_0[rootcount], 1)) + rootcount += 1 + branchcount += 2 + + codercount = 0 + for i in range(len(decoder_list_0)/2): + self.connect((self.generic_decoders_0[codercount], 0), (self.interleaves_0[rootcount], 0)) + self.connect((self.generic_decoders_0[codercount + 1], 0), (self.interleaves_0[rootcount], 1)) + rootcount += 1 + codercount += 2 + + if ((len(self.decoder_list_0)) > 1): + self.connect((self, 0), (self.deinterleaves_0[0], 0)) + self.connect((self.interleaves_0[0], 0), (self, 0)) + else: + self.connect((self, 0), (self.generic_decoders_0[0], 0)) + self.connect((self.generic_decoders_0[0], 0), (self, 0)) + + def get_decoder_list_0(self): + return self.decoder_list_0 + + def set_decoder_list_0(self, decoder_list_0): + self.decoder_list_0 = decoder_list_0 diff --git a/gr-fec/python/fec/capillary_threaded_encoder.py b/gr-fec/python/fec/capillary_threaded_encoder.py new file mode 100644 index 0000000000..21d4af62ca --- /dev/null +++ b/gr-fec/python/fec/capillary_threaded_encoder.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, blocks +import fec_swig as fec +import math + +class capillary_threaded_encoder(gr.hier_block2): + def __init__(self, encoder_list_0, input_size=gr.sizeof_char, output_size=gr.sizeof_char): + gr.hier_block2.__init__(self, "Capillary Threaded Encoder", + gr.io_signature(1, 1, input_size), + gr.io_signature(1, 1, output_size)) + + self.encoder_list_0 = encoder_list_0 + + check = math.log10(len(self.encoder_list_0)) / math.log10(2.0) + if(abs(check - int(check)) > 0.0): + gr.log.info("fec.capillary_threaded_encoder: number of encoders must be a power of 2.") + raise AttributeError + + self.deinterleaves_0 = []; + for i in range(int(math.log(len(encoder_list_0), 2))): + for j in range(int(math.pow(2, i))): + self.deinterleaves_0.append(blocks.deinterleave(input_size, + fec.get_encoder_input_size(encoder_list_0[0]))) + + self.generic_encoders_0 = []; + for i in range(len(encoder_list_0)): + self.generic_encoders_0.append(fec.encoder(encoder_list_0[i], + input_size, output_size)) + + self.interleaves_0 = []; + for i in range(int(math.log(len(encoder_list_0), 2))): + for j in range(int(math.pow(2, i))): + self.interleaves_0.append(blocks.interleave(output_size, + fec.get_encoder_output_size(encoder_list_0[0]))) + + rootcount = 0; + branchcount = 1; + for i in range(int(math.log(len(encoder_list_0), 2)) - 1): + for j in range(int(math.pow(2, i))): + self.connect((self.deinterleaves_0[rootcount], 0), (self.deinterleaves_0[branchcount], 0)) + self.connect((self.deinterleaves_0[rootcount], 1), (self.deinterleaves_0[branchcount + 1], 0)) + rootcount += 1; + branchcount += 2; + + codercount = 0; + for i in range(len(encoder_list_0)/2): + self.connect((self.deinterleaves_0[rootcount], 0), (self.generic_encoders_0[codercount], 0)) + self.connect((self.deinterleaves_0[rootcount], 1), (self.generic_encoders_0[codercount + 1], 0)) + rootcount += 1; + codercount += 2; + + + rootcount = 0; + branchcount = 1; + for i in range(int(math.log(len(encoder_list_0), 2)) - 1): + for j in range(int(math.pow(2, i))): + self.connect((self.interleaves_0[branchcount], 0), (self.interleaves_0[rootcount], 0)) + self.connect((self.interleaves_0[branchcount + 1], 0), (self.interleaves_0[rootcount], 1)) + rootcount += 1; + branchcount += 2; + + + codercount = 0; + for i in range(len(encoder_list_0)/2): + self.connect((self.generic_encoders_0[codercount], 0), (self.interleaves_0[rootcount], 0)) + self.connect((self.generic_encoders_0[codercount + 1], 0), (self.interleaves_0[rootcount], 1)) + rootcount += 1; + codercount += 2; + + if((len(self.encoder_list_0)) > 1): + self.connect((self, 0), (self.deinterleaves_0[0], 0)) + self.connect((self.interleaves_0[0], 0), (self, 0)) + else: + self.connect((self, 0), (self.generic_encoders_0[0], 0)) + self.connect((self.generic_encoders_0[0], 0), (self, 0)) + + def get_encoder_list_0(self): + return self.encoder_list_0 + + def set_encoder_list_0(self, encoder_list_0): + self.encoder_list_0 = encoder_list_0 diff --git a/gr-fec/python/fec/extended_async_encoder.py b/gr-fec/python/fec/extended_async_encoder.py new file mode 100644 index 0000000000..fffe64aeb8 --- /dev/null +++ b/gr-fec/python/fec/extended_async_encoder.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr +import fec_swig as fec +from bitflip import read_bitlist +import weakref + +class extended_async_encoder(gr.hier_block2): + def __init__(self, encoder_obj_list, puncpat=None): + gr.hier_block2.__init__(self, "extended_async_encoder", + gr.io_signature(0, 0, 0), + gr.io_signature(0, 0, 0)) + + # Set us up as a message passing block + self.message_port_register_hier_in('in') + self.message_port_register_hier_out('out') + + self.puncpat=puncpat + + # If it's a list of encoders, take the first one, unless it's + # a list of lists of encoders. + if(type(encoder_obj_list) == list): + # This block doesn't handle parallelism of > 1 + if(type(encoder_obj_list[0]) == list): + gr.log.info("fec.extended_encoder: Parallelism must be 0 or 1.") + raise AttributeError + + encoder_obj = encoder_obj_list[0] + + # Otherwise, just take it as is + else: + encoder_obj = encoder_obj_list + + self.encoder = fec.async_encoder(encoder_obj) + + #self.puncture = None + #if self.puncpat != '11': + # self.puncture = fec.puncture_bb(len(puncpat), read_bitlist(puncpat), 0) + + self.msg_connect(weakref.proxy(self), "in", self.encoder, "in") + + #if(self.puncture): + # self.msg_connect(self.encoder, "out", self.puncture, "in") + # self.msg_connect(self.puncture, "out", weakref.proxy(self), "out") + #else: + # self.msg_connect(self.encoder, "out", weakref.proxy(self), "out") + self.msg_connect(self.encoder, "out", weakref.proxy(self), "out") diff --git a/gr-fec/python/fec/extended_decoder.py b/gr-fec/python/fec/extended_decoder.py new file mode 100644 index 0000000000..7e6cf452f9 --- /dev/null +++ b/gr-fec/python/fec/extended_decoder.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, blocks +import fec_swig as fec +from bitflip import * +import sys + +if sys.modules.has_key("gnuradio.digital"): + digital = sys.modules["gnuradio.digital"] +else: + from gnuradio import digital + +from threaded_decoder import threaded_decoder +from capillary_threaded_decoder import capillary_threaded_decoder + +class extended_decoder(gr.hier_block2): + +#solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density) +#for i in numpy.arange(.1, .499, .01): + #print str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i); + garbletable = { + 0.310786835319:0.1, + 0.279118162802:0.11, + 0.252699589071:0.12, + 0.230318516016:0.13, + 0.211108735347:0.14, + 0.194434959095:0.15, + 0.179820650401:0.16, + 0.166901324951:0.17, + 0.15539341766:0.18, + 0.145072979886:0.19, + 0.135760766313:0.2, + 0.127311581396:0.21, + 0.119606529806:0.22, + 0.112547286766:0.23, + 0.106051798775:0.24, + 0.10005101381:0.25, + 0.0944863633098:0.26, + 0.0893078003966:0.27, + 0.084472254501:0.28, + 0.0799424008658:0.29, + 0.0756856701944:0.3, + 0.0716734425668:0.31, + 0.0678803831565:0.32, + 0.0642838867856:0.33, + 0.0608636049994:0.34, + 0.0576010337489:0.35, + 0.0544791422522:0.36, + 0.0514820241933:0.37, + 0.0485945507251:0.38, + 0.0458019998183:0.39, + 0.0430896262596:0.4, + 0.0404421166935:0.41, + 0.0378428350972:0.42, + 0.0352726843274:0.43, + 0.0327082350617:0.44, + 0.0301183562535:0.45, + 0.0274574540266:0.46, + 0.0246498236897:0.47, + 0.0215448131298:0.48, + 0.0177274208353:0.49, + } + + def __init__(self, decoder_obj_list, threading, ann=None, puncpat='11', + integration_period=10000, flush=None, rotator=None): + gr.hier_block2.__init__(self, "extended_decoder", + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_char)) + self.blocks=[] + self.ann=ann + self.puncpat=puncpat + self.flush=flush + + if(type(decoder_obj_list) == list): + if(type(decoder_obj_list[0]) == list): + gr.log.info("fec.extended_decoder: Parallelism must be 1.") + raise AttributeError + else: + # If it has parallelism of 0, force it into a list of 1 + decoder_obj_list = [decoder_obj_list,] + + message_collector_connected=False + + ##anything going through the annihilator needs shifted, uchar vals + if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" or \ + fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits": + self.blocks.append(blocks.multiply_const_ff(48.0)) + + if fec.get_shift(decoder_obj_list[0]) != 0.0: + self.blocks.append(blocks.add_const_ff(fec.get_shift(decoder_obj_list[0]))) + elif fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits": + self.blocks.append(blocks.add_const_ff(128.0)) + + if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "uchar" or \ + fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits": + self.blocks.append(blocks.float_to_uchar()); + + const_index = 0; #index that corresponds to mod order for specinvert purposes + + if not self.flush: + flush = 10000; + else: + flush = self.flush; + if self.ann: #ann and puncpat are strings of 0s and 1s + cat = fec.ULLVector(); + for i in fec.read_big_bitlist(ann): + cat.append(i); + + synd_garble = .49 + idx_list = self.garbletable.keys() + idx_list.sort() + for i in idx_list: + if 1.0/self.ann.count('1') >= i: + synd_garble = self.garbletable[i] + print 'using syndrom garble threshold ' + str(synd_garble) + 'for conv_bit_corr_bb' + print 'ceiling: .0335 data garble rate' + self.blocks.append(fec.conv_bit_corr_bb(cat, len(puncpat) - puncpat.count('0'), + len(ann), integration_period, flush, synd_garble)) + + if self.puncpat != '11': + self.blocks.append(fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0)) + + if fec.get_decoder_input_conversion(decoder_obj_list[0]) == "packed_bits": + self.blocks.append(blocks.uchar_to_float()) + self.blocks.append(blocks.add_const_ff(-128.0)) + self.blocks.append(digital.binary_slicer_fb()) + self.blocks.append(blocks.unpacked_to_packed_bb(1,0)) + + if(len(decoder_obj_list) > 1): + if(fec.get_history(decoder_obj_list[0]) != 0): + gr.log.info("fec.extended_decoder: Cannot use multi-threaded parallelism on a decoder with history.") + raise AttributeError + + if threading == 'capillary': + self.blocks.append(capillary_threaded_decoder(decoder_obj_list, + fec.get_decoder_input_item_size(decoder_obj_list[0]), + fec.get_decoder_output_item_size(decoder_obj_list[0]))) + + elif threading == 'ordinary': + self.blocks.append(threaded_decoder(decoder_obj_list, + fec.get_decoder_input_item_size(decoder_obj_list[0]), + fec.get_decoder_output_item_size(decoder_obj_list[0]))) + + else: + self.blocks.append(fec.decoder(decoder_obj_list[0], + fec.get_decoder_input_item_size(decoder_obj_list[0]), + fec.get_decoder_output_item_size(decoder_obj_list[0]))) + + if fec.get_decoder_output_conversion(decoder_obj_list[0]) == "unpack": + self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)); + + self.connect((self, 0), (self.blocks[0], 0)); + self.connect((self.blocks[-1], 0), (self, 0)); + + for i in range(len(self.blocks) - 1): + self.connect((self.blocks[i], 0), (self.blocks[i+1], 0)); diff --git a/gr-fec/python/fec/extended_encoder.py b/gr-fec/python/fec/extended_encoder.py new file mode 100644 index 0000000000..50a8891ea5 --- /dev/null +++ b/gr-fec/python/fec/extended_encoder.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, blocks + +import fec_swig as fec +from threaded_encoder import threaded_encoder +from capillary_threaded_encoder import capillary_threaded_encoder +from bitflip import read_bitlist + +class extended_encoder(gr.hier_block2): + def __init__(self, encoder_obj_list, threading, puncpat=None): + gr.hier_block2.__init__(self, "extended_encoder", + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(1, 1, gr.sizeof_char)) + + self.blocks=[] + self.puncpat=puncpat + + if(type(encoder_obj_list) == list): + if(type(encoder_obj_list[0]) == list): + gr.log.info("fec.extended_encoder: Parallelism must be 1.") + raise AttributeError + else: + # If it has parallelism of 0, force it into a list of 1 + encoder_obj_list = [encoder_obj_list,] + + if fec.get_encoder_input_conversion(encoder_obj_list[0]) == "pack": + self.blocks.append(blocks.pack_k_bits_bb(8)) + + if threading == 'capillary': + self.blocks.append(capillary_threaded_encoder(encoder_obj_list, + gr.sizeof_char, + gr.sizeof_char)) + elif threading == 'ordinary': + self.blocks.append(threaded_encoder(encoder_obj_list, + gr.sizeof_char, + gr.sizeof_char)) + else: + self.blocks.append(fec.encoder(encoder_obj_list[0], + gr.sizeof_char, + gr.sizeof_char)) + + if self.puncpat != '11': + self.blocks.append(fec.puncture_bb(len(puncpat), read_bitlist(puncpat), 0)) + + # Connect the input to the encoder and the output to the + # puncture if used or the encoder if not. + self.connect((self, 0), (self.blocks[0], 0)); + self.connect((self.blocks[-1], 0), (self, 0)); + + # If using the puncture block, add it into the flowgraph after + # the encoder. + for i in range(len(self.blocks) - 1): + self.connect((self.blocks[i], 0), (self.blocks[i+1], 0)); diff --git a/gr-fec/python/fec/extended_tagged_decoder.py b/gr-fec/python/fec/extended_tagged_decoder.py new file mode 100644 index 0000000000..1865cbfbe4 --- /dev/null +++ b/gr-fec/python/fec/extended_tagged_decoder.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, blocks +import fec_swig as fec +from bitflip import * +import sys + +if sys.modules.has_key("gnuradio.digital"): + digital = sys.modules["gnuradio.digital"] +else: + from gnuradio import digital + +class extended_tagged_decoder(gr.hier_block2): + +#solution to log_(1-2*t)(1-2*.0335) = 1/taps where t is thresh (syndrome density) +#for i in numpy.arange(.1, .499, .01): + #print str(log((1-(2 * .035)), (1-(2 * i)))) + ':' + str(i); + garbletable = { + 0.310786835319:0.1, + 0.279118162802:0.11, + 0.252699589071:0.12, + 0.230318516016:0.13, + 0.211108735347:0.14, + 0.194434959095:0.15, + 0.179820650401:0.16, + 0.166901324951:0.17, + 0.15539341766:0.18, + 0.145072979886:0.19, + 0.135760766313:0.2, + 0.127311581396:0.21, + 0.119606529806:0.22, + 0.112547286766:0.23, + 0.106051798775:0.24, + 0.10005101381:0.25, + 0.0944863633098:0.26, + 0.0893078003966:0.27, + 0.084472254501:0.28, + 0.0799424008658:0.29, + 0.0756856701944:0.3, + 0.0716734425668:0.31, + 0.0678803831565:0.32, + 0.0642838867856:0.33, + 0.0608636049994:0.34, + 0.0576010337489:0.35, + 0.0544791422522:0.36, + 0.0514820241933:0.37, + 0.0485945507251:0.38, + 0.0458019998183:0.39, + 0.0430896262596:0.4, + 0.0404421166935:0.41, + 0.0378428350972:0.42, + 0.0352726843274:0.43, + 0.0327082350617:0.44, + 0.0301183562535:0.45, + 0.0274574540266:0.46, + 0.0246498236897:0.47, + 0.0215448131298:0.48, + 0.0177274208353:0.49, + } + + def __init__(self, decoder_obj_list, ann=None, puncpat='11', + integration_period=10000, flush=None, rotator=None, lentagname=None): + gr.hier_block2.__init__(self, "extended_decoder", + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_char)) + self.blocks=[] + self.ann=ann + self.puncpat=puncpat + self.flush=flush + + if(type(decoder_obj_list) == list): + # This block doesn't handle parallelism of > 1 + # We could just grab encoder [0][0], but we don't want to encourage this. + if(type(decoder_obj_list[0]) == list): + gr.log.info("fec.extended_tagged_decoder: Parallelism must be 1.") + raise AttributeError + + decoder_obj = decoder_obj_list[0] + + # Otherwise, just take it as is + else: + decoder_obj = decoder_obj_list + + # If lentagname is None, fall back to using the non tagged + # stream version + if type(lentagname) == str: + if(lentagname.lower() == 'none'): + lentagname = None + + message_collector_connected=False + + ##anything going through the annihilator needs shifted, uchar vals + if fec.get_decoder_input_conversion(decoder_obj) == "uchar" or \ + fec.get_decoder_input_conversion(decoder_obj) == "packed_bits": + self.blocks.append(blocks.multiply_const_ff(48.0)) + + if fec.get_shift(decoder_obj) != 0.0: + self.blocks.append(blocks.add_const_ff(fec.get_shift(decoder_obj))) + elif fec.get_decoder_input_conversion(decoder_obj) == "packed_bits": + self.blocks.append(blocks.add_const_ff(128.0)) + + if fec.get_decoder_input_conversion(decoder_obj) == "uchar" or \ + fec.get_decoder_input_conversion(decoder_obj) == "packed_bits": + self.blocks.append(blocks.float_to_uchar()); + + const_index = 0; #index that corresponds to mod order for specinvert purposes + + if not self.flush: + flush = 10000; + else: + flush = self.flush; + if self.ann: #ann and puncpat are strings of 0s and 1s + cat = fec.ULLVector(); + for i in fec.read_big_bitlist(ann): + cat.append(i); + + synd_garble = .49 + idx_list = self.garbletable.keys() + idx_list.sort() + for i in idx_list: + if 1.0/self.ann.count('1') >= i: + synd_garble = self.garbletable[i] + print 'using syndrom garble threshold ' + str(synd_garble) + 'for conv_bit_corr_bb' + print 'ceiling: .0335 data garble rate' + self.blocks.append(fec.conv_bit_corr_bb(cat, len(puncpat) - puncpat.count('0'), + len(ann), integration_period, flush, synd_garble)) + + if self.puncpat != '11': + self.blocks.append(fec.depuncture_bb(len(puncpat), read_bitlist(puncpat), 0)) + + if fec.get_decoder_input_conversion(decoder_obj) == "packed_bits": + self.blocks.append(blocks.uchar_to_float()) + self.blocks.append(blocks.add_const_ff(-128.0)) + self.blocks.append(digital.binary_slicer_fb()) + self.blocks.append(blocks.unpacked_to_packed_bb(1,0)) + + else: + if(not lentagname): + self.blocks.append(fec.decoder(decoder_obj, + fec.get_decoder_input_item_size(decoder_obj), + fec.get_decoder_output_item_size(decoder_obj))) + else: + self.blocks.append(fec.tagged_decoder(decoder_obj, + fec.get_decoder_input_item_size(decoder_obj), + fec.get_decoder_output_item_size(decoder_obj), + lentagname)) + + if fec.get_decoder_output_conversion(decoder_obj) == "unpack": + self.blocks.append(blocks.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)); + + self.connect((self, 0), (self.blocks[0], 0)); + self.connect((self.blocks[-1], 0), (self, 0)); + + for i in range(len(self.blocks) - 1): + self.connect((self.blocks[i], 0), (self.blocks[i+1], 0)); diff --git a/gr-fec/python/fec/extended_tagged_encoder.py b/gr-fec/python/fec/extended_tagged_encoder.py new file mode 100644 index 0000000000..2f78b8e5c9 --- /dev/null +++ b/gr-fec/python/fec/extended_tagged_encoder.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, blocks + +import fec_swig as fec +from bitflip import read_bitlist + +class extended_tagged_encoder(gr.hier_block2): + def __init__(self, encoder_obj_list, puncpat=None, lentagname=None): + gr.hier_block2.__init__(self, "extended_tagged_encoder", + gr.io_signature(1, 1, gr.sizeof_char), + gr.io_signature(1, 1, gr.sizeof_char)) + + self.blocks=[] + self.puncpat=puncpat + + # If it's a list of encoders, take the first one, unless it's + # a list of lists of encoders. + if(type(encoder_obj_list) == list): + # This block doesn't handle parallelism of > 1 + # We could just grab encoder [0][0], but we don't want to encourage this. + if(type(encoder_obj_list[0]) == list): + gr.log.info("fec.extended_tagged_encoder: Parallelism must be 0 or 1.") + raise AttributeError + + encoder_obj = encoder_obj_list[0] + + # Otherwise, just take it as is + else: + encoder_obj = encoder_obj_list + + # If lentagname is None, fall back to using the non tagged + # stream version + if type(lentagname) == str: + if(lentagname.lower() == 'none'): + lentagname = None + + if fec.get_encoder_input_conversion(encoder_obj) == "pack": + self.blocks.append(blocks.pack_k_bits_bb(8)) + + if(not lentagname): + self.blocks.append(fec.encoder(encoder_obj, + gr.sizeof_char, + gr.sizeof_char)) + else: + self.blocks.append(fec.tagged_encoder(encoder_obj, + gr.sizeof_char, + gr.sizeof_char, + lentagname)) + + if self.puncpat != '11': + self.blocks.append(fec.puncture_bb(len(puncpat), read_bitlist(puncpat), 0)) + + # Connect the input to the encoder and the output to the + # puncture if used or the encoder if not. + self.connect((self, 0), (self.blocks[0], 0)); + self.connect((self.blocks[-1], 0), (self, 0)); + + # If using the puncture block, add it into the flowgraph after + # the encoder. + for i in range(len(self.blocks) - 1): + self.connect((self.blocks[i], 0), (self.blocks[i+1], 0)); diff --git a/gr-fec/python/fec/fec_test.py b/gr-fec/python/fec/fec_test.py new file mode 100644 index 0000000000..6466a0bcb4 --- /dev/null +++ b/gr-fec/python/fec/fec_test.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio.fec.bitflip import read_bitlist +from gnuradio import gr, blocks, analog +import math +import sys + +if sys.modules.has_key("gnuradio.digital"): + digital = sys.modules["gnuradio.digital"] +else: + from gnuradio import digital + +from extended_encoder import extended_encoder +from extended_decoder import extended_decoder + +class fec_test(gr.hier_block2): + + def __init__(self, generic_encoder=0, generic_decoder=0, esno=0, + samp_rate=3200000, threading="capillary", puncpat='11', + seed=0): + gr.hier_block2.__init__(self, "fec_test", + gr.io_signature(1, 1, gr.sizeof_char*1), + gr.io_signature(2, 2, gr.sizeof_char*1)) + + self.generic_encoder = generic_encoder + self.generic_decoder = generic_decoder + self.esno = esno + self.samp_rate = samp_rate + self.threading = threading + self.puncpat = puncpat + + self.map_bb = digital.map_bb(([-1, 1])) + self.b2f = blocks.char_to_float(1, 1) + + self.unpack8 = blocks.unpack_k_bits_bb(8) + self.pack8 = blocks.pack_k_bits_bb(8) + + self.encoder = extended_encoder(encoder_obj_list=generic_encoder, + threading=threading, + puncpat=puncpat) + + self.decoder = extended_decoder(decoder_obj_list=generic_decoder, + threading=threading, + ann=None, puncpat=puncpat, + integration_period=10000, rotator=None) + + noise = math.sqrt((10.0**(-esno/10.0))/2.0) + #self.fastnoise = analog.fastnoise_source_f(analog.GR_GAUSSIAN, noise, seed, 8192) + self.fastnoise = analog.noise_source_f(analog.GR_GAUSSIAN, noise, seed) + self.addnoise = blocks.add_ff(1) + + # Send packed input directly to the second output + self.copy_packed = blocks.copy(gr.sizeof_char) + self.connect(self, self.copy_packed) + self.connect(self.copy_packed, (self, 1)) + + # Unpack inputl encode, convert to +/-1, add noise, decode, repack + self.connect(self, self.unpack8) + self.connect(self.unpack8, self.encoder) + self.connect(self.encoder, self.map_bb) + self.connect(self.map_bb, self.b2f) + self.connect(self.b2f, (self.addnoise, 0)) + self.connect(self.fastnoise, (self.addnoise,1)) + self.connect(self.addnoise, self.decoder) + self.connect(self.decoder, self.pack8) + self.connect(self.pack8, (self, 0)) + + + def get_generic_encoder(self): + return self.generic_encoder + + def set_generic_encoder(self, generic_encoder): + self.generic_encoder = generic_encoder + + def get_generic_decoder(self): + return self.generic_decoder + + def set_generic_decoder(self, generic_decoder): + self.generic_decoder = generic_decoder + + def get_esno(self): + return self.esno + + def set_esno(self, esno): + self.esno = esno + + def get_samp_rate(self): + return self.samp_rate + + def set_samp_rate(self, samp_rate): + self.samp_rate = samp_rate + + def get_threading(self): + return self.threading + + def set_threading(self, threading): + self.threading = threading + + def get_puncpat(self): + return self.puncpat + + def set_puncpat(self, puncpat): + self.puncpat = puncpat diff --git a/gr-fec/python/fec/qa_depuncture.py b/gr-fec/python/fec/qa_depuncture.py new file mode 100644 index 0000000000..5566e83a25 --- /dev/null +++ b/gr-fec/python/fec/qa_depuncture.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import fec_swig as fec +import blocks_swig as blocks +from collections import deque + +class test_depuncture (gr_unittest.TestCase): + + def depuncture_setup(self): + p = [] + for i in range(self.puncsize): + p.append(self.puncpat >> (self.puncsize - 1 - i) & 1) + d = deque(p) + d.rotate(self.delay) + _puncpat = list(d) + + k = 0 + self.expected = [] + for n in range(len(self.src_data)/(self.puncsize - self.puncholes)): + for i in range(self.puncsize): + if _puncpat[i] == 1: + self.expected.append(self.src_data[k]); + k+=1 + else: + self.expected.append(self.sym) + + def setUp(self): + self.src_data = 2000*range(64) + self.tb = gr.top_block () + + def tearDown(self): + self.tb = None + + def test_000(self): + # Test normal operation of the depuncture block + + self.puncsize = 8 + self.puncpat = 0xEF + self.delay = 0 + self.sym = 0 + self.puncholes = 1 + + self.depuncture_setup() + + src = blocks.vector_source_b(self.src_data) + op = fec.depuncture_bb(self.puncsize, self.puncpat, + self.delay, self.sym) + dst = blocks.vector_sink_b() + + self.tb.connect(src, op, dst) + self.tb.run() + + dst_data = list(dst.data()) + for i in xrange(len(dst_data)): + dst_data[i] = int(dst_data[i]) + + self.assertEqual(self.expected, dst_data) + + def test_001(self): + # Test normal operation of the depuncture block with a delay + + self.puncsize = 8 + self.puncpat = 0xEF + self.delay = 1 + self.sym = 0 + self.puncholes = 1 + + self.depuncture_setup() + + src = blocks.vector_source_b(self.src_data) + op = fec.depuncture_bb(self.puncsize, self.puncpat, + self.delay, self.sym) + dst = blocks.vector_sink_b() + + self.tb.connect(src, op, dst) + self.tb.run() + + dst_data = list(dst.data()) + for i in xrange(len(dst_data)): + dst_data[i] = int(dst_data[i]) + + self.assertEqual(self.expected, dst_data) + + def test_002(self): + # Test scenario where we have defined a puncture pattern with + # more bits than the puncsize. + + self.puncsize = 4 + self.puncpat = 0x5555 + self.delay = 0 + self.sym = 0 + self.puncholes = 2 + + self.depuncture_setup() + + src = blocks.vector_source_b(self.src_data) + op = fec.depuncture_bb(self.puncsize, self.puncpat, + self.delay, self.sym) + dst = blocks.vector_sink_b() + + self.tb.connect(src, op, dst) + self.tb.run() + + dst_data = list(dst.data()) + for i in xrange(len(dst_data)): + dst_data[i] = int(dst_data[i]) + + self.assertEqual(self.expected, dst_data) + + def test_003(self): + # Test scenario where we have defined a puncture pattern with + # more bits than the puncsize with a delay. The python code + # doesn't account for this when creating self.expected, but + # this should be equivalent to a puncpat of the correct size. + + self.puncsize = 4 + self.puncpat0 = 0x5555 # too many bits set + self.puncpat1 = 0x55 # num bits = puncsize + self.delay = 1 + self.sym = 0 + + src = blocks.vector_source_b(self.src_data) + op0 = fec.depuncture_bb(self.puncsize, self.puncpat0, + self.delay, self.sym) + op1 = fec.depuncture_bb(self.puncsize, self.puncpat1, + self.delay, self.sym) + dst0 = blocks.vector_sink_b() + dst1 = blocks.vector_sink_b() + + self.tb.connect(src, op0, dst0) + self.tb.connect(src, op1, dst1) + self.tb.run() + + dst_data0 = list(dst0.data()) + for i in xrange(len(dst_data0)): + dst_data0[i] = int(dst_data0[i]) + + dst_data1 = list(dst1.data()) + for i in xrange(len(dst_data1)): + dst_data1[i] = int(dst_data1[i]) + + self.assertEqual(dst_data1, dst_data0) + + def test_004(self): + # Test normal operation of the depuncture block without + # specifying the fill symbol (defaults to 127). + + self.puncsize = 8 + self.puncpat = 0xEF + self.delay = 0 + self.sym = 127 + self.puncholes = 1 + + self.depuncture_setup() + + src = blocks.vector_source_b(self.src_data) + op = fec.depuncture_bb(self.puncsize, self.puncpat, + self.delay) + dst = blocks.vector_sink_b() + + self.tb.connect(src, op, dst) + self.tb.run() + + dst_data = list(dst.data()) + for i in xrange(len(dst_data)): + dst_data[i] = int(dst_data[i]) + + self.assertEqual(self.expected, dst_data) + +if __name__ == '__main__': + gr_unittest.run(test_depuncture, "test_depuncture.xml") diff --git a/gr-fec/python/fec/qa_fecapi_cc.py b/gr-fec/python/fec/qa_fecapi_cc.py new file mode 100644 index 0000000000..bbd500161e --- /dev/null +++ b/gr-fec/python/fec/qa_fecapi_cc.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import fec_swig as fec +from _qa_helper import _qa_helper + +from extended_encoder import extended_encoder +from extended_decoder import extended_decoder + +class test_fecapi_cc(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_parallelism0_00(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = fec.cc_encoder_make(frame_size*8, k, rate, polys) + dec = fec.cc_decoder.make(frame_size*8, k, rate, polys) + threading = None + self.test = _qa_helper(4*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism0_01(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = fec.cc_encoder_make(frame_size*8, k, rate, polys) + dec = fec.cc_decoder.make(frame_size*8, k, rate, polys) + threading = 'ordinary' + self.test = _qa_helper(5*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism0_02(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = fec.cc_encoder_make(frame_size*8, k, rate, polys) + dec = fec.cc_decoder.make(frame_size*8, k, rate, polys) + threading = 'capillary' + self.test = _qa_helper(5*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_00(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys)), range(0,1)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys)), range(0,1)) + threading = None + self.test = _qa_helper(5*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_01(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys)), range(0,1)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys)), range(0,1)) + threading = 'ordinary' + self.test = _qa_helper(5*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_02(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys)), range(0,1)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys)), range(0,1)) + threading = 'capillary' + self.test = _qa_helper(5*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_03(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + mode = fec.CC_TERMINATED + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + threading = 'capillary' + self.test = _qa_helper(4*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_04(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + mode = fec.CC_TRUNCATED + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + threading = 'capillary' + self.test = _qa_helper(4*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_05(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + mode = fec.CC_TAILBITING + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + threading = 'capillary' + self.test = _qa_helper(4*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + +if __name__ == '__main__': + gr_unittest.run(test_fecapi_cc, "test_fecapi_cc.xml") diff --git a/gr-fec/python/fec/qa_fecapi_dummy.py b/gr-fec/python/fec/qa_fecapi_dummy.py new file mode 100644 index 0000000000..a07890fb84 --- /dev/null +++ b/gr-fec/python/fec/qa_fecapi_dummy.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import fec_swig as fec +from _qa_helper import _qa_helper + +from extended_encoder import extended_encoder +from extended_decoder import extended_decoder + +class test_fecapi_dummy(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_parallelism0_00(self): + frame_size = 30 + enc = fec.dummy_encoder_make(frame_size*8) + dec = fec.dummy_decoder.make(frame_size*8) + threading = None + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism0_01(self): + frame_size = 30 + enc = fec.dummy_encoder_make(frame_size*8) + dec = fec.dummy_decoder.make(frame_size*8) + threading = 'ordinary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism0_02(self): + frame_size = 30 + enc = fec.dummy_encoder_make(frame_size*8) + dec = fec.dummy_decoder.make(frame_size*8) + threading = 'capillary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_00(self): + frame_size = 30 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,1)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,1)) + threading = None + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_01(self): + frame_size = 30 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,1)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,1)) + threading = 'ordinary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_02(self): + frame_size = 300 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,1)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,1)) + threading = 'capillary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_03(self): + frame_size = 30 + dims = 10 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims)) + threading = 'ordinary' + self.test = _qa_helper(dims*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_04(self): + frame_size = 30 + dims = 16 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims)) + threading = 'capillary' + self.test = _qa_helper(dims*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_05(self): + frame_size = 30 + dims = 5 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims)) + #dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims)) + threading = 'capillary' + + self.assertRaises(AttributeError, lambda: extended_encoder(enc, threading=threading, puncpat="11")) + + def test_parallelism1_06(self): + frame_size = 30 + dims = 5 + #enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims)) + threading = 'capillary' + + self.assertRaises(AttributeError, lambda: extended_decoder(dec, threading=threading, puncpat="11")) + + def test_parallelism2_00(self): + frame_size = 30 + dims1 = 16 + dims2 = 16 + enc = map((lambda b: map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims1))), range(0,dims2)) + #dec = map((lambda b: map((lambda a: fec.dummy_decoder_make(frame_size*8)), range(0,dims1))), range(0,dims2)) + threading = 'capillary' + + self.assertRaises(AttributeError, lambda: extended_encoder(enc, threading=threading, puncpat="11")) + + def test_parallelism2_01(self): + frame_size = 30 + dims1 = 16 + dims2 = 16 + dec = map((lambda b: map((lambda a: fec.dummy_decoder_make(frame_size*8)), range(0,dims1))), range(0,dims2)) + threading = 'capillary' + + self.assertRaises(AttributeError, lambda: extended_decoder(dec, threading=threading, puncpat="11")) + +if __name__ == '__main__': + gr_unittest.run(test_fecapi_dummy, "test_fecapi_dummy.xml") diff --git a/gr-fec/python/fec/qa_fecapi_repetition.py b/gr-fec/python/fec/qa_fecapi_repetition.py new file mode 100644 index 0000000000..7998d61bd1 --- /dev/null +++ b/gr-fec/python/fec/qa_fecapi_repetition.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import fec_swig as fec +from _qa_helper import _qa_helper + +from extended_encoder import extended_encoder +from extended_decoder import extended_decoder + +class test_fecapi_repetition(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_parallelism0_00(self): + frame_size = 30 + rep = 3 + enc = fec.repetition_encoder_make(frame_size*8, rep) + dec = fec.repetition_decoder.make(frame_size*8, rep) + threading = None + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism0_01(self): + frame_size = 30 + rep = 3 + enc = fec.repetition_encoder_make(frame_size*8, rep) + dec = fec.repetition_decoder.make(frame_size*8, rep) + threading = 'ordinary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism0_02(self): + frame_size = 30 + rep = 3 + enc = fec.repetition_encoder_make(frame_size*8, rep) + dec = fec.repetition_decoder.make(frame_size*8, rep) + threading = 'capillary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_00(self): + frame_size = 30 + rep = 3 + enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,1)) + dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,1)) + threading = None + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_01(self): + frame_size = 30 + rep = 3 + enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,1)) + dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,1)) + threading = 'ordinary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_02(self): + frame_size = 300 + rep = 3 + enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,1)) + dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,1)) + threading = 'capillary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_03(self): + frame_size = 30 + rep = 3 + dims = 10 + enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,dims)) + dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,dims)) + threading = 'ordinary' + self.test = _qa_helper(dims*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_04(self): + frame_size = 30 + rep = 3 + dims = 16 + enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,dims)) + dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,dims)) + threading = 'capillary' + self.test = _qa_helper(dims*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + +if __name__ == '__main__': + gr_unittest.run(test_fecapi_repetition, "test_fecapi_repetition.xml") diff --git a/gr-fec/python/fec/qa_puncture.py b/gr-fec/python/fec/qa_puncture.py new file mode 100644 index 0000000000..fdd15c9a08 --- /dev/null +++ b/gr-fec/python/fec/qa_puncture.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import fec_swig as fec +import blocks_swig as blocks +from collections import deque + +class test_puncture (gr_unittest.TestCase): + + def puncture_setup(self): + p = [] + for i in range(self.puncsize): + p.append(self.puncpat >> (self.puncsize - 1 - i) & 1) + d = deque(p) + d.rotate(self.delay) + _puncpat = list(d) + + self.expected = [] + for n in range(len(self.src_data)/self.puncsize): + for i in range(self.puncsize): + if _puncpat[i] == 1: + self.expected.append(self.src_data[n*self.puncsize+i]); + + def setUp(self): + self.src_data = 10000*range(64) + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_000(self): + # Test normal operation of the puncture block + + self.puncsize = 8 + self.puncpat = 0xEF + self.delay = 0 + + self.puncture_setup() + + src = blocks.vector_source_b(self.src_data) + op = fec.puncture_bb(self.puncsize, self.puncpat, self.delay) + dst = blocks.vector_sink_b() + + self.tb.connect(src, op, dst) + self.tb.run() + + dst_data = list(dst.data()) + for i in xrange(len(dst_data)): + dst_data[i] = int(dst_data[i]) + + self.assertEqual(self.expected, dst_data) + + + def test_001(self): + # Test normal operation of the puncture block with a delay + + self.puncsize = 8 + self.puncpat = 0xEE + self.delay = 1 + + self.src_data = range(16) + + self.puncture_setup() + + src = blocks.vector_source_b(self.src_data) + op = fec.puncture_bb(self.puncsize, self.puncpat, self.delay) + dst = blocks.vector_sink_b() + + self.tb.connect(src, op, dst) + self.tb.run() + + dst_data = list(dst.data()) + for i in xrange(len(dst_data)): + dst_data[i] = int(dst_data[i]) + + self.assertEqual(self.expected, dst_data) + + + def test_002(self): + # Test scenario where we have defined a puncture pattern with + # more bits than the puncsize. + + self.puncsize = 4 + self.puncpat = 0x5555 + self.delay = 0 + + self.puncture_setup() + + src = blocks.vector_source_b(self.src_data) + op = fec.puncture_bb(self.puncsize, self.puncpat, self.delay) + dst = blocks.vector_sink_b() + + self.tb.connect(src, op, dst) + self.tb.run() + + dst_data = list(dst.data()) + for i in xrange(len(dst_data)): + dst_data[i] = int(dst_data[i]) + + self.assertEqual(self.expected, dst_data) + + def test_003(self): + # Test scenario where we have defined a puncture pattern with + # more bits than the puncsize with a delay. The python code + # doesn't account for this when creating self.expected, but + # this should be equivalent to a puncpat of the correct size. + + self.puncsize = 4 + self.puncpat0 = 0x5555 # too many bits set + self.puncpat1 = 0x55 # num bits = puncsize + self.delay = 1 + + src = blocks.vector_source_b(self.src_data) + op0 = fec.puncture_bb(self.puncsize, self.puncpat0, self.delay) + op1 = fec.puncture_bb(self.puncsize, self.puncpat1, self.delay) + dst0 = blocks.vector_sink_b() + dst1 = blocks.vector_sink_b() + + self.tb.connect(src, op0, dst0) + self.tb.connect(src, op1, dst1) + self.tb.run() + + dst_data0 = list(dst0.data()) + for i in xrange(len(dst_data0)): + dst_data0[i] = int(dst_data0[i]) + + dst_data1 = list(dst1.data()) + for i in xrange(len(dst_data1)): + dst_data1[i] = int(dst_data1[i]) + + self.assertEqual(dst_data1, dst_data0) + + + + def test_f_000(self): + # Test normal operation of the float puncture block + + self.puncsize = 8 + self.puncpat = 0xEF + self.delay = 0 + + self.puncture_setup() + + src = blocks.vector_source_f(self.src_data) + op = fec.puncture_ff(self.puncsize, self.puncpat, self.delay) + dst = blocks.vector_sink_f() + + self.tb.connect(src, op, dst) + self.tb.run() + + dst_data = list(dst.data()) + self.assertEqual(self.expected, dst_data) + + + def test_f_001(self): + # Test normal operation of the puncture block with a delay + + self.puncsize = 8 + self.puncpat = 0xEE + self.delay = 1 + + self.src_data = range(16) + + self.puncture_setup() + + src = blocks.vector_source_f(self.src_data) + op = fec.puncture_ff(self.puncsize, self.puncpat, self.delay) + dst = blocks.vector_sink_f() + + self.tb.connect(src, op, dst) + self.tb.run() + + dst_data = list(dst.data()) + self.assertEqual(self.expected, dst_data) + + + def test_f_002(self): + # Test scenariou where we have defined a puncture pattern with + # more bits than the puncsize. + + self.puncsize = 4 + self.puncpat = 0x5555 + self.delay = 0 + + self.puncture_setup() + + src = blocks.vector_source_f(self.src_data) + op = fec.puncture_ff(self.puncsize, self.puncpat, self.delay) + dst = blocks.vector_sink_f() + + self.tb.connect(src, op, dst) + self.tb.run() + + dst_data = list(dst.data()) + self.assertEqual(self.expected, dst_data) + + def test_f_003(self): + # Test scenariou where we have defined a puncture pattern with + # more bits than the puncsize with a delay. The python code + # doesn't account for this when creating self.expected, but + # this should be equivalent to a puncpat of the correct size. + + self.puncsize = 4 + self.puncpat0 = 0x5555 # too many bits set + self.puncpat1 = 0x55 # num bits = puncsize + self.delay = 1 + + src = blocks.vector_source_f(self.src_data) + op0 = fec.puncture_ff(self.puncsize, self.puncpat0, self.delay) + op1 = fec.puncture_ff(self.puncsize, self.puncpat1, self.delay) + dst0 = blocks.vector_sink_f() + dst1 = blocks.vector_sink_f() + + self.tb.connect(src, op0, dst0) + self.tb.connect(src, op1, dst1) + self.tb.run() + + dst_data0 = list(dst0.data()) + dst_data1 = list(dst1.data()) + + self.assertEqual(dst_data1, dst_data0) + +if __name__ == '__main__': + gr_unittest.run(test_puncture, "test_puncture.xml") diff --git a/gr-fec/python/fec/threaded_decoder.py b/gr-fec/python/fec/threaded_decoder.py new file mode 100644 index 0000000000..115ad7b5d3 --- /dev/null +++ b/gr-fec/python/fec/threaded_decoder.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, blocks +import fec_swig as fec + +class threaded_decoder(gr.hier_block2): + def __init__(self, decoder_list_0, input_size, output_size): + gr.hier_block2.__init__( + self, "Threaded Decoder", + gr.io_signature(1, 1, input_size*1), + gr.io_signature(1, 1, output_size*1)) + + self.decoder_list_0 = decoder_list_0 + + self.deinterleave_0 = blocks.deinterleave(input_size, + fec.get_decoder_input_size(decoder_list_0[0])) + + self.generic_decoders_0 = [] + for i in range(len(decoder_list_0)): + self.generic_decoders_0.append(fec.decoder(decoder_list_0[i], + input_size, output_size)) + + self.interleave_0 = blocks.interleave(output_size, + fec.get_decoder_output_size(decoder_list_0[0])) + + for i in range(len(decoder_list_0)): + self.connect((self.deinterleave_0, i), (self.generic_decoders_0[i], 0)) + + for i in range(len(decoder_list_0)): + self.connect((self.generic_decoders_0[i], 0), (self.interleave_0, i)) + + + self.connect((self, 0), (self.deinterleave_0, 0)) + self.connect((self.interleave_0, 0), (self, 0)) + + def get_decoder_list_0(self): + return self.decoder_list_0 + + def set_decoder_list_0(self, decoder_list_0): + self.decoder_list_0 = decoder_list_0 diff --git a/gr-fec/python/fec/threaded_encoder.py b/gr-fec/python/fec/threaded_encoder.py new file mode 100644 index 0000000000..391baa5442 --- /dev/null +++ b/gr-fec/python/fec/threaded_encoder.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, blocks +import fec_swig as fec + +class threaded_encoder(gr.hier_block2): + def __init__(self, encoder_list_0, input_size, output_size): + gr.hier_block2.__init__( + self, "Threaded Encoder", + gr.io_signature(1, 1, input_size*1), + gr.io_signature(1, 1, output_size*1)) + + self.encoder_list_0 = encoder_list_0 + + self.fec_deinterleave_0 = blocks.deinterleave(input_size, + fec.get_encoder_input_size(encoder_list_0[0])) + + self.generic_encoders_0 = []; + for i in range(len(encoder_list_0)): + self.generic_encoders_0.append(fec.encoder(encoder_list_0[i], + input_size, output_size)) + + self.fec_interleave_0 = blocks.interleave(output_size, + fec.get_encoder_output_size(encoder_list_0[0])) + + for i in range(len(encoder_list_0)): + self.connect((self.fec_deinterleave_0, i), (self.generic_encoders_0[i], 0)) + + for i in range(len(encoder_list_0)): + self.connect((self.generic_encoders_0[i], 0), (self.fec_interleave_0, i)) + + self.connect((self, 0), (self.fec_deinterleave_0, 0)) + self.connect((self.fec_interleave_0, 0), (self, 0)) + + def get_encoder_list_0(self): + return self.encoder_list_0 + + def set_encoder_list_0(self, encoder_list_0): + self.encoder_list_0 = encoder_list_0 |