diff options
author | Sylvain Munaut <tnt@246tNt.com> | 2016-01-27 12:58:50 +0100 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2016-01-27 10:31:26 -0800 |
commit | 6e482c5bb6bf49f000f6b8d35a1ca84127e38c46 (patch) | |
tree | ee9e4e3323da08dfabe91257229b970a38dff066 /gr-zeromq/lib/req_source_impl.cc | |
parent | 393624c072417e0b8a74480979eeccdfb8278e77 (diff) |
gr-zeromq: Big rework for performance and correctness
- Use class hierarchy trying to maximize code re-use.
- Dont' drop samples on receive if the output buffer doesn't have
enough space.
- Don't drop tags on receive by putting tags in the future.
- Better metadata creation/parsing avoiding copying lots data.
- Always do as much work as possible in a single call to work()
to avoid scheduler overhead as long as possible.
- Allow setting the high watermark to avoid older version of
zeromq's default of buffering infinite messages and causing a
paging thrash to/from disk when the flow graph can't keep up.
Signed-off-by: Sylvain Munaut <tnt@246tNt.com>
Diffstat (limited to 'gr-zeromq/lib/req_source_impl.cc')
-rw-r--r-- | gr-zeromq/lib/req_source_impl.cc | 107 |
1 files changed, 48 insertions, 59 deletions
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc index f69d447f98..526736389e 100644 --- a/gr-zeromq/lib/req_source_impl.cc +++ b/gr-zeromq/lib/req_source_impl.cc @@ -32,38 +32,20 @@ namespace gr { namespace zeromq { req_source::sptr - req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) { return gnuradio::get_initial_sptr - (new req_source_impl(itemsize, vlen, address, timeout, pass_tags)); + (new req_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); } - req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) : gr::sync_block("req_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) + base_source_impl(ZMQ_REQ, itemsize, vlen, address, timeout, pass_tags, hwm), + d_req_pending(false) { - int major, minor, patch; - zmq::version (&major, &minor, &patch); - - if (major < 3) { - d_timeout = timeout*1000; - } - - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_REQ); - - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->connect (address); - } - - req_source_impl::~req_source_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; + /* All is delegated */ } int @@ -71,49 +53,56 @@ namespace gr { gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - char *out = (char*)output_items[0]; +#if 0 +#endif + uint8_t *out = (uint8_t *) output_items[0]; + bool first = true; + int done = 0; + + /* Process as much as we can */ + while (1) + { + if (has_pending()) + { + /* Flush anything pending */ + done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done); + + /* No more space ? */ + if (done == noutput_items) + break; + } + else + { + /* Send request if needed */ + if (!d_req_pending) + { + /* The REP/REQ pattern state machine guarantees we can send at this point */ + uint32_t req_len = noutput_items - done; + zmq::message_t request(sizeof(uint32_t)); + memcpy ((void *) request.data (), &req_len, sizeof(uint32_t)); + d_socket->send(request); + + d_req_pending = true; + } - zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } }; - zmq::poll (&itemsout[0], 1, d_timeout); + /* Try to get the next message */ + if (!load_message(first)) + break; /* No message, we're done for now */ - // If we got a reply, process - if (itemsout[0].revents & ZMQ_POLLOUT) { - // Request data, FIXME non portable? - zmq::message_t request(sizeof(int)); - memcpy ((void *) request.data (), &noutput_items, sizeof(int)); - d_socket->send(request); - } + /* Got response */ + d_req_pending = false; - zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll(&itemsin[0], 1, d_timeout); - - // If we got a reply, process - if (itemsin[0].revents & ZMQ_POLLIN) { - // Receive data - zmq::message_t reply; - d_socket->recv(&reply); - - // Deserialize header data / tags - std::string buf(static_cast<char*>(reply.data()), reply.size()); - - if(d_pass_tags){ - uint64_t rcv_offset; - std::vector<gr::tag_t> tags; - buf = parse_tag_header(buf, rcv_offset, tags); - for(size_t i=0; i<tags.size(); i++){ - tags[i].offset -= rcv_offset - nitems_written(0); - add_item_tag(0, tags[i]); - } + /* Not the first anymore */ + first = false; } - - - // Copy to ouput buffer and return - memcpy(out, (void *)&buf[0], buf.size()); - return buf.size()/(d_itemsize*d_vlen); } + return done; + return 0; } } /* namespace zeromq */ } /* namespace gr */ + +// vim: ts=2 sw=2 expandtab |