#!/usr/bin/env python
# Copyright 2012-2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#


from gnuradio import gr, gr_unittest, blocks, digital
import pmt


class qa_crc32_bb(gr_unittest.TestCase):
    def setUp(self):
        self.tb = gr.top_block()
        self.tsb_key = "length"

    def tearDown(self):
        self.tb = None

    def test_001_crc_len(self):
        """ Make sure the output of a CRC set is 4 bytes longer than the input. """
        data = list(range(16))
        src = blocks.vector_source_b(data)
        crc = digital.crc32_bb(False, self.tsb_key)
        sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        self.tb.connect(
            src,
            blocks.stream_to_tagged_stream(gr.sizeof_char, 1,
                                           len(data), self.tsb_key), crc, sink)
        self.tb.run()
        # Check that the packets before crc_check are 4 bytes longer that the
        # input.
        self.assertEqual(len(data) + 4, len(sink.data()[0]))

    def test_002_crc_equal(self):
        """ Go through CRC set / CRC check and make sure the output
        is the same as the input. """
        data = [0, 1, 2, 3, 4, 5, 6, 7, 8]
        src = blocks.vector_source_b(data)
        crc = digital.crc32_bb(False, self.tsb_key)
        crc_check = digital.crc32_bb(True, self.tsb_key)
        sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, len(data), self.tsb_key), crc,
                        crc_check, sink)
        self.tb.run()
        # Check that the packets after crc_check are the same as input.
        self.assertEqual(data, sink.data()[0])

    def test_003_crc_correct_lentag(self):
        tag_name = "length"
        pack_len = 8
        packets = list(range(pack_len * 2))
        tag1 = gr.tag_t()
        tag1.offset = 0
        tag1.key = pmt.string_to_symbol(tag_name)
        tag1.value = pmt.from_long(pack_len)
        tag2 = gr.tag_t()
        tag2.offset = pack_len
        tag2.key = pmt.string_to_symbol(tag_name)
        tag2.value = pmt.from_long(pack_len)
        testtag1 = gr.tag_t()
        testtag1.offset = 1
        testtag1.key = pmt.string_to_symbol("tag1")
        testtag1.value = pmt.from_long(0)
        testtag2 = gr.tag_t()
        testtag2.offset = pack_len
        testtag2.key = pmt.string_to_symbol("tag2")
        testtag2.value = pmt.from_long(0)
        testtag3 = gr.tag_t()
        testtag3.offset = len(packets) - 1
        testtag3.key = pmt.string_to_symbol("tag3")
        testtag3.value = pmt.from_long(0)
        src = blocks.vector_source_b(packets, False, 1,
                                     (testtag1, testtag2, testtag3))
        crc = digital.crc32_bb(False, self.tsb_key)
        sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, pack_len, self.tsb_key), crc,
                        sink)
        self.tb.run()
        self.assertEqual(len(sink.data()), 2)
        self.assertEqual(len(sink.data()[0]), (pack_len + 4))
        self.assertEqual(len(sink.data()[1]), (pack_len + 4))
        correct_offsets = {'tag1': 1, 'tag2': 12, 'tag3': 19}
        tags_found = {'tag1': False, 'tag2': False, 'tag3': False}
        for tag in sink.tags():
            key = pmt.symbol_to_string(tag.key)
            if key in list(correct_offsets.keys()):
                tags_found[key] = True
                self.assertEqual(correct_offsets[key], tag.offset)
        self.assertTrue(all(tags_found.values()))

    def test_004_fail(self):
        """ Corrupt the data and make sure it fails CRC test. """
        data = [0, 1, 2, 3, 4, 5, 6, 7]
        src = blocks.vector_source_b(data)
        crc = digital.crc32_bb(False, self.tsb_key)
        crc_check = digital.crc32_bb(True, self.tsb_key)
        corruptor = blocks.add_const_bb(1)
        sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, len(data), self.tsb_key), crc,
                        corruptor, crc_check, sink)
        self.tb.run()
        # crc_check will drop invalid packets
        self.assertEqual(len(sink.data()), 0)

    def test_005_tag_propagation(self):
        """ Make sure tags on the CRC aren't lost. """
        # Data with precalculated CRC
        data = [0, 1, 2, 3, 4, 5, 6, 7, 8, 2, 67, 225, 188]
        testtag = gr.tag_t()
        testtag.offset = len(data) - 1
        testtag.key = pmt.string_to_symbol('tag1')
        testtag.value = pmt.from_long(0)
        src = blocks.vector_source_b(data, False, 1, (testtag, ))
        crc_check = digital.crc32_bb(True, self.tsb_key)
        sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, len(data), self.tsb_key),
                        crc_check, sink)
        self.tb.run()
        self.assertEqual([len(data) - 5, ], [
            tag.offset for tag in sink.tags()
            if pmt.symbol_to_string(tag.key) == 'tag1'
        ])

    # NOTE: What follows are the same tests as before but with the packed flag
    # set to False

    def test_006_crc_len(self):
        """ Make sure the output of a CRC set is 32 (unpacked) bytes longer than the input. """
        data = list(range(16))
        src = blocks.vector_source_b(data)
        crc = digital.crc32_bb(False, self.tsb_key, False)
        sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        self.tb.connect(
            src,
            blocks.stream_to_tagged_stream(gr.sizeof_char, 1,
                                           len(data), self.tsb_key), crc, sink)
        self.tb.run()
        # Check that the packets before crc_check are 4 bytes longer that the
        # input.
        self.assertEqual(len(data) + 32, len(sink.data()[0]))

    def test_007_crc_equal(self):
        """ Go through CRC set / CRC check and make sure the output
        is the same as the input. """
        data = [0, 1, 2, 3, 4, 5, 6, 7, 8]
        src = blocks.vector_source_b(data)
        crc = digital.crc32_bb(False, self.tsb_key, False)
        crc_check = digital.crc32_bb(True, self.tsb_key, False)
        sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, len(data), self.tsb_key), crc,
                        crc_check, sink)
        self.tb.run()
        # Check that the packets after crc_check are the same as input.
        self.assertEqual(data, sink.data()[0])

    def test_002_crc_equal_unpacked(self):
        """ Test unpacked operation with packed operation
        """
        data = [0, 1, 2, 3, 4, 5, 6, 7, 8]
        src = blocks.vector_source_b(data)
        unpack1 = blocks.repack_bits_bb(
            8, 1, self.tsb_key, False, gr.GR_LSB_FIRST)
        unpack2 = blocks.repack_bits_bb(
            8, 1, self.tsb_key, False, gr.GR_LSB_FIRST)
        crc_unpacked = digital.crc32_bb(False, self.tsb_key, False)
        crc_packed = digital.crc32_bb(False, self.tsb_key, True)
        sink1 = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        sink2 = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)

        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, len(data), self.tsb_key),
                        crc_packed, unpack1, sink1)
        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, len(data), self.tsb_key),
                        unpack2, crc_unpacked, sink2)
        self.tb.run()
        self.assertEqual(sink1.data(), sink2.data())

    def test_003_crc_equal_unpacked(self):
        """ Test unpacked operation with packed operation
        """
        data = list(range(35))
        src = blocks.vector_source_b(data)
        unpack1 = blocks.repack_bits_bb(8, 1, self.tsb_key, False,
                                        gr.GR_LSB_FIRST)
        unpack2 = blocks.repack_bits_bb(8, 1, self.tsb_key, False,
                                        gr.GR_LSB_FIRST)
        crc_unpacked = digital.crc32_bb(False, self.tsb_key, False)
        crc_packed = digital.crc32_bb(False, self.tsb_key, True)
        sink1 = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        sink2 = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)

        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, len(data), self.tsb_key),
                        crc_packed, unpack1, sink1)
        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, len(data), self.tsb_key),
                        unpack2, crc_unpacked, sink2)
        self.tb.run()
        self.assertEqual(sink1.data(), sink2.data())

    def test_008_crc_correct_lentag(self):
        tag_name = "length"
        pack_len = 8
        packets = list(range(pack_len * 2))
        tag1 = gr.tag_t()
        tag1.offset = 0
        tag1.key = pmt.string_to_symbol(tag_name)
        tag1.value = pmt.from_long(pack_len)
        tag2 = gr.tag_t()
        tag2.offset = pack_len
        tag2.key = pmt.string_to_symbol(tag_name)
        tag2.value = pmt.from_long(pack_len)
        testtag1 = gr.tag_t()
        testtag1.offset = 1
        testtag1.key = pmt.string_to_symbol("tag1")
        testtag1.value = pmt.from_long(0)
        testtag2 = gr.tag_t()
        testtag2.offset = pack_len
        testtag2.key = pmt.string_to_symbol("tag2")
        testtag2.value = pmt.from_long(0)
        testtag3 = gr.tag_t()
        testtag3.offset = len(packets) - 1
        testtag3.key = pmt.string_to_symbol("tag3")
        testtag3.value = pmt.from_long(0)
        src = blocks.vector_source_b(packets, False, 1,
                                     (testtag1, testtag2, testtag3))
        crc = digital.crc32_bb(False, self.tsb_key, False)
        sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, pack_len, self.tsb_key), crc,
                        sink)
        self.tb.run()
        self.assertEqual(len(sink.data()), 2)
        self.assertEqual(len(sink.data()[0]), (pack_len + 32))
        self.assertEqual(len(sink.data()[1]), (pack_len + 32))
        correct_offsets = {'tag1': 1, 'tag2': 8 + 32, 'tag3': 15 + 32}
        tags_found = {'tag1': False, 'tag2': False, 'tag3': False}
        for tag in sink.tags():
            key = pmt.symbol_to_string(tag.key)
            if key in list(correct_offsets.keys()):
                tags_found[key] = True
                self.assertEqual(correct_offsets[key], tag.offset)
        self.assertTrue(all(tags_found.values()))

    def test_009_fail(self):
        """ Corrupt the data and make sure it fails CRC test. """
        data = (0, 1, 2, 3, 4, 5, 6, 7)
        src = blocks.vector_source_b(data)
        crc = digital.crc32_bb(False, self.tsb_key, False)
        crc_check = digital.crc32_bb(True, self.tsb_key, False)
        corruptor = blocks.add_const_bb(1)
        sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, len(data), self.tsb_key), crc,
                        corruptor, crc_check, sink)
        self.tb.run()
        # crc_check will drop invalid packets
        self.assertEqual(len(sink.data()), 0)

    def test_0010_tag_propagation(self):
        """ Make sure tags on the CRC aren't lost. """
        # Data with precalculated CRC
        data = (0, 0, 0, 0, 0, 0, 0, 0,
                1, 0, 0, 0, 0, 0, 0, 0,
                0, 1, 0, 0, 0, 0, 0, 0,
                1, 1, 0, 0, 0, 0, 0, 0,
                0, 0, 1, 0, 0, 0, 0, 0,
                1, 0, 1, 0, 0, 0, 0, 0,
                0, 1, 1, 0, 0, 0, 0, 0,
                1, 1, 1, 0, 0, 0, 0, 0,
                0, 0, 0, 1, 0, 0, 0, 0,
                0, 1, 0, 0, 0, 0, 0, 0,
                1, 1, 0, 0, 0, 0, 1, 0,
                1, 0, 0, 0, 0, 1, 1, 1,
                0, 0, 1, 1, 1, 1, 0, 1)
        testtag = gr.tag_t()
        testtag.offset = len(data) - 1
        testtag.key = pmt.string_to_symbol('tag1')
        testtag.value = pmt.from_long(0)
        src = blocks.vector_source_b(data, False, 1, (testtag, ))
        crc_check = digital.crc32_bb(True, self.tsb_key, False)
        sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
        self.tb.connect(src,
                        blocks.stream_to_tagged_stream(
                            gr.sizeof_char, 1, len(data), self.tsb_key),
                        crc_check, sink)
        self.tb.run()
        self.assertEqual([len(data) - 33, ], [
            tag.offset for tag in sink.tags()
            if pmt.symbol_to_string(tag.key) == 'tag1'
        ])


if __name__ == '__main__':
    gr_unittest.run(qa_crc32_bb)