summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim O'Shea <tim.oshea753@gmail.com>2014-12-29 16:01:43 +0100
committerTim O'Shea <tim.oshea753@gmail.com>2014-12-29 16:01:43 +0100
commitcd7e604d527f6324138f778dd15253d78df49350 (patch)
tree9f9deb7da50c7c921cddd0d6d450679f3b1947f7
parent232a9bc8f9e05671e54fc204ed209dc8e7948f7f (diff)
zmq: Adding zmq pub/sub blocks for message passing
-rw-r--r--gr-zeromq/grc/CMakeLists.txt2
-rw-r--r--gr-zeromq/grc/zeromq_pub_msg_sink.xml28
-rw-r--r--gr-zeromq/grc/zeromq_sub_msg_source.xml28
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt2
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h60
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h58
-rw-r--r--gr-zeromq/lib/CMakeLists.txt2
-rw-r--r--gr-zeromq/lib/pub_msg_sink_impl.cc88
-rw-r--r--gr-zeromq/lib/pub_msg_sink_impl.h52
-rw-r--r--gr-zeromq/lib/sub_msg_source_impl.cc115
-rw-r--r--gr-zeromq/lib/sub_msg_source_impl.h57
-rw-r--r--gr-zeromq/swig/zeromq_swig.i6
12 files changed, 498 insertions, 0 deletions
diff --git a/gr-zeromq/grc/CMakeLists.txt b/gr-zeromq/grc/CMakeLists.txt
index 548c2f28ed..95d7d6c3e9 100644
--- a/gr-zeromq/grc/CMakeLists.txt
+++ b/gr-zeromq/grc/CMakeLists.txt
@@ -19,7 +19,9 @@
install(FILES
zeromq_pub_sink.xml
+ zeromq_pub_msg_sink.xml
zeromq_sub_source.xml
+ zeromq_sub_msg_source.xml
zeromq_push_sink.xml
zeromq_pull_source.xml
zeromq_rep_sink.xml
diff --git a/gr-zeromq/grc/zeromq_pub_msg_sink.xml b/gr-zeromq/grc/zeromq_pub_msg_sink.xml
new file mode 100644
index 0000000000..8f541ce731
--- /dev/null
+++ b/gr-zeromq/grc/zeromq_pub_msg_sink.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<block>
+ <name>ZMQ PUB Message Sink</name>
+ <key>zeromq_pub_msg_sink</key>
+ <category>ZeroMQ Interfaces</category>
+ <import>from gnuradio import zeromq</import>
+ <make>zeromq.pub_msg_sink($address, $timeout)</make>
+
+ <param>
+ <name>Address</name>
+ <key>address</key>
+ <type>string</type>
+ </param>
+
+ <param>
+ <name>Timeout (msec)</name>
+ <key>timeout</key>
+ <value>100</value>
+ <type>float</type>
+ </param>
+
+ <sink>
+ <name>in</name>
+ <type>message</type>
+ <optional>1</optional>
+ </sink>
+
+</block>
diff --git a/gr-zeromq/grc/zeromq_sub_msg_source.xml b/gr-zeromq/grc/zeromq_sub_msg_source.xml
new file mode 100644
index 0000000000..32a1c9862b
--- /dev/null
+++ b/gr-zeromq/grc/zeromq_sub_msg_source.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<block>
+ <name>ZMQ SUB Message Source</name>
+ <key>zeromq_sub_msg_source</key>
+ <category>ZeroMQ Interfaces</category>
+ <import>from gnuradio import zeromq</import>
+ <make>zeromq.sub_msg_source($address, $timeout)</make>
+
+ <param>
+ <name>Address</name>
+ <key>address</key>
+ <type>string</type>
+ </param>
+
+ <param>
+ <name>Timeout (msec)</name>
+ <key>timeout</key>
+ <value>100</value>
+ <type>float</type>
+ </param>
+
+ <source>
+ <name>out</name>
+ <type>message</type>
+ <optional>1</optional>
+ </source>
+
+</block>
diff --git a/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt b/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt
index 970cf1ce9a..c8d155f1d8 100644
--- a/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt
+++ b/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt
@@ -23,7 +23,9 @@
install(FILES
api.h
pub_sink.h
+ pub_msg_sink.h
sub_source.h
+ sub_msg_source.h
pull_source.h
push_sink.h
rep_sink.h
diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h
new file mode 100644
index 0000000000..d626ddd147
--- /dev/null
+++ b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h
@@ -0,0 +1,60 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_PUB_MSG_SINK_H
+#define INCLUDED_ZEROMQ_PUB_MSG_SINK_H
+
+#include <gnuradio/zeromq/api.h>
+#include <gnuradio/sync_block.h>
+
+namespace gr {
+ namespace zeromq {
+
+ /*!
+ * \brief Sink the contents of a stream to a ZMQ PUB socket
+ * \ingroup zeromq
+ *
+ * \details
+ * This block acts a a streaming sink for a GNU Radio flowgraph
+ * and writes its contents to a ZMQ PUB socket. A PUB socket may
+ * have subscribers and will pass all incoming stream data to each
+ * subscriber. Subscribers can be either another gr-zeromq source
+ * block or a non-GNU Radio ZMQ socket.
+ */
+ class ZEROMQ_API pub_msg_sink : virtual public gr::sync_block
+ {
+ public:
+ typedef boost::shared_ptr<pub_msg_sink> sptr;
+
+ /*!
+ * \brief Return a shared_ptr to a new instance of zeromq::pub_msg_sink.
+ *
+ * \param address ZMQ socket address specifier
+ * \param timeout Receive timeout in seconds, default is 100ms, 1us increments
+ */
+ static sptr make(char *address, int timeout=100);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_PUB_MSG_SINK_H */
diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h
new file mode 100644
index 0000000000..3af8fdbec9
--- /dev/null
+++ b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h
@@ -0,0 +1,58 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_SUB_MSG_SOURCE_H
+#define INCLUDED_ZEROMQ_SUB_MSG_SOURCE_H
+
+#include <gnuradio/zeromq/api.h>
+#include <gnuradio/sync_block.h>
+
+namespace gr {
+ namespace zeromq {
+
+ /*!
+ * \brief Receive messages on ZMQ SUB socket and source stream
+ * \ingroup zeromq
+ *
+ * \details
+ * This block will connect to a ZMQ PUB socket, then produce all
+ * incoming messages as streaming output.
+ */
+ class ZEROMQ_API sub_msg_source : virtual public gr::sync_block
+ {
+ public:
+ typedef boost::shared_ptr<sub_msg_source> sptr;
+
+ /*!
+ * \brief Return a shared_ptr to a new instance of gr::zeromq::sub_msg_source.
+ *
+ * \param address ZMQ socket address specifier
+ * \param timeout Receive timeout in seconds, default is 100ms, 1us increments
+ *
+ */
+ static sptr make(char *address, int timeout=100);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_SUB_MSG_SOURCE_H */
diff --git a/gr-zeromq/lib/CMakeLists.txt b/gr-zeromq/lib/CMakeLists.txt
index b04754e353..0964d819c2 100644
--- a/gr-zeromq/lib/CMakeLists.txt
+++ b/gr-zeromq/lib/CMakeLists.txt
@@ -38,7 +38,9 @@ endif(ENABLE_GR_CTRLPORT)
########################################################################
list(APPEND zeromq_sources
pub_sink_impl.cc
+ pub_msg_sink_impl.cc
sub_source_impl.cc
+ sub_msg_source_impl.cc
pull_source_impl.cc
push_sink_impl.cc
rep_sink_impl.cc
diff --git a/gr-zeromq/lib/pub_msg_sink_impl.cc b/gr-zeromq/lib/pub_msg_sink_impl.cc
new file mode 100644
index 0000000000..0264aef83b
--- /dev/null
+++ b/gr-zeromq/lib/pub_msg_sink_impl.cc
@@ -0,0 +1,88 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gnuradio/io_signature.h>
+#include "pub_msg_sink_impl.h"
+#include "tag_headers.h"
+
+namespace gr {
+ namespace zeromq {
+
+ pub_msg_sink::sptr
+ pub_msg_sink::make(char *address, int timeout)
+ {
+ return gnuradio::get_initial_sptr
+ (new pub_msg_sink_impl(address, timeout));
+ }
+
+ pub_msg_sink_impl::pub_msg_sink_impl(char *address, int timeout)
+ : gr::sync_block("pub_msg_sink",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
+ {
+ int major, minor, patch;
+ zmq::version (&major, &minor, &patch);
+ if (major < 3) {
+ d_timeout = timeout*1000;
+ }
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_PUB);
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->bind(address);
+
+ message_port_register_in(pmt::mp("in"));
+ set_msg_handler( pmt::mp("in"),
+ boost::bind(&pub_msg_sink_impl::handler, this, _1));
+ }
+
+ pub_msg_sink_impl::~pub_msg_sink_impl()
+ {
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+ }
+
+ void pub_msg_sink_impl::handler(pmt::pmt_t msg){
+ std::stringbuf sb("");
+ pmt::serialize( msg, sb );
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
+ memcpy( zmsg.data(), s.c_str(), s.size() );
+ d_socket->send(zmsg);
+ }
+
+ int
+ pub_msg_sink_impl::work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+ {
+ return noutput_items;
+ }
+
+ } /* namespace zeromq */
+} /* namespace gr */
diff --git a/gr-zeromq/lib/pub_msg_sink_impl.h b/gr-zeromq/lib/pub_msg_sink_impl.h
new file mode 100644
index 0000000000..4bbf7ee944
--- /dev/null
+++ b/gr-zeromq/lib/pub_msg_sink_impl.h
@@ -0,0 +1,52 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_PUB_MSG_SINK_IMPL_H
+#define INCLUDED_ZEROMQ_PUB_MSG_SINK_IMPL_H
+
+#include <gnuradio/zeromq/pub_msg_sink.h>
+#include <zmq.hpp>
+
+namespace gr {
+ namespace zeromq {
+
+ class pub_msg_sink_impl : public pub_msg_sink
+ {
+ private:
+ float d_timeout;
+ zmq::context_t *d_context;
+ zmq::socket_t *d_socket;
+
+ public:
+ pub_msg_sink_impl(char *address, int timeout);
+ ~pub_msg_sink_impl();
+
+ int work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+ void handler(pmt::pmt_t msg);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_PUB_MSG_SINK_IMPL_H */
diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc
new file mode 100644
index 0000000000..1c63f2d97c
--- /dev/null
+++ b/gr-zeromq/lib/sub_msg_source_impl.cc
@@ -0,0 +1,115 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gnuradio/io_signature.h>
+#include "sub_msg_source_impl.h"
+#include "tag_headers.h"
+
+namespace gr {
+ namespace zeromq {
+
+ sub_msg_source::sptr
+ sub_msg_source::make(char *address, int timeout)
+ {
+ return gnuradio::get_initial_sptr
+ (new sub_msg_source_impl(address, timeout));
+ }
+
+ sub_msg_source_impl::sub_msg_source_impl(char *address, int timeout)
+ : gr::sync_block("sub_msg_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
+ {
+ int major, minor, patch;
+ zmq::version (&major, &minor, &patch);
+ if (major < 3) {
+ d_timeout = timeout*1000;
+ }
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
+ //int time = 0;
+ d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
+ d_socket->connect (address);
+
+ message_port_register_out(pmt::mp("out"));
+ }
+
+ sub_msg_source_impl::~sub_msg_source_impl()
+ {
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+ }
+
+ bool sub_msg_source_impl::start(){
+ d_finished = false;
+ d_thread = new boost::thread( boost::bind( &sub_msg_source_impl::readloop , this ) );
+ }
+
+ bool sub_msg_source_impl::stop(){
+ d_finished = true;
+ d_thread->join();
+ }
+
+ void sub_msg_source_impl::readloop(){
+ while(!d_finished){
+ //std::cout << "readloop\n";
+
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll (&items[0], 1, d_timeout);
+
+ // If we got a reply, process
+ if (items[0].revents & ZMQ_POLLIN) {
+
+ // Receive data
+ zmq::message_t msg;
+ d_socket->recv(&msg);
+
+ //std::cout << "got msg...\n";
+
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ //std::cout << m << "\n";
+ message_port_pub(pmt::mp("out"), m);
+
+ } else {
+ usleep(100);
+ }
+ }
+ }
+
+ int
+ sub_msg_source_impl::work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+ {
+ return noutput_items;
+ }
+
+ } /* namespace zeromq */
+} /* namespace gr */
diff --git a/gr-zeromq/lib/sub_msg_source_impl.h b/gr-zeromq/lib/sub_msg_source_impl.h
new file mode 100644
index 0000000000..0db544038a
--- /dev/null
+++ b/gr-zeromq/lib/sub_msg_source_impl.h
@@ -0,0 +1,57 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_SUB_MSG_SOURCE_IMPL_H
+#define INCLUDED_ZEROMQ_SUB_MSG_SOURCE_IMPL_H
+
+#include <gnuradio/zeromq/sub_msg_source.h>
+#include "zmq.hpp"
+
+namespace gr {
+ namespace zeromq {
+
+ class sub_msg_source_impl : public sub_msg_source
+ {
+ private:
+ int d_timeout; // microseconds, -1 is blocking
+ zmq::context_t *d_context;
+ zmq::socket_t *d_socket;
+ void readloop();
+ boost::thread *d_thread;
+
+ public:
+ sub_msg_source_impl(char *address, int timeout);
+ ~sub_msg_source_impl();
+
+ bool start();
+ bool stop();
+ bool d_finished;
+ int work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_SUB_MSG_SOURCE_IMPL_H */
+
diff --git a/gr-zeromq/swig/zeromq_swig.i b/gr-zeromq/swig/zeromq_swig.i
index bcd90550d4..c9a891d9a3 100644
--- a/gr-zeromq/swig/zeromq_swig.i
+++ b/gr-zeromq/swig/zeromq_swig.i
@@ -29,23 +29,29 @@
%{
#include "gnuradio/zeromq/pub_sink.h"
+#include "gnuradio/zeromq/pub_msg_sink.h"
#include "gnuradio/zeromq/push_sink.h"
#include "gnuradio/zeromq/rep_sink.h"
#include "gnuradio/zeromq/sub_source.h"
+#include "gnuradio/zeromq/sub_msg_source.h"
#include "gnuradio/zeromq/pull_source.h"
#include "gnuradio/zeromq/req_source.h"
%}
%include "gnuradio/zeromq/pub_sink.h"
+%include "gnuradio/zeromq/pub_msg_sink.h"
%include "gnuradio/zeromq/push_sink.h"
%include "gnuradio/zeromq/rep_sink.h"
%include "gnuradio/zeromq/sub_source.h"
+%include "gnuradio/zeromq/sub_msg_source.h"
%include "gnuradio/zeromq/pull_source.h"
%include "gnuradio/zeromq/req_source.h"
GR_SWIG_BLOCK_MAGIC2(zeromq, pub_sink);
+GR_SWIG_BLOCK_MAGIC2(zeromq, pub_msg_sink);
GR_SWIG_BLOCK_MAGIC2(zeromq, push_sink);
GR_SWIG_BLOCK_MAGIC2(zeromq, rep_sink);
GR_SWIG_BLOCK_MAGIC2(zeromq, sub_source);
+GR_SWIG_BLOCK_MAGIC2(zeromq, sub_msg_source);
GR_SWIG_BLOCK_MAGIC2(zeromq, pull_source);
GR_SWIG_BLOCK_MAGIC2(zeromq, req_source);