#!/usr/bin/env python3
#
# Copyright 2007-2018 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#
"""
Unit tests for OFDM cyclic prefixer.
"""

from gnuradio import gr, gr_unittest, digital, blocks
import pmt


class test_ofdm_cyclic_prefixer (gr_unittest.TestCase):

    def setUp(self):
        self.tb = gr.top_block()

    def tearDown(self):
        self.tb = None

    def test_wo_tags_no_rolloff(self):
        " The easiest test: make sure the CP is added correctly. "
        fft_len = 8
        cp_len = 2
        expected_result = [6, 7, 0, 1, 2, 3, 4, 5, 6, 7,
                           6, 7, 0, 1, 2, 3, 4, 5, 6, 7]
        src = blocks.vector_source_c(list(range(fft_len)) * 2, False, fft_len)
        cp = digital.ofdm_cyclic_prefixer(fft_len, fft_len + cp_len)
        sink = blocks.vector_sink_c()
        self.tb.connect(src, cp, sink)
        self.tb.run()
        self.assertEqual(sink.data(), expected_result)

    def test_wo_tags_2s_rolloff(self):
        " No tags, but have a 2-sample rolloff "
        fft_len = 8
        cp_len = 2
        rolloff = 2
        expected_result = [7.0 / 2, 8, 1, 2, 3, 4, 5, 6, 7, 8,  # 1.0/2
                           7.0 / 2 + 1.0 / 2, 8, 1, 2, 3, 4, 5, 6, 7, 8]
        src = blocks.vector_source_c(
            list(
                range(
                    1,
                    fft_len +
                    1)) *
            2,
            False,
            fft_len)
        cp = digital.ofdm_cyclic_prefixer(fft_len, fft_len + cp_len, rolloff)
        sink = blocks.vector_sink_c()
        self.tb.connect(src, cp, sink)
        self.tb.run()
        self.assertEqual(sink.data(), expected_result)

    def test_with_tags_2s_rolloff(self):
        " With tags and a 2-sample rolloff "
        fft_len = 8
        cp_len = 2
        tag_name = "ts_last"
        expected_result = [7.0 / 2, 8, 1, 2, 3, 4, 5, 6, 7, 8,  # 1.0/2
                           7.0 / 2 + 1.0 / 2, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1.0 / 2]
        tag2 = gr.tag_t()
        tag2.offset = 1
        tag2.key = pmt.string_to_symbol("random_tag")
        tag2.value = pmt.from_long(42)
        src = blocks.vector_source_c(
            list(range(1, fft_len + 1)) * 2, False, fft_len, (tag2,))
        cp = digital.ofdm_cyclic_prefixer(
            fft_len, fft_len + cp_len, 2, tag_name)
        sink = blocks.tsb_vector_sink_c(tsb_key=tag_name)
        self.tb.connect(
            src,
            blocks.stream_to_tagged_stream(
                gr.sizeof_gr_complex,
                fft_len,
                2,
                tag_name),
            cp,
            sink)
        self.tb.run()
        self.assertEqual(sink.data()[0], expected_result)
        tags = [gr.tag_to_python(x) for x in sink.tags()]
        tags = sorted([(x.offset, x.key, x.value) for x in tags])
        expected_tags = [
            (fft_len + cp_len, "random_tag", 42)
        ]
        self.assertEqual(tags, expected_tags)

    def test_wo_tags_no_rolloff_multiple_cps(self):
        "Two CP lengths, no rolloff and no tags."
        fft_len = 8
        cp_lengths = (3, 2, 2)
        expected_result = [5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,  # 1
                           6, 7, 0, 1, 2, 3, 4, 5, 6, 7,    # 2
                           6, 7, 0, 1, 2, 3, 4, 5, 6, 7,    # 3
                           5, 6, 7, 0, 1, 2, 3, 4, 5, 6, 7,  # 4
                           6, 7, 0, 1, 2, 3, 4, 5, 6, 7,    # 5
                           ]
        src = blocks.vector_source_c(list(range(fft_len)) * 5, False, fft_len)
        cp = digital.ofdm_cyclic_prefixer(fft_len, cp_lengths)
        sink = blocks.vector_sink_c()
        self.tb.connect(src, cp, sink)
        self.tb.run()
        self.assertEqual(sink.data(), expected_result)

    def test_wo_tags_2s_rolloff_multiple_cps(self):
        "Two CP lengths, 2-sample rolloff and no tags."
        fft_len = 8
        cp_lengths = (3, 2, 2)
        rolloff = 2
        expected_result = [6.0 / 2, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8,  # 1
                           7.0 / 2 + 1.0 / 2, 8, 1, 2, 3, 4, 5, 6, 7, 8,  # 2
                           7.0 / 2 + 1.0 / 2, 8, 1, 2, 3, 4, 5, 6, 7, 8,  # 3
                           6.0 / 2 + 1.0 / 2, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8,  # 4
                           7.0 / 2 + 1.0 / 2, 8, 1, 2, 3, 4, 5, 6, 7, 8  # 5
                           ]
        src = blocks.vector_source_c(
            list(
                range(
                    1,
                    fft_len +
                    1)) *
            5,
            False,
            fft_len)
        cp = digital.ofdm_cyclic_prefixer(fft_len, cp_lengths, rolloff)
        sink = blocks.vector_sink_c()
        self.tb.connect(src, cp, sink)
        self.tb.run()
        self.assertEqual(sink.data(), expected_result)

    def test_with_tags_2s_rolloff_multiples_cps(self):
        "Two CP lengths, 2-sample rolloff and tags."
        fft_len = 8
        cp_lengths = (3, 2, 2)
        rolloff = 2
        tag_name = "ts_last"
        expected_result = [
            6.0 / 2, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8,  # 1
            7.0 / 2 + 1.0 / 2, 8, 1, 2, 3, 4, 5, 6, 7, 8, 1.0 / 2  # Last tail
        ]
        # First test tag
        tag0 = gr.tag_t()
        tag0.offset = 0
        tag0.key = pmt.string_to_symbol("first_tag")
        tag0.value = pmt.from_long(24)
        # Second test tag
        tag1 = gr.tag_t()
        tag1.offset = 1
        tag1.key = pmt.string_to_symbol("second_tag")
        tag1.value = pmt.from_long(42)
        src = blocks.vector_source_c(
            list(range(1, fft_len + 1)) * 2, False, fft_len, (tag0, tag1))
        cp = digital.ofdm_cyclic_prefixer(
            fft_len, cp_lengths, rolloff, tag_name)
        sink = blocks.tsb_vector_sink_c(tsb_key=tag_name)
        self.tb.connect(
            src,
            blocks.stream_to_tagged_stream(
                gr.sizeof_gr_complex,
                fft_len,
                2,
                tag_name),
            cp,
            sink)
        self.tb.run()
        self.assertEqual(sink.data()[0], expected_result)
        tags = [gr.tag_to_python(x) for x in sink.tags()]
        tags = sorted([(x.offset, x.key, x.value) for x in tags])
        expected_tags = [
            (0, "first_tag", 24),
            (fft_len + cp_lengths[0], "second_tag", 42)
        ]
        self.assertEqual(tags, expected_tags)


if __name__ == '__main__':
    gr_unittest.run(test_ofdm_cyclic_prefixer)