diff options
-rw-r--r-- | gr-blocks/grc/blocks_rotator_cc.block.yml | 14 | ||||
-rw-r--r-- | gr-blocks/include/gnuradio/blocks/rotator_cc.h | 24 | ||||
-rw-r--r-- | gr-blocks/lib/rotator_cc_impl.cc | 92 | ||||
-rw-r--r-- | gr-blocks/lib/rotator_cc_impl.h | 23 | ||||
-rw-r--r-- | gr-blocks/python/blocks/bindings/rotator_cc_python.cc | 7 | ||||
-rw-r--r-- | gr-blocks/python/blocks/qa_rotator_cc.py | 322 |
6 files changed, 470 insertions, 12 deletions
diff --git a/gr-blocks/grc/blocks_rotator_cc.block.yml b/gr-blocks/grc/blocks_rotator_cc.block.yml index 19143f98bb..6cd0e1321d 100644 --- a/gr-blocks/grc/blocks_rotator_cc.block.yml +++ b/gr-blocks/grc/blocks_rotator_cc.block.yml @@ -7,10 +7,20 @@ parameters: label: Phase Increment dtype: real default: '0.0' +- id: tag_inc_update + label: Tag Increment Updates + dtype: enum + default: 'False' + options: ['True', 'False'] + option_labels: ['Yes', 'No'] + hide: part inputs: - domain: stream dtype: complex +- domain: message + id: cmd + optional: true outputs: - domain: stream @@ -18,14 +28,14 @@ outputs: templates: imports: from gnuradio import blocks - make: blocks.rotator_cc(${phase_inc}) + make: blocks.rotator_cc(${phase_inc}, ${tag_inc_update}) callbacks: - set_phase_inc(${phase_inc}) cpp_templates: includes: ['#include <gnuradio/blocks/rotator_cc.h>'] declarations: 'blocks::rotator_cc::sptr ${id};' - make: 'this->${id} = blocks::rotator_cc::make(${phase_inc});' + make: 'this->${id} = blocks::rotator_cc::make(${phase_inc}, ${tag_inc_update});' callbacks: - set_phase_inc(${phase_inc}) diff --git a/gr-blocks/include/gnuradio/blocks/rotator_cc.h b/gr-blocks/include/gnuradio/blocks/rotator_cc.h index 9979ddd98a..65fd8d5f2c 100644 --- a/gr-blocks/include/gnuradio/blocks/rotator_cc.h +++ b/gr-blocks/include/gnuradio/blocks/rotator_cc.h @@ -20,6 +20,23 @@ namespace blocks { /*! * \brief Complex rotator * \ingroup math_operators_blk + * + * \details + * + * Rotates an input complex sequence using a complex exponential in the form of + * exp(1j * phase_inc * n), where "phase_inc" is a chosen phase increment in + * radians and "n" is the sample index. + * + * Message Ports: + * + * - cmd (input): + * Receives a PMT dictionary with a command message to set a new phase + * increment on the rotator at a specified sample offset. The new increment + * must be provided as a PMT double on a key named "inc". The target sample + * offset on which to update the phase increment must be given as a PMT + * uint64 (with the absolute output item number) on a key named + * "offset". Unlike the "inc" key, the "offset" key is optional. When not + * provided, the rotator updates its phase increment immediately. */ class BLOCKS_API rotator_cc : virtual public sync_block { @@ -28,10 +45,13 @@ public: typedef std::shared_ptr<rotator_cc> sptr; /*! - * \brief Make an complex rotator block + * \brief Make a complex rotator block * \param phase_inc rotational velocity + * \param tag_inc_updates Tag the sample where a phase increment update is + * applied following the reception of a control + * message received via the input message port. */ - static sptr make(double phase_inc = 0.0); + static sptr make(double phase_inc = 0.0, bool tag_inc_updates = false); virtual void set_phase_inc(double phase_inc) = 0; }; diff --git a/gr-blocks/lib/rotator_cc_impl.cc b/gr-blocks/lib/rotator_cc_impl.cc index d7d9fa92cf..6d7d8ac9b9 100644 --- a/gr-blocks/lib/rotator_cc_impl.cc +++ b/gr-blocks/lib/rotator_cc_impl.cc @@ -20,17 +20,23 @@ namespace gr { namespace blocks { -rotator_cc::sptr rotator_cc::make(double phase_inc) +rotator_cc::sptr rotator_cc::make(double phase_inc, bool tag_inc_updates) { - return gnuradio::make_block_sptr<rotator_cc_impl>(phase_inc); + return gnuradio::make_block_sptr<rotator_cc_impl>(phase_inc, tag_inc_updates); } -rotator_cc_impl::rotator_cc_impl(double phase_inc) +rotator_cc_impl::rotator_cc_impl(double phase_inc, bool tag_inc_updates) : sync_block("rotator_cc", io_signature::make(1, 1, sizeof(gr_complex)), - io_signature::make(1, 1, sizeof(gr_complex))) + io_signature::make(1, 1, sizeof(gr_complex))), + d_tag_inc_updates(tag_inc_updates), + d_inc_update_queue(cmp_phase_inc_update_offset) { set_phase_inc(phase_inc); + + const pmt::pmt_t port_id = pmt::mp("cmd"); + message_port_register_in(port_id); + set_msg_handler(port_id, [this](pmt::pmt_t msg) { this->handle_cmd_msg(msg); }); } rotator_cc_impl::~rotator_cc_impl() {} @@ -40,10 +46,43 @@ void rotator_cc_impl::set_phase_inc(double phase_inc) d_r.set_phase_incr(exp(gr_complex(0, phase_inc))); } +void rotator_cc_impl::handle_cmd_msg(pmt::pmt_t msg) +{ + gr::thread::scoped_lock l(d_mutex); + + static const pmt::pmt_t inc_key = pmt::intern("inc"); + static const pmt::pmt_t offset_key = pmt::intern("offset"); + + if (!pmt::is_dict(msg)) { + throw std::runtime_error("rotator_cc: Rotator command message " + "must be a PMT dictionary"); + } + + bool handled = false; + + if (pmt::dict_has_key(msg, inc_key)) { + /* Prepare to update the phase increment on a specific absolute sample + * offset. If the offset is not defined in the command message, + * configure the phase increment update to occur immediately. */ + phase_inc_update_t update{}; + update.phase_inc = pmt::to_double(pmt::dict_ref(msg, inc_key, pmt::PMT_NIL)); + update.offset = pmt::dict_has_key(msg, offset_key) + ? pmt::to_uint64(pmt::dict_ref(msg, offset_key, pmt::PMT_NIL)) + : nitems_written(0); + d_inc_update_queue.push(update); + handled = true; + } + + if (!handled) { + throw std::runtime_error("rotator_cc: Unsupported command message"); + } +} + int rotator_cc_impl::work(int noutput_items, gr_vector_const_void_star& input_items, gr_vector_void_star& output_items) { + gr::thread::scoped_lock l(d_mutex); const gr_complex* in = (const gr_complex*)input_items[0]; gr_complex* out = (gr_complex*)output_items[0]; @@ -51,7 +90,50 @@ int rotator_cc_impl::work(int noutput_items, for (int i=0; i<noutput_items; i++) out[i] = d_r.rotate(in[i]); #else - d_r.rotateN(out, in, noutput_items); + + const uint64_t n_written = nitems_written(0); + + if (d_inc_update_queue.empty()) { + d_r.rotateN(out, in, noutput_items); + } else { + /* If there are phase increment updates scheduled for now, handle the + * rotation in steps and update the phase increment in between. */ + int nprocessed_items = 0; + while (!d_inc_update_queue.empty()) { + auto next_update = d_inc_update_queue.top(); + + if (next_update.offset < (n_written + nprocessed_items)) { + d_inc_update_queue.pop(); + continue; // we didn't process this update on time - drop it + } + + if (next_update.offset >= (n_written + noutput_items)) { + break; // the update is for a future batch of samples + } + + // The update is scheduled for this batch of samples. Apply it now. + d_inc_update_queue.pop(); + + // Process all samples until the scheduled phase increment update + int items_before_update = next_update.offset - n_written - nprocessed_items; + d_r.rotateN(out, in, items_before_update); + nprocessed_items += items_before_update; + + set_phase_inc(next_update.phase_inc); + + if (d_tag_inc_updates) { + add_item_tag(0, + next_update.offset, + pmt::string_to_symbol("rot_phase_inc"), + pmt::from_float(next_update.phase_inc)); + } + } + + // Rotate the remaining samples + d_r.rotateN(out + nprocessed_items, + in + nprocessed_items, + (noutput_items - nprocessed_items)); + } #endif return noutput_items; diff --git a/gr-blocks/lib/rotator_cc_impl.h b/gr-blocks/lib/rotator_cc_impl.h index c1001d11b3..dbdb13a679 100644 --- a/gr-blocks/lib/rotator_cc_impl.h +++ b/gr-blocks/lib/rotator_cc_impl.h @@ -13,10 +13,26 @@ #include <gnuradio/blocks/rotator.h> #include <gnuradio/blocks/rotator_cc.h> +#include <queue> namespace gr { namespace blocks { +struct phase_inc_update_t { + uint64_t offset; + double phase_inc; +}; + +bool cmp_phase_inc_update_offset(phase_inc_update_t lhs, phase_inc_update_t rhs) +{ + return lhs.offset > rhs.offset; +}; + +typedef std::priority_queue<phase_inc_update_t, + std::vector<phase_inc_update_t>, + decltype(&cmp_phase_inc_update_offset)> + phase_inc_queue_t; + /*! * \brief Complex rotator * \ingroup math_blk @@ -25,9 +41,14 @@ class rotator_cc_impl : public rotator_cc { private: rotator d_r; + bool d_tag_inc_updates; + phase_inc_queue_t d_inc_update_queue; + gr::thread::mutex d_mutex; + + void handle_cmd_msg(pmt::pmt_t msg); public: - rotator_cc_impl(double phase_inc = 0.0); + rotator_cc_impl(double phase_inc = 0.0, bool tag_inc_updates = false); ~rotator_cc_impl() override; void set_phase_inc(double phase_inc) override; diff --git a/gr-blocks/python/blocks/bindings/rotator_cc_python.cc b/gr-blocks/python/blocks/bindings/rotator_cc_python.cc index 906a7e2105..08587999bd 100644 --- a/gr-blocks/python/blocks/bindings/rotator_cc_python.cc +++ b/gr-blocks/python/blocks/bindings/rotator_cc_python.cc @@ -14,7 +14,7 @@ /* BINDTOOL_GEN_AUTOMATIC(0) */ /* BINDTOOL_USE_PYGCCXML(0) */ /* BINDTOOL_HEADER_FILE(rotator_cc.h) */ -/* BINDTOOL_HEADER_FILE_HASH(e127d40b39a00e6725b5f7214ef0e6d2) */ +/* BINDTOOL_HEADER_FILE_HASH(a8a69ee8f26e479fef8ba11414826da8) */ /***********************************************************************************/ #include <pybind11/complex.h> @@ -39,7 +39,10 @@ void bind_rotator_cc(py::module& m) gr::basic_block, std::shared_ptr<rotator_cc>>(m, "rotator_cc", D(rotator_cc)) - .def(py::init(&rotator_cc::make), py::arg("phase_inc") = 0., D(rotator_cc, make)) + .def(py::init(&rotator_cc::make), + py::arg("phase_inc") = 0., + py::arg("tag_inc_updates") = false, + D(rotator_cc, make)) .def("set_phase_inc", diff --git a/gr-blocks/python/blocks/qa_rotator_cc.py b/gr-blocks/python/blocks/qa_rotator_cc.py new file mode 100644 index 0000000000..826bde2e39 --- /dev/null +++ b/gr-blocks/python/blocks/qa_rotator_cc.py @@ -0,0 +1,322 @@ +#!/usr/bin/env python +# +# Copyright 2021 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# + +from gnuradio import gr, gr_unittest +from gnuradio import blocks +import numpy as np +from numpy.random import uniform +import pmt + + +class qa_rotator_cc(gr_unittest.TestCase): + def _setUp(self, n_samples=100, tag_inc_updates=True): + """Base fixture: set up flowgraph and parameters""" + self.n_samples = n_samples # number of IQ samples to generate + self.f_in = uniform(high=0.5) # input frequency + self.f_shift = uniform(high=0.5) - \ + self.f_in # rotator's starting frequency + + # Input IQ samples + in_angles = 2 * np.pi * np.arange(self.n_samples) * self.f_in + in_samples = np.exp(1j * in_angles) + + # Rotator's starting phase increment + phase_inc = 2 * np.pi * self.f_shift + + # Flowgraph: + # + # /-> Vector Sink + # Vector Source -> Rotator --/ + # \ + # \-> Tag Debug + # + self.tb = gr.top_block() + self.source = blocks.vector_source_c(in_samples) + self.rotator_cc = blocks.rotator_cc(phase_inc, tag_inc_updates) + self.sink = blocks.vector_sink_c() + self.tag_sink = blocks.tag_debug(gr.sizeof_gr_complex, "rot_phase_inc", + "rot_phase_inc") + self.tag_sink.set_save_all(True) + self.tb.connect(self.source, self.rotator_cc, self.sink) + self.tb.connect(self.rotator_cc, self.tag_sink) + + def setUp(self): + self._setUp() + + def tearDown(self): + self.tb = None + + def _post_phase_inc_cmd(self, new_phase_inc, offset=None): + """Post phase increment update command to the rotator block""" + cmd = pmt.make_dict() + cmd = pmt.dict_add(cmd, pmt.intern("inc"), + pmt.from_double(new_phase_inc)) + if (offset is not None): + cmd = pmt.dict_add(cmd, pmt.intern("offset"), + pmt.from_uint64(offset)) + self.rotator_cc.insert_tail(pmt.to_pmt("cmd"), cmd) + + def _assert_tags(self, expected_values, expected_offsets): + """Check the tags received by the tag debug block""" + tags = self.tag_sink.current_tags() + expected_tags = list(zip(expected_values, expected_offsets)) + self.assertEqual(len(tags), len(expected_tags)) + + for idx, (val, offset) in enumerate(expected_tags): + self.assertAlmostEqual(pmt.to_double(tags[idx].value), + val, + places=5) + self.assertEqual(tags[idx].offset, offset) + + def test_freq_shift(self): + """Complex sinusoid frequency shift""" + f_out = self.f_in + self.f_shift # expected output frequency + + # Expected IQ samples + expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out + expected_samples = np.exp(1j * expected_angles) + + self.tb.run() + self.assertComplexTuplesAlmostEqual(self.sink.data(), + expected_samples, + places=4) + + def _test_scheduled_phase_inc_update(self): + """Update the phase increment at a chosen offset via command message + + Returns: + Tuple with the new phase increment, the phase increment update + sample offset, and a list with the expected rotated IQ samples. + + """ + new_f_shift = uniform(high=0.5) - self.f_in # rotator's new frequency + new_phase_inc = float(2 * np.pi * new_f_shift) # new phase increment + offset = int(self.n_samples / 2) # when to update the phase increment + f_out_1 = self.f_in + self.f_shift # output frequency before update + f_out_2 = self.f_in + new_f_shift # output frequency after update + + # Post the phase increment command message to the rotator block + self._post_phase_inc_cmd(new_phase_inc, offset) + + # Samples before and after the phase increment update + n_before = offset + n_after = self.n_samples - offset + + # Expected IQ samples + # + # Note: when the phase increment is updated, the assumption is that the + # update occurs at the beginning of the sample period. Hence, the new + # phase rotation pace will only be observed in the following sample. In + # other words, the new phase increment can only be observed after some + # time goes by such that the rotator's phase accumulates. + # + # For example, suppose there are ten samples, and the update occurs on + # offset 5 (i.e., the sixth sample). Suppose also that the initial + # phase increment is 0.1, and the new increment is 0.2. In this case, + # we have the following sequence of angles: + # + # [0, 0.1, 0.2, 0.3, 0.4, 0.5. 0.7, 0.9, 1.1, 1.3] + # | \ + # | \--------\ + # update \ + # applied new phase + # increment observed + # + # Ultimately, this means that the phase seen at the sample where the + # update is applied is still determined by the old phase increment. + angles_before_update = 2 * np.pi * np.arange(n_before + 1) * f_out_1 + angles_after_update = angles_before_update[-1] + ( + 2 * np.pi * np.arange(1, n_after) * f_out_2) + expected_angles = np.concatenate( + (angles_before_update, angles_after_update)) + expected_samples = np.exp(1j * expected_angles) + + return new_phase_inc, offset, expected_samples + + def test_scheduled_phase_inc_update(self): + """Update the phase increment at a chosen offset via command message""" + new_phase_inc, \ + offset, \ + expected_samples = self._test_scheduled_phase_inc_update() + + self.tb.run() + self._assert_tags([new_phase_inc], [offset]) + self.assertComplexTuplesAlmostEqual(self.sink.data(), + expected_samples, + places=4) + + def test_scheduled_phase_inc_update_with_tagging_disabled(self): + """Test a scheduled phase increment update without tagging the update + + Same as test_scheduled_phase_inc_update but with tagging disabled. + + """ + self._setUp(tag_inc_updates=False) + + _, _, expected_samples = self._test_scheduled_phase_inc_update() + + self.tb.run() + tags = self.tag_sink.current_tags() + self.assertEqual(len(tags), 0) + self.assertComplexTuplesAlmostEqual(self.sink.data(), + expected_samples, + places=4) + + def test_immediate_phase_inc_update(self): + """Immediate phase increment update via command message + + In this test, the command message does not include the offset + key. Hence, the rotator should update its phase increment immediately. + + """ + new_f_shift = uniform(high=0.5) - self.f_in # rotator's new frequency + new_phase_inc = float(2 * np.pi * new_f_shift) # new phase increment + f_out = self.f_in + new_f_shift # output frequency after update + + # Post the phase increment command message to the rotator block + self._post_phase_inc_cmd(new_phase_inc) + + # The rotator updates the increment immediately (on the first sample) + expected_tag_offset = 0 + + # Expected samples (all of them with the new frequency set via message) + expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out + expected_samples = np.exp(1j * expected_angles) + + self.tb.run() + self._assert_tags([new_phase_inc], [expected_tag_offset]) + self.assertComplexTuplesAlmostEqual(self.sink.data(), + expected_samples, + places=4) + + def test_zero_change_phase_inc_update(self): + """Schedule a phase increment update that does not change anything + + If the scheduled phase increment update sets the same phase increment + that is already active in the rotator block, there should be no effect + on the output signal. Nevertheless, the rotator should still tag the + update. + + """ + new_phase_inc = 2 * np.pi * self.f_shift # no change + offset = int(self.n_samples / 2) # when to update the phase increment + f_out = self.f_in + self.f_shift # expected output frequency + + # Post the phase increment command message to the rotator block + self._post_phase_inc_cmd(new_phase_inc, offset) + + # Expected IQ samples + expected_angles = 2 * np.pi * np.arange(self.n_samples) * f_out + expected_samples = np.exp(1j * expected_angles) + + self.tb.run() + self._assert_tags([new_phase_inc], [offset]) + self.assertComplexTuplesAlmostEqual(self.sink.data(), + expected_samples, + places=4) + + def test_consecutive_phase_inc_updates(self): + """Test tagging of a few consecutive phase increment updates""" + n_updates = 3 + new_f_shifts = uniform(high=0.5, size=n_updates) # new frequencies + new_phase_incs = 2 * np.pi * new_f_shifts # new phase increments + offsets = self.n_samples * np.arange(1, 4, 1) / 4 # when to update + + for new_phase_inc, offset in zip(new_phase_incs, offsets): + self._post_phase_inc_cmd(new_phase_inc, int(offset)) + + self.tb.run() + self._assert_tags(new_phase_incs, offsets) + + def test_out_of_order_phase_inc_updates(self): + """Test tagging of a few out-of-order phase increment updates + + The rotator should sort the increment updates and apply them in order. + + """ + n_updates = 3 + new_f_shifts = uniform(high=0.5, size=n_updates) # new frequencies + new_phase_incs = 2 * np.pi * new_f_shifts # new phase increments + offsets = self.n_samples * np.arange(1, 4, 1) / 4 # when to update + + # Post the phase increment command messages out of order + self._post_phase_inc_cmd(new_phase_incs[0], int(offsets[0])) + self._post_phase_inc_cmd(new_phase_incs[2], int(offsets[2])) + self._post_phase_inc_cmd(new_phase_incs[1], int(offsets[1])) + + self.tb.run() + + # Confirm they are received in order + self._assert_tags(new_phase_incs, offsets) + + def test_duplicate_phase_inc_updates(self): + """Test multiple phase increment updates scheduled for the same sample + + The rotator block applies all updates scheduled for the same sample + offset. Hence, only the last update shall take effect. + + """ + n_updates = 3 + + # Post the phase increment command messages + new_phase_incs = [] + for i in range(n_updates): + new_phase_inc, \ + offset, \ + expected_samples = self._test_scheduled_phase_inc_update() + new_phase_incs.append(new_phase_inc) + + self.tb.run() + + # All "n_updates" tags are expected to be present + self._assert_tags(new_phase_incs, [offset] * n_updates) + + # However, only the last update takes effect on the rotated samples + self.assertComplexTuplesAlmostEqual(self.sink.data(), + expected_samples, + places=4) + + def test_phase_inc_update_out_of_range(self): + """Test phase increment update sent for an out-of-range offset""" + self._setUp(n_samples=2**16) + n_half_samples = int(self.n_samples / 2) + + # Schedule a phase increment update whose offset is initially + # out-of-range due to being in the future + new_phase_inc = 2 * np.pi * 0.1 + self._post_phase_inc_cmd(new_phase_inc, offset=n_half_samples) + + # Run the flowgraph and wait until the rotator block does some work + self.tb.start() + while (self.rotator_cc.nitems_written(0) == 0): + pass + + # The out-of-range update (scheduled for the future) should not have + # been applied yet, but it should not have been discarded either. + self._assert_tags([], []) + + # Wait until at least the first half of samples have been processed + while (self.rotator_cc.nitems_written(0) < n_half_samples): + pass + + # Now, schedule an update that is out-of-range due to being in the past + self._post_phase_inc_cmd(new_phase_inc, offset=0) + + # And run the flowgraph to completion + self.tb.wait() + + # The increment update initially scheduled for the future should have + # been applied when processing the second half of samples. In contrast, + # the update scheduled for the past should have been discarded. + self._assert_tags([new_phase_inc], [n_half_samples]) + + +if __name__ == '__main__': + gr_unittest.run(qa_rotator_cc) |