ProxyOutgoingAsyncBase.java
// Copyright (c) ZeroC, Inc.
package com.zeroc.Ice;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
//
// Base class for proxy based invocations. This class handles the
// retry for proxy invocations. It also ensures the child observer is
// correct notified of failures and make sure the retry task is
// correctly canceled when the invocation completes.
//
abstract class ProxyOutgoingAsyncBase<T> extends OutgoingAsyncBase<T> {
public abstract int invokeRemote(ConnectionI con, boolean compress, boolean response)
throws RetryException;
public abstract int invokeCollocated(CollocatedRequestHandler handler);
public boolean isBatch() {
return _proxy._getReference().isBatch();
}
@Override
public ObjectPrx getProxy() {
return _proxy;
}
@Override
public boolean completed(InputStream is) {
//
// NOTE: this method is called from ConnectionI.parseMessage
// with the connection locked. Therefore, it must not invoke
// any user callbacks.
//
assert (_proxy.ice_isTwoway()); // Can only be called for twoways.
if (_childObserver != null) {
_childObserver.reply(is.size() - Protocol.headerSize - 4);
_childObserver.detach();
_childObserver = null;
}
try {
// convert the signed byte into a positive int
int replyStatusInt = is.readByte() & 0xFF;
var replyStatus = ReplyStatus.valueOf(replyStatusInt);
if (replyStatus != null) {
//CHECKSTYLE.OFF: FallThrough
switch (replyStatus) {
case Ok:
break;
case UserException:
if (_observer != null) {
_observer.userException();
}
break;
case ObjectNotExist:
case FacetNotExist:
case OperationNotExist:
Identity id = Identity.ice_read(is);
//
// For compatibility with the old FacetPath.
//
String[] facetPath = is.readStringSeq();
String facet;
if (facetPath.length > 0) {
if (facetPath.length > 1) {
throw new MarshalException(
"Received invalid facet path with '"
+ facetPath.length
+ "' elements.");
}
facet = facetPath[0];
} else {
facet = "";
}
String operation = is.readString();
switch (replyStatus) {
case ObjectNotExist -> throw new ObjectNotExistException(id, facet, operation);
case FacetNotExist -> throw new FacetNotExistException(id, facet, operation);
default -> throw new OperationNotExistException(id, facet, operation);
}
default:
String message = is.readString();
switch (replyStatus) {
case UnknownException -> throw new UnknownException(message);
case UnknownLocalException -> throw new UnknownLocalException(message);
case UnknownUserException -> throw new UnknownUserException(message);
default -> throw new DispatchException(replyStatusInt, message);
}
}
//CHECKSTYLE.ON: FallThrough
return finished(replyStatus == ReplyStatus.Ok, true);
} else {
// Unknown reply status, like the last default case above:
throw new DispatchException(replyStatusInt, is.readString());
}
} catch (LocalException ex) {
return completed(ex);
}
}
@Override
public boolean completed(LocalException exc) {
if (_childObserver != null) {
_childObserver.failed(exc.ice_id());
_childObserver.detach();
_childObserver = null;
}
_cachedConnection = null;
//
// NOTE: at this point, synchronization isn't needed, no other threads should be
// calling on the callback.
//
try {
// It's important to let the retry queue do the retry even if
// the retry interval is 0. This method can be called with the
// connection locked so we can't just retry here.
_instance.retryQueue().add(this, handleRetryAfterException(exc));
return false;
} catch (LocalException ex) {
return finished(ex); // No retries, we're done
}
}
public void retryException() {
try {
// Clear request handler and always retry.
_proxy._getRequestHandlerCache().clearCachedRequestHandler(_handler);
// It's important to let the retry queue do the retry. This is
// called from the connect request handler and the retry might
// require could end up waiting for the flush of the
// connection to be done.
_instance.retryQueue().add(this, 0);
} catch (LocalException ex) {
if (completed(ex)) {
invokeCompletedAsync();
}
}
}
public void retry() {
invokeImpl(false);
}
public void abort(LocalException ex) {
assert (_childObserver == null);
if (finished(ex)) {
invokeCompletedAsync();
} else if (ex instanceof CommunicatorDestroyedException) {
//
// If it's a communicator destroyed exception, don't swallow
// it but instead notify the user thread. Even if no callback
// was provided.
//
throw ex;
}
}
protected ProxyOutgoingAsyncBase(_ObjectPrxI prx, String op) {
super(prx.ice_getCommunicator(), prx._getReference().getInstance(), op);
_proxy = prx;
_mode = OperationMode.Normal;
_cnt = 0;
_sent = false;
}
protected ProxyOutgoingAsyncBase(_ObjectPrxI prx, String op, OutputStream os) {
super(prx.ice_getCommunicator(), prx._getReference().getInstance(), op, os);
_proxy = prx;
_mode = OperationMode.Normal;
_cnt = 0;
_sent = false;
}
protected void invokeImpl(boolean userThread) {
try {
if (userThread) {
Duration invocationTimeout = _proxy._getReference().getInvocationTimeout();
if (invocationTimeout.compareTo(Duration.ZERO) > 0) {
_timerFuture =
_instance
.timer()
.schedule(
() -> {
cancel(new InvocationTimeoutException());
},
invocationTimeout.toMillis(),
TimeUnit.MILLISECONDS);
}
} else {
// If not called from the user thread, it's called from the retry queue.
if (_observer != null) {
_observer.retried();
}
}
while (true) {
try {
_sent = false;
_handler = _proxy._getRequestHandlerCache().getRequestHandler();
int status = _handler.sendAsyncRequest(this);
if ((status & AsyncStatus.Sent) > 0) {
if (userThread) {
_sentSynchronously = true;
if ((status & AsyncStatus.InvokeSentCallback) > 0) {
invokeSent(); // Call the sent callback from the user thread.
}
} else {
if ((status & AsyncStatus.InvokeSentCallback) > 0) {
invokeSentAsync(); // Call the sent callback from a client thread
// pool thread.
}
}
}
return; // We're done!
} catch (RetryException ex) {
// Clear request handler and always retry.
_proxy._getRequestHandlerCache().clearCachedRequestHandler(_handler);
} catch (LocalException ex) {
if (_childObserver != null) {
_childObserver.failed(ex.ice_id());
_childObserver.detach();
_childObserver = null;
}
final int interval = handleRetryAfterException(ex);
if (interval > 0) {
_instance.retryQueue().add(this, interval);
return;
} else if (_observer != null) {
_observer.retried();
}
}
}
} catch (LocalException ex) {
// If called from the user thread we re-throw: the exception
// will be caught by the caller and handled using abort().
if (userThread) {
throw ex;
} else if (finished(ex)) {
// No retries, we're done
invokeCompletedAsync();
}
}
}
@Override
protected boolean sent(boolean done) {
_sent = true;
if (done) {
if (_timerFuture != null) {
_timerFuture.cancel(false);
_timerFuture = null;
}
}
return super.sent(done);
}
@Override
protected boolean finished(LocalException ex) {
if (_timerFuture != null) {
_timerFuture.cancel(false);
_timerFuture = null;
}
return super.finished(ex);
}
@Override
protected boolean finished(boolean ok, boolean invoke) {
if (_timerFuture != null) {
_timerFuture.cancel(false);
_timerFuture = null;
}
return super.finished(ok, invoke);
}
private int handleRetryAfterException(LocalException ex) {
// Clear the request handler
_proxy._getRequestHandlerCache().clearCachedRequestHandler(_handler);
// We only retry local exceptions.
//
// A CloseConnectionException indicates graceful server shutdown, and is therefore
// always repeatable without violating "at-most-once". That's because by sending a
// close connection message, the server guarantees that all outstanding requests
// can safely be repeated.
//
// An ObjectNotExistException can always be retried as well without violating
// "at-most-once" (see the implementation of the checkRetryAfterException method
// below for the reasons why it can be useful).
//
// If the request didn't get sent or if it's non-mutating or idempotent it can
// also always be retried if the retry count isn't reached.
boolean shouldRetry =
ex instanceof LocalException
&& (!_sent
|| _mode == OperationMode.Nonmutating
|| _mode == OperationMode.Idempotent
|| ex instanceof CloseConnectionException
|| ex instanceof ObjectNotExistException);
if (shouldRetry) {
try {
return checkRetryAfterException((LocalException) ex);
} catch (CommunicatorDestroyedException exc) {
throw ex; // The communicator is already destroyed, so we cannot retry.
}
} else {
throw ex; // Retry could break at-most-once semantics, don't retry.
}
}
private int checkRetryAfterException(LocalException ex) {
Reference ref = _proxy._getReference();
Instance instance = ref.getInstance();
TraceLevels traceLevels = instance.traceLevels();
Logger logger = instance.initializationData().logger;
int[] retryIntervals = instance.retryIntervals();
// We don't retry batch requests because the exception might have caused
// all the requests batched with the connection to be aborted and we
// want the application to be notified.
if (ref.isBatch()) {
throw ex;
}
// If it's a fixed proxy, retrying isn't useful as the proxy is tied to
// the connection and the request will fail with the exception.
if (ref instanceof FixedReference) {
throw ex;
}
if (ex instanceof ObjectNotExistException) {
ObjectNotExistException one = (ObjectNotExistException) ex;
if (ref.getRouterInfo() != null && "ice_add_proxy".equals(one.operation)) {
// If we have a router, an ObjectNotExistException with an
// operation name "ice_add_proxy" indicates to the client
// that the router isn't aware of the proxy (for example,
// because it was evicted by the router). In this case, we
// must *always* retry, so that the missing proxy is added
// to the router.
ref.getRouterInfo().clearCache(ref);
if (traceLevels.retry >= 1) {
String s = "retrying operation call to add proxy to router\n" + ex.toString();
logger.trace(traceLevels.retryCat, s);
}
return 0; // We must always retry, so we don't look at the retry count.
} else if (ref.isIndirect()) {
// We retry ObjectNotExistException if the reference is indirect.
if (ref.isWellKnown()) {
LocatorInfo li = ref.getLocatorInfo();
if (li != null) {
li.clearCache(ref);
}
}
} else {
// For all other cases, we don't retry ObjectNotExistException.
throw ex;
}
} else if (ex instanceof RequestFailedException) {
// For all other cases, we don't retry ObjectNotExistException
throw ex;
}
// There is no point in retrying an operation that resulted in a
// MarshalException. This must have been raised locally (because
// if it happened in a server it would result in an
// UnknownLocalException instead), which means there was a problem
// in this process that will not change if we try again.
//
// A likely cause for a MarshalException is exceeding the
// maximum message size. For example, a client can attempt to send
// a message that exceeds the maximum memory size, or accumulate
// enough batch requests without flushing that the maximum size is
// reached.
//
// This latter case is especially problematic, because if we were
// to retry a batch request after a MarshalException, we would in
// fact silently discard the accumulated requests and allow new
// batch requests to accumulate. If the subsequent batched
// requests do not exceed the maximum message size, it appears to
// the client that all of the batched requests were accepted, when
// in reality only the last few are actually sent.
if (ex instanceof MarshalException) {
throw ex;
}
// Don't retry if the communicator is destroyed or object adapter is deactivated/destroyed
if (ex instanceof CommunicatorDestroyedException
|| ex instanceof ObjectAdapterDeactivatedException
|| ex instanceof ObjectAdapterDestroyedException) {
throw ex;
}
// Don't retry if the connection was closed by the application.
if (ex instanceof ConnectionAbortedException) {
if (((ConnectionAbortedException) ex).closedByApplication) {
throw ex;
}
}
if (ex instanceof ConnectionClosedException) {
if (((ConnectionClosedException) ex).closedByApplication) {
throw ex;
}
}
// Don't retry invocation timeouts.
if (ex instanceof InvocationTimeoutException || ex instanceof InvocationCanceledException) {
throw ex;
}
// Don't retry on OperationInterruptedException.
if (ex instanceof OperationInterruptedException) {
throw ex;
}
++_cnt;
assert (_cnt > 0);
int interval;
if (_cnt == (retryIntervals.length + 1) && ex instanceof CloseConnectionException) {
// Always retry a CloseConnectionException at least once, even if the retry limit is
// reached.
interval = 0;
} else if (_cnt > retryIntervals.length) {
if (traceLevels.retry >= 1) {
String s =
"cannot retry operation call because retry limit has been exceeded\n"
+ ex.toString();
logger.trace(traceLevels.retryCat, s);
}
throw ex;
} else {
interval = retryIntervals[_cnt - 1];
}
if (traceLevels.retry >= 1) {
String s = "retrying operation call";
if (interval > 0) {
s += " in " + interval + "ms";
}
s += " because of exception\n" + ex;
logger.trace(traceLevels.retryCat, s);
}
return interval;
}
protected void prepare(Map<String, String> ctx) {
Reference ref = _proxy._getReference();
Protocol.checkSupportedProtocol(Protocol.getCompatibleProtocol(ref.getProtocol()));
_observer = ObserverHelper.get(_proxy, _operation, ctx == null ? _emptyContext : ctx);
if (ref.isBatch()) {
ref.getBatchRequestQueue().prepareBatchRequest(_os);
} else {
_os.writeBlob(Protocol.requestHdr);
}
ref.getIdentity().ice_writeMembers(_os);
//
// For compatibility with the old FacetPath.
//
String facet = ref.getFacet();
if (facet == null || facet.isEmpty()) {
_os.writeStringSeq(null);
} else {
String[] facetPath = {facet};
_os.writeStringSeq(facetPath);
}
_os.writeString(_operation);
_os.writeByte((byte) _mode.value());
if (ctx != ObjectPrx.noExplicitContext) {
//
// Explicit context
//
ContextHelper.write(_os, ctx);
} else {
//
// Implicit context
//
ImplicitContextI implicitContext = ref.getInstance().getImplicitContext();
Map<String, String> prxContext = ref.getContext();
if (implicitContext == null) {
ContextHelper.write(_os, prxContext);
} else {
implicitContext.write(prxContext, _os);
}
}
}
protected final _ObjectPrxI _proxy;
protected RequestHandler _handler;
protected OperationMode _mode;
private Future<?> _timerFuture;
private int _cnt;
private boolean _sent;
private static final Map<String, String> _emptyContext = new HashMap<>();
}