summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-runtime/lib')
-rw-r--r--gnuradio-runtime/lib/CMakeLists.txt6
-rw-r--r--gnuradio-runtime/lib/basic_block.cc23
-rw-r--r--gnuradio-runtime/lib/block.cc21
-rw-r--r--gnuradio-runtime/lib/controlport/CMakeLists.txt1
-rw-r--r--gnuradio-runtime/lib/hier_block2.cc12
-rw-r--r--gnuradio-runtime/lib/hier_block2_detail.cc18
-rw-r--r--gnuradio-runtime/lib/hier_block2_detail.h3
-rw-r--r--gnuradio-runtime/lib/logger.cc46
-rw-r--r--gnuradio-runtime/lib/pmt/CMakeLists.txt5
-rw-r--r--gnuradio-runtime/lib/qa_logger.cc5
-rw-r--r--gnuradio-runtime/lib/scheduler_sts.cc90
-rw-r--r--gnuradio-runtime/lib/scheduler_sts.h66
-rw-r--r--gnuradio-runtime/lib/single_threaded_scheduler.cc363
-rw-r--r--gnuradio-runtime/lib/single_threaded_scheduler.h65
-rw-r--r--gnuradio-runtime/lib/top_block_impl.cc4
-rw-r--r--gnuradio-runtime/lib/tpb_thread_body.cc92
16 files changed, 79 insertions, 741 deletions
diff --git a/gnuradio-runtime/lib/CMakeLists.txt b/gnuradio-runtime/lib/CMakeLists.txt
index 3da550d37b..c6e42876ce 100644
--- a/gnuradio-runtime/lib/CMakeLists.txt
+++ b/gnuradio-runtime/lib/CMakeLists.txt
@@ -106,9 +106,7 @@ list(APPEND gnuradio_runtime_sources
realtime.cc
realtime_impl.cc
scheduler.cc
- scheduler_sts.cc
scheduler_tpb.cc
- single_threaded_scheduler.cc
sptr_magic.cc
sync_block.cc
sync_decimator.cc
@@ -203,7 +201,7 @@ endif(TRY_SHM_VMCIRCBUF)
#######################################################
add_library(gnuradio-runtime SHARED ${gnuradio_runtime_sources})
target_link_libraries(gnuradio-runtime ${gnuradio_runtime_libs})
-GR_LIBRARY_FOO(gnuradio-runtime RUNTIME_COMPONENT "runtime_runtime" DEVEL_COMPONENT "runtime_devel")
+GR_LIBRARY_FOO(gnuradio-runtime)
add_dependencies(gnuradio-runtime
pmt_generated runtime_generated_includes
@@ -241,7 +239,7 @@ if(ENABLE_STATIC_LIBS)
endif(NOT WIN32)
install(TARGETS gnuradio-runtime_static
- ARCHIVE DESTINATION lib${LIB_SUFFIX} COMPONENT "runtime_devel" # .lib file
+ ARCHIVE DESTINATION lib${LIB_SUFFIX} # .lib file
)
endif(ENABLE_STATIC_LIBS)
diff --git a/gnuradio-runtime/lib/basic_block.cc b/gnuradio-runtime/lib/basic_block.cc
index 082d0753c8..89aa9b8671 100644
--- a/gnuradio-runtime/lib/basic_block.cc
+++ b/gnuradio-runtime/lib/basic_block.cc
@@ -228,29 +228,6 @@ namespace gr {
}
pmt::pmt_t
- basic_block::delete_head_blocking(pmt::pmt_t which_port, unsigned int millisec)
- {
- gr::thread::scoped_lock guard(mutex);
-
- if (millisec) {
- boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(millisec);
- while (empty_p(which_port)) {
- if (!msg_queue_ready[which_port]->timed_wait(guard, timeout)) {
- return pmt::pmt_t();
- }
- }
- } else {
- while(empty_p(which_port)) {
- msg_queue_ready[which_port]->wait(guard);
- }
- }
-
- pmt::pmt_t m(msg_queue[which_port].front());
- msg_queue[which_port].pop_front();
- return m;
- }
-
- pmt::pmt_t
basic_block::message_subscribers(pmt::pmt_t port)
{
return pmt::dict_ref(d_message_subscribers,port,pmt::PMT_NIL);
diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc
index 2bae8ea9f8..4c408ab7ed 100644
--- a/gnuradio-runtime/lib/block.cc
+++ b/gnuradio-runtime/lib/block.cc
@@ -707,8 +707,7 @@ namespace gr {
//std::cout << "system_handler " << msg << "\n";
pmt::pmt_t op = pmt::car(msg);
if(pmt::eqv(op, pmt::mp("done"))){
- d_finished = pmt::to_long(pmt::cdr(msg));
- global_block_registry.notify_blk(alias());
+ d_finished = pmt::to_bool(pmt::cdr(msg));
} else {
std::cout << "WARNING: bad message op on system port!\n";
pmt::print(msg);
@@ -716,6 +715,20 @@ namespace gr {
}
void
+ block::set_log_level(std::string level)
+ {
+ logger_set_level(d_logger, level);
+ }
+
+ std::string
+ block::log_level()
+ {
+ std::string level;
+ logger_get_level(d_logger, level);
+ return level;
+ }
+
+ void
block::notify_msg_neighbors()
{
size_t len = pmt::length(d_message_subscribers);
@@ -737,7 +750,7 @@ namespace gr {
currlist = pmt::cdr(currlist);
basic_block_sptr blk = global_block_registry.block_lookup(block);
- blk->post(port, pmt::cons(pmt::mp("done"), pmt::mp(true)));
+ blk->post(port, pmt::cons(pmt::mp("done"), pmt::PMT_T));
//std::cout << "notify finished --> ";
//pmt::print(pmt::cons(block,port));
@@ -750,7 +763,7 @@ namespace gr {
bool
block::finished()
{
- if((detail()->ninputs() != 0) || (detail()->noutputs() != 0))
+ if(detail()->ninputs() != 0)
return false;
else
return d_finished;
diff --git a/gnuradio-runtime/lib/controlport/CMakeLists.txt b/gnuradio-runtime/lib/controlport/CMakeLists.txt
index c9bdeb949e..80d31fc721 100644
--- a/gnuradio-runtime/lib/controlport/CMakeLists.txt
+++ b/gnuradio-runtime/lib/controlport/CMakeLists.txt
@@ -84,7 +84,6 @@ list(APPEND gnuradio_runtime_libs
install(
FILES ${CMAKE_CURRENT_SOURCE_DIR}/thrift/thrift.conf.example
DESTINATION ${SYSCONFDIR}/${CMAKE_PROJECT_NAME}
- COMPONENT "runtime_runtime"
)
endif(THRIFT_FOUND)
diff --git a/gnuradio-runtime/lib/hier_block2.cc b/gnuradio-runtime/lib/hier_block2.cc
index 597ae032ec..8ebbbda587 100644
--- a/gnuradio-runtime/lib/hier_block2.cc
+++ b/gnuradio-runtime/lib/hier_block2.cc
@@ -178,6 +178,18 @@ namespace gr {
return d_detail->processor_affinity();
}
+ void
+ hier_block2::set_log_level(std::string level)
+ {
+ d_detail->set_log_level(level);
+ }
+
+ std::string
+ hier_block2::log_level()
+ {
+ return d_detail->log_level();
+ }
+
std::string
dot_graph(hier_block2_sptr hierblock2)
{
diff --git a/gnuradio-runtime/lib/hier_block2_detail.cc b/gnuradio-runtime/lib/hier_block2_detail.cc
index 0d0ddf55ba..e6d867b269 100644
--- a/gnuradio-runtime/lib/hier_block2_detail.cc
+++ b/gnuradio-runtime/lib/hier_block2_detail.cc
@@ -956,4 +956,22 @@ namespace gr {
return tmp[0]->processor_affinity();
}
+ void
+ hier_block2_detail::set_log_level(std::string level)
+ {
+ basic_block_vector_t tmp = d_fg->calc_used_blocks();
+ for(basic_block_viter_t p = tmp.begin(); p != tmp.end(); p++) {
+ (*p)->set_log_level(level);
+ }
+ }
+
+ std::string
+ hier_block2_detail::log_level()
+ {
+ // Assume that log_level was set for all hier_block2 blocks
+ basic_block_vector_t tmp = d_fg->calc_used_blocks();
+ return tmp[0]->log_level();
+ }
+
+
} /* namespace gr */
diff --git a/gnuradio-runtime/lib/hier_block2_detail.h b/gnuradio-runtime/lib/hier_block2_detail.h
index a5584fe92a..aa419c49bd 100644
--- a/gnuradio-runtime/lib/hier_block2_detail.h
+++ b/gnuradio-runtime/lib/hier_block2_detail.h
@@ -57,6 +57,9 @@ namespace gr {
void set_processor_affinity(const std::vector<int> &mask);
void unset_processor_affinity();
std::vector<int> processor_affinity();
+
+ void set_log_level(std::string level);
+ std::string log_level();
// Track output buffer min/max settings
std::vector<size_t> d_max_output_buffer;
diff --git a/gnuradio-runtime/lib/logger.cc b/gnuradio-runtime/lib/logger.cc
index fd9a482c94..0bb898833b 100644
--- a/gnuradio-runtime/lib/logger.cc
+++ b/gnuradio-runtime/lib/logger.cc
@@ -35,10 +35,6 @@
#include <stdexcept>
#include <algorithm>
-
-#ifdef ENABLE_GR_LOG
-#ifdef HAVE_LOG4CPP
-
namespace gr {
bool logger_config::logger_configured(false);
@@ -311,8 +307,6 @@ namespace gr {
} /* namespace gr */
-#endif /* HAVE_LOG4CPP */
-
/****** Start Methods to provide Python the capabilities of the macros ********/
void
gr_logger_config(const std::string config_filename, unsigned int watch_period)
@@ -336,38 +330,12 @@ gr_logger_reset_config(void)
// Remaining capability provided by gr::logger class in gnuradio/logger.h
-#else /* ENABLE_GR_LOG */
-
-/****** Start Methods to provide Python the capabilities of the macros ********/
-void
-gr_logger_config(const std::string config_filename, unsigned int watch_period)
-{
- //NOP
-}
-
-std::vector<std::string>
-gr_logger_get_logger_names(void)
-{
- return std::vector<std::string>(1, "");
-}
-
-void
-gr_logger_reset_config(void)
-{
- //NOP
-}
-
-#endif /* ENABLE_GR_LOG */
-
-
namespace gr {
bool
configure_default_loggers(gr::logger_ptr &l, gr::logger_ptr &d,
const std::string name)
{
-#ifdef ENABLE_GR_LOG
-#ifdef HAVE_LOG4CPP
prefs *p = prefs::singleton();
std::string config_file = p->get_string("LOG", "log_config", "");
std::string log_level = p->get_string("LOG", "log_level", "off");
@@ -407,21 +375,11 @@ namespace gr {
}
d = DLOG;
return true;
-#endif /* HAVE_LOG4CPP */
-
-#else /* ENABLE_GR_LOG */
- l = NULL;
- d = NULL;
- return false;
-#endif /* ENABLE_GR_LOG */
- return false;
}
bool
update_logger_alias(const std::string &name, const std::string &alias)
{
-#ifdef ENABLE_GR_LOG
-#ifdef HAVE_LOG4CPP
prefs *p = prefs::singleton();
std::string log_file = p->get_string("LOG", "log_file", "");
std::string debug_file = p->get_string("LOG", "debug_file", "");
@@ -442,10 +400,6 @@ namespace gr {
}
}
return true;
-#endif /* HAVE_LOG4CPP */
-#endif /* ENABLE_GR_LOG */
-
- return false;
}
} /* namespace gr */
diff --git a/gnuradio-runtime/lib/pmt/CMakeLists.txt b/gnuradio-runtime/lib/pmt/CMakeLists.txt
index 32c0e57a6a..e5c8f2f47e 100644
--- a/gnuradio-runtime/lib/pmt/CMakeLists.txt
+++ b/gnuradio-runtime/lib/pmt/CMakeLists.txt
@@ -44,7 +44,6 @@ add_custom_command(
install(
FILES ${PMT_SERIAL_TAGS_H}
DESTINATION ${GR_INCLUDE_DIR}/pmt
- COMPONENT "runtime_devel"
)
include(AddFileDependencies)
@@ -108,7 +107,7 @@ endif(MSVC)
add_library(gnuradio-pmt SHARED ${pmt_sources})
target_link_libraries(gnuradio-pmt ${gnuradio_pmt_libs})
-GR_LIBRARY_FOO(gnuradio-pmt RUNTIME_COMPONENT "runtime_runtime" DEVEL_COMPONENT "runtime_devel")
+GR_LIBRARY_FOO(gnuradio-pmt)
add_dependencies(gnuradio-pmt
pmt_generated
@@ -125,7 +124,7 @@ if(ENABLE_STATIC_LIBS)
endif(NOT WIN32)
install(TARGETS gnuradio-pmt_static
- ARCHIVE DESTINATION lib${LIB_SUFFIX} COMPONENT "runtime_devel" # .lib file
+ ARCHIVE DESTINATION lib${LIB_SUFFIX} # .lib file
)
endif(ENABLE_STATIC_LIBS)
diff --git a/gnuradio-runtime/lib/qa_logger.cc b/gnuradio-runtime/lib/qa_logger.cc
index 904893cc4d..20d7392143 100644
--- a/gnuradio-runtime/lib/qa_logger.cc
+++ b/gnuradio-runtime/lib/qa_logger.cc
@@ -35,10 +35,6 @@
void
qa_logger::t1()
{
-#ifdef ENABLE_GR_LOG
- // This doesn't really test anything, more just
- // making sure nothing's gone horribly wrong.
-
GR_LOG_GETLOGGER(LOG,"main");
GR_ADD_CONSOLE_APPENDER("main","cout","%d{%H:%M:%S} : %m%n");
GR_LOG_NOTICE(LOG,"test from c++ NOTICE");
@@ -48,5 +44,4 @@ qa_logger::t1()
GR_LOG_ERROR(LOG,"test from c++ ERROR");
GR_LOG_FATAL(LOG,"test from c++ FATAL");
CPPUNIT_ASSERT(true);
-#endif
}
diff --git a/gnuradio-runtime/lib/scheduler_sts.cc b/gnuradio-runtime/lib/scheduler_sts.cc
deleted file mode 100644
index 19d05b2316..0000000000
--- a/gnuradio-runtime/lib/scheduler_sts.cc
+++ /dev/null
@@ -1,90 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2008,2013 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include "scheduler_sts.h"
-#include "single_threaded_scheduler.h"
-#include <gnuradio/thread/thread_body_wrapper.h>
-
-namespace gr {
-
- class sts_container
- {
- block_vector_t d_blocks;
-
- public:
- sts_container(block_vector_t blocks)
- : d_blocks(blocks) {}
-
- void operator()()
- {
- make_single_threaded_scheduler(d_blocks)->run();
- }
- };
-
- scheduler_sptr
- scheduler_sts::make(flat_flowgraph_sptr ffg, int max_noutput_items)
- {
- return scheduler_sptr(new scheduler_sts(ffg, max_noutput_items));
- }
-
- scheduler_sts::scheduler_sts(flat_flowgraph_sptr ffg, int max_noutput_items)
- : scheduler(ffg, max_noutput_items)
- {
- // Split the flattened flow graph into discrete partitions, each
- // of which is topologically sorted.
-
- std::vector<basic_block_vector_t> graphs = ffg->partition();
-
- // For each partition, create a thread to evaluate it using
- // an instance of the gr_single_threaded_scheduler
-
- for(std::vector<basic_block_vector_t>::iterator p = graphs.begin();
- p != graphs.end(); p++) {
-
- block_vector_t blocks = flat_flowgraph::make_block_vector(*p);
- d_threads.create_thread(
- gr::thread::thread_body_wrapper<sts_container>(sts_container(blocks),
- "single-threaded-scheduler"));
- }
- }
-
- scheduler_sts::~scheduler_sts()
- {
- stop();
- }
-
- void
- scheduler_sts::stop()
- {
- d_threads.interrupt_all();
- }
-
- void
- scheduler_sts::wait()
- {
- d_threads.join_all();
- }
-
-} /* namespace gr */
diff --git a/gnuradio-runtime/lib/scheduler_sts.h b/gnuradio-runtime/lib/scheduler_sts.h
deleted file mode 100644
index b4cddb4614..0000000000
--- a/gnuradio-runtime/lib/scheduler_sts.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2008,2013 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
- */
-
-#ifndef INCLUDED_GR_SCHEDULER_STS_H
-#define INCLUDED_GR_SCHEDULER_STS_H
-
-#include <gnuradio/api.h>
-#include <gnuradio/thread/thread_group.h>
-#include "scheduler.h"
-
-namespace gr {
-
- /*!
- * \brief Concrete scheduler that uses the single_threaded_scheduler
- */
- class GR_RUNTIME_API scheduler_sts : public scheduler
- {
- gr::thread::thread_group d_threads;
-
- protected:
- /*!
- * \brief Construct a scheduler and begin evaluating the graph.
- *
- * The scheduler will continue running until all blocks until they
- * report that they are done or the stop method is called.
- */
- scheduler_sts(flat_flowgraph_sptr ffg, int max_noutput_items);
-
- public:
- static scheduler_sptr make(flat_flowgraph_sptr ffg,
- int max_noutput_items);
-
- ~scheduler_sts();
-
- /*!
- * \brief Tell the scheduler to stop executing.
- */
- void stop();
-
- /*!
- * \brief Block until the graph is done.
- */
- void wait();
- };
-
-} /* namespace gr */
-
-#endif /* INCLUDED_GR_SCHEDULER_STS_H */
diff --git a/gnuradio-runtime/lib/single_threaded_scheduler.cc b/gnuradio-runtime/lib/single_threaded_scheduler.cc
deleted file mode 100644
index c86d26ca3a..0000000000
--- a/gnuradio-runtime/lib/single_threaded_scheduler.cc
+++ /dev/null
@@ -1,363 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2004 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING. If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "single_threaded_scheduler.h"
-#include <gnuradio/block.h>
-#include <gnuradio/block_detail.h>
-#include <gnuradio/buffer.h>
-#include <boost/thread.hpp>
-#include <boost/format.hpp>
-#include <iostream>
-#include <limits>
-#include <assert.h>
-#include <stdio.h>
-
-namespace gr {
-
- // must be defined to either 0 or 1
-#define ENABLE_LOGGING 0
-
-#if (ENABLE_LOGGING)
-#define LOG(x) do { x; } while(0)
-#else
-#define LOG(x) do {;} while(0)
-#endif
-
- static int which_scheduler = 0;
-
- single_threaded_scheduler_sptr
- make_single_threaded_scheduler(const std::vector<block_sptr> &blocks)
- {
- return single_threaded_scheduler_sptr
- (new single_threaded_scheduler(blocks));
- }
-
- single_threaded_scheduler::single_threaded_scheduler(const std::vector<block_sptr> &blocks)
- : d_blocks(blocks), d_enabled(true), d_log(0)
- {
- if(ENABLE_LOGGING) {
- std::string name = str(boost::format("sst-%d.log") % which_scheduler++);
- d_log = new std::ofstream(name.c_str());
- *d_log << "single_threaded_scheduler: "
- << d_blocks.size ()
- << " blocks\n";
- }
- }
-
- single_threaded_scheduler::~single_threaded_scheduler()
- {
- if(ENABLE_LOGGING)
- delete d_log;
- }
-
- void
- single_threaded_scheduler::run()
- {
- // d_enabled = true; // KLUDGE
- main_loop ();
- }
-
- void
- single_threaded_scheduler::stop()
- {
- if(0)
- std::cout << "gr_singled_threaded_scheduler::stop() "
- << this << std::endl;
- d_enabled = false;
- }
-
- inline static unsigned int
- round_up(unsigned int n, unsigned int multiple)
- {
- return ((n + multiple - 1) / multiple) * multiple;
- }
-
- inline static unsigned int
- round_down(unsigned int n, unsigned int multiple)
- {
- return (n / multiple) * multiple;
- }
-
- //
- // Return minimum available write space in all our downstream
- // buffers or -1 if we're output blocked and the output we're
- // blocked on is done.
- //
- static int
- min_available_space(block_detail *d, int output_multiple)
- {
- int min_space = std::numeric_limits<int>::max();
-
- for(int i = 0; i < d->noutputs (); i++) {
- int n = round_down (d->output(i)->space_available (), output_multiple);
- if(n == 0) { // We're blocked on output.
- if(d->output(i)->done()) { // Downstream is done, therefore we're done.
- return -1;
- }
- return 0;
- }
- min_space = std::min (min_space, n);
- }
- return min_space;
- }
-
- void
- single_threaded_scheduler::main_loop()
- {
- static const int DEFAULT_CAPACITY = 16;
-
- int noutput_items;
- gr_vector_int ninput_items_required(DEFAULT_CAPACITY);
- gr_vector_int ninput_items(DEFAULT_CAPACITY);
- gr_vector_const_void_star input_items(DEFAULT_CAPACITY);
- gr_vector_void_star output_items(DEFAULT_CAPACITY);
- unsigned int bi;
- unsigned int nalive;
- int max_items_avail;
- bool made_progress_last_pass;
- bool making_progress;
-
- for(unsigned i = 0; i < d_blocks.size (); i++)
- d_blocks[i]->detail()->set_done (false); // reset any done flags
-
- for(unsigned i = 0; i < d_blocks.size (); i++) // enable any drivers, etc.
- d_blocks[i]->start();
-
- bi = 0;
- made_progress_last_pass = true;
- making_progress = false;
-
- // Loop while there are still blocks alive
-
- nalive = d_blocks.size ();
- while(d_enabled && nalive > 0) {
- if(boost::this_thread::interruption_requested())
- break;
-
- block *m = d_blocks[bi].get ();
- block_detail *d = m->detail().get ();
-
- LOG(*d_log << std::endl << m);
-
- if(d->done ())
- goto next_block;
-
- if(d->source_p ()) {
- // Invoke sources as a last resort. As long as the previous
- // pass made progress, don't call a source.
- if(made_progress_last_pass) {
- LOG(*d_log << " Skipping source\n");
- goto next_block;
- }
-
- ninput_items_required.resize (0);
- ninput_items.resize (0);
- input_items.resize (0);
- output_items.resize (d->noutputs ());
-
- // determine the minimum available output space
- noutput_items = min_available_space (d, m->output_multiple ());
- LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
- if(noutput_items == -1) // we're done
- goto were_done;
-
- if(noutput_items == 0) { // we're output blocked
- LOG(*d_log << " BLKD_OUT\n");
- goto next_block;
- }
-
- goto setup_call_to_work; // jump to common code
- }
-
- else if(d->sink_p ()) {
- ninput_items_required.resize (d->ninputs ());
- ninput_items.resize (d->ninputs ());
- input_items.resize (d->ninputs ());
- output_items.resize (0);
- LOG(*d_log << " sink\n");
-
- max_items_avail = 0;
- for(int i = 0; i < d->ninputs (); i++) {
- ninput_items[i] = d->input(i)->items_available();
- //if (ninput_items[i] == 0 && d->input(i)->done())
- if(ninput_items[i] < m->output_multiple() && d->input(i)->done())
- goto were_done;
-
- max_items_avail = std::max (max_items_avail, ninput_items[i]);
- }
-
- // take a swag at how much output we can sink
- noutput_items = (int) (max_items_avail * m->relative_rate ());
- noutput_items = round_down (noutput_items, m->output_multiple ());
- LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
- LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
-
- if(noutput_items == 0) { // we're blocked on input
- LOG(*d_log << " BLKD_IN\n");
- goto next_block;
- }
-
- goto try_again; // Jump to code shared with regular case.
- }
-
- else {
- // do the regular thing
- ninput_items_required.resize(d->ninputs ());
- ninput_items.resize(d->ninputs ());
- input_items.resize(d->ninputs ());
- output_items.resize(d->noutputs ());
-
- max_items_avail = 0;
- for(int i = 0; i < d->ninputs (); i++) {
- ninput_items[i] = d->input(i)->items_available ();
- max_items_avail = std::max(max_items_avail, ninput_items[i]);
- }
-
- // determine the minimum available output space
- noutput_items = min_available_space(d, m->output_multiple ());
- if(ENABLE_LOGGING){
- *d_log << " regular ";
- if(m->relative_rate() >= 1.0)
- *d_log << "1:" << m->relative_rate() << std::endl;
- else
- *d_log << 1.0/m->relative_rate() << ":1\n";
- *d_log << " max_items_avail = " << max_items_avail << std::endl;
- *d_log << " noutput_items = " << noutput_items << std::endl;
- }
- if(noutput_items == -1) // we're done
- goto were_done;
-
- if(noutput_items == 0) { // we're output blocked
- LOG(*d_log << " BLKD_OUT\n");
- goto next_block;
- }
-
-#if 0
- // Compute best estimate of noutput_items that we can really use.
- noutput_items =
- std::min((unsigned)noutput_items,
- std::max((unsigned)m->output_multiple(),
- round_up((unsigned)(max_items_avail * m->relative_rate()),
- m->output_multiple())));
-
- LOG(*d_log << " revised noutput_items = " << noutput_items << std::endl);
-#endif
-
- try_again:
- if(m->fixed_rate()) {
- // try to work it forward starting with max_items_avail.
- // We want to try to consume all the input we've got.
- int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
- reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
- if(reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
- noutput_items = reqd_noutput_items;
- }
-
- // ask the block how much input they need to produce noutput_items
- m->forecast(noutput_items, ninput_items_required);
-
- // See if we've got sufficient input available
- int i;
- for(i = 0; i < d->ninputs (); i++)
- if(ninput_items_required[i] > ninput_items[i]) // not enough
- break;
-
- if(i < d->ninputs()) { // not enough input on input[i]
- // if we can, try reducing the size of our output request
- if(noutput_items > m->output_multiple ()){
- noutput_items /= 2;
- noutput_items = round_up (noutput_items, m->output_multiple ());
- goto try_again;
- }
-
- // We're blocked on input
- LOG(*d_log << " BLKD_IN\n");
- if(d->input(i)->done()) // If the upstream block is done, we're done
- goto were_done;
-
- // Is it possible to ever fulfill this request?
- if(ninput_items_required[i] > d->input(i)->max_possible_items_available ()) {
- // Nope, never going to happen...
- std::cerr << "\nsched: <block " << m->name()
- << " (" << m->unique_id() << ")>"
- << " is requesting more input data\n"
- << " than we can provide.\n"
- << " ninput_items_required = "
- << ninput_items_required[i] << "\n"
- << " max_possible_items_available = "
- << d->input(i)->max_possible_items_available() << "\n"
- << " If this is a filter, consider reducing the number of taps.\n";
- goto were_done;
- }
-
- goto next_block;
- }
-
- // We've got enough data on each input to produce noutput_items.
- // Finish setting up the call to work.
- for(int i = 0; i < d->ninputs (); i++)
- input_items[i] = d->input(i)->read_pointer();
-
- setup_call_to_work:
-
- for(int i = 0; i < d->noutputs (); i++)
- output_items[i] = d->output(i)->write_pointer();
-
- // Do the actual work of the block
- int n = m->general_work(noutput_items, ninput_items,
- input_items, output_items);
- LOG(*d_log << " general_work: noutput_items = " << noutput_items
- << " result = " << n << std::endl);
-
- if(n == -1) // block is done
- goto were_done;
-
- d->produce_each(n); // advance write pointers
- if(n > 0)
- making_progress = true;
-
- goto next_block;
- }
- assert(0);
-
- were_done:
- LOG(*d_log << " were_done\n");
- d->set_done (true);
- nalive--;
-
- next_block:
- if(++bi >= d_blocks.size ()) {
- bi = 0;
- made_progress_last_pass = making_progress;
- making_progress = false;
- }
- }
-
- for(unsigned i = 0; i < d_blocks.size(); i++) // disable any drivers, etc.
- d_blocks[i]->stop();
- }
-
-} /* namespace gr */
diff --git a/gnuradio-runtime/lib/single_threaded_scheduler.h b/gnuradio-runtime/lib/single_threaded_scheduler.h
deleted file mode 100644
index eccbf03b36..0000000000
--- a/gnuradio-runtime/lib/single_threaded_scheduler.h
+++ /dev/null
@@ -1,65 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2004 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING. If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifndef INCLUDED_GR_SINGLE_THREADED_SCHEDULER_H
-#define INCLUDED_GR_SINGLE_THREADED_SCHEDULER_H
-
-#include <gnuradio/api.h>
-#include <gnuradio/runtime_types.h>
-#include <fstream>
-
-namespace gr {
-
- class single_threaded_scheduler;
- typedef boost::shared_ptr<single_threaded_scheduler> single_threaded_scheduler_sptr;
-
- /*!
- * \brief Simple scheduler for stream computations.
- * \ingroup internal
- */
- class GR_RUNTIME_API single_threaded_scheduler
- {
- public:
- ~single_threaded_scheduler();
-
- void run();
- void stop();
-
- private:
- const std::vector<block_sptr> d_blocks;
- volatile bool d_enabled;
- std::ofstream *d_log;
-
- single_threaded_scheduler(const std::vector<block_sptr> &blocks);
-
- void main_loop();
-
- friend GR_RUNTIME_API single_threaded_scheduler_sptr
- make_single_threaded_scheduler(const std::vector<block_sptr> &blocks);
- };
-
- GR_RUNTIME_API single_threaded_scheduler_sptr
- make_single_threaded_scheduler(const std::vector<block_sptr> &blocks);
-
-} /* namespace gr */
-
-#endif /* INCLUDED_GR_SINGLE_THREADED_SCHEDULER_H */
diff --git a/gnuradio-runtime/lib/top_block_impl.cc b/gnuradio-runtime/lib/top_block_impl.cc
index 3f94867bc2..d2a07e89ef 100644
--- a/gnuradio-runtime/lib/top_block_impl.cc
+++ b/gnuradio-runtime/lib/top_block_impl.cc
@@ -26,7 +26,6 @@
#include "top_block_impl.h"
#include "flat_flowgraph.h"
-#include "scheduler_sts.h"
#include "scheduler_tpb.h"
#include <gnuradio/top_block.h>
#include <gnuradio/prefs.h>
@@ -48,8 +47,7 @@ namespace gr {
const char *name;
scheduler_maker f;
} scheduler_table[] = {
- { "TPB", scheduler_tpb::make }, // first entry is default
- { "STS", scheduler_sts::make }
+ { "TPB", scheduler_tpb::make } // first entry is default
};
static scheduler_sptr
diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc
index 93591feee2..59f1d3162e 100644
--- a/gnuradio-runtime/lib/tpb_thread_body.cc
+++ b/gnuradio-runtime/lib/tpb_thread_body.cc
@@ -55,8 +55,6 @@ namespace gr {
size_t max_nmsgs = static_cast<size_t>(p->get_long("DEFAULT", "max_messages", 100));
// Setup the logger for the scheduler
-#ifdef ENABLE_GR_LOG
-#ifdef HAVE_LOG4CPP
#undef LOG
std::string config_file = p->get_string("LOG", "log_config", "");
std::string log_level = p->get_string("LOG", "log_level", "off");
@@ -75,9 +73,6 @@ namespace gr {
GR_LOG_SET_FILE_APPENDER(LOG, log_file , true,"%r :%p: %c{1} - %m%n");
}
}
-#endif /* HAVE_LOG4CPP */
-#endif /* ENABLE_GR_LOG */
-
// Set thread affinity if it was set before fg was started.
if(block->processor_affinity().size() > 0) {
@@ -93,9 +88,10 @@ namespace gr {
block->clear_finished();
while(1) {
- tpb_loop_top:
boost::this_thread::interruption_point();
+ d->d_tpb.clear_changed();
+
// handle any queued up messages
BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
// Check if we have a message handler attached before getting
@@ -116,97 +112,57 @@ namespace gr {
}
}
- d->d_tpb.clear_changed();
// run one iteration if we are a connected stream block
if(d->noutputs() >0 || d->ninputs()>0){
s = d_exec.run_one_iteration();
}
else {
s = block_executor::BLKD_IN;
+ // a msg port only block wants to shutdown
+ if(block->finished()) {
+ s = block_executor::DONE;
+ }
}
- // if msg ports think we are done, we are done
- if(block->finished())
+ if(block->finished() && s == block_executor::READY_NO_OUTPUT) {
s = block_executor::DONE;
+ d->set_done(true);
+ }
+
+ if(!d->ninputs() && s == block_executor::READY_NO_OUTPUT) {
+ s = block_executor::BLKD_IN;
+ }
switch(s){
- case block_executor::READY: // Tell neighbors we made progress.
+ case block_executor::READY: // Tell neighbors we made progress.
d->d_tpb.notify_neighbors(d);
break;
- case block_executor::READY_NO_OUTPUT: // Notify upstream only
+ case block_executor::READY_NO_OUTPUT: // Notify upstream only
d->d_tpb.notify_upstream(d);
break;
- case block_executor::DONE: // Game over.
+ case block_executor::DONE: // Game over.
block->notify_msg_neighbors();
d->d_tpb.notify_neighbors(d);
return;
- case block_executor::BLKD_IN: // Wait for input.
+ case block_executor::BLKD_IN: // Wait for input.
{
gr::thread::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.input_changed) {
-
- // wait for input or message
- while(!d->d_tpb.input_changed && block->empty_handled_p()){
- boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250);
- if(!d->d_tpb.input_cond.timed_wait(guard, timeout)){
- goto tpb_loop_top; // timeout occurred (perform sanity checks up top)
- }
- }
- // handle all pending messages
- BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
- if(block->has_msg_handler(i.first)) {
- while((msg = block->delete_head_nowait(i.first))) {
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(i.first, msg);
- guard.lock();
- }
- }
- else {
- // leave msg in queue if no handler is defined
- // start dropping if we have too many
- if(block->nmsgs(i.first) > max_nmsgs){
- GR_LOG_WARN(LOG,"asynchronous message buffer overflowing, dropping message");
- msg = block->delete_head_nowait(i.first);
- }
- }
- }
- if (d->done()) {
- return;
- }
+ if(!d->d_tpb.input_changed) {
+ boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250);
+ d->d_tpb.input_cond.timed_wait(guard, timeout);
}
}
break;
- case block_executor::BLKD_OUT: // Wait for output buffer space.
+ case block_executor::BLKD_OUT: // Wait for output buffer space.
{
- gr::thread::scoped_lock guard(d->d_tpb.mutex);
- while(!d->d_tpb.output_changed) {
- // wait for output room or message
- while(!d->d_tpb.output_changed && block->empty_handled_p())
- d->d_tpb.output_cond.wait(guard);
-
- // handle all pending messages
- BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
- if(block->has_msg_handler(i.first)) {
- while((msg = block->delete_head_nowait(i.first))) {
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(i.first, msg);
- guard.lock();
- }
- }
- else {
- // leave msg in queue if no handler is defined
- // start dropping if we have too many
- if(block->nmsgs(i.first) > max_nmsgs){
- GR_LOG_WARN(LOG,"asynchronous message buffer overflowing, dropping message");
- msg = block->delete_head_nowait(i.first);
- }
- }
- }
+ gr::thread::scoped_lock guard(d->d_tpb.mutex);
+ while(!d->d_tpb.output_changed) {
+ d->d_tpb.output_cond.wait(guard);
}
}
break;