summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/base_impl.cc
diff options
context:
space:
mode:
authorMarcus Müller <mmueller@gnuradio.org>2019-08-07 21:45:12 +0200
committerMarcus Müller <marcus@hostalia.de>2019-08-09 23:04:28 +0200
commitf7bbf2c1d8d780294f3e016aff239ca35eb6516e (patch)
treee09ab6112e02b2215b2d59ac24d3d6ea2edac745 /gr-zeromq/lib/base_impl.cc
parent78431dc6941e3acc67c858277dfe4a0ed583643c (diff)
Tree: clang-format without the include sorting
Diffstat (limited to 'gr-zeromq/lib/base_impl.cc')
-rw-r--r--gr-zeromq/lib/base_impl.cc269
1 files changed, 138 insertions, 131 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc
index 662f7b1596..b3a107c815 100644
--- a/gr-zeromq/lib/base_impl.cc
+++ b/gr-zeromq/lib/base_impl.cc
@@ -29,200 +29,207 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags)
- : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags)
- {
- /* "Fix" timeout value (ms for new API, us for old API) */
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
+base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags)
+ : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags)
+{
+ /* "Fix" timeout value (ms for new API, us for old API) */
+ int major, minor, patch;
+ zmq::version(&major, &minor, &patch);
- if (major < 3) {
+ if (major < 3) {
d_timeout *= 1000;
- }
-
- /* Create context & socket */
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, type);
- }
-
- base_impl::~base_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
- }
-
- std::string
- base_impl::last_endpoint()
- {
- char addr[256];
- size_t addr_len = sizeof(addr);
- d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
- return std::string(addr, addr_len-1);
}
-
- base_sink_impl::base_sink_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- : base_impl(type, itemsize, vlen, timeout, pass_tags)
- {
- /* Set high watermark */
- if (hwm >= 0) {
+ /* Create context & socket */
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, type);
+}
+
+base_impl::~base_impl()
+{
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+}
+
+std::string base_impl::last_endpoint()
+{
+ char addr[256];
+ size_t addr_len = sizeof(addr);
+ d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
+ return std::string(addr, addr_len - 1);
+}
+
+
+base_sink_impl::base_sink_impl(int type,
+ size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm)
+ : base_impl(type, itemsize, vlen, timeout, pass_tags)
+{
+ /* Set high watermark */
+ if (hwm >= 0) {
#ifdef ZMQ_SNDHWM
d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
#else // major < 3
uint64_t tmp = hwm;
d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
#endif
- }
-
- /* Bind */
- d_socket->bind(address);
}
- int
- base_sink_impl::send_message(const void *in_buf, const int in_nitems, const uint64_t in_offset)
- {
- /* Meta-data header */
- std::string header("");
- if(d_pass_tags){
+ /* Bind */
+ d_socket->bind(address);
+}
+
+int base_sink_impl::send_message(const void* in_buf,
+ const int in_nitems,
+ const uint64_t in_offset)
+{
+ /* Meta-data header */
+ std::string header("");
+ if (d_pass_tags) {
std::vector<gr::tag_t> tags;
get_tags_in_range(tags, 0, in_offset, in_offset + in_nitems);
header = gen_tag_header(in_offset, tags);
- }
+ }
- /* Create message */
- size_t payload_len = in_nitems * d_vsize;
- size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len;
- zmq::message_t msg(msg_len);
+ /* Create message */
+ size_t payload_len = in_nitems * d_vsize;
+ size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len;
+ zmq::message_t msg(msg_len);
- if(d_pass_tags){
+ if (d_pass_tags) {
memcpy(msg.data(), header.c_str(), header.length());
memcpy((uint8_t*)msg.data() + header.length(), in_buf, payload_len);
- } else {
+ } else {
memcpy(msg.data(), in_buf, payload_len);
- }
+ }
- /* Send */
+ /* Send */
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(msg, zmq::send_flags::none);
+ d_socket->send(msg, zmq::send_flags::none);
#else
- d_socket->send(msg);
+ d_socket->send(msg);
#endif
- /* Report back */
- return in_nitems;
- }
-
- base_source_impl::base_source_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- : base_impl(type, itemsize, vlen, timeout, pass_tags),
- d_consumed_bytes(0), d_consumed_items(0)
- {
- /* Set high watermark */
- if (hwm >= 0) {
+ /* Report back */
+ return in_nitems;
+}
+
+base_source_impl::base_source_impl(int type,
+ size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm)
+ : base_impl(type, itemsize, vlen, timeout, pass_tags),
+ d_consumed_bytes(0),
+ d_consumed_items(0)
+{
+ /* Set high watermark */
+ if (hwm >= 0) {
#ifdef ZMQ_RCVHWM
d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
#else // major < 3
uint64_t tmp = hwm;
d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
#endif
- }
-
- /* Connect */
- d_socket->connect(address);
}
- bool
- base_source_impl::has_pending()
- {
- return d_msg.size() > d_consumed_bytes;
- }
+ /* Connect */
+ d_socket->connect(address);
+}
- int
- base_source_impl::flush_pending(void *out_buf, const int out_nitems, const uint64_t out_offset)
- {
- /* How much to copy in this call */
- int to_copy_items = std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize));
- int to_copy_bytes = d_vsize * to_copy_items;
+bool base_source_impl::has_pending() { return d_msg.size() > d_consumed_bytes; }
- /* Copy actual data */
- memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes, to_copy_bytes);
+int base_source_impl::flush_pending(void* out_buf,
+ const int out_nitems,
+ const uint64_t out_offset)
+{
+ /* How much to copy in this call */
+ int to_copy_items =
+ std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize));
+ int to_copy_bytes = d_vsize * to_copy_items;
- /* Add tags matching this segment of samples */
- for (unsigned int i=0; i<d_tags.size(); i++)
- {
+ /* Copy actual data */
+ memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes, to_copy_bytes);
+
+ /* Add tags matching this segment of samples */
+ for (unsigned int i = 0; i < d_tags.size(); i++) {
if ((d_tags[i].offset >= (uint64_t)d_consumed_items) &&
- (d_tags[i].offset < (uint64_t)d_consumed_items + to_copy_items))
- {
- gr::tag_t nt = d_tags[i];
- nt.offset += out_offset - d_consumed_items;
- add_item_tag(0, nt);
+ (d_tags[i].offset < (uint64_t)d_consumed_items + to_copy_items)) {
+ gr::tag_t nt = d_tags[i];
+ nt.offset += out_offset - d_consumed_items;
+ add_item_tag(0, nt);
}
- }
+ }
- /* Update pointer */
- d_consumed_items += to_copy_items;
- d_consumed_bytes += to_copy_bytes;
+ /* Update pointer */
+ d_consumed_items += to_copy_items;
+ d_consumed_bytes += to_copy_bytes;
- return to_copy_items;
- }
+ return to_copy_items;
+}
- bool
- base_source_impl::load_message(bool wait)
- {
- /* Poll for input */
- zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&items[0], 1, wait ? d_timeout : 0);
+bool base_source_impl::load_message(bool wait)
+{
+ /* Poll for input */
+ zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ zmq::poll(&items[0], 1, wait ? d_timeout : 0);
- if (!(items[0].revents & ZMQ_POLLIN))
+ if (!(items[0].revents & ZMQ_POLLIN))
return false;
- /* Is this the start or continuation of a multi-part message? */
- int64_t more = 0;
- size_t more_len = sizeof(more);
- d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len);
+ /* Is this the start or continuation of a multi-part message? */
+ int64_t more = 0;
+ size_t more_len = sizeof(more);
+ d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len);
- /* Reset */
- d_msg.rebuild();
- d_tags.clear();
- d_consumed_items = 0;
- d_consumed_bytes = 0;
+ /* Reset */
+ d_msg.rebuild();
+ d_tags.clear();
+ d_consumed_items = 0;
+ d_consumed_bytes = 0;
- /* Get the message */
+ /* Get the message */
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(d_msg);
+ d_socket->recv(d_msg);
#else
- d_socket->recv(&d_msg);
+ d_socket->recv(&d_msg);
#endif
- /* Parse header from the first (or only) message of a multi-part message */
- if (d_pass_tags && !more)
- {
+ /* Parse header from the first (or only) message of a multi-part message */
+ if (d_pass_tags && !more) {
uint64_t rcv_offset;
/* Parse header */
d_consumed_bytes = parse_tag_header(d_msg, rcv_offset, d_tags);
/* Fixup the tags offset to be relative to the start of this message */
- for (unsigned int i=0; i<d_tags.size(); i++) {
- d_tags[i].offset -= rcv_offset;
+ for (unsigned int i = 0; i < d_tags.size(); i++) {
+ d_tags[i].offset -= rcv_offset;
}
- }
+ }
- /* Each message must contain an integer multiple of data vectors */
- if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0)
- {
+ /* Each message must contain an integer multiple of data vectors */
+ if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) {
throw std::runtime_error(
boost::str(boost::format("Incompatible vector sizes: "
- "need a multiple of %1% bytes per message") % d_vsize));
- }
-
- /* We got one ! */
- return true;
+ "need a multiple of %1% bytes per message") %
+ d_vsize));
}
- } /* namespace zeromq */
+ /* We got one ! */
+ return true;
+}
+
+} /* namespace zeromq */
} /* namespace gr */
// vim: ts=2 sw=2 expandtab