ConnectRequestHandler.java

// Copyright (c) ZeroC, Inc.

package com.zeroc.Ice;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

final class ConnectRequestHandler
    implements RequestHandler, Reference.GetConnectionCallback, RouterInfo.AddProxyCallback {
    @Override
    public int sendAsyncRequest(ProxyOutgoingAsyncBase out) throws RetryException {
        synchronized (this) {
            if (!_initialized) {
                out.cancelable(this); // This will throw if the request is canceled
            }

            if (!initialized()) {
                _requests.add(out);
                return AsyncStatus.Queued;
            }
        }
        return out.invokeRemote(_connection, _compress, _response);
    }

    @Override
    public void asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex) {
        synchronized (this) {
            if (_exception != null) {
                return; // The request has been notified of a failure already.
            }

            if (!initialized()) {
                Iterator<ProxyOutgoingAsyncBase> it = _requests.iterator();
                while (it.hasNext()) {
                    OutgoingAsyncBase request = it.next();
                    if (request == outAsync) {
                        it.remove();
                        if (outAsync.completed(ex)) {
                            outAsync.invokeCompletedAsync();
                        }
                        return;
                    }
                }
                // The request has to be queued if it timed out and we're not initialized yet.
                assert false;
            }
        }
        _connection.asyncRequestCanceled(outAsync, ex);
    }

    @Override
    public synchronized ConnectionI getConnection() {
        //
        // First check for the connection, it's important otherwise the user could first get a
        // connection and then the exception if he tries to obtain the proxy cached connection
        // multiple times (the exception can be set after the connection is set if the flush of
        // pending requests fails).
        //
        if (_connection != null) {
            return _connection;
        } else if (_exception != null) {
            throw (LocalException) _exception.fillInStackTrace();
        }
        return null;
    }

    //
    // Implementation of Reference.GetConnectionCallback
    //

    @Override
    public void setConnection(ConnectionI connection, boolean compress) {
        synchronized (this) {
            assert (!_flushing && _exception == null && _connection == null);
            _connection = connection;
            _compress = compress;
        }

        //
        // If this proxy is for a non-local object, and we are using a router, then add this proxy
        // to the router info object.
        //
        RouterInfo ri = _reference.getRouterInfo();
        if (ri != null && !ri.addProxy(_reference, this)) {
            return; // The request handler will be initialized once addProxy returns.
        }

        //
        // We can now send the queued requests.
        //
        flushRequests();
    }

    @Override
    public void setException(final LocalException ex) {
        synchronized (this) {
            assert (!_flushing && !_initialized && _exception == null);
            _exception = ex;
            _flushing = true; // Ensures request handler is removed before processing new requests.
        }

        for (OutgoingAsyncBase outAsync : _requests) {
            if (outAsync.completed(_exception)) {
                outAsync.invokeCompletedAsync();
            }
        }
        _requests.clear();

        synchronized (this) {
            _flushing = false;
            notifyAll();
        }
    }

    //
    // Implementation of RouterInfo.AddProxyCallback
    //
    @Override
    public void addedProxy() {
        //
        // The proxy was added to the router info, we're now ready to send the queued requests.
        //
        flushRequests();
    }

    public ConnectRequestHandler(Reference ref) {
        _reference = ref;
        _response = _reference.isTwoway();
        _initialized = false;
        _flushing = false;
    }

    private boolean initialized() {
        // Must be called with the mutex locked.

        if (_initialized) {
            assert (_connection != null);
            return true;
        } else {
            //
            // This is similar to a mutex lock in that the flag is only true for a short period of
            // time.
            //
            boolean interrupted = false;
            while (_flushing) {
                try {
                    wait();
                } catch (InterruptedException ex) {
                    interrupted = true;
                }
            }
            //
            // Restore the interrupted status.
            //
            if (interrupted) {
                Thread.currentThread().interrupt();
            }

            if (_exception != null) {
                if (_connection != null) {
                    //
                    // Only throw if the connection didn't get established. If it died after being
                    // established, we allow the caller to retry the connection establishment by not
                    // throwing here
                    // (the connection will throw RetryException).
                    //
                    return true;
                }
                throw (LocalException) _exception.fillInStackTrace();
            } else {
                return _initialized;
            }
        }
    }

    private void flushRequests() {
        synchronized (this) {
            assert (_connection != null && !_initialized);

            //
            // We set the _flushing flag to true to prevent any additional queuing. Callers might
            // block for a little while as the queued requests are being sent but this shouldn't be
            // an issue as the request sends are non-blocking.
            //
            _flushing = true;
        }

        LocalException exception = null;
        for (ProxyOutgoingAsyncBase outAsync : _requests) {
            try {
                if ((outAsync.invokeRemote(_connection, _compress, _response)
                    & AsyncStatus.InvokeSentCallback)
                    > 0) {
                    outAsync.invokeSentAsync();
                }
            } catch (RetryException ex) {
                exception = ex.get();
                outAsync.retryException();
            } catch (LocalException ex) {
                exception = ex;
                if (outAsync.completed(ex)) {
                    outAsync.invokeCompletedAsync();
                }
            }
        }
        _requests.clear();

        synchronized (this) {
            assert (!_initialized);
            _exception = exception;
            _initialized = _exception == null;
            _flushing = false;
            notifyAll();
        }
    }

    private final Reference _reference;
    private final boolean _response;

    private ConnectionI _connection;
    private boolean _compress;
    private LocalException _exception;
    private boolean _initialized;
    private boolean _flushing;

    private final List<ProxyOutgoingAsyncBase> _requests = new LinkedList<>();
}