diff options
Diffstat (limited to 'gr-blocks/lib')
-rw-r--r-- | gr-blocks/lib/rotator_cc_impl.cc | 92 | ||||
-rw-r--r-- | gr-blocks/lib/rotator_cc_impl.h | 23 |
2 files changed, 109 insertions, 6 deletions
diff --git a/gr-blocks/lib/rotator_cc_impl.cc b/gr-blocks/lib/rotator_cc_impl.cc index d7d9fa92cf..6d7d8ac9b9 100644 --- a/gr-blocks/lib/rotator_cc_impl.cc +++ b/gr-blocks/lib/rotator_cc_impl.cc @@ -20,17 +20,23 @@ namespace gr { namespace blocks { -rotator_cc::sptr rotator_cc::make(double phase_inc) +rotator_cc::sptr rotator_cc::make(double phase_inc, bool tag_inc_updates) { - return gnuradio::make_block_sptr<rotator_cc_impl>(phase_inc); + return gnuradio::make_block_sptr<rotator_cc_impl>(phase_inc, tag_inc_updates); } -rotator_cc_impl::rotator_cc_impl(double phase_inc) +rotator_cc_impl::rotator_cc_impl(double phase_inc, bool tag_inc_updates) : sync_block("rotator_cc", io_signature::make(1, 1, sizeof(gr_complex)), - io_signature::make(1, 1, sizeof(gr_complex))) + io_signature::make(1, 1, sizeof(gr_complex))), + d_tag_inc_updates(tag_inc_updates), + d_inc_update_queue(cmp_phase_inc_update_offset) { set_phase_inc(phase_inc); + + const pmt::pmt_t port_id = pmt::mp("cmd"); + message_port_register_in(port_id); + set_msg_handler(port_id, [this](pmt::pmt_t msg) { this->handle_cmd_msg(msg); }); } rotator_cc_impl::~rotator_cc_impl() {} @@ -40,10 +46,43 @@ void rotator_cc_impl::set_phase_inc(double phase_inc) d_r.set_phase_incr(exp(gr_complex(0, phase_inc))); } +void rotator_cc_impl::handle_cmd_msg(pmt::pmt_t msg) +{ + gr::thread::scoped_lock l(d_mutex); + + static const pmt::pmt_t inc_key = pmt::intern("inc"); + static const pmt::pmt_t offset_key = pmt::intern("offset"); + + if (!pmt::is_dict(msg)) { + throw std::runtime_error("rotator_cc: Rotator command message " + "must be a PMT dictionary"); + } + + bool handled = false; + + if (pmt::dict_has_key(msg, inc_key)) { + /* Prepare to update the phase increment on a specific absolute sample + * offset. If the offset is not defined in the command message, + * configure the phase increment update to occur immediately. */ + phase_inc_update_t update{}; + update.phase_inc = pmt::to_double(pmt::dict_ref(msg, inc_key, pmt::PMT_NIL)); + update.offset = pmt::dict_has_key(msg, offset_key) + ? pmt::to_uint64(pmt::dict_ref(msg, offset_key, pmt::PMT_NIL)) + : nitems_written(0); + d_inc_update_queue.push(update); + handled = true; + } + + if (!handled) { + throw std::runtime_error("rotator_cc: Unsupported command message"); + } +} + int rotator_cc_impl::work(int noutput_items, gr_vector_const_void_star& input_items, gr_vector_void_star& output_items) { + gr::thread::scoped_lock l(d_mutex); const gr_complex* in = (const gr_complex*)input_items[0]; gr_complex* out = (gr_complex*)output_items[0]; @@ -51,7 +90,50 @@ int rotator_cc_impl::work(int noutput_items, for (int i=0; i<noutput_items; i++) out[i] = d_r.rotate(in[i]); #else - d_r.rotateN(out, in, noutput_items); + + const uint64_t n_written = nitems_written(0); + + if (d_inc_update_queue.empty()) { + d_r.rotateN(out, in, noutput_items); + } else { + /* If there are phase increment updates scheduled for now, handle the + * rotation in steps and update the phase increment in between. */ + int nprocessed_items = 0; + while (!d_inc_update_queue.empty()) { + auto next_update = d_inc_update_queue.top(); + + if (next_update.offset < (n_written + nprocessed_items)) { + d_inc_update_queue.pop(); + continue; // we didn't process this update on time - drop it + } + + if (next_update.offset >= (n_written + noutput_items)) { + break; // the update is for a future batch of samples + } + + // The update is scheduled for this batch of samples. Apply it now. + d_inc_update_queue.pop(); + + // Process all samples until the scheduled phase increment update + int items_before_update = next_update.offset - n_written - nprocessed_items; + d_r.rotateN(out, in, items_before_update); + nprocessed_items += items_before_update; + + set_phase_inc(next_update.phase_inc); + + if (d_tag_inc_updates) { + add_item_tag(0, + next_update.offset, + pmt::string_to_symbol("rot_phase_inc"), + pmt::from_float(next_update.phase_inc)); + } + } + + // Rotate the remaining samples + d_r.rotateN(out + nprocessed_items, + in + nprocessed_items, + (noutput_items - nprocessed_items)); + } #endif return noutput_items; diff --git a/gr-blocks/lib/rotator_cc_impl.h b/gr-blocks/lib/rotator_cc_impl.h index c1001d11b3..dbdb13a679 100644 --- a/gr-blocks/lib/rotator_cc_impl.h +++ b/gr-blocks/lib/rotator_cc_impl.h @@ -13,10 +13,26 @@ #include <gnuradio/blocks/rotator.h> #include <gnuradio/blocks/rotator_cc.h> +#include <queue> namespace gr { namespace blocks { +struct phase_inc_update_t { + uint64_t offset; + double phase_inc; +}; + +bool cmp_phase_inc_update_offset(phase_inc_update_t lhs, phase_inc_update_t rhs) +{ + return lhs.offset > rhs.offset; +}; + +typedef std::priority_queue<phase_inc_update_t, + std::vector<phase_inc_update_t>, + decltype(&cmp_phase_inc_update_offset)> + phase_inc_queue_t; + /*! * \brief Complex rotator * \ingroup math_blk @@ -25,9 +41,14 @@ class rotator_cc_impl : public rotator_cc { private: rotator d_r; + bool d_tag_inc_updates; + phase_inc_queue_t d_inc_update_queue; + gr::thread::mutex d_mutex; + + void handle_cmd_msg(pmt::pmt_t msg); public: - rotator_cc_impl(double phase_inc = 0.0); + rotator_cc_impl(double phase_inc = 0.0, bool tag_inc_updates = false); ~rotator_cc_impl() override; void set_phase_inc(double phase_inc) override; |