diff options
54 files changed, 1948 insertions, 77 deletions
diff --git a/gr-zeromq/grc/CMakeLists.txt b/gr-zeromq/grc/CMakeLists.txt index 548c2f28ed..7807bcfe9d 100644 --- a/gr-zeromq/grc/CMakeLists.txt +++ b/gr-zeromq/grc/CMakeLists.txt @@ -19,11 +19,17 @@ install(FILES zeromq_pub_sink.xml + zeromq_pub_msg_sink.xml zeromq_sub_source.xml + zeromq_sub_msg_source.xml zeromq_push_sink.xml + zeromq_push_msg_sink.xml zeromq_pull_source.xml + zeromq_pull_msg_source.xml zeromq_rep_sink.xml + zeromq_rep_msg_sink.xml zeromq_req_source.xml + zeromq_req_msg_source.xml DESTINATION share/gnuradio/grc/blocks ) diff --git a/gr-zeromq/grc/zeromq_pub_msg_sink.xml b/gr-zeromq/grc/zeromq_pub_msg_sink.xml new file mode 100644 index 0000000000..8f541ce731 --- /dev/null +++ b/gr-zeromq/grc/zeromq_pub_msg_sink.xml @@ -0,0 +1,28 @@ +<?xml version="1.0"?> +<block> + <name>ZMQ PUB Message Sink</name> + <key>zeromq_pub_msg_sink</key> + <category>ZeroMQ Interfaces</category> + <import>from gnuradio import zeromq</import> + <make>zeromq.pub_msg_sink($address, $timeout)</make> + + <param> + <name>Address</name> + <key>address</key> + <type>string</type> + </param> + + <param> + <name>Timeout (msec)</name> + <key>timeout</key> + <value>100</value> + <type>float</type> + </param> + + <sink> + <name>in</name> + <type>message</type> + <optional>1</optional> + </sink> + +</block> diff --git a/gr-zeromq/grc/zeromq_pub_sink.xml b/gr-zeromq/grc/zeromq_pub_sink.xml index e78cd58b14..7babc9eb80 100644 --- a/gr-zeromq/grc/zeromq_pub_sink.xml +++ b/gr-zeromq/grc/zeromq_pub_sink.xml @@ -4,7 +4,7 @@ <key>zeromq_pub_sink</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout)</make> + <make>zeromq.pub_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> <param> <name>IO Type</name> @@ -57,6 +57,13 @@ <type>float</type> </param> + <param> + <name>Pass Tags</name> + <key>pass_tags</key> + <value>False</value> + <type>bool</type> + </param> + <sink> <name>in</name> <type>$type</type> diff --git a/gr-zeromq/grc/zeromq_pull_msg_source.xml b/gr-zeromq/grc/zeromq_pull_msg_source.xml new file mode 100644 index 0000000000..c0a6ca5a17 --- /dev/null +++ b/gr-zeromq/grc/zeromq_pull_msg_source.xml @@ -0,0 +1,28 @@ +<?xml version="1.0"?> +<block> + <name>ZMQ PULL Message Source</name> + <key>zeromq_pull_msg_source</key> + <category>ZeroMQ Interfaces</category> + <import>from gnuradio import zeromq</import> + <make>zeromq.pull_msg_source($address, $timeout)</make> + + <param> + <name>Address</name> + <key>address</key> + <type>string</type> + </param> + + <param> + <name>Timeout (msec)</name> + <key>timeout</key> + <value>100</value> + <type>float</type> + </param> + + <source> + <name>out</name> + <type>message</type> + <optional>1</optional> + </source> + +</block> diff --git a/gr-zeromq/grc/zeromq_pull_source.xml b/gr-zeromq/grc/zeromq_pull_source.xml index 3833cd7341..c8a7b890da 100644 --- a/gr-zeromq/grc/zeromq_pull_source.xml +++ b/gr-zeromq/grc/zeromq_pull_source.xml @@ -4,7 +4,7 @@ <key>zeromq_pull_source</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout)</make> + <make>zeromq.pull_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> <param> <name>IO Type</name> @@ -57,6 +57,13 @@ <type>float</type> </param> + <param> + <name>Pass Tags</name> + <key>pass_tags</key> + <value>False</value> + <type>bool</type> + </param> + <source> <name>out</name> <type>$type</type> diff --git a/gr-zeromq/grc/zeromq_push_msg_sink.xml b/gr-zeromq/grc/zeromq_push_msg_sink.xml new file mode 100644 index 0000000000..65626c0761 --- /dev/null +++ b/gr-zeromq/grc/zeromq_push_msg_sink.xml @@ -0,0 +1,28 @@ +<?xml version="1.0"?> +<block> + <name>ZMQ PUSH Message Sink</name> + <key>zeromq_push_msg_sink</key> + <category>ZeroMQ Interfaces</category> + <import>from gnuradio import zeromq</import> + <make>zeromq.push_msg_sink($address, $timeout)</make> + + <param> + <name>Address</name> + <key>address</key> + <type>string</type> + </param> + + <param> + <name>Timeout (msec)</name> + <key>timeout</key> + <value>100</value> + <type>float</type> + </param> + + <sink> + <name>in</name> + <type>message</type> + <optional>1</optional> + </sink> + +</block> diff --git a/gr-zeromq/grc/zeromq_push_sink.xml b/gr-zeromq/grc/zeromq_push_sink.xml index 54bb8d0d98..eb6ead5c65 100644 --- a/gr-zeromq/grc/zeromq_push_sink.xml +++ b/gr-zeromq/grc/zeromq_push_sink.xml @@ -4,7 +4,7 @@ <key>zeromq_push_sink</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout)</make> + <make>zeromq.push_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> <param> <name>IO Type</name> @@ -57,6 +57,13 @@ <type>float</type> </param> + <param> + <name>Pass Tags</name> + <key>pass_tags</key> + <value>False</value> + <type>bool</type> + </param> + <sink> <name>in</name> <type>$type</type> diff --git a/gr-zeromq/grc/zeromq_rep_msg_sink.xml b/gr-zeromq/grc/zeromq_rep_msg_sink.xml new file mode 100644 index 0000000000..f978f442a7 --- /dev/null +++ b/gr-zeromq/grc/zeromq_rep_msg_sink.xml @@ -0,0 +1,28 @@ +<?xml version="1.0"?> +<block> + <name>ZMQ REP Message Sink</name> + <key>zeromq_rep_msg_sink</key> + <category>ZeroMQ Interfaces</category> + <import>from gnuradio import zeromq</import> + <make>zeromq.rep_msg_sink($address, $timeout)</make> + + <param> + <name>Address</name> + <key>address</key> + <type>string</type> + </param> + + <param> + <name>Timeout (msec)</name> + <key>timeout</key> + <value>100</value> + <type>float</type> + </param> + + <sink> + <name>in</name> + <type>message</type> + <optional>1</optional> + </sink> + +</block> diff --git a/gr-zeromq/grc/zeromq_rep_sink.xml b/gr-zeromq/grc/zeromq_rep_sink.xml index 1320bfef82..2209b4f3ff 100644 --- a/gr-zeromq/grc/zeromq_rep_sink.xml +++ b/gr-zeromq/grc/zeromq_rep_sink.xml @@ -4,7 +4,7 @@ <key>zeromq_rep_sink</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout)</make> + <make>zeromq.rep_sink($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> <param> <name>IO Type</name> @@ -57,6 +57,13 @@ <type>float</type> </param> + <param> + <name>Pass Tags</name> + <key>pass_tags</key> + <value>False</value> + <type>bool</type> + </param> + <sink> <name>in</name> <type>$type</type> diff --git a/gr-zeromq/grc/zeromq_req_msg_source.xml b/gr-zeromq/grc/zeromq_req_msg_source.xml new file mode 100644 index 0000000000..3ba7488223 --- /dev/null +++ b/gr-zeromq/grc/zeromq_req_msg_source.xml @@ -0,0 +1,28 @@ +<?xml version="1.0"?> +<block> + <name>ZMQ REQ Message Source</name> + <key>zeromq_req_msg_source</key> + <category>ZeroMQ Interfaces</category> + <import>from gnuradio import zeromq</import> + <make>zeromq.req_msg_source($address, $timeout)</make> + + <param> + <name>Address</name> + <key>address</key> + <type>string</type> + </param> + + <param> + <name>Timeout (msec)</name> + <key>timeout</key> + <value>100</value> + <type>float</type> + </param> + + <source> + <name>out</name> + <type>message</type> + <optional>1</optional> + </source> + +</block> diff --git a/gr-zeromq/grc/zeromq_req_source.xml b/gr-zeromq/grc/zeromq_req_source.xml index d2951d2256..050718c1bf 100644 --- a/gr-zeromq/grc/zeromq_req_source.xml +++ b/gr-zeromq/grc/zeromq_req_source.xml @@ -4,7 +4,7 @@ <key>zeromq_req_source</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout)</make> + <make>zeromq.req_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> <param> <name>IO Type</name> @@ -57,6 +57,13 @@ <type>float</type> </param> + <param> + <name>Pass Tags</name> + <key>pass_tags</key> + <value>False</value> + <type>bool</type> + </param> + <source> <name>out</name> <type>$type</type> diff --git a/gr-zeromq/grc/zeromq_sub_msg_source.xml b/gr-zeromq/grc/zeromq_sub_msg_source.xml new file mode 100644 index 0000000000..32a1c9862b --- /dev/null +++ b/gr-zeromq/grc/zeromq_sub_msg_source.xml @@ -0,0 +1,28 @@ +<?xml version="1.0"?> +<block> + <name>ZMQ SUB Message Source</name> + <key>zeromq_sub_msg_source</key> + <category>ZeroMQ Interfaces</category> + <import>from gnuradio import zeromq</import> + <make>zeromq.sub_msg_source($address, $timeout)</make> + + <param> + <name>Address</name> + <key>address</key> + <type>string</type> + </param> + + <param> + <name>Timeout (msec)</name> + <key>timeout</key> + <value>100</value> + <type>float</type> + </param> + + <source> + <name>out</name> + <type>message</type> + <optional>1</optional> + </source> + +</block> diff --git a/gr-zeromq/grc/zeromq_sub_source.xml b/gr-zeromq/grc/zeromq_sub_source.xml index 2ec8cfa887..86af5063e3 100644 --- a/gr-zeromq/grc/zeromq_sub_source.xml +++ b/gr-zeromq/grc/zeromq_sub_source.xml @@ -4,7 +4,7 @@ <key>zeromq_sub_source</key> <category>ZeroMQ Interfaces</category> <import>from gnuradio import zeromq</import> - <make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout)</make> + <make>zeromq.sub_source($type.itemsize, $vlen, $address, $timeout, $pass_tags)</make> <param> <name>IO Type</name> @@ -57,6 +57,13 @@ <type>float</type> </param> + <param> + <name>Pass Tags</name> + <key>pass_tags</key> + <value>False</value> + <type>bool</type> + </param> + <source> <name>out</name> <type>$type</type> diff --git a/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt b/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt index 970cf1ce9a..d365532003 100644 --- a/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt +++ b/gr-zeromq/include/gnuradio/zeromq/CMakeLists.txt @@ -23,11 +23,17 @@ install(FILES api.h pub_sink.h + pub_msg_sink.h sub_source.h + sub_msg_source.h pull_source.h + pull_msg_source.h push_sink.h + push_msg_sink.h rep_sink.h + rep_msg_sink.h req_source.h + req_msg_source.h DESTINATION ${GR_INCLUDE_DIR}/gnuradio/zeromq COMPONENT "zeromq_devel" diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h new file mode 100644 index 0000000000..d626ddd147 --- /dev/null +++ b/gr-zeromq/include/gnuradio/zeromq/pub_msg_sink.h @@ -0,0 +1,60 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_PUB_MSG_SINK_H +#define INCLUDED_ZEROMQ_PUB_MSG_SINK_H + +#include <gnuradio/zeromq/api.h> +#include <gnuradio/sync_block.h> + +namespace gr { + namespace zeromq { + + /*! + * \brief Sink the contents of a stream to a ZMQ PUB socket + * \ingroup zeromq + * + * \details + * This block acts a a streaming sink for a GNU Radio flowgraph + * and writes its contents to a ZMQ PUB socket. A PUB socket may + * have subscribers and will pass all incoming stream data to each + * subscriber. Subscribers can be either another gr-zeromq source + * block or a non-GNU Radio ZMQ socket. + */ + class ZEROMQ_API pub_msg_sink : virtual public gr::sync_block + { + public: + typedef boost::shared_ptr<pub_msg_sink> sptr; + + /*! + * \brief Return a shared_ptr to a new instance of zeromq::pub_msg_sink. + * + * \param address ZMQ socket address specifier + * \param timeout Receive timeout in seconds, default is 100ms, 1us increments + */ + static sptr make(char *address, int timeout=100); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_PUB_MSG_SINK_H */ diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h index a60fb15c88..11f251ede2 100644 --- a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h @@ -53,7 +53,7 @@ namespace gr { * \param address ZMQ socket address specifier * \param timeout Receive timeout in seconds, default is 100ms, 1us increments */ - static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100); + static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h new file mode 100644 index 0000000000..7d87ba1574 --- /dev/null +++ b/gr-zeromq/include/gnuradio/zeromq/pull_msg_source.h @@ -0,0 +1,58 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_PULL_MSG_SOURCE_H +#define INCLUDED_ZEROMQ_PULL_MSG_SOURCE_H + +#include <gnuradio/zeromq/api.h> +#include <gnuradio/sync_block.h> + +namespace gr { + namespace zeromq { + + /*! + * \brief Receive messages on ZMQ PULL socket and source stream + * \ingroup zeromq + * + * \details + * This block will connect to a ZMQ PUSH socket, then produce all + * incoming messages as streaming output. + */ + class ZEROMQ_API pull_msg_source : virtual public gr::sync_block + { + public: + typedef boost::shared_ptr<pull_msg_source> sptr; + + /*! + * \brief Return a shared_ptr to a new instance of gr::zeromq::pull_msg_source. + * + * \param address ZMQ socket address specifier + * \param timeout Receive timeout in seconds, default is 100ms, 1us increments + * + */ + static sptr make(char *address, int timeout=100); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_PULL_MSG_SOURCE_H */ diff --git a/gr-zeromq/include/gnuradio/zeromq/pull_source.h b/gr-zeromq/include/gnuradio/zeromq/pull_source.h index 5c1d37d353..6dec9c2b34 100644 --- a/gr-zeromq/include/gnuradio/zeromq/pull_source.h +++ b/gr-zeromq/include/gnuradio/zeromq/pull_source.h @@ -51,7 +51,7 @@ namespace gr { * \param timeout Receive timeout in seconds, default is 100ms, 1us increments * */ - static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100); + static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h new file mode 100644 index 0000000000..27a19b210b --- /dev/null +++ b/gr-zeromq/include/gnuradio/zeromq/push_msg_sink.h @@ -0,0 +1,62 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_PUSH_MSG_SINK_H +#define INCLUDED_ZEROMQ_PUSH_MSG_SINK_H + +#include <gnuradio/zeromq/api.h> +#include <gnuradio/sync_block.h> + +namespace gr { + namespace zeromq { + + /*! + * \brief Sink the contents of a stream to a ZMQ PUSH socket + * \ingroup zeromq + * + * \details + * This block acts a a streaming sink for a GNU Radio flowgraph + * and writes its contents to a ZMQ PUSH socket. A PUSH socket + * will round-robin send its messages to each connected ZMQ PULL + * socket, either another gr-zeromq source block or a regular, + * non-GNU Radio ZMQ socket. + * + */ + class ZEROMQ_API push_msg_sink : virtual public gr::sync_block + { + public: + typedef boost::shared_ptr<push_msg_sink> sptr; + + /*! + * \brief Return a shared_ptr to a new instance of gr::zeromq::push_msg_sink + * + * \param address ZMQ socket address specifier + * \param timeout Receive timeout in seconds, default is 100ms, 1us increments + * + */ + static sptr make(char *address, int timeout=100); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_PUSH_MSG_SINK_H */ diff --git a/gr-zeromq/include/gnuradio/zeromq/push_sink.h b/gr-zeromq/include/gnuradio/zeromq/push_sink.h index b54a1e40d8..1b8999e409 100644 --- a/gr-zeromq/include/gnuradio/zeromq/push_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/push_sink.h @@ -55,7 +55,7 @@ namespace gr { * \param timeout Receive timeout in seconds, default is 100ms, 1us increments * */ - static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100); + static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h b/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h new file mode 100644 index 0000000000..b0fd23c892 --- /dev/null +++ b/gr-zeromq/include/gnuradio/zeromq/rep_msg_sink.h @@ -0,0 +1,60 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_REP_MSG_SINK_H +#define INCLUDED_ZEROMQ_REP_MSG_SINK_H + +#include <gnuradio/zeromq/api.h> +#include <gnuradio/sync_block.h> + +namespace gr { + namespace zeromq { + + /*! + * \brief Sink the contents of a stream to a ZMQ REP socket + * \ingroup zeromq + * + * \details + * This block acts a a streaming sink for a GNU Radio flowgraph + * and writes its contents to a ZMQ REP socket. A REP socket will + * only send its contents to an attached REQ socket when it + * requests items. + */ + class ZEROMQ_API rep_msg_sink : virtual public gr::sync_block + { + public: + typedef boost::shared_ptr<rep_msg_sink> sptr; + + /*! + * \brief Return a shared_ptr to a new instance of zeromq::rep_msg_sink. + * + * \param address ZMQ socket address specifier + * \param timeout Receive timeout in seconds, default is 100ms, 1us increments + * + */ + static sptr make(char *address, int timeout=100); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_REP_MSG_SINK_H */ diff --git a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h index 1da325257f..6d3c47b626 100644 --- a/gr-zeromq/include/gnuradio/zeromq/rep_sink.h +++ b/gr-zeromq/include/gnuradio/zeromq/rep_sink.h @@ -53,7 +53,7 @@ namespace gr { * \param timeout Receive timeout in seconds, default is 100ms, 1us increments * */ - static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100); + static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h new file mode 100644 index 0000000000..cf9183375f --- /dev/null +++ b/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h @@ -0,0 +1,59 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_REQ_MSG_SOURCE_H +#define INCLUDED_ZEROMQ_REQ_MSG_SOURCE_H + +#include <gnuradio/zeromq/api.h> +#include <gnuradio/sync_block.h> + +namespace gr { + namespace zeromq { + + /*! + * \brief Receive messages on ZMQ REQ socket and source stream + * \ingroup zeromq + * + * \details + * This block will connect to a ZMQ REP socket, then produce all + * incoming messages as streaming output. + */ + class ZEROMQ_API req_msg_source : virtual public gr::sync_block + { + public: + typedef boost::shared_ptr<req_msg_source> sptr; + + /*! + * \brief Return a shared_ptr to a new instance of zeromq::req_msg_source. + * + * + * \param address ZMQ socket address specifier + * \param timeout Receive timeout in seconds, default is 100ms, 1us increments + * + */ + static sptr make(char *address, int timeout=100); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_SOURCE_REQREP_H */ diff --git a/gr-zeromq/include/gnuradio/zeromq/req_source.h b/gr-zeromq/include/gnuradio/zeromq/req_source.h index c272c28510..d9f55d3c72 100644 --- a/gr-zeromq/include/gnuradio/zeromq/req_source.h +++ b/gr-zeromq/include/gnuradio/zeromq/req_source.h @@ -52,7 +52,7 @@ namespace gr { * \param timeout Receive timeout in seconds, default is 100ms, 1us increments * */ - static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100); + static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false); }; } // namespace zeromq diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h new file mode 100644 index 0000000000..3af8fdbec9 --- /dev/null +++ b/gr-zeromq/include/gnuradio/zeromq/sub_msg_source.h @@ -0,0 +1,58 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_SUB_MSG_SOURCE_H +#define INCLUDED_ZEROMQ_SUB_MSG_SOURCE_H + +#include <gnuradio/zeromq/api.h> +#include <gnuradio/sync_block.h> + +namespace gr { + namespace zeromq { + + /*! + * \brief Receive messages on ZMQ SUB socket and source stream + * \ingroup zeromq + * + * \details + * This block will connect to a ZMQ PUB socket, then produce all + * incoming messages as streaming output. + */ + class ZEROMQ_API sub_msg_source : virtual public gr::sync_block + { + public: + typedef boost::shared_ptr<sub_msg_source> sptr; + + /*! + * \brief Return a shared_ptr to a new instance of gr::zeromq::sub_msg_source. + * + * \param address ZMQ socket address specifier + * \param timeout Receive timeout in seconds, default is 100ms, 1us increments + * + */ + static sptr make(char *address, int timeout=100); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_SUB_MSG_SOURCE_H */ diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_source.h b/gr-zeromq/include/gnuradio/zeromq/sub_source.h index 9deaa7f3ff..f97dc5ac2c 100644 --- a/gr-zeromq/include/gnuradio/zeromq/sub_source.h +++ b/gr-zeromq/include/gnuradio/zeromq/sub_source.h @@ -51,7 +51,7 @@ namespace gr { * \param timeout Receive timeout in seconds, default is 100ms, 1us increments * */ - static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100); + static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false); }; } // namespace zeromq diff --git a/gr-zeromq/lib/CMakeLists.txt b/gr-zeromq/lib/CMakeLists.txt index 90cbb12451..941e5ff12b 100644 --- a/gr-zeromq/lib/CMakeLists.txt +++ b/gr-zeromq/lib/CMakeLists.txt @@ -38,11 +38,18 @@ endif(ENABLE_GR_CTRLPORT) ######################################################################## list(APPEND zeromq_sources pub_sink_impl.cc + pub_msg_sink_impl.cc sub_source_impl.cc + sub_msg_source_impl.cc pull_source_impl.cc + pull_msg_source_impl.cc push_sink_impl.cc + push_msg_sink_impl.cc rep_sink_impl.cc + rep_msg_sink_impl.cc req_source_impl.cc + req_msg_source_impl.cc + tag_headers.cc ) #Add Windows DLL resource file if using MSVC diff --git a/gr-zeromq/lib/pub_msg_sink_impl.cc b/gr-zeromq/lib/pub_msg_sink_impl.cc new file mode 100644 index 0000000000..0264aef83b --- /dev/null +++ b/gr-zeromq/lib/pub_msg_sink_impl.cc @@ -0,0 +1,88 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gnuradio/io_signature.h> +#include "pub_msg_sink_impl.h" +#include "tag_headers.h" + +namespace gr { + namespace zeromq { + + pub_msg_sink::sptr + pub_msg_sink::make(char *address, int timeout) + { + return gnuradio::get_initial_sptr + (new pub_msg_sink_impl(address, timeout)); + } + + pub_msg_sink_impl::pub_msg_sink_impl(char *address, int timeout) + : gr::sync_block("pub_msg_sink", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), + d_timeout(timeout) + { + int major, minor, patch; + zmq::version (&major, &minor, &patch); + if (major < 3) { + d_timeout = timeout*1000; + } + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_PUB); + int time = 0; + d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket->bind(address); + + message_port_register_in(pmt::mp("in")); + set_msg_handler( pmt::mp("in"), + boost::bind(&pub_msg_sink_impl::handler, this, _1)); + } + + pub_msg_sink_impl::~pub_msg_sink_impl() + { + d_socket->close(); + delete d_socket; + delete d_context; + } + + void pub_msg_sink_impl::handler(pmt::pmt_t msg){ + std::stringbuf sb(""); + pmt::serialize( msg, sb ); + std::string s = sb.str(); + zmq::message_t zmsg(s.size()); + memcpy( zmsg.data(), s.c_str(), s.size() ); + d_socket->send(zmsg); + } + + int + pub_msg_sink_impl::work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) + { + return noutput_items; + } + + } /* namespace zeromq */ +} /* namespace gr */ diff --git a/gr-zeromq/lib/pub_msg_sink_impl.h b/gr-zeromq/lib/pub_msg_sink_impl.h new file mode 100644 index 0000000000..4bbf7ee944 --- /dev/null +++ b/gr-zeromq/lib/pub_msg_sink_impl.h @@ -0,0 +1,52 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_PUB_MSG_SINK_IMPL_H +#define INCLUDED_ZEROMQ_PUB_MSG_SINK_IMPL_H + +#include <gnuradio/zeromq/pub_msg_sink.h> +#include <zmq.hpp> + +namespace gr { + namespace zeromq { + + class pub_msg_sink_impl : public pub_msg_sink + { + private: + float d_timeout; + zmq::context_t *d_context; + zmq::socket_t *d_socket; + + public: + pub_msg_sink_impl(char *address, int timeout); + ~pub_msg_sink_impl(); + + int work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); + void handler(pmt::pmt_t msg); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_PUB_MSG_SINK_IMPL_H */ diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc index 13f86045d7..5afcb722b1 100644 --- a/gr-zeromq/lib/pub_sink_impl.cc +++ b/gr-zeromq/lib/pub_sink_impl.cc @@ -26,22 +26,23 @@ #include <gnuradio/io_signature.h> #include "pub_sink_impl.h" +#include "tag_headers.h" namespace gr { namespace zeromq { pub_sink::sptr - pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout) + pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) { return gnuradio::get_initial_sptr - (new pub_sink_impl(itemsize, vlen, address, timeout)); + (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags)); } - pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout) + pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) : gr::sync_block("pub_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout) + d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) { int major, minor, patch; zmq::version (&major, &minor, &patch); @@ -69,9 +70,21 @@ namespace gr { { const char *in = (const char *)input_items[0]; + // encode the current offset, # tags, and tags into header + std::string header(""); + if(d_pass_tags){ + uint64_t offset = nitems_read(0); + std::vector<gr::tag_t> tags; + get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); + header = gen_tag_header( offset, tags ); + } + // create message copy and send - zmq::message_t msg(d_itemsize*d_vlen*noutput_items); - memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items); + zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items); + //std::cout << "PUB: Header Len: " << header.length() << ", Data Length: " << d_itemsize*d_vlen*noutput_items << "\n"; + if(d_pass_tags) + memcpy((void*) msg.data(), header.c_str(), header.length() ); + memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); d_socket->send(msg); return noutput_items; diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h index 9c956ef2fa..100b0f5b8c 100644 --- a/gr-zeromq/lib/pub_sink_impl.h +++ b/gr-zeromq/lib/pub_sink_impl.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -37,9 +37,10 @@ namespace gr { float d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; + bool d_pass_tags; public: - pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout); + pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); ~pub_sink_impl(); int work(int noutput_items, diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc new file mode 100644 index 0000000000..0c848fc771 --- /dev/null +++ b/gr-zeromq/lib/pull_msg_source_impl.cc @@ -0,0 +1,118 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gnuradio/io_signature.h> +#include "pull_msg_source_impl.h" +#include "tag_headers.h" + +namespace gr { + namespace zeromq { + + pull_msg_source::sptr + pull_msg_source::make(char *address, int timeout) + { + return gnuradio::get_initial_sptr + (new pull_msg_source_impl(address, timeout)); + } + + pull_msg_source_impl::pull_msg_source_impl(char *address, int timeout) + : gr::sync_block("pull_msg_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), + d_timeout(timeout) + { + int major, minor, patch; + zmq::version (&major, &minor, &patch); + if (major < 3) { + d_timeout = timeout*1000; + } + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_PULL); + int time = 0; + d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket->connect (address); + + message_port_register_out(pmt::mp("out")); + } + + pull_msg_source_impl::~pull_msg_source_impl() + { + d_socket->close(); + delete d_socket; + delete d_context; + } + + bool pull_msg_source_impl::start(){ + d_finished = false; + d_thread = new boost::thread( boost::bind( &pull_msg_source_impl::readloop , this ) ); + return true; + } + + bool pull_msg_source_impl::stop(){ + d_finished = true; + d_thread->join(); + return true; + } + + void pull_msg_source_impl::readloop(){ + while(!d_finished){ + //std::cout << "readloop\n"; + + zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; + zmq::poll (&items[0], 1, d_timeout); + + // If we got a reply, process + if (items[0].revents & ZMQ_POLLIN) { + + // Receive data + zmq::message_t msg; + d_socket->recv(&msg); + + //std::cout << "got msg...\n"; + + std::string buf(static_cast<char*>(msg.data()), msg.size()); + std::stringbuf sb(buf); + pmt::pmt_t m = pmt::deserialize(sb); + //std::cout << m << "\n"; + message_port_pub(pmt::mp("out"), m); + + } else { + usleep(100); + } + } + } + + + int + pull_msg_source_impl::work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) + { + return noutput_items; + } + + } /* namespace zeromq */ +} /* namespace gr */ diff --git a/gr-zeromq/lib/pull_msg_source_impl.h b/gr-zeromq/lib/pull_msg_source_impl.h new file mode 100644 index 0000000000..9ff89ef154 --- /dev/null +++ b/gr-zeromq/lib/pull_msg_source_impl.h @@ -0,0 +1,56 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_PULL_MSG_SOURCE_IMPL_H +#define INCLUDED_ZEROMQ_PULL_MSG_SOURCE_IMPL_H + +#include <gnuradio/zeromq/pull_msg_source.h> +#include <zmq.hpp> + +namespace gr { + namespace zeromq { + + class pull_msg_source_impl : public pull_msg_source + { + private: + int d_timeout; // microseconds, -1 is blocking + zmq::context_t *d_context; + zmq::socket_t *d_socket; + void readloop(); + boost::thread *d_thread; + + public: + pull_msg_source_impl(char *address, int timeout); + ~pull_msg_source_impl(); + + bool start(); + bool stop(); + bool d_finished; + int work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_PULL_MSG_SOURCE_IMPL_H */ diff --git a/gr-zeromq/lib/pull_source_impl.cc b/gr-zeromq/lib/pull_source_impl.cc index 479b15bb32..87b330a862 100644 --- a/gr-zeromq/lib/pull_source_impl.cc +++ b/gr-zeromq/lib/pull_source_impl.cc @@ -26,22 +26,23 @@ #include <gnuradio/io_signature.h> #include "pull_source_impl.h" +#include "tag_headers.h" namespace gr { namespace zeromq { pull_source::sptr - pull_source::make(size_t itemsize, size_t vlen, char *address, int timeout) + pull_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) { return gnuradio::get_initial_sptr - (new pull_source_impl(itemsize, vlen, address, timeout)); + (new pull_source_impl(itemsize, vlen, address, timeout, pass_tags)); } - pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout) + pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) : gr::sync_block("pull_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout) + d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) { int major, minor, patch; zmq::version (&major, &minor, &patch); @@ -81,16 +82,30 @@ namespace gr { // Receive data zmq::message_t msg; d_socket->recv(&msg); + + // check header for tags... + std::string buf(static_cast<char*>(msg.data()), msg.size()); + if(d_pass_tags){ + uint64_t rcv_offset; + std::vector<gr::tag_t> tags; + buf = parse_tag_header(buf, rcv_offset, tags); + for(size_t i=0; i<tags.size(); i++){ + tags[i].offset -= rcv_offset - nitems_written(0); + add_item_tag(0, tags[i]); + } + } + + // Copy to ouput buffer and return - if (msg.size() >= d_itemsize*d_vlen*noutput_items) { - memcpy(out, (void *)msg.data(), d_itemsize*d_vlen*noutput_items); + if (buf.size() >= d_itemsize*d_vlen*noutput_items) { + memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items); return noutput_items; } else { - memcpy(out, (void *)msg.data(), msg.size()); + memcpy(out, (void *)&buf[0], buf.size()); - return msg.size()/(d_itemsize*d_vlen); + return buf.size()/(d_itemsize*d_vlen); } } else { diff --git a/gr-zeromq/lib/pull_source_impl.h b/gr-zeromq/lib/pull_source_impl.h index e69de81dcd..757867998d 100644 --- a/gr-zeromq/lib/pull_source_impl.h +++ b/gr-zeromq/lib/pull_source_impl.h @@ -37,9 +37,10 @@ namespace gr { int d_timeout; // microseconds, -1 is blocking zmq::context_t *d_context; zmq::socket_t *d_socket; + bool d_pass_tags; public: - pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout); + pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); ~pull_source_impl(); int work(int noutput_items, diff --git a/gr-zeromq/lib/push_msg_sink_impl.cc b/gr-zeromq/lib/push_msg_sink_impl.cc new file mode 100644 index 0000000000..6266cd6a57 --- /dev/null +++ b/gr-zeromq/lib/push_msg_sink_impl.cc @@ -0,0 +1,119 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gnuradio/io_signature.h> +#include "push_msg_sink_impl.h" +#include "tag_headers.h" + +namespace gr { + namespace zeromq { + + push_msg_sink::sptr + push_msg_sink::make(char *address, int timeout) + { + return gnuradio::get_initial_sptr + (new push_msg_sink_impl(address, timeout)); + } + + push_msg_sink_impl::push_msg_sink_impl(char *address, int timeout) + : gr::sync_block("push_msg_sink", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), + d_timeout(timeout) + { + int major, minor, patch; + zmq::version (&major, &minor, &patch); + if (major < 3) { + d_timeout = timeout*1000; + } + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH); + int time = 0; + d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket->bind (address); + + message_port_register_in(pmt::mp("in")); + set_msg_handler( pmt::mp("in"), + boost::bind(&push_msg_sink_impl::handler, this, _1)); + } + + push_msg_sink_impl::~push_msg_sink_impl() + { + d_socket->close(); + delete d_socket; + delete d_context; + } + + void push_msg_sink_impl::handler(pmt::pmt_t msg){ + std::stringbuf sb(""); + pmt::serialize( msg, sb ); + std::string s = sb.str(); + zmq::message_t zmsg(s.size()); + memcpy( zmsg.data(), s.c_str(), s.size() ); + d_socket->send(zmsg); + } + + int + push_msg_sink_impl::work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) + { + return noutput_items; + +/* const char *in = (const char *) input_items[0]; + + zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } }; + zmq::poll (&itemsout[0], 1, d_timeout); + + // If we got a reply, process + if (itemsout[0].revents & ZMQ_POLLOUT) { + + // encode the current offset, # tags, and tags into header + std::string header(""); + + if(d_pass_tags){ + uint64_t offset = nitems_read(0); + std::vector<gr::tag_t> tags; + get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); + header = gen_tag_header( offset, tags ); + } + + // create message copy and send + zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items); + if(d_pass_tags) + memcpy((void*) msg.data(), header.c_str(), header.length() ); + memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); + d_socket->send(msg); + + return noutput_items; + } + else { + return 0; + }*/ + } + + } /* namespace zeromq */ +} /* namespace gr */ diff --git a/gr-zeromq/lib/push_msg_sink_impl.h b/gr-zeromq/lib/push_msg_sink_impl.h new file mode 100644 index 0000000000..b77c998506 --- /dev/null +++ b/gr-zeromq/lib/push_msg_sink_impl.h @@ -0,0 +1,52 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_PUSH_MSG_SINK_IMPL_H +#define INCLUDED_ZEROMQ_PUSH_MSG_SINK_IMPL_H + +#include <gnuradio/zeromq/push_msg_sink.h> +#include <zmq.hpp> + +namespace gr { + namespace zeromq { + + class push_msg_sink_impl : public push_msg_sink + { + private: + float d_timeout; + zmq::context_t *d_context; + zmq::socket_t *d_socket; + + public: + push_msg_sink_impl(char *address, int timeout); + ~push_msg_sink_impl(); + + int work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); + void handler(pmt::pmt_t msg); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_ZMQ_PUSH_MSG_SINK_IMPL_H */ diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc index d949a7f95f..4cc9ab9c2a 100644 --- a/gr-zeromq/lib/push_sink_impl.cc +++ b/gr-zeromq/lib/push_sink_impl.cc @@ -26,22 +26,23 @@ #include <gnuradio/io_signature.h> #include "push_sink_impl.h" +#include "tag_headers.h" namespace gr { namespace zeromq { push_sink::sptr - push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout) + push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) { return gnuradio::get_initial_sptr - (new push_sink_impl(itemsize, vlen, address, timeout)); + (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags)); } - push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout) + push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) : gr::sync_block("push_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout) + d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) { int major, minor, patch; zmq::version (&major, &minor, &patch); @@ -74,10 +75,23 @@ namespace gr { // If we got a reply, process if (itemsout[0].revents & ZMQ_POLLOUT) { - // create message copy and send - zmq::message_t msg(d_itemsize*d_vlen*noutput_items); - memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items); - d_socket->send(msg); + + // encode the current offset, # tags, and tags into header + std::string header(""); + + if(d_pass_tags){ + uint64_t offset = nitems_read(0); + std::vector<gr::tag_t> tags; + get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); + header = gen_tag_header( offset, tags ); + } + + // create message copy and send + zmq::message_t msg(header.length() + d_itemsize*d_vlen*noutput_items); + if(d_pass_tags) + memcpy((void*) msg.data(), header.c_str(), header.length() ); + memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items); + d_socket->send(msg); return noutput_items; } diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h index 9a10065eba..924dee3f15 100644 --- a/gr-zeromq/lib/push_sink_impl.h +++ b/gr-zeromq/lib/push_sink_impl.h @@ -37,9 +37,10 @@ namespace gr { float d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; + bool d_pass_tags; public: - push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout); + push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); ~push_sink_impl(); int work(int noutput_items, diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc new file mode 100644 index 0000000000..0a18a8b3d2 --- /dev/null +++ b/gr-zeromq/lib/rep_msg_sink_impl.cc @@ -0,0 +1,141 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gnuradio/io_signature.h> +#include "rep_msg_sink_impl.h" +#include "tag_headers.h" + +namespace gr { + namespace zeromq { + + rep_msg_sink::sptr + rep_msg_sink::make(char *address, int timeout) + { + return gnuradio::get_initial_sptr + (new rep_msg_sink_impl(address, timeout)); + } + + rep_msg_sink_impl::rep_msg_sink_impl(char *address, int timeout) + : gr::sync_block("rep_msg_sink", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), + d_timeout(timeout) + { + int major, minor, patch; + zmq::version (&major, &minor, &patch); + if (major < 3) { + d_timeout = timeout*1000; + } + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_REP); + int time = 0; + d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket->bind (address); + + message_port_register_in(pmt::mp("in")); +// set_msg_handler( pmt::mp("in"), +// boost::bind(&rep_msg_sink_impl::handler, this, _1)); + } + + rep_msg_sink_impl::~rep_msg_sink_impl() + { + d_socket->close(); + delete d_socket; + delete d_context; + } + + bool rep_msg_sink_impl::start(){ + d_finished = false; + d_thread = new boost::thread( boost::bind( &rep_msg_sink_impl::readloop , this ) ); + return true; + } + + bool rep_msg_sink_impl::stop(){ + d_finished = true; + d_thread->join(); + return true; + } + +/* + void rep_msg_sink_impl::handler(pmt::pmt_t msg){ + std::stringbuf sb(""); + pmt::serialize( msg, sb ); + std::string s = sb.str(); + zmq::message_t zmsg(s.size()); + memcpy( zmsg.data(), s.c_str(), s.size() ); + d_socket->send(zmsg); + } +*/ + + int + rep_msg_sink_impl::work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) + { + return noutput_items; + } + + void rep_msg_sink_impl::readloop(){ + + while(!d_finished){ + + // while we have data, wait for query... + while(!empty_p(pmt::mp("in"))){ + + //std::cout << "wait for req ...\n"; + // wait for query... + zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; + zmq::poll (&items[0], 1, d_timeout); + + // If we got a reply, process + if (items[0].revents & ZMQ_POLLIN) { + //std::cout << "wait for req ... got req\n"; + // receive data request + zmq::message_t request; + d_socket->recv(&request); + int req_output_items = *(static_cast<int*>(request.data())); + if(req_output_items != 1) + throw std::runtime_error("Request was not 1 msg for rep/req request!!"); + + // create message copy and send + //std::cout << "get pmt in\n"; + pmt::pmt_t msg = delete_head_nowait(pmt::mp("in")); + std::stringbuf sb(""); + pmt::serialize( msg, sb ); + std::string s = sb.str(); + zmq::message_t zmsg(s.size()); + memcpy( zmsg.data(), s.c_str(), s.size() ); + //std::cout << "send pmt zmq\n"; + d_socket->send(zmsg); + } // if req + + } // while !empty + + } // while !d_finished + + } + } /* namespace zeromq */ +} /* namespace gr */ diff --git a/gr-zeromq/lib/rep_msg_sink_impl.h b/gr-zeromq/lib/rep_msg_sink_impl.h new file mode 100644 index 0000000000..25bd0e87a0 --- /dev/null +++ b/gr-zeromq/lib/rep_msg_sink_impl.h @@ -0,0 +1,56 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_REP_MSG_SINK_IMPL_H +#define INCLUDED_ZEROMQ_REP_MSG_SINK_IMPL_H + +#include <gnuradio/zeromq/rep_msg_sink.h> +#include <zmq.hpp> + +namespace gr { + namespace zeromq { + + class rep_msg_sink_impl : public rep_msg_sink + { + private: + int d_timeout; + zmq::context_t *d_context; + zmq::socket_t *d_socket; + boost::thread *d_thread; + bool d_finished; + void readloop(); + + public: + rep_msg_sink_impl(char *address, int timeout); + ~rep_msg_sink_impl(); + + int work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); + bool start(); + bool stop(); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_REP_MSG_SINK_IMPL_H */ diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index a8fd5881e5..88ed6c11c0 100644 --- a/gr-zeromq/lib/rep_sink_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -26,22 +26,23 @@ #include <gnuradio/io_signature.h> #include "rep_sink_impl.h" +#include "tag_headers.h" namespace gr { namespace zeromq { rep_sink::sptr - rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout) + rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) { return gnuradio::get_initial_sptr - (new rep_sink_impl(itemsize, vlen, address, timeout)); + (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags)); } - rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout) + rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) : gr::sync_block("rep_sink", gr::io_signature::make(1, 1, itemsize * vlen), gr::io_signature::make(0, 0, 0)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout) + d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) { int major, minor, patch; zmq::version (&major, &minor, &patch); @@ -78,22 +79,25 @@ namespace gr { zmq::message_t request; d_socket->recv(&request); int req_output_items = *(static_cast<int*>(request.data())); + int nitems_send = std::min(noutput_items, req_output_items); + + // encode the current offset, # tags, and tags into header + std::string header(""); + if(d_pass_tags){ + uint64_t offset = nitems_read(0); + std::vector<gr::tag_t> tags; + get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items); + header = gen_tag_header( offset, tags ); + } // create message copy and send - if (noutput_items < req_output_items) { - zmq::message_t msg(d_itemsize*d_vlen*noutput_items); - memcpy((void *)msg.data(), in, d_itemsize*d_vlen*noutput_items); - d_socket->send(msg); + zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send); + if(d_pass_tags) + memcpy((void*) msg.data(), header.c_str(), header.length() ); + memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*nitems_send); + d_socket->send(msg); - return noutput_items; - } - else { - zmq::message_t msg(d_itemsize*d_vlen*req_output_items); - memcpy((void *)msg.data(), in, d_itemsize*d_vlen*req_output_items); - d_socket->send(msg); - - return req_output_items; - } + return nitems_send; } return 0; diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h index ff69735757..55ebb69bfa 100644 --- a/gr-zeromq/lib/rep_sink_impl.h +++ b/gr-zeromq/lib/rep_sink_impl.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013,204 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -37,9 +37,10 @@ namespace gr { int d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; + bool d_pass_tags; public: - rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout); + rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); ~rep_sink_impl(); int work(int noutput_items, diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc new file mode 100644 index 0000000000..2dda15208f --- /dev/null +++ b/gr-zeromq/lib/req_msg_source_impl.cc @@ -0,0 +1,132 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gnuradio/io_signature.h> +#include "req_msg_source_impl.h" +#include "tag_headers.h" + +namespace gr { + namespace zeromq { + + req_msg_source::sptr + req_msg_source::make(char *address, int timeout) + { + return gnuradio::get_initial_sptr + (new req_msg_source_impl(address, timeout)); + } + + req_msg_source_impl::req_msg_source_impl(char *address, int timeout) + : gr::sync_block("req_msg_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), + d_timeout(timeout) + { + int major, minor, patch; + zmq::version (&major, &minor, &patch); + if (major < 3) { + d_timeout = timeout*1000; + } + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_REQ); + int time = 0; + d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket->connect (address); + + message_port_register_out(pmt::mp("out")); + } + + req_msg_source_impl::~req_msg_source_impl() + { + d_socket->close(); + delete d_socket; + delete d_context; + } + + bool req_msg_source_impl::start(){ + d_finished = false; + d_thread = new boost::thread( boost::bind( &req_msg_source_impl::readloop , this ) ); + return true; + } + + bool req_msg_source_impl::stop(){ + d_finished = true; + d_thread->join(); + return true; + } + + void req_msg_source_impl::readloop(){ + while(!d_finished){ + //std::cout << "readloop\n"; + + zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } }; + zmq::poll (&itemsout[0], 1, d_timeout); + + // If we got a reply, process + if (itemsout[0].revents & ZMQ_POLLOUT) { + // Request data, FIXME non portable? + int nmsg = 1; + zmq::message_t request(sizeof(int)); + memcpy ((void *) request.data (), &nmsg, sizeof(int)); + d_socket->send(request); + //std::cout << "sent request...\n"; + } + + zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; + zmq::poll (&items[0], 1, d_timeout); + //std::cout << "rx response...\n"; + + // If we got a reply, process + if (items[0].revents & ZMQ_POLLIN) { + //std::cout << "rx response... got data\n"; + + // Receive data + zmq::message_t msg; + d_socket->recv(&msg); + + //std::cout << "got msg...\n"; + + std::string buf(static_cast<char*>(msg.data()), msg.size()); + std::stringbuf sb(buf); + pmt::pmt_t m = pmt::deserialize(sb); + //std::cout << m << "\n"; + message_port_pub(pmt::mp("out"), m); + + } else { + usleep(100); + } + } + } + + int + req_msg_source_impl::work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) + { + return noutput_items; + } + + } /* namespace zeromq */ +} /* namespace gr */ diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h new file mode 100644 index 0000000000..3a691743b5 --- /dev/null +++ b/gr-zeromq/lib/req_msg_source_impl.h @@ -0,0 +1,56 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_REQ_MSG_SOURCE_IMPL_H +#define INCLUDED_ZEROMQ_REQ_MSG_SOURCE_IMPL_H + +#include <gnuradio/zeromq/req_msg_source.h> +#include <zmq.hpp> + +namespace gr { + namespace zeromq { + + class req_msg_source_impl : public req_msg_source + { + private: + int d_timeout; + zmq::context_t *d_context; + zmq::socket_t *d_socket; + void readloop(); + boost::thread *d_thread; + + public: + req_msg_source_impl(char *address, int timeout); + ~req_msg_source_impl(); + + bool start(); + bool stop(); + bool d_finished; + int work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_REQ_MSG_SOURCE_IMPL_H */ diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc index 54cd659972..5c5071e15e 100644 --- a/gr-zeromq/lib/req_source_impl.cc +++ b/gr-zeromq/lib/req_source_impl.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -26,22 +26,23 @@ #include <gnuradio/io_signature.h> #include "req_source_impl.h" +#include "tag_headers.h" namespace gr { namespace zeromq { req_source::sptr - req_source::make(size_t itemsize, size_t vlen, char *address, int timeout) + req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) { return gnuradio::get_initial_sptr - (new req_source_impl(itemsize, vlen, address, timeout)); + (new req_source_impl(itemsize, vlen, address, timeout, pass_tags)); } - req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout) + req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) : gr::sync_block("req_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout) + d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) { int major, minor, patch; zmq::version (&major, &minor, &patch); @@ -89,9 +90,23 @@ namespace gr { zmq::message_t reply; d_socket->recv(&reply); + // Deserialize header data / tags + std::string buf(static_cast<char*>(reply.data()), reply.size()); + + if(d_pass_tags){ + uint64_t rcv_offset; + std::vector<gr::tag_t> tags; + buf = parse_tag_header(buf, rcv_offset, tags); + for(size_t i=0; i<tags.size(); i++){ + tags[i].offset -= rcv_offset - nitems_written(0); + add_item_tag(0, tags[i]); + } + } + + // Copy to ouput buffer and return - memcpy(out, (void *)reply.data(), reply.size()); - return reply.size()/(d_itemsize*d_vlen); + memcpy(out, (void *)&buf[0], buf.size()); + return buf.size()/(d_itemsize*d_vlen); } return 0; diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h index f61b1d1ce4..7c6bc5310a 100644 --- a/gr-zeromq/lib/req_source_impl.h +++ b/gr-zeromq/lib/req_source_impl.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio. * @@ -37,9 +37,10 @@ namespace gr { int d_timeout; zmq::context_t *d_context; zmq::socket_t *d_socket; + bool d_pass_tags; public: - req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout); + req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); ~req_source_impl(); int work(int noutput_items, diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc new file mode 100644 index 0000000000..f7a9bc9c26 --- /dev/null +++ b/gr-zeromq/lib/sub_msg_source_impl.cc @@ -0,0 +1,117 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gnuradio/io_signature.h> +#include "sub_msg_source_impl.h" +#include "tag_headers.h" + +namespace gr { + namespace zeromq { + + sub_msg_source::sptr + sub_msg_source::make(char *address, int timeout) + { + return gnuradio::get_initial_sptr + (new sub_msg_source_impl(address, timeout)); + } + + sub_msg_source_impl::sub_msg_source_impl(char *address, int timeout) + : gr::sync_block("sub_msg_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), + d_timeout(timeout) + { + int major, minor, patch; + zmq::version (&major, &minor, &patch); + if (major < 3) { + d_timeout = timeout*1000; + } + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_SUB); + //int time = 0; + d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); + d_socket->connect (address); + + message_port_register_out(pmt::mp("out")); + } + + sub_msg_source_impl::~sub_msg_source_impl() + { + d_socket->close(); + delete d_socket; + delete d_context; + } + + bool sub_msg_source_impl::start(){ + d_finished = false; + d_thread = new boost::thread( boost::bind( &sub_msg_source_impl::readloop , this ) ); + return true; + } + + bool sub_msg_source_impl::stop(){ + d_finished = true; + d_thread->join(); + return true; + } + + void sub_msg_source_impl::readloop(){ + while(!d_finished){ + //std::cout << "readloop\n"; + + zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } }; + zmq::poll (&items[0], 1, d_timeout); + + // If we got a reply, process + if (items[0].revents & ZMQ_POLLIN) { + + // Receive data + zmq::message_t msg; + d_socket->recv(&msg); + + //std::cout << "got msg...\n"; + + std::string buf(static_cast<char*>(msg.data()), msg.size()); + std::stringbuf sb(buf); + pmt::pmt_t m = pmt::deserialize(sb); + //std::cout << m << "\n"; + message_port_pub(pmt::mp("out"), m); + + } else { + usleep(100); + } + } + } + + int + sub_msg_source_impl::work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) + { + return noutput_items; + } + + } /* namespace zeromq */ +} /* namespace gr */ diff --git a/gr-zeromq/lib/sub_msg_source_impl.h b/gr-zeromq/lib/sub_msg_source_impl.h new file mode 100644 index 0000000000..424eb470ad --- /dev/null +++ b/gr-zeromq/lib/sub_msg_source_impl.h @@ -0,0 +1,56 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013,2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_ZEROMQ_SUB_MSG_SOURCE_IMPL_H +#define INCLUDED_ZEROMQ_SUB_MSG_SOURCE_IMPL_H + +#include <gnuradio/zeromq/sub_msg_source.h> +#include "zmq.hpp" + +namespace gr { + namespace zeromq { + + class sub_msg_source_impl : public sub_msg_source + { + private: + int d_timeout; // microseconds, -1 is blocking + zmq::context_t *d_context; + zmq::socket_t *d_socket; + void readloop(); + boost::thread *d_thread; + + public: + sub_msg_source_impl(char *address, int timeout); + ~sub_msg_source_impl(); + + bool start(); + bool stop(); + bool d_finished; + int work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); + }; + + } // namespace zeromq +} // namespace gr + +#endif /* INCLUDED_ZEROMQ_SUB_MSG_SOURCE_IMPL_H */ diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc index 38ddc78e59..813ff5a1c0 100644 --- a/gr-zeromq/lib/sub_source_impl.cc +++ b/gr-zeromq/lib/sub_source_impl.cc @@ -26,22 +26,23 @@ #include <gnuradio/io_signature.h> #include "sub_source_impl.h" +#include "tag_headers.h" namespace gr { namespace zeromq { sub_source::sptr - sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout) + sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) { return gnuradio::get_initial_sptr - (new sub_source_impl(itemsize, vlen, address, timeout)); + (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags)); } - sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout) + sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags) : gr::sync_block("sub_source", gr::io_signature::make(0, 0, 0), gr::io_signature::make(1, 1, itemsize * vlen)), - d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout) + d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags) { int major, minor, patch; zmq::version (&major, &minor, &patch); @@ -81,16 +82,31 @@ namespace gr { // Receive data zmq::message_t msg; d_socket->recv(&msg); - // Copy to ouput buffer and return - if (msg.size() >= d_itemsize*d_vlen*noutput_items) { - memcpy(out, (void *)msg.data(), d_itemsize*d_vlen*noutput_items); + // Deserialize header data / tags + std::string buf(static_cast<char*>(msg.data()), msg.size()); + + if(d_pass_tags){ + uint64_t rcv_offset; + std::vector<gr::tag_t> tags; + //int olen = buf.size(); + buf = parse_tag_header(buf, rcv_offset, tags); + //std::cout << "SUB: Header Len = " << olen - buf.size() << ", data len = " << buf.size() << "\n"; + for(size_t i=0; i<tags.size(); i++){ + //std::cout << "add item tag ... (offset = " << tags[i].offset << " rcv_offset = " << rcv_offset << " nitems_read(0) = " << nitems_written(0) << "\n"; + tags[i].offset -= rcv_offset - nitems_written(0); + add_item_tag(0, tags[i]); + } + } + + // Copy to ouput buffer and return + if (buf.size() >= d_itemsize*d_vlen*noutput_items) { + memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items); return noutput_items; } else { - memcpy(out, (void *)msg.data(), msg.size()); - - return msg.size()/(d_itemsize*d_vlen); + memcpy(out, (void *)&buf[0], buf.size()); + return buf.size()/(d_itemsize*d_vlen); } } else { diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h index 44647527b1..0fa8d179cd 100644 --- a/gr-zeromq/lib/sub_source_impl.h +++ b/gr-zeromq/lib/sub_source_impl.h @@ -37,9 +37,10 @@ namespace gr { int d_timeout; // microseconds, -1 is blocking zmq::context_t *d_context; zmq::socket_t *d_socket; + bool d_pass_tags; public: - sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout); + sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags); ~sub_source_impl(); int work(int noutput_items, diff --git a/gr-zeromq/lib/tag_headers.cc b/gr-zeromq/lib/tag_headers.cc new file mode 100644 index 0000000000..c97066629c --- /dev/null +++ b/gr-zeromq/lib/tag_headers.cc @@ -0,0 +1,101 @@ +/* -*- c++ -*- */ +/* + * Copyright 2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#include <gnuradio/io_signature.h> +#include <gnuradio/block.h> +#include <sstream> +#include <cstring> + +#define GR_HEADER_MAGIC 0x5FF0 +#define GR_HEADER_VERSION 0x01 + +namespace gr { + namespace zeromq { + + std::string + gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags) { + + uint16_t header_magic = GR_HEADER_MAGIC; + uint8_t header_version = GR_HEADER_VERSION; + + std::stringstream ss; + size_t ntags = tags.size(); + ss.write( reinterpret_cast< const char* >( &header_magic ), sizeof(uint16_t) ); + ss.write( reinterpret_cast< const char* >( &header_version ), sizeof(uint8_t) ); + + ss.write( reinterpret_cast< const char* >( &offset ), sizeof(uint64_t) ); // offset + ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) ); // num tags + std::stringbuf sb(""); + //std::cout << "TX TAGS: (offset="<<offset<<" ntags="<<ntags<<")\n"; + for(size_t i=0; i<tags.size(); i++){ + //std::cout << "TX TAG: (" << tags[i].offset << ", " << tags[i].key << ", " << tags[i].value << ", " << tags[i].srcid << ")\n"; + ss.write( reinterpret_cast< const char* >( &tags[i].offset ), sizeof(uint64_t) ); // offset + sb.str(""); + pmt::serialize( tags[i].key, sb ); // key + pmt::serialize( tags[i].value, sb ); // value + pmt::serialize( tags[i].srcid, sb ); // srcid + ss.write( sb.str().c_str() , sb.str().length() ); + } + + return ss.str(); + } + + std::string + parse_tag_header(std::string &buf_in, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out) { + + std::istringstream iss( buf_in ); + size_t rcv_ntags; + + uint16_t header_magic; + uint8_t header_version; + + iss.read( (char*)&header_magic, sizeof(uint16_t ) ); + iss.read( (char*)&header_version, sizeof(uint8_t ) ); + if(header_magic != GR_HEADER_MAGIC){ + throw std::runtime_error("gr header magic does not match!"); + } + if(header_version != 1){ + throw std::runtime_error("gr header version too high!"); + } + + iss.read( (char*)&offset_out, sizeof(uint64_t ) ); + iss.read( (char*)&rcv_ntags, sizeof(size_t ) ); + //std::cout << "RX TAGS: (offset="<<offset_out<<" ntags="<<rcv_ntags<<")\n"; + int rd_offset = sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + sizeof(size_t); + std::stringbuf sb( iss.str().substr(rd_offset) ); + + for(size_t i=0; i<rcv_ntags; i++){ + gr::tag_t newtag; + sb.sgetn( (char*) &(newtag.offset), sizeof(uint64_t) ); + newtag.key = pmt::deserialize( sb ); + newtag.value = pmt::deserialize( sb ); + newtag.srcid = pmt::deserialize( sb ); + //std::cout << "RX TAG: (" << newtag.offset << ", " << newtag.key << ", " << newtag.value << ", " << newtag.srcid << ")\n"; + tags_out.push_back(newtag); + iss.str(sb.str()); + } + + int ndata = sb.in_avail(); + return iss.str().substr(iss.str().size() - ndata); + } + } /* namespace zeromq */ +} /* namespace gr */ diff --git a/gr-zeromq/lib/tag_headers.h b/gr-zeromq/lib/tag_headers.h new file mode 100644 index 0000000000..4c7a81238e --- /dev/null +++ b/gr-zeromq/lib/tag_headers.h @@ -0,0 +1,40 @@ +/* -*- c++ -*- */ +/* + * Copyright 2014 Free Software Foundation, Inc. + * + * This file is part of GNU Radio. + * + * This is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this software; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef ZEROMQ_TAG_HEADERS_H +#define ZEROMQ_TAG_HEADERS_H + +#include <gnuradio/io_signature.h> +#include <gnuradio/block.h> +#include <sstream> +#include <cstring> + +namespace gr { + namespace zeromq { + + std::string gen_tag_header(uint64_t &offset, std::vector<gr::tag_t> &tags); + std::string parse_tag_header(std::string &buf_in, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out); + + } /* namespace zeromq */ +} /* namespace gr */ + +#endif /* ZEROMQ_TAG_HEADERS_H */ diff --git a/gr-zeromq/swig/zeromq_swig.i b/gr-zeromq/swig/zeromq_swig.i index bcd90550d4..78669bdc69 100644 --- a/gr-zeromq/swig/zeromq_swig.i +++ b/gr-zeromq/swig/zeromq_swig.i @@ -29,23 +29,41 @@ %{ #include "gnuradio/zeromq/pub_sink.h" +#include "gnuradio/zeromq/pub_msg_sink.h" #include "gnuradio/zeromq/push_sink.h" +#include "gnuradio/zeromq/push_msg_sink.h" #include "gnuradio/zeromq/rep_sink.h" +#include "gnuradio/zeromq/rep_msg_sink.h" #include "gnuradio/zeromq/sub_source.h" +#include "gnuradio/zeromq/sub_msg_source.h" #include "gnuradio/zeromq/pull_source.h" +#include "gnuradio/zeromq/pull_msg_source.h" #include "gnuradio/zeromq/req_source.h" +#include "gnuradio/zeromq/req_msg_source.h" %} %include "gnuradio/zeromq/pub_sink.h" +%include "gnuradio/zeromq/pub_msg_sink.h" %include "gnuradio/zeromq/push_sink.h" +%include "gnuradio/zeromq/push_msg_sink.h" %include "gnuradio/zeromq/rep_sink.h" +%include "gnuradio/zeromq/rep_msg_sink.h" %include "gnuradio/zeromq/sub_source.h" +%include "gnuradio/zeromq/sub_msg_source.h" %include "gnuradio/zeromq/pull_source.h" +%include "gnuradio/zeromq/pull_msg_source.h" %include "gnuradio/zeromq/req_source.h" +%include "gnuradio/zeromq/req_msg_source.h" GR_SWIG_BLOCK_MAGIC2(zeromq, pub_sink); +GR_SWIG_BLOCK_MAGIC2(zeromq, pub_msg_sink); GR_SWIG_BLOCK_MAGIC2(zeromq, push_sink); +GR_SWIG_BLOCK_MAGIC2(zeromq, push_msg_sink); GR_SWIG_BLOCK_MAGIC2(zeromq, rep_sink); +GR_SWIG_BLOCK_MAGIC2(zeromq, rep_msg_sink); GR_SWIG_BLOCK_MAGIC2(zeromq, sub_source); +GR_SWIG_BLOCK_MAGIC2(zeromq, sub_msg_source); GR_SWIG_BLOCK_MAGIC2(zeromq, pull_source); +GR_SWIG_BLOCK_MAGIC2(zeromq, pull_msg_source); GR_SWIG_BLOCK_MAGIC2(zeromq, req_source); +GR_SWIG_BLOCK_MAGIC2(zeromq, req_msg_source); |