summaryrefslogtreecommitdiff
path: root/gr-blocks
diff options
context:
space:
mode:
Diffstat (limited to 'gr-blocks')
-rw-r--r--gr-blocks/grc/blocks_block_tree.xml1
-rw-r--r--gr-blocks/grc/blocks_tcp_server_sink.xml77
-rw-r--r--gr-blocks/include/gnuradio/blocks/CMakeLists.txt1
-rw-r--r--gr-blocks/include/gnuradio/blocks/tcp_server_sink.h66
-rw-r--r--gr-blocks/lib/CMakeLists.txt1
-rw-r--r--gr-blocks/lib/tcp_server_sink_impl.cc161
-rw-r--r--gr-blocks/lib/tcp_server_sink_impl.h73
-rw-r--r--gr-blocks/python/blocks/qa_tcp_server_sink.py87
-rw-r--r--gr-blocks/swig/blocks_swig5.i3
9 files changed, 470 insertions, 0 deletions
diff --git a/gr-blocks/grc/blocks_block_tree.xml b/gr-blocks/grc/blocks_block_tree.xml
index c4857f3b3a..644773f686 100644
--- a/gr-blocks/grc/blocks_block_tree.xml
+++ b/gr-blocks/grc/blocks_block_tree.xml
@@ -158,6 +158,7 @@
<name>Networking Tools</name>
<block>blocks_tuntap_pdu</block>
<block>blocks_socket_pdu</block>
+ <block>blocks_tcp_server_sink</block>
<block>blocks_udp_source</block>
<block>blocks_udp_sink</block>
</cat>
diff --git a/gr-blocks/grc/blocks_tcp_server_sink.xml b/gr-blocks/grc/blocks_tcp_server_sink.xml
new file mode 100644
index 0000000000..644027da65
--- /dev/null
+++ b/gr-blocks/grc/blocks_tcp_server_sink.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0"?>
+<!--
+###################################################
+##TCP Server Sink
+###################################################
+ -->
+<block>
+ <name>TCP Server Sink</name>
+ <key>blocks_tcp_server_sink</key>
+ <import>from gnuradio import blocks</import>
+ <make>blocks.tcp_server_sink($type.size*$vlen, $ipaddr, $port, $noblock)</make>
+ <param>
+ <name>Input Type</name>
+ <key>type</key>
+ <type>enum</type>
+ <option>
+ <name>Complex</name>
+ <key>complex</key>
+ <opt>size:gr.sizeof_gr_complex</opt>
+ </option>
+ <option>
+ <name>Float</name>
+ <key>float</key>
+ <opt>size:gr.sizeof_float</opt>
+ </option>
+ <option>
+ <name>Int</name>
+ <key>int</key>
+ <opt>size:gr.sizeof_int</opt>
+ </option>
+ <option>
+ <name>Short</name>
+ <key>short</key>
+ <opt>size:gr.sizeof_short</opt>
+ </option>
+ <option>
+ <name>Byte</name>
+ <key>byte</key>
+ <opt>size:gr.sizeof_char</opt>
+ </option>
+ </param>
+ <param>
+ <name>Destination IP Address</name>
+ <key>ipaddr</key>
+ <type>string</type>
+ </param>
+ <param>
+ <name>Destination Port</name>
+ <key>port</key>
+ <type>int</type>
+ </param>
+ <param>
+ <name>Nonblocking Mode</name>
+ <key>noblock</key>
+ <type>enum</type>
+ <option>
+ <name>On</name>
+ <key>True</key>
+ </option>
+ <option>
+ <name>Off</name>
+ <key>False</key>
+ </option>
+ </param>
+ <param>
+ <name>Vec Length</name>
+ <key>vlen</key>
+ <value>1</value>
+ <type>int</type>
+ </param>
+ <check>$vlen &gt; 0</check>
+ <sink>
+ <name>in</name>
+ <type>$type</type>
+ <vlen>$vlen</vlen>
+ </sink>
+</block>
diff --git a/gr-blocks/include/gnuradio/blocks/CMakeLists.txt b/gr-blocks/include/gnuradio/blocks/CMakeLists.txt
index 38c79d60f3..6b3eca69d3 100644
--- a/gr-blocks/include/gnuradio/blocks/CMakeLists.txt
+++ b/gr-blocks/include/gnuradio/blocks/CMakeLists.txt
@@ -174,6 +174,7 @@ install(FILES
tagged_stream_multiply_length.h
tagged_stream_to_pdu.h
tags_strobe.h
+ tcp_server_sink.h
test_tag_variable_rate_ff.h
threshold_ff.h
throttle.h
diff --git a/gr-blocks/include/gnuradio/blocks/tcp_server_sink.h b/gr-blocks/include/gnuradio/blocks/tcp_server_sink.h
new file mode 100644
index 0000000000..6d951a8e67
--- /dev/null
+++ b/gr-blocks/include/gnuradio/blocks/tcp_server_sink.h
@@ -0,0 +1,66 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_BLOCKS_TCP_SERVER_SINK_H
+#define INCLUDED_BLOCKS_TCP_SERVER_SINK_H
+
+#include <gnuradio/blocks/api.h>
+#include <gnuradio/sync_block.h>
+
+namespace gr {
+ namespace blocks {
+
+ /*!
+ * \brief Send stream trought an TCP socket.
+ * \ingroup networking_tools_blk
+ *
+ * \details
+ * Listen for incomming TCP connection(s). Duplicate data for each
+ * opened connection. Optionaly can wait until first client connects
+ * before streaming starts.
+ */
+ class BLOCKS_API tcp_server_sink : virtual public gr::sync_block
+ {
+ public:
+ // gr::blocks::tcp_server_sink::sptr
+ typedef boost::shared_ptr<tcp_server_sink> sptr;
+
+ /*!
+ * \brief TCP Server Sink Constructor
+ *
+ * \param itemsize The size (in bytes) of the item datatype
+ * \param host The name or IP address of interface to bind to.
+ * \param port Port where to listen.
+ * \param noblock If false, wait until first client connects before
+ * streaming starts. In non blocking mode
+ * (noblock=true), drop data onto floor if no client
+ * is connected.
+ */
+ static sptr make(size_t itemsize,
+ const std::string &host, int port,
+ bool noblock = false);
+ };
+
+ } /* namespace blocks */
+} /* namespace gr */
+
+#endif /* INCLUDED_BLOCKS_TCP_SERVER_SINK_H */
diff --git a/gr-blocks/lib/CMakeLists.txt b/gr-blocks/lib/CMakeLists.txt
index 643190c2ee..1d69f27a1d 100644
--- a/gr-blocks/lib/CMakeLists.txt
+++ b/gr-blocks/lib/CMakeLists.txt
@@ -199,6 +199,7 @@ list(APPEND gr_blocks_sources
throttle_impl.cc
transcendental_impl.cc
tcp_connection.cc
+ tcp_server_sink_impl.cc
tuntap_pdu_impl.cc
tag_gate_impl.cc
tagged_stream_align_impl.cc
diff --git a/gr-blocks/lib/tcp_server_sink_impl.cc b/gr-blocks/lib/tcp_server_sink_impl.cc
new file mode 100644
index 0000000000..329e798ca8
--- /dev/null
+++ b/gr-blocks/lib/tcp_server_sink_impl.cc
@@ -0,0 +1,161 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007-2010,2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "tcp_server_sink_impl.h"
+#include <gnuradio/io_signature.h>
+#include <algorithm>
+#include <boost/array.hpp>
+#include <boost/asio.hpp>
+#include <boost/format.hpp>
+#include <gnuradio/thread/thread.h>
+#include <stdexcept>
+#include <stdio.h>
+#include <string.h>
+
+namespace gr {
+ namespace blocks {
+
+ tcp_server_sink::sptr
+ tcp_server_sink::make(size_t itemsize,
+ const std::string &host, int port,
+ bool noblock)
+ {
+ return gnuradio::get_initial_sptr
+ (new tcp_server_sink_impl(itemsize, host, port, noblock));
+ }
+
+ tcp_server_sink_impl::tcp_server_sink_impl(size_t itemsize,
+ const std::string &host, int port,
+ bool noblock)
+ : sync_block("tcp_server_sink",
+ io_signature::make(1, 1, itemsize),
+ io_signature::make(0, 0, 0)),
+ d_itemsize(itemsize),
+ d_acceptor(d_io_service),
+ d_buf(new uint8_t[BUF_SIZE]),
+ d_writing(0)
+ {
+ std::string s_port = (boost::format("%d") % port).str();
+ std::string s_host = host.empty() ? std::string("localhost") : host;
+ boost::asio::ip::tcp::resolver resolver(d_io_service);
+ boost::asio::ip::tcp::resolver::query query(s_host, s_port,
+ boost::asio::ip::resolver_query_base::passive);
+ d_endpoint = *resolver.resolve(query);
+
+ d_acceptor.open(d_endpoint.protocol());
+ d_acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
+ d_acceptor.bind(d_endpoint);
+ d_acceptor.listen();
+
+ if (!noblock) {
+ d_socket.reset(new boost::asio::ip::tcp::socket(d_io_service));
+ d_acceptor.accept(*d_socket, d_endpoint);
+ d_sockets.insert(d_socket.release());
+ }
+
+ d_socket.reset(new boost::asio::ip::tcp::socket(d_io_service));
+ d_acceptor.async_accept(*d_socket, boost::bind(&tcp_server_sink_impl::do_accept,
+ this, boost::asio::placeholders::error));
+ d_io_serv_thread = boost::thread(
+ boost::bind(&boost::asio::io_service::run, &d_io_service));
+ }
+
+ void
+ tcp_server_sink_impl::do_accept(const boost::system::error_code& error)
+ {
+ if (!error) {
+ gr::thread::scoped_lock guard(d_writing_mut);
+ d_sockets.insert(d_socket.release());
+ d_socket.reset(new boost::asio::ip::tcp::socket(d_io_service));
+ d_acceptor.async_accept(*d_socket, boost::bind(&tcp_server_sink_impl::do_accept,
+ this, boost::asio::placeholders::error));
+ }
+ }
+
+ void
+ tcp_server_sink_impl::do_write(const boost::system::error_code& error,
+ size_t len, std::set<boost::asio::ip::tcp::socket *>::iterator i)
+ {
+ {
+ gr::thread::scoped_lock guard(d_writing_mut);
+ --d_writing;
+ if (error) {
+ delete *i;
+ d_sockets.erase(i);
+ }
+ }
+ d_writing_cond.notify_one();
+ }
+
+ tcp_server_sink_impl::~tcp_server_sink_impl()
+ {
+ gr::thread::scoped_lock guard(d_writing_mut);
+ while (d_writing) {
+ d_writing_cond.wait(guard);
+ }
+
+ for (std::set<boost::asio::ip::tcp::socket *>::iterator i = d_sockets.begin();
+ i != d_sockets.end(); ++i ) {
+ delete *i;
+ }
+ d_sockets.clear();
+
+ d_io_service.reset();
+ d_io_service.stop();
+ d_io_serv_thread.join();
+ }
+
+ int
+ tcp_server_sink_impl::work (int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+ {
+ const char *in = (const char *) input_items[0];
+
+ gr::thread::scoped_lock guard(d_writing_mut);
+ while (d_writing) {
+ d_writing_cond.wait(guard);
+ }
+
+ size_t data_len = std::min(size_t(BUF_SIZE), noutput_items * d_itemsize);
+ data_len -= data_len % d_itemsize;
+ memcpy(d_buf.get(), in, data_len);
+ for (std::set<boost::asio::ip::tcp::socket *>::iterator i = d_sockets.begin();
+ i != d_sockets.end(); ++i ) {
+ boost::asio::async_write(**i, boost::asio::buffer(d_buf.get(), data_len),
+ boost::bind(&tcp_server_sink_impl::do_write, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred,
+ i));
+ }
+ d_writing = d_sockets.size();
+
+ return data_len / d_itemsize;
+ }
+
+ } /* namespace blocks */
+} /* namespace gr */
+
diff --git a/gr-blocks/lib/tcp_server_sink_impl.h b/gr-blocks/lib/tcp_server_sink_impl.h
new file mode 100644
index 0000000000..d10f3b95b8
--- /dev/null
+++ b/gr-blocks/lib/tcp_server_sink_impl.h
@@ -0,0 +1,73 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2014 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_TCP_SERVER_SINK_IMPL_H
+#define INCLUDED_GR_TCP_SERVER_SINK_IMPL_H
+
+#include <gnuradio/blocks/tcp_server_sink.h>
+#include <boost/asio.hpp>
+#include <set>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+namespace gr {
+ namespace blocks {
+
+ class tcp_server_sink_impl : public tcp_server_sink
+ {
+ private:
+ size_t d_itemsize;
+
+ boost::asio::io_service d_io_service;
+ gr::thread::thread d_io_serv_thread;
+ boost::asio::ip::tcp::endpoint d_endpoint;
+ std::auto_ptr<boost::asio::ip::tcp::socket> d_socket;
+ std::set<boost::asio::ip::tcp::socket *> d_sockets;
+ boost::asio::ip::tcp::acceptor d_acceptor;
+
+ boost::shared_ptr<uint8_t> d_buf;
+ enum {
+ BUF_SIZE = 256 * 1024,
+ };
+
+ int d_writing;
+ boost::condition_variable d_writing_cond;
+ boost::mutex d_writing_mut;
+
+ void do_accept(const boost::system::error_code& error);
+ void do_write(const boost::system::error_code& error, std::size_t len,
+ std::set<boost::asio::ip::tcp::socket *>::iterator);
+
+ public:
+ tcp_server_sink_impl(size_t itemsize,
+ const std::string &host, int port,
+ bool noblock);
+ ~tcp_server_sink_impl();
+
+ int work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+ };
+
+ } /* namespace blocks */
+} /* namespace gr */
+
+#endif /* INCLUDED_GR_TCP_SERVER_SINK_IMPL_H */
diff --git a/gr-blocks/python/blocks/qa_tcp_server_sink.py b/gr-blocks/python/blocks/qa_tcp_server_sink.py
new file mode 100644
index 0000000000..f7d3a0af92
--- /dev/null
+++ b/gr-blocks/python/blocks/qa_tcp_server_sink.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest, blocks
+import os
+import socket
+from time import sleep
+
+from threading import Timer
+from multiprocessing import Process
+
+class test_tcp_sink(gr_unittest.TestCase):
+
+ def setUp(self):
+ os.environ['GR_CONF_CONTROLPORT_ON'] = 'False'
+ self.tb_snd = gr.top_block()
+ self.tb_rcv = gr.top_block()
+
+ def tearDown(self):
+ self.tb_rcv = None
+ self.tb_snd = None
+
+ def _tcp_client(self):
+ dst = blocks.vector_sink_s()
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ for t in (0, 0.2):
+# wait until server listens
+ sleep(t)
+ try:
+ sock.connect((self.addr, self.port))
+ except socket.error as e:
+ if e.errno != 111:
+ raise
+ continue
+ break
+ fd = os.dup(sock.fileno())
+ self.tb_rcv.connect(blocks.file_descriptor_source(self.itemsize, fd), dst)
+ self.tb_rcv.run()
+ self.assertEqual(self.data, dst.data())
+
+ def test_001(self):
+ self.addr = '127.0.0.1'
+ self.port = 65510
+ self.itemsize = gr.sizeof_short
+ n_data = 16
+ self.data = tuple([x for x in range(n_data)])
+
+# tcp_server_sink blocks until client does not connect, start client process first
+ p = Process(target=self._tcp_client)
+ p.start()
+
+ src = blocks.vector_source_s(self.data, False)
+ tcp_snd = blocks.tcp_server_sink(self.itemsize, self.addr, self.port, False)
+ self.tb_snd.connect(src, tcp_snd)
+
+ self.tb_snd.run()
+ del tcp_snd
+ self.tb_snd = None
+ p.join()
+
+ def stop_rcv(self):
+ self.timeout = True
+ self.tb_rcv.stop()
+ #print "tb_rcv stopped by Timer"
+
+if __name__ == '__main__':
+ gr_unittest.run(test_tcp_sink, "test_tcp_server_sink.xml")
+
diff --git a/gr-blocks/swig/blocks_swig5.i b/gr-blocks/swig/blocks_swig5.i
index 51601a8c30..761b0a855a 100644
--- a/gr-blocks/swig/blocks_swig5.i
+++ b/gr-blocks/swig/blocks_swig5.i
@@ -56,6 +56,7 @@
#include "gnuradio/blocks/tagged_stream_multiply_length.h"
#include "gnuradio/blocks/tagged_stream_to_pdu.h"
#include "gnuradio/blocks/tags_strobe.h"
+#include "gnuradio/blocks/tcp_server_sink.h"
#include "gnuradio/blocks/test_tag_variable_rate_ff.h"
#include "gnuradio/blocks/threshold_ff.h"
#include "gnuradio/blocks/transcendental.h"
@@ -100,6 +101,7 @@
%include "gnuradio/blocks/tagged_stream_multiply_length.h"
%include "gnuradio/blocks/tagged_stream_to_pdu.h"
%include "gnuradio/blocks/tags_strobe.h"
+%include "gnuradio/blocks/tcp_server_sink.h"
%include "gnuradio/blocks/test_tag_variable_rate_ff.h"
%include "gnuradio/blocks/threshold_ff.h"
%include "gnuradio/blocks/transcendental.h"
@@ -143,6 +145,7 @@ GR_SWIG_BLOCK_MAGIC2(blocks, tagged_stream_mux);
GR_SWIG_BLOCK_MAGIC2(blocks, tagged_stream_multiply_length);
GR_SWIG_BLOCK_MAGIC2(blocks, tagged_stream_to_pdu);
GR_SWIG_BLOCK_MAGIC2(blocks, tags_strobe);
+GR_SWIG_BLOCK_MAGIC2(blocks, tcp_server_sink);
GR_SWIG_BLOCK_MAGIC2(blocks, test_tag_variable_rate_ff);
GR_SWIG_BLOCK_MAGIC2(blocks, threshold_ff);
GR_SWIG_BLOCK_MAGIC2(blocks, transcendental);