OutgoingAsync.java

// Copyright (c) ZeroC, Inc.

package com.zeroc.Ice;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
 * @hidden Public because it's used by the generated code.
 */
public class OutgoingAsync<T> extends ProxyOutgoingAsyncBase<T> {
    /**
     * Functional interface for unmarshaling response data from an input stream.
     *
     * @param <V> the type of the value to unmarshal
     */
    @FunctionalInterface
    public static interface Unmarshaler<V> {
        /**
         * Unmarshals a value from the input stream.
         *
         * @param istr the input stream to read from
         * @return the unmarshaled value
         */
        V unmarshal(InputStream istr);
    }

    /**
     * Constructs an OutgoingAsync for the specified proxy and operation.
     *
     * @param prx the proxy to invoke the operation on
     * @param operation the name of the operation to invoke
     * @param mode the operation mode
     * @param synchronous whether the operation is synchronous
     * @param userExceptions the valid user exceptions for this operation
     */
    public OutgoingAsync(
            ObjectPrx prx,
            String operation,
            OperationMode mode,
            boolean synchronous,
            Class<?>[] userExceptions) {
        super((_ObjectPrxI) prx, operation, null);
        _mode = mode == null ? OperationMode.Normal : mode;
        _synchronous = synchronous;
        _userExceptions = userExceptions;
        _encoding = Protocol.getCompatibleEncoding(_proxy._getReference().getEncoding());

        if (_instance.cacheMessageBuffers() > 0) {
            _ObjectPrxI.StreamPair p = _proxy._getCachedMessageBuffers();
            if (p != null) {
                _is = p.is;
                _os = p.os;
            }
        }
        if (_os == null) {
            _os =
                new OutputStream(
                    Protocol.currentProtocolEncoding,
                    _instance.defaultsAndOverrides().defaultFormat,
                    _instance.cacheMessageBuffers() > 1);
        }
    }

    /**
     * Invokes the operation.
     *
     * @param twowayOnly if true, the operation can only be invoked on a twoway proxy
     * @param ctx the operation context
     * @param format the format type for marshaling parameters
     * @param marshal the marshaler for writing parameters, or null if no parameters
     * @param unmarshal the unmarshaler for reading the response, or null if no response expected
     */
    public void invoke(
            boolean twowayOnly,
            Map<String, String> ctx,
            FormatType format,
            OutputStream.Marshaler marshal,
            Unmarshaler<T> unmarshal) {
        _unmarshal = unmarshal;

        if (twowayOnly && !_proxy.ice_isTwoway()) {
            throw new TwowayOnlyException(_operation);
        }

        try {
            prepare(ctx);

            if (marshal == null) {
                writeEmptyParams();
            } else {
                marshal.marshal(startWriteParams(format));
                endWriteParams();
            }

            if (isBatch()) {
                // NOTE: we don't call sent/completed callbacks for batch AMI requests
                _sentSynchronously = true;
                _proxy._getReference()
                    .getBatchRequestQueue()
                    .finishBatchRequest(_os, _proxy, _operation);
                finished(true, false);
            } else {
                // invokeImpl can throw; we handle this exception by calling abort
                invokeImpl(true); // userThread = true
            }
        } catch (LocalException ex) {
            abort(ex);
        }
    }

    @Override
    public T waitForResponse() {
        if (isBatch()) {
            return null; // The future will not be completed for a batch invocation.
        }

        try {
            return waitForResponseOrUserEx();
        } catch (UserException ex) {
            throw UnknownUserException.fromTypeId(ex.ice_id());
        }
    }

    /**
     * Waits for the response and returns the result, allowing user exceptions to be thrown.
     *
     * @return the result of the operation
     * @throws UserException if the operation failed with a user exception
     * @throws LocalException if the operation failed with a local exception
     */
    public T waitForResponseOrUserEx() throws UserException {
        try {
            return get();
        } catch (InterruptedException ex) {
            throw new OperationInterruptedException(ex);
        } catch (ExecutionException ee) {
            try {
                throw ee.getCause().fillInStackTrace();
            } catch (RuntimeException ex /* Includes LocalException */) {
                throw ex;
            } catch (UserException ex) {
                throw ex;
            } catch (Throwable ex) {
                throw new UnknownException(ex);
            }
        }
    }

    @Override
    public boolean sent() {
        return sent(!_proxy.ice_isTwoway()); // done = true if not a two-way proxy (no response
        // expected)
    }

    @Override
    public int invokeRemote(ConnectionI connection, boolean compress, boolean response)
        throws RetryException {
        _cachedConnection = connection;
        return connection.sendAsyncRequest(this, compress, response, 0);
    }

    @Override
    public int invokeCollocated(CollocatedRequestHandler handler) {
        // The stream cannot be cached if the proxy is not a twoway or there is an invocation
        // timeout set.
        if (!_proxy.ice_isTwoway()
            || _proxy._getReference().getInvocationTimeout().compareTo(Duration.ZERO) > 0) {
            // Disable caching by marking the streams as cached!
            _state |= StateCachedBuffers;
        }
        return handler.invokeAsyncRequest(this, 0, _synchronous);
    }

    @Override
    public void abort(LocalException ex) {
        if (isBatch()) {
            //
            // If we didn't finish a batch oneway or datagram request, we
            // must notify the connection about that we give up ownership
            // of the batch stream.
            //
            _proxy._getReference().getBatchRequestQueue().abortBatchRequest(_os);
        }

        super.abort(ex);
    }

    @Override
    protected void markCompleted() {
        try {
            if (!_proxy.ice_isTwoway()) {
                //
                // For a non-twoway proxy, the invocation is completed after it is sent.
                //
                complete(null);
            } else if ((_state & StateOK) > 0) {
                T r = null;
                try {
                    if (_unmarshal != null) {
                        //
                        // The Unmarshaler callback unmarshals and returns the results.
                        //
                        r = _unmarshal.unmarshal(startReadParams());
                        endReadParams();
                    } else {
                        readEmptyParams();
                    }
                } catch (LocalException ex) {
                    completeExceptionally(ex);
                    return;
                }
                complete(r);
            } else {
                //
                // Handle user exception.
                //
                try {
                    throwUserException();
                } catch (UserException ex) {
                    if (_userExceptions != null) {
                        for (int i = 0; i < _userExceptions.length; i++) {
                            if (_userExceptions[i].isInstance(ex)) {
                                completeExceptionally(ex);
                                return;
                            }
                        }
                    }
                    completeExceptionally(UnknownUserException.fromTypeId(ex.ice_id()));
                } catch (Throwable ex) {
                    completeExceptionally(ex);
                }
            }
        } finally {
            cacheMessageBuffers();
        }
    }

    @Override
    public final boolean completed(InputStream is) {
        //
        // NOTE: this method is called from ConnectionI.parseMessage
        // with the connection locked. Therefore, it must not invoke any user callbacks.
        //

        // _is can already be initialized if the invocation is retried
        if (_is == null) {
            _is =
                new InputStream(
                    _instance,
                    Protocol.currentProtocolEncoding,
                    _instance.cacheMessageBuffers() > 1);
        }
        _is.swap(is);

        return super.completed(_is);
    }

    private OutputStream startWriteParams(FormatType format) {
        _os.startEncapsulation(_encoding, format);
        return _os;
    }

    private void endWriteParams() {
        _os.endEncapsulation();
    }

    private void writeEmptyParams() {
        _os.writeEmptyEncapsulation(_encoding);
    }

    private InputStream startReadParams() {
        _is.startEncapsulation();
        return _is;
    }

    private void endReadParams() {
        _is.endEncapsulation();
    }

    private void readEmptyParams() {
        _is.skipEmptyEncapsulation();
    }

    private final void throwUserException() throws UserException {
        try {
            _is.startEncapsulation();
            _is.throwException();
        } catch (UserException ex) {
            _is.endEncapsulation();
            throw ex;
        }
    }

    @Override
    protected void cacheMessageBuffers() {
        if (_instance.cacheMessageBuffers() > 0) {
            synchronized (this) {
                if ((_state & StateCachedBuffers) > 0) {
                    return;
                }
                _state |= StateCachedBuffers;
            }

            if (_is != null) {
                _is.reset();
            }
            _os.reset();

            _proxy._cacheMessageBuffers(_is, _os);

            _is = null;
            _os = null;
        }
    }

    private final EncodingVersion _encoding;
    private InputStream _is;

    private final Class<?>[] _userExceptions; // Valid user exceptions.
    private Unmarshaler<T> _unmarshal;
}