diff options
author | Thomas Habets <thomas@habets.se> | 2020-08-30 13:26:41 +0100 |
---|---|---|
committer | Marcus Müller <marcus@hostalia.de> | 2020-09-22 21:48:57 +0200 |
commit | 9b4e6276897493c10067544b6469a0fa40d9402e (patch) | |
tree | 4f200bedd371255a2f970103af22180daeccfe8d /gr-zeromq/lib | |
parent | ebf4b70e3fa9187c947b6b2f8352dafe36b7a77d (diff) |
zeromq: Remove manual memory management
I believe this fixes a memory leak, as the thread objects were never
deleted.
Diffstat (limited to 'gr-zeromq/lib')
-rw-r--r-- | gr-zeromq/lib/base_impl.cc | 54 | ||||
-rw-r--r-- | gr-zeromq/lib/base_impl.h | 4 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_msg_sink_impl.cc | 24 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_msg_sink_impl.h | 6 | ||||
-rw-r--r-- | gr-zeromq/lib/pull_msg_source_impl.cc | 28 | ||||
-rw-r--r-- | gr-zeromq/lib/pull_msg_source_impl.h | 8 | ||||
-rw-r--r-- | gr-zeromq/lib/push_msg_sink_impl.cc | 24 | ||||
-rw-r--r-- | gr-zeromq/lib/push_msg_sink_impl.h | 6 | ||||
-rw-r--r-- | gr-zeromq/lib/push_sink_impl.cc | 2 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_msg_sink_impl.cc | 32 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_msg_sink_impl.h | 8 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.cc | 6 | ||||
-rw-r--r-- | gr-zeromq/lib/req_msg_source_impl.cc | 34 | ||||
-rw-r--r-- | gr-zeromq/lib/req_msg_source_impl.h | 8 | ||||
-rw-r--r-- | gr-zeromq/lib/req_source_impl.cc | 4 | ||||
-rw-r--r-- | gr-zeromq/lib/sub_msg_source_impl.cc | 28 | ||||
-rw-r--r-- | gr-zeromq/lib/sub_msg_source_impl.h | 8 | ||||
-rw-r--r-- | gr-zeromq/lib/sub_source_impl.cc | 2 |
18 files changed, 127 insertions, 159 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc index 67f800274f..5889490898 100644 --- a/gr-zeromq/lib/base_impl.cc +++ b/gr-zeromq/lib/base_impl.cc @@ -25,7 +25,12 @@ base_impl::base_impl(int type, int timeout, bool pass_tags, const std::string& key) - : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags), d_key(key) + : d_context(1), + d_socket(d_context, type), + d_vsize(itemsize * vlen), + d_timeout(timeout), + d_pass_tags(pass_tags), + d_key(key) { /* "Fix" timeout value (ms for new API, us for old API) */ int major, minor, patch; @@ -34,24 +39,15 @@ base_impl::base_impl(int type, if (major < 3) { d_timeout *= 1000; } - - /* Create context & socket */ - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, type); } -base_impl::~base_impl() -{ - d_socket->close(); - delete d_socket; - delete d_context; -} +base_impl::~base_impl() {} std::string base_impl::last_endpoint() { char addr[256]; size_t addr_len = sizeof(addr); - d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); + d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); return std::string(addr, addr_len - 1); } @@ -69,15 +65,15 @@ base_sink_impl::base_sink_impl(int type, /* Set high watermark */ if (hwm >= 0) { #ifdef ZMQ_SNDHWM - d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + d_socket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); #else // major < 3 uint64_t tmp = hwm; - d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); + d_socket.setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); #endif } /* Bind */ - d_socket->bind(address); + d_socket.bind(address); } int base_sink_impl::send_message(const void* in_buf, @@ -89,9 +85,9 @@ int base_sink_impl::send_message(const void* in_buf, zmq::message_t key_message(d_key.size()); memcpy(key_message.data(), d_key.data(), d_key.size()); #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(key_message, zmq::send_flags::sndmore); + d_socket.send(key_message, zmq::send_flags::sndmore); #else - d_socket->send(key_message, ZMQ_SNDMORE); + d_socket.send(key_message, ZMQ_SNDMORE); #endif } /* Meta-data header */ @@ -116,9 +112,9 @@ int base_sink_impl::send_message(const void* in_buf, /* Send */ #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(msg, zmq::send_flags::none); + d_socket.send(msg, zmq::send_flags::none); #else - d_socket->send(msg); + d_socket.send(msg); #endif /* Report back */ @@ -140,15 +136,15 @@ base_source_impl::base_source_impl(int type, /* Set high watermark */ if (hwm >= 0) { #ifdef ZMQ_RCVHWM - d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); + d_socket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); #else // major < 3 uint64_t tmp = hwm; - d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); + d_socket.setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); #endif } /* Connect */ - d_socket->connect(address); + d_socket.connect(address); } bool base_source_impl::has_pending() { return d_msg.size() > d_consumed_bytes; } @@ -185,7 +181,7 @@ int base_source_impl::flush_pending(void* out_buf, bool base_source_impl::load_message(bool wait) { /* Poll for input */ - zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; zmq::poll(&items[0], 1, wait ? d_timeout : 0); if (!(items[0].revents & ZMQ_POLLIN)) @@ -194,7 +190,7 @@ bool base_source_impl::load_message(bool wait) /* Is this the start or continuation of a multi-part message? */ int64_t more = 0; size_t more_len = sizeof(more); - d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len); + d_socket.getsockopt(ZMQ_RCVMORE, &more, &more_len); /* Reset */ d_msg.rebuild(); @@ -204,23 +200,23 @@ bool base_source_impl::load_message(bool wait) /* Get the message */ #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(d_msg); + d_socket.recv(d_msg); #else - d_socket->recv(&d_msg); + d_socket.recv(&d_msg); #endif /* Throw away key and get the first message. Avoid blocking if a multi-part * message is not sent */ if (d_key.size() > 0 && !more) { int64_t is_multipart; - d_socket->getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len); + d_socket.getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len); d_msg.rebuild(); if (is_multipart) #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(d_msg); + d_socket.recv(d_msg); #else - d_socket->recv(&d_msg); + d_socket.recv(&d_msg); #endif else return false; diff --git a/gr-zeromq/lib/base_impl.h b/gr-zeromq/lib/base_impl.h index c9b5245fdb..eb98ce1b3c 100644 --- a/gr-zeromq/lib/base_impl.h +++ b/gr-zeromq/lib/base_impl.h @@ -30,8 +30,8 @@ public: protected: std::string last_endpoint(); - zmq::context_t* d_context; - zmq::socket_t* d_socket; + zmq::context_t d_context; + zmq::socket_t d_socket; size_t d_vsize; int d_timeout; bool d_pass_tags; diff --git a/gr-zeromq/lib/pub_msg_sink_impl.cc b/gr-zeromq/lib/pub_msg_sink_impl.cc index 232597e814..c88f9e8f41 100644 --- a/gr-zeromq/lib/pub_msg_sink_impl.cc +++ b/gr-zeromq/lib/pub_msg_sink_impl.cc @@ -28,7 +28,9 @@ pub_msg_sink_impl::pub_msg_sink_impl(char* address, int timeout, bool bind) : gr::block("pub_msg_sink", gr::io_signature::make(0, 0, 0), gr::io_signature::make(0, 0, 0)), - d_timeout(timeout) + d_timeout(timeout), + d_context(1), + d_socket(d_context, ZMQ_PUB) { int major, minor, patch; zmq::version(&major, &minor, &patch); @@ -37,28 +39,20 @@ pub_msg_sink_impl::pub_msg_sink_impl(char* address, int timeout, bool bind) d_timeout = timeout * 1000; } - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_PUB); - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket.setsockopt(ZMQ_LINGER, &time, sizeof(time)); if (bind) { - d_socket->bind(address); + d_socket.bind(address); } else { - d_socket->connect(address); + d_socket.connect(address); } message_port_register_in(pmt::mp("in")); set_msg_handler(pmt::mp("in"), [this](pmt::pmt_t msg) { this->handler(msg); }); } -pub_msg_sink_impl::~pub_msg_sink_impl() -{ - d_socket->close(); - delete d_socket; - delete d_context; -} +pub_msg_sink_impl::~pub_msg_sink_impl() {} void pub_msg_sink_impl::handler(pmt::pmt_t msg) { @@ -69,9 +63,9 @@ void pub_msg_sink_impl::handler(pmt::pmt_t msg) memcpy(zmsg.data(), s.c_str(), s.size()); #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(zmsg, zmq::send_flags::none); + d_socket.send(zmsg, zmq::send_flags::none); #else - d_socket->send(zmsg); + d_socket.send(zmsg); #endif } diff --git a/gr-zeromq/lib/pub_msg_sink_impl.h b/gr-zeromq/lib/pub_msg_sink_impl.h index 1c278ce70a..49e547ea9f 100644 --- a/gr-zeromq/lib/pub_msg_sink_impl.h +++ b/gr-zeromq/lib/pub_msg_sink_impl.h @@ -21,8 +21,8 @@ class pub_msg_sink_impl : public pub_msg_sink { private: float d_timeout; - zmq::context_t* d_context; - zmq::socket_t* d_socket; + zmq::context_t d_context; + zmq::socket_t d_socket; public: pub_msg_sink_impl(char* address, int timeout, bool bind); @@ -33,7 +33,7 @@ public: { char addr[256]; size_t addr_len = sizeof(addr); - d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); + d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); return std::string(addr, addr_len - 1); } }; diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc index 9a467e597f..56dba69735 100644 --- a/gr-zeromq/lib/pull_msg_source_impl.cc +++ b/gr-zeromq/lib/pull_msg_source_impl.cc @@ -16,6 +16,7 @@ #include "tag_headers.h" #include <gnuradio/io_signature.h> #include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/make_unique.hpp> #include <boost/thread/thread.hpp> namespace gr { @@ -31,6 +32,8 @@ pull_msg_source_impl::pull_msg_source_impl(char* address, int timeout, bool bind gr::io_signature::make(0, 0, 0), gr::io_signature::make(0, 0, 0)), d_timeout(timeout), + d_context(1), + d_socket(d_context, ZMQ_PULL), d_port(pmt::mp("out")) { int major, minor, patch; @@ -40,32 +43,25 @@ pull_msg_source_impl::pull_msg_source_impl(char* address, int timeout, bool bind d_timeout = timeout * 1000; } - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_PULL); - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket.setsockopt(ZMQ_LINGER, &time, sizeof(time)); if (bind) { - d_socket->bind(address); + d_socket.bind(address); } else { - d_socket->connect(address); + d_socket.connect(address); } message_port_register_out(d_port); } -pull_msg_source_impl::~pull_msg_source_impl() -{ - d_socket->close(); - delete d_socket; - delete d_context; -} +pull_msg_source_impl::~pull_msg_source_impl() {} bool pull_msg_source_impl::start() { d_finished = false; - d_thread = new boost::thread(boost::bind(&pull_msg_source_impl::readloop, this)); + d_thread = boost::make_unique<boost::thread>( + boost::bind(&pull_msg_source_impl::readloop, this)); return true; } @@ -80,7 +76,7 @@ void pull_msg_source_impl::readloop() { while (!d_finished) { - zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; zmq::poll(&items[0], 1, d_timeout); // If we got a reply, process @@ -89,9 +85,9 @@ void pull_msg_source_impl::readloop() // Receive data zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(msg); + d_socket.recv(msg); #else - d_socket->recv(&msg); + d_socket.recv(&msg); #endif std::string buf(static_cast<char*>(msg.data()), msg.size()); diff --git a/gr-zeromq/lib/pull_msg_source_impl.h b/gr-zeromq/lib/pull_msg_source_impl.h index 4d84b11119..0680284253 100644 --- a/gr-zeromq/lib/pull_msg_source_impl.h +++ b/gr-zeromq/lib/pull_msg_source_impl.h @@ -21,9 +21,9 @@ class pull_msg_source_impl : public pull_msg_source { private: int d_timeout; // microseconds, -1 is blocking - zmq::context_t* d_context; - zmq::socket_t* d_socket; - boost::thread* d_thread; + zmq::context_t d_context; + zmq::socket_t d_socket; + std::unique_ptr<boost::thread> d_thread; const pmt::pmt_t d_port; void readloop(); @@ -41,7 +41,7 @@ public: { char addr[256]; size_t addr_len = sizeof(addr); - d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); + d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); return std::string(addr, addr_len - 1); } }; diff --git a/gr-zeromq/lib/push_msg_sink_impl.cc b/gr-zeromq/lib/push_msg_sink_impl.cc index b32bea620b..08b2eff40d 100644 --- a/gr-zeromq/lib/push_msg_sink_impl.cc +++ b/gr-zeromq/lib/push_msg_sink_impl.cc @@ -28,7 +28,9 @@ push_msg_sink_impl::push_msg_sink_impl(char* address, int timeout, bool bind) : gr::block("push_msg_sink", gr::io_signature::make(0, 0, 0), gr::io_signature::make(0, 0, 0)), - d_timeout(timeout) + d_timeout(timeout), + d_context(1), + d_socket(d_context, ZMQ_PUSH) { int major, minor, patch; zmq::version(&major, &minor, &patch); @@ -37,28 +39,20 @@ push_msg_sink_impl::push_msg_sink_impl(char* address, int timeout, bool bind) d_timeout = timeout * 1000; } - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH); - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket.setsockopt(ZMQ_LINGER, &time, sizeof(time)); if (bind) { - d_socket->bind(address); + d_socket.bind(address); } else { - d_socket->connect(address); + d_socket.connect(address); } message_port_register_in(pmt::mp("in")); set_msg_handler(pmt::mp("in"), [this](pmt::pmt_t msg) { this->handler(msg); }); } -push_msg_sink_impl::~push_msg_sink_impl() -{ - d_socket->close(); - delete d_socket; - delete d_context; -} +push_msg_sink_impl::~push_msg_sink_impl() {} void push_msg_sink_impl::handler(pmt::pmt_t msg) { @@ -69,9 +63,9 @@ void push_msg_sink_impl::handler(pmt::pmt_t msg) memcpy(zmsg.data(), s.c_str(), s.size()); #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(zmsg, zmq::send_flags::none); + d_socket.send(zmsg, zmq::send_flags::none); #else - d_socket->send(zmsg); + d_socket.send(zmsg); #endif } diff --git a/gr-zeromq/lib/push_msg_sink_impl.h b/gr-zeromq/lib/push_msg_sink_impl.h index a197e68569..57a3a2da36 100644 --- a/gr-zeromq/lib/push_msg_sink_impl.h +++ b/gr-zeromq/lib/push_msg_sink_impl.h @@ -21,8 +21,8 @@ class push_msg_sink_impl : public push_msg_sink { private: float d_timeout; - zmq::context_t* d_context; - zmq::socket_t* d_socket; + zmq::context_t d_context; + zmq::socket_t d_socket; public: push_msg_sink_impl(char* address, int timeout, bool bind); @@ -33,7 +33,7 @@ public: { char addr[256]; size_t addr_len = sizeof(addr); - d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); + d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); return std::string(addr, addr_len - 1); } }; diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc index fb3d3f323a..427bb8abb3 100644 --- a/gr-zeromq/lib/push_sink_impl.cc +++ b/gr-zeromq/lib/push_sink_impl.cc @@ -41,7 +41,7 @@ int push_sink_impl::work(int noutput_items, gr_vector_void_star& output_items) { // Poll with a timeout (FIXME: scheduler can't wait for us) - zmq::pollitem_t itemsout[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLOUT, 0 } }; + zmq::pollitem_t itemsout[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLOUT, 0 } }; zmq::poll(&itemsout[0], 1, d_timeout); // If we can send something, do it diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc index 5496d26253..5bb4f3b152 100644 --- a/gr-zeromq/lib/rep_msg_sink_impl.cc +++ b/gr-zeromq/lib/rep_msg_sink_impl.cc @@ -15,6 +15,7 @@ #include "rep_msg_sink_impl.h" #include "tag_headers.h" #include <gnuradio/io_signature.h> +#include <boost/make_unique.hpp> namespace gr { namespace zeromq { @@ -29,6 +30,8 @@ rep_msg_sink_impl::rep_msg_sink_impl(char* address, int timeout, bool bind) gr::io_signature::make(0, 0, 0), gr::io_signature::make(0, 0, 0)), d_timeout(timeout), + d_context(1), + d_socket(d_context, ZMQ_REP), d_port(pmt::mp("in")) { int major, minor, patch; @@ -38,32 +41,25 @@ rep_msg_sink_impl::rep_msg_sink_impl(char* address, int timeout, bool bind) d_timeout = timeout * 1000; } - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_REP); - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket.setsockopt(ZMQ_LINGER, &time, sizeof(time)); if (bind) { - d_socket->bind(address); + d_socket.bind(address); } else { - d_socket->connect(address); + d_socket.connect(address); } message_port_register_in(d_port); } -rep_msg_sink_impl::~rep_msg_sink_impl() -{ - d_socket->close(); - delete d_socket; - delete d_context; -} +rep_msg_sink_impl::~rep_msg_sink_impl() {} bool rep_msg_sink_impl::start() { d_finished = false; - d_thread = new boost::thread(boost::bind(&rep_msg_sink_impl::readloop, this)); + d_thread = boost::make_unique<boost::thread>( + boost::bind(&rep_msg_sink_impl::readloop, this)); return true; } @@ -83,7 +79,7 @@ void rep_msg_sink_impl::readloop() // wait for query... zmq::pollitem_t items[] = { - { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } + { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; zmq::poll(&items[0], 1, d_timeout); @@ -93,9 +89,9 @@ void rep_msg_sink_impl::readloop() // receive data request zmq::message_t request; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(request); + d_socket.recv(request); #else - d_socket->recv(&request); + d_socket.recv(&request); #endif int req_output_items = *(static_cast<int*>(request.data())); @@ -111,9 +107,9 @@ void rep_msg_sink_impl::readloop() zmq::message_t zmsg(s.size()); memcpy(zmsg.data(), s.c_str(), s.size()); #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(zmsg, zmq::send_flags::none); + d_socket.send(zmsg, zmq::send_flags::none); #else - d_socket->send(zmsg); + d_socket.send(zmsg); #endif } // if req } // while !empty diff --git a/gr-zeromq/lib/rep_msg_sink_impl.h b/gr-zeromq/lib/rep_msg_sink_impl.h index d42bc8077b..b5685d4403 100644 --- a/gr-zeromq/lib/rep_msg_sink_impl.h +++ b/gr-zeromq/lib/rep_msg_sink_impl.h @@ -21,9 +21,9 @@ class rep_msg_sink_impl : public rep_msg_sink { private: int d_timeout; - zmq::context_t* d_context; - zmq::socket_t* d_socket; - boost::thread* d_thread; + zmq::context_t d_context; + zmq::socket_t d_socket; + std::unique_ptr<boost::thread> d_thread; bool d_finished; const pmt::pmt_t d_port; @@ -41,7 +41,7 @@ public: { char addr[256]; size_t addr_len = sizeof(addr); - d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); + d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); return std::string(addr, addr_len - 1); } }; diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index 7f207f53ad..920cc1ff27 100644 --- a/gr-zeromq/lib/rep_sink_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -49,7 +49,7 @@ int rep_sink_impl::work(int noutput_items, /* Wait for a small time (FIXME: scheduler can't wait for us) */ /* We only wait if its the first iteration, for the others we'll * let the scheduler retry */ - zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; zmq::poll(&items[0], 1, first ? d_timeout : 0); /* If we don't have anything, we're done */ @@ -59,9 +59,9 @@ int rep_sink_impl::work(int noutput_items, /* Get and parse the request */ zmq::message_t request; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(request); + d_socket.recv(request); #else - d_socket->recv(&request); + d_socket.recv(&request); #endif int nitems_send = noutput_items - done; diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc index a1d2bf76f7..ff4405cb0e 100644 --- a/gr-zeromq/lib/req_msg_source_impl.cc +++ b/gr-zeromq/lib/req_msg_source_impl.cc @@ -16,6 +16,7 @@ #include "tag_headers.h" #include <gnuradio/io_signature.h> #include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/make_unique.hpp> #include <boost/thread/thread.hpp> namespace gr { @@ -31,6 +32,8 @@ req_msg_source_impl::req_msg_source_impl(char* address, int timeout, bool bind) gr::io_signature::make(0, 0, 0), gr::io_signature::make(0, 0, 0)), d_timeout(timeout), + d_context(1), + d_socket(d_context, ZMQ_REQ), d_port(pmt::mp("out")) { int major, minor, patch; @@ -40,32 +43,25 @@ req_msg_source_impl::req_msg_source_impl(char* address, int timeout, bool bind) d_timeout = timeout * 1000; } - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_REQ); - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket.setsockopt(ZMQ_LINGER, &time, sizeof(time)); if (bind) { - d_socket->bind(address); + d_socket.bind(address); } else { - d_socket->connect(address); + d_socket.connect(address); } message_port_register_out(d_port); } -req_msg_source_impl::~req_msg_source_impl() -{ - d_socket->close(); - delete d_socket; - delete d_context; -} +req_msg_source_impl::~req_msg_source_impl() {} bool req_msg_source_impl::start() { d_finished = false; - d_thread = new boost::thread(boost::bind(&req_msg_source_impl::readloop, this)); + d_thread = boost::make_unique<boost::thread>( + boost::bind(&req_msg_source_impl::readloop, this)); return true; } @@ -82,7 +78,7 @@ void req_msg_source_impl::readloop() // std::cout << "readloop\n"; zmq::pollitem_t itemsout[] = { - { static_cast<void*>(*d_socket), 0, ZMQ_POLLOUT, 0 } + { static_cast<void*>(d_socket), 0, ZMQ_POLLOUT, 0 } }; zmq::poll(&itemsout[0], 1, d_timeout); @@ -93,13 +89,13 @@ void req_msg_source_impl::readloop() zmq::message_t request(sizeof(int)); memcpy((void*)request.data(), &nmsg, sizeof(int)); #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(request, zmq::send_flags::none); + d_socket.send(request, zmq::send_flags::none); #else - d_socket->send(request); + d_socket.send(request); #endif } - zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; zmq::poll(&items[0], 1, d_timeout); // If we got a reply, process @@ -107,9 +103,9 @@ void req_msg_source_impl::readloop() // Receive data zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(msg); + d_socket.recv(msg); #else - d_socket->recv(&msg); + d_socket.recv(&msg); #endif std::string buf(static_cast<char*>(msg.data()), msg.size()); diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h index 3ea44c632d..e8b85c6801 100644 --- a/gr-zeromq/lib/req_msg_source_impl.h +++ b/gr-zeromq/lib/req_msg_source_impl.h @@ -21,9 +21,9 @@ class req_msg_source_impl : public req_msg_source { private: int d_timeout; - zmq::context_t* d_context; - zmq::socket_t* d_socket; - boost::thread* d_thread; + zmq::context_t d_context; + zmq::socket_t d_socket; + std::unique_ptr<boost::thread> d_thread; const pmt::pmt_t d_port; void readloop(); @@ -41,7 +41,7 @@ public: { char addr[256]; size_t addr_len = sizeof(addr); - d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); + d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); return std::string(addr, addr_len - 1); } }; diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc index b200ed076c..4ecaeb08d9 100644 --- a/gr-zeromq/lib/req_source_impl.cc +++ b/gr-zeromq/lib/req_source_impl.cc @@ -66,9 +66,9 @@ int req_source_impl::work(int noutput_items, zmq::message_t request(sizeof(uint32_t)); memcpy((void*)request.data(), &req_len, sizeof(uint32_t)); #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(request, zmq::send_flags::none); + d_socket.send(request, zmq::send_flags::none); #else - d_socket->send(request); + d_socket.send(request); #endif d_req_pending = true; diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc index d5b06b4df1..9c4b283097 100644 --- a/gr-zeromq/lib/sub_msg_source_impl.cc +++ b/gr-zeromq/lib/sub_msg_source_impl.cc @@ -16,6 +16,7 @@ #include "tag_headers.h" #include <gnuradio/io_signature.h> #include <boost/date_time/posix_time/posix_time.hpp> +#include <boost/make_unique.hpp> #include <boost/thread/thread.hpp> namespace gr { @@ -31,6 +32,8 @@ sub_msg_source_impl::sub_msg_source_impl(char* address, int timeout, bool bind) gr::io_signature::make(0, 0, 0), gr::io_signature::make(0, 0, 0)), d_timeout(timeout), + d_context(1), + d_socket(d_context, ZMQ_SUB), d_port(pmt::mp("out")) { int major, minor, patch; @@ -40,31 +43,24 @@ sub_msg_source_impl::sub_msg_source_impl(char* address, int timeout, bool bind) d_timeout = timeout * 1000; } - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_SUB); - - d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); + d_socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); if (bind) { - d_socket->bind(address); + d_socket.bind(address); } else { - d_socket->connect(address); + d_socket.connect(address); } message_port_register_out(d_port); } -sub_msg_source_impl::~sub_msg_source_impl() -{ - d_socket->close(); - delete d_socket; - delete d_context; -} +sub_msg_source_impl::~sub_msg_source_impl() {} bool sub_msg_source_impl::start() { d_finished = false; - d_thread = new boost::thread(boost::bind(&sub_msg_source_impl::readloop, this)); + d_thread = boost::make_unique<boost::thread>( + boost::bind(&sub_msg_source_impl::readloop, this)); return true; } @@ -79,7 +75,7 @@ void sub_msg_source_impl::readloop() { while (!d_finished) { - zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; zmq::poll(&items[0], 1, d_timeout); // If we got a reply, process @@ -88,9 +84,9 @@ void sub_msg_source_impl::readloop() // Receive data zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(msg); + d_socket.recv(msg); #else - d_socket->recv(&msg); + d_socket.recv(&msg); #endif std::string buf(static_cast<char*>(msg.data()), msg.size()); std::stringbuf sb(buf); diff --git a/gr-zeromq/lib/sub_msg_source_impl.h b/gr-zeromq/lib/sub_msg_source_impl.h index 6707a217de..52759c3f1a 100644 --- a/gr-zeromq/lib/sub_msg_source_impl.h +++ b/gr-zeromq/lib/sub_msg_source_impl.h @@ -21,9 +21,9 @@ class sub_msg_source_impl : public sub_msg_source { private: int d_timeout; // microseconds, -1 is blocking - zmq::context_t* d_context; - zmq::socket_t* d_socket; - boost::thread* d_thread; + zmq::context_t d_context; + zmq::socket_t d_socket; + std::unique_ptr<boost::thread> d_thread; const pmt::pmt_t d_port; void readloop(); @@ -41,7 +41,7 @@ public: { char addr[256]; size_t addr_len = sizeof(addr); - d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); + d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); return std::string(addr, addr_len - 1); } }; diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc index ddc2eb7ad8..168ac9eebb 100644 --- a/gr-zeromq/lib/sub_source_impl.cc +++ b/gr-zeromq/lib/sub_source_impl.cc @@ -44,7 +44,7 @@ sub_source_impl::sub_source_impl(size_t itemsize, base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm, key) { /* Subscribe */ - d_socket->setsockopt(ZMQ_SUBSCRIBE, key.c_str(), key.size()); + d_socket.setsockopt(ZMQ_SUBSCRIBE, key.c_str(), key.size()); } int sub_source_impl::work(int noutput_items, |