summaryrefslogtreecommitdiff
path: root/gr-blocks/lib/stream_pdu_base.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-blocks/lib/stream_pdu_base.cc')
-rw-r--r--gr-blocks/lib/stream_pdu_base.cc126
1 files changed, 126 insertions, 0 deletions
diff --git a/gr-blocks/lib/stream_pdu_base.cc b/gr-blocks/lib/stream_pdu_base.cc
new file mode 100644
index 0000000000..3378067f85
--- /dev/null
+++ b/gr-blocks/lib/stream_pdu_base.cc
@@ -0,0 +1,126 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#ifdef HAVE_IO_H
+#include <io.h>
+#endif
+
+#include <gr_pdu.h>
+#include <gr_basic_block.h>
+#include "stream_pdu_base.h"
+#include <boost/format.hpp>
+
+static const long timeout_us = 100*1000; //100ms
+
+namespace gr {
+ namespace blocks {
+
+ stream_pdu_base::stream_pdu_base(int MTU)
+ : d_fd(-1),
+ d_started(false),
+ d_finished(false)
+ {
+ // reserve space for rx buffer
+ d_rxbuf.resize(MTU,0);
+ }
+
+ stream_pdu_base::~stream_pdu_base()
+ {
+ stop_rxthread();
+ }
+
+ void
+ stream_pdu_base::start_rxthread(gr_basic_block *blk, pmt::pmt_t port)
+ {
+ d_blk = blk;
+ d_port = port;
+ d_thread = gruel::thread(boost::bind(&stream_pdu_base::run, this));
+ d_started = true;
+ }
+
+ void
+ stream_pdu_base::stop_rxthread()
+ {
+ d_finished = true;
+
+ if (d_started) {
+ d_thread.interrupt();
+ d_thread.join();
+ }
+ }
+
+ void
+ stream_pdu_base::run()
+ {
+ while(!d_finished) {
+ if (!wait_ready())
+ continue;
+
+ const int result = read(d_fd, &d_rxbuf[0], d_rxbuf.size());
+ if (result <= 0)
+ throw std::runtime_error("stream_pdu_base, bad socket read!");
+
+ pmt::pmt_t vector = pmt::pmt_init_u8vector(result, &d_rxbuf[0]);
+ pmt::pmt_t pdu = pmt::pmt_cons(pmt::PMT_NIL, vector);
+
+ d_blk->message_port_pub(d_port, pdu);
+ }
+ }
+
+ bool
+ stream_pdu_base::wait_ready()
+ {
+ //setup timeval for timeout
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = timeout_us;
+
+ //setup rset for timeout
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(d_fd, &rset);
+
+ //call select with timeout on receive socket
+ return ::select(d_fd+1, &rset, NULL, NULL, &tv) > 0;
+ }
+
+ void
+ stream_pdu_base::send(pmt::pmt_t msg)
+ {
+ pmt::pmt_t vector = pmt::pmt_cdr(msg);
+ size_t offset(0);
+ size_t itemsize(::gr_pdu_itemsize(type_from_pmt(vector)));
+ int len(pmt::pmt_length(vector)*itemsize);
+
+ const int rv = write(d_fd, pmt::pmt_uniform_vector_elements(vector, offset), len);
+ if (rv != len) {
+ std::cerr << boost::format("WARNING: gr_stream_pdu_base::send(pdu) write failed! (d_fd=%d, len=%d, rv=%d)")
+ % d_fd % len % rv << std::endl;
+ }
+ }
+
+ } /* namespace blocks */
+} /* namespace gr */