#!/usr/bin/env python
#
# Copyright 2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#


import numpy as np

from gnuradio import gr, gr_unittest, blocks
from gnuradio import fec
from gnuradio.fec import extended_encoder
from gnuradio.fec import extended_decoder

from _qa_helper import _qa_helper


class test_fecapi_dummy(gr_unittest.TestCase):
    """QA tests for the dummy FEC encoder/decoder and the extended wrappers.

    The ``test_parallelism*`` cases feed single coders, 1-D lists and 2-D
    lists of coders through ``_qa_helper`` (or directly into
    ``extended_encoder`` / ``extended_decoder``) with different ``threading``
    settings. ``test_extended_pack_data`` checks the extended encoder's
    input/output conversions.
    """

    def setUp(self):
        # Every test gets its own flowgraph.
        self.tb = gr.top_block()

    def tearDown(self):
        self.tb = None

    # ------------------------------------------------------------------
    # Private helpers (names do not start with "test", so they are not
    # collected as test cases).
    # ------------------------------------------------------------------

    @staticmethod
    def _make_encoders(frame_size, count):
        """Return a list of ``count`` dummy encoders.

        Each encoder is created with ``frame_size * 8`` as its frame size
        argument.
        """
        return [fec.dummy_encoder_make(frame_size * 8) for _ in range(count)]

    @staticmethod
    def _make_decoders(frame_size, count):
        """Return a list of ``count`` dummy decoders.

        Each decoder is created with ``frame_size * 8`` as its frame size
        argument.
        """
        return [fec.dummy_decoder.make(frame_size * 8) for _ in range(count)]

    def _assert_loopback(self, data_size, enc, dec, threading):
        """Run a ``_qa_helper`` flowgraph and compare its two sinks.

        Builds ``_qa_helper(data_size, enc, dec, threading)``, stores it in
        ``self.test``, runs it inside ``self.tb`` and asserts that the data
        captured by ``snk_input`` equals the data captured by ``snk_output``.
        """
        self.test = _qa_helper(data_size, enc, dec, threading)
        self.tb.connect(self.test)
        self.tb.run()

        data_in = self.test.snk_input.data()
        data_out = self.test.snk_output.data()

        self.assertSequenceEqualGR(data_in, data_out)

    def _assert_rejected(self, wrapper, coders):
        """Assert that ``wrapper`` raises ``AttributeError`` for ``coders``.

        ``wrapper`` is ``extended_encoder`` or ``extended_decoder``; it is
        called with ``threading='capillary'`` and ``puncpat="11"``.
        """
        with self.assertRaises(AttributeError):
            wrapper(coders, threading='capillary', puncpat="11")

    # ------------------------------------------------------------------
    # Single encoder / decoder object
    # ------------------------------------------------------------------

    def test_parallelism0_00(self):
        """Single encoder/decoder pair with ``threading=None``."""
        frame_size = 30
        enc = fec.dummy_encoder_make(frame_size * 8)
        dec = fec.dummy_decoder.make(frame_size * 8)
        self._assert_loopback(10 * frame_size, enc, dec, None)

    def test_parallelism0_01(self):
        """Single encoder/decoder pair with ``threading='ordinary'``."""
        frame_size = 30
        enc = fec.dummy_encoder_make(frame_size * 8)
        dec = fec.dummy_decoder.make(frame_size * 8)
        self._assert_loopback(10 * frame_size, enc, dec, 'ordinary')

    def test_parallelism0_02(self):
        """Single encoder/decoder pair with ``threading='capillary'``."""
        frame_size = 30
        enc = fec.dummy_encoder_make(frame_size * 8)
        dec = fec.dummy_decoder.make(frame_size * 8)
        self._assert_loopback(10 * frame_size, enc, dec, 'capillary')

    # ------------------------------------------------------------------
    # 1-D lists of encoders / decoders
    # ------------------------------------------------------------------

    def test_parallelism1_00(self):
        """One-element coder lists with ``threading=None``."""
        frame_size = 30
        enc = self._make_encoders(frame_size, 1)
        dec = self._make_decoders(frame_size, 1)
        self._assert_loopback(10 * frame_size, enc, dec, None)

    def test_parallelism1_01(self):
        """One-element coder lists with ``threading='ordinary'``."""
        frame_size = 30
        enc = self._make_encoders(frame_size, 1)
        dec = self._make_decoders(frame_size, 1)
        self._assert_loopback(10 * frame_size, enc, dec, 'ordinary')

    def test_parallelism1_02(self):
        """One-element coder lists, larger frame, ``threading='capillary'``."""
        frame_size = 300
        enc = self._make_encoders(frame_size, 1)
        dec = self._make_decoders(frame_size, 1)
        self._assert_loopback(10 * frame_size, enc, dec, 'capillary')

    def test_parallelism1_03(self):
        """Ten coders per list with ``threading='ordinary'``."""
        frame_size = 30
        dims = 10
        enc = self._make_encoders(frame_size, dims)
        dec = self._make_decoders(frame_size, dims)
        self._assert_loopback(dims * frame_size, enc, dec, 'ordinary')

    def test_parallelism1_04(self):
        """Sixteen coders per list with ``threading='capillary'``."""
        frame_size = 30
        dims = 16
        enc = self._make_encoders(frame_size, dims)
        dec = self._make_decoders(frame_size, dims)
        self._assert_loopback(dims * frame_size, enc, dec, 'capillary')

    def test_parallelism1_05(self):
        """Five encoders with capillary threading must raise AttributeError."""
        # NOTE(review): presumably rejected because 5 is not a power of two,
        # while 16 (test_parallelism1_04) is accepted - confirm against
        # extended_encoder.
        frame_size = 30
        dims = 5
        enc = self._make_encoders(frame_size, dims)
        self._assert_rejected(extended_encoder, enc)

    def test_parallelism1_06(self):
        """Five decoders with capillary threading must raise AttributeError."""
        frame_size = 30
        dims = 5
        dec = self._make_decoders(frame_size, dims)
        self._assert_rejected(extended_decoder, dec)

    # ------------------------------------------------------------------
    # 2-D lists of encoders / decoders
    # ------------------------------------------------------------------

    def test_parallelism2_00(self):
        """A 16x16 nested encoder list must raise AttributeError."""
        frame_size = 30
        dims1 = 16
        dims2 = 16
        enc = [self._make_encoders(frame_size, dims1) for _ in range(dims2)]
        self._assert_rejected(extended_encoder, enc)

    def test_parallelism2_01(self):
        """A 16x16 nested decoder list must raise AttributeError."""
        frame_size = 30
        dims1 = 16
        dims2 = 16
        # This test builds its decoders through the module-level
        # ``fec.dummy_decoder_make`` spelling, unlike the other tests, which
        # use ``fec.dummy_decoder.make``.
        dec = [[fec.dummy_decoder_make(frame_size * 8) for _ in range(dims1)]
               for _ in range(dims2)]
        self._assert_rejected(extended_decoder, dec)

    # ------------------------------------------------------------------
    # Extended encoder input/output conversion
    # ------------------------------------------------------------------

    def test_extended_pack_data(self):
        """Check the extended encoder's input and output conversions.

        Random bits are pushed through three ``extended_encoder`` instances
        wrapping dummy encoders built with different flag combinations, and
        through a plain ``pack_k_bits_bb(8)`` reference path. Each sink is
        compared against either the raw bits or their ``np.packbits`` form.
        """
        n_frames = 10
        frame_size = 32

        # Draw the bits directly as uint8. Assigning ``data.dtype = np.uint8``
        # to the default-integer array would reinterpret its buffer instead of
        # converting it, producing several times more samples than
        # n_frames * frame_size (mostly zero padding bytes).
        data = np.random.randint(0, 2, n_frames * frame_size, dtype=np.uint8)
        packed_data = np.packbits(data)

        tb = self.tb

        src = blocks.vector_source_b(data)
        snk0 = blocks.vector_sink_b(1)
        snk1 = blocks.vector_sink_b(1)
        snk2 = blocks.vector_sink_b(1)
        snk3 = blocks.vector_sink_b(1)

        # Reference path: bits -> bytes without any FEC block involved.
        packer = blocks.pack_k_bits_bb(8)
        tb.connect(src, packer, snk0)

        # Both flags off: bits in, bits expected out.
        enc_unpacked = fec.dummy_encoder_make(frame_size, False, False)
        ext_enc_unp = extended_encoder(
            enc_unpacked, threading='none', puncpat='11')
        tb.connect(src, ext_enc_unp, snk1)

        # Second flag on: bits in, packed bytes expected out.
        enc_pack = fec.dummy_encoder_make(frame_size // 8, True, False)
        ext_enc_pack = extended_encoder(
            enc_pack, threading='none', puncpat='11')
        tb.connect(src, ext_enc_pack, snk2)

        # Third flag on: packed bytes in (from the packer), bits expected out.
        enc_packed_bits = fec.dummy_encoder_make(frame_size // 8, False, True)
        ext_enc_packed_bits = extended_encoder(
            enc_packed_bits, threading='none', puncpat='11')
        tb.connect(packer, ext_enc_packed_bits, snk3)

        tb.run()

        r0 = snk0.data()
        r1 = snk1.data()
        r2 = snk2.data()
        r3 = snk3.data()

        expected_bits = data.tolist()
        expected_bytes = packed_data.tolist()
        self.assertSequenceEqualGR(expected_bytes, r0)
        self.assertSequenceEqualGR(expected_bits, r1)
        self.assertSequenceEqualGR(expected_bytes, r2)
        self.assertSequenceEqualGR(expected_bits, r3)


# Script entry point: hand this test case to GNU Radio's unittest runner.
if __name__ == "__main__":
    gr_unittest.run(test_fecapi_dummy)