diff options
-rw-r--r-- | gnuradio-runtime/include/gnuradio/block.h | 19 | ||||
-rw-r--r-- | gnuradio-runtime/lib/block.cc | 57 | ||||
-rw-r--r-- | gnuradio-runtime/lib/tpb_thread_body.cc | 17 |
3 files changed, 91 insertions, 2 deletions
diff --git a/gnuradio-runtime/include/gnuradio/block.h b/gnuradio-runtime/include/gnuradio/block.h index 7390e93bc0..1a978f9f0f 100644 --- a/gnuradio-runtime/include/gnuradio/block.h +++ b/gnuradio-runtime/include/gnuradio/block.h @@ -565,6 +565,15 @@ namespace gr { // ---------------------------------------------------------------------------- + /*! + * \breif the system message handler + */ + void system_handler(pmt::pmt_t msg); + + /*! + * \brief returns true when execution has completed due to a message connection + bool finished(); + private: int d_output_multiple; bool d_output_multiple_set; @@ -583,6 +592,7 @@ namespace gr { int d_priority; // thread priority level bool d_pc_rpc_set; bool d_update_rate; // should sched update rel rate? + bool d_finished; // true if msg ports think we are finished protected: block(void) {} // allows pure virtual interface sub-classes @@ -766,6 +776,15 @@ namespace gr { public: block_detail_sptr detail() const { return d_detail; } void set_detail(block_detail_sptr detail) { d_detail = detail; } + + /*! \brief Tell msg neighbors we are finished + */ + void notify_msg_neighbors(); + + /*! \brief Make sure we dont think we are finished + */ + void clear_finished(){ d_finished = false; } + }; typedef std::vector<block_sptr> block_vector_t; diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc index bdf484e8db..46cbc9858a 100644 --- a/gnuradio-runtime/lib/block.cc +++ b/gnuradio-runtime/lib/block.cc @@ -59,6 +59,8 @@ namespace gr { d_min_output_buffer(std::max(output_signature->max_streams(),1), -1) { global_block_registry.register_primitive(alias(), this); + message_port_register_in(pmt::mp("system")); + set_msg_handler(pmt::mp("system"), boost::bind(&block::system_handler, this, _1)); #ifdef ENABLE_GR_LOG #ifdef HAVE_LOG4CPP @@ -734,6 +736,61 @@ namespace gr { } } + + void + block::system_handler(pmt::pmt_t msg) + { + //std::cout << "system_handler " << msg << "\n"; + pmt::pmt_t op = pmt::car(msg); + if(pmt::eqv(op, pmt::mp("done"))){ + d_finished = pmt::to_long(pmt::cdr(msg)); + global_block_registry.notify_blk(alias()); + } else { + std::cout << "WARNING: bad message op on system port!\n"; + pmt::print(msg); + } + } + + void + block::notify_msg_neighbors() + { + size_t len = pmt::length(d_message_subscribers); + pmt::pmt_t port_names = pmt::make_vector(len, pmt::PMT_NIL); + pmt::pmt_t keys = pmt::dict_keys(d_message_subscribers); + for(size_t i = 0; i < len; i++) { + // for each output port + pmt::pmt_t oport = pmt::nth(i,keys); + + // for each subscriber on this port + pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, oport, pmt::PMT_NIL); + + // iterate through subscribers on port + while(pmt::is_pair(currlist)) { + pmt::pmt_t target = pmt::car(currlist); + + pmt::pmt_t block = pmt::car(target); + pmt::pmt_t port = pmt::mp("system"); + + currlist = pmt::cdr(currlist); + basic_block_sptr blk = global_block_registry.block_lookup(block); + blk->post(port, pmt::cons(pmt::mp("done"), pmt::mp(true))); + + //std::cout << "notify finished --> "; + //pmt::print(pmt::cons(block,port)); + //std::cout << "\n"; + + } + } + } + + bool + block::finished() + { + return d_finished; + } + + + void block::setup_pc_rpc() { diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc index 79abd0e61d..eb47a43ee3 100644 --- a/gnuradio-runtime/lib/tpb_thread_body.cc +++ b/gnuradio-runtime/lib/tpb_thread_body.cc @@ -83,8 +83,12 @@ namespace gr { if(block->thread_priority() > 0) { gr::thread::set_thread_priority(d->thread, block->thread_priority()); } + + // make sure our block isnt finished + block->clear_finished(); while(1) { + tpb_loop_top: boost::this_thread::interruption_point(); // handle any queued up messages @@ -116,6 +120,10 @@ namespace gr { s = block_executor::BLKD_IN; } + // if msg ports think we are done, we are done + if(block->finished()) + s = block_executor::DONE; + switch(s){ case block_executor::READY: // Tell neighbors we made progress. d->d_tpb.notify_neighbors(d); @@ -126,6 +134,7 @@ namespace gr { break; case block_executor::DONE: // Game over. + block->notify_msg_neighbors(); d->d_tpb.notify_neighbors(d); return; @@ -135,8 +144,12 @@ namespace gr { while(!d->d_tpb.input_changed) { // wait for input or message - while(!d->d_tpb.input_changed && block->empty_handled_p()) - d->d_tpb.input_cond.wait(guard); + while(!d->d_tpb.input_changed && block->empty_handled_p()){ + boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250); + if(!d->d_tpb.input_cond.timed_wait(guard, timeout)){ + goto tpb_loop_top; // timeout occured (perform sanity checks up top) + } + } // handle all pending messages BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { |