diff options
author | Johnathan Corgan <johnathan@corganlabs.com> | 2015-01-12 13:46:33 -0800 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2015-01-12 13:46:33 -0800 |
commit | 9bf7123e477772bcb5fc53d3139e75c4d63a044a (patch) | |
tree | 61bfa9d6d5ca23f531a8e2f99a7ead339bca05bc /gr-zeromq/lib/sub_msg_source_impl.cc | |
parent | 3827ff325ef1bcdbeedb46b7b4efff4f897da0ae (diff) |
zeromq: cleanup and convert sub_msg_source to derive from gr::block
Diffstat (limited to 'gr-zeromq/lib/sub_msg_source_impl.cc')
-rw-r--r-- | gr-zeromq/lib/sub_msg_source_impl.cc | 65 |
1 files changed, 29 insertions, 36 deletions
diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc index f7a9bc9c26..b016405d40 100644 --- a/gr-zeromq/lib/sub_msg_source_impl.cc +++ b/gr-zeromq/lib/sub_msg_source_impl.cc @@ -39,19 +39,21 @@ namespace gr { } sub_msg_source_impl::sub_msg_source_impl(char *address, int timeout) - : gr::sync_block("sub_msg_source", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(0, 0, 0)), + : gr::block("sub_msg_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), d_timeout(timeout) { int major, minor, patch; - zmq::version (&major, &minor, &patch); + zmq::version(&major, &minor, &patch); + if (major < 3) { d_timeout = timeout*1000; } + d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_SUB); - //int time = 0; + d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); d_socket->connect (address); @@ -65,52 +67,43 @@ namespace gr { delete d_context; } - bool sub_msg_source_impl::start(){ + bool sub_msg_source_impl::start() + { d_finished = false; - d_thread = new boost::thread( boost::bind( &sub_msg_source_impl::readloop , this ) ); + d_thread = new boost::thread(boost::bind(&sub_msg_source_impl::readloop, this)); return true; } - bool sub_msg_source_impl::stop(){ + bool sub_msg_source_impl::stop() + { d_finished = true; - d_thread->join(); + d_thread->join(); return true; } - void sub_msg_source_impl::readloop(){ + void sub_msg_source_impl::readloop() + { while(!d_finished){ - //std::cout << "readloop\n"; - + zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&items[0], 1, d_timeout); + zmq::poll(&items[0], 1, d_timeout); // If we got a reply, process if (items[0].revents & ZMQ_POLLIN) { - // Receive data - zmq::message_t msg; - d_socket->recv(&msg); - - //std::cout << "got msg...\n"; - - std::string buf(static_cast<char*>(msg.data()), msg.size()); - std::stringbuf sb(buf); - pmt::pmt_t m = pmt::deserialize(sb); - //std::cout << m << "\n"; - message_port_pub(pmt::mp("out"), m); - - } else { - usleep(100); - } - } - } + // Receive data + zmq::message_t msg; + d_socket->recv(&msg); - int - sub_msg_source_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - return noutput_items; + std::string buf(static_cast<char*>(msg.data()), msg.size()); + std::stringbuf sb(buf); + pmt::pmt_t m = pmt::deserialize(sb); + + message_port_pub(pmt::mp("out"), m); + } else { + usleep(100); + } + } } } /* namespace zeromq */ |