diff options
author | Johnathan Corgan <johnathan@corganlabs.com> | 2015-01-12 16:04:45 -0800 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2015-01-12 16:06:41 -0800 |
commit | 5de5b1a87f71bdbed9925f80648756c566c3a91b (patch) | |
tree | 9ce1394c23245358974d9b32338c46c35d49c72b | |
parent | 7bf8b05bde26bfb1e2df684e2ab55b878b25350c (diff) |
zeromq: cleanup and made req_msg_source derive from gr::block
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/req_msg_source.h | 13 | ||||
-rw-r--r-- | gr-zeromq/lib/req_msg_source_impl.cc | 67 | ||||
-rw-r--r-- | gr-zeromq/lib/req_msg_source_impl.h | 9 |
3 files changed, 39 insertions, 50 deletions
diff --git a/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h index cf9183375f..05d80b8e7f 100644 --- a/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h +++ b/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013-2015 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -24,20 +24,20 @@ #define INCLUDED_ZEROMQ_REQ_MSG_SOURCE_H #include <gnuradio/zeromq/api.h> -#include <gnuradio/sync_block.h> +#include <gnuradio/block.h> namespace gr { namespace zeromq { /*! - * \brief Receive messages on ZMQ REQ socket and source stream + * \brief Receive messages on ZMQ REQ socket output async messages * \ingroup zeromq * * \details - * This block will connect to a ZMQ REP socket, then produce all - * incoming messages as streaming output. + * This block will connect to a ZMQ REP socket, then resend all + * incoming messages as asynchronous messages. */ - class ZEROMQ_API req_msg_source : virtual public gr::sync_block + class ZEROMQ_API req_msg_source : virtual public gr::block { public: typedef boost::shared_ptr<req_msg_source> sptr; @@ -45,7 +45,6 @@ namespace gr { /*! * \brief Return a shared_ptr to a new instance of zeromq::req_msg_source. * - * * \param address ZMQ socket address specifier * \param timeout Receive timeout in seconds, default is 100ms, 1us increments * diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc index 2dda15208f..b30ef2679d 100644 --- a/gr-zeromq/lib/req_msg_source_impl.cc +++ b/gr-zeromq/lib/req_msg_source_impl.cc @@ -39,18 +39,21 @@ namespace gr { } req_msg_source_impl::req_msg_source_impl(char *address, int timeout) - : gr::sync_block("req_msg_source", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(0, 0, 0)), - d_timeout(timeout) + : gr::block("req_msg_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), + d_timeout(timeout) { int major, minor, patch; - zmq::version (&major, &minor, &patch); + zmq::version(&major, &minor, &patch); + if (major < 3) { d_timeout = timeout*1000; } + d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_REQ); + int time = 0; d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); d_socket->connect (address); @@ -65,67 +68,55 @@ namespace gr { delete d_context; } - bool req_msg_source_impl::start(){ + bool req_msg_source_impl::start() + { d_finished = false; - d_thread = new boost::thread( boost::bind( &req_msg_source_impl::readloop , this ) ); + d_thread = new boost::thread(boost::bind(&req_msg_source_impl::readloop, this)); return true; } - bool req_msg_source_impl::stop(){ + bool req_msg_source_impl::stop() + { d_finished = true; d_thread->join(); return true; } - void req_msg_source_impl::readloop(){ + void req_msg_source_impl::readloop() + { while(!d_finished){ //std::cout << "readloop\n"; - + zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } }; - zmq::poll (&itemsout[0], 1, d_timeout); - + zmq::poll(&itemsout[0], 1, d_timeout); + // If we got a reply, process if (itemsout[0].revents & ZMQ_POLLOUT) { // Request data, FIXME non portable? int nmsg = 1; zmq::message_t request(sizeof(int)); - memcpy ((void *) request.data (), &nmsg, sizeof(int)); + memcpy((void *) request.data (), &nmsg, sizeof(int)); d_socket->send(request); - //std::cout << "sent request...\n"; } zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; zmq::poll (&items[0], 1, d_timeout); - //std::cout << "rx response...\n"; // If we got a reply, process if (items[0].revents & ZMQ_POLLIN) { - //std::cout << "rx response... got data\n"; - - // Receive data - zmq::message_t msg; - d_socket->recv(&msg); + // Receive data + zmq::message_t msg; + d_socket->recv(&msg); - //std::cout << "got msg...\n"; + std::string buf(static_cast<char*>(msg.data()), msg.size()); + std::stringbuf sb(buf); + pmt::pmt_t m = pmt::deserialize(sb); + message_port_pub(pmt::mp("out"), m); - std::string buf(static_cast<char*>(msg.data()), msg.size()); - std::stringbuf sb(buf); - pmt::pmt_t m = pmt::deserialize(sb); - //std::cout << m << "\n"; - message_port_pub(pmt::mp("out"), m); - - } else { - usleep(100); - } + } else { + usleep(100); } - } - - int - req_msg_source_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - return noutput_items; + } } } /* namespace zeromq */ diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h index 3a691743b5..5835dd4b4d 100644 --- a/gr-zeromq/lib/req_msg_source_impl.h +++ b/gr-zeromq/lib/req_msg_source_impl.h @@ -35,19 +35,18 @@ namespace gr { int d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; + boost::thread *d_thread; + void readloop(); - boost::thread *d_thread; public: + bool d_finished; + req_msg_source_impl(char *address, int timeout); ~req_msg_source_impl(); bool start(); bool stop(); - bool d_finished; - int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); }; } // namespace zeromq |