summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/req_msg_source_impl.cc
diff options
context:
space:
mode:
authorMarcus Müller <mmueller@gnuradio.org>2019-08-07 21:45:12 +0200
committerMarcus Müller <marcus@hostalia.de>2019-08-09 23:04:28 +0200
commitf7bbf2c1d8d780294f3e016aff239ca35eb6516e (patch)
treee09ab6112e02b2215b2d59ac24d3d6ea2edac745 /gr-zeromq/lib/req_msg_source_impl.cc
parent78431dc6941e3acc67c858277dfe4a0ed583643c (diff)
Tree: clang-format without the include sorting
Diffstat (limited to 'gr-zeromq/lib/req_msg_source_impl.cc')
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.cc144
1 files changed, 72 insertions, 72 deletions
diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc
index 6c80a77f27..6d44aa3cf0 100644
--- a/gr-zeromq/lib/req_msg_source_impl.cc
+++ b/gr-zeromq/lib/req_msg_source_impl.cc
@@ -31,104 +31,104 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- req_msg_source::sptr
- req_msg_source::make(char *address, int timeout)
- {
- return gnuradio::get_initial_sptr
- (new req_msg_source_impl(address, timeout));
- }
+req_msg_source::sptr req_msg_source::make(char* address, int timeout)
+{
+ return gnuradio::get_initial_sptr(new req_msg_source_impl(address, timeout));
+}
- req_msg_source_impl::req_msg_source_impl(char *address, int timeout)
- : gr::block("req_msg_source",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
+req_msg_source_impl::req_msg_source_impl(char* address, int timeout)
+ : gr::block("req_msg_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
d_timeout(timeout),
d_port(pmt::mp("out"))
- {
- int major, minor, patch;
- zmq::version(&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
-
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->connect (address);
-
- message_port_register_out(d_port);
- }
+{
+ int major, minor, patch;
+ zmq::version(&major, &minor, &patch);
- req_msg_source_impl::~req_msg_source_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ if (major < 3) {
+ d_timeout = timeout * 1000;
}
- bool req_msg_source_impl::start()
- {
- d_finished = false;
- d_thread = new boost::thread(boost::bind(&req_msg_source_impl::readloop, this));
- return true;
- }
-
- bool req_msg_source_impl::stop()
- {
- d_finished = true;
- d_thread->join();
- return true;
- }
-
- void req_msg_source_impl::readloop()
- {
- while(!d_finished){
- //std::cout << "readloop\n";
-
- zmq::pollitem_t itemsout[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLOUT, 0 } };
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
+
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->connect(address);
+
+ message_port_register_out(d_port);
+}
+
+req_msg_source_impl::~req_msg_source_impl()
+{
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+}
+
+bool req_msg_source_impl::start()
+{
+ d_finished = false;
+ d_thread = new boost::thread(boost::bind(&req_msg_source_impl::readloop, this));
+ return true;
+}
+
+bool req_msg_source_impl::stop()
+{
+ d_finished = true;
+ d_thread->join();
+ return true;
+}
+
+void req_msg_source_impl::readloop()
+{
+ while (!d_finished) {
+ // std::cout << "readloop\n";
+
+ zmq::pollitem_t itemsout[] = {
+ { static_cast<void*>(*d_socket), 0, ZMQ_POLLOUT, 0 }
+ };
zmq::poll(&itemsout[0], 1, d_timeout);
// If we got a reply, process
if (itemsout[0].revents & ZMQ_POLLOUT) {
- // Request data, FIXME non portable?
- int nmsg = 1;
- zmq::message_t request(sizeof(int));
- memcpy((void *) request.data (), &nmsg, sizeof(int));
+ // Request data, FIXME non portable?
+ int nmsg = 1;
+ zmq::message_t request(sizeof(int));
+ memcpy((void*)request.data(), &nmsg, sizeof(int));
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(request, zmq::send_flags::none);
+ d_socket->send(request, zmq::send_flags::none);
#else
- d_socket->send(request);
+ d_socket->send(request);
#endif
}
- zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
+ zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ zmq::poll(&items[0], 1, d_timeout);
// If we got a reply, process
if (items[0].revents & ZMQ_POLLIN) {
- // Receive data
- zmq::message_t msg;
+ // Receive data
+ zmq::message_t msg;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(msg);
+ d_socket->recv(msg);
#else
- d_socket->recv(&msg);
+ d_socket->recv(&msg);
#endif
- std::string buf(static_cast<char*>(msg.data()), msg.size());
- std::stringbuf sb(buf);
- pmt::pmt_t m = pmt::deserialize(sb);
- message_port_pub(d_port, m);
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ message_port_pub(d_port, m);
} else {
- boost::this_thread::sleep(boost::posix_time::microseconds(100));
+ boost::this_thread::sleep(boost::posix_time::microseconds(100));
}
- }
}
+}
- } /* namespace zeromq */
+} /* namespace zeromq */
} /* namespace gr */