summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib
diff options
context:
space:
mode:
authorTim O'Shea <tim.oshea753@gmail.com>2014-04-23 21:33:15 -0400
committerTim O'Shea <tim.oshea753@gmail.com>2014-04-23 21:33:15 -0400
commit93494c1c17d6751e94fa8a1a5479d2c701750baf (patch)
tree73123830b2bb916526d3feda669b961e21b3cd03 /gnuradio-runtime/lib
parent3831dd37c8df19e25fa258db4d393ee068889dae (diff)
runtime: fix propagation of DONE state to message blocks
Diffstat (limited to 'gnuradio-runtime/lib')
-rw-r--r--gnuradio-runtime/lib/block.cc57
-rw-r--r--gnuradio-runtime/lib/tpb_thread_body.cc17
2 files changed, 72 insertions, 2 deletions
diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc
index bdf484e8db..46cbc9858a 100644
--- a/gnuradio-runtime/lib/block.cc
+++ b/gnuradio-runtime/lib/block.cc
@@ -59,6 +59,8 @@ namespace gr {
d_min_output_buffer(std::max(output_signature->max_streams(),1), -1)
{
global_block_registry.register_primitive(alias(), this);
+ message_port_register_in(pmt::mp("system"));
+ set_msg_handler(pmt::mp("system"), boost::bind(&block::system_handler, this, _1));
#ifdef ENABLE_GR_LOG
#ifdef HAVE_LOG4CPP
@@ -734,6 +736,61 @@ namespace gr {
}
}
+
+ void
+ block::system_handler(pmt::pmt_t msg)
+ {
+ //std::cout << "system_handler " << msg << "\n";
+ pmt::pmt_t op = pmt::car(msg);
+ if(pmt::eqv(op, pmt::mp("done"))){
+ d_finished = pmt::to_long(pmt::cdr(msg));
+ global_block_registry.notify_blk(alias());
+ } else {
+ std::cout << "WARNING: bad message op on system port!\n";
+ pmt::print(msg);
+ }
+ }
+
+ void
+ block::notify_msg_neighbors()
+ {
+ size_t len = pmt::length(d_message_subscribers);
+ pmt::pmt_t port_names = pmt::make_vector(len, pmt::PMT_NIL);
+ pmt::pmt_t keys = pmt::dict_keys(d_message_subscribers);
+ for(size_t i = 0; i < len; i++) {
+ // for each output port
+ pmt::pmt_t oport = pmt::nth(i,keys);
+
+ // for each subscriber on this port
+ pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, oport, pmt::PMT_NIL);
+
+ // iterate through subscribers on port
+ while(pmt::is_pair(currlist)) {
+ pmt::pmt_t target = pmt::car(currlist);
+
+ pmt::pmt_t block = pmt::car(target);
+ pmt::pmt_t port = pmt::mp("system");
+
+ currlist = pmt::cdr(currlist);
+ basic_block_sptr blk = global_block_registry.block_lookup(block);
+ blk->post(port, pmt::cons(pmt::mp("done"), pmt::mp(true)));
+
+ //std::cout << "notify finished --> ";
+ //pmt::print(pmt::cons(block,port));
+ //std::cout << "\n";
+
+ }
+ }
+ }
+
+ bool
+ block::finished()
+ {
+ return d_finished;
+ }
+
+
+
void
block::setup_pc_rpc()
{
diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc
index 79abd0e61d..eb47a43ee3 100644
--- a/gnuradio-runtime/lib/tpb_thread_body.cc
+++ b/gnuradio-runtime/lib/tpb_thread_body.cc
@@ -83,8 +83,12 @@ namespace gr {
if(block->thread_priority() > 0) {
gr::thread::set_thread_priority(d->thread, block->thread_priority());
}
+
+ // make sure our block isnt finished
+ block->clear_finished();
while(1) {
+ tpb_loop_top:
boost::this_thread::interruption_point();
// handle any queued up messages
@@ -116,6 +120,10 @@ namespace gr {
s = block_executor::BLKD_IN;
}
+ // if msg ports think we are done, we are done
+ if(block->finished())
+ s = block_executor::DONE;
+
switch(s){
case block_executor::READY: // Tell neighbors we made progress.
d->d_tpb.notify_neighbors(d);
@@ -126,6 +134,7 @@ namespace gr {
break;
case block_executor::DONE: // Game over.
+ block->notify_msg_neighbors();
d->d_tpb.notify_neighbors(d);
return;
@@ -135,8 +144,12 @@ namespace gr {
while(!d->d_tpb.input_changed) {
// wait for input or message
- while(!d->d_tpb.input_changed && block->empty_handled_p())
- d->d_tpb.input_cond.wait(guard);
+ while(!d->d_tpb.input_changed && block->empty_handled_p()){
+ boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250);
+ if(!d->d_tpb.input_cond.timed_wait(guard, timeout)){
+ goto tpb_loop_top; // timeout occured (perform sanity checks up top)
+ }
+ }
// handle all pending messages
BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {