From 92cfb0240005675f4e7a55a81552f4c7a5128cd8 Mon Sep 17 00:00:00 2001 From: Tim O'Shea <tim.oshea753@gmail.com> Date: Wed, 28 Nov 2012 15:15:58 -0800 Subject: core: adding msg_connect, updating msg interface, adding symbolic block names --- grc/python/Connection.py | 3 +++ grc/python/Constants.py | 1 + grc/python/Generator.py | 4 +++- grc/python/flow_graph.tmpl | 11 +++++++++++ 4 files changed, 18 insertions(+), 1 deletion(-) (limited to 'grc') diff --git a/grc/python/Connection.py b/grc/python/Connection.py index 218baf0743..341dd2d821 100644 --- a/grc/python/Connection.py +++ b/grc/python/Connection.py @@ -31,6 +31,9 @@ class Connection(_Connection, _GUIConnection): def is_msg(self): return self.get_source().get_type() == self.get_sink().get_type() == 'msg' + def is_message(self): + return self.get_source().get_type() == self.get_sink().get_type() == 'message' + def validate(self): """ Validate the connections. diff --git a/grc/python/Constants.py b/grc/python/Constants.py index 1a65caf1c0..09c3081967 100644 --- a/grc/python/Constants.py +++ b/grc/python/Constants.py @@ -58,6 +58,7 @@ CORE_TYPES = ( #name, key, sizeof, color ('Integer 16', 's16', 2, '#FFFF66'), ('Integer 8', 's8', 1, '#FF66FF'), ('Message Queue', 'msg', 0, '#777777'), + ('Async Message', 'message', 0, '#777777'), ('Wildcard', '', 0, '#FFFFFF'), ) diff --git a/grc/python/Generator.py b/grc/python/Generator.py index 2a6fe51d5d..616ea00fcb 100644 --- a/grc/python/Generator.py +++ b/grc/python/Generator.py @@ -116,8 +116,9 @@ Add a Misc->Throttle block to your flow graph to avoid CPU congestion.''') #list of regular blocks (all blocks minus the special ones) blocks = filter(lambda b: b not in (imports + parameters), blocks) #list of connections where each endpoint is enabled - connections = filter(lambda c: not c.is_msg(), self._flow_graph.get_enabled_connections()) + connections = filter(lambda c: not (c.is_msg() or c.is_message()), self._flow_graph.get_enabled_connections()) messages = filter(lambda c: c.is_msg(), self._flow_graph.get_enabled_connections()) + messages2 = filter(lambda c: c.is_message(), self._flow_graph.get_enabled_connections()) #list of variable names var_ids = [var.get_id() for var in parameters + variables] #prepend self. @@ -142,6 +143,7 @@ Add a Misc->Throttle block to your flow graph to avoid CPU congestion.''') 'blocks': blocks, 'connections': connections, 'messages': messages, + 'messages2': messages2, 'generate_options': self._generate_options, 'var_id2cbs': var_id2cbs, } diff --git a/grc/python/flow_graph.tmpl b/grc/python/flow_graph.tmpl index 17feb01f65..af55ad641a 100644 --- a/grc/python/flow_graph.tmpl +++ b/grc/python/flow_graph.tmpl @@ -189,6 +189,17 @@ gr.io_signaturev($(len($io_sigs)), $(len($io_sigs)), [$(', '.join($size_strs))]) self.connect($make_port_sig($source), $make_port_sig($sink)) #end if #end for +######################################################## +##Create Asynch Message Connections +######################################################## +#if $messages2 + $DIVIDER + # Asynch Message Connections + $DIVIDER +#end if +#for $msg in $messages2 + self.msg_connect(self.$msg.get_source().get_parent().get_id(), "$msg.get_source().get_name()", self.$msg.get_sink().get_parent().get_id(), "$msg.get_sink().get_name()") +#end for ######################################################## ##Create Callbacks -- cgit v1.2.3 From 6cc818260128df57c51a41e4e6aa459de5faf4fe Mon Sep 17 00:00:00 2001 From: Tim O'Shea <tim.oshea753@gmail.com> Date: Fri, 30 Nov 2012 22:31:43 -0800 Subject: core: gr_blocks can now have only message ports with no general_work() * msg only blocks now get thread context * added blocking msg queue delete call * added gr_message_strobe block * added grc definitions for message_debug, message_strobe, pdu_to_tagged_stream, tagged_stream_to_pdu. * allow message fan-in connections in GRC --- gnuradio-core/src/lib/general/CMakeLists.txt | 1 + gnuradio-core/src/lib/general/general.i | 2 + gnuradio-core/src/lib/general/gr_message_strobe.cc | 83 ++++++++++++++++++++++ gnuradio-core/src/lib/general/gr_message_strobe.h | 65 +++++++++++++++++ gnuradio-core/src/lib/general/gr_message_strobe.i | 30 ++++++++ gnuradio-core/src/lib/io/gr_message_debug.cc | 4 +- .../src/lib/io/gr_pdu_to_tagged_stream.cc | 7 +- gnuradio-core/src/lib/runtime/gr_basic_block.cc | 29 ++++++-- gnuradio-core/src/lib/runtime/gr_basic_block.h | 9 +++ gnuradio-core/src/lib/runtime/gr_block.cc | 9 +++ gnuradio-core/src/lib/runtime/gr_block.h | 2 +- gnuradio-core/src/lib/runtime/gr_flowgraph.cc | 9 +++ gnuradio-core/src/lib/runtime/gr_flowgraph.h | 3 + .../src/lib/runtime/gr_hier_block2_detail.cc | 9 +++ .../src/lib/runtime/gr_tpb_thread_body.cc | 9 ++- .../src/lib/runtime/qa_set_msg_handler.cc | 5 -- grc/blocks/block_tree.xml | 7 ++ grc/blocks/gr_message_debug.xml | 17 +++++ grc/blocks/gr_message_strobe.xml | 35 +++++++++ grc/blocks/gr_pdu_to_tagged_stream.xml | 40 +++++++++++ grc/blocks/gr_tagged_stream_to_pdu.xml | 40 +++++++++++ grc/python/Constants.py | 2 +- grc/python/Port.py | 2 +- gruel/src/include/gruel/msg_accepter.h | 2 +- 24 files changed, 401 insertions(+), 20 deletions(-) create mode 100644 gnuradio-core/src/lib/general/gr_message_strobe.cc create mode 100644 gnuradio-core/src/lib/general/gr_message_strobe.h create mode 100644 gnuradio-core/src/lib/general/gr_message_strobe.i create mode 100644 grc/blocks/gr_message_debug.xml create mode 100644 grc/blocks/gr_message_strobe.xml create mode 100644 grc/blocks/gr_pdu_to_tagged_stream.xml create mode 100644 grc/blocks/gr_tagged_stream_to_pdu.xml (limited to 'grc') diff --git a/gnuradio-core/src/lib/general/CMakeLists.txt b/gnuradio-core/src/lib/general/CMakeLists.txt index 074f583a74..4c99acfc36 100644 --- a/gnuradio-core/src/lib/general/CMakeLists.txt +++ b/gnuradio-core/src/lib/general/CMakeLists.txt @@ -299,6 +299,7 @@ set(gr_core_general_triple_threats gr_burst_tagger gr_correlate_access_code_tag_bb gr_tag_debug + gr_message_strobe ) foreach(file_tt ${gr_core_general_triple_threats}) diff --git a/gnuradio-core/src/lib/general/general.i b/gnuradio-core/src/lib/general/general.i index e5a9e970dd..1446088a2c 100644 --- a/gnuradio-core/src/lib/general/general.i +++ b/gnuradio-core/src/lib/general/general.i @@ -143,6 +143,7 @@ #include <gr_add_ff.h> #include <gr_vector_map.h> #include <gr_tag_debug.h> +#include <gr_message_strobe.h> %} %include "gri_control_loop.i" @@ -267,3 +268,4 @@ %include "gr_vector_map.i" %include "gr_tag_debug.i" %include "gr_block_gateway.i" +%include "gr_message_strobe.i" diff --git a/gnuradio-core/src/lib/general/gr_message_strobe.cc b/gnuradio-core/src/lib/general/gr_message_strobe.cc new file mode 100644 index 0000000000..371f472efd --- /dev/null +++ b/gnuradio-core/src/lib/general/gr_message_strobe.cc @@ -0,0 +1,83 @@ +/* -*- c++ -*- */ +/* + * Copyright 2005,2010 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gr_message_strobe.h> +#include <gr_io_signature.h> +#include <cstdio> +#include <errno.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <stdexcept> +#include <string.h> +#include <iostream> + +// public constructor that returns a shared_ptr + +gr_message_strobe_sptr +gr_make_message_strobe (pmt::pmt_t msg, float period_ms) +{ + return gnuradio::get_initial_sptr(new gr_message_strobe(msg, period_ms)); +} + +gr_message_strobe::gr_message_strobe (pmt::pmt_t msg, float period_ms) + : gr_sync_block("message_strobe", + gr_make_io_signature(0, 0, 0), + gr_make_io_signature(0, 0, 0)), + d_finished(false), + d_period_ms(period_ms), + d_msg(msg) +{ + message_port_register_out(pmt::mp("strobe")); + d_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&gr_message_strobe::run, this))); + + message_port_register_in(pmt::mp("set_msg")); + set_msg_handler(pmt::mp("set_msg"), boost::bind(&gr_message_strobe::set_msg, this, _1)); +} + +gr_message_strobe::~gr_message_strobe() +{ + d_finished = true; + d_thread->interrupt(); + d_thread->join(); +} + +void gr_message_strobe::run(){ + while(!d_finished) { + boost::this_thread::sleep(boost::posix_time::milliseconds(d_period_ms)); + if(d_finished){ return; } +// std::cout << "strobing...\n"; + message_port_pub( pmt::mp("strobe"), d_msg ); + } +} + +int +gr_message_strobe::work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) +{ + return 0; // FIXME: replace with default NOP work function in gr_block +} diff --git a/gnuradio-core/src/lib/general/gr_message_strobe.h b/gnuradio-core/src/lib/general/gr_message_strobe.h new file mode 100644 index 0000000000..a5151a30b2 --- /dev/null +++ b/gnuradio-core/src/lib/general/gr_message_strobe.h @@ -0,0 +1,65 @@ +/* -*- c++ -*- */ +/* + * Copyright 2005 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_GR_MESSAGE_STROBE_H +#define INCLUDED_GR_MESSAGE_STROBE_H + +#include <gr_core_api.h> +#include <gr_sync_block.h> +#include <gr_message.h> +#include <gr_msg_queue.h> + +class gr_message_strobe; +typedef boost::shared_ptr<gr_message_strobe> gr_message_strobe_sptr; + +GR_CORE_API gr_message_strobe_sptr gr_make_message_strobe (pmt::pmt_t msg, float period_ms); + +/*! + * \brief Gather received items into messages and insert into msgq + * \ingroup sink_blk + */ +class GR_CORE_API gr_message_strobe : public gr_sync_block +{ + private: + friend GR_CORE_API gr_message_strobe_sptr + gr_make_message_strobe(pmt::pmt_t msg, float period_ms); + boost::shared_ptr<boost::thread> d_thread; + bool d_finished; + float d_period_ms; + pmt::pmt_t d_msg; + + void run(); + + protected: + gr_message_strobe (pmt::pmt_t msg, float period_ms); + + public: + ~gr_message_strobe (); + + void set_msg(pmt::pmt_t msg){ d_msg = msg; } + + int work (int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); +}; + +#endif /* INCLUDED_GR_MESSAGE_STROBE_H */ diff --git a/gnuradio-core/src/lib/general/gr_message_strobe.i b/gnuradio-core/src/lib/general/gr_message_strobe.i new file mode 100644 index 0000000000..490aa8e8a1 --- /dev/null +++ b/gnuradio-core/src/lib/general/gr_message_strobe.i @@ -0,0 +1,30 @@ +/* -*- c++ -*- */ +/* + * Copyright 2005 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +GR_SWIG_BLOCK_MAGIC(gr,message_strobe); + +%{ +#include <gr_message_strobe.h> +%} + +%include "gr_message_strobe.h" + diff --git a/gnuradio-core/src/lib/io/gr_message_debug.cc b/gnuradio-core/src/lib/io/gr_message_debug.cc index 84c11c46eb..99f4a1f7b8 100644 --- a/gnuradio-core/src/lib/io/gr_message_debug.cc +++ b/gnuradio-core/src/lib/io/gr_message_debug.cc @@ -44,8 +44,9 @@ gr_make_message_debug () } void gr_message_debug::print(pmt::pmt_t msg){ - std::cout << "******* DEBUG PRINT ********\n"; + std::cout << "******* MESSAGE DEBUG PRINT ********\n"; pmt::pmt_print(msg); + std::cout << "************************************\n"; } @@ -67,5 +68,6 @@ gr_message_debug::work(int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { + printf("gr_message_debug::work\n"); return 0; // FIXME: replace with default NOP work function in gr_block } diff --git a/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc b/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc index 26c1babd68..06a1c95969 100644 --- a/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc +++ b/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc @@ -77,7 +77,8 @@ gr_pdu_to_tagged_stream::work(int noutput_items, if(noutput_items > 0){ // grab a message if one exists - pmt::pmt_t msg( delete_head_nowait( pdu_port_id ) ); + //pmt::pmt_t msg( delete_head_nowait( pdu_port_id ) ); + pmt::pmt_t msg( delete_head_blocking( pdu_port_id ) ); if(msg.get() == NULL ){ return nout; } @@ -87,8 +88,8 @@ gr_pdu_to_tagged_stream::work(int noutput_items, throw std::runtime_error("received a malformed pdu message!"); } - printf("got a msg\n"); - pmt::pmt_print(msg); +// printf("got a msg\n"); +// pmt::pmt_print(msg); // grab the components of the pdu message pmt::pmt_t meta(pmt::pmt_car(msg)); // make sure this is NIL || Dict ? diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc index 7d2f275e86..69f2e09f98 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2006 Free Software Foundation, Inc. + * Copyright 2006,2012 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -28,6 +28,7 @@ #include <gr_block_registry.h> #include <stdexcept> #include <sstream> +#include <iostream> using namespace pmt; @@ -79,6 +80,7 @@ gr_basic_block::set_block_alias(std::string name) // - register a new input message port void gr_basic_block::message_port_register_in(pmt::pmt_t port_id){ msg_queue[port_id] = msg_queue_t(); + msg_queue_ready[port_id] = boost::shared_ptr<boost::condition_variable>(new boost::condition_variable()); } // - register a new output message port @@ -131,7 +133,6 @@ void gr_basic_block::_post(pmt_t which_port, pmt_t msg) { insert_tail(which_port, msg); - global_block_registry.notify_blk(alias()); } void @@ -139,12 +140,16 @@ gr_basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg) { gruel::scoped_lock guard(mutex); + if( (msg_queue.find(which_port) == msg_queue.end()) || (msg_queue_ready.find(which_port) == msg_queue_ready.end())){ + std::cout << "target port = " << pmt::pmt_symbol_to_string(which_port) << std::endl; + throw std::runtime_error("attempted to insert_tail on invalid queue!"); + } + msg_queue[which_port].push_back(msg); + msg_queue_ready[which_port]->notify_one(); // wake up thread if BLKD_IN or BLKD_OUT - //input_cond.notify_one(); - //output_cond.notify_one(); - // TODO: reconsider the need for notification of input and output conditions! + global_block_registry.notify_blk(alias()); } pmt_t @@ -162,4 +167,18 @@ gr_basic_block::delete_head_nowait(pmt::pmt_t which_port) return m; } +pmt_t +gr_basic_block::delete_head_blocking(pmt::pmt_t which_port) +{ + gruel::scoped_lock guard(mutex); + + while (empty_p(which_port)){ + msg_queue_ready[which_port]->wait(guard); + } + + pmt_t m(msg_queue[which_port].front()); + msg_queue[which_port].pop_front(); + return m; +} + diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h index 2ee8161c1d..e0fd5d2afd 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.h +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h @@ -35,6 +35,7 @@ #include <gr_io_signature.h> #include <gruel/thread.h> #include <boost/foreach.hpp> +#include <boost/thread/condition_variable.hpp> /*! * \brief The abstract base class for all signal processing blocks. @@ -72,6 +73,9 @@ private: typedef std::deque<pmt::pmt_t> msg_queue_t; typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t; msg_queue_map_t msg_queue; +// boost::condition_variable msg_queue_ready; + std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready; + gruel::mutex mutex; //< protects all vars @@ -163,6 +167,11 @@ public: */ pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port); + /*! + * \returns returns pmt at head of queue or pmt_t() if empty. + */ + pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port); + msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){ return msg_queue[which_port].begin(); } diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc index dc77128a39..43aebf0bfd 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_block.cc @@ -251,3 +251,12 @@ operator << (std::ostream& os, const gr_block *m) return os; } +int +gr_block::general_work(int noutput_items, + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) +{ + throw std::runtime_error("gr_block::general_work() not implemented"); + return 0; +} diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h index 98339080d0..57e3fda90a 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.h +++ b/gnuradio-core/src/lib/runtime/gr_block.h @@ -124,7 +124,7 @@ class GR_CORE_API gr_block : public gr_basic_block { virtual int general_work (int noutput_items, gr_vector_int &ninput_items, gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) = 0; + gr_vector_void_star &output_items); /*! * \brief Called to enable drivers, etc for i/o devices. diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc index 78e1bc99af..69c304a3d8 100644 --- a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc +++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc @@ -180,6 +180,11 @@ gr_flowgraph::calc_used_blocks() { gr_basic_block_vector_t tmp; + // make sure free standing message blocks are included + for (gr_basic_block_vector_t::iterator it=d_msgblocks.begin(); it!=d_msgblocks.end(); it++){ + tmp.push_back(*it); + } + // Collect all blocks in the edge list for (gr_edge_viter_t p = d_edges.begin(); p != d_edges.end(); p++) { tmp.push_back(p->src().block()); @@ -472,3 +477,7 @@ gr_flowgraph::topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_ve output.push_back(block); } +void gr_flowgraph::add_msg_block(gr_basic_block_sptr blk){ + d_msgblocks.push_back(blk); +} + diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h index a2c1580eb8..860cb0ff1e 100644 --- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h +++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h @@ -110,6 +110,8 @@ public: void disconnect(gr_basic_block_sptr src_block, int src_port, gr_basic_block_sptr dst_block, int dst_port); + void add_msg_block(gr_basic_block_sptr blk); + // Validate connectivity, raise exception if invalid void validate(); @@ -128,6 +130,7 @@ public: // Return vector of vectors of disjointly connected blocks, topologically // sorted. std::vector<gr_basic_block_vector_t> partition(); + gr_basic_block_vector_t d_msgblocks; protected: gr_basic_block_vector_t d_blocks; diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc index 099b2f8e88..ff2a5db8cc 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc @@ -152,6 +152,14 @@ gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, // register the subscription src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport)); + + // add block uniquely to list to internal blocks + if (std::find(d_blocks.begin(), d_blocks.end(), dst) == d_blocks.end()){ + d_blocks.push_back(dst); + } + + // make sure we instantiate a thread for this block + d_fg->add_msg_block(dst); } void @@ -449,6 +457,7 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const } } } + sfg->d_msgblocks = d_fg->d_msgblocks; // Construct unique list of blocks used either in edges, inputs, // outputs, or by themselves. I still hate STL. diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc index ff2afca103..9f17a48a80 100644 --- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc +++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc @@ -32,7 +32,7 @@ using namespace pmt; gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_items) : d_exec(block, max_noutput_items) { - // std::cerr << "gr_tpb_thread_body: " << block << std::endl; + //std::cerr << "gr_tpb_thread_body: " << block << std::endl; gr_block_detail *d = block->detail().get(); gr_block_executor::state s; @@ -53,7 +53,12 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item } d->d_tpb.clear_changed(); - s = d_exec.run_one_iteration(); + // run one iteration if we are a connected stream block + if(d->noutputs() >0 || d->ninputs()>0){ + s = d_exec.run_one_iteration(); + } else { + s = gr_block_executor::BLKD_IN; + } switch(s){ case gr_block_executor::READY: // Tell neighbors we made progress. diff --git a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc index dc8f0f8a95..c84a219bd1 100644 --- a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc +++ b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc @@ -70,11 +70,6 @@ void qa_set_msg_handler::t0() send(nop, port, mp(mp("example-msg"), mp(i))); } - // And send a message to null_source to confirm that the default - // message handling action (which should be a nop) doesn't dump - // core. - send(src, port, mp(mp("example-msg"), mp(0))); - // Give the messages a chance to be processed boost::this_thread::sleep(boost::posix_time::milliseconds(100)); diff --git a/grc/blocks/block_tree.xml b/grc/blocks/block_tree.xml index d7ec82e4ab..95bd7bb3ce 100644 --- a/grc/blocks/block_tree.xml +++ b/grc/blocks/block_tree.xml @@ -37,6 +37,13 @@ <block>virtual_sink</block> <block>gr_tag_debug</block> </cat> + <cat> + <name>Message Tools</name> + <block>gr_message_debug</block> + <block>gr_message_strobe</block> + <block>gr_pdu_to_tagged_stream</block> + <block>gr_tagged_stream_to_pdu</block> + </cat> <cat> <name>Operators</name> <block>gr_add_xx</block> diff --git a/grc/blocks/gr_message_debug.xml b/grc/blocks/gr_message_debug.xml new file mode 100644 index 0000000000..9478f53045 --- /dev/null +++ b/grc/blocks/gr_message_debug.xml @@ -0,0 +1,17 @@ +<?xml version="1.0"?> +<!-- +################################################### +##Message Debug +################################################### + --> +<block> + <name>Message Debug</name> + <key>gr_message_debug</key> + <import>from gnuradio import gr</import> + <make>gr.message_debug()</make> + <sink> + <name>print</name> + <type>message</type> + <optional>1</optional> + </sink> +</block> diff --git a/grc/blocks/gr_message_strobe.xml b/grc/blocks/gr_message_strobe.xml new file mode 100644 index 0000000000..60a7724dfc --- /dev/null +++ b/grc/blocks/gr_message_strobe.xml @@ -0,0 +1,35 @@ +<?xml version="1.0"?> +<!-- +################################################### +##Message Strobe +################################################### + --> +<block> + <name>Message Strobe</name> + <key>gr_message_strobe</key> + <import>from gnuradio import gr</import> + <import>from gruel import pmt</import> + <make>gr.message_strobe($msg, $period)</make> + <param> + <name>Message PMT</name> + <key>msg</key> + <value>pmt.pmt_intern("TEST")</value> + <type>raw</type> + </param> + <param> + <name>Period (ms)</name> + <key>period</key> + <value>1000</value> + <type>real</type> + </param> + <sink> + <name>set_msg</name> + <type>message</type> + <optional>1</optional> + </sink> + <source> + <name>strobe</name> + <type>message</type> + <optional>1</optional> + </source> +</block> diff --git a/grc/blocks/gr_pdu_to_tagged_stream.xml b/grc/blocks/gr_pdu_to_tagged_stream.xml new file mode 100644 index 0000000000..fc1c4d16a3 --- /dev/null +++ b/grc/blocks/gr_pdu_to_tagged_stream.xml @@ -0,0 +1,40 @@ +<?xml version="1.0"?> +<!-- +################################################### +## PDU Message to Tagged Stream +################################################### + --> +<block> + <name>PDU to Tagged Stream</name> + <key>gr_pdu_to_tagged_stream</key> + <import>from gnuradio import gr</import> + <make>gr.pdu_to_tagged_stream($type.tv)</make> + <param> + <name>Item Type</name> + <key>type</key> + <type>enum</type> + <option> + <name>Byte</name> + <key>byte</key> + <opt>tv:gr.BYTE</opt> + </option> + <option> + <name>Complex</name> + <key>complex</key> + <opt>tv:gr.COMPLEX</opt> + </option> + <option> + <name>Float</name> + <key>float</key> + <opt>tv:gr.FLOAT</opt> + </option> + </param> + <sink> + <name>pdus</name> + <type>message</type> + </sink> + <source> + <name>out</name> + <type>$type</type> + </source> +</block> diff --git a/grc/blocks/gr_tagged_stream_to_pdu.xml b/grc/blocks/gr_tagged_stream_to_pdu.xml new file mode 100644 index 0000000000..e70a016080 --- /dev/null +++ b/grc/blocks/gr_tagged_stream_to_pdu.xml @@ -0,0 +1,40 @@ +<?xml version="1.0"?> +<!-- +################################################### +## Tagged Stream to PDU Message +################################################### + --> +<block> + <name>Tagged Stream to PDU</name> + <key>gr_tagged_stream_to_pdu</key> + <import>from gnuradio import gr</import> + <make>gr.tagged_stream_to_pdu($type.tv)</make> + <param> + <name>Item Type</name> + <key>type</key> + <type>enum</type> + <option> + <name>Byte</name> + <key>byte</key> + <opt>tv:gr.BYTE</opt> + </option> + <option> + <name>Complex</name> + <key>complex</key> + <opt>tv:gr.COMPLEX</opt> + </option> + <option> + <name>Float</name> + <key>float</key> + <opt>tv:gr.FLOAT</opt> + </option> + </param> + <sink> + <name>in</name> + <type>$type</type> + </sink> + <source> + <name>pdus</name> + <type>message</type> + </source> +</block> diff --git a/grc/python/Constants.py b/grc/python/Constants.py index 09c3081967..b8dc9a96a1 100644 --- a/grc/python/Constants.py +++ b/grc/python/Constants.py @@ -58,7 +58,7 @@ CORE_TYPES = ( #name, key, sizeof, color ('Integer 16', 's16', 2, '#FFFF66'), ('Integer 8', 's8', 1, '#FF66FF'), ('Message Queue', 'msg', 0, '#777777'), - ('Async Message', 'message', 0, '#777777'), + ('Async Message', 'message', 0, '#C0C0C0'), ('Wildcard', '', 0, '#FFFFFF'), ) diff --git a/grc/python/Port.py b/grc/python/Port.py index 9f8b50d052..738a33ba72 100644 --- a/grc/python/Port.py +++ b/grc/python/Port.py @@ -116,7 +116,7 @@ class Port(_Port, _GUIPort): _Port.validate(self) if not self.get_enabled_connections() and not self.get_optional(): self.add_error_message('Port is not connected.') - if not self.is_source() and len(self.get_enabled_connections()) > 1: + if not self.is_source() and (not self.get_type() == "message") and len(self.get_enabled_connections()) > 1: self.add_error_message('Port has too many connections.') #message port logic if self.get_type() == 'msg': diff --git a/gruel/src/include/gruel/msg_accepter.h b/gruel/src/include/gruel/msg_accepter.h index 65abd5a6b8..45acb3c784 100644 --- a/gruel/src/include/gruel/msg_accepter.h +++ b/gruel/src/include/gruel/msg_accepter.h @@ -37,7 +37,7 @@ namespace gruel { virtual ~msg_accepter(); /*! - * \brief send \p msg to \p msg_accepter + * \brief send \p msg to \p msg_accepter on port \p which_port * * Sending a message is an asynchronous operation. The \p post * call will not wait for the message either to arrive at the -- cgit v1.2.3 From 53be45f118e6e73d2a50fe0ba4622d6dfe96117c Mon Sep 17 00:00:00 2001 From: Tom Rondeau <trondeau@vt.edu> Date: Thu, 6 Dec 2012 12:52:35 -0500 Subject: core: updated the message debug block to have a 'store' port where messages can be retrieved afterwards. Updated qa_pdu to use the new 'store' port for testing the resulting message. --- gnuradio-core/src/lib/io/gr_message_debug.cc | 39 +++++++++++++++++--- gnuradio-core/src/lib/io/gr_message_debug.h | 49 +++++++++++++++++++++++++- gnuradio-core/src/python/gnuradio/gr/qa_pdu.py | 49 +++++++++++++++++--------- grc/blocks/gr_message_debug.xml | 7 +++- 4 files changed, 121 insertions(+), 23 deletions(-) (limited to 'grc') diff --git a/gnuradio-core/src/lib/io/gr_message_debug.cc b/gnuradio-core/src/lib/io/gr_message_debug.cc index d98954576a..7d28ff18e9 100644 --- a/gnuradio-core/src/lib/io/gr_message_debug.cc +++ b/gnuradio-core/src/lib/io/gr_message_debug.cc @@ -43,20 +43,49 @@ gr_make_message_debug () return gnuradio::get_initial_sptr(new gr_message_debug()); } -void gr_message_debug::print(pmt::pmt_t msg){ - std::cout << "******* MESSAGE DEBUG PRINT ********\n"; - pmt::pmt_print(msg); - std::cout << "************************************\n"; +void +gr_message_debug::print(pmt::pmt_t msg) +{ + std::cout << "******* MESSAGE DEBUG PRINT ********\n"; + pmt::pmt_print(msg); + std::cout << "************************************\n"; +} + +void +gr_message_debug::store(pmt::pmt_t msg) +{ + gruel::scoped_lock guard(d_mutex); + d_messages.push_back(msg); +} + +int +gr_message_debug::num_messages() +{ + return (int)d_messages.size(); } +pmt::pmt_t +gr_message_debug::get_message(int i) +{ + gruel::scoped_lock guard(d_mutex); + + if((size_t)i >= d_messages.size()) { + throw std::runtime_error("gr_message_debug: index for message out of bounds.\n"); + } -gr_message_debug::gr_message_debug () + return d_messages[i]; +} + +gr_message_debug::gr_message_debug() : gr_block("message_debug", gr_make_io_signature(0, 0, 0), gr_make_io_signature(0, 0, 0)) { message_port_register_in(pmt::mp("print")); set_msg_handler(pmt::mp("print"), boost::bind(&gr_message_debug::print, this, _1)); + + message_port_register_in(pmt::mp("store")); + set_msg_handler(pmt::mp("store"), boost::bind(&gr_message_debug::store, this, _1)); } gr_message_debug::~gr_message_debug() diff --git a/gnuradio-core/src/lib/io/gr_message_debug.h b/gnuradio-core/src/lib/io/gr_message_debug.h index 120694a916..1ffef1b023 100644 --- a/gnuradio-core/src/lib/io/gr_message_debug.h +++ b/gnuradio-core/src/lib/io/gr_message_debug.h @@ -27,11 +27,12 @@ #include <gr_block.h> #include <gr_message.h> #include <gr_msg_queue.h> +#include <gruel/thread.h> class gr_message_debug; typedef boost::shared_ptr<gr_message_debug> gr_message_debug_sptr; -GR_CORE_API gr_message_debug_sptr gr_make_message_debug (); +GR_CORE_API gr_message_debug_sptr gr_make_message_debug(); /*! * \brief Print received messages to stdout @@ -43,13 +44,59 @@ class GR_CORE_API gr_message_debug : public gr_block friend GR_CORE_API gr_message_debug_sptr gr_make_message_debug(); + /*! + * \brief Messages received in this port are printed to stdout. + * + * This port receives messages from the scheduler's message handling + * mechanism and prints it to stdout. This message handler function + * is only meant to be used by the scheduler to handle messages + * posted to port 'print'. + * + * \param msg A pmt message passed from the scheduler's message handling. + */ void print(pmt::pmt_t msg); + /*! + * \brief Messages received in this port are stored in a vector. + * + * This port receives messages from the scheduler's message handling + * mechanism and stores it in a vector. Messages can be retrieved + * later using the 'get_message' function. This message handler + * function is only meant to be used by the scheduler to handle + * messages posted to port 'store'. + * + * \param msg A pmt message passed from the scheduler's message handling. + */ + void store(pmt::pmt_t msg); + + gruel::mutex d_mutex; + std::vector<pmt::pmt_t> d_messages; + protected: gr_message_debug (); public: ~gr_message_debug (); + + /*! + * \brief Reports the number of messages received by this block. + */ + int num_messages(); + + /*! + * \brief Get a message (as a PMT) from the message vector at index \p i. + * + * Messages passed to the 'store' port will be stored in a + * vector. This function retrieves those messages by index. They are + * index in order of when they were received (all messages are just + * pushed onto the back of a vector). This is mostly useful in + * debugging message passing graphs and in QA code. + * + * \param i The index in the vector for the message to retrieve. + * + * \return a message at index \p i as a pmt_t. + */ + pmt::pmt_t get_message(int i); }; #endif /* INCLUDED_GR_MESSAGE_DEBUG_H */ diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py index 83c7748af8..da1331d968 100755 --- a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py +++ b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py @@ -36,36 +36,53 @@ class test_pdu(gr_unittest.TestCase): # Just run some data through and make sure it doesn't puke. src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - src = gr.pdu_to_tagged_stream(gr.BYTE); - snk3 = gr.tagged_stream_to_pdu(gr.BYTE); - snk2 = gr.vector_sink_b(); - snk = gr.tag_debug(1, "test"); + src = gr.pdu_to_tagged_stream(gr.BYTE) + snk3 = gr.tagged_stream_to_pdu(gr.BYTE) + snk2 = gr.vector_sink_b() + snk = gr.tag_debug(1, "test") - dbg = gr.message_debug(); + dbg = gr.message_debug() self.tb.connect(src, snk) self.tb.connect(src, snk2) self.tb.connect(src, snk3) - self.tb.msg_connect(snk3, "pdus", dbg, "print"); + self.tb.msg_connect(snk3, "pdus", dbg, "store") self.tb.start() # make our reference and message pmts - port = pmt.pmt_intern("pdus"); - msg = pmt.pmt_cons( pmt.PMT_NIL, pmt.pmt_make_u8vector(16, 0xFF) ); + port = pmt.pmt_intern("pdus") + msg = pmt.pmt_cons( pmt.PMT_NIL, pmt.pmt_make_u8vector(16, 0xFF) ) - print "printing port & msg" - pmt.pmt_print(port); - pmt.pmt_print(msg); + #print "printing port & msg" + #pmt.pmt_print(port) + #pmt.pmt_print(msg) # post the message - src.to_basic_block()._post( port, msg ); + src.to_basic_block()._post( port, msg ) - time.sleep(1); - self.tb.stop(); - self.tb.wait(); + while(dbg.num_messages() < 1): + time.sleep(0.5) + self.tb.stop() + self.tb.wait() - print snk2.data(); + # Get the vector of data from the vector sink + result_data = snk2.data() + + # Get the vector of data from the message sink + # Convert the message PMT as a pair into its vector + result_msg = dbg.get_message(0) + msg_vec = pmt.pmt_cdr(result_msg) + pmt.pmt_print(msg_vec) + + # Convert the PMT vector into a Python list + msg_data = [] + for i in xrange(16): + msg_data.append(pmt.pmt_u8vector_ref(msg_vec, i)) + + actual_data = 16*[0xFF,] + self.assertEqual(actual_data, list(result_data)) + self.assertEqual(actual_data, msg_data) if __name__ == '__main__': gr_unittest.run(test_pdu, "test_pdu.xml") diff --git a/grc/blocks/gr_message_debug.xml b/grc/blocks/gr_message_debug.xml index 9478f53045..705a7cc5f3 100644 --- a/grc/blocks/gr_message_debug.xml +++ b/grc/blocks/gr_message_debug.xml @@ -12,6 +12,11 @@ <sink> <name>print</name> <type>message</type> - <optional>1</optional> + <optional>1</optional> + </sink> + <sink> + <name>store</name> + <type>message</type> + <optional>1</optional> </sink> </block> -- cgit v1.2.3 From 52ca5e2765b7a4532d26502b5b76b7c85c5019d7 Mon Sep 17 00:00:00 2001 From: Tim O'Shea <tim.oshea753@gmail.com> Date: Fri, 7 Dec 2012 09:28:41 -0800 Subject: core: added gr_tuntap_pdu, gr_socket_pdu, and msg passing enhancements --- CMakeLists.txt | 7 + gnuradio-core/src/lib/io/CMakeLists.txt | 4 + gnuradio-core/src/lib/io/gr_message_debug.cc | 34 +- gnuradio-core/src/lib/io/gr_message_debug.h | 1 + gnuradio-core/src/lib/io/gr_pdu.cc | 70 ++-- gnuradio-core/src/lib/io/gr_pdu.h | 1 + gnuradio-core/src/lib/io/gr_socket_pdu.cc | 157 ++++++++ gnuradio-core/src/lib/io/gr_socket_pdu.h | 203 +++++++++++ gnuradio-core/src/lib/io/gr_socket_pdu.i | 33 ++ gnuradio-core/src/lib/io/gr_stream_pdu_base.cc | 117 ++++++ gnuradio-core/src/lib/io/gr_stream_pdu_base.h | 62 ++++ gnuradio-core/src/lib/io/gr_tuntap_pdu.cc | 143 ++++++++ gnuradio-core/src/lib/io/gr_tuntap_pdu.h | 66 ++++ gnuradio-core/src/lib/io/gr_tuntap_pdu.i | 30 ++ gnuradio-core/src/lib/io/io.i | 5 + gnuradio-core/src/lib/runtime/gr_basic_block.cc | 41 +-- gnuradio-core/src/lib/runtime/gr_basic_block.h | 406 +++++++++++---------- gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc | 34 +- gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h | 4 +- gnuradio-core/src/lib/runtime/gr_flowgraph.cc | 40 +- gnuradio-core/src/lib/runtime/gr_flowgraph.h | 61 +++- gnuradio-core/src/lib/runtime/gr_hier_block2.cc | 5 +- gnuradio-core/src/lib/runtime/gr_hier_block2.h | 33 ++ gnuradio-core/src/lib/runtime/gr_hier_block2.i | 6 + .../src/lib/runtime/gr_hier_block2_detail.cc | 90 ++++- .../src/lib/runtime/gr_hier_block2_detail.h | 1 + .../src/python/gnuradio/gr/hier_block2.py | 13 + grc/blocks/block_tree.xml | 2 + grc/blocks/gr_message_debug.xml | 5 + grc/blocks/gr_socket_pdu.xml | 62 ++++ grc/blocks/gr_tuntap_pdu.xml | 34 ++ grc/blocks/pad_sink.xml | 9 +- grc/blocks/pad_source.xml | 9 +- grc/python/FlowGraph.py | 10 + grc/python/convert_hier.py | 20 +- grc/python/flow_graph.tmpl | 17 +- gruel/src/include/gruel/pmt.h | 6 + gruel/src/lib/pmt/pmt.cc | 16 + gruel/src/swig/pmt_swig.i | 5 + volk/CMakeLists.txt | 10 +- volk/apps/CMakeLists.txt | 2 +- 41 files changed, 1595 insertions(+), 279 deletions(-) create mode 100644 gnuradio-core/src/lib/io/gr_socket_pdu.cc create mode 100644 gnuradio-core/src/lib/io/gr_socket_pdu.h create mode 100644 gnuradio-core/src/lib/io/gr_socket_pdu.i create mode 100644 gnuradio-core/src/lib/io/gr_stream_pdu_base.cc create mode 100644 gnuradio-core/src/lib/io/gr_stream_pdu_base.h create mode 100644 gnuradio-core/src/lib/io/gr_tuntap_pdu.cc create mode 100644 gnuradio-core/src/lib/io/gr_tuntap_pdu.h create mode 100644 gnuradio-core/src/lib/io/gr_tuntap_pdu.i create mode 100644 grc/blocks/gr_socket_pdu.xml create mode 100644 grc/blocks/gr_tuntap_pdu.xml (limited to 'grc') diff --git a/CMakeLists.txt b/CMakeLists.txt index 9af8d7eb9b..bc076b9e74 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -49,6 +49,13 @@ include(GrVersion) #setup version info SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O2") SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O2") +######################################################################## +# Environment setup +######################################################################## +IF(NOT DEFINED BOOST_ROOT) + SET(BOOST_ROOT ${CMAKE_INSTALL_PREFIX}) +ENDIF() + ######################################################################## # Import executables from a native build (for cross compiling) # http://www.vtk.org/Wiki/CMake_Cross_Compiling#Using_executables_in_the_build_created_during_the_build diff --git a/gnuradio-core/src/lib/io/CMakeLists.txt b/gnuradio-core/src/lib/io/CMakeLists.txt index 7041f28206..59ca06b5a2 100644 --- a/gnuradio-core/src/lib/io/CMakeLists.txt +++ b/gnuradio-core/src/lib/io/CMakeLists.txt @@ -39,6 +39,7 @@ list(APPEND gnuradio_core_sources ${CMAKE_CURRENT_SOURCE_DIR}/ppio_ppdev.cc ${CMAKE_CURRENT_SOURCE_DIR}/gri_wavfile.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_pdu.cc + ${CMAKE_CURRENT_SOURCE_DIR}/gr_stream_pdu_base.cc ) ######################################################################## @@ -61,6 +62,7 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/ppio_ppdev.h ${CMAKE_CURRENT_SOURCE_DIR}/gri_wavfile.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_pdu.h + ${CMAKE_CURRENT_SOURCE_DIR}/gr_stream_pdu_base.h DESTINATION ${GR_INCLUDE_DIR}/gnuradio COMPONENT "core_devel" ) @@ -103,6 +105,8 @@ set(gr_core_io_triple_threats gr_wavfile_sink gr_tagged_file_sink gr_tagged_stream_to_pdu + gr_tuntap_pdu + gr_socket_pdu ) foreach(file_tt ${gr_core_io_triple_threats}) diff --git a/gnuradio-core/src/lib/io/gr_message_debug.cc b/gnuradio-core/src/lib/io/gr_message_debug.cc index 7d28ff18e9..27f4c65fdc 100644 --- a/gnuradio-core/src/lib/io/gr_message_debug.cc +++ b/gnuradio-core/src/lib/io/gr_message_debug.cc @@ -58,6 +58,30 @@ gr_message_debug::store(pmt::pmt_t msg) d_messages.push_back(msg); } +void +gr_message_debug::print_verbose(pmt::pmt_t msg) +{ + pmt::pmt_t meta = pmt::pmt_car(msg); + pmt::pmt_t vector = pmt::pmt_cdr(msg); + std::cout << "* MESSAGE DEBUG PRINT PDU VERBOSE *\n"; + pmt::pmt_print(meta); + size_t len = pmt::pmt_length(vector); + std::cout << "pdu_length = " << len << std::endl; + std::cout << "contents = " << std::endl; + size_t offset(0); + const uint8_t* d = (const uint8_t*) pmt_uniform_vector_elements(vector, offset); + for(size_t i=0; i<len; i+=16){ + printf("%04x: ", i); + for(size_t j=i; j<std::min(i+16,len); j++){ + printf("%02x ",d[j] ); + } + + std::cout << std::endl; + } + + std::cout << "***********************************\n"; +} + int gr_message_debug::num_messages() { @@ -81,11 +105,11 @@ gr_message_debug::gr_message_debug() gr_make_io_signature(0, 0, 0), gr_make_io_signature(0, 0, 0)) { - message_port_register_in(pmt::mp("print")); - set_msg_handler(pmt::mp("print"), boost::bind(&gr_message_debug::print, this, _1)); - - message_port_register_in(pmt::mp("store")); - set_msg_handler(pmt::mp("store"), boost::bind(&gr_message_debug::store, this, _1)); + message_port_register_in(pmt::mp("print")); + set_msg_handler(pmt::mp("print"), boost::bind(&gr_message_debug::print, this, _1)); + + message_port_register_in(pmt::mp("store")); + set_msg_handler(pmt::mp("store"), boost::bind(&gr_message_debug::store, this, _1)); } gr_message_debug::~gr_message_debug() diff --git a/gnuradio-core/src/lib/io/gr_message_debug.h b/gnuradio-core/src/lib/io/gr_message_debug.h index 1ffef1b023..6e6e5103cb 100644 --- a/gnuradio-core/src/lib/io/gr_message_debug.h +++ b/gnuradio-core/src/lib/io/gr_message_debug.h @@ -55,6 +55,7 @@ class GR_CORE_API gr_message_debug : public gr_block * \param msg A pmt message passed from the scheduler's message handling. */ void print(pmt::pmt_t msg); + void print_verbose(pmt::pmt_t msg); /*! * \brief Messages received in this port are stored in a vector. diff --git a/gnuradio-core/src/lib/io/gr_pdu.cc b/gnuradio-core/src/lib/io/gr_pdu.cc index f33eed0a37..b2757c307e 100644 --- a/gnuradio-core/src/lib/io/gr_pdu.cc +++ b/gnuradio-core/src/lib/io/gr_pdu.cc @@ -28,42 +28,52 @@ size_t gr_pdu_itemsize(gr_pdu_vector_type type){ - switch(type){ - case BYTE: - return 1; - case FLOAT: - return sizeof(float); - case COMPLEX: - return sizeof(gr_complex); - default: - throw std::runtime_error("bad type!"); - } + switch(type){ + case BYTE: + return 1; + case FLOAT: + return sizeof(float); + case COMPLEX: + return sizeof(gr_complex); + default: + throw std::runtime_error("bad type!"); + } } bool gr_pdu_type_matches(gr_pdu_vector_type type, pmt::pmt_t v){ - switch(type){ - case BYTE: - return pmt::pmt_is_u8vector(v); - case FLOAT: - return pmt::pmt_is_f32vector(v); - case COMPLEX: - return pmt::pmt_is_c32vector(v); - default: - throw std::runtime_error("bad type!"); - } + switch(type){ + case BYTE: + return pmt::pmt_is_u8vector(v); + case FLOAT: + return pmt::pmt_is_f32vector(v); + case COMPLEX: + return pmt::pmt_is_c32vector(v); + default: + throw std::runtime_error("bad type!"); + } } pmt::pmt_t gr_pdu_make_vector(gr_pdu_vector_type type, const uint8_t* buf, size_t items){ - switch(type){ - case BYTE: - return pmt::pmt_init_u8vector(items, buf); - case FLOAT: - return pmt::pmt_init_f32vector(items, (const float*)buf); - case COMPLEX: - return pmt::pmt_init_c32vector(items, (const gr_complex*)buf); - default: - throw std::runtime_error("bad type!"); - } + switch(type){ + case BYTE: + return pmt::pmt_init_u8vector(items, buf); + case FLOAT: + return pmt::pmt_init_f32vector(items, (const float*)buf); + case COMPLEX: + return pmt::pmt_init_c32vector(items, (const gr_complex*)buf); + default: + throw std::runtime_error("bad type!"); + } +} + +gr_pdu_vector_type type_from_pmt(pmt::pmt_t vector){ + if(pmt_is_u8vector(vector)) + return BYTE; + if(pmt_is_f32vector(vector)) + return FLOAT; + if(pmt_is_c32vector(vector)) + return COMPLEX; + throw std::runtime_error("bad type!"); } diff --git a/gnuradio-core/src/lib/io/gr_pdu.h b/gnuradio-core/src/lib/io/gr_pdu.h index 67519c89db..5ed9cdded8 100644 --- a/gnuradio-core/src/lib/io/gr_pdu.h +++ b/gnuradio-core/src/lib/io/gr_pdu.h @@ -34,5 +34,6 @@ enum gr_pdu_vector_type { BYTE, FLOAT, COMPLEX }; size_t gr_pdu_itemsize(gr_pdu_vector_type type); bool gr_pdu_type_matches(gr_pdu_vector_type type, pmt::pmt_t v); pmt::pmt_t gr_pdu_make_vector(gr_pdu_vector_type type, const uint8_t* buf, size_t items); +gr_pdu_vector_type type_from_pmt(pmt::pmt_t vector); #endif diff --git a/gnuradio-core/src/lib/io/gr_socket_pdu.cc b/gnuradio-core/src/lib/io/gr_socket_pdu.cc new file mode 100644 index 0000000000..bb374b3006 --- /dev/null +++ b/gnuradio-core/src/lib/io/gr_socket_pdu.cc @@ -0,0 +1,157 @@ +/* -*- c++ -*- */ +/* + * Copyright 2012 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gr_socket_pdu.h> +#include <gr_io_signature.h> +#include <cstdio> +#include <errno.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <stdexcept> +#include <string.h> +#include <iostream> +#include <gr_pdu.h> +#include <boost/format.hpp> + +// public constructor that returns a shared_ptr +gr_socket_pdu_sptr +gr_make_socket_pdu (std::string type, std::string addr, std::string port, int MTU) +{ + return gnuradio::get_initial_sptr(new gr_socket_pdu(type,addr,port,MTU)); +} + +gr_socket_pdu::gr_socket_pdu (std::string type, std::string addr, std::string port, int MTU) + : gr_stream_pdu_base(MTU) +{ + + if( (type == "TCP_SERVER") || (type == "TCP_CLIENT")){ + boost::asio::ip::tcp::resolver resolver(_io_service); + boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), addr, port); + _tcp_endpoint = *resolver.resolve(query); + } + if( (type == "UDP_SERVER") || (type == "UDP_CLIENT")){ + boost::asio::ip::udp::resolver resolver(_io_service); + boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); + if( (type == "UDP_SERVER") ){ + _udp_endpoint = *resolver.resolve(query); + } else { + _udp_endpoint_other = *resolver.resolve(query); + } + } + + // register ports + message_port_register_out(pmt::mp("pdus")); + message_port_register_in(pmt::mp("pdus")); + + // set up socketry + if (type == "TCP_SERVER"){ + _acceptor_tcp.reset(new boost::asio::ip::tcp::acceptor(_io_service, _tcp_endpoint)); + _acceptor_tcp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); + start_tcp_accept(); + // bind tcp server send handler + set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::tcp_server_send, this, _1)); + } else if(type =="TCP_CLIENT"){ + boost::system::error_code error = boost::asio::error::host_not_found; + _tcp_socket.reset(new boost::asio::ip::tcp::socket(_io_service)); + _tcp_socket->connect(_tcp_endpoint, error); + if(error){ + throw boost::system::system_error(error); + } + set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::tcp_client_send, this, _1)); + _tcp_socket->async_read_some( + boost::asio::buffer(rxbuf), + boost::bind(&gr_socket_pdu::handle_tcp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + + } else if(type =="UDP_SERVER"){ + _udp_socket.reset(new boost::asio::ip::udp::socket(_io_service, _udp_endpoint)); + _udp_socket->async_receive_from( boost::asio::buffer(rxbuf), _udp_endpoint_other, + boost::bind(&gr_socket_pdu::handle_udp_read, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::udp_send, this, _1)); + } else if(type =="UDP_CLIENT"){ + _udp_socket.reset(new boost::asio::ip::udp::socket(_io_service, _udp_endpoint)); + _udp_socket->async_receive_from( boost::asio::buffer(rxbuf), _udp_endpoint_other, + boost::bind(&gr_socket_pdu::handle_udp_read, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::udp_send, this, _1)); + } else { + throw std::runtime_error("unknown socket type!"); + } + + // start thread for io_service + d_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&gr_socket_pdu::run_io_service, this))); + d_started = true; +} + +void tcp_connection::handle_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred) + { + if(!error) + { + pmt::pmt_t vector = pmt::pmt_init_u8vector(bytes_transferred, (const uint8_t*)&buf[0]); + pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector); + + d_block->message_port_pub( pmt::mp("pdus"), pdu ); + + socket_.async_read_some( + boost::asio::buffer(buf), + boost::bind(&tcp_connection::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + + } else { + std::cout << "error occurred\n"; + } + + } + + +void gr_socket_pdu::tcp_server_send(pmt::pmt_t msg){ + pmt::pmt_t vector = pmt::pmt_cdr(msg); + for(size_t i=0; i<d_tcp_connections.size(); i++){ + d_tcp_connections[i]->send(vector); + } +} + +void gr_socket_pdu::tcp_client_send(pmt::pmt_t msg){ + pmt::pmt_t vector = pmt::pmt_cdr(msg); + size_t len = pmt::pmt_length(vector); + size_t offset(0); + boost::array<char, 10000> txbuf; + memcpy(&txbuf[0], pmt::pmt_uniform_vector_elements(vector, offset), len); + _tcp_socket->send(boost::asio::buffer(txbuf,len)); +} + +void gr_socket_pdu::udp_send(pmt::pmt_t msg){ + pmt::pmt_t vector = pmt::pmt_cdr(msg); + size_t len = pmt::pmt_length(vector); + size_t offset(0); + boost::array<char, 10000> txbuf; + memcpy(&txbuf[0], pmt::pmt_uniform_vector_elements(vector, offset), len); + if(_udp_endpoint_other.address().to_string() != "0.0.0.0") + _udp_socket->send_to(boost::asio::buffer(txbuf,len), _udp_endpoint_other); +} diff --git a/gnuradio-core/src/lib/io/gr_socket_pdu.h b/gnuradio-core/src/lib/io/gr_socket_pdu.h new file mode 100644 index 0000000000..3a96a3f97f --- /dev/null +++ b/gnuradio-core/src/lib/io/gr_socket_pdu.h @@ -0,0 +1,203 @@ +/* -*- c++ -*- */ +/* + * Copyright 2012 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_GR_SOCKET_PDU_H +#define INCLUDED_GR_SOCKET_PDU_H + +#include <gr_core_api.h> +#include <gr_sync_block.h> +#include <gr_message.h> +#include <gr_msg_queue.h> +#include <gr_stream_pdu_base.h> +#include <boost/asio.hpp> + +#include <linux/if_tun.h> + +class gr_socket_pdu; +typedef boost::shared_ptr<gr_socket_pdu> gr_socket_pdu_sptr; + +GR_CORE_API gr_socket_pdu_sptr gr_make_socket_pdu (std::string type, std::string addr, std::string port, int MTU=10000); + +class tcp_connection + : public boost::enable_shared_from_this<tcp_connection> +{ +public: + typedef boost::shared_ptr<tcp_connection> pointer; + gr_socket_pdu *d_block; + boost::array<char, 10000> buf; + + static pointer create(boost::asio::io_service& io_service) + { + return pointer(new tcp_connection(io_service)); + } + + boost::asio::ip::tcp::socket& socket() + { + return socket_; + } + + void start(gr_socket_pdu* parent) + { + d_block = parent; +// message_ = "connected to gr_socket_pdu\n"; +// boost::asio::async_write(socket_, boost::asio::buffer(message_), +// boost::bind(&tcp_connection::handle_write, shared_from_this(), +// boost::asio::placeholders::error, +// boost::asio::placeholders::bytes_transferred)); + + socket_.async_read_some( + boost::asio::buffer(buf), + boost::bind(&tcp_connection::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + } + void send(pmt::pmt_t vector){ + size_t len = pmt::pmt_length(vector); + size_t offset(0); + boost::array<char, 10000> txbuf; + memcpy(&txbuf[0], pmt::pmt_uniform_vector_elements(vector, offset), len); + boost::asio::async_write(socket_, boost::asio::buffer(txbuf, len), + boost::bind(&tcp_connection::handle_write, shared_from_this(), + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } + + ~tcp_connection(){ +// std::cout << "tcp_connection destroyed\n"; + } + +private: + tcp_connection(boost::asio::io_service& io_service) + : socket_(io_service) + { + } + + void handle_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred); + + void handle_write(const boost::system::error_code& /*error*/, + size_t /*bytes_transferred*/) + { + } + + boost::asio::ip::tcp::socket socket_; + std::string message_; +}; + + +/*! + * \brief Gather received items into messages and insert into msgq + * \ingroup sink_blk + */ +class GR_CORE_API gr_socket_pdu : public gr_stream_pdu_base +{ + private: + friend GR_CORE_API gr_socket_pdu_sptr + gr_make_socket_pdu(std::string type, std::string addr, std::string port, int MTU); + + boost::asio::io_service _io_service; + + boost::array<char, 10000> rxbuf; + + // tcp specific + boost::asio::ip::tcp::endpoint _tcp_endpoint; + + // specific to tcp server + boost::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor_tcp; + std::vector<tcp_connection::pointer> d_tcp_connections; + void tcp_server_send(pmt::pmt_t msg); + void tcp_client_send(pmt::pmt_t msg); + void udp_send(pmt::pmt_t msg); + + // specific to tcp client + boost::shared_ptr<boost::asio::ip::tcp::socket> _tcp_socket; + + // specific to udp client/server + boost::asio::ip::udp::endpoint _udp_endpoint; + boost::asio::ip::udp::endpoint _udp_endpoint_other; + boost::shared_ptr<boost::asio::ip::udp::socket> _udp_socket; + + void handle_receive(const boost::system::error_code& error, std::size_t ){ + } + + void start_tcp_accept(){ + tcp_connection::pointer new_connection = + tcp_connection::create(_acceptor_tcp->get_io_service()); + + _acceptor_tcp->async_accept(new_connection->socket(), + boost::bind(&gr_socket_pdu::handle_tcp_accept, this, new_connection, + boost::asio::placeholders::error)); + } + + void handle_tcp_accept(tcp_connection::pointer new_connection, const boost::system::error_code& error){ + if (!error) + { + new_connection->start(this); + d_tcp_connections.push_back(new_connection); + start_tcp_accept(); + } else { + std::cout << error << std::endl; + } + } + + void run_io_service(){ + _io_service.run(); + } + + void handle_udp_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred){ + if(!error){ + pmt::pmt_t vector = pmt::pmt_init_u8vector(bytes_transferred, (const uint8_t*)&rxbuf[0]); + pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector); + + message_port_pub( pmt::mp("pdus"), pdu ); + + _udp_socket->async_receive_from( boost::asio::buffer(rxbuf), _udp_endpoint_other, + boost::bind(&gr_socket_pdu::handle_udp_read, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } else { + throw boost::system::system_error(error); +// std::cout << "error occurred\n"; + } + } + void handle_tcp_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred){ + if(!error) + { + pmt::pmt_t vector = pmt::pmt_init_u8vector(bytes_transferred, (const uint8_t*)&rxbuf[0]); + pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector); + + message_port_pub( pmt::mp("pdus"), pdu ); + + _tcp_socket->async_read_some( + boost::asio::buffer(rxbuf), + boost::bind(&gr_socket_pdu::handle_tcp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + + } else { + //std::cout << "error occurred\n"; + throw boost::system::system_error(error); + } + } + + protected: + gr_socket_pdu (std::string type, std::string addr, std::string port, int MTU=10000); + public: + ~gr_socket_pdu () {} +}; + +#endif /* INCLUDED_GR_TUNTAP_PDU_H */ diff --git a/gnuradio-core/src/lib/io/gr_socket_pdu.i b/gnuradio-core/src/lib/io/gr_socket_pdu.i new file mode 100644 index 0000000000..3e20b63e20 --- /dev/null +++ b/gnuradio-core/src/lib/io/gr_socket_pdu.i @@ -0,0 +1,33 @@ +/* -*- c++ -*- */ +/* + * Copyright 2005 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +GR_SWIG_BLOCK_MAGIC(gr,socket_pdu); + +%ignore tcp_connection; + +%{ +#include <gr_socket_pdu.h> +%} + +%include "gr_stream_pdu_base.h" +%include "gr_socket_pdu.h" + diff --git a/gnuradio-core/src/lib/io/gr_stream_pdu_base.cc b/gnuradio-core/src/lib/io/gr_stream_pdu_base.cc new file mode 100644 index 0000000000..cff7296cba --- /dev/null +++ b/gnuradio-core/src/lib/io/gr_stream_pdu_base.cc @@ -0,0 +1,117 @@ +/* -*- c++ -*- */ +/* + * Copyright 2012 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gr_stream_pdu_base.h> +#include <gr_io_signature.h> +#include <cstdio> +#include <errno.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <stdexcept> +#include <string.h> +#include <iostream> +#include <gr_pdu.h> +#include <boost/format.hpp> + +static const long timeout_us = 100*1000; //100ms + +gr_stream_pdu_base::gr_stream_pdu_base (int MTU) + : gr_sync_block("stream_pdu_base", + gr_make_io_signature(0, 0, 0), + gr_make_io_signature(0, 0, 0)), + d_finished(false), d_started(false), d_fd(-1) +{ + // reserve space for rx buffer + d_rxbuf.resize(MTU,0); +} + +gr_stream_pdu_base::~gr_stream_pdu_base() +{ + stop_rxthread(); +} + +void gr_stream_pdu_base::stop_rxthread(){ + d_finished = true; + if(d_started){ + d_thread->interrupt(); + d_thread->join(); + } + } + +void gr_stream_pdu_base::start_rxthread(pmt::pmt_t _rxport){ + rxport = _rxport; + d_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&gr_stream_pdu_base::run, this))); + d_started = true; + } + +void gr_stream_pdu_base::run(){ + while(!d_finished) { + if(not wait_ready()){ continue; } + const int result = read( d_fd, &d_rxbuf[0], d_rxbuf.size() ); + if(result <= 0){ throw std::runtime_error("gr_stream_pdu_base, bad socket read!"); } + pmt::pmt_t vector = pmt::pmt_init_u8vector(result, &d_rxbuf[0]); + pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector); + message_port_pub(rxport, pdu); + } +} + +void gr_stream_pdu_base::send(pmt::pmt_t msg){ + pmt::pmt_t vector = pmt::pmt_cdr(msg); + size_t offset(0); + size_t itemsize(gr_pdu_itemsize(type_from_pmt(vector))); + int len( pmt::pmt_length(vector)*itemsize ); + + const int rv = write(d_fd, pmt::pmt_uniform_vector_elements(vector, offset), len); + if(rv != len){ + std::cerr << boost::format("WARNING: gr_stream_pdu_base::send(pdu) write failed! (d_fd=%d, len=%d, rv=%d)") + % d_fd % len % rv << std::endl; + } +} + +int +gr_stream_pdu_base::work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) +{ + throw std::runtime_error("should not be called.\n"); + return 0; +} + +bool gr_stream_pdu_base::wait_ready(){ + //setup timeval for timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = timeout_us; + + //setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(d_fd, &rset); + + //call select with timeout on receive socket + return ::select(d_fd+1, &rset, NULL, NULL, &tv) > 0; +} diff --git a/gnuradio-core/src/lib/io/gr_stream_pdu_base.h b/gnuradio-core/src/lib/io/gr_stream_pdu_base.h new file mode 100644 index 0000000000..dc5dc5c2e9 --- /dev/null +++ b/gnuradio-core/src/lib/io/gr_stream_pdu_base.h @@ -0,0 +1,62 @@ +/* -*- c++ -*- */ +/* + * Copyright 2012 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_GR_STREAM_PDU_BASE_H +#define INCLUDED_GR_STREAM_PDU_BASE_H + +#include <gr_core_api.h> +#include <gr_sync_block.h> +#include <gr_message.h> +#include <gr_msg_queue.h> + +#include <linux/if_tun.h> + + +/*! + * \brief Gather received items into messages and insert into msgq + * \ingroup sink_blk + */ +class GR_CORE_API gr_stream_pdu_base : public gr_sync_block +{ + public: + boost::shared_ptr<boost::thread> d_thread; + bool d_finished; + bool d_started; + std::vector<uint8_t> d_rxbuf; + void run(); + int d_fd; + gr_stream_pdu_base (int MTU=10000); + ~gr_stream_pdu_base (); + void send(pmt::pmt_t msg); + bool wait_ready(); + int work (int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); + void start_rxthread(pmt::pmt_t _rxport); + void stop_rxthread(); + private: + pmt::pmt_t rxport; +}; + +typedef boost::shared_ptr<gr_stream_pdu_base> gr_stream_pdu_base_sptr; + +#endif /* INCLUDED_GR_TUNTAP_PDU_H */ diff --git a/gnuradio-core/src/lib/io/gr_tuntap_pdu.cc b/gnuradio-core/src/lib/io/gr_tuntap_pdu.cc new file mode 100644 index 0000000000..44de1a5f7d --- /dev/null +++ b/gnuradio-core/src/lib/io/gr_tuntap_pdu.cc @@ -0,0 +1,143 @@ +/* -*- c++ -*- */ +/* + * Copyright 2012 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gr_tuntap_pdu.h> +#include <gr_io_signature.h> +#include <cstdio> +#include <errno.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> +#include <stdexcept> +#include <string.h> +#include <iostream> +#include <gr_pdu.h> +#include <boost/format.hpp> + +#if (defined(linux) || defined(__linux) || defined(__linux__)) + +#include <sys/ioctl.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <fcntl.h> + +#include <arpa/inet.h> +#include <linux/if.h> + + +// public constructor that returns a shared_ptr + +gr_tuntap_pdu_sptr +gr_make_tuntap_pdu (std::string dev, int MTU) +{ + return gnuradio::get_initial_sptr(new gr_tuntap_pdu(dev, MTU)); +} + +gr_tuntap_pdu::gr_tuntap_pdu (std::string dev, int MTU) + : gr_stream_pdu_base(MTU) +{ + + // make the tuntap + char dev_cstr[1024]; + memset(dev_cstr, 0x00, 1024); + strncpy(dev_cstr, dev.c_str(), std::min(sizeof(dev_cstr), dev.size())); + d_fd = tun_alloc(dev_cstr); + if(d_fd <= 0){ + throw std::runtime_error("TunTap make: tun_alloc failed (are you running as root?)"); + } + + std::cout << boost::format( + "Allocated virtual ethernet interface: %s\n" + "You must now use ifconfig to set its IP address. E.g.,\n" + " $ sudo ifconfig %s 192.168.200.1\n" + "Be sure to use a different address in the same subnet for each machine.\n" + ) % dev % dev << std::endl; + + // set up output message port + message_port_register_out(pmt::mp("pdus")); + start_rxthread(pmt::mp("pdus")); + + // set up input message port + message_port_register_in(pmt::mp("pdus")); + set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_tuntap_pdu::send, this, _1)); +} + + +int gr_tuntap_pdu::tun_alloc(char *dev, int flags) { + struct ifreq ifr; + int fd, err; + const char *clonedev = "/dev/net/tun"; + + /* Arguments taken by the function: + * + * char *dev: the name of an interface (or '\0'). MUST have enough + * space to hold the interface name if '\0' is passed + * int flags: interface flags (eg, IFF_TUN etc.) + */ + + /* open the clone device */ + if( (fd = open(clonedev, O_RDWR)) < 0 ) { + return fd; + } + + /* preparation of the struct ifr, of type "struct ifreq" */ + memset(&ifr, 0, sizeof(ifr)); + + ifr.ifr_flags = flags; /* IFF_TUN or IFF_TAP, plus maybe IFF_NO_PI */ + + if (*dev) { + /* if a device name was specified, put it in the structure; otherwise, + * the kernel will try to allocate the "next" device of the + * specified type */ + strncpy(ifr.ifr_name, dev, IFNAMSIZ); + } + + /* try to create the device */ + if( (err = ioctl(fd, TUNSETIFF, (void *) &ifr)) < 0 ) { + close(fd); + return err; + } + + /* if the operation was successful, write back the name of the + * interface to the variable "dev", so the caller can know + * it. Note that the caller MUST reserve space in *dev (see calling + * code below) */ + strcpy(dev, ifr.ifr_name); + + /* this is the special file descriptor that the caller will use to talk + * with the virtual interface */ + return fd; +} + +#else //if not linux + +boost::shared_ptr<gr_block> gr_make_tuntap_pdu (std::string dev, int MTU){ + boost::shared_ptr<gr_block> rv; + throw std::runtime_error("tuntap only implemented on linux"); + return rv; +} + +#endif diff --git a/gnuradio-core/src/lib/io/gr_tuntap_pdu.h b/gnuradio-core/src/lib/io/gr_tuntap_pdu.h new file mode 100644 index 0000000000..0e8071c30d --- /dev/null +++ b/gnuradio-core/src/lib/io/gr_tuntap_pdu.h @@ -0,0 +1,66 @@ +/* -*- c++ -*- */ +/* + * Copyright 2012 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_GR_TUNTAP_PDU_H +#define INCLUDED_GR_TUNTAP_PDU_H + +#include <gr_core_api.h> +#include <gr_sync_block.h> +#include <gr_message.h> +#include <gr_msg_queue.h> +#include <gr_stream_pdu_base.h> + +#if (defined(linux) || defined(__linux) || defined(__linux__)) + +#include <linux/if_tun.h> + +class gr_tuntap_pdu; +typedef boost::shared_ptr<gr_tuntap_pdu> gr_tuntap_pdu_sptr; + +GR_CORE_API gr_tuntap_pdu_sptr gr_make_tuntap_pdu (std::string dev, int MTU=10000); + +/*! + * \brief Gather received items into messages and insert into msgq + * \ingroup sink_blk + */ +class GR_CORE_API gr_tuntap_pdu : public gr_stream_pdu_base +{ + private: + friend GR_CORE_API gr_tuntap_pdu_sptr + gr_make_tuntap_pdu(std::string dev, int MTU); + int tun_alloc(char* dev, int flags = IFF_TAP | IFF_NO_PI); + std::string d_dev; + protected: + gr_tuntap_pdu (std::string dev, int MTU=10000); + + public: + ~gr_tuntap_pdu () {} + +}; + +#else // if not linux + +GR_CORE_API boost::shared_ptr<gr_block> gr_make_tuntap_pdu (std::string dev, int MTU=0); + +#endif + +#endif /* INCLUDED_GR_TUNTAP_PDU_H */ diff --git a/gnuradio-core/src/lib/io/gr_tuntap_pdu.i b/gnuradio-core/src/lib/io/gr_tuntap_pdu.i new file mode 100644 index 0000000000..589bbc3853 --- /dev/null +++ b/gnuradio-core/src/lib/io/gr_tuntap_pdu.i @@ -0,0 +1,30 @@ +/* -*- c++ -*- */ +/* + * Copyright 2005 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +GR_SWIG_BLOCK_MAGIC(gr,tuntap_pdu); + +%{ +#include <gr_tuntap_pdu.h> +%} + +%include "gr_tuntap_pdu.h" + diff --git a/gnuradio-core/src/lib/io/io.i b/gnuradio-core/src/lib/io/io.i index 871ce1356e..e2de4eb976 100644 --- a/gnuradio-core/src/lib/io/io.i +++ b/gnuradio-core/src/lib/io/io.i @@ -49,6 +49,8 @@ #include <gr_tagged_stream_to_pdu.h> #include <gr_message_debug.h> #include <gr_pdu.h> +#include <gr_tuntap_pdu.h> +#include <gr_socket_pdu.h> %} %include "gr_file_sink_base.i" @@ -75,4 +77,7 @@ %include "gr_tagged_stream_to_pdu.i" %include "gr_message_debug.i" %include "gr_pdu.i" +%include "gr_tuntap_pdu.i" +%include "gr_socket_pdu.i" + diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc index 0f7875a122..6ff57a1d6c 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc @@ -38,7 +38,7 @@ static long s_ncurrently_allocated = 0; long gr_basic_block_ncurrently_allocated() { - return s_ncurrently_allocated; + return s_ncurrently_allocated; } gr_basic_block::gr_basic_block(const std::string &name, @@ -53,25 +53,25 @@ gr_basic_block::gr_basic_block(const std::string &name, d_color(WHITE), message_subscribers(pmt::pmt_make_dict()) { - s_ncurrently_allocated++; + s_ncurrently_allocated++; } gr_basic_block::~gr_basic_block() { - s_ncurrently_allocated--; - global_block_registry.block_unregister(this); + s_ncurrently_allocated--; + global_block_registry.block_unregister(this); } gr_basic_block_sptr gr_basic_block::to_basic_block() { - return shared_from_this(); + return shared_from_this(); } void gr_basic_block::set_block_alias(std::string name) { - global_block_registry.register_symbolic_name(this, name); + global_block_registry.register_symbolic_name(this, name); } // ** Message passing interface ** @@ -147,28 +147,29 @@ void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg) } // - subscribe to a message port -void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target) -{ - if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) { +void +gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){ + if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ std::stringstream ss; - ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) - << "\" on block: " << pmt::pmt_write_string(target) << std::endl; + ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl; throw std::runtime_error(ss.str()); } - pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); - message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target)); + + // ignore re-adds of the same target + if(!pmt::pmt_list_has(currlist, target)) + message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target)); } -void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target) -{ - if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) { +void +gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){ + if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ std::stringstream ss; - ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) - << "\" on block: " << pmt::pmt_write_string(target) << std::endl; + ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl; throw std::runtime_error(ss.str()); } - + + // ignore unsubs of unknown targets pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL); message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target)); } @@ -224,5 +225,3 @@ gr_basic_block::delete_head_blocking(pmt::pmt_t which_port) msg_queue[which_port].pop_front(); return m; } - - diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h index 00e9c2192f..f3b7b835b4 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.h +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h @@ -36,6 +36,7 @@ #include <gruel/thread.h> #include <boost/foreach.hpp> #include <boost/thread/condition_variable.hpp> +#include <iostream> /*! * \brief The abstract base class for all signal processing blocks. @@ -50,202 +51,215 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block> { - typedef boost::function<void(pmt::pmt_t)> msg_handler_t; - -private: - /* - * This function is called by the runtime system to dispatch messages. - * - * The thread-safety guarantees mentioned in set_msg_handler are implemented - * by the callers of this method. - */ - void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) - { - // AA Update this - if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler? - d_msg_handlers[which_port](msg); // Yes, invoke it. - }; - - //msg_handler_t d_msg_handler; - typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t; - d_msg_handlers_t d_msg_handlers; - - typedef std::deque<pmt::pmt_t> msg_queue_t; - typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t; - typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr; - msg_queue_map_t msg_queue; -// boost::condition_variable msg_queue_ready; - std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready; - - gruel::mutex mutex; //< protects all vars - - -protected: - friend class gr_flowgraph; - friend class gr_flat_flowgraph; // TODO: will be redundant - friend class gr_tpb_thread_body; - - enum vcolor { WHITE, GREY, BLACK }; - - std::string d_name; - gr_io_signature_sptr d_input_signature; - gr_io_signature_sptr d_output_signature; - long d_unique_id; - long d_symbolic_id; - std::string d_symbol_name; - std::string d_symbol_alias; - vcolor d_color; - - gr_basic_block(void){} //allows pure virtual interface sub-classes - - //! Protected constructor prevents instantiation by non-derived classes - gr_basic_block(const std::string &name, - gr_io_signature_sptr input_signature, - gr_io_signature_sptr output_signature); - - //! may only be called during constructor - void set_input_signature(gr_io_signature_sptr iosig) { - d_input_signature = iosig; + typedef boost::function<void(pmt::pmt_t)> msg_handler_t; + + private: + /* + * This function is called by the runtime system to dispatch messages. + * + * The thread-safety guarantees mentioned in set_msg_handler are implemented + * by the callers of this method. + */ + void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) + { + // AA Update this + if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler? + d_msg_handlers[which_port](msg); // Yes, invoke it. + }; + + //msg_handler_t d_msg_handler; + typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t; + d_msg_handlers_t d_msg_handlers; + + typedef std::deque<pmt::pmt_t> msg_queue_t; + typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t; + typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr; + std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready; + + gruel::mutex mutex; //< protects all vars + + protected: + friend class gr_flowgraph; + friend class gr_flat_flowgraph; // TODO: will be redundant + friend class gr_tpb_thread_body; + + enum vcolor { WHITE, GREY, BLACK }; + + std::string d_name; + gr_io_signature_sptr d_input_signature; + gr_io_signature_sptr d_output_signature; + long d_unique_id; + long d_symbolic_id; + std::string d_symbol_name; + std::string d_symbol_alias; + vcolor d_color; + msg_queue_map_t msg_queue; + + gr_basic_block(void){} //allows pure virtual interface sub-classes + + //! Protected constructor prevents instantiation by non-derived classes + gr_basic_block(const std::string &name, + gr_io_signature_sptr input_signature, + gr_io_signature_sptr output_signature); + + //! may only be called during constructor + void set_input_signature(gr_io_signature_sptr iosig) { + d_input_signature = iosig; + } + + //! may only be called during constructor + void set_output_signature(gr_io_signature_sptr iosig) { + d_output_signature = iosig; + } + + /*! + * \brief Allow the flowgraph to set for sorting and partitioning + */ + void set_color(vcolor color) { d_color = color; } + vcolor color() const { return d_color; } + + // Message passing interface + pmt::pmt_t message_subscribers; + + public: + virtual ~gr_basic_block(); + long unique_id() const { return d_unique_id; } + long symbolic_id() const { return d_symbolic_id; } + std::string name() const { return d_name; } + std::string symbol_name() const { return d_symbol_name; } + gr_io_signature_sptr input_signature() const { return d_input_signature; } + gr_io_signature_sptr output_signature() const { return d_output_signature; } + gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion + bool alias_set() { return !d_symbol_alias.empty(); } + std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); } + pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); } + void set_block_alias(std::string name); + + // ** Message passing interface ** + void message_port_register_in(pmt::pmt_t port_id); + void message_port_register_out(pmt::pmt_t port_id); + void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg); + void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target); + void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target); + + virtual bool message_port_is_hier(pmt::pmt_t port_id) { std::cout << "is_hier\n"; return false; } + virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { std::cout << "is_hier_in\n"; return false; } + virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { std::cout << "is_hier_out\n"; return false; } + + /*! + * \brief Get input message port names. + * + * Returns the available input message ports for a block. The + * return object is a PMT vector that is filled with PMT symbols. + */ + pmt::pmt_t message_ports_in(); + + /*! + * \brief Get output message port names. + * + * Returns the available output message ports for a block. The + * return object is a PMT vector that is filled with PMT symbols. + */ + pmt::pmt_t message_ports_out(); + + /*! + * Accept msg, place in queue, arrange for thread to be awakened if it's not already. + */ + void _post(pmt::pmt_t which_port, pmt::pmt_t msg); + + //! is the queue empty? + //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); } + bool empty_p(pmt::pmt_t which_port) { + if(msg_queue.find(which_port) == msg_queue.end()) + throw std::runtime_error("port does not exist!"); + return msg_queue[which_port].empty(); + } + bool empty_p() { + bool rv = true; + BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); } + return rv; + } + + //| Acquires and release the mutex + void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg); + /*! + * \returns returns pmt at head of queue or pmt_t() if empty. + */ + pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port); + + /*! + * \returns returns pmt at head of queue or pmt_t() if empty. + */ + pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port); + + msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){ + return msg_queue[which_port].begin(); + } + + void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){ + msg_queue[which_port].erase(it); + } + + virtual bool has_msg_port(pmt::pmt_t which_port){ + if(msg_queue.find(which_port) != msg_queue.end()){ + return true; } - - //! may only be called during constructor - void set_output_signature(gr_io_signature_sptr iosig) { - d_output_signature = iosig; - } - - /*! - * \brief Allow the flowgraph to set for sorting and partitioning - */ - void set_color(vcolor color) { d_color = color; } - vcolor color() const { return d_color; } - - // Message passing interface - pmt::pmt_t message_subscribers; - -public: - virtual ~gr_basic_block(); - long unique_id() const { return d_unique_id; } - long symbolic_id() const { return d_symbolic_id; } - std::string name() const { return d_name; } - std::string symbol_name() const { return d_symbol_name; } - gr_io_signature_sptr input_signature() const { return d_input_signature; } - gr_io_signature_sptr output_signature() const { return d_output_signature; } - gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion - bool alias_set() { return !d_symbol_alias.empty(); } - std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); } - pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); } - void set_block_alias(std::string name); - - // ** Message passing interface ** - void message_port_register_in(pmt::pmt_t port_id); - void message_port_register_out(pmt::pmt_t port_id); - void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg); - void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target); - void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target); - - /*! - * \brief Get input message port names. - * - * Returns the available input message ports for a block. The - * return object is a PMT vector that is filled with PMT symbols. - */ - pmt::pmt_t message_ports_in(); - - /*! - * \brief Get output message port names. - * - * Returns the available output message ports for a block. The - * return object is a PMT vector that is filled with PMT symbols. - */ - pmt::pmt_t message_ports_out(); - - /*! - * Accept msg, place in queue, arrange for thread to be awakened if it's not already. - */ - void _post(pmt::pmt_t which_port, pmt::pmt_t msg); - - //! is the queue empty? - //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); } - bool empty_p(pmt::pmt_t which_port) { - if(msg_queue.find(which_port) == msg_queue.end()) - throw std::runtime_error("port does not exist!"); - return msg_queue[which_port].empty(); - } - bool empty_p() { - bool rv = true; - BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); } - return rv; - } - - //| Acquires and release the mutex - void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg); - /*! - * \returns returns pmt at head of queue or pmt_t() if empty. - */ - pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port); - - /*! - * \returns returns pmt at head of queue or pmt_t() if empty. - */ - pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port); - - msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){ - return msg_queue[which_port].begin(); - } - void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){ - msg_queue[which_port].erase(it); - } - - - /*! - * \brief Confirm that ninputs and noutputs is an acceptable combination. - * - * \param ninputs number of input streams connected - * \param noutputs number of output streams connected - * - * \returns true if this is a valid configuration for this block. - * - * This function is called by the runtime system whenever the - * topology changes. Most classes do not need to override this. - * This check is in addition to the constraints specified by the input - * and output gr_io_signatures. - */ - virtual bool check_topology(int ninputs, int noutputs) { return true; } - - /*! - * \brief Set the callback that is fired when messages are available. - * - * \p msg_handler can be any kind of function pointer or function object - * that has the signature: - * <pre> - * void msg_handler(pmt::pmt msg); - * </pre> - * - * (You may want to use boost::bind to massage your callable into the - * correct form. See gr_nop.{h,cc} for an example that sets up a class - * method as the callback.) - * - * Blocks that desire to handle messages must call this method in their - * constructors to register the handler that will be invoked when messages - * are available. - * - * If the block inherits from gr_block, the runtime system will ensure that - * msg_handler is called in a thread-safe manner, such that work and - * msg_handler will never be called concurrently. This allows msg_handler - * to update state variables without having to worry about thread-safety - * issues with work, general_work or another invocation of msg_handler. - * - * If the block inherits from gr_hier_block2, the runtime system will - * ensure that no reentrant calls are made to msg_handler. - */ - //template <typename T> void set_msg_handler(T msg_handler){ - // d_msg_handler = msg_handler_t(msg_handler); - //} - template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){ - if(msg_queue.find(which_port) == msg_queue.end()){ - throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); } - d_msg_handlers[which_port] = msg_handler_t(msg_handler); + if(pmt::pmt_dict_has_key(message_subscribers, which_port)){ + return true; } + return false; + } + + + /*! + * \brief Confirm that ninputs and noutputs is an acceptable combination. + * + * \param ninputs number of input streams connected + * \param noutputs number of output streams connected + * + * \returns true if this is a valid configuration for this block. + * + * This function is called by the runtime system whenever the + * topology changes. Most classes do not need to override this. + * This check is in addition to the constraints specified by the input + * and output gr_io_signatures. + */ + virtual bool check_topology(int ninputs, int noutputs) { return true; } + + /*! + * \brief Set the callback that is fired when messages are available. + * + * \p msg_handler can be any kind of function pointer or function object + * that has the signature: + * <pre> + * void msg_handler(pmt::pmt msg); + * </pre> + * + * (You may want to use boost::bind to massage your callable into the + * correct form. See gr_nop.{h,cc} for an example that sets up a class + * method as the callback.) + * + * Blocks that desire to handle messages must call this method in their + * constructors to register the handler that will be invoked when messages + * are available. + * + * If the block inherits from gr_block, the runtime system will ensure that + * msg_handler is called in a thread-safe manner, such that work and + * msg_handler will never be called concurrently. This allows msg_handler + * to update state variables without having to worry about thread-safety + * issues with work, general_work or another invocation of msg_handler. + * + * If the block inherits from gr_hier_block2, the runtime system will + * ensure that no reentrant calls are made to msg_handler. + */ + //template <typename T> void set_msg_handler(T msg_handler){ + // d_msg_handler = msg_handler_t(msg_handler); + //} + template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){ + if(msg_queue.find(which_port) == msg_queue.end()){ + throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); } + d_msg_handlers[which_port] = msg_handler_t(msg_handler); + } }; inline bool operator<(gr_basic_block_sptr lhs, gr_basic_block_sptr rhs) @@ -260,8 +274,8 @@ GR_CORE_API long gr_basic_block_ncurrently_allocated(); inline std::ostream &operator << (std::ostream &os, gr_basic_block_sptr basic_block) { - os << basic_block->name() << "(" << basic_block->unique_id() << ")"; - return os; + os << basic_block->name() << "(" << basic_block->unique_id() << ")"; + return os; } #endif /* INCLUDED_GR_BASIC_BLOCK_H */ diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc index e04deb9485..c19863f347 100644 --- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc +++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc @@ -31,8 +31,9 @@ #include <volk/volk.h> #include <iostream> #include <map> +#include <boost/format.hpp> -#define GR_FLAT_FLOWGRAPH_DEBUG 0 +#define GR_FLAT_FLOWGRAPH_DEBUG 0 // 32Kbyte buffer size between blocks #define GR_FIXED_BUFFER_SIZE (32*(1L<<10)) @@ -71,6 +72,15 @@ gr_flat_flowgraph::setup_connections() block->set_is_unaligned(false); } + // Connect message ports connetions + for(gr_msg_edge_viter_t i = d_msg_edges.begin(); i != d_msg_edges.end(); i++){ + if(GR_FLAT_FLOWGRAPH_DEBUG) + std::cout << boost::format("flat_fg connecting msg primitives: (%s, %s)->(%s, %s)\n") % + i->src().block() % i->src().port() % + i->dst().block() % i->dst().port(); + i->src().block()->message_port_sub( i->src().port(), pmt::pmt_cons(i->dst().block()->alias_pmt(), i->dst().port()) ); + } + } gr_block_detail_sptr @@ -350,3 +360,25 @@ gr_flat_flowgraph::make_block_vector(gr_basic_block_vector_t &blocks) return result; } + + +void gr_flat_flowgraph::replace_endpoint(const gr_msg_endpoint &e, const gr_msg_endpoint &r, bool is_src){ + size_t n_replr(0); + if(GR_FLAT_FLOWGRAPH_DEBUG) + std::cout << boost::format("gr_flat_flowgraph::replace_endpoint( %s, %s, %d )\n") % e.block()% r.block()% is_src; + for(size_t i=0; i<d_msg_edges.size(); i++){ + if(is_src){ + if(d_msg_edges[i].src() == e){ + d_msg_edges[i] = gr_msg_edge(r, d_msg_edges[i].dst() ); + n_replr++; + } + } else { + if(d_msg_edges[i].dst() == e){ + d_msg_edges[i] = gr_msg_edge(d_msg_edges[i].src(), r ); + n_replr++; + } + } + } +// std::cout << "n_repl = " << n_repl <<"\n"; +} + diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h index 0926bcc8f3..52f2023347 100644 --- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h +++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h @@ -46,7 +46,7 @@ public: // Wire gr_blocks together in new flat_flowgraph void setup_connections(); - + // Merge applicable connections from existing flat flowgraph void merge_connections(gr_flat_flowgraph_sptr sfg); @@ -57,6 +57,8 @@ public: */ static gr_block_vector_t make_block_vector(gr_basic_block_vector_t &blocks); + void replace_endpoint(const gr_msg_endpoint &e, const gr_msg_endpoint &r, bool is_src); + private: gr_flat_flowgraph(); diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc index 69c304a3d8..63a2084802 100644 --- a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc +++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc @@ -149,6 +149,16 @@ gr_flowgraph::check_valid_port(gr_io_signature_sptr sig, int port) } } +void +gr_flowgraph::check_valid_port(const gr_msg_endpoint &e) +{ + if (GR_FLOWGRAPH_DEBUG) + std::cout << "check_valid_port( " << e.block() << ", " << e.port() << ")\n"; + + if(!e.block()->has_msg_port(e.port())) + throw std::invalid_argument("invalid msg port in connect() or disconnect()"); +} + void gr_flowgraph::check_dst_not_used(const gr_endpoint &dst) { @@ -181,8 +191,10 @@ gr_flowgraph::calc_used_blocks() gr_basic_block_vector_t tmp; // make sure free standing message blocks are included - for (gr_basic_block_vector_t::iterator it=d_msgblocks.begin(); it!=d_msgblocks.end(); it++){ - tmp.push_back(*it); + for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) { +// for now only blocks receiving messages get a thread context - uncomment to allow senders to also obtain one +// tmp.push_back(p->src().block()); + tmp.push_back(p->dst().block()); } // Collect all blocks in the edge list @@ -477,7 +489,27 @@ gr_flowgraph::topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_ve output.push_back(block); } -void gr_flowgraph::add_msg_block(gr_basic_block_sptr blk){ - d_msgblocks.push_back(blk); +void gr_flowgraph::connect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst){ + check_valid_port(src); + check_valid_port(dst); + for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) { + if(p->src() == src && p->dst() == dst){ + throw std::runtime_error("connect called on already connected edge!"); + } + } + d_msg_edges.push_back(gr_msg_edge(src,dst)); } +void gr_flowgraph::disconnect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst){ + check_valid_port(src); + check_valid_port(dst); + for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) { + if(p->src() == src && p->dst() == dst){ + d_msg_edges.erase(p); + return; + } + } + throw std::runtime_error("disconnect called on non-connected edge!"); +} + + diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h index 860cb0ff1e..bef70f626f 100644 --- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h +++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h @@ -52,6 +52,31 @@ inline bool gr_endpoint::operator==(const gr_endpoint &other) const d_port == other.d_port); } +class GR_CORE_API gr_msg_endpoint +{ +private: + gr_basic_block_sptr d_basic_block; + pmt::pmt_t d_port; + bool d_is_hier; +public: + gr_msg_endpoint() : d_basic_block(), d_port(pmt::PMT_NIL) { } + gr_msg_endpoint(gr_basic_block_sptr block, pmt::pmt_t port, bool is_hier=false){ d_basic_block = block; d_port = port; d_is_hier = is_hier;} + gr_basic_block_sptr block() const { return d_basic_block; } + pmt::pmt_t port() const { return d_port; } + bool is_hier() const { return d_is_hier; } + void set_hier(bool h) { d_is_hier = h; } + + bool operator==(const gr_msg_endpoint &other) const; + +}; + +inline bool gr_msg_endpoint::operator==(const gr_msg_endpoint &other) const +{ + return (d_basic_block == other.d_basic_block && + pmt::pmt_equal(d_port, other.d_port)); +} + + // Hold vectors of gr_endpoint objects typedef std::vector<gr_endpoint> gr_endpoint_vector_t; typedef std::vector<gr_endpoint>::iterator gr_endpoint_viter_t; @@ -75,11 +100,35 @@ private: gr_endpoint d_dst; }; + // Hold vectors of gr_edge objects typedef std::vector<gr_edge> gr_edge_vector_t; typedef std::vector<gr_edge>::iterator gr_edge_viter_t; +/*! + *\brief Class representing a msg connection between to graph msg endpoints + * + */ +class GR_CORE_API gr_msg_edge +{ +public: + gr_msg_edge() : d_src(), d_dst() { }; + gr_msg_edge(const gr_msg_endpoint &src, const gr_msg_endpoint &dst) : d_src(src), d_dst(dst) { } + ~gr_msg_edge() {} + + const gr_msg_endpoint &src() const { return d_src; } + const gr_msg_endpoint &dst() const { return d_dst; } + +private: + gr_msg_endpoint d_src; + gr_msg_endpoint d_dst; +}; + +// Hold vectors of gr_edge objects +typedef std::vector<gr_msg_edge> gr_msg_edge_vector_t; +typedef std::vector<gr_msg_edge>::iterator gr_msg_edge_viter_t; + // Create a shared pointer to a heap allocated flowgraph // (types defined in gr_runtime_types.h) GR_CORE_API gr_flowgraph_sptr gr_make_flowgraph(); @@ -110,7 +159,11 @@ public: void disconnect(gr_basic_block_sptr src_block, int src_port, gr_basic_block_sptr dst_block, int dst_port); - void add_msg_block(gr_basic_block_sptr blk); + // Connect two msg endpoints + void connect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst); + + // Disconnect two msg endpoints + void disconnect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst); // Validate connectivity, raise exception if invalid void validate(); @@ -120,6 +173,9 @@ public: // Return vector of edges const gr_edge_vector_t &edges() const { return d_edges; } + + // Return vector of msg edges + const gr_msg_edge_vector_t &msg_edges() const { return d_msg_edges; } // Return vector of connected blocks gr_basic_block_vector_t calc_used_blocks(); @@ -130,11 +186,11 @@ public: // Return vector of vectors of disjointly connected blocks, topologically // sorted. std::vector<gr_basic_block_vector_t> partition(); - gr_basic_block_vector_t d_msgblocks; protected: gr_basic_block_vector_t d_blocks; gr_edge_vector_t d_edges; + gr_msg_edge_vector_t d_msg_edges; gr_flowgraph(); std::vector<int> calc_used_ports(gr_basic_block_sptr block, bool check_inputs); @@ -146,6 +202,7 @@ protected: private: void check_valid_port(gr_io_signature_sptr sig, int port); + void check_valid_port(const gr_msg_endpoint &e); void check_dst_not_used(const gr_endpoint &dst); void check_type_match(const gr_endpoint &src, const gr_endpoint &dst); gr_edge_vector_t calc_connections(gr_basic_block_sptr block, bool check_inputs); // false=use outputs diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc index a19bfe1954..8c2794c63c 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc @@ -44,7 +44,9 @@ gr_hier_block2::gr_hier_block2(const std::string &name, gr_io_signature_sptr input_signature, gr_io_signature_sptr output_signature) : gr_basic_block(name, input_signature, output_signature), - d_detail(new gr_hier_block2_detail(this)) + d_detail(new gr_hier_block2_detail(this)), + hier_message_ports_in(pmt::PMT_NIL), + hier_message_ports_out(pmt::PMT_NIL) { // This bit of magic ensures that self() works in the constructors of derived classes. gnuradio::detail::sptr_magic::create_and_stash_initial_sptr(this); @@ -141,6 +143,7 @@ gr_hier_block2::unlock() d_detail->unlock(); } + gr_flat_flowgraph_sptr gr_hier_block2::flatten() const { diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.h b/gnuradio-core/src/lib/runtime/gr_hier_block2.h index e8364a740b..f80dd73e4b 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2.h +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.h @@ -166,6 +166,39 @@ public: gr_flat_flowgraph_sptr flatten() const; gr_hier_block2_sptr to_hier_block2(); // Needed for Python type coercion + + bool has_msg_port(pmt::pmt_t which_port){ + return message_port_is_hier(which_port) || gr_basic_block::has_msg_port(which_port); + } + + bool message_port_is_hier(pmt::pmt_t port_id){ + return message_port_is_hier_in(port_id) || message_port_is_hier_out(port_id); + } + bool message_port_is_hier_in(pmt::pmt_t port_id){ + return pmt::pmt_list_has(hier_message_ports_in, port_id); + } + bool message_port_is_hier_out(pmt::pmt_t port_id){ + return pmt::pmt_list_has(hier_message_ports_out, port_id); + } + + pmt::pmt_t hier_message_ports_in; + pmt::pmt_t hier_message_ports_out; + + void message_port_register_hier_in(pmt::pmt_t port_id){ + if(pmt::pmt_list_has(hier_message_ports_in, port_id)) + throw std::invalid_argument("hier msg in port by this name already registered"); + if(msg_queue.find(port_id) != msg_queue.end()) + throw std::invalid_argument("block already has a primitive input port by this name"); + hier_message_ports_in = pmt::pmt_list_add(hier_message_ports_in, port_id); + } + void message_port_register_hier_out(pmt::pmt_t port_id){ + if(pmt::pmt_list_has(hier_message_ports_out, port_id)) + throw std::invalid_argument("hier msg out port by this name already registered"); + if(pmt::pmt_dict_has_key(message_subscribers, port_id)) + throw std::invalid_argument("block already has a primitive output port by this name"); + hier_message_ports_out = pmt::pmt_list_add(hier_message_ports_out, port_id); + } + }; inline gr_hier_block2_sptr cast_to_hier_block2_sptr(gr_basic_block_sptr block) { diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.i b/gnuradio-core/src/lib/runtime/gr_hier_block2.i index 7c0e62f288..a857394ca7 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2.i +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.i @@ -40,6 +40,8 @@ gr_hier_block2_sptr gr_make_hier_block2(const std::string name, %rename(primitive_disconnect) gr_hier_block2::disconnect; %rename(primitive_msg_connect) gr_hier_block2::msg_connect; %rename(primitive_msg_disconnect) gr_hier_block2::msg_disconnect; +%rename(primitive_message_port_register_hier_in) gr_hier_block2::message_port_register_hier_in; +%rename(primitive_message_port_register_hier_out) gr_hier_block2::message_port_register_hier_out; class gr_hier_block2 : public gr_basic_block { @@ -78,5 +80,9 @@ public: void lock(); void unlock(); + void message_port_register_hier_in(pmt::pmt_t port_id); + void message_port_register_hier_out(pmt::pmt_t port_id); + + gr_hier_block2_sptr to_hier_block2(); // Needed for Python type coercion }; diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc index ff2a5db8cc..e70553ddc1 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc @@ -27,6 +27,7 @@ #include <gr_io_signature.h> #include <stdexcept> #include <sstream> +#include <boost/format.hpp> #define GR_HIER_BLOCK2_DETAIL_DEBUG 0 @@ -53,6 +54,7 @@ gr_hier_block2_detail::gr_hier_block2_detail(gr_hier_block2 *owner) : d_outputs = gr_endpoint_vector_t(max_outputs); } + gr_hier_block2_detail::~gr_hier_block2_detail() { d_owner = 0; // Don't use delete, we didn't allocate @@ -151,15 +153,39 @@ gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport, std::cout << "connecting message port..." << std::endl; // register the subscription - src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport)); +// this is done later... +// src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport)); // add block uniquely to list to internal blocks if (std::find(d_blocks.begin(), d_blocks.end(), dst) == d_blocks.end()){ + d_blocks.push_back(src); d_blocks.push_back(dst); } - // make sure we instantiate a thread for this block - d_fg->add_msg_block(dst); + bool hier_out = (d_owner == src.get()) && src->message_port_is_hier_out(srcport);; + bool hier_in = (d_owner == dst.get()) && dst->message_port_is_hier_in(dstport); + + gr_hier_block2_sptr src_block(cast_to_hier_block2_sptr(src)); + gr_hier_block2_sptr dst_block(cast_to_hier_block2_sptr(dst)); + + if (src_block && src.get() != d_owner) { + if (GR_HIER_BLOCK2_DETAIL_DEBUG) + std::cout << "connect: src is hierarchical, setting parent to " << this << std::endl; + src_block->d_detail->d_parent_detail = this; + } + + if (dst_block && dst.get() != d_owner) { + if (GR_HIER_BLOCK2_DETAIL_DEBUG) + std::cout << "connect: dst is hierarchical, setting parent to " << this << std::endl; + dst_block->d_detail->d_parent_detail = this; + } + + // add edge for this message connection + if(GR_HIER_BLOCK2_DETAIL_DEBUG) + std::cout << boost::format("connect( (%s, %s, %d), (%s, %s, %d) )\n") % + src % srcport % hier_out % + dst % dstport % hier_in; + d_fg->connect( gr_msg_endpoint(src, srcport, hier_out), gr_msg_endpoint(dst, dstport, hier_in)); } void @@ -169,8 +195,13 @@ gr_hier_block2_detail::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcpor if (GR_HIER_BLOCK2_DETAIL_DEBUG) std::cout << "disconnecting message port..." << std::endl; - // register the subscription + // unregister the subscription - if already subscribed src->message_port_unsub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport)); + + // remove edge for this message connection + bool hier_out = (d_owner == src.get()) && src->message_port_is_hier_out(srcport);; + bool hier_in = (d_owner == dst.get()) && dst->message_port_is_hier_in(dstport); + d_fg->disconnect( gr_msg_endpoint(src, srcport, hier_out), gr_msg_endpoint(dst, dstport, hier_in)); } void @@ -435,11 +466,16 @@ void gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const { if (GR_HIER_BLOCK2_DETAIL_DEBUG) - std::cout << "Flattening " << d_owner->name() << std::endl; + std::cout << " ** Flattening " << d_owner->name() << std::endl; // Add my edges to the flow graph, resolving references to actual endpoints gr_edge_vector_t edges = d_fg->edges(); + gr_msg_edge_vector_t msg_edges = d_fg->msg_edges(); gr_edge_viter_t p; + gr_msg_edge_viter_t q,u; + + if (GR_HIER_BLOCK2_DETAIL_DEBUG) + std::cout << "Flattening stream connections: " << std::endl; for (p = edges.begin(); p != edges.end(); p++) { if (GR_HIER_BLOCK2_DETAIL_DEBUG) @@ -457,7 +493,46 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const } } } - sfg->d_msgblocks = d_fg->d_msgblocks; + + // loop through flattening hierarchical connections + if (GR_HIER_BLOCK2_DETAIL_DEBUG) + std::cout << "Flattening msg connections: " << std::endl; + + for(q = msg_edges.begin(); q != msg_edges.end(); q++) { + if (GR_HIER_BLOCK2_DETAIL_DEBUG) + std::cout << boost::format(" flattening edge ( %s, %s, %d) -> ( %s, %s, %d)\n") % q->src().block() % q->src().port() % q->src().is_hier() % q->dst().block() % q->dst().port() % q->dst().is_hier(); + + bool normal_connection = true; + + // resolve existing connections to hier ports + if(q->dst().is_hier()){ + if (GR_HIER_BLOCK2_DETAIL_DEBUG) + std::cout << boost::format(" resolve hier output (%s, %s)") % q->dst().block() % q->dst().port() << std::endl; + sfg->replace_endpoint( q->dst(), q->src(), true ); + normal_connection = false; + } + + if(q->src().is_hier()){ + if (GR_HIER_BLOCK2_DETAIL_DEBUG) + std::cout << boost::format(" resolve hier input (%s, %s)") % q->src().block() % q->src().port() << std::endl; + sfg->replace_endpoint( q->src(), q->dst(), false ); + normal_connection = false; + } + + // propogate non hier connections through + if(normal_connection){ + sfg->connect( q->src(), q->dst() ); + } + } + +/* // connect primitive edges in the new fg + for(q = msg_edges.begin(); q != msg_edges.end(); q++) { + if( (!q->src().is_hier()) && (!q->dst().is_hier()) ){ + sfg->connect( q->src(), q->dst() ); + } else { + std::cout << "not connecting hier connection!" << std::endl; + } + }*/ // Construct unique list of blocks used either in edges, inputs, // outputs, or by themselves. I still hate STL. @@ -499,7 +574,7 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const // Recurse hierarchical children for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) { gr_hier_block2_sptr hier_block2(cast_to_hier_block2_sptr(*p)); - if (hier_block2) { + if (hier_block2 && (hier_block2.get() != d_owner)) { if (GR_HIER_BLOCK2_DETAIL_DEBUG) std::cout << "flatten_aux: recursing into hierarchical block " << hier_block2 << std::endl; hier_block2->d_detail->flatten_aux(sfg); @@ -530,3 +605,4 @@ gr_hier_block2_detail::unlock() else d_owner->unlock(); } + diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h index f2d2b3c4e8..b38dae3016 100644 --- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h +++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h @@ -61,6 +61,7 @@ private: gr_endpoint_vector_t d_outputs; // Single internal endpoint per external output gr_basic_block_vector_t d_blocks; + void connect_input(int my_port, int port, gr_basic_block_sptr block); void connect_output(int my_port, int port, gr_basic_block_sptr block); void disconnect_input(int my_port, int port, gr_basic_block_sptr block); diff --git a/gnuradio-core/src/python/gnuradio/gr/hier_block2.py b/gnuradio-core/src/python/gnuradio/gr/hier_block2.py index 0c45f1691d..f5f0c00f53 100644 --- a/gnuradio-core/src/python/gnuradio/gr/hier_block2.py +++ b/gnuradio-core/src/python/gnuradio/gr/hier_block2.py @@ -20,6 +20,7 @@ # from gnuradio_core import hier_block2_swig +from gruel import pmt # # This hack forces a 'has-a' relationship to look like an 'is-a' one. @@ -111,3 +112,15 @@ class hier_block2(object): self._hb.primitive_disconnect(src_block.to_basic_block(), src_port, dst_block.to_basic_block(), dst_port) + def msg_connect(self, src, srcport, dst, dstport): + self.primitive_msg_connect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport); + + def msg_disconnect(self, src, srcport, dst, dstport): + self.primitive_msg_disconnect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport); + + def message_port_register_hier_in(self, portname): + self.primitive_message_port_register_hier_in(pmt.pmt_intern(portname)); + + def message_port_register_hier_out(self, portname): + self.primitive_message_port_register_hier_out(pmt.pmt_intern(portname)); + diff --git a/grc/blocks/block_tree.xml b/grc/blocks/block_tree.xml index 95bd7bb3ce..183883959d 100644 --- a/grc/blocks/block_tree.xml +++ b/grc/blocks/block_tree.xml @@ -43,6 +43,8 @@ <block>gr_message_strobe</block> <block>gr_pdu_to_tagged_stream</block> <block>gr_tagged_stream_to_pdu</block> + <block>gr_tuntap_pdu</block> + <block>gr_socket_pdu</block> </cat> <cat> <name>Operators</name> diff --git a/grc/blocks/gr_message_debug.xml b/grc/blocks/gr_message_debug.xml index 705a7cc5f3..4d73fbd9cc 100644 --- a/grc/blocks/gr_message_debug.xml +++ b/grc/blocks/gr_message_debug.xml @@ -19,4 +19,9 @@ <type>message</type> <optional>1</optional> </sink> + <sink> + <name>print_pdu_verbose</name> + <type>message</type> + <optional>1</optional> + </sink> </block> diff --git a/grc/blocks/gr_socket_pdu.xml b/grc/blocks/gr_socket_pdu.xml new file mode 100644 index 0000000000..a175c36991 --- /dev/null +++ b/grc/blocks/gr_socket_pdu.xml @@ -0,0 +1,62 @@ +<?xml version="1.0"?> +<!-- +################################################### +## Socket PDU Message source/sink +################################################### + --> +<block> + <name>Socket PDU</name> + <key>gr_socket_pdu</key> + <import>from gnuradio import gr</import> + <make>gr.socket_pdu($type, $host, $port, $mtu)</make> + <param> + <name>Type</name> + <key>type</key> + <value>TCP_SERVER</value> + <type>enum</type> + <option> + <name>TCP Server</name> + <key>"TCP_SERVER"</key> + </option> + <option> + <name>TCP Client</name> + <key>"TCP_CLIENT"</key> + </option> + <option> + <name>UDP Server</name> + <key>"UDP_SERVER"</key> + </option> + <option> + <name>UDP Client</name> + <key>"UDP_CLIENT"</key> + </option> + </param> + <param> + <name>Host</name> + <key>host</key> + <value></value> + <type>string</type> + </param> + <param> + <name>Port</name> + <key>port</key> + <value>52001</value> + <type>string</type> + </param> + <param> + <name>MTU</name> + <key>mtu</key> + <value>10000</value> + <type>int</type> + </param> + <sink> + <name>pdus</name> + <type>message</type> + <optional>1</optional> + </sink> + <source> + <name>pdus</name> + <type>message</type> + <optional>1</optional> + </source> +</block> diff --git a/grc/blocks/gr_tuntap_pdu.xml b/grc/blocks/gr_tuntap_pdu.xml new file mode 100644 index 0000000000..f169345afa --- /dev/null +++ b/grc/blocks/gr_tuntap_pdu.xml @@ -0,0 +1,34 @@ +<?xml version="1.0"?> +<!-- +################################################### +## Tuntap PDU Message source/sink +################################################### + --> +<block> + <name>TunTap PDU</name> + <key>gr_tuntap_pdu</key> + <import>from gnuradio import gr</import> + <make>gr.tuntap_pdu($ifn, $mtu)</make> + <param> + <name>Interface Name</name> + <key>ifn</key> + <value>tun0</value> + <type>string</type> + </param> + <param> + <name>MTU</name> + <key>mtu</key> + <value>10000</value> + <type>int</type> + </param> + <sink> + <name>pdus</name> + <type>message</type> + <optional>1</optional> + </sink> + <source> + <name>pdus</name> + <type>message</type> + <optional>1</optional> + </source> +</block> diff --git a/grc/blocks/pad_sink.xml b/grc/blocks/pad_sink.xml index f89eaa53c5..f0e10a3391 100644 --- a/grc/blocks/pad_sink.xml +++ b/grc/blocks/pad_sink.xml @@ -7,7 +7,9 @@ <block> <name>Pad Sink</name> <key>pad_sink</key> - <make></make> + <make>#if str($type) == "message" +None;self.message_port_register_hier_in($label) +#end if</make> <param> <name>Label</name> <key>label</key> @@ -43,6 +45,11 @@ <key>byte</key> <opt>size:gr.sizeof_char</opt> </option> + <option> + <name>Message</name> + <key>message</key> + <opt>size:0</opt> + </option> <option> <name>Wildcard</name> <key></key> diff --git a/grc/blocks/pad_source.xml b/grc/blocks/pad_source.xml index cbf38eb390..a56a65dcc3 100644 --- a/grc/blocks/pad_source.xml +++ b/grc/blocks/pad_source.xml @@ -7,7 +7,9 @@ <block> <name>Pad Source</name> <key>pad_source</key> - <make></make> + <make>#if str($type) == "message" +None;self.message_port_register_hier_out($label) +#end if</make> <param> <name>Label</name> <key>label</key> @@ -43,6 +45,11 @@ <key>byte</key> <opt>size:gr.sizeof_char</opt> </option> + <option> + <name>Message</name> + <key>message</key> + <opt>size:0</opt> + </option> <option> <name>Wildcard</name> <key></key> diff --git a/grc/python/FlowGraph.py b/grc/python/FlowGraph.py index efe362760c..376c2e337f 100644 --- a/grc/python/FlowGraph.py +++ b/grc/python/FlowGraph.py @@ -58,6 +58,8 @@ class FlowGraph(_FlowGraph, _GUIFlowGraph): 'in': self.get_pad_sources(), 'out': self.get_pad_sinks(), }[direction] + # we only want stream ports + sorted_pads = filter(lambda b: b.get_param('type').get_evaluated() != 'message', sorted_pads); #load io signature return [{ 'label': str(pad.get_param('label').get_evaluated()), @@ -83,6 +85,14 @@ class FlowGraph(_FlowGraph, _GUIFlowGraph): pads = filter(lambda b: b.get_key() == 'pad_sink', self.get_enabled_blocks()) return sorted(pads, lambda x, y: cmp(x.get_id(), y.get_id())) + def get_msg_pad_sources(self): + ps = self.get_pad_sources(); + return filter(lambda b: b.get_param('type').get_evaluated() == 'message', ps); + + def get_msg_pad_sinks(self): + ps = self.get_pad_sinks(); + return filter(lambda b: b.get_param('type').get_evaluated() == 'message', ps); + def get_imports(self): """ Get a set of all import statments in this flow graph namespace. diff --git a/grc/python/convert_hier.py b/grc/python/convert_hier.py index b609af24ae..508ec63b2b 100644 --- a/grc/python/convert_hier.py +++ b/grc/python/convert_hier.py @@ -25,6 +25,8 @@ def convert_hier(flow_graph, python_file): #extract info from the flow graph input_sigs = flow_graph.get_io_signaturev('in') output_sigs = flow_graph.get_io_signaturev('out') + input_msgp = flow_graph.get_msg_pad_sources(); + output_msgp = flow_graph.get_msg_pad_sinks(); parameters = flow_graph.get_parameters() block_key = flow_graph.get_option('id') block_name = flow_graph.get_option('title') or flow_graph.get_option('id').replace('_', ' ').title() @@ -55,7 +57,7 @@ def convert_hier(flow_graph, python_file): param_n['type'] = 'raw' params_n.append(param_n) block_n['param'] = params_n - #sink data + #sink data stream ports block_n['sink'] = list() for input_sig in input_sigs: sink_n = odict() @@ -64,7 +66,14 @@ def convert_hier(flow_graph, python_file): sink_n['vlen'] = input_sig['vlen'] if input_sig['optional']: sink_n['optional'] = '1' block_n['sink'].append(sink_n) - #source data + #sink data msg ports + for input_sig in input_msgp: + sink_n = odict() + sink_n['name'] = input_sig.get_param("label").get_value(); + sink_n['type'] = "message" + sink_n['optional'] = input_sig.get_param("optional").get_value(); + block_n['sink'].append(sink_n) + #source data stream ports block_n['source'] = list() for output_sig in output_sigs: source_n = odict() @@ -73,6 +82,13 @@ def convert_hier(flow_graph, python_file): source_n['vlen'] = output_sig['vlen'] if output_sig['optional']: source_n['optional'] = '1' block_n['source'].append(source_n) + #source data msg ports + for output_sig in output_msgp: + source_n = odict() + source_n['name'] = output_sig.get_param("label").get_value(); + source_n['type'] = "message" + source_n['optional'] = output_sig.get_param("optional").get_value(); + block_n['source'].append(source_n) #doc data block_n['doc'] = "%s\n%s\n%s"%(block_author, block_desc, python_file) block_n['grc_source'] = "%s"%(flow_graph.grc_file_path) diff --git a/grc/python/flow_graph.tmpl b/grc/python/flow_graph.tmpl index af55ad641a..163e7f76aa 100644 --- a/grc/python/flow_graph.tmpl +++ b/grc/python/flow_graph.tmpl @@ -189,6 +189,7 @@ gr.io_signaturev($(len($io_sigs)), $(len($io_sigs)), [$(', '.join($size_strs))]) self.connect($make_port_sig($source), $make_port_sig($sink)) #end if #end for + ######################################################## ##Create Asynch Message Connections ######################################################## @@ -198,7 +199,21 @@ gr.io_signaturev($(len($io_sigs)), $(len($io_sigs)), [$(', '.join($size_strs))]) $DIVIDER #end if #for $msg in $messages2 - self.msg_connect(self.$msg.get_source().get_parent().get_id(), "$msg.get_source().get_name()", self.$msg.get_sink().get_parent().get_id(), "$msg.get_sink().get_name()") + #set $sr = $msg.get_source() + #set $source = "self.%s"%($sr.get_parent().get_id()) + #set $source_port = $sr.get_name(); + #if $sr.get_parent().get_key() == "pad_source" + #set $source = "self" + #set $source_port = $sr.get_parent().get_param("label").get_value(); + #end if + #set $sk = $msg.get_sink() + #set $sink = "self.%s"%($sk.get_parent().get_id()) + #set $sink_port = $sk.get_name(); + #if $sk.get_parent().get_key() == "pad_sink" + #set $sink = "self" + #set $sink_port = $sk.get_parent().get_param("label").get_value(); + #end if + self.msg_connect($source, "$source_port", $sink, "$sink_port") #end for ######################################################## diff --git a/gruel/src/include/gruel/pmt.h b/gruel/src/include/gruel/pmt.h index a462155c5f..d09686783c 100644 --- a/gruel/src/include/gruel/pmt.h +++ b/gruel/src/include/gruel/pmt.h @@ -734,6 +734,12 @@ GRUEL_API pmt_t pmt_list_add(pmt_t list, const pmt_t& item); */ GRUEL_API pmt_t pmt_list_rm(pmt_t list, const pmt_t& item); +/*! + * \brief Return bool of \p list contains \p item + */ +GRUEL_API bool pmt_list_has(pmt_t list, const pmt_t& item); + + /* * ------------------------------------------------------------------------ * read / write diff --git a/gruel/src/lib/pmt/pmt.cc b/gruel/src/lib/pmt/pmt.cc index 3eb39ed7b1..e5baca98a8 100644 --- a/gruel/src/lib/pmt/pmt.cc +++ b/gruel/src/lib/pmt/pmt.cc @@ -1340,6 +1340,22 @@ pmt_list_rm(pmt_t list, const pmt_t& item) } } +bool +pmt_list_has(pmt_t list, const pmt_t& item) +{ + if(pmt_is_pair(list)){ + pmt_t left = pmt_car(list); + pmt_t right = pmt_cdr(list); + if(pmt_equal(left,item)) + return true; + return pmt_list_has(right, item); + } else { + if(pmt_is_null(list)) + return false; + throw std::runtime_error("list contains invalid format!"); + } +} + pmt_t pmt_caar(pmt_t pair) { diff --git a/gruel/src/swig/pmt_swig.i b/gruel/src/swig/pmt_swig.i index d46143424b..b1628c9983 100644 --- a/gruel/src/swig/pmt_swig.i +++ b/gruel/src/swig/pmt_swig.i @@ -701,6 +701,11 @@ pmt_t pmt_list_add(pmt_t list, const pmt_t& item); */ pmt_t pmt_list_rm(pmt_t list, const pmt_t& item); +/*! + * \brief Return bool of \p list contains \p item + */ +bool pmt_list_has(pmt_t list, const pmt_t& item); + /* * ------------------------------------------------------------------------ * read / write diff --git a/volk/CMakeLists.txt b/volk/CMakeLists.txt index 68385f9740..9519505eb9 100644 --- a/volk/CMakeLists.txt +++ b/volk/CMakeLists.txt @@ -38,12 +38,8 @@ set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake) #location for custom "M # Environment setup ######################################################################## IF(NOT DEFINED BOOST_ROOT) - SET(BOOST_ROOT "") + SET(BOOST_ROOT ${CMAKE_INSTALL_PREFIX}) ENDIF() -SET(BOOST_ROOT ${BOOST_ROOT} CACHE STRING "Modify search path for Boost components") - -#after caching user-defined value, make sure to add the install prefix -SET(BOOST_ROOT ${BOOST_ROOT}:${CMAKE_INSTALL_PREFIX}) IF(NOT DEFINED CROSSCOMPILE_MULTILIB) SET(CROSSCOMPILE_MULTILIB "") @@ -77,6 +73,10 @@ set(Boost_ADDITIONAL_VERSIONS ) find_package(Boost COMPONENTS unit_test_framework) +if(NOT Boost_FOUND) + message(FATAL_ERROR "VOLK Requires boost to build") +endif() + find_package(ORC) ######################################################################## diff --git a/volk/apps/CMakeLists.txt b/volk/apps/CMakeLists.txt index 175105a5a3..a89a9409d8 100644 --- a/volk/apps/CMakeLists.txt +++ b/volk/apps/CMakeLists.txt @@ -18,7 +18,7 @@ ######################################################################## # Setup profiler ######################################################################## -find_package(Boost) +find_package(Boost COMPONENTS unit_test_framework) if(Boost_FOUND AND UNIX) #uses mkdir and $HOME -- cgit v1.2.3 From ee53cca50e3f39ddc1d44669c3ea9f0d73d32022 Mon Sep 17 00:00:00 2001 From: "Brett L. Trotter" <blt@webtrotter.com> Date: Tue, 11 Dec 2012 21:53:08 -0500 Subject: core: Patch to file source to allow opening of new files while running. Addresses issues #352. --- gnuradio-core/src/lib/io/gr_file_source.cc | 96 ++++++++++++++++++++++++------ gnuradio-core/src/lib/io/gr_file_source.h | 64 ++++++++++++++++---- gnuradio-core/src/lib/io/gr_file_source.i | 2 + grc/blocks/gr_file_source.xml | 1 + 4 files changed, 131 insertions(+), 32 deletions(-) (limited to 'grc') diff --git a/gnuradio-core/src/lib/io/gr_file_source.cc b/gnuradio-core/src/lib/io/gr_file_source.cc index 96333fa24c..09f3986cd2 100644 --- a/gnuradio-core/src/lib/io/gr_file_source.cc +++ b/gnuradio-core/src/lib/io/gr_file_source.cc @@ -49,24 +49,14 @@ #define OUR_O_LARGEFILE 0 #endif -gr_file_source::gr_file_source (size_t itemsize, const char *filename, bool repeat) - : gr_sync_block ("file_source", - gr_make_io_signature (0, 0, 0), - gr_make_io_signature (1, 1, itemsize)), - d_itemsize (itemsize), d_fp (0), d_repeat (repeat) +gr_file_source::gr_file_source(size_t itemsize, const char *filename, bool repeat) + : gr_sync_block("file_source", + gr_make_io_signature (0, 0, 0), + gr_make_io_signature (1, 1, itemsize)), + d_itemsize(itemsize), d_fp(0), d_new_fp (0), d_repeat(repeat), + d_updated(false) { - // we use "open" to use to the O_LARGEFILE flag - - int fd; - if ((fd = open (filename, O_RDONLY | OUR_O_LARGEFILE | OUR_O_BINARY)) < 0){ - perror (filename); - throw std::runtime_error ("can't open file"); - } - - if ((d_fp = fdopen (fd, "rb")) == NULL){ - perror (filename); - throw std::runtime_error ("can't open file"); - } + open(filename, repeat); } // public constructor that returns a shared_ptr @@ -79,7 +69,11 @@ gr_make_file_source (size_t itemsize, const char *filename, bool repeat) gr_file_source::~gr_file_source () { - fclose ((FILE *) d_fp); + close(); + if(d_fp) { + fclose(d_fp); + d_fp = 0; + } } int @@ -91,6 +85,11 @@ gr_file_source::work (int noutput_items, int i; int size = noutput_items; + do_update(); // update d_fp is reqd + if(d_fp == NULL) + throw std::runtime_error("work with file not open"); + + boost::mutex::scoped_lock lock(fp_mutex); // hold for the rest of this function while (size) { i = fread(o, d_itemsize, size, (FILE *) d_fp); @@ -129,5 +128,64 @@ gr_file_source::work (int noutput_items, bool gr_file_source::seek (long seek_point, int whence) { - return fseek ((FILE *) d_fp, seek_point * d_itemsize, whence) == 0; + // obtain exclusive access for duration of this function + boost::mutex::scoped_lock lock(fp_mutex); + return fseek((FILE *) d_fp, seek_point * d_itemsize, whence) == 0; +} + +void +gr_file_source::open(const char *filename, bool repeat) +{ + // obtain exclusive access for duration of this function + boost::mutex::scoped_lock lock(fp_mutex); + + int fd; + + // we use "open" to use to the O_LARGEFILE flag + if((fd = ::open(filename, O_RDONLY | OUR_O_LARGEFILE | OUR_O_BINARY)) < 0) { + perror(filename); + throw std::runtime_error("can't open file"); + } + + if(d_new_fp) { + fclose(d_new_fp); + d_new_fp = 0; + } + + if((d_new_fp = fdopen (fd, "rb")) == NULL) { + perror(filename); + ::close(fd); // don't leak file descriptor if fdopen fails + throw std::runtime_error("can't open file"); + } + + d_updated = true; + d_repeat = repeat; +} + +void +gr_file_source::close() +{ + // obtain exclusive access for duration of this function + boost::mutex::scoped_lock lock(fp_mutex); + + if(d_new_fp != NULL) { + fclose(d_new_fp); + d_new_fp = NULL; + } + d_updated = true; +} + +void +gr_file_source::do_update() +{ + if(d_updated) { + boost::mutex::scoped_lock lock(fp_mutex); // hold while in scope + + if(d_fp) + fclose(d_fp); + + d_fp = d_new_fp; // install new file pointer + d_new_fp = 0; + d_updated = false; + } } diff --git a/gnuradio-core/src/lib/io/gr_file_source.h b/gnuradio-core/src/lib/io/gr_file_source.h index 1cc44a3b1f..0478fba04b 100644 --- a/gnuradio-core/src/lib/io/gr_file_source.h +++ b/gnuradio-core/src/lib/io/gr_file_source.h @@ -25,6 +25,7 @@ #include <gr_core_api.h> #include <gr_sync_block.h> +#include <boost/thread/mutex.hpp> class gr_file_source; typedef boost::shared_ptr<gr_file_source> gr_file_source_sptr; @@ -39,31 +40,68 @@ gr_make_file_source (size_t itemsize, const char *filename, bool repeat = false) class GR_CORE_API gr_file_source : public gr_sync_block { - friend GR_CORE_API gr_file_source_sptr gr_make_file_source (size_t itemsize, - const char *filename, - bool repeat); private: - size_t d_itemsize; - void *d_fp; - bool d_repeat; + size_t d_itemsize; + FILE *d_fp; + FILE *d_new_fp; + bool d_repeat; + bool d_updated; protected: - gr_file_source (size_t itemsize, const char *filename, bool repeat); + gr_file_source(size_t itemsize, const char *filename, bool repeat); + + void do_update(); + + boost::mutex fp_mutex; public: - ~gr_file_source (); + /*! + * \brief Create a file source. + * + * Opens \p filename as a source of items into a flowgraph. The data + * is expected to be in binary format, item after item. The \p + * itemsize of the block determines the conversion from bits to + * items. + * + * If \p repeat is turned on, the file will repeat the file after + * it's reached the end. + * + * \param itemsize the size of each item in the file, in bytes + * \param filename name of the file to source from + * \param repeat repeat file from start + */ + friend GR_CORE_API gr_file_source_sptr + gr_make_file_source(size_t itemsize, + const char *filename, + bool repeat); + + ~gr_file_source(); - int work (int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); + int work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); /*! - * \brief seek file to \p seek_point relative to \p whence + * \brief Seek file to \p seek_point relative to \p whence * * \param seek_point sample offset in file * \param whence one of SEEK_SET, SEEK_CUR, SEEK_END (man fseek) */ - bool seek (long seek_point, int whence); + bool seek(long seek_point, int whence); + + /*! + * \brief Opens a new file. + * + * \param filename name of the file to source from + * \param repeat repeat file from start + */ + void open(const char *filename, bool repeat); + + /*! + * \brief Close the file handle. + */ + void close(); + }; #endif /* INCLUDED_GR_FILE_SOURCE_H */ diff --git a/gnuradio-core/src/lib/io/gr_file_source.i b/gnuradio-core/src/lib/io/gr_file_source.i index 9bf44691d0..e71cef0d14 100644 --- a/gnuradio-core/src/lib/io/gr_file_source.i +++ b/gnuradio-core/src/lib/io/gr_file_source.i @@ -40,4 +40,6 @@ class gr_file_source : public gr_sync_block ~gr_file_source (); bool seek (long seek_point, int whence); + void open (const char *filename, bool repeat); + void close(); }; diff --git a/grc/blocks/gr_file_source.xml b/grc/blocks/gr_file_source.xml index fcc7a70401..5f0e16b279 100644 --- a/grc/blocks/gr_file_source.xml +++ b/grc/blocks/gr_file_source.xml @@ -9,6 +9,7 @@ <key>gr_file_source</key> <import>from gnuradio import gr</import> <make>gr.file_source($type.size*$vlen, $file, $repeat)</make> + <callback>open($file, $repeat)</callback> <param> <name>File</name> <key>file</key> -- cgit v1.2.3