// IncomingConnectionFactory.java
// Copyright (c) ZeroC, Inc.
package com.zeroc.Ice;
import java.nio.channels.SelectableChannel;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
final class IncomingConnectionFactory extends EventHandler implements ConnectionI.StartCallback {
/**
 * Asks the factory to enter the active state.
 *
 * The request is forwarded to {@code setState(StateActive)} while holding this object's
 * monitor; {@code setState} ignores it unless the factory is currently holding.
 */
public void activate() {
    synchronized (this) {
        setState(StateActive);
    }
}
/**
 * Asks the factory to enter the holding state.
 *
 * The request is forwarded to {@code setState(StateHolding)} while holding this object's
 * monitor; {@code setState} ignores it unless the factory is currently active.
 */
public void hold() {
    synchronized (this) {
        setState(StateHolding);
    }
}
/**
 * Asks the factory to shut down.
 *
 * The request is forwarded to {@code setState(StateClosed)} while holding this object's
 * monitor. That transition destroys every tracked connection and, when no acceptor was
 * started, moves the factory straight to the finished state.
 */
public void destroy() {
    synchronized (this) {
        setState(StateClosed);
    }
}
/**
 * Calls {@code updateObserver()} on every connection currently tracked by this factory, while
 * holding this object's monitor.
 */
public synchronized void updateConnectionObservers() {
    _connections.forEach(ConnectionI::updateObserver);
}
/**
 * Blocks until the factory is no longer active and {@code waitUntilHolding()} has returned for
 * every connection that was tracked at that moment.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void waitUntilHolding() throws InterruptedException {
    final LinkedList<ConnectionI> snapshot;
    synchronized (this) {
        // Step 1: the factory itself must have reached the holding state (or a later one).
        while (_state < StateHolding) {
            wait();
        }

        // Step 2: copy the tracked connections so the per-connection waits below happen
        // without holding the factory's monitor.
        snapshot = new LinkedList<>(_connections);
    }

    // Step 3: wait for each snapshotted connection in turn.
    for (ConnectionI pending : snapshot) {
        pending.waitUntilHolding();
    }
}
/**
 * Blocks until the factory has reached the finished state and {@code waitUntilFinished()} has
 * returned for every tracked connection, then empties the connection set.
 *
 * If the wait on any connection is interrupted, every snapshotted connection is aborted and
 * the interruption is propagated; the connection set is not cleared in that case.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void waitUntilFinished() throws InterruptedException {
    final LinkedList<ConnectionI> snapshot;
    synchronized (this) {
        // The factory must be fully destroyed first; with an acceptor this also covers the
        // acceptor being closed.
        while (_state != StateFinished) {
            wait();
        }

        // Drop the reference to the object adapter. See bug 1673 for why this is needed.
        _adapter = null;

        // Copy the connections so the waits below run outside the factory's monitor.
        snapshot = new LinkedList<>(_connections);
    }

    try {
        for (ConnectionI pending : snapshot) {
            pending.waitUntilFinished();
        }
    } catch (InterruptedException interrupted) {
        // Interrupted while waiting: force-close all of the connections, then propagate.
        for (ConnectionI victim : snapshot) {
            victim.abort();
        }
        throw interrupted;
    }

    synchronized (this) {
        _connections.clear();
    }
}
/**
 * Returns the endpoint currently associated with this factory, read under this object's
 * monitor.
 *
 * @return the current value of the factory's endpoint field
 */
public EndpointI endpoint() {
    synchronized (this) {
        return _endpoint;
    }
}
/**
 * Returns a fresh list of the tracked connections for which {@code isActiveOrHolding()} is
 * true, i.e. those that have not been destroyed.
 *
 * @return a new list owned by the caller; modifying it does not affect the factory
 */
public synchronized LinkedList<ConnectionI> connections() {
    // Start from a full copy, then drop every connection that is neither active nor holding.
    LinkedList<ConnectionI> alive = new LinkedList<>(_connections);
    alive.removeIf(candidate -> !candidate.isActiveOrHolding());
    return alive;
}
/**
 * Asks {@code outAsync} to flush the batch requests of every connection returned by
 * {@link #connections()}.
 *
 * A {@code LocalException} raised for one connection is ignored so the remaining connections
 * are still flushed.
 *
 * @param compressBatch compression setting handed through to {@code flushConnection}
 * @param outAsync the flush operation that receives each connection
 */
public void flushAsyncBatchRequests(CompressBatch compressBatch, CommunicatorFlushBatch outAsync) {
    // connections() takes the monitor itself and returns a private copy, so no locking is
    // needed here.
    final LinkedList<ConnectionI> targets = connections();
    for (ConnectionI target : targets) {
        try {
            outAsync.flushConnection(target, compressBatch);
        } catch (LocalException ignored) {
            // Best effort: a failure on one connection must not stop the others.
        }
    }
}
//
// Operations from EventHandler.
//
/**
 * EventHandler callback: accepts one transport connection from the acceptor, wraps it in a
 * {@code ConnectionI}, tracks it, and starts it.
 *
 * Nothing is accepted when the factory is closed or finished, when it is holding (the thread
 * yields first), or when the acceptor has not been started. When the configured connection
 * limit has been reached the freshly accepted transceiver is closed again. Accept and
 * connection-creation failures are reported as warnings when connection warnings are enabled.
 *
 * @param current the thread-pool context of this callback (unused here)
 */
@Override
public void message(ThreadPoolCurrent current) {
    ConnectionI accepted;
    synchronized (this) {
        if (_state >= StateClosed) {
            return;
        }
        if (_state == StateHolding) {
            Thread.yield();
            return;
        }
        if (!_acceptorStarted) {
            return;
        }

        // Accept the next transport connection.
        Transceiver transceiver = null;
        try {
            transceiver = _acceptor.accept();

            final boolean limitReached =
                    _maxConnections > 0 && _connections.size() == _maxConnections;
            if (limitReached) {
                // No room for another connection: drop the transport connection right away.
                if (_instance.traceLevels().network >= 2) {
                    String text =
                            "rejecting new "
                                    + _endpoint.protocol()
                                    + " connection\n"
                                    + transceiver.toString()
                                    + "\nbecause the maximum number of connections has been reached";
                    _instance
                            .initializationData()
                            .logger
                            .trace(_instance.traceLevels().networkCat, text);
                }
                try {
                    transceiver.close();
                } catch (SocketException ignored) {
                    // The connection is being rejected anyway.
                }
                return;
            }

            if (_instance.traceLevels().network >= 2) {
                String text =
                        "trying to accept "
                                + _endpoint.protocol()
                                + " connection\n"
                                + transceiver.toString();
                _instance
                        .initializationData()
                        .logger
                        .trace(_instance.traceLevels().networkCat, text);
            }
        } catch (LocalException ex) {
            if (_warn) {
                _instance.initializationData().logger.warning(
                        "error accepting connection:\n" + Ex.toString(ex) + '\n' + _acceptor.toString());
            }
            return;
        }
        assert (transceiver != null);

        // Wrap the transceiver in a connection; on failure close the transceiver again.
        try {
            accepted =
                    new ConnectionI(
                            _adapter.getCommunicator(),
                            _instance,
                            transceiver,
                            null,
                            _endpoint,
                            _adapter,
                            this::removeConnection,
                            _connectionOptions);
        } catch (LocalException ex) {
            try {
                transceiver.close();
            } catch (LocalException ignored) {
                // Cleanup is best effort.
            }
            if (_warn) {
                _instance.initializationData().logger.warning(
                        "error accepting connection:\n" + Ex.toString(ex) + '\n' + _acceptor.toString());
            }
            return;
        }
        _connections.add(accepted);
    }

    // Start the connection outside the factory's monitor.
    assert (accepted != null);
    accepted.start(this);
}
/**
 * EventHandler callback invoked when the thread pool is done with this handler.
 *
 * If the factory is already closed (or beyond) it is moved to the finished state. In every
 * case the acceptor is closed when {@code close} is true.
 *
 * @param current the thread-pool context of this callback (unused here)
 * @param close whether the acceptor must be closed by this call
 */
@Override
public synchronized void finished(ThreadPoolCurrent current, boolean close) {
    // Only a closed factory may become finished; an active or holding one keeps its state.
    if (_state >= StateClosed) {
        setState(StateFinished);
    }
    if (close) {
        closeAcceptor();
    }
}
/**
 * Describes this factory through its transport object.
 *
 * @return the transceiver's string form when a transceiver exists, otherwise the acceptor's
 */
@Override
public synchronized String toString() {
    return _transceiver != null ? _transceiver.toString() : _acceptor.toString();
}
/**
 * Returns the selectable channel of the acceptor. The acceptor is expected to exist; this is
 * checked with an assertion only.
 *
 * @return the value of the acceptor's {@code fd()}
 */
@Override
public SelectableChannel fd() {
    final Acceptor acceptor = _acceptor;
    assert (acceptor != null);
    return acceptor.fd();
}
/**
 * Hands {@code readyCallback} to the acceptor. Does nothing when this factory has no acceptor.
 *
 * @param readyCallback the callback to install on the acceptor
 */
@Override
public void setReadyCallback(ReadyCallback readyCallback) {
    if (_acceptor == null) {
        return;
    }
    _acceptor.setReadyCallback(readyCallback);
}
//
// Operations from ConnectionI.StartCallback
//
/**
 * StartCallback notification that {@code connection} finished starting.
 *
 * Connections begin in the holding state, so the connection is activated here only when the
 * factory itself is active at that moment.
 *
 * @param connection the connection whose start completed
 */
@Override
public synchronized void connectionStartCompleted(ConnectionI connection) {
    if (_state != StateActive) {
        // Factory is not active: leave the connection in its initial holding state.
        return;
    }
    connection.activate();
}
/**
 * StartCallback notification that {@code connection} failed to start. Intentionally a no-op.
 *
 * @param connection the connection whose start failed
 * @param ex the failure that was reported
 */
@Override
public void connectionStartFailed(ConnectionI connection, LocalException ex) {
    // No warning is emitted for connection exceptions here because the connection has not
    // been validated yet.
}
/**
 * Creates a factory for {@code endpoint} on behalf of {@code adapter}; the factory starts in
 * the holding state.
 *
 * When the endpoint supplies a transceiver (the UDP case) the transceiver is bound and a
 * single connection is created, started and tracked immediately. Otherwise an acceptor is
 * created through {@code createAcceptor()}.
 *
 * On any failure the transceiver (if any) is closed, the factory is marked finished with an
 * empty connection set, and the failure is rethrown: a {@code LocalException} as is, an
 * {@code InterruptedException} wrapped in {@code OperationInterruptedException}, anything else
 * wrapped in {@code SyscallException}.
 *
 * @param instance source of properties, trace levels, logger and connection options
 * @param endpoint the endpoint to bind or listen on
 * @param adapter the object adapter that owns this factory
 */
public IncomingConnectionFactory(Instance instance, EndpointI endpoint, ObjectAdapter adapter) {
    _instance = instance;
    _connectionOptions = instance.serverConnectionOptions(adapter.getName());

    // A connection limit is meaningful only for non-datagram (non-UDP) endpoints.
    if (endpoint.datagram()) {
        _maxConnections = 0;
    } else {
        _maxConnections =
                instance.initializationData()
                        .properties
                        .getPropertyAsInt(adapter.getName() + ".MaxConnections");
    }

    _endpoint = endpoint;
    _adapter = adapter;
    _warn =
            _instance.initializationData().properties.getIcePropertyAsInt("Ice.Warn.Connections")
                    > 0;
    _state = StateHolding;
    _acceptorStarted = false;

    final DefaultsAndOverrides overrides = _instance.defaultsAndOverrides();
    if (overrides.overrideCompress.isPresent()) {
        _endpoint = _endpoint.compress(overrides.overrideCompress.get());
    }

    try {
        _transceiver = _endpoint.transceiver();
        if (_transceiver == null) {
            createAcceptor();
        } else {
            // Everything in this branch is for UDP "connections".
            if (_instance.traceLevels().network >= 2) {
                String text =
                        "attempting to bind to "
                                + _endpoint.protocol()
                                + " socket\n"
                                + _transceiver.toString();
                _instance
                        .initializationData()
                        .logger
                        .trace(_instance.traceLevels().networkCat, text);
            }
            _endpoint = _transceiver.bind();

            var udpConnection =
                    new ConnectionI(
                            _adapter.getCommunicator(),
                            _instance,
                            _transceiver,
                            null,
                            _endpoint,
                            _adapter,
                            null,
                            _connectionOptions);
            udpConnection.startAndWait();
            _connections.add(udpConnection);
            assert _maxConnections == 0; // UDP so no max connections
        }
    } catch (Exception ex) {
        // Leave the object in the state the finalizer expects: finished, no connections.
        if (_transceiver != null) {
            try {
                _transceiver.close();
            } catch (LocalException ignored) {
                // Failures of close() are irrelevant at this point.
            }
        }
        _state = StateFinished;
        _connections.clear();

        if (ex instanceof LocalException) {
            throw (LocalException) ex;
        }
        if (ex instanceof InterruptedException) {
            throw new OperationInterruptedException(ex);
        }
        throw new SyscallException(ex);
    }
}
/**
 * Finalizer sanity check: reports through {@code Assert.FinalizerAssert} when the factory is
 * collected without being finished or while it still tracks connections.
 *
 * @throws Throwable whatever {@code super.finalize()} throws
 */
@SuppressWarnings({"deprecation", "nofinalizer"})
@Override
protected synchronized void finalize() throws Throwable {
    try {
        Assert.FinalizerAssert(_state == StateFinished);
        Assert.FinalizerAssert(_connections.isEmpty());
    } catch (Exception ignored) {
        // A failing check must not prevent the superclass finalizer from running.
    } finally {
        super.finalize();
    }
}
// Factory states. The numeric order matters: other code in this class compares states with
// relational operators (e.g. "_state < StateHolding", "_state >= StateClosed").
// Active: set by setState only when coming from holding.
private static final int StateActive = 0;
// Holding: the initial state assigned in the constructor.
private static final int StateHolding = 1;
// Closed: connections have been asked to destroy themselves; acceptor shutdown is in progress.
private static final int StateClosed = 2;
// Finished: terminal state; waitUntilFinished() blocks until it is reached.
private static final int StateFinished = 3;
/**
 * Performs a state transition and wakes up every thread waiting on this object's monitor.
 *
 * Rules applied here: a request for the current state is ignored; active is reachable only
 * from holding and holding only from active (other requests are ignored); a close request
 * becomes a finish request when no acceptor was started; finished is expected to be requested
 * only from closed (assertion). Callers in this class invoke the method while holding this
 * object's monitor, which {@code notifyAll()} requires.
 *
 * @param state the requested state, one of the {@code State*} constants
 */
private void setState(int state) {
    // Never switch to the state we are already in.
    if (_state == state) {
        return;
    }

    if (state == StateActive) {
        // Only holding -> active is a legal way to become active.
        if (_state != StateHolding) {
            return;
        }
        if (_acceptor != null) {
            if (_instance.traceLevels().network >= 1) {
                String text =
                        "accepting "
                                + _endpoint.protocol()
                                + " connections at "
                                + _acceptor.toString();
                _instance
                        .initializationData()
                        .logger
                        .trace(_instance.traceLevels().networkCat, text);
            }
            _adapter.getThreadPool().register(this, SocketOperation.Read);
        }
        for (ConnectionI tracked : _connections) {
            tracked.activate();
        }
    } else if (state == StateHolding) {
        // Only active -> holding is a legal way to become holding.
        if (_state != StateActive) {
            return;
        }
        if (_acceptor != null) {
            // Stop accepting new connections.
            if (_instance.traceLevels().network >= 1) {
                String text =
                        "holding "
                                + _endpoint.protocol()
                                + " connections at "
                                + _acceptor.toString();
                _instance
                        .initializationData()
                        .logger
                        .trace(_instance.traceLevels().networkCat, text);
            }
            _adapter.getThreadPool().unregister(this, SocketOperation.Read);
        }
        for (ConnectionI tracked : _connections) {
            tracked.hold();
        }
    } else if (state == StateClosed) {
        if (_acceptorStarted) {
            // Close the acceptor right away when the thread pool allows it, so that no new
            // connection is accepted while we are deactivating. This helps in particular
            // when the thread pool has no thread left to dispatch the finish() call.
            _acceptorStarted = false;
            if (_adapter.getThreadPool().finish(this, true)) {
                closeAcceptor();
            }
        } else {
            // No acceptor was started: nothing remains to be finished later.
            state = StateFinished;
        }
        for (ConnectionI tracked : _connections) {
            tracked.destroy(ConnectionI.ObjectAdapterDeactivated);
        }
    } else if (state == StateFinished) {
        assert (_state == StateClosed);
    }

    _state = state;
    notifyAll();
}
/**
 * Creates the acceptor for the current endpoint, starts listening, and initializes this
 * handler with the adapter's thread pool. The handler is registered for read events only when
 * the factory is already active. On success the acceptor is marked as started.
 *
 * If any step fails, the acceptor (when one was created) is closed and the exception is
 * rethrown unchanged.
 */
private void createAcceptor() {
    try {
        assert (!_acceptorStarted);
        _acceptor = _endpoint.acceptor(_adapter.getName(), _adapter.getSSLEngineFactory());
        assert (_acceptor != null);

        if (_instance.traceLevels().network >= 2) {
            String text =
                    "attempting to bind to "
                            + _endpoint.protocol()
                            + " socket "
                            + _acceptor.toString();
            _instance
                    .initializationData()
                    .logger
                    .trace(_instance.traceLevels().networkCat, text);
        }

        // listen() returns the endpoint to use from here on.
        _endpoint = _acceptor.listen();

        if (_instance.traceLevels().network >= 1) {
            String text =
                    "listening for "
                            + _endpoint.protocol()
                            + " connections\n"
                            + _acceptor.toDetailedString();
            _instance
                    .initializationData()
                    .logger
                    .trace(_instance.traceLevels().networkCat, text);
        }

        _adapter.getThreadPool().initialize(this);
        if (_state == StateActive) {
            _adapter.getThreadPool().register(this, SocketOperation.Read);
        }
        _acceptorStarted = true;
    } catch (Exception ex) {
        // Do not leave a half-initialized acceptor open.
        if (_acceptor != null) {
            _acceptor.close();
        }
        throw ex;
    }
}
/**
 * Closes the acceptor, emitting a network trace first when the trace level is at least 1.
 * Expects the acceptor to exist and to be no longer marked as started (assertions only).
 */
private void closeAcceptor() {
    assert (_acceptor != null);

    if (_instance.traceLevels().network >= 1) {
        String text =
                "stopping to accept "
                        + _endpoint.protocol()
                        + " connections at "
                        + _acceptor.toString();
        _instance
                .initializationData()
                .logger
                .trace(_instance.traceLevels().networkCat, text);
    }

    assert (!_acceptorStarted);
    _acceptor.close();
}
/**
 * Stops tracking {@code connection}. Passed to each accepted {@code ConnectionI} as its
 * removal callback.
 *
 * The connection is removed only while the factory is active or holding; in any later state
 * the connection set is already being cleaned up and is left untouched.
 *
 * @param connection the connection to forget
 */
private synchronized void removeConnection(ConnectionI connection) {
    if (_state != StateActive && _state != StateHolding) {
        // Closed or finished: cleanup is already under way elsewhere.
        return;
    }
    _connections.remove(connection);
}
// Provides trace levels, the logger and properties (via initializationData()) used by this class.
private final Instance _instance;
// Obtained from instance.serverConnectionOptions(adapter name); passed to every ConnectionI
// created by this factory.
private final ConnectionOptions _connectionOptions;
// Connection limit checked in message(); the check is skipped unless the value is > 0.
// Always 0 for datagram endpoints.
private final int _maxConnections;
// Assigned by createAcceptor(); remains null when the endpoint supplies a transceiver instead.
private Acceptor _acceptor;
// Result of _endpoint.transceiver() in the constructor; non-null only on the UDP path.
private Transceiver _transceiver;
// Current endpoint; replaced by the results of compress(), bind() or listen().
private EndpointI _endpoint;
// Owning object adapter; set to null in waitUntilFinished() once the factory has finished.
private ObjectAdapter _adapter;
// True when the Ice.Warn.Connections property is > 0; gates the "error accepting connection"
// warnings in message().
private final boolean _warn;
// Connections tracked by this factory. Outside the constructor it is accessed while holding
// this object's monitor.
private final Set<ConnectionI> _connections = new HashSet<>();
// Current state, one of the State* constants; changed by setState() and by the constructor.
private int _state;
// Set to true at the end of createAcceptor() and back to false when setState() handles
// StateClosed.
private boolean _acceptorStarted;
}