Statistics
| Branch: | Tag: | Revision:

root / gnuradio-core / src / lib / omnithread / vxWorks.cc @ 5d69a524

History | View | Annotate | Download (25.2 kB)

1
//////////////////////////////////////////////////////////////////////////////
2
// Filename:         vxWorks.cc
3
// Author:                Tihomir Sokcevic
4
//                                        Acterna, Eningen.
5
// Description: vxWorks adaptation of the omnithread wrapper classes
6
// Notes:                 Munching strategy is imperative
7
//////////////////////////////////////////////////////////////////////////////
8
// $Log$
9
// Revision 1.1  2004/04/10 18:00:52  eb
10
// Initial revision
11
//
12
// Revision 1.1.1.1  2004/03/01 00:20:27  eb
13
// initial checkin
14
//
15
// Revision 1.1  2003/05/25 05:29:04  eb
16
// see ChangeLog
17
//
18
// Revision 1.1.2.1  2003/02/17 02:03:11  dgrisby
19
// vxWorks port. (Thanks Michael Sturm / Acterna Eningen GmbH).
20
//
21
// Revision 1.1.1.1  2002/11/19 14:58:04  sokcevti
22
// OmniOrb4.0.0 VxWorks port
23
//
24
// Revision 1.4  2002/10/15 07:54:09  kuttlest
25
// change semaphore from SEM_FIFO to SEM_PRIO
26
// ---
27
//
28
// Revision 1.3  2002/07/05 07:38:52  engeln
29
// made priority redefinable on load time by defining int variables
30
//         omni_thread_prio_low = 220;
31
//         omni_thread_prio_normal = 110;
32
//         omni_thread_prio_high = 55;
33
// the default priority is prio_normal.
34
// The normal priority default has been increased from 200 to 110 and the
35
//     high priority from 100 to 55.
36
// ---
37
//
38
// Revision 1.2  2002/06/14 12:44:57  engeln
39
// replaced possibly unsafe wakeup procedure in broadcast.
40
// ---
41
//
42
// Revision 1.1.1.1  2002/04/02 10:09:34  sokcevti
43
// omniORB4 initial realease
44
//
45
// Revision 1.0        2001/10/23 14:22:45        sokcevti
46
// Initial Version 4.00
47
// ---
48
//
49
//////////////////////////////////////////////////////////////////////////////
50
51
52
//////////////////////////////////////////////////////////////////////////////
53
// Include files
54
//////////////////////////////////////////////////////////////////////////////
55
#include <stdlib.h>
56
#include <stdio.h>
57
#include <errno.h>
58
#include <time.h>
59
#include <omnithread.h>
60
#include <sysLib.h>
61
62
#include <assert.h>                // assert
63
#include <intLib.h>                // intContext
64
65
66
//////////////////////////////////////////////////////////////////////////////
67
// Local defines
68
//////////////////////////////////////////////////////////////////////////////
69
#define ERRNO(x) (((x) != 0) ? (errno) : 0)
70
#define THROW_ERRORS(x) { if((x) != OK) throw omni_thread_fatal(errno); }
71
#define OMNI_THREAD_ID        0x7F7155AAl
72
#define OMNI_STACK_SIZE 32768l
73
74
#ifdef _DEBUG
75
        #include <fstream>
76
        #define DBG_TRACE(X) X
77
#else // _DEBUG
78
        #define DBG_TRACE(X)
79
#endif // _DEBUG
80
81
#define DBG_ASSERT(X)
82
83
#define DBG_THROW(X) X
84
85
int omni_thread_prio_low = 220;
86
int omni_thread_prio_normal = 110;
87
int omni_thread_prio_high = 55;
88
///////////////////////////////////////////////////////////////////////////
89
//
90
// Mutex
91
//
92
///////////////////////////////////////////////////////////////////////////
93
omni_mutex::omni_mutex(void):m_bConstructed(false)
94
{
95
        mutexID = semMCreate(SEM_Q_PRIORITY | SEM_INVERSION_SAFE);
96
97
        DBG_ASSERT(assert(mutexID != NULL));
98
99
        if(mutexID==NULL)
100
        {
101
                DBG_TRACE(cout<<"Exception: omni_mutex::omni_mutex()  tid: "<<(int)taskIdSelf()<<endl);
102
                DBG_THROW(throw omni_thread_fatal(-1));
103
        }
104
105
        m_bConstructed = true;
106
}
107
108
omni_mutex::~omni_mutex(void)
109
{
110
        m_bConstructed = false;
111
112
        STATUS status = semDelete(mutexID);
113
114
        DBG_ASSERT(assert(status == OK));
115
116
        if(status != OK)
117
        {
118
                DBG_TRACE(cout<<"Exception: omni_mutex::~omni_mutex()  mutexID: "<<(int)mutexID<<" tid: "<<(int)taskIdSelf()<<endl);
119
                DBG_THROW(throw omni_thread_fatal(errno));
120
        }
121
}
122
123
/*
124
void omni_mutex::lock(void)
125
{
126
        DBG_ASSERT(assert(!intContext()));                // not in ISR context
127
        DBG_ASSERT(assert(m_bConstructed));
128
129
        STATUS status = semTake(mutexID, WAIT_FOREVER);
130
131
        DBG_ASSERT(assert(status == OK));
132
133
        if(status != OK)
134
        {
135
                DBG_TRACE(cout<<"Exception: omni_mutex::lock()  mutexID: "<<(int)mutexID<<" tid: "<<(int)taskIdSelf()<<endl);
136
                DBG_THROW(throw omni_thread_fatal(errno));
137
        }
138
}
139
140
void omni_mutex::unlock(void)
141
{
142
        DBG_ASSERT(assert(m_bConstructed));
143
144
        STATUS status = semGive(mutexID);
145
146
        DBG_ASSERT(assert(status == OK));
147
148
        if(status != OK)
149
        {
150
                DBG_TRACE(cout<<"Exception: omni_mutex::unlock()  mutexID: "<<(int)mutexID<<" tid: "<<(int)taskIdSelf()<<endl);
151
                DBG_THROW(throw omni_thread_fatal(errno));
152
        }
153
}
154
*/
155
156
///////////////////////////////////////////////////////////////////////////
157
//
158
// Condition variable
159
//
160
///////////////////////////////////////////////////////////////////////////
161
omni_condition::omni_condition(omni_mutex* m) : mutex(m)
162
{
163
        DBG_TRACE(cout<<"omni_condition::omni_condition  mutexID: "<<(int)mutex->mutexID<<" tid:"<<(int)taskIdSelf()<<endl);
164
165
        waiters_ = 0;
166
167
        sema_ = semCCreate(SEM_Q_PRIORITY, 0);
168
        if(sema_ == NULL)
169
        {
170
                DBG_TRACE(cout<<"Exception: omni_condition::omni_condition()  tid: "<<(int)taskIdSelf()<<endl);
171
                DBG_THROW(throw omni_thread_fatal(errno));
172
        }
173
174
        waiters_lock_ = semMCreate(SEM_Q_PRIORITY | SEM_INVERSION_SAFE);
175
        if(waiters_lock_ == NULL)
176
        {
177
                DBG_TRACE(cout<<"Exception: omni_condition::omni_condition()  tid: "<<(int)taskIdSelf()<<endl);
178
                DBG_THROW(throw omni_thread_fatal(errno));
179
        }
180
181
}
182
183
omni_condition::~omni_condition(void)
184
{
185
        STATUS status = semDelete(waiters_lock_);
186
187
        DBG_ASSERT(assert(status == OK));
188
189
        if(status != OK)
190
        {
191
                DBG_TRACE(cout<<"Exception: omni_condition::~omni_condition"<<endl);
192
                DBG_THROW(throw omni_thread_fatal(errno));
193
        }
194
195
        status = semDelete(sema_);
196
197
        DBG_ASSERT(assert(status == OK));
198
199
        if(status != OK)
200
        {
201
                DBG_TRACE(cout<<"Exception: omni_condition::~omni_condition"<<endl);
202
                DBG_THROW(throw omni_thread_fatal(errno));
203
        }
204
}
205
206
void omni_condition::wait(void)
207
{
208
        DBG_TRACE(cout<<"omni_condition::wait            mutexID: "<<(int)mutex->mutexID<<" tid:"<<(int)taskIdSelf()<<endl);
209
210
        // Prevent race conditions on the <waiters_> count.
211
212
        STATUS status = semTake(waiters_lock_,WAIT_FOREVER);
213
214
        DBG_ASSERT(assert(status == OK));
215
216
        if(status != OK)
217
        {
218
                DBG_TRACE(cout<<"Exception: omni_condition::wait"<<endl);
219
                DBG_THROW(throw omni_thread_fatal(errno));
220
        }
221
222
        ++waiters_;
223
224
        status = semGive(waiters_lock_);
225
226
        DBG_ASSERT(assert(status == OK));
227
228
        if(status != OK)
229
        {
230
                DBG_TRACE(cout<<"Exception: omni_condition::wait"<<endl);
231
                DBG_THROW(throw omni_thread_fatal(errno));
232
        }
233
234
        // disable task lock to have an atomic unlock+semTake
235
        taskLock();
236
237
        // We keep the lock held just long enough to increment the count of
238
        // waiters by one.        Note that we can't keep it held across the call
239
        // to wait() since that will deadlock other calls to signal().
240
        mutex->unlock();
241
242
        // Wait to be awakened by a cond_signal() or cond_broadcast().
243
        status = semTake(sema_,WAIT_FOREVER);
244
245
        // reenable task rescheduling
246
        taskUnlock();
247
248
        DBG_ASSERT(assert(status == OK));
249
250
        if(status != OK)
251
        {
252
                DBG_TRACE(cout<<"Exception: omni_condition::wait"<<endl);
253
                DBG_THROW(throw omni_thread_fatal(errno));
254
        }
255
256
        // Reacquire lock to avoid race conditions on the <waiters_> count.
257
        status = semTake(waiters_lock_,WAIT_FOREVER);
258
259
        DBG_ASSERT(assert(status == OK));
260
261
        if(status != OK)
262
        {
263
                DBG_TRACE(cout<<"Exception: omni_condition::wait"<<endl);
264
                DBG_THROW(throw omni_thread_fatal(errno));
265
        }
266
267
        // We're ready to return, so there's one less waiter.
268
        --waiters_;
269
270
        // Release the lock so that other collaborating threads can make
271
        // progress.
272
        status = semGive(waiters_lock_);
273
274
        DBG_ASSERT(assert(status == OK));
275
276
        if(status != OK)
277
        {
278
                DBG_TRACE(cout<<"Exception: omni_condition::wait"<<endl);
279
                DBG_THROW(throw omni_thread_fatal(errno));
280
        }
281
282
        // Bad things happened, so let's just return below.
283
284
        // We must always regain the <external_mutex>, even when errors
285
        // occur because that's the guarantee that we give to our callers.
286
        mutex->lock();
287
}
288
289
290
// The time given is absolute. Return 0 is timeout
291
int omni_condition::timedwait(unsigned long secs, unsigned long nanosecs)
292
{
293
        STATUS result = OK;
294
        timespec now;
295
        unsigned long timeout;
296
        int ticks;
297
298
        // Prevent race conditions on the <waiters_> count.
299
        STATUS status = semTake(waiters_lock_, WAIT_FOREVER);
300
301
        DBG_ASSERT(assert(status == OK));
302
303
        if(status != OK)
304
        {
305
                DBG_TRACE(cout<<"Exception: omni_condition::timedwait"<<endl);
306
                DBG_THROW(throw omni_thread_fatal(errno));
307
        }
308
309
        ++waiters_;
310
311
        status = semGive(waiters_lock_);
312
313
        DBG_ASSERT(assert(status == OK));
314
315
        if(status != OK)
316
        {
317
                DBG_TRACE(cout<<"Exception: omni_condition::timedwait"<<endl);
318
                DBG_THROW(throw omni_thread_fatal(errno));
319
        }
320
321
        clock_gettime(CLOCK_REALTIME, &now);
322
323
        if(((unsigned long)secs <= (unsigned long)now.tv_sec) &&
324
                (((unsigned long)secs < (unsigned long)now.tv_sec) ||
325
                (nanosecs < (unsigned long)now.tv_nsec)))
326
                timeout = 0;
327
        else
328
                timeout = (secs-now.tv_sec) * 1000 + (nanosecs-now.tv_nsec) / 1000000l;
329
330
        // disable task lock to have an atomic unlock+semTake
331
        taskLock();
332
333
        // We keep the lock held just long enough to increment the count
334
        // of waiters by one.
335
        mutex->unlock();
336
337
        // Wait to be awakened by a signal() or broadcast().
338
        ticks = (timeout * sysClkRateGet()) / 1000L;
339
        result = semTake(sema_, ticks);
340
341
        // reenable task rescheduling
342
        taskUnlock();
343
344
        // Reacquire lock to avoid race conditions.
345
        status = semTake(waiters_lock_, WAIT_FOREVER);
346
347
        DBG_ASSERT(assert(status == OK));
348
349
        if(status != OK)
350
        {
351
                DBG_TRACE(cout<<"Exception: omni_condition::timedwait"<<endl);
352
                DBG_THROW(throw omni_thread_fatal(errno));
353
        }
354
355
        --waiters_;
356
357
        status = semGive(waiters_lock_);
358
359
        DBG_ASSERT(assert(status == OK));
360
361
        if(status != OK)
362
        {
363
                DBG_TRACE(cout<<"Exception: omni_condition::timedwait"<<endl);
364
                DBG_THROW(throw omni_thread_fatal(errno));
365
        }
366
367
        // A timeout has occured - fires exception if the origin is other than timeout
368
        if(result!=OK && !(errno == S_objLib_OBJ_TIMEOUT || errno == S_objLib_OBJ_UNAVAILABLE))
369
        {
370
                DBG_TRACE(cout<<"omni_condition::timedwait! - thread:"<<omni_thread::self()->id()<<" SemID:"<<(int)sema_<<" errno:"<<errno<<endl);
371
                DBG_THROW(throw omni_thread_fatal(errno));
372
        }
373
374
        // We must always regain the <external_mutex>, even when errors
375
        // occur because that's the guarantee that we give to our callers.
376
        mutex->lock();
377
378
        if(result!=OK) // timeout
379
                return 0;
380
381
        return 1;
382
}
383
384
void omni_condition::signal(void)
385
{
386
        DBG_TRACE(cout<<"omni_condition::signal          mutexID: "<<(int)mutex->mutexID<<" tid:"<<(int)taskIdSelf()<<endl);
387
388
        STATUS status = semTake(waiters_lock_, WAIT_FOREVER);
389
390
        DBG_ASSERT(assert(status == OK));
391
392
        if(status != OK)
393
        {
394
                DBG_TRACE(cout<<"Exception: omni_condition::signal"<<endl);
395
                DBG_THROW(throw omni_thread_fatal(errno));
396
        }
397
398
        int have_waiters = waiters_ > 0;
399
400
        status = semGive(waiters_lock_);
401
402
        DBG_ASSERT(assert(status == OK));
403
404
        if(status != OK)
405
        {
406
                DBG_TRACE(cout<<"Exception: omni_condition::signal"<<endl);
407
                DBG_THROW(throw omni_thread_fatal(errno));
408
        }
409
410
        if(have_waiters != 0)
411
        {
412
                status = semGive(sema_);
413
414
                DBG_ASSERT(assert(status == OK));
415
416
                if(status != OK)
417
                {
418
                        DBG_TRACE(cout<<"Exception: omni_condition::signal"<<endl);
419
                        DBG_THROW(throw omni_thread_fatal(errno));
420
                }
421
        }
422
}
423
424
void omni_condition::broadcast(void)
425
{
426
        DBG_TRACE(cout<<"omni_condition::broadcast       mutexID: "<<(int)mutex->mutexID<<" tid:"<<(int)taskIdSelf()<<endl);
427
428
        int have_waiters = 0;
429
430
        // The <external_mutex> must be locked before this call is made.
431
        // This is needed to ensure that <waiters_> and <was_broadcast_> are
432
        // consistent relative to each other.
433
        STATUS status = semTake(waiters_lock_, WAIT_FOREVER);
434
435
        DBG_ASSERT(assert(status == OK));
436
437
        if(status != OK)
438
        {
439
                DBG_TRACE(cout<<"Exception: omni_condition::signal"<<endl);
440
                DBG_THROW(throw omni_thread_fatal(errno));
441
        }
442
443
        if(waiters_ > 0)
444
        {
445
                // We are broadcasting, even if there is just one waiter...
446
                // Record the fact that we are broadcasting.        This helps the
447
                // cond_wait() method know how to optimize itself.        Be sure to
448
                // set this with the <waiters_lock_> held.
449
                have_waiters = 1;
450
        }
451
452
        status = semGive(waiters_lock_);
453
454
        DBG_ASSERT(assert(status == OK));
455
456
        if(status != OK)
457
        {
458
                DBG_TRACE(cout<<"Exception: omni_condition::signal"<<endl);
459
                DBG_THROW(throw omni_thread_fatal(errno));
460
        }
461
462
        if(have_waiters)
463
        {
464
                // Wake up all the waiters.
465
                status = semFlush(sema_);
466
467
                        DBG_ASSERT(assert(status == OK));
468
469
                        if(status != OK)
470
                        {
471
                                DBG_TRACE(cout<<"omni_condition::broadcast1! - thread:"<<omni_thread::self()->id()<<" SemID:"<<(int)sema_<<" errno:"<<errno<<endl);
472
                                DBG_THROW(throw omni_thread_fatal(errno));
473
                        }
474
475
        }
476
}
477
478
479
///////////////////////////////////////////////////////////////////////////
480
//
481
// Counting semaphore
482
//
483
///////////////////////////////////////////////////////////////////////////
484
omni_semaphore::omni_semaphore(unsigned int initial)
485
{
486
487
        DBG_ASSERT(assert(0 <= (int)initial));                // POSIX expects only unsigned init values
488
489
        semID = semCCreate(SEM_Q_PRIORITY, (int)initial);
490
491
        DBG_ASSERT(assert(semID!=NULL));
492
493
        if(semID==NULL)
494
        {
495
                DBG_TRACE(cout<<"Exception: omni_semaphore::omni_semaphore"<<endl);
496
                DBG_THROW(throw omni_thread_fatal(-1));
497
        }
498
}
499
500
omni_semaphore::~omni_semaphore(void)
501
{
502
        STATUS status = semDelete(semID);
503
504
        DBG_ASSERT(assert(status == OK));
505
506
        if(status != OK)
507
        {
508
                DBG_TRACE(cout<<"Exception: omni_semaphore::~omni_semaphore"<<endl);
509
                DBG_THROW(throw omni_thread_fatal(errno));
510
        }
511
}
512
513
void omni_semaphore::wait(void)
514
{
515
        DBG_ASSERT(assert(!intContext()));                // no wait in ISR
516
517
        STATUS status = semTake(semID, WAIT_FOREVER);
518
519
        DBG_ASSERT(assert(status == OK));
520
521
        if(status != OK)
522
        {
523
                DBG_TRACE(cout<<"Exception: omni_semaphore::wait"<<endl);
524
                DBG_THROW(throw omni_thread_fatal(errno));
525
        }
526
}
527
528
int omni_semaphore::trywait(void)
529
{
530
        STATUS status = semTake(semID, NO_WAIT);
531
532
        DBG_ASSERT(assert(status == OK));
533
534
        if(status != OK)
535
        {
536
                if(errno == S_objLib_OBJ_UNAVAILABLE)
537
                {
538
                        return 0;
539
                }
540
                else
541
                {
542
                        DBG_ASSERT(assert(false));
543
544
                        DBG_TRACE(cout<<"Exception: omni_semaphore::trywait"<<endl);
545
                        DBG_THROW(throw omni_thread_fatal(errno));
546
                }
547
        }
548
549
        return 1;
550
}
551
552
void omni_semaphore::post(void)
553
{
554
        STATUS status = semGive(semID);
555
556
        DBG_ASSERT(assert(status == OK));
557
558
        if(status != OK)
559
        {
560
                DBG_TRACE(cout<<"Exception: omni_semaphore::post"<<endl);
561
                DBG_THROW(throw omni_thread_fatal(errno));
562
        }
563
}
564
565
566
567
///////////////////////////////////////////////////////////////////////////
568
//
569
// Thread
570
//
571
///////////////////////////////////////////////////////////////////////////
572
573
574
//
575
// static variables
576
//
577
omni_mutex* omni_thread::next_id_mutex = 0;
578
int omni_thread::next_id = 0;
579
580
// omniORB requires a larger stack size than the default (21120) on OSF/1
581
static size_t stack_size = OMNI_STACK_SIZE;
582
583
584
//
585
// Initialisation function (gets called before any user code).
586
//
587
588
static int& count() {
589
  static int the_count = 0;
590
  return the_count;
591
}
592
593
omni_thread::init_t::init_t(void)
594
{
595
        // Only do it once however many objects get created.
596
        if(count()++ != 0)
597
                return;
598
599
        attach();
600
}
601
602
omni_thread::init_t::~init_t(void)
603
{
604
    if (--count() != 0) return;
605
606
    omni_thread* self = omni_thread::self();
607
    if (!self) return;
608
609
    taskTcb(taskIdSelf())->spare1 = 0;
610
    delete self;
611
612
    delete next_id_mutex;
613
}
614
615
616
//
617
// Wrapper for thread creation.
618
//
619
extern "C" void omni_thread_wrapper(void* ptr)
620
{
621
        omni_thread* me = (omni_thread*)ptr;
622
623
        DBG_TRACE(cout<<"omni_thread_wrapper: thread "<<me->id()<<" started\n");
624
625
        //
626
        // We can now tweaked the task info since the tcb exist now
627
        //
628
        me->mutex.lock();        // To ensure that start has had time to finish
629
        taskTcb(me->tid)->spare1 = OMNI_THREAD_ID;
630
        taskTcb(me->tid)->spare2 = (int)ptr;
631
        me->mutex.unlock();
632
633
        //
634
        // Now invoke the thread function with the given argument.
635
        //
636
        if(me->fn_void != NULL)
637
        {
638
                (*me->fn_void)(me->thread_arg);
639
                omni_thread::exit();
640
        }
641
642
        if(me->fn_ret != NULL)
643
        {
644
                void* return_value = (*me->fn_ret)(me->thread_arg);
645
                omni_thread::exit(return_value);
646
        }
647
648
        if(me->detached)
649
        {
650
                me->run(me->thread_arg);
651
                omni_thread::exit();
652
        }
653
        else
654
        {
655
                void* return_value = me->run_undetached(me->thread_arg);
656
                omni_thread::exit(return_value);
657
        }
658
}
659
660
661
//
662
// Special functions for VxWorks only
663
//
664
void omni_thread::attach(void)
665
{
666
        DBG_TRACE(cout<<"omni_thread_attach: VxWorks mapping thread initialising\n");
667
668
        int _tid = taskIdSelf();
669
670
        // Check the task is not already attached
671
        if(taskTcb(_tid)->spare1 == OMNI_THREAD_ID)
672
                return;
673
674
        // Create the mutex required to lock the threads debugging id (create before the thread!!!)
675
        if(next_id_mutex == 0)
676
                next_id_mutex = new omni_mutex;
677
678
        // Create a thread object for THIS running process
679
        omni_thread* t = new omni_thread;
680
681
        // Lock its mutex straigh away!
682
        omni_mutex_lock l(t->mutex);
683
684
        // Adjust data members of this instance
685
        t->_state = STATE_RUNNING;
686
        t->tid = taskIdSelf();
687
688
        // Set the thread values so it can be recongnised as a omni_thread
689
        // Set the id last can possibly prevent race condition
690
        taskTcb(t->tid)->spare2 = (int)t;
691
        taskTcb(t->tid)->spare1 = OMNI_THREAD_ID;
692
693
        // Create the running_mutex at this stage, but leave it empty. We are not running
694
        //        in the task context HERE, so taking it would be disastrous.
695
        t->running_cond = new omni_condition(&t->mutex);
696
}
697
698
699
void omni_thread::detach(void)
700
{
701
        DBG_TRACE(cout<<"omni_thread_detach: VxWorks detaching thread mapping\n");
702
703
        int _tid = taskIdSelf();
704
705
        // Check the task has a OMNI_THREAD attached
706
        if(taskTcb(_tid)->spare1 != OMNI_THREAD_ID)
707
                return;
708
709
        // Invalidate the id NOW !
710
        taskTcb(_tid)->spare1 = 0;
711
712
        // Even if NULL, it is safe to delete the thread
713
        omni_thread* t = (omni_thread*)taskTcb(_tid)->spare2;
714
        // Fininsh cleaning the tcb structure
715
        taskTcb(_tid)->spare2 = 0;
716
717
        delete t;
718
}
719
720
721
//
722
// Constructors for omni_thread - set up the thread object but don't
723
// start it running.
724
//
725
726
// construct a detached thread running a given function.
727
omni_thread::omni_thread(void (*fn)(void*), void* arg, priority_t pri)
728
{
729
        common_constructor(arg, pri, 1);
730
        fn_void = fn;
731
        fn_ret = NULL;
732
}
733
734
// construct an undetached thread running a given function.
735
omni_thread::omni_thread(void* (*fn)(void*), void* arg, priority_t pri)
736
{
737
        common_constructor(arg, pri, 0);
738
        fn_void = NULL;
739
        fn_ret = fn;
740
}
741
742
// construct a thread which will run either run() or run_undetached().
743
744
omni_thread::omni_thread(void* arg, priority_t pri)
745
{
746
        common_constructor(arg, pri, 1);
747
        fn_void = NULL;
748
        fn_ret = NULL;
749
}
750
751
// common part of all constructors.
752
void omni_thread::common_constructor(void* arg, priority_t pri, int det)
753
{
754
        _state = STATE_NEW;
755
        _priority = pri;
756
757
        // Set the debugging id
758
        next_id_mutex->lock();
759
        _id = next_id++;
760
        next_id_mutex->unlock();
761
762
        // Note : tid can only be setup when the task is up and running
763
        tid = 0;
764
765
        thread_arg = arg;
766
        detached = det;                // may be altered in start_undetached()
767
768
    _dummy       = 0;
769
    _values      = 0;
770
    _value_alloc = 0;
771
}
772
773
//
774
// Destructor for omni_thread.
775
//
776
omni_thread::~omni_thread(void)
777
{
778
        DBG_TRACE(cout<<"omni_thread::~omni_thread for thread "<<id()<<endl);
779
780
    if (_values) {
781
        for (key_t i=0; i < _value_alloc; i++) {
782
            if (_values[i]) {
783
                delete _values[i];
784
            }
785
        }
786
        delete [] _values;
787
    }
788
789
        delete running_cond;
790
}
791
792
793
//
794
// Start the thread
795
//
796
void omni_thread::start(void)
797
{
798
        omni_mutex_lock l(mutex);
799
800
        DBG_ASSERT(assert(_state == STATE_NEW));
801
802
        if(_state != STATE_NEW)
803
                DBG_THROW(throw omni_thread_invalid());
804
805
        // Allocate memory for the task. (The returned id cannot be trusted by the task)
806
        tid = taskSpawn(
807
                NULL,                                                                 // Task name
808
                vxworks_priority(_priority),        // Priority
809
                0,                                                                         // Option
810
                stack_size,                                                 // Stack size
811
                (FUNCPTR)omni_thread_wrapper, // Priority
812
                (int)this,                                                        // First argument is this
813
                0,0,0,0,0,0,0,0,0                                 // Remaining unused args
814
                );
815
816
        DBG_ASSERT(assert(tid!=ERROR));
817
818
        if(tid==ERROR)
819
                DBG_THROW(throw omni_thread_invalid());
820
821
        _state = STATE_RUNNING;
822
823
        // Create the running_mutex at this stage, but leave it empty. We are not running
824
        //        in the task context HERE, so taking it would be disastrous.
825
        running_cond = new omni_condition(&mutex);
826
}
827
828
829
//
830
// Start a thread which will run the member function run_undetached().
831
//
832
void omni_thread::start_undetached(void)
833
{
834
        DBG_ASSERT(assert(!((fn_void != NULL) || (fn_ret != NULL))));
835
836
        if((fn_void != NULL) || (fn_ret != NULL))
837
                DBG_THROW(throw omni_thread_invalid());
838
839
        detached = 0;
840
841
        start();
842
}
843
844
845
//
846
// join - Wait for the task to complete before returning to the calling process
847
//
848
void omni_thread::join(void** status)
849
{
850
        mutex.lock();
851
852
        if((_state != STATE_RUNNING) && (_state != STATE_TERMINATED))
853
        {
854
                mutex.unlock();
855
856
                DBG_ASSERT(assert(false));
857
858
                DBG_THROW(throw omni_thread_invalid());
859
        }
860
861
        mutex.unlock();
862
863
        DBG_ASSERT(assert(this != self()));
864
865
        if(this == self())
866
                DBG_THROW(throw omni_thread_invalid());
867
868
        DBG_ASSERT(assert(!detached));
869
870
        if(detached)
871
                DBG_THROW(throw omni_thread_invalid());
872
873
        mutex.lock();
874
        running_cond->wait();
875
        mutex.unlock();
876
877
        if(status)
878
                *status = return_val;
879
880
        delete this;
881
}
882
883
884
//
885
// Change this thread's priority.
886
//
887
void omni_thread::set_priority(priority_t pri)
888
{
889
        omni_mutex_lock l(mutex);
890
891
        DBG_ASSERT(assert(_state == STATE_RUNNING));
892
893
        if(_state != STATE_RUNNING)
894
        {
895
                DBG_THROW(throw omni_thread_invalid());
896
        }
897
898
        _priority = pri;
899
900
        if(taskPrioritySet(tid, vxworks_priority(pri))==ERROR)
901
        {
902
                DBG_ASSERT(assert(false));
903
904
                DBG_THROW(throw omni_thread_fatal(errno));
905
        }
906
}
907
908
909
//
910
// create - construct a new thread object and start it running.        Returns thread
911
// object if successful, null pointer if not.
912
//
913
914
// detached version (the entry point is a void)
915
omni_thread* omni_thread::create(void (*fn)(void*), void* arg, priority_t pri)
916
{
917
        omni_thread* t = new omni_thread(fn, arg, pri);
918
919
        t->start();
920
921
        return t;
922
}
923
924
// undetached version (the entry point is a void*)
925
omni_thread* omni_thread::create(void* (*fn)(void*), void* arg, priority_t pri)
926
{
927
        omni_thread* t = new omni_thread(fn, arg, pri);
928
929
        t->start();
930
931
        return t;
932
}
933
934
935
//
936
// exit() _must_ lock the mutex even in the case of a detached thread.        This is
937
// because a thread may run to completion before the thread that created it has
938
// had a chance to get out of start().        By locking the mutex we ensure that the
939
// creating thread must have reached the end of start() before we delete the
940
// thread object.        Of course, once the call to start() returns, the user can
941
// still incorrectly refer to the thread object, but that's their problem.
942
//
943
void omni_thread::exit(void* return_value)
944
{
945
        omni_thread* me = self();
946
947
        if(me)
948
        {
949
                me->mutex.lock();
950
951
                me->return_val = return_value;
952
                me->_state = STATE_TERMINATED;
953
                me->running_cond->signal();
954
955
                me->mutex.unlock();
956
957
                DBG_TRACE(cout<<"omni_thread::exit: thread "<<me->id()<<" detached "<<me->detached<<" return value "<<(int)return_value<<endl);
958
959
                if(me->detached)
960
                        delete me;
961
        }
962
        else
963
                DBG_TRACE(cout<<"omni_thread::exit: called with a non-omnithread. Exit quietly."<<endl);
964
965
        taskDelete(taskIdSelf());
966
}
967
968
969
omni_thread* omni_thread::self(void)
970
{
971
        if(taskTcb(taskIdSelf())->spare1 != OMNI_THREAD_ID)
972
                return NULL;
973
974
        return (omni_thread*)taskTcb(taskIdSelf())->spare2;
975
}
976
977
978
void omni_thread::yield(void)
979
{
980
        taskDelay(NO_WAIT);
981
}
982
983
984
void omni_thread::sleep(unsigned long secs, unsigned long nanosecs)
985
{
986
        int tps = sysClkRateGet();
987
988
        // Convert to us to avoid overflow in the multiplication
989
        //        tps should always be less than 1000 !
990
        nanosecs /= 1000;
991
992
        taskDelay(secs*tps + (nanosecs*tps)/1000000l);
993
}
994
995
996
void omni_thread::get_time( unsigned long* abs_sec,
997
                                                        unsigned long* abs_nsec,
998
                                                        unsigned long rel_sec,
999
                                                        unsigned long rel_nsec)
1000
{
1001
        timespec abs;
1002
        clock_gettime(CLOCK_REALTIME, &abs);
1003
        abs.tv_nsec += rel_nsec;
1004
        abs.tv_sec += rel_sec + abs.tv_nsec / 1000000000;
1005
        abs.tv_nsec = abs.tv_nsec % 1000000000;
1006
        *abs_sec = abs.tv_sec;
1007
        *abs_nsec = abs.tv_nsec;
1008
}
1009
1010
1011
int omni_thread::vxworks_priority(priority_t pri)
1012
{
1013
        switch (pri)
1014
        {
1015
        case PRIORITY_LOW:
1016
                return omni_thread_prio_low;
1017
1018
        case PRIORITY_NORMAL:
1019
                return omni_thread_prio_normal;
1020
1021
        case PRIORITY_HIGH:
1022
                return omni_thread_prio_high;
1023
        }
1024
1025
        DBG_ASSERT(assert(false));
1026
1027
        DBG_THROW(throw omni_thread_invalid());
1028
}
1029
1030
1031
void omni_thread::stacksize(unsigned long sz)
1032
{
1033
        stack_size = sz;
1034
}
1035
1036
1037
unsigned long omni_thread::stacksize()
1038
{
1039
        return stack_size;
1040
}
1041
1042
1043
void omni_thread::show(void)
1044
{
1045
        omni_thread *pThread;
1046
        int s1, s2;
1047
        int tid = taskIdSelf();
1048
1049
        printf("TaskId is %.8x\n", tid);
1050
1051
        s1 = taskTcb(tid)->spare1;
1052
1053
        if(s1 != OMNI_THREAD_ID)
1054
        {
1055
                printf("Spare 1 is %.8x, and not recongnized\n", s1);
1056
1057
                return;
1058
        }
1059
        else
1060
        {
1061
                printf("Spare 1 indicate an omni_thread.\n");
1062
        }
1063
1064
        s2 = taskTcb(tid)->spare2;
1065
1066
        if(s2 == 0)
1067
        {
1068
                printf("Spare 2 is NULL! - No thread object attached !!\n");
1069
1070
                return;
1071
        }
1072
        else
1073
        {
1074
                printf("Thread object at %.8x\n", s2);
1075
        }
1076
1077
        pThread = (omni_thread *)s2;
1078
1079
        state_t status = pThread->_state;
1080
1081
        printf("        | Thread status is ");
1082
1083
        switch (status)
1084
        {
1085
        case STATE_NEW:
1086
                printf("NEW\n");                break;
1087
        case STATE_RUNNING:
1088
                printf("STATE_RUNNING\n"); break;
1089
        case STATE_TERMINATED:
1090
                printf("TERMINATED\n");        break;
1091
        default:
1092
                printf("Illegal (=%.8x)\n", (unsigned int)status);
1093
1094
                return;
1095
        }
1096
1097
        if(pThread->tid != tid)
1098
        {
1099
                printf("        | Task ID in thread object is different!! (=%.8x)\n", pThread->tid);
1100
1101
                return;
1102
        }
1103
        else
1104
        {
1105
                printf("        | Task ID in thread consistent\n");
1106
        }
1107
1108
        printf("\n");
1109
}
1110
1111
1112
//
1113
// Dummy thread
1114
//
1115
1116
class omni_thread_dummy : public omni_thread {
1117
public:
1118
  inline omni_thread_dummy() : omni_thread()
1119
  {
1120
    _dummy = 1;
1121
    _state = STATE_RUNNING;
1122
1123
        // Adjust data members of this instance
1124
        tid = taskIdSelf();
1125
1126
        // Set the thread values so it can be recongnised as a omni_thread
1127
        // Set the id last can possibly prevent race condition
1128
        taskTcb(tid)->spare2 = (int)this;
1129
        taskTcb(tid)->spare1 = OMNI_THREAD_ID;
1130
   }
1131
  inline ~omni_thread_dummy()
1132
  {
1133
        taskTcb(taskIdSelf())->spare1 = 0;
1134
  }
1135
};
1136
1137
omni_thread*
1138
omni_thread::create_dummy()
1139
{
1140
  if (omni_thread::self())
1141
    throw omni_thread_invalid();
1142
1143
  return new omni_thread_dummy;
1144
}
1145
1146
void
1147
omni_thread::release_dummy()
1148
{
1149
  omni_thread* self = omni_thread::self();
1150
  if (!self || !self->_dummy)
1151
    throw omni_thread_invalid();
1152
1153
  omni_thread_dummy* dummy = (omni_thread_dummy*)self;
1154
  delete dummy;
1155
}
1156
1157
1158
#define INSIDE_THREAD_IMPL_CC
1159
#include "threaddata.cc"
1160
#undef INSIDE_THREAD_IMPL_CC