<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.0 Transitional//EN" "http://www.w3.org/TR/REC-html40/loose.dtd"> <HTML> <HEAD><TITLE>The OMNI Thread Abstraction</TITLE> <META http-equiv="Content-Type" content="text/html; charset=ISO-8859-1"> <META name="GENERATOR" content="hevea 1.06"> </HEAD> <BODY > <!--HEVEA command line is: /usr/local/bin/hevea omnithread --> <!--HTMLHEAD--> <!--ENDHTML--> <!--PREFIX <ARG ></ARG>--> <!--CUT DEF section 1 --> <H1 ALIGN=center>The OMNI Thread Abstraction</H1> <H3 ALIGN=center>Tristan Richardson<BR> AT&T Laboratories Cambridge<BR> </H3> <H3 ALIGN=center><I>Revised</I> November 2001</H3> <!--TOC section Introduction--> <H2><A NAME="htoc1">1</A> Introduction</H2><!--SEC END --> The OMNI thread abstraction is designed to provide a common set of thread operations for use in programs written in C++. Programs written using the abstraction should be much easier to port between different architectures with different underlying thread primitives.<BR> <BR> The programming interface is designed to be similar to the C language interface to POSIX threads (IEEE draft standard 1003.1c, previously 1003.4a, often known as "pthreads" [<A HREF="#pthreads"><CITE>POSIX94</CITE></A>]).<BR> <BR> Much of the abstraction consists of simple C++ object wrappers around pthread calls. However, for some features, such as thread-specific data, a better interface can be offered because of the use of C++.<BR> <BR> Some of the more complex features of pthreads are not supported because of the difficulty of ensuring the same features can be offered on top of other thread systems. 
Such features include thread cancellation and complex scheduling control (though simple thread priorities are supported).<BR> <BR> The abstraction layer is currently implemented for the following architectures / thread systems: <UL><LI>Solaris 2.x using pthreads draft 10 <LI>Solaris 2.x using Solaris threads (but the pthreads version is now standard) <LI>Alpha OSF1 using pthreads draft 4 <LI>Windows NT using NT threads <LI>Linux 2.x using linuxthreads 0.5 (which is based on pthreads draft 10) <LI>Linux 2.x using MIT pthreads (which is based on draft 8) <LI>ATMos using pthreads draft 6 (but not Virata ATMos)</UL> See the <TT>omnithread.h</TT> header file for full details of the API. The descriptions below assume you have some previous knowledge of threads, mutexes, condition variables and semaphores. Also refer to other documentation ([<A HREF="#birrell"><CITE>Birrell89</CITE></A>], [<A HREF="#pthreads"><CITE>POSIX94</CITE></A>]) for further explanation of these ideas (particularly condition variables, the use of which may not be particularly intuitive when first encountered).<BR> <BR> <!--TOC section Synchronisation objects--> <H2><A NAME="htoc2">2</A> Synchronisation objects</H2><!--SEC END --> Synchronisation objects are used to synchronise threads within the same process. There is no inter-process synchronisation provided. The synchronisation objects provided are mutexes, condition variables and counting semaphores.<BR> <BR> <!--TOC subsection Mutex--> <H3><A NAME="htoc3">2.1</A> Mutex</H3><!--SEC END --> An object of type <TT>omni_mutex</TT> is used for mutual exclusion. It provides two operations, <TT>lock()</TT> and <TT>unlock()</TT>. The alternative names <TT>acquire()</TT> and <TT>release()</TT> can be used if preferred. 
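<BR> <BR> As a minimal sketch of how these operations pair up, the following fragment guards a shared counter with an <TT>omni_mutex</TT>. The names <TT>counter_lock</TT>, <TT>counter</TT>, <TT>add_locked()</TT> and <TT>read_locked()</TT> are hypothetical and exist only for illustration, and the fragment assumes that <TT>omnithread.h</TT> is on the include path and that the platform defines are set:
<PRE>
#include &lt;omnithread.h&gt;

// Hypothetical shared state, used only for this illustration.
static omni_mutex counter_lock;
static long       counter = 0;

// Add n to the counter while holding the mutex.
void add_locked(long n)
{
    counter_lock.lock();
    counter += n;
    counter_lock.unlock();
}

// Read the counter using the alternative operation names.
long read_locked()
{
    counter_lock.acquire();
    long value = counter;
    counter_lock.release();
    return value;
}
</PRE>
Each function releases the mutex on the same thread that acquired it, and neither calls the other while holding the lock.<BR> <BR>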
Behaviour is undefined when a thread attempts to lock the same mutex again or when a mutex is locked by one thread and unlocked by a different thread.<BR> <BR> <!--TOC subsection Condition Variable--> <H3><A NAME="htoc4">2.2</A> Condition Variable</H3><!--SEC END --> A condition variable is represented by an <TT>omni_condition</TT> and is used for signalling between threads. A call to <TT>wait()</TT> causes a thread to wait on the condition variable. A call to <TT>signal()</TT> wakes up at least one thread if any are waiting. A call to <TT>broadcast()</TT> wakes up all threads waiting on the condition variable.<BR> <BR> When constructed, a pointer to an <TT>omni_mutex</TT> must be given. A condition variable <TT>wait()</TT> has an implicit mutex <TT>unlock()</TT> and <TT>lock()</TT> around it. The link between condition variable and mutex lasts for the lifetime of the condition variable (unlike pthreads where the link is only for the duration of the wait). The same mutex may be used with several condition variables.<BR> <BR> A wait with a timeout can be achieved by calling <TT>timed_wait()</TT>. This is given an absolute time to wait until. The routine <TT>omni_thread::get_time()</TT> can be used to turn a relative time into an absolute time. <TT>timed_wait()</TT> returns <TT>true</TT> if the condition was signalled, <TT>false</TT> if the time expired before the condition variable was signalled.<BR> <BR> <!--TOC subsection Counting semaphores--> <H3><A NAME="htoc5">2.3</A> Counting semaphores</H3><!--SEC END --> An <TT>omni_semaphore</TT> is a counting semaphore. When created it is given an initial unsigned integer value. When <TT>wait()</TT> is called, the value is decremented if non-zero. If the value is zero then the thread blocks instead. When <TT>post()</TT> is called, if any threads are blocked in <TT>wait()</TT>, exactly one thread is woken. 
If no threads were blocked then the value of the semaphore is incremented.<BR> <BR> If a thread calls <TT>try_wait()</TT>, then the thread won't block if the semaphore's value is 0, returning <TT>false</TT> instead.<BR> <BR> There is no way of querying the value of the semaphore.<BR> <BR> <!--TOC section Thread object--> <H2><A NAME="htoc6">3</A> Thread object</H2><!--SEC END --> A thread is represented by an <TT>omni_thread</TT> object. There are broadly two different ways in which it can be used.<BR> <BR> The first way is simply to create an <TT>omni_thread</TT> object, giving a particular function which the thread should execute. This is like the POSIX (or any other) C language interface.<BR> <BR> The second method of use is to create a new class which inherits from <TT>omni_thread</TT>. In this case the thread will execute the <TT>run()</TT> member function of the new class. One advantage of this scheme is that thread-specific data can be implemented simply by having data members of the new class.<BR> <BR> When constructed a thread is in the "new" state and has not actually started. A call to <TT>start()</TT> causes the thread to begin executing. A static member function <TT>create()</TT> is provided to construct and start a thread in a single call. A thread exits by calling <TT>exit()</TT> or by returning from the thread function.<BR> <BR> Threads can be either detached or undetached. Detached threads are threads for which all state will be lost upon exit. Other threads cannot determine when a detached thread will disappear, and therefore should not attempt to access the thread object unless some explicit synchronisation with the detached thread guarantees that it still exists.<BR> <BR> Undetached threads are threads for which storage is not reclaimed until another thread waits for its termination by calling <TT>join()</TT>. 
An exit value can be passed from an undetached thread to the thread which joins it.<BR> <BR> Detached / undetached threads are distinguished on creation by the type of function they execute. Undetached threads execute a function which has a <TT>void*</TT> return type, whereas detached threads execute a function which has a <TT>void</TT> return type. Unfortunately C++ member functions are not allowed to be distinguished simply by their return type. Thus in the case of a derived class of <TT>omni_thread</TT> which needs an undetached thread, the member function executed by the thread is called <TT>run_undetached()</TT> rather than <TT>run()</TT>, and it is started by calling <TT>start_undetached()</TT> instead of <TT>start()</TT>.<BR> <BR> The abstraction currently supports three priorities of thread, but no guarantee is made of how this will affect underlying thread scheduling. The three priorities are <TT>PRIORITY_LOW</TT>, <TT>PRIORITY_NORMAL</TT> and <TT>PRIORITY_HIGH</TT>. By default all threads run at <TT>PRIORITY_NORMAL</TT>. A different priority can be specified on thread creation, or while the thread is running using <TT>set_priority()</TT>. A thread's current priority is returned by <TT>priority()</TT>.<BR> <BR> Other functions provided are <TT>self()</TT> which returns the calling thread's <TT>omni_thread</TT> object, <TT>yield()</TT> which requests that other threads be allowed to run, <TT>id()</TT> which returns an integer id for the thread for use in debugging, <TT>state()</TT>, <TT>sleep()</TT> and <TT>get_time()</TT>.<BR> <BR> <!--TOC section Per-thread data--> <H2><A NAME="htoc7">4</A> Per-thread data</H2><!--SEC END --> omnithread supports per-thread data, via member functions of the <TT>omni_thread</TT> object.<BR> <BR> First, you must allocate a key with the <TT>omni_thread::allocate_key()</TT> function. Then, any object whose class is derived from <TT>omni_thread::value_t</TT> can be stored using the <TT>set_value()</TT> function. 
Values are retrieved or removed with <TT>get_value()</TT> and <TT>remove_value()</TT> respectively.<BR> <BR> When the thread exits, all per-thread data is deleted (hence the base class with virtual destructor).<BR> <BR> Note that the per-thread data functions are <B>not</B> thread safe, so although you can access one thread's storage from another thread, there is no concurrency control. Unless you really know what you are doing, it is best to only access per-thread data from the thread it is attached to.<BR> <BR> <!--TOC section Using OMNI threads in your program--> <H2><A NAME="htoc8">5</A> Using OMNI threads in your program</H2><!--SEC END --> Obviously you need to include the <TT>omnithread.h</TT> header file in your source code, and link in the omnithread library with your executable. Because there is a single <TT>omnithread.h</TT> for all platforms, certain preprocessor defines must be given as compiler options. The easiest way to do this is to study the makefiles given in the examples provided with this distribution. 
If you are to include OMNI threads in your own development environment, these are the necessary preprocessor defines:<BR> <TABLE BORDER=1 CELLSPACING=0 CELLPADDING=1> <TR><TD ALIGN=left NOWRAP>Platform</TD> <TD ALIGN=left NOWRAP>Preprocessor Defines</TD> </TR> <TR><TD ALIGN=left NOWRAP>Sun Solaris 2.x</TD> <TD ALIGN=left NOWRAP><CODE>-D__sunos__ -D__sparc__ -D__OSVERSION__=5</CODE></TD> </TR> <TR><TD ALIGN=left NOWRAP> </TD> <TD ALIGN=left NOWRAP><CODE>-DSVR4 -DUsePthread -D_REENTRANT</CODE></TD> </TR> <TR><TD ALIGN=left NOWRAP>x86 Linux 2.0</TD> <TD ALIGN=left NOWRAP><CODE>-D__linux__ -D__i86__ -D__OSVERSION__=2</CODE></TD> </TR> <TR><TD ALIGN=left NOWRAP>with linuxthreads 0.5</TD> <TD ALIGN=left NOWRAP><CODE>-D_REENTRANT</CODE></TD> </TR> <TR><TD ALIGN=left NOWRAP>Digital Unix 3.2</TD> <TD ALIGN=left NOWRAP><CODE>-D__osf1__ -D__alpha__ -D__OSVERSION__=3</CODE></TD> </TR> <TR><TD ALIGN=left NOWRAP> </TD> <TD ALIGN=left NOWRAP><CODE>-D_REENTRANT</CODE></TD> </TR> <TR><TD ALIGN=left NOWRAP>Windows NT</TD> <TD ALIGN=left NOWRAP><CODE>-D__NT__ -MD</CODE></TD> </TR></TABLE><BR> <!--TOC section Threaded I/O shutdown for Unix--> <H2><A NAME="htoc9">6</A> Threaded I/O shutdown for Unix</H2><!--SEC END --> or, how one thread should tell another thread to shut down when it might be doing a blocking call on a socket.<BR> <BR> <B>If you are using omniORB, you don't need to worry about all this, since omniORB does it for you.</B> This section is only relevant if you are using omnithread in your own socket-based programming. It is also seriously out of date.<BR> <BR> Unfortunately there doesn't seem to be a standard way of doing this which works across all Unix systems. I have investigated the behaviour of Solaris 2.5 and Digital Unix 3.2. On Digital Unix everything is fine, as the obvious method using shutdown() seems to work OK. Unfortunately on Solaris shutdown can only be used on a connected socket, so we need devious means to get around this limitation. 
The details are summarised below:<BR> <BR> <!--TOC subsection read()--> <H3><A NAME="htoc10">6.1</A> read()</H3><!--SEC END --> Thread A is in a loop, doing <CODE>read(sock)</CODE>, processing the data, then going back into the read.<BR> <BR> Thread B comes along and wants to shut it down. It can't simply cancel thread A, since (i) working out how to clean up according to where A is in its loop is a nightmare, and (ii) cancellation isn't available in omnithread anyway.<BR> <BR> On Solaris 2.5 and Digital Unix 3.2 the following strategy works:<BR> <BR> Thread B does <CODE>shutdown(sock,2)</CODE>.<BR> <BR> At this point thread A is either blocked inside <CODE>read(sock)</CODE>, or is elsewhere in the loop. If the former then read will return 0, indicating that the socket is closed. If the latter then eventually thread A will call <CODE>read(sock)</CODE> and then this will return 0. Thread A should <CODE>close(sock)</CODE>, do any other tidying up, and exit.<BR> <BR> If there is another point in the loop where thread A can block then obviously thread B needs to be aware of this and be able to wake it up in the appropriate way from that point.<BR> <BR> <!--TOC subsection accept()--> <H3><A NAME="htoc11">6.2</A> accept()</H3><!--SEC END --> Again thread A is in a loop, this time doing an accept on listenSock, dealing with a new connection and going back into accept. Thread B wants to cancel it.<BR> <BR> On Digital Unix 3.2 the strategy is identical to that for read:<BR> <BR> Thread B does <CODE>shutdown(listenSock,2)</CODE>. Wherever thread A is in the loop, eventually its accept call will fail with <CODE>ECONNABORTED</CODE>. Thread A should then <CODE>close(listenSock)</CODE>, tidy up as necessary and exit.<BR> <BR> On Solaris 2.5 thread B can't do <CODE>shutdown(listenSock,2)</CODE>, because this fails with <CODE>ENOTCONN</CODE>. Instead the following strategy can be used:<BR> <BR> First thread B sets some sort of "shutdown flag" associated with listenSock. 
Then it does <CODE>getsockname(listenSock)</CODE> to find out which port listenSock is on (or knows it already), sets up a socket dummySock, does <CODE>connect(dummySock,</CODE> <CODE>this host, port)</CODE> and finally does <CODE>close(dummySock)</CODE>.<BR> <BR> Wherever thread A is in the loop, eventually it will call <CODE>accept(listenSock)</CODE>. This will return successfully with a new socket, say connSock. Thread A then checks to see if the "shutdown flag" is set. If not, then it's a normal connection. If it is set, then thread A closes listenSock and connSock, tidies up and exits.<BR> <BR> <!--TOC subsection write()--> <H3><A NAME="htoc12">6.3</A> write()</H3><!--SEC END --> Thread A may be blocked in write, or about to go into a potentially-blocking write. Thread B wants to shut it down.<BR> <BR> On Solaris 2.5:<BR> <BR> Thread B does <CODE>shutdown(sock,2)</CODE>.<BR> <BR> If thread A is already in <CODE>write(sock)</CODE> then it will return with <CODE>ENXIO</CODE>. If thread A calls write after thread B calls shutdown this will return <CODE>EIO</CODE>.<BR> <BR> On Digital Unix 3.2:<BR> <BR> Thread B does <CODE>shutdown(sock,2)</CODE>.<BR> <BR> If thread A is already in <CODE>write(sock)</CODE> then it will return the number of bytes written before it became blocked. A subsequent call to write will then generate <CODE>SIGPIPE</CODE> (or <CODE>EPIPE</CODE> will be returned if <CODE>SIGPIPE</CODE> is ignored by the thread).<BR> <BR> <!--TOC subsection connect()--> <H3><A NAME="htoc13">6.4</A> connect()</H3><!--SEC END --> Thread A may be blocked in connect, or about to go into a potentially-blocking connect. Thread B wants to shut it down.<BR> <BR> On Digital Unix 3.2:<BR> <BR> Thread B does <CODE>shutdown(sock,2)</CODE>.<BR> <BR> If thread A is already in <CODE>connect(sock)</CODE> then it will return a successful connection. Subsequent reading or writing will show that the socket has been shut down (i.e. 
read returns 0, write generates <CODE>SIGPIPE</CODE> or returns <CODE>EPIPE</CODE>). If thread A calls connect after thread B calls shutdown this will return <CODE>EINVAL</CODE>.<BR> <BR> On Solaris 2.5:<BR> <BR> There is no way to wake up a thread which is blocked in connect. Instead Solaris forces us through a ridiculous procedure whichever way we try it. One way is this:<BR> <BR> First thread A creates a pipe in addition to the socket. Instead of shutting down the socket, thread B simply writes a byte to the pipe.<BR> <BR> Thread A meanwhile sets the socket to non-blocking mode using <CODE>fcntl(sock,</CODE> <CODE>F_SETFL, O_NONBLOCK)</CODE>. Then it calls connect on the socket, which returns immediately with <CODE>EINPROGRESS</CODE>. Then it must call <CODE>select()</CODE>, waiting for either sock to become writable or for the pipe to become readable. If select returns that just sock is writable then the connection has succeeded. It then needs to set the socket back to blocking mode using <CODE>fcntl(sock, F_SETFL, 0)</CODE>. If instead select returns that the pipe is readable, thread A closes the socket, tidies up and exits.<BR> <BR> An alternative method is similar but uses polling instead of the pipe. 
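<BR> <BR> A rough sketch of this polling variant for thread A is given here, assuming a POSIX sockets environment. The names <TT>polling_connect()</TT>, <TT>ConnectResult</TT> and <TT>stop</TT> are hypothetical, the 100 ms wake-up interval is an arbitrary choice, and a C++11 <TT>std::atomic</TT> flag stands in for whatever flag the two threads actually share:
<PRE>
#include &lt;atomic&gt;
#include &lt;cerrno&gt;
#include &lt;fcntl.h&gt;
#include &lt;sys/select.h&gt;
#include &lt;sys/socket.h&gt;
#include &lt;sys/time.h&gt;

enum ConnectResult { CONNECTED, CANCELLED, FAILED };   // hypothetical names

// Hypothetical helper: connect sock to addr, giving up once stop is set.
// The caller keeps ownership of sock and closes it on CANCELLED or FAILED.
ConnectResult polling_connect(int sock, const sockaddr* addr, socklen_t len,
                              const std::atomic&lt;bool&gt;&amp; stop)
{
    int flags = fcntl(sock, F_GETFL, 0);
    if (flags &lt; 0 || fcntl(sock, F_SETFL, flags | O_NONBLOCK) &lt; 0)
        return FAILED;

    // A non-blocking connect normally reports EINPROGRESS instead of blocking.
    if (connect(sock, addr, len) &lt; 0 &amp;&amp; errno != EINPROGRESS)
        return FAILED;

    for (;;) {
        if (stop.load())
            return CANCELLED;                  // thread B has set the flag

        fd_set wfds;
        FD_ZERO(&amp;wfds);
        FD_SET(sock, &amp;wfds);
        timeval tv = {0, 100000};              // wake up every 100 ms

        int n = select(sock + 1, nullptr, &amp;wfds, nullptr, &amp;tv);
        if (n &lt; 0 &amp;&amp; errno != EINTR)
            return FAILED;
        if (n &gt; 0)
            break;                             // writable: connect has finished
    }

    // Writable does not by itself mean success; SO_ERROR holds the outcome.
    int err = 0;
    socklen_t errlen = sizeof err;
    if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &amp;err, &amp;errlen) &lt; 0 || err != 0)
        return FAILED;

    // Restore the saved flags (rather than passing 0) to return to blocking mode.
    return fcntl(sock, F_SETFL, flags) &lt; 0 ? FAILED : CONNECTED;
}
</PRE>
The shorter the timeout, the sooner thread A notices the flag, at the cost of more frequent wake-ups.<BR> <BR>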
Thread B just sets a flag and thread A calls select with a timeout, periodically waking up to see if the flag has been set.<BR> <BR> <!--TOC section References--> <H2>References</H2><!--SEC END --> <DL COMPACT=compact><DT><A NAME="pthreads"><FONT COLOR=purple>[POSIX94]</FONT></A><DD> <EM>Portable Operating System Interface (POSIX) Threads Extension</EM>, P1003.1c Draft 10, IEEE, September 1994.<BR> <BR> <DT><A NAME="birrell"><FONT COLOR=purple>[Birrell89]</FONT></A><DD> <EM>An Introduction to Programming with Threads</EM>, Research Report 35, DEC Systems Research Center, Palo Alto, CA, January 1989.</DL> <!--HTMLFOOT--> <!--ENDHTML--> <!--FOOTER--> <HR SIZE=2> <BLOCKQUOTE><EM>This document was translated from L<sup>A</sup>T<sub>E</sub>X by </EM><A HREF="http://pauillac.inria.fr/~maranget/hevea/index.html"><EM>H<FONT SIZE=2><sup>E</sup></FONT>V<FONT SIZE=2><sup>E</sup></FONT>A</EM></A><EM>. </EM></BLOCKQUOTE> </BODY> </HTML>