#!/usr/bin/env python
#
# Copyright 2021 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#

import time

from gnuradio import gr, gr_unittest
from gnuradio import blocks
import numpy as np
from numpy.random import uniform
import pmt


class qa_rotator_cc(gr_unittest.TestCase):
    """QA tests for the blocks.rotator_cc block

    Every test feeds a complex sinusoid into the rotator and checks the
    rotated samples collected by a vector sink and/or the "rot_phase_inc"
    tags collected by a tag debug block.

    """

    def _setUp(self, n_samples=100, tag_inc_updates=True):
        """Base fixture: set up flowgraph and parameters

        Args:
            n_samples: Number of IQ samples produced by the vector source.
            tag_inc_updates: Forwarded as the second argument of
                blocks.rotator_cc. The tests use it to enable or disable the
                tagging of phase increment updates.

        """
        self.n_samples = n_samples  # number of IQ samples to generate
        self.f_in = uniform(high=0.5)  # input frequency
        self.f_shift = uniform(high=0.5) - \
            self.f_in  # rotator's starting frequency

        # Input IQ samples
        in_angles = 2 * np.pi * np.arange(self.n_samples) * self.f_in
        in_samples = np.exp(1j * in_angles)

        # Rotator's starting phase increment
        phase_inc = 2 * np.pi * self.f_shift

        # Flowgraph:
        #
        #                             /-> Vector Sink
        # Vector Source -> Rotator --/
        #                            \
        #                             \-> Tag Debug
        #
        self.tb = gr.top_block()
        self.source = blocks.vector_source_c(in_samples)
        self.rotator_cc = blocks.rotator_cc(phase_inc, tag_inc_updates)
        self.sink = blocks.vector_sink_c()
        self.tag_sink = blocks.tag_debug(gr.sizeof_gr_complex, "rot_phase_inc",
                                         "rot_phase_inc")
        self.tag_sink.set_save_all(True)
        self.tb.connect(self.source, self.rotator_cc, self.sink)
        self.tb.connect(self.rotator_cc, self.tag_sink)

    def setUp(self):
        """Build the default fixture (tests may rebuild it via _setUp)"""
        self._setUp()

    def tearDown(self):
        """Drop the reference to the top block"""
        self.tb = None

    def _post_phase_inc_cmd(self, new_phase_inc, offset=None):
        """Post phase increment update command to the rotator block

        Args:
            new_phase_inc: Phase increment stored (as a double) under the
                "inc" key of the command dictionary.
            offset: Optional sample offset stored (as a uint64) under the
                "offset" key. When None, the key is left out of the command.

        """
        cmd = pmt.make_dict()
        cmd = pmt.dict_add(cmd, pmt.intern("inc"),
                           pmt.from_double(new_phase_inc))
        if offset is not None:
            cmd = pmt.dict_add(cmd, pmt.intern("offset"),
                               pmt.from_uint64(offset))
        self.rotator_cc.insert_tail(pmt.to_pmt("cmd"), cmd)

    def _assert_tags(self, expected_values, expected_offsets):
        """Check the tags received by the tag debug block

        Args:
            expected_values: Expected tag values, in order of reception.
            expected_offsets: Expected tag offsets, in order of reception.
                Must have the same length as expected_values.

        """
        # Guard against a malformed expectation: zip() below would otherwise
        # silently ignore the extra entries of the longer sequence.
        self.assertEqual(len(expected_values), len(expected_offsets))

        tags = self.tag_sink.current_tags()
        self.assertEqual(len(tags), len(expected_values))

        for tag, val, offset in zip(tags, expected_values, expected_offsets):
            self.assertAlmostEqual(pmt.to_double(tag.value), val, places=5)
            self.assertEqual(tag.offset, offset)

    def _wait_for_nitems_written(self, n_items, timeout=10.0):
        """Poll until the rotator has written at least n_items samples

        The loop polls without sleeping so that the caller resumes as soon as
        possible after the threshold is crossed. If the threshold is not
        reached within the timeout, the flowgraph is stopped and the test
        fails instead of hanging forever.

        Args:
            n_items: Minimum number of items written on output port 0.
            timeout: Maximum time to wait, in seconds.

        """
        deadline = time.monotonic() + timeout
        while self.rotator_cc.nitems_written(0) < n_items:
            if time.monotonic() > deadline:
                self.tb.stop()
                self.tb.wait()
                self.fail("Timed out waiting for the rotator to write "
                          "{} samples".format(n_items))

    def test_freq_shift(self):
        """Complex sinusoid frequency shift"""
        f_out = self.f_in + self.f_shift  # expected output frequency

        # Expected IQ samples
        expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out
        expected_samples = np.exp(1j * expected_angles)

        self.tb.run()
        self.assertComplexTuplesAlmostEqual(self.sink.data(),
                                            expected_samples,
                                            places=4)

    def _test_scheduled_phase_inc_update(self):
        """Update the phase increment at a chosen offset via command message

        Returns:
            Tuple with the new phase increment, the phase increment update
            sample offset, and a list with the expected rotated IQ samples.

        """
        new_f_shift = uniform(high=0.5) - self.f_in  # rotator's new frequency
        new_phase_inc = float(2 * np.pi * new_f_shift)  # new phase increment
        offset = self.n_samples // 2  # when to update the phase increment
        f_out_1 = self.f_in + self.f_shift  # output frequency before update
        f_out_2 = self.f_in + new_f_shift  # output frequency after update

        # Post the phase increment command message to the rotator block
        self._post_phase_inc_cmd(new_phase_inc, offset)

        # Samples before and after the phase increment update
        n_before = offset
        n_after = self.n_samples - offset

        # Expected IQ samples
        #
        # Note: when the phase increment is updated, the assumption is that the
        # update occurs at the beginning of the sample period. Hence, the new
        # phase rotation pace will only be observed in the following sample. In
        # other words, the new phase increment can only be observed after some
        # time goes by such that the rotator's phase accumulates.
        #
        # For example, suppose there are ten samples, and the update occurs on
        # offset 5 (i.e., the sixth sample). Suppose also that the initial
        # phase increment is 0.1, and the new increment is 0.2. In this case,
        # we have the following sequence of angles:
        #
        #    [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.7, 0.9, 1.1, 1.3]
        #                             |     \
        #                             |      \--------\
        #                           update             \
        #                           applied         new phase
        #                                        increment observed
        #
        # Ultimately, this means that the phase seen at the sample where the
        # update is applied is still determined by the old phase increment.
        angles_before_update = 2 * np.pi * np.arange(n_before + 1) * f_out_1
        angles_after_update = angles_before_update[-1] + (
            2 * np.pi * np.arange(1, n_after) * f_out_2)
        expected_angles = np.concatenate(
            (angles_before_update, angles_after_update))
        expected_samples = np.exp(1j * expected_angles)

        return new_phase_inc, offset, expected_samples

    def test_scheduled_phase_inc_update(self):
        """Update the phase increment at a chosen offset via command message"""
        (new_phase_inc, offset,
         expected_samples) = self._test_scheduled_phase_inc_update()

        self.tb.run()
        self._assert_tags([new_phase_inc], [offset])
        self.assertComplexTuplesAlmostEqual(self.sink.data(),
                                            expected_samples,
                                            places=4)

    def test_scheduled_phase_inc_update_with_tagging_disabled(self):
        """Test a scheduled phase increment update without tagging the update

        Same as test_scheduled_phase_inc_update but with tagging disabled.

        """
        self._setUp(tag_inc_updates=False)

        _, _, expected_samples = self._test_scheduled_phase_inc_update()

        self.tb.run()
        tags = self.tag_sink.current_tags()
        self.assertEqual(len(tags), 0)
        self.assertComplexTuplesAlmostEqual(self.sink.data(),
                                            expected_samples,
                                            places=4)

    def test_immediate_phase_inc_update(self):
        """Immediate phase increment update via command message

        In this test, the command message does not include the offset
        key. Hence, the rotator should update its phase increment immediately.

        """
        new_f_shift = uniform(high=0.5) - self.f_in  # rotator's new frequency
        new_phase_inc = float(2 * np.pi * new_f_shift)  # new phase increment
        f_out = self.f_in + new_f_shift  # output frequency after update

        # Post the phase increment command message to the rotator block
        self._post_phase_inc_cmd(new_phase_inc)

        # The rotator updates the increment immediately (on the first sample)
        expected_tag_offset = 0

        # Expected samples (all of them with the new frequency set via message)
        expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out
        expected_samples = np.exp(1j * expected_angles)

        self.tb.run()
        self._assert_tags([new_phase_inc], [expected_tag_offset])
        self.assertComplexTuplesAlmostEqual(self.sink.data(),
                                            expected_samples,
                                            places=4)

    def test_zero_change_phase_inc_update(self):
        """Schedule a phase increment update that does not change anything

        If the scheduled phase increment update sets the same phase increment
        that is already active in the rotator block, there should be no effect
        on the output signal. Nevertheless, the rotator should still tag the
        update.

        """
        new_phase_inc = 2 * np.pi * self.f_shift  # no change
        offset = self.n_samples // 2  # when to update the phase increment
        f_out = self.f_in + self.f_shift  # expected output frequency

        # Post the phase increment command message to the rotator block
        self._post_phase_inc_cmd(new_phase_inc, offset)

        # Expected IQ samples
        expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out
        expected_samples = np.exp(1j * expected_angles)

        self.tb.run()
        self._assert_tags([new_phase_inc], [offset])
        self.assertComplexTuplesAlmostEqual(self.sink.data(),
                                            expected_samples,
                                            places=4)

    def test_consecutive_phase_inc_updates(self):
        """Test tagging of a few consecutive phase increment updates"""
        n_updates = 3
        new_f_shifts = uniform(high=0.5, size=n_updates)  # new frequencies
        new_phase_incs = 2 * np.pi * new_f_shifts  # new phase increments

        # When to update: integer offsets evenly spread over the sample range
        offsets = [(k * self.n_samples) // (n_updates + 1)
                   for k in range(1, n_updates + 1)]

        for new_phase_inc, offset in zip(new_phase_incs, offsets):
            self._post_phase_inc_cmd(new_phase_inc, offset)

        self.tb.run()
        self._assert_tags(new_phase_incs, offsets)

    def test_out_of_order_phase_inc_updates(self):
        """Test tagging of a few out-of-order phase increment updates

        The rotator should sort the increment updates and apply them in order.

        """
        n_updates = 3
        new_f_shifts = uniform(high=0.5, size=n_updates)  # new frequencies
        new_phase_incs = 2 * np.pi * new_f_shifts  # new phase increments

        # When to update: integer offsets evenly spread over the sample range
        offsets = [(k * self.n_samples) // (n_updates + 1)
                   for k in range(1, n_updates + 1)]

        # Post the phase increment command messages out of order
        for idx in (0, 2, 1):
            self._post_phase_inc_cmd(new_phase_incs[idx], offsets[idx])

        self.tb.run()

        # Confirm they are received in order
        self._assert_tags(new_phase_incs, offsets)

    def test_duplicate_phase_inc_updates(self):
        """Test multiple phase increment updates scheduled for the same sample

        The rotator block applies all updates scheduled for the same sample
        offset. Hence, only the last update shall take effect.

        """
        n_updates = 3

        # Post the phase increment command messages. After the loop, "offset"
        # and "expected_samples" refer to the last update posted.
        new_phase_incs = []
        for _ in range(n_updates):
            (new_phase_inc, offset,
             expected_samples) = self._test_scheduled_phase_inc_update()
            new_phase_incs.append(new_phase_inc)

        self.tb.run()

        # All "n_updates" tags are expected to be present
        self._assert_tags(new_phase_incs, [offset] * n_updates)

        # However, only the last update takes effect on the rotated samples
        self.assertComplexTuplesAlmostEqual(self.sink.data(),
                                            expected_samples,
                                            places=4)

    def test_phase_inc_update_out_of_range(self):
        """Test phase increment update sent for an out-of-range offset"""
        self._setUp(n_samples=2**16)
        n_half_samples = self.n_samples // 2

        # Schedule a phase increment update whose offset is initially
        # out-of-range due to being in the future
        new_phase_inc = 2 * np.pi * 0.1
        self._post_phase_inc_cmd(new_phase_inc, offset=n_half_samples)

        # Run the flowgraph and wait until the rotator block does some work
        self.tb.start()
        self._wait_for_nitems_written(1)

        # The out-of-range update (scheduled for the future) should not have
        # been applied yet, but it should not have been discarded either.
        #
        # NOTE(review): this check presumably relies on the flowgraph not
        # having reached the second half of the samples yet, which is
        # timing-dependent - confirm it is reliable on the target platforms.
        self._assert_tags([], [])

        # Wait until at least the first half of samples have been processed
        self._wait_for_nitems_written(n_half_samples)

        # Now, schedule an update that is out-of-range due to being in the past
        self._post_phase_inc_cmd(new_phase_inc, offset=0)

        # And run the flowgraph to completion
        self.tb.wait()

        # The increment update initially scheduled for the future should have
        # been applied when processing the second half of samples. In contrast,
        # the update scheduled for the past should have been discarded.
        self._assert_tags([new_phase_inc], [n_half_samples])


# Run this QA test case through gr_unittest when executed as a script
if __name__ == '__main__':
    gr_unittest.run(qa_rotator_cc)