// CommunicatorFlushBatch.java
// Copyright (c) ZeroC, Inc.
package com.zeroc.Ice;
import com.zeroc.Ice.Instrumentation.InvocationObserver;
import java.util.concurrent.ExecutionException;
//
// Future for the "flushBatchRequests" operation started through invoke().
//
// The object counts the flushes that are still outstanding in _useCount.
// flushConnection() adds one use per connection and doCheck() removes one;
// the call to doCheck() that brings the count down to zero is the one that
// goes on to report the operation as sent.
//
class CommunicatorFlushBatch extends InvocationFuture<Void> {
    public CommunicatorFlushBatch(Communicator communicator, Instance instance) {
        super(communicator, instance, "flushBatchRequests");

        //
        // Begin with one outstanding use so that the count cannot reach zero
        // while flushes are still being started. invoke() gives this initial
        // use back with doCheck(true) after both factories have been asked
        // to flush.
        //
        _useCount = 1;
    }

    //
    // Completes this future with a null result (the future's type is Void).
    //
    @Override
    protected void markCompleted() {
        complete(null);
    }

    //
    // Flushes the batch requests queued on the given connection as part of
    // this operation.
    //
    // con           - the connection whose batch request queue is swapped out.
    // compressBatch - Yes forces compression, No disables it, and any other
    //                 value defers to the compress flag of the swap result.
    //
    // Any LocalException raised while starting the flush (including the one
    // carried by a RetryException) is rethrown to the caller after the use
    // taken for this connection has been given back.
    //
    public void flushConnection(final ConnectionI con, final CompressBatch compressBatch) {
        //
        // Per-connection request. It has no completion logic of its own: its
        // sent() and completed() callbacks only settle the child observer and
        // then release one use on the enclosing CommunicatorFlushBatch.
        //
        class FlushBatch extends OutgoingAsyncBase<Void> {
            public FlushBatch() {
                super(
                        CommunicatorFlushBatch.this.getCommunicator(),
                        CommunicatorFlushBatch.this._instance,
                        CommunicatorFlushBatch.this.getOperation());
            }

            // Not expected to be called on a per-connection request.
            @Override
            protected void markCompleted() {
                assert false;
            }

            // Detaches the child observer, if any, then releases one use.
            @Override
            public boolean sent() {
                if (_childObserver != null) {
                    _childObserver.detach();
                    _childObserver = null;
                }

                doCheck(false);
                return false;
            }

            // Records the failure on the child observer, if any, detaches
            // it, then releases one use. The exception is not propagated
            // any further from here.
            @Override
            public boolean completed(LocalException ex) {
                if (_childObserver != null) {
                    _childObserver.failed(ex.ice_id());
                    _childObserver.detach();
                    _childObserver = null;
                }

                doCheck(false);
                return false;
            }

            // Uses the observer of the enclosing communicator-wide flush.
            @Override
            protected InvocationObserver getObserver() {
                return CommunicatorFlushBatch.this._observer;
            }
        }

        // Take a use for this connection before anything can call back.
        synchronized (this) {
            ++_useCount;
        }

        try {
            final FlushBatch request = new FlushBatch();
            final BatchRequestQueue.SwapResult swapped =
                    con.getBatchRequestQueue().swap(request.getOs());

            if (swapped == null) {
                // The swap produced nothing to send; treat the request as
                // already sent.
                request.sent();
            } else {
                // Yes -> true, No -> false, anything else -> the queue's flag.
                final boolean compress =
                        compressBatch == CompressBatch.Yes
                                || (compressBatch != CompressBatch.No && swapped.compress);
                con.sendAsyncRequest(request, compress, false, swapped.batchRequestNum);
            }
        } catch (RetryException retry) {
            // Give back the use taken above, then surface the wrapped error.
            doCheck(false);
            throw retry.get();
        } catch (LocalException failure) {
            // Give back the use taken above before propagating.
            doCheck(false);
            throw failure;
        }
    }

    //
    // Starts the operation: obtains the invocation observer, asks the
    // outgoing connection factory and the object adapter factory to flush
    // their batch requests against this object, and finally releases the
    // initial use taken in the constructor.
    //
    // NOTE(review): the factories presumably call flushConnection() for each
    // of their connections - confirm against their implementations.
    //
    public void invoke(CompressBatch compressBatch) {
        _observer = ObserverHelper.get(_instance, "flushBatchRequests");

        _instance.outgoingConnectionFactory().flushAsyncBatchRequests(compressBatch, this);
        _instance.objectAdapterFactory().flushAsyncBatchRequests(compressBatch, this);

        doCheck(true);
    }

    //
    // Blocks until this future completes.
    //
    // An interruption is reported as OperationInterruptedException. When the
    // future completed exceptionally, the cause is rethrown as is if it is a
    // RuntimeException (which includes LocalException); any other cause is
    // wrapped in an UnknownException.
    //
    public void waitForResponse() {
        try {
            get();
        } catch (InterruptedException interrupted) {
            throw new OperationInterruptedException(interrupted);
        } catch (ExecutionException wrapper) {
            // Refresh the stack trace so that it points at this call site.
            final Throwable cause = wrapper.getCause().fillInStackTrace();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new UnknownException(cause);
        }
    }

    //
    // Releases one use. Nothing else happens while other uses remain. The
    // call that releases the last use marks the operation as sent through
    // sent(true) and, if that returns true, dispatches the sent notification.
    //
    // userThread - true when called from invoke(); the notification is then
    //              run inline and the operation is flagged as sent
    //              synchronously. false when called from a request callback
    //              or an error path; invokeSentAsync() is used instead.
    //
    private void doCheck(boolean userThread) {
        final int remaining;
        synchronized (this) {
            assert (_useCount > 0);
            remaining = --_useCount;
        }

        if (remaining > 0) {
            return;
        }

        if (!sent(true)) {
            return;
        }

        if (!userThread) {
            invokeSentAsync();
            return;
        }

        _sentSynchronously = true;
        invokeSent();
    }

    // Number of outstanding uses; read and written under synchronized (this).
    private int _useCount;
}