diff options
author | Tom Rondeau <tom@trondeau.com> | 2014-05-19 21:27:40 -0400 |
---|---|---|
committer | Tom Rondeau <tom@trondeau.com> | 2014-05-19 21:27:40 -0400 |
commit | eebc74a7d6e37a61bc7c3f21c4aa1b94b8d28be3 (patch) | |
tree | 81b48289c9b8f6ca4e0cca82ec482583d3c0656d /gr-fec | |
parent | fbbde8d01ab43b7a3ffd5ca9d65cf50a84e2a7be (diff) |
fec: wip: adding qa code.
capillary threaded encoders and decoders check to make sure dimension is a power of 2 for proper threading.
Diffstat (limited to 'gr-fec')
-rw-r--r-- | gr-fec/examples/fecapi_decoders.grc | 116 | ||||
-rwxr-xr-x | gr-fec/python/fec/_qa_helper.py | 90 | ||||
-rw-r--r-- | gr-fec/python/fec/capillary_threaded_decoder.py | 5 | ||||
-rw-r--r-- | gr-fec/python/fec/capillary_threaded_encoder.py | 5 | ||||
-rw-r--r-- | gr-fec/python/fec/qa_fecapi_cc.py | 195 | ||||
-rw-r--r-- | gr-fec/python/fec/qa_fecapi_dummy.py | 190 | ||||
-rw-r--r-- | gr-fec/python/fec/qa_fecapi_repetition.py | 161 |
7 files changed, 706 insertions, 56 deletions
diff --git a/gr-fec/examples/fecapi_decoders.grc b/gr-fec/examples/fecapi_decoders.grc index 46e6064d83..e48ee596a7 100644 --- a/gr-fec/examples/fecapi_decoders.grc +++ b/gr-fec/examples/fecapi_decoders.grc @@ -1,6 +1,6 @@ <?xml version='1.0' encoding='ASCII'?> <flow_graph> - <timestamp>Sat May 17 16:52:55 2014</timestamp> + <timestamp>Mon May 19 20:50:43 2014</timestamp> <block> <key>options</key> <param> @@ -68,7 +68,7 @@ <key>variable</key> <param> <key>id</key> - <value>polys</value> + <value>samp_rate</value> </param> <param> <key>_enabled</key> @@ -76,7 +76,7 @@ </param> <param> <key>value</key> - <value>[109, 79]</value> + <value>50000</value> </param> <param> <key>alias</key> @@ -84,7 +84,7 @@ </param> <param> <key>_coordinate</key> - <value>(98, 623)</value> + <value>(10, 74)</value> </param> <param> <key>_rotation</key> @@ -95,7 +95,7 @@ <key>variable</key> <param> <key>id</key> - <value>rate</value> + <value>k</value> </param> <param> <key>_enabled</key> @@ -103,7 +103,7 @@ </param> <param> <key>value</key> - <value>2</value> + <value>7</value> </param> <param> <key>alias</key> @@ -111,7 +111,7 @@ </param> <param> <key>_coordinate</key> - <value>(123, 562)</value> + <value>(58, 562)</value> </param> <param> <key>_rotation</key> @@ -122,7 +122,7 @@ <key>variable</key> <param> <key>id</key> - <value>k</value> + <value>rate</value> </param> <param> <key>_enabled</key> @@ -130,7 +130,7 @@ </param> <param> <key>value</key> - <value>7</value> + <value>2</value> </param> <param> <key>alias</key> @@ -138,7 +138,7 @@ </param> <param> <key>_coordinate</key> - <value>(58, 562)</value> + <value>(123, 562)</value> </param> <param> <key>_rotation</key> @@ -149,7 +149,7 @@ <key>variable</key> <param> <key>id</key> - <value>samp_rate</value> + <value>polys</value> </param> <param> <key>_enabled</key> @@ -157,7 +157,7 @@ </param> <param> <key>value</key> - <value>50000</value> + <value>[109, 79]</value> </param> <param> <key>alias</key> @@ -165,7 +165,7 @@ </param> <param> <key>_coordinate</key> - <value>(10, 74)</value> + <value>(98, 623)</value> </param> <param> <key>_rotation</key> @@ -368,49 +368,6 @@ </param> </block> <block> - <key>variable_dummy_encoder_def</key> - <param> - <key>id</key> - <value>enc_dummy</value> - </param> - <param> - <key>_enabled</key> - <value>True</value> - </param> - <param> - <key>value</key> - <value>"ok"</value> - </param> - <param> - <key>ndim</key> - <value>1</value> - </param> - <param> - <key>dim1</key> - <value>1</value> - </param> - <param> - <key>dim2</key> - <value>1</value> - </param> - <param> - <key>framebits</key> - <value>frame_size*8</value> - </param> - <param> - <key>alias</key> - <value></value> - </param> - <param> - <key>_coordinate</key> - <value>(370, 562)</value> - </param> - <param> - <key>_rotation</key> - <value>0</value> - </param> - </block> - <block> <key>variable_cc_decoder_def</key> <param> <key>id</key> @@ -465,6 +422,10 @@ <value>fec.CC_TAILBITING</value> </param> <param> + <key>padding</key> + <value>False</value> + </param> + <param> <key>alias</key> <value></value> </param> @@ -1838,6 +1799,49 @@ <value>0</value> </param> </block> + <block> + <key>variable_dummy_encoder_def</key> + <param> + <key>id</key> + <value>enc_dummy</value> + </param> + <param> + <key>_enabled</key> + <value>True</value> + </param> + <param> + <key>value</key> + <value>"ok"</value> + </param> + <param> + <key>ndim</key> + <value>2</value> + </param> + <param> + <key>dim1</key> + <value>1</value> + </param> + <param> + <key>dim2</key> + <value>4</value> + </param> + <param> + <key>framebits</key> + <value>frame_size*8</value> + </param> + <param> + <key>alias</key> + <value></value> + </param> + <param> + <key>_coordinate</key> + <value>(370, 562)</value> + </param> + <param> + <key>_rotation</key> + <value>0</value> + </param> + </block> <connection> <source_block_id>blocks_char_to_float_0_0_0_0</source_block_id> <sink_block_id>qtgui_time_sink_x_0</sink_block_id> diff --git a/gr-fec/python/fec/_qa_helper.py b/gr-fec/python/fec/_qa_helper.py new file mode 100755 index 0000000000..8722453441 --- /dev/null +++ b/gr-fec/python/fec/_qa_helper.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import blocks +from gnuradio import gr +import sys, numpy + +from extended_encoder import extended_encoder +from extended_decoder import extended_decoder + +class map_bb(gr.sync_block): + def __init__(self, bitmap): + gr.sync_block.__init__( + self, + name = "map_bb", + in_sig = [numpy.int8], + out_sig = [numpy.int8]) + self.bitmap = bitmap + + def work(self, input_items, output_items): + output_items[0][:] = map(lambda x: self.bitmap[x], input_items[0]) + return len(output_items[0]) + + +class _qa_helper(gr.top_block): + + def __init__(self, data_size, enc, dec, threading): + gr.top_block.__init__(self, "_qa_helper") + + self.puncpat = puncpat = '11' + + self.enc = enc + self.dec = dec + self.data_size = data_size + self.threading = threading + + self.ext_encoder = extended_encoder(enc, threading=self.threading, puncpat=self.puncpat) + self.ext_decoder= extended_decoder(dec, threading=self.threading, ann=None, + puncpat=self.puncpat, integration_period=10000) + + self.src = blocks.vector_source_b(data_size*[0, 1, 2, 3, 5, 7, 9, 13, 15, 25, 31, 45, 63, 95, 127], False) + self.unpack = blocks.unpack_k_bits_bb(8) + self.map = map_bb([-1, 1]) + self.to_float = blocks.char_to_float(1) + self.snk_input = blocks.vector_sink_b() + self.snk_output = blocks.vector_sink_b() + + self.connect(self.src, self.unpack, self.ext_encoder) + self.connect(self.ext_encoder, self.map, self.to_float) + self.connect(self.to_float, self.ext_decoder) + self.connect(self.unpack, self.snk_input) + self.connect(self.ext_decoder, self.snk_output) + +if __name__ == '__main__': + frame_size = 30 + enc = fec.dummy_encoder_make(frame_size*8) + #enc = fec.repetition_encoder_make(frame_size*8, 3) + dec = fec.dummy_decoder.make(frame_size*8) + + tb = _qa_helper(10*frame_size, enc, dec, None) + tb.run() + + errs = 0 + for i,o in zip(tb.snk_input.data(), tb.snk_output.data()): + if i-o != 0: + errs += 1 + + if errs == 0: + print "Decoded properly" + else: + print "Problem Decoding" diff --git a/gr-fec/python/fec/capillary_threaded_decoder.py b/gr-fec/python/fec/capillary_threaded_decoder.py index c4990ab21e..9a00cde192 100644 --- a/gr-fec/python/fec/capillary_threaded_decoder.py +++ b/gr-fec/python/fec/capillary_threaded_decoder.py @@ -33,6 +33,11 @@ class capillary_threaded_decoder(gr.hier_block2): self.decoder_list_0 = decoder_list_0 + check = math.log10(len(self.decoder_list_0)) / math.log10(2.0) + if(abs(check - int(check)) > 0): + gr.log.info("fec.capillary_threaded_decoder: number of decoders must be a power of 2.") + raise AttributeError + self.deinterleaves_0 = [] for i in range(int(math.log(len(decoder_list_0), 2))): for j in range(int(math.pow(2, i))): diff --git a/gr-fec/python/fec/capillary_threaded_encoder.py b/gr-fec/python/fec/capillary_threaded_encoder.py index 377250fb23..21d4af62ca 100644 --- a/gr-fec/python/fec/capillary_threaded_encoder.py +++ b/gr-fec/python/fec/capillary_threaded_encoder.py @@ -32,6 +32,11 @@ class capillary_threaded_encoder(gr.hier_block2): self.encoder_list_0 = encoder_list_0 + check = math.log10(len(self.encoder_list_0)) / math.log10(2.0) + if(abs(check - int(check)) > 0.0): + gr.log.info("fec.capillary_threaded_encoder: number of encoders must be a power of 2.") + raise AttributeError + self.deinterleaves_0 = []; for i in range(int(math.log(len(encoder_list_0), 2))): for j in range(int(math.pow(2, i))): diff --git a/gr-fec/python/fec/qa_fecapi_cc.py b/gr-fec/python/fec/qa_fecapi_cc.py new file mode 100644 index 0000000000..bbd500161e --- /dev/null +++ b/gr-fec/python/fec/qa_fecapi_cc.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import fec_swig as fec +from _qa_helper import _qa_helper + +from extended_encoder import extended_encoder +from extended_decoder import extended_decoder + +class test_fecapi_cc(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_parallelism0_00(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = fec.cc_encoder_make(frame_size*8, k, rate, polys) + dec = fec.cc_decoder.make(frame_size*8, k, rate, polys) + threading = None + self.test = _qa_helper(4*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism0_01(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = fec.cc_encoder_make(frame_size*8, k, rate, polys) + dec = fec.cc_decoder.make(frame_size*8, k, rate, polys) + threading = 'ordinary' + self.test = _qa_helper(5*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism0_02(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = fec.cc_encoder_make(frame_size*8, k, rate, polys) + dec = fec.cc_decoder.make(frame_size*8, k, rate, polys) + threading = 'capillary' + self.test = _qa_helper(5*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_00(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys)), range(0,1)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys)), range(0,1)) + threading = None + self.test = _qa_helper(5*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_01(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys)), range(0,1)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys)), range(0,1)) + threading = 'ordinary' + self.test = _qa_helper(5*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_02(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys)), range(0,1)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys)), range(0,1)) + threading = 'capillary' + self.test = _qa_helper(5*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_03(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + mode = fec.CC_TERMINATED + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + threading = 'capillary' + self.test = _qa_helper(4*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_04(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + mode = fec.CC_TRUNCATED + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + threading = 'capillary' + self.test = _qa_helper(4*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + + def test_parallelism1_05(self): + frame_size = 30 + k = 7 + rate = 2 + polys = [109,79] + mode = fec.CC_TAILBITING + enc = map((lambda a: fec.cc_encoder_make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + dec = map((lambda a: fec.cc_decoder.make(frame_size*8, k, rate, polys, mode=mode)), range(0,4)) + threading = 'capillary' + self.test = _qa_helper(4*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_out = self.test.snk_output.data() + data_in = self.test.snk_input.data()[0:len(data_out)] + + self.assertEqual(data_in, data_out) + +if __name__ == '__main__': + gr_unittest.run(test_fecapi_cc, "test_fecapi_cc.xml") diff --git a/gr-fec/python/fec/qa_fecapi_dummy.py b/gr-fec/python/fec/qa_fecapi_dummy.py new file mode 100644 index 0000000000..a07890fb84 --- /dev/null +++ b/gr-fec/python/fec/qa_fecapi_dummy.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import fec_swig as fec +from _qa_helper import _qa_helper + +from extended_encoder import extended_encoder +from extended_decoder import extended_decoder + +class test_fecapi_dummy(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_parallelism0_00(self): + frame_size = 30 + enc = fec.dummy_encoder_make(frame_size*8) + dec = fec.dummy_decoder.make(frame_size*8) + threading = None + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism0_01(self): + frame_size = 30 + enc = fec.dummy_encoder_make(frame_size*8) + dec = fec.dummy_decoder.make(frame_size*8) + threading = 'ordinary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism0_02(self): + frame_size = 30 + enc = fec.dummy_encoder_make(frame_size*8) + dec = fec.dummy_decoder.make(frame_size*8) + threading = 'capillary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_00(self): + frame_size = 30 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,1)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,1)) + threading = None + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_01(self): + frame_size = 30 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,1)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,1)) + threading = 'ordinary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_02(self): + frame_size = 300 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,1)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,1)) + threading = 'capillary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_03(self): + frame_size = 30 + dims = 10 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims)) + threading = 'ordinary' + self.test = _qa_helper(dims*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_04(self): + frame_size = 30 + dims = 16 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims)) + threading = 'capillary' + self.test = _qa_helper(dims*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_05(self): + frame_size = 30 + dims = 5 + enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims)) + #dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims)) + threading = 'capillary' + + self.assertRaises(AttributeError, lambda: extended_encoder(enc, threading=threading, puncpat="11")) + + def test_parallelism1_06(self): + frame_size = 30 + dims = 5 + #enc = map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims)) + dec = map((lambda a: fec.dummy_decoder.make(frame_size*8)), range(0,dims)) + threading = 'capillary' + + self.assertRaises(AttributeError, lambda: extended_decoder(dec, threading=threading, puncpat="11")) + + def test_parallelism2_00(self): + frame_size = 30 + dims1 = 16 + dims2 = 16 + enc = map((lambda b: map((lambda a: fec.dummy_encoder_make(frame_size*8)), range(0,dims1))), range(0,dims2)) + #dec = map((lambda b: map((lambda a: fec.dummy_decoder_make(frame_size*8)), range(0,dims1))), range(0,dims2)) + threading = 'capillary' + + self.assertRaises(AttributeError, lambda: extended_encoder(enc, threading=threading, puncpat="11")) + + def test_parallelism2_01(self): + frame_size = 30 + dims1 = 16 + dims2 = 16 + dec = map((lambda b: map((lambda a: fec.dummy_decoder_make(frame_size*8)), range(0,dims1))), range(0,dims2)) + threading = 'capillary' + + self.assertRaises(AttributeError, lambda: extended_decoder(dec, threading=threading, puncpat="11")) + +if __name__ == '__main__': + gr_unittest.run(test_fecapi_dummy, "test_fecapi_dummy.xml") diff --git a/gr-fec/python/fec/qa_fecapi_repetition.py b/gr-fec/python/fec/qa_fecapi_repetition.py new file mode 100644 index 0000000000..7998d61bd1 --- /dev/null +++ b/gr-fec/python/fec/qa_fecapi_repetition.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python +# +# Copyright 2014 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import fec_swig as fec +from _qa_helper import _qa_helper + +from extended_encoder import extended_encoder +from extended_decoder import extended_decoder + +class test_fecapi_repetition(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_parallelism0_00(self): + frame_size = 30 + rep = 3 + enc = fec.repetition_encoder_make(frame_size*8, rep) + dec = fec.repetition_decoder.make(frame_size*8, rep) + threading = None + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism0_01(self): + frame_size = 30 + rep = 3 + enc = fec.repetition_encoder_make(frame_size*8, rep) + dec = fec.repetition_decoder.make(frame_size*8, rep) + threading = 'ordinary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism0_02(self): + frame_size = 30 + rep = 3 + enc = fec.repetition_encoder_make(frame_size*8, rep) + dec = fec.repetition_decoder.make(frame_size*8, rep) + threading = 'capillary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_00(self): + frame_size = 30 + rep = 3 + enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,1)) + dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,1)) + threading = None + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_01(self): + frame_size = 30 + rep = 3 + enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,1)) + dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,1)) + threading = 'ordinary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_02(self): + frame_size = 300 + rep = 3 + enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,1)) + dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,1)) + threading = 'capillary' + self.test = _qa_helper(10*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_03(self): + frame_size = 30 + rep = 3 + dims = 10 + enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,dims)) + dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,dims)) + threading = 'ordinary' + self.test = _qa_helper(dims*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + + def test_parallelism1_04(self): + frame_size = 30 + rep = 3 + dims = 16 + enc = map((lambda a: fec.repetition_encoder_make(frame_size*8, rep)), range(0,dims)) + dec = map((lambda a: fec.repetition_decoder.make(frame_size*8, rep)), range(0,dims)) + threading = 'capillary' + self.test = _qa_helper(dims*frame_size, enc, dec, threading) + self.tb.connect(self.test) + self.tb.run() + + data_in = self.test.snk_input.data() + data_out =self.test.snk_output.data() + + self.assertEqual(data_in, data_out) + +if __name__ == '__main__': + gr_unittest.run(test_fecapi_repetition, "test_fecapi_repetition.xml") |