diff options
-rwxr-xr-x | gr-zeromq/examples/server.py | 4 | ||||
-rw-r--r-- | gr-zeromq/grc/CMakeLists.txt | 3 | ||||
-rw-r--r-- | gr-zeromq/grc/zeromq_rep_sink.xml | 65 | ||||
-rw-r--r-- | gr-zeromq/grc/zeromq_sink_reqrep.xml | 38 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt | 8 | ||||
-rw-r--r-- | gr-zeromq/include/gnuradio/zeromq/rep_sink.h (renamed from gr-zeromq/include/gnuradio/zeromq/sink_reqrep.h) | 32 | ||||
-rw-r--r-- | gr-zeromq/lib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.cc (renamed from gr-zeromq/lib/sink_reqrep_impl.cc) | 31 | ||||
-rw-r--r-- | gr-zeromq/lib/rep_sink_impl.h (renamed from gr-zeromq/lib/sink_reqrep_impl.h) | 18 | ||||
-rw-r--r-- | gr-zeromq/swig/zeromq_swig.i | 6 |
10 files changed, 122 insertions, 85 deletions
diff --git a/gr-zeromq/examples/server.py b/gr-zeromq/examples/server.py index 0ca1b4f99c..4750df9aa4 100755 --- a/gr-zeromq/examples/server.py +++ b/gr-zeromq/examples/server.py @@ -58,8 +58,8 @@ class top_block(gr.top_block): self.gr_sig_source = analog.sig_source_f(samp_rate, analog.GR_SIN_WAVE , 1000, 1, 0) self.throttle = blocks.throttle(gr.sizeof_float, samp_rate) self.mult = blocks.multiply_const_ff(1) - #self.zmq_sink = zeromq.sink_reqrep_nopoll(gr.sizeof_float, sink_adr) - self.zmq_sink = zeromq.sink_reqrep(gr.sizeof_float, sink_adr) + #self.zmq_sink = zeromq.rep_sink_nopoll(gr.sizeof_float, sink_adr) + self.zmq_sink = zeromq.rep_sink(gr.sizeof_float, sink_adr) #self.zmq_sink = zeromq.push_sink(gr.sizeof_float, sink_adr) #self.zmq_probe = zeromq.push_sink(gr.sizeof_float, probe_adr) self.zmq_probe = zeromq.pub_sink(gr.sizeof_float, probe_adr) diff --git a/gr-zeromq/grc/CMakeLists.txt b/gr-zeromq/grc/CMakeLists.txt index 8709511715..e30a147d0b 100644 --- a/gr-zeromq/grc/CMakeLists.txt +++ b/gr-zeromq/grc/CMakeLists.txt @@ -21,10 +21,11 @@ install(FILES zeromq_pub_sink.xml zeromq_push_sink.xml zeromq_pull_source.xml + zeromq_rep_sink.xml DESTINATION share/gnuradio/grc/blocks ) -# zeromq_sink_reqrep.xml + # zeromq_source_reqrep.xml # zeromq_sink_reqrep_nopoll.xml # zeromq_source_reqrep_nopoll.xml diff --git a/gr-zeromq/grc/zeromq_rep_sink.xml b/gr-zeromq/grc/zeromq_rep_sink.xml new file mode 100644 index 0000000000..a6307943fb --- /dev/null +++ b/gr-zeromq/grc/zeromq_rep_sink.xml @@ -0,0 +1,65 @@ +<?xml version="1.0"?> +<block> + <name>ZMQ REP Sink</name> + <key>zeromq_rep_sink</key> + <category>ZeroMQ Interfaces</category> + <import>from gnuradio import zeromq</import> + <make>zeromq.rep_sink($type.itemsize, $address, $blocking)</make> + + <param> + <name>IO Type</name> + <key>type</key> + <type>enum</type> + <option> + <name>Complex</name> + <key>complex</key> + <opt>itemsize:gr.sizeof_gr_complex</opt> + </option> + <option> + <name>Float</name> + <key>float</key> + <opt>itemsize:gr.sizeof_float</opt> + </option> + <option> + <name>Int</name> + <key>int</key> + <opt>itemsize:gr.sizeof_int</opt> + </option> + <option> + <name>Short</name> + <key>short</key> + <opt>itemsize:gr.sizeof_short</opt> + </option> + <option> + <name>Byte</name> + <key>byte</key> + <opt>itemsize:gr.sizeof_char</opt> + </option> + </param> + + <param> + <name>Address</name> + <key>address</key> + <type>string</type> + </param> + + <param> + <name>Timeout (sec)</name> + <key>timeout</key> + <value>0.1</value> + <type>float</type> + </param> + + <param> + <name>Blocking</name> + <key>blocking</key> + <value>True</value> + <type>bool</type> + </param> + + <sink> + <name>in</name> + <type>$type</type> + </sink> + +</block> diff --git a/gr-zeromq/grc/zeromq_sink_reqrep.xml b/gr-zeromq/grc/zeromq_sink_reqrep.xml deleted file mode 100644 index eb0cd0b756..0000000000 --- a/gr-zeromq/grc/zeromq_sink_reqrep.xml +++ /dev/null @@ -1,38 +0,0 @@ -<?xml version="1.0"?> -<block> - <name>sink_reqrep</name> - <key>zeromq_sink_reqrep</key> - <category>zeromq</category> - <import>import zeromq</import> - <make>zeromq.sink_reqrep($itemsize, $*address)</make> - <!-- Make one 'param' node for every Parameter you want settable from the GUI. - Sub-nodes: - * name - * key (makes the value accessible as $keyname, e.g. in the make node) - * type --> - <param> - <name>...</name> - <key>...</key> - <type>...</type> - </param> - - <!-- Make one 'sink' node per input. Sub-nodes: - * name (an identifier for the GUI) - * type - * vlen - * optional (set to 1 for optional inputs) --> - <sink> - <name>in</name> - <type><!-- e.g. int, float, complex, byte, short, xxx_vector, ...--></type> - </sink> - - <!-- Make one 'source' node per output. Sub-nodes: - * name (an identifier for the GUI) - * type - * vlen - * optional (set to 1 for optional inputs) --> - <source> - <name>out</name> - <type><!-- e.g. int, float, complex, byte, short, xxx_vector, ...--></type> - </source> -</block> diff --git a/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt b/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt index ec0f5f414e..82088c6b6b 100644 --- a/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt +++ b/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt @@ -25,11 +25,11 @@ install(FILES pub_sink.h pull_source.h push_sink.h - sink_reqrep.h - sink_reqrep_nopoll.h - source_reqrep.h - source_reqrep_nopoll.h + rep_sink.h DESTINATION ${GR_INCLUDE_DIR}/gnuradio/zeromq COMPONENT "zeromq_devel" ) +# sink_reqrep_nopoll.h +# source_reqrep.h +# source_reqrep_nopoll.h diff --git a/gr-zeromq/include/gnuradio/zeromq/sink_reqrep.h b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h index 0c5377f630..dcae967f32 100644 --- a/gr-zeromq/include/gnuradio/zeromq/sink_reqrep.h +++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -20,8 +20,8 @@ * Boston, MA 02110-1301, USA. */ -#ifndef INCLUDED_ZEROMQ_SINK_REQREP_H -#define INCLUDED_ZEROMQ_SINK_REQREP_H +#ifndef INCLUDED_ZEROMQ_REP_SINK_H +#define INCLUDED_ZEROMQ_REP_SINK_H #include <gnuradio/zeromq/api.h> #include <gnuradio/sync_block.h> @@ -30,27 +30,33 @@ namespace gr { namespace zeromq { /*! - * \brief <+description of block+> + * \brief Sink the contents of a stream to a ZMQ REP socket * \ingroup zeromq * + * \details + * This block acts a a streaming sink for a GNU Radio flowgraph + * and writes its contents to a ZMQ REP socket. A REP socket will + * only send its contents to an attached REQ socket when it + * requests items. */ - class ZEROMQ_API sink_reqrep : virtual public gr::sync_block + class ZEROMQ_API rep_sink : virtual public gr::sync_block { public: - typedef boost::shared_ptr<sink_reqrep> sptr; + typedef boost::shared_ptr<rep_sink> sptr; /*! - * \brief Return a shared_ptr to a new instance of zeromq::sink_reqrep. + * \brief Return a shared_ptr to a new instance of zeromq::rep_sink. + * + * \param itemsize Size of a stream item in bytes + * \param address ZMQ socket address specifier + * \param timeout Timeout for request poll, in seconds + * \param blocking Indicate whether blocking sends should be used, default true. * - * To avoid accidental use of raw pointers, zeromq::sink_reqrep's - * constructor is in a private implementation - * class. zeromq::sink_reqrep::make is the public interface for - * creating new instances. */ - static sptr make(size_t itemsize, char *address); + static sptr make(size_t itemsize, char *address, float timeout=0.1, bool blocking=true); }; } // namespace zeromq } // namespace gr -#endif /* INCLUDED_ZEROMQ_SINK_REQREP_H */ +#endif /* INCLUDED_ZEROMQ_REP_SINK_H */ diff --git a/gr-zeromq/lib/CMakeLists.txt b/gr-zeromq/lib/CMakeLists.txt index d6c580ea0f..5e3c170609 100644 --- a/gr-zeromq/lib/CMakeLists.txt +++ b/gr-zeromq/lib/CMakeLists.txt @@ -41,7 +41,7 @@ list(APPEND zeromq_sources pub_sink_impl.cc pull_source_impl.cc push_sink_impl.cc - sink_reqrep_impl.cc + rep_sink_impl.cc sink_reqrep_nopoll_impl.cc source_reqrep_impl.cc source_reqrep_nopoll_impl.cc diff --git a/gr-zeromq/lib/sink_reqrep_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index 6c5c2c4b95..abd4e6d5a7 100644 --- a/gr-zeromq/lib/sink_reqrep_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -25,44 +25,45 @@ #endif #include <gnuradio/io_signature.h> -#include "sink_reqrep_impl.h" +#include "rep_sink_impl.h" namespace gr { namespace zeromq { - sink_reqrep::sptr - sink_reqrep::make(size_t itemsize, char *address) + rep_sink::sptr + rep_sink::make(size_t itemsize, char *address, float timeout, bool blocking) { return gnuradio::get_initial_sptr - (new sink_reqrep_impl(itemsize, address)); + (new rep_sink_impl(itemsize, address, timeout, blocking)); } - sink_reqrep_impl::sink_reqrep_impl(size_t itemsize, char *address) - : gr::sync_block("sink_reqrep", + rep_sink_impl::rep_sink_impl(size_t itemsize, char *address, float timeout, bool blocking) + : gr::sync_block("rep_sink", gr::io_signature::make(1, 1, itemsize), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize) + d_itemsize(itemsize), d_blocking(blocking) { + d_timeout = timeout >= 0 ? (int)(timeout*1e6) : 0; d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_REP); d_socket->bind (address); } - sink_reqrep_impl::~sink_reqrep_impl() + rep_sink_impl::~rep_sink_impl() { delete d_socket; delete d_context; } int - sink_reqrep_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) + rep_sink_impl::work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) { const char *in = (const char *) input_items[0]; zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&items[0], 1, 1000); + zmq::poll (&items[0], 1, d_timeout); // If we got a reply, process if (items[0].revents & ZMQ_POLLIN) { @@ -75,14 +76,14 @@ namespace gr { if (noutput_items < req_output_items) { zmq::message_t msg(d_itemsize*noutput_items); memcpy((void *)msg.data(), in, d_itemsize*noutput_items); - d_socket->send(msg); + d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); return noutput_items; } else { zmq::message_t msg(d_itemsize*req_output_items); memcpy((void *)msg.data(), in, d_itemsize*req_output_items); - d_socket->send(msg); + d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); return req_output_items; } diff --git a/gr-zeromq/lib/sink_reqrep_impl.h b/gr-zeromq/lib/rep_sink_impl.h index ec700f01a3..ad2db2a081 100644 --- a/gr-zeromq/lib/sink_reqrep_impl.h +++ b/gr-zeromq/lib/rep_sink_impl.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,204 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -20,25 +20,27 @@ * Boston, MA 02110-1301, USA. */ -#ifndef INCLUDED_ZMQBLOCKS_SINK_REQREP_IMPL_H -#define INCLUDED_ZMQBLOCKS_SINK_REQREP_IMPL_H +#ifndef INCLUDED_ZMQBLOCKS_REP_SINK_IMPL_H +#define INCLUDED_ZMQBLOCKS_REP_SINK_IMPL_H -#include <gnuradio/zeromq/sink_reqrep.h> +#include <gnuradio/zeromq/rep_sink.h> #include <zmq.hpp> namespace gr { namespace zeromq { - class sink_reqrep_impl : public sink_reqrep + class rep_sink_impl : public rep_sink { private: size_t d_itemsize; + int d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; + bool d_blocking; public: - sink_reqrep_impl(size_t itemsize, char *address); - ~sink_reqrep_impl(); + rep_sink_impl(size_t itemsize, char *address, float timeout, bool blocking); + ~rep_sink_impl(); int work(int noutput_items, gr_vector_const_void_star &input_items, @@ -48,4 +50,4 @@ namespace gr { } // namespace zeromq } // namespace gr -#endif /* INCLUDED_ZMQBLOCKS_SINK_REQREP_IMPL_H */ +#endif /* INCLUDED_ZMQBLOCKS_REP_SINK_IMPL_H */ diff --git a/gr-zeromq/swig/zeromq_swig.i b/gr-zeromq/swig/zeromq_swig.i index 8370ac64b6..ce95938e8d 100644 --- a/gr-zeromq/swig/zeromq_swig.i +++ b/gr-zeromq/swig/zeromq_swig.i @@ -30,7 +30,7 @@ %{ #include "gnuradio/zeromq/pub_sink.h" #include "gnuradio/zeromq/push_sink.h" -#include "gnuradio/zeromq/sink_reqrep.h" +#include "gnuradio/zeromq/rep_sink.h" #include "gnuradio/zeromq/sink_reqrep_nopoll.h" #include "gnuradio/zeromq/pull_source.h" #include "gnuradio/zeromq/source_reqrep.h" @@ -39,7 +39,7 @@ %include "gnuradio/zeromq/pub_sink.h" %include "gnuradio/zeromq/push_sink.h" -%include "gnuradio/zeromq/sink_reqrep.h" +%include "gnuradio/zeromq/rep_sink.h" %include "gnuradio/zeromq/sink_reqrep_nopoll.h" %include "gnuradio/zeromq/pull_source.h" %include "gnuradio/zeromq/source_reqrep.h" @@ -47,7 +47,7 @@ GR_SWIG_BLOCK_MAGIC2(zeromq, pub_sink); GR_SWIG_BLOCK_MAGIC2(zeromq, push_sink); -GR_SWIG_BLOCK_MAGIC2(zeromq, sink_reqrep); +GR_SWIG_BLOCK_MAGIC2(zeromq, rep_sink); GR_SWIG_BLOCK_MAGIC2(zeromq, sink_reqrep_nopoll); GR_SWIG_BLOCK_MAGIC2(zeromq, pull_source); GR_SWIG_BLOCK_MAGIC2(zeromq, source_reqrep); |