/* -*- c++ -*- */ /* * Copyright 2012 Free Software Foundation, Inc. * * This file is part of GNU Radio * * SPDX-License-Identifier: GPL-3.0-or-later * */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "file_meta_source_impl.h" #include <gnuradio/io_signature.h> #include <fcntl.h> #include <stdio.h> #include <sys/stat.h> #include <sys/types.h> #include <cstdio> #include <stdexcept> // win32 (mingw/msvc) specific #ifdef HAVE_IO_H #include <io.h> #endif #ifdef O_BINARY #define OUR_O_BINARY O_BINARY #else #define OUR_O_BINARY 0 #endif // should be handled via configure #ifdef O_LARGEFILE #define OUR_O_LARGEFILE O_LARGEFILE #else #define OUR_O_LARGEFILE 0 #endif namespace gr { namespace blocks { file_meta_source::sptr file_meta_source::make(const std::string& filename, bool repeat, bool detached_header, const std::string& hdr_filename) { return gnuradio::make_block_sptr<file_meta_source_impl>( filename, repeat, detached_header, hdr_filename); } file_meta_source_impl::file_meta_source_impl(const std::string& filename, bool repeat, bool detached_header, const std::string& hdr_filename) : sync_block( "file_meta_source", io_signature::make(0, 0, 0), io_signature::make(1, 1, 1)), d_itemsize(0), d_samp_rate(0), d_seg_size(0), d_updated(false), d_repeat(repeat) { d_fp = 0; d_new_fp = 0; d_hdr_fp = 0; d_new_hdr_fp = 0; if (detached_header == true) { d_state = STATE_DETACHED; } else d_state = STATE_INLINE; if (!open(filename, hdr_filename)) throw std::runtime_error("file_meta_source: can't open file"); do_update(); pmt::pmt_t hdr = pmt::PMT_NIL, extras = pmt::PMT_NIL; if (read_header(hdr, extras)) { parse_header(hdr, 0, d_tags); parse_extras(extras, 0, d_tags); } else throw std::runtime_error("file_meta_source: could not read header."); // Set output signature based on itemsize info in header set_output_signature(io_signature::make(1, 1, d_itemsize)); } file_meta_source_impl::~file_meta_source_impl() { close(); } bool file_meta_source_impl::read_header(pmt::pmt_t& hdr, pmt::pmt_t& extras) { // Select which file handle to read from. FILE* fp; if (d_state == STATE_DETACHED) fp = d_hdr_fp; else fp = d_fp; size_t ret; size_t size = 0; std::string str; std::vector<char> hdr_buffer(METADATA_HEADER_SIZE); while (size < METADATA_HEADER_SIZE) { ret = fread(&hdr_buffer[size], sizeof(char), METADATA_HEADER_SIZE - size, fp); if (ret == 0) { if (feof(fp)) return false; else { std::stringstream s; s << "file_meta_source: error occurred extracting header: " << strerror(errno) << std::endl; throw std::runtime_error(s.str()); } } size += ret; } // Convert to string or the char array gets confused by the \0 str.insert(0, hdr_buffer.data(), METADATA_HEADER_SIZE); hdr = pmt::deserialize_str(str); hdr_buffer.clear(); uint64_t seg_start, extra_len = 0; pmt::pmt_t r, dump; if (pmt::dict_has_key(hdr, pmt::string_to_symbol("strt"))) { r = pmt::dict_ref(hdr, pmt::string_to_symbol("strt"), dump); seg_start = pmt::to_uint64(r); extra_len = seg_start - METADATA_HEADER_SIZE; } if (extra_len > 0) { size = 0; hdr_buffer.resize(extra_len); while (size < extra_len) { ret = fread(&hdr_buffer[size], sizeof(char), extra_len - size, fp); if (ret == 0) { if (feof(fp)) return false; else { std::stringstream s; s << "file_meta_source: error occurred extracting extras: " << strerror(errno) << std::endl; throw std::runtime_error(s.str()); } } size += ret; } str.clear(); str.insert(0, hdr_buffer.data(), extra_len); extras = pmt::deserialize_str(str); } return true; } void file_meta_source_impl::parse_header(pmt::pmt_t hdr, uint64_t offset, std::vector<tag_t>& tags) { pmt::pmt_t r, key; // GET SAMPLE RATE key = pmt::string_to_symbol("rx_rate"); if (pmt::dict_has_key(hdr, key)) { r = pmt::dict_ref(hdr, key, pmt::PMT_NIL); d_samp_rate = pmt::to_double(r); tag_t t; t.offset = offset; t.key = key; t.value = r; t.srcid = alias_pmt(); tags.push_back(t); } else { throw std::runtime_error("file_meta_source: Could not extract sample rate."); } // GET TIME STAMP key = pmt::string_to_symbol("rx_time"); if (pmt::dict_has_key(hdr, key)) { d_time_stamp = pmt::dict_ref(hdr, key, pmt::PMT_NIL); tag_t t; t.offset = offset; t.key = key; t.value = d_time_stamp; t.srcid = alias_pmt(); tags.push_back(t); } else { throw std::runtime_error("file_meta_source: Could not extract time stamp."); } // GET ITEM SIZE OF DATA if (pmt::dict_has_key(hdr, pmt::string_to_symbol("size"))) { d_itemsize = pmt::to_long(pmt::dict_ref(hdr, pmt::string_to_symbol("size"), pmt::PMT_NIL)); } else { throw std::runtime_error("file_meta_source: Could not extract item size."); } // GET SEGMENT SIZE if (pmt::dict_has_key(hdr, pmt::string_to_symbol("bytes"))) { d_seg_size = pmt::to_uint64( pmt::dict_ref(hdr, pmt::string_to_symbol("bytes"), pmt::PMT_NIL)); // Convert from bytes to items d_seg_size /= d_itemsize; } else { throw std::runtime_error("file_meta_source: Could not extract segment size."); } } void file_meta_source_impl::parse_extras(pmt::pmt_t extras, uint64_t offset, std::vector<tag_t>& tags) { pmt::pmt_t item, key, val; size_t nitems = pmt::length(extras); for (size_t i = 0; i < nitems; i++) { item = pmt::nth(i, extras); key = pmt::car(item); val = pmt::cdr(item); tag_t t; t.offset = offset; t.key = key; t.value = val; t.srcid = alias_pmt(); tags.push_back(t); } } bool file_meta_source_impl::open(const std::string& filename, const std::string& hdr_filename) { bool ret = true; if (d_state == STATE_DETACHED) { std::string s; if (hdr_filename.empty()) s = filename + ".hdr"; else s = hdr_filename; ret = _open(&d_new_hdr_fp, s.c_str()); } ret = ret && _open(&d_new_fp, filename.c_str()); d_updated = true; return ret; } bool file_meta_source_impl::_open(FILE** fp, const char* filename) { gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function bool ret = true; int fd; if ((fd = ::open(filename, O_RDONLY | OUR_O_LARGEFILE | OUR_O_BINARY)) < 0) { GR_LOG_ERROR(d_logger, boost::format("%s: %s") % filename % strerror(errno)); return false; } if (*fp) { // if we've already got a new one open, close it fclose(*fp); fp = 0; } if ((*fp = fdopen(fd, "rb")) == NULL) { GR_LOG_ERROR(d_logger, boost::format("%s: %s") % filename % strerror(errno)); ::close(fd); // don't leak file descriptor if fdopen fails. } ret = fp != 0; return ret; } void file_meta_source_impl::close() { gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function if (d_state == STATE_DETACHED) { if (d_new_hdr_fp) { fclose(d_new_hdr_fp); d_new_hdr_fp = 0; } } if (d_new_fp) { fclose(d_new_fp); d_new_fp = 0; } d_updated = true; if (d_fp) { fclose(d_fp); d_fp = 0; } if (d_state == STATE_DETACHED) { if (d_hdr_fp) { fclose(d_hdr_fp); d_hdr_fp = 0; } } } void file_meta_source_impl::do_update() { if (d_updated) { gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this block if (d_state == STATE_DETACHED) { if (d_hdr_fp) fclose(d_hdr_fp); d_hdr_fp = d_new_hdr_fp; // install new file pointer d_new_hdr_fp = 0; } if (d_fp) fclose(d_fp); d_fp = d_new_fp; // install new file pointer d_new_fp = 0; d_updated = false; } } int file_meta_source_impl::work(int noutput_items, gr_vector_const_void_star& input_items, gr_vector_void_star& output_items) { // We've reached the end of a segment; parse the next header and get // the new tags to send and set the next segment size. if (d_seg_size == 0) { pmt::pmt_t hdr = pmt::PMT_NIL, extras = pmt::PMT_NIL; if (read_header(hdr, extras)) { parse_header(hdr, nitems_written(0), d_tags); parse_extras(extras, nitems_written(0), d_tags); } else { if (!d_repeat) return -1; else { if (fseek(d_fp, 0, SEEK_SET) == -1) { std::stringstream s; s << "[" << __FILE__ << "]" << " fseek failed" << std::endl; throw std::runtime_error(s.str()); } } } } char* out = (char*)output_items[0]; int i; int seg_size = std::min(noutput_items, (int)d_seg_size); int size = seg_size; do_update(); // update d_fp is reqd if (d_fp == NULL) throw std::runtime_error("work with file not open"); // Push all tags onto the stream and remove them from the vector while (!d_tags.empty()) { add_item_tag(0, d_tags.back()); d_tags.pop_back(); } gr::thread::scoped_lock lock(d_setlock); // hold for the rest of this function while (size) { i = fread(out, d_itemsize, size, d_fp); size -= i; d_seg_size -= i; out += i * d_itemsize; if (size == 0) // done break; if (i > 0) // short read, try again continue; // We got a zero from fread. This is either EOF or error. In // any event, if we're in repeat mode, seek back to the beginning // of the file and try again, else break if (!d_repeat) break; if (fseek(d_fp, 0, SEEK_SET) == -1) { std::stringstream s; s << "[" << __FILE__ << "]" << " fseek failed" << std::endl; throw std::runtime_error(s.str()); } } if (size > 0) { // EOF or error if (size == seg_size) // we didn't read anything; say we're done return -1; return seg_size - size; // else return partial result } return seg_size; } } /* namespace blocks */ } /* namespace gr */