summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/push_sink_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/lib/push_sink_impl.cc')
-rw-r--r--gr-zeromq/lib/push_sink_impl.cc69
1 files changed, 13 insertions, 56 deletions
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index 7c06dc590c..a5aec2cf80 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -32,38 +32,19 @@ namespace gr {
namespace zeromq {
push_sink::sptr
- push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
+ push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
{
return gnuradio::get_initial_sptr
- (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags));
+ (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
}
- push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
+ push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
: gr::sync_block("push_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
+ base_sink_impl(ZMQ_PUSH, itemsize, vlen, address, timeout, pass_tags, hwm)
{
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
-
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->bind (address);
- }
-
- push_sink_impl::~push_sink_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ /* All is delegated */
}
int
@@ -71,43 +52,19 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- const char *in = (const char *) input_items[0];
-
+ // Poll with a timeout (FIXME: scheduler can't wait for us)
zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
zmq::poll(&itemsout[0], 1, d_timeout);
- // If we got a reply, process
- if (itemsout[0].revents & ZMQ_POLLOUT) {
-
- // encode the current offset, # tags, and tags into header
- std::string header("");
-
- if(d_pass_tags){
- uint64_t offset = nitems_read(0);
- std::vector<gr::tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
- header = gen_tag_header(offset, tags);
- }
+ // If we can send something, do it
+ if (itemsout[0].revents & ZMQ_POLLOUT)
+ return send_message(input_items[0], noutput_items, nitems_read(0));
- // create message copy and send
- int payloadlen = d_itemsize * d_vlen * noutput_items;
- int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen;
- zmq::message_t msg(msglen);
-
- if(d_pass_tags){
- memcpy((void*) msg.data(), header.c_str(), header.length() );
- memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items);
- } else {
- memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- }
-
- d_socket->send(msg);
- return noutput_items;
- }
- else {
- return 0; // FIXME: when scheduler supports return blocking
- }
+ // If not, do nothing
+ return 0;
}
} /* namespace zeromq */
} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab