summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohannes Schmitz <schmitz@ti.rwth-aachen.de>2014-05-08 16:50:22 +0200
committerJohannes Schmitz <schmitz@ti.rwth-aachen.de>2014-05-08 16:50:22 +0200
commit1545615b3a2fdf233d56d88ea235f300f7267813 (patch)
tree1164bdad85ec2cf5badb353f5dbb8d446e8c88b7
parenta4e6a803f34b349f051a3ff2bd3cf5b2140cfbab (diff)
zeromq: Add missing timeout and blocking parameters and polling
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/push_sink.h2
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/req_source.h2
-rw-r--r--gr-zeromq/lib/push_sink_impl.cc27
-rw-r--r--gr-zeromq/lib/push_sink_impl.h3
-rw-r--r--gr-zeromq/lib/rep_sink_impl.h2
-rw-r--r--gr-zeromq/lib/req_source_impl.cc16
-rw-r--r--gr-zeromq/lib/req_source_impl.h4
7 files changed, 36 insertions, 20 deletions
diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
index 46ad6a4863..53db71b0ad 100644
--- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
@@ -55,7 +55,7 @@ namespace gr {
* \param blocking Indicate whether blocking sends should be used, default true.
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, bool blocking=true);
+ static sptr make(size_t itemsize, size_t vlen, char *address, float timeout=0.1, bool blocking=true);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/req_source.h b/gr-zeromq/include/gnuradio/zeromq/req_source.h
index 5fc3682241..0f7c44d3c6 100644
--- a/gr-zeromq/include/gnuradio/zeromq/req_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/req_source.h
@@ -52,7 +52,7 @@ namespace gr {
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address);
+ static sptr make(size_t itemsize, size_t vlen, char *address, float timeout=0.1, bool blocking=true);
};
} // namespace zeromq
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index 5acc3c555e..9d92c51c6d 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -31,18 +31,19 @@ namespace gr {
namespace zeromq {
push_sink::sptr
- push_sink::make(size_t itemsize, size_t vlen, char *address, bool blocking)
+ push_sink::make(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking)
{
return gnuradio::get_initial_sptr
- (new push_sink_impl(itemsize, vlen, address, blocking));
+ (new push_sink_impl(itemsize, vlen, address, timeout, blocking));
}
- push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, bool blocking)
+ push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking)
: gr::sync_block("push_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
d_itemsize(itemsize), d_vlen(vlen)
{
+ d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0;
d_blocking = blocking;
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
@@ -64,11 +65,21 @@ namespace gr {
{
const char *in = (const char *) input_items[0];
- // create message copy and send
- zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
- return noutput_items;
+ zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
+ zmq::poll (&itemsout[0], 1, d_timeout);
+
+ // If we got a reply, process
+ if (itemsout[0].revents & ZMQ_POLLOUT) {
+ // create message copy and send
+ zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
+ memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
+ d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
+
+ return noutput_items;
+ }
+ else {
+ return 0;
+ }
}
} /* namespace zeromq */
diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h
index a34bb28a70..e1a9051e21 100644
--- a/gr-zeromq/lib/push_sink_impl.h
+++ b/gr-zeromq/lib/push_sink_impl.h
@@ -34,12 +34,13 @@ namespace gr {
private:
size_t d_itemsize;
size_t d_vlen;
+ float d_timeout;
bool d_blocking;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
public:
- push_sink_impl(size_t itemsize, size_t vlen, char *address, bool blocking);
+ push_sink_impl(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking);
~push_sink_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h
index a933d94fa6..6e10a89b59 100644
--- a/gr-zeromq/lib/rep_sink_impl.h
+++ b/gr-zeromq/lib/rep_sink_impl.h
@@ -35,9 +35,9 @@ namespace gr {
size_t d_itemsize;
size_t d_vlen;
int d_timeout;
+ bool d_blocking;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
- bool d_blocking;
public:
rep_sink_impl(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking);
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index 937b5942c0..84cbcc7c8f 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -31,18 +31,19 @@ namespace gr {
namespace zeromq {
req_source::sptr
- req_source::make(size_t itemsize, size_t vlen, char *address)
+ req_source::make(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking)
{
return gnuradio::get_initial_sptr
- (new req_source_impl(itemsize, vlen, address));
+ (new req_source_impl(itemsize, vlen, address, timeout, blocking));
}
- req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address)
+ req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking)
: gr::sync_block("req_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- d_itemsize(itemsize), d_vlen(vlen)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_blocking(blocking)
{
+ d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0;
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
int time = 0;
@@ -68,14 +69,15 @@ namespace gr {
// If we got a reply, process
if (itemsout[0].revents & ZMQ_POLLOUT) {
- // Request data, FIXME non portable
+ // Request data, FIXME non portable?
zmq::message_t request(sizeof(int));
memcpy ((void *) request.data (), &noutput_items, sizeof(int));
- d_socket->send(request);
+ d_socket->send(request, d_blocking ? 0 : ZMQ_NOBLOCK);
+
}
zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&itemsin[0], 1, 0);
+ zmq::poll (&itemsin[0], 1, d_timeout);
// If we got a reply, process
if (itemsin[0].revents & ZMQ_POLLIN) {
diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h
index c906e9137d..754f2081e8 100644
--- a/gr-zeromq/lib/req_source_impl.h
+++ b/gr-zeromq/lib/req_source_impl.h
@@ -34,11 +34,13 @@ namespace gr {
private:
size_t d_itemsize;
size_t d_vlen;
+ int d_timeout;
+ bool d_blocking;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
public:
- req_source_impl(size_t itemsize, size_t vlen, char *address);
+ req_source_impl(size_t itemsize, size_t vlen, char *address, float timeout, bool blocking);
~req_source_impl();
int work(int noutput_items,