summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohnathan Corgan <johnathan@corganlabs.com>2015-01-12 09:22:44 -0800
committerJohnathan Corgan <johnathan@corganlabs.com>2015-01-12 09:22:44 -0800
commitc13049f2fbe51dcb6e655b11e172514a3fc085b4 (patch)
tree81a87d6bacda9b120fa5c75ec40ec285c86a18bf
parent5cb307e6f347cf2cb15990848040f7c743c271e5 (diff)
parent18944a9a761eb7c2256e4ad450b943f09664c410 (diff)
Merge remote-tracking branch 'osh/zmqtags' into zmq_tags
-rw-r--r--gr-zeromq/grc/CMakeLists.txt6
-rw-r--r--gr-zeromq/grc/zeromq_pub_msg_sink.xml28
-rw-r--r--gr-zeromq/grc/zeromq_pub_sink.xml9
-rw-r--r--gr-zeromq/grc/zeromq_pull_msg_source.xml28
-rw-r--r--gr-zeromq/grc/zeromq_pull_source.xml9
-rw-r--r--gr-zeromq/grc/zeromq_push_msg_sink.xml28
-rw-r--r--gr-zeromq/grc/zeromq_push_sink.xml9
-rw-r--r--gr-zeromq/grc/zeromq_rep_msg_sink.xml28
-rw-r--r--gr-zeromq/grc/zeromq_rep_sink.xml9
-rw-r--r--gr-zeromq/grc/zeromq_req_msg_source.xml28
-rw-r--r--gr-zeromq/grc/zeromq_req_source.xml9
-rw-r--r--gr-zeromq/grc/zeromq_sub_msg_source.xml28
-rw-r--r--gr-zeromq/grc/zeromq_sub_source.xml9
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt6
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h60
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/pub_sink.h2
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h58
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/pull_source.h2
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h62
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/push_sink.h2
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h60
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/rep_sink.h2
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/req_msg_source.h59
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/req_source.h2
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h58
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/sub_source.h2
-rw-r--r--gr-zeromq/lib/CMakeLists.txt7
-rw-r--r--gr-zeromq/lib/pub_msg_sink_impl.cc88
-rw-r--r--gr-zeromq/lib/pub_msg_sink_impl.h52
-rw-r--r--gr-zeromq/lib/pub_sink_impl.cc25
-rw-r--r--gr-zeromq/lib/pub_sink_impl.h5
-rw-r--r--gr-zeromq/lib/pull_msg_source_impl.cc118
-rw-r--r--gr-zeromq/lib/pull_msg_source_impl.h56
-rw-r--r--gr-zeromq/lib/pull_source_impl.cc31
-rw-r--r--gr-zeromq/lib/pull_source_impl.h3
-rw-r--r--gr-zeromq/lib/push_msg_sink_impl.cc119
-rw-r--r--gr-zeromq/lib/push_msg_sink_impl.h52
-rw-r--r--gr-zeromq/lib/push_sink_impl.cc30
-rw-r--r--gr-zeromq/lib/push_sink_impl.h3
-rw-r--r--gr-zeromq/lib/rep_msg_sink_impl.cc141
-rw-r--r--gr-zeromq/lib/rep_msg_sink_impl.h56
-rw-r--r--gr-zeromq/lib/rep_sink_impl.cc38
-rw-r--r--gr-zeromq/lib/rep_sink_impl.h5
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.cc132
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.h56
-rw-r--r--gr-zeromq/lib/req_source_impl.cc29
-rw-r--r--gr-zeromq/lib/req_source_impl.h5
-rw-r--r--gr-zeromq/lib/sub_msg_source_impl.cc117
-rw-r--r--gr-zeromq/lib/sub_msg_source_impl.h56
-rw-r--r--gr-zeromq/lib/sub_source_impl.cc36
-rw-r--r--gr-zeromq/lib/sub_source_impl.h3
-rw-r--r--gr-zeromq/lib/tag_headers.cc101
-rw-r--r--gr-zeromq/lib/tag_headers.h40
-rw-r--r--gr-zeromq/swig/zeromq_swig.i18
54 files changed, 1948 insertions, 77 deletions
diff --git a/gr-zeromq/grc/CMakeLists.txt b/gr-zeromq/grc/CMakeLists.txt
index 548c2f28ed..7807bcfe9d 100644
--- a/gr-zeromq/grc/CMakeLists.txt
+++ b/gr-zeromq/grc/CMakeLists.txt
@@ -19,11 +19,17 @@
install(FILES
zeromq_pub_sink.xml
+ zeromq_pub_msg_sink.xml
zeromq_sub_source.xml
+ zeromq_sub_msg_source.xml
zeromq_push_sink.xml
+ zeromq_push_msg_sink.xml
zeromq_pull_source.xml
+ zeromq_pull_msg_source.xml
zeromq_rep_sink.xml
+ zeromq_rep_msg_sink.xml
zeromq_req_source.xml
+ zeromq_req_msg_source.xml
DESTINATION share/gnuradio/grc/blocks
)
diff --git a/gr-zeromq/grc/zeromq_pub_msg_sink.xml b/gr-zeromq/grc/zeromq_pub_msg_sink.xml
new file mode 100644
index 0000000000..8f541ce731
--- /dev/null
+++ b/gr-zeromq/grc/zeromq_pub_msg_sink.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<block>
+ <name>ZMQ PUB Message Sink</name>
+ <key>zeromq_pub_msg_sink</key>
+ <category>ZeroMQ Interfaces</category>
+ <import>from gnuradio import zeromq</import>
+ <make>zeromq.pub_msg_sink($address, $timeout)</make>
+
+ <param>
+ <name>Address</name>
+ <key>address</key>
+ <type>string</type>
+ </param>
+
+ <param>
+ <name>Timeout (msec)</name>
+ <key>timeout</key>
+ <value>100</value>
+ <type>float</type>
+ </param>
+
+ <sink>
+ <name>in</name>
+ <type>message</type>
+ <optional>1</optional>
+ </sink>
+
+</block>
diff --git a/gr-zeromq/grc/zeromq_pub_sink.xml b/gr-zeromq/grc/zeromq_pub_sink.xml
index e78cd58b14..7babc9eb80 100644
--- a/gr-zeromq/grc/zeromq_pub_sink.xml
+++ b/gr-zeromq/grc/zeromq_pub_sink.xml
@@ -4,7 +4,7 @@
<key>zeromq_pub_sink</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout)</make>
+ <make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
<param>
<name>IO Type</name>
@@ -57,6 +57,13 @@
<type>float</type>
</param>
+ <param>
+ <name>Pass Tags</name>
+ <key>pass_tags</key>
+ <value>False</value>
+ <type>bool</type>
+ </param>
+
<sink>
<name>in</name>
<type>$type</type>
diff --git a/gr-zeromq/grc/zeromq_pull_msg_source.xml b/gr-zeromq/grc/zeromq_pull_msg_source.xml
new file mode 100644
index 0000000000..c0a6ca5a17
--- /dev/null
+++ b/gr-zeromq/grc/zeromq_pull_msg_source.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<block>
+ <name>ZMQ PULL Message Source</name>
+ <key>zeromq_pull_msg_source</key>
+ <category>ZeroMQ Interfaces</category>
+ <import>from gnuradio import zeromq</import>
+ <make>zeromq.pull_msg_source($address, $timeout)</make>
+
+ <param>
+ <name>Address</name>
+ <key>address</key>
+ <type>string</type>
+ </param>
+
+ <param>
+ <name>Timeout (msec)</name>
+ <key>timeout</key>
+ <value>100</value>
+ <type>float</type>
+ </param>
+
+ <source>
+ <name>out</name>
+ <type>message</type>
+ <optional>1</optional>
+ </source>
+
+</block>
diff --git a/gr-zeromq/grc/zeromq_pull_source.xml b/gr-zeromq/grc/zeromq_pull_source.xml
index 3833cd7341..c8a7b890da 100644
--- a/gr-zeromq/grc/zeromq_pull_source.xml
+++ b/gr-zeromq/grc/zeromq_pull_source.xml
@@ -4,7 +4,7 @@
<key>zeromq_pull_source</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout)</make>
+ <make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
<param>
<name>IO Type</name>
@@ -57,6 +57,13 @@
<type>float</type>
</param>
+ <param>
+ <name>Pass Tags</name>
+ <key>pass_tags</key>
+ <value>False</value>
+ <type>bool</type>
+ </param>
+
<source>
<name>out</name>
<type>$type</type>
diff --git a/gr-zeromq/grc/zeromq_push_msg_sink.xml b/gr-zeromq/grc/zeromq_push_msg_sink.xml
new file mode 100644
index 0000000000..65626c0761
--- /dev/null
+++ b/gr-zeromq/grc/zeromq_push_msg_sink.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<block>
+ <name>ZMQ PUSH Message Sink</name>
+ <key>zeromq_push_msg_sink</key>
+ <category>ZeroMQ Interfaces</category>
+ <import>from gnuradio import zeromq</import>
+ <make>zeromq.push_msg_sink($address, $timeout)</make>
+
+ <param>
+ <name>Address</name>
+ <key>address</key>
+ <type>string</type>
+ </param>
+
+ <param>
+ <name>Timeout (msec)</name>
+ <key>timeout</key>
+ <value>100</value>
+ <type>float</type>
+ </param>
+
+ <sink>
+ <name>in</name>
+ <type>message</type>
+ <optional>1</optional>
+ </sink>
+
+</block>
diff --git a/gr-zeromq/grc/zeromq_push_sink.xml b/gr-zeromq/grc/zeromq_push_sink.xml
index 54bb8d0d98..eb6ead5c65 100644
--- a/gr-zeromq/grc/zeromq_push_sink.xml
+++ b/gr-zeromq/grc/zeromq_push_sink.xml
@@ -4,7 +4,7 @@
<key>zeromq_push_sink</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout)</make>
+ <make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
<param>
<name>IO Type</name>
@@ -57,6 +57,13 @@
<type>float</type>
</param>
+ <param>
+ <name>Pass Tags</name>
+ <key>pass_tags</key>
+ <value>False</value>
+ <type>bool</type>
+ </param>
+
<sink>
<name>in</name>
<type>$type</type>
diff --git a/gr-zeromq/grc/zeromq_rep_msg_sink.xml b/gr-zeromq/grc/zeromq_rep_msg_sink.xml
new file mode 100644
index 0000000000..f978f442a7
--- /dev/null
+++ b/gr-zeromq/grc/zeromq_rep_msg_sink.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<block>
+ <name>ZMQ REP Message Sink</name>
+ <key>zeromq_rep_msg_sink</key>
+ <category>ZeroMQ Interfaces</category>
+ <import>from gnuradio import zeromq</import>
+ <make>zeromq.rep_msg_sink($address, $timeout)</make>
+
+ <param>
+ <name>Address</name>
+ <key>address</key>
+ <type>string</type>
+ </param>
+
+ <param>
+ <name>Timeout (msec)</name>
+ <key>timeout</key>
+ <value>100</value>
+ <type>float</type>
+ </param>
+
+ <sink>
+ <name>in</name>
+ <type>message</type>
+ <optional>1</optional>
+ </sink>
+
+</block>
diff --git a/gr-zeromq/grc/zeromq_rep_sink.xml b/gr-zeromq/grc/zeromq_rep_sink.xml
index 1320bfef82..2209b4f3ff 100644
--- a/gr-zeromq/grc/zeromq_rep_sink.xml
+++ b/gr-zeromq/grc/zeromq_rep_sink.xml
@@ -4,7 +4,7 @@
<key>zeromq_rep_sink</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout)</make>
+ <make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
<param>
<name>IO Type</name>
@@ -57,6 +57,13 @@
<type>float</type>
</param>
+ <param>
+ <name>Pass Tags</name>
+ <key>pass_tags</key>
+ <value>False</value>
+ <type>bool</type>
+ </param>
+
<sink>
<name>in</name>
<type>$type</type>
diff --git a/gr-zeromq/grc/zeromq_req_msg_source.xml b/gr-zeromq/grc/zeromq_req_msg_source.xml
new file mode 100644
index 0000000000..3ba7488223
--- /dev/null
+++ b/gr-zeromq/grc/zeromq_req_msg_source.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<block>
+ <name>ZMQ REQ Message Source</name>
+ <key>zeromq_req_msg_source</key>
+ <category>ZeroMQ Interfaces</category>
+ <import>from gnuradio import zeromq</import>
+ <make>zeromq.req_msg_source($address, $timeout)</make>
+
+ <param>
+ <name>Address</name>
+ <key>address</key>
+ <type>string</type>
+ </param>
+
+ <param>
+ <name>Timeout (msec)</name>
+ <key>timeout</key>
+ <value>100</value>
+ <type>float</type>
+ </param>
+
+ <source>
+ <name>out</name>
+ <type>message</type>
+ <optional>1</optional>
+ </source>
+
+</block>
diff --git a/gr-zeromq/grc/zeromq_req_source.xml b/gr-zeromq/grc/zeromq_req_source.xml
index d2951d2256..050718c1bf 100644
--- a/gr-zeromq/grc/zeromq_req_source.xml
+++ b/gr-zeromq/grc/zeromq_req_source.xml
@@ -4,7 +4,7 @@
<key>zeromq_req_source</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout)</make>
+ <make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
<param>
<name>IO Type</name>
@@ -57,6 +57,13 @@
<type>float</type>
</param>
+ <param>
+ <name>Pass Tags</name>
+ <key>pass_tags</key>
+ <value>False</value>
+ <type>bool</type>
+ </param>
+
<source>
<name>out</name>
<type>$type</type>
diff --git a/gr-zeromq/grc/zeromq_sub_msg_source.xml b/gr-zeromq/grc/zeromq_sub_msg_source.xml
new file mode 100644
index 0000000000..32a1c9862b
--- /dev/null
+++ b/gr-zeromq/grc/zeromq_sub_msg_source.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<block>
+ <name>ZMQ SUB Message Source</name>
+ <key>zeromq_sub_msg_source</key>
+ <category>ZeroMQ Interfaces</category>
+ <import>from gnuradio import zeromq</import>
+ <make>zeromq.sub_msg_source($address, $timeout)</make>
+
+ <param>
+ <name>Address</name>
+ <key>address</key>
+ <type>string</type>
+ </param>
+
+ <param>
+ <name>Timeout (msec)</name>
+ <key>timeout</key>
+ <value>100</value>
+ <type>float</type>
+ </param>
+
+ <source>
+ <name>out</name>
+ <type>message</type>
+ <optional>1</optional>
+ </source>
+
+</block>
diff --git a/gr-zeromq/grc/zeromq_sub_source.xml b/gr-zeromq/grc/zeromq_sub_source.xml
index 2ec8cfa887..86af5063e3 100644
--- a/gr-zeromq/grc/zeromq_sub_source.xml
+++ b/gr-zeromq/grc/zeromq_sub_source.xml
@@ -4,7 +4,7 @@
<key>zeromq_sub_source</key>
<category>ZeroMQ Interfaces</category>
<import>from gnuradio import zeromq</import>
- <make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout)</make>
+ <make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make>
<param>
<name>IO Type</name>
@@ -57,6 +57,13 @@
<type>float</type>
</param>
+ <param>
+ <name>Pass Tags</name>
+ <key>pass_tags</key>
+ <value>False</value>
+ <type>bool</type>
+ </param>
+
<source>
<name>out</name>
<type>$type</type>
diff --git a/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt b/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt
index 970cf1ce9a..d365532003 100644
--- a/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt
+++ b/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt
@@ -23,11 +23,17 @@
install(FILES
api.h
pub_sink.h
+ pub_msg_sink.h
sub_source.h
+ sub_msg_source.h
pull_source.h
+ pull_msg_source.h
push_sink.h
+ push_msg_sink.h
rep_sink.h
+ rep_msg_sink.h
req_source.h
+ req_msg_source.h
DESTINATION ${GR_INCLUDE_DIR}/gnuradio/zeromq
COMPONENT "zeromq_devel"
diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h
new file mode 100644
index 0000000000..d626ddd147
--- /dev/null
+++ b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h
@@ -0,0 +1,60 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_PUB_MSG_SINK_H
+#define INCLUDED_ZEROMQ_PUB_MSG_SINK_H
+
+#include <gnuradio/zeromq/api.h>
+#include <gnuradio/sync_block.h>
+
+namespace gr {
+ namespace zeromq {
+
+ /*!
+ * \brief Sink the contents of a stream to a ZMQ PUB socket
+ * \ingroup zeromq
+ *
+ * \details
+ * This block acts a a streaming sink for a GNU Radio flowgraph
+ * and writes its contents to a ZMQ PUB socket. A PUB socket may
+ * have subscribers and will pass all incoming stream data to each
+ * subscriber. Subscribers can be either another gr-zeromq source
+ * block or a non-GNU Radio ZMQ socket.
+ */
+ class ZEROMQ_API pub_msg_sink : virtual public gr::sync_block
+ {
+ public:
+ typedef boost::shared_ptr<pub_msg_sink> sptr;
+
+ /*!
+ * \brief Return a shared_ptr to a new instance of zeromq::pub_msg_sink.
+ *
+ * \param address ZMQ socket address specifier
+ * \param timeout Receive timeout in seconds, default is 100ms, 1us increments
+ */
+ static sptr make(char *address, int timeout=100);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_PUB_MSG_SINK_H */
diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
index a60fb15c88..11f251ede2 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
@@ -53,7 +53,7 @@ namespace gr {
* \param address ZMQ socket address specifier
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h
new file mode 100644
index 0000000000..7d87ba1574
--- /dev/null
+++ b/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h
@@ -0,0 +1,58 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_PULL_MSG_SOURCE_H
+#define INCLUDED_ZEROMQ_PULL_MSG_SOURCE_H
+
+#include <gnuradio/zeromq/api.h>
+#include <gnuradio/sync_block.h>
+
+namespace gr {
+ namespace zeromq {
+
+ /*!
+ * \brief Receive messages on ZMQ PULL socket and source stream
+ * \ingroup zeromq
+ *
+ * \details
+ * This block will connect to a ZMQ PUSH socket, then produce all
+ * incoming messages as streaming output.
+ */
+ class ZEROMQ_API pull_msg_source : virtual public gr::sync_block
+ {
+ public:
+ typedef boost::shared_ptr<pull_msg_source> sptr;
+
+ /*!
+ * \brief Return a shared_ptr to a new instance of gr::zeromq::pull_msg_source.
+ *
+ * \param address ZMQ socket address specifier
+ * \param timeout Receive timeout in seconds, default is 100ms, 1us increments
+ *
+ */
+ static sptr make(char *address, int timeout=100);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_PULL_MSG_SOURCE_H */
diff --git a/gr-zeromq/include/gnuradio/zeromq/pull_source.h b/gr-zeromq/include/gnuradio/zeromq/pull_source.h
index 5c1d37d353..6dec9c2b34 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pull_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pull_source.h
@@ -51,7 +51,7 @@ namespace gr {
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h
new file mode 100644
index 0000000000..27a19b210b
--- /dev/null
+++ b/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h
@@ -0,0 +1,62 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_PUSH_MSG_SINK_H
+#define INCLUDED_ZEROMQ_PUSH_MSG_SINK_H
+
+#include <gnuradio/zeromq/api.h>
+#include <gnuradio/sync_block.h>
+
+namespace gr {
+ namespace zeromq {
+
+ /*!
+ * \brief Sink the contents of a stream to a ZMQ PUSH socket
+ * \ingroup zeromq
+ *
+ * \details
+ * This block acts a a streaming sink for a GNU Radio flowgraph
+ * and writes its contents to a ZMQ PUSH socket. A PUSH socket
+ * will round-robin send its messages to each connected ZMQ PULL
+ * socket, either another gr-zeromq source block or a regular,
+ * non-GNU Radio ZMQ socket.
+ *
+ */
+ class ZEROMQ_API push_msg_sink : virtual public gr::sync_block
+ {
+ public:
+ typedef boost::shared_ptr<push_msg_sink> sptr;
+
+ /*!
+ * \brief Return a shared_ptr to a new instance of gr::zeromq::push_msg_sink
+ *
+ * \param address ZMQ socket address specifier
+ * \param timeout Receive timeout in seconds, default is 100ms, 1us increments
+ *
+ */
+ static sptr make(char *address, int timeout=100);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_PUSH_MSG_SINK_H */
diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
index b54a1e40d8..1b8999e409 100644
--- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h
@@ -55,7 +55,7 @@ namespace gr {
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h
new file mode 100644
index 0000000000..b0fd23c892
--- /dev/null
+++ b/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h
@@ -0,0 +1,60 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_REP_MSG_SINK_H
+#define INCLUDED_ZEROMQ_REP_MSG_SINK_H
+
+#include <gnuradio/zeromq/api.h>
+#include <gnuradio/sync_block.h>
+
+namespace gr {
+ namespace zeromq {
+
+ /*!
+ * \brief Sink the contents of a stream to a ZMQ REP socket
+ * \ingroup zeromq
+ *
+ * \details
+ * This block acts a a streaming sink for a GNU Radio flowgraph
+ * and writes its contents to a ZMQ REP socket. A REP socket will
+ * only send its contents to an attached REQ socket when it
+ * requests items.
+ */
+ class ZEROMQ_API rep_msg_sink : virtual public gr::sync_block
+ {
+ public:
+ typedef boost::shared_ptr<rep_msg_sink> sptr;
+
+ /*!
+ * \brief Return a shared_ptr to a new instance of zeromq::rep_msg_sink.
+ *
+ * \param address ZMQ socket address specifier
+ * \param timeout Receive timeout in seconds, default is 100ms, 1us increments
+ *
+ */
+ static sptr make(char *address, int timeout=100);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_REP_MSG_SINK_H */
diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
index 1da325257f..6d3c47b626 100644
--- a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h
@@ -53,7 +53,7 @@ namespace gr {
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h
new file mode 100644
index 0000000000..cf9183375f
--- /dev/null
+++ b/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h
@@ -0,0 +1,59 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_REQ_MSG_SOURCE_H
+#define INCLUDED_ZEROMQ_REQ_MSG_SOURCE_H
+
+#include <gnuradio/zeromq/api.h>
+#include <gnuradio/sync_block.h>
+
+namespace gr {
+ namespace zeromq {
+
+ /*!
+ * \brief Receive messages on ZMQ REQ socket and source stream
+ * \ingroup zeromq
+ *
+ * \details
+ * This block will connect to a ZMQ REP socket, then produce all
+ * incoming messages as streaming output.
+ */
+ class ZEROMQ_API req_msg_source : virtual public gr::sync_block
+ {
+ public:
+ typedef boost::shared_ptr<req_msg_source> sptr;
+
+ /*!
+ * \brief Return a shared_ptr to a new instance of zeromq::req_msg_source.
+ *
+ *
+ * \param address ZMQ socket address specifier
+ * \param timeout Receive timeout in seconds, default is 100ms, 1us increments
+ *
+ */
+ static sptr make(char *address, int timeout=100);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_SOURCE_REQREP_H */
diff --git a/gr-zeromq/include/gnuradio/zeromq/req_source.h b/gr-zeromq/include/gnuradio/zeromq/req_source.h
index c272c28510..d9f55d3c72 100644
--- a/gr-zeromq/include/gnuradio/zeromq/req_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/req_source.h
@@ -52,7 +52,7 @@ namespace gr {
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h
new file mode 100644
index 0000000000..3af8fdbec9
--- /dev/null
+++ b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h
@@ -0,0 +1,58 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_SUB_MSG_SOURCE_H
+#define INCLUDED_ZEROMQ_SUB_MSG_SOURCE_H
+
+#include <gnuradio/zeromq/api.h>
+#include <gnuradio/sync_block.h>
+
+namespace gr {
+ namespace zeromq {
+
+ /*!
+ * \brief Receive messages on ZMQ SUB socket and source stream
+ * \ingroup zeromq
+ *
+ * \details
+ * This block will connect to a ZMQ PUB socket, then produce all
+ * incoming messages as streaming output.
+ */
+ class ZEROMQ_API sub_msg_source : virtual public gr::sync_block
+ {
+ public:
+ typedef boost::shared_ptr<sub_msg_source> sptr;
+
+ /*!
+ * \brief Return a shared_ptr to a new instance of gr::zeromq::sub_msg_source.
+ *
+ * \param address ZMQ socket address specifier
+ * \param timeout Receive timeout in seconds, default is 100ms, 1us increments
+ *
+ */
+ static sptr make(char *address, int timeout=100);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_SUB_MSG_SOURCE_H */
diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_source.h b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
index 9deaa7f3ff..f97dc5ac2c 100644
--- a/gr-zeromq/include/gnuradio/zeromq/sub_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
@@ -51,7 +51,7 @@ namespace gr {
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/lib/CMakeLists.txt b/gr-zeromq/lib/CMakeLists.txt
index 90cbb12451..941e5ff12b 100644
--- a/gr-zeromq/lib/CMakeLists.txt
+++ b/gr-zeromq/lib/CMakeLists.txt
@@ -38,11 +38,18 @@ endif(ENABLE_GR_CTRLPORT)
########################################################################
list(APPEND zeromq_sources
pub_sink_impl.cc
+ pub_msg_sink_impl.cc
sub_source_impl.cc
+ sub_msg_source_impl.cc
pull_source_impl.cc
+ pull_msg_source_impl.cc
push_sink_impl.cc
+ push_msg_sink_impl.cc
rep_sink_impl.cc
+ rep_msg_sink_impl.cc
req_source_impl.cc
+ req_msg_source_impl.cc
+ tag_headers.cc
)
#Add Windows DLL resource file if using MSVC
diff --git a/gr-zeromq/lib/pub_msg_sink_impl.cc b/gr-zeromq/lib/pub_msg_sink_impl.cc
new file mode 100644
index 0000000000..0264aef83b
--- /dev/null
+++ b/gr-zeromq/lib/pub_msg_sink_impl.cc
@@ -0,0 +1,88 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gnuradio/io_signature.h>
+#include "pub_msg_sink_impl.h"
+#include "tag_headers.h"
+
+namespace gr {
+ namespace zeromq {
+
+ pub_msg_sink::sptr
+ pub_msg_sink::make(char *address, int timeout)
+ {
+ return gnuradio::get_initial_sptr
+ (new pub_msg_sink_impl(address, timeout));
+ }
+
+ pub_msg_sink_impl::pub_msg_sink_impl(char *address, int timeout)
+ : gr::sync_block("pub_msg_sink",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
+ {
+ int major, minor, patch;
+ zmq::version (&major, &minor, &patch);
+ if (major < 3) {
+ d_timeout = timeout*1000;
+ }
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_PUB);
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->bind(address);
+
+ message_port_register_in(pmt::mp("in"));
+ set_msg_handler( pmt::mp("in"),
+ boost::bind(&pub_msg_sink_impl::handler, this, _1));
+ }
+
+ pub_msg_sink_impl::~pub_msg_sink_impl()
+ {
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+ }
+
+ void pub_msg_sink_impl::handler(pmt::pmt_t msg){
+ std::stringbuf sb("");
+ pmt::serialize( msg, sb );
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
+ memcpy( zmsg.data(), s.c_str(), s.size() );
+ d_socket->send(zmsg);
+ }
+
+ int
+ pub_msg_sink_impl::work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+ {
+ return noutput_items;
+ }
+
+ } /* namespace zeromq */
+} /* namespace gr */
diff --git a/gr-zeromq/lib/pub_msg_sink_impl.h b/gr-zeromq/lib/pub_msg_sink_impl.h
new file mode 100644
index 0000000000..4bbf7ee944
--- /dev/null
+++ b/gr-zeromq/lib/pub_msg_sink_impl.h
@@ -0,0 +1,52 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_PUB_MSG_SINK_IMPL_H
+#define INCLUDED_ZEROMQ_PUB_MSG_SINK_IMPL_H
+
+#include <gnuradio/zeromq/pub_msg_sink.h>
+#include <zmq.hpp>
+
+namespace gr {
+ namespace zeromq {
+
+ class pub_msg_sink_impl : public pub_msg_sink
+ {
+ private:
+ float d_timeout;
+ zmq::context_t *d_context;
+ zmq::socket_t *d_socket;
+
+ public:
+ pub_msg_sink_impl(char *address, int timeout);
+ ~pub_msg_sink_impl();
+
+ int work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+ void handler(pmt::pmt_t msg);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_PUB_MSG_SINK_IMPL_H */
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 13f86045d7..5afcb722b1 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -26,22 +26,23 @@
#include <gnuradio/io_signature.h>
#include "pub_sink_impl.h"
+#include "tag_headers.h"
namespace gr {
namespace zeromq {
pub_sink::sptr
- pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
{
return gnuradio::get_initial_sptr
- (new pub_sink_impl(itemsize, vlen, address, timeout));
+ (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags));
}
- pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout)
+ pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
: gr::sync_block("pub_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -69,9 +70,21 @@ namespace gr {
{
const char *in = (const char *)input_items[0];
+ // encode the current offset, # tags, and tags into header
+ std::string header("");
+ if(d_pass_tags){
+ uint64_t offset = nitems_read(0);
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
+ header = gen_tag_header( offset, tags );
+ }
+
// create message copy and send
- zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
+ zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
+ //std::cout << "PUB: Header Len: " << header.length() << ", Data Length: " << d_itemsize*d_vlen*noutput_items << "\n";
+ if(d_pass_tags)
+ memcpy((void*) msg.data(), header.c_str(), header.length() );
+ memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items);
d_socket->send(msg);
return noutput_items;
diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h
index 9c956ef2fa..100b0f5b8c 100644
--- a/gr-zeromq/lib/pub_sink_impl.h
+++ b/gr-zeromq/lib/pub_sink_impl.h
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2013 Free Software Foundation, Inc.
+ * Copyright 2013,2014 Free Software Foundation, Inc.
*
* This file is part of GNU Radio.
*
@@ -37,9 +37,10 @@ namespace gr {
float d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags);
~pub_sink_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc
new file mode 100644
index 0000000000..0c848fc771
--- /dev/null
+++ b/gr-zeromq/lib/pull_msg_source_impl.cc
@@ -0,0 +1,118 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gnuradio/io_signature.h>
+#include "pull_msg_source_impl.h"
+#include "tag_headers.h"
+
+namespace gr {
+ namespace zeromq {
+
+ pull_msg_source::sptr
+ pull_msg_source::make(char *address, int timeout)
+ {
+ return gnuradio::get_initial_sptr
+ (new pull_msg_source_impl(address, timeout));
+ }
+
+ pull_msg_source_impl::pull_msg_source_impl(char *address, int timeout)
+ : gr::sync_block("pull_msg_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
+ {
+ int major, minor, patch;
+ zmq::version (&major, &minor, &patch);
+ if (major < 3) {
+ d_timeout = timeout*1000;
+ }
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->connect (address);
+
+ message_port_register_out(pmt::mp("out"));
+ }
+
+ pull_msg_source_impl::~pull_msg_source_impl()
+ {
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+ }
+
+ bool pull_msg_source_impl::start(){
+ d_finished = false;
+ d_thread = new boost::thread( boost::bind( &pull_msg_source_impl::readloop , this ) );
+ return true;
+ }
+
+ bool pull_msg_source_impl::stop(){
+ d_finished = true;
+ d_thread->join();
+ return true;
+ }
+
+ void pull_msg_source_impl::readloop(){
+ while(!d_finished){
+ //std::cout << "readloop\n";
+
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll (&items[0], 1, d_timeout);
+
+ // If we got a reply, process
+ if (items[0].revents & ZMQ_POLLIN) {
+
+ // Receive data
+ zmq::message_t msg;
+ d_socket->recv(&msg);
+
+ //std::cout << "got msg...\n";
+
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ //std::cout << m << "\n";
+ message_port_pub(pmt::mp("out"), m);
+
+ } else {
+ usleep(100);
+ }
+ }
+ }
+
+
+ int
+ pull_msg_source_impl::work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+ {
+ return noutput_items;
+ }
+
+ } /* namespace zeromq */
+} /* namespace gr */
diff --git a/gr-zeromq/lib/pull_msg_source_impl.h b/gr-zeromq/lib/pull_msg_source_impl.h
new file mode 100644
index 0000000000..9ff89ef154
--- /dev/null
+++ b/gr-zeromq/lib/pull_msg_source_impl.h
@@ -0,0 +1,56 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_PULL_MSG_SOURCE_IMPL_H
+#define INCLUDED_ZEROMQ_PULL_MSG_SOURCE_IMPL_H
+
+#include <gnuradio/zeromq/pull_msg_source.h>
+#include <zmq.hpp>
+
+namespace gr {
+ namespace zeromq {
+
+ class pull_msg_source_impl : public pull_msg_source
+ {
+ private:
+ int d_timeout; // microseconds, -1 is blocking
+ zmq::context_t *d_context;
+ zmq::socket_t *d_socket;
+ void readloop();
+ boost::thread *d_thread;
+
+ public:
+ pull_msg_source_impl(char *address, int timeout);
+ ~pull_msg_source_impl();
+
+ bool start();
+ bool stop();
+ bool d_finished;
+ int work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_PULL_MSG_SOURCE_IMPL_H */
diff --git a/gr-zeromq/lib/pull_source_impl.cc b/gr-zeromq/lib/pull_source_impl.cc
index 479b15bb32..87b330a862 100644
--- a/gr-zeromq/lib/pull_source_impl.cc
+++ b/gr-zeromq/lib/pull_source_impl.cc
@@ -26,22 +26,23 @@
#include <gnuradio/io_signature.h>
#include "pull_source_impl.h"
+#include "tag_headers.h"
namespace gr {
namespace zeromq {
pull_source::sptr
- pull_source::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ pull_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
{
return gnuradio::get_initial_sptr
- (new pull_source_impl(itemsize, vlen, address, timeout));
+ (new pull_source_impl(itemsize, vlen, address, timeout, pass_tags));
}
- pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout)
+ pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
: gr::sync_block("pull_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -81,16 +82,30 @@ namespace gr {
// Receive data
zmq::message_t msg;
d_socket->recv(&msg);
+
+ // check header for tags...
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ if(d_pass_tags){
+ uint64_t rcv_offset;
+ std::vector<gr::tag_t> tags;
+ buf = parse_tag_header(buf, rcv_offset, tags);
+ for(size_t i=0; i<tags.size(); i++){
+ tags[i].offset -= rcv_offset - nitems_written(0);
+ add_item_tag(0, tags[i]);
+ }
+ }
+
+
// Copy to ouput buffer and return
- if (msg.size() >= d_itemsize*d_vlen*noutput_items) {
- memcpy(out, (void *)msg.data(), d_itemsize*d_vlen*noutput_items);
+ if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
+ memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
return noutput_items;
}
else {
- memcpy(out, (void *)msg.data(), msg.size());
+ memcpy(out, (void *)&buf[0], buf.size());
- return msg.size()/(d_itemsize*d_vlen);
+ return buf.size()/(d_itemsize*d_vlen);
}
}
else {
diff --git a/gr-zeromq/lib/pull_source_impl.h b/gr-zeromq/lib/pull_source_impl.h
index e69de81dcd..757867998d 100644
--- a/gr-zeromq/lib/pull_source_impl.h
+++ b/gr-zeromq/lib/pull_source_impl.h
@@ -37,9 +37,10 @@ namespace gr {
int d_timeout; // microseconds, -1 is blocking
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags);
~pull_source_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/push_msg_sink_impl.cc b/gr-zeromq/lib/push_msg_sink_impl.cc
new file mode 100644
index 0000000000..6266cd6a57
--- /dev/null
+++ b/gr-zeromq/lib/push_msg_sink_impl.cc
@@ -0,0 +1,119 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gnuradio/io_signature.h>
+#include "push_msg_sink_impl.h"
+#include "tag_headers.h"
+
+namespace gr {
+ namespace zeromq {
+
+ push_msg_sink::sptr
+ push_msg_sink::make(char *address, int timeout)
+ {
+ return gnuradio::get_initial_sptr
+ (new push_msg_sink_impl(address, timeout));
+ }
+
+ push_msg_sink_impl::push_msg_sink_impl(char *address, int timeout)
+ : gr::sync_block("push_msg_sink",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
+ {
+ int major, minor, patch;
+ zmq::version (&major, &minor, &patch);
+ if (major < 3) {
+ d_timeout = timeout*1000;
+ }
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->bind (address);
+
+ message_port_register_in(pmt::mp("in"));
+ set_msg_handler( pmt::mp("in"),
+ boost::bind(&push_msg_sink_impl::handler, this, _1));
+ }
+
+ push_msg_sink_impl::~push_msg_sink_impl()
+ {
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+ }
+
+ void push_msg_sink_impl::handler(pmt::pmt_t msg){
+ std::stringbuf sb("");
+ pmt::serialize( msg, sb );
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
+ memcpy( zmsg.data(), s.c_str(), s.size() );
+ d_socket->send(zmsg);
+ }
+
+ int
+ push_msg_sink_impl::work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+ {
+ return noutput_items;
+
+/* const char *in = (const char *) input_items[0];
+
+ zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
+ zmq::poll (&itemsout[0], 1, d_timeout);
+
+ // If we got a reply, process
+ if (itemsout[0].revents & ZMQ_POLLOUT) {
+
+ // encode the current offset, # tags, and tags into header
+ std::string header("");
+
+ if(d_pass_tags){
+ uint64_t offset = nitems_read(0);
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
+ header = gen_tag_header( offset, tags );
+ }
+
+ // create message copy and send
+ zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
+ if(d_pass_tags)
+ memcpy((void*) msg.data(), header.c_str(), header.length() );
+ memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items);
+ d_socket->send(msg);
+
+ return noutput_items;
+ }
+ else {
+ return 0;
+ }*/
+ }
+
+ } /* namespace zeromq */
+} /* namespace gr */
diff --git a/gr-zeromq/lib/push_msg_sink_impl.h b/gr-zeromq/lib/push_msg_sink_impl.h
new file mode 100644
index 0000000000..b77c998506
--- /dev/null
+++ b/gr-zeromq/lib/push_msg_sink_impl.h
@@ -0,0 +1,52 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_PUSH_MSG_SINK_IMPL_H
+#define INCLUDED_ZEROMQ_PUSH_MSG_SINK_IMPL_H
+
+#include <gnuradio/zeromq/push_msg_sink.h>
+#include <zmq.hpp>
+
+namespace gr {
+ namespace zeromq {
+
+ class push_msg_sink_impl : public push_msg_sink
+ {
+ private:
+ float d_timeout;
+ zmq::context_t *d_context;
+ zmq::socket_t *d_socket;
+
+ public:
+ push_msg_sink_impl(char *address, int timeout);
+ ~push_msg_sink_impl();
+
+ int work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+ void handler(pmt::pmt_t msg);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_ZMQ_PUSH_MSG_SINK_IMPL_H */
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index d949a7f95f..4cc9ab9c2a 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -26,22 +26,23 @@
#include <gnuradio/io_signature.h>
#include "push_sink_impl.h"
+#include "tag_headers.h"
namespace gr {
namespace zeromq {
push_sink::sptr
- push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
{
return gnuradio::get_initial_sptr
- (new push_sink_impl(itemsize, vlen, address, timeout));
+ (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags));
}
- push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout)
+ push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
: gr::sync_block("push_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -74,10 +75,23 @@ namespace gr {
// If we got a reply, process
if (itemsout[0].revents & ZMQ_POLLOUT) {
- // create message copy and send
- zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- d_socket->send(msg);
+
+ // encode the current offset, # tags, and tags into header
+ std::string header("");
+
+ if(d_pass_tags){
+ uint64_t offset = nitems_read(0);
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
+ header = gen_tag_header( offset, tags );
+ }
+
+ // create message copy and send
+ zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items);
+ if(d_pass_tags)
+ memcpy((void*) msg.data(), header.c_str(), header.length() );
+ memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items);
+ d_socket->send(msg);
return noutput_items;
}
diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h
index 9a10065eba..924dee3f15 100644
--- a/gr-zeromq/lib/push_sink_impl.h
+++ b/gr-zeromq/lib/push_sink_impl.h
@@ -37,9 +37,10 @@ namespace gr {
float d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags);
~push_sink_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc
new file mode 100644
index 0000000000..0a18a8b3d2
--- /dev/null
+++ b/gr-zeromq/lib/rep_msg_sink_impl.cc
@@ -0,0 +1,141 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gnuradio/io_signature.h>
+#include "rep_msg_sink_impl.h"
+#include "tag_headers.h"
+
+namespace gr {
+ namespace zeromq {
+
+ rep_msg_sink::sptr
+ rep_msg_sink::make(char *address, int timeout)
+ {
+ return gnuradio::get_initial_sptr
+ (new rep_msg_sink_impl(address, timeout));
+ }
+
+ rep_msg_sink_impl::rep_msg_sink_impl(char *address, int timeout)
+ : gr::sync_block("rep_msg_sink",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
+ {
+ int major, minor, patch;
+ zmq::version (&major, &minor, &patch);
+ if (major < 3) {
+ d_timeout = timeout*1000;
+ }
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->bind (address);
+
+ message_port_register_in(pmt::mp("in"));
+// set_msg_handler( pmt::mp("in"),
+// boost::bind(&rep_msg_sink_impl::handler, this, _1));
+ }
+
+ rep_msg_sink_impl::~rep_msg_sink_impl()
+ {
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+ }
+
+ bool rep_msg_sink_impl::start(){
+ d_finished = false;
+ d_thread = new boost::thread( boost::bind( &rep_msg_sink_impl::readloop , this ) );
+ return true;
+ }
+
+ bool rep_msg_sink_impl::stop(){
+ d_finished = true;
+ d_thread->join();
+ return true;
+ }
+
+/*
+ void rep_msg_sink_impl::handler(pmt::pmt_t msg){
+ std::stringbuf sb("");
+ pmt::serialize( msg, sb );
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
+ memcpy( zmsg.data(), s.c_str(), s.size() );
+ d_socket->send(zmsg);
+ }
+*/
+
+ int
+ rep_msg_sink_impl::work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+ {
+ return noutput_items;
+ }
+
+ void rep_msg_sink_impl::readloop(){
+
+ while(!d_finished){
+
+ // while we have data, wait for query...
+ while(!empty_p(pmt::mp("in"))){
+
+ //std::cout << "wait for req ...\n";
+ // wait for query...
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll (&items[0], 1, d_timeout);
+
+ // If we got a reply, process
+ if (items[0].revents & ZMQ_POLLIN) {
+ //std::cout << "wait for req ... got req\n";
+ // receive data request
+ zmq::message_t request;
+ d_socket->recv(&request);
+ int req_output_items = *(static_cast<int*>(request.data()));
+ if(req_output_items != 1)
+ throw std::runtime_error("Request was not 1 msg for rep/req request!!");
+
+ // create message copy and send
+ //std::cout << "get pmt in\n";
+ pmt::pmt_t msg = delete_head_nowait(pmt::mp("in"));
+ std::stringbuf sb("");
+ pmt::serialize( msg, sb );
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
+ memcpy( zmsg.data(), s.c_str(), s.size() );
+ //std::cout << "send pmt zmq\n";
+ d_socket->send(zmsg);
+ } // if req
+
+ } // while !empty
+
+ } // while !d_finished
+
+ }
+ } /* namespace zeromq */
+} /* namespace gr */
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.h b/gr-zeromq/lib/rep_msg_sink_impl.h
new file mode 100644
index 0000000000..25bd0e87a0
--- /dev/null
+++ b/gr-zeromq/lib/rep_msg_sink_impl.h
@@ -0,0 +1,56 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_REP_MSG_SINK_IMPL_H
+#define INCLUDED_ZEROMQ_REP_MSG_SINK_IMPL_H
+
+#include <gnuradio/zeromq/rep_msg_sink.h>
+#include <zmq.hpp>
+
+namespace gr {
+ namespace zeromq {
+
+ class rep_msg_sink_impl : public rep_msg_sink
+ {
+ private:
+ int d_timeout;
+ zmq::context_t *d_context;
+ zmq::socket_t *d_socket;
+ boost::thread *d_thread;
+ bool d_finished;
+ void readloop();
+
+ public:
+ rep_msg_sink_impl(char *address, int timeout);
+ ~rep_msg_sink_impl();
+
+ int work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+ bool start();
+ bool stop();
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_REP_MSG_SINK_IMPL_H */
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index a8fd5881e5..88ed6c11c0 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -26,22 +26,23 @@
#include <gnuradio/io_signature.h>
#include "rep_sink_impl.h"
+#include "tag_headers.h"
namespace gr {
namespace zeromq {
rep_sink::sptr
- rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
{
return gnuradio::get_initial_sptr
- (new rep_sink_impl(itemsize, vlen, address, timeout));
+ (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags));
}
- rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout)
+ rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
: gr::sync_block("rep_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -78,22 +79,25 @@ namespace gr {
zmq::message_t request;
d_socket->recv(&request);
int req_output_items = *(static_cast<int*>(request.data()));
+ int nitems_send = std::min(noutput_items, req_output_items);
+
+ // encode the current offset, # tags, and tags into header
+ std::string header("");
+ if(d_pass_tags){
+ uint64_t offset = nitems_read(0);
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
+ header = gen_tag_header( offset, tags );
+ }
// create message copy and send
- if (noutput_items < req_output_items) {
- zmq::message_t msg(d_itemsize*d_vlen*noutput_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- d_socket->send(msg);
+ zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send);
+ if(d_pass_tags)
+ memcpy((void*) msg.data(), header.c_str(), header.length() );
+ memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*nitems_send);
+ d_socket->send(msg);
- return noutput_items;
- }
- else {
- zmq::message_t msg(d_itemsize*d_vlen*req_output_items);
- memcpy((void *)msg.data(), in, d_itemsize*d_vlen*req_output_items);
- d_socket->send(msg);
-
- return req_output_items;
- }
+ return nitems_send;
}
return 0;
diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h
index ff69735757..55ebb69bfa 100644
--- a/gr-zeromq/lib/rep_sink_impl.h
+++ b/gr-zeromq/lib/rep_sink_impl.h
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2013,204 Free Software Foundation, Inc.
+ * Copyright 2013,2014 Free Software Foundation, Inc.
*
* This file is part of GNU Radio.
*
@@ -37,9 +37,10 @@ namespace gr {
int d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags);
~rep_sink_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc
new file mode 100644
index 0000000000..2dda15208f
--- /dev/null
+++ b/gr-zeromq/lib/req_msg_source_impl.cc
@@ -0,0 +1,132 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gnuradio/io_signature.h>
+#include "req_msg_source_impl.h"
+#include "tag_headers.h"
+
+namespace gr {
+ namespace zeromq {
+
+ req_msg_source::sptr
+ req_msg_source::make(char *address, int timeout)
+ {
+ return gnuradio::get_initial_sptr
+ (new req_msg_source_impl(address, timeout));
+ }
+
+ req_msg_source_impl::req_msg_source_impl(char *address, int timeout)
+ : gr::sync_block("req_msg_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
+ {
+ int major, minor, patch;
+ zmq::version (&major, &minor, &patch);
+ if (major < 3) {
+ d_timeout = timeout*1000;
+ }
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->connect (address);
+
+ message_port_register_out(pmt::mp("out"));
+ }
+
+ req_msg_source_impl::~req_msg_source_impl()
+ {
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+ }
+
+ bool req_msg_source_impl::start(){
+ d_finished = false;
+ d_thread = new boost::thread( boost::bind( &req_msg_source_impl::readloop , this ) );
+ return true;
+ }
+
+ bool req_msg_source_impl::stop(){
+ d_finished = true;
+ d_thread->join();
+ return true;
+ }
+
+ void req_msg_source_impl::readloop(){
+ while(!d_finished){
+ //std::cout << "readloop\n";
+
+ zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
+ zmq::poll (&itemsout[0], 1, d_timeout);
+
+ // If we got a reply, process
+ if (itemsout[0].revents & ZMQ_POLLOUT) {
+ // Request data, FIXME non portable?
+ int nmsg = 1;
+ zmq::message_t request(sizeof(int));
+ memcpy ((void *) request.data (), &nmsg, sizeof(int));
+ d_socket->send(request);
+ //std::cout << "sent request...\n";
+ }
+
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll (&items[0], 1, d_timeout);
+ //std::cout << "rx response...\n";
+
+ // If we got a reply, process
+ if (items[0].revents & ZMQ_POLLIN) {
+ //std::cout << "rx response... got data\n";
+
+ // Receive data
+ zmq::message_t msg;
+ d_socket->recv(&msg);
+
+ //std::cout << "got msg...\n";
+
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ //std::cout << m << "\n";
+ message_port_pub(pmt::mp("out"), m);
+
+ } else {
+ usleep(100);
+ }
+ }
+ }
+
+ int
+ req_msg_source_impl::work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+ {
+ return noutput_items;
+ }
+
+ } /* namespace zeromq */
+} /* namespace gr */
diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h
new file mode 100644
index 0000000000..3a691743b5
--- /dev/null
+++ b/gr-zeromq/lib/req_msg_source_impl.h
@@ -0,0 +1,56 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_REQ_MSG_SOURCE_IMPL_H
+#define INCLUDED_ZEROMQ_REQ_MSG_SOURCE_IMPL_H
+
+#include <gnuradio/zeromq/req_msg_source.h>
+#include <zmq.hpp>
+
+namespace gr {
+ namespace zeromq {
+
+ class req_msg_source_impl : public req_msg_source
+ {
+ private:
+ int d_timeout;
+ zmq::context_t *d_context;
+ zmq::socket_t *d_socket;
+ void readloop();
+ boost::thread *d_thread;
+
+ public:
+ req_msg_source_impl(char *address, int timeout);
+ ~req_msg_source_impl();
+
+ bool start();
+ bool stop();
+ bool d_finished;
+ int work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_REQ_MSG_SOURCE_IMPL_H */
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index 54cd659972..5c5071e15e 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2013 Free Software Foundation, Inc.
+ * Copyright 2013,2014 Free Software Foundation, Inc.
*
* This file is part of GNU Radio.
*
@@ -26,22 +26,23 @@
#include <gnuradio/io_signature.h>
#include "req_source_impl.h"
+#include "tag_headers.h"
namespace gr {
namespace zeromq {
req_source::sptr
- req_source::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
{
return gnuradio::get_initial_sptr
- (new req_source_impl(itemsize, vlen, address, timeout));
+ (new req_source_impl(itemsize, vlen, address, timeout, pass_tags));
}
- req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout)
+ req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
: gr::sync_block("req_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -89,9 +90,23 @@ namespace gr {
zmq::message_t reply;
d_socket->recv(&reply);
+ // Deserialize header data / tags
+ std::string buf(static_cast<char*>(reply.data()), reply.size());
+
+ if(d_pass_tags){
+ uint64_t rcv_offset;
+ std::vector<gr::tag_t> tags;
+ buf = parse_tag_header(buf, rcv_offset, tags);
+ for(size_t i=0; i<tags.size(); i++){
+ tags[i].offset -= rcv_offset - nitems_written(0);
+ add_item_tag(0, tags[i]);
+ }
+ }
+
+
// Copy to ouput buffer and return
- memcpy(out, (void *)reply.data(), reply.size());
- return reply.size()/(d_itemsize*d_vlen);
+ memcpy(out, (void *)&buf[0], buf.size());
+ return buf.size()/(d_itemsize*d_vlen);
}
return 0;
diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h
index f61b1d1ce4..7c6bc5310a 100644
--- a/gr-zeromq/lib/req_source_impl.h
+++ b/gr-zeromq/lib/req_source_impl.h
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2013 Free Software Foundation, Inc.
+ * Copyright 2013,2014 Free Software Foundation, Inc.
*
* This file is part of GNU Radio.
*
@@ -37,9 +37,10 @@ namespace gr {
int d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags);
~req_source_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc
new file mode 100644
index 0000000000..f7a9bc9c26
--- /dev/null
+++ b/gr-zeromq/lib/sub_msg_source_impl.cc
@@ -0,0 +1,117 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gnuradio/io_signature.h>
+#include "sub_msg_source_impl.h"
+#include "tag_headers.h"
+
+namespace gr {
+ namespace zeromq {
+
+ sub_msg_source::sptr
+ sub_msg_source::make(char *address, int timeout)
+ {
+ return gnuradio::get_initial_sptr
+ (new sub_msg_source_impl(address, timeout));
+ }
+
+ sub_msg_source_impl::sub_msg_source_impl(char *address, int timeout)
+ : gr::sync_block("sub_msg_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
+ {
+ int major, minor, patch;
+ zmq::version (&major, &minor, &patch);
+ if (major < 3) {
+ d_timeout = timeout*1000;
+ }
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
+ //int time = 0;
+ d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
+ d_socket->connect (address);
+
+ message_port_register_out(pmt::mp("out"));
+ }
+
+ sub_msg_source_impl::~sub_msg_source_impl()
+ {
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+ }
+
+ bool sub_msg_source_impl::start(){
+ d_finished = false;
+ d_thread = new boost::thread( boost::bind( &sub_msg_source_impl::readloop , this ) );
+ return true;
+ }
+
+ bool sub_msg_source_impl::stop(){
+ d_finished = true;
+ d_thread->join();
+ return true;
+ }
+
+ void sub_msg_source_impl::readloop(){
+ while(!d_finished){
+ //std::cout << "readloop\n";
+
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll (&items[0], 1, d_timeout);
+
+ // If we got a reply, process
+ if (items[0].revents & ZMQ_POLLIN) {
+
+ // Receive data
+ zmq::message_t msg;
+ d_socket->recv(&msg);
+
+ //std::cout << "got msg...\n";
+
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ //std::cout << m << "\n";
+ message_port_pub(pmt::mp("out"), m);
+
+ } else {
+ usleep(100);
+ }
+ }
+ }
+
+ int
+ sub_msg_source_impl::work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+ {
+ return noutput_items;
+ }
+
+ } /* namespace zeromq */
+} /* namespace gr */
diff --git a/gr-zeromq/lib/sub_msg_source_impl.h b/gr-zeromq/lib/sub_msg_source_impl.h
new file mode 100644
index 0000000000..424eb470ad
--- /dev/null
+++ b/gr-zeromq/lib/sub_msg_source_impl.h
@@ -0,0 +1,56 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013,2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_ZEROMQ_SUB_MSG_SOURCE_IMPL_H
+#define INCLUDED_ZEROMQ_SUB_MSG_SOURCE_IMPL_H
+
+#include <gnuradio/zeromq/sub_msg_source.h>
+#include "zmq.hpp"
+
+namespace gr {
+ namespace zeromq {
+
+ class sub_msg_source_impl : public sub_msg_source
+ {
+ private:
+ int d_timeout; // microseconds, -1 is blocking
+ zmq::context_t *d_context;
+ zmq::socket_t *d_socket;
+ void readloop();
+ boost::thread *d_thread;
+
+ public:
+ sub_msg_source_impl(char *address, int timeout);
+ ~sub_msg_source_impl();
+
+ bool start();
+ bool stop();
+ bool d_finished;
+ int work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+ };
+
+ } // namespace zeromq
+} // namespace gr
+
+#endif /* INCLUDED_ZEROMQ_SUB_MSG_SOURCE_IMPL_H */
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 38ddc78e59..813ff5a1c0 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -26,22 +26,23 @@
#include <gnuradio/io_signature.h>
#include "sub_source_impl.h"
+#include "tag_headers.h"
namespace gr {
namespace zeromq {
sub_source::sptr
- sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
{
return gnuradio::get_initial_sptr
- (new sub_source_impl(itemsize, vlen, address, timeout));
+ (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags));
}
- sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout)
+ sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
: gr::sync_block("sub_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -81,16 +82,31 @@ namespace gr {
// Receive data
zmq::message_t msg;
d_socket->recv(&msg);
- // Copy to ouput buffer and return
- if (msg.size() >= d_itemsize*d_vlen*noutput_items) {
- memcpy(out, (void *)msg.data(), d_itemsize*d_vlen*noutput_items);
+ // Deserialize header data / tags
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+
+ if(d_pass_tags){
+ uint64_t rcv_offset;
+ std::vector<gr::tag_t> tags;
+ //int olen = buf.size();
+ buf = parse_tag_header(buf, rcv_offset, tags);
+ //std::cout << "SUB: Header Len = " << olen - buf.size() << ", data len = " << buf.size() << "\n";
+ for(size_t i=0; i<tags.size(); i++){
+ //std::cout << "add item tag ... (offset = " << tags[i].offset << " rcv_offset = " << rcv_offset << " nitems_read(0) = " << nitems_written(0) << "\n";
+ tags[i].offset -= rcv_offset - nitems_written(0);
+ add_item_tag(0, tags[i]);
+ }
+ }
+
+ // Copy to ouput buffer and return
+ if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
+ memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
return noutput_items;
}
else {
- memcpy(out, (void *)msg.data(), msg.size());
-
- return msg.size()/(d_itemsize*d_vlen);
+ memcpy(out, (void *)&buf[0], buf.size());
+ return buf.size()/(d_itemsize*d_vlen);
}
}
else {
diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h
index 44647527b1..0fa8d179cd 100644
--- a/gr-zeromq/lib/sub_source_impl.h
+++ b/gr-zeromq/lib/sub_source_impl.h
@@ -37,9 +37,10 @@ namespace gr {
int d_timeout; // microseconds, -1 is blocking
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags);
~sub_source_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/tag_headers.cc b/gr-zeromq/lib/tag_headers.cc
new file mode 100644
index 0000000000..c97066629c
--- /dev/null
+++ b/gr-zeromq/lib/tag_headers.cc
@@ -0,0 +1,101 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include <gnuradio/io_signature.h>
+#include <gnuradio/block.h>
+#include <sstream>
+#include <cstring>
+
+#define GR_HEADER_MAGIC 0x5FF0
+#define GR_HEADER_VERSION 0x01
+
+namespace gr {
+ namespace zeromq {
+
+ std::string
+ gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags) {
+
+ uint16_t header_magic = GR_HEADER_MAGIC;
+ uint8_t header_version = GR_HEADER_VERSION;
+
+ std::stringstream ss;
+ size_t ntags = tags.size();
+ ss.write( reinterpret_cast< const char* >( &header_magic ), sizeof(uint16_t) );
+ ss.write( reinterpret_cast< const char* >( &header_version ), sizeof(uint8_t) );
+
+ ss.write( reinterpret_cast< const char* >( &offset ), sizeof(uint64_t) ); // offset
+ ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) ); // num tags
+ std::stringbuf sb("");
+ //std::cout << "TX TAGS: (offset="<<offset<<" ntags="<<ntags<<")\n";
+ for(size_t i=0; i<tags.size(); i++){
+ //std::cout << "TX TAG: (" << tags[i].offset << ", " << tags[i].key << ", " << tags[i].value << ", " << tags[i].srcid << ")\n";
+ ss.write( reinterpret_cast< const char* >( &tags[i].offset ), sizeof(uint64_t) ); // offset
+ sb.str("");
+ pmt::serialize( tags[i].key, sb ); // key
+ pmt::serialize( tags[i].value, sb ); // value
+ pmt::serialize( tags[i].srcid, sb ); // srcid
+ ss.write( sb.str().c_str() , sb.str().length() );
+ }
+
+ return ss.str();
+ }
+
+ std::string
+ parse_tag_header(std::string &buf_in, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out) {
+
+ std::istringstream iss( buf_in );
+ size_t rcv_ntags;
+
+ uint16_t header_magic;
+ uint8_t header_version;
+
+ iss.read( (char*)&header_magic, sizeof(uint16_t ) );
+ iss.read( (char*)&header_version, sizeof(uint8_t ) );
+ if(header_magic != GR_HEADER_MAGIC){
+ throw std::runtime_error("gr header magic does not match!");
+ }
+ if(header_version != 1){
+ throw std::runtime_error("gr header version too high!");
+ }
+
+ iss.read( (char*)&offset_out, sizeof(uint64_t ) );
+ iss.read( (char*)&rcv_ntags, sizeof(size_t ) );
+ //std::cout << "RX TAGS: (offset="<<offset_out<<" ntags="<<rcv_ntags<<")\n";
+ int rd_offset = sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + sizeof(size_t);
+ std::stringbuf sb( iss.str().substr(rd_offset) );
+
+ for(size_t i=0; i<rcv_ntags; i++){
+ gr::tag_t newtag;
+ sb.sgetn( (char*) &(newtag.offset), sizeof(uint64_t) );
+ newtag.key = pmt::deserialize( sb );
+ newtag.value = pmt::deserialize( sb );
+ newtag.srcid = pmt::deserialize( sb );
+ //std::cout << "RX TAG: (" << newtag.offset << ", " << newtag.key << ", " << newtag.value << ", " << newtag.srcid << ")\n";
+ tags_out.push_back(newtag);
+ iss.str(sb.str());
+ }
+
+ int ndata = sb.in_avail();
+ return iss.str().substr(iss.str().size() - ndata);
+ }
+ } /* namespace zeromq */
+} /* namespace gr */
diff --git a/gr-zeromq/lib/tag_headers.h b/gr-zeromq/lib/tag_headers.h
new file mode 100644
index 0000000000..4c7a81238e
--- /dev/null
+++ b/gr-zeromq/lib/tag_headers.h
@@ -0,0 +1,40 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio.
+ *
+ * This is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this software; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef ZEROMQ_TAG_HEADERS_H
+#define ZEROMQ_TAG_HEADERS_H
+
+#include <gnuradio/io_signature.h>
+#include <gnuradio/block.h>
+#include <sstream>
+#include <cstring>
+
+namespace gr {
+ namespace zeromq {
+
+ std::string gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags);
+ std::string parse_tag_header(std::string &buf_in, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out);
+
+ } /* namespace zeromq */
+} /* namespace gr */
+
+#endif /* ZEROMQ_TAG_HEADERS_H */
diff --git a/gr-zeromq/swig/zeromq_swig.i b/gr-zeromq/swig/zeromq_swig.i
index bcd90550d4..78669bdc69 100644
--- a/gr-zeromq/swig/zeromq_swig.i
+++ b/gr-zeromq/swig/zeromq_swig.i
@@ -29,23 +29,41 @@
%{
#include "gnuradio/zeromq/pub_sink.h"
+#include "gnuradio/zeromq/pub_msg_sink.h"
#include "gnuradio/zeromq/push_sink.h"
+#include "gnuradio/zeromq/push_msg_sink.h"
#include "gnuradio/zeromq/rep_sink.h"
+#include "gnuradio/zeromq/rep_msg_sink.h"
#include "gnuradio/zeromq/sub_source.h"
+#include "gnuradio/zeromq/sub_msg_source.h"
#include "gnuradio/zeromq/pull_source.h"
+#include "gnuradio/zeromq/pull_msg_source.h"
#include "gnuradio/zeromq/req_source.h"
+#include "gnuradio/zeromq/req_msg_source.h"
%}
%include "gnuradio/zeromq/pub_sink.h"
+%include "gnuradio/zeromq/pub_msg_sink.h"
%include "gnuradio/zeromq/push_sink.h"
+%include "gnuradio/zeromq/push_msg_sink.h"
%include "gnuradio/zeromq/rep_sink.h"
+%include "gnuradio/zeromq/rep_msg_sink.h"
%include "gnuradio/zeromq/sub_source.h"
+%include "gnuradio/zeromq/sub_msg_source.h"
%include "gnuradio/zeromq/pull_source.h"
+%include "gnuradio/zeromq/pull_msg_source.h"
%include "gnuradio/zeromq/req_source.h"
+%include "gnuradio/zeromq/req_msg_source.h"
GR_SWIG_BLOCK_MAGIC2(zeromq, pub_sink);
+GR_SWIG_BLOCK_MAGIC2(zeromq, pub_msg_sink);
GR_SWIG_BLOCK_MAGIC2(zeromq, push_sink);
+GR_SWIG_BLOCK_MAGIC2(zeromq, push_msg_sink);
GR_SWIG_BLOCK_MAGIC2(zeromq, rep_sink);
+GR_SWIG_BLOCK_MAGIC2(zeromq, rep_msg_sink);
GR_SWIG_BLOCK_MAGIC2(zeromq, sub_source);
+GR_SWIG_BLOCK_MAGIC2(zeromq, sub_msg_source);
GR_SWIG_BLOCK_MAGIC2(zeromq, pull_source);
+GR_SWIG_BLOCK_MAGIC2(zeromq, pull_msg_source);
GR_SWIG_BLOCK_MAGIC2(zeromq, req_source);
+GR_SWIG_BLOCK_MAGIC2(zeromq, req_msg_source);