InvocationFuture.java
// Copyright (c) ZeroC, Inc.
package com.zeroc.Ice;
import com.zeroc.Ice.Instrumentation.InvocationObserver;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
/**
* An instance of an InvocationFuture subclass is the return value of an asynchronous invocation.
* With this object, an application can obtain several attributes of the invocation.
*/
public abstract class InvocationFuture<T> extends CompletableFuture<T> {
InvocationFuture(Communicator communicator, Instance instance, String op) {
_communicator = communicator;
_instance = instance;
_operation = op;
_state = 0;
_sentSynchronously = false;
_doneInSent = false;
_synchronous = false;
_exception = null;
}
/**
* If not completed, cancels the request. This is a local operation, it won't cancel the request
* on the server side. Calling cancel prevents a queued request from being sent or
* ignores a reply if the request has already been sent.
*
* @return true if this task is now cancelled
*/
public boolean cancel() {
return cancel(false);
}
/**
* If not completed, cancels the request. This is a local operation, it won't cancel the request
* on the server side. Calling cancel prevents a queued request from being sent or
* ignores a reply if the request has already been sent.
*
* @param mayInterruptIfRunning true if the thread executing this task should be interrupted;
* otherwise, in-progress tasks are allowed to complete
* @return true if this task is now cancelled
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
//
// Call super.cancel(boolean) first. This sets the result of the future.
// Calling cancel(LocalException) also eventually attempts to complete the future
// (exceptionally), but this result is ignored.
//
boolean r = super.cancel(mayInterruptIfRunning);
cancel(new InvocationCanceledException());
return r;
}
/**
* Returns the communicator that sent the invocation.
*
* @return the communicator
*/
public Communicator getCommunicator() {
return _communicator;
}
/**
* Returns the connection that was used to start the invocation, or null if this future was not
* obtained via an asynchronous connection invocation (such as flushBatchRequestsAsync).
*
* @return the connection
*/
public Connection getConnection() {
return null;
}
/**
* Returns the proxy that was used to start the asynchronous invocation, or null if this object
* was not obtained via an asynchronous proxy invocation.
*
* @return the proxy
*/
public ObjectPrx getProxy() {
return null;
}
/**
* Returns the name of the operation.
*
* @return the operation name
*/
public final String getOperation() {
return _operation;
}
/**
* Blocks the caller until the result of the invocation is available.
*/
public final void waitForCompleted() {
try {
join();
} catch (CompletionException completionException) {
var cause = completionException.getCause();
if (cause instanceof InterruptedException) {
throw new OperationInterruptedException(cause);
}
} catch (Exception ex) {}
}
/**
* When you start an asynchronous invocation, the Ice run time attempts to write the
* corresponding request to the client-side transport. If the transport cannot accept the
* request, the Ice run time queues the request for later transmission. This method returns true
* if, at the time it is called, the request has been written to the local transport (whether it
* was initially queued or not). Otherwise, if the request is still queued, this method returns
* false.
*
* @return true if the request has been sent, or false if the request is queued
*/
public final boolean isSent() {
synchronized (this) {
return (_state & StateSent) > 0;
}
}
/**
* Blocks the caller until the request has been written to the client-side transport.
*/
public final synchronized void waitForSent() {
while ((_state & StateSent) == 0 && _exception == null) {
try {
this.wait();
} catch (InterruptedException ex) {
throw new OperationInterruptedException(ex);
}
}
}
/**
* Returns true if a request was written to the client-side transport without first being
* queued. If the request was initially queued, this method returns false (independent of
* whether the request is still in the queue or has since been written to the client-side
* transport).
*
* @return true if the request was sent without being queued, or false otherwise
*/
public final boolean sentSynchronously() {
return _sentSynchronously; // No lock needed, immutable
}
/**
* Returns a future that completes when the entire request message has been accepted by the
* transport and executes the given action. The boolean value indicates whether the message was
* sent synchronously.
*
* @param action executed when the future is completed successfully or exceptionally
* @return a future that completes when the message has been handed off to the transport
*/
public final synchronized CompletableFuture<Boolean> whenSent(
BiConsumer<Boolean, ? super Throwable> action) {
if (_sentFuture == null) {
_sentFuture = new CompletableFuture<>();
}
CompletableFuture<Boolean> r = _sentFuture.whenComplete(action);
//
// Check if the request has already been sent.
//
if (((_state & StateSent) > 0 || _exception != null) && !_sentFuture.isDone()) {
if (_exception != null) {
_sentFuture.completeExceptionally(_exception);
} else {
_sentFuture.complete(_sentSynchronously);
}
}
return r;
}
/**
* Returns a future that completes when the entire request message has been accepted by the
* transport and executes the given action using the default executor. The boolean value
* indicates whether the message was sent synchronously.
*
* @param action executed when the future is completed successfully or exceptionally
* @return a future that completes when the message has been handed off to the transport
*/
public final synchronized CompletableFuture<Boolean> whenSentAsync(
BiConsumer<Boolean, ? super Throwable> action) {
return whenSentAsync(action, null);
}
/**
* Returns a future that completes when the entire request message has been accepted by the
* transport and executes the given action using the executor. The boolean value indicates
* whether the message was sent synchronously.
*
* @param action executed when the future is completed successfully or exceptionally
* @param executor the executor to use for asynchronous execution
* @return a future that completes when the message has been handed off to the transport
*/
public final synchronized CompletableFuture<Boolean> whenSentAsync(
BiConsumer<Boolean, ? super Throwable> action, Executor executor) {
if (_sentFuture == null) {
_sentFuture = new CompletableFuture<>();
}
CompletableFuture<Boolean> r;
if (executor == null) {
r = _sentFuture.whenCompleteAsync(action);
} else {
r = _sentFuture.whenCompleteAsync(action, executor);
}
//
// Check if the request has already been sent.
//
if (((_state & StateSent) > 0 || _exception != null) && !_sentFuture.isDone()) {
if (_exception != null) {
_sentFuture.completeExceptionally(_exception);
} else {
_sentFuture.complete(_sentSynchronously);
}
}
return r;
}
/**
* Invokes the sent callback to notify that the request has been sent.
*/
public final void invokeSent() {
try {
synchronized (this) {
if (_sentFuture != null && !_sentFuture.isDone()) {
_sentFuture.complete(_sentSynchronously);
}
}
if (_doneInSent) {
markCompleted();
}
} catch (RuntimeException ex) {
warning(ex);
} catch (Error exc) {
error(exc);
if (!(exc instanceof AssertionError || exc instanceof OutOfMemoryError)) {
throw exc;
}
}
if (_observer != null) {
ObjectPrx proxy = getProxy();
if (proxy == null || !proxy.ice_isTwoway()) {
_observer.detach();
_observer = null;
}
}
}
/**
* Marks the invocation as completed. This is an abstract method that must be
* implemented by subclasses.
*/
abstract void markCompleted();
/**
* Invokes the completion callback to notify that the invocation has completed.
*/
public final void invokeCompleted() {
try {
if (_exception != null) {
synchronized (this) {
if (_sentFuture != null && !_sentFuture.isDone()) {
_sentFuture.completeExceptionally(_exception);
}
}
completeExceptionally(_exception);
} else {
markCompleted();
}
} catch (RuntimeException ex) {
warning(ex);
} catch (AssertionError exc) {
error(exc);
} catch (OutOfMemoryError exc) {
error(exc);
}
if (_observer != null) {
_observer.detach();
_observer = null;
}
}
/**
* Invokes the completion callback asynchronously.
*/
public final void invokeCompletedAsync() {
//
// CommunicatorDestroyedException is the only exception that can propagate directly from
// this method.
//
_instance
.clientThreadPool()
.dispatch(
new RunnableThreadPoolWorkItem(_cachedConnection) {
@Override
public void run() {
invokeCompleted();
}
});
}
/**
* Sets a cancellation handler for this invocation.
*
* @param handler the cancellation handler
*/
public synchronized void cancelable(final CancellationHandler handler) {
if (_cancellationException != null) {
try {
throw _cancellationException;
} finally {
_cancellationException = null;
}
}
_cancellationHandler = handler;
}
/**
* Caches message buffers for reuse. This method can be overridden by subclasses.
*/
void cacheMessageBuffers() {}
boolean sent(boolean done) {
synchronized (this) {
assert (_exception == null);
boolean alreadySent = (_state & StateSent) != 0;
_state |= StateSent;
if (done) {
_state |= StateDone | StateOK;
_cancellationHandler = null;
_doneInSent = true;
//
// For oneway requests after the data has been sent the buffers can be reused unless
// this is a collocated invocation. For collocated invocations the buffer won't be
// reused because it has already
// been marked as cached in invokeCollocated.
//
cacheMessageBuffers();
}
if (_synchronous && done) {
if (_observer != null) {
_observer.detach();
_observer = null;
}
markCompleted();
return false;
} else {
this.notifyAll();
boolean invoke = (!alreadySent && _sentFuture != null || done) && !_synchronous;
return invoke;
}
}
}
boolean finished(boolean ok, boolean invoke) {
synchronized (this) {
_state |= StateDone;
if (ok) {
_state |= StateOK;
}
_cancellationHandler = null;
invoke &= !_synchronous;
if (!invoke && _observer != null) {
_observer.detach();
_observer = null;
}
if (!invoke) {
if (_exception != null) {
completeExceptionally(_exception);
} else {
markCompleted();
}
return false;
} else {
this.notifyAll();
return invoke;
}
}
}
boolean finished(LocalException ex) {
synchronized (this) {
_state |= StateDone;
_exception = ex;
_cancellationHandler = null;
if (_observer != null) {
_observer.failed(ex.ice_id());
}
boolean invoke = !_synchronous;
if (!invoke && _observer != null) {
_observer.detach();
_observer = null;
}
if (!invoke) {
if (_exception != null) {
completeExceptionally(_exception);
} else {
markCompleted();
}
return false;
} else {
this.notifyAll();
return invoke;
}
}
}
/**
* Invokes the sent callback asynchronously.
*/
public final void invokeSentAsync() {
//
// This is called when it's not safe to call the sent callback synchronously from this
// thread. Instead the future is completed asynchronously from a client in the client thread
// pool.
//
dispatch(() -> invokeSent());
}
void cancel(LocalException ex) {
CancellationHandler handler;
synchronized (this) {
if (_cancellationHandler == null) {
_cancellationException = ex;
return;
}
handler = _cancellationHandler;
}
handler.asyncRequestCanceled((OutgoingAsyncBase) this, ex);
}
InvocationObserver getObserver() {
return _observer;
}
void dispatch(final Runnable runnable) {
try {
_instance
.clientThreadPool()
.dispatch(
new RunnableThreadPoolWorkItem(_cachedConnection) {
@Override
public void run() {
runnable.run();
}
});
} catch (CommunicatorDestroyedException ex) {}
}
private void warning(RuntimeException ex) {
if (_instance.initializationData().properties.getIcePropertyAsInt("Ice.Warn.AMICallback")
> 0) {
String s = "exception raised by AMI callback:\n" + Ex.toString(ex);
_instance.initializationData().logger.warning(s);
}
}
private void error(Error error) {
String s = "error raised by AMI callback:\n" + Ex.toString(error);
_instance.initializationData().logger.error(s);
}
// While package-private, the fields below should only be accessed by subclasses.
final Instance _instance;
InvocationObserver _observer;
Connection _cachedConnection;
boolean _sentSynchronously;
boolean _doneInSent;
// True if this AMI request is being used for a generated synchronous invocation.
boolean _synchronous;
CompletableFuture<Boolean> _sentFuture;
final Communicator _communicator;
final String _operation;
LocalException _exception;
private CancellationHandler _cancellationHandler;
private LocalException _cancellationException;
static final byte StateOK = 0x1;
static final byte StateDone = 0x2;
static final byte StateSent = 0x4;
static final byte StateCachedBuffers = 0x08;
byte _state;
}