diff options
Diffstat (limited to 'gnuradio-runtime/lib/tpb_thread_body.cc')
-rw-r--r-- | gnuradio-runtime/lib/tpb_thread_body.cc | 92 |
1 files changed, 24 insertions, 68 deletions
diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc index 93591feee2..59f1d3162e 100644 --- a/gnuradio-runtime/lib/tpb_thread_body.cc +++ b/gnuradio-runtime/lib/tpb_thread_body.cc @@ -55,8 +55,6 @@ namespace gr { size_t max_nmsgs = static_cast<size_t>(p->get_long("DEFAULT", "max_messages", 100)); // Setup the logger for the scheduler -#ifdef ENABLE_GR_LOG -#ifdef HAVE_LOG4CPP #undef LOG std::string config_file = p->get_string("LOG", "log_config", ""); std::string log_level = p->get_string("LOG", "log_level", "off"); @@ -75,9 +73,6 @@ namespace gr { GR_LOG_SET_FILE_APPENDER(LOG, log_file , true,"%r :%p: %c{1} - %m%n"); } } -#endif /* HAVE_LOG4CPP */ -#endif /* ENABLE_GR_LOG */ - // Set thread affinity if it was set before fg was started. if(block->processor_affinity().size() > 0) { @@ -93,9 +88,10 @@ namespace gr { block->clear_finished(); while(1) { - tpb_loop_top: boost::this_thread::interruption_point(); + d->d_tpb.clear_changed(); + // handle any queued up messages BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { // Check if we have a message handler attached before getting @@ -116,97 +112,57 @@ namespace gr { } } - d->d_tpb.clear_changed(); // run one iteration if we are a connected stream block if(d->noutputs() >0 || d->ninputs()>0){ s = d_exec.run_one_iteration(); } else { s = block_executor::BLKD_IN; + // a msg port only block wants to shutdown + if(block->finished()) { + s = block_executor::DONE; + } } - // if msg ports think we are done, we are done - if(block->finished()) + if(block->finished() && s == block_executor::READY_NO_OUTPUT) { s = block_executor::DONE; + d->set_done(true); + } + + if(!d->ninputs() && s == block_executor::READY_NO_OUTPUT) { + s = block_executor::BLKD_IN; + } switch(s){ - case block_executor::READY: // Tell neighbors we made progress. + case block_executor::READY: // Tell neighbors we made progress. d->d_tpb.notify_neighbors(d); break; - case block_executor::READY_NO_OUTPUT: // Notify upstream only + case block_executor::READY_NO_OUTPUT: // Notify upstream only d->d_tpb.notify_upstream(d); break; - case block_executor::DONE: // Game over. + case block_executor::DONE: // Game over. block->notify_msg_neighbors(); d->d_tpb.notify_neighbors(d); return; - case block_executor::BLKD_IN: // Wait for input. + case block_executor::BLKD_IN: // Wait for input. { gr::thread::scoped_lock guard(d->d_tpb.mutex); - while(!d->d_tpb.input_changed) { - - // wait for input or message - while(!d->d_tpb.input_changed && block->empty_handled_p()){ - boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250); - if(!d->d_tpb.input_cond.timed_wait(guard, timeout)){ - goto tpb_loop_top; // timeout occurred (perform sanity checks up top) - } - } - // handle all pending messages - BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { - if(block->has_msg_handler(i.first)) { - while((msg = block->delete_head_nowait(i.first))) { - guard.unlock(); // release lock while processing msg - block->dispatch_msg(i.first, msg); - guard.lock(); - } - } - else { - // leave msg in queue if no handler is defined - // start dropping if we have too many - if(block->nmsgs(i.first) > max_nmsgs){ - GR_LOG_WARN(LOG,"asynchronous message buffer overflowing, dropping message"); - msg = block->delete_head_nowait(i.first); - } - } - } - if (d->done()) { - return; - } + if(!d->d_tpb.input_changed) { + boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250); + d->d_tpb.input_cond.timed_wait(guard, timeout); } } break; - case block_executor::BLKD_OUT: // Wait for output buffer space. + case block_executor::BLKD_OUT: // Wait for output buffer space. { - gr::thread::scoped_lock guard(d->d_tpb.mutex); - while(!d->d_tpb.output_changed) { - // wait for output room or message - while(!d->d_tpb.output_changed && block->empty_handled_p()) - d->d_tpb.output_cond.wait(guard); - - // handle all pending messages - BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { - if(block->has_msg_handler(i.first)) { - while((msg = block->delete_head_nowait(i.first))) { - guard.unlock(); // release lock while processing msg - block->dispatch_msg(i.first, msg); - guard.lock(); - } - } - else { - // leave msg in queue if no handler is defined - // start dropping if we have too many - if(block->nmsgs(i.first) > max_nmsgs){ - GR_LOG_WARN(LOG,"asynchronous message buffer overflowing, dropping message"); - msg = block->delete_head_nowait(i.first); - } - } - } + gr::thread::scoped_lock guard(d->d_tpb.mutex); + while(!d->d_tpb.output_changed) { + d->d_tpb.output_cond.wait(guard); } } break; |