summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/base_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/lib/base_impl.cc')
-rw-r--r--gr-zeromq/lib/base_impl.cc54
1 files changed, 25 insertions, 29 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc
index 67f800274f..5889490898 100644
--- a/gr-zeromq/lib/base_impl.cc
+++ b/gr-zeromq/lib/base_impl.cc
@@ -25,7 +25,12 @@ base_impl::base_impl(int type,
int timeout,
bool pass_tags,
const std::string& key)
- : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags), d_key(key)
+ : d_context(1),
+ d_socket(d_context, type),
+ d_vsize(itemsize * vlen),
+ d_timeout(timeout),
+ d_pass_tags(pass_tags),
+ d_key(key)
{
/* "Fix" timeout value (ms for new API, us for old API) */
int major, minor, patch;
@@ -34,24 +39,15 @@ base_impl::base_impl(int type,
if (major < 3) {
d_timeout *= 1000;
}
-
- /* Create context & socket */
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, type);
}
-base_impl::~base_impl()
-{
- d_socket->close();
- delete d_socket;
- delete d_context;
-}
+base_impl::~base_impl() {}
std::string base_impl::last_endpoint()
{
char addr[256];
size_t addr_len = sizeof(addr);
- d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
+ d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
return std::string(addr, addr_len - 1);
}
@@ -69,15 +65,15 @@ base_sink_impl::base_sink_impl(int type,
/* Set high watermark */
if (hwm >= 0) {
#ifdef ZMQ_SNDHWM
- d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
+ d_socket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
#else // major < 3
uint64_t tmp = hwm;
- d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
+ d_socket.setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
#endif
}
/* Bind */
- d_socket->bind(address);
+ d_socket.bind(address);
}
int base_sink_impl::send_message(const void* in_buf,
@@ -89,9 +85,9 @@ int base_sink_impl::send_message(const void* in_buf,
zmq::message_t key_message(d_key.size());
memcpy(key_message.data(), d_key.data(), d_key.size());
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(key_message, zmq::send_flags::sndmore);
+ d_socket.send(key_message, zmq::send_flags::sndmore);
#else
- d_socket->send(key_message, ZMQ_SNDMORE);
+ d_socket.send(key_message, ZMQ_SNDMORE);
#endif
}
/* Meta-data header */
@@ -116,9 +112,9 @@ int base_sink_impl::send_message(const void* in_buf,
/* Send */
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(msg, zmq::send_flags::none);
+ d_socket.send(msg, zmq::send_flags::none);
#else
- d_socket->send(msg);
+ d_socket.send(msg);
#endif
/* Report back */
@@ -140,15 +136,15 @@ base_source_impl::base_source_impl(int type,
/* Set high watermark */
if (hwm >= 0) {
#ifdef ZMQ_RCVHWM
- d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
+ d_socket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
#else // major < 3
uint64_t tmp = hwm;
- d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
+ d_socket.setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
#endif
}
/* Connect */
- d_socket->connect(address);
+ d_socket.connect(address);
}
bool base_source_impl::has_pending() { return d_msg.size() > d_consumed_bytes; }
@@ -185,7 +181,7 @@ int base_source_impl::flush_pending(void* out_buf,
bool base_source_impl::load_message(bool wait)
{
/* Poll for input */
- zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } };
zmq::poll(&items[0], 1, wait ? d_timeout : 0);
if (!(items[0].revents & ZMQ_POLLIN))
@@ -194,7 +190,7 @@ bool base_source_impl::load_message(bool wait)
/* Is this the start or continuation of a multi-part message? */
int64_t more = 0;
size_t more_len = sizeof(more);
- d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len);
+ d_socket.getsockopt(ZMQ_RCVMORE, &more, &more_len);
/* Reset */
d_msg.rebuild();
@@ -204,23 +200,23 @@ bool base_source_impl::load_message(bool wait)
/* Get the message */
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(d_msg);
+ d_socket.recv(d_msg);
#else
- d_socket->recv(&d_msg);
+ d_socket.recv(&d_msg);
#endif
/* Throw away key and get the first message. Avoid blocking if a multi-part
* message is not sent */
if (d_key.size() > 0 && !more) {
int64_t is_multipart;
- d_socket->getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len);
+ d_socket.getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len);
d_msg.rebuild();
if (is_multipart)
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(d_msg);
+ d_socket.recv(d_msg);
#else
- d_socket->recv(&d_msg);
+ d_socket.recv(&d_msg);
#endif
else
return false;