diff options
Diffstat (limited to 'gr-zeromq/lib')
-rw-r--r-- | gr-zeromq/lib/pub_sink_impl.cc | 14 | ||||
-rw-r--r-- | gr-zeromq/lib/pub_sink_impl.h | 3 | ||||
-rw-r--r-- | gr-zeromq/lib/pull_source_impl.cc | 29 | ||||
-rw-r--r-- | gr-zeromq/lib/pull_source_impl.h | 1 | ||||
-rw-r--r-- | gr-zeromq/lib/push_sink_impl.cc | 9 | ||||
-rw-r--r-- | gr-zeromq/lib/push_sink_impl.h | 1 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.cc | 10 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.h | 1 | ||||
-rw-r--r-- | gr-zeromq/lib/req_source_impl.cc | 27 | ||||
-rw-r--r-- | gr-zeromq/lib/req_source_impl.h | 1 |
10 files changed, 51 insertions, 45 deletions
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc index 086e995b2e..07002395f5 100644 --- a/gr-zeromq/lib/pub_sink_impl.cc +++ b/gr-zeromq/lib/pub_sink_impl.cc @@ -31,17 +31,17 @@ namespace gr { namespace zeromq { pub_sink::sptr - pub_sink::make(size_t itemsize, char *address, bool blocking) + pub_sink::make(size_t itemsize, size_t vlen, char *address, bool blocking) { return gnuradio::get_initial_sptr - (new pub_sink_impl(itemsize, address, blocking)); + (new pub_sink_impl(itemsize, vlen, address, blocking)); } - pub_sink_impl::pub_sink_impl(size_t itemsize, char *address, bool blocking) + pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, bool blocking) : gr::sync_block("pub_sink", - gr::io_signature::make(1, 1, itemsize), + gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_blocking(blocking) + d_itemsize(itemsize), d_vlen(vlen), d_blocking(blocking) { d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_PUB); @@ -62,8 +62,8 @@ namespace gr { const char *in = (const char *)input_items[0]; // create message copy and send - zmq::message_t msg(d_itemsize*noutput_items); - memcpy((void *)msg.data(), in, d_itemsize*noutput_items); + zmq::message_t msg(d_itemsize*d_vlen*noutput_items); + memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items); d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); return noutput_items; diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h index d1cb02d289..6d36e207cd 100644 --- a/gr-zeromq/lib/pub_sink_impl.h +++ b/gr-zeromq/lib/pub_sink_impl.h @@ -33,12 +33,13 @@ namespace gr { { private: size_t d_itemsize; + size_t d_vlen; bool d_blocking; zmq::context_t *d_context; zmq::socket_t *d_socket; public: - pub_sink_impl(size_t itemsize, char *address, bool blocking); + pub_sink_impl(size_t itemsize, size_t vlen, char *address, bool blocking); ~pub_sink_impl(); int work(int noutput_items, diff --git a/gr-zeromq/lib/pull_source_impl.cc b/gr-zeromq/lib/pull_source_impl.cc index 9b9e50a38f..b29a056d84 100644 --- a/gr-zeromq/lib/pull_source_impl.cc +++ b/gr-zeromq/lib/pull_source_impl.cc @@ -41,7 +41,7 @@ namespace gr { : gr::sync_block("pull_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - d_itemsize(itemsize) + d_itemsize(itemsize), d_vlen(vlen) { d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0; d_context = new zmq::context_t(1); @@ -71,21 +71,22 @@ namespace gr { // If we got a reply, process if (items[0].revents & ZMQ_POLLIN) { - // Receive data - zmq::message_t msg; - d_socket->recv(&msg); + // Receive data + zmq::message_t msg; + std::cout << "pull before" << std::endl; + d_socket->recv(&msg); + std::cout << "pull after" << std::endl; + // Copy to ouput buffer and return + if (msg.size() >= d_itemsize*d_vlen*noutput_items) { + memcpy(out, (void *)msg.data(), d_itemsize*d_vlen*noutput_items); - // Copy to ouput buffer and return - if (msg.size() >= d_itemsize*noutput_items) { - memcpy(out, (void *)msg.data(), d_itemsize*noutput_items); + return noutput_items; + } + else { + memcpy(out, (void *)msg.data(), msg.size()); - return noutput_items; - } - else { - memcpy(out, (void *)msg.data(), msg.size()); - - return msg.size()/d_itemsize; - } + return msg.size()/(d_itemsize*d_vlen); + } } else { return 0; // FIXME: someday when the scheduler does all the poll/selects diff --git a/gr-zeromq/lib/pull_source_impl.h b/gr-zeromq/lib/pull_source_impl.h index 3e56bdc4c5..de2e903a85 100644 --- a/gr-zeromq/lib/pull_source_impl.h +++ b/gr-zeromq/lib/pull_source_impl.h @@ -33,6 +33,7 @@ namespace gr { { private: size_t d_itemsize; + size_t d_vlen; int d_timeout; // microseconds, -1 is blocking zmq::context_t *d_context; zmq::socket_t *d_socket; diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc index 43ccd9dbca..1438e524a5 100644 --- a/gr-zeromq/lib/push_sink_impl.cc +++ b/gr-zeromq/lib/push_sink_impl.cc @@ -41,7 +41,7 @@ namespace gr { : gr::sync_block("push_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize) + d_itemsize(itemsize), d_vlen(vlen) { d_blocking = blocking; d_context = new zmq::context_t(1); @@ -63,10 +63,11 @@ namespace gr { const char *in = (const char *) input_items[0]; // create message copy and send - zmq::message_t msg(d_itemsize*noutput_items); - memcpy((void *)msg.data(), in, d_itemsize*noutput_items); + zmq::message_t msg(d_itemsize*d_vlen*noutput_items); + memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items); + std::cout << "before" << std::endl; d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); - + std::cout << "after" << std::endl; return noutput_items; } diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h index 77f1f60c8d..76ee2c2b39 100644 --- a/gr-zeromq/lib/push_sink_impl.h +++ b/gr-zeromq/lib/push_sink_impl.h @@ -33,6 +33,7 @@ namespace gr { { private: size_t d_itemsize; + size_t d_vlen; bool d_blocking; zmq::context_t *d_context; zmq::socket_t *d_socket; diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index 38bb7d779e..7e5f43df4c 100644 --- a/gr-zeromq/lib/rep_sink_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -41,7 +41,7 @@ namespace gr { : gr::sync_block("rep_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_blocking(blocking) + d_itemsize(itemsize), d_vlen(vlen), d_blocking(blocking) { d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0; d_context = new zmq::context_t(1); @@ -74,15 +74,15 @@ namespace gr { // create message copy and send if (noutput_items < req_output_items) { - zmq::message_t msg(d_itemsize*noutput_items); - memcpy((void *)msg.data(), in, d_itemsize*noutput_items); + zmq::message_t msg(d_itemsize*d_vlen*noutput_items); + memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items); d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); return noutput_items; } else { - zmq::message_t msg(d_itemsize*req_output_items); - memcpy((void *)msg.data(), in, d_itemsize*req_output_items); + zmq::message_t msg(d_itemsize*d_vlen*req_output_items); + memcpy((void *)msg.data(), in, d_itemsize*d_vlen*req_output_items); d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); return req_output_items; diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h index 0996d53616..17e1da790d 100644 --- a/gr-zeromq/lib/rep_sink_impl.h +++ b/gr-zeromq/lib/rep_sink_impl.h @@ -33,6 +33,7 @@ namespace gr { { private: size_t d_itemsize; + size_t d_vlen; int d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc index ffe31f9953..8dedf6c6af 100644 --- a/gr-zeromq/lib/req_source_impl.cc +++ b/gr-zeromq/lib/req_source_impl.cc @@ -41,7 +41,7 @@ namespace gr { : gr::sync_block("req_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - d_itemsize(itemsize) + d_itemsize(itemsize), d_vlen(vlen) { d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_REQ); @@ -56,8 +56,8 @@ namespace gr { int req_source_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) { char *out = (char*)output_items[0]; @@ -66,10 +66,10 @@ namespace gr { // If we got a reply, process if (itemsout[0].revents & ZMQ_POLLOUT) { - // Request data, FIXME non portable - zmq::message_t request(sizeof(int)); - memcpy ((void *) request.data (), &noutput_items, sizeof(int)); - d_socket->send(request); + // Request data, FIXME non portable + zmq::message_t request(sizeof(int)); + memcpy ((void *) request.data (), &noutput_items, sizeof(int)); + d_socket->send(request); } zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; @@ -77,14 +77,13 @@ namespace gr { // If we got a reply, process if (itemsin[0].revents & ZMQ_POLLIN) { - // Receive data - zmq::message_t reply; - d_socket->recv(&reply); + // Receive data + zmq::message_t reply; + d_socket->recv(&reply); - // Copy to ouput buffer and return - memcpy(out, (void *)reply.data(), reply.size()); - - return reply.size()/d_itemsize; + // Copy to ouput buffer and return + memcpy(out, (void *)reply.data(), reply.size()); + return reply.size()/(d_itemsize*d_vlen); } return 0; diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h index 6c87b48bca..4d40f80cb2 100644 --- a/gr-zeromq/lib/req_source_impl.h +++ b/gr-zeromq/lib/req_source_impl.h @@ -33,6 +33,7 @@ namespace gr { { private: size_t d_itemsize; + size_t d_vlen; zmq::context_t *d_context; zmq::socket_t *d_socket; |