diff options
Diffstat (limited to 'gnuradio-runtime/lib')
-rw-r--r-- | gnuradio-runtime/lib/CMakeLists.txt | 6 | ||||
-rw-r--r-- | gnuradio-runtime/lib/basic_block.cc | 23 | ||||
-rw-r--r-- | gnuradio-runtime/lib/block.cc | 21 | ||||
-rw-r--r-- | gnuradio-runtime/lib/controlport/CMakeLists.txt | 1 | ||||
-rw-r--r-- | gnuradio-runtime/lib/hier_block2.cc | 12 | ||||
-rw-r--r-- | gnuradio-runtime/lib/hier_block2_detail.cc | 18 | ||||
-rw-r--r-- | gnuradio-runtime/lib/hier_block2_detail.h | 3 | ||||
-rw-r--r-- | gnuradio-runtime/lib/logger.cc | 46 | ||||
-rw-r--r-- | gnuradio-runtime/lib/pmt/CMakeLists.txt | 5 | ||||
-rw-r--r-- | gnuradio-runtime/lib/pmt/pmt.cc | 8 | ||||
-rw-r--r-- | gnuradio-runtime/lib/pmt/pmt_int.h | 21 | ||||
-rw-r--r-- | gnuradio-runtime/lib/qa_logger.cc | 5 | ||||
-rw-r--r-- | gnuradio-runtime/lib/scheduler_sts.cc | 90 | ||||
-rw-r--r-- | gnuradio-runtime/lib/scheduler_sts.h | 66 | ||||
-rw-r--r-- | gnuradio-runtime/lib/single_threaded_scheduler.cc | 363 | ||||
-rw-r--r-- | gnuradio-runtime/lib/single_threaded_scheduler.h | 65 | ||||
-rw-r--r-- | gnuradio-runtime/lib/top_block_impl.cc | 4 | ||||
-rw-r--r-- | gnuradio-runtime/lib/tpb_thread_body.cc | 92 |
18 files changed, 80 insertions, 769 deletions
diff --git a/gnuradio-runtime/lib/CMakeLists.txt b/gnuradio-runtime/lib/CMakeLists.txt index 3da550d37b..c6e42876ce 100644 --- a/gnuradio-runtime/lib/CMakeLists.txt +++ b/gnuradio-runtime/lib/CMakeLists.txt @@ -106,9 +106,7 @@ list(APPEND gnuradio_runtime_sources realtime.cc realtime_impl.cc scheduler.cc - scheduler_sts.cc scheduler_tpb.cc - single_threaded_scheduler.cc sptr_magic.cc sync_block.cc sync_decimator.cc @@ -203,7 +201,7 @@ endif(TRY_SHM_VMCIRCBUF) ####################################################### add_library(gnuradio-runtime SHARED ${gnuradio_runtime_sources}) target_link_libraries(gnuradio-runtime ${gnuradio_runtime_libs}) -GR_LIBRARY_FOO(gnuradio-runtime RUNTIME_COMPONENT "runtime_runtime" DEVEL_COMPONENT "runtime_devel") +GR_LIBRARY_FOO(gnuradio-runtime) add_dependencies(gnuradio-runtime pmt_generated runtime_generated_includes @@ -241,7 +239,7 @@ if(ENABLE_STATIC_LIBS) endif(NOT WIN32) install(TARGETS gnuradio-runtime_static - ARCHIVE DESTINATION lib${LIB_SUFFIX} COMPONENT "runtime_devel" # .lib file + ARCHIVE DESTINATION lib${LIB_SUFFIX} # .lib file ) endif(ENABLE_STATIC_LIBS) diff --git a/gnuradio-runtime/lib/basic_block.cc b/gnuradio-runtime/lib/basic_block.cc index 082d0753c8..89aa9b8671 100644 --- a/gnuradio-runtime/lib/basic_block.cc +++ b/gnuradio-runtime/lib/basic_block.cc @@ -228,29 +228,6 @@ namespace gr { } pmt::pmt_t - basic_block::delete_head_blocking(pmt::pmt_t which_port, unsigned int millisec) - { - gr::thread::scoped_lock guard(mutex); - - if (millisec) { - boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(millisec); - while (empty_p(which_port)) { - if (!msg_queue_ready[which_port]->timed_wait(guard, timeout)) { - return pmt::pmt_t(); - } - } - } else { - while(empty_p(which_port)) { - msg_queue_ready[which_port]->wait(guard); - } - } - - pmt::pmt_t m(msg_queue[which_port].front()); - msg_queue[which_port].pop_front(); - return m; - } - - pmt::pmt_t basic_block::message_subscribers(pmt::pmt_t port) { return pmt::dict_ref(d_message_subscribers,port,pmt::PMT_NIL); diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc index 2bae8ea9f8..4c408ab7ed 100644 --- a/gnuradio-runtime/lib/block.cc +++ b/gnuradio-runtime/lib/block.cc @@ -707,8 +707,7 @@ namespace gr { //std::cout << "system_handler " << msg << "\n"; pmt::pmt_t op = pmt::car(msg); if(pmt::eqv(op, pmt::mp("done"))){ - d_finished = pmt::to_long(pmt::cdr(msg)); - global_block_registry.notify_blk(alias()); + d_finished = pmt::to_bool(pmt::cdr(msg)); } else { std::cout << "WARNING: bad message op on system port!\n"; pmt::print(msg); @@ -716,6 +715,20 @@ namespace gr { } void + block::set_log_level(std::string level) + { + logger_set_level(d_logger, level); + } + + std::string + block::log_level() + { + std::string level; + logger_get_level(d_logger, level); + return level; + } + + void block::notify_msg_neighbors() { size_t len = pmt::length(d_message_subscribers); @@ -737,7 +750,7 @@ namespace gr { currlist = pmt::cdr(currlist); basic_block_sptr blk = global_block_registry.block_lookup(block); - blk->post(port, pmt::cons(pmt::mp("done"), pmt::mp(true))); + blk->post(port, pmt::cons(pmt::mp("done"), pmt::PMT_T)); //std::cout << "notify finished --> "; //pmt::print(pmt::cons(block,port)); @@ -750,7 +763,7 @@ namespace gr { bool block::finished() { - if((detail()->ninputs() != 0) || (detail()->noutputs() != 0)) + if(detail()->ninputs() != 0) return false; else return d_finished; diff --git a/gnuradio-runtime/lib/controlport/CMakeLists.txt b/gnuradio-runtime/lib/controlport/CMakeLists.txt index 0d5d0376cc..0aa8539742 100644 --- a/gnuradio-runtime/lib/controlport/CMakeLists.txt +++ b/gnuradio-runtime/lib/controlport/CMakeLists.txt @@ -84,7 +84,6 @@ list(APPEND gnuradio_runtime_libs install( FILES ${CMAKE_CURRENT_SOURCE_DIR}/thrift/thrift.conf.example DESTINATION ${SYSCONFDIR}/${CMAKE_PROJECT_NAME} - COMPONENT "runtime_runtime" ) endif(THRIFT_FOUND) diff --git a/gnuradio-runtime/lib/hier_block2.cc b/gnuradio-runtime/lib/hier_block2.cc index 597ae032ec..8ebbbda587 100644 --- a/gnuradio-runtime/lib/hier_block2.cc +++ b/gnuradio-runtime/lib/hier_block2.cc @@ -178,6 +178,18 @@ namespace gr { return d_detail->processor_affinity(); } + void + hier_block2::set_log_level(std::string level) + { + d_detail->set_log_level(level); + } + + std::string + hier_block2::log_level() + { + return d_detail->log_level(); + } + std::string dot_graph(hier_block2_sptr hierblock2) { diff --git a/gnuradio-runtime/lib/hier_block2_detail.cc b/gnuradio-runtime/lib/hier_block2_detail.cc index 49eb34a6d1..e23ca2bd3d 100644 --- a/gnuradio-runtime/lib/hier_block2_detail.cc +++ b/gnuradio-runtime/lib/hier_block2_detail.cc @@ -952,4 +952,22 @@ namespace gr { return tmp[0]->processor_affinity(); } + void + hier_block2_detail::set_log_level(std::string level) + { + basic_block_vector_t tmp = d_fg->calc_used_blocks(); + for(basic_block_viter_t p = tmp.begin(); p != tmp.end(); p++) { + (*p)->set_log_level(level); + } + } + + std::string + hier_block2_detail::log_level() + { + // Assume that log_level was set for all hier_block2 blocks + basic_block_vector_t tmp = d_fg->calc_used_blocks(); + return tmp[0]->log_level(); + } + + } /* namespace gr */ diff --git a/gnuradio-runtime/lib/hier_block2_detail.h b/gnuradio-runtime/lib/hier_block2_detail.h index a5584fe92a..aa419c49bd 100644 --- a/gnuradio-runtime/lib/hier_block2_detail.h +++ b/gnuradio-runtime/lib/hier_block2_detail.h @@ -57,6 +57,9 @@ namespace gr { void set_processor_affinity(const std::vector<int> &mask); void unset_processor_affinity(); std::vector<int> processor_affinity(); + + void set_log_level(std::string level); + std::string log_level(); // Track output buffer min/max settings std::vector<size_t> d_max_output_buffer; diff --git a/gnuradio-runtime/lib/logger.cc b/gnuradio-runtime/lib/logger.cc index fd9a482c94..0bb898833b 100644 --- a/gnuradio-runtime/lib/logger.cc +++ b/gnuradio-runtime/lib/logger.cc @@ -35,10 +35,6 @@ #include <stdexcept> #include <algorithm> - -#ifdef ENABLE_GR_LOG -#ifdef HAVE_LOG4CPP - namespace gr { bool logger_config::logger_configured(false); @@ -311,8 +307,6 @@ namespace gr { } /* namespace gr */ -#endif /* HAVE_LOG4CPP */ - /****** Start Methods to provide Python the capabilities of the macros ********/ void gr_logger_config(const std::string config_filename, unsigned int watch_period) @@ -336,38 +330,12 @@ gr_logger_reset_config(void) // Remaining capability provided by gr::logger class in gnuradio/logger.h -#else /* ENABLE_GR_LOG */ - -/****** Start Methods to provide Python the capabilities of the macros ********/ -void -gr_logger_config(const std::string config_filename, unsigned int watch_period) -{ - //NOP -} - -std::vector<std::string> -gr_logger_get_logger_names(void) -{ - return std::vector<std::string>(1, ""); -} - -void -gr_logger_reset_config(void) -{ - //NOP -} - -#endif /* ENABLE_GR_LOG */ - - namespace gr { bool configure_default_loggers(gr::logger_ptr &l, gr::logger_ptr &d, const std::string name) { -#ifdef ENABLE_GR_LOG -#ifdef HAVE_LOG4CPP prefs *p = prefs::singleton(); std::string config_file = p->get_string("LOG", "log_config", ""); std::string log_level = p->get_string("LOG", "log_level", "off"); @@ -407,21 +375,11 @@ namespace gr { } d = DLOG; return true; -#endif /* HAVE_LOG4CPP */ - -#else /* ENABLE_GR_LOG */ - l = NULL; - d = NULL; - return false; -#endif /* ENABLE_GR_LOG */ - return false; } bool update_logger_alias(const std::string &name, const std::string &alias) { -#ifdef ENABLE_GR_LOG -#ifdef HAVE_LOG4CPP prefs *p = prefs::singleton(); std::string log_file = p->get_string("LOG", "log_file", ""); std::string debug_file = p->get_string("LOG", "debug_file", ""); @@ -442,10 +400,6 @@ namespace gr { } } return true; -#endif /* HAVE_LOG4CPP */ -#endif /* ENABLE_GR_LOG */ - - return false; } } /* namespace gr */ diff --git a/gnuradio-runtime/lib/pmt/CMakeLists.txt b/gnuradio-runtime/lib/pmt/CMakeLists.txt index 32c0e57a6a..e5c8f2f47e 100644 --- a/gnuradio-runtime/lib/pmt/CMakeLists.txt +++ b/gnuradio-runtime/lib/pmt/CMakeLists.txt @@ -44,7 +44,6 @@ add_custom_command( install( FILES ${PMT_SERIAL_TAGS_H} DESTINATION ${GR_INCLUDE_DIR}/pmt - COMPONENT "runtime_devel" ) include(AddFileDependencies) @@ -108,7 +107,7 @@ endif(MSVC) add_library(gnuradio-pmt SHARED ${pmt_sources}) target_link_libraries(gnuradio-pmt ${gnuradio_pmt_libs}) -GR_LIBRARY_FOO(gnuradio-pmt RUNTIME_COMPONENT "runtime_runtime" DEVEL_COMPONENT "runtime_devel") +GR_LIBRARY_FOO(gnuradio-pmt) add_dependencies(gnuradio-pmt pmt_generated @@ -125,7 +124,7 @@ if(ENABLE_STATIC_LIBS) endif(NOT WIN32) install(TARGETS gnuradio-pmt_static - ARCHIVE DESTINATION lib${LIB_SUFFIX} COMPONENT "runtime_devel" # .lib file + ARCHIVE DESTINATION lib${LIB_SUFFIX} # .lib file ) endif(ENABLE_STATIC_LIBS) diff --git a/gnuradio-runtime/lib/pmt/pmt.cc b/gnuradio-runtime/lib/pmt/pmt.cc index 3b92481549..da2a7e5cc2 100644 --- a/gnuradio-runtime/lib/pmt/pmt.cc +++ b/gnuradio-runtime/lib/pmt/pmt.cc @@ -63,7 +63,6 @@ pmt_base::operator delete(void *p, size_t size) #endif -#if ((BOOST_VERSION / 100000 >= 1) && (BOOST_VERSION / 100 % 1000 >= 53)) void intrusive_ptr_add_ref(pmt_base* p) { p->refcount_.fetch_add(1, boost::memory_order_relaxed); @@ -75,13 +74,6 @@ void intrusive_ptr_release(pmt_base* p) { delete p; } } -#else -// boost::atomic not available before 1.53 -// This section will be removed when support for boost 1.48 ceases -// NB: This code is prone to segfault on non-Intel architectures. -void intrusive_ptr_add_ref(pmt_base* p) { ++(p->count_); } -void intrusive_ptr_release(pmt_base* p) { if (--(p->count_) == 0 ) delete p; } -#endif pmt_base::~pmt_base() { diff --git a/gnuradio-runtime/lib/pmt/pmt_int.h b/gnuradio-runtime/lib/pmt/pmt_int.h index da48a0ddc8..f06f507944 100644 --- a/gnuradio-runtime/lib/pmt/pmt_int.h +++ b/gnuradio-runtime/lib/pmt/pmt_int.h @@ -25,13 +25,7 @@ #include <pmt/pmt.h> #include <boost/utility.hpp> #include <boost/version.hpp> -#if ((BOOST_VERSION / 100000 >= 1) && (BOOST_VERSION / 100 % 1000 >= 53)) - #include <boost/atomic.hpp> -#else - // boost::atomic not available before 1.53 - // This section will be removed when support for boost 1.48 ceases - #include <boost/detail/atomic_count.hpp> -#endif +#include <boost/atomic.hpp> /* * EVERYTHING IN THIS FILE IS PRIVATE TO THE IMPLEMENTATION! @@ -43,23 +37,10 @@ namespace pmt { class PMT_API pmt_base : boost::noncopyable { - -#if ((BOOST_VERSION / 100000 >= 1) && (BOOST_VERSION / 100 % 1000 >= 53)) mutable boost::atomic<int> refcount_; -#else - // boost::atomic not available before 1.53 - // This section will be removed when support for boost 1.48 ceases - mutable boost::detail::atomic_count count_; -#endif protected: -#if ((BOOST_VERSION / 100000 >= 1) && (BOOST_VERSION / 100 % 1000 >= 53)) pmt_base() : refcount_(0) {}; -#else - // boost::atomic not available before 1.53 - // This section will be removed when support for boost 1.48 ceases - pmt_base() : count_(0) {}; -#endif virtual ~pmt_base(); public: diff --git a/gnuradio-runtime/lib/qa_logger.cc b/gnuradio-runtime/lib/qa_logger.cc index 904893cc4d..20d7392143 100644 --- a/gnuradio-runtime/lib/qa_logger.cc +++ b/gnuradio-runtime/lib/qa_logger.cc @@ -35,10 +35,6 @@ void qa_logger::t1() { -#ifdef ENABLE_GR_LOG - // This doesn't really test anything, more just - // making sure nothing's gone horribly wrong. - GR_LOG_GETLOGGER(LOG,"main"); GR_ADD_CONSOLE_APPENDER("main","cout","%d{%H:%M:%S} : %m%n"); GR_LOG_NOTICE(LOG,"test from c++ NOTICE"); @@ -48,5 +44,4 @@ qa_logger::t1() GR_LOG_ERROR(LOG,"test from c++ ERROR"); GR_LOG_FATAL(LOG,"test from c++ FATAL"); CPPUNIT_ASSERT(true); -#endif } diff --git a/gnuradio-runtime/lib/scheduler_sts.cc b/gnuradio-runtime/lib/scheduler_sts.cc deleted file mode 100644 index 19d05b2316..0000000000 --- a/gnuradio-runtime/lib/scheduler_sts.cc +++ /dev/null @@ -1,90 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2008,2013 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#ifdef HAVE_CONFIG_H -#include <config.h> -#endif - -#include "scheduler_sts.h" -#include "single_threaded_scheduler.h" -#include <gnuradio/thread/thread_body_wrapper.h> - -namespace gr { - - class sts_container - { - block_vector_t d_blocks; - - public: - sts_container(block_vector_t blocks) - : d_blocks(blocks) {} - - void operator()() - { - make_single_threaded_scheduler(d_blocks)->run(); - } - }; - - scheduler_sptr - scheduler_sts::make(flat_flowgraph_sptr ffg, int max_noutput_items) - { - return scheduler_sptr(new scheduler_sts(ffg, max_noutput_items)); - } - - scheduler_sts::scheduler_sts(flat_flowgraph_sptr ffg, int max_noutput_items) - : scheduler(ffg, max_noutput_items) - { - // Split the flattened flow graph into discrete partitions, each - // of which is topologically sorted. - - std::vector<basic_block_vector_t> graphs = ffg->partition(); - - // For each partition, create a thread to evaluate it using - // an instance of the gr_single_threaded_scheduler - - for(std::vector<basic_block_vector_t>::iterator p = graphs.begin(); - p != graphs.end(); p++) { - - block_vector_t blocks = flat_flowgraph::make_block_vector(*p); - d_threads.create_thread( - gr::thread::thread_body_wrapper<sts_container>(sts_container(blocks), - "single-threaded-scheduler")); - } - } - - scheduler_sts::~scheduler_sts() - { - stop(); - } - - void - scheduler_sts::stop() - { - d_threads.interrupt_all(); - } - - void - scheduler_sts::wait() - { - d_threads.join_all(); - } - -} /* namespace gr */ diff --git a/gnuradio-runtime/lib/scheduler_sts.h b/gnuradio-runtime/lib/scheduler_sts.h deleted file mode 100644 index b4cddb4614..0000000000 --- a/gnuradio-runtime/lib/scheduler_sts.h +++ /dev/null @@ -1,66 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2008,2013 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License along - * with this program; if not, write to the Free Software Foundation, Inc., - * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - */ - -#ifndef INCLUDED_GR_SCHEDULER_STS_H -#define INCLUDED_GR_SCHEDULER_STS_H - -#include <gnuradio/api.h> -#include <gnuradio/thread/thread_group.h> -#include "scheduler.h" - -namespace gr { - - /*! - * \brief Concrete scheduler that uses the single_threaded_scheduler - */ - class GR_RUNTIME_API scheduler_sts : public scheduler - { - gr::thread::thread_group d_threads; - - protected: - /*! - * \brief Construct a scheduler and begin evaluating the graph. - * - * The scheduler will continue running until all blocks until they - * report that they are done or the stop method is called. - */ - scheduler_sts(flat_flowgraph_sptr ffg, int max_noutput_items); - - public: - static scheduler_sptr make(flat_flowgraph_sptr ffg, - int max_noutput_items); - - ~scheduler_sts(); - - /*! - * \brief Tell the scheduler to stop executing. - */ - void stop(); - - /*! - * \brief Block until the graph is done. - */ - void wait(); - }; - -} /* namespace gr */ - -#endif /* INCLUDED_GR_SCHEDULER_STS_H */ diff --git a/gnuradio-runtime/lib/single_threaded_scheduler.cc b/gnuradio-runtime/lib/single_threaded_scheduler.cc deleted file mode 100644 index c86d26ca3a..0000000000 --- a/gnuradio-runtime/lib/single_threaded_scheduler.cc +++ /dev/null @@ -1,363 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2004 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with GNU Radio; see the file COPYING. If not, write to - * the Free Software Foundation, Inc., 51 Franklin Street, - * Boston, MA 02110-1301, USA. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include "single_threaded_scheduler.h" -#include <gnuradio/block.h> -#include <gnuradio/block_detail.h> -#include <gnuradio/buffer.h> -#include <boost/thread.hpp> -#include <boost/format.hpp> -#include <iostream> -#include <limits> -#include <assert.h> -#include <stdio.h> - -namespace gr { - - // must be defined to either 0 or 1 -#define ENABLE_LOGGING 0 - -#if (ENABLE_LOGGING) -#define LOG(x) do { x; } while(0) -#else -#define LOG(x) do {;} while(0) -#endif - - static int which_scheduler = 0; - - single_threaded_scheduler_sptr - make_single_threaded_scheduler(const std::vector<block_sptr> &blocks) - { - return single_threaded_scheduler_sptr - (new single_threaded_scheduler(blocks)); - } - - single_threaded_scheduler::single_threaded_scheduler(const std::vector<block_sptr> &blocks) - : d_blocks(blocks), d_enabled(true), d_log(0) - { - if(ENABLE_LOGGING) { - std::string name = str(boost::format("sst-%d.log") % which_scheduler++); - d_log = new std::ofstream(name.c_str()); - *d_log << "single_threaded_scheduler: " - << d_blocks.size () - << " blocks\n"; - } - } - - single_threaded_scheduler::~single_threaded_scheduler() - { - if(ENABLE_LOGGING) - delete d_log; - } - - void - single_threaded_scheduler::run() - { - // d_enabled = true; // KLUDGE - main_loop (); - } - - void - single_threaded_scheduler::stop() - { - if(0) - std::cout << "gr_singled_threaded_scheduler::stop() " - << this << std::endl; - d_enabled = false; - } - - inline static unsigned int - round_up(unsigned int n, unsigned int multiple) - { - return ((n + multiple - 1) / multiple) * multiple; - } - - inline static unsigned int - round_down(unsigned int n, unsigned int multiple) - { - return (n / multiple) * multiple; - } - - // - // Return minimum available write space in all our downstream - // buffers or -1 if we're output blocked and the output we're - // blocked on is done. - // - static int - min_available_space(block_detail *d, int output_multiple) - { - int min_space = std::numeric_limits<int>::max(); - - for(int i = 0; i < d->noutputs (); i++) { - int n = round_down (d->output(i)->space_available (), output_multiple); - if(n == 0) { // We're blocked on output. - if(d->output(i)->done()) { // Downstream is done, therefore we're done. - return -1; - } - return 0; - } - min_space = std::min (min_space, n); - } - return min_space; - } - - void - single_threaded_scheduler::main_loop() - { - static const int DEFAULT_CAPACITY = 16; - - int noutput_items; - gr_vector_int ninput_items_required(DEFAULT_CAPACITY); - gr_vector_int ninput_items(DEFAULT_CAPACITY); - gr_vector_const_void_star input_items(DEFAULT_CAPACITY); - gr_vector_void_star output_items(DEFAULT_CAPACITY); - unsigned int bi; - unsigned int nalive; - int max_items_avail; - bool made_progress_last_pass; - bool making_progress; - - for(unsigned i = 0; i < d_blocks.size (); i++) - d_blocks[i]->detail()->set_done (false); // reset any done flags - - for(unsigned i = 0; i < d_blocks.size (); i++) // enable any drivers, etc. - d_blocks[i]->start(); - - bi = 0; - made_progress_last_pass = true; - making_progress = false; - - // Loop while there are still blocks alive - - nalive = d_blocks.size (); - while(d_enabled && nalive > 0) { - if(boost::this_thread::interruption_requested()) - break; - - block *m = d_blocks[bi].get (); - block_detail *d = m->detail().get (); - - LOG(*d_log << std::endl << m); - - if(d->done ()) - goto next_block; - - if(d->source_p ()) { - // Invoke sources as a last resort. As long as the previous - // pass made progress, don't call a source. - if(made_progress_last_pass) { - LOG(*d_log << " Skipping source\n"); - goto next_block; - } - - ninput_items_required.resize (0); - ninput_items.resize (0); - input_items.resize (0); - output_items.resize (d->noutputs ()); - - // determine the minimum available output space - noutput_items = min_available_space (d, m->output_multiple ()); - LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl); - if(noutput_items == -1) // we're done - goto were_done; - - if(noutput_items == 0) { // we're output blocked - LOG(*d_log << " BLKD_OUT\n"); - goto next_block; - } - - goto setup_call_to_work; // jump to common code - } - - else if(d->sink_p ()) { - ninput_items_required.resize (d->ninputs ()); - ninput_items.resize (d->ninputs ()); - input_items.resize (d->ninputs ()); - output_items.resize (0); - LOG(*d_log << " sink\n"); - - max_items_avail = 0; - for(int i = 0; i < d->ninputs (); i++) { - ninput_items[i] = d->input(i)->items_available(); - //if (ninput_items[i] == 0 && d->input(i)->done()) - if(ninput_items[i] < m->output_multiple() && d->input(i)->done()) - goto were_done; - - max_items_avail = std::max (max_items_avail, ninput_items[i]); - } - - // take a swag at how much output we can sink - noutput_items = (int) (max_items_avail * m->relative_rate ()); - noutput_items = round_down (noutput_items, m->output_multiple ()); - LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl); - LOG(*d_log << " noutput_items = " << noutput_items << std::endl); - - if(noutput_items == 0) { // we're blocked on input - LOG(*d_log << " BLKD_IN\n"); - goto next_block; - } - - goto try_again; // Jump to code shared with regular case. - } - - else { - // do the regular thing - ninput_items_required.resize(d->ninputs ()); - ninput_items.resize(d->ninputs ()); - input_items.resize(d->ninputs ()); - output_items.resize(d->noutputs ()); - - max_items_avail = 0; - for(int i = 0; i < d->ninputs (); i++) { - ninput_items[i] = d->input(i)->items_available (); - max_items_avail = std::max(max_items_avail, ninput_items[i]); - } - - // determine the minimum available output space - noutput_items = min_available_space(d, m->output_multiple ()); - if(ENABLE_LOGGING){ - *d_log << " regular "; - if(m->relative_rate() >= 1.0) - *d_log << "1:" << m->relative_rate() << std::endl; - else - *d_log << 1.0/m->relative_rate() << ":1\n"; - *d_log << " max_items_avail = " << max_items_avail << std::endl; - *d_log << " noutput_items = " << noutput_items << std::endl; - } - if(noutput_items == -1) // we're done - goto were_done; - - if(noutput_items == 0) { // we're output blocked - LOG(*d_log << " BLKD_OUT\n"); - goto next_block; - } - -#if 0 - // Compute best estimate of noutput_items that we can really use. - noutput_items = - std::min((unsigned)noutput_items, - std::max((unsigned)m->output_multiple(), - round_up((unsigned)(max_items_avail * m->relative_rate()), - m->output_multiple()))); - - LOG(*d_log << " revised noutput_items = " << noutput_items << std::endl); -#endif - - try_again: - if(m->fixed_rate()) { - // try to work it forward starting with max_items_avail. - // We want to try to consume all the input we've got. - int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail); - reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple()); - if(reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items) - noutput_items = reqd_noutput_items; - } - - // ask the block how much input they need to produce noutput_items - m->forecast(noutput_items, ninput_items_required); - - // See if we've got sufficient input available - int i; - for(i = 0; i < d->ninputs (); i++) - if(ninput_items_required[i] > ninput_items[i]) // not enough - break; - - if(i < d->ninputs()) { // not enough input on input[i] - // if we can, try reducing the size of our output request - if(noutput_items > m->output_multiple ()){ - noutput_items /= 2; - noutput_items = round_up (noutput_items, m->output_multiple ()); - goto try_again; - } - - // We're blocked on input - LOG(*d_log << " BLKD_IN\n"); - if(d->input(i)->done()) // If the upstream block is done, we're done - goto were_done; - - // Is it possible to ever fulfill this request? - if(ninput_items_required[i] > d->input(i)->max_possible_items_available ()) { - // Nope, never going to happen... - std::cerr << "\nsched: <block " << m->name() - << " (" << m->unique_id() << ")>" - << " is requesting more input data\n" - << " than we can provide.\n" - << " ninput_items_required = " - << ninput_items_required[i] << "\n" - << " max_possible_items_available = " - << d->input(i)->max_possible_items_available() << "\n" - << " If this is a filter, consider reducing the number of taps.\n"; - goto were_done; - } - - goto next_block; - } - - // We've got enough data on each input to produce noutput_items. - // Finish setting up the call to work. - for(int i = 0; i < d->ninputs (); i++) - input_items[i] = d->input(i)->read_pointer(); - - setup_call_to_work: - - for(int i = 0; i < d->noutputs (); i++) - output_items[i] = d->output(i)->write_pointer(); - - // Do the actual work of the block - int n = m->general_work(noutput_items, ninput_items, - input_items, output_items); - LOG(*d_log << " general_work: noutput_items = " << noutput_items - << " result = " << n << std::endl); - - if(n == -1) // block is done - goto were_done; - - d->produce_each(n); // advance write pointers - if(n > 0) - making_progress = true; - - goto next_block; - } - assert(0); - - were_done: - LOG(*d_log << " were_done\n"); - d->set_done (true); - nalive--; - - next_block: - if(++bi >= d_blocks.size ()) { - bi = 0; - made_progress_last_pass = making_progress; - making_progress = false; - } - } - - for(unsigned i = 0; i < d_blocks.size(); i++) // disable any drivers, etc. - d_blocks[i]->stop(); - } - -} /* namespace gr */ diff --git a/gnuradio-runtime/lib/single_threaded_scheduler.h b/gnuradio-runtime/lib/single_threaded_scheduler.h deleted file mode 100644 index eccbf03b36..0000000000 --- a/gnuradio-runtime/lib/single_threaded_scheduler.h +++ /dev/null @@ -1,65 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2004 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with GNU Radio; see the file COPYING. If not, write to - * the Free Software Foundation, Inc., 51 Franklin Street, - * Boston, MA 02110-1301, USA. - */ - -#ifndef INCLUDED_GR_SINGLE_THREADED_SCHEDULER_H -#define INCLUDED_GR_SINGLE_THREADED_SCHEDULER_H - -#include <gnuradio/api.h> -#include <gnuradio/runtime_types.h> -#include <fstream> - -namespace gr { - - class single_threaded_scheduler; - typedef boost::shared_ptr<single_threaded_scheduler> single_threaded_scheduler_sptr; - - /*! - * \brief Simple scheduler for stream computations. - * \ingroup internal - */ - class GR_RUNTIME_API single_threaded_scheduler - { - public: - ~single_threaded_scheduler(); - - void run(); - void stop(); - - private: - const std::vector<block_sptr> d_blocks; - volatile bool d_enabled; - std::ofstream *d_log; - - single_threaded_scheduler(const std::vector<block_sptr> &blocks); - - void main_loop(); - - friend GR_RUNTIME_API single_threaded_scheduler_sptr - make_single_threaded_scheduler(const std::vector<block_sptr> &blocks); - }; - - GR_RUNTIME_API single_threaded_scheduler_sptr - make_single_threaded_scheduler(const std::vector<block_sptr> &blocks); - -} /* namespace gr */ - -#endif /* INCLUDED_GR_SINGLE_THREADED_SCHEDULER_H */ diff --git a/gnuradio-runtime/lib/top_block_impl.cc b/gnuradio-runtime/lib/top_block_impl.cc index 3f94867bc2..d2a07e89ef 100644 --- a/gnuradio-runtime/lib/top_block_impl.cc +++ b/gnuradio-runtime/lib/top_block_impl.cc @@ -26,7 +26,6 @@ #include "top_block_impl.h" #include "flat_flowgraph.h" -#include "scheduler_sts.h" #include "scheduler_tpb.h" #include <gnuradio/top_block.h> #include <gnuradio/prefs.h> @@ -48,8 +47,7 @@ namespace gr { const char *name; scheduler_maker f; } scheduler_table[] = { - { "TPB", scheduler_tpb::make }, // first entry is default - { "STS", scheduler_sts::make } + { "TPB", scheduler_tpb::make } // first entry is default }; static scheduler_sptr diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc index 93591feee2..59f1d3162e 100644 --- a/gnuradio-runtime/lib/tpb_thread_body.cc +++ b/gnuradio-runtime/lib/tpb_thread_body.cc @@ -55,8 +55,6 @@ namespace gr { size_t max_nmsgs = static_cast<size_t>(p->get_long("DEFAULT", "max_messages", 100)); // Setup the logger for the scheduler -#ifdef ENABLE_GR_LOG -#ifdef HAVE_LOG4CPP #undef LOG std::string config_file = p->get_string("LOG", "log_config", ""); std::string log_level = p->get_string("LOG", "log_level", "off"); @@ -75,9 +73,6 @@ namespace gr { GR_LOG_SET_FILE_APPENDER(LOG, log_file , true,"%r :%p: %c{1} - %m%n"); } } -#endif /* HAVE_LOG4CPP */ -#endif /* ENABLE_GR_LOG */ - // Set thread affinity if it was set before fg was started. if(block->processor_affinity().size() > 0) { @@ -93,9 +88,10 @@ namespace gr { block->clear_finished(); while(1) { - tpb_loop_top: boost::this_thread::interruption_point(); + d->d_tpb.clear_changed(); + // handle any queued up messages BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { // Check if we have a message handler attached before getting @@ -116,97 +112,57 @@ namespace gr { } } - d->d_tpb.clear_changed(); // run one iteration if we are a connected stream block if(d->noutputs() >0 || d->ninputs()>0){ s = d_exec.run_one_iteration(); } else { s = block_executor::BLKD_IN; + // a msg port only block wants to shutdown + if(block->finished()) { + s = block_executor::DONE; + } } - // if msg ports think we are done, we are done - if(block->finished()) + if(block->finished() && s == block_executor::READY_NO_OUTPUT) { s = block_executor::DONE; + d->set_done(true); + } + + if(!d->ninputs() && s == block_executor::READY_NO_OUTPUT) { + s = block_executor::BLKD_IN; + } switch(s){ - case block_executor::READY: // Tell neighbors we made progress. + case block_executor::READY: // Tell neighbors we made progress. d->d_tpb.notify_neighbors(d); break; - case block_executor::READY_NO_OUTPUT: // Notify upstream only + case block_executor::READY_NO_OUTPUT: // Notify upstream only d->d_tpb.notify_upstream(d); break; - case block_executor::DONE: // Game over. + case block_executor::DONE: // Game over. block->notify_msg_neighbors(); d->d_tpb.notify_neighbors(d); return; - case block_executor::BLKD_IN: // Wait for input. + case block_executor::BLKD_IN: // Wait for input. { gr::thread::scoped_lock guard(d->d_tpb.mutex); - while(!d->d_tpb.input_changed) { - - // wait for input or message - while(!d->d_tpb.input_changed && block->empty_handled_p()){ - boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250); - if(!d->d_tpb.input_cond.timed_wait(guard, timeout)){ - goto tpb_loop_top; // timeout occurred (perform sanity checks up top) - } - } - // handle all pending messages - BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { - if(block->has_msg_handler(i.first)) { - while((msg = block->delete_head_nowait(i.first))) { - guard.unlock(); // release lock while processing msg - block->dispatch_msg(i.first, msg); - guard.lock(); - } - } - else { - // leave msg in queue if no handler is defined - // start dropping if we have too many - if(block->nmsgs(i.first) > max_nmsgs){ - GR_LOG_WARN(LOG,"asynchronous message buffer overflowing, dropping message"); - msg = block->delete_head_nowait(i.first); - } - } - } - if (d->done()) { - return; - } + if(!d->d_tpb.input_changed) { + boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(250); + d->d_tpb.input_cond.timed_wait(guard, timeout); } } break; - case block_executor::BLKD_OUT: // Wait for output buffer space. + case block_executor::BLKD_OUT: // Wait for output buffer space. { - gr::thread::scoped_lock guard(d->d_tpb.mutex); - while(!d->d_tpb.output_changed) { - // wait for output room or message - while(!d->d_tpb.output_changed && block->empty_handled_p()) - d->d_tpb.output_cond.wait(guard); - - // handle all pending messages - BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { - if(block->has_msg_handler(i.first)) { - while((msg = block->delete_head_nowait(i.first))) { - guard.unlock(); // release lock while processing msg - block->dispatch_msg(i.first, msg); - guard.lock(); - } - } - else { - // leave msg in queue if no handler is defined - // start dropping if we have too many - if(block->nmsgs(i.first) > max_nmsgs){ - GR_LOG_WARN(LOG,"asynchronous message buffer overflowing, dropping message"); - msg = block->delete_head_nowait(i.first); - } - } - } + gr::thread::scoped_lock guard(d->d_tpb.mutex); + while(!d->d_tpb.output_changed) { + d->d_tpb.output_cond.wait(guard); } } break; |