summaryrefslogtreecommitdiff
path: root/gr-blocks/lib
diff options
context:
space:
mode:
Diffstat (limited to 'gr-blocks/lib')
-rw-r--r--gr-blocks/lib/rotator_cc_impl.cc92
-rw-r--r--gr-blocks/lib/rotator_cc_impl.h23
2 files changed, 109 insertions, 6 deletions
diff --git a/gr-blocks/lib/rotator_cc_impl.cc b/gr-blocks/lib/rotator_cc_impl.cc
index d7d9fa92cf..6d7d8ac9b9 100644
--- a/gr-blocks/lib/rotator_cc_impl.cc
+++ b/gr-blocks/lib/rotator_cc_impl.cc
@@ -20,17 +20,23 @@
namespace gr {
namespace blocks {
-rotator_cc::sptr rotator_cc::make(double phase_inc)
+rotator_cc::sptr rotator_cc::make(double phase_inc, bool tag_inc_updates)
{
- return gnuradio::make_block_sptr<rotator_cc_impl>(phase_inc);
+ return gnuradio::make_block_sptr<rotator_cc_impl>(phase_inc, tag_inc_updates);
}
-rotator_cc_impl::rotator_cc_impl(double phase_inc)
+rotator_cc_impl::rotator_cc_impl(double phase_inc, bool tag_inc_updates)
: sync_block("rotator_cc",
io_signature::make(1, 1, sizeof(gr_complex)),
- io_signature::make(1, 1, sizeof(gr_complex)))
+ io_signature::make(1, 1, sizeof(gr_complex))),
+ d_tag_inc_updates(tag_inc_updates),
+ d_inc_update_queue(cmp_phase_inc_update_offset)
{
set_phase_inc(phase_inc);
+
+ const pmt::pmt_t port_id = pmt::mp("cmd");
+ message_port_register_in(port_id);
+ set_msg_handler(port_id, [this](pmt::pmt_t msg) { this->handle_cmd_msg(msg); });
}
rotator_cc_impl::~rotator_cc_impl() {}
@@ -40,10 +46,43 @@ void rotator_cc_impl::set_phase_inc(double phase_inc)
d_r.set_phase_incr(exp(gr_complex(0, phase_inc)));
}
+void rotator_cc_impl::handle_cmd_msg(pmt::pmt_t msg)
+{
+ gr::thread::scoped_lock l(d_mutex);
+
+ static const pmt::pmt_t inc_key = pmt::intern("inc");
+ static const pmt::pmt_t offset_key = pmt::intern("offset");
+
+ if (!pmt::is_dict(msg)) {
+ throw std::runtime_error("rotator_cc: Rotator command message "
+ "must be a PMT dictionary");
+ }
+
+ bool handled = false;
+
+ if (pmt::dict_has_key(msg, inc_key)) {
+ /* Prepare to update the phase increment on a specific absolute sample
+ * offset. If the offset is not defined in the command message,
+ * configure the phase increment update to occur immediately. */
+ phase_inc_update_t update{};
+ update.phase_inc = pmt::to_double(pmt::dict_ref(msg, inc_key, pmt::PMT_NIL));
+ update.offset = pmt::dict_has_key(msg, offset_key)
+ ? pmt::to_uint64(pmt::dict_ref(msg, offset_key, pmt::PMT_NIL))
+ : nitems_written(0);
+ d_inc_update_queue.push(update);
+ handled = true;
+ }
+
+ if (!handled) {
+ throw std::runtime_error("rotator_cc: Unsupported command message");
+ }
+}
+
int rotator_cc_impl::work(int noutput_items,
gr_vector_const_void_star& input_items,
gr_vector_void_star& output_items)
{
+ gr::thread::scoped_lock l(d_mutex);
const gr_complex* in = (const gr_complex*)input_items[0];
gr_complex* out = (gr_complex*)output_items[0];
@@ -51,7 +90,50 @@ int rotator_cc_impl::work(int noutput_items,
for (int i=0; i<noutput_items; i++)
out[i] = d_r.rotate(in[i]);
#else
- d_r.rotateN(out, in, noutput_items);
+
+ const uint64_t n_written = nitems_written(0);
+
+ if (d_inc_update_queue.empty()) {
+ d_r.rotateN(out, in, noutput_items);
+ } else {
+ /* If there are phase increment updates scheduled for now, handle the
+ * rotation in steps and update the phase increment in between. */
+ int nprocessed_items = 0;
+ while (!d_inc_update_queue.empty()) {
+ auto next_update = d_inc_update_queue.top();
+
+ if (next_update.offset < (n_written + nprocessed_items)) {
+ d_inc_update_queue.pop();
+ continue; // we didn't process this update on time - drop it
+ }
+
+ if (next_update.offset >= (n_written + noutput_items)) {
+ break; // the update is for a future batch of samples
+ }
+
+ // The update is scheduled for this batch of samples. Apply it now.
+ d_inc_update_queue.pop();
+
+ // Process all samples until the scheduled phase increment update
+ int items_before_update = next_update.offset - n_written - nprocessed_items;
+ d_r.rotateN(out, in, items_before_update);
+ nprocessed_items += items_before_update;
+
+ set_phase_inc(next_update.phase_inc);
+
+ if (d_tag_inc_updates) {
+ add_item_tag(0,
+ next_update.offset,
+ pmt::string_to_symbol("rot_phase_inc"),
+ pmt::from_float(next_update.phase_inc));
+ }
+ }
+
+ // Rotate the remaining samples
+ d_r.rotateN(out + nprocessed_items,
+ in + nprocessed_items,
+ (noutput_items - nprocessed_items));
+ }
#endif
return noutput_items;
diff --git a/gr-blocks/lib/rotator_cc_impl.h b/gr-blocks/lib/rotator_cc_impl.h
index c1001d11b3..dbdb13a679 100644
--- a/gr-blocks/lib/rotator_cc_impl.h
+++ b/gr-blocks/lib/rotator_cc_impl.h
@@ -13,10 +13,26 @@
#include <gnuradio/blocks/rotator.h>
#include <gnuradio/blocks/rotator_cc.h>
+#include <queue>
namespace gr {
namespace blocks {
+struct phase_inc_update_t {
+ uint64_t offset;
+ double phase_inc;
+};
+
+bool cmp_phase_inc_update_offset(phase_inc_update_t lhs, phase_inc_update_t rhs)
+{
+ return lhs.offset > rhs.offset;
+};
+
+typedef std::priority_queue<phase_inc_update_t,
+ std::vector<phase_inc_update_t>,
+ decltype(&cmp_phase_inc_update_offset)>
+ phase_inc_queue_t;
+
/*!
* \brief Complex rotator
* \ingroup math_blk
@@ -25,9 +41,14 @@ class rotator_cc_impl : public rotator_cc
{
private:
rotator d_r;
+ bool d_tag_inc_updates;
+ phase_inc_queue_t d_inc_update_queue;
+ gr::thread::mutex d_mutex;
+
+ void handle_cmd_msg(pmt::pmt_t msg);
public:
- rotator_cc_impl(double phase_inc = 0.0);
+ rotator_cc_impl(double phase_inc = 0.0, bool tag_inc_updates = false);
~rotator_cc_impl() override;
void set_phase_inc(double phase_inc) override;