GNU Radio 3.6.5 C++ API

gr_socket_pdu.h

Go to the documentation of this file.
00001 /* -*- c++ -*- */
00002 /*
00003  * Copyright 2012 Free Software Foundation, Inc.
00004  *
00005  * This file is part of GNU Radio
00006  *
00007  * GNU Radio is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 3, or (at your option)
00010  * any later version.
00011  *
00012  * GNU Radio is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License
00018  * along with GNU Radio; see the file COPYING.  If not, write to
00019  * the Free Software Foundation, Inc., 51 Franklin Street,
00020  * Boston, MA 02110-1301, USA.
00021  */
00022 
00023 #ifndef INCLUDED_GR_SOCKET_PDU_H
00024 #define INCLUDED_GR_SOCKET_PDU_H
00025 
00026 #include <gr_core_api.h>
00027 #include <gr_sync_block.h>
00028 #include <gr_message.h>
00029 #include <gr_msg_queue.h>
00030 #include <gr_stream_pdu_base.h>
00031 #include <boost/array.hpp>
00032 #include <boost/asio.hpp>
00033 #include <iostream>
00034 
00035 class gr_socket_pdu;
00036 typedef boost::shared_ptr<gr_socket_pdu> gr_socket_pdu_sptr;
00037 
00038 GR_CORE_API gr_socket_pdu_sptr gr_make_socket_pdu (std::string type, std::string addr, std::string port, int MTU=10000);
00039 
00040 class tcp_connection
00041   : public boost::enable_shared_from_this<tcp_connection>
00042 {
00043 public:
00044   typedef boost::shared_ptr<tcp_connection> pointer;
00045   gr_socket_pdu *d_block;
00046   boost::array<char, 10000> buf;
00047 
00048   static pointer create(boost::asio::io_service& io_service)
00049   {
00050     return pointer(new tcp_connection(io_service));
00051   }
00052 
00053   boost::asio::ip::tcp::socket& socket()
00054   {
00055     return socket_;
00056   }
00057 
00058   void start(gr_socket_pdu* parent)
00059   {
00060     d_block = parent;
00061 //    message_ = "connected to gr_socket_pdu\n";
00062 //    boost::asio::async_write(socket_, boost::asio::buffer(message_),
00063 //        boost::bind(&tcp_connection::handle_write, shared_from_this(),
00064 //          boost::asio::placeholders::error,
00065 //          boost::asio::placeholders::bytes_transferred));
00066 
00067     socket_.async_read_some(
00068         boost::asio::buffer(buf),
00069          boost::bind(&tcp_connection::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
00070   }
00071   void send(pmt::pmt_t vector){
00072     size_t len = pmt::pmt_length(vector);
00073     size_t offset(0);
00074     boost::array<char, 10000> txbuf;
00075     memcpy(&txbuf[0], pmt::pmt_uniform_vector_elements(vector, offset), len);
00076     boost::asio::async_write(socket_, boost::asio::buffer(txbuf, len),
00077         boost::bind(&tcp_connection::handle_write, shared_from_this(),
00078           boost::asio::placeholders::error,
00079           boost::asio::placeholders::bytes_transferred));
00080     }
00081 
00082   ~tcp_connection(){
00083 //    std::cout << "tcp_connection destroyed\n";
00084     }
00085 
00086 private:
00087   tcp_connection(boost::asio::io_service& io_service)
00088     : socket_(io_service)
00089   {
00090   }
00091 
00092   void handle_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred);
00093 
00094   void handle_write(const boost::system::error_code& /*error*/,
00095       size_t /*bytes_transferred*/)
00096   {
00097   }
00098 
00099   boost::asio::ip::tcp::socket socket_;
00100   std::string message_;
00101 };
00102 
00103 
00104 /*!
00105  * \brief Gather received items into messages and insert into msgq
00106  * \ingroup sink_blk
00107  */
00108 class GR_CORE_API gr_socket_pdu : public gr_stream_pdu_base
00109 {
00110  private:
00111   friend GR_CORE_API gr_socket_pdu_sptr
00112   gr_make_socket_pdu(std::string type, std::string addr, std::string port, int MTU);
00113 
00114   boost::asio::io_service _io_service;
00115  
00116   boost::array<char, 10000> rxbuf;
00117 
00118   // tcp specific
00119   boost::asio::ip::tcp::endpoint _tcp_endpoint;
00120 
00121   // specific to tcp server
00122   boost::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor_tcp;
00123   std::vector<tcp_connection::pointer> d_tcp_connections;
00124   void tcp_server_send(pmt::pmt_t msg);
00125   void tcp_client_send(pmt::pmt_t msg);
00126   void udp_send(pmt::pmt_t msg);
00127 
00128   // specific to tcp client
00129   boost::shared_ptr<boost::asio::ip::tcp::socket> _tcp_socket;
00130 
00131   // specific to udp client/server
00132   boost::asio::ip::udp::endpoint _udp_endpoint;
00133   boost::asio::ip::udp::endpoint _udp_endpoint_other;
00134   boost::shared_ptr<boost::asio::ip::udp::socket> _udp_socket;
00135 
00136   void handle_receive(const boost::system::error_code& error, std::size_t ){
00137     }
00138  
00139   void start_tcp_accept(){
00140     tcp_connection::pointer new_connection =
00141         tcp_connection::create(_acceptor_tcp->get_io_service());
00142 
00143     _acceptor_tcp->async_accept(new_connection->socket(),
00144         boost::bind(&gr_socket_pdu::handle_tcp_accept, this, new_connection,
00145             boost::asio::placeholders::error));
00146     }
00147 
00148   void handle_tcp_accept(tcp_connection::pointer new_connection, const boost::system::error_code& error){
00149         if (!error)
00150         {
00151             new_connection->start(this);
00152             d_tcp_connections.push_back(new_connection);
00153             start_tcp_accept();
00154         } else {
00155             std::cout << error << std::endl;
00156         }
00157     }
00158  
00159   void run_io_service(){
00160     _io_service.run();
00161     } 
00162 
00163   void handle_udp_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred){
00164     if(!error){
00165         pmt::pmt_t vector = pmt::pmt_init_u8vector(bytes_transferred, (const uint8_t*)&rxbuf[0]);
00166         pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector);
00167         
00168         message_port_pub( pmt::mp("pdus"), pdu );
00169     
00170         _udp_socket->async_receive_from( boost::asio::buffer(rxbuf), _udp_endpoint_other,
00171             boost::bind(&gr_socket_pdu::handle_udp_read, this,
00172                 boost::asio::placeholders::error,
00173                 boost::asio::placeholders::bytes_transferred));
00174     } else {
00175         throw boost::system::system_error(error);
00176 //        std::cout << "error occurred\n";
00177     }
00178   }
00179   void handle_tcp_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred){
00180     if(!error)
00181     {
00182         pmt::pmt_t vector = pmt::pmt_init_u8vector(bytes_transferred, (const uint8_t*)&rxbuf[0]);
00183         pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector);
00184 
00185         message_port_pub( pmt::mp("pdus"), pdu );
00186 
00187         _tcp_socket->async_read_some(
00188             boost::asio::buffer(rxbuf),
00189             boost::bind(&gr_socket_pdu::handle_tcp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
00190 
00191     } else {
00192         //std::cout << "error occurred\n";
00193         throw boost::system::system_error(error);
00194     }
00195   }
00196 
00197  protected:
00198   gr_socket_pdu (std::string type, std::string addr, std::string port, int MTU=10000);
00199  public:
00200   ~gr_socket_pdu () {}
00201 };
00202 
00203 #endif /* INCLUDED_GR_TUNTAP_PDU_H */