diff options
Diffstat (limited to 'gr-zeromq/lib/rep_sink_impl.cc')
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.cc | 91 |
1 files changed, 35 insertions, 56 deletions
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index 034a5b0f83..ac6fc9c8b1 100644 --- a/gr-zeromq/lib/rep_sink_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -32,38 +32,19 @@ namespace gr { namespace zeromq { rep_sink::sptr - rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) { return gnuradio::get_initial_sptr - (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags)); + (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); } - rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) + rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) : gr::sync_block("rep_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) + base_sink_impl(ZMQ_REP, itemsize, vlen, address, timeout, pass_tags, hwm) { - int major, minor, patch; - zmq::version (&major, &minor, &patch); - - if (major < 3) { - d_timeout = timeout*1000; - } - - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_REP); - - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->bind (address); - } - - rep_sink_impl::~rep_sink_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; + /* All is delegated */ } int @@ -71,46 +52,44 @@ namespace gr { gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - const char *in = (const char *) input_items[0]; - - zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll(&items[0], 1, d_timeout); - - // If we got a reply, process - if (items[0].revents & ZMQ_POLLIN) { - // receive data request + const uint8_t *in = (const uint8_t *) input_items[0]; + bool first = true; + int done = 0; + + /* Process as much as we can */ + while (1) + { + /* Wait for a small time (FIXME: scheduler can't wait for us) */ + /* We only wait if its the first iteration, for the others we'll + * let the scheduler retry */ + zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; + zmq::poll(&items[0], 1, first ? d_timeout : 0); + + /* If we dont have anything, we're done */ + if (!(items[0].revents & ZMQ_POLLIN)) + break; + + /* Get and parse the request */ zmq::message_t request; d_socket->recv(&request); - int req_output_items = *(static_cast<int*>(request.data())); - int nitems_send = std::min(noutput_items, req_output_items); - // encode the current offset, # tags, and tags into header - std::string header(""); - if(d_pass_tags){ - uint64_t offset = nitems_read(0); - std::vector<gr::tag_t> tags; - get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); - header = gen_tag_header( offset, tags ); + int nitems_send = noutput_items - done; + if (request.size() >= sizeof(uint32_t)) + { + int req = (int)*(static_cast<uint32_t*>(request.data())); + nitems_send = std::min(nitems_send, req); } + /* Delegate the actual send */ + done += send_message(in + (done * d_vsize), nitems_send, nitems_read(0) + done); - // create message copy and send - int payloadlen = d_itemsize * d_vlen * noutput_items; - int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen; - zmq::message_t msg(msglen); - - if(d_pass_tags){ - memcpy((void*) msg.data(), header.c_str(), header.length() ); - memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); - } else { - memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items); - } - d_socket->send(msg); - - return nitems_send; + /* Not the first anymore */ + first = false; } - return 0; + return done; } } /* namespace zeromq */ } /* namespace gr */ + +// vim: ts=2 sw=2 expandtab |