Statistics
| Branch: | Tag: | Revision:

root / gnuradio-core / src / lib / io / gr_udp_sink.cc @ fe2e6f80

History | View | Annotate | Download (7.9 kB)

1
/* -*- c++ -*- */
2
/*
3
 * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
4
 * 
5
 * This file is part of GNU Radio
6
 * 
7
 * GNU Radio is free software; you can redistribute it and/or modify
8
 * it under the terms of the GNU General Public License as published by
9
 * the Free Software Foundation; either version 3, or (at your option)
10
 * any later version.
11
 * 
12
 * GNU Radio is distributed in the hope that it will be useful,
13
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15
 * GNU General Public License for more details.
16
 * 
17
 * You should have received a copy of the GNU General Public License
18
 * along with GNU Radio; see the file COPYING.  If not, write to
19
 * the Free Software Foundation, Inc., 51 Franklin Street,
20
 * Boston, MA 02110-1301, USA.
21
 */
22
23
#ifdef HAVE_CONFIG_H
24
#include "config.h"
25
#endif
26
#include <gr_udp_sink.h>
27
#include <gr_io_signature.h>
28
#include <stdexcept>
29
#include <errno.h>
30
#include <stdio.h>
31
#include <string.h>
32
#if defined(HAVE_NETDB_H)
33
#include <netdb.h>
34
#ifdef HAVE_SYS_TYPES_H
35
#include <sys/types.h>
36
#endif
37
#ifdef HAVE_SYS_SOCKET_H
38
#include <sys/socket.h>  //usually included by <netdb.h>?
39
#endif
40
typedef void* optval_t;
41
#elif defined(HAVE_WINDOWS_H)
42
// if not posix, assume winsock
43
#define USING_WINSOCK
44
#include <winsock2.h>
45
#include <ws2tcpip.h>
46
#define SHUT_RDWR 2
47
typedef char* optval_t;
48
#endif
49
50
#include <gruel/thread.h>
51
52
#define SNK_VERBOSE 0
53
54
static int is_error( int perr )
55
{
56
  // Compare error to posix error code; return nonzero if match.
57
#if defined(USING_WINSOCK)
58
#define ENOPROTOOPT 109
59
#define ECONNREFUSED 111
60
  // All codes to be checked for must be defined below
61
  int werr = WSAGetLastError();
62
  switch( werr ) {
63
  case WSAETIMEDOUT:
64
    return( perr == EAGAIN );
65
  case WSAENOPROTOOPT:
66
    return( perr == ENOPROTOOPT );
67
  case WSAECONNREFUSED:
68
    return( perr == ECONNREFUSED );
69
  default:
70
    fprintf(stderr,"gr_udp_source/is_error: unknown error %d\n", perr );
71
    throw std::runtime_error("internal error");
72
  }
73
  return 0;
74
#else
75
  return( perr == errno );
76
#endif
77
}
78
79
static void report_error( const char *msg1, const char *msg2 )
80
{
81
  // Deal with errors, both posix and winsock
82
#if defined(USING_WINSOCK)
83
  int werr = WSAGetLastError();
84
  fprintf(stderr, "%s: winsock error %d\n", msg1, werr );
85
#else
86
  perror(msg1);
87
#endif
88
  if( msg2 != NULL )
89
    throw std::runtime_error(msg2);
90
  return;
91
}
92
93
gr_udp_sink::gr_udp_sink (size_t itemsize, 
94
                          const char *host, unsigned short port,
95
                          int payload_size, bool eof)
96
  : gr_sync_block ("udp_sink",
97
                   gr_make_io_signature (1, 1, itemsize),
98
                   gr_make_io_signature (0, 0, 0)),
99
    d_itemsize (itemsize), d_payload_size(payload_size), d_eof(eof),
100
    d_socket(-1), d_connected(false)
101
{
102
#if defined(USING_WINSOCK) // for Windows (with MinGW)
103
  // initialize winsock DLL
104
  WSADATA wsaData;
105
  int iResult = WSAStartup( MAKEWORD(2,2), &wsaData );
106
  if( iResult != NO_ERROR ) {
107
    report_error( "gr_udp_source WSAStartup", "can't open socket" );
108
  }
109
#endif
110
111
  // create socket
112
  d_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
113
  if(d_socket == -1) {
114
    report_error("socket open","can't open socket");
115
  }
116
117
  // Don't wait when shutting down
118
  linger lngr;
119
  lngr.l_onoff  = 1;
120
  lngr.l_linger = 0;
121
  if(setsockopt(d_socket, SOL_SOCKET, SO_LINGER, (optval_t)&lngr, sizeof(linger)) == -1) {
122
    if( !is_error(ENOPROTOOPT) ) {  // no SO_LINGER for SOCK_DGRAM on Windows
123
      report_error("SO_LINGER","can't set socket option SO_LINGER");
124
    }
125
  }
126
127
  // Get the destination address
128
  connect(host, port);
129
}
130
131
// public constructor that returns a shared_ptr
132
133
gr_udp_sink_sptr
134
gr_make_udp_sink (size_t itemsize, 
135
                  const char *host, unsigned short port,
136
                  int payload_size, bool eof)
137
{
138
  return gnuradio::get_initial_sptr(new gr_udp_sink (itemsize, 
139
                                            host, port,
140
                                            payload_size, eof));
141
}
142
143
gr_udp_sink::~gr_udp_sink ()
144
{
145
  if (d_connected)
146
    disconnect();
147
148
  if (d_socket != -1){
149
    shutdown(d_socket, SHUT_RDWR);
150
#if defined(USING_WINSOCK)
151
    closesocket(d_socket);
152
#else
153
    ::close(d_socket);
154
#endif
155
    d_socket = -1;
156
  }
157
158
#if defined(USING_WINSOCK) // for Windows (with MinGW)
159
  // free winsock resources
160
  WSACleanup();
161
#endif
162
}
163
164
int 
165
gr_udp_sink::work (int noutput_items,
166
                   gr_vector_const_void_star &input_items,
167
                   gr_vector_void_star &output_items)
168
{
169
  const char *in = (const char *) input_items[0];
170
  ssize_t r=0, bytes_sent=0, bytes_to_send=0;
171
  ssize_t total_size = noutput_items*d_itemsize;
172
173
  #if SNK_VERBOSE
174
  printf("Entered udp_sink\n");
175
  #endif
176
177
  gruel::scoped_lock guard(d_mutex);  // protect d_socket
178
179
  while(bytes_sent <  total_size) {
180
    bytes_to_send = std::min((ssize_t)d_payload_size, (total_size-bytes_sent));
181
  
182
    if(d_connected) {
183
      r = send(d_socket, (in+bytes_sent), bytes_to_send, 0);
184
      if(r == -1) {         // error on send command
185
        if( is_error(ECONNREFUSED) )
186
          r = bytes_to_send;  // discard data until receiver is started
187
        else {
188
          report_error("udp_sink",NULL); // there should be no error case where
189
          return -1;                  // this function should not exit immediately
190
        }
191
      }
192
    }
193
    else
194
      r = bytes_to_send;  // discarded for lack of connection
195
    bytes_sent += r;
196
    
197
    #if SNK_VERBOSE
198
    printf("\tbyte sent: %d bytes\n", r);
199
    #endif
200
  }
201
202
  #if SNK_VERBOSE
203
  printf("Sent: %d bytes (noutput_items: %d)\n", bytes_sent, noutput_items);
204
  #endif
205
206
  return noutput_items;
207
}
208
209
void gr_udp_sink::connect( const char *host, unsigned short port )
210
{
211
  if(d_connected)
212
    disconnect();
213
214
  if(host != NULL ) {
215
    // Get the destination address
216
    struct addrinfo *ip_dst;
217
    struct addrinfo hints;
218
    memset( (void*)&hints, 0, sizeof(hints) );
219
    hints.ai_family = AF_INET;
220
    hints.ai_socktype = SOCK_DGRAM;
221
    hints.ai_protocol = IPPROTO_UDP;
222
    char port_str[12];
223
    sprintf( port_str, "%d", port );
224
225
    // FIXME leaks if report_error throws below
226
    int ret = getaddrinfo( host, port_str, &hints, &ip_dst );
227
    if( ret != 0 )
228
      report_error("gr_udp_source/getaddrinfo",
229
                   "can't initialize destination socket" );
230
231
    // don't need d_mutex lock when !d_connected
232
    if(::connect(d_socket, ip_dst->ai_addr, ip_dst->ai_addrlen) == -1) {
233
      report_error("socket connect","can't connect to socket");
234
    }
235
    d_connected = true;
236
237
    freeaddrinfo(ip_dst);
238
  }
239
240
  return;
241
}
242
243
void gr_udp_sink::disconnect()
244
{
245
  if(!d_connected)
246
    return;
247
248
  #if SNK_VERBOSE
249
  printf("gr_udp_sink disconnecting\n");
250
  #endif
251
252
  gruel::scoped_lock guard(d_mutex);  // protect d_socket from work()
253
254
  // Send a few zero-length packets to signal receiver we are done
255
  if(d_eof) {
256
    int i;
257
    for( i = 0; i < 3; i++ )
258
      (void) send( d_socket, NULL, 0, 0 );  // ignore errors
259
  }
260
261
  // Sending EOF can produce ERRCONNREFUSED errors that won't show up
262
  //  until the next send or recv, which might confuse us if it happens
263
  //  on a new connection.  The following does a nonblocking recv to
264
  //  clear any such errors.
265
  timeval timeout;
266
  timeout.tv_sec = 0;    // zero time for immediate return
267
  timeout.tv_usec = 0;
268
  fd_set readfds;
269
  FD_ZERO(&readfds);
270
  FD_SET(d_socket, &readfds);
271
  int r = select(FD_SETSIZE, &readfds, NULL, NULL, &timeout);
272
  if(r < 0) {
273
      #if SNK_VERBOSE
274
      report_error("udp_sink/select",NULL);
275
      #endif
276
  }
277
  else if(r > 0) {  // call recv() to get error return
278
    r = recv(d_socket, (char*)&readfds, sizeof(readfds), 0);
279
    if(r < 0) {
280
        #if SNK_VERBOSE
281
        report_error("udp_sink/recv",NULL);
282
        #endif
283
    }
284
  }
285
286
  // Since I can't find any way to disconnect a datagram socket in Cygwin,
287
  // we just leave it connected but disable sending.
288
#if 0
289
  // zeroed address structure should reset connection
290
  struct sockaddr addr;
291
  memset( (void*)&addr, 0, sizeof(addr) );
292
  // addr.sa_family = AF_UNSPEC;  // doesn't work on Cygwin
293
  // addr.sa_family = AF_INET;  // doesn't work on Cygwin
294
295
  if(::connect(d_socket, &addr, sizeof(addr)) == -1)
296
    report_error("socket connect","can't connect to socket");
297
#endif
298
299
  d_connected = false;
300
301
  return;
302
}