summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/tpb_thread_body.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-runtime/lib/tpb_thread_body.cc')
-rw-r--r--gnuradio-runtime/lib/tpb_thread_body.cc92
1 files changed, 24 insertions, 68 deletions
diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc
index 93591feee2..59f1d3162e 100644
--- a/gnuradio-runtime/lib/tpb_thread_body.cc
+++ b/gnuradio-runtime/lib/tpb_thread_body.cc
@@ -55,8 +55,6 @@ namespace gr {
size_t max_nmsgs = static_cast<size_t>(p->get_long("DEFAULT", "max_messages", 100));
// Setup the logger for the scheduler
-#ifdef ENABLE_GR_LOG
-#ifdef HAVE_LOG4CPP
#undef LOG
std::string config_file = p->get_string("LOG", "log_config", "");
std::string log_level = p->get_string("LOG", "log_level", "off");
@@ -75,9 +73,6 @@ namespace gr {
GR_LOG_SET_FILE_APPENDER(LOG, log_file , true,"%r :%p: %c{1} - %m%n");
}
}
-#endif /* HAVE_LOG4CPP */
-#endif /* ENABLE_GR_LOG */
-
// Set thread affinity if it was set before fg was started.
if(block->processor_affinity().size() > 0) {
@@ -93,9 +88,10 @@ namespace gr {
block->clear_finished();
while(1) {
- tpb_loop_top:
boost::this_thread::interruption_point();
+ d->d_tpb.clear_changed();
+
// handle any queued up messages
BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
// Check if we have a message handler attached before getting
@@ -116,97 +112,57 @@ namespace gr {
}
}
- d->d_tpb.clear_changed();
// run one iteration if we are a connected stream block
if(d->noutputs() >0 || d->ninputs()>0){
s = d_exec.run_one_iteration();
}
else {
s = block_executor::BLKD_IN;
+ // a msg port only block wants to shutdown
+ if(block->finished()) {
+ s = block_executor::DONE;
+ }
}
- // if msg ports think we are done, we are done
- if(block->finished())
+ if(block->finished() && s == block_executor::READY_NO_OUTPUT) {
s = block_executor::DONE;
+ d->set_done(true);
+ }
+
+ if(!d->ninputs() && s == block_executor::READY_NO_OUTPUT) {
+ s = block_executor::BLKD_IN;
+ }
switch(s){
- case block_executor::READY: // Tell neighbors we made progress.
+ case block_executor::READY: // Tell neighbors we made progress.
d->d_tpb.notify_neighbors(d);
break;
- case block_executor::READY_NO_OUTPUT: // Notify upstream only
+ case block_executor::READY_NO_OUTPUT: // Notify upstream only
d->d_tpb.notify_upstream(d);
break;
- case block_executor::DONE: // Game over.
+ case block_executor::DONE: // Game over.
block->notify_msg_neighbors();
d->d_tpb.notify_neighbors(d);
return;
- case block_executor::BLKD_IN: // Wait for input.
+ case block_executor::BLKD_IN: // Wait for input.
{
gr::thread::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.input_changed) {
-
- // wait for input or message
- while(!d->d_tpb.input_changed && block->empty_handled_p()){
- boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250);
- if(!d->d_tpb.input_cond.timed_wait(guard, timeout)){
- goto tpb_loop_top; // timeout occurred (perform sanity checks up top)
- }
- }
- // handle all pending messages
- BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
- if(block->has_msg_handler(i.first)) {
- while((msg = block->delete_head_nowait(i.first))) {
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(i.first, msg);
- guard.lock();
- }
- }
- else {
- // leave msg in queue if no handler is defined
- // start dropping if we have too many
- if(block->nmsgs(i.first) > max_nmsgs){
- GR_LOG_WARN(LOG,"asynchronous message buffer overflowing, dropping message");
- msg = block->delete_head_nowait(i.first);
- }
- }
- }
- if (d->done()) {
- return;
- }
+ if(!d->d_tpb.input_changed) {
+ boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250);
+ d->d_tpb.input_cond.timed_wait(guard, timeout);
}
}
break;
- case block_executor::BLKD_OUT: // Wait for output buffer space.
+ case block_executor::BLKD_OUT: // Wait for output buffer space.
{
- gr::thread::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.output_changed) {
- // wait for output room or message
- while(!d->d_tpb.output_changed && block->empty_handled_p())
- d->d_tpb.output_cond.wait(guard);
-
- // handle all pending messages
- BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
- if(block->has_msg_handler(i.first)) {
- while((msg = block->delete_head_nowait(i.first))) {
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(i.first, msg);
- guard.lock();
- }
- }
- else {
- // leave msg in queue if no handler is defined
- // start dropping if we have too many
- if(block->nmsgs(i.first) > max_nmsgs){
- GR_LOG_WARN(LOG,"asynchronous message buffer overflowing, dropping message");
- msg = block->delete_head_nowait(i.first);
- }
- }
- }
+ gr::thread::scoped_lock guard(d->d_tpb.mutex);
+ while(!d->d_tpb.output_changed) {
+ d->d_tpb.output_cond.wait(guard);
}
}
break;