1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
/* -*- c++ -*- */
/*
* Copyright 2004,2009,2010,2013 Free Software Foundation, Inc.
*
* This file is part of GNU Radio
*
* SPDX-License-Identifier: GPL-3.0-or-later
*
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gnuradio/block.h>
#include <gnuradio/buffer.h>
#include <gnuradio/buffer_reader_sm.h>
#include <gnuradio/integer_math.h>
#include <gnuradio/math.h>
#include <assert.h>
#include <algorithm>
#include <iostream>
#include <limits>
#include <stdexcept>
namespace gr {
// Nothing to release explicitly here; the compiler-generated destructor
// body is sufficient.
buffer_reader_sm::~buffer_reader_sm() = default;
/*!
 * \brief Count the items this reader can currently consume from the buffer.
 *
 * Returns 0 when the number of items read (less the sample delay) equals the
 * number of items written to the buffer. Otherwise:
 *  - if the read and write indices differ, the result is
 *    index_sub(write index, read index);
 *  - if they coincide, the result is the number of items between the read
 *    index and the end of the buffer.
 *
 * \returns number of items available to this reader
 */
int buffer_reader_sm::items_available()
{
    int num_items = 0;

    if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
        if (d_buffer->d_write_index != d_read_index) {
            num_items = d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
        } else if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
            // Indices coincide although unread items exist: everything from
            // the read index up to the end of the buffer is readable. (The
            // condition repeats the outer check.)
            num_items = d_buffer->d_bufsize - d_read_index;
        }
    }

#ifdef BUFFER_DEBUG
    std::ostringstream msg;
    msg << "[" << d_buffer << ";" << this << "] ";
    msg << "items_available() WR_idx: " << d_buffer->d_write_index;
    msg << " -- WR items: " << d_buffer->nitems_written();
    msg << " -- RD_idx: " << d_read_index;
    msg << " -- RD items: " << nitems_read() << " (-" << d_attr_delay << ")";
    msg << " -- available: " << num_items;
    GR_LOG_DEBUG(d_logger, msg.str());
#endif

    return num_items;
}
/*!
 * \brief Check whether input_blocked_callback() is applicable for this reader.
 *
 * The callback is considered ready when both of the following hold:
 *  - fewer than \p items_required items remain between this reader's read
 *    index and the end of the buffer, and
 *  - the buffer's write index is below this reader's read index.
 *
 * The buffer's mutex is held while the indices are compared.
 * NOTE(review): the caller must therefore not already hold that mutex when
 * calling this method (presumably it is non-recursive -- confirm).
 *
 * \param items_required number of items the reader needs
 * \returns true if both conditions above are satisfied
 */
bool buffer_reader_sm::input_blkd_cb_ready(int items_required) const
{
    // The lock has to be a *named* object. An unnamed temporary
    // (`scoped_lock(*mutex);`) is destroyed at the end of its own statement,
    // which would leave the index reads below unprotected.
    gr::thread::scoped_lock guard(*d_buffer->mutex());

    const bool tail_too_small =
        (d_buffer->d_bufsize - d_read_index) < static_cast<uint32_t>(items_required);
    const bool writer_behind_reader = d_buffer->d_write_index < d_read_index;

    return tail_too_small && writer_behind_reader;
}
/*!
 * \brief Try to unblock this reader by relocating unread data from the end of
 * the buffer to its beginning.
 *
 * The rearrangement is only attempted when fewer than \p items_required items
 * remain between this reader's read index and the end of the buffer and the
 * write index is below this reader's read index. In that case the data
 * located in front of the write index is shifted up and the unread items
 * starting at the lowest read index above the write index are copied to the
 * start of the buffer; the write index and all reader indices are then
 * adjusted accordingly.
 *
 * No locking is performed inside this function.
 * NOTE(review): presumably the caller holds the buffer's mutex -- confirm.
 *
 * \param items_required number of items the reader needs
 * \param items_avail    overwritten with items_available() before use, so the
 *                       value passed in by the caller is not relied upon
 * \returns true if the buffer contents were rearranged, false otherwise
 */
bool buffer_reader_sm::input_blocked_callback(int items_required, int items_avail)
{
// Maybe adjust read pointers from min read index?
// This would mean that *all* readers must be > (passed) the write index
if (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
(d_buffer->d_write_index < d_read_index)) {
// Update items available before going farther as it could be stale
items_avail = items_available();
// Find reader with the smallest read index that is greater than the
// write index
// NOTE(review): both values stay at uint32_t max if no reader in d_readers
// qualifies; this relies on this reader (whose read index is above the write
// index here) being part of d_readers -- confirm.
uint32_t min_reader_index = std::numeric_limits<uint32_t>::max();
uint32_t min_read_idx = std::numeric_limits<uint32_t>::max();
for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
if (d_buffer->d_readers[idx]->d_read_index > d_buffer->d_write_index) {
// Record index of reader with minimum read-index
if (d_buffer->d_readers[idx]->d_read_index < min_read_idx) {
min_read_idx = d_buffer->d_readers[idx]->d_read_index;
min_reader_index = idx;
}
}
}
// Note items_avail might be zero, that's okay.
// Extend the count by the distance from the lowest such read index up to this
// reader's read index, so it is measured from min_read_idx.
items_avail += d_read_index - min_read_idx;
// Number of items between the write index and the lowest read index above it
int gap = min_read_idx - d_buffer->d_write_index;
// Give up if the items to relocate do not fit into that gap
if (items_avail > gap) {
return false;
}
#ifdef BUFFER_DEBUG
std::ostringstream msg;
msg << "[" << d_buffer << ";" << this << "] "
<< "input_blocked_callback() WR_idx: " << d_buffer->d_write_index
<< " -- WR items: " << d_buffer->nitems_written()
<< " -- BUFSIZE: " << d_buffer->d_bufsize << " -- RD_idx: " << min_read_idx;
for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
if (idx != min_reader_index) {
msg << " -- OTHER_RDR: " << d_buffer->d_readers[idx]->d_read_index;
}
}
msg << " -- GAP: " << gap << " -- items_required: " << items_required
<< " -- items_avail: " << items_avail;
GR_LOG_DEBUG(d_logger, msg.str());
#endif
// Shift existing data down to make room for blocked data at end of buffer
// (the items in [0, write index) are moved up by items_avail items; memmove is
// used because source and destination ranges may overlap)
uint32_t move_data_size = d_buffer->d_write_index * d_buffer->d_sizeof_item;
char* dest = d_buffer->d_base + (items_avail * d_buffer->d_sizeof_item);
std::memmove(dest, d_buffer->d_base, move_data_size);
// Next copy the data from the end of the buffer back to the beginning
// (items_avail items starting at min_read_idx are copied to the buffer base)
uint32_t avail_data_size = items_avail * d_buffer->d_sizeof_item;
char* src = d_buffer->d_base + (min_read_idx * d_buffer->d_sizeof_item);
std::memcpy(d_buffer->d_base, src, avail_data_size);
// Now adjust write pointer
d_buffer->d_write_index += items_avail;
// Finally adjust all reader pointers
// (the reader with the lowest read index restarts at 0; every other reader is
// advanced by items_avail, wrapping at the buffer size)
for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
if (idx == min_reader_index) {
d_buffer->d_readers[idx]->d_read_index = 0;
} else {
d_buffer->d_readers[idx]->d_read_index += items_avail;
d_buffer->d_readers[idx]->d_read_index %= d_buffer->d_bufsize;
}
}
return true;
}
// Preconditions not met: nothing was rearranged
return false;
}
/*!
 * \brief Construct a reader by forwarding all arguments to buffer_reader.
 *
 * \param buffer     passed through to the buffer_reader constructor
 * \param read_index passed through to the buffer_reader constructor
 * \param link       passed through to the buffer_reader constructor
 */
buffer_reader_sm::buffer_reader_sm(buffer_sptr buffer, unsigned int read_index, block_sptr link)
    : buffer_reader(buffer, read_index, link)
{
    // No additional state to initialise beyond the base class.
}
} /* namespace gr */
|