diff options
29 files changed, 651 insertions, 474 deletions
diff --git a/gr-zeromq/grc/zeromq_pub_sink.xml b/gr-zeromq/grc/zeromq_pub_sink.xml index 7babc9eb80..1b2f9ec52a 100644 --- a/gr-zeromq/grc/zeromq_pub_sink.xml +++ b/gr-zeromq/grc/zeromq_pub_sink.xml @@ -4,7 +4,7 @@ <key>zeromq_pub_sink</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> + <make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make> <param> <name>IO Type</name> @@ -61,7 +61,23 @@ <name>Pass Tags</name> <key>pass_tags</key> <value>False</value> - <type>bool</type> + <type>enum</type> + <option> + <name>Yes</name> + <key>True</key> + </option> + <option> + <name>No</name> + <key>False</key> + </option> + </param> + + <param> + <name>High Watermark</name> + <key>hwm</key> + <value>-1</value> + <type>int</type> + <hide>#if $hwm() == -1 then 'part' else 'none'#</hide> </param> <sink> diff --git a/gr-zeromq/grc/zeromq_pull_source.xml b/gr-zeromq/grc/zeromq_pull_source.xml index c8a7b890da..8158b47361 100644 --- a/gr-zeromq/grc/zeromq_pull_source.xml +++ b/gr-zeromq/grc/zeromq_pull_source.xml @@ -4,7 +4,7 @@ <key>zeromq_pull_source</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> + <make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make> <param> <name>IO Type</name> @@ -61,7 +61,23 @@ <name>Pass Tags</name> <key>pass_tags</key> <value>False</value> - <type>bool</type> + <type>enum</type> + <option> + <name>Yes</name> + <key>True</key> + </option> + <option> + <name>No</name> + <key>False</key> + </option> + </param> + + <param> + <name>High Watermark</name> + <key>hwm</key> + <value>-1</value> + <type>int</type> + <hide>#if $hwm() == -1 then 'part' else 'none'#</hide> </param> <source> diff --git a/gr-zeromq/grc/zeromq_push_sink.xml b/gr-zeromq/grc/zeromq_push_sink.xml index eb6ead5c65..528da94ee6 100644 --- a/gr-zeromq/grc/zeromq_push_sink.xml +++ b/gr-zeromq/grc/zeromq_push_sink.xml @@ -4,7 +4,7 @@ <key>zeromq_push_sink</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> + <make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make> <param> <name>IO Type</name> @@ -61,7 +61,23 @@ <name>Pass Tags</name> <key>pass_tags</key> <value>False</value> - <type>bool</type> + <type>enum</type> + <option> + <name>Yes</name> + <key>True</key> + </option> + <option> + <name>No</name> + <key>False</key> + </option> + </param> + + <param> + <name>High Watermark</name> + <key>hwm</key> + <value>-1</value> + <type>int</type> + <hide>#if $hwm() == -1 then 'part' else 'none'#</hide> </param> <sink> diff --git a/gr-zeromq/grc/zeromq_rep_sink.xml b/gr-zeromq/grc/zeromq_rep_sink.xml index 2209b4f3ff..db735a37bb 100644 --- a/gr-zeromq/grc/zeromq_rep_sink.xml +++ b/gr-zeromq/grc/zeromq_rep_sink.xml @@ -4,7 +4,7 @@ <key>zeromq_rep_sink</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> + <make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make> <param> <name>IO Type</name> @@ -61,7 +61,23 @@ <name>Pass Tags</name> <key>pass_tags</key> <value>False</value> - <type>bool</type> + <type>enum</type> + <option> + <name>Yes</name> + <key>True</key> + </option> + <option> + <name>No</name> + <key>False</key> + </option> + </param> + + <param> + <name>High Watermark</name> + <key>hwm</key> + <value>-1</value> + <type>int</type> + <hide>#if $hwm() == -1 then 'part' else 'none'#</hide> </param> <sink> diff --git a/gr-zeromq/grc/zeromq_req_source.xml b/gr-zeromq/grc/zeromq_req_source.xml index 050718c1bf..2ef224399d 100644 --- a/gr-zeromq/grc/zeromq_req_source.xml +++ b/gr-zeromq/grc/zeromq_req_source.xml @@ -4,7 +4,7 @@ <key>zeromq_req_source</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> + <make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make> <param> <name>IO Type</name> @@ -61,7 +61,23 @@ <name>Pass Tags</name> <key>pass_tags</key> <value>False</value> - <type>bool</type> + <type>enum</type> + <option> + <name>Yes</name> + <key>True</key> + </option> + <option> + <name>No</name> + <key>False</key> + </option> + </param> + + <param> + <name>High Watermark</name> + <key>hwm</key> + <value>-1</value> + <type>int</type> + <hide>#if $hwm() == -1 then 'part' else 'none'#</hide> </param> <source> diff --git a/gr-zeromq/grc/zeromq_sub_source.xml b/gr-zeromq/grc/zeromq_sub_source.xml index 86af5063e3..268a8938d5 100644 --- a/gr-zeromq/grc/zeromq_sub_source.xml +++ b/gr-zeromq/grc/zeromq_sub_source.xml @@ -4,7 +4,7 @@ <key>zeromq_sub_source</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> + <make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout, $pass_tags, $hwm)</make> <param> <name>IO Type</name> @@ -61,7 +61,23 @@ <name>Pass Tags</name> <key>pass_tags</key> <value>False</value> - <type>bool</type> + <type>enum</type> + <option> + <name>Yes</name> + <key>True</key> + </option> + <option> + <name>No</name> + <key>False</key> + </option> + </param> + + <param> + <name>High Watermark</name> + <key>hwm</key> + <value>-1</value> + <type>int</type> + <hide>#if $hwm() == -1 then 'part' else 'none'#</hide> </param> <source> diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h index e8871c22ac..e87c5522f9 100644 --- a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h @@ -53,9 +53,10 @@ namespace gr { * \param address ZMQ socket address specifier. * \param timeout Receive timeout in seconds, default is 100ms, 1us increments. * \param pass_tags Whether sink will serialize and pass tags over the link. + * \param hwm High Watermark to configure the socket to (-1 => zmq's default) */ static sptr make(size_t itemsize, size_t vlen, char *address, - int timeout=100, bool pass_tags=false); + int timeout=100, bool pass_tags=false, int hwm=-1); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/pull_source.h b/gr-zeromq/include/gnuradio/zeromq/pull_source.h index ca7b40726d..07cf6af128 100644 --- a/gr-zeromq/include/gnuradio/zeromq/pull_source.h +++ b/gr-zeromq/include/gnuradio/zeromq/pull_source.h @@ -50,9 +50,10 @@ namespace gr { * \param address ZMQ socket address specifier. * \param timeout Receive timeout in seconds, default is 100ms, 1us increments. * \param pass_tags Whether source will look for and deserialize tags. + * \param hwm High Watermark to configure the socket to (-1 => zmq's default) */ static sptr make(size_t itemsize, size_t vlen, char *address, - int timeout=100, bool pass_tags=false); + int timeout=100, bool pass_tags=false, int hwm=-1); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h b/gr-zeromq/include/gnuradio/zeromq/push_sink.h index 0f21b446b4..e2260aa3f6 100644 --- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h @@ -54,9 +54,10 @@ namespace gr { * \param address ZMQ socket address specifier. * \param timeout Receive timeout in seconds, default is 100ms, 1us increments. * \param pass_tags Whether sink will serialize and pass tags over the link. + * \param hwm High Watermark to configure the socket to (-1 => zmq's default) */ static sptr make(size_t itemsize, size_t vlen, char *address, - int timeout=100, bool pass_tags=false); + int timeout=100, bool pass_tags=false, int hwm=-1); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h index 33fd38b2df..220bd34416 100644 --- a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h @@ -52,9 +52,10 @@ namespace gr { * \param address ZMQ socket address specifier. * \param timeout Receive timeout in seconds, default is 100ms, 1us increments. * \param pass_tags Whether sink will serialize and pass tags over the link. + * \param hwm High Watermark to configure the socket to (-1 => zmq's default) */ static sptr make(size_t itemsize, size_t vlen, char *address, - int timeout=100, bool pass_tags=false); + int timeout=100, bool pass_tags=false, int hwm=-1); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/req_source.h b/gr-zeromq/include/gnuradio/zeromq/req_source.h index 9936406c23..461f653b43 100644 --- a/gr-zeromq/include/gnuradio/zeromq/req_source.h +++ b/gr-zeromq/include/gnuradio/zeromq/req_source.h @@ -50,9 +50,10 @@ namespace gr { * \param address ZMQ socket address specifier. * \param timeout Receive timeout in seconds, default is 100ms, 1us increments. * \param pass_tags Whether source will look for and deserialize tags. + * \param hwm High Watermark to configure the socket to (-1 => zmq's default) */ static sptr make(size_t itemsize, size_t vlen, char *address, - int timeout=100, bool pass_tags=false); + int timeout=100, bool pass_tags=false, int hwm=-1); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_source.h b/gr-zeromq/include/gnuradio/zeromq/sub_source.h index 5fdd8932ec..def3a703e6 100644 --- a/gr-zeromq/include/gnuradio/zeromq/sub_source.h +++ b/gr-zeromq/include/gnuradio/zeromq/sub_source.h @@ -50,9 +50,10 @@ namespace gr { * \param address ZMQ socket address specifier. * \param timeout Receive timeout in seconds, default is 100ms, 1us increments. * \param pass_tags Whether source will look for and deserialize tags. + * \param hwm High Watermark to configure the socket to (-1 => zmq's default) */ static sptr make(size_t itemsize, size_t vlen, char *address, - int timeout=100, bool pass_tags=false); + int timeout=100, bool pass_tags=false, int hwm=-1); }; } // namespace zeromq diff --git a/gr-zeromq/lib/CMakeLists.txt b/gr-zeromq/lib/CMakeLists.txt index 941e5ff12b..d7b03fa4ed 100644 --- a/gr-zeromq/lib/CMakeLists.txt +++ b/gr-zeromq/lib/CMakeLists.txt @@ -37,6 +37,7 @@ endif(ENABLE_GR_CTRLPORT) # Setup library ######################################################################## list(APPEND zeromq_sources + base_impl.cc pub_sink_impl.cc pub_msg_sink_impl.cc sub_source_impl.cc diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc new file mode 100644 index 0000000000..f41e5cbfb5 --- /dev/null +++ b/gr-zeromq/lib/base_impl.cc @@ -0,0 +1,198 @@ +/* -*- c++ -*- */ +/* + * Copyright 2016 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gnuradio/io_signature.h> +#include "base_impl.h" +#include "tag_headers.h" + +namespace gr { + namespace zeromq { + + base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags) + : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags) + { + /* "Fix" timeout value (ms for new API, us for old API) */ + int major, minor, patch; + zmq::version (&major, &minor, &patch); + + if (major < 3) { + d_timeout *= 1000; + } + + /* Create context & socket */ + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, type); + } + + base_impl::~base_impl() + { + d_socket->close(); + delete d_socket; + delete d_context; + } + + + base_sink_impl::base_sink_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) + : base_impl(type, itemsize, vlen, timeout, pass_tags) + { + /* Set high watermark */ + if (hwm >= 0) { +#ifdef ZMQ_SNDHWM + d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); +#else // major < 3 + uint64_t tmp = hwm; + d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); +#endif + } + + /* Bind */ + d_socket->bind(address); + } + + int + base_sink_impl::send_message(const void *in_buf, const int in_nitems, const uint64_t in_offset) + { + /* Meta-data header */ + std::string header(""); + if(d_pass_tags){ + std::vector<gr::tag_t> tags; + get_tags_in_range(tags, 0, in_offset, in_offset + in_nitems); + header = gen_tag_header(in_offset, tags); + } + + /* Create message */ + size_t payload_len = in_nitems * d_vsize; + size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len; + zmq::message_t msg(msg_len); + + if(d_pass_tags){ + memcpy(msg.data(), header.c_str(), header.length()); + memcpy((uint8_t*)msg.data() + header.length(), in_buf, payload_len); + } else { + memcpy(msg.data(), in_buf, payload_len); + } + + /* Send */ + d_socket->send(msg); + + /* Report back */ + return in_nitems; + } + + base_source_impl::base_source_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) + : base_impl(type, itemsize, vlen, timeout, pass_tags), + d_consumed_bytes(0), d_consumed_items(0) + { + /* Set high watermark */ + if (hwm >= 0) { +#ifdef ZMQ_RCVHWM + d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); +#else // major < 3 + uint64_t tmp = hwm; + d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); +#endif + } + + /* Connect */ + d_socket->connect(address); + } + + bool + base_source_impl::has_pending() + { + return d_msg.size() > d_consumed_bytes; + } + + int + base_source_impl::flush_pending(void *out_buf, const int out_nitems, const uint64_t out_offset) + { + /* How much to copy in this call */ + int to_copy_items = std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize)); + int to_copy_bytes = d_vsize * to_copy_items; + + /* Copy actual data */ + memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes, to_copy_bytes); + + /* Add tags matching this segment of samples */ + for (unsigned int i=0; i<d_tags.size(); i++) + { + if ((d_tags[i].offset >= (uint64_t)d_consumed_items) && + (d_tags[i].offset < (uint64_t)d_consumed_items + to_copy_items)) + { + gr::tag_t nt = d_tags[i]; + nt.offset += out_offset - d_consumed_items; + add_item_tag(0, nt); + } + } + + /* Update pointer */ + d_consumed_items += to_copy_items; + d_consumed_bytes += to_copy_bytes; + + return to_copy_items; + } + + bool + base_source_impl::load_message(bool wait) + { + /* Poll for input */ + zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; + zmq::poll(&items[0], 1, wait ? d_timeout : 0); + + if (!(items[0].revents & ZMQ_POLLIN)) + return false; + + /* Reset */ + d_msg.rebuild(); + d_tags.clear(); + d_consumed_items = 0; + d_consumed_bytes = 0; + + /* Get the message */ + d_socket->recv(&d_msg); + + /* Parse header */ + if (d_pass_tags) + { + uint64_t rcv_offset; + + /* Parse header */ + d_consumed_bytes = parse_tag_header(d_msg, rcv_offset, d_tags); + + /* Fixup the tags offset to be relative to the start of this message */ + for (unsigned int i=0; i<d_tags.size(); i++) { + d_tags[i].offset -= rcv_offset; + } + } + + /* We got one ! */ + return true; + } + + } /* namespace zeromq */ +} /* namespace gr */ + +// vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/base_impl.h b/gr-zeromq/lib/base_impl.h new file mode 100644 index 0000000000..ed1695102e --- /dev/null +++ b/gr-zeromq/lib/base_impl.h @@ -0,0 +1,77 @@ +/* -*- c++ -*- */ +/* + * Copyright 2016 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_BASE_IMPL_H +#define INCLUDED_ZEROMQ_BASE_IMPL_H + +#include <zmq.hpp> + +#include <gnuradio/sync_block.h> + +namespace gr { + namespace zeromq { + + class base_impl : public virtual gr::sync_block + { + public: + base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags); + virtual ~base_impl(); + + protected: + zmq::context_t *d_context; + zmq::socket_t *d_socket; + size_t d_vsize; + int d_timeout; + bool d_pass_tags; + }; + + class base_sink_impl : public base_impl + { + public: + base_sink_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); + + protected: + int send_message(const void *in_buf, const int in_nitems, const uint64_t in_offset); + }; + + class base_source_impl : public base_impl + { + public: + base_source_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); + + protected: + zmq::message_t d_msg; + std::vector<gr::tag_t> d_tags; + size_t d_consumed_bytes; + int d_consumed_items; + + bool has_pending(); + int flush_pending(void *out_buf, const int out_nitems, const uint64_t out_offset); + bool load_message(bool wait); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_BASE_IMPL_H */ + +// vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc index c1030696ac..b602bc83a6 100644 --- a/gr-zeromq/lib/pub_sink_impl.cc +++ b/gr-zeromq/lib/pub_sink_impl.cc @@ -32,35 +32,19 @@ namespace gr { namespace zeromq { pub_sink::sptr - pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) { return gnuradio::get_initial_sptr - (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags)); + (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); } - pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) : gr::sync_block("pub_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) + base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm) { - int major, minor, patch; - zmq::version (&major, &minor, &patch); - if (major < 3) { - d_timeout = timeout*1000; - } - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_PUB); - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->bind(address); - } - - pub_sink_impl::~pub_sink_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; + /* All is delegated */ } int @@ -68,33 +52,10 @@ namespace gr { gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - const char *in = (const char *)input_items[0]; - - // encode the current offset, # tags, and tags into header - std::string header(""); - if(d_pass_tags){ - uint64_t offset = nitems_read(0); - std::vector<gr::tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); - header = gen_tag_header( offset, tags ); - } - - // create message copy and send - int payloadlen = d_itemsize * d_vlen * noutput_items; - int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen; - zmq::message_t msg(msglen); - - if(d_pass_tags){ - memcpy((void*) msg.data(), header.c_str(), header.length() ); - memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); - } else { - memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items); - } - - d_socket->send(msg); - - return noutput_items; + return send_message(input_items[0], noutput_items, nitems_read(0)); } } /* namespace zeromq */ } /* namespace gr */ + +// vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h index 100b0f5b8c..8637c3565a 100644 --- a/gr-zeromq/lib/pub_sink_impl.h +++ b/gr-zeromq/lib/pub_sink_impl.h @@ -26,22 +26,15 @@ #include <gnuradio/zeromq/pub_sink.h> #include <zmq.hpp> +#include "base_impl.h" + namespace gr { namespace zeromq { - class pub_sink_impl : public pub_sink + class pub_sink_impl : public pub_sink, public base_sink_impl { - private: - size_t d_itemsize; - size_t d_vlen; - float d_timeout; - zmq::context_t *d_context; - zmq::socket_t *d_socket; - bool d_pass_tags; - public: - pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); - ~pub_sink_impl(); + pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); int work(int noutput_items, gr_vector_const_void_star &input_items, diff --git a/gr-zeromq/lib/pull_source_impl.cc b/gr-zeromq/lib/pull_source_impl.cc index 3215096f56..4045dd7b66 100644 --- a/gr-zeromq/lib/pull_source_impl.cc +++ b/gr-zeromq/lib/pull_source_impl.cc @@ -32,41 +32,19 @@ namespace gr { namespace zeromq { pull_source::sptr - pull_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + pull_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) { return gnuradio::get_initial_sptr - (new pull_source_impl(itemsize, vlen, address, timeout, pass_tags)); + (new pull_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); } - pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) : gr::sync_block("pull_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) + base_source_impl(ZMQ_PULL, itemsize, vlen, address, timeout, pass_tags, hwm) { - int major, minor, patch; - zmq::version (&major, &minor, &patch); - - if (major < 3) { - d_timeout = timeout*1000; - } - - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_PULL); - - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->connect (address); - } - - /* - * Our virtual destructor. - */ - pull_source_impl::~pull_source_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; + /* All is delegated */ } int @@ -74,44 +52,37 @@ namespace gr { gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - char *out = (char*)output_items[0]; - - zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&items[0], 1, d_timeout); - - // If we got a reply, process - if (items[0].revents & ZMQ_POLLIN) { - - // Receive data - zmq::message_t msg; - d_socket->recv(&msg); - - // check header for tags... - std::string buf(static_cast<char*>(msg.data()), msg.size()); - if(d_pass_tags){ - uint64_t rcv_offset; - std::vector<gr::tag_t> tags; - buf = parse_tag_header(buf, rcv_offset, tags); - for(size_t i=0; i<tags.size(); i++){ - tags[i].offset -= rcv_offset - nitems_written(0); - add_item_tag(0, tags[i]); - } - } - - // Copy to ouput buffer and return - if (buf.size() >= d_itemsize*d_vlen*noutput_items) { - memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items); - return noutput_items; + uint8_t *out = (uint8_t *) output_items[0]; + bool first = true; + int done = 0; + + /* Process as much as we can */ + while (1) + { + if (has_pending()) + { + /* Flush anything pending */ + done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done); + + /* No more space ? */ + if (done == noutput_items) + break; } - else { - memcpy(out, (void *)&buf[0], buf.size()); - return buf.size()/(d_itemsize*d_vlen); + else + { + /* Try to get the next message */ + if (!load_message(first)) + break; /* No message, we're done for now */ + + /* Not the first anymore */ + first = false; } } - else { - return 0; // FIXME: someday when the scheduler does all the poll/selects - } + + return done; } } /* namespace zeromq */ } /* namespace gr */ + +// vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/pull_source_impl.h b/gr-zeromq/lib/pull_source_impl.h index 757867998d..7d8ab53bd0 100644 --- a/gr-zeromq/lib/pull_source_impl.h +++ b/gr-zeromq/lib/pull_source_impl.h @@ -26,22 +26,15 @@ #include <gnuradio/zeromq/pull_source.h> #include <zmq.hpp> +#include "base_impl.h" + namespace gr { namespace zeromq { - class pull_source_impl : public pull_source + class pull_source_impl : public pull_source, public base_source_impl { - private: - size_t d_itemsize; - size_t d_vlen; - int d_timeout; // microseconds, -1 is blocking - zmq::context_t *d_context; - zmq::socket_t *d_socket; - bool d_pass_tags; - public: - pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); - ~pull_source_impl(); + pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); int work(int noutput_items, gr_vector_const_void_star &input_items, diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc index 7c06dc590c..a5aec2cf80 100644 --- a/gr-zeromq/lib/push_sink_impl.cc +++ b/gr-zeromq/lib/push_sink_impl.cc @@ -32,38 +32,19 @@ namespace gr { namespace zeromq { push_sink::sptr - push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) { return gnuradio::get_initial_sptr - (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags)); + (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); } - push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) : gr::sync_block("push_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) + base_sink_impl(ZMQ_PUSH, itemsize, vlen, address, timeout, pass_tags, hwm) { - int major, minor, patch; - zmq::version (&major, &minor, &patch); - - if (major < 3) { - d_timeout = timeout*1000; - } - - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH); - - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->bind (address); - } - - push_sink_impl::~push_sink_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; + /* All is delegated */ } int @@ -71,43 +52,19 @@ namespace gr { gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - const char *in = (const char *) input_items[0]; - + // Poll with a timeout (FIXME: scheduler can't wait for us) zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } }; zmq::poll(&itemsout[0], 1, d_timeout); - // If we got a reply, process - if (itemsout[0].revents & ZMQ_POLLOUT) { - - // encode the current offset, # tags, and tags into header - std::string header(""); - - if(d_pass_tags){ - uint64_t offset = nitems_read(0); - std::vector<gr::tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); - header = gen_tag_header(offset, tags); - } + // If we can send something, do it + if (itemsout[0].revents & ZMQ_POLLOUT) + return send_message(input_items[0], noutput_items, nitems_read(0)); - // create message copy and send - int payloadlen = d_itemsize * d_vlen * noutput_items; - int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen; - zmq::message_t msg(msglen); - - if(d_pass_tags){ - memcpy((void*) msg.data(), header.c_str(), header.length() ); - memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); - } else { - memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items); - } - - d_socket->send(msg); - return noutput_items; - } - else { - return 0; // FIXME: when scheduler supports return blocking - } + // If not, do nothing + return 0; } } /* namespace zeromq */ } /* namespace gr */ + +// vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h index 924dee3f15..0a5de10787 100644 --- a/gr-zeromq/lib/push_sink_impl.h +++ b/gr-zeromq/lib/push_sink_impl.h @@ -26,22 +26,15 @@ #include <gnuradio/zeromq/push_sink.h> #include <zmq.hpp> +#include "base_impl.h" + namespace gr { namespace zeromq { - class push_sink_impl : public push_sink + class push_sink_impl : public push_sink, public base_sink_impl { - private: - size_t d_itemsize; - size_t d_vlen; - float d_timeout; - zmq::context_t *d_context; - zmq::socket_t *d_socket; - bool d_pass_tags; - public: - push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); - ~push_sink_impl(); + push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); int work(int noutput_items, gr_vector_const_void_star &input_items, diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index 034a5b0f83..ac6fc9c8b1 100644 --- a/gr-zeromq/lib/rep_sink_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -32,38 +32,19 @@ namespace gr { namespace zeromq { rep_sink::sptr - rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) { return gnuradio::get_initial_sptr - (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags)); + (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); } - rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) : gr::sync_block("rep_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) + base_sink_impl(ZMQ_REP, itemsize, vlen, address, timeout, pass_tags, hwm) { - int major, minor, patch; - zmq::version (&major, &minor, &patch); - - if (major < 3) { - d_timeout = timeout*1000; - } - - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_REP); - - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->bind (address); - } - - rep_sink_impl::~rep_sink_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; + /* All is delegated */ } int @@ -71,46 +52,44 @@ namespace gr { gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - const char *in = (const char *) input_items[0]; - - zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll(&items[0], 1, d_timeout); - - // If we got a reply, process - if (items[0].revents & ZMQ_POLLIN) { - // receive data request + const uint8_t *in = (const uint8_t *) input_items[0]; + bool first = true; + int done = 0; + + /* Process as much as we can */ + while (1) + { + /* Wait for a small time (FIXME: scheduler can't wait for us) */ + /* We only wait if its the first iteration, for the others we'll + * let the scheduler retry */ + zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; + zmq::poll(&items[0], 1, first ? d_timeout : 0); + + /* If we dont have anything, we're done */ + if (!(items[0].revents & ZMQ_POLLIN)) + break; + + /* Get and parse the request */ zmq::message_t request; d_socket->recv(&request); - int req_output_items = *(static_cast<int*>(request.data())); - int nitems_send = std::min(noutput_items, req_output_items); - // encode the current offset, # tags, and tags into header - std::string header(""); - if(d_pass_tags){ - uint64_t offset = nitems_read(0); - std::vector<gr::tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); - header = gen_tag_header( offset, tags ); + int nitems_send = noutput_items - done; + if (request.size() >= sizeof(uint32_t)) + { + int req = (int)*(static_cast<uint32_t*>(request.data())); + nitems_send = std::min(nitems_send, req); } + /* Delegate the actual send */ + done += send_message(in + (done * d_vsize), nitems_send, nitems_read(0) + done); - // create message copy and send - int payloadlen = d_itemsize * d_vlen * noutput_items; - int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen; - zmq::message_t msg(msglen); - - if(d_pass_tags){ - memcpy((void*) msg.data(), header.c_str(), header.length() ); - memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); - } else { - memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items); - } - d_socket->send(msg); - - return nitems_send; + /* Not the first anymore */ + first = false; } - return 0; + return done; } } /* namespace zeromq */ } /* namespace gr */ + +// vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h index 55ebb69bfa..012fc45e7b 100644 --- a/gr-zeromq/lib/rep_sink_impl.h +++ b/gr-zeromq/lib/rep_sink_impl.h @@ -26,22 +26,15 @@ #include <gnuradio/zeromq/rep_sink.h> #include <zmq.hpp> +#include "base_impl.h" + namespace gr { namespace zeromq { - class rep_sink_impl : public rep_sink + class rep_sink_impl : public rep_sink, public base_sink_impl { - private: - size_t d_itemsize; - size_t d_vlen; - int d_timeout; - zmq::context_t *d_context; - zmq::socket_t *d_socket; - bool d_pass_tags; - public: - rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); - ~rep_sink_impl(); + rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); int work(int noutput_items, gr_vector_const_void_star &input_items, diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc index f69d447f98..526736389e 100644 --- a/gr-zeromq/lib/req_source_impl.cc +++ b/gr-zeromq/lib/req_source_impl.cc @@ -32,38 +32,20 @@ namespace gr { namespace zeromq { req_source::sptr - req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) { return gnuradio::get_initial_sptr - (new req_source_impl(itemsize, vlen, address, timeout, pass_tags)); + (new req_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); } - req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) : gr::sync_block("req_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) + base_source_impl(ZMQ_REQ, itemsize, vlen, address, timeout, pass_tags, hwm), + d_req_pending(false) { - int major, minor, patch; - zmq::version (&major, &minor, &patch); - - if (major < 3) { - d_timeout = timeout*1000; - } - - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_REQ); - - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->connect (address); - } - - req_source_impl::~req_source_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; + /* All is delegated */ } int @@ -71,49 +53,56 @@ namespace gr { gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - char *out = (char*)output_items[0]; +#if 0 +#endif + uint8_t *out = (uint8_t *) output_items[0]; + bool first = true; + int done = 0; + + /* Process as much as we can */ + while (1) + { + if (has_pending()) + { + /* Flush anything pending */ + done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done); + + /* No more space ? */ + if (done == noutput_items) + break; + } + else + { + /* Send request if needed */ + if (!d_req_pending) + { + /* The REP/REQ pattern state machine guarantees we can send at this point */ + uint32_t req_len = noutput_items - done; + zmq::message_t request(sizeof(uint32_t)); + memcpy ((void *) request.data (), &req_len, sizeof(uint32_t)); + d_socket->send(request); + + d_req_pending = true; + } - zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } }; - zmq::poll (&itemsout[0], 1, d_timeout); + /* Try to get the next message */ + if (!load_message(first)) + break; /* No message, we're done for now */ - // If we got a reply, process - if (itemsout[0].revents & ZMQ_POLLOUT) { - // Request data, FIXME non portable? - zmq::message_t request(sizeof(int)); - memcpy ((void *) request.data (), &noutput_items, sizeof(int)); - d_socket->send(request); - } + /* Got response */ + d_req_pending = false; - zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll(&itemsin[0], 1, d_timeout); - - // If we got a reply, process - if (itemsin[0].revents & ZMQ_POLLIN) { - // Receive data - zmq::message_t reply; - d_socket->recv(&reply); - - // Deserialize header data / tags - std::string buf(static_cast<char*>(reply.data()), reply.size()); - - if(d_pass_tags){ - uint64_t rcv_offset; - std::vector<gr::tag_t> tags; - buf = parse_tag_header(buf, rcv_offset, tags); - for(size_t i=0; i<tags.size(); i++){ - tags[i].offset -= rcv_offset - nitems_written(0); - add_item_tag(0, tags[i]); - } + /* Not the first anymore */ + first = false; } - - - // Copy to ouput buffer and return - memcpy(out, (void *)&buf[0], buf.size()); - return buf.size()/(d_itemsize*d_vlen); } + return done; + return 0; } } /* namespace zeromq */ } /* namespace gr */ + +// vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h index 7c6bc5310a..8bdbd33459 100644 --- a/gr-zeromq/lib/req_source_impl.h +++ b/gr-zeromq/lib/req_source_impl.h @@ -26,26 +26,22 @@ #include <gnuradio/zeromq/req_source.h> #include <zmq.hpp> +#include "base_impl.h" + namespace gr { namespace zeromq { - class req_source_impl : public req_source + class req_source_impl : public req_source, public base_source_impl { - private: - size_t d_itemsize; - size_t d_vlen; - int d_timeout; - zmq::context_t *d_context; - zmq::socket_t *d_socket; - bool d_pass_tags; - public: - req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); - ~req_source_impl(); + req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); int work(int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items); + + private: + bool d_req_pending; }; } // namespace zeromq diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc index 1242688a90..9a2e0bfe15 100644 --- a/gr-zeromq/lib/sub_source_impl.cc +++ b/gr-zeromq/lib/sub_source_impl.cc @@ -32,40 +32,20 @@ namespace gr { namespace zeromq { sub_source::sptr - sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) { return gnuradio::get_initial_sptr - (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags)); + (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); } - sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) : gr::sync_block("sub_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) + base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm) { - int major, minor, patch; - zmq::version (&major, &minor, &patch); - - if (major < 3) { - d_timeout = timeout*1000; - } - - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_SUB); - + /* Subscribe */ d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); - d_socket->connect (address); - } - - /* - * Our virtual destructor. - */ - sub_source_impl::~sub_source_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; } int @@ -73,47 +53,37 @@ namespace gr { gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - char *out = (char*)output_items[0]; - - zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&items[0], 1, d_timeout); - - // If we got a reply, process - if (items[0].revents & ZMQ_POLLIN) { - - // Receive data - zmq::message_t msg; - d_socket->recv(&msg); - - // Deserialize header data / tags - std::string buf(static_cast<char*>(msg.data()), msg.size()); - - if(d_pass_tags){ - uint64_t rcv_offset; - std::vector<gr::tag_t> tags; - - buf = parse_tag_header(buf, rcv_offset, tags); - - for(size_t i=0; i<tags.size(); i++) { - tags[i].offset -= rcv_offset - nitems_written(0); - add_item_tag(0, tags[i]); - } + uint8_t *out = (uint8_t *) output_items[0]; + bool first = true; + int done = 0; + + /* Process as much as we can */ + while (1) + { + if (has_pending()) + { + /* Flush anything pending */ + done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done); + + /* No more space ? */ + if (done == noutput_items) + break; } - - // Copy to ouput buffer and return - if (buf.size() >= d_itemsize*d_vlen*noutput_items) { - memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items); - return noutput_items; + else + { + /* Try to get the next message */ + if (!load_message(first)) + break; /* No message, we're done for now */ + + /* Not the first anymore */ + first = false; } - else { - memcpy(out, (void *)&buf[0], buf.size()); - return buf.size()/(d_itemsize*d_vlen); - } - } - else { - return 0; // FIXME: someday when the scheduler does all the poll/selects } + + return done; } } /* namespace zeromq */ } /* namespace gr */ + +// vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h index 0fa8d179cd..8f82a9ab94 100644 --- a/gr-zeromq/lib/sub_source_impl.h +++ b/gr-zeromq/lib/sub_source_impl.h @@ -24,28 +24,21 @@ #define INCLUDED_ZEROMQ_SUB_SOURCE_IMPL_H #include <gnuradio/zeromq/sub_source.h> -#include "zmq.hpp" +#include <zmq.hpp> + +#include "base_impl.h" namespace gr { namespace zeromq { - class sub_source_impl : public sub_source + class sub_source_impl : public sub_source, public base_source_impl { - private: - size_t d_itemsize; - size_t d_vlen; - int d_timeout; // microseconds, -1 is blocking - zmq::context_t *d_context; - zmq::socket_t *d_socket; - bool d_pass_tags; - - public: - sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); - ~sub_source_impl(); + public: + sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); }; } // namespace zeromq diff --git a/gr-zeromq/lib/tag_headers.cc b/gr-zeromq/lib/tag_headers.cc index c97066629c..5a5a417420 100644 --- a/gr-zeromq/lib/tag_headers.cc +++ b/gr-zeromq/lib/tag_headers.cc @@ -24,78 +24,88 @@ #include <gnuradio/block.h> #include <sstream> #include <cstring> +#include <zmq.hpp> -#define GR_HEADER_MAGIC 0x5FF0 +#define GR_HEADER_MAGIC 0x5FF0 #define GR_HEADER_VERSION 0x01 namespace gr { namespace zeromq { + struct membuf: std::streambuf + { + membuf(void *b, size_t len) + { + char *bc = static_cast<char*>(b); + this->setg(bc, bc, bc+len); + } + }; + std::string - gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags) { + gen_tag_header(uint64_t offset, std::vector<gr::tag_t> &tags) + { + std::stringbuf sb(""); + std::ostream ss(&sb); uint16_t header_magic = GR_HEADER_MAGIC; uint8_t header_version = GR_HEADER_VERSION; + uint64_t ntags = (uint64_t)tags.size(); - std::stringstream ss; - size_t ntags = tags.size(); - ss.write( reinterpret_cast< const char* >( &header_magic ), sizeof(uint16_t) ); - ss.write( reinterpret_cast< const char* >( &header_version ), sizeof(uint8_t) ); - - ss.write( reinterpret_cast< const char* >( &offset ), sizeof(uint64_t) ); // offset - ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) ); // num tags - std::stringbuf sb(""); - //std::cout << "TX TAGS: (offset="<<offset<<" ntags="<<ntags<<")\n"; - for(size_t i=0; i<tags.size(); i++){ - //std::cout << "TX TAG: (" << tags[i].offset << ", " << tags[i].key << ", " << tags[i].value << ", " << tags[i].srcid << ")\n"; - ss.write( reinterpret_cast< const char* >( &tags[i].offset ), sizeof(uint64_t) ); // offset - sb.str(""); - pmt::serialize( tags[i].key, sb ); // key - pmt::serialize( tags[i].value, sb ); // value - pmt::serialize( tags[i].srcid, sb ); // srcid - ss.write( sb.str().c_str() , sb.str().length() ); + ss.write( (const char*)&header_magic, sizeof(uint16_t) ); + ss.write( (const char*)&header_version, sizeof(uint8_t) ); + ss.write( (const char*)&offset, sizeof(uint64_t) ); + ss.write( (const char*)&ntags, sizeof(uint64_t) ); + + for(size_t i=0; i<tags.size(); i++) + { + ss.write( (const char *)&tags[i].offset, sizeof(uint64_t) ); + pmt::serialize( tags[i].key, sb ); + pmt::serialize( tags[i].value, sb ); + pmt::serialize( tags[i].srcid, sb ); } - return ss.str(); + return sb.str(); } - std::string - parse_tag_header(std::string &buf_in, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out) { + size_t + parse_tag_header(zmq::message_t &msg, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out) + { + membuf sb(msg.data(), msg.size()); + std::istream iss(&sb); - std::istringstream iss( buf_in ); - size_t rcv_ntags; + size_t min_len = sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint64_t); + if (msg.size() < min_len) + throw std::runtime_error("incoming zmq msg too small to hold gr tag header!"); uint16_t header_magic; - uint8_t header_version; + uint8_t header_version; + uint64_t rcv_ntags; + + iss.read( (char*)&header_magic, sizeof(uint16_t) ); + iss.read( (char*)&header_version, sizeof(uint8_t) ); - iss.read( (char*)&header_magic, sizeof(uint16_t ) ); - iss.read( (char*)&header_version, sizeof(uint8_t ) ); - if(header_magic != GR_HEADER_MAGIC){ + if(header_magic != GR_HEADER_MAGIC) throw std::runtime_error("gr header magic does not match!"); - } - if(header_version != 1){ + + if(header_version != 1) throw std::runtime_error("gr header version too high!"); - } - iss.read( (char*)&offset_out, sizeof(uint64_t ) ); - iss.read( (char*)&rcv_ntags, sizeof(size_t ) ); - //std::cout << "RX TAGS: (offset="<<offset_out<<" ntags="<<rcv_ntags<<")\n"; - int rd_offset = sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + sizeof(size_t); - std::stringbuf sb( iss.str().substr(rd_offset) ); + iss.read( (char*)&offset_out, sizeof(uint64_t) ); + iss.read( (char*)&rcv_ntags, sizeof(uint64_t) ); - for(size_t i=0; i<rcv_ntags; i++){ - gr::tag_t newtag; - sb.sgetn( (char*) &(newtag.offset), sizeof(uint64_t) ); + for(size_t i=0; i<rcv_ntags; i++) + { + gr::tag_t newtag; + sb.sgetn( (char*)&(newtag.offset), sizeof(uint64_t) ); newtag.key = pmt::deserialize( sb ); newtag.value = pmt::deserialize( sb ); newtag.srcid = pmt::deserialize( sb ); - //std::cout << "RX TAG: (" << newtag.offset << ", " << newtag.key << ", " << newtag.value << ", " << newtag.srcid << ")\n"; tags_out.push_back(newtag); - iss.str(sb.str()); } - int ndata = sb.in_avail(); - return iss.str().substr(iss.str().size() - ndata); + return msg.size() - sb.in_avail(); } } /* namespace zeromq */ } /* namespace gr */ + +// vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/tag_headers.h b/gr-zeromq/lib/tag_headers.h index 4c7a81238e..dede5e93a6 100644 --- a/gr-zeromq/lib/tag_headers.h +++ b/gr-zeromq/lib/tag_headers.h @@ -27,12 +27,13 @@ #include <gnuradio/block.h> #include <sstream> #include <cstring> +#include <zmq.hpp> namespace gr { namespace zeromq { - std::string gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags); - std::string parse_tag_header(std::string &buf_in, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out); + std::string gen_tag_header(uint64_t offset, std::vector<gr::tag_t> &tags); + size_t parse_tag_header(zmq::message_t &msg, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out); } /* namespace zeromq */ } /* namespace gr */ |