diff options
Diffstat (limited to 'gr-zeromq/lib/base_impl.cc')
-rw-r--r-- | gr-zeromq/lib/base_impl.cc | 198 |
1 files changed, 198 insertions, 0 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc new file mode 100644 index 0000000000..f41e5cbfb5 --- /dev/null +++ b/gr-zeromq/lib/base_impl.cc @@ -0,0 +1,198 @@ +/* -*- c++ -*- */ +/* + * Copyright 2016 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gnuradio/io_signature.h> +#include "base_impl.h" +#include "tag_headers.h" + +namespace gr { + namespace zeromq { + + base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags) + : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags) + { + /* "Fix" timeout value (ms for new API, us for old API) */ + int major, minor, patch; + zmq::version (&major, &minor, &patch); + + if (major < 3) { + d_timeout *= 1000; + } + + /* Create context & socket */ + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, type); + } + + base_impl::~base_impl() + { + d_socket->close(); + delete d_socket; + delete d_context; + } + + + base_sink_impl::base_sink_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) + : base_impl(type, itemsize, vlen, timeout, pass_tags) + { + /* Set high watermark */ + if (hwm >= 0) { +#ifdef ZMQ_SNDHWM + d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); +#else // major < 3 + uint64_t tmp = hwm; + d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); +#endif + } + + /* Bind */ + d_socket->bind(address); + } + + int + base_sink_impl::send_message(const void *in_buf, const int in_nitems, const uint64_t in_offset) + { + /* Meta-data header */ + std::string header(""); + if(d_pass_tags){ + std::vector<gr::tag_t> tags; + get_tags_in_range(tags, 0, in_offset, in_offset + in_nitems); + header = gen_tag_header(in_offset, tags); + } + + /* Create message */ + size_t payload_len = in_nitems * d_vsize; + size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len; + zmq::message_t msg(msg_len); + + if(d_pass_tags){ + memcpy(msg.data(), header.c_str(), header.length()); + memcpy((uint8_t*)msg.data() + header.length(), in_buf, payload_len); + } else { + memcpy(msg.data(), in_buf, payload_len); + } + + /* Send */ + d_socket->send(msg); + + /* Report back */ + return in_nitems; + } + + base_source_impl::base_source_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) + : base_impl(type, itemsize, vlen, timeout, pass_tags), + d_consumed_bytes(0), d_consumed_items(0) + { + /* Set high watermark */ + if (hwm >= 0) { +#ifdef ZMQ_RCVHWM + d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); +#else // major < 3 + uint64_t tmp = hwm; + d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); +#endif + } + + /* Connect */ + d_socket->connect(address); + } + + bool + base_source_impl::has_pending() + { + return d_msg.size() > d_consumed_bytes; + } + + int + base_source_impl::flush_pending(void *out_buf, const int out_nitems, const uint64_t out_offset) + { + /* How much to copy in this call */ + int to_copy_items = std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize)); + int to_copy_bytes = d_vsize * to_copy_items; + + /* Copy actual data */ + memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes, to_copy_bytes); + + /* Add tags matching this segment of samples */ + for (unsigned int i=0; i<d_tags.size(); i++) + { + if ((d_tags[i].offset >= (uint64_t)d_consumed_items) && + (d_tags[i].offset < (uint64_t)d_consumed_items + to_copy_items)) + { + gr::tag_t nt = d_tags[i]; + nt.offset += out_offset - d_consumed_items; + add_item_tag(0, nt); + } + } + + /* Update pointer */ + d_consumed_items += to_copy_items; + d_consumed_bytes += to_copy_bytes; + + return to_copy_items; + } + + bool + base_source_impl::load_message(bool wait) + { + /* Poll for input */ + zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; + zmq::poll(&items[0], 1, wait ? d_timeout : 0); + + if (!(items[0].revents & ZMQ_POLLIN)) + return false; + + /* Reset */ + d_msg.rebuild(); + d_tags.clear(); + d_consumed_items = 0; + d_consumed_bytes = 0; + + /* Get the message */ + d_socket->recv(&d_msg); + + /* Parse header */ + if (d_pass_tags) + { + uint64_t rcv_offset; + + /* Parse header */ + d_consumed_bytes = parse_tag_header(d_msg, rcv_offset, d_tags); + + /* Fixup the tags offset to be relative to the start of this message */ + for (unsigned int i=0; i<d_tags.size(); i++) { + d_tags[i].offset -= rcv_offset; + } + } + + /* We got one ! */ + return true; + } + + } /* namespace zeromq */ +} /* namespace gr */ + +// vim: ts=2 sw=2 expandtab |