summaryrefslogtreecommitdiff
path: root/gr-blocks
diff options
context:
space:
mode:
authorIgor Freire <igor@blockstream.com>2020-01-06 16:00:48 -0300
committermormj <34754695+mormj@users.noreply.github.com>2021-04-17 19:58:05 -0400
commit9e24ad6d94e75827f2ea1fe0e7371dc16dc39db6 (patch)
tree30488f0a7c3a665579107d5cd6eb8da797d5cac5 /gr-blocks
parentea9a86153ba48697ff3cf81e7d002433af93c9ac (diff)
blocks: Add msg port to set rotator's phase inc
Receivers commonly estimate the frequency offset on a block that is downstream relative to the frequency correction NCO. In such architectures, typically the frequency offset estimator feeds the estimation back to the NCO. Until now, this was not possible if using the rotator block as the NCO. This patch adds a message port to the rotator block such that the referred feedback architecture becomes feasible. A downstream block can estimate the frequency offset and send a message to the rotator block to update its rotating frequency (i.e., its phase increment). The requesting block can schedule the phase increment update to a specific absolute sample index. This feature is useful for receivers that rely on a frame structure and pilot-aided frequency offset estimation. If the true frequency offset is allowed to change at any random instant throughout the frame, the frame-averaged frequency offset estimation can become less reliable. Instead, it is often better to update the frequency correction right at the start of a new frame. With that, the true frequency offset is preserved in the course of a frame and only changes around the boundary between consecutive frames. For flexibility when using this feature, it is the responsibility of the downstream block to schedule the phase increment update properly. For example, if there is a decimator in between the rotator block and the frequency offset estimator, the latter will need to take the sample rate conversion into account when defining the absolute sample offset when the update should be applied. Besides, the rotator block can now place a tag on the sample where it updates the phase increment. This allows a downstream block to better calibrate and validate the scheduling of phase increment updates. Signed-off-by: Igor Freire <igor@blockstream.com>
Diffstat (limited to 'gr-blocks')
-rw-r--r--gr-blocks/grc/blocks_rotator_cc.block.yml14
-rw-r--r--gr-blocks/include/gnuradio/blocks/rotator_cc.h24
-rw-r--r--gr-blocks/lib/rotator_cc_impl.cc92
-rw-r--r--gr-blocks/lib/rotator_cc_impl.h23
-rw-r--r--gr-blocks/python/blocks/bindings/rotator_cc_python.cc7
-rw-r--r--gr-blocks/python/blocks/qa_rotator_cc.py322
6 files changed, 470 insertions, 12 deletions
diff --git a/gr-blocks/grc/blocks_rotator_cc.block.yml b/gr-blocks/grc/blocks_rotator_cc.block.yml
index 19143f98bb..6cd0e1321d 100644
--- a/gr-blocks/grc/blocks_rotator_cc.block.yml
+++ b/gr-blocks/grc/blocks_rotator_cc.block.yml
@@ -7,10 +7,20 @@ parameters:
label: Phase Increment
dtype: real
default: '0.0'
+- id: tag_inc_update
+ label: Tag Increment Updates
+ dtype: enum
+ default: 'False'
+ options: ['True', 'False']
+ option_labels: ['Yes', 'No']
+ hide: part
inputs:
- domain: stream
dtype: complex
+- domain: message
+ id: cmd
+ optional: true
outputs:
- domain: stream
@@ -18,14 +28,14 @@ outputs:
templates:
imports: from gnuradio import blocks
- make: blocks.rotator_cc(${phase_inc})
+ make: blocks.rotator_cc(${phase_inc}, ${tag_inc_update})
callbacks:
- set_phase_inc(${phase_inc})
cpp_templates:
includes: ['#include <gnuradio/blocks/rotator_cc.h>']
declarations: 'blocks::rotator_cc::sptr ${id};'
- make: 'this->${id} = blocks::rotator_cc::make(${phase_inc});'
+ make: 'this->${id} = blocks::rotator_cc::make(${phase_inc}, ${tag_inc_update});'
callbacks:
- set_phase_inc(${phase_inc})
diff --git a/gr-blocks/include/gnuradio/blocks/rotator_cc.h b/gr-blocks/include/gnuradio/blocks/rotator_cc.h
index 9979ddd98a..65fd8d5f2c 100644
--- a/gr-blocks/include/gnuradio/blocks/rotator_cc.h
+++ b/gr-blocks/include/gnuradio/blocks/rotator_cc.h
@@ -20,6 +20,23 @@ namespace blocks {
/*!
* \brief Complex rotator
* \ingroup math_operators_blk
+ *
+ * \details
+ *
+ * Rotates an input complex sequence using a complex exponential in the form of
+ * exp(1j * phase_inc * n), where "phase_inc" is a chosen phase increment in
+ * radians and "n" is the sample index.
+ *
+ * Message Ports:
+ *
+ * - cmd (input):
+ * Receives a PMT dictionary with a command message to set a new phase
+ * increment on the rotator at a specified sample offset. The new increment
+ * must be provided as a PMT double on a key named "inc". The target sample
+ * offset on which to update the phase increment must be given as a PMT
+ * uint64 (with the absolute output item number) on a key named
+ * "offset". Unlike the "inc" key, the "offset" key is optional. When not
+ * provided, the rotator updates its phase increment immediately.
*/
class BLOCKS_API rotator_cc : virtual public sync_block
{
@@ -28,10 +45,13 @@ public:
typedef std::shared_ptr<rotator_cc> sptr;
/*!
- * \brief Make an complex rotator block
+ * \brief Make a complex rotator block
* \param phase_inc rotational velocity
+ * \param tag_inc_updates Tag the sample where a phase increment update is
+ * applied following the reception of a control
+ * message received via the input message port.
*/
- static sptr make(double phase_inc = 0.0);
+ static sptr make(double phase_inc = 0.0, bool tag_inc_updates = false);
virtual void set_phase_inc(double phase_inc) = 0;
};
diff --git a/gr-blocks/lib/rotator_cc_impl.cc b/gr-blocks/lib/rotator_cc_impl.cc
index d7d9fa92cf..6d7d8ac9b9 100644
--- a/gr-blocks/lib/rotator_cc_impl.cc
+++ b/gr-blocks/lib/rotator_cc_impl.cc
@@ -20,17 +20,23 @@
namespace gr {
namespace blocks {
-rotator_cc::sptr rotator_cc::make(double phase_inc)
+rotator_cc::sptr rotator_cc::make(double phase_inc, bool tag_inc_updates)
{
- return gnuradio::make_block_sptr<rotator_cc_impl>(phase_inc);
+ return gnuradio::make_block_sptr<rotator_cc_impl>(phase_inc, tag_inc_updates);
}
-rotator_cc_impl::rotator_cc_impl(double phase_inc)
+rotator_cc_impl::rotator_cc_impl(double phase_inc, bool tag_inc_updates)
: sync_block("rotator_cc",
io_signature::make(1, 1, sizeof(gr_complex)),
- io_signature::make(1, 1, sizeof(gr_complex)))
+ io_signature::make(1, 1, sizeof(gr_complex))),
+ d_tag_inc_updates(tag_inc_updates),
+ d_inc_update_queue(cmp_phase_inc_update_offset)
{
set_phase_inc(phase_inc);
+
+ const pmt::pmt_t port_id = pmt::mp("cmd");
+ message_port_register_in(port_id);
+ set_msg_handler(port_id, [this](pmt::pmt_t msg) { this->handle_cmd_msg(msg); });
}
rotator_cc_impl::~rotator_cc_impl() {}
@@ -40,10 +46,43 @@ void rotator_cc_impl::set_phase_inc(double phase_inc)
d_r.set_phase_incr(exp(gr_complex(0, phase_inc)));
}
+void rotator_cc_impl::handle_cmd_msg(pmt::pmt_t msg)
+{
+ gr::thread::scoped_lock l(d_mutex);
+
+ static const pmt::pmt_t inc_key = pmt::intern("inc");
+ static const pmt::pmt_t offset_key = pmt::intern("offset");
+
+ if (!pmt::is_dict(msg)) {
+ throw std::runtime_error("rotator_cc: Rotator command message "
+ "must be a PMT dictionary");
+ }
+
+ bool handled = false;
+
+ if (pmt::dict_has_key(msg, inc_key)) {
+ /* Prepare to update the phase increment on a specific absolute sample
+ * offset. If the offset is not defined in the command message,
+ * configure the phase increment update to occur immediately. */
+ phase_inc_update_t update{};
+ update.phase_inc = pmt::to_double(pmt::dict_ref(msg, inc_key, pmt::PMT_NIL));
+ update.offset = pmt::dict_has_key(msg, offset_key)
+ ? pmt::to_uint64(pmt::dict_ref(msg, offset_key, pmt::PMT_NIL))
+ : nitems_written(0);
+ d_inc_update_queue.push(update);
+ handled = true;
+ }
+
+ if (!handled) {
+ throw std::runtime_error("rotator_cc: Unsupported command message");
+ }
+}
+
int rotator_cc_impl::work(int noutput_items,
gr_vector_const_void_star& input_items,
gr_vector_void_star& output_items)
{
+ gr::thread::scoped_lock l(d_mutex);
const gr_complex* in = (const gr_complex*)input_items[0];
gr_complex* out = (gr_complex*)output_items[0];
@@ -51,7 +90,50 @@ int rotator_cc_impl::work(int noutput_items,
for (int i=0; i<noutput_items; i++)
out[i] = d_r.rotate(in[i]);
#else
- d_r.rotateN(out, in, noutput_items);
+
+ const uint64_t n_written = nitems_written(0);
+
+ if (d_inc_update_queue.empty()) {
+ d_r.rotateN(out, in, noutput_items);
+ } else {
+ /* If there are phase increment updates scheduled for now, handle the
+ * rotation in steps and update the phase increment in between. */
+ int nprocessed_items = 0;
+ while (!d_inc_update_queue.empty()) {
+ auto next_update = d_inc_update_queue.top();
+
+ if (next_update.offset < (n_written + nprocessed_items)) {
+ d_inc_update_queue.pop();
+ continue; // we didn't process this update on time - drop it
+ }
+
+ if (next_update.offset >= (n_written + noutput_items)) {
+ break; // the update is for a future batch of samples
+ }
+
+ // The update is scheduled for this batch of samples. Apply it now.
+ d_inc_update_queue.pop();
+
+ // Process all samples until the scheduled phase increment update
+ int items_before_update = next_update.offset - n_written - nprocessed_items;
+ d_r.rotateN(out, in, items_before_update);
+ nprocessed_items += items_before_update;
+
+ set_phase_inc(next_update.phase_inc);
+
+ if (d_tag_inc_updates) {
+ add_item_tag(0,
+ next_update.offset,
+ pmt::string_to_symbol("rot_phase_inc"),
+ pmt::from_float(next_update.phase_inc));
+ }
+ }
+
+ // Rotate the remaining samples
+ d_r.rotateN(out + nprocessed_items,
+ in + nprocessed_items,
+ (noutput_items - nprocessed_items));
+ }
#endif
return noutput_items;
diff --git a/gr-blocks/lib/rotator_cc_impl.h b/gr-blocks/lib/rotator_cc_impl.h
index c1001d11b3..dbdb13a679 100644
--- a/gr-blocks/lib/rotator_cc_impl.h
+++ b/gr-blocks/lib/rotator_cc_impl.h
@@ -13,10 +13,26 @@
#include <gnuradio/blocks/rotator.h>
#include <gnuradio/blocks/rotator_cc.h>
+#include <queue>
namespace gr {
namespace blocks {
+struct phase_inc_update_t {
+ uint64_t offset;
+ double phase_inc;
+};
+
+bool cmp_phase_inc_update_offset(phase_inc_update_t lhs, phase_inc_update_t rhs)
+{
+ return lhs.offset > rhs.offset;
+};
+
+typedef std::priority_queue<phase_inc_update_t,
+ std::vector<phase_inc_update_t>,
+ decltype(&cmp_phase_inc_update_offset)>
+ phase_inc_queue_t;
+
/*!
* \brief Complex rotator
* \ingroup math_blk
@@ -25,9 +41,14 @@ class rotator_cc_impl : public rotator_cc
{
private:
rotator d_r;
+ bool d_tag_inc_updates;
+ phase_inc_queue_t d_inc_update_queue;
+ gr::thread::mutex d_mutex;
+
+ void handle_cmd_msg(pmt::pmt_t msg);
public:
- rotator_cc_impl(double phase_inc = 0.0);
+ rotator_cc_impl(double phase_inc = 0.0, bool tag_inc_updates = false);
~rotator_cc_impl() override;
void set_phase_inc(double phase_inc) override;
diff --git a/gr-blocks/python/blocks/bindings/rotator_cc_python.cc b/gr-blocks/python/blocks/bindings/rotator_cc_python.cc
index 906a7e2105..08587999bd 100644
--- a/gr-blocks/python/blocks/bindings/rotator_cc_python.cc
+++ b/gr-blocks/python/blocks/bindings/rotator_cc_python.cc
@@ -14,7 +14,7 @@
/* BINDTOOL_GEN_AUTOMATIC(0) */
/* BINDTOOL_USE_PYGCCXML(0) */
/* BINDTOOL_HEADER_FILE(rotator_cc.h) */
-/* BINDTOOL_HEADER_FILE_HASH(e127d40b39a00e6725b5f7214ef0e6d2) */
+/* BINDTOOL_HEADER_FILE_HASH(a8a69ee8f26e479fef8ba11414826da8) */
/***********************************************************************************/
#include <pybind11/complex.h>
@@ -39,7 +39,10 @@ void bind_rotator_cc(py::module& m)
gr::basic_block,
std::shared_ptr<rotator_cc>>(m, "rotator_cc", D(rotator_cc))
- .def(py::init(&rotator_cc::make), py::arg("phase_inc") = 0., D(rotator_cc, make))
+ .def(py::init(&rotator_cc::make),
+ py::arg("phase_inc") = 0.,
+ py::arg("tag_inc_updates") = false,
+ D(rotator_cc, make))
.def("set_phase_inc",
diff --git a/gr-blocks/python/blocks/qa_rotator_cc.py b/gr-blocks/python/blocks/qa_rotator_cc.py
new file mode 100644
index 0000000000..826bde2e39
--- /dev/null
+++ b/gr-blocks/python/blocks/qa_rotator_cc.py
@@ -0,0 +1,322 @@
+#!/usr/bin/env python
+#
+# Copyright 2021 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+#
+#
+
+from gnuradio import gr, gr_unittest
+from gnuradio import blocks
+import numpy as np
+from numpy.random import uniform
+import pmt
+
+
+class qa_rotator_cc(gr_unittest.TestCase):
+ def _setUp(self, n_samples=100, tag_inc_updates=True):
+ """Base fixture: set up flowgraph and parameters"""
+ self.n_samples = n_samples # number of IQ samples to generate
+ self.f_in = uniform(high=0.5) # input frequency
+ self.f_shift = uniform(high=0.5) - \
+ self.f_in # rotator's starting frequency
+
+ # Input IQ samples
+ in_angles = 2 * np.pi * np.arange(self.n_samples) * self.f_in
+ in_samples = np.exp(1j * in_angles)
+
+ # Rotator's starting phase increment
+ phase_inc = 2 * np.pi * self.f_shift
+
+ # Flowgraph:
+ #
+ # /-> Vector Sink
+ # Vector Source -> Rotator --/
+ # \
+ # \-> Tag Debug
+ #
+ self.tb = gr.top_block()
+ self.source = blocks.vector_source_c(in_samples)
+ self.rotator_cc = blocks.rotator_cc(phase_inc, tag_inc_updates)
+ self.sink = blocks.vector_sink_c()
+ self.tag_sink = blocks.tag_debug(gr.sizeof_gr_complex, "rot_phase_inc",
+ "rot_phase_inc")
+ self.tag_sink.set_save_all(True)
+ self.tb.connect(self.source, self.rotator_cc, self.sink)
+ self.tb.connect(self.rotator_cc, self.tag_sink)
+
+ def setUp(self):
+ self._setUp()
+
+ def tearDown(self):
+ self.tb = None
+
+ def _post_phase_inc_cmd(self, new_phase_inc, offset=None):
+ """Post phase increment update command to the rotator block"""
+ cmd = pmt.make_dict()
+ cmd = pmt.dict_add(cmd, pmt.intern("inc"),
+ pmt.from_double(new_phase_inc))
+ if (offset is not None):
+ cmd = pmt.dict_add(cmd, pmt.intern("offset"),
+ pmt.from_uint64(offset))
+ self.rotator_cc.insert_tail(pmt.to_pmt("cmd"), cmd)
+
+ def _assert_tags(self, expected_values, expected_offsets):
+ """Check the tags received by the tag debug block"""
+ tags = self.tag_sink.current_tags()
+ expected_tags = list(zip(expected_values, expected_offsets))
+ self.assertEqual(len(tags), len(expected_tags))
+
+ for idx, (val, offset) in enumerate(expected_tags):
+ self.assertAlmostEqual(pmt.to_double(tags[idx].value),
+ val,
+ places=5)
+ self.assertEqual(tags[idx].offset, offset)
+
+ def test_freq_shift(self):
+ """Complex sinusoid frequency shift"""
+ f_out = self.f_in + self.f_shift # expected output frequency
+
+ # Expected IQ samples
+ expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out
+ expected_samples = np.exp(1j * expected_angles)
+
+ self.tb.run()
+ self.assertComplexTuplesAlmostEqual(self.sink.data(),
+ expected_samples,
+ places=4)
+
+ def _test_scheduled_phase_inc_update(self):
+ """Update the phase increment at a chosen offset via command message
+
+ Returns:
+ Tuple with the new phase increment, the phase increment update
+ sample offset, and a list with the expected rotated IQ samples.
+
+ """
+ new_f_shift = uniform(high=0.5) - self.f_in # rotator's new frequency
+ new_phase_inc = float(2 * np.pi * new_f_shift) # new phase increment
+ offset = int(self.n_samples / 2) # when to update the phase increment
+ f_out_1 = self.f_in + self.f_shift # output frequency before update
+ f_out_2 = self.f_in + new_f_shift # output frequency after update
+
+ # Post the phase increment command message to the rotator block
+ self._post_phase_inc_cmd(new_phase_inc, offset)
+
+ # Samples before and after the phase increment update
+ n_before = offset
+ n_after = self.n_samples - offset
+
+ # Expected IQ samples
+ #
+ # Note: when the phase increment is updated, the assumption is that the
+ # update occurs at the beginning of the sample period. Hence, the new
+ # phase rotation pace will only be observed in the following sample. In
+ # other words, the new phase increment can only be observed after some
+ # time goes by such that the rotator's phase accumulates.
+ #
+ # For example, suppose there are ten samples, and the update occurs on
+ # offset 5 (i.e., the sixth sample). Suppose also that the initial
+ # phase increment is 0.1, and the new increment is 0.2. In this case,
+ # we have the following sequence of angles:
+ #
+ # [0, 0.1, 0.2, 0.3, 0.4, 0.5. 0.7, 0.9, 1.1, 1.3]
+ # | \
+ # | \--------\
+ # update \
+ # applied new phase
+ # increment observed
+ #
+ # Ultimately, this means that the phase seen at the sample where the
+ # update is applied is still determined by the old phase increment.
+ angles_before_update = 2 * np.pi * np.arange(n_before + 1) * f_out_1
+ angles_after_update = angles_before_update[-1] + (
+ 2 * np.pi * np.arange(1, n_after) * f_out_2)
+ expected_angles = np.concatenate(
+ (angles_before_update, angles_after_update))
+ expected_samples = np.exp(1j * expected_angles)
+
+ return new_phase_inc, offset, expected_samples
+
+ def test_scheduled_phase_inc_update(self):
+ """Update the phase increment at a chosen offset via command message"""
+ new_phase_inc, \
+ offset, \
+ expected_samples = self._test_scheduled_phase_inc_update()
+
+ self.tb.run()
+ self._assert_tags([new_phase_inc], [offset])
+ self.assertComplexTuplesAlmostEqual(self.sink.data(),
+ expected_samples,
+ places=4)
+
+ def test_scheduled_phase_inc_update_with_tagging_disabled(self):
+ """Test a scheduled phase increment update without tagging the update
+
+ Same as test_scheduled_phase_inc_update but with tagging disabled.
+
+ """
+ self._setUp(tag_inc_updates=False)
+
+ _, _, expected_samples = self._test_scheduled_phase_inc_update()
+
+ self.tb.run()
+ tags = self.tag_sink.current_tags()
+ self.assertEqual(len(tags), 0)
+ self.assertComplexTuplesAlmostEqual(self.sink.data(),
+ expected_samples,
+ places=4)
+
+ def test_immediate_phase_inc_update(self):
+ """Immediate phase increment update via command message
+
+ In this test, the command message does not include the offset
+ key. Hence, the rotator should update its phase increment immediately.
+
+ """
+ new_f_shift = uniform(high=0.5) - self.f_in # rotator's new frequency
+ new_phase_inc = float(2 * np.pi * new_f_shift) # new phase increment
+ f_out = self.f_in + new_f_shift # output frequency after update
+
+ # Post the phase increment command message to the rotator block
+ self._post_phase_inc_cmd(new_phase_inc)
+
+ # The rotator updates the increment immediately (on the first sample)
+ expected_tag_offset = 0
+
+ # Expected samples (all of them with the new frequency set via message)
+ expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out
+ expected_samples = np.exp(1j * expected_angles)
+
+ self.tb.run()
+ self._assert_tags([new_phase_inc], [expected_tag_offset])
+ self.assertComplexTuplesAlmostEqual(self.sink.data(),
+ expected_samples,
+ places=4)
+
+ def test_zero_change_phase_inc_update(self):
+ """Schedule a phase increment update that does not change anything
+
+ If the scheduled phase increment update sets the same phase increment
+ that is already active in the rotator block, there should be no effect
+ on the output signal. Nevertheless, the rotator should still tag the
+ update.
+
+ """
+ new_phase_inc = 2 * np.pi * self.f_shift # no change
+ offset = int(self.n_samples / 2) # when to update the phase increment
+ f_out = self.f_in + self.f_shift # expected output frequency
+
+ # Post the phase increment command message to the rotator block
+ self._post_phase_inc_cmd(new_phase_inc, offset)
+
+ # Expected IQ samples
+ expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out
+ expected_samples = np.exp(1j * expected_angles)
+
+ self.tb.run()
+ self._assert_tags([new_phase_inc], [offset])
+ self.assertComplexTuplesAlmostEqual(self.sink.data(),
+ expected_samples,
+ places=4)
+
+ def test_consecutive_phase_inc_updates(self):
+ """Test tagging of a few consecutive phase increment updates"""
+ n_updates = 3
+ new_f_shifts = uniform(high=0.5, size=n_updates) # new frequencies
+ new_phase_incs = 2 * np.pi * new_f_shifts # new phase increments
+ offsets = self.n_samples * np.arange(1, 4, 1) / 4 # when to update
+
+ for new_phase_inc, offset in zip(new_phase_incs, offsets):
+ self._post_phase_inc_cmd(new_phase_inc, int(offset))
+
+ self.tb.run()
+ self._assert_tags(new_phase_incs, offsets)
+
+ def test_out_of_order_phase_inc_updates(self):
+ """Test tagging of a few out-of-order phase increment updates
+
+ The rotator should sort the increment updates and apply them in order.
+
+ """
+ n_updates = 3
+ new_f_shifts = uniform(high=0.5, size=n_updates) # new frequencies
+ new_phase_incs = 2 * np.pi * new_f_shifts # new phase increments
+ offsets = self.n_samples * np.arange(1, 4, 1) / 4 # when to update
+
+ # Post the phase increment command messages out of order
+ self._post_phase_inc_cmd(new_phase_incs[0], int(offsets[0]))
+ self._post_phase_inc_cmd(new_phase_incs[2], int(offsets[2]))
+ self._post_phase_inc_cmd(new_phase_incs[1], int(offsets[1]))
+
+ self.tb.run()
+
+ # Confirm they are received in order
+ self._assert_tags(new_phase_incs, offsets)
+
+ def test_duplicate_phase_inc_updates(self):
+ """Test multiple phase increment updates scheduled for the same sample
+
+ The rotator block applies all updates scheduled for the same sample
+ offset. Hence, only the last update shall take effect.
+
+ """
+ n_updates = 3
+
+ # Post the phase increment command messages
+ new_phase_incs = []
+ for i in range(n_updates):
+ new_phase_inc, \
+ offset, \
+ expected_samples = self._test_scheduled_phase_inc_update()
+ new_phase_incs.append(new_phase_inc)
+
+ self.tb.run()
+
+ # All "n_updates" tags are expected to be present
+ self._assert_tags(new_phase_incs, [offset] * n_updates)
+
+ # However, only the last update takes effect on the rotated samples
+ self.assertComplexTuplesAlmostEqual(self.sink.data(),
+ expected_samples,
+ places=4)
+
+ def test_phase_inc_update_out_of_range(self):
+ """Test phase increment update sent for an out-of-range offset"""
+ self._setUp(n_samples=2**16)
+ n_half_samples = int(self.n_samples / 2)
+
+ # Schedule a phase increment update whose offset is initially
+ # out-of-range due to being in the future
+ new_phase_inc = 2 * np.pi * 0.1
+ self._post_phase_inc_cmd(new_phase_inc, offset=n_half_samples)
+
+ # Run the flowgraph and wait until the rotator block does some work
+ self.tb.start()
+ while (self.rotator_cc.nitems_written(0) == 0):
+ pass
+
+ # The out-of-range update (scheduled for the future) should not have
+ # been applied yet, but it should not have been discarded either.
+ self._assert_tags([], [])
+
+ # Wait until at least the first half of samples have been processed
+ while (self.rotator_cc.nitems_written(0) < n_half_samples):
+ pass
+
+ # Now, schedule an update that is out-of-range due to being in the past
+ self._post_phase_inc_cmd(new_phase_inc, offset=0)
+
+ # And run the flowgraph to completion
+ self.tb.wait()
+
+ # The increment update initially scheduled for the future should have
+ # been applied when processing the second half of samples. In contrast,
+ # the update scheduled for the past should have been discarded.
+ self._assert_tags([new_phase_inc], [n_half_samples])
+
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_rotator_cc)