diff options
author | Tim O'Shea <tim.oshea753@gmail.com> | 2014-04-23 21:33:15 -0400 |
---|---|---|
committer | Tim O'Shea <tim.oshea753@gmail.com> | 2014-04-23 21:33:15 -0400 |
commit | 93494c1c17d6751e94fa8a1a5479d2c701750baf (patch) | |
tree | 73123830b2bb916526d3feda669b961e21b3cd03 /gnuradio-runtime/lib | |
parent | 3831dd37c8df19e25fa258db4d393ee068889dae (diff) |
runtime: fix propagation of DONE state to message blocks
Diffstat (limited to 'gnuradio-runtime/lib')
-rw-r--r-- | gnuradio-runtime/lib/block.cc | 57 | ||||
-rw-r--r-- | gnuradio-runtime/lib/tpb_thread_body.cc | 17 |
2 files changed, 72 insertions, 2 deletions
diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc index bdf484e8db..46cbc9858a 100644 --- a/gnuradio-runtime/lib/block.cc +++ b/gnuradio-runtime/lib/block.cc @@ -59,6 +59,8 @@ namespace gr { d_min_output_buffer(std::max(output_signature->max_streams(),1), -1) { global_block_registry.register_primitive(alias(), this); + message_port_register_in(pmt::mp("system")); + set_msg_handler(pmt::mp("system"), boost::bind(&block::system_handler, this, _1)); #ifdef ENABLE_GR_LOG #ifdef HAVE_LOG4CPP @@ -734,6 +736,61 @@ namespace gr { } } + + void + block::system_handler(pmt::pmt_t msg) + { + //std::cout << "system_handler " << msg << "\n"; + pmt::pmt_t op = pmt::car(msg); + if(pmt::eqv(op, pmt::mp("done"))){ + d_finished = pmt::to_long(pmt::cdr(msg)); + global_block_registry.notify_blk(alias()); + } else { + std::cout << "WARNING: bad message op on system port!\n"; + pmt::print(msg); + } + } + + void + block::notify_msg_neighbors() + { + size_t len = pmt::length(d_message_subscribers); + pmt::pmt_t port_names = pmt::make_vector(len, pmt::PMT_NIL); + pmt::pmt_t keys = pmt::dict_keys(d_message_subscribers); + for(size_t i = 0; i < len; i++) { + // for each output port + pmt::pmt_t oport = pmt::nth(i,keys); + + // for each subscriber on this port + pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, oport, pmt::PMT_NIL); + + // iterate through subscribers on port + while(pmt::is_pair(currlist)) { + pmt::pmt_t target = pmt::car(currlist); + + pmt::pmt_t block = pmt::car(target); + pmt::pmt_t port = pmt::mp("system"); + + currlist = pmt::cdr(currlist); + basic_block_sptr blk = global_block_registry.block_lookup(block); + blk->post(port, pmt::cons(pmt::mp("done"), pmt::mp(true))); + + //std::cout << "notify finished --> "; + //pmt::print(pmt::cons(block,port)); + //std::cout << "\n"; + + } + } + } + + bool + block::finished() + { + return d_finished; + } + + + void block::setup_pc_rpc() { diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc index 79abd0e61d..eb47a43ee3 100644 --- a/gnuradio-runtime/lib/tpb_thread_body.cc +++ b/gnuradio-runtime/lib/tpb_thread_body.cc @@ -83,8 +83,12 @@ namespace gr { if(block->thread_priority() > 0) { gr::thread::set_thread_priority(d->thread, block->thread_priority()); } + + // make sure our block isnt finished + block->clear_finished(); while(1) { + tpb_loop_top: boost::this_thread::interruption_point(); // handle any queued up messages @@ -116,6 +120,10 @@ namespace gr { s = block_executor::BLKD_IN; } + // if msg ports think we are done, we are done + if(block->finished()) + s = block_executor::DONE; + switch(s){ case block_executor::READY: // Tell neighbors we made progress. d->d_tpb.notify_neighbors(d); @@ -126,6 +134,7 @@ namespace gr { break; case block_executor::DONE: // Game over. + block->notify_msg_neighbors(); d->d_tpb.notify_neighbors(d); return; @@ -135,8 +144,12 @@ namespace gr { while(!d->d_tpb.input_changed) { // wait for input or message - while(!d->d_tpb.input_changed && block->empty_handled_p()) - d->d_tpb.input_cond.wait(guard); + while(!d->d_tpb.input_changed && block->empty_handled_p()){ + boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250); + if(!d->d_tpb.input_cond.timed_wait(guard, timeout)){ + goto tpb_loop_top; // timeout occured (perform sanity checks up top) + } + } // handle all pending messages BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { |