/* -*- c++ -*- */ /* * Copyright 2006,2012-2013 Free Software Foundation, Inc. * * This file is part of GNU Radio * * SPDX-License-Identifier: GPL-3.0-or-later * */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include <gnuradio/basic_block.h> #include <gnuradio/block_registry.h> #include <gnuradio/logger.h> #include <iostream> #include <sstream> #include <stdexcept> namespace gr { static long s_next_id = 0; static long s_ncurrently_allocated = 0; long basic_block_ncurrently_allocated() { return s_ncurrently_allocated; } basic_block::basic_block(const std::string& name, io_signature::sptr input_signature, io_signature::sptr output_signature) : d_name(name), d_input_signature(input_signature), d_output_signature(output_signature), d_unique_id(s_next_id++), d_symbolic_id(global_block_registry.block_register(this)), d_symbol_name(global_block_registry.register_symbolic_name(this)), d_color(WHITE), d_rpc_set(false), d_message_subscribers(pmt::make_dict()) { configure_default_loggers(d_logger, d_debug_logger, d_symbol_name); s_ncurrently_allocated++; } basic_block::~basic_block() { s_ncurrently_allocated--; global_block_registry.block_unregister(this); } basic_block_sptr basic_block::to_basic_block() { return shared_from_this(); } void basic_block::set_block_alias(std::string name) { // Only keep one alias'd name around for each block. If we don't // have an alias, add it; if we do, update the entry in the // registry. if (alias_set()) global_block_registry.update_symbolic_name(this, name); else global_block_registry.register_symbolic_name(this, name); // set the block's alias d_symbol_alias = name; update_logger_alias(symbol_name(), d_symbol_alias); } // ** Message passing interface ** // - register a new input message port void basic_block::message_port_register_in(pmt::pmt_t port_id) { if (!pmt::is_symbol(port_id)) { throw std::runtime_error("message_port_register_in: bad port id"); } msg_queue[port_id] = msg_queue_t(); msg_queue_ready[port_id] = std::shared_ptr<boost::condition_variable>(new boost::condition_variable()); } pmt::pmt_t basic_block::message_ports_in() { pmt::pmt_t port_names = pmt::make_vector(msg_queue.size(), pmt::PMT_NIL); msg_queue_map_itr itr = msg_queue.begin(); for (size_t i = 0; i < msg_queue.size(); i++) { pmt::vector_set(port_names, i, (*itr).first); itr++; } return port_names; } // - register a new output message port void basic_block::message_port_register_out(pmt::pmt_t port_id) { if (!pmt::is_symbol(port_id)) { throw std::runtime_error("message_port_register_out: bad port id"); } if (pmt::dict_has_key(d_message_subscribers, port_id)) { throw std::runtime_error("message_port_register_out: port already in use"); } d_message_subscribers = pmt::dict_add(d_message_subscribers, port_id, pmt::PMT_NIL); } pmt::pmt_t basic_block::message_ports_out() { size_t len = pmt::length(d_message_subscribers); pmt::pmt_t port_names = pmt::make_vector(len, pmt::PMT_NIL); pmt::pmt_t keys = pmt::dict_keys(d_message_subscribers); for (size_t i = 0; i < len; i++) { pmt::vector_set(port_names, i, pmt::nth(i, keys)); } return port_names; } // - publish a message on a message port void basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg) { if (!pmt::dict_has_key(d_message_subscribers, port_id)) { throw std::runtime_error("port does not exist"); } pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, port_id, pmt::PMT_NIL); // iterate through subscribers on port while (pmt::is_pair(currlist)) { pmt::pmt_t target = pmt::car(currlist); pmt::pmt_t block = pmt::car(target); pmt::pmt_t port = pmt::cdr(target); currlist = pmt::cdr(currlist); basic_block_sptr blk = global_block_registry.block_lookup(block); // blk->post(msg); blk->post(port, msg); } } // - subscribe to a message port void basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target) { if (!pmt::dict_has_key(d_message_subscribers, port_id)) { std::stringstream ss; ss << "Port does not exist: \"" << pmt::write_string(port_id) << "\" on block: " << pmt::write_string(target) << std::endl; throw std::runtime_error(ss.str()); } pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, port_id, pmt::PMT_NIL); // ignore re-adds of the same target if (!pmt::list_has(currlist, target)) d_message_subscribers = pmt::dict_add( d_message_subscribers, port_id, pmt::list_add(currlist, target)); } void basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target) { if (!pmt::dict_has_key(d_message_subscribers, port_id)) { std::stringstream ss; ss << "Port does not exist: \"" << pmt::write_string(port_id) << "\" on block: " << pmt::write_string(target) << std::endl; throw std::runtime_error(ss.str()); } // ignore unsubs of unknown targets pmt::pmt_t currlist = pmt::dict_ref(d_message_subscribers, port_id, pmt::PMT_NIL); d_message_subscribers = pmt::dict_add(d_message_subscribers, port_id, pmt::list_rm(currlist, target)); } void basic_block::_post(pmt::pmt_t which_port, pmt::pmt_t msg) { insert_tail(which_port, msg); } void basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg) { gr::thread::scoped_lock guard(mutex); if ((msg_queue.find(which_port) == msg_queue.end()) || (msg_queue_ready.find(which_port) == msg_queue_ready.end())) { GR_LOG_ERROR(d_logger, std::string("attempted insertion on invalid queue ") + pmt::symbol_to_string(which_port)); throw std::runtime_error("attempted to insert_tail on invalid queue!"); } msg_queue[which_port].push_back(msg); msg_queue_ready[which_port]->notify_one(); // wake up thread if BLKD_IN or BLKD_OUT global_block_registry.notify_blk(d_symbol_name); } pmt::pmt_t basic_block::delete_head_nowait(pmt::pmt_t which_port) { gr::thread::scoped_lock guard(mutex); if (empty_p(which_port)) { return pmt::pmt_t(); } pmt::pmt_t m(msg_queue[which_port].front()); msg_queue[which_port].pop_front(); return m; } pmt::pmt_t basic_block::message_subscribers(pmt::pmt_t port) { return pmt::dict_ref(d_message_subscribers, port, pmt::PMT_NIL); } } /* namespace gr */