diff options
author | Tom Rondeau <trondeau@vt.edu> | 2013-03-11 23:43:28 -0400 |
---|---|---|
committer | Tom Rondeau <trondeau@vt.edu> | 2013-03-13 12:51:05 -0400 |
commit | b2c2058921198e49e82301e32733b2da041521b1 (patch) | |
tree | 12abfea4c2f3fa974822f4fe1aa3a8c67abf2fcb | |
parent | 45f9c0a5792526279ebf5ffa6a4853c7e2de6dd6 (diff) |
blocks: udp_sink and udp_source implemented in gr-blocks using boost::asio instead of sockets.
Needs testing on non-Linux OSes.
-rw-r--r-- | gr-blocks/grc/blocks_block_tree.xml | 2 | ||||
-rw-r--r-- | gr-blocks/grc/blocks_udp_sink.xml | 77 | ||||
-rw-r--r-- | gr-blocks/include/blocks/CMakeLists.txt | 2 | ||||
-rw-r--r-- | gr-blocks/include/blocks/udp_sink.h | 81 | ||||
-rw-r--r-- | gr-blocks/lib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | gr-blocks/lib/udp_sink_impl.cc | 145 | ||||
-rw-r--r-- | gr-blocks/lib/udp_sink_impl.h | 65 | ||||
-rw-r--r-- | gr-blocks/python/qa_udp_source_sink.py | 121 | ||||
-rw-r--r-- | gr-blocks/swig/blocks_swig.i | 6 |
9 files changed, 501 insertions, 0 deletions
diff --git a/gr-blocks/grc/blocks_block_tree.xml b/gr-blocks/grc/blocks_block_tree.xml index ec48f24cd3..5e2c5c8afc 100644 --- a/gr-blocks/grc/blocks_block_tree.xml +++ b/gr-blocks/grc/blocks_block_tree.xml @@ -36,6 +36,7 @@ <block>blocks_pdu_to_tagged_stream</block> <block>blocks_message_source</block> <block>blocks_message_burst_source</block> + <block>blocks_udp_source</block> <block>blocks_wavfile_source</block> </cat> <cat> @@ -47,6 +48,7 @@ <block>blocks_tag_debug</block> <block>blocks_message_sink</block> <block>blocks_tagged_file_sink</block> + <block>blocks_udp_sink</block> <block>blocks_wavfile_sink</block> </cat> <cat> diff --git a/gr-blocks/grc/blocks_udp_sink.xml b/gr-blocks/grc/blocks_udp_sink.xml new file mode 100644 index 0000000000..a001b2f1bb --- /dev/null +++ b/gr-blocks/grc/blocks_udp_sink.xml @@ -0,0 +1,77 @@ +<?xml version="1.0"?> +<!-- +################################################### +##UDP Sink +################################################### + --> +<block> + <name>UDP Sink</name> + <key>blocks_udp_sink</key> + <import>from gnuradio import blocks</import> + <make>blocks.udp_sink($type.size*$vlen, $ipaddr, $port, $psize, $eof)</make> + <callback>set_mtu($mtu)</callback> + <param> + <name>Input Type</name> + <key>type</key> + <type>enum</type> + <option> + <name>Complex</name> + <key>complex</key> + <opt>size:gr.sizeof_gr_complex</opt> + </option> + <option> + <name>Float</name> + <key>float</key> + <opt>size:gr.sizeof_float</opt> + </option> + <option> + <name>Int</name> + <key>int</key> + <opt>size:gr.sizeof_int</opt> + </option> + <option> + <name>Short</name> + <key>short</key> + <opt>size:gr.sizeof_short</opt> + </option> + <option> + <name>Byte</name> + <key>byte</key> + <opt>size:gr.sizeof_char</opt> + </option> + </param> + <param> + <name>Destination IP Address</name> + <key>ipaddr</key> + <type>string</type> + </param> + <param> + <name>Destination Port</name> + <key>port</key> + <type>int</type> + </param> + <param> + <name>Payload Size</name> + <key>psize</key> + <value>1472</value> + <type>int</type> + </param> + <param> + <name>Send Null Pkt as EOF</name> + <key>eof</key> + <value>True</value> + <type>bool</type> + </param> + <param> + <name>Vec Length</name> + <key>vlen</key> + <value>1</value> + <type>int</type> + </param> + <check>$vlen > 0</check> + <sink> + <name>in</name> + <type>$type</type> + <vlen>$vlen</vlen> + </sink> +</block> diff --git a/gr-blocks/include/blocks/CMakeLists.txt b/gr-blocks/include/blocks/CMakeLists.txt index 16df3cac1a..111feef4f1 100644 --- a/gr-blocks/include/blocks/CMakeLists.txt +++ b/gr-blocks/include/blocks/CMakeLists.txt @@ -177,6 +177,8 @@ install(FILES transcendental.h tuntap_pdu.h uchar_to_float.h + udp_sink.h + udp_source.h unpack_k_bits_bb.h vco_f.h vector_to_stream.h diff --git a/gr-blocks/include/blocks/udp_sink.h b/gr-blocks/include/blocks/udp_sink.h new file mode 100644 index 0000000000..69c5f0ffae --- /dev/null +++ b/gr-blocks/include/blocks/udp_sink.h @@ -0,0 +1,81 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007-2010,2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_GR_UDP_SINK_H +#define INCLUDED_GR_UDP_SINK_H + +#include <blocks/api.h> +#include <gr_sync_block.h> + +namespace gr { + namespace blocks { + + /*! + * \brief Write stream to an UDP socket. + * \ingroup sink_blk + */ + class BLOCKS_API udp_sink : virtual public gr_sync_block + { + public: + // gr::blocks::udp_sink::sptr + typedef boost::shared_ptr<udp_sink> sptr; + + /*! + * \brief UDP Sink Constructor + * + * \param itemsize The size (in bytes) of the item datatype + * \param host The name or IP address of the receiving host; use + * NULL or None for no connection + * \param port Destination port to connect to on receiving host + * \param payload_size UDP payload size by default set to + * 1472 = (1500 MTU - (8 byte UDP header) - (20 byte IP header)) + * \param eof Send zero-length packet on disconnect + */ + static sptr make(size_t itemsize, + const std::string &host, int port, + int payload_size=1472, bool eof=true); + + /*! \brief return the PAYLOAD_SIZE of the socket */ + virtual int payload_size() = 0; + + /*! \brief Change the connection to a new destination + * + * \param host The name or IP address of the receiving host; use + * NULL or None to break the connection without closing + * \param port Destination port to connect to on receiving host + * + * Calls disconnect() to terminate any current connection first. + */ + virtual void connect(const std::string &host, int port) = 0; + + /*! \brief Send zero-length packet (if eof is requested) then stop sending + * + * Zero-byte packets can be interpreted as EOF by gr_udp_source. + * Note that disconnect occurs automatically when the sink is + * destroyed, but not when its top_block stops.*/ + virtual void disconnect() = 0; + }; + + } /* namespace blocks */ +} /* namespace gr */ + +#endif /* INCLUDED_GR_UDP_SINK_H */ diff --git a/gr-blocks/lib/CMakeLists.txt b/gr-blocks/lib/CMakeLists.txt index 23480fcd10..b65c262989 100644 --- a/gr-blocks/lib/CMakeLists.txt +++ b/gr-blocks/lib/CMakeLists.txt @@ -215,6 +215,8 @@ list(APPEND gr_blocks_sources tuntap_pdu_impl.cc uchar_array_to_float.cc uchar_to_float_impl.cc + udp_sink_impl.cc + udp_source_impl.cc unpack_k_bits_bb_impl.cc vco_f_impl.cc vector_to_stream_impl.cc diff --git a/gr-blocks/lib/udp_sink_impl.cc b/gr-blocks/lib/udp_sink_impl.cc new file mode 100644 index 0000000000..500b7c2cd7 --- /dev/null +++ b/gr-blocks/lib/udp_sink_impl.cc @@ -0,0 +1,145 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007-2010,2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "udp_sink_impl.h" +#include <gr_io_signature.h> +#include <boost/asio.hpp> +#include <boost/format.hpp> +#include <gruel/thread.h> +#include <stdexcept> +#include <stdio.h> +#include <string.h> + +namespace gr { + namespace blocks { + + udp_sink::sptr + udp_sink::make(size_t itemsize, + const std::string &host, int port, + int payload_size, bool eof) + { + return gnuradio::get_initial_sptr + (new udp_sink_impl(itemsize, host, port, + payload_size, eof)); + } + + udp_sink_impl::udp_sink_impl(size_t itemsize, + const std::string &host, int port, + int payload_size, bool eof) + : gr_sync_block("udp_sink", + gr_make_io_signature(1, 1, itemsize), + gr_make_io_signature(0, 0, 0)), + d_itemsize(itemsize), d_payload_size(payload_size), d_eof(eof), + d_connected(false) + { + // Get the destination address + connect(host, port); + } + + // public constructor that returns a shared_ptr + udp_sink_impl::~udp_sink_impl() + { + if(d_connected) + disconnect(); + } + + void + udp_sink_impl::connect(const std::string &host, int port) + { + if(d_connected) + disconnect(); + + std::string s_port = (boost::format("%d")%port).str(); + if(host.size() > 0) { + boost::asio::ip::udp::resolver resolver(d_io_service); + boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), + host, s_port); + d_endpoint = *resolver.resolve(query); + + d_socket = new boost::asio::ip::udp::socket(d_io_service); + d_socket->open(boost::asio::ip::udp::v4()); + + d_connected = true; + } + } + + void + udp_sink_impl::disconnect() + { + if(!d_connected) + return; + + gruel::scoped_lock guard(d_mutex); // protect d_socket from work() + + // Send a few zero-length packets to signal receiver we are done + boost::array<char, 1> send_buf = {{ 0 }}; + if(d_eof) { + int i; + for(i = 0; i < 3; i++) + d_socket->send_to(boost::asio::buffer(send_buf), d_endpoint); + } + + d_socket->close(); + delete d_socket; + + d_connected = false; + } + + int + udp_sink_impl::work (int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) + { + const char *in = (const char *) input_items[0]; + ssize_t r=0, bytes_sent=0, bytes_to_send=0; + ssize_t total_size = noutput_items*d_itemsize; + + gruel::scoped_lock guard(d_mutex); // protect d_socket + + while(bytes_sent < total_size) { + bytes_to_send = std::min((ssize_t)d_payload_size, (total_size-bytes_sent)); + + if(d_connected) { + try { + r = d_socket->send_to(boost::asio::buffer((void*)(in+bytes_sent), bytes_to_send), + d_endpoint); + } + catch(std::exception& e) { + GR_LOG_ERROR(d_logger, boost::format("send error: %s") % e.what()); + return -1; + } + } + else + r = bytes_to_send; // discarded for lack of connection + bytes_sent += r; + } + + return noutput_items; + } + + } /* namespace blocks */ +} /* namespace gr */ + diff --git a/gr-blocks/lib/udp_sink_impl.h b/gr-blocks/lib/udp_sink_impl.h new file mode 100644 index 0000000000..243d499b60 --- /dev/null +++ b/gr-blocks/lib/udp_sink_impl.h @@ -0,0 +1,65 @@ +/* -*- c++ -*- */ +/* + * Copyright 2007-2010,2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_GR_UDP_SINK_IMPL_H +#define INCLUDED_GR_UDP_SINK_IMPL_H + +#include <blocks/udp_sink.h> +#include <boost/asio.hpp> + +namespace gr { + namespace blocks { + + class udp_sink_impl : public udp_sink + { + private: + size_t d_itemsize; + + int d_payload_size; // maximum transmission unit (packet length) + bool d_eof; // send zero-length packet on disconnect + bool d_connected; // are we connected? + gruel::mutex d_mutex; // protects d_socket and d_connected + + boost::asio::ip::udp::socket *d_socket; // handle to socket + boost::asio::ip::udp::endpoint d_endpoint; + boost::asio::io_service d_io_service; + + public: + udp_sink_impl(size_t itemsize, + const std::string &host, int port, + int payload_size, bool eof); + ~udp_sink_impl(); + + int payload_size() { return d_payload_size; } + + void connect(const std::string &host, int port); + void disconnect(); + + int work(int noutput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); + }; + + } /* namespace blocks */ +} /* namespace gr */ + +#endif /* INCLUDED_GR_UDP_SINK_IMPL_H */ diff --git a/gr-blocks/python/qa_udp_source_sink.py b/gr-blocks/python/qa_udp_source_sink.py new file mode 100644 index 0000000000..6ebbf87afc --- /dev/null +++ b/gr-blocks/python/qa_udp_source_sink.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python +# +# Copyright 2008,2010,2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import blocks_swig as blocks +import os + +from threading import Timer + +class test_udp_sink_source(gr_unittest.TestCase): + + def setUp(self): + self.tb_snd = gr.top_block() + self.tb_rcv = gr.top_block() + + def tearDown(self): + self.tb_rcv = None + self.tb_snd = None + + def test_001(self): + # Tests calling disconnect/reconnect. + + port = 65500 + + n_data = 16 + src_data = [x for x in range(n_data)] + expected_result = tuple(src_data) + src = gr.vector_source_s(src_data, False) + udp_snd = blocks.udp_sink(gr.sizeof_short, 'localhost', port) + self.tb_snd.connect(src, udp_snd) + + self.tb_snd.run() + udp_snd.disconnect() + + udp_snd.connect('localhost', port+1) + src.rewind() + self.tb_snd.run() + + def test_002(self): + port = 65500 + + n_data = 100000 + src_data = [float(x) for x in range(n_data)] + expected_result = tuple(src_data) + src = gr.vector_source_f(src_data, False) + udp_snd = gr.udp_sink(gr.sizeof_float, 'localhost', port) + self.tb_snd.connect(src, udp_snd) + + udp_rcv = gr.udp_source(gr.sizeof_float, 'localhost', port) + dst = gr.vector_sink_f() + self.tb_rcv.connect(udp_rcv, dst) + + self.tb_rcv.start() + self.tb_snd.run() + udp_snd.disconnect() + self.timeout = False + q = Timer(2.0,self.stop_rcv) + q.start() + self.tb_rcv.wait() + q.cancel() + + result_data = dst.data() + self.assertEqual(expected_result, result_data) + self.assert_(not self.timeout) + + def test_003(self): + udp_rcv = gr.udp_source( gr.sizeof_float, '0.0.0.0', 0, eof=False ) + rcv_port = udp_rcv.get_port() + + udp_snd = gr.udp_sink( gr.sizeof_float, '127.0.0.1', 65500 ) + udp_snd.connect( 'localhost', rcv_port ) + + n_data = 16 + src_data = [float(x) for x in range(n_data)] + expected_result = tuple(src_data) + src = gr.vector_source_f(src_data) + dst = gr.vector_sink_f() + + self.tb_snd.connect( src, udp_snd ) + self.tb_rcv.connect( udp_rcv, dst ) + + self.tb_rcv.start() + self.tb_snd.run() + udp_snd.disconnect() + self.timeout = False + q = Timer(2.0,self.stop_rcv) + q.start() + self.tb_rcv.wait() + q.cancel() + + result_data = dst.data() + self.assertEqual(expected_result, result_data) + self.assert_(self.timeout) # source ignores EOF? + + def stop_rcv(self): + self.timeout = True + self.tb_rcv.stop() + #print "tb_rcv stopped by Timer" + +if __name__ == '__main__': + gr_unittest.run(test_udp_sink_source, "test_udp_sink_source.xml") + diff --git a/gr-blocks/swig/blocks_swig.i b/gr-blocks/swig/blocks_swig.i index f6d7f43fde..9d9caa29fc 100644 --- a/gr-blocks/swig/blocks_swig.i +++ b/gr-blocks/swig/blocks_swig.i @@ -176,6 +176,8 @@ #include "blocks/transcendental.h" #include "blocks/tuntap_pdu.h" #include "blocks/uchar_to_float.h" +#include "blocks/udp_sink.h" +#include "blocks/udp_source.h" #include "blocks/unpack_k_bits_bb.h" #include "blocks/unpacked_to_packed_bb.h" #include "blocks/unpacked_to_packed_ss.h" @@ -335,6 +337,8 @@ %include "blocks/transcendental.h" %include "blocks/tuntap_pdu.h" %include "blocks/uchar_to_float.h" +%include "blocks/udp_sink.h" +%include "blocks/udp_source.h" %include "blocks/unpack_k_bits_bb.h" %include "blocks/unpacked_to_packed_bb.h" %include "blocks/unpacked_to_packed_ss.h" @@ -492,6 +496,8 @@ GR_SWIG_BLOCK_MAGIC2(blocks, throttle); GR_SWIG_BLOCK_MAGIC2(blocks, transcendental); GR_SWIG_BLOCK_MAGIC2(blocks, tuntap_pdu); GR_SWIG_BLOCK_MAGIC2(blocks, uchar_to_float); +GR_SWIG_BLOCK_MAGIC2(blocks, udp_sink); +GR_SWIG_BLOCK_MAGIC2(blocks, udp_source); GR_SWIG_BLOCK_MAGIC2(blocks, unpack_k_bits_bb); GR_SWIG_BLOCK_MAGIC2(blocks, unpacked_to_packed_bb); GR_SWIG_BLOCK_MAGIC2(blocks, unpacked_to_packed_ss); |