diff options
Diffstat (limited to 'gr-zeromq/lib/base_impl.cc')
-rw-r--r-- | gr-zeromq/lib/base_impl.cc | 54 |
1 files changed, 25 insertions, 29 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc index 67f800274f..5889490898 100644 --- a/gr-zeromq/lib/base_impl.cc +++ b/gr-zeromq/lib/base_impl.cc @@ -25,7 +25,12 @@ base_impl::base_impl(int type, int timeout, bool pass_tags, const std::string& key) - : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags), d_key(key) + : d_context(1), + d_socket(d_context, type), + d_vsize(itemsize * vlen), + d_timeout(timeout), + d_pass_tags(pass_tags), + d_key(key) { /* "Fix" timeout value (ms for new API, us for old API) */ int major, minor, patch; @@ -34,24 +39,15 @@ base_impl::base_impl(int type, if (major < 3) { d_timeout *= 1000; } - - /* Create context & socket */ - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, type); } -base_impl::~base_impl() -{ - d_socket->close(); - delete d_socket; - delete d_context; -} +base_impl::~base_impl() {} std::string base_impl::last_endpoint() { char addr[256]; size_t addr_len = sizeof(addr); - d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); + d_socket.getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); return std::string(addr, addr_len - 1); } @@ -69,15 +65,15 @@ base_sink_impl::base_sink_impl(int type, /* Set high watermark */ if (hwm >= 0) { #ifdef ZMQ_SNDHWM - d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); + d_socket.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); #else // major < 3 uint64_t tmp = hwm; - d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); + d_socket.setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); #endif } /* Bind */ - d_socket->bind(address); + d_socket.bind(address); } int base_sink_impl::send_message(const void* in_buf, @@ -89,9 +85,9 @@ int base_sink_impl::send_message(const void* in_buf, zmq::message_t key_message(d_key.size()); memcpy(key_message.data(), d_key.data(), d_key.size()); #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(key_message, zmq::send_flags::sndmore); + d_socket.send(key_message, zmq::send_flags::sndmore); #else - d_socket->send(key_message, ZMQ_SNDMORE); + d_socket.send(key_message, ZMQ_SNDMORE); #endif } /* Meta-data header */ @@ -116,9 +112,9 @@ int base_sink_impl::send_message(const void* in_buf, /* Send */ #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(msg, zmq::send_flags::none); + d_socket.send(msg, zmq::send_flags::none); #else - d_socket->send(msg); + d_socket.send(msg); #endif /* Report back */ @@ -140,15 +136,15 @@ base_source_impl::base_source_impl(int type, /* Set high watermark */ if (hwm >= 0) { #ifdef ZMQ_RCVHWM - d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); + d_socket.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); #else // major < 3 uint64_t tmp = hwm; - d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); + d_socket.setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); #endif } /* Connect */ - d_socket->connect(address); + d_socket.connect(address); } bool base_source_impl::has_pending() { return d_msg.size() > d_consumed_bytes; } @@ -185,7 +181,7 @@ int base_source_impl::flush_pending(void* out_buf, bool base_source_impl::load_message(bool wait) { /* Poll for input */ - zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + zmq::pollitem_t items[] = { { static_cast<void*>(d_socket), 0, ZMQ_POLLIN, 0 } }; zmq::poll(&items[0], 1, wait ? d_timeout : 0); if (!(items[0].revents & ZMQ_POLLIN)) @@ -194,7 +190,7 @@ bool base_source_impl::load_message(bool wait) /* Is this the start or continuation of a multi-part message? */ int64_t more = 0; size_t more_len = sizeof(more); - d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len); + d_socket.getsockopt(ZMQ_RCVMORE, &more, &more_len); /* Reset */ d_msg.rebuild(); @@ -204,23 +200,23 @@ bool base_source_impl::load_message(bool wait) /* Get the message */ #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(d_msg); + d_socket.recv(d_msg); #else - d_socket->recv(&d_msg); + d_socket.recv(&d_msg); #endif /* Throw away key and get the first message. Avoid blocking if a multi-part * message is not sent */ if (d_key.size() > 0 && !more) { int64_t is_multipart; - d_socket->getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len); + d_socket.getsockopt(ZMQ_RCVMORE, &is_multipart, &more_len); d_msg.rebuild(); if (is_multipart) #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(d_msg); + d_socket.recv(d_msg); #else - d_socket->recv(&d_msg); + d_socket.recv(&d_msg); #endif else return false; |