ConnectionI.java

// Copyright (c) ZeroC, Inc.

package com.zeroc.Ice;

import com.zeroc.Ice.Instrumentation.ConnectionObserver;
import com.zeroc.Ice.Instrumentation.ConnectionState;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.channels.SelectableChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * @hidden Public because it's used by the 'Ice/metrics' test.
 */
public final class ConnectionI extends EventHandler implements Connection, CancellationHandler {
    /** Callback through which {@link #start(StartCallback)} reports how connection start ended. */
    public interface StartCallback {
        /** Invoked once the connection has been initialized and validated. */
        void connectionStartCompleted(ConnectionI connection);

        /** Invoked with the failure that prevented the connection from starting. */
        void connectionStartFailed(ConnectionI connection, LocalException ex);
    }

    /**
     * Starts the connection without blocking.
     *
     * <p>When initialization and validation both complete immediately, the holding state is
     * requested and {@code callback} is told about the success before this method returns.
     * Otherwise {@code callback} is remembered in {@code _startCallback} so completion can be
     * reported later, and a one-time connect-timeout check is scheduled if the configured connect
     * timeout is positive.
     *
     * @param callback receives the outcome of the start attempt
     */
    public void start(StartCallback callback) {
        try {
            synchronized (this) {
                // A destroyed communicator may already have closed this connection.
                if (_state >= StateClosed) {
                    assert (_exception != null);
                    throw (LocalException) _exception.fillInStackTrace();
                }

                final boolean established =
                    initialize(SocketOperation.None) && validate(SocketOperation.None);
                if (established) {
                    // Connections always begin their life in the holding state.
                    setState(StateHolding);
                } else {
                    // Establishment continues asynchronously; arm the one-shot timeout check
                    // and keep the callback around for later notification.
                    if (_connectTimeout > 0) {
                        _timer.schedule(this::connectTimedOut, _connectTimeout, TimeUnit.SECONDS);
                    }
                    _startCallback = callback;
                    return;
                }
            }
        } catch (LocalException ex) {
            // Hand the failure to exception(...) and report whatever exception is recorded.
            exception(ex);
            callback.connectionStartFailed(this, _exception);
            return;
        }

        // Notify outside of the synchronized block.
        callback.connectionStartCompleted(this);
    }

    /**
     * Starts the connection and blocks the calling thread until establishment is over.
     *
     * <p>If initialization or validation cannot complete immediately, the method waits on this
     * object's monitor until the state moves past {@code StateNotValidated}. A {@code
     * LocalException} raised along the way is passed to {@code exception(...)}, after which the
     * method waits for the connection to finish; it is not rethrown to the caller.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void startAndWait() throws InterruptedException {
        try {
            synchronized (this) {
                // A destroyed communicator may already have closed this connection.
                if (_state >= StateClosed) {
                    assert (_exception != null);
                    throw (LocalException) _exception.fillInStackTrace();
                }

                final boolean established =
                    initialize(SocketOperation.None) && validate(SocketOperation.None);
                if (!established) {
                    // Block until the connection is validated or starts closing.
                    while (_state <= StateNotValidated) {
                        wait();
                    }

                    if (_state >= StateClosing) {
                        assert (_exception != null);
                        throw (LocalException) _exception.fillInStackTrace();
                    }
                }

                // Connections always begin their life in the holding state.
                setState(StateHolding);
            }
        } catch (LocalException ex) {
            exception(ex);
            waitUntilFinished();
        }
    }

    /**
     * Requests the transition to the active state. The request is ignored while the connection
     * has not been validated yet.
     */
    public synchronized void activate() {
        if (_state > StateNotValidated) {
            setState(StateActive);
        }
    }

    /**
     * Requests the transition to the holding state. The request is ignored while the connection
     * has not been validated yet.
     */
    public synchronized void hold() {
        if (_state > StateNotValidated) {
            setState(StateHolding);
        }
    }

    // DestructionReason: the values accepted by destroy(int).
    /** Reason for which destroy(int) closes with an {@code ObjectAdapterDeactivatedException}. */
    public static final int ObjectAdapterDeactivated = 0;
    /** Reason for which destroy(int) closes with a {@code CommunicatorDestroyedException}. */
    public static final int CommunicatorDestroyed = 1;

    /**
     * Requests the closing state with an exception that reflects why the connection is being
     * destroyed. Values other than the two DestructionReason constants are ignored.
     *
     * @param reason either {@code ObjectAdapterDeactivated} or {@code CommunicatorDestroyed}
     */
    public synchronized void destroy(int reason) {
        if (reason == ObjectAdapterDeactivated) {
            // The adapter may be unset; fall back to an empty name in that case.
            final String adapterName = _adapter == null ? "" : _adapter.getName();
            setState(StateClosing, new ObjectAdapterDeactivatedException(adapterName));
        } else if (reason == CommunicatorDestroyed) {
            setState(StateClosing, new CommunicatorDestroyedException());
        }
    }

    // Requests the closed state directly, recording a ConnectionAbortedException as the cause.
    @Override
    public synchronized void abort() {
        final var cause =
            new ConnectionAbortedException("connection aborted by the application", true);
        setState(StateClosed, cause);
    }

    // Closes the connection on behalf of the application and blocks until the closed state is
    // reached. If the recorded exception is not one of the expected closure exceptions it is
    // thrown to the caller.
    @Override
    public void close() {
        synchronized (this) {
            // Nothing to initiate when the connection is already closing or closed.
            if (_state < StateClosing) {
                if (!_asyncRequests.isEmpty()) {
                    _closeRequested = true;
                    // Outstanding invocations get a bounded amount of time to complete.
                    scheduleCloseTimer();
                } else {
                    doApplicationClose();
                }
            }

            // Block until the connection has actually been closed.
            try {
                while (_state < StateClosed) {
                    wait();
                }
            } catch (InterruptedException ex) {
                throw new OperationInterruptedException(ex);
            }

            final boolean expectedClosure =
                _exception instanceof ConnectionClosedException
                    || _exception instanceof CloseConnectionException
                    || _exception instanceof CommunicatorDestroyedException
                    || _exception instanceof ObjectAdapterDeactivatedException
                    || _exception instanceof ObjectAdapterDestroyedException;
            if (!expectedClosure) {
                assert (_exception != null);
                throw _exception;
            }
        }
    }

    /**
     * Tells whether the connection state lies strictly between {@code StateNotValidated} and
     * {@code StateClosing}.
     */
    public synchronized boolean isActiveOrHolding() {
        final boolean validated = _state > StateNotValidated;
        final boolean notClosing = _state < StateClosing;
        return validated && notClosing;
    }

    /**
     * Tells whether the connection has reached {@code StateFinished} and has no upcalls left
     * outstanding.
     */
    public synchronized boolean isFinished() {
        return _state == StateFinished && _upcallCount == 0;
    }

    /**
     * Throws the exception recorded for this connection, if any, after refreshing its stack
     * trace. Returns normally when no exception has been recorded.
     */
    public synchronized void throwException() {
        if (_exception == null) {
            return;
        }

        assert (_state >= StateClosing);
        throw (LocalException) _exception.fillInStackTrace();
    }

    /**
     * Blocks until the state is at least {@code StateHolding} and no upcalls are outstanding.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized void waitUntilHolding() throws InterruptedException {
        while (!(_state >= StateHolding && _upcallCount <= 0)) {
            wait();
        }
    }

    /**
     * Blocks until the connection has reached {@code StateFinished} and no upcalls are
     * outstanding, then drops the reference to the object adapter.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized void waitUntilFinished() throws InterruptedException {
        // The wait is unbounded on purpose: without it there would be no guarantee that no calls
        // are outstanding when deactivate() is called on the servant locators.
        while (!(_state >= StateFinished && _upcallCount <= 0)) {
            wait();
        }

        assert (_state == StateFinished);

        // Clear the OA. See bug 1673 for the details of why this is necessary.
        _adapter = null;
    }

    /**
     * Refreshes the connection observer from the communicator observer held by the instance's
     * initialization data. Does nothing when the state is below {@code StateNotValidated} or
     * above {@code StateClosed}. When no observer is returned, the read and write stream
     * positions used for observer bookkeeping are reset to -1.
     */
    public synchronized void updateObserver() {
        final boolean observable = _state >= StateNotValidated && _state <= StateClosed;
        if (!observable) {
            return;
        }

        assert (_instance.initializationData().observer != null);
        final var communicatorObserver = _instance.initializationData().observer;
        _observer =
            communicatorObserver.getConnectionObserver(
                initConnectionInfo(), _endpoint, toConnectionState(_state), _observer);

        if (_observer == null) {
            _writeStreamPos = -1;
            _readStreamPos = -1;
        } else {
            _observer.attach();
        }
    }

    /**
     * Hands an asynchronous request to {@code sendMessage} on this connection.
     *
     * @param out the outgoing invocation; its output stream carries the request
     * @param compress passed on to the outgoing message
     * @param response whether a reply is expected; if so a request ID is allocated, written
     *     after the protocol header and the invocation is added to the pending-request map
     * @param batchRequestNum when no reply is expected and this value is positive, it is written
     *     after the protocol header instead of a request ID
     * @return the status returned by {@code sendMessage}
     * @throws RetryException if an exception was already recorded for this connection before the
     *     request could be sent
     */
    public synchronized int sendAsyncRequest(
            OutgoingAsyncBase out, boolean compress, boolean response, int batchRequestNum)
        throws RetryException {
        final OutputStream ostr = out.getOs();

        // A connection that failed before anything was sent always allows the request to be
        // retried.
        if (_exception != null) {
            throw new RetryException((LocalException) _exception.fillInStackTrace());
        }

        assert (_state > StateNotValidated);
        assert (_state < StateClosing);

        // The transport rejects messages that are too large for it to send.
        _transceiver.checkSendSize(ostr.getBuffer());

        // Register this connection as the cancellation handler; throws if the request was
        // already canceled.
        out.cancelable(this);

        int requestId = 0;
        if (response) {
            // Request IDs are strictly positive; restart at 1 once the counter has wrapped.
            if (_nextRequestId <= 0) {
                _nextRequestId = 1;
            }
            requestId = _nextRequestId++;
        }

        // The int that follows the header is the request ID or, failing that, the batch count.
        if (response || batchRequestNum > 0) {
            ostr.pos(Protocol.headerSize);
            ostr.writeInt(response ? requestId : batchRequestNum);
        }

        out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);

        // A request is about to go out, so the connection is no longer inactive.
        cancelInactivityTimer();

        int status;
        try {
            status = sendMessage(new OutgoingMessage(out, ostr, compress, requestId));
        } catch (LocalException ex) {
            setState(StateClosed, ex);
            assert (_exception != null);
            throw (LocalException) _exception.fillInStackTrace();
        }

        if (response) {
            // Remember the invocation under its request ID.
            _asyncRequests.put(requestId, out);
        }
        return status;
    }

    /** Returns the batch request queue held by this connection. */
    public BatchRequestQueue getBatchRequestQueue() {
        return _batchRequestQueue;
    }

    // Synchronous variant: starts the flush and waits for its response.
    @Override
    public void flushBatchRequests(CompressBatch compressBatch) {
        final var flush = _iceI_flushBatchRequestsAsync(compressBatch);
        flush.waitForResponse();
    }

    // Asynchronous variant: starts the flush and returns the invocation object as the future.
    @Override
    public CompletableFuture<Void> flushBatchRequestsAsync(CompressBatch compressBatch) {
        return _iceI_flushBatchRequestsAsync(compressBatch);
    }

    /**
     * Creates a {@code ConnectionFlushBatch} for this connection, invokes it with the given
     * compression setting and returns it.
     */
    private ConnectionFlushBatch _iceI_flushBatchRequestsAsync(CompressBatch compressBatch) {
        ConnectionFlushBatch flush = new ConnectionFlushBatch(this, _communicator, _instance);
        flush.invoke(compressBatch);
        return flush;
    }

    // Stores the close callback while the connection is not closed yet. On a connection that is
    // already closed, a non-null callback is dispatched through the thread pool instead.
    @Override
    public synchronized void setCloseCallback(final CloseCallback callback) {
        if (_state < StateClosed) {
            _closeCallback = callback;
            return;
        }

        if (callback == null) {
            return;
        }

        _threadPool.dispatch(
            new RunnableThreadPoolWorkItem(this) {
                @Override
                public void run() {
                    try {
                        callback.closed(ConnectionI.this);
                    } catch (LocalException ex) {
                        // Failures raised by the callback are only logged.
                        _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
                    }
                }
            });
    }

    // Cancels the inactivity timer and then sets the inactivity timeout to 0.
    // NOTE(review): presumably a timeout of 0 keeps the timer from being rescheduled; confirm
    // against the code that schedules the inactivity timer.
    @Override
    public synchronized void disableInactivityCheck() {
        cancelInactivityTimer();
        _inactivityTimeout = 0;
    }

    // Cancels an outstanding invocation. The send queue is searched first, then (for
    // OutgoingAsync invocations only) the pending-request map. A ConnectionAbortedException
    // closes the whole connection; any other exception completes just the invocation. If the
    // application asked to close and no pending request remains, the close is carried out.
    @Override
    public synchronized void asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex) {
        if (_state >= StateClosed) {
            return; // The request has already been or will be shortly notified of the failure.
        }

        final boolean abortConnection = ex instanceof ConnectionAbortedException;

        for (Iterator<OutgoingMessage> queued = _sendStreams.iterator(); queued.hasNext(); ) {
            final OutgoingMessage queuedMsg = queued.next();
            if (queuedMsg.outAsync != outAsync) {
                continue;
            }

            if (queuedMsg.requestId > 0) {
                _asyncRequests.remove(queuedMsg.requestId);
            }

            if (abortConnection) {
                setState(StateClosed, ex);
            } else {
                // A message that is currently being sent (the head of the queue) stays queued
                // and is removed once sending is finished.
                //
                // Because the message stream was swapped to _writeStream, releasing the
                // OutgoingAsync output stream is fine (as long as canceled requests cannot be
                // retried).
                queuedMsg.canceled();
                if (queuedMsg != _sendStreams.getFirst()) {
                    queued.remove();
                }
                if (outAsync.completed(ex)) {
                    outAsync.invokeCompletedAsync();
                }
            }

            if (_closeRequested && _state < StateClosing && _asyncRequests.isEmpty()) {
                doApplicationClose();
            }
            return;
        }

        if (!(outAsync instanceof OutgoingAsync)) {
            return;
        }

        for (Iterator<OutgoingAsyncBase> pending = _asyncRequests.values().iterator();
                pending.hasNext(); ) {
            if (pending.next() != outAsync) {
                continue;
            }

            if (abortConnection) {
                setState(StateClosed, ex);
            } else {
                pending.remove();
                if (outAsync.completed(ex)) {
                    outAsync.invokeCompletedAsync();
                }
            }

            if (_closeRequested && _state < StateClosing && _asyncRequests.isEmpty()) {
                doApplicationClose();
            }
            return;
        }
    }

    /** Returns this connection's endpoint as its internal {@code EndpointI} type. */
    public EndpointI endpoint() {
        // No mutex protection necessary, _endpoint is immutable.
        return _endpoint;
    }

    /**
     * Returns this connection's connector. setAdapter treats a {@code null} connector as the
     * mark of a server connection.
     */
    public Connector connector() {
        // No mutex protection necessary, _connector is immutable.
        return _connector;
    }

    // Associates an object adapter with this client connection, or clears the association when
    // adapter is null. Server connections (those without a connector) reject the call.
    @Override
    public void setAdapter(ObjectAdapter adapter) {
        if (_connector == null) { // server connection
            throw new UnsupportedOperationException(
                "setAdapter can only be called on a client connection");
        }

        if (adapter == null) {
            synchronized (this) {
                // Only a validated connection that is not closing can have its adapter cleared.
                if (_state > StateNotValidated && _state < StateClosing) {
                    _adapter = null;
                }
            }
        } else {
            // Going through the adapter ensures that it is still active and that locks are
            // acquired in the proper order.
            adapter.setAdapterOnConnection(this);
        }

        // The thread pool this connection was initially registered with never changes, whether
        // an object adapter is added or removed.
    }

    // Returns the object adapter currently associated with this connection; may be null.
    @Override
    public synchronized ObjectAdapter getAdapter() {
        return _adapter;
    }

    // Returns this connection's endpoint through the public Endpoint type.
    @Override
    public Endpoint getEndpoint() {
        // No mutex protection necessary, _endpoint is immutable.
        return _endpoint;
    }

    // Builds a reference for the identity that is bound to this connection and wraps it in a
    // proxy. Returns null when the reference factory yields no reference.
    @Override
    public ObjectPrx createProxy(Identity ident) {
        final var reference = _instance.referenceFactory().create(ident, this);
        if (reference == null) {
            return null;
        }
        return new _ObjectPrxI(reference);
    }

    /**
     * Records the object adapter for this connection. The call is ignored unless the connection
     * is validated and not yet closing.
     *
     * @param adapter the adapter to associate; expected to be non-null
     */
    synchronized void setAdapterFromAdapter(ObjectAdapter adapter) {
        if (_state > StateNotValidated && _state < StateClosing) {
            assert (adapter != null); // Called by ObjectAdapter::setAdapterOnConnection
            _adapter = adapter;
        }
    }

    //
    // Operations from EventHandler
    //
    // Handles an I/O readiness notification from the thread pool.
    //
    // Under the connection lock it performs the pending writes and reads, decodes the header of
    // a newly received message, drives initialization/validation for connections that are not
    // validated yet and otherwise parses the received message and queues the next message to
    // send. Everything that must run user code (start callback, sent callbacks, dispatches and
    // replies) is collected and executed by upcall(...) outside of the lock, either directly or
    // through the executor.
    @Override
    public void message(ThreadPoolCurrent current) {
        StartCallback startCB = null;
        List<OutgoingMessage> sentCBs = null;
        MessageInfo info = null;
        int upcallCount = 0;

        synchronized (this) {
            if (_state >= StateClosed) {
                return;
            }

            if (!current.ioReady()) {
                return;
            }

            try {
                int writeOp = SocketOperation.None;
                int readOp = SocketOperation.None;

                // If writes are ready, write the data from the connection's write buffer
                // (_writeStream)
                if ((current.operation & SocketOperation.Write) != 0) {
                    final Buffer buf = _writeStream.getBuffer();
                    if (_observer != null) {
                        observerStartWrite(buf);
                    }
                    writeOp = write(buf);
                    if (_observer != null && (writeOp & SocketOperation.Write) == 0) {
                        observerFinishWrite(buf);
                    }
                }

                // If reads are ready, read the data into the connection's read buffer
                // (_readStream). The data is read until either:
                // - the full message is read (the transport read returns SocketOperationNone
                //   and the read buffer is fully filled), or
                // - the read operation on the transport can't continue without blocking.
                if ((current.operation & SocketOperation.Read) != 0) {
                    while (true) {
                        final Buffer buf = _readStream.getBuffer();
                        if (_observer != null && !_readHeader) {
                            observerStartRead(buf);
                        }

                        readOp = read(buf);
                        if ((readOp & SocketOperation.Read) != 0) {
                            // Can't continue without blocking, exit out of the loop.
                            break;
                        }
                        if (_observer != null && !_readHeader) {
                            assert (!buf.b.hasRemaining());
                            observerFinishRead(buf);
                        }

                        // If read header is true, we're reading a new Ice protocol message and we
                        // need to read the message header.
                        if (_readHeader) {
                            // The next read will read the remainder of the message.
                            _readHeader = false;

                            if (_observer != null) {
                                _observer.receivedBytes(Protocol.headerSize);
                            }

                            //
                            // Connection is validated on first message. This is only used by
                            // setState() to check whether or not we can print a connection
                            // warning (a client might close the connection forcefully if the
                            // connection isn't validated, we don't want to print a warning
                            // in this case).
                            //
                            _validated = true;

                            // Full header should be read because the size of _readStream is always
                            // headerSize (14) when reading a new message (see the code that sets
                            // _readHeader = true).
                            int pos = _readStream.pos();
                            if (pos < Protocol.headerSize) {
                                //
                                // This situation is possible for small UDP packets.
                                //
                                throw new MarshalException(
                                    "Received Ice message with too few bytes in header.");
                            }

                            // Decode the header.
                            _readStream.pos(0);
                            byte[] m = new byte[4];
                            m[0] = _readStream.readByte();
                            m[1] = _readStream.readByte();
                            m[2] = _readStream.readByte();
                            m[3] = _readStream.readByte();
                            if (m[0] != Protocol.magic[0]
                                || m[1] != Protocol.magic[1]
                                || m[2] != Protocol.magic[2]
                                || m[3] != Protocol.magic[3]) {
                                // Mask each byte to its unsigned value: a bare byte is
                                // sign-extended to int, so values >= 0x80 would otherwise be
                                // printed as "ffffffXX".
                                throw new ProtocolException(
                                    "Bad magic in message header: "
                                        + Integer.toHexString(m[0] & 0xFF)
                                        + " "
                                        + Integer.toHexString(m[1] & 0xFF)
                                        + " "
                                        + Integer.toHexString(m[2] & 0xFF)
                                        + " "
                                        + Integer.toHexString(m[3] & 0xFF));
                            }

                            _readProtocol.ice_readMembers(_readStream);
                            Protocol.checkSupportedProtocol(_readProtocol);

                            _readProtocolEncoding.ice_readMembers(_readStream);
                            Protocol.checkSupportedProtocolEncoding(_readProtocolEncoding);

                            _readStream.readByte(); // messageType
                            _readStream.readByte(); // compress
                            int size = _readStream.readInt();
                            if (size < Protocol.headerSize) {
                                throw new MarshalException(
                                    "Received Ice message with unexpected size " + size + ".");
                            }

                            // Resize the read buffer to the message size.
                            if (size > _messageSizeMax) {
                                Ex.throwMemoryLimitException(size, _messageSizeMax);
                            }
                            if (size > _readStream.size()) {
                                _readStream.resize(size);
                            }
                            _readStream.pos(pos);
                        }

                        if (_readStream.pos() != _readStream.size()) {
                            if (_endpoint.datagram()) {
                                // The message was truncated.
                                throw new DatagramLimitException();
                            }
                            continue;
                        }
                        break;
                    }
                }

                // readOp and writeOp are set to the operations that the transport read or write
                // calls from above returned. They indicate which operations will need to be
                // monitored by the thread pool's selector when this method returns.
                int newOp = readOp | writeOp;

                // Operations that are ready. For example, if message was called with
                // SocketOperationRead and the transport read returned SocketOperationNone,
                // reads are considered done: there's no additional data to read.
                int readyOp = current.operation & ~newOp;

                if (_state <= StateNotValidated) {
                    // If the connection is still not validated and there's still data to read or
                    // write, continue waiting for data to read or write.
                    if (newOp != 0) {
                        _threadPool.update(this, current.operation, newOp);
                        return;
                    }

                    // Initialize the connection if it's not initialized yet.
                    if (_state == StateNotInitialized && !initialize(current.operation)) {
                        return;
                    }

                    // Validate the connection if it's not validated yet.
                    if (_state <= StateNotValidated && !validate(current.operation)) {
                        return;
                    }

                    // The connection is validated and doesn't need additional data to be read or
                    // written. So unregister it from the thread pool's selector.
                    _threadPool.unregister(this, current.operation);

                    // The connection starts in the holding state. It will be activated by the
                    // connection factory.
                    setState(StateHolding);
                    if (_startCallback != null) {
                        // Take ownership of the start callback; it is invoked by upcall().
                        startCB = _startCallback;
                        _startCallback = null;
                        ++upcallCount;
                    }
                } else { // The connection is active or waits for the CloseConnection message.
                    assert (_state <= StateClosingPending);

                    //
                    // We parse messages first, if we receive a close connection message we won't
                    // send more messages.
                    //
                    if ((readyOp & SocketOperation.Read) != 0) {
                        // Optimization: use the thread's stream.
                        info = new MessageInfo(current.stream);

                        // At this point, the protocol message is fully read and can therefore be
                        // decoded by parseMessage. parseMessage returns the operation to wait for
                        // readiness next.
                        newOp |= parseMessage(info);
                        upcallCount += info.upcallCount;
                    }

                    if ((readyOp & SocketOperation.Write) != 0) {
                        // At this point the message from _writeStream is fully written and the next
                        // message can be written.
                        sentCBs = new LinkedList<>();
                        newOp |= sendNextMessage(sentCBs);
                        if (!sentCBs.isEmpty()) {
                            ++upcallCount;
                        } else {
                            sentCBs = null;
                        }
                    }

                    // If the connection is not closed yet, we update the thread pool selector to
                    // wait for readiness of read, write or both operations.
                    if (_state < StateClosed) {
                        _threadPool.update(this, current.operation, newOp);
                    }
                }

                if (upcallCount == 0) {
                    return; // Nothing to dispatch we're done!
                }

                _upcallCount += upcallCount;

                // There's something to dispatch so we mark IO as completed to elect a new leader
                // thread and let IO be performed on this new leader thread while this thread
                // continues with dispatching the up-calls.
                current.ioCompleted();
            } catch (DatagramLimitException expected) {
                // A truncated datagram is dropped: optionally warn, then get ready to read the
                // header of the next message.
                if (_warnUdp) {
                    _logger.warning("maximum datagram size of " + _readStream.pos() + " exceeded");
                }
                _readStream.resize(Protocol.headerSize);
                _readStream.pos(0);
                _readHeader = true;
                return;
            } catch (SocketException ex) {
                setState(StateClosed, ex);
                return;
            } catch (LocalException ex) {
                if (_endpoint.datagram()) {
                    // Datagram connections survive a bad message: optionally warn and get ready
                    // to read the header of the next message.
                    if (_warn) {
                        String s = "datagram connection exception:\n" + ex + '\n' + _desc;
                        _logger.warning(s);
                    }
                    _readStream.resize(Protocol.headerSize);
                    _readStream.pos(0);
                    _readHeader = true;
                } else {
                    setState(StateClosed, ex);
                }
                return;
            }
        }

        if (!_executor) {
            // Optimization, call upcall() directly if there's no executor.
            upcall(startCB, sentCBs, info);
        } else {
            if (info != null) {
                //
                // Create a new stream for the dispatch instead of using the thread pool's thread
                // stream.
                //
                assert (info.stream == current.stream);
                InputStream stream = info.stream;
                info.stream =
                    new InputStream(
                        _instance,
                        Protocol.currentProtocolEncoding,
                        _instance.cacheMessageBuffers() > 1);
                info.stream.swap(stream);
            }
            final StartCallback finalStartCB = startCB;
            final List<OutgoingMessage> finalSentCBs = sentCBs;
            final MessageInfo finalInfo = info;
            _threadPool.executeFromThisThread(
                new RunnableThreadPoolWorkItem(this) {
                    @Override
                    public void run() {
                        upcall(finalStartCB, finalSentCBs, finalInfo);
                    }
                });
        }
    }

    /**
     * Runs the upcalls collected by {@code message(...)} outside of the connection lock and then
     * subtracts the completed ones from the outstanding upcall count.
     *
     * @param startCB start callback to notify of completion, or null
     * @param sentCBs messages whose invocations must be told they were sent, or null
     * @param info the parsed incoming message (reply and/or requests to dispatch), or null
     */
    protected void upcall(StartCallback startCB, List<OutgoingMessage> sentCBs, MessageInfo info) {
        int completedUpcalls = 0;

        // Tell the factory that connection establishment and validation are done.
        if (startCB != null) {
            startCB.connectionStartCompleted(this);
            completedUpcalls++;
        }

        // Tell the AMI invocations that their message was sent. All sent notifications together
        // account for a single upcall.
        if (sentCBs != null) {
            for (OutgoingMessage sent : sentCBs) {
                sent.outAsync.invokeSent();
            }
            completedUpcalls++;
        }

        if (info != null) {
            // Replies are delivered outside of the synchronization so that nested calls are
            // possible.
            if (info.outAsync != null) {
                info.outAsync.invokeCompleted();
                completedUpcalls++;
            }

            // Likewise for incoming invocations (several of them for batch messages).
            if (info.requestCount > 0) {
                dispatchAll(
                    info.stream,
                    info.requestCount,
                    info.requestId,
                    info.compress,
                    info.adapter);

                // Not counted here: the upcall count for a dispatch is decreased when the
                // incoming reply is sent.
            }
        }

        if (completedUpcalls == 0) {
            return;
        }

        // Decrease the outstanding upcall count.
        boolean reachedFinished = false;
        synchronized (this) {
            _upcallCount -= completedUpcalls;
            if (_upcallCount == 0) {
                // Shutdown is only initiated from the closing state. It may already have been
                // initiated if the sent or AMI callback was dispatched while the connection was
                // closing.
                if (_state == StateClosing) {
                    try {
                        initiateShutdown();
                    } catch (LocalException ex) {
                        setState(StateClosed, ex);
                    }
                } else if (_state == StateFinished) {
                    reachedFinished = true;
                    if (_observer != null) {
                        _observer.detach();
                    }
                }
                notifyAll();
            }
        }

        // The factory is told outside of the lock.
        if (reachedFinished && _removeFromFactory != null) {
            _removeFromFactory.accept(this);
        }
    }

    /**
     * Called by the thread pool after the connection reached the closed state. Runs
     * {@link #finish(boolean)} inline, or through the thread pool when an executor is configured.
     *
     * @param current the thread pool context of this upcall.
     * @param close forwarded unchanged to {@code finish}.
     */
    @Override
    public void finished(ThreadPoolCurrent current, final boolean close) {
        // Taking the lock makes sure setState() has returned before we continue: the thread pool
        // may invoke this method as soon as setState() calls _threadPool.finish(...). The lock is
        // not kept for the rest of the method because the members read by finish() no longer
        // change once _state == StateClosed, and upcalls must not run with the mutex held.
        synchronized (this) {
            assert _state == StateClosed;
        }

        final boolean hasPendingCallbacks =
            _startCallback != null
                || !_sendStreams.isEmpty()
                || !_asyncRequests.isEmpty()
                || _closeCallback != null;

        if (!hasPendingCallbacks) {
            // Nothing that could block will be called, so skip ioCompleted(): this avoids
            // promoting a new leader and creating threads needlessly (notably on shutdown).
            finish(close);
            return;
        }

        current.ioCompleted();
        if (_executor) {
            _threadPool.executeFromThisThread(
                new RunnableThreadPoolWorkItem(this) {
                    @Override
                    public void run() {
                        finish(close);
                    }
                });
        } else {
            // No executor configured: calling finish() directly is cheaper.
            finish(close);
        }
    }

    /**
     * Completes the closure of this connection. In order: traces the closure, optionally closes
     * the transceiver, reports the failure to the pending start callback, completes the queued
     * outgoing messages and the outstanding asynchronous requests with {@code _exception},
     * releases the read/write stream buffers, invokes the close callback and finally switches to
     * {@code StateFinished}.
     *
     * @param close when {@code true}, the transceiver is closed by this method.
     */
    public void finish(boolean close) {
        if (!_initialized) {
            if (_instance.traceLevels().network >= 2) {
                StringBuilder s = new StringBuilder("failed to ");
                s.append(_connector != null ? "establish" : "accept");
                s.append(" ");
                s.append(_endpoint.protocol());
                s.append(" connection\n");
                s.append(toString());
                s.append("\n");
                s.append(_exception);
                _instance
                    .initializationData()
                    .logger
                    .trace(_instance.traceLevels().networkCat, s.toString());
            }
        } else {
            if (_instance.traceLevels().network >= 1) {
                StringBuilder s = new StringBuilder("closed ");
                s.append(_endpoint.protocol());
                s.append(" connection\n");
                s.append(toString());

                // Trace the cause of most connection closures.
                if (!(_exception instanceof CommunicatorDestroyedException
                    || _exception instanceof ObjectAdapterDeactivatedException
                    || _exception instanceof ObjectAdapterDestroyedException)) {
                    s.append("\n");
                    s.append(_exception);
                }
                _instance
                    .initializationData()
                    .logger
                    .trace(_instance.traceLevels().networkCat, s.toString());
            }
        }

        if (close) {
            try {
                _transceiver.close();
            } catch (LocalException ex) {
                StringWriter sw = new StringWriter();
                PrintWriter pw = new PrintWriter(sw);
                ex.printStackTrace(pw);
                pw.flush();
                String s = "unexpected connection exception:\n " + _desc + "\n" + sw.toString();
                _instance.initializationData().logger.error(s);
            }
        }

        if (_startCallback != null) {
            _startCallback.connectionStartFailed(this, _exception);
            _startCallback = null;
        }

        if (!_sendStreams.isEmpty()) {
            if (!_writeStream.isEmpty()) {
                //
                // Return the stream to the outgoing call. This is important for retriable AMI calls
                // which are not marshaled again.
                //
                OutgoingMessage message = _sendStreams.getFirst();
                _writeStream.swap(message.stream);
            }

            for (OutgoingMessage p : _sendStreams) {
                p.completed(_exception);

                // Make sure finished isn't called twice.
                if (p.requestId > 0) {
                    _asyncRequests.remove(p.requestId);
                }
            }
            _sendStreams.clear();
        }

        for (OutgoingAsyncBase p : _asyncRequests.values()) {
            if (p.completed(_exception)) {
                p.invokeCompleted();
            }
        }
        _asyncRequests.clear();

        //
        // Don't wait to be reaped to reclaim memory allocated by read/write streams.
        //
        _writeStream.clear();
        _writeStream.getBuffer().clear();
        _readStream.clear();
        _readStream.getBuffer().clear();

        if (_closeCallback != null) {
            try {
                _closeCallback.closed(this);
            } catch (Exception ex) {
                // Catch every exception, not only LocalException: anything escaping here would
                // skip the transition to StateFinished below and leave the connection unfinished.
                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
            }
            _closeCallback = null;
        }

        //
        // This must be done last as this will cause waitUntilFinished() to
        // return (and communicator objects such as the timer might be destroyed too).
        //
        boolean finished = false;
        synchronized (this) {
            setState(StateFinished);

            // With upcalls still in progress, the detach and factory removal are left to the
            // code that brings _upcallCount back to zero.
            if (_upcallCount == 0) {
                finished = true;
                if (_observer != null) {
                    _observer.detach();
                }
            }
        }

        // Called outside the synchronized block.
        if (finished && _removeFromFactory != null) {
            _removeFromFactory.accept(this);
        }
    }

    /**
     * Returns the connection description, i.e. the same string as {@link #_toString()}.
     *
     * @return the current value of {@code _desc}.
     */
    @Override
    public String toString() {
        return _desc;
    }

    /**
     * Returns the selectable channel exposed by the underlying transceiver.
     *
     * @return the value of {@code _transceiver.fd()}.
     */
    @Override
    public SelectableChannel fd() {
        return this._transceiver.fd();
    }

    /**
     * Forwards the ready callback to the underlying transceiver.
     *
     * @param callback the callback handed to {@code _transceiver.setReadyCallback}.
     */
    @Override
    public void setReadyCallback(ReadyCallback callback) {
        this._transceiver.setReadyCallback(callback);
    }

    /**
     * Returns the connection type, which the constructor reads from the transceiver's
     * {@code protocol()}.
     *
     * @return the value of {@code _type}.
     */
    @Override
    public String type() {
        // _type is immutable, so the mutex is not needed.
        return this._type;
    }

    /**
     * Returns the connection information produced by {@code initConnectionInfo()}.
     *
     * @return the connection info.
     * @throws LocalException the stored closure exception, when the connection is closed or
     *     finished.
     */
    @Override
    public synchronized ConnectionInfo getInfo() {
        if (_state < StateClosed) {
            return initConnectionInfo();
        }
        throw (LocalException) _exception.fillInStackTrace();
    }

    /**
     * Applies new receive/send buffer sizes to the transceiver and drops the cached connection
     * info.
     *
     * @param rcvSize the receive buffer size passed to the transceiver.
     * @param sndSize the send buffer size passed to the transceiver.
     * @throws LocalException the stored closure exception, when the connection is closed or
     *     finished.
     */
    @Override
    public synchronized void setBufferSize(int rcvSize, int sndSize) {
        final boolean closedOrFinished = _state >= StateClosed;
        if (closedOrFinished) {
            throw (LocalException) _exception.fillInStackTrace();
        }

        _transceiver.setBufferSize(rcvSize, sndSize);

        // The cached connection info is no longer valid.
        _info = null;
    }

    /**
     * Returns the connection description.
     *
     * @return the current value of {@code _desc}.
     */
    @Override
    public String _toString() {
        // Read without taking the mutex. Note that initialize() reassigns _desc once the
        // transceiver is initialized.
        return this._desc;
    }

    /**
     * Switches the connection to the closed state with the given exception.
     *
     * @param ex the exception passed to {@code setState(StateClosed, ex)}.
     */
    public synchronized void exception(LocalException ex) {
        this.setState(StateClosed, ex);
    }

    /**
     * Returns the thread pool selected for this connection by the constructor.
     *
     * @return the value of {@code _threadPool}.
     */
    public ThreadPool getThreadPool() {
        return this._threadPool;
    }

    /**
     * Aborts the connection when it is active and the idle check is enabled on the idle timeout
     * transceiver; otherwise does nothing.
     *
     * @param idleTimeout the idle timeout, in seconds, reported in the exception message.
     */
    public synchronized void idleCheck(int idleTimeout) {
        if (_state != StateActive || !_idleTimeoutTransceiver.isIdleCheckEnabled()) {
            return; // nothing to do
        }

        final String reason =
            "Connection aborted by the idle check because it did not receive any"
                + " bytes for "
                + idleTimeout
                + "s.";
        setState(StateClosed, new ConnectionAbortedException(reason, false));
    }

    /**
     * Sends a heartbeat (a ValidateConnection message) when no outgoing message is pending, and
     * schedules the inactivity timer when the connection appears to have become inactive. Does
     * nothing unless the connection is active, holding or closing.
     */
    public synchronized void sendHeartbeat() {
        assert !_endpoint.datagram();

        final boolean heartbeatAllowed =
            _state == StateActive || _state == StateHolding || _state == StateClosing;
        if (!heartbeatAllowed) {
            return; // nothing to do
        }

        // Has the connection become inactive?
        final boolean inactivityCandidate =
            _inactivityTimerFuture == null // the timer is not scheduled yet
                && _inactivityTimeout > 0 // the inactivity timeout is enabled
                && _state == StateActive // the timer is only scheduled for active connections
                && _dispatchCount == 0 // no dispatch is pending
                && _asyncRequests.isEmpty() // no invocation is pending
                && _readHeader // not waiting for the rest of an incoming message
                && _sendStreams.size() <= 1; // at most one outgoing message is pending

        // The peer may be back-pressuring us while we become inactive. The inactivity timer is
        // therefore only scheduled when nothing is waiting to be sent, or when the one pending
        // message is a heartbeat. The stream of the first _sendStreams message is in _writeStream.
        if (inactivityCandidate
            && (_sendStreams.isEmpty()
                || _writeStream.getBuffer().b.get(8) == Protocol.validateConnectionMsg)) {
            scheduleInactivityTimer();
        }

        // The heartbeat exists to produce a "write" on the connection, which becomes a read on the
        // peer and resets the peer's idle check timer. A non-empty _sendStreams means a write is
        // already outstanding (or the first message was sent very recently and is not removed
        // yet), which is good enough for the idle check. Because of this optimization, only the
        // first _sendStreams message can ever be a heartbeat.
        if (!_sendStreams.isEmpty()) {
            return;
        }

        OutputStream heartbeat = new OutputStream(Protocol.currentProtocolEncoding);
        heartbeat.writeBlob(Protocol.magic);
        Protocol.currentProtocol.ice_writeMembers(heartbeat);
        Protocol.currentProtocolEncoding.ice_writeMembers(heartbeat);
        heartbeat.writeByte(Protocol.validateConnectionMsg);
        heartbeat.writeByte((byte) 0);
        heartbeat.writeInt(Protocol.headerSize); // Message size.

        try {
            sendMessage(new OutgoingMessage(heartbeat, false));
        } catch (LocalException ex) {
            setState(StateClosed, ex);
        }
    }

    /**
     * Creates a connection in the {@code StateNotInitialized} state and initializes it with its
     * thread pool.
     *
     * @param communicator the communicator stored in {@code _communicator}.
     * @param instance the instance providing the initialization data, trace levels, timer and
     *     client thread pool.
     * @param transceiver the transceiver used for I/O; wrapped in an
     *     {@code IdleTimeoutTransceiverDecorator} when an idle timeout applies.
     * @param connector the connector; {@code null} for a server connection.
     * @param endpoint the endpoint of this connection.
     * @param adapter the object adapter; must not be {@code null} for a server connection.
     * @param removeFromFactory callback stored in {@code _removeFromFactory}; can be null.
     * @param options the connection options (timeouts, max dispatches, idle check).
     * @throws LocalException rethrown as is when raised while obtaining or initializing the
     *     thread pool.
     * @throws SyscallException wrapping any other exception raised by that step.
     */
    ConnectionI(
            Communicator communicator,
            Instance instance,
            Transceiver transceiver,
            Connector connector,
            EndpointI endpoint,
            ObjectAdapter adapter,
            Consumer<ConnectionI> removeFromFactory, // can be null
            ConnectionOptions options) {
        _communicator = communicator;
        _instance = instance;
        // Description and type are taken from the transceiver as passed in, before any decoration.
        _desc = transceiver.toString();
        _type = transceiver.protocol();
        _connector = connector;
        _endpoint = endpoint;
        _adapter = adapter;
        final InitializationData initData = instance.initializationData();
        // Cached for better performance.
        _executor = initData.executor != null;
        _logger = initData.logger; // Cached for better performance.
        _traceLevels = instance.traceLevels(); // Cached for better performance.
        _connectTimeout = options.connectTimeout();
        _closeTimeout = options.closeTimeout(); // not used for datagram connections
        // suppress inactivity timeout for datagram connections
        _inactivityTimeout = endpoint.datagram() ? 0 : options.inactivityTimeout();
        _maxDispatches = options.maxDispatches();
        _timer = instance.timer();
        _removeFromFactory = removeFromFactory;
        _warn = initData.properties.getIcePropertyAsInt("Ice.Warn.Connections") > 0;
        _warnUdp =
            instance.initializationData().properties.getIcePropertyAsInt("Ice.Warn.Datagrams")
                > 0;
        _nextRequestId = 1;
        // A null connector means a server connection: the adapter's limit applies in that case.
        _messageSizeMax = connector == null ? adapter.messageSizeMax() : instance.messageSizeMax();
        _batchRequestQueue = new BatchRequestQueue(instance, _endpoint.datagram());
        _readStream =
            new InputStream(
                instance,
                Protocol.currentProtocolEncoding,
                instance.cacheMessageBuffers() > 1);
        _readHeader = false;
        _readStreamPos = -1;
        _writeStream = new OutputStream(); // temporary stream
        _writeStreamPos = -1;
        _upcallCount = 0;
        _state = StateNotInitialized;

        // Clamp the configured compression level to the range [1, 9].
        int compressionLevel = initData.properties.getIcePropertyAsInt("Ice.Compression.Level");
        if (compressionLevel < 1) {
            compressionLevel = 1;
        } else if (compressionLevel > 9) {
            compressionLevel = 9;
        }
        _compressionLevel = compressionLevel;

        // With an idle timeout on a non-datagram endpoint, all I/O goes through the decorator.
        if (options.idleTimeout() > 0 && !endpoint.datagram()) {
            _idleTimeoutTransceiver =
                new IdleTimeoutTransceiverDecorator(
                    transceiver,
                    this,
                    options.idleTimeout(),
                    options.enableIdleCheck(),
                    _instance.timer());
            transceiver = _idleTimeoutTransceiver;
        } else {
            _idleTimeoutTransceiver = null;
        }

        _transceiver = transceiver;

        try {
            if (connector == null) { // server connection
                assert adapter != null;
                _threadPool = adapter.getThreadPool();
            } else { // client connection
                _threadPool = _instance.clientThreadPool();
            }
            _threadPool.initialize(this);
        } catch (LocalException ex) {
            throw ex;
        } catch (Exception ex) {
            // Any non-Ice exception is reported to the caller as a SyscallException.
            throw new SyscallException(ex);
        }
    }

    /**
     * Checks, when the object is finalized, that the connection was finished and left no start
     * callback, upcall, queued message or asynchronous request behind.
     */
    @SuppressWarnings({"deprecation", "nofinalizer"})
    @Override
    protected synchronized void finalize() throws Throwable {
        try {
            Assert.FinalizerAssert(_startCallback == null);
            Assert.FinalizerAssert(_state == StateFinished);
            Assert.FinalizerAssert(_upcallCount == 0);
            Assert.FinalizerAssert(_sendStreams.isEmpty());
            Assert.FinalizerAssert(_asyncRequests.isEmpty());
        } catch (Exception ignored) {
            // Deliberately ignored: an exception raised by the checks above is dropped.
        } finally {
            super.finalize();
        }
    }

    // Connection states. The numeric order is significant: the code compares states with
    // relational operators (for example "_state >= StateClosed" or "_state <= StateNotValidated"),
    // so the values must stay in this order.
    private static final int StateNotInitialized = 0;
    private static final int StateNotValidated = 1;
    private static final int StateActive = 2;
    private static final int StateHolding = 3;
    private static final int StateClosing = 4;
    private static final int StateClosingPending = 5;
    private static final int StateClosed = 6;
    private static final int StateFinished = 7;

    /**
     * Switches to a closing or closed state, recording the exception that caused the transition
     * if none was recorded before.
     *
     * @param state the target state; must be {@code StateClosing} or higher.
     * @param ex the exception stored in {@code _exception} when it is still unset.
     */
    private void setState(int state, LocalException ex) {
        // Only closing and closed states are valid when an exception is supplied.
        assert state >= StateClosing;

        if (_state == state) {
            return; // Don't switch twice.
        }

        if (_exception == null) {
            // A connection in the closed state always has an exception set.
            assert (_state != StateClosed);

            _exception = ex;

            // No warning for connections that are not validated.
            if (_warn && _validated) {
                // Exceptions that are expected and therefore not reported.
                final boolean expected =
                    _exception instanceof CloseConnectionException
                        || _exception instanceof ConnectionClosedException
                        || _exception instanceof CommunicatorDestroyedException
                        || _exception instanceof ObjectAdapterDeactivatedException
                        || _exception instanceof ObjectAdapterDestroyedException
                        || (_exception instanceof ConnectionLostException
                            && _state >= StateClosing);
                if (!expected) {
                    warning("connection exception", _exception);
                }
            }
        }

        // The new state has to be in place before requests are notified of any exception;
        // otherwise new requests may retry on a connection not yet marked as closed or closing.
        setState(state);
    }

    /**
     * Switches the connection to the given state and performs the side effects attached to that
     * transition (thread pool registration, idle check, batch queue destruction, observer update).
     * Transitions that are not permitted from the current state return without changing
     * {@code _state}. This method calls {@code notifyAll()}, so the caller must hold this
     * object's monitor.
     *
     * @param state the requested state; {@code StateClosing} is replaced by {@code StateClosed}
     *     for datagram endpoints and for connections that are not validated yet.
     */
    private void setState(int state) {
        // We don't want to send close connection messages if the endpoint
        // only supports oneway transmission from client to server.
        if (_endpoint.datagram() && state == StateClosing) {
            state = StateClosed;
        }

        // Skip graceful shutdown if we are destroyed before validation.
        if (_state <= StateNotValidated && state == StateClosing) {
            state = StateClosed;
        }

        if (_state == state) {
            // Don't switch twice.
            return;
        }

        // Every state past StateActive cancels the inactivity timer.
        if (state > StateActive) {
            cancelInactivityTimer();
        }

        try {
            // Arrow-form cases do not fall through. A "return" inside a case abandons the
            // transition and leaves _state untouched.
            switch (state) {
                case StateNotInitialized -> {
                    assert false;
                }

                case StateNotValidated -> {
                    if (_state != StateNotInitialized) {
                        assert (_state == StateClosed);
                        return;
                    }
                }

                case StateActive -> {
                    // Can only switch from holding or not validated to active.
                    if (_state != StateHolding && _state != StateNotValidated) {
                        return;
                    }

                    if (_maxDispatches <= 0 || _dispatchCount < _maxDispatches) {
                        _threadPool.register(this, SocketOperation.Read);
                        if (_idleTimeoutTransceiver != null) {
                            _idleTimeoutTransceiver.enableIdleCheck();
                        }
                    }
                    // else don't resume reading since we're at or over the _maxDispatches limit.
                }

                case StateHolding -> {
                    // Can only switch from active or not validated to holding.
                    if (_state != StateActive && _state != StateNotValidated) {
                        return;
                    }
                    if (_state == StateActive
                        && (_maxDispatches <= 0 || _dispatchCount < _maxDispatches)) {
                        _threadPool.unregister(this, SocketOperation.Read);
                        if (_idleTimeoutTransceiver != null) {
                            _idleTimeoutTransceiver.disableIdleCheck();
                        }
                    }
                    // else reads are already disabled because the _maxDispatches limit is reached or exceeded.
                }

                case StateClosing, StateClosingPending -> {
                    // Can't change back from closing pending.
                    if (_state >= StateClosingPending) {
                        return;
                    }
                }

                case StateClosed -> {
                    if (_state == StateFinished) {
                        return;
                    }

                    _batchRequestQueue.destroy(_exception);

                    // Don't need to close now for connections so only close the transceiver if
                    // the selector requested it.
                    if (_threadPool.finish(this, false)) {
                        _transceiver.close();
                    }
                }

                case StateFinished -> {
                    assert (_state == StateClosed);
                    _communicator = null;
                }
            }
        } catch (LocalException ex) {
            // A LocalException raised by the side effects above is logged; the state change
            // below still takes place.
            StringWriter sw = new StringWriter();
            PrintWriter pw = new PrintWriter(sw);
            ex.printStackTrace(pw);
            pw.flush();
            String s = "unexpected connection exception:\n " + _desc + "\n" + sw.toString();
            _instance.initializationData().logger.error(s);
        }

        if (_instance.initializationData().observer != null) {
            // _state still holds the previous state here; it is only assigned further down.
            ConnectionState oldState = toConnectionState(_state);
            ConnectionState newState = toConnectionState(state);
            if (oldState != newState) {
                _observer =
                    _instance
                        .initializationData()
                        .observer
                        .getConnectionObserver(
                            initConnectionInfo(), _endpoint, newState, _observer);
                if (_observer != null) {
                    _observer.attach();
                } else {
                    // No observer: reset the stream positions tracked for the observer.
                    _writeStreamPos = -1;
                    _readStreamPos = -1;
                }
            }
            // Report the failure to the observer, except for the expected closure exceptions.
            if (_observer != null && state == StateClosed && _exception != null) {
                if (!(_exception instanceof CloseConnectionException
                    || _exception instanceof ConnectionClosedException
                    || _exception instanceof CommunicatorDestroyedException
                    || _exception instanceof ObjectAdapterDeactivatedException
                    || _exception instanceof ObjectAdapterDestroyedException
                    || (_exception instanceof ConnectionLostException
                    && _state >= StateClosing))) {
                    _observer.failed(_exception.ice_id());
                }
            }
        }
        _state = state;

        // Wake up the threads waiting on this connection's monitor.
        notifyAll();

        // With no upcall in progress, the graceful shutdown can start right away.
        if (_state == StateClosing && _upcallCount == 0) {
            try {
                initiateShutdown();
            } catch (LocalException ex) {
                setState(StateClosed, ex);
            }
        }
    }

    /**
     * Starts the graceful shutdown of a closing connection: for non-datagram endpoints, schedules
     * the close timer and sends a CloseConnection message. Calling this method again after the
     * shutdown was initiated has no effect.
     */
    private void initiateShutdown() {
        assert (_state == StateClosing && _upcallCount == 0);

        if (_shutdownInitiated) {
            return;
        }
        _shutdownInitiated = true;

        if (_endpoint.datagram()) {
            // No close connection message is sent over datagram endpoints.
            return;
        }

        // Build the close connection message that precedes the shutdown.
        OutputStream closeMessage = new OutputStream(Protocol.currentProtocolEncoding);
        closeMessage.writeBlob(Protocol.magic);
        Protocol.currentProtocol.ice_writeMembers(closeMessage);
        Protocol.currentProtocolEncoding.ice_writeMembers(closeMessage);
        closeMessage.writeByte(Protocol.closeConnectionMsg);
        closeMessage.writeByte((byte) 0); // compression status
        closeMessage.writeInt(Protocol.headerSize); // Message size.

        scheduleCloseTimer();

        if ((sendMessage(new OutgoingMessage(closeMessage, false)) & AsyncStatus.Sent) > 0) {
            setState(StateClosingPending);

            // Tell the transceiver about the graceful closure; it may ask for a socket operation
            // to be registered with the thread pool.
            int requestedOp = _transceiver.closing(true, _exception);
            if (requestedOp != 0) {
                _threadPool.register(this, requestedOp);
            }
        }
    }

    /**
     * Initializes the transceiver and, once done, moves the connection to
     * {@code StateNotValidated}.
     *
     * @param operation the socket operation passed to {@code _threadPool.update} as the current
     *     operation when the transceiver needs more I/O.
     * @return {@code true} when the transceiver is initialized, {@code false} when it requested
     *     a socket operation first.
     */
    private boolean initialize(int operation) {
        final int requestedOp =
            _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer());

        if (requestedOp == SocketOperation.None) {
            // The transceiver is initialized: refresh the connection description from it.
            _desc = _transceiver.toString();
            _initialized = true;
            setState(StateNotValidated);
            return true;
        }

        _threadPool.update(this, operation, requestedOp);
        return false;
    }

    /**
     * Performs connection validation. A server connection ({@code _connector == null}) writes a
     * ValidateConnection message; a client connection reads and checks it. Datagram connections
     * skip the exchange. On completion the read/write streams are reset for regular messages.
     *
     * @param operation the socket operation passed to {@code _threadPool.update} as the current
     *     operation when a read or write could not complete.
     * @return {@code true} when validation is complete, {@code false} when more I/O is needed.
     * @throws ProtocolException if the received header has a bad magic or is not a
     *     ValidateConnection message.
     * @throws MarshalException if the received message size differs from the header size.
     */
    private boolean validate(int operation) {
        // Datagram connections are always implicitly validated.
        if (!_endpoint.datagram()) {
            if (_connector == null) {
                // The server side has the active role for connection validation.

                // An empty write stream means the message was not marshaled yet; otherwise this
                // call resumes a write that could not complete earlier.
                if (_writeStream.isEmpty()) {
                    _writeStream.writeBlob(Protocol.magic);
                    Protocol.currentProtocol.ice_writeMembers(_writeStream);
                    Protocol.currentProtocolEncoding.ice_writeMembers(_writeStream);
                    _writeStream.writeByte(Protocol.validateConnectionMsg);
                    _writeStream.writeByte((byte) 0); // Compression status
                    // (always zero for validate connection).
                    _writeStream.writeInt(Protocol.headerSize); // Message size.
                    TraceUtil.traceSend(_writeStream, _instance, this, _logger, _traceLevels);
                    _writeStream.prepareWrite();
                }

                if (_observer != null) {
                    observerStartWrite(_writeStream.getBuffer());
                }

                if (_writeStream.pos() != _writeStream.size()) {
                    int op = write(_writeStream.getBuffer());
                    if (op != 0) {
                        _threadPool.update(this, operation, op);
                        return false;
                    }
                }

                if (_observer != null) {
                    observerFinishWrite(_writeStream.getBuffer());
                }
            } else {
                // The client side has the passive role for connection validation.

                // An empty read stream means reading has not started yet.
                if (_readStream.isEmpty()) {
                    _readStream.resize(Protocol.headerSize);
                    _readStream.pos(0);
                }

                if (_observer != null) {
                    observerStartRead(_readStream.getBuffer());
                }

                if (_readStream.pos() != _readStream.size()) {
                    int op = read(_readStream.getBuffer());
                    if (op != 0) {
                        _threadPool.update(this, operation, op);
                        return false;
                    }
                }

                if (_observer != null) {
                    observerFinishRead(_readStream.getBuffer());
                }

                _validated = true;

                assert (_readStream.pos() == Protocol.headerSize);
                _readStream.pos(0);
                byte[] m = _readStream.readBlob(4);
                if (m[0] != Protocol.magic[0]
                    || m[1] != Protocol.magic[1]
                    || m[2] != Protocol.magic[2]
                    || m[3] != Protocol.magic[3]) {
                    // Mask each byte with 0xFF: without it, bytes >= 0x80 are sign-extended and
                    // printed as "ffffffXX".
                    throw new ProtocolException(
                        "Bad magic in message header: "
                            + Integer.toHexString(m[0] & 0xFF)
                            + " "
                            + Integer.toHexString(m[1] & 0xFF)
                            + " "
                            + Integer.toHexString(m[2] & 0xFF)
                            + " "
                            + Integer.toHexString(m[3] & 0xFF));
                }

                _readProtocol.ice_readMembers(_readStream);
                Protocol.checkSupportedProtocol(_readProtocol);

                _readProtocolEncoding.ice_readMembers(_readStream);
                Protocol.checkSupportedProtocolEncoding(_readProtocolEncoding);

                byte messageType = _readStream.readByte();
                if (messageType != Protocol.validateConnectionMsg) {
                    throw new ProtocolException(
                        "Received message of type "
                            + messageType
                            + " over a connection that is not yet validated.");
                }
                _readStream.readByte(); // Ignore compression status for validate connection.
                int size = _readStream.readInt();
                if (size != Protocol.headerSize) {
                    throw new MarshalException(
                        "Received ValidateConnection message with unexpected size "
                            + size
                            + ".");
                }
                TraceUtil.traceRecv(_readStream, this, _logger, _traceLevels);

                // Client connection starts sending heartbeats once it has received the
                // ValidateConnection message.
                if (_idleTimeoutTransceiver != null) {
                    _idleTimeoutTransceiver.scheduleHeartbeat();
                }
            }
        }

        // Validation is done: reset both streams so that the next read starts with a header.
        _writeStream.resize(0);
        _writeStream.pos(0);

        _readStream.resize(Protocol.headerSize);
        _readStream.pos(0);
        _readHeader = true;

        if (_instance.traceLevels().network >= 1) {
            StringBuilder s = new StringBuilder();
            if (_endpoint.datagram()) {
                s.append("starting to ");
                s.append(_connector != null ? "send" : "receive");
                s.append(" ");
                s.append(_endpoint.protocol());
                s.append(" messages\n");
                s.append(_transceiver.toDetailedString());
            } else {
                s.append(_connector != null ? "established" : "accepted");
                s.append(" ");
                s.append(_endpoint.protocol());
                s.append(" connection\n");
                s.append(toString());
            }
            _instance
                .initializationData()
                .logger
                .trace(_instance.traceLevels().networkCat, s.toString());
        }

        return true;
    }

    /**
     * Sends the next queued messages. This method is called by message() once the message which is
     * being sent (the first element of _sendStreams) is fully sent. Before sending the next
     * message, this message is removed from _sendStreams. If it has one, its sent callback is also
     * queued in the given callback list.
     *
     * <p>A LocalException raised while sending is not propagated: the connection is switched to
     * the closed state and {@code SocketOperation.None} is returned.
     *
     * @param callbacks The sent callbacks to call for the messages that were sent.
     * @return The socket operation to register with the thread pool's selector to send the
     *     remainder of the pending message being sent (the first element of _sendStreams), or
     *     {@code SocketOperation.None} when there is nothing left to write.
     */
    private int sendNextMessage(List<OutgoingMessage> callbacks) {
        if (_sendStreams.isEmpty()) {
            // This can occur if no message was being written and the socket write operation was
            // registered with the thread pool (a transceiver read method can request writing data).
            return SocketOperation.None;
        } else if (_state == StateClosingPending && _writeStream.pos() == 0) {
            // Message wasn't sent, empty the _writeStream, we're not going to send more data
            // because the connection is being closed.
            OutgoingMessage message = _sendStreams.getFirst();
            _writeStream.swap(message.stream);
            return SocketOperation.None;
        }

        // Assert that the message was fully written.
        assert (!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());

        try {
            while (true) {
                //
                // The message that was being sent is sent. We can swap back the write stream buffer
                // to the outgoing message (required for retry) and queue its sent callback (if
                // any).
                //
                OutgoingMessage message = _sendStreams.getFirst();
                _writeStream.swap(message.stream);
                if (message.sent()) {
                    callbacks.add(message);
                }
                _sendStreams.removeFirst();

                //
                // If there's nothing left to send, we're done.
                //
                if (_sendStreams.isEmpty()) {
                    break;
                }

                //
                // If we are in the closed state or if the close is pending, don't continue sending.
                //
                // This can occur if parseMessage (called before
                // sendNextMessage by message()) closes the connection.
                //
                if (_state >= StateClosingPending) {
                    return SocketOperation.None;
                }

                //
                // Otherwise, prepare the next message.
                //
                message = _sendStreams.getFirst();
                assert (!message.prepared);
                OutputStream stream = message.stream;

                message.stream = doCompress(stream, message.compress);
                message.stream.prepareWrite();
                message.prepared = true;

                // The trace receives the stream as it was before doCompress().
                TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);

                //
                // Send the message.
                //
                _writeStream.swap(message.stream);
                if (_observer != null) {
                    observerStartWrite(_writeStream.getBuffer());
                }
                if (_writeStream.pos() != _writeStream.size()) {
                    int op = write(_writeStream.getBuffer());
                    if (op != 0) {
                        // The write could not complete: the message stays first in _sendStreams
                        // and the caller gets the operation to register.
                        return op;
                    }
                }
                if (_observer != null) {
                    observerFinishWrite(_writeStream.getBuffer());
                }

                // If the message was sent right away, loop to send the next queued message.
            }

            // Once the CloseConnection message is sent, we transition to the StateClosingPending
            // state.
            if (_state == StateClosing && _shutdownInitiated) {
                setState(StateClosingPending);
                int op = _transceiver.closing(true, _exception);
                if (op != 0) {
                    return op;
                }
            }
        } catch (LocalException ex) {
            setState(StateClosed, ex);
        }
        return SocketOperation.None;
    }

    /**
     * Sends or queues the given message.
     *
     * @param message The message to send.
     * @return The send status.
     */
    private int sendMessage(OutgoingMessage message) {
        assert _state >= StateActive;
        assert _state < StateClosed;

        // Other messages are already waiting: preserve ordering by appending to the send queue
        // and report the message as queued.
        if (!_sendStreams.isEmpty()) {
            _sendStreams.addLast(message);
            return AsyncStatus.Queued;
        }

        assert (!message.prepared);

        // Prepare the message. doCompress may hand back a different stream, so the original one
        // is kept for tracing.
        final OutputStream original = message.stream;
        message.stream = doCompress(original, message.compress);
        message.stream.prepareWrite();
        message.prepared = true;
        TraceUtil.traceSend(original, _instance, this, _logger, _traceLevels);

        // Attempt a non-blocking write.
        if (_observer != null) {
            observerStartWrite(message.stream.getBuffer());
        }
        final int pendingOp = write(message.stream.getBuffer());

        if (pendingOp != 0) {
            // The write could not complete. The send queue is empty at this point, so the message
            // becomes its head and its stream is swapped into _writeStream. The operation
            // returned by the write is registered with the thread pool; message() takes over
            // sending the rest of the buffer once the transceiver is ready.
            _writeStream.swap(message.stream);
            _sendStreams.addLast(message);
            _threadPool.register(this, pendingOp);
            return AsyncStatus.Queued;
        }

        // The whole message was written.
        if (_observer != null) {
            observerFinishWrite(message.stream.getBuffer());
        }

        if (message.sent()) {
            return AsyncStatus.Sent | AsyncStatus.InvokeSentCallback;
        }
        return AsyncStatus.Sent;
    }

    /**
     * Writes the compression status and message size into the header of an outgoing message and,
     * when requested and possible, produces a compressed copy of it.
     *
     * <p>Compression is attempted only when {@code compress} is true, {@code BZip2.supported()}
     * returns true and the message is at least 100 bytes long. If {@code BZip2.compress} returns
     * null, the message is sent uncompressed.
     *
     * @param uncompressed The message to send.
     * @param compress Whether compression was requested for this message.
     * @return A new stream holding the compressed message, or {@code uncompressed} itself.
     */
    private OutputStream doCompress(OutputStream uncompressed, boolean compress) {
        // BZip2 support is only probed when compression was requested.
        final boolean canCompress = compress && BZip2.supported();

        if (canCompress && uncompressed.size() >= 100) {
            final Buffer packed =
                BZip2.compress(uncompressed.getBuffer(), Protocol.headerSize, _compressionLevel);
            if (packed != null) {
                OutputStream result =
                    new OutputStream(new Buffer(packed, true), uncompressed.getEncoding());

                // Compression status (header offset 9): 2 marks a compressed message.
                result.pos(9);
                result.writeByte((byte) 2);

                // Message size (header offset 10): the size of the compressed stream.
                result.pos(10);
                result.writeInt(result.size());

                // Mirror both values into the uncompressed stream's header; this is needed to
                // trace requests correctly.
                uncompressed.pos(9);
                uncompressed.writeByte((byte) 2);
                uncompressed.writeInt(result.size());

                return result;
            }
        }

        // Not compressed: status is 1 when compression is supported and was requested,
        // 0 otherwise.
        uncompressed.pos(9);
        uncompressed.writeByte((byte) (canCompress ? 1 : 0));

        // Fill in the message size.
        uncompressed.pos(10);
        uncompressed.writeInt(uncompressed.size());

        return uncompressed;
    }

    /** Mutable holder for what parseMessage extracts from one received message. */
    private static class MessageInfo {
        // The received message; parseMessage replaces it with a new stream when the message is
        // compressed.
        InputStream stream;

        // Number of requests carried by the message: 1 for a request, the batch size for a batch
        // request.
        int requestCount;

        // Request ID read from a request or reply message.
        int requestId;

        // Compression status byte read from the message header.
        byte compress;

        // Object adapter recorded when the message is a request or batch request.
        ObjectAdapter adapter;

        // Outgoing invocation completed by a reply message, when there is one to call back.
        OutgoingAsyncBase outAsync;

        // Number of upcalls this message gives rise to.
        int upcallCount;

        MessageInfo(InputStream stream) {
            this.stream = stream;
        }
    }

    /**
     * Takes ownership of the message that was just read into {@code _readStream}, uncompresses it
     * if needed, and processes it according to its message type.
     *
     * <p>Requests and batch requests are not dispatched here: their request ID/count, adapter and
     * upcall count are recorded in {@code info}. A reply completes the matching entry of
     * {@code _asyncRequests}. A close connection message starts the closure of a non-datagram
     * connection. A {@link LocalException} raised while parsing closes the connection, except for
     * datagram connections where it is only logged (when warnings are enabled).
     *
     * @param info Receives the message stream and the information extracted from the message.
     * @return {@code SocketOperation.Read} to continue reading; {@code SocketOperation.None} when
     *     the connection is in the holding state or the dispatch limit is reached; or the
     *     operation returned by {@code _transceiver.closing} when a close connection message
     *     cannot be completed immediately.
     */
    private int parseMessage(MessageInfo info) {
        assert (_state > StateNotValidated && _state < StateClosed);

        // Move the received message into info.stream and reset _readStream so that it is ready to
        // receive the header of the next message.
        _readStream.swap(info.stream);
        _readStream.resize(Protocol.headerSize);
        _readStream.pos(0);
        _readHeader = true;

        assert (info.stream.pos() == info.stream.size());

        try {
            //
            // We don't need to check magic and version here. This has already
            // been done by the ThreadPool which provides us with the stream.
            //
            info.stream.pos(8);
            byte messageType = info.stream.readByte();
            info.compress = info.stream.readByte();
            if (info.compress == (byte) 2) {
                // Compression status 2: the message body is compressed.
                if (BZip2.supported()) {
                    Buffer ubuf =
                        BZip2.uncompress(
                            info.stream.getBuffer(), Protocol.headerSize, _messageSizeMax);
                    info.stream =
                        new InputStream(
                            info.stream.instance(), info.stream.getEncoding(), ubuf, true);
                } else {
                    throw new FeatureNotSupportedException(
                        "Cannot uncompress compressed message: "
                            + "org.apache.tools.bzip2.CBZip2OutputStream was not found");
                }
            }
            info.stream.pos(Protocol.headerSize);

            switch (messageType) {
                case Protocol.closeConnectionMsg: {
                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
                    if (_endpoint.datagram()) {
                        if (_warn) {
                            _logger.warning(
                                "ignoring close connection message for datagram"
                                    + " connection:\n"
                                    + _desc);
                        }
                    } else {
                        setState(StateClosingPending, new CloseConnectionException());

                        //
                        // Notify the transceiver of the graceful connection closure.
                        //
                        int op = _transceiver.closing(false, _exception);
                        if (op != 0) {
                            // The transceiver needs more I/O to complete the closure: arm the
                            // close timer and let the caller register the operation.
                            scheduleCloseTimer();
                            return op;
                        }
                        setState(StateClosed);
                    }
                    break;
                }

                case Protocol.requestMsg: {
                    if (_state >= StateClosing) {
                        TraceUtil.trace(
                            "received request during closing\n"
                                + "(ignored by server, client will retry)",
                            info.stream,
                            this,
                            _logger,
                            _traceLevels);
                    } else {
                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
                        info.requestId = info.stream.readInt();
                        info.requestCount = 1;
                        info.adapter = _adapter;
                        ++info.upcallCount;

                        cancelInactivityTimer();
                        ++_dispatchCount;
                    }
                    break;
                }

                case Protocol.requestBatchMsg: {
                    if (_state >= StateClosing) {
                        TraceUtil.trace(
                            "received batch request during closing\n"
                                + "(ignored by server, client will retry)",
                            info.stream,
                            this,
                            _logger,
                            _traceLevels);
                    } else {
                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
                        // Read the count into a local first so that the error message below can
                        // report the invalid value that was actually received.
                        final int batchCount = info.stream.readInt();
                        if (batchCount < 0) {
                            info.requestCount = 0;
                            throw new MarshalException(
                                "Received batch request with " + batchCount + " batches.");
                        }
                        info.requestCount = batchCount;
                        info.adapter = _adapter;
                        info.upcallCount += info.requestCount;

                        cancelInactivityTimer();
                        _dispatchCount += info.requestCount;
                    }
                    break;
                }

                case Protocol.replyMsg: {
                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
                    info.requestId = info.stream.readInt();

                    var outAsync = _asyncRequests.remove(info.requestId);
                    if (outAsync != null && outAsync.completed(info.stream)) {
                        info.outAsync = outAsync;
                        ++info.upcallCount;
                    }
                    // The application asked to close the connection: do so now if this was the
                    // reply for the last outstanding invocation.
                    if (_closeRequested && _state < StateClosing && _asyncRequests.isEmpty()) {
                        doApplicationClose();
                    }
                    break;
                }

                case Protocol.validateConnectionMsg: {
                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
                    break;
                }

                default: {
                    TraceUtil.trace(
                        "received unknown message\n(invalid, closing connection)",
                        info.stream,
                        this,
                        _logger,
                        _traceLevels);
                    throw new ProtocolException(
                        "Received Ice protocol message with unknown type: " + messageType);
                }
            }
        } catch (LocalException ex) {
            if (_endpoint.datagram()) {
                if (_warn) {
                    _logger.warning("datagram connection exception:\n" + ex + '\n' + _desc);
                }
            } else {
                setState(StateClosed, ex);
            }
        }

        if (_state == StateHolding) {
            // Don't continue reading if the connection is in the holding state.
            return SocketOperation.None;
        } else if (_maxDispatches > 0 && _dispatchCount >= _maxDispatches) {
            // Don't continue reading if the _maxDispatches limit is reached or exceeded.
            if (_idleTimeoutTransceiver != null) {
                _idleTimeoutTransceiver.disableIdleCheck();
            }
            return SocketOperation.None;
        } else {
            // Continue reading.
            return SocketOperation.Read;
        }
    }

    /**
     * Dispatches the {@code requestCount} requests held by {@code stream}, one after the other.
     *
     * <p>Each request is handed to the adapter's dispatch pipeline; its response, or the exception
     * thrown or reported by the dispatch, is passed to {@code sendResponse}. When there is no
     * adapter, every request is answered with an {@code ObjectNotExistException}. An exception
     * raised outside of the dispatch itself is forwarded to {@code dispatchException} together with
     * the number of requests not yet processed.
     *
     * @param stream The stream holding the request(s).
     * @param requestCount The number of requests to dispatch.
     * @param requestId The request ID; a request is two-way when this is non-zero and the endpoint
     *     is not a datagram endpoint.
     * @param compress The compression status passed on when sending a successful response.
     * @param adapter The object adapter, or null.
     */
    private void dispatchAll(
            InputStream stream,
            int requestCount,
            int requestId,
            byte compress,
            ObjectAdapter adapter) {

        // Unlike the other private or protected methods of this class, this one must be called
        // *without* the mutex locked.

        Object dispatcher = adapter != null ? adapter.dispatchPipeline() : null;

        try {
            for (; requestCount > 0; --requestCount) {
                // adapter may be null here; a null current.adapter is nevertheless never passed to
                // application code.
                var request = new IncomingRequest(requestId, this, adapter, stream);
                final boolean isTwoWay = !_endpoint.datagram() && requestId != 0;

                if (dispatcher == null) {
                    // Received request on a connection without an object adapter.
                    sendResponse(
                        request.current.createOutgoingResponse(new ObjectNotExistException()),
                        isTwoWay,
                        (byte) 0);
                } else {
                    CompletionStage<OutgoingResponse> pending = null;
                    try {
                        pending = dispatcher.dispatch(request);
                    } catch (Throwable ex) { // UserException or an unchecked exception
                        sendResponse(
                            request.current.createOutgoingResponse(ex), isTwoWay, (byte) 0);
                    }
                    if (pending != null) {
                        pending.whenComplete(
                            (reply, failure) -> {
                                if (failure == null) {
                                    sendResponse(reply, isTwoWay, compress);
                                } else {
                                    sendResponse(
                                        request.current.createOutgoingResponse(failure),
                                        isTwoWay,
                                        (byte) 0);
                                }
                                // Any exception thrown by this closure is effectively ignored.
                            });
                    }
                }
            }
            stream.clear();

        } catch (LocalException ex) {
            dispatchException(ex, requestCount);
        } catch (RuntimeException | Error ex) {
            // A runtime exception or an error was thrown outside of servant code (i.e., by Ice
            // code). Note that this code does NOT send a response to the client.
            var unknown = new UnknownException(ex);

            // Log the full stack trace before closing the connection.
            var trace = new StringWriter();
            var out = new PrintWriter(trace);
            ex.printStackTrace(out);
            out.flush();
            _logger.error(trace.toString());

            dispatchException(unknown, requestCount);
        }
    }

    /**
     * Completes one dispatch: updates the upcall and dispatch counters and, for a two-way request
     * on a connection that is not closed, sends the response.
     *
     * @param response The response whose output stream is sent.
     * @param isTwoWay Whether a response message must be sent to the peer.
     * @param compress A non-zero value requests compression of the response message.
     */
    private void sendResponse(OutgoingResponse response, boolean isTwoWay, byte compress) {
        final OutputStream outputStream = response.outputStream;
        boolean finished = false;
        try {
            synchronized (this) {
                assert (_state > StateNotValidated);
                try {
                    // This upcall is complete. When it was the last one, wake up the threads
                    // waiting on this connection's monitor.
                    if (--_upcallCount == 0) {
                        if (_state == StateFinished) {
                            finished = true;
                            if (_observer != null) {
                                _observer.detach();
                            }
                        }
                        notifyAll();
                    }

                    if (_state < StateClosed) {
                        if (isTwoWay) {
                            sendMessage(new OutgoingMessage(outputStream, compress != 0));
                        }

                        if (_state == StateActive
                            && _maxDispatches > 0
                            && _dispatchCount == _maxDispatches) {
                            // Resume reading if the connection is active and the dispatch count is
                            // about to be less than _maxDispatches.
                            _threadPool.update(this, SocketOperation.None, SocketOperation.Read);
                            if (_idleTimeoutTransceiver != null) {
                                _idleTimeoutTransceiver.enableIdleCheck();
                            }
                        }

                        --_dispatchCount;
                    }

                    // A closing connection with no upcall left can start its shutdown.
                    if (_state == StateClosing && _upcallCount == 0) {
                        initiateShutdown();
                    }
                } catch (LocalException ex) {
                    setState(StateClosed, ex);
                }
            }
        } finally {
            // Invoked after the synchronized block has been exited.
            if (finished && _removeFromFactory != null) {
                _removeFromFactory.accept(this);
            }
        }
    }

    /**
     * Handles a fatal exception raised while dispatching: closes the connection and accounts for
     * the requests that will never reach {@code sendResponse}.
     *
     * @param ex The exception used to close the connection.
     * @param requestCount The number of requests whose upcalls must be subtracted from
     *     {@code _upcallCount}.
     */
    private void dispatchException(LocalException ex, int requestCount) {
        boolean finished = false;
        synchronized (this) {
            // sendResponse is not called after a fatal dispatch exception, so _upcallCount is
            // decremented here instead.
            setState(StateClosed, ex);

            if (requestCount > 0) {
                assert (_upcallCount > 0);
                _upcallCount -= requestCount;
                assert (_upcallCount >= 0);

                if (_upcallCount == 0) {
                    finished = _state == StateFinished;
                    if (finished && _observer != null) {
                        _observer.detach();
                    }
                    notifyAll();
                }
            }
        }

        // Invoked after the synchronized block has been exited.
        if (finished && _removeFromFactory != null) {
            _removeFromFactory.accept(this);
        }
    }

    /**
     * Returns the connection information, querying the transceiver for it until the connection has
     * left the not-initialized state and a value has been cached in {@code _info}.
     *
     * @return The (possibly refreshed) connection information.
     */
    private ConnectionInfo initConnectionInfo() {
        // Called in synchronization

        final boolean cached = _info != null && _state > StateNotInitialized;
        if (!cached) {
            // A connection without a connector is an incoming connection.
            final boolean incoming = _connector == null;
            final String adapterName = _adapter == null ? "" : _adapter.getName();
            _info = _transceiver.getInfo(incoming, adapterName, _endpoint.connectionId());
        }
        return _info;
    }

    /**
     * Looks up the {@code ConnectionState} associated with an internal state value.
     *
     * @param state The internal state, used as an index into {@code connectionStateMap}.
     * @return The corresponding {@code ConnectionState}.
     */
    private ConnectionState toConnectionState(int state) {
        final ConnectionState mapped = connectionStateMap[state];
        return mapped;
    }

    /**
     * Logs a warning made of the given message, the connection description and the stack trace of
     * the given exception.
     *
     * @param msg The text that starts the warning.
     * @param ex The exception whose stack trace is appended.
     */
    private void warning(String msg, Exception ex) {
        final StringWriter trace = new StringWriter();
        final PrintWriter out = new PrintWriter(trace);
        ex.printStackTrace(out);
        out.flush();

        _logger.warning(msg + ":\n" + _desc + "\n" + trace);
    }

    /**
     * Reports to the observer the bytes received since the recorded read position, if any, then
     * records the buffer's current position as the new starting point (-1 for an empty buffer).
     *
     * @param buf The buffer being read into.
     */
    private void observerStartRead(Buffer buf) {
        final int previous = _readStreamPos;
        if (previous >= 0) {
            assert (!buf.empty());
            _observer.receivedBytes(buf.b.position() - previous);
        }

        if (buf.empty()) {
            _readStreamPos = -1;
        } else {
            _readStreamPos = buf.b.position();
        }
    }

    /**
     * Reports to the observer the bytes received since the recorded read position and clears that
     * position. Does nothing when no read position is recorded.
     *
     * @param buf The buffer that was read into.
     */
    private void observerFinishRead(Buffer buf) {
        if (_readStreamPos != -1) {
            assert (buf.b.position() >= _readStreamPos);
            _observer.receivedBytes(buf.b.position() - _readStreamPos);
            _readStreamPos = -1;
        }
    }

    /**
     * Reports to the observer the bytes sent since the recorded write position, if any, then
     * records the buffer's current position as the new starting point (-1 for an empty buffer).
     *
     * @param buf The buffer being written.
     */
    private void observerStartWrite(Buffer buf) {
        final int previous = _writeStreamPos;
        if (previous >= 0) {
            assert (!buf.empty());
            _observer.sentBytes(buf.b.position() - previous);
        }

        if (buf.empty()) {
            _writeStreamPos = -1;
        } else {
            _writeStreamPos = buf.b.position();
        }
    }

    /**
     * Reports to the observer the bytes sent since the recorded write position, when the buffer
     * advanced past it, and clears that position. Does nothing when no write position is recorded.
     *
     * @param buf The buffer that was written.
     */
    private void observerFinishWrite(Buffer buf) {
        if (_writeStreamPos != -1) {
            if (buf.b.position() > _writeStreamPos) {
                _observer.sentBytes(buf.b.position() - _writeStreamPos);
            }
            _writeStreamPos = -1;
        }
    }

    /**
     * Reads from the transceiver into the given buffer and, when the network trace level is at
     * least 3 and the buffer position advanced, traces how many bytes were received.
     *
     * @param buf The buffer to read into.
     * @return The value returned by the transceiver's read.
     */
    private int read(Buffer buf) {
        int start = buf.b.position();
        int op = _transceiver.read(buf);
        if (_instance.traceLevels().network >= 3 && buf.b.position() != start) {
            // The builder never leaves this method, so the synchronized StringBuffer is not
            // needed.
            StringBuilder s = new StringBuilder("received ");
            if (_endpoint.datagram()) {
                s.append(buf.b.limit());
            } else {
                s.append(buf.b.position() - start);
                s.append(" of ");
                s.append(buf.b.limit() - start);
            }
            s.append(" bytes via ");
            s.append(_endpoint.protocol());
            s.append("\n");
            s.append(toString());

            _instance
                .initializationData()
                .logger
                .trace(_instance.traceLevels().networkCat, s.toString());
        }
        return op;
    }

    /**
     * Timer callback scheduled by {@code scheduleInactivityTimer}: starts a graceful closure of an
     * active connection whose inactivity timer has expired.
     */
    private synchronized void inactivityCheck() {
        // cancelInactivityTimer() sets _inactivityTimerFuture to null after cancel(false), which
        // does not stop a task that has already started and is waiting to enter this synchronized
        // method. The field can therefore be null here and must be checked before use.
        final ScheduledFuture<?> timer = _inactivityTimerFuture;
        if (timer != null && timer.getDelay(TimeUnit.NANOSECONDS) <= 0) {
            _inactivityTimerFuture = null;

            if (_state == StateActive) {
                setState(
                    StateClosing,
                    new ConnectionClosedException(
                        "Connection closed because it remained inactive for longer than the"
                            + " inactivity timeout.",
                        false));
            }
        }
        // Else this timer was already canceled and disposed. Nothing to do.
    }

    /**
     * Timer callback for the connect timeout: closes the connection with a
     * {@code ConnectTimeoutException} when it has not reached the active state.
     */
    private synchronized void connectTimedOut() {
        if (_state >= StateActive) {
            // Already connected: nothing to do.
            return;
        }
        setState(StateClosed, new ConnectTimeoutException());
    }

    /**
     * Timer callback for the close timeout: closes the connection with a
     * {@code CloseTimeoutException} when it is not closed yet.
     */
    private synchronized void closeTimedOut() {
        if (_state >= StateClosed) {
            // Already closed: nothing to do.
            return;
        }

        // setState(state, exception) is deliberately not used: the exception set by a graceful
        // closure must be overwritten.
        _exception = new CloseTimeoutException();
        setState(StateClosed);
    }

    /**
     * Writes the given buffer to the transceiver and, when the network trace level is at least 3
     * and the buffer position advanced, traces how many bytes were sent.
     *
     * @param buf The buffer to write.
     * @return The value returned by the transceiver's write.
     */
    private int write(Buffer buf) {
        int start = buf.b.position();
        int op = _transceiver.write(buf);
        if (_instance.traceLevels().network >= 3 && buf.b.position() != start) {
            // The builder never leaves this method, so the synchronized StringBuffer is not
            // needed.
            StringBuilder s = new StringBuilder("sent ");
            s.append(buf.b.position() - start);
            if (!_endpoint.datagram()) {
                s.append(" of ");
                s.append(buf.b.limit() - start);
            }
            s.append(" bytes via ");
            s.append(_endpoint.protocol());
            s.append("\n");
            s.append(toString());
            _instance
                .initializationData()
                .logger
                .trace(_instance.traceLevels().networkCat, s.toString());
        }
        return op;
    }

    /**
     * Schedules a one-time call to {@code inactivityCheck} after {@code _inactivityTimeout}
     * seconds and stores the resulting future in {@code _inactivityTimerFuture}.
     */
    private void scheduleInactivityTimer() {
        // Called within the synchronization lock
        assert _inactivityTimerFuture == null;
        assert _inactivityTimeout > 0;

        final Runnable check = this::inactivityCheck;
        _inactivityTimerFuture = _timer.schedule(check, _inactivityTimeout, TimeUnit.SECONDS);
    }

    /** Cancels the pending inactivity timer, if any, and forgets its future. */
    private void cancelInactivityTimer() {
        // Called within the synchronization lock
        final ScheduledFuture<?> pending = _inactivityTimerFuture;
        if (pending == null) {
            return;
        }
        pending.cancel(false);
        _inactivityTimerFuture = null;
    }

    /**
     * Schedules a one-time call to {@code closeTimedOut} after {@code _closeTimeout} seconds.
     * Does nothing when the close timeout is not positive.
     */
    private void scheduleCloseTimer() {
        if (_closeTimeout <= 0) {
            return;
        }
        final Runnable check = this::closeTimedOut;
        _timer.schedule(check, _closeTimeout, TimeUnit.SECONDS);
    }

    // Only public so that tests can initiate a closure without blocking on it to complete.
    public synchronized void doApplicationClose() {
        // The connection must not be closing (or closed) yet.
        assert (_state < StateClosing);

        // Move to the closing state, recording that the closure was requested by the application.
        final var reason =
            new ConnectionClosedException(
                "connection closed gracefully by the application", true);
        setState(StateClosing, reason);
    }

    /**
     * A message handed to the connection for sending, together with the optional outgoing
     * invocation to notify about its progress.
     */
    private static class OutgoingMessage {
        // The stream holding the message.
        public OutputStream stream;

        // The invocation to notify, or null when there is none (or it was canceled).
        public OutgoingAsyncBase outAsync;

        // Whether compression was requested for this message.
        public boolean compress;

        // The request ID; 0 for messages created without one.
        public int requestId;

        // Set once the message has been prepared for writing.
        boolean prepared;

        /** Creates a message that has no associated invocation and a request ID of 0. */
        OutgoingMessage(OutputStream stream, boolean compress) {
            this(null, stream, compress, 0);
        }

        /** Creates a message associated with the given invocation and request ID. */
        OutgoingMessage(
                OutgoingAsyncBase out, OutputStream stream, boolean compress, int requestId) {
            this.outAsync = out;
            this.stream = stream;
            this.compress = compress;
            this.requestId = requestId;
        }

        /** Detaches the invocation from this message. */
        public void canceled() {
            assert (outAsync != null);
            outAsync = null;
        }

        /**
         * Notifies the invocation, if any, that the message was sent.
         *
         * @return The value returned by the invocation's sent(), or false without an invocation.
         */
        public boolean sent() {
            return outAsync != null && outAsync.sent();
        }

        /**
         * Completes the invocation, if any, with the given exception and invokes its completion
         * callback when completed(ex) returns true.
         */
        public void completed(LocalException ex) {
            if (outAsync == null) {
                return;
            }
            if (outAsync.completed(ex)) {
                outAsync.invokeCompleted();
            }
        }
    }

    private Communicator _communicator;
    private final Instance _instance;
    private final Transceiver _transceiver;
    private final IdleTimeoutTransceiverDecorator _idleTimeoutTransceiver; // can be null
    private String _desc;
    private final String _type;
    private final Connector _connector;
    private final EndpointI _endpoint;

    private ObjectAdapter _adapter;

    private final boolean _executor;
    private final Logger _logger;
    private final TraceLevels _traceLevels;
    private final ThreadPool _threadPool;

    // All these timeouts are in seconds. A value <= 0 means infinite timeout.
    private final int _connectTimeout;
    private final int _closeTimeout;
    private int _inactivityTimeout;

    private ScheduledFuture<?> _inactivityTimerFuture; // can be null

    private final ScheduledExecutorService _timer;

    private StartCallback _startCallback;

    private final Consumer<ConnectionI> _removeFromFactory;

    private final boolean _warn;
    private final boolean _warnUdp;

    private final int _compressionLevel;

    private int _nextRequestId;

    private final Map<Integer, OutgoingAsyncBase> _asyncRequests = new HashMap<>();

    private LocalException _exception;

    private final int _messageSizeMax;
    private final BatchRequestQueue _batchRequestQueue;

    private final LinkedList<OutgoingMessage> _sendStreams = new LinkedList<>();

    // Contains the message which is being received. If the connection is waiting to receive a
    // message (_readHeader == true), its size is Protocol.headerSize. Otherwise,
    // its size is the message size specified in the received message header.
    private final InputStream _readStream;

    // When _readHeader is true, the next bytes we'll read are the header of a new
    // message. When false, we're reading next the remainder of a message that was already partially
    // received.
    private boolean _readHeader;

    // Contains the message which is being sent. The write stream buffer is empty if no message is
    // being sent.
    private final OutputStream _writeStream;

    private ConnectionObserver _observer;
    private int _readStreamPos;
    private int _writeStreamPos;

    // The upcall count keeps track of the number of dispatches, AMI (response) continuations, sent
    // callbacks and connection establishment callbacks that have been started (or are about to be
    // started) by a thread of the thread pool associated with this connection, and have not
    // completed yet. All these operations except the connection establishment callbacks execute
    // application code or code generated from Slice definitions.
    private int _upcallCount;

    // The number of outstanding dispatches. Maintained only while state is
    // StateActive or StateHolding.
    private int _dispatchCount;

    // When we dispatch _maxDispatches concurrent requests, we stop reading the connection to
    // back-pressure the peer. _maxDispatches <= 0 means no limit.
    private final int _maxDispatches;

    private int _state; // The current state.
    private boolean _shutdownInitiated;
    private boolean _initialized;
    private boolean _validated;

    // When true, the application called close and Connection must close the
    // connection when it receives the reply for the last outstanding invocation.
    private boolean _closeRequested;

    private final ProtocolVersion _readProtocol = new ProtocolVersion();
    private final EncodingVersion _readProtocolEncoding = new EncodingVersion();

    private ConnectionInfo _info;

    private CloseCallback _closeCallback;

    // Lookup table used by toConnectionState(): the index is the internal int connection state
    // and the element is the com.zeroc.Ice.Instrumentation.ConnectionState associated with it.
    // The trailing comment on each entry names the internal state that slot corresponds to, so
    // the order of the entries must follow the numeric order of the State* constants.
    private static final ConnectionState[] connectionStateMap = {
        ConnectionState.ConnectionStateValidating, // StateNotInitialized
        ConnectionState.ConnectionStateValidating, // StateNotValidated
        ConnectionState.ConnectionStateActive, // StateActive
        ConnectionState.ConnectionStateHolding, // StateHolding
        ConnectionState.ConnectionStateClosing, // StateClosing
        ConnectionState.ConnectionStateClosing, // StateClosingPending
        ConnectionState.ConnectionStateClosed, // StateClosed
        ConnectionState.ConnectionStateClosed, // StateFinished
    };
}