// ThreadPool.java
// Copyright (c) ZeroC, Inc.
package com.zeroc.Ice;
import com.zeroc.Ice.Instrumentation.CommunicatorObserver;
import com.zeroc.Ice.Instrumentation.ThreadObserver;
import com.zeroc.Ice.Instrumentation.ThreadState;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
final class ThreadPool implements Executor {
/**
 * Work item that shuts down the instance's object adapter factory. The pool queues it when the
 * selector times out while the pool is not destroyed and no thread is in use.
 */
final class ShutdownWorkItem implements ThreadPoolWorkItem {
    @Override
    public void execute(ThreadPoolCurrent current) {
        // Mark the IO part as done before performing the shutdown itself.
        current.ioCompleted();
        try {
            _instance.objectAdapterFactory().shutdown();
        } catch (CommunicatorDestroyedException ignored) {
            // Deliberately ignored; presumably nothing is left to shut down at that point.
        }
    }
}
/** Work item that forwards a "finished" notification to an event handler. */
static final class FinishedWorkItem implements ThreadPoolWorkItem {
    // Handler to notify and the close flag handed to it.
    private final EventHandler _handler;
    private final boolean _close;

    /**
     * @param handler the handler whose {@code finished} method is invoked on execution
     * @param close the flag passed through to {@code finished}
     */
    public FinishedWorkItem(EventHandler handler, boolean close) {
        this._handler = handler;
        this._close = close;
    }

    /** Calls {@code finished(current, close)} on the wrapped handler. */
    @Override
    public void execute(ThreadPoolCurrent current) {
        _handler.finished(current, _close);
    }
}
/** Work item that joins a pool thread; queued when an idle follower thread leaves the pool. */
static final class JoinThreadWorkItem implements ThreadPoolWorkItem {
    // The thread to join.
    private final EventHandlerThread _thread;

    /**
     * @param thread the pool thread that will be joined when this item executes
     */
    public JoinThreadWorkItem(EventHandlerThread thread) {
        this._thread = thread;
    }

    @Override
    public void execute(ThreadPoolCurrent current) {
        // ioCompleted() is intentionally not called: the join shouldn't block, and calling it
        // could cause a new thread to be started.
        try {
            _thread.join();
        } catch (InterruptedException ignored) {
            // Ignore.
        }
    }
}
/** No-op work item whose only purpose is to interrupt the thread pool selector. */
static final class InterruptWorkItem implements ThreadPoolWorkItem {
    /** Does nothing; queueing the item is what interrupts the selector. */
    @Override
    public void execute(final ThreadPoolCurrent current) {
        // Intentionally empty.
    }
}
/** Exception raised by the thread pool work queue when the thread pool is destroyed. */
static final class DestroyedException extends RuntimeException {
    private static final long serialVersionUID = -6665535975321237670L;
}
public ThreadPool(Instance instance, String prefix, int timeout) {
Properties properties = instance.initializationData().properties;
_instance = instance;
_executor = instance.initializationData().executor;
_destroyed = false;
_prefix = prefix;
_selector = new Selector(instance);
_threadIndex = 0;
_inUse = 0;
_inUseIO = 0;
_promote = true;
_serialize = properties.getPropertyAsInt(_prefix + ".Serialize") > 0;
_serverIdleTime = timeout;
_threadPrefix = Util.createThreadName(properties, _prefix);
int nProcessors = Runtime.getRuntime().availableProcessors();
//
// We use just one thread as the default. This is the fastest possible setting, still allows
// one level of nesting, and doesn't require to make the servants thread safe.
//
int size = properties.getPropertyAsIntWithDefault(_prefix + ".Size", 1);
if (size < 1) {
String s = _prefix + ".Size < 1; Size adjusted to 1";
_instance.initializationData().logger.warning(s);
size = 1;
}
int sizeMax = properties.getPropertyAsIntWithDefault(_prefix + ".SizeMax", size);
if (sizeMax == -1) {
sizeMax = nProcessors;
}
if (sizeMax < size) {
String s =
_prefix
+ ".SizeMax < "
+ _prefix
+ ".Size; SizeMax adjusted to Size ("
+ size
+ ")";
_instance.initializationData().logger.warning(s);
sizeMax = size;
}
int sizeWarn = properties.getPropertyAsInt(_prefix + ".SizeWarn");
if (sizeWarn != 0 && sizeWarn < size) {
String s =
_prefix
+ ".SizeWarn < "
+ _prefix
+ ".Size; adjusted SizeWarn to Size ("
+ size
+ ")";
_instance.initializationData().logger.warning(s);
sizeWarn = size;
} else if (sizeWarn > sizeMax) {
String s =
_prefix
+ ".SizeWarn > "
+ _prefix
+ ".SizeMax; adjusted SizeWarn to SizeMax ("
+ sizeMax
+ ")";
_instance.initializationData().logger.warning(s);
sizeWarn = sizeMax;
}
int threadIdleTime =
properties.getPropertyAsIntWithDefault(_prefix + ".ThreadIdleTime", 60);
if (threadIdleTime < 0) {
String s = _prefix + ".ThreadIdleTime < 0; ThreadIdleTime adjusted to 0";
_instance.initializationData().logger.warning(s);
threadIdleTime = 0;
}
_size = size;
_sizeMax = sizeMax;
_sizeWarn = sizeWarn;
_sizeIO = Math.min(sizeMax, nProcessors);
_threadIdleTime = threadIdleTime;
int stackSize = properties.getPropertyAsInt(_prefix + ".StackSize");
if (stackSize < 0) {
String s = _prefix + ".StackSize < 0; Size adjusted to JRE default";
_instance.initializationData().logger.warning(s);
stackSize = 0;
}
_stackSize = stackSize;
boolean hasPriority = properties.getProperty(_prefix + ".ThreadPriority").length() > 0;
int priority = properties.getPropertyAsInt(_prefix + ".ThreadPriority");
if (!hasPriority) {
hasPriority = properties.getIceProperty("Ice.ThreadPriority").length() > 0;
priority = properties.getIcePropertyAsInt("Ice.ThreadPriority");
}
_hasPriority = hasPriority;
_priority = priority;
_workQueue = new ThreadPoolWorkQueue(_instance, this, _selector);
_nextHandler = _handlers.iterator();
if (_instance.traceLevels().threadPool >= 1) {
String s =
"creating "
+ _prefix
+ ": Size = "
+ _size
+ ", SizeMax = "
+ _sizeMax
+ ", SizeWarn = "
+ _sizeWarn;
_instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
}
try {
for (int i = 0; i < _size; i++) {
EventHandlerThread thread =
new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
if (_hasPriority) {
thread.start(_priority);
} else {
thread.start(Thread.NORM_PRIORITY);
}
_threads.add(thread);
}
} catch (RuntimeException ex) {
String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
destroy();
try {
joinWithAllThreads();
} catch (InterruptedException e) {
throw new OperationInterruptedException(e);
}
throw ex;
}
}
/**
 * Finalizer check that the pool was destroyed before being garbage collected. Any exception
 * raised by the check is discarded, and the superclass finalizer always runs.
 */
@SuppressWarnings({"nofinalizer", "deprecation"})
@Override
protected synchronized void finalize() throws Throwable {
    try {
        Assert.FinalizerAssert(_destroyed);
    } catch (Exception ignored) {
        // Deliberately ignored.
    } finally {
        super.finalize();
    }
}
/** Marks the pool as destroyed and destroys its work queue; later calls do nothing. */
public synchronized void destroy() {
    if (!_destroyed) {
        _destroyed = true;
        _workQueue.destroy();
    }
}
/** Asks every thread currently in the pool to refresh its observer. */
public synchronized void updateObservers() {
    _threads.forEach(EventHandlerThread::updateObserver);
}
/**
 * Initializes {@code handler} with the selector and installs a ready callback on it. The
 * callback forwards readiness changes to the selector unless the pool has been destroyed.
 *
 * @param handler the event handler to initialize
 */
public synchronized void initialize(final EventHandler handler) {
    assert (!_destroyed);
    _selector.initialize(handler);

    final ReadyCallback forwarder =
            new ReadyCallback() {
                public void ready(int op, boolean value) {
                    synchronized (ThreadPool.this) {
                        // Readiness notifications are dropped once the pool is destroyed.
                        if (!_destroyed) {
                            _selector.ready(handler, op, value);
                        }
                    }
                }
            };
    handler.setReadyCallback(forwarder);
}
/**
 * Adds the operations in {@code op} for {@code handler}, removing none.
 *
 * @param handler the event handler to update
 * @param op the socket operation(s) to add
 */
public void register(EventHandler handler, int op) {
    final int nothingToRemove = SocketOperation.None;
    update(handler, nothingToRemove, op);
}
/**
 * Changes the operations registered for {@code handler}. An operation that appears in both
 * masks is added, and operations already in the requested state are skipped; the selector is
 * only called when something actually changes.
 *
 * @param handler the event handler to update
 * @param remove mask of operations to unregister
 * @param add mask of operations to register
 */
public synchronized void update(EventHandler handler, int remove, int add) {
    assert (!_destroyed);

    // Never remove what must be added, and only remove what is currently registered.
    final int effectiveRemove = handler._registered & (remove & ~add);
    // Only add what is not registered yet.
    final int effectiveAdd = ~handler._registered & add;

    if (effectiveRemove != effectiveAdd) {
        _selector.update(handler, effectiveRemove, effectiveAdd);
    }
}
/**
 * Removes the operations in {@code op} for {@code handler}, adding none.
 *
 * @param handler the event handler to update
 * @param op the socket operation(s) to remove
 */
public void unregister(EventHandler handler, int op) {
    final int nothingToAdd = SocketOperation.None;
    update(handler, op, nothingToAdd);
}
/**
 * Finishes {@code handler} with the selector and queues a {@link FinishedWorkItem} for it.
 *
 * @param handler the event handler being finished
 * @param closeNow the close request passed to the selector
 * @return the value returned by the selector's {@code finish}; the queued work item receives
 *     its negation as the close flag
 */
public synchronized boolean finish(EventHandler handler, boolean closeNow) {
    assert (!_destroyed);
    final boolean closedBySelector = _selector.finish(handler, closeNow);
    final FinishedWorkItem notification = new FinishedWorkItem(handler, !closedBySelector);
    _workQueue.queue(notification);
    return closedBySelector;
}
/**
 * Runs {@code workItem} from the calling thread. Without a configured executor the item is run
 * directly; otherwise it is handed to the executor together with its connection. Exceptions
 * thrown by the executor are caught and only logged (with their stack trace) when the
 * {@code Ice.Warn.Executor} property is greater than 1.
 *
 * @param workItem the work item to run
 */
public void executeFromThisThread(RunnableThreadPoolWorkItem workItem) {
    if (_executor == null) {
        workItem.run();
        return;
    }

    try {
        _executor.accept(workItem, workItem.getConnection());
    } catch (Exception ex) {
        final int warnLevel =
                _instance.initializationData().properties.getIcePropertyAsInt("Ice.Warn.Executor");
        if (warnLevel > 1) {
            final StringWriter buffer = new StringWriter();
            final PrintWriter out = new PrintWriter(buffer);
            ex.printStackTrace(out);
            out.flush();
            _instance
                    .initializationData()
                    .logger
                    .warning("executor exception:\n" + buffer.toString());
        }
    }
}
/**
 * Queues {@code workItem} on the pool's work queue.
 *
 * @param workItem the work item to queue
 * @throws CommunicatorDestroyedException if the pool has already been destroyed
 */
public synchronized void dispatch(RunnableThreadPoolWorkItem workItem) {
    if (!_destroyed) {
        _workQueue.queue(workItem);
        return;
    }
    throw new CommunicatorDestroyedException();
}
/**
 * Joins every pool thread and then destroys the selector.
 *
 * @throws InterruptedException if the calling thread is interrupted while joining
 */
public void joinWithAllThreads() throws InterruptedException {
    // No synchronization here: _threads is immutable after destroy() has been called, and
    // holding the lock would prevent the other threads from ever terminating.
    for (EventHandlerThread worker : _threads) {
        worker.join();
    }

    // All threads are done; the selector can go.
    _selector.destroy();
}
/**
 * Implements {@link Executor#execute(Runnable)} by wrapping {@code command} in a work item and
 * dispatching it to this pool.
 *
 * @param command the task to run
 */
@Override
public void execute(Runnable command) {
    final RunnableThreadPoolWorkItem wrapped =
            new RunnableThreadPoolWorkItem() {
                @Override
                public void run() {
                    command.run();
                }
            };
    dispatch(wrapped);
}
/**
 * Main loop of a pool thread; invoked from {@code EventHandlerThread.run()}.
 *
 * <p>Each iteration first works outside the pool lock: it either dispatches the handler picked
 * during the previous iteration or, when this thread is the one selecting, blocks in the
 * selector. It then takes the pool lock to update the thread/IO counters, wait as a follower
 * if necessary, and pick the next ready handler.
 *
 * <p>The method returns when the handler raises {@link DestroyedException} or when
 * {@code followerWait} reports that this thread's idle wait timed out.
 *
 * @param thread the pool thread executing this loop
 */
private void run(EventHandlerThread thread) {
ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this, thread);
// True when this thread has called startSelect() and must call select() next.
boolean select = false;
while (true) {
if (current._handler != null) {
// A handler was picked in the previous iteration: dispatch it outside the lock.
try {
current._handler.message(current);
} catch (DestroyedException ex) {
// The pool was destroyed: update the bookkeeping and let this thread exit.
synchronized (this) {
--_inUse;
thread.setState(ThreadState.ThreadStateIdle);
}
return;
} catch (Exception ex) {
// Any other exception is logged together with the handler; the loop continues.
String s = "exception in `" + _prefix + "':\n" + Ex.toString(ex);
s += "\nevent handler: " + current._handler.toString();
_instance.initializationData().logger.error(s);
}
} else if (select) {
try {
_selector.select(_serverIdleTime);
} catch (Selector.TimeoutException ex) {
synchronized (this) {
// Only request a shutdown if the pool is still alive and no thread is in use.
if (!_destroyed && _inUse == 0) {
_workQueue.queue(new ShutdownWorkItem()); // Select timed-out.
}
// Go straight back to select(); the select flag is still set.
continue;
}
}
}
synchronized (this) {
if (current._handler == null) {
if (select) {
// The select() call returned: collect the ready handlers into _handlers and
// restart the iteration over them.
_selector.finishSelect(_handlers);
select = false;
_nextHandler = _handlers.iterator();
} else if (!current._leader && followerWait(current)) {
return; // Wait timed-out.
}
} else if (_sizeMax > 1) {
if (!current._ioCompleted) {
//
// The handler didn't call ioCompleted() so we take care of decreasing
// the IO thread count now.
//
--_inUseIO;
} else {
//
// If the handler called ioCompleted(), we re-enable the handler in
// case it was disabled and we decrease the number of thread in use.
//
if (_serialize) {
_selector.enable(current._handler, current.operation);
}
assert (_inUse > 0);
--_inUse;
}
if (!current._leader && followerWait(current)) {
return; // Wait timed-out.
}
}
//
// Get the next ready handler.
//
current._handler = null;
while (_nextHandler.hasNext()) {
EventHandlerOpPair n = _nextHandler.next();
// Keep only the ready operations that are still registered and not disabled.
int op = n.op & ~n.handler._disabled & n.handler._registered;
if (op != 0) {
current._ioCompleted = false;
current._handler = n.handler;
current.operation = op;
thread.setState(ThreadState.ThreadStateInUseForIO);
break;
}
}
if (current._handler == null) {
//
// If there are no more ready handlers and there are still threads busy
// performing IO, we give up leadership and promote another follower (which will
// perform the select() only once all the IOs are completed). Otherwise, if
// there are no more threads performing IO, it's time to do another select().
//
if (_inUseIO > 0) {
promoteFollower(current);
} else {
_handlers.clear();
_selector.startSelect();
select = true;
thread.setState(ThreadState.ThreadStateIdle);
}
} else if (_sizeMax > 1) {
//
// Increment the IO thread count and if there's still threads available to
// perform IO and more handlers ready, we promote a follower.
//
++_inUseIO;
if (_nextHandler.hasNext() && _inUseIO < _sizeIO) {
promoteFollower(current);
}
}
}
}
}
/**
 * Records that the handler being dispatched by {@code current} has finished its IO and is now
 * running user code.
 *
 * <p>When the pool can hold more than one thread ({@code _sizeMax > 1}) this also:
 *
 * <ul>
 *   <li>decrements the IO thread count and, when serializing, disables the handler in the
 *       selector (unless the pool is destroyed);
 *   <li>promotes a follower if this thread is the leader, or wakes a waiting follower if a
 *       promotion is pending and there is work to pick up;
 *   <li>increments the in-use thread count, logging a warning when it reaches
 *       {@code _sizeWarn};
 *   <li>starts an additional thread when every existing thread is in use and the maximum size
 *       has not been reached.
 * </ul>
 *
 * @param current the per-thread dispatch context of the calling pool thread
 */
synchronized void ioCompleted(ThreadPoolCurrent current) {
    // Set the IO completed flag to specify that ioCompleted() has been called.
    current._ioCompleted = true;
    current._thread.setState(ThreadState.ThreadStateInUseForUser);

    if (_sizeMax > 1) {
        --_inUseIO;

        if (!_destroyed) {
            if (_serialize) {
                _selector.disable(current._handler, current.operation);
            }
        }

        if (current._leader) {
            //
            // If this thread is still the leader, it's time to promote a new leader.
            //
            promoteFollower(current);
        } else if (_promote && (_nextHandler.hasNext() || _inUseIO == 0)) {
            // A promotion is pending and a follower can make progress: wake one up.
            notify();
        }

        assert (_inUse >= 0);
        ++_inUse;

        if (_inUse == _sizeWarn) {
            String s =
                    "thread pool `"
                            + _prefix
                            + "' is running low on threads\n"
                            + "Size="
                            + _size
                            + ", "
                            + "SizeMax="
                            + _sizeMax
                            + ", "
                            + "SizeWarn="
                            + _sizeWarn;
            _instance.initializationData().logger.warning(s);
        }

        if (!_destroyed) {
            assert (_inUse <= _threads.size());
            // Grow the pool when every existing thread is busy and there is room left.
            if (_inUse < _sizeMax && _inUse == _threads.size()) {
                if (_instance.traceLevels().threadPool >= 1) {
                    String s = "growing " + _prefix + ": Size=" + (_threads.size() + 1);
                    _instance
                            .initializationData()
                            .logger
                            .trace(_instance.traceLevels().threadPoolCat, s);
                }

                try {
                    EventHandlerThread thread =
                            new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
                    if (_hasPriority) {
                        thread.start(_priority);
                    } else {
                        thread.start(Thread.NORM_PRIORITY);
                    }
                    // Record the thread only once it has actually been started (as the
                    // constructor does). If start() throws, no never-started entry is left
                    // in _threads to inflate _threads.size() and block later growth. The
                    // pool lock is held here, so the new thread cannot observe the list
                    // before it has been added.
                    _threads.add(thread);
                } catch (RuntimeException ex) {
                    String s =
                            "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
                    _instance.initializationData().logger.error(s);
                }
            }
        }
    }
}
/**
 * Gives up the leadership held by {@code current} and flags that a follower may be promoted.
 * A waiting follower is notified only when an IO slot is free and either more handlers are
 * ready or no thread is performing IO.
 *
 * @param current the dispatch context of the current leader thread
 */
private synchronized void promoteFollower(ThreadPoolCurrent current) {
    assert (!_promote && current._leader);
    _promote = true;

    if (_inUseIO < _sizeIO) {
        if (_nextHandler.hasNext() || _inUseIO == 0) {
            notify();
        }
    }

    current._leader = false;
}
/**
 * Blocks the calling follower thread until it can become the leader.
 *
 * <p>The thread waits while no promotion is pending, all IO slots are in use, or there is no
 * ready handler left while other threads still perform IO. When {@code _threadIdleTime} is
 * positive the wait is bounded; if the bound elapses (or the wait is interrupted) while the
 * pool is not destroyed and the thread still cannot be promoted, the thread is removed from
 * the pool and a work item is queued to join it.
 *
 * @param current the dispatch context of the calling (non-leader) thread
 * @return {@code true} if the thread timed out and must exit, {@code false} if it has become
 *     the leader
 */
private synchronized boolean followerWait(ThreadPoolCurrent current) {
assert (!current._leader);
current._thread.setState(ThreadState.ThreadStateIdle);
//
// It's important to clear the handler before waiting to make sure that resources for the
// handler are released now if it's finished. We also clear the per-thread stream.
//
current._handler = null;
current.stream.reset();
//
// Wait to be promoted and for all the IO threads to be done.
//
while (!_promote || _inUseIO == _sizeIO || (!_nextHandler.hasNext() && _inUseIO > 0)) {
if (_threadIdleTime > 0) {
// Bounded wait: _threadIdleTime is multiplied by 1000 to get the wait() timeout in
// milliseconds.
long before = Time.currentMonotonicTimeMillis();
boolean interrupted = false;
try {
//
// If the wait is interrupted then we'll let the thread die as if it timed out.
//
wait(_threadIdleTime * 1000);
} catch (InterruptedException e) {
interrupted = true;
}
if (interrupted
|| Time.currentMonotonicTimeMillis() - before >= _threadIdleTime * 1000) {
// Re-check the wait condition: the thread only leaves the pool if it still cannot be
// promoted and the pool has not been destroyed. Otherwise the loop condition decides.
if (!_destroyed
&& (!_promote
|| _inUseIO == _sizeIO
|| (!_nextHandler.hasNext() && _inUseIO > 0))) {
if (_instance.traceLevels().threadPool >= 1) {
String s = "shrinking " + _prefix + ": Size=" + (_threads.size() - 1);
_instance
.initializationData()
.logger
.trace(_instance.traceLevels().threadPoolCat, s);
}
assert (_threads.size()
> 1); // Can only be called by a waiting follower thread.
// Drop this thread from the pool and have another pool thread join it.
_threads.remove(current._thread);
_workQueue.queue(new JoinThreadWorkItem(current._thread));
return true;
}
}
} else {
// No idle timeout configured: wait until notified.
try {
wait();
} catch (InterruptedException e) {
//
// Eat the InterruptedException.
//
}
}
}
current._leader = true; // The current thread has become the leader.
_promote = false;
return false;
}
// Instance passed to the constructor; source of the properties, logger and trace levels.
private final Instance _instance;
// Executor taken from the instance's initialization data; when null, executeFromThisThread()
// runs work items directly.
private final BiConsumer<Runnable, Connection> _executor;
// Queue used to hand work items to the pool threads.
private final ThreadPoolWorkQueue _workQueue;
// Set by destroy(); accessed with the pool lock held.
private boolean _destroyed;
// Property prefix of this pool; also used in log and trace messages.
private final String _prefix;
// Base name of the pool threads; "-<index>" is appended for each thread.
private final String _threadPrefix;
// Selector the pool threads use to wait for ready event handlers.
private final Selector _selector;
final class EventHandlerThread implements Runnable {
EventHandlerThread(String name) {
_name = name;
_state = ThreadState.ThreadStateIdle;
updateObserver();
}
public void updateObserver() {
// Must be called with the thread pool mutex locked
CommunicatorObserver obsv =
_instance.initializationData().observer;
if (obsv != null) {
_observer = obsv.getThreadObserver(_prefix, _name, _state, _observer);
if (_observer != null) {
_observer.attach();
}
}
}
public void setState(ThreadState s) {
// Must be called with the thread pool mutex locked
if (_observer != null) {
if (_state != s) {
_observer.stateChanged(_state, s);
}
}
_state = s;
}
public void join() throws InterruptedException {
_thread.join();
}
public void start(int priority) {
_thread = new Thread(null, this, _name, _stackSize);
_thread.setPriority(priority);
_thread.start();
}
@Override
public void run() {
if (_instance.initializationData().threadStart != null) {
try {
_instance.initializationData().threadStart.run();
} catch (Exception ex) {
String s = "threadStart method raised an unexpected exception in `";
s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
}
}
try {
ThreadPool.this.run(this);
} catch (Exception ex) {
String s =
"exception in `" + _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
}
if (_observer != null) {
_observer.detach();
}
if (_instance.initializationData().threadStop != null) {
try {
_instance.initializationData().threadStop.run();
} catch (Exception ex) {
String s = "threadStop method raised an unexpected exception in `";
s += _prefix + "' thread " + _name + ":\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
}
}
}
Thread getThread() {
return _thread;
}
private final String _name;
private Thread _thread;
private ThreadState _state;
private ThreadObserver _observer;
}
private final int _size; // Number of threads that are pre-created.
private final int _sizeIO; // Number of threads that can concurrently perform IO.
private final int _sizeMax; // Maximum number of threads.
private final int
_sizeWarn; // If _inUse reaches _sizeWarn, a "low on threads" warning will be printed.
private final boolean _serialize; // True if requests need to be serialized over the connection.
// Priority given to new threads when _hasPriority is true; Thread.NORM_PRIORITY is used
// otherwise.
private final int _priority;
// True when a thread priority property (pool-specific or Ice.ThreadPriority) is set.
private final boolean _hasPriority;
// Timeout passed to the selector's select(); set from the constructor's timeout argument.
private final long _serverIdleTime;
// Idle time after which a waiting follower thread leaves the pool; multiplied by 1000 for
// wait(), i.e. expressed in seconds. 0 means followers wait indefinitely.
private final long _threadIdleTime;
// Stack size passed to the Thread constructor.
private final int _stackSize;
// Threads currently belonging to the pool.
private final List<EventHandlerThread> _threads = new ArrayList<>();
private int _threadIndex; // For assigning thread names.
private int _inUse; // Number of threads that are currently in use.
private int _inUseIO; // Number of threads that are currently performing IO.
// Ready handlers collected by the last finishSelect() call.
private final List<EventHandlerOpPair> _handlers = new ArrayList<>();
// Iterator over _handlers pointing at the next ready handler to dispatch.
private Iterator<EventHandlerOpPair> _nextHandler;
// True when leadership is available and a follower may be promoted to leader.
private boolean _promote;
}