1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
index 255d237..9954a8a 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -409,6 +409,8 @@ void ThreadManager::Impl::removeWorker(size_t value) {
workerMaxCount_ -= value;
+ shutdown_mutex_.unlock();
+
if (idleCount_ < value) {
for (size_t ix = 0; ix < idleCount_; ix++) {
monitor_.notify();
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.h b/lib/cpp/src/thrift/concurrency/ThreadManager.h
index 7bb71d1..e97fd25 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.h
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.h
@@ -24,6 +24,7 @@
#include <thrift/cxxfunctional.h>
#include <sys/types.h>
#include <thrift/concurrency/Thread.h>
+#include <thrift/concurrency/Mutex.h>
namespace apache {
namespace thrift {
@@ -59,6 +60,7 @@ protected:
ThreadManager() {}
public:
+ Mutex shutdown_mutex_;
typedef apache::thrift::stdcxx::function<void(boost::shared_ptr<Runnable>)> ExpireCallback;
virtual ~ThreadManager() {}
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
index 0530d8d..d6b73dc 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.cpp
@@ -194,19 +194,28 @@ void TThreadPoolServer::serve() {
}
}
- // If stopped manually, join the existing threads
- if (stop_) {
- try {
- serverTransport_->close();
- threadManager_->join();
- } catch (TException& tx) {
- string errStr = string("TThreadPoolServer: Exception shutting down: ") + tx.what();
- GlobalOutput(errStr.c_str());
+ {
+ Guard g(threadManager_->shutdown_mutex_);
+ // If stopped manually, join the existing threads
+ if (stop_) {
+ try {
+ serverTransport_->close();
+ threadManager_->join();
+ } catch (TException& tx) {
+ string errStr = string("TThreadPoolServer: Exception shutting down: ") + tx.what();
+ GlobalOutput(errStr.c_str());
+ }
+ stop_ = false;
}
- stop_ = false;
}
}
+void TThreadPoolServer::stop() {
+ threadManager_->shutdown_mutex_.lock();
+ stop_ = true;
+ serverTransport_->interrupt();
+}
+
int64_t TThreadPoolServer::getTimeout() const {
return timeout_;
}
diff --git a/lib/cpp/src/thrift/server/TThreadPoolServer.h b/lib/cpp/src/thrift/server/TThreadPoolServer.h
index ad7e7ef..9b89846 100644
--- a/lib/cpp/src/thrift/server/TThreadPoolServer.h
+++ b/lib/cpp/src/thrift/server/TThreadPoolServer.h
@@ -24,6 +24,7 @@
#include <thrift/server/TServer.h>
#include <thrift/transport/TServerTransport.h>
+
#include <boost/shared_ptr.hpp>
namespace apache {
@@ -113,10 +114,7 @@ public:
virtual void setTimeout(int64_t value);
- virtual void stop() {
- stop_ = true;
- serverTransport_->interrupt();
- }
+ virtual void stop();
virtual int64_t getTaskExpiration() const;
|