diff options
author | Sylvain Munaut <tnt@246tNt.com> | 2016-01-27 12:58:50 +0100 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2016-01-27 10:31:26 -0800 |
commit | 6e482c5bb6bf49f000f6b8d35a1ca84127e38c46 (patch) | |
tree | ee9e4e3323da08dfabe91257229b970a38dff066 /gr-zeromq/lib/pub_sink_impl.cc | |
parent | 393624c072417e0b8a74480979eeccdfb8278e77 (diff) |
gr-zeromq: Big rework for performance and correctness
- Use class hierarchy trying to maximize code re-use.
- Dont' drop samples on receive if the output buffer doesn't have
enough space.
- Don't drop tags on receive by putting tags in the future.
- Better metadata creation/parsing avoiding copying lots data.
- Always do as much work as possible in a single call to work()
to avoid scheduler overhead as long as possible.
- Allow setting the high watermark to avoid older version of
zeromq's default of buffering infinite messages and causing a
paging thrash to/from disk when the flow graph can't keep up.
Signed-off-by: Sylvain Munaut <tnt@246tNt.com>
Diffstat (limited to 'gr-zeromq/lib/pub_sink_impl.cc')
-rw-r--r-- | gr-zeromq/lib/pub_sink_impl.cc | 55 |
1 files changed, 8 insertions, 47 deletions
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc index c1030696ac..b602bc83a6 100644 --- a/gr-zeromq/lib/pub_sink_impl.cc +++ b/gr-zeromq/lib/pub_sink_impl.cc @@ -32,35 +32,19 @@ namespace gr { namespace zeromq { pub_sink::sptr - pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) { return gnuradio::get_initial_sptr - (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags)); + (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); } - pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) : gr::sync_block("pub_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) + base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm) { - int major, minor, patch; - zmq::version (&major, &minor, &patch); - if (major < 3) { - d_timeout = timeout*1000; - } - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_PUB); - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->bind(address); - } - - pub_sink_impl::~pub_sink_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; + /* All is delegated */ } int @@ -68,33 +52,10 @@ namespace gr { gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - const char *in = (const char *)input_items[0]; - - // encode the current offset, # tags, and tags into header - std::string header(""); - if(d_pass_tags){ - uint64_t offset = nitems_read(0); - std::vector<gr::tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); - header = gen_tag_header( offset, tags ); - } - - // create message copy and send - int payloadlen = d_itemsize * d_vlen * noutput_items; - int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen; - zmq::message_t msg(msglen); - - if(d_pass_tags){ - memcpy((void*) msg.data(), header.c_str(), header.length() ); - memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); - } else { - memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items); - } - - d_socket->send(msg); - - return noutput_items; + return send_message(input_items[0], noutput_items, nitems_read(0)); } } /* namespace zeromq */ } /* namespace gr */ + +// vim: ts=2 sw=2 expandtab |