summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/lib')
-rw-r--r--gr-zeromq/lib/pub_sink_impl.cc10
-rw-r--r--gr-zeromq/lib/pub_sink_impl.h3
-rw-r--r--gr-zeromq/lib/push_sink_impl.cc10
-rw-r--r--gr-zeromq/lib/push_sink_impl.h3
-rw-r--r--gr-zeromq/lib/rep_sink_impl.cc12
-rw-r--r--gr-zeromq/lib/rep_sink_impl.h3
-rw-r--r--gr-zeromq/lib/req_source_impl.cc10
-rw-r--r--gr-zeromq/lib/req_source_impl.h3
8 files changed, 25 insertions, 29 deletions
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 3a86819fb3..2749ddff04 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -31,17 +31,17 @@ namespace gr {
namespace zeromq {
pub_sink::sptr
- pub_sink::make(size_t itemsize, size_t vlen, char *address, bool blocking)
+ pub_sink::make(size_t itemsize, size_t vlen, char *address)
{
return gnuradio::get_initial_sptr
- (new pub_sink_impl(itemsize, vlen, address, blocking));
+ (new pub_sink_impl(itemsize, vlen, address));
}
- pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, bool blocking)
+ pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address)
: gr::sync_block("pub_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_blocking(blocking)
+ d_itemsize(itemsize), d_vlen(vlen)
{
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_PUB);
@@ -68,7 +68,7 @@ namespace gr {
// create message copy and send
zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
+ d_socket->send(msg);
return noutput_items;
}
diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h
index ad0419ad26..bf5abdb8fe 100644
--- a/gr-zeromq/lib/pub_sink_impl.h
+++ b/gr-zeromq/lib/pub_sink_impl.h
@@ -34,12 +34,11 @@ namespace gr {
private:
size_t d_itemsize;
size_t d_vlen;
- bool d_blocking;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
public:
- pub_sink_impl(size_t itemsize, size_t vlen, char *address, bool blocking);
+ pub_sink_impl(size_t itemsize, size_t vlen, char *address);
~pub_sink_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index 2df3a6f2db..2f7e4ce2e6 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -31,17 +31,17 @@ namespace gr {
namespace zeromq {
push_sink::sptr
- push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool blocking)
+ push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
{
return gnuradio::get_initial_sptr
- (new push_sink_impl(itemsize, vlen, address, timeout, blocking));
+ (new push_sink_impl(itemsize, vlen, address, timeout));
}
- push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool blocking)
+ push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout)
: gr::sync_block("push_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_blocking(blocking)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
{
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
@@ -73,7 +73,7 @@ namespace gr {
// create message copy and send
zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
+ d_socket->send(msg);
return noutput_items;
}
diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h
index 2ff9bc509a..9a10065eba 100644
--- a/gr-zeromq/lib/push_sink_impl.h
+++ b/gr-zeromq/lib/push_sink_impl.h
@@ -35,12 +35,11 @@ namespace gr {
size_t d_itemsize;
size_t d_vlen;
float d_timeout;
- bool d_blocking;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
public:
- push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool blocking);
+ push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
~push_sink_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index 35efffa7f4..9f73fb1f99 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -31,17 +31,17 @@ namespace gr {
namespace zeromq {
rep_sink::sptr
- rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool blocking)
+ rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
{
return gnuradio::get_initial_sptr
- (new rep_sink_impl(itemsize, vlen, address, timeout, blocking));
+ (new rep_sink_impl(itemsize, vlen, address, timeout));
}
- rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool blocking)
+ rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout)
: gr::sync_block("rep_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_blocking(blocking)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
{
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
@@ -79,14 +79,14 @@ namespace gr {
if (noutput_items < req_output_items) {
zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
+ d_socket->send(msg);
return noutput_items;
}
else {
zmq::message_t msg(d_itemsize*d_vlen*req_output_items);
memcpy((void *)msg.data(), in, d_itemsize*d_vlen*req_output_items);
- d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK);
+ d_socket->send(msg);
return req_output_items;
}
diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h
index 500af7a3b3..ff69735757 100644
--- a/gr-zeromq/lib/rep_sink_impl.h
+++ b/gr-zeromq/lib/rep_sink_impl.h
@@ -35,12 +35,11 @@ namespace gr {
size_t d_itemsize;
size_t d_vlen;
int d_timeout;
- bool d_blocking;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
public:
- rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool blocking);
+ rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
~rep_sink_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index 121d2e37fb..3cbe6edbf1 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -31,17 +31,17 @@ namespace gr {
namespace zeromq {
req_source::sptr
- req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool blocking)
+ req_source::make(size_t itemsize, size_t vlen, char *address, int timeout)
{
return gnuradio::get_initial_sptr
- (new req_source_impl(itemsize, vlen, address, timeout, blocking));
+ (new req_source_impl(itemsize, vlen, address, timeout));
}
- req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool blocking)
+ req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout)
: gr::sync_block("req_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_blocking(blocking)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
{
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
@@ -73,7 +73,7 @@ namespace gr {
// Request data, FIXME non portable?
zmq::message_t request(sizeof(int));
memcpy ((void *) request.data (), &noutput_items, sizeof(int));
- d_socket->send(request, d_blocking ? 0 : ZMQ_NOBLOCK);
+ d_socket->send(request);
}
zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h
index ab844efa00..f61b1d1ce4 100644
--- a/gr-zeromq/lib/req_source_impl.h
+++ b/gr-zeromq/lib/req_source_impl.h
@@ -35,12 +35,11 @@ namespace gr {
size_t d_itemsize;
size_t d_vlen;
int d_timeout;
- bool d_blocking;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
public:
- req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool blocking);
+ req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout);
~req_source_impl();
int work(int noutput_items,