path: root/gr-blocks/python
diff options
Diffstat (limited to 'gr-blocks/python')
2 files changed, 327 insertions, 2 deletions
diff --git a/gr-blocks/python/blocks/bindings/ b/gr-blocks/python/blocks/bindings/
index 906a7e2105..08587999bd 100644
--- a/gr-blocks/python/blocks/bindings/
+++ b/gr-blocks/python/blocks/bindings/
@@ -14,7 +14,7 @@
/* BINDTOOL_HEADER_FILE(rotator_cc.h) */
-/* BINDTOOL_HEADER_FILE_HASH(e127d40b39a00e6725b5f7214ef0e6d2) */
+/* BINDTOOL_HEADER_FILE_HASH(a8a69ee8f26e479fef8ba11414826da8) */
#include <pybind11/complex.h>
@@ -39,7 +39,10 @@ void bind_rotator_cc(py::module& m)
std::shared_ptr<rotator_cc>>(m, "rotator_cc", D(rotator_cc))
- .def(py::init(&rotator_cc::make), py::arg("phase_inc") = 0., D(rotator_cc, make))
+ .def(py::init(&rotator_cc::make),
+ py::arg("phase_inc") = 0.,
+ py::arg("tag_inc_updates") = false,
+ D(rotator_cc, make))
diff --git a/gr-blocks/python/blocks/ b/gr-blocks/python/blocks/
new file mode 100644
index 0000000000..826bde2e39
--- /dev/null
+++ b/gr-blocks/python/blocks/
@@ -0,0 +1,322 @@
+#!/usr/bin/env python
+# Copyright 2021 Free Software Foundation, Inc.
+# This file is part of GNU Radio
+# SPDX-License-Identifier: GPL-3.0-or-later
+from gnuradio import gr, gr_unittest
+from gnuradio import blocks
+import numpy as np
+from numpy.random import uniform
+import pmt
+class qa_rotator_cc(gr_unittest.TestCase):
+ def _setUp(self, n_samples=100, tag_inc_updates=True):
+ """Base fixture: set up flowgraph and parameters"""
+ self.n_samples = n_samples # number of IQ samples to generate
+ self.f_in = uniform(high=0.5) # input frequency
+ self.f_shift = uniform(high=0.5) - \
+ self.f_in # rotator's starting frequency
+ # Input IQ samples
+ in_angles = 2 * np.pi * np.arange(self.n_samples) * self.f_in
+ in_samples = np.exp(1j * in_angles)
+ # Rotator's starting phase increment
+ phase_inc = 2 * np.pi * self.f_shift
+ # Flowgraph:
+ #
+ # /-> Vector Sink
+ # Vector Source -> Rotator --/
+ # \
+ # \-> Tag Debug
+ #
+ self.tb = gr.top_block()
+ self.source = blocks.vector_source_c(in_samples)
+ self.rotator_cc = blocks.rotator_cc(phase_inc, tag_inc_updates)
+ self.sink = blocks.vector_sink_c()
+ self.tag_sink = blocks.tag_debug(gr.sizeof_gr_complex, "rot_phase_inc",
+ "rot_phase_inc")
+ self.tag_sink.set_save_all(True)
+ self.tb.connect(self.source, self.rotator_cc, self.sink)
+ self.tb.connect(self.rotator_cc, self.tag_sink)
+ def setUp(self):
+ self._setUp()
+ def tearDown(self):
+ self.tb = None
+ def _post_phase_inc_cmd(self, new_phase_inc, offset=None):
+ """Post phase increment update command to the rotator block"""
+ cmd = pmt.make_dict()
+ cmd = pmt.dict_add(cmd, pmt.intern("inc"),
+ pmt.from_double(new_phase_inc))
+ if (offset is not None):
+ cmd = pmt.dict_add(cmd, pmt.intern("offset"),
+ pmt.from_uint64(offset))
+ self.rotator_cc.insert_tail(pmt.to_pmt("cmd"), cmd)
+ def _assert_tags(self, expected_values, expected_offsets):
+ """Check the tags received by the tag debug block"""
+ tags = self.tag_sink.current_tags()
+ expected_tags = list(zip(expected_values, expected_offsets))
+ self.assertEqual(len(tags), len(expected_tags))
+ for idx, (val, offset) in enumerate(expected_tags):
+ self.assertAlmostEqual(pmt.to_double(tags[idx].value),
+ val,
+ places=5)
+ self.assertEqual(tags[idx].offset, offset)
+ def test_freq_shift(self):
+ """Complex sinusoid frequency shift"""
+ f_out = self.f_in + self.f_shift # expected output frequency
+ # Expected IQ samples
+ expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out
+ expected_samples = np.exp(1j * expected_angles)
+ self.assertComplexTuplesAlmostEqual(,
+ expected_samples,
+ places=4)
+ def _test_scheduled_phase_inc_update(self):
+ """Update the phase increment at a chosen offset via command message
+ Returns:
+ Tuple with the new phase increment, the phase increment update
+ sample offset, and a list with the expected rotated IQ samples.
+ """
+ new_f_shift = uniform(high=0.5) - self.f_in # rotator's new frequency
+ new_phase_inc = float(2 * np.pi * new_f_shift) # new phase increment
+ offset = int(self.n_samples / 2) # when to update the phase increment
+ f_out_1 = self.f_in + self.f_shift # output frequency before update
+ f_out_2 = self.f_in + new_f_shift # output frequency after update
+ # Post the phase increment command message to the rotator block
+ self._post_phase_inc_cmd(new_phase_inc, offset)
+ # Samples before and after the phase increment update
+ n_before = offset
+ n_after = self.n_samples - offset
+ # Expected IQ samples
+ #
+ # Note: when the phase increment is updated, the assumption is that the
+ # update occurs at the beginning of the sample period. Hence, the new
+ # phase rotation pace will only be observed in the following sample. In
+ # other words, the new phase increment can only be observed after some
+ # time goes by such that the rotator's phase accumulates.
+ #
+ # For example, suppose there are ten samples, and the update occurs on
+ # offset 5 (i.e., the sixth sample). Suppose also that the initial
+ # phase increment is 0.1, and the new increment is 0.2. In this case,
+ # we have the following sequence of angles:
+ #
+ # [0, 0.1, 0.2, 0.3, 0.4, 0.5. 0.7, 0.9, 1.1, 1.3]
+ # | \
+ # | \--------\
+ # update \
+ # applied new phase
+ # increment observed
+ #
+ # Ultimately, this means that the phase seen at the sample where the
+ # update is applied is still determined by the old phase increment.
+ angles_before_update = 2 * np.pi * np.arange(n_before + 1) * f_out_1
+ angles_after_update = angles_before_update[-1] + (
+ 2 * np.pi * np.arange(1, n_after) * f_out_2)
+ expected_angles = np.concatenate(
+ (angles_before_update, angles_after_update))
+ expected_samples = np.exp(1j * expected_angles)
+ return new_phase_inc, offset, expected_samples
+ def test_scheduled_phase_inc_update(self):
+ """Update the phase increment at a chosen offset via command message"""
+ new_phase_inc, \
+ offset, \
+ expected_samples = self._test_scheduled_phase_inc_update()
+ self._assert_tags([new_phase_inc], [offset])
+ self.assertComplexTuplesAlmostEqual(,
+ expected_samples,
+ places=4)
+ def test_scheduled_phase_inc_update_with_tagging_disabled(self):
+ """Test a scheduled phase increment update without tagging the update
+ Same as test_scheduled_phase_inc_update but with tagging disabled.
+ """
+ self._setUp(tag_inc_updates=False)
+ _, _, expected_samples = self._test_scheduled_phase_inc_update()
+ tags = self.tag_sink.current_tags()
+ self.assertEqual(len(tags), 0)
+ self.assertComplexTuplesAlmostEqual(,
+ expected_samples,
+ places=4)
+ def test_immediate_phase_inc_update(self):
+ """Immediate phase increment update via command message
+ In this test, the command message does not include the offset
+ key. Hence, the rotator should update its phase increment immediately.
+ """
+ new_f_shift = uniform(high=0.5) - self.f_in # rotator's new frequency
+ new_phase_inc = float(2 * np.pi * new_f_shift) # new phase increment
+ f_out = self.f_in + new_f_shift # output frequency after update
+ # Post the phase increment command message to the rotator block
+ self._post_phase_inc_cmd(new_phase_inc)
+ # The rotator updates the increment immediately (on the first sample)
+ expected_tag_offset = 0
+ # Expected samples (all of them with the new frequency set via message)
+ expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out
+ expected_samples = np.exp(1j * expected_angles)
+ self._assert_tags([new_phase_inc], [expected_tag_offset])
+ self.assertComplexTuplesAlmostEqual(,
+ expected_samples,
+ places=4)
+ def test_zero_change_phase_inc_update(self):
+ """Schedule a phase increment update that does not change anything
+ If the scheduled phase increment update sets the same phase increment
+ that is already active in the rotator block, there should be no effect
+ on the output signal. Nevertheless, the rotator should still tag the
+ update.
+ """
+ new_phase_inc = 2 * np.pi * self.f_shift # no change
+ offset = int(self.n_samples / 2) # when to update the phase increment
+ f_out = self.f_in + self.f_shift # expected output frequency
+ # Post the phase increment command message to the rotator block
+ self._post_phase_inc_cmd(new_phase_inc, offset)
+ # Expected IQ samples
+ expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out
+ expected_samples = np.exp(1j * expected_angles)
+ self._assert_tags([new_phase_inc], [offset])
+ self.assertComplexTuplesAlmostEqual(,
+ expected_samples,
+ places=4)
+ def test_consecutive_phase_inc_updates(self):
+ """Test tagging of a few consecutive phase increment updates"""
+ n_updates = 3
+ new_f_shifts = uniform(high=0.5, size=n_updates) # new frequencies
+ new_phase_incs = 2 * np.pi * new_f_shifts # new phase increments
+ offsets = self.n_samples * np.arange(1, 4, 1) / 4 # when to update
+ for new_phase_inc, offset in zip(new_phase_incs, offsets):
+ self._post_phase_inc_cmd(new_phase_inc, int(offset))
+ self._assert_tags(new_phase_incs, offsets)
+ def test_out_of_order_phase_inc_updates(self):
+ """Test tagging of a few out-of-order phase increment updates
+ The rotator should sort the increment updates and apply them in order.
+ """
+ n_updates = 3
+ new_f_shifts = uniform(high=0.5, size=n_updates) # new frequencies
+ new_phase_incs = 2 * np.pi * new_f_shifts # new phase increments
+ offsets = self.n_samples * np.arange(1, 4, 1) / 4 # when to update
+ # Post the phase increment command messages out of order
+ self._post_phase_inc_cmd(new_phase_incs[0], int(offsets[0]))
+ self._post_phase_inc_cmd(new_phase_incs[2], int(offsets[2]))
+ self._post_phase_inc_cmd(new_phase_incs[1], int(offsets[1]))
+ # Confirm they are received in order
+ self._assert_tags(new_phase_incs, offsets)
+ def test_duplicate_phase_inc_updates(self):
+ """Test multiple phase increment updates scheduled for the same sample
+ The rotator block applies all updates scheduled for the same sample
+ offset. Hence, only the last update shall take effect.
+ """
+ n_updates = 3
+ # Post the phase increment command messages
+ new_phase_incs = []
+ for i in range(n_updates):
+ new_phase_inc, \
+ offset, \
+ expected_samples = self._test_scheduled_phase_inc_update()
+ new_phase_incs.append(new_phase_inc)
+ # All "n_updates" tags are expected to be present
+ self._assert_tags(new_phase_incs, [offset] * n_updates)
+ # However, only the last update takes effect on the rotated samples
+ self.assertComplexTuplesAlmostEqual(,
+ expected_samples,
+ places=4)
+ def test_phase_inc_update_out_of_range(self):
+ """Test phase increment update sent for an out-of-range offset"""
+ self._setUp(n_samples=2**16)
+ n_half_samples = int(self.n_samples / 2)
+ # Schedule a phase increment update whose offset is initially
+ # out-of-range due to being in the future
+ new_phase_inc = 2 * np.pi * 0.1
+ self._post_phase_inc_cmd(new_phase_inc, offset=n_half_samples)
+ # Run the flowgraph and wait until the rotator block does some work
+ self.tb.start()
+ while (self.rotator_cc.nitems_written(0) == 0):
+ pass
+ # The out-of-range update (scheduled for the future) should not have
+ # been applied yet, but it should not have been discarded either.
+ self._assert_tags([], [])
+ # Wait until at least the first half of samples have been processed
+ while (self.rotator_cc.nitems_written(0) < n_half_samples):
+ pass
+ # Now, schedule an update that is out-of-range due to being in the past
+ self._post_phase_inc_cmd(new_phase_inc, offset=0)
+ # And run the flowgraph to completion
+ self.tb.wait()
+ # The increment update initially scheduled for the future should have
+ # been applied when processing the second half of samples. In contrast,
+ # the update scheduled for the past should have been discarded.
+ self._assert_tags([new_phase_inc], [n_half_samples])
+if __name__ == '__main__':