/* -*- c++ -*- */ /* * Copyright 2012,2018 Free Software Foundation, Inc. * * This file is part of GNU Radio * * SPDX-License-Identifier: GPL-3.0-or-later * */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "file_meta_sink_impl.h" #include <gnuradio/io_signature.h> #include <fcntl.h> #include <sys/stat.h> #include <sys/types.h> #include <cstdio> #include <stdexcept> // win32 (mingw/msvc) specific #ifdef HAVE_IO_H #include <io.h> #endif #ifdef O_BINARY #define OUR_O_BINARY O_BINARY #else #define OUR_O_BINARY 0 #endif // should be handled via configure #ifdef O_LARGEFILE #define OUR_O_LARGEFILE O_LARGEFILE #else #define OUR_O_LARGEFILE 0 #endif namespace gr { namespace blocks { file_meta_sink::sptr file_meta_sink::make(size_t itemsize, const std::string& filename, double samp_rate, double relative_rate, gr_file_types type, bool complex, size_t max_segment_size, pmt::pmt_t extra_dict, bool detached_header) { return gnuradio::make_block_sptr<file_meta_sink_impl>(itemsize, filename, samp_rate, relative_rate, type, complex, max_segment_size, extra_dict, detached_header); } file_meta_sink_impl::file_meta_sink_impl(size_t itemsize, const std::string& filename, double samp_rate, double relative_rate, gr_file_types type, bool complex, size_t max_segment_size, pmt::pmt_t extra_dict, bool detached_header) : sync_block("file_meta_sink", io_signature::make(1, 1, itemsize), io_signature::make(0, 0, 0)), d_itemsize(itemsize), d_samp_rate(samp_rate), d_relative_rate(relative_rate), d_max_seg_size(max_segment_size), d_total_seg_size(0), d_updated(false), d_unbuffered(false) { d_fp = 0; d_new_fp = 0; d_hdr_fp = 0; d_new_hdr_fp = 0; if (detached_header == true) d_state = STATE_DETACHED; else d_state = STATE_INLINE; if (!open(filename)) throw std::runtime_error("file_meta_sink: can't open file"); pmt::pmt_t timestamp = pmt::make_tuple(pmt::from_uint64(0), pmt::from_double(0)); // handle extra dictionary d_extra = pmt::make_dict(); pmt::pmt_t keys = pmt::dict_keys(extra_dict); pmt::pmt_t vals = pmt::dict_values(extra_dict); size_t nitems = pmt::length(keys); for (size_t i = 0; i < nitems; i++) { d_extra = pmt::dict_add(d_extra, pmt::nth(i, keys), pmt::nth(i, vals)); } d_extra_size = pmt::serialize_str(d_extra).size(); d_header = pmt::make_dict(); d_header = pmt::dict_add(d_header, pmt::mp("version"), pmt::mp(METADATA_VERSION)); d_header = pmt::dict_add(d_header, pmt::mp("rx_rate"), pmt::mp(samp_rate)); d_header = pmt::dict_add(d_header, pmt::mp("rx_time"), timestamp); d_header = pmt::dict_add(d_header, pmt::mp("size"), pmt::from_long(d_itemsize)); d_header = pmt::dict_add(d_header, pmt::mp("type"), pmt::from_long(type)); d_header = pmt::dict_add(d_header, pmt::mp("cplx"), complex ? pmt::PMT_T : pmt::PMT_F); d_header = pmt::dict_add( d_header, pmt::mp("strt"), pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size)); d_header = pmt::dict_add(d_header, mp("bytes"), pmt::from_uint64(0)); do_update(); if (d_state == STATE_DETACHED) write_header(d_hdr_fp, d_header, d_extra); else write_header(d_fp, d_header, d_extra); } file_meta_sink_impl::~file_meta_sink_impl() { close(); } bool file_meta_sink_impl::open(const std::string& filename) { bool ret = true; if (d_state == STATE_DETACHED) { std::string s = filename + ".hdr"; ret = _open(&d_new_hdr_fp, s.c_str()); } ret = ret && _open(&d_new_fp, filename.c_str()); d_updated = true; return ret; } bool file_meta_sink_impl::_open(FILE** fp, const char* filename) { gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function bool ret = true; int fd; if ((fd = ::open(filename, O_WRONLY | O_CREAT | O_TRUNC | OUR_O_LARGEFILE | OUR_O_BINARY, 0664)) < 0) { GR_LOG_ERROR(d_logger, boost::format("%s: %s") % filename % strerror(errno)); return false; } if (*fp) { // if we've already got a new one open, close it fclose(*fp); fp = 0; } if ((*fp = fdopen(fd, "wb")) == NULL) { GR_LOG_ERROR(d_logger, boost::format("%s: %s") % filename % strerror(errno)); ::close(fd); // don't leak file descriptor if fdopen fails. } ret = fp != 0; return ret; } void file_meta_sink_impl::close() { gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function update_last_header(); if (d_state == STATE_DETACHED) { if (d_new_hdr_fp) { fclose(d_new_hdr_fp); d_new_hdr_fp = 0; } } if (d_new_fp) { fclose(d_new_fp); d_new_fp = 0; } d_updated = true; if (d_fp) { fclose(d_fp); d_fp = 0; } if (d_state == STATE_DETACHED) { if (d_hdr_fp) { fclose(d_hdr_fp); d_hdr_fp = 0; } } } void file_meta_sink_impl::do_update() { if (d_updated) { gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this block if (d_state == STATE_DETACHED) { if (d_hdr_fp) fclose(d_hdr_fp); d_hdr_fp = d_new_hdr_fp; // install new file pointer d_new_hdr_fp = 0; } if (d_fp) fclose(d_fp); d_fp = d_new_fp; // install new file pointer d_new_fp = 0; d_updated = false; } } void file_meta_sink_impl::write_header(FILE* fp, pmt::pmt_t header, pmt::pmt_t extra) { std::string header_str = pmt::serialize_str(header); std::string extra_str = pmt::serialize_str(extra); if ((header_str.size() != METADATA_HEADER_SIZE) || (extra_str.size() != d_extra_size)) throw std::runtime_error("file_meta_sink: header or extra_dict is wrong size."); size_t nwritten = 0; while (nwritten < header_str.size()) { std::string sub = header_str.substr(nwritten); int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp); nwritten += count; if ((count == 0) && (ferror(fp))) { fclose(fp); throw std::runtime_error("file_meta_sink: error writing header to file."); } } nwritten = 0; while (nwritten < extra_str.size()) { std::string sub = extra_str.substr(nwritten); int count = fwrite(sub.c_str(), sizeof(char), sub.size(), fp); nwritten += count; if ((count == 0) && (ferror(fp))) { fclose(fp); throw std::runtime_error("file_meta_sink: error writing extra to file."); } } fflush(fp); } void file_meta_sink_impl::update_header(pmt::pmt_t key, pmt::pmt_t value) { // Special handling caveat to transform rate from radio source into // the rate at this sink. if (pmt::eq(key, mp("rx_rate"))) { d_samp_rate = pmt::to_double(value); value = pmt::from_double(d_samp_rate * d_relative_rate); } // If the tag is not part of the standard header, we put it into the // extra data, which either updates the current dictionary or adds a // new item. if (pmt::dict_has_key(d_header, key)) { d_header = pmt::dict_add(d_header, key, value); } else { d_extra = pmt::dict_add(d_extra, key, value); d_extra_size = pmt::serialize_str(d_extra).size(); } } void file_meta_sink_impl::update_last_header() { if (d_state == STATE_DETACHED) { if (d_hdr_fp) update_last_header_detached(); } else { if (d_fp) update_last_header_inline(); } } void file_meta_sink_impl::update_last_header_inline() { // Update the last header info with the number of samples this // block represents. size_t hdrlen = pmt::to_uint64(pmt::dict_ref(d_header, mp("strt"), pmt::PMT_NIL)); size_t seg_size = d_itemsize * d_total_seg_size; pmt::pmt_t s = pmt::from_uint64(seg_size); update_header(mp("bytes"), s); update_header(mp("strt"), pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size)); if (fseek(d_fp, -seg_size - hdrlen, SEEK_CUR) == -1) { throw std::runtime_error("fseek() failed."); } write_header(d_fp, d_header, d_extra); if (fseek(d_fp, seg_size, SEEK_CUR) == -1) { throw std::runtime_error("fseek() failed."); } } void file_meta_sink_impl::update_last_header_detached() { // Update the last header info with the number of samples this // block represents. size_t hdrlen = pmt::to_uint64(pmt::dict_ref(d_header, mp("strt"), pmt::PMT_NIL)); size_t seg_size = d_itemsize * d_total_seg_size; pmt::pmt_t s = pmt::from_uint64(seg_size); update_header(mp("bytes"), s); update_header(mp("strt"), pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size)); if (fseek(d_hdr_fp, -hdrlen, SEEK_CUR) == -1) { throw std::runtime_error("fseek() failed."); } write_header(d_hdr_fp, d_header, d_extra); } void file_meta_sink_impl::write_and_update() { // New header, so set current size of chunk to 0 and start of chunk // based on current index + header size. // uint64_t loc = get_last_header_loc(); pmt::pmt_t s = pmt::from_uint64(0); update_header(mp("bytes"), s); // If we have multiple tags on the same offset, this makes // sure we just overwrite the same header each time instead // of creating a new header per tag. s = pmt::from_uint64(METADATA_HEADER_SIZE + d_extra_size); update_header(mp("strt"), s); if (d_state == STATE_DETACHED) write_header(d_hdr_fp, d_header, d_extra); else write_header(d_fp, d_header, d_extra); } void file_meta_sink_impl::update_rx_time() { pmt::pmt_t rx_time = pmt::string_to_symbol("rx_time"); pmt::pmt_t r = pmt::dict_ref(d_header, rx_time, pmt::PMT_NIL); uint64_t secs = pmt::to_uint64(pmt::tuple_ref(r, 0)); double fracs = pmt::to_double(pmt::tuple_ref(r, 1)); double diff = d_total_seg_size / (d_samp_rate * d_relative_rate); // std::cerr << "old secs: " << secs << std::endl; // std::cerr << "old fracs: " << fracs << std::endl; // std::cerr << "seg size: " << d_total_seg_size << std::endl; // std::cerr << "diff: " << diff << std::endl; fracs += diff; uint64_t new_secs = static_cast<uint64_t>(fracs); secs += new_secs; fracs -= new_secs; // std::cerr << "new secs: " << secs << std::endl; // std::cerr << "new fracs: " << fracs << std::endl << std::endl; r = pmt::make_tuple(pmt::from_uint64(secs), pmt::from_double(fracs)); d_header = pmt::dict_add(d_header, rx_time, r); } int file_meta_sink_impl::work(int noutput_items, gr_vector_const_void_star& input_items, gr_vector_void_star& output_items) { char* inbuf = (char*)input_items[0]; int nwritten = 0; do_update(); // update d_fp is reqd if (!d_fp) return noutput_items; // drop output on the floor uint64_t abs_N = nitems_read(0); uint64_t end_N = abs_N + (uint64_t)(noutput_items); std::vector<tag_t> all_tags; get_tags_in_range(all_tags, 0, abs_N, end_N); for (const auto& tag : all_tags) { int item_offset = (int)(tag.offset - abs_N); // Write date to file up to the next tag location while (nwritten < item_offset) { size_t towrite = std::min(d_max_seg_size - d_total_seg_size, (size_t)(item_offset - nwritten)); int count = fwrite(inbuf, d_itemsize, towrite, d_fp); if (count == 0) // FIXME add error handling break; nwritten += count; inbuf += count * d_itemsize; d_total_seg_size += count; // Only add a new header if we are not at the position of the // next tag if ((d_total_seg_size == d_max_seg_size) && (nwritten < item_offset)) { update_last_header(); update_rx_time(); write_and_update(); d_total_seg_size = 0; } } if (d_total_seg_size > 0) { update_last_header(); update_header(tag.key, tag.value); write_and_update(); d_total_seg_size = 0; } else { update_header(tag.key, tag.value); update_last_header(); } } // Finish up the rest of the data after tags while (nwritten < noutput_items) { size_t towrite = std::min(d_max_seg_size - d_total_seg_size, (size_t)(noutput_items - nwritten)); int count = fwrite(inbuf, d_itemsize, towrite, d_fp); if (count == 0) // FIXME add error handling break; nwritten += count; inbuf += count * d_itemsize; d_total_seg_size += count; if (d_total_seg_size == d_max_seg_size) { update_last_header(); update_rx_time(); write_and_update(); d_total_seg_size = 0; } } if (d_unbuffered) fflush(d_fp); return nwritten; } } /* namespace blocks */ } /* namespace gr */