diff options
author | Johnathan Corgan <johnathan@corganlabs.com> | 2014-04-18 11:16:53 -0700 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2014-04-22 15:07:21 -0700 |
commit | 9c4cf2b829f05d58d5089cf90a75d3f3f98513dc (patch) | |
tree | e0c1d3ad697137813cbac73ca7d0bc21f66d5035 | |
parent | 4b8c849546573b32337835c4956dc59bc6dad636 (diff) |
zeromq: fix import in example
24 files changed, 280 insertions, 236 deletions
diff --git a/gr-zeromq/CMakeLists.txt b/gr-zeromq/CMakeLists.txt index d96b043251..db1469a012 100644 --- a/gr-zeromq/CMakeLists.txt +++ b/gr-zeromq/CMakeLists.txt @@ -90,7 +90,7 @@ add_subdirectory(lib) if(ENABLE_PYTHON) add_subdirectory(swig) add_subdirectory(python/zeromq) -# add_subdirectory(grc) + add_subdirectory(grc) # add_subdirectory(examples) endif(ENABLE_PYTHON) #add_subdirectory(doc) diff --git a/gr-zeromq/examples/gui.py b/gr-zeromq/examples/gui.py index 5656fd9a39..6d2304a71b 100755 --- a/gr-zeromq/examples/gui.py +++ b/gr-zeromq/examples/gui.py @@ -1,25 +1,25 @@ #!/usr/bin/env python -# +# # Copyright 2013 Institute for Theoretical Information Technology, # RWTH Aachen University -# +# # Authors: Johannes Schmitz <schmitz@ti.rwth-aachen.de> -# +# # This is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3, or (at your option) # any later version. -# +# # This software is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. -# +# # You should have received a copy of the GNU General Public License # along with this software; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. -# +# ############################################################################### # Imports ############################################################################### @@ -30,7 +30,7 @@ import sys import os from PyQt4 import Qt, QtGui, QtCore, uic import PyQt4.Qwt5 as Qwt -import zeromq +from gnuradio import zeromq import signal class gui(QtGui.QMainWindow): diff --git a/gr-zeromq/grc/CMakeLists.txt b/gr-zeromq/grc/CMakeLists.txt index 6f1b619b4c..42ccfa5ca4 100644 --- a/gr-zeromq/grc/CMakeLists.txt +++ b/gr-zeromq/grc/CMakeLists.txt @@ -16,14 +16,17 @@ # along with GNU Radio; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. + install(FILES - zmqblocks_sink_reqrep.xml - zmqblocks_source_reqrep.xml - zmqblocks_sink_reqrep_nopoll.xml - zmqblocks_source_reqrep_nopoll.xml - zmqblocks_sink_pushpull.xml - zmqblocks_source_pushpull.xml - zmqblocks_source_pushpull_feedback.xml - zmqblocks_sink_pubsub.xml - zmqblocks_probe_pubsub.xml DESTINATION share/gnuradio/grc/blocks + zeromq_sink_pubsub.xml + zeromq_sink_pushpull.xml + zeromq_source_pushpull.xml + + DESTINATION share/gnuradio/grc/blocks ) +# zeromq_sink_reqrep.xml +# zeromq_source_reqrep.xml +# zeromq_sink_reqrep_nopoll.xml +# zeromq_source_reqrep_nopoll.xml +# zeromq_source_pushpull.xml +# zeromq_source_pushpull_feedback.xml diff --git a/gr-zeromq/grc/zeromq_sink_pubsub.xml b/gr-zeromq/grc/zeromq_sink_pubsub.xml new file mode 100644 index 0000000000..787eefb2cd --- /dev/null +++ b/gr-zeromq/grc/zeromq_sink_pubsub.xml @@ -0,0 +1,58 @@ +<?xml version="1.0"?> +<block> + <name>ZMQ PUB/SUB Sink</name> + <key>zeromq_sink_pubsub</key> + <category>ZeroMQ Interfaces</category> + <import>from gnuradio import zeromq</import> + <make>zeromq.sink_pubsub($type.itemsize, $address, $blocking)</make> + + <param> + <name>IO Type</name> + <key>type</key> + <type>enum</type> + <option> + <name>Complex</name> + <key>complex</key> + <opt>itemsize:gr.sizeof_gr_complex</opt> + </option> + <option> + <name>Float</name> + <key>float</key> + <opt>itemsize:gr.sizeof_float</opt> + </option> + <option> + <name>Int</name> + <key>int</key> + <opt>itemsize:gr.sizeof_int</opt> + </option> + <option> + <name>Short</name> + <key>short</key> + <opt>itemsize:gr.sizeof_short</opt> + </option> + <option> + <name>Byte</name> + <key>byte</key> + <opt>itemsize:gr.sizeof_char</opt> + </option> + </param> + + <param> + <name>Address</name> + <key>address</key> + <type>string</type> + </param> + + <param> + <name>Blocking</name> + <key>blocking</key> + <value>True</value> + <type>bool</type> + </param> + + <sink> + <name>in</name> + <type>$type</type> + </sink> + +</block> diff --git a/gr-zeromq/grc/zeromq_sink_pushpull.xml b/gr-zeromq/grc/zeromq_sink_pushpull.xml new file mode 100644 index 0000000000..173f4a5e32 --- /dev/null +++ b/gr-zeromq/grc/zeromq_sink_pushpull.xml @@ -0,0 +1,58 @@ +<?xml version="1.0"?> +<block> + <name>ZMQ PUSH/PULL Sink</name> + <key>zeromq_sink_pushpull</key> + <category>ZeroMQ Interfaces</category> + <import>from gnuradio import zeromq</import> + <make>zeromq.sink_pushpull($type.itemsize, $address, $blocking)</make> + + <param> + <name>IO Type</name> + <key>type</key> + <type>enum</type> + <option> + <name>Complex</name> + <key>complex</key> + <opt>itemsize:gr.sizeof_gr_complex</opt> + </option> + <option> + <name>Float</name> + <key>float</key> + <opt>itemsize:gr.sizeof_float</opt> + </option> + <option> + <name>Int</name> + <key>int</key> + <opt>itemsize:gr.sizeof_int</opt> + </option> + <option> + <name>Short</name> + <key>short</key> + <opt>itemsize:gr.sizeof_short</opt> + </option> + <option> + <name>Byte</name> + <key>byte</key> + <opt>itemsize:gr.sizeof_char</opt> + </option> + </param> + + <param> + <name>Address</name> + <key>address</key> + <type>string</type> + </param> + + <param> + <name>Blocking</name> + <key>blocking</key> + <value>True</value> + <type>bool</type> + </param> + + <sink> + <name>in</name> + <type>$type</type> + </sink> + +</block> diff --git a/gr-zeromq/grc/zmqblocks_sink_reqrep.xml b/gr-zeromq/grc/zeromq_sink_reqrep.xml index 99d39fc7b8..eb0cd0b756 100644 --- a/gr-zeromq/grc/zmqblocks_sink_reqrep.xml +++ b/gr-zeromq/grc/zeromq_sink_reqrep.xml @@ -1,10 +1,10 @@ <?xml version="1.0"?> <block> <name>sink_reqrep</name> - <key>zmqblocks_sink_reqrep</key> - <category>zmqblocks</category> - <import>import zmqblocks</import> - <make>zmqblocks.sink_reqrep($itemsize, $*address)</make> + <key>zeromq_sink_reqrep</key> + <category>zeromq</category> + <import>import zeromq</import> + <make>zeromq.sink_reqrep($itemsize, $*address)</make> <!-- Make one 'param' node for every Parameter you want settable from the GUI. Sub-nodes: * name diff --git a/gr-zeromq/grc/zmqblocks_sink_reqrep_nopoll.xml b/gr-zeromq/grc/zeromq_sink_reqrep_nopoll.xml index 4f9f6f44bd..a09c82ae53 100644 --- a/gr-zeromq/grc/zmqblocks_sink_reqrep_nopoll.xml +++ b/gr-zeromq/grc/zeromq_sink_reqrep_nopoll.xml @@ -1,10 +1,10 @@ <?xml version="1.0"?> <block> <name>sink_reqrep_nopoll</name> - <key>zmqblocks_sink_reqrep_nopoll</key> - <category>zmqblocks</category> - <import>import zmqblocks</import> - <make>zmqblocks.sink_reqrep_nopoll($itemsize, $*address)</make> + <key>zeromq_sink_reqrep_nopoll</key> + <category>zeromq</category> + <import>import zeromq</import> + <make>zeromq.sink_reqrep_nopoll($itemsize, $*address)</make> <!-- Make one 'param' node for every Parameter you want settable from the GUI. Sub-nodes: * name diff --git a/gr-zeromq/grc/zeromq_source_pushpull.xml b/gr-zeromq/grc/zeromq_source_pushpull.xml new file mode 100644 index 0000000000..500bf21968 --- /dev/null +++ b/gr-zeromq/grc/zeromq_source_pushpull.xml @@ -0,0 +1,58 @@ +<?xml version="1.0"?> +<block> + <name>ZMQ PUSH/PULL Source</name> + <key>zeromq_source_pushpull</key> + <category>ZeroMQ Interfaces</category> + <import>from gnuradio import zeromq</import> + <make>zeromq.source_pushpull($type.itemsize, $address, $timeout)</make> + + <param> + <name>IO Type</name> + <key>type</key> + <type>enum</type> + <option> + <name>Complex</name> + <key>complex</key> + <opt>itemsize:gr.sizeof_gr_complex</opt> + </option> + <option> + <name>Float</name> + <key>float</key> + <opt>itemsize:gr.sizeof_float</opt> + </option> + <option> + <name>Int</name> + <key>int</key> + <opt>itemsize:gr.sizeof_int</opt> + </option> + <option> + <name>Short</name> + <key>short</key> + <opt>itemsize:gr.sizeof_short</opt> + </option> + <option> + <name>Byte</name> + <key>byte</key> + <opt>itemsize:gr.sizeof_char</opt> + </option> + </param> + + <param> + <name>Address</name> + <key>address</key> + <type>string</type> + </param> + + <param> + <name>Timeout (sec)</name> + <key>timeout</key> + <value>0.1</value> + <type>float</type> + </param> + + <source> + <name>out</name> + <type>$type</type> + </source> + +</block> diff --git a/gr-zeromq/grc/zmqblocks_source_reqrep.xml b/gr-zeromq/grc/zeromq_source_reqrep.xml index 5a121cf572..56ac7871bf 100644 --- a/gr-zeromq/grc/zmqblocks_source_reqrep.xml +++ b/gr-zeromq/grc/zeromq_source_reqrep.xml @@ -1,10 +1,10 @@ <?xml version="1.0"?> <block> <name>source_reqrep</name> - <key>zmqblocks_source_reqrep</key> - <category>zmqblocks</category> - <import>import zmqblocks</import> - <make>zmqblocks.source_reqrep($itemsize, $*address)</make> + <key>zeromq_source_reqrep</key> + <category>zeromq</category> + <import>import zeromq</import> + <make>zeromq.source_reqrep($itemsize, $*address)</make> <!-- Make one 'param' node for every Parameter you want settable from the GUI. Sub-nodes: * name diff --git a/gr-zeromq/grc/zmqblocks_source_reqrep_nopoll.xml b/gr-zeromq/grc/zeromq_source_reqrep_nopoll.xml index da2604e504..88567a219c 100644 --- a/gr-zeromq/grc/zmqblocks_source_reqrep_nopoll.xml +++ b/gr-zeromq/grc/zeromq_source_reqrep_nopoll.xml @@ -1,10 +1,10 @@ <?xml version="1.0"?> <block> <name>source_reqrep_nopoll</name> - <key>zmqblocks_source_reqrep_nopoll</key> - <category>zmqblocks</category> - <import>import zmqblocks</import> - <make>zmqblocks.source_reqrep_nopoll($itemsize, $*address)</make> + <key>zeromq_source_reqrep_nopoll</key> + <category>zeromq</category> + <import>import zeromq</import> + <make>zeromq.source_reqrep_nopoll($itemsize, $*address)</make> <!-- Make one 'param' node for every Parameter you want settable from the GUI. Sub-nodes: * name diff --git a/gr-zeromq/grc/zmqblocks_source_pushpull_feedback.xml b/gr-zeromq/grc/zeromq_soure_pushpull_feedback.xml index e4dc715b9d..6c5cd5bbd3 100644 --- a/gr-zeromq/grc/zmqblocks_source_pushpull_feedback.xml +++ b/gr-zeromq/grc/zeromq_soure_pushpull_feedback.xml @@ -1,10 +1,10 @@ <?xml version="1.0"?> <block> <name>source_pushpull_feedback</name> - <key>zmqblocks_source_pushpull_feedback</key> - <category>zmqblocks</category> - <import>import zmqblocks</import> - <make>zmqblocks.source_pushpull_feedback($itemsize, $*address)</make> + <key>zeromq_source_pushpull_feedback</key> + <category>zeromq</category> + <import>import zeromq</import> + <make>zeromq.source_pushpull_feedback($itemsize, $*address)</make> <!-- Make one 'param' node for every Parameter you want settable from the GUI. Sub-nodes: * name diff --git a/gr-zeromq/grc/zmqblocks_probe_pubsub.xml b/gr-zeromq/grc/zmqblocks_probe_pubsub.xml deleted file mode 100644 index fdda610daa..0000000000 --- a/gr-zeromq/grc/zmqblocks_probe_pubsub.xml +++ /dev/null @@ -1,38 +0,0 @@ -<?xml version="1.0"?> -<block> - <name>probe_pubsub</name> - <key>zmqblocks_probe_pubsub</key> - <category>zmqblocks</category> - <import>import zmqblocks</import> - <make>zmqblocks.probe_pubsub($itemsize, $*address+)</make> - <!-- Make one 'param' node for every Parameter you want settable from the GUI. - Sub-nodes: - * name - * key (makes the value accessible as $keyname, e.g. in the make node) - * type --> - <param> - <name>...</name> - <key>...</key> - <type>...</type> - </param> - - <!-- Make one 'sink' node per input. Sub-nodes: - * name (an identifier for the GUI) - * type - * vlen - * optional (set to 1 for optional inputs) --> - <sink> - <name>in</name> - <type><!-- e.g. int, float, complex, byte, short, xxx_vector, ...--></type> - </sink> - - <!-- Make one 'source' node per output. Sub-nodes: - * name (an identifier for the GUI) - * type - * vlen - * optional (set to 1 for optional inputs) --> - <source> - <name>out</name> - <type><!-- e.g. int, float, complex, byte, short, xxx_vector, ...--></type> - </source> -</block> diff --git a/gr-zeromq/grc/zmqblocks_sink_pubsub.xml b/gr-zeromq/grc/zmqblocks_sink_pubsub.xml deleted file mode 100644 index 0471c0598b..0000000000 --- a/gr-zeromq/grc/zmqblocks_sink_pubsub.xml +++ /dev/null @@ -1,38 +0,0 @@ -<?xml version="1.0"?> -<block> - <name>sink_pubsub</name> - <key>zmqblocks_sink_pubsub</key> - <category>zmqblocks</category> - <import>import zmqblocks</import> - <make>zmqblocks.sink_pubsub($itemsize, $*address)</make> - <!-- Make one 'param' node for every Parameter you want settable from the GUI. - Sub-nodes: - * name - * key (makes the value accessible as $keyname, e.g. in the make node) - * type --> - <param> - <name>...</name> - <key>...</key> - <type>...</type> - </param> - - <!-- Make one 'sink' node per input. Sub-nodes: - * name (an identifier for the GUI) - * type - * vlen - * optional (set to 1 for optional inputs) --> - <sink> - <name>in</name> - <type><!-- e.g. int, float, complex, byte, short, xxx_vector, ...--></type> - </sink> - - <!-- Make one 'source' node per output. Sub-nodes: - * name (an identifier for the GUI) - * type - * vlen - * optional (set to 1 for optional inputs) --> - <source> - <name>out</name> - <type><!-- e.g. int, float, complex, byte, short, xxx_vector, ...--></type> - </source> -</block> diff --git a/gr-zeromq/grc/zmqblocks_sink_pushpull.xml b/gr-zeromq/grc/zmqblocks_sink_pushpull.xml deleted file mode 100644 index 437affa0cb..0000000000 --- a/gr-zeromq/grc/zmqblocks_sink_pushpull.xml +++ /dev/null @@ -1,38 +0,0 @@ -<?xml version="1.0"?> -<block> - <name>sink_pushpull</name> - <key>zmqblocks_sink_pushpull</key> - <category>zmqblocks</category> - <import>import zmqblocks</import> - <make>zmqblocks.sink_pushpull($itemsize, $*address)</make> - <!-- Make one 'param' node for every Parameter you want settable from the GUI. - Sub-nodes: - * name - * key (makes the value accessible as $keyname, e.g. in the make node) - * type --> - <param> - <name>...</name> - <key>...</key> - <type>...</type> - </param> - - <!-- Make one 'sink' node per input. Sub-nodes: - * name (an identifier for the GUI) - * type - * vlen - * optional (set to 1 for optional inputs) --> - <sink> - <name>in</name> - <type><!-- e.g. int, float, complex, byte, short, xxx_vector, ...--></type> - </sink> - - <!-- Make one 'source' node per output. Sub-nodes: - * name (an identifier for the GUI) - * type - * vlen - * optional (set to 1 for optional inputs) --> - <source> - <name>out</name> - <type><!-- e.g. int, float, complex, byte, short, xxx_vector, ...--></type> - </source> -</block> diff --git a/gr-zeromq/grc/zmqblocks_source_pushpull.xml b/gr-zeromq/grc/zmqblocks_source_pushpull.xml deleted file mode 100644 index 82b0420d76..0000000000 --- a/gr-zeromq/grc/zmqblocks_source_pushpull.xml +++ /dev/null @@ -1,38 +0,0 @@ -<?xml version="1.0"?> -<block> - <name>source_pushpull</name> - <key>zmqblocks_source_pushpull</key> - <category>zmqblocks</category> - <import>import zmqblocks</import> - <make>zmqblocks.source_pushpull($itemsize, $*address)</make> - <!-- Make one 'param' node for every Parameter you want settable from the GUI. - Sub-nodes: - * name - * key (makes the value accessible as $keyname, e.g. in the make node) - * type --> - <param> - <name>...</name> - <key>...</key> - <type>...</type> - </param> - - <!-- Make one 'sink' node per input. Sub-nodes: - * name (an identifier for the GUI) - * type - * vlen - * optional (set to 1 for optional inputs) --> - <sink> - <name>in</name> - <type><!-- e.g. int, float, complex, byte, short, xxx_vector, ...--></type> - </sink> - - <!-- Make one 'source' node per output. Sub-nodes: - * name (an identifier for the GUI) - * type - * vlen - * optional (set to 1 for optional inputs) --> - <source> - <name>out</name> - <type><!-- e.g. int, float, complex, byte, short, xxx_vector, ...--></type> - </source> -</block> diff --git a/gr-zeromq/include/gnuradio/zeromq/sink_pubsub.h b/gr-zeromq/include/gnuradio/zeromq/sink_pubsub.h index 9f9e603f91..23aaba92a8 100644 --- a/gr-zeromq/include/gnuradio/zeromq/sink_pubsub.h +++ b/gr-zeromq/include/gnuradio/zeromq/sink_pubsub.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -30,9 +30,15 @@ namespace gr { namespace zeromq { /*! - * \brief <+description of block+> + * \brief Sink the contents of a stream to a ZMQ PUB socket * \ingroup zeromq * + * \details + * This block acts a a streaming sink for a GNU Radio flowgraph + * and writes its contents to a ZMQ PUB socket. A PUB socket may + * have subscribers and will pass all incoming stream data to each + * subscriber. Subscribers can be either another gr-zeromq source + * block or a non-GNU Radio ZMQ socket. */ class ZEROMQ_API sink_pubsub : virtual public gr::sync_block { @@ -42,12 +48,11 @@ namespace gr { /*! * \brief Return a shared_ptr to a new instance of zeromq::sink_pubsub. * - * To avoid accidental use of raw pointers, zeromq::sink_pubsub's - * constructor is in a private implementation - * class. zeromq::sink_pubsub::make is the public interface for - * creating new instances. + * \param itemsize Size of a stream item in bytes + * \param address ZMQ socket address specifier + * \param blocking Indicate whether blocking sends should be used, default true. */ - static sptr make(size_t itemsize, char *address); + static sptr make(size_t itemsize, char *address, bool blocking=true); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/sink_pushpull.h b/gr-zeromq/include/gnuradio/zeromq/sink_pushpull.h index a997a17d60..1ca8a15b38 100644 --- a/gr-zeromq/include/gnuradio/zeromq/sink_pushpull.h +++ b/gr-zeromq/include/gnuradio/zeromq/sink_pushpull.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -30,9 +30,16 @@ namespace gr { namespace zeromq { /*! - * \brief <+description of block+> + * \brief Sink the contents of a stream to a ZMQ PUSH socket * \ingroup zeromq * + * \details + * This block acts a a streaming sink for a GNU Radio flowgraph + * and writes its contents to a ZMQ PUSH socket. A PUSH socket + * will round-robin send its messages to each connected ZMQ PULL + * socket, either another gr-zeromq source block or a regular, + * non-GNU Radio ZMQ socket. + * */ class ZEROMQ_API sink_pushpull : virtual public gr::sync_block { @@ -40,14 +47,14 @@ namespace gr { typedef boost::shared_ptr<sink_pushpull> sptr; /*! - * \brief Return a shared_ptr to a new instance of zeromq::sink_pushpull. + * \brief Return a shared_ptr to a new instance of gr::zeromq::sink_pushpull + * + * \param itemsize Size of a stream item in bytes + * \param address ZMQ socket address specifier + * \param blocking Indicate whether blocking sends should be used, default true. * - * To avoid accidental use of raw pointers, zeromq::sink_pushpull's - * constructor is in a private implementation - * class. zeromq::sink_pushpull::make is the public interface for - * creating new instances. */ - static sptr make(size_t itemsize, char *address); + static sptr make(size_t itemsize, char *address, bool blocking=true); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/source_pushpull.h b/gr-zeromq/include/gnuradio/zeromq/source_pushpull.h index 32f6bd9a79..51367c1f8c 100644 --- a/gr-zeromq/include/gnuradio/zeromq/source_pushpull.h +++ b/gr-zeromq/include/gnuradio/zeromq/source_pushpull.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -30,9 +30,12 @@ namespace gr { namespace zeromq { /*! - * \brief <+description of block+> + * \brief Receive messages on ZMQ PULL socket and source stream * \ingroup zeromq * + * \details + * This block will connect to a ZMQ PUSH socket, then produce all + * incoming messages as streaming output. */ class ZEROMQ_API source_pushpull : virtual public gr::sync_block { @@ -40,14 +43,14 @@ namespace gr { typedef boost::shared_ptr<source_pushpull> sptr; /*! - * \brief Return a shared_ptr to a new instance of zeromq::source_pushpull. + * \brief Return a shared_ptr to a new instance of gr::zeromq::source_pushpull. + * + * \param itemsize Size of a stream item in bytes + * \param address ZMQ socket address specifier + * \param timeout Receive timeout in seconds, default is 100ms, 1us increments * - * To avoid accidental use of raw pointers, zeromq::source_pushpull's - * constructor is in a private implementation - * class. zeromq::source_pushpull::make is the public interface for - * creating new instances. */ - static sptr make(size_t itemsize, char *address); + static sptr make(size_t itemsize, char *address, float timeout=0.1); }; } // namespace zeromq diff --git a/gr-zeromq/lib/sink_pubsub_impl.cc b/gr-zeromq/lib/sink_pubsub_impl.cc index 0d76d992fb..5388d50a5d 100644 --- a/gr-zeromq/lib/sink_pubsub_impl.cc +++ b/gr-zeromq/lib/sink_pubsub_impl.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -31,17 +31,17 @@ namespace gr { namespace zeromq { sink_pubsub::sptr - sink_pubsub::make(size_t itemsize, char *address) + sink_pubsub::make(size_t itemsize, char *address, bool blocking) { return gnuradio::get_initial_sptr - (new sink_pubsub_impl(itemsize, address)); + (new sink_pubsub_impl(itemsize, address, blocking)); } - sink_pubsub_impl::sink_pubsub_impl(size_t itemsize, char *address) + sink_pubsub_impl::sink_pubsub_impl(size_t itemsize, char *address, bool blocking) : gr::sync_block("sink_pubsub", gr::io_signature::make(1, 1, itemsize), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize) + d_itemsize(itemsize), d_blocking(blocking) { d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_PUB); @@ -64,7 +64,7 @@ namespace gr { // create message copy and send zmq::message_t msg(d_itemsize*noutput_items); memcpy((void *)msg.data(), in, d_itemsize*noutput_items); - d_socket->send(msg); + d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); return noutput_items; } diff --git a/gr-zeromq/lib/sink_pubsub_impl.h b/gr-zeromq/lib/sink_pubsub_impl.h index 0e55c7968c..ded41c59f7 100644 --- a/gr-zeromq/lib/sink_pubsub_impl.h +++ b/gr-zeromq/lib/sink_pubsub_impl.h @@ -33,11 +33,12 @@ namespace gr { { private: size_t d_itemsize; + bool d_blocking; zmq::context_t *d_context; zmq::socket_t *d_socket; public: - sink_pubsub_impl(size_t itemsize, char *address); + sink_pubsub_impl(size_t itemsize, char *address, bool blocking); ~sink_pubsub_impl(); int work(int noutput_items, diff --git a/gr-zeromq/lib/sink_pushpull_impl.cc b/gr-zeromq/lib/sink_pushpull_impl.cc index dec03b7f91..784b4e3035 100644 --- a/gr-zeromq/lib/sink_pushpull_impl.cc +++ b/gr-zeromq/lib/sink_pushpull_impl.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -31,22 +31,22 @@ namespace gr { namespace zeromq { sink_pushpull::sptr - sink_pushpull::make(size_t itemsize, char *address) + sink_pushpull::make(size_t itemsize, char *address, bool blocking) { return gnuradio::get_initial_sptr - (new sink_pushpull_impl(itemsize, address)); + (new sink_pushpull_impl(itemsize, address, blocking)); } - sink_pushpull_impl::sink_pushpull_impl(size_t itemsize, char *address) + sink_pushpull_impl::sink_pushpull_impl(size_t itemsize, char *address, bool blocking) : gr::sync_block("sink_pushpull", gr::io_signature::make(1, 1, itemsize), gr::io_signature::make(0, 0, 0)), d_itemsize(itemsize) { + d_blocking = blocking; d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH); d_socket->bind (address); - std::cout << "sink_pushpull on " << address << std::endl; } sink_pushpull_impl::~sink_pushpull_impl() @@ -63,9 +63,9 @@ namespace gr { const char *in = (const char *) input_items[0]; // create message copy and send - zmq::message_t msg(d_itemsize*noutput_items); + zmq::message_t msg(d_itemsize*noutput_items); // FIXME: make blocking optional memcpy((void *)msg.data(), in, d_itemsize*noutput_items); - d_socket->send(msg); + d_socket->send(msg, d_blocking ? 0 : ZMQ_NOBLOCK); return noutput_items; } diff --git a/gr-zeromq/lib/sink_pushpull_impl.h b/gr-zeromq/lib/sink_pushpull_impl.h index d8a4d2618b..1a84a9494d 100644 --- a/gr-zeromq/lib/sink_pushpull_impl.h +++ b/gr-zeromq/lib/sink_pushpull_impl.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -33,11 +33,12 @@ namespace gr { { private: size_t d_itemsize; + bool d_blocking; zmq::context_t *d_context; zmq::socket_t *d_socket; public: - sink_pushpull_impl(size_t itemsize, char *address); + sink_pushpull_impl(size_t itemsize, char *address, bool blocking); ~sink_pushpull_impl(); int work(int noutput_items, diff --git a/gr-zeromq/lib/source_pushpull_impl.cc b/gr-zeromq/lib/source_pushpull_impl.cc index 6a0fe2b716..db99f27f9e 100644 --- a/gr-zeromq/lib/source_pushpull_impl.cc +++ b/gr-zeromq/lib/source_pushpull_impl.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -31,18 +31,19 @@ namespace gr { namespace zeromq { source_pushpull::sptr - source_pushpull::make(size_t itemsize, char *address) + source_pushpull::make(size_t itemsize, char *address, float timeout) { return gnuradio::get_initial_sptr - (new source_pushpull_impl(itemsize, address)); + (new source_pushpull_impl(itemsize, address, timeout)); } - source_pushpull_impl::source_pushpull_impl(size_t itemsize, char *address) + source_pushpull_impl::source_pushpull_impl(size_t itemsize, char *address, float timeout) : gr::sync_block("source_pushpull", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize)), d_itemsize(itemsize) { + d_timeout = timeout >=0 ? (int)(timeout*1e6) : 0; d_context = new zmq::context_t(1); d_socket = new zmq::socket_t(*d_context, ZMQ_PULL); d_socket->connect (address); @@ -65,7 +66,7 @@ namespace gr { char *out = (char*)output_items[0]; zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&items[0], 1, 1000); + zmq::poll (&items[0], 1, d_timeout); // If we got a reply, process if (items[0].revents & ZMQ_POLLIN) { @@ -87,7 +88,7 @@ namespace gr { } } else { - return 0; + return 0; // FIXME: someday when the scheduler does all the poll/selects } } diff --git a/gr-zeromq/lib/source_pushpull_impl.h b/gr-zeromq/lib/source_pushpull_impl.h index fdb60de7da..da2afbba59 100644 --- a/gr-zeromq/lib/source_pushpull_impl.h +++ b/gr-zeromq/lib/source_pushpull_impl.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -33,11 +33,12 @@ namespace gr { { private: size_t d_itemsize; + int d_timeout; // microseconds, -1 is blocking zmq::context_t *d_context; zmq::socket_t *d_socket; public: - source_pushpull_impl(size_t itemsize, char *address); + source_pushpull_impl(size_t itemsize, char *address, float timeout); ~source_pushpull_impl(); int work(int noutput_items, |