From 461ece56b36a44b2405282630157739c7f9a26ba Mon Sep 17 00:00:00 2001
From: Tom Rondeau <trondeau@vt.edu>
Date: Fri, 14 Dec 2012 16:10:30 -0500
Subject: blocks: moving file metadata sink/source to gr-blocks.

---
 gr-blocks/lib/file_meta_sink_impl.cc | 464 +++++++++++++++++++++++++++++++++++
 1 file changed, 464 insertions(+)
 create mode 100644 gr-blocks/lib/file_meta_sink_impl.cc

(limited to 'gr-blocks/lib/file_meta_sink_impl.cc')

diff --git a/gr-blocks/lib/file_meta_sink_impl.cc b/gr-blocks/lib/file_meta_sink_impl.cc
new file mode 100644
index 0000000000..ad16e9fcac
--- /dev/null
+++ b/gr-blocks/lib/file_meta_sink_impl.cc
@@ -0,0 +1,464 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "file_meta_sink_impl.h"
+#include <gr_io_signature.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdexcept>
+#include <cstdio>
+
+namespace gr {
+  namespace blocks {
+
+// win32 (mingw/msvc) specific
+#ifdef HAVE_IO_H
+#include <io.h>
+#endif
+#ifdef O_BINARY
+#define	OUR_O_BINARY O_BINARY
+#else
+#define	OUR_O_BINARY 0
+#endif
+
+// should be handled via configure
+#ifdef O_LARGEFILE
+#define	OUR_O_LARGEFILE	O_LARGEFILE
+#else
+#define	OUR_O_LARGEFILE 0
+#endif
+
+    file_meta_sink::sptr
+    file_meta_sink::make(size_t itemsize, const std::string &filename,
+			 double samp_rate, double relative_rate,
+			 gr_file_types type, bool complex,
+			 size_t max_segment_size,
+			 const std::string &extra_dict,
+			 bool detached_header)
+    {
+      return gnuradio::get_initial_sptr
+	(new file_meta_sink_impl(itemsize, filename,
+				 samp_rate, relative_rate,
+				 type, complex,
+				 max_segment_size,
+				 extra_dict,
+				 detached_header));
+    }
+
+    file_meta_sink_impl::file_meta_sink_impl(size_t itemsize,
+					     const std::string &filename,
+					     double samp_rate, double relative_rate,
+					     gr_file_types type, bool complex,
+					     size_t max_segment_size,
+					     const std::string &extra_dict,
+					     bool detached_header)
+      : gr_sync_block("file_meta_sink",
+		      gr_make_io_signature(1, 1, itemsize),
+		      gr_make_io_signature(0, 0, 0)),
+	d_itemsize(itemsize),
+	d_samp_rate(samp_rate), d_relative_rate(relative_rate),
+	d_max_seg_size(max_segment_size), d_total_seg_size(0),
+	d_updated(false), d_unbuffered(false)
+    {
+      d_fp = 0;
+      d_new_fp = 0;
+      d_hdr_fp = 0;
+      d_new_hdr_fp = 0;
+
+      if(detached_header == true)
+	d_state = STATE_DETACHED;
+      else
+	d_state = STATE_INLINE;
+
+      if(!open(filename))
+	throw std::runtime_error("file_meta_sink: can't open file\n");
+
+      pmt_t timestamp = pmt_make_tuple(pmt_from_uint64(0),
+				       pmt_from_double(0));
+
+      // handle extra dictionary
+      d_extra = pmt_make_dict();
+      if(extra_dict.size() > 0) {
+	pmt_t extras = pmt_deserialize_str(extra_dict);
+	pmt_t keys = pmt_dict_keys(extras);
+	pmt_t vals = pmt_dict_values(extras);
+	size_t nitems = pmt_length(keys);
+	for(size_t i = 0; i < nitems; i++) {
+	  d_extra = pmt_dict_add(d_extra,
+				 pmt_nth(i, keys),
+				 pmt_nth(i, vals));
+	}
+      }
+
+      d_extra_size = pmt_serialize_str(d_extra).size();
+
+      d_header = pmt_make_dict();
+      d_header = pmt_dict_add(d_header, mp("version"), mp(METADATA_VERSION));
+      d_header = pmt_dict_add(d_header, mp("rx_rate"), mp(samp_rate));
+      d_header = pmt_dict_add(d_header, mp("rx_time"), timestamp);
+      d_header = pmt_dict_add(d_header, mp("size"), pmt_from_long(d_itemsize));
+      d_header = pmt_dict_add(d_header, mp("type"), pmt_from_long(type));
+      d_header = pmt_dict_add(d_header, mp("cplx"), complex ? PMT_T : PMT_F);
+      d_header = pmt_dict_add(d_header, mp("strt"), pmt_from_uint64(METADATA_HEADER_SIZE+d_extra_size));
+      d_header = pmt_dict_add(d_header, mp("bytes"), pmt_from_uint64(0));
+
+      do_update();
+
+      if(d_state == STATE_DETACHED)
+	write_header(d_hdr_fp, d_header, d_extra);
+      else
+	write_header(d_fp, d_header, d_extra);
+    }
+
+    file_meta_sink_impl::~file_meta_sink_impl()
+    {
+      close();
+
+      if(d_fp) {
+	fclose(d_fp);
+	d_fp = 0;
+      }
+  
+      if(d_state == STATE_DETACHED) {
+	if(d_hdr_fp) {
+	  fclose(d_hdr_fp);
+	  d_hdr_fp = 0;
+	}
+      }
+    }
+
+    bool
+    file_meta_sink_impl::open(const std::string &filename)
+    {
+      bool ret = true;
+      if(d_state == STATE_DETACHED) {
+	std::string s = filename + ".hdr";
+	ret = _open(&d_new_hdr_fp, s.c_str());
+      }
+
+      ret = ret && _open(&d_new_fp, filename.c_str());
+      d_updated = true;
+      return ret;
+    }
+
+    bool
+    file_meta_sink_impl::_open(FILE **fp, const char *filename)
+    {
+      gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function
+
+      bool ret = true;
+      int fd;
+
+      if((fd = ::open(filename,
+		      O_WRONLY|O_CREAT|O_TRUNC|OUR_O_LARGEFILE|OUR_O_BINARY,
+		      0664)) < 0){
+	perror(filename);
+	return false;
+      }
+
+      if(*fp) {		// if we've already got a new one open, close it
+	fclose(*fp);
+	fp = 0;
+      }
+
+      if((*fp = fdopen(fd, "wb")) == NULL) {
+	perror(filename);
+	::close(fd);		// don't leak file descriptor if fdopen fails.
+      }
+
+      ret = fp != 0;
+
+      return ret;
+    }
+
+    void
+    file_meta_sink_impl::close()
+    {
+      gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this function
+      update_last_header();
+
+      if(d_state == STATE_DETACHED) {
+	if(d_new_hdr_fp) {
+	  fclose(d_new_hdr_fp);
+	  d_new_hdr_fp = 0;
+	}
+      }
+
+      if(d_new_fp) {
+	fclose(d_new_fp);
+	d_new_fp = 0;
+      }
+      d_updated = true;
+    }
+
+    void
+    file_meta_sink_impl::do_update()
+    {
+      if(d_updated) {
+	gruel::scoped_lock guard(d_mutex); // hold mutex for duration of this block
+	if(d_state == STATE_DETACHED) {
+	  if(d_hdr_fp)
+	    fclose(d_hdr_fp);
+	  d_hdr_fp = d_new_hdr_fp;		// install new file pointer
+	  d_new_hdr_fp = 0;
+	}
+
+	if(d_fp)
+	  fclose(d_fp);
+	d_fp = d_new_fp;		// install new file pointer
+	d_new_fp = 0;
+
+	d_updated = false;
+      }
+    }
+
+    void
+    file_meta_sink_impl::write_header(FILE *fp, pmt_t header, pmt_t extra)
+    {
+      std::string header_str = pmt_serialize_str(header);
+      std::string extra_str = pmt_serialize_str(extra);
+
+      if((header_str.size() != METADATA_HEADER_SIZE) && (extra_str.size() != d_extra_size))
+	throw std::runtime_error("file_meta_sink: header or extras is wrong size.\n");
+
+      size_t nwritten = 0;
+      while(nwritten < header_str.size()) {
+	std::string sub = header_str.substr(nwritten);
+	int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp);
+	nwritten += count;
+	if((count == 0) && (ferror(fp))) {
+	  fclose(fp);
+	  throw std::runtime_error("file_meta_sink: error writing header to file.\n");
+	}
+      }
+
+      nwritten = 0;
+      while(nwritten < extra_str.size()) {
+	std::string sub = extra_str.substr(nwritten);
+	int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp);
+	nwritten += count;
+	if((count == 0) && (ferror(fp))) {
+	  fclose(fp);
+	  throw std::runtime_error("file_meta_sink: error writing extra to file.\n");
+	}
+      }
+
+      fflush(fp);
+    }
+
+    void
+    file_meta_sink_impl::update_header(pmt_t key, pmt_t value)
+    {
+      // Special handling caveat to transform rate from radio source into
+      // the rate at this sink.
+      if(pmt_eq(key, mp("rx_rate"))) {
+	d_samp_rate = pmt_to_double(value);
+	value = pmt_from_double(d_samp_rate*d_relative_rate);
+      }
+
+      // If the tag is not part of the standard header, we put it into the
+      // extra data, which either updates the current dictionary or adds a
+      // new item.
+      if(pmt_dict_has_key(d_header, key)) {
+	d_header = pmt_dict_add(d_header, key, value);
+      }
+      else {
+	d_extra = pmt_dict_add(d_extra, key, value);
+	d_extra_size = pmt_serialize_str(d_extra).size();
+      }
+    }
+
+    void
+    file_meta_sink_impl::update_last_header()
+    {
+      if(d_state == STATE_DETACHED)
+	update_last_header_detached();
+      else
+	update_last_header_inline();
+    }
+
+    void
+    file_meta_sink_impl::update_last_header_inline()
+    {
+      // Update the last header info with the number of samples this
+      // block represents.
+
+      size_t hdrlen = pmt_to_uint64(pmt_dict_ref(d_header, mp("strt"), PMT_NIL));
+      size_t seg_size = d_itemsize*d_total_seg_size;
+      pmt_t s = pmt_from_uint64(seg_size);
+      update_header(mp("bytes"), s);
+      update_header(mp("strt"), pmt_from_uint64(METADATA_HEADER_SIZE+d_extra_size));
+      fseek(d_fp, -seg_size-hdrlen, SEEK_CUR);
+      write_header(d_fp, d_header, d_extra);
+      fseek(d_fp, seg_size, SEEK_CUR);
+    }
+
+    void
+    file_meta_sink_impl::update_last_header_detached()
+    {
+      // Update the last header info with the number of samples this
+      // block represents.
+      size_t hdrlen = pmt_to_uint64(pmt_dict_ref(d_header, mp("strt"), PMT_NIL));
+      size_t seg_size = d_itemsize*d_total_seg_size;
+      pmt_t s = pmt_from_uint64(seg_size);
+      update_header(mp("bytes"), s);
+      update_header(mp("strt"), pmt_from_uint64(METADATA_HEADER_SIZE+d_extra_size));
+      fseek(d_hdr_fp, -hdrlen, SEEK_CUR);
+      write_header(d_hdr_fp, d_header, d_extra);
+    }
+
+    void
+    file_meta_sink_impl::write_and_update()
+    {
+      // New header, so set current size of chunk to 0 and start of chunk
+      // based on current index + header size.
+      //uint64_t loc = get_last_header_loc();
+      pmt_t s = pmt_from_uint64(0);
+      update_header(mp("bytes"), s);
+
+      // If we have multiple tags on the same offset, this makes
+      // sure we just overwrite the same header each time instead
+      // of creating a new header per tag.
+      s = pmt_from_uint64(METADATA_HEADER_SIZE + d_extra_size);
+      update_header(mp("strt"), s);
+
+      if(d_state == STATE_DETACHED)
+	write_header(d_hdr_fp, d_header, d_extra);
+      else
+	write_header(d_fp, d_header, d_extra);
+    }
+
+    void
+    file_meta_sink_impl::update_rx_time()
+    {
+      pmt_t rx_time = pmt_string_to_symbol("rx_time");
+      pmt_t r = pmt_dict_ref(d_header, rx_time, PMT_NIL);
+      uint64_t secs = pmt_to_uint64(pmt_tuple_ref(r, 0));
+      double fracs = pmt_to_double(pmt_tuple_ref(r, 1));
+      double diff = d_total_seg_size / (d_samp_rate*d_relative_rate);
+
+      //std::cerr << "old secs:  " << secs << std::endl;
+      //std::cerr << "old fracs: " << fracs << std::endl;
+      //std::cerr << "seg size:  " << d_total_seg_size << std::endl;
+      //std::cerr << "diff:      " << diff << std::endl;
+
+      fracs += diff;
+      uint64_t new_secs = static_cast<uint64_t>(fracs);
+      secs += new_secs;
+      fracs -= new_secs;
+
+      //std::cerr << "new secs:  " << secs << std::endl;
+      //std::cerr << "new fracs: " << fracs << std::endl << std::endl;
+
+      r = pmt_make_tuple(pmt_from_uint64(secs), pmt_from_double(fracs));
+      d_header = pmt_dict_add(d_header, rx_time, r);
+    }
+
+    int
+    file_meta_sink_impl::work(int noutput_items,
+			      gr_vector_const_void_star &input_items,
+			      gr_vector_void_star &output_items)
+    {
+      char *inbuf = (char*)input_items[0];
+      int  nwritten = 0;
+
+      do_update();				// update d_fp is reqd
+
+      if(!d_fp)
+	return noutput_items;		// drop output on the floor
+
+      uint64_t abs_N = nitems_read(0);
+      uint64_t end_N = abs_N + (uint64_t)(noutput_items);
+      std::vector<gr_tag_t> all_tags;
+      get_tags_in_range(all_tags, 0, abs_N, end_N);
+
+      std::vector<gr_tag_t>::iterator itr;
+      for(itr = all_tags.begin(); itr != all_tags.end(); itr++) {
+	int item_offset = (int)(itr->offset - abs_N);
+
+	// Write date to file up to the next tag location
+	while(nwritten < item_offset) {
+	  size_t towrite = std::min(d_max_seg_size - d_total_seg_size,
+				    (size_t)(item_offset - nwritten));
+	  int count = fwrite(inbuf, d_itemsize, towrite, d_fp);
+	  if(count == 0)	// FIXME add error handling
+	    break;
+	  nwritten += count;
+	  inbuf += count * d_itemsize;
+
+	  d_total_seg_size += count;
+
+	  // Only add a new header if we are not at the position of the
+	  // next tag
+	  if((d_total_seg_size == d_max_seg_size) &&
+	     (nwritten < item_offset)) {
+	    update_last_header();
+	    update_rx_time();
+	    write_and_update();
+	    d_total_seg_size = 0;
+	  }
+	}
+
+	if(d_total_seg_size > 0) {
+	  update_last_header();
+	  update_header(itr->key, itr->value);
+	  write_and_update();
+	  d_total_seg_size = 0;
+	}
+	else {
+	  update_header(itr->key, itr->value);
+	  update_last_header();
+	}
+      }
+
+      // Finish up the rest of the data after tags
+      while(nwritten < noutput_items) {
+	size_t towrite = std::min(d_max_seg_size - d_total_seg_size,
+				  (size_t)(noutput_items - nwritten));
+	int count = fwrite(inbuf, d_itemsize, towrite, d_fp);
+	if(count == 0)	// FIXME add error handling
+	  break;
+	nwritten += count;
+	inbuf += count * d_itemsize;
+
+	d_total_seg_size += count;
+	if(d_total_seg_size == d_max_seg_size) {
+	  update_last_header();
+	  update_rx_time();
+	  write_and_update();
+	  d_total_seg_size = 0;
+	}
+      }
+
+      if(d_unbuffered)
+	fflush(d_fp);
+
+      return nwritten;
+    }
+
+  } /* namespace blocks */
+} /* namespace gr */
-- 
cgit v1.2.3