diff options
author | Marcus Müller <mmueller@gnuradio.org> | 2019-08-07 21:45:12 +0200 |
---|---|---|
committer | Marcus Müller <marcus@hostalia.de> | 2019-08-09 23:04:28 +0200 |
commit | f7bbf2c1d8d780294f3e016aff239ca35eb6516e (patch) | |
tree | e09ab6112e02b2215b2d59ac24d3d6ea2edac745 /gr-zeromq/lib/base_impl.cc | |
parent | 78431dc6941e3acc67c858277dfe4a0ed583643c (diff) |
Tree: clang-format without the include sorting
Diffstat (limited to 'gr-zeromq/lib/base_impl.cc')
-rw-r--r-- | gr-zeromq/lib/base_impl.cc | 269 |
1 files changed, 138 insertions, 131 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc index 662f7b1596..b3a107c815 100644 --- a/gr-zeromq/lib/base_impl.cc +++ b/gr-zeromq/lib/base_impl.cc @@ -29,200 +29,207 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags) - : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags) - { - /* "Fix" timeout value (ms for new API, us for old API) */ - int major, minor, patch; - zmq::version (&major, &minor, &patch); +base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags) + : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags) +{ + /* "Fix" timeout value (ms for new API, us for old API) */ + int major, minor, patch; + zmq::version(&major, &minor, &patch); - if (major < 3) { + if (major < 3) { d_timeout *= 1000; - } - - /* Create context & socket */ - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, type); - } - - base_impl::~base_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; - } - - std::string - base_impl::last_endpoint() - { - char addr[256]; - size_t addr_len = sizeof(addr); - d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); - return std::string(addr, addr_len-1); } - - base_sink_impl::base_sink_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - : base_impl(type, itemsize, vlen, timeout, pass_tags) - { - /* Set high watermark */ - if (hwm >= 0) { + /* Create context & socket */ + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, type); +} + +base_impl::~base_impl() +{ + d_socket->close(); + delete d_socket; + delete d_context; +} + +std::string base_impl::last_endpoint() +{ + char addr[256]; + size_t addr_len = sizeof(addr); + d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); + return std::string(addr, addr_len - 1); +} + + +base_sink_impl::base_sink_impl(int type, + size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm) + : base_impl(type, itemsize, vlen, timeout, pass_tags) +{ + /* Set high watermark */ + if (hwm >= 0) { #ifdef ZMQ_SNDHWM d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); #else // major < 3 uint64_t tmp = hwm; d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); #endif - } - - /* Bind */ - d_socket->bind(address); } - int - base_sink_impl::send_message(const void *in_buf, const int in_nitems, const uint64_t in_offset) - { - /* Meta-data header */ - std::string header(""); - if(d_pass_tags){ + /* Bind */ + d_socket->bind(address); +} + +int base_sink_impl::send_message(const void* in_buf, + const int in_nitems, + const uint64_t in_offset) +{ + /* Meta-data header */ + std::string header(""); + if (d_pass_tags) { std::vector<gr::tag_t> tags; get_tags_in_range(tags, 0, in_offset, in_offset + in_nitems); header = gen_tag_header(in_offset, tags); - } + } - /* Create message */ - size_t payload_len = in_nitems * d_vsize; - size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len; - zmq::message_t msg(msg_len); + /* Create message */ + size_t payload_len = in_nitems * d_vsize; + size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len; + zmq::message_t msg(msg_len); - if(d_pass_tags){ + if (d_pass_tags) { memcpy(msg.data(), header.c_str(), header.length()); memcpy((uint8_t*)msg.data() + header.length(), in_buf, payload_len); - } else { + } else { memcpy(msg.data(), in_buf, payload_len); - } + } - /* Send */ + /* Send */ #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(msg, zmq::send_flags::none); + d_socket->send(msg, zmq::send_flags::none); #else - d_socket->send(msg); + d_socket->send(msg); #endif - /* Report back */ - return in_nitems; - } - - base_source_impl::base_source_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - : base_impl(type, itemsize, vlen, timeout, pass_tags), - d_consumed_bytes(0), d_consumed_items(0) - { - /* Set high watermark */ - if (hwm >= 0) { + /* Report back */ + return in_nitems; +} + +base_source_impl::base_source_impl(int type, + size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm) + : base_impl(type, itemsize, vlen, timeout, pass_tags), + d_consumed_bytes(0), + d_consumed_items(0) +{ + /* Set high watermark */ + if (hwm >= 0) { #ifdef ZMQ_RCVHWM d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); #else // major < 3 uint64_t tmp = hwm; d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); #endif - } - - /* Connect */ - d_socket->connect(address); } - bool - base_source_impl::has_pending() - { - return d_msg.size() > d_consumed_bytes; - } + /* Connect */ + d_socket->connect(address); +} - int - base_source_impl::flush_pending(void *out_buf, const int out_nitems, const uint64_t out_offset) - { - /* How much to copy in this call */ - int to_copy_items = std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize)); - int to_copy_bytes = d_vsize * to_copy_items; +bool base_source_impl::has_pending() { return d_msg.size() > d_consumed_bytes; } - /* Copy actual data */ - memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes, to_copy_bytes); +int base_source_impl::flush_pending(void* out_buf, + const int out_nitems, + const uint64_t out_offset) +{ + /* How much to copy in this call */ + int to_copy_items = + std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize)); + int to_copy_bytes = d_vsize * to_copy_items; - /* Add tags matching this segment of samples */ - for (unsigned int i=0; i<d_tags.size(); i++) - { + /* Copy actual data */ + memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes, to_copy_bytes); + + /* Add tags matching this segment of samples */ + for (unsigned int i = 0; i < d_tags.size(); i++) { if ((d_tags[i].offset >= (uint64_t)d_consumed_items) && - (d_tags[i].offset < (uint64_t)d_consumed_items + to_copy_items)) - { - gr::tag_t nt = d_tags[i]; - nt.offset += out_offset - d_consumed_items; - add_item_tag(0, nt); + (d_tags[i].offset < (uint64_t)d_consumed_items + to_copy_items)) { + gr::tag_t nt = d_tags[i]; + nt.offset += out_offset - d_consumed_items; + add_item_tag(0, nt); } - } + } - /* Update pointer */ - d_consumed_items += to_copy_items; - d_consumed_bytes += to_copy_bytes; + /* Update pointer */ + d_consumed_items += to_copy_items; + d_consumed_bytes += to_copy_bytes; - return to_copy_items; - } + return to_copy_items; +} - bool - base_source_impl::load_message(bool wait) - { - /* Poll for input */ - zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } }; - zmq::poll(&items[0], 1, wait ? d_timeout : 0); +bool base_source_impl::load_message(bool wait) +{ + /* Poll for input */ + zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + zmq::poll(&items[0], 1, wait ? d_timeout : 0); - if (!(items[0].revents & ZMQ_POLLIN)) + if (!(items[0].revents & ZMQ_POLLIN)) return false; - /* Is this the start or continuation of a multi-part message? */ - int64_t more = 0; - size_t more_len = sizeof(more); - d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len); + /* Is this the start or continuation of a multi-part message? */ + int64_t more = 0; + size_t more_len = sizeof(more); + d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len); - /* Reset */ - d_msg.rebuild(); - d_tags.clear(); - d_consumed_items = 0; - d_consumed_bytes = 0; + /* Reset */ + d_msg.rebuild(); + d_tags.clear(); + d_consumed_items = 0; + d_consumed_bytes = 0; - /* Get the message */ + /* Get the message */ #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(d_msg); + d_socket->recv(d_msg); #else - d_socket->recv(&d_msg); + d_socket->recv(&d_msg); #endif - /* Parse header from the first (or only) message of a multi-part message */ - if (d_pass_tags && !more) - { + /* Parse header from the first (or only) message of a multi-part message */ + if (d_pass_tags && !more) { uint64_t rcv_offset; /* Parse header */ d_consumed_bytes = parse_tag_header(d_msg, rcv_offset, d_tags); /* Fixup the tags offset to be relative to the start of this message */ - for (unsigned int i=0; i<d_tags.size(); i++) { - d_tags[i].offset -= rcv_offset; + for (unsigned int i = 0; i < d_tags.size(); i++) { + d_tags[i].offset -= rcv_offset; } - } + } - /* Each message must contain an integer multiple of data vectors */ - if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) - { + /* Each message must contain an integer multiple of data vectors */ + if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) { throw std::runtime_error( boost::str(boost::format("Incompatible vector sizes: " - "need a multiple of %1% bytes per message") % d_vsize)); - } - - /* We got one ! */ - return true; + "need a multiple of %1% bytes per message") % + d_vsize)); } - } /* namespace zeromq */ + /* We got one ! */ + return true; +} + +} /* namespace zeromq */ } /* namespace gr */ // vim: ts=2 sw=2 expandtab |