diff options
author | Johnathan Corgan <johnathan@corganlabs.com> | 2015-01-12 14:03:03 -0800 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2015-01-12 14:03:03 -0800 |
commit | 7b77431d3a4971ac62c6e71a813151404c1ae5e3 (patch) | |
tree | de507c57b070ec4759ab85fc53b37f5b0a11d327 /gr-zeromq | |
parent | 9bf7123e477772bcb5fc53d3139e75c4d63a044a (diff) |
zeromq: cleanup and converted push_msg_sink to derive from gr::block
Diffstat (limited to 'gr-zeromq')
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h | 2 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h | 14 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h | 2 | ||||
-rw-r--r-- | gr-zeromq/lib/push_msg_sink_impl.cc | 60 | ||||
-rw-r--r-- | gr-zeromq/lib/push_msg_sink_impl.h | 3 |
5 files changed, 21 insertions, 60 deletions
diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h index ffcb47544f..8cf4bcfab4 100644 --- a/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h @@ -24,7 +24,7 @@ #define INCLUDED_ZEROMQ_PUB_MSG_SINK_H #include <gnuradio/zeromq/api.h> -#include <gnuradio/sync_block.h> +#include <gnuradio/block.h> namespace gr { namespace zeromq { diff --git a/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h index 27a19b210b..3ce6ebbdc0 100644 --- a/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h @@ -24,24 +24,22 @@ #define INCLUDED_ZEROMQ_PUSH_MSG_SINK_H #include <gnuradio/zeromq/api.h> -#include <gnuradio/sync_block.h> +#include <gnuradio/block.h> namespace gr { namespace zeromq { /*! - * \brief Sink the contents of a stream to a ZMQ PUSH socket + * \brief Sink the contents of a msg port to a ZMQ PUSH socket * \ingroup zeromq * * \details - * This block acts a a streaming sink for a GNU Radio flowgraph - * and writes its contents to a ZMQ PUSH socket. A PUSH socket - * will round-robin send its messages to each connected ZMQ PULL - * socket, either another gr-zeromq source block or a regular, + * This block acts a message port receiver and writes individual + * messages to a ZMQ PUSH socket. The corresponding receiving ZMQ + * PULL socket can be either another gr-zeromq source block or a * non-GNU Radio ZMQ socket. - * */ - class ZEROMQ_API push_msg_sink : virtual public gr::sync_block + class ZEROMQ_API push_msg_sink : virtual public gr::block { public: typedef boost::shared_ptr<push_msg_sink> sptr; diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h index 2e3fc7bed5..d06a83c1fd 100644 --- a/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h +++ b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h @@ -24,7 +24,7 @@ #define INCLUDED_ZEROMQ_SUB_MSG_SOURCE_H #include <gnuradio/zeromq/api.h> -#include <gnuradio/sync_block.h> +#include <gnuradio/block.h> namespace gr { namespace zeromq { diff --git a/gr-zeromq/lib/push_msg_sink_impl.cc b/gr-zeromq/lib/push_msg_sink_impl.cc index 6266cd6a57..e9cc5bc435 100644 --- a/gr-zeromq/lib/push_msg_sink_impl.cc +++ b/gr-zeromq/lib/push_msg_sink_impl.cc @@ -39,25 +39,28 @@ namespace gr { } push_msg_sink_impl::push_msg_sink_impl(char *address, int timeout) - : gr::sync_block("push_msg_sink", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(0, 0, 0)), + : gr::block("push_msg_sink", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), d_timeout(timeout) { int major, minor, patch; zmq::version (&major, &minor, &patch); + if (major < 3) { d_timeout = timeout*1000; } + d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH); + int time = 0; d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->bind (address); + d_socket->bind(address); message_port_register_in(pmt::mp("in")); - set_msg_handler( pmt::mp("in"), - boost::bind(&push_msg_sink_impl::handler, this, _1)); + set_msg_handler(pmt::mp("in"), + boost::bind(&push_msg_sink_impl::handler, this, _1)); } push_msg_sink_impl::~push_msg_sink_impl() @@ -67,52 +70,15 @@ namespace gr { delete d_context; } - void push_msg_sink_impl::handler(pmt::pmt_t msg){ + void push_msg_sink_impl::handler(pmt::pmt_t msg) + { std::stringbuf sb(""); pmt::serialize( msg, sb ); std::string s = sb.str(); zmq::message_t zmsg(s.size()); - memcpy( zmsg.data(), s.c_str(), s.size() ); - d_socket->send(zmsg); - } - - int - push_msg_sink_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - return noutput_items; -/* const char *in = (const char *) input_items[0]; - - zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } }; - zmq::poll (&itemsout[0], 1, d_timeout); - - // If we got a reply, process - if (itemsout[0].revents & ZMQ_POLLOUT) { - - // encode the current offset, # tags, and tags into header - std::string header(""); - - if(d_pass_tags){ - uint64_t offset = nitems_read(0); - std::vector<gr::tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); - header = gen_tag_header( offset, tags ); - } - - // create message copy and send - zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items); - if(d_pass_tags) - memcpy((void*) msg.data(), header.c_str(), header.length() ); - memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); - d_socket->send(msg); - - return noutput_items; - } - else { - return 0; - }*/ + memcpy(zmsg.data(), s.c_str(), s.size()); + d_socket->send(zmsg); } } /* namespace zeromq */ diff --git a/gr-zeromq/lib/push_msg_sink_impl.h b/gr-zeromq/lib/push_msg_sink_impl.h index b77c998506..d669d327cb 100644 --- a/gr-zeromq/lib/push_msg_sink_impl.h +++ b/gr-zeromq/lib/push_msg_sink_impl.h @@ -40,9 +40,6 @@ namespace gr { push_msg_sink_impl(char *address, int timeout); ~push_msg_sink_impl(); - int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); void handler(pmt::pmt_t msg); }; |