summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTom Rondeau <trondeau@vt.edu>2013-03-11 23:43:28 -0400
committerTom Rondeau <trondeau@vt.edu>2013-03-13 12:51:05 -0400
commitb2c2058921198e49e82301e32733b2da041521b1 (patch)
tree12abfea4c2f3fa974822f4fe1aa3a8c67abf2fcb
parent45f9c0a5792526279ebf5ffa6a4853c7e2de6dd6 (diff)
blocks: udp_sink and udp_source implemented in gr-blocks using boost::asio instead of sockets.
Needs testing on non-Linux OSes.
-rw-r--r--gr-blocks/grc/blocks_block_tree.xml2
-rw-r--r--gr-blocks/grc/blocks_udp_sink.xml77
-rw-r--r--gr-blocks/include/blocks/CMakeLists.txt2
-rw-r--r--gr-blocks/include/blocks/udp_sink.h81
-rw-r--r--gr-blocks/lib/CMakeLists.txt2
-rw-r--r--gr-blocks/lib/udp_sink_impl.cc145
-rw-r--r--gr-blocks/lib/udp_sink_impl.h65
-rw-r--r--gr-blocks/python/qa_udp_source_sink.py121
-rw-r--r--gr-blocks/swig/blocks_swig.i6
9 files changed, 501 insertions, 0 deletions
diff --git a/gr-blocks/grc/blocks_block_tree.xml b/gr-blocks/grc/blocks_block_tree.xml
index ec48f24cd3..5e2c5c8afc 100644
--- a/gr-blocks/grc/blocks_block_tree.xml
+++ b/gr-blocks/grc/blocks_block_tree.xml
@@ -36,6 +36,7 @@
<block>blocks_pdu_to_tagged_stream</block>
<block>blocks_message_source</block>
<block>blocks_message_burst_source</block>
+ <block>blocks_udp_source</block>
<block>blocks_wavfile_source</block>
</cat>
<cat>
@@ -47,6 +48,7 @@
<block>blocks_tag_debug</block>
<block>blocks_message_sink</block>
<block>blocks_tagged_file_sink</block>
+ <block>blocks_udp_sink</block>
<block>blocks_wavfile_sink</block>
</cat>
<cat>
diff --git a/gr-blocks/grc/blocks_udp_sink.xml b/gr-blocks/grc/blocks_udp_sink.xml
new file mode 100644
index 0000000000..a001b2f1bb
--- /dev/null
+++ b/gr-blocks/grc/blocks_udp_sink.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0"?>
+<!--
+###################################################
+##UDP Sink
+###################################################
+ -->
+<block>
+ <name>UDP Sink</name>
+ <key>blocks_udp_sink</key>
+ <import>from gnuradio import blocks</import>
+ <make>blocks.udp_sink($type.size*$vlen, $ipaddr, $port, $psize, $eof)</make>
+ <callback>set_mtu($mtu)</callback>
+ <param>
+ <name>Input Type</name>
+ <key>type</key>
+ <type>enum</type>
+ <option>
+ <name>Complex</name>
+ <key>complex</key>
+ <opt>size:gr.sizeof_gr_complex</opt>
+ </option>
+ <option>
+ <name>Float</name>
+ <key>float</key>
+ <opt>size:gr.sizeof_float</opt>
+ </option>
+ <option>
+ <name>Int</name>
+ <key>int</key>
+ <opt>size:gr.sizeof_int</opt>
+ </option>
+ <option>
+ <name>Short</name>
+ <key>short</key>
+ <opt>size:gr.sizeof_short</opt>
+ </option>
+ <option>
+ <name>Byte</name>
+ <key>byte</key>
+ <opt>size:gr.sizeof_char</opt>
+ </option>
+ </param>
+ <param>
+ <name>Destination IP Address</name>
+ <key>ipaddr</key>
+ <type>string</type>
+ </param>
+ <param>
+ <name>Destination Port</name>
+ <key>port</key>
+ <type>int</type>
+ </param>
+ <param>
+ <name>Payload Size</name>
+ <key>psize</key>
+ <value>1472</value>
+ <type>int</type>
+ </param>
+ <param>
+ <name>Send Null Pkt as EOF</name>
+ <key>eof</key>
+ <value>True</value>
+ <type>bool</type>
+ </param>
+ <param>
+ <name>Vec Length</name>
+ <key>vlen</key>
+ <value>1</value>
+ <type>int</type>
+ </param>
+ <check>$vlen &gt; 0</check>
+ <sink>
+ <name>in</name>
+ <type>$type</type>
+ <vlen>$vlen</vlen>
+ </sink>
+</block>
diff --git a/gr-blocks/include/blocks/CMakeLists.txt b/gr-blocks/include/blocks/CMakeLists.txt
index 16df3cac1a..111feef4f1 100644
--- a/gr-blocks/include/blocks/CMakeLists.txt
+++ b/gr-blocks/include/blocks/CMakeLists.txt
@@ -177,6 +177,8 @@ install(FILES
transcendental.h
tuntap_pdu.h
uchar_to_float.h
+ udp_sink.h
+ udp_source.h
unpack_k_bits_bb.h
vco_f.h
vector_to_stream.h
diff --git a/gr-blocks/include/blocks/udp_sink.h b/gr-blocks/include/blocks/udp_sink.h
new file mode 100644
index 0000000000..69c5f0ffae
--- /dev/null
+++ b/gr-blocks/include/blocks/udp_sink.h
@@ -0,0 +1,81 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007-2010,2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_UDP_SINK_H
+#define INCLUDED_GR_UDP_SINK_H
+
+#include <blocks/api.h>
+#include <gr_sync_block.h>
+
+namespace gr {
+ namespace blocks {
+
+ /*!
+ * \brief Write stream to an UDP socket.
+ * \ingroup sink_blk
+ */
+ class BLOCKS_API udp_sink : virtual public gr_sync_block
+ {
+ public:
+ // gr::blocks::udp_sink::sptr
+ typedef boost::shared_ptr<udp_sink> sptr;
+
+ /*!
+ * \brief UDP Sink Constructor
+ *
+ * \param itemsize The size (in bytes) of the item datatype
+ * \param host The name or IP address of the receiving host; use
+ * NULL or None for no connection
+ * \param port Destination port to connect to on receiving host
+ * \param payload_size UDP payload size by default set to
+ * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header))
+ * \param eof Send zero-length packet on disconnect
+ */
+ static sptr make(size_t itemsize,
+ const std::string &host, int port,
+ int payload_size=1472, bool eof=true);
+
+ /*! \brief return the PAYLOAD_SIZE of the socket */
+ virtual int payload_size() = 0;
+
+ /*! \brief Change the connection to a new destination
+ *
+ * \param host The name or IP address of the receiving host; use
+ * NULL or None to break the connection without closing
+ * \param port Destination port to connect to on receiving host
+ *
+ * Calls disconnect() to terminate any current connection first.
+ */
+ virtual void connect(const std::string &host, int port) = 0;
+
+ /*! \brief Send zero-length packet (if eof is requested) then stop sending
+ *
+ * Zero-byte packets can be interpreted as EOF by gr_udp_source.
+ * Note that disconnect occurs automatically when the sink is
+ * destroyed, but not when its top_block stops.*/
+ virtual void disconnect() = 0;
+ };
+
+ } /* namespace blocks */
+} /* namespace gr */
+
+#endif /* INCLUDED_GR_UDP_SINK_H */
diff --git a/gr-blocks/lib/CMakeLists.txt b/gr-blocks/lib/CMakeLists.txt
index 23480fcd10..b65c262989 100644
--- a/gr-blocks/lib/CMakeLists.txt
+++ b/gr-blocks/lib/CMakeLists.txt
@@ -215,6 +215,8 @@ list(APPEND gr_blocks_sources
tuntap_pdu_impl.cc
uchar_array_to_float.cc
uchar_to_float_impl.cc
+ udp_sink_impl.cc
+ udp_source_impl.cc
unpack_k_bits_bb_impl.cc
vco_f_impl.cc
vector_to_stream_impl.cc
diff --git a/gr-blocks/lib/udp_sink_impl.cc b/gr-blocks/lib/udp_sink_impl.cc
new file mode 100644
index 0000000000..500b7c2cd7
--- /dev/null
+++ b/gr-blocks/lib/udp_sink_impl.cc
@@ -0,0 +1,145 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007-2010,2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "udp_sink_impl.h"
+#include <gr_io_signature.h>
+#include <boost/asio.hpp>
+#include <boost/format.hpp>
+#include <gruel/thread.h>
+#include <stdexcept>
+#include <stdio.h>
+#include <string.h>
+
+namespace gr {
+ namespace blocks {
+
+ udp_sink::sptr
+ udp_sink::make(size_t itemsize,
+ const std::string &host, int port,
+ int payload_size, bool eof)
+ {
+ return gnuradio::get_initial_sptr
+ (new udp_sink_impl(itemsize, host, port,
+ payload_size, eof));
+ }
+
+ udp_sink_impl::udp_sink_impl(size_t itemsize,
+ const std::string &host, int port,
+ int payload_size, bool eof)
+ : gr_sync_block("udp_sink",
+ gr_make_io_signature(1, 1, itemsize),
+ gr_make_io_signature(0, 0, 0)),
+ d_itemsize(itemsize), d_payload_size(payload_size), d_eof(eof),
+ d_connected(false)
+ {
+ // Get the destination address
+ connect(host, port);
+ }
+
+ // public constructor that returns a shared_ptr
+ udp_sink_impl::~udp_sink_impl()
+ {
+ if(d_connected)
+ disconnect();
+ }
+
+ void
+ udp_sink_impl::connect(const std::string &host, int port)
+ {
+ if(d_connected)
+ disconnect();
+
+ std::string s_port = (boost::format("%d")%port).str();
+ if(host.size() > 0) {
+ boost::asio::ip::udp::resolver resolver(d_io_service);
+ boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(),
+ host, s_port);
+ d_endpoint = *resolver.resolve(query);
+
+ d_socket = new boost::asio::ip::udp::socket(d_io_service);
+ d_socket->open(boost::asio::ip::udp::v4());
+
+ d_connected = true;
+ }
+ }
+
+ void
+ udp_sink_impl::disconnect()
+ {
+ if(!d_connected)
+ return;
+
+ gruel::scoped_lock guard(d_mutex); // protect d_socket from work()
+
+ // Send a few zero-length packets to signal receiver we are done
+ boost::array<char, 1> send_buf = {{ 0 }};
+ if(d_eof) {
+ int i;
+ for(i = 0; i < 3; i++)
+ d_socket->send_to(boost::asio::buffer(send_buf), d_endpoint);
+ }
+
+ d_socket->close();
+ delete d_socket;
+
+ d_connected = false;
+ }
+
+ int
+ udp_sink_impl::work (int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items)
+ {
+ const char *in = (const char *) input_items[0];
+ ssize_t r=0, bytes_sent=0, bytes_to_send=0;
+ ssize_t total_size = noutput_items*d_itemsize;
+
+ gruel::scoped_lock guard(d_mutex); // protect d_socket
+
+ while(bytes_sent < total_size) {
+ bytes_to_send = std::min((ssize_t)d_payload_size, (total_size-bytes_sent));
+
+ if(d_connected) {
+ try {
+ r = d_socket->send_to(boost::asio::buffer((void*)(in+bytes_sent), bytes_to_send),
+ d_endpoint);
+ }
+ catch(std::exception& e) {
+ GR_LOG_ERROR(d_logger, boost::format("send error: %s") % e.what());
+ return -1;
+ }
+ }
+ else
+ r = bytes_to_send; // discarded for lack of connection
+ bytes_sent += r;
+ }
+
+ return noutput_items;
+ }
+
+ } /* namespace blocks */
+} /* namespace gr */
+
diff --git a/gr-blocks/lib/udp_sink_impl.h b/gr-blocks/lib/udp_sink_impl.h
new file mode 100644
index 0000000000..243d499b60
--- /dev/null
+++ b/gr-blocks/lib/udp_sink_impl.h
@@ -0,0 +1,65 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2007-2010,2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_UDP_SINK_IMPL_H
+#define INCLUDED_GR_UDP_SINK_IMPL_H
+
+#include <blocks/udp_sink.h>
+#include <boost/asio.hpp>
+
+namespace gr {
+ namespace blocks {
+
+ class udp_sink_impl : public udp_sink
+ {
+ private:
+ size_t d_itemsize;
+
+ int d_payload_size; // maximum transmission unit (packet length)
+ bool d_eof; // send zero-length packet on disconnect
+ bool d_connected; // are we connected?
+ gruel::mutex d_mutex; // protects d_socket and d_connected
+
+ boost::asio::ip::udp::socket *d_socket; // handle to socket
+ boost::asio::ip::udp::endpoint d_endpoint;
+ boost::asio::io_service d_io_service;
+
+ public:
+ udp_sink_impl(size_t itemsize,
+ const std::string &host, int port,
+ int payload_size, bool eof);
+ ~udp_sink_impl();
+
+ int payload_size() { return d_payload_size; }
+
+ void connect(const std::string &host, int port);
+ void disconnect();
+
+ int work(int noutput_items,
+ gr_vector_const_void_star &input_items,
+ gr_vector_void_star &output_items);
+ };
+
+ } /* namespace blocks */
+} /* namespace gr */
+
+#endif /* INCLUDED_GR_UDP_SINK_IMPL_H */
diff --git a/gr-blocks/python/qa_udp_source_sink.py b/gr-blocks/python/qa_udp_source_sink.py
new file mode 100644
index 0000000000..6ebbf87afc
--- /dev/null
+++ b/gr-blocks/python/qa_udp_source_sink.py
@@ -0,0 +1,121 @@
+#!/usr/bin/env python
+#
+# Copyright 2008,2010,2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import os
+
+from threading import Timer
+
+class test_udp_sink_source(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb_snd = gr.top_block()
+ self.tb_rcv = gr.top_block()
+
+ def tearDown(self):
+ self.tb_rcv = None
+ self.tb_snd = None
+
+ def test_001(self):
+ # Tests calling disconnect/reconnect.
+
+ port = 65500
+
+ n_data = 16
+ src_data = [x for x in range(n_data)]
+ expected_result = tuple(src_data)
+ src = gr.vector_source_s(src_data, False)
+ udp_snd = blocks.udp_sink(gr.sizeof_short, 'localhost', port)
+ self.tb_snd.connect(src, udp_snd)
+
+ self.tb_snd.run()
+ udp_snd.disconnect()
+
+ udp_snd.connect('localhost', port+1)
+ src.rewind()
+ self.tb_snd.run()
+
+ def test_002(self):
+ port = 65500
+
+ n_data = 100000
+ src_data = [float(x) for x in range(n_data)]
+ expected_result = tuple(src_data)
+ src = gr.vector_source_f(src_data, False)
+ udp_snd = gr.udp_sink(gr.sizeof_float, 'localhost', port)
+ self.tb_snd.connect(src, udp_snd)
+
+ udp_rcv = gr.udp_source(gr.sizeof_float, 'localhost', port)
+ dst = gr.vector_sink_f()
+ self.tb_rcv.connect(udp_rcv, dst)
+
+ self.tb_rcv.start()
+ self.tb_snd.run()
+ udp_snd.disconnect()
+ self.timeout = False
+ q = Timer(2.0,self.stop_rcv)
+ q.start()
+ self.tb_rcv.wait()
+ q.cancel()
+
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+ self.assert_(not self.timeout)
+
+ def test_003(self):
+ udp_rcv = gr.udp_source( gr.sizeof_float, '0.0.0.0', 0, eof=False )
+ rcv_port = udp_rcv.get_port()
+
+ udp_snd = gr.udp_sink( gr.sizeof_float, '127.0.0.1', 65500 )
+ udp_snd.connect( 'localhost', rcv_port )
+
+ n_data = 16
+ src_data = [float(x) for x in range(n_data)]
+ expected_result = tuple(src_data)
+ src = gr.vector_source_f(src_data)
+ dst = gr.vector_sink_f()
+
+ self.tb_snd.connect( src, udp_snd )
+ self.tb_rcv.connect( udp_rcv, dst )
+
+ self.tb_rcv.start()
+ self.tb_snd.run()
+ udp_snd.disconnect()
+ self.timeout = False
+ q = Timer(2.0,self.stop_rcv)
+ q.start()
+ self.tb_rcv.wait()
+ q.cancel()
+
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+ self.assert_(self.timeout) # source ignores EOF?
+
+ def stop_rcv(self):
+ self.timeout = True
+ self.tb_rcv.stop()
+ #print "tb_rcv stopped by Timer"
+
+if __name__ == '__main__':
+ gr_unittest.run(test_udp_sink_source, "test_udp_sink_source.xml")
+
diff --git a/gr-blocks/swig/blocks_swig.i b/gr-blocks/swig/blocks_swig.i
index f6d7f43fde..9d9caa29fc 100644
--- a/gr-blocks/swig/blocks_swig.i
+++ b/gr-blocks/swig/blocks_swig.i
@@ -176,6 +176,8 @@
#include "blocks/transcendental.h"
#include "blocks/tuntap_pdu.h"
#include "blocks/uchar_to_float.h"
+#include "blocks/udp_sink.h"
+#include "blocks/udp_source.h"
#include "blocks/unpack_k_bits_bb.h"
#include "blocks/unpacked_to_packed_bb.h"
#include "blocks/unpacked_to_packed_ss.h"
@@ -335,6 +337,8 @@
%include "blocks/transcendental.h"
%include "blocks/tuntap_pdu.h"
%include "blocks/uchar_to_float.h"
+%include "blocks/udp_sink.h"
+%include "blocks/udp_source.h"
%include "blocks/unpack_k_bits_bb.h"
%include "blocks/unpacked_to_packed_bb.h"
%include "blocks/unpacked_to_packed_ss.h"
@@ -492,6 +496,8 @@ GR_SWIG_BLOCK_MAGIC2(blocks, throttle);
GR_SWIG_BLOCK_MAGIC2(blocks, transcendental);
GR_SWIG_BLOCK_MAGIC2(blocks, tuntap_pdu);
GR_SWIG_BLOCK_MAGIC2(blocks, uchar_to_float);
+GR_SWIG_BLOCK_MAGIC2(blocks, udp_sink);
+GR_SWIG_BLOCK_MAGIC2(blocks, udp_source);
GR_SWIG_BLOCK_MAGIC2(blocks, unpack_k_bits_bb);
GR_SWIG_BLOCK_MAGIC2(blocks, unpacked_to_packed_bb);
GR_SWIG_BLOCK_MAGIC2(blocks, unpacked_to_packed_ss);