Selector.java
// Copyright (c) ZeroC, Inc.
package com.zeroc.Ice;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
final class Selector {
static final class TimeoutException extends java.lang.Exception {
private static final long serialVersionUID = 7885765825975312023L;
}
Selector(Instance instance) {
_instance = instance;
_selecting = false;
_interrupted = false;
try {
_selector = java.nio.channels.Selector.open();
} catch (IOException ex) {
throw new SyscallException(ex);
}
//
// The Selector holds a Set representing the selected keys. The
// Set reference doesn't change, so we obtain it once here.
//
_keys = _selector.selectedKeys();
}
void destroy() {
if (_selector != null) {
try {
_selector.close();
} catch (IOException ex) {}
_selector = null;
}
}
void initialize(EventHandler handler) {
if (handler.fd() != null) {
updateImpl(handler);
}
}
void update(EventHandler handler, int remove, int add) {
int previous = handler._registered;
handler._registered = handler._registered & ~remove;
handler._registered = handler._registered | add;
if (previous == handler._registered) {
return;
}
if (handler.fd() != null) {
updateImpl(handler);
}
checkReady(handler);
}
void enable(EventHandler handler, int status) {
if ((handler._disabled & status) == 0) {
return;
}
handler._disabled = handler._disabled & ~status;
if (handler._key != null && (handler._registered & status) != 0) {
updateImpl(handler); // If registered with the selector, update the registration.
}
checkReady(handler);
}
void disable(EventHandler handler, int status) {
if ((handler._disabled & status) != 0) {
return;
}
handler._disabled = handler._disabled | status;
if (handler._key != null && (handler._registered & status) != 0) {
updateImpl(handler); // If registered with the selector, update the registration.
}
checkReady(handler);
}
boolean finish(EventHandler handler, boolean closeNow) {
handler._registered = 0;
if (handler._key != null) {
handler._key.cancel();
handler._key = null;
}
_changes.remove(handler);
checkReady(handler);
return closeNow;
}
void ready(EventHandler handler, int status, boolean value) {
if (((handler._ready & status) != 0) == value) {
return; // Nothing to do if ready state already correctly set.
}
if (value) {
handler._ready |= status;
} else {
handler._ready &= ~status;
}
checkReady(handler);
}
void startSelect() {
if (!_changes.isEmpty()) {
updateSelector();
}
_selecting = true;
//
// If there are ready handlers, don't block in select, just do a non-blocking
// select to retrieve new ready handlers from the Java selector.
//
_selectNow = !_readyHandlers.isEmpty();
}
void finishSelect(List<EventHandlerOpPair> handlers) {
assert (handlers.isEmpty());
// If key set is empty and we weren't woken up.
if (_keys.isEmpty() && _readyHandlers.isEmpty() && !_interrupted) {
// This is necessary to prevent a busy loop in case of a spurious wake-up which
// sometime occurs in the client thread pool when the communicator is destroyed.
// If there are too many successive spurious wake-ups, we log an error.
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
// Eat the InterruptedException (as we do in ThreadPool.promoteFollower).
}
if (++_spuriousWakeUp > 100) {
_spuriousWakeUp = 0;
_instance.initializationData().logger.warning("spurious selector wake up");
}
return;
}
_interrupted = false;
_spuriousWakeUp = 0;
for (SelectionKey key : _keys) {
EventHandler handler = (EventHandler) key.attachment();
try {
// Use the intersection of readyOps and interestOps because we only want to
// report the operations in which the handler is still interested.
final int op = fromJavaOps(key.readyOps() & key.interestOps());
// Handler will be added by the loop below.
if (!_readyHandlers.contains(handler)) {
handlers.add(new EventHandlerOpPair(handler, op));
}
} catch (CancelledKeyException ex) {
assert (handler._registered == 0);
}
}
for (EventHandler handler : _readyHandlers) {
int op = handler._ready & ~handler._disabled & handler._registered;
if (handler._key != null && _keys.contains(handler._key)) {
op |= fromJavaOps(handler._key.readyOps() & handler._key.interestOps());
}
if (op > 0) {
handlers.add(new EventHandlerOpPair(handler, op));
}
}
_keys.clear();
_selecting = false;
}
void select(long timeout) throws TimeoutException {
while (true) {
try {
if (_selectNow) {
_selector.selectNow();
} else if (timeout > 0) {
//
// NOTE: On some platforms, select() sometime returns slightly before
// the timeout (at least according to our monotonic time). To make sure
// timeouts are correctly detected, we wait for a little longer than
// the configured timeout (10ms).
//
long before = Time.currentMonotonicTimeMillis();
if (_selector.select(timeout * 1000 + 10) == 0) {
if (Time.currentMonotonicTimeMillis() - before >= timeout * 1000) {
throw new TimeoutException();
}
}
} else {
_selector.select();
}
} catch (CancelledKeyException ex) {
// This sometime occurs on macOS, ignore.
continue;
} catch (IOException ex) {
//
// Pressing Ctrl-C causes select() to raise an
// IOException, which seems like a JDK bug. We trap
// for that special case here and ignore it.
// Hopefully we're not masking something important!
//
if (ex instanceof InterruptedIOException) {
continue;
}
try {
String s = "selector failed:\n" + ex.getCause().getMessage();
try {
_instance.initializationData().logger.error(s);
} catch (Throwable ex1) {
System.out.println(s);
}
} catch (Throwable ex2) {
// Ignore
}
try {
Thread.sleep(1);
} catch (InterruptedException ex2) {}
}
break;
}
}
private void updateImpl(EventHandler handler) {
_changes.add(handler);
wakeup();
}
private void updateSelector() {
for (EventHandler handler : _changes) {
int status = handler._registered & ~handler._disabled;
int ops = toJavaOps(handler, status);
if (handler._key == null) {
if (handler._registered != 0) {
try {
handler._key = handler.fd().register(_selector, ops, handler);
} catch (ClosedChannelException ex) {
assert false;
}
}
} else {
handler._key.interestOps(ops);
}
}
_changes.clear();
}
private void checkReady(EventHandler handler) {
if ((handler._ready & ~handler._disabled & handler._registered) != 0) {
_readyHandlers.add(handler);
if (_selecting) {
wakeup();
}
} else {
_readyHandlers.remove(handler);
}
}
private void wakeup() {
if (_selecting && !_interrupted) {
_selector.wakeup();
_interrupted = true;
}
}
private int toJavaOps(EventHandler handler, int o) {
int op = 0;
if ((o & SocketOperation.Read) != 0) {
if ((handler.fd().validOps() & SelectionKey.OP_READ) != 0) {
op |= SelectionKey.OP_READ;
} else {
op |= SelectionKey.OP_ACCEPT;
}
}
if ((o & SocketOperation.Write) != 0) {
op |= SelectionKey.OP_WRITE;
}
if ((o & SocketOperation.Connect) != 0) {
op |= SelectionKey.OP_CONNECT;
}
return op;
}
private int fromJavaOps(int o) {
int op = 0;
if ((o
& (SelectionKey.OP_READ
| SelectionKey.OP_ACCEPT))
!= 0) {
op |= SocketOperation.Read;
}
if ((o & SelectionKey.OP_WRITE) != 0) {
op |= SocketOperation.Write;
}
if ((o & SelectionKey.OP_CONNECT) != 0) {
op |= SocketOperation.Connect;
}
return op;
}
private final Instance _instance;
private java.nio.channels.Selector _selector;
private final Set<SelectionKey> _keys;
private final HashSet<EventHandler> _changes = new HashSet<>();
private final HashSet<EventHandler> _readyHandlers = new HashSet<>();
private boolean _selecting;
private boolean _selectNow;
private boolean _interrupted;
private int _spuriousWakeUp;
}