OutgoingConnectionFactory.java

// Copyright (c) ZeroC, Inc.

package com.zeroc.Ice;

import com.zeroc.Ice.Instrumentation.CommunicatorObserver;
import com.zeroc.Ice.Instrumentation.Observer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class OutgoingConnectionFactory {
    //
    // Helper class to multi hash map.
    //
    private static class MultiHashMap<K, V> extends HashMap<K, List<V>> {
        public void putOne(K key, V value) {
            List<V> list = this.get(key);
            if (list == null) {
                list = new LinkedList<>();
                this.put(key, list);
            }
            list.add(value);
        }

        private static final long serialVersionUID = -8109942200313578944L;
    }

    interface CreateConnectionCallback {
        void setConnection(ConnectionI connection, boolean compress);

        void setException(LocalException ex);
    }

    public synchronized void destroy() {
        if (_destroyed) {
            return;
        }

        for (List<ConnectionI> connectionList : _connections.values()) {
            for (ConnectionI connection : connectionList) {
                connection.destroy(ConnectionI.CommunicatorDestroyed);
            }
        }

        _destroyed = true;
        _communicator = null;
        _defaultObjectAdapter = null;
        notifyAll();
    }

    public synchronized void updateConnectionObservers() {
        for (List<ConnectionI> connectionList : _connections.values()) {
            for (ConnectionI connection : connectionList) {
                connection.updateObserver();
            }
        }
    }

    // Called from Instance.destroy().
    public void waitUntilFinished() {
        Map<Connector, List<ConnectionI>> connections = null;
        synchronized (this) {
            //
            // First we wait until the factory is destroyed. We also wait until there are no pending
            // connections anymore. Only then we can be sure the _connections contains all
            // connections.
            //
            while (!_destroyed || !_pending.isEmpty() || _pendingConnectCount > 0) {
                try {
                    wait();
                } catch (InterruptedException ex) {
                    throw new OperationInterruptedException(ex);
                }
            }

            //
            // We want to wait until all connections are finished outside the thread
            // synchronization.
            //
            connections = new HashMap<>(_connections);
        }

        //
        // Now we wait until the destruction of each connection is finished.
        //
        for (List<ConnectionI> connectionList : connections.values()) {
            for (ConnectionI connection : connectionList) {
                try {
                    connection.waitUntilFinished();
                } catch (InterruptedException e) {
                    //
                    // Force close all of the connections.
                    //
                    for (List<ConnectionI> l : connections.values()) {
                        for (ConnectionI c : l) {
                            c.abort();
                        }
                    }
                    throw new OperationInterruptedException(e);
                }
            }
        }

        synchronized (this) {
            _connections.clear();
            _connectionsByEndpoint.clear();
        }
    }

    public void create(EndpointI[] endpts, boolean hasMore, CreateConnectionCallback callback) {
        assert (endpts.length > 0);

        // TODO: fix API to use List directly.
        var endpoints = Arrays.asList(endpts);

        //
        // Try to find a connection to one of the given endpoints.
        //
        try {
            Holder<Boolean> compress = new Holder<>();
            ConnectionI connection = findConnectionByEndpoint(endpoints, compress);
            if (connection != null) {
                callback.setConnection(connection, compress.value);
                return;
            }
        } catch (LocalException ex) {
            callback.setException(ex);
            return;
        }

        final ConnectCallback cb = new ConnectCallback(this, endpoints, hasMore, callback);
        cb.getConnectors();
    }

    public void setRouterInfo(RouterInfo routerInfo) {
        assert (routerInfo != null);
        ObjectAdapter adapter = routerInfo.getAdapter();
        EndpointI[] endpoints =
            routerInfo.getClientEndpoints(); // Must be called outside the synchronization

        synchronized (this) {
            if (_destroyed) {
                throw new CommunicatorDestroyedException();
            }

            //
            // Search for connections to the router's client proxy
            // endpoints, and update the object adapter for such
            // connections, so that callbacks from the router can be received over such connections.
            //
            for (EndpointI endpoint : endpoints) {
                //
                // The Connection object does not take the compression flag of
                // endpoints into account, but instead gets the information
                // about whether messages should be compressed or not from other sources. In order
                // to allow connection sharing for
                // endpoints that differ in the value of the compression flag
                // only, we always set the compression flag to false here in this connection
                // factory. We also clear the timeout as it is
                // no longer used for Ice 3.8.
                //
                endpoint = endpoint.compress(false).timeout(-1);

                for (List<ConnectionI> connectionList : _connections.values()) {
                    for (ConnectionI connection : connectionList) {
                        if (connection.endpoint().equals(endpoint)) {
                            connection.setAdapter(adapter);
                        }
                    }
                }
            }
        }
    }

    public synchronized void removeAdapter(ObjectAdapter adapter) {
        if (_destroyed) {
            return;
        }

        for (List<ConnectionI> connectionList : _connections.values()) {
            for (ConnectionI connection : connectionList) {
                if (connection.getAdapter() == adapter) {
                    connection.setAdapter(null);
                }
            }
        }
    }

    public void flushAsyncBatchRequests(
            CompressBatch compressBatch, CommunicatorFlushBatch outAsync) {
        List<ConnectionI> c = new LinkedList<>();

        synchronized (this) {
            if (!_destroyed) {
                for (List<ConnectionI> connectionList : _connections.values()) {
                    for (ConnectionI connection : connectionList) {
                        if (connection.isActiveOrHolding()) {
                            c.add(connection);
                        }
                    }
                }
            }
        }

        for (ConnectionI conn : c) {
            try {
                outAsync.flushConnection(conn, compressBatch);
            } catch (LocalException ex) {
                // Ignore.
            }
        }
    }

    //
    // Only for use by Instance.
    //
    OutgoingConnectionFactory(Communicator communicator, Instance instance) {
        _communicator = communicator;
        _instance = instance;
        _connectionOptions = instance.clientConnectionOptions();
        _destroyed = false;
    }

    @SuppressWarnings({"nofinalizer", "deprecation"})
    @Override
    protected synchronized void finalize() throws Throwable {
        try {
            Assert.FinalizerAssert(_destroyed);
            Assert.FinalizerAssert(_connections.isEmpty());
            Assert.FinalizerAssert(_connectionsByEndpoint.isEmpty());
            Assert.FinalizerAssert(_pendingConnectCount == 0);
            Assert.FinalizerAssert(_pending.isEmpty());
        } catch (Exception ex) {} finally {
            super.finalize();
        }
    }

    private synchronized ConnectionI findConnectionByEndpoint(
            List<EndpointI> endpoints, Holder<Boolean> compress) {
        if (_destroyed) {
            throw new CommunicatorDestroyedException();
        }

        DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
        assert (!endpoints.isEmpty());

        for (EndpointI proxyEndpoint : endpoints) {
            // Clear the timeout
            EndpointI endpoint = proxyEndpoint.timeout(-1);
            List<ConnectionI> connectionList = _connectionsByEndpoint.get(endpoint);
            if (connectionList == null) {
                continue;
            }

            for (ConnectionI connection : connectionList) {
                // Don't return destroyed or un-validated connections
                if (connection.isActiveOrHolding()) {
                    if (defaultsAndOverrides.overrideCompress.isPresent()) {
                        compress.value = defaultsAndOverrides.overrideCompress.get();
                    } else {
                        compress.value = endpoint.compress();
                    }
                    return connection;
                }
            }
        }

        return null;
    }

    synchronized void setDefaultObjectAdapter(ObjectAdapter adapter) {
        _defaultObjectAdapter = adapter;
    }

    synchronized ObjectAdapter getDefaultObjectAdapter() {
        return _defaultObjectAdapter;
    }

    //
    // Must be called while synchronized.
    //
    private ConnectionI findConnection(
            List<ConnectorInfo> connectors, Holder<Boolean> compress) {
        DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
        for (ConnectorInfo ci : connectors) {
            if (_pending.containsKey(ci.connector)) {
                continue;
            }

            List<ConnectionI> connectionList = _connections.get(ci.connector);
            if (connectionList == null) {
                continue;
            }

            for (ConnectionI connection : connectionList) {
                // Don't return destroyed or un-validated connections
                if (connection.isActiveOrHolding()) {
                    if (defaultsAndOverrides.overrideCompress.isPresent()) {
                        compress.value = defaultsAndOverrides.overrideCompress.get();
                    } else {
                        compress.value = ci.endpoint.compress();
                    }
                    return connection;
                }
            }
        }

        return null;
    }

    private synchronized void incPendingConnectCount() {
        //
        // Keep track of the number of pending connects. The outgoing connection factory
        // waitUntilFinished() method waits for all the pending connects to terminate before
        // to return. This ensures that the communicator client thread pool isn't destroyed
        // too soon and will still be available to execute the ice_exception() callbacks for
        // the asynchronous requests waiting on a connection to be established.
        //

        if (_destroyed) {
            throw new CommunicatorDestroyedException();
        }
        ++_pendingConnectCount;
    }

    private synchronized void decPendingConnectCount() {
        --_pendingConnectCount;
        assert (_pendingConnectCount >= 0);
        if (_destroyed && _pendingConnectCount == 0) {
            notifyAll();
        }
    }

    private ConnectionI getConnection(
            List<ConnectorInfo> connectors,
            ConnectCallback cb,
            Holder<Boolean> compress) {
        assert (cb != null);
        synchronized (this) {
            if (_destroyed) {
                throw new CommunicatorDestroyedException();
            }

            // Search for an existing connections matching one of the given endpoints.
            ConnectionI connection = findConnection(connectors, compress);
            if (connection != null) {
                return connection;
            }

            if (addToPending(cb, connectors)) {
                // A connection to one of our endpoints is pending. The callback will be notified
                // once the connection is established. Returning null indicates that the connection
                // is still pending.
                return null;
            }
        }

        // No connection is pending. Call nextConnector to initiate connection establishment. Return
        // null to indicate that the connection is still pending.
        cb.nextConnector();
        return null;
    }

    private synchronized ConnectionI createConnection(Transceiver transceiver, ConnectorInfo ci) {
        assert (_pending.containsKey(ci.connector) && transceiver != null);

        //
        // Create and add the connection to the connection map. Adding the connection to the map
        // is necessary to support the interruption of the connection initialization and validation
        // in case the communicator is destroyed.
        //
        ConnectionI connection = null;
        try {
            if (_destroyed) {
                throw new CommunicatorDestroyedException();
            }

            connection =
                new ConnectionI(
                    _communicator,
                    _instance,
                    transceiver,
                    ci.connector,
                    ci.endpoint.compress(false).timeout(-1),
                    _defaultObjectAdapter,
                    this::removeConnection,
                    _connectionOptions);
        } catch (LocalException ex) {
            try {
                transceiver.close();
            } catch (LocalException exc) {
                // Ignore
            }
            throw ex;
        }

        _connections.putOne(ci.connector, connection);
        _connectionsByEndpoint.putOne(connection.endpoint(), connection);
        _connectionsByEndpoint.putOne(connection.endpoint().compress(true), connection);
        return connection;
    }

    private void finishGetConnection(
            List<ConnectorInfo> connectors,
            ConnectorInfo ci,
            ConnectionI connection,
            ConnectCallback cb) {
        Set<ConnectCallback> connectionCallbacks = new HashSet<>();
        if (cb != null) {
            connectionCallbacks.add(cb);
        }

        Set<ConnectCallback> callbacks = new HashSet<>();
        synchronized (this) {
            for (ConnectorInfo c : connectors) {
                Set<ConnectCallback> cbs = _pending.remove(c.connector);
                if (cbs != null) {
                    for (ConnectCallback cc : cbs) {
                        if (cc.hasConnector(ci)) {
                            connectionCallbacks.add(cc);
                        } else {
                            callbacks.add(cc);
                        }
                    }
                }
            }

            for (ConnectCallback cc : connectionCallbacks) {
                cc.removeFromPending();
                callbacks.remove(cc);
            }
            for (ConnectCallback cc : callbacks) {
                cc.removeFromPending();
            }
            notifyAll();
        }

        boolean compress;
        DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
        if (defaultsAndOverrides.overrideCompress.isPresent()) {
            compress = defaultsAndOverrides.overrideCompress.get();
        } else {
            compress = ci.endpoint.compress();
        }

        for (ConnectCallback cc : callbacks) {
            cc.getConnection();
        }
        for (ConnectCallback cc : connectionCallbacks) {
            cc.setConnection(connection, compress);
        }
    }

    private void finishGetConnection(
            List<ConnectorInfo> connectors, LocalException ex, ConnectCallback cb) {
        Set<ConnectCallback> failedCallbacks = new HashSet<>();
        if (cb != null) {
            failedCallbacks.add(cb);
        }

        Set<ConnectCallback> callbacks = new HashSet<>();
        synchronized (this) {
            for (ConnectorInfo c : connectors) {
                Set<ConnectCallback> cbs = _pending.remove(c.connector);
                if (cbs != null) {
                    for (ConnectCallback cc : cbs) {
                        if (cc.removeConnectors(connectors)) {
                            failedCallbacks.add(cc);
                        } else {
                            callbacks.add(cc);
                        }
                    }
                }
            }

            for (ConnectCallback cc : callbacks) {
                assert (!failedCallbacks.contains(cc));
                cc.removeFromPending();
            }
            notifyAll();
        }

        for (ConnectCallback cc : callbacks) {
            cc.getConnection();
        }
        for (ConnectCallback cc : failedCallbacks) {
            cc.setException(ex);
        }
    }

    private boolean addToPending(ConnectCallback cb, List<ConnectorInfo> connectors) {
        //
        // Add the callback to each connector pending list.
        //
        boolean found = false;
        for (ConnectorInfo p : connectors) {
            Set<ConnectCallback> cbs = _pending.get(p.connector);
            if (cbs != null) {
                found = true;
                if (cb != null) {
                    cbs.add(cb); // Add the callback to each pending connector.
                }
            }
        }

        if (found) {
            return true;
        }

        //
        // If there's no pending connection for the given connectors, we're responsible for its
        // establishment. We add empty pending lists, other callbacks to the same connectors will be
        // queued.
        //
        for (ConnectorInfo p : connectors) {
            if (!_pending.containsKey(p.connector)) {
                _pending.put(p.connector, new HashSet<>());
            }
        }

        return false;
    }

    private void removeFromPending(ConnectCallback cb, List<ConnectorInfo> connectors) {
        for (ConnectorInfo p : connectors) {
            Set<ConnectCallback> cbs = _pending.get(p.connector);
            if (cbs != null) {
                cbs.remove(cb);
            }
        }
    }

    private void handleConnectionException(LocalException ex, boolean hasMore) {
        TraceLevels traceLevels = _instance.traceLevels();
        if (traceLevels.network >= 2) {
            StringBuilder s = new StringBuilder(128);
            s.append("connection to endpoint failed");
            if (ex instanceof CommunicatorDestroyedException) {
                s.append("\n");
            } else {
                if (hasMore) {
                    s.append(", trying next endpoint\n");
                } else {
                    s.append(" and no more endpoints to try\n");
                }
            }
            s.append(ex.toString());
            _instance.initializationData().logger.trace(traceLevels.networkCat, s.toString());
        }
    }

    private void handleException(LocalException ex, boolean hasMore) {
        TraceLevels traceLevels = _instance.traceLevels();
        if (traceLevels.network >= 2) {
            StringBuilder s = new StringBuilder(128);
            s.append("couldn't resolve endpoint host");
            if (ex instanceof CommunicatorDestroyedException) {
                s.append("\n");
            } else {
                if (hasMore) {
                    s.append(", trying next endpoint\n");
                } else {
                    s.append(" and no more endpoints to try\n");
                }
            }
            s.append(ex.toString());
            _instance.initializationData().logger.trace(traceLevels.networkCat, s.toString());
        }
    }

    private synchronized void removeConnection(ConnectionI connection) {
        if (_destroyed) {
            return;
        }

        _connections.remove(connection.connector(), connection);
        _connectionsByEndpoint.remove(connection.endpoint(), connection);
        _connectionsByEndpoint.remove(connection.endpoint().compress(true), connection);
    }

    private static class ConnectorInfo {
        public ConnectorInfo(Connector c, EndpointI e) {
            connector = c;
            endpoint = e;
        }

        @Override
        public boolean equals(java.lang.Object obj) {
            ConnectorInfo r = (ConnectorInfo) obj;
            return connector.equals(r.connector);
        }

        @Override
        public int hashCode() {
            return connector.hashCode();
        }

        public Connector connector;
        public EndpointI endpoint;
    }

    private static class ConnectCallback
        implements ConnectionI.StartCallback, EndpointI_connectors {
        ConnectCallback(
                OutgoingConnectionFactory f,
                List<EndpointI> endpoints,
                boolean more,
                CreateConnectionCallback cb) {
            _factory = f;
            _endpoints = endpoints;
            _hasMore = more;
            _callback = cb;
            _endpointsIter = _endpoints.iterator();
        }

        //
        // Methods from ConnectionI.StartCallback
        //
        @Override
        public void connectionStartCompleted(ConnectionI connection) {
            if (_observer != null) {
                _observer.detach();
            }
            connection.activate();
            _factory.finishGetConnection(_connectors, _current, connection, this);
        }

        @Override
        public void connectionStartFailed(ConnectionI connection, LocalException ex) {
            assert (_current != null);
            if (connectionStartFailedImpl(ex)) {
                nextConnector();
            }
        }

        //
        // Methods from EndpointI_connectors
        //
        @Override
        public void connectors(List<Connector> cons) {
            for (Connector p : cons) {
                _connectors.add(new ConnectorInfo(p, _currentEndpoint));
            }

            if (_endpointsIter.hasNext()) {
                nextEndpoint();
            } else {
                assert (!_connectors.isEmpty());

                //
                // We now have all the connectors for the given endpoints. We can try to obtain the
                // connection.
                //
                _iter = _connectors.iterator();
                getConnection();
            }
        }

        @Override
        public void exception(LocalException ex) {
            _factory.handleException(ex, _hasMore || _endpointsIter.hasNext());
            if (_endpointsIter.hasNext()) {
                nextEndpoint();
            } else if (!_connectors.isEmpty()) {
                //
                // We now have all the connectors for the given endpoints. We can try to obtain the
                // connection.
                //
                _iter = _connectors.iterator();
                getConnection();
            } else {
                _callback.setException(ex);
                _factory.decPendingConnectCount(); // Must be called last.
            }
        }

        void setConnection(ConnectionI connection, boolean compress) {
            //
            // Callback from the factory: the connection to one of the callback
            // connectors has been established.
            //
            _callback.setConnection(connection, compress);
            _factory.decPendingConnectCount(); // Must be called last.
        }

        void setException(LocalException ex) {
            //
            // Callback from the factory: connection establishment failed.
            //
            _callback.setException(ex);
            _factory.decPendingConnectCount(); // Must be called last.
        }

        boolean hasConnector(ConnectorInfo ci) {
            return _connectors.contains(ci);
        }

        boolean removeConnectors(List<ConnectorInfo> connectors) {
            _connectors.removeAll(connectors);
            _iter = _connectors.iterator();
            return _connectors.isEmpty();
        }

        void removeFromPending() {
            _factory.removeFromPending(this, _connectors);
        }

        private void getConnectors() {
            try {
                //
                // Notify the factory that there's an async connect pending. This is necessary to
                // prevent the outgoing connection factory to be destroyed before all the pending
                // asynchronous connects are finished.
                //
                _factory.incPendingConnectCount();
            } catch (LocalException ex) {
                _callback.setException(ex);
                return;
            }

            nextEndpoint();
        }

        private void nextEndpoint() {
            try {
                assert (_endpointsIter.hasNext());
                _currentEndpoint = _endpointsIter.next();
                _currentEndpoint.connectors_async(this);
            } catch (LocalException ex) {
                exception(ex);
            }
        }

        private void getConnection() {
            try {
                //
                // If all the connectors have been created, we ask the factory to get a connection.
                //
                Holder<Boolean> compress = new Holder<>();
                ConnectionI connection = _factory.getConnection(_connectors, this, compress);
                if (connection == null) {
                    //
                    // A null return value from getConnection indicates that the connection
                    // is being established and that everything has been done to ensure that the
                    // callback will be notified when the connection establishment is done.
                    //
                    return;
                }

                _callback.setConnection(connection, compress.value);
                _factory.decPendingConnectCount(); // Must be called last.
            } catch (LocalException ex) {
                _callback.setException(ex);
                _factory.decPendingConnectCount(); // Must be called last.
            }
        }

        private void nextConnector() {
            while (true) {
                try {
                    assert (_iter.hasNext());
                    _current = _iter.next();

                    CommunicatorObserver observer =
                        _factory._instance.initializationData().observer;
                    if (observer != null) {
                        _observer =
                            observer.getConnectionEstablishmentObserver(
                                _current.endpoint, _current.connector.toString());
                        if (_observer != null) {
                            _observer.attach();
                        }
                    }

                    if (_factory._instance.traceLevels().network >= 2) {
                        StringBuffer s = new StringBuffer("trying to establish ");
                        s.append(_current.endpoint.protocol());
                        s.append(" connection to ");
                        s.append(_current.connector.toString());
                        _factory._instance
                            .initializationData()
                            .logger
                            .trace(_factory._instance.traceLevels().networkCat, s.toString());
                    }

                    ConnectionI connection =
                        _factory.createConnection(_current.connector.connect(), _current);
                    connection.start(this);
                } catch (LocalException ex) {
                    if (_factory._instance.traceLevels().network >= 2) {
                        StringBuffer s = new StringBuffer("failed to establish ");
                        s.append(_current.endpoint.protocol());
                        s.append(" connection to ");
                        s.append(_current.connector.toString());
                        s.append("\n");
                        s.append(ex);
                        _factory._instance
                            .initializationData()
                            .logger
                            .trace(_factory._instance.traceLevels().networkCat, s.toString());
                    }

                    if (connectionStartFailedImpl(ex)) {
                        continue;
                    }
                }
                break;
            }
        }

        private boolean connectionStartFailedImpl(LocalException ex) {
            if (_observer != null) {
                _observer.failed(ex.ice_id());
                _observer.detach();
            }

            _factory.handleConnectionException(ex, _hasMore || _iter.hasNext());

            // We stop on ConnectTimeoutException to fail reasonably fast when the endpoint has many
            // connectors (IP addresses).
            if (_iter.hasNext()
                && !(ex instanceof CommunicatorDestroyedException
                || ex instanceof ConnectTimeoutException)) {
                return true; // keep going
            }

            _factory.finishGetConnection(_connectors, ex, this);
            return false;
        }

        private final OutgoingConnectionFactory _factory;
        private final boolean _hasMore;
        private final CreateConnectionCallback _callback;
        private final List<EndpointI> _endpoints;
        private Iterator<EndpointI> _endpointsIter;
        private EndpointI _currentEndpoint;
        private List<ConnectorInfo> _connectors = new ArrayList<>();
        private Iterator<ConnectorInfo> _iter;
        private ConnectorInfo _current;
        private Observer _observer;
    }

    private Communicator _communicator;
    private final Instance _instance;
    private final ConnectionOptions _connectionOptions;
    private ObjectAdapter _defaultObjectAdapter;
    private boolean _destroyed;

    private final MultiHashMap<Connector, ConnectionI> _connections = new MultiHashMap<>();
    private final MultiHashMap<EndpointI, ConnectionI> _connectionsByEndpoint = new MultiHashMap<>();
    private final Map<Connector, HashSet<ConnectCallback>> _pending =
        new HashMap<>();
    private int _pendingConnectCount;
}