From 461ece56b36a44b2405282630157739c7f9a26ba Mon Sep 17 00:00:00 2001
From: Tom Rondeau <trondeau@vt.edu>
Date: Fri, 14 Dec 2012 16:10:30 -0500
Subject: blocks: moving file metadata sink/source to gr-blocks.

---
 gr-blocks/lib/file_meta_source_impl.cc | 433 +++++++++++++++++++++++++++++++++
 1 file changed, 433 insertions(+)
 create mode 100644 gr-blocks/lib/file_meta_source_impl.cc

(limited to 'gr-blocks/lib/file_meta_source_impl.cc')

diff --git a/gr-blocks/lib/file_meta_source_impl.cc b/gr-blocks/lib/file_meta_source_impl.cc
new file mode 100644
index 0000000000..fb39b205b4
--- /dev/null
+++ b/gr-blocks/lib/file_meta_source_impl.cc
@@ -0,0 +1,433 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "file_meta_source_impl.h"
+#include <gr_io_signature.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdexcept>
+#include <cstdio>
+
+namespace gr {
+  namespace blocks {
+
+// win32 (mingw/msvc) specific
+#ifdef HAVE_IO_H
+#include <io.h>
+#endif
+#ifdef O_BINARY
+#define	OUR_O_BINARY O_BINARY
+#else
+#define	OUR_O_BINARY 0
+#endif
+
+// should be handled via configure
+#ifdef O_LARGEFILE
+#define	OUR_O_LARGEFILE	O_LARGEFILE
+#else
+#define	OUR_O_LARGEFILE 0
+#endif
+
+    file_meta_source::sptr
+    file_meta_source::make(const std::string &filename,
+			   bool repeat,
+			   bool detached_header,
+			   const std::string &hdr_filename)
+    {
+      return gnuradio::get_initial_sptr
+	(new file_meta_source_impl(filename,
+				   repeat,
+				   detached_header,
+				   hdr_filename));
+    }
+
+    file_meta_source_impl::file_meta_source_impl(const std::string &filename,
+						 bool repeat,
+						 bool detached_header,
+						 const std::string &hdr_filename)
+      : gr_sync_block("file_meta_source",
+		      gr_make_io_signature(0, 0, 0),
+		      gr_make_io_signature(1, 1, 1)),
+	d_itemsize(0), d_samp_rate(0),
+	d_seg_size(0),
+	d_updated(false), d_repeat(repeat)
+    {
+      d_fp = 0;
+      d_new_fp = 0;
+      d_hdr_fp = 0;
+      d_new_hdr_fp = 0;
+
+      if(detached_header == true) {
+	d_state = STATE_DETACHED;
+      }
+      else
+	d_state = STATE_INLINE;
+
+      if(!open(filename, hdr_filename))
+	throw std::runtime_error("file_meta_source: can't open file\n");
+
+      do_update();
+
+      pmt_t hdr = PMT_NIL, extras = PMT_NIL;
+      if(read_header(hdr, extras)) {
+	parse_header(hdr, 0, d_tags);
+	parse_extras(extras, 0, d_tags);
+      }
+      else
+	throw std::runtime_error("file_meta_source: could not read header.\n");
+
+      // Set output signature based on itemsize info in header
+      set_output_signature(gr_make_io_signature(1, 1, d_itemsize));
+    }
+
+    file_meta_source_impl::~file_meta_source_impl()
+    {
+      close();
+
+      if(d_fp) {
+	fclose(d_fp);
+	d_fp = 0;
+      }
+  
+      if(d_state == STATE_DETACHED) {
+	if(d_hdr_fp) {
+	  fclose(d_hdr_fp);
+	  d_hdr_fp = 0;
+	}
+      }
+    }
+
+    bool
+    file_meta_source_impl::read_header(pmt_t &hdr, pmt_t &extras)
+    {
+      // Select which file handle to read from.
+      FILE *fp;
+      if(d_state == STATE_DETACHED)
+	fp = d_hdr_fp;
+      else
+	fp = d_fp;
+
+      size_t ret;
+      size_t size = 0;
+      std::string str;
+      char *hdr_buffer = new char[METADATA_HEADER_SIZE];
+      while(size < METADATA_HEADER_SIZE) {
+	ret = fread(&hdr_buffer[size], sizeof(char), METADATA_HEADER_SIZE-size, fp);
+	if(ret == 0) {
+	  delete [] hdr_buffer;
+	  if(feof(fp))
+	    return false;
+	  else {
+	    std::stringstream s;
+	    s << "file_meta_source: error occurred extracting header: "
+	      << strerror(errno) << std::endl;
+	    throw std::runtime_error(s.str());
+	  }
+	}
+	size += ret;
+      }
+
+      // Convert to string or the char array gets confused by the \0
+      str.insert(0, hdr_buffer, METADATA_HEADER_SIZE);
+      hdr = pmt_deserialize_str(str);
+      delete [] hdr_buffer;
+
+      uint64_t seg_start, extra_len;
+      pmt_t r, dump;
+      if(pmt_dict_has_key(hdr, pmt_string_to_symbol("strt"))) {
+	r = pmt_dict_ref(hdr, pmt_string_to_symbol("strt"), dump);
+	seg_start = pmt_to_uint64(r);
+	extra_len = seg_start - METADATA_HEADER_SIZE;
+      }
+
+      if(extra_len > 0) {
+	size = 0;
+	hdr_buffer = new char[extra_len];
+	while(size < extra_len) {
+	  ret = fread(&hdr_buffer[size], sizeof(char), extra_len-size, fp);
+	  if(ret == 0) {
+	    delete [] hdr_buffer;
+	    if(feof(fp))
+	      return false;
+	    else {
+	      std::stringstream s;
+	      s << "file_meta_source: error occurred extracting extras: "
+		<< strerror(errno) << std::endl;
+	      throw std::runtime_error(s.str());
+	    }
+	  }
+	  size += ret;
+	}
+
+	str.clear();
+	str.insert(0, hdr_buffer, extra_len);
+	extras = pmt_deserialize_str(str);
+	delete [] hdr_buffer;
+      }
+
+      return true;
+    }
+
+    void
+    file_meta_source_impl::parse_header(pmt_t hdr, uint64_t offset,
+					std::vector<gr_tag_t> &tags)
+    {
+      pmt_t r, key;
+
+      // GET SAMPLE RATE
+      key = pmt_string_to_symbol("rx_rate");
+      if(pmt_dict_has_key(hdr, key)) {
+	r = pmt_dict_ref(hdr, key, PMT_NIL);
+	d_samp_rate = pmt_to_double(r);
+
+	gr_tag_t t;
+	t.offset = offset;
+	t.key = key;
+	t.value = r;
+	t.srcid = alias_pmt();
+	tags.push_back(t);
+      }
+      else {
+	throw std::runtime_error("file_meta_source: Could not extract sample rate.\n");
+      }
+
+      // GET TIME STAMP
+      key = pmt_string_to_symbol("rx_time");
+      if(pmt_dict_has_key(hdr, key)) {
+	d_time_stamp = pmt_dict_ref(hdr, key, PMT_NIL);
+
+	gr_tag_t t;
+	t.offset = offset;
+	t.key = key;
+	t.value = d_time_stamp;
+	t.srcid = alias_pmt();
+	tags.push_back(t);
+      }
+      else {
+	throw std::runtime_error("file_meta_source: Could not extract time stamp.\n");
+      }
+
+      // GET ITEM SIZE OF DATA
+      if(pmt_dict_has_key(hdr, pmt_string_to_symbol("size"))) {
+	d_itemsize = pmt_to_long(pmt_dict_ref(hdr, pmt_string_to_symbol("size"), PMT_NIL));
+      }
+      else {
+	throw std::runtime_error("file_meta_source: Could not extract item size.\n");
+      }
+
+      // GET SEGMENT SIZE
+      if(pmt_dict_has_key(hdr, pmt_string_to_symbol("bytes"))) {
+	d_seg_size = pmt_to_uint64(pmt_dict_ref(hdr, pmt_string_to_symbol("bytes"), PMT_NIL));
+
+	// Convert from bytes to items
+	d_seg_size /= d_itemsize;
+      }
+      else {
+	throw std::runtime_error("file_meta_source: Could not extract segment size.\n");
+      }
+    }
+
+    void
+    file_meta_source_impl::parse_extras(pmt_t extras, uint64_t offset,
+					std::vector<gr_tag_t> &tags)
+    {
+      pmt_t item, key, val;
+
+      size_t nitems = pmt_length(extras);
+      for(size_t i = 0; i < nitems; i++) {
+	item = pmt_nth(i, extras);
+	key = pmt_car(item);
+	val = pmt_cdr(item);
+
+	gr_tag_t t;
+	t.offset = offset;
+	t.key = key;
+	t.value = val;
+	t.srcid = alias_pmt();
+	tags.push_back(t);
+      }
+    }
+
+    bool
+    file_meta_source_impl::open(const std::string &filename,
+				const std::string &hdr_filename)
+    {
+      bool ret = true;
+      if(d_state == STATE_DETACHED) {
+	std::string s;
+	if(hdr_filename == "")
+	  s = filename + ".hdr";
+	else
+	  s = hdr_filename;
+	ret = _open(&d_new_hdr_fp, s.c_str());
+      }
+
+      ret = ret && _open(&d_new_fp, filename.c_str());
+      d_updated = true;
+      return ret;
+    }
+
+    bool
+    file_meta_source_impl::_open(FILE **fp, const char *filename)
+    {
+      gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function
+
+      bool ret = true;
+      int fd;
+
+      if((fd = ::open(filename,
+		      O_RDONLY|OUR_O_LARGEFILE|OUR_O_BINARY)) < 0) {
+	perror(filename);
+	return false;
+      }
+
+      if(*fp) {		// if we've already got a new one open, close it
+	fclose(*fp);
+	fp = 0;
+      }
+
+      if((*fp = fdopen(fd, "rb")) == NULL) {
+	perror(filename);
+	::close(fd);		// don't leak file descriptor if fdopen fails.
+      }
+
+      ret = fp != 0;
+
+      return ret;
+    }
+
+    void
+    file_meta_source_impl::close()
+    {
+      gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function
+      if(d_state == STATE_DETACHED) {
+	if(d_new_hdr_fp) {
+	  fclose(d_new_hdr_fp);
+	  d_new_hdr_fp = 0;
+	}
+      }
+
+      if(d_new_fp) {
+	fclose(d_new_fp);
+	d_new_fp = 0;
+      }
+      d_updated = true;
+    }
+
+    void
+    file_meta_source_impl::do_update()
+    {
+      if(d_updated) {
+	gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this block
+	if(d_state == STATE_DETACHED) {
+	  if(d_hdr_fp)
+	    fclose(d_hdr_fp);
+	  d_hdr_fp = d_new_hdr_fp;		// install new file pointer
+	  d_new_hdr_fp = 0;
+	}
+
+	if(d_fp)
+	  fclose(d_fp);
+	d_fp = d_new_fp;		// install new file pointer
+	d_new_fp = 0;
+
+	d_updated = false;
+      }
+    }
+
+    int
+    file_meta_source_impl::work(int noutput_items,
+				gr_vector_const_void_star &input_items,
+				gr_vector_void_star &output_items)
+    {
+      // We've reached the end of a segment; parse the next header and get
+      // the new tags to send and set the next segment size.
+      if(d_seg_size == 0) {
+	pmt_t hdr=PMT_NIL, extras=PMT_NIL;
+	if(read_header(hdr, extras)) {
+	  parse_header(hdr, nitems_written(0), d_tags);
+	  parse_extras(extras, nitems_written(0), d_tags);
+	}
+	else {
+	  return -1;
+	}
+      }
+
+      char *out = (char*)output_items[0];
+      int i;
+      int seg_size = std::min(noutput_items, (int)d_seg_size);
+      int size = seg_size;
+
+      do_update();       // update d_fp is reqd
+      if(d_fp == NULL)
+	throw std::runtime_error("work with file not open");
+
+      // Push all tags onto the stream and remove them from the vector
+      while(!d_tags.empty()) {
+	add_item_tag(0, d_tags.back());
+	d_tags.pop_back();
+      }
+
+      gruel::scoped_lock lock(d_mutex); // hold for the rest of this function
+      while(size) {
+	i = fread(out, d_itemsize, size, d_fp);
+
+	size -= i;
+	d_seg_size -= i;
+	out += i * d_itemsize;
+
+	if(size == 0)		// done
+	  break;
+
+	if(i > 0)			// short read, try again
+	  continue;
+
+	// We got a zero from fread.  This is either EOF or error.  In
+	// any event, if we're in repeat mode, seek back to the beginning
+	// of the file and try again, else break
+
+	if(!d_repeat)
+	  break;
+
+	if(fseek(d_fp, 0, SEEK_SET) == -1) {
+	  std::stringstream s;
+	  s << "[" << __FILE__ << "]" << " fseek failed" << std::endl;
+	  throw std::runtime_error(s.str());
+	}
+      }
+
+      if(size > 0) {			// EOF or error
+	if(size == seg_size)		// we didn't read anything; say we're done
+	  return -1;
+	return seg_size - size;	// else return partial result
+      }
+
+      return seg_size;
+    }
+
+  } /* namespace blocks */
+} /* namespace gr */
-- 
cgit v1.2.3