OutgoingAsyncBase.java

// Copyright (c) ZeroC, Inc.

package com.zeroc.Ice;

import com.zeroc.Ice.Instrumentation.ChildInvocationObserver;
import com.zeroc.Ice.Instrumentation.InvocationObserver;

import java.util.concurrent.ExecutionException;

//
// Base class for handling asynchronous invocations. This class is
// responsible for the handling of the output stream and the child invocation observer.
//
abstract class OutgoingAsyncBase<T> extends InvocationFuture<T> {
    public boolean sent() {
        return sent(true);
    }

    public boolean completed(InputStream is) {
        assert false; // Must be implemented by classes that handle responses
        return false;
    }

    public boolean completed(LocalException ex) {
        return finished(ex);
    }

    public final void attachRemoteObserver(ConnectionInfo info, Endpoint endpt, int requestId) {
        InvocationObserver observer = getObserver();
        if (observer != null) {
            final int size = _os.size() - Protocol.headerSize - 4;
            _childObserver = observer.getRemoteObserver(info, endpt, requestId, size);
            if (_childObserver != null) {
                _childObserver.attach();
            }
        }
    }

    public final void attachCollocatedObserver(ObjectAdapter adapter, int requestId) {
        InvocationObserver observer = getObserver();
        if (observer != null) {
            final int size = _os.size() - Protocol.headerSize - 4;
            _childObserver = observer.getCollocatedObserver(adapter, requestId, size);
            if (_childObserver != null) {
                _childObserver.attach();
            }
        }
    }

    public final OutputStream getOs() {
        return _os;
    }

    public T waitForResponse() {
        try {
            return get();
        } catch (InterruptedException ex) {
            throw new OperationInterruptedException(ex);
        } catch (ExecutionException ee) {
            try {
                throw ee.getCause().fillInStackTrace();
            } catch (RuntimeException ex /* Includes LocalException */) {
                throw ex;
            } catch (Throwable ex) {
                throw new UnknownException(ex);
            }
        }
    }

    protected OutgoingAsyncBase(Communicator com, Instance instance, String op) {
        super(com, instance, op);
        _os =
            new OutputStream(
                Protocol.currentProtocolEncoding,
                instance.defaultsAndOverrides().defaultFormat,
                instance.cacheMessageBuffers() > 1);
    }

    protected OutgoingAsyncBase(Communicator com, Instance instance, String op, OutputStream os) {
        super(com, instance, op);
        _os = os;
    }

    @Override
    protected boolean sent(boolean done) {
        if (done) {
            if (_childObserver != null) {
                _childObserver.detach();
                _childObserver = null;
            }
        }
        return super.sent(done);
    }

    @Override
    protected boolean finished(LocalException ex) {
        if (_childObserver != null) {
            _childObserver.failed(ex.ice_id());
            _childObserver.detach();
            _childObserver = null;
        }
        return super.finished(ex);
    }

    protected OutputStream _os;
    protected ChildInvocationObserver _childObserver;
}