diff options
31 files changed, 112 insertions, 492 deletions
diff --git a/gnuradio-core/src/lib/general/gr_nop.cc b/gnuradio-core/src/lib/general/gr_nop.cc index edfe1d76d9..ca5983c397 100644 --- a/gnuradio-core/src/lib/general/gr_nop.cc +++ b/gnuradio-core/src/lib/general/gr_nop.cc @@ -40,8 +40,7 @@ gr_nop::gr_nop (size_t sizeof_stream_item) d_nmsgs_recvd(0) { // Arrange to have count_received_msgs called when messages are received. - message_port_register_in(pmt::mp("port")); - set_msg_handler(pmt::mp("port"), boost::bind(&gr_nop::count_received_msgs, this, _1)); + set_msg_handler(boost::bind(&gr_nop::count_received_msgs, this, _1)); } // Trivial message handler that just counts them. diff --git a/gnuradio-core/src/lib/runtime/CMakeLists.txt b/gnuradio-core/src/lib/runtime/CMakeLists.txt index 70938a0f17..5f3672dde9 100644 --- a/gnuradio-core/src/lib/runtime/CMakeLists.txt +++ b/gnuradio-core/src/lib/runtime/CMakeLists.txt @@ -54,7 +54,6 @@ list(APPEND gnuradio_core_sources ${CMAKE_CURRENT_SOURCE_DIR}/gr_block.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_detail.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_executor.cc - ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_registry.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2_detail.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_buffer.cc @@ -117,7 +116,6 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/gr_block.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_detail.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_executor.h - ${CMAKE_CURRENT_SOURCE_DIR}/gr_block_registry.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_hier_block2_detail.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_buffer.h diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc index 3d08b63d1a..d7263b92d0 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc @@ -25,9 +25,7 @@ #endif #include <gr_basic_block.h> -#include <gr_block_registry.h> #include <stdexcept> -#include <sstream> using namespace pmt; @@ -47,10 +45,7 @@ gr_basic_block::gr_basic_block(const std::string &name, d_input_signature(input_signature), d_output_signature(output_signature), d_unique_id(s_next_id++), - d_symbolic_id(global_block_registry.block_register(this)), - d_symbol_name(global_block_registry.register_symbolic_name(this)), - d_color(WHITE), - message_subscribers(pmt::pmt_make_dict()) + d_color(WHITE) { s_ncurrently_allocated++; } @@ -58,7 +53,6 @@ gr_basic_block::gr_basic_block(const std::string &name, gr_basic_block::~gr_basic_block() { s_ncurrently_allocated--; - global_block_registry.block_unregister(this); } gr_basic_block_sptr @@ -66,112 +60,3 @@ gr_basic_block::to_basic_block() { return shared_from_this(); } - -void -gr_basic_block::set_block_alias(std::string name) -{ - global_block_registry.register_symbolic_name(this, name); -} - -// ** Message passing interface ** - -// - register a new input message port -void gr_basic_block::message_port_register_in(pmt::pmt_t port_id){ - msg_queue[port_id] = msg_queue_t(); - } - -// - register a new output message port -void gr_basic_block::message_port_register_out(pmt::pmt_t port_id){ - if(!pmt::pmt_is_symbol(port_id)){ throw std::runtime_error("bad port id"); } - if(pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port already in use"); } - message_subscribers = pmt::pmt_dict_add(message_subscribers, port_id, pmt::PMT_NIL); - } - -// - publish a message on a message port -void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg){ - if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ throw std::runtime_error("port does not exist"); } - pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); - // iterate through subscribers on port - while( pmt::pmt_is_pair(currlist) ){ - pmt::pmt_t target = pmt::pmt_car(currlist); - - pmt::pmt_t block = pmt::pmt_car(target); - pmt::pmt_t port = pmt::pmt_cdr(target); - - currlist = pmt::pmt_cdr(currlist); - gr_basic_block_sptr blk = global_block_registry.block_lookup(block); - //blk->post(msg); - blk->post(port, msg); - } - } - -// - subscribe to a message port -void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){ - if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ - std::stringstream ss; - ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl; - throw std::runtime_error(ss.str()); - } - pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); - message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target)); - } - -void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){ - if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ - std::stringstream ss; - ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl; - throw std::runtime_error(ss.str()); - } - pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); - message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target)); - } - -void -gr_basic_block::_post(pmt_t which_port, pmt_t msg) -{ - insert_tail(which_port, msg); - //notify_msg(); -} - -void -gr_basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg) -{ - gruel::scoped_lock guard(mutex); - - msg_queue[which_port].push_back(msg); - - // wake up thread if BLKD_IN or BLKD_OUT - //input_cond.notify_one(); - //output_cond.notify_one(); - // TODO: reconsider the need for notification of input and output conditions! -} - -pmt_t -gr_basic_block::delete_head_nowait(pmt::pmt_t which_port) -{ - gruel::scoped_lock guard(mutex); - - if (empty_p(which_port)) - return pmt_t(); - - pmt_t m(msg_queue[which_port].front()); - msg_queue[which_port].pop_front(); - - return m; -} - -/* - * Caller must already be holding the mutex - */ -pmt_t -gr_basic_block::delete_head_nowait_already_holding_mutex(pmt::pmt_t which_port) -{ - if (empty_p(which_port)) - return pmt_t(); - - pmt_t m(msg_queue[which_port].front()); - msg_queue[which_port].pop_front(); - - return m; -} - diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h index 31026a2e48..cb6a983c48 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.h +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h @@ -30,11 +30,7 @@ #include <boost/function.hpp> #include <gr_msg_accepter.h> #include <string> -#include <deque> -#include <map> #include <gr_io_signature.h> -#include <gruel/thread.h> -#include <boost/foreach.hpp> /*! * \brief The abstract base class for all signal processing blocks. @@ -58,23 +54,14 @@ private: * The thread-safety guarantees mentioned in set_msg_handler are implemented * by the callers of this method. */ - void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) + void dispatch_msg(pmt::pmt_t msg) { - // AA Update this - if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler? - d_msg_handlers[which_port](msg); // Yes, invoke it. + if (d_msg_handler) // Is there a handler? + d_msg_handler(msg); // Yes, invoke it. }; - //msg_handler_t d_msg_handler; - typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t; - d_msg_handlers_t d_msg_handlers; + msg_handler_t d_msg_handler; - typedef std::deque<pmt::pmt_t> msg_queue_t; - typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t; - msg_queue_map_t msg_queue; - gruel::mutex mutex; //< protects all vars - - protected: friend class gr_flowgraph; friend class gr_flat_flowgraph; // TODO: will be redundant @@ -86,9 +73,6 @@ protected: gr_io_signature_sptr d_input_signature; gr_io_signature_sptr d_output_signature; long d_unique_id; - long d_symbolic_id; - std::string d_symbol_name; - std::string d_symbol_alias; vcolor d_color; gr_basic_block(void){} //allows pure virtual interface sub-classes @@ -114,65 +98,13 @@ protected: void set_color(vcolor color) { d_color = color; } vcolor color() const { return d_color; } - // Message passing interface - std::vector<pmt::pmt_t> message_inputs; - pmt::pmt_t message_subscribers; - public: virtual ~gr_basic_block(); long unique_id() const { return d_unique_id; } - long symbolic_id() const { return d_symbolic_id; } std::string name() const { return d_name; } - std::string symbol_name() const { return d_symbol_name; } gr_io_signature_sptr input_signature() const { return d_input_signature; } gr_io_signature_sptr output_signature() const { return d_output_signature; } gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion - bool alias_set() { return !d_symbol_alias.empty(); } - std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); } - pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); } - void set_block_alias(std::string name); - - // ** Message passing interface ** - void message_port_register_in(pmt::pmt_t port_id); - void message_port_register_out(pmt::pmt_t port_id); - void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg); - void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target); - void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target); - - /*! - * Accept msg, place in queue, arrange for thread to be awakened if it's not already. - */ - void _post(pmt::pmt_t which_port, pmt::pmt_t msg); - - - //! is the queue empty? - //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); } - bool empty_p(pmt::pmt_t which_port) { return msg_queue[which_port].empty(); } - bool empty_p() { - bool rv = true; - BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); } - return rv; - } - - //| Acquires and release the mutex - void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg); - /*! - * \returns returns pmt at head of queue or pmt_t() if empty. - */ - pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port); - /*! - * \returns returns pmt at head of queue or pmt_t() if empty. - * Caller must already be holding the mutex - */ - pmt::pmt_t delete_head_nowait_already_holding_mutex( pmt::pmt_t which_port); - - msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){ - return msg_queue[which_port].begin(); - } - void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){ - msg_queue[which_port].erase(it); - } - /*! * \brief Confirm that ninputs and noutputs is an acceptable combination. @@ -215,13 +147,8 @@ public: * If the block inherits from gr_hier_block2, the runtime system will * ensure that no reentrant calls are made to msg_handler. */ - //template <typename T> void set_msg_handler(T msg_handler){ - // d_msg_handler = msg_handler_t(msg_handler); - //} - template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){ - if(msg_queue.find(which_port) == msg_queue.end()){ - throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); } - d_msg_handlers[which_port] = msg_handler_t(msg_handler); + template <typename T> void set_msg_handler(T msg_handler){ + d_msg_handler = msg_handler_t(msg_handler); } }; diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.i b/gnuradio-core/src/lib/runtime/gr_basic_block.i index 0a8473ba27..e43cc114c9 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.i +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.i @@ -37,14 +37,11 @@ protected: public: virtual ~gr_basic_block(); std::string name() const; - std::string symbol_name() const; gr_io_signature_sptr input_signature() const; gr_io_signature_sptr output_signature() const; long unique_id() const; gr_basic_block_sptr to_basic_block(); bool check_topology (int ninputs, int noutputs); - std::string alias(); - void set_block_alias(std::string name); }; %rename(block_ncurrently_allocated) gr_basic_block_ncurrently_allocated; diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc index 337c9518ef..2792cd4713 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc @@ -127,6 +127,12 @@ gr_block_detail::produce_each (int how_many_items) } +void +gr_block_detail::_post(pmt_t msg) +{ + d_tpb.insert_tail(msg); +} + uint64_t gr_block_detail::nitems_read(unsigned int which_input) { diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h index 16d9f4d42e..c96f00db8b 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h @@ -85,6 +85,11 @@ class GR_CORE_API gr_block_detail { */ void produce_each (int how_many_items); + /*! + * Accept msg, place in queue, arrange for thread to be awakened if it's not already. + */ + void _post(pmt::pmt_t msg); + // Return the number of items read on input stream which_input uint64_t nitems_read(unsigned int which_input); diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.cc b/gnuradio-core/src/lib/runtime/gr_block_registry.cc deleted file mode 100644 index 2478e00196..0000000000 --- a/gnuradio-core/src/lib/runtime/gr_block_registry.cc +++ /dev/null @@ -1,58 +0,0 @@ -#include <gr_basic_block.h> -#include <gr_block_registry.h> - -gr_block_registry global_block_registry; - -gr_block_registry::gr_block_registry(){ - d_ref_map = pmt::pmt_make_dict(); -} - -long gr_block_registry::block_register(gr_basic_block* block){ - if(d_map.find(block->name()) == d_map.end()){ - d_map[block->name()] = blocksubmap_t(); - d_map[block->name()][0] = block; - return 0; - } else { - for(size_t i=0; i<=d_map[block->name()].size(); i++){ - if(d_map[block->name()].find(i) == d_map[block->name()].end()){ - d_map[block->name()][i] = block; - return i; - } - } - } - throw std::runtime_error("should not reach this"); -} - -void gr_block_registry::block_unregister(gr_basic_block* block){ - d_map[block->name()].erase( d_map[block->name()].find(block->symbolic_id())); - d_ref_map = pmt::pmt_dict_delete(d_ref_map, pmt::pmt_intern(block->symbol_name())); - if(block->alias_set()){ - d_ref_map = pmt::pmt_dict_delete(d_ref_map, pmt::pmt_intern(block->alias())); - } -} - -std::string gr_block_registry::register_symbolic_name(gr_basic_block* block){ - std::stringstream ss; - ss << block->name() << block->symbolic_id(); - //std::cout << "register_symbolic_name: " << ss.str() << std::endl; - register_symbolic_name(block, ss.str()); - return ss.str(); -} - -void gr_block_registry::register_symbolic_name(gr_basic_block* block, std::string name){ - if(pmt_dict_has_key(d_ref_map, pmt::pmt_intern(name))){ - throw std::runtime_error("symbol already exists, can not re-use!"); - } - d_ref_map = pmt_dict_add(d_ref_map, pmt::pmt_intern(name), pmt::pmt_make_any(block)); -} - -gr_basic_block_sptr gr_block_registry::block_lookup(pmt::pmt_t symbol){ - pmt::pmt_t ref = pmt_dict_ref(d_ref_map, symbol, pmt::PMT_NIL); - if(pmt::pmt_eq(ref, pmt::PMT_NIL)){ - throw std::runtime_error("block lookup failed! block not found!"); - } - gr_basic_block* blk = boost::any_cast<gr_basic_block*>( pmt::pmt_any_ref(ref) ); - return blk->shared_from_this(); -} - - diff --git a/gnuradio-core/src/lib/runtime/gr_block_registry.h b/gnuradio-core/src/lib/runtime/gr_block_registry.h deleted file mode 100644 index 8f1982984f..0000000000 --- a/gnuradio-core/src/lib/runtime/gr_block_registry.h +++ /dev/null @@ -1,36 +0,0 @@ -#ifndef GR_BLOCK_REGISTRY_H -#define GR_BLOCK_REGISTRY_H - -#include <map> - -#ifndef GR_BASIC_BLOCK_H -class gr_basic_block; -#endif - -class gr_block_registry { - public: - gr_block_registry(); - - long block_register(gr_basic_block* block); - void block_unregister(gr_basic_block* block); - - std::string register_symbolic_name(gr_basic_block* block); - void register_symbolic_name(gr_basic_block* block, std::string name); - - gr_basic_block_sptr block_lookup(pmt::pmt_t symbol); - - private: - - //typedef std::map< long, gr_basic_block_sptr > blocksubmap_t; - typedef std::map< long, gr_basic_block* > blocksubmap_t; - typedef std::map< std::string, blocksubmap_t > blockmap_t; - - blockmap_t d_map; - pmt::pmt_t d_ref_map; - -}; - -extern gr_block_registry global_block_registry; - -#endif - diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc index a19bfe1954..756852df80 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc @@ -81,36 +81,6 @@ gr_hier_block2::connect(gr_basic_block_sptr src, int src_port, } void -gr_hier_block2::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, - gr_basic_block_sptr dst, pmt::pmt_t dstport) -{ - if(!pmt::pmt_is_symbol(srcport)){throw std::runtime_error("bad port id"); } - d_detail->msg_connect(src, srcport, dst, dstport); -} - -void -gr_hier_block2::msg_connect(gr_basic_block_sptr src, std::string srcport, - gr_basic_block_sptr dst, std::string dstport) -{ - d_detail->msg_connect(src, pmt::mp(srcport), dst, pmt::mp(dstport)); -} - -void -gr_hier_block2::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport, - gr_basic_block_sptr dst, pmt::pmt_t dstport) -{ - if(!pmt::pmt_is_symbol(srcport)){throw std::runtime_error("bad port id"); } - d_detail->msg_disconnect(src, srcport, dst, dstport); -} - -void -gr_hier_block2::msg_disconnect(gr_basic_block_sptr src, std::string srcport, - gr_basic_block_sptr dst, std::string dstport) -{ - d_detail->msg_disconnect(src, pmt::mp(srcport), dst, pmt::mp(dstport)); -} - -void gr_hier_block2::disconnect(gr_basic_block_sptr block) { d_detail->disconnect(block); diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.h b/gnuradio-core/src/lib/runtime/gr_hier_block2.h index e8364a740b..1231787241 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2.h +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.h @@ -98,21 +98,6 @@ public: gr_basic_block_sptr dst, int dst_port); /*! - * \brief Add gr-blocks or hierarchical blocks to internal graph and wire together - * - * This adds (if not done earlier by another connect) a pair of gr-blocks or - * hierarchical blocks to the internal message port subscription - */ - void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, - gr_basic_block_sptr dst, pmt::pmt_t dstport); - void msg_connect(gr_basic_block_sptr src, std::string srcport, - gr_basic_block_sptr dst, std::string dstport); - void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport, - gr_basic_block_sptr dst, pmt::pmt_t dstport); - void msg_disconnect(gr_basic_block_sptr src, std::string srcport, - gr_basic_block_sptr dst, std::string dstport); - - /*! * \brief Remove a gr-block or hierarchical block from the internal flowgraph. * * This removes a gr-block or hierarchical block from the internal flowgraph, diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.i b/gnuradio-core/src/lib/runtime/gr_hier_block2.i index 7c0e62f288..eefb965b4e 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2.i +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.i @@ -38,8 +38,6 @@ gr_hier_block2_sptr gr_make_hier_block2(const std::string name, // better interface in scripting land. %rename(primitive_connect) gr_hier_block2::connect; %rename(primitive_disconnect) gr_hier_block2::disconnect; -%rename(primitive_msg_connect) gr_hier_block2::msg_connect; -%rename(primitive_msg_disconnect) gr_hier_block2::msg_disconnect; class gr_hier_block2 : public gr_basic_block { @@ -56,19 +54,6 @@ public: void connect(gr_basic_block_sptr src, int src_port, gr_basic_block_sptr dst, int dst_port) throw (std::invalid_argument); - void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, - gr_basic_block_sptr dst, pmt::pmt_t dstport) - throw (std::runtime_error); - void msg_connect(gr_basic_block_sptr src, std::string srcport, - gr_basic_block_sptr dst, std::string dstport) - throw (std::runtime_error); - void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport, - gr_basic_block_sptr dst, pmt::pmt_t dstport) - throw (std::runtime_error); - void msg_disconnect(gr_basic_block_sptr src, std::string srcport, - gr_basic_block_sptr dst, std::string dstport) - throw (std::runtime_error); - void disconnect(gr_basic_block_sptr block) throw (std::invalid_argument); void disconnect(gr_basic_block_sptr src, int src_port, diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc index 099b2f8e88..76c5ce06fe 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc @@ -144,28 +144,6 @@ gr_hier_block2_detail::connect(gr_basic_block_sptr src, int src_port, } void -gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, - gr_basic_block_sptr dst, pmt::pmt_t dstport) -{ - if (GR_HIER_BLOCK2_DETAIL_DEBUG) - std::cout << "connecting message port..." << std::endl; - - // register the subscription - src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport)); -} - -void -gr_hier_block2_detail::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport, - gr_basic_block_sptr dst, pmt::pmt_t dstport) -{ - if (GR_HIER_BLOCK2_DETAIL_DEBUG) - std::cout << "disconnecting message port..." << std::endl; - - // register the subscription - src->message_port_unsub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport)); -} - -void gr_hier_block2_detail::disconnect(gr_basic_block_sptr block) { // Check on singleton list diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h index f2d2b3c4e8..f4f950e9de 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h @@ -39,10 +39,6 @@ public: void connect(gr_basic_block_sptr block); void connect(gr_basic_block_sptr src, int src_port, gr_basic_block_sptr dst, int dst_port); - void msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, - gr_basic_block_sptr dst, pmt::pmt_t dstport); - void msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcport, - gr_basic_block_sptr dst, pmt::pmt_t dstport); void disconnect(gr_basic_block_sptr block); void disconnect(gr_basic_block_sptr, int src_port, gr_basic_block_sptr, int dst_port); diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc index 93d5fb20e8..5018ee9e69 100644 --- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc +++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc @@ -41,12 +41,12 @@ gr_msg_accepter::~gr_msg_accepter() } void -gr_msg_accepter::post(pmt_t which_port, pmt_t msg) +gr_msg_accepter::post(pmt_t msg) { // Notify derived class, handled case by case gr_block *p = dynamic_cast<gr_block *>(this); if (p) { - p->_post(which_port,msg); + p->detail()->_post(msg); return; } gr_hier_block2 *p2 = dynamic_cast<gr_hier_block2 *>(this); diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h index a497ba6e7d..3e5c976388 100644 --- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h +++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h @@ -36,7 +36,7 @@ public: gr_msg_accepter(); ~gr_msg_accepter(); - void post(pmt::pmt_t which_port, pmt::pmt_t msg); + void post(pmt::pmt_t msg); }; diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc index 46eb6bbe0d..46b33d91fd 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc @@ -68,3 +68,43 @@ gr_tpb_detail::notify_neighbors(gr_block_detail *d) notify_upstream(d); } +void +gr_tpb_detail::insert_tail(pmt::pmt_t msg) +{ + gruel::scoped_lock guard(mutex); + + msg_queue.push_back(msg); + + // wake up thread if BLKD_IN or BLKD_OUT + input_cond.notify_one(); + output_cond.notify_one(); +} + +pmt_t +gr_tpb_detail::delete_head_nowait() +{ + gruel::scoped_lock guard(mutex); + + if (empty_p()) + return pmt_t(); + + pmt_t m(msg_queue.front()); + msg_queue.pop_front(); + + return m; +} + +/* + * Caller must already be holding the mutex + */ +pmt_t +gr_tpb_detail::delete_head_nowait_already_holding_mutex() +{ + if (empty_p()) + return pmt_t(); + + pmt_t m(msg_queue.front()); + msg_queue.pop_front(); + + return m; +} diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h index 69feb60073..b6e342dee3 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h @@ -39,6 +39,9 @@ struct GR_CORE_API gr_tpb_detail { bool output_changed; gruel::condition_variable output_cond; +private: + std::deque<pmt::pmt_t> msg_queue; + public: gr_tpb_detail() : input_changed(false), output_changed(false) { } @@ -52,12 +55,6 @@ public: //! Called by us to notify both upstream and downstream void notify_neighbors(gr_block_detail *d); - //! Called by pmt msg posters - void notify_msg(){ - input_cond.notify_one(); - output_cond.notify_one(); - } - //! Called by us void clear_changed() { @@ -66,6 +63,23 @@ public: output_changed = false; } + //! is the queue empty? + bool empty_p() const { return msg_queue.empty(); } + + //| Acquires and release the mutex + void insert_tail(pmt::pmt_t msg); + + /*! + * \returns returns pmt at head of queue or pmt_t() if empty. + */ + pmt::pmt_t delete_head_nowait(); + + /*! + * \returns returns pmt at head of queue or pmt_t() if empty. + * Caller must already be holding the mutex + */ + pmt::pmt_t delete_head_nowait_already_holding_mutex(); + private: //! Used by notify_downstream diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc index 1bd3014adc..a5aabb379f 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc @@ -25,7 +25,6 @@ #include <iostream> #include <boost/thread.hpp> #include <gruel/pmt.h> -#include <boost/foreach.hpp> using namespace pmt; @@ -43,14 +42,8 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item boost::this_thread::interruption_point(); // handle any queued up messages - //BOOST_FOREACH( pmt::pmt_t port, block->msg_queue.keys() ) - - BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue ) - { - while ((msg = block->delete_head_nowait(i.first))){ - block->dispatch_msg(i.first,msg); - } - } + while ((msg = d->d_tpb.delete_head_nowait())) + block->dispatch_msg(msg); d->d_tpb.clear_changed(); s = d_exec.run_one_iteration(); @@ -74,18 +67,15 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item while (!d->d_tpb.input_changed){ // wait for input or message - while(!d->d_tpb.input_changed && block->empty_p()) + while(!d->d_tpb.input_changed && d->d_tpb.empty_p()) d->d_tpb.input_cond.wait(guard); // handle all pending messages - BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue ) - { - while ((msg = block->delete_head_nowait_already_holding_mutex(i.first))){ - guard.unlock(); // release lock while processing msg - block->dispatch_msg(i.first, msg); - guard.lock(); - } - } + while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){ + guard.unlock(); // release lock while processing msg + block->dispatch_msg(msg); + guard.lock(); + } } } break; @@ -97,18 +87,15 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item while (!d->d_tpb.output_changed){ // wait for output room or message - while(!d->d_tpb.output_changed && block->empty_p()) + while(!d->d_tpb.output_changed && d->d_tpb.empty_p()) d->d_tpb.output_cond.wait(guard); // handle all pending messages - BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue ) - { - while ((msg = block->delete_head_nowait_already_holding_mutex(i.first))){ - guard.unlock(); // release lock while processing msg - block->dispatch_msg(i.first,msg); - guard.lock(); - } - } + while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){ + guard.unlock(); // release lock while processing msg + block->dispatch_msg(msg); + guard.lock(); + } } } break; diff --git a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc index dc8f0f8a95..25ae0b1e19 100644 --- a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc +++ b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc @@ -65,15 +65,14 @@ void qa_set_msg_handler::t0() tb->start(); // Send them... - pmt_t port(pmt_intern("port")); for (int i = 0; i < NMSGS; i++){ - send(nop, port, mp(mp("example-msg"), mp(i))); + send(nop, mp(mp("example-msg"), mp(i))); } // And send a message to null_source to confirm that the default // message handling action (which should be a nop) doesn't dump // core. - send(src, port, mp(mp("example-msg"), mp(0))); + send(src, mp(mp("example-msg"), mp(0))); // Give the messages a chance to be processed boost::this_thread::sleep(boost::posix_time::milliseconds(100)); diff --git a/gnuradio-core/src/python/gnuradio/gr/top_block.py b/gnuradio-core/src/python/gnuradio/gr/top_block.py index 947e46bc55..2d315a2780 100644 --- a/gnuradio-core/src/python/gnuradio/gr/top_block.py +++ b/gnuradio-core/src/python/gnuradio/gr/top_block.py @@ -129,12 +129,6 @@ class top_block(object): for i in range (1, len (points)): self._connect(points[i-1], points[i]) - def msg_connect(self, src, srcport, dst, dstport): - self.primitive_msg_connect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport); - - def msg_disconnect(self, src, srcport, dst, dstport): - self.primitive_msg_disconnect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport); - def _connect(self, src, dst): (src_block, src_port) = self._coerce_endpoint(src) (dst_block, dst_port) = self._coerce_endpoint(dst) diff --git a/grc/python/Connection.py b/grc/python/Connection.py index 341dd2d821..218baf0743 100644 --- a/grc/python/Connection.py +++ b/grc/python/Connection.py @@ -31,9 +31,6 @@ class Connection(_Connection, _GUIConnection): def is_msg(self): return self.get_source().get_type() == self.get_sink().get_type() == 'msg' - def is_message(self): - return self.get_source().get_type() == self.get_sink().get_type() == 'message' - def validate(self): """ Validate the connections. diff --git a/grc/python/Constants.py b/grc/python/Constants.py index 09c3081967..1a65caf1c0 100644 --- a/grc/python/Constants.py +++ b/grc/python/Constants.py @@ -58,7 +58,6 @@ CORE_TYPES = ( #name, key, sizeof, color ('Integer 16', 's16', 2, '#FFFF66'), ('Integer 8', 's8', 1, '#FF66FF'), ('Message Queue', 'msg', 0, '#777777'), - ('Async Message', 'message', 0, '#777777'), ('Wildcard', '', 0, '#FFFFFF'), ) diff --git a/grc/python/Generator.py b/grc/python/Generator.py index 912ce13752..f8490227a7 100644 --- a/grc/python/Generator.py +++ b/grc/python/Generator.py @@ -122,9 +122,8 @@ Add a Misc->Throttle block to your flow graph to avoid CPU congestion.''') #list of regular blocks (all blocks minus the special ones) blocks = filter(lambda b: b not in (imports + parameters), blocks) #list of connections where each endpoint is enabled - connections = filter(lambda c: not (c.is_msg() or c.is_message()), self._flow_graph.get_enabled_connections()) + connections = filter(lambda c: not c.is_msg(), self._flow_graph.get_enabled_connections()) messages = filter(lambda c: c.is_msg(), self._flow_graph.get_enabled_connections()) - messages2 = filter(lambda c: c.is_message(), self._flow_graph.get_enabled_connections()) #list of variable names var_ids = [var.get_id() for var in parameters + variables] #prepend self. @@ -149,7 +148,6 @@ Add a Misc->Throttle block to your flow graph to avoid CPU congestion.''') 'blocks': blocks, 'connections': connections, 'messages': messages, - 'messages2': messages2, 'generate_options': self._generate_options, 'var_id2cbs': var_id2cbs, } diff --git a/grc/python/flow_graph.tmpl b/grc/python/flow_graph.tmpl index 086c478c6a..6e504815a6 100644 --- a/grc/python/flow_graph.tmpl +++ b/grc/python/flow_graph.tmpl @@ -192,17 +192,6 @@ gr.io_signaturev($(len($io_sigs)), $(len($io_sigs)), [$(', '.join($size_strs))]) self.connect($make_port_sig($source), $make_port_sig($sink)) #end if #end for -######################################################## -##Create Asynch Message Connections -######################################################## -#if $messages2 - $DIVIDER - # Asynch Message Connections - $DIVIDER -#end if -#for $msg in $messages2 - self.msg_connect(self.$msg.get_source().get_parent().get_id(), "$msg.get_source().get_name()", self.$msg.get_sink().get_parent().get_id(), "$msg.get_sink().get_name()") -#end for ######################################################## # QT sink close method reimplementation diff --git a/gruel/src/include/gruel/msg_accepter.h b/gruel/src/include/gruel/msg_accepter.h index 65abd5a6b8..2dc1a68592 100644 --- a/gruel/src/include/gruel/msg_accepter.h +++ b/gruel/src/include/gruel/msg_accepter.h @@ -43,7 +43,7 @@ namespace gruel { * call will not wait for the message either to arrive at the * destination or to be received. */ - virtual void post(pmt::pmt_t which_port, pmt::pmt_t msg) = 0; + virtual void post(pmt::pmt_t msg) = 0; }; typedef boost::shared_ptr<msg_accepter> msg_accepter_sptr; diff --git a/gruel/src/include/gruel/msg_passing.h b/gruel/src/include/gruel/msg_passing.h index 7230dfc5b2..0cc0cd1111 100644 --- a/gruel/src/include/gruel/msg_passing.h +++ b/gruel/src/include/gruel/msg_passing.h @@ -45,9 +45,9 @@ namespace gruel { * \returns msg */ static inline pmt::pmt_t - send(msg_accepter_sptr accepter, const pmt::pmt_t &which_port, const pmt::pmt_t &msg) + send(msg_accepter_sptr accepter, const pmt::pmt_t &msg) { - accepter->post(which_port, msg); + accepter->post(msg); return msg; } @@ -64,9 +64,9 @@ namespace gruel { * \returns msg */ static inline pmt::pmt_t - send(msg_accepter *accepter, const pmt::pmt_t &which_port, const pmt::pmt_t &msg) + send(msg_accepter *accepter, const pmt::pmt_t &msg) { - accepter->post(which_port, msg); + accepter->post(msg); return msg; } @@ -83,9 +83,9 @@ namespace gruel { * \returns msg */ static inline pmt::pmt_t - send(msg_accepter &accepter, const pmt::pmt_t &which_port, const pmt::pmt_t &msg) + send(msg_accepter &accepter, const pmt::pmt_t &msg) { - accepter.post(which_port, msg); + accepter.post(msg); return msg; } @@ -102,9 +102,9 @@ namespace gruel { * \returns msg */ static inline pmt::pmt_t - send(pmt::pmt_t accepter, const pmt::pmt_t &which_port, const pmt::pmt_t &msg) + send(pmt::pmt_t accepter, const pmt::pmt_t &msg) { - return send(pmt_msg_accepter_ref(accepter), which_port, msg); + return send(pmt_msg_accepter_ref(accepter), msg); } } /* namespace gruel */ diff --git a/gruel/src/include/gruel/pmt.h b/gruel/src/include/gruel/pmt.h index a462155c5f..1e8b38627f 100644 --- a/gruel/src/include/gruel/pmt.h +++ b/gruel/src/include/gruel/pmt.h @@ -729,10 +729,6 @@ GRUEL_API pmt_t pmt_list6(const pmt_t& x1, const pmt_t& x2, const pmt_t& x3, con */ GRUEL_API pmt_t pmt_list_add(pmt_t list, const pmt_t& item); -/*! - * \brief Return \p list with \p item removed from it. - */ -GRUEL_API pmt_t pmt_list_rm(pmt_t list, const pmt_t& item); /* * ------------------------------------------------------------------------ @@ -809,15 +805,6 @@ GRUEL_API std::string pmt_serialize_str(pmt_t obj); */ GRUEL_API pmt_t pmt_deserialize_str(std::string str); -/*! - * \brief Provide a comparator function object to allow pmt use in stl types - */ -class pmt_comperator { - public: - bool operator()(pmt::pmt_t const& p1, pmt::pmt_t const& p2) const - { return pmt::pmt_eqv(p1,p2)?false:p1.get()>p2.get(); } - }; - } /* namespace pmt */ #include <gruel/pmt_sugar.h> diff --git a/gruel/src/lib/pmt/pmt.cc b/gruel/src/lib/pmt/pmt.cc index 3eb39ed7b1..1d9125d4e0 100644 --- a/gruel/src/lib/pmt/pmt.cc +++ b/gruel/src/lib/pmt/pmt.cc @@ -1325,22 +1325,6 @@ pmt_list_add(pmt_t list, const pmt_t& item) } pmt_t -pmt_list_rm(pmt_t list, const pmt_t& item) -{ - if(pmt_is_pair(list)){ - pmt_t left = pmt_car(list); - pmt_t right = pmt_cdr(list); - if(!pmt_equal(left, item)){ - return pmt_cons(left, pmt_list_rm(right, item)); - } else { - return pmt_list_rm(right, item); - } - } else { - return list; - } -} - -pmt_t pmt_caar(pmt_t pair) { return (pmt_car(pmt_car(pair))); diff --git a/gruel/src/lib/pmt/qa_pmt_prims.cc b/gruel/src/lib/pmt/qa_pmt_prims.cc index 1bf5fcfb16..6212b8ea4f 100644 --- a/gruel/src/lib/pmt/qa_pmt_prims.cc +++ b/gruel/src/lib/pmt/qa_pmt_prims.cc @@ -472,7 +472,7 @@ class qa_pmt_msg_accepter_nop : public gruel::msg_accepter { public: qa_pmt_msg_accepter_nop(){}; ~qa_pmt_msg_accepter_nop(); - void post(pmt_t,pmt_t){}; + void post(pmt_t){}; }; qa_pmt_msg_accepter_nop::~qa_pmt_msg_accepter_nop(){} @@ -495,10 +495,9 @@ qa_pmt_prims::test_msg_accepter() CPPUNIT_ASSERT_THROW(pmt_msg_accepter_ref(p0), pmt_wrong_type); // just confirm interfaces on send are OK - pmt_t port(pmt_intern("port")); - gruel::send(ma0.get(), port, sym); - gruel::send(ma0, port, sym); - gruel::send(p1, port, sym); + gruel::send(ma0.get(), sym); + gruel::send(ma0, sym); + gruel::send(p1, sym); } diff --git a/gruel/src/swig/pmt_swig.i b/gruel/src/swig/pmt_swig.i index d46143424b..45cfceadcb 100644 --- a/gruel/src/swig/pmt_swig.i +++ b/gruel/src/swig/pmt_swig.i @@ -696,10 +696,6 @@ pmt_t pmt_list6(const pmt_t& x1, const pmt_t& x2, const pmt_t& x3, const pmt_t& */ pmt_t pmt_list_add(pmt_t list, const pmt_t& item); -/*! - * \brief Return \p list with \p item removed - */ -pmt_t pmt_list_rm(pmt_t list, const pmt_t& item); /* * ------------------------------------------------------------------------ |