diff options
4 files changed, 114 insertions, 37 deletions
diff --git a/usrp/host/lib/legacy/circular_buffer.h b/usrp/host/lib/legacy/circular_buffer.h
index fa451d607b..8898e41948 100644
--- a/usrp/host/lib/legacy/circular_buffer.h
+++ b/usrp/host/lib/legacy/circular_buffer.h
@@ -26,7 +26,9 @@
#include "mld_threads.h"
#include <stdexcept>
+#ifndef DO_DEBUG
#define DO_DEBUG 0
#define DEBUG(X) do{X} while(0);
@@ -82,7 +84,7 @@ public:
DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, "
"doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
(d_doWriteBlock ? "true" : "false"),
- (d_doFullRead ? "true" : "false")));
+ (d_doFullRead ? "true" : "false")););
~circular_buffer () {
@@ -150,7 +152,7 @@ public:
int enqueue (T* buf, UInt32 bufLen_I) {
DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
"#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
- d_n_avail_write_I, d_n_avail_read_I));
+ d_n_avail_write_I, d_n_avail_read_I););
if (bufLen_I > d_bufLen_I) {
fprintf (stderr, "cannot add buffer longer (%ld"
") than instantiated length (%ld"
@@ -173,21 +175,21 @@ public:
if (bufLen_I > d_n_avail_write_I) {
if (d_doWriteBlock) {
while (bufLen_I > d_n_avail_write_I) {
- DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n"));
+ DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n"););
// wait will automatically unlock() the internal mutex
d_writeBlock->wait ();
// and lock() it here.
if (d_doAbort) {
d_internal->unlock ();
- DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n"));
+ DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n"););
return (2);
- DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n"));
+ DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n"););
} else {
d_n_avail_read_I = d_bufLen_I - bufLen_I;
d_n_avail_write_I = bufLen_I;
- DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n"));
+ DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n"););
retval = -1;
@@ -233,7 +235,7 @@ public:
int dequeue (T* buf, UInt32* bufLen_I) {
DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
"#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
- d_n_avail_write_I, d_n_avail_read_I));
+ d_n_avail_write_I, d_n_avail_read_I););
if (!bufLen_I)
throw std::runtime_error ("circular_buffer::dequeue(): "
"input bufLen pointer is NULL.\n");
@@ -257,29 +259,29 @@ public:
if (d_doFullRead) {
while (d_n_avail_read_I < l_bufLen_I) {
- DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n"));
+ DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n"););
// wait will automatically unlock() the internal mutex
d_readBlock->wait ();
// and lock() it here.
if (d_doAbort) {
d_internal->unlock ();
- DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n"));
+ DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n"););
return (2);
- DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n"));
+ DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n"););
} else {
while (d_n_avail_read_I == 0) {
- DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n"));
+ DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n"););
// wait will automatically unlock() the internal mutex
d_readBlock->wait ();
// and lock() it here.
if (d_doAbort) {
d_internal->unlock ();
- DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n"));
+ DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n"););
return (2);
- DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n"));
+ DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n"););
if (l_bufLen_I > d_n_avail_read_I)
diff --git a/usrp/host/lib/legacy/circular_linked_list.h b/usrp/host/lib/legacy/circular_linked_list.h
index 14d81ac915..e495d609b2 100644
--- a/usrp/host/lib/legacy/circular_linked_list.h
+++ b/usrp/host/lib/legacy/circular_linked_list.h
@@ -27,7 +27,10 @@
#include <stdexcept>
#define __INLINE__ inline
+#ifndef DO_DEBUG
#define DO_DEBUG 0
#define DEBUG(X) do{X} while(0);
@@ -168,17 +171,17 @@ public:
d_internal->lock ();
// find an available node
s_node_ptr l_node = d_available;
- DEBUG (fprintf (stderr, "w "));
+ DEBUG (fprintf (stderr, "w "););
while (! l_node) {
- DEBUG (fprintf (stderr, "x\n"));
+ DEBUG (fprintf (stderr, "x\n"););
// the ioBlock condition will automatically unlock() d_internal
d_ioBlock->wait ();
// and lock() is here
- DEBUG (fprintf (stderr, "y\n"));
+ DEBUG (fprintf (stderr, "y\n"););
l_node = d_available;
DEBUG (fprintf (stderr, "::f_n_a_n: #u = %ld, node = %p\n",
- num_used(), l_node));
+ num_used(), l_node););
// remove this one from the current available list
if (num_available () == 1) {
// last one, just set available to NULL
@@ -201,7 +204,7 @@ public:
if (!l_node) return;
d_internal->lock ();
DEBUG (fprintf (stderr, "::m_n_a: #u = %ld, node = %p\n",
- num_used(), l_node));
+ num_used(), l_node););
// remove this node from the inUse list
if (num_used () == 1) {
// last one, just set inUse to NULL
@@ -216,10 +219,10 @@ public:
l_node->insert_before (d_available);
- DEBUG (fprintf (stderr, "s%ld ", d_n_used));
+ DEBUG (fprintf (stderr, "s%ld ", d_n_used););
// signal the condition when new data arrives
d_ioBlock->signal ();
- DEBUG (fprintf (stderr, "t "));
+ DEBUG (fprintf (stderr, "t "););
// unlock the mutex for thread safety
d_internal->unlock ();
diff --git a/usrp/host/lib/legacy/ b/usrp/host/lib/legacy/
index 05db29e805..737387b877 100644
--- a/usrp/host/lib/legacy/
+++ b/usrp/host/lib/legacy/
@@ -27,6 +27,7 @@
// tell mld_threads to NOT use omni_threads,
// but rather Darwin's pthreads
+#define DO_DEBUG 0
#include <usb.h>
#include "fusb.h"
@@ -190,13 +191,18 @@ fusb_ephandle_darwin::start ()
d_endpoint, d_pipeRef, d_interface, d_interfaceRef, direction,
number, interval, maxPacketSize);
-// set global start boolean
+ // set global start boolean
d_started = true;
-// create the run thread, which allows OSX to process I/O separately
+ // lock the runBlock mutex, before creating the run thread.
+ // this guarantees that we can control execution between these 2 threads
+ d_runBlock->mutex ()->lock ();
+ // create the run thread, which allows OSX to process I/O separately
d_runThread = new mld_thread (run_thread, this);
-// wait until the threads are -really- going
+ // wait until the run thread (and possibky read thread) are -really-
+ // going; this will unlock the mutex before waiting for a signal ()
d_runBlock->wait ();
if (usb_debug)
@@ -210,11 +216,16 @@ void
fusb_ephandle_darwin::run_thread (void* arg)
fusb_ephandle_darwin* This = static_cast<fusb_ephandle_darwin*>(arg);
+ // lock the run thread running mutex; if ::stop() is called, it will
+ // first abort() the pipe then wait for the run thread to finish,
+ // via a lock() on this mutex
mld_mutex_ptr l_runThreadRunning = This->d_runThreadRunning;
l_runThreadRunning->lock ();
mld_mutex_ptr l_readRunning = This->d_readRunning;
mld_condition_ptr l_readBlock = This->d_readBlock;
+ mld_mutex_ptr l_readBlock_mutex = l_readBlock->mutex ();
bool l_input_p = This->d_input_p;
@@ -237,24 +248,41 @@ fusb_ephandle_darwin::run_thread (void* arg)
mld_thread_ptr l_rwThread = NULL;
if (l_input_p) {
+ // lock the readBlock mutex, before creating the read thread.
+ // this guarantees that we can control execution between these 2 threads
+ l_readBlock_mutex->lock ();
+ // create the read thread, which just issues all of the starting
+ // async read commands, then returns
l_rwThread = new mld_thread (read_thread, arg);
-// wait until the the rwThread is -really- going
+ // wait until the the read thread is -really- going; this will
+ // unlock the read block mutex before waiting for a signal ()
l_readBlock->wait ();
-// now signal the run condition to release and finish ::start()
+ // now signal the run condition to release and finish ::start().
+ // lock the runBlock mutex first; this will force waiting until the
+ // ->wait() command is issued in ::start()
+ mld_mutex_ptr l_run_block_mutex = This->d_runBlock->mutex ();
+ l_run_block_mutex->lock ();
+ // now that the lock is in place, signal the parent thread that
+ // things are running
This->d_runBlock->signal ();
-// run the loop
+ // release the run_block mutex, just in case
+ l_run_block_mutex->unlock ();
+ // run the loop
CFRunLoopRun ();
if (l_input_p) {
-// wait for read_thread () to finish
+ // wait for read_thread () to finish, if needed
l_readRunning->lock ();
l_readRunning->unlock ();
-// remove run loop stuff
+ // remove run loop stuff
CFRunLoopRemoveSource (CFRunLoopGetCurrent (),
l_cfSource, kCFRunLoopDefaultMode);
@@ -262,6 +290,7 @@ fusb_ephandle_darwin::run_thread (void* arg)
fprintf (stderr, "fusb_ephandle_darwin::run_thread: finished for %s.\n",
l_input_p ? "read" : "write");
+ // release the run thread running mutex
l_runThreadRunning->unlock ();
@@ -273,13 +302,27 @@ fusb_ephandle_darwin::read_thread (void* arg)
fusb_ephandle_darwin* This = static_cast<fusb_ephandle_darwin*>(arg);
+ // before doing anything else, lock the read running mutex. this
+ // mutex does flow control between this thread and the run_thread
mld_mutex_ptr l_readRunning = This->d_readRunning;
l_readRunning->lock ();
-// signal the read condition from run_thread() to continue
+ // signal the read condition from run_thread() to continue
+ // lock the readBlock mutex first; this will force waiting until the
+ // ->wait() command is issued in ::run_thread()
mld_condition_ptr l_readBlock = This->d_readBlock;
+ mld_mutex_ptr l_read_block_mutex = l_readBlock->mutex ();
+ l_read_block_mutex->lock ();
+ // now that the lock is in place, signal the parent thread that
+ // things are running here
l_readBlock->signal ();
+ // release the run_block mutex, just in case
+ l_read_block_mutex->unlock ();
+ // queue up all of the available read requests
s_queue_ptr l_queue = This->d_queue;
l_queue->iterate_start ();
s_node_ptr l_node = l_queue->iterate_next ();
@@ -291,14 +334,21 @@ fusb_ephandle_darwin::read_thread (void* arg)
if (usb_debug)
fprintf (stderr, "fusb_ephandle_darwin::read_thread: finished.\n");
+ // release the read running mutex, to let the parent thread knows
+ // that this thread is finished
l_readRunning->unlock ();
fusb_ephandle_darwin::read_issue (s_both_ptr l_both)
- if ((! l_both) || (! d_started))
+ if ((! l_both) || (! d_started)) {
+ if (usb_debug > 4)
+ fprintf (stderr, "fusb_ephandle_darwin::read_issue: Doing nothing; "
+ "l_both is %X; started is %s\n", (unsigned int) l_both,
+ d_started ? "TRUE" : "FALSE");
+ }
// set the node and buffer from the input "both"
s_node_ptr l_node = l_both->node ();
@@ -328,6 +378,9 @@ fusb_ephandle_darwin::read_issue (s_both_ptr l_both)
"(ReadPipeAsync%s): %s",
d_transferType == kUSBInterrupt ? "" : "TO",
darwin_error_str (result));
+ else if (usb_debug > 4)
+ fprintf (stderr, "fusb_ephandle_darwin::read_issue: "
+ "Queued %X (%ld Bytes)\n", (unsigned int) l_both, bufLen);
@@ -347,6 +400,10 @@ fusb_ephandle_darwin::read_completed (void* refCon,
fprintf (stderr, "fusb_ephandle_darwin::read_completed: "
"Expected %ld bytes; read %ld.\n",
l_i_size, l_size);
+ else if (usb_debug > 4)
+ fprintf (stderr, "fusb_ephandle_darwin::read_completed: "
+ "Read %X (%ld bytes)\n",
+ (unsigned int) l_both, l_size);
// add this read to the transfer buffer
if (l_buffer->enqueue (l_buf->buffer (), l_size) == -1) {
@@ -378,7 +435,12 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes)
UInt32 l_nbytes = (UInt32) nbytes;
- if (! d_started) return (0);
+ if (! d_started) {
+ if (usb_debug)
+ fprintf (stderr, "fusb_ephandle_darwin::write: Not yet started.\n");
+ return (0);
+ }
while (l_nbytes != 0) {
// find out how much data to copy; limited to "d_bufLenBytes" per node
@@ -400,11 +462,11 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes)
if (d_transferType == kUSBInterrupt)
/* This is an interrupt pipe ... can't specify a timeout. */
result = d_interface->WritePipeAsync
- (d_interfaceRef, d_pipeRef, v_buffer, l_nbytes,
+ (d_interfaceRef, d_pipeRef, v_buffer, t_nbytes,
(IOAsyncCallback1) write_completed, (void*) l_both);
result = d_interface->WritePipeAsyncTO
- (d_interfaceRef, d_pipeRef, v_buffer, l_nbytes, 0, USB_TIMEOUT,
+ (d_interfaceRef, d_pipeRef, v_buffer, t_nbytes, 0, USB_TIMEOUT,
(IOAsyncCallback1) write_completed, (void*) l_both);
if (result != kIOReturnSuccess)
@@ -413,6 +475,10 @@ fusb_ephandle_darwin::write (const void* buffer, int nbytes)
"(WritePipeAsync%s): %s",
d_transferType == kUSBInterrupt ? "" : "TO",
darwin_error_str (result));
+ else if (usb_debug > 4) {
+ fprintf (stderr, "fusb_ephandle_darwin::write_thread: "
+ "Queued %X (%ld Bytes)\n", (unsigned int) l_both, t_nbytes);
+ }
l_nbytes -= t_nbytes;
@@ -436,6 +502,9 @@ fusb_ephandle_darwin::write_completed (void* refCon,
fprintf (stderr, "fusb_ephandle_darwin::write_completed: "
"Expected %ld bytes written; wrote %ld.\n",
l_i_size, l_size);
+ else if (usb_debug > 4)
+ fprintf (stderr, "fusb_ephandle_darwin::write_completed: "
+ "Wrote %X (%ld Bytes)\n", (unsigned int) l_both, l_size);
// set buffer's # data to 0
l_buf->n_used (0);
diff --git a/usrp/host/lib/legacy/mld_threads.h b/usrp/host/lib/legacy/mld_threads.h
index a59a928634..b2ec657510 100644
--- a/usrp/host/lib/legacy/mld_threads.h
+++ b/usrp/host/lib/legacy/mld_threads.h
@@ -37,7 +37,10 @@
#include <stdexcept>
#define __INLINE__ inline
+#ifndef DO_DEBUG
#define DO_DEBUG 0
#define DEBUG(X) do{X} while(0);
@@ -182,7 +185,7 @@ public:
__INLINE__ mld_mutex_ptr mutex () {return (d_mutex);};
__INLINE__ void signal () {
- DEBUG (fprintf (stderr, "a "));
+ DEBUG (fprintf (stderr, "a "););
d_condition->signal ();
@@ -193,11 +196,11 @@ public:
"Error %d.\n", l_ret);
- DEBUG (fprintf (stderr, "b "));
+ DEBUG (fprintf (stderr, "b "););
__INLINE__ void wait () {
- DEBUG (fprintf (stderr, "c "));
+ DEBUG (fprintf (stderr, "c "););
d_condition->wait ();
@@ -207,7 +210,7 @@ public:
"Error %d.\n", l_ret);
- DEBUG (printf (stderr, "d "));
+ DEBUG (fprintf (stderr, "d "););