summaryrefslogtreecommitdiff
path: root/gr-blocks/lib/rotator_cc_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-blocks/lib/rotator_cc_impl.cc')
-rw-r--r--gr-blocks/lib/rotator_cc_impl.cc92
1 files changed, 87 insertions, 5 deletions
diff --git a/gr-blocks/lib/rotator_cc_impl.cc b/gr-blocks/lib/rotator_cc_impl.cc
index d7d9fa92cf..6d7d8ac9b9 100644
--- a/gr-blocks/lib/rotator_cc_impl.cc
+++ b/gr-blocks/lib/rotator_cc_impl.cc
@@ -20,17 +20,23 @@
namespace gr {
namespace blocks {
-rotator_cc::sptr rotator_cc::make(double phase_inc)
+rotator_cc::sptr rotator_cc::make(double phase_inc, bool tag_inc_updates)
{
- return gnuradio::make_block_sptr<rotator_cc_impl>(phase_inc);
+ return gnuradio::make_block_sptr<rotator_cc_impl>(phase_inc, tag_inc_updates);
}
-rotator_cc_impl::rotator_cc_impl(double phase_inc)
+rotator_cc_impl::rotator_cc_impl(double phase_inc, bool tag_inc_updates)
: sync_block("rotator_cc",
io_signature::make(1, 1, sizeof(gr_complex)),
- io_signature::make(1, 1, sizeof(gr_complex)))
+ io_signature::make(1, 1, sizeof(gr_complex))),
+ d_tag_inc_updates(tag_inc_updates),
+ d_inc_update_queue(cmp_phase_inc_update_offset)
{
set_phase_inc(phase_inc);
+
+ const pmt::pmt_t port_id = pmt::mp("cmd");
+ message_port_register_in(port_id);
+ set_msg_handler(port_id, [this](pmt::pmt_t msg) { this->handle_cmd_msg(msg); });
}
rotator_cc_impl::~rotator_cc_impl() {}
@@ -40,10 +46,43 @@ void rotator_cc_impl::set_phase_inc(double phase_inc)
d_r.set_phase_incr(exp(gr_complex(0, phase_inc)));
}
+void rotator_cc_impl::handle_cmd_msg(pmt::pmt_t msg)
+{
+ gr::thread::scoped_lock l(d_mutex);
+
+ static const pmt::pmt_t inc_key = pmt::intern("inc");
+ static const pmt::pmt_t offset_key = pmt::intern("offset");
+
+ if (!pmt::is_dict(msg)) {
+ throw std::runtime_error("rotator_cc: Rotator command message "
+ "must be a PMT dictionary");
+ }
+
+ bool handled = false;
+
+ if (pmt::dict_has_key(msg, inc_key)) {
+ /* Prepare to update the phase increment on a specific absolute sample
+ * offset. If the offset is not defined in the command message,
+ * configure the phase increment update to occur immediately. */
+ phase_inc_update_t update{};
+ update.phase_inc = pmt::to_double(pmt::dict_ref(msg, inc_key, pmt::PMT_NIL));
+ update.offset = pmt::dict_has_key(msg, offset_key)
+ ? pmt::to_uint64(pmt::dict_ref(msg, offset_key, pmt::PMT_NIL))
+ : nitems_written(0);
+ d_inc_update_queue.push(update);
+ handled = true;
+ }
+
+ if (!handled) {
+ throw std::runtime_error("rotator_cc: Unsupported command message");
+ }
+}
+
int rotator_cc_impl::work(int noutput_items,
gr_vector_const_void_star& input_items,
gr_vector_void_star& output_items)
{
+ gr::thread::scoped_lock l(d_mutex);
const gr_complex* in = (const gr_complex*)input_items[0];
gr_complex* out = (gr_complex*)output_items[0];
@@ -51,7 +90,50 @@ int rotator_cc_impl::work(int noutput_items,
for (int i=0; i<noutput_items; i++)
out[i] = d_r.rotate(in[i]);
#else
- d_r.rotateN(out, in, noutput_items);
+
+ const uint64_t n_written = nitems_written(0);
+
+ if (d_inc_update_queue.empty()) {
+ d_r.rotateN(out, in, noutput_items);
+ } else {
+ /* If there are phase increment updates scheduled for now, handle the
+ * rotation in steps and update the phase increment in between. */
+ int nprocessed_items = 0;
+ while (!d_inc_update_queue.empty()) {
+ auto next_update = d_inc_update_queue.top();
+
+ if (next_update.offset < (n_written + nprocessed_items)) {
+ d_inc_update_queue.pop();
+ continue; // we didn't process this update on time - drop it
+ }
+
+ if (next_update.offset >= (n_written + noutput_items)) {
+ break; // the update is for a future batch of samples
+ }
+
+ // The update is scheduled for this batch of samples. Apply it now.
+ d_inc_update_queue.pop();
+
+ // Process all samples until the scheduled phase increment update
+ int items_before_update = next_update.offset - n_written - nprocessed_items;
+ d_r.rotateN(out, in, items_before_update);
+ nprocessed_items += items_before_update;
+
+ set_phase_inc(next_update.phase_inc);
+
+ if (d_tag_inc_updates) {
+ add_item_tag(0,
+ next_update.offset,
+ pmt::string_to_symbol("rot_phase_inc"),
+ pmt::from_float(next_update.phase_inc));
+ }
+ }
+
+ // Rotate the remaining samples
+ d_r.rotateN(out + nprocessed_items,
+ in + nprocessed_items,
+ (noutput_items - nprocessed_items));
+ }
#endif
return noutput_items;