diff options
author | Johnathan Corgan <johnathan@corganlabs.com> | 2015-01-12 13:19:48 -0800 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2015-01-12 13:19:48 -0800 |
commit | 3827ff325ef1bcdbeedb46b7b4efff4f897da0ae (patch) | |
tree | 1ff2d92f20d4635827fe06e672a39771d61422b1 | |
parent | c13049f2fbe51dcb6e655b11e172514a3fc085b4 (diff) |
zeromq: cleanup and convert pub_msg_sink to derive from gr::block
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h | 10 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_msg_sink_impl.cc | 25 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_msg_sink_impl.h | 3 |
3 files changed, 15 insertions, 23 deletions
diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h index d626ddd147..ffcb47544f 100644 --- a/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h @@ -30,17 +30,17 @@ namespace gr { namespace zeromq { /*! - * \brief Sink the contents of a stream to a ZMQ PUB socket + * \brief Sink the contents of a msg port to a ZMQ PUB socket * \ingroup zeromq * * \details - * This block acts a a streaming sink for a GNU Radio flowgraph - * and writes its contents to a ZMQ PUB socket. A PUB socket may - * have subscribers and will pass all incoming stream data to each + * This block acts a message port receiver and writes individual + * messages to a ZMQ PUB socket. A PUB socket may have + * subscribers and will pass all incoming messages to each * subscriber. Subscribers can be either another gr-zeromq source * block or a non-GNU Radio ZMQ socket. */ - class ZEROMQ_API pub_msg_sink : virtual public gr::sync_block + class ZEROMQ_API pub_msg_sink : virtual public gr::block { public: typedef boost::shared_ptr<pub_msg_sink> sptr; diff --git a/gr-zeromq/lib/pub_msg_sink_impl.cc b/gr-zeromq/lib/pub_msg_sink_impl.cc index 0264aef83b..5fc7164d2b 100644 --- a/gr-zeromq/lib/pub_msg_sink_impl.cc +++ b/gr-zeromq/lib/pub_msg_sink_impl.cc @@ -39,9 +39,9 @@ namespace gr { } pub_msg_sink_impl::pub_msg_sink_impl(char *address, int timeout) - : gr::sync_block("pub_msg_sink", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(0, 0, 0)), + : gr::block("pub_msg_sink", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), d_timeout(timeout) { int major, minor, patch; @@ -49,6 +49,7 @@ namespace gr { if (major < 3) { d_timeout = timeout*1000; } + d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_PUB); int time = 0; @@ -56,7 +57,7 @@ namespace gr { d_socket->bind(address); message_port_register_in(pmt::mp("in")); - set_msg_handler( pmt::mp("in"), + set_msg_handler( pmt::mp("in"), boost::bind(&pub_msg_sink_impl::handler, this, _1)); } @@ -67,21 +68,15 @@ namespace gr { delete d_context; } - void pub_msg_sink_impl::handler(pmt::pmt_t msg){ + void pub_msg_sink_impl::handler(pmt::pmt_t msg) + { std::stringbuf sb(""); - pmt::serialize( msg, sb ); + pmt::serialize(msg, sb); std::string s = sb.str(); zmq::message_t zmsg(s.size()); - memcpy( zmsg.data(), s.c_str(), s.size() ); - d_socket->send(zmsg); - } - int - pub_msg_sink_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - return noutput_items; + memcpy(zmsg.data(), s.c_str(), s.size()); + d_socket->send(zmsg); } } /* namespace zeromq */ diff --git a/gr-zeromq/lib/pub_msg_sink_impl.h b/gr-zeromq/lib/pub_msg_sink_impl.h index 4bbf7ee944..747ac7ee85 100644 --- a/gr-zeromq/lib/pub_msg_sink_impl.h +++ b/gr-zeromq/lib/pub_msg_sink_impl.h @@ -40,9 +40,6 @@ namespace gr { pub_msg_sink_impl(char *address, int timeout); ~pub_msg_sink_impl(); - int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); void handler(pmt::pmt_t msg); }; |