diff options
Diffstat (limited to 'gr-blocks/lib/file_meta_source_impl.cc')
-rw-r--r-- | gr-blocks/lib/file_meta_source_impl.cc | 702 |
1 files changed, 342 insertions, 360 deletions
diff --git a/gr-blocks/lib/file_meta_source_impl.cc b/gr-blocks/lib/file_meta_source_impl.cc index 3c72e2a003..fcf8228de5 100644 --- a/gr-blocks/lib/file_meta_source_impl.cc +++ b/gr-blocks/lib/file_meta_source_impl.cc @@ -38,409 +38,391 @@ #include <io.h> #endif #ifdef O_BINARY -#define OUR_O_BINARY O_BINARY +#define OUR_O_BINARY O_BINARY #else -#define OUR_O_BINARY 0 +#define OUR_O_BINARY 0 #endif // should be handled via configure #ifdef O_LARGEFILE -#define OUR_O_LARGEFILE O_LARGEFILE +#define OUR_O_LARGEFILE O_LARGEFILE #else -#define OUR_O_LARGEFILE 0 +#define OUR_O_LARGEFILE 0 #endif namespace gr { - namespace blocks { - - - file_meta_source::sptr - file_meta_source::make(const std::string &filename, - bool repeat, - bool detached_header, - const std::string &hdr_filename) - { - return gnuradio::get_initial_sptr - (new file_meta_source_impl(filename, - repeat, - detached_header, - hdr_filename)); +namespace blocks { + + +file_meta_source::sptr file_meta_source::make(const std::string& filename, + bool repeat, + bool detached_header, + const std::string& hdr_filename) +{ + return gnuradio::get_initial_sptr( + new file_meta_source_impl(filename, repeat, detached_header, hdr_filename)); +} + +file_meta_source_impl::file_meta_source_impl(const std::string& filename, + bool repeat, + bool detached_header, + const std::string& hdr_filename) + : sync_block( + "file_meta_source", io_signature::make(0, 0, 0), io_signature::make(1, 1, 1)), + d_itemsize(0), + d_samp_rate(0), + d_seg_size(0), + d_updated(false), + d_repeat(repeat) +{ + d_fp = 0; + d_new_fp = 0; + d_hdr_fp = 0; + d_new_hdr_fp = 0; + + if (detached_header == true) { + d_state = STATE_DETACHED; + } else + d_state = STATE_INLINE; + + if (!open(filename, hdr_filename)) + throw std::runtime_error("file_meta_source: can't open file\n"); + + do_update(); + + pmt::pmt_t hdr = pmt::PMT_NIL, extras = pmt::PMT_NIL; + if (read_header(hdr, extras)) { + parse_header(hdr, 0, d_tags); + parse_extras(extras, 0, d_tags); + } else + throw std::runtime_error("file_meta_source: could not read header.\n"); + + // Set output signature based on itemsize info in header + set_output_signature(io_signature::make(1, 1, d_itemsize)); +} + +file_meta_source_impl::~file_meta_source_impl() { close(); } + +bool file_meta_source_impl::read_header(pmt::pmt_t& hdr, pmt::pmt_t& extras) +{ + // Select which file handle to read from. + FILE* fp; + if (d_state == STATE_DETACHED) + fp = d_hdr_fp; + else + fp = d_fp; + + size_t ret; + size_t size = 0; + std::string str; + char* hdr_buffer = new char[METADATA_HEADER_SIZE]; + while (size < METADATA_HEADER_SIZE) { + ret = fread(&hdr_buffer[size], sizeof(char), METADATA_HEADER_SIZE - size, fp); + if (ret == 0) { + delete[] hdr_buffer; + if (feof(fp)) + return false; + else { + std::stringstream s; + s << "file_meta_source: error occurred extracting header: " + << strerror(errno) << std::endl; + throw std::runtime_error(s.str()); + } + } + size += ret; } - file_meta_source_impl::file_meta_source_impl(const std::string &filename, - bool repeat, - bool detached_header, - const std::string &hdr_filename) - : sync_block("file_meta_source", - io_signature::make(0, 0, 0), - io_signature::make(1, 1, 1)), - d_itemsize(0), d_samp_rate(0), - d_seg_size(0), - d_updated(false), d_repeat(repeat) - { - d_fp = 0; - d_new_fp = 0; - d_hdr_fp = 0; - d_new_hdr_fp = 0; - - if(detached_header == true) { - d_state = STATE_DETACHED; - } - else - d_state = STATE_INLINE; - - if(!open(filename, hdr_filename)) - throw std::runtime_error("file_meta_source: can't open file\n"); - - do_update(); - - pmt::pmt_t hdr = pmt::PMT_NIL, extras = pmt::PMT_NIL; - if(read_header(hdr, extras)) { - parse_header(hdr, 0, d_tags); - parse_extras(extras, 0, d_tags); - } - else - throw std::runtime_error("file_meta_source: could not read header.\n"); - - // Set output signature based on itemsize info in header - set_output_signature(io_signature::make(1, 1, d_itemsize)); + // Convert to string or the char array gets confused by the \0 + str.insert(0, hdr_buffer, METADATA_HEADER_SIZE); + hdr = pmt::deserialize_str(str); + delete[] hdr_buffer; + + uint64_t seg_start, extra_len = 0; + pmt::pmt_t r, dump; + if (pmt::dict_has_key(hdr, pmt::string_to_symbol("strt"))) { + r = pmt::dict_ref(hdr, pmt::string_to_symbol("strt"), dump); + seg_start = pmt::to_uint64(r); + extra_len = seg_start - METADATA_HEADER_SIZE; } - file_meta_source_impl::~file_meta_source_impl() - { - close(); - + if (extra_len > 0) { + size = 0; + hdr_buffer = new char[extra_len]; + while (size < extra_len) { + ret = fread(&hdr_buffer[size], sizeof(char), extra_len - size, fp); + if (ret == 0) { + delete[] hdr_buffer; + if (feof(fp)) + return false; + else { + std::stringstream s; + s << "file_meta_source: error occurred extracting extras: " + << strerror(errno) << std::endl; + throw std::runtime_error(s.str()); + } + } + size += ret; + } + str.clear(); + str.insert(0, hdr_buffer, extra_len); + extras = pmt::deserialize_str(str); + delete[] hdr_buffer; } - bool - file_meta_source_impl::read_header(pmt::pmt_t &hdr, pmt::pmt_t &extras) - { - // Select which file handle to read from. - FILE *fp; - if(d_state == STATE_DETACHED) - fp = d_hdr_fp; - else - fp = d_fp; - - size_t ret; - size_t size = 0; - std::string str; - char *hdr_buffer = new char[METADATA_HEADER_SIZE]; - while(size < METADATA_HEADER_SIZE) { - ret = fread(&hdr_buffer[size], sizeof(char), METADATA_HEADER_SIZE-size, fp); - if(ret == 0) { - delete [] hdr_buffer; - if(feof(fp)) - return false; - else { - std::stringstream s; - s << "file_meta_source: error occurred extracting header: " - << strerror(errno) << std::endl; - throw std::runtime_error(s.str()); - } - } - size += ret; - } - - // Convert to string or the char array gets confused by the \0 - str.insert(0, hdr_buffer, METADATA_HEADER_SIZE); - hdr = pmt::deserialize_str(str); - delete [] hdr_buffer; - - uint64_t seg_start, extra_len = 0; - pmt::pmt_t r, dump; - if(pmt::dict_has_key(hdr, pmt::string_to_symbol("strt"))) { - r = pmt::dict_ref(hdr, pmt::string_to_symbol("strt"), dump); - seg_start = pmt::to_uint64(r); - extra_len = seg_start - METADATA_HEADER_SIZE; - } - - if(extra_len > 0) { - size = 0; - hdr_buffer = new char[extra_len]; - while(size < extra_len) { - ret = fread(&hdr_buffer[size], sizeof(char), extra_len-size, fp); - if(ret == 0) { - delete [] hdr_buffer; - if(feof(fp)) - return false; - else { - std::stringstream s; - s << "file_meta_source: error occurred extracting extras: " - << strerror(errno) << std::endl; - throw std::runtime_error(s.str()); - } - } - size += ret; - } - - str.clear(); - str.insert(0, hdr_buffer, extra_len); - extras = pmt::deserialize_str(str); - delete [] hdr_buffer; - } - - return true; + return true; +} + +void file_meta_source_impl::parse_header(pmt::pmt_t hdr, + uint64_t offset, + std::vector<tag_t>& tags) +{ + pmt::pmt_t r, key; + + // GET SAMPLE RATE + key = pmt::string_to_symbol("rx_rate"); + if (pmt::dict_has_key(hdr, key)) { + r = pmt::dict_ref(hdr, key, pmt::PMT_NIL); + d_samp_rate = pmt::to_double(r); + + tag_t t; + t.offset = offset; + t.key = key; + t.value = r; + t.srcid = alias_pmt(); + tags.push_back(t); + } else { + throw std::runtime_error("file_meta_source: Could not extract sample rate.\n"); } - void - file_meta_source_impl::parse_header(pmt::pmt_t hdr, uint64_t offset, - std::vector<tag_t> &tags) - { - pmt::pmt_t r, key; - - // GET SAMPLE RATE - key = pmt::string_to_symbol("rx_rate"); - if(pmt::dict_has_key(hdr, key)) { - r = pmt::dict_ref(hdr, key, pmt::PMT_NIL); - d_samp_rate = pmt::to_double(r); - - tag_t t; - t.offset = offset; - t.key = key; - t.value = r; - t.srcid = alias_pmt(); - tags.push_back(t); - } - else { - throw std::runtime_error("file_meta_source: Could not extract sample rate.\n"); - } - - // GET TIME STAMP - key = pmt::string_to_symbol("rx_time"); - if(pmt::dict_has_key(hdr, key)) { - d_time_stamp = pmt::dict_ref(hdr, key, pmt::PMT_NIL); - - tag_t t; - t.offset = offset; - t.key = key; - t.value = d_time_stamp; - t.srcid = alias_pmt(); - tags.push_back(t); - } - else { - throw std::runtime_error("file_meta_source: Could not extract time stamp.\n"); - } - - // GET ITEM SIZE OF DATA - if(pmt::dict_has_key(hdr, pmt::string_to_symbol("size"))) { - d_itemsize = pmt::to_long(pmt::dict_ref(hdr, pmt::string_to_symbol("size"), pmt::PMT_NIL)); - } - else { - throw std::runtime_error("file_meta_source: Could not extract item size.\n"); - } - - // GET SEGMENT SIZE - if(pmt::dict_has_key(hdr, pmt::string_to_symbol("bytes"))) { - d_seg_size = pmt::to_uint64(pmt::dict_ref(hdr, pmt::string_to_symbol("bytes"), pmt::PMT_NIL)); - - // Convert from bytes to items - d_seg_size /= d_itemsize; - } - else { - throw std::runtime_error("file_meta_source: Could not extract segment size.\n"); - } + // GET TIME STAMP + key = pmt::string_to_symbol("rx_time"); + if (pmt::dict_has_key(hdr, key)) { + d_time_stamp = pmt::dict_ref(hdr, key, pmt::PMT_NIL); + + tag_t t; + t.offset = offset; + t.key = key; + t.value = d_time_stamp; + t.srcid = alias_pmt(); + tags.push_back(t); + } else { + throw std::runtime_error("file_meta_source: Could not extract time stamp.\n"); } - void - file_meta_source_impl::parse_extras(pmt::pmt_t extras, uint64_t offset, - std::vector<tag_t> &tags) - { - pmt::pmt_t item, key, val; - - size_t nitems = pmt::length(extras); - for(size_t i = 0; i < nitems; i++) { - item = pmt::nth(i, extras); - key = pmt::car(item); - val = pmt::cdr(item); - - tag_t t; - t.offset = offset; - t.key = key; - t.value = val; - t.srcid = alias_pmt(); - tags.push_back(t); - } + // GET ITEM SIZE OF DATA + if (pmt::dict_has_key(hdr, pmt::string_to_symbol("size"))) { + d_itemsize = + pmt::to_long(pmt::dict_ref(hdr, pmt::string_to_symbol("size"), pmt::PMT_NIL)); + } else { + throw std::runtime_error("file_meta_source: Could not extract item size.\n"); } - bool - file_meta_source_impl::open(const std::string &filename, - const std::string &hdr_filename) - { - bool ret = true; - if(d_state == STATE_DETACHED) { - std::string s; - if(hdr_filename == "") - s = filename + ".hdr"; - else - s = hdr_filename; - ret = _open(&d_new_hdr_fp, s.c_str()); - } - - ret = ret && _open(&d_new_fp, filename.c_str()); - d_updated = true; - return ret; + // GET SEGMENT SIZE + if (pmt::dict_has_key(hdr, pmt::string_to_symbol("bytes"))) { + d_seg_size = pmt::to_uint64( + pmt::dict_ref(hdr, pmt::string_to_symbol("bytes"), pmt::PMT_NIL)); + + // Convert from bytes to items + d_seg_size /= d_itemsize; + } else { + throw std::runtime_error("file_meta_source: Could not extract segment size.\n"); + } +} + +void file_meta_source_impl::parse_extras(pmt::pmt_t extras, + uint64_t offset, + std::vector<tag_t>& tags) +{ + pmt::pmt_t item, key, val; + + size_t nitems = pmt::length(extras); + for (size_t i = 0; i < nitems; i++) { + item = pmt::nth(i, extras); + key = pmt::car(item); + val = pmt::cdr(item); + + tag_t t; + t.offset = offset; + t.key = key; + t.value = val; + t.srcid = alias_pmt(); + tags.push_back(t); + } +} + +bool file_meta_source_impl::open(const std::string& filename, + const std::string& hdr_filename) +{ + bool ret = true; + if (d_state == STATE_DETACHED) { + std::string s; + if (hdr_filename == "") + s = filename + ".hdr"; + else + s = hdr_filename; + ret = _open(&d_new_hdr_fp, s.c_str()); } - bool - file_meta_source_impl::_open(FILE **fp, const char *filename) - { - gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function + ret = ret && _open(&d_new_fp, filename.c_str()); + d_updated = true; + return ret; +} - bool ret = true; - int fd; +bool file_meta_source_impl::_open(FILE** fp, const char* filename) +{ + gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function - if((fd = ::open(filename, - O_RDONLY|OUR_O_LARGEFILE|OUR_O_BINARY)) < 0) { - perror(filename); - return false; - } + bool ret = true; + int fd; - if(*fp) { // if we've already got a new one open, close it - fclose(*fp); - fp = 0; - } + if ((fd = ::open(filename, O_RDONLY | OUR_O_LARGEFILE | OUR_O_BINARY)) < 0) { + perror(filename); + return false; + } - if((*fp = fdopen(fd, "rb")) == NULL) { - perror(filename); - ::close(fd); // don't leak file descriptor if fdopen fails. - } + if (*fp) { // if we've already got a new one open, close it + fclose(*fp); + fp = 0; + } - ret = fp != 0; + if ((*fp = fdopen(fd, "rb")) == NULL) { + perror(filename); + ::close(fd); // don't leak file descriptor if fdopen fails. + } + + ret = fp != 0; + + return ret; +} - return ret; +void file_meta_source_impl::close() +{ + gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function + if (d_state == STATE_DETACHED) { + if (d_new_hdr_fp) { + fclose(d_new_hdr_fp); + d_new_hdr_fp = 0; + } } - void - file_meta_source_impl::close() - { - gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this function - if(d_state == STATE_DETACHED) { - if(d_new_hdr_fp) { - fclose(d_new_hdr_fp); - d_new_hdr_fp = 0; - } - } - - if(d_new_fp) { - fclose(d_new_fp); - d_new_fp = 0; - } - d_updated = true; - - if (d_fp) { + if (d_new_fp) { + fclose(d_new_fp); + d_new_fp = 0; + } + d_updated = true; + + if (d_fp) { fclose(d_fp); d_fp = 0; - } + } - if (d_state == STATE_DETACHED) { + if (d_state == STATE_DETACHED) { if (d_hdr_fp) { - fclose(d_hdr_fp); - d_hdr_fp = 0; + fclose(d_hdr_fp); + d_hdr_fp = 0; } - } } +} + +void file_meta_source_impl::do_update() +{ + if (d_updated) { + gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this block + if (d_state == STATE_DETACHED) { + if (d_hdr_fp) + fclose(d_hdr_fp); + d_hdr_fp = d_new_hdr_fp; // install new file pointer + d_new_hdr_fp = 0; + } + + if (d_fp) + fclose(d_fp); + d_fp = d_new_fp; // install new file pointer + d_new_fp = 0; - void - file_meta_source_impl::do_update() - { - if(d_updated) { - gr::thread::scoped_lock guard(d_setlock); // hold mutex for duration of this block - if(d_state == STATE_DETACHED) { - if(d_hdr_fp) - fclose(d_hdr_fp); - d_hdr_fp = d_new_hdr_fp; // install new file pointer - d_new_hdr_fp = 0; - } - - if(d_fp) - fclose(d_fp); - d_fp = d_new_fp; // install new file pointer - d_new_fp = 0; - - d_updated = false; - } + d_updated = false; } +} + +int file_meta_source_impl::work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) +{ + // We've reached the end of a segment; parse the next header and get + // the new tags to send and set the next segment size. + if (d_seg_size == 0) { + pmt::pmt_t hdr = pmt::PMT_NIL, extras = pmt::PMT_NIL; + if (read_header(hdr, extras)) { + parse_header(hdr, nitems_written(0), d_tags); + parse_extras(extras, nitems_written(0), d_tags); + } else { + if (!d_repeat) + return -1; + else { + if (fseek(d_fp, 0, SEEK_SET) == -1) { + std::stringstream s; + s << "[" << __FILE__ << "]" + << " fseek failed" << std::endl; + throw std::runtime_error(s.str()); + } + } + } + } + + char* out = (char*)output_items[0]; + int i; + int seg_size = std::min(noutput_items, (int)d_seg_size); + int size = seg_size; + + do_update(); // update d_fp is reqd + if (d_fp == NULL) + throw std::runtime_error("work with file not open"); + + // Push all tags onto the stream and remove them from the vector + while (!d_tags.empty()) { + add_item_tag(0, d_tags.back()); + d_tags.pop_back(); + } + + gr::thread::scoped_lock lock(d_setlock); // hold for the rest of this function + while (size) { + i = fread(out, d_itemsize, size, d_fp); + + size -= i; + d_seg_size -= i; + out += i * d_itemsize; - int - file_meta_source_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - // We've reached the end of a segment; parse the next header and get - // the new tags to send and set the next segment size. - if(d_seg_size == 0) { - pmt::pmt_t hdr=pmt::PMT_NIL, extras=pmt::PMT_NIL; - if(read_header(hdr, extras)) { - parse_header(hdr, nitems_written(0), d_tags); - parse_extras(extras, nitems_written(0), d_tags); - } - else { - if(!d_repeat) + if (size == 0) // done + break; + + if (i > 0) // short read, try again + continue; + + // We got a zero from fread. This is either EOF or error. In + // any event, if we're in repeat mode, seek back to the beginning + // of the file and try again, else break + + if (!d_repeat) + break; + + if (fseek(d_fp, 0, SEEK_SET) == -1) { + std::stringstream s; + s << "[" << __FILE__ << "]" + << " fseek failed" << std::endl; + throw std::runtime_error(s.str()); + } + } + + if (size > 0) { // EOF or error + if (size == seg_size) // we didn't read anything; say we're done return -1; - else { - if(fseek(d_fp, 0, SEEK_SET) == -1) { - std::stringstream s; - s << "[" << __FILE__ << "]" << " fseek failed" << std::endl; - throw std::runtime_error(s.str()); - } - } - } - } - - char *out = (char*)output_items[0]; - int i; - int seg_size = std::min(noutput_items, (int)d_seg_size); - int size = seg_size; - - do_update(); // update d_fp is reqd - if(d_fp == NULL) - throw std::runtime_error("work with file not open"); - - // Push all tags onto the stream and remove them from the vector - while(!d_tags.empty()) { - add_item_tag(0, d_tags.back()); - d_tags.pop_back(); - } - - gr::thread::scoped_lock lock(d_setlock); // hold for the rest of this function - while(size) { - i = fread(out, d_itemsize, size, d_fp); - - size -= i; - d_seg_size -= i; - out += i * d_itemsize; - - if(size == 0) // done - break; - - if(i > 0) // short read, try again - continue; - - // We got a zero from fread. This is either EOF or error. In - // any event, if we're in repeat mode, seek back to the beginning - // of the file and try again, else break - - if(!d_repeat) - break; - - if(fseek(d_fp, 0, SEEK_SET) == -1) { - std::stringstream s; - s << "[" << __FILE__ << "]" << " fseek failed" << std::endl; - throw std::runtime_error(s.str()); - } - } - - if(size > 0) { // EOF or error - if(size == seg_size) // we didn't read anything; say we're done - return -1; - return seg_size - size; // else return partial result - } - - return seg_size; + return seg_size - size; // else return partial result } - } /* namespace blocks */ + return seg_size; +} + +} /* namespace blocks */ } /* namespace gr */ |