< Summary

Information
Class: Ice.Internal.ProxyOutgoingAsyncBase
Assembly: Ice
File(s): /_/csharp/src/Ice/Internal/OutgoingAsync.cs
Tag: 91_21789722663
Line coverage
83%
Covered lines: 118
Uncovered lines: 24
Coverable lines: 142
Total lines: 1560
Line coverage: 83%
Branch coverage
80%
Covered branches: 89
Total branches: 110
Branch coverage: 80.9%
Method coverage
91%
Covered methods: 11
Total methods: 12
Method coverage: 91.6%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
exception(...)100%22100%
retryException()0%620%
retry()100%11100%
abort(...)75%4.13480%
.ctor(...)100%11100%
invokeImpl(...)83.33%27.92481.08%
sentImpl(...)75%4.13480%
exceptionImpl(...)100%22100%
responseImpl(...)100%22100%
runTimerTask()100%11100%
handleRetryAfterException(...)90%1010100%
checkRetryAfterException(...)80%68.786086.54%

File(s)

/_/csharp/src/Ice/Internal/OutgoingAsync.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4
 5namespace Ice.Internal;
 6
 7public interface OutgoingAsyncCompletionCallback
 8{
 9    void init(OutgoingAsyncBase og);
 10
 11    bool handleSent(bool done, bool alreadySent, OutgoingAsyncBase og);
 12
 13    bool handleException(Ice.Exception ex, OutgoingAsyncBase og);
 14
 15    bool handleResponse(bool userThread, bool ok, OutgoingAsyncBase og);
 16
 17    void handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og);
 18
 19    void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og);
 20
 21    void handleInvokeResponse(bool ok, OutgoingAsyncBase og);
 22}
 23
 24public abstract class OutgoingAsyncBase
 25{
 26    public virtual bool sent() => sentImpl(true);
 27
 28    public virtual bool exception(Ice.Exception ex) => exceptionImpl(ex);
 29
 30    public virtual bool response()
 31    {
 32        Debug.Assert(false); // Must be overridden by request that can handle responses
 33        return false;
 34    }
 35
 36    public void invokeSentAsync()
 37    {
 38        //
 39        // This is called when it's not safe to call the sent callback
 40        // synchronously from this thread. Instead the exception callback
 41        // is called asynchronously from the client thread pool.
 42        //
 43        try
 44        {
 45            instance_.clientThreadPool().execute(invokeSent, cachedConnection_);
 46        }
 47        catch (Ice.CommunicatorDestroyedException)
 48        {
 49        }
 50    }
 51
 52    public void invokeExceptionAsync() =>
 53        //
 54        // CommunicatorDestroyedCompleted is the only exception that can propagate directly
 55        // from this method.
 56        //
 57        instance_.clientThreadPool().execute(invokeException, cachedConnection_);
 58
 59    public void invokeResponseAsync() =>
 60        //
 61        // CommunicatorDestroyedCompleted is the only exception that can propagate directly
 62        // from this method.
 63        //
 64        instance_.clientThreadPool().execute(invokeResponse, cachedConnection_);
 65
 66    public void invokeSent()
 67    {
 68        try
 69        {
 70            _completionCallback.handleInvokeSent(sentSynchronously_, _doneInSent, _alreadySent, this);
 71        }
 72        catch (System.Exception ex)
 73        {
 74            warning(ex);
 75        }
 76
 77        if (observer_ != null && _doneInSent)
 78        {
 79            observer_.detach();
 80            observer_ = null;
 81        }
 82    }
 83
 84    public void invokeException()
 85    {
 86        try
 87        {
 88            try
 89            {
 90                throw _ex;
 91            }
 92            catch (Ice.Exception ex)
 93            {
 94                _completionCallback.handleInvokeException(ex, this);
 95            }
 96        }
 97        catch (System.Exception ex)
 98        {
 99            warning(ex);
 100        }
 101
 102        observer_?.detach();
 103        observer_ = null;
 104    }
 105
 106    public void invokeResponse()
 107    {
 108        if (_ex != null)
 109        {
 110            invokeException();
 111            return;
 112        }
 113
 114        try
 115        {
 116            try
 117            {
 118                _completionCallback.handleInvokeResponse((state_ & StateOK) != 0, this);
 119            }
 120            catch (Ice.Exception ex)
 121            {
 122                if (_completionCallback.handleException(ex, this))
 123                {
 124                    _completionCallback.handleInvokeException(ex, this);
 125                }
 126            }
 127            catch (System.AggregateException ex)
 128            {
 129                throw ex.InnerException;
 130            }
 131        }
 132        catch (System.Exception ex)
 133        {
 134            warning(ex);
 135        }
 136
 137        observer_?.detach();
 138        observer_ = null;
 139    }
 140
 141    public virtual void cancelable(CancellationHandler handler)
 142    {
 143        lock (mutex_)
 144        {
 145            if (_cancellationException != null)
 146            {
 147                try
 148                {
 149                    throw _cancellationException;
 150                }
 151                catch (Ice.LocalException)
 152                {
 153                    _cancellationException = null;
 154                    throw;
 155                }
 156            }
 157            _cancellationHandler = handler;
 158        }
 159    }
 160
 161    public void cancel() => cancel(new Ice.InvocationCanceledException());
 162
 163    public void attachRemoteObserver(Ice.ConnectionInfo info, Ice.Endpoint endpt, int requestId)
 164    {
 165        Ice.Instrumentation.InvocationObserver observer = getObserver();
 166        if (observer != null)
 167        {
 168            int size = os_.size() - Protocol.headerSize - 4;
 169            childObserver_ = observer.getRemoteObserver(info, endpt, requestId, size);
 170            childObserver_?.attach();
 171        }
 172    }
 173
 174    public void attachCollocatedObserver(Ice.ObjectAdapter adapter, int requestId)
 175    {
 176        Ice.Instrumentation.InvocationObserver observer = getObserver();
 177        if (observer != null)
 178        {
 179            int size = os_.size() - Protocol.headerSize - 4;
 180            childObserver_ = observer.getCollocatedObserver(adapter, requestId, size);
 181            childObserver_?.attach();
 182        }
 183    }
 184
 185    public Ice.OutputStream getOs() => os_;
 186
 187    public Ice.InputStream getIs() => is_;
 188
 189    public virtual void throwUserException()
 190    {
 191    }
 192
 193    public virtual void cacheMessageBuffers()
 194    {
 195    }
 196
 197    public bool isSynchronous() => synchronous_;
 198
 199    protected OutgoingAsyncBase(
 200        Instance instance,
 201        OutgoingAsyncCompletionCallback completionCallback,
 202        Ice.OutputStream os = null,
 203        Ice.InputStream iss = null)
 204    {
 205        instance_ = instance;
 206        sentSynchronously_ = false;
 207        synchronous_ = false;
 208        _doneInSent = false;
 209        _alreadySent = false;
 210        state_ = 0;
 211        os_ = os ?? new OutputStream(Protocol.currentProtocolEncoding, instance.defaultsAndOverrides().defaultFormat);
 212        is_ = iss ?? new Ice.InputStream(instance, Protocol.currentProtocolEncoding);
 213        _completionCallback = completionCallback;
 214        _completionCallback?.init(this);
 215    }
 216
 217    protected virtual bool sentImpl(bool done)
 218    {
 219        lock (mutex_)
 220        {
 221            _alreadySent = (state_ & StateSent) > 0;
 222            state_ |= StateSent;
 223            if (done)
 224            {
 225                _doneInSent = true;
 226                childObserver_?.detach();
 227                childObserver_ = null;
 228                _cancellationHandler = null;
 229
 230                //
 231                // For oneway requests after the data has been sent
 232                // the buffers can be reused unless this is a
 233                // collocated invocation. For collocated invocations
 234                // the buffer won't be reused because it has already
 235                // been marked as cached in invokeCollocated.
 236                //
 237                cacheMessageBuffers();
 238            }
 239
 240            bool invoke = _completionCallback.handleSent(done, _alreadySent, this);
 241            if (!invoke && _doneInSent && observer_ != null)
 242            {
 243                observer_.detach();
 244                observer_ = null;
 245            }
 246            return invoke;
 247        }
 248    }
 249
 250    protected virtual bool exceptionImpl(Ice.Exception ex)
 251    {
 252        lock (mutex_)
 253        {
 254            _ex = ex;
 255            if (childObserver_ != null)
 256            {
 257                childObserver_.failed(ex.ice_id());
 258                childObserver_.detach();
 259                childObserver_ = null;
 260            }
 261            _cancellationHandler = null;
 262
 263            observer_?.failed(ex.ice_id());
 264            bool invoke = _completionCallback.handleException(ex, this);
 265            if (!invoke && observer_ != null)
 266            {
 267                observer_.detach();
 268                observer_ = null;
 269            }
 270            return invoke;
 271        }
 272    }
 273
 274    protected virtual bool responseImpl(bool userThread, bool ok, bool invoke)
 275    {
 276        lock (mutex_)
 277        {
 278            if (ok)
 279            {
 280                state_ |= StateOK;
 281            }
 282
 283            _cancellationHandler = null;
 284
 285            try
 286            {
 287                invoke &= _completionCallback.handleResponse(userThread, ok, this);
 288            }
 289            catch (Ice.Exception ex)
 290            {
 291                _ex = ex;
 292                invoke = _completionCallback.handleException(ex, this);
 293            }
 294            if (!invoke && observer_ != null)
 295            {
 296                observer_.detach();
 297                observer_ = null;
 298            }
 299            return invoke;
 300        }
 301    }
 302
 303    protected void cancel(Ice.LocalException ex)
 304    {
 305        CancellationHandler handler;
 306        {
 307            lock (mutex_)
 308            {
 309                if (_cancellationHandler == null)
 310                {
 311                    _cancellationException = ex;
 312                    return;
 313                }
 314                handler = _cancellationHandler;
 315            }
 316        }
 317        handler.asyncRequestCanceled(this, ex);
 318    }
 319
 320    private void warning(System.Exception ex)
 321    {
 322        if (instance_.initializationData().properties.getIcePropertyAsInt("Ice.Warn.AMICallback") > 0)
 323        {
 324            instance_.initializationData().logger.warning("exception raised by AMI callback:\n" + ex);
 325        }
 326    }
 327
 328    //
 329    // This virtual method is necessary for the communicator flush
 330    // batch requests implementation.
 331    //
 332    protected virtual Ice.Instrumentation.InvocationObserver getObserver() => observer_;
 333
 334    public bool sentSynchronously() => sentSynchronously_;
 335
 336    protected Instance instance_;
 337    protected Ice.Connection cachedConnection_;
 338    protected bool sentSynchronously_;
 339    protected bool synchronous_;
 340    protected int state_;
 341
 342    protected Ice.Instrumentation.InvocationObserver observer_;
 343    protected Ice.Instrumentation.ChildInvocationObserver childObserver_;
 344
 345    protected Ice.OutputStream os_;
 346    protected Ice.InputStream is_;
 347
 348    protected readonly object mutex_ = new();
 349
 350    private bool _doneInSent;
 351    private bool _alreadySent;
 352    private Ice.Exception _ex;
 353    private Ice.LocalException _cancellationException;
 354    private CancellationHandler _cancellationHandler;
 355    private readonly OutgoingAsyncCompletionCallback _completionCallback;
 356
 357    protected const int StateOK = 0x1;
 358    protected const int StateDone = 0x2;
 359    protected const int StateSent = 0x4;
 360    protected const int StateEndCalled = 0x8;
 361    protected const int StateCachedBuffers = 0x10;
 362
 363    public const int AsyncStatusQueued = 0;
 364    public const int AsyncStatusSent = 1;
 365    public const int AsyncStatusInvokeSentCallback = 2;
 366}
 367
 368//
 369// Base class for proxy based invocations. This class handles the
 370// retry for proxy invocations. It also ensures the child observer is
 371// correct notified of failures and make sure the retry task is
 372// correctly canceled when the invocation completes.
 373//
 374public abstract class ProxyOutgoingAsyncBase : OutgoingAsyncBase, TimerTask
 375{
 376    public abstract int invokeRemote(Ice.ConnectionI connection, bool compress, bool response);
 377
 378    public abstract int invokeCollocated(CollocatedRequestHandler handler);
 379
 380    public override bool exception(Ice.Exception ex)
 381    {
 1382        if (childObserver_ != null)
 383        {
 1384            childObserver_.failed(ex.ice_id());
 1385            childObserver_.detach();
 1386            childObserver_ = null;
 387        }
 388
 1389        cachedConnection_ = null;
 390
 391        //
 392        // NOTE: at this point, synchronization isn't needed, no other threads should be
 393        // calling on the callback.
 394        //
 395        try
 396        {
 397            //
 398            // It's important to let the retry queue do the retry even if
 399            // the retry interval is 0. This method can be called with the
 400            // connection locked so we can't just retry here.
 401            //
 1402            instance_.retryQueue().add(this, handleRetryAfterException(ex));
 1403            return false;
 404        }
 1405        catch (Ice.Exception retryEx)
 406        {
 1407            return exceptionImpl(retryEx); // No retries, we're done
 408        }
 1409    }
 410
 411    public void retryException()
 412    {
 413        try
 414        {
 415            // It's important to let the retry queue do the retry. This is
 416            // called from the connect request handler and the retry might
 417            // require could end up waiting for the flush of the
 418            // connection to be done.
 419
 0420            proxy_.iceGetRequestHandlerCache().clearCachedRequestHandler(handler_);
 0421            instance_.retryQueue().add(this, 0);
 0422        }
 0423        catch (Ice.Exception ex)
 424        {
 0425            if (exception(ex))
 426            {
 0427                invokeExceptionAsync();
 428            }
 0429        }
 0430    }
 431
 1432    public void retry() => invokeImpl(false);
 433
 434    public void abort(Ice.Exception ex)
 435    {
 436        Debug.Assert(childObserver_ == null);
 1437        if (exceptionImpl(ex))
 438        {
 1439            invokeExceptionAsync();
 440        }
 1441        else if (ex is Ice.CommunicatorDestroyedException)
 442        {
 443            //
 444            // If it's a communicator destroyed exception, swallow
 445            // it but instead notify the user thread. Even if no callback
 446            // was provided.
 447            //
 0448            throw ex;
 449        }
 1450    }
 451
 452    protected ProxyOutgoingAsyncBase(
 453        Ice.ObjectPrxHelperBase prx,
 454        OutgoingAsyncCompletionCallback completionCallback,
 455        Ice.OutputStream os = null,
 456        Ice.InputStream iss = null)
 1457        : base(prx.iceReference().getInstance(), completionCallback, os, iss)
 458    {
 1459        proxy_ = prx;
 1460        mode_ = Ice.OperationMode.Normal;
 1461        _cnt = 0;
 1462        _sent = false;
 1463    }
 464
 465    protected void invokeImpl(bool userThread)
 466    {
 467        try
 468        {
 1469            if (userThread)
 470            {
 1471                TimeSpan invocationTimeout = proxy_.iceReference().getInvocationTimeout();
 1472                if (invocationTimeout > TimeSpan.Zero)
 473                {
 1474                    instance_.timer().schedule(this, (long)invocationTimeout.TotalMilliseconds);
 475                }
 476            }
 477            else
 478            {
 1479                observer_?.retried();
 480            }
 481
 482            while (true)
 483            {
 484                try
 485                {
 1486                    _sent = false;
 1487                    handler_ = proxy_.iceGetRequestHandlerCache().requestHandler;
 1488                    int status = handler_.sendAsyncRequest(this);
 1489                    if ((status & AsyncStatusSent) != 0)
 490                    {
 1491                        if (userThread)
 492                        {
 1493                            sentSynchronously_ = true;
 1494                            if ((status & AsyncStatusInvokeSentCallback) != 0)
 495                            {
 1496                                invokeSent(); // Call the sent callback from the user thread.
 497                            }
 498                        }
 499                        else
 500                        {
 1501                            if ((status & AsyncStatusInvokeSentCallback) != 0)
 502                            {
 0503                                invokeSentAsync(); // Call the sent callback from a client thread pool thread.
 504                            }
 505                        }
 506                    }
 1507                    return; // We're done!
 508                }
 1509                catch (RetryException)
 510                {
 511                    // Clear request handler and always retry.
 1512                    proxy_.iceGetRequestHandlerCache().clearCachedRequestHandler(handler_);
 1513                }
 1514                catch (Ice.Exception ex)
 515                {
 1516                    if (childObserver_ != null)
 517                    {
 0518                        childObserver_.failed(ex.ice_id());
 0519                        childObserver_.detach();
 0520                        childObserver_ = null;
 521                    }
 1522                    int interval = handleRetryAfterException(ex);
 1523                    if (interval > 0)
 524                    {
 0525                        instance_.retryQueue().add(this, interval);
 0526                        return;
 527                    }
 528                    else
 529                    {
 1530                        observer_?.retried();
 531                    }
 1532                }
 533            }
 534        }
 1535        catch (Ice.Exception ex)
 536        {
 537            // If called from the user thread we re-throw, the exception will caught by the caller and handled using
 538            // abort.
 1539            if (userThread)
 540            {
 1541                throw;
 542            }
 1543            else if (exceptionImpl(ex)) // No retries, we're done
 544            {
 0545                invokeExceptionAsync();
 546            }
 1547        }
 1548    }
 549
 550    protected override bool sentImpl(bool done)
 551    {
 1552        _sent = true;
 1553        if (done)
 554        {
 1555            if (proxy_.iceReference().getInvocationTimeout() > TimeSpan.Zero)
 556            {
 0557                instance_.timer().cancel(this);
 558            }
 559        }
 1560        return base.sentImpl(done);
 561    }
 562
 563    protected override bool exceptionImpl(Ice.Exception ex)
 564    {
 1565        if (proxy_.iceReference().getInvocationTimeout() > TimeSpan.Zero)
 566        {
 1567            instance_.timer().cancel(this);
 568        }
 1569        return base.exceptionImpl(ex);
 570    }
 571
 572    protected override bool responseImpl(bool userThread, bool ok, bool invoke)
 573    {
 1574        if (proxy_.iceReference().getInvocationTimeout() > TimeSpan.Zero)
 575        {
 1576            instance_.timer().cancel(this);
 577        }
 1578        return base.responseImpl(userThread, ok, invoke);
 579    }
 580
 1581    public void runTimerTask() => cancel(new Ice.InvocationTimeoutException());
 582
 583    private int handleRetryAfterException(Ice.Exception ex)
 584    {
 585        // Clear the request handler
 1586        proxy_.iceGetRequestHandlerCache().clearCachedRequestHandler(handler_);
 587
 588        // We only retry local exception.
 589        //
 590        // A CloseConnectionException indicates graceful server shutdown, and is therefore
 591        // always repeatable without violating "at-most-once". That's because by sending a
 592        // close connection message, the server guarantees that all outstanding requests
 593        // can safely be repeated.
 594        //
 595        // An ObjectNotExistException can always be retried as well without violating
 596        // "at-most-once" (see the implementation of the checkRetryAfterException method
 597        // below for the reasons why it can be useful).
 598        //
 599        // If the request didn't get sent or if it's non-mutating or idempotent it can
 600        // also always be retried if the retry count isn't reached.
 1601        bool shouldRetry = ex is LocalException && (!_sent ||
 1602            mode_ is not OperationMode.Normal ||
 1603            ex is CloseConnectionException ||
 1604            ex is ObjectNotExistException);
 605
 1606        if (shouldRetry)
 607        {
 608            try
 609            {
 1610                return checkRetryAfterException((LocalException)ex);
 611            }
 1612            catch (CommunicatorDestroyedException)
 613            {
 1614                throw ex; // The communicator is already destroyed, so we cannot retry.
 615            }
 616        }
 617        else
 618        {
 1619            throw ex; // Retry could break at-most-once semantics, don't retry.
 620        }
 1621    }
 622
 623    private int checkRetryAfterException(Ice.LocalException ex)
 624    {
 1625        Reference @ref = proxy_.iceReference();
 1626        Instance instance = @ref.getInstance();
 627
 1628        TraceLevels traceLevels = instance.traceLevels();
 1629        Ice.Logger logger = instance.initializationData().logger!;
 630
 631        // We don't retry batch requests because the exception might have caused
 632        // the all the requests batched with the connection to be aborted and we
 633        // want the application to be notified.
 1634        if (@ref.getMode() == Reference.Mode.ModeBatchOneway || @ref.getMode() == Reference.Mode.ModeBatchDatagram)
 635        {
 1636            throw ex;
 637        }
 638
 639        // If it's a fixed proxy, retrying isn't useful as the proxy is tied to
 640        // the connection and the request will fail with the exception.
 1641        if (@ref is FixedReference)
 642        {
 1643            throw ex;
 644        }
 645
 1646        var one = ex as Ice.ObjectNotExistException;
 1647        if (one is not null)
 648        {
 1649            if (@ref.getRouterInfo() != null && one.operation == "ice_add_proxy")
 650            {
 651                // If we have a router, an ObjectNotExistException with an
 652                // operation name "ice_add_proxy" indicates to the client
 653                // that the router isn't aware of the proxy (for example,
 654                // because it was evicted by the router). In this case, we
 655                // must *always* retry, so that the missing proxy is added
 656                // to the router.
 657
 0658                @ref.getRouterInfo().clearCache(@ref);
 659
 0660                if (traceLevels.retry >= 1)
 661                {
 0662                    string s = "retrying operation call to add proxy to router\n" + ex;
 0663                    logger.trace(traceLevels.retryCat, s);
 664                }
 0665                return 0; // We must always retry, so we don't look at the retry count.
 666            }
 1667            else if (@ref.isIndirect())
 668            {
 669                // We retry ObjectNotExistException if the reference is indirect.
 670
 1671                if (@ref.isWellKnown())
 672                {
 1673                    LocatorInfo li = @ref.getLocatorInfo();
 1674                    li?.clearCache(@ref);
 675                }
 676            }
 677            else
 678            {
 679                // For all other cases, we don't retry ObjectNotExistException.
 1680                throw ex;
 681            }
 682        }
 1683        else if (ex is Ice.RequestFailedException)
 684        {
 1685            throw ex;
 686        }
 687
 688        // There is no point in retrying an operation that resulted in a
 689        // MarshalException. This must have been raised locally (because if
 690        // it happened in a server it would result in an UnknownLocalException
 691        // instead), which means there was a problem in this process that will
 692        // not change if we try again.
 693        //
 694        // A likely cause for a MarshalException is exceeding the
 695        // maximum message size. For example, a client can attempt to send a
 696        // message that exceeds the maximum memory size, or accumulate enough
 697        // batch requests without flushing that the maximum size is reached.
 698        //
 699        // This latter case is especially problematic, because if we were to
 700        // retry a batch request after a MarshalException, we would in fact
 701        // silently discard the accumulated requests and allow new batch
 702        // requests to accumulate. If the subsequent batched requests do not
 703        // exceed the maximum message size, it appears to the client that all
 704        // of the batched requests were accepted, when in reality only the
 705        // last few are actually sent.
 1706        if (ex is Ice.MarshalException)
 707        {
 0708            throw ex;
 709        }
 710
 711        // Don't retry if the communicator is destroyed, object adapter is deactivated,
 712        // or connection is closed by the application.
 1713        if (ex is CommunicatorDestroyedException ||
 1714           ex is ObjectAdapterDeactivatedException ||
 1715           ex is ObjectAdapterDestroyedException ||
 1716           (ex is ConnectionAbortedException connectionAbortedException &&
 1717            connectionAbortedException.closedByApplication) ||
 1718           (ex is ConnectionClosedException connectionClosedException &&
 1719            connectionClosedException.closedByApplication))
 720        {
 1721            throw ex;
 722        }
 723
 724        // Don't retry invocation timeouts.
 1725        if (ex is Ice.InvocationTimeoutException || ex is Ice.InvocationCanceledException)
 726        {
 1727            throw ex;
 728        }
 729
 1730        ++_cnt;
 731        Debug.Assert(_cnt > 0);
 732
 1733        int[] retryIntervals = instance.retryIntervals;
 734
 735        int interval;
 1736        if (_cnt == (retryIntervals.Length + 1) && ex is Ice.CloseConnectionException)
 737        {
 738            // A close connection exception is always retried at least once, even if the retry
 739            // limit is reached.
 0740            interval = 0;
 741        }
 1742        else if (_cnt > retryIntervals.Length)
 743        {
 1744            if (traceLevels.retry >= 1)
 745            {
 1746                string s = "cannot retry operation call because retry limit has been exceeded\n" + ex;
 1747                logger.trace(traceLevels.retryCat, s);
 748            }
 1749            throw ex;
 750        }
 751        else
 752        {
 1753            interval = retryIntervals[_cnt - 1];
 754        }
 755
 1756        if (traceLevels.retry >= 1)
 757        {
 1758            string s = "retrying operation call";
 1759            if (interval > 0)
 760            {
 1761                s += " in " + interval + "ms";
 762            }
 1763            s += " because of exception\n" + ex;
 1764            logger.trace(traceLevels.retryCat, s);
 765        }
 766
 1767        return interval;
 768    }
 769
 770    protected readonly Ice.ObjectPrxHelperBase proxy_;
 771    protected RequestHandler handler_;
 772    protected Ice.OperationMode mode_;
 773
 774    private int _cnt;
 775    private bool _sent;
 776}
 777
 778//
 779// Class for handling Slice operation invocations
 780//
 781public class OutgoingAsync : ProxyOutgoingAsyncBase
 782{
 783    public OutgoingAsync(
 784        Ice.ObjectPrxHelperBase prx,
 785        OutgoingAsyncCompletionCallback completionCallback,
 786        Ice.OutputStream os = null,
 787        Ice.InputStream iss = null)
 788        : base(prx, completionCallback, os, iss)
 789    {
 790        encoding_ = proxy_.iceReference().getEncoding();
 791        synchronous_ = false;
 792    }
 793
 794    public void prepare(string operation, Ice.OperationMode mode, Dictionary<string, string> context)
 795    {
 796        if (proxy_.iceReference().getProtocol() != Protocol.currentProtocol)
 797        {
 798            throw new FeatureNotSupportedException(
 799                $"Cannot send request using protocol version {proxy_.iceReference().getProtocol()}.");
 800        }
 801
 802        mode_ = mode;
 803
 804        observer_ = ObserverHelper.get(proxy_, operation, context);
 805
 806        if (proxy_.iceReference().isBatch)
 807        {
 808            proxy_.iceReference().batchRequestQueue.prepareBatchRequest(os_);
 809        }
 810        else
 811        {
 812            os_.writeBlob(Protocol.requestHdr);
 813        }
 814
 815        Reference rf = proxy_.iceReference();
 816
 817        Identity.ice_write(os_, rf.getIdentity());
 818
 819        //
 820        // For compatibility with the old FacetPath.
 821        //
 822        string facet = rf.getFacet();
 823        if (facet == null || facet.Length == 0)
 824        {
 825            os_.writeStringSeq(null);
 826        }
 827        else
 828        {
 829            string[] facetPath = { facet };
 830            os_.writeStringSeq(facetPath);
 831        }
 832
 833        os_.writeString(operation);
 834
 835        os_.writeByte((byte)mode);
 836
 837        if (context != null)
 838        {
 839            //
 840            // Explicit context
 841            //
 842            Ice.ContextHelper.write(os_, context);
 843        }
 844        else
 845        {
 846            //
 847            // Implicit context
 848            //
 849            Ice.ImplicitContextI implicitContext = rf.getInstance().getImplicitContext();
 850            Dictionary<string, string> prxContext = rf.getContext();
 851
 852            if (implicitContext == null)
 853            {
 854                Ice.ContextHelper.write(os_, prxContext);
 855            }
 856            else
 857            {
 858                implicitContext.write(prxContext, os_);
 859            }
 860        }
 861    }
 862
 863    public override bool sent() => sentImpl(!proxy_.ice_isTwoway()); // done = true if it's not a two-way proxy
 864
 865    public override bool response()
 866    {
 867        //
 868        // NOTE: this method is called from ConnectionI.parseMessage
 869        // with the connection locked. Therefore, it must not invoke
 870        // any user callbacks.
 871        //
 872        Debug.Assert(proxy_.ice_isTwoway()); // Can only be called for twoways.
 873
 874        if (childObserver_ != null)
 875        {
 876            childObserver_.reply(is_.size() - Protocol.headerSize - 4);
 877            childObserver_.detach();
 878            childObserver_ = null;
 879        }
 880
 881        try
 882        {
 883            // We can't (shouldn't) use the generated code to unmarshal a possibly unknown reply status.
 884            var replyStatus = (ReplyStatus)is_.readByte();
 885
 886            switch (replyStatus)
 887            {
 888                case ReplyStatus.Ok:
 889                    break;
 890
 891                case ReplyStatus.UserException:
 892                    observer_?.userException();
 893                    break;
 894
 895                case ReplyStatus.ObjectNotExist:
 896                case ReplyStatus.FacetNotExist:
 897                case ReplyStatus.OperationNotExist:
 898                {
 899                    var ident = new Ice.Identity(is_);
 900
 901                    //
 902                    // For compatibility with the old FacetPath.
 903                    //
 904                    string[] facetPath = is_.readStringSeq();
 905                    string facet;
 906                    if (facetPath.Length > 0)
 907                    {
 908                        if (facetPath.Length > 1)
 909                        {
 910                            throw new MarshalException(
 911                                $"Received invalid facet path with {facetPath.Length} elements.");
 912                        }
 913                        facet = facetPath[0];
 914                    }
 915                    else
 916                    {
 917                        facet = "";
 918                    }
 919
 920                    string operation = is_.readString();
 921                    throw replyStatus switch
 922                    {
 923                        ReplyStatus.ObjectNotExist => new ObjectNotExistException(ident, facet, operation),
 924                        ReplyStatus.FacetNotExist => new FacetNotExistException(ident, facet, operation),
 925                        _ => new OperationNotExistException(ident, facet, operation)
 926                    };
 927                }
 928
 929                default:
 930                {
 931                    string message = is_.readString();
 932                    throw replyStatus switch
 933                    {
 934                        ReplyStatus.UnknownException => new UnknownException(message),
 935                        ReplyStatus.UnknownLocalException => new UnknownLocalException(message),
 936                        ReplyStatus.UnknownUserException => new UnknownUserException(message),
 937                        _ => new DispatchException(replyStatus, message),
 938                    };
 939                }
 940            }
 941
 942            return responseImpl(false, replyStatus == ReplyStatus.Ok, true);
 943        }
 944        catch (Ice.Exception ex)
 945        {
 946            return exception(ex);
 947        }
 948    }
 949
 950    public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
 951    {
 952        cachedConnection_ = connection;
 953        return connection.sendAsyncRequest(this, compress, response, 0);
 954    }
 955
 956    public override int invokeCollocated(CollocatedRequestHandler handler)
 957    {
 958        // The stream cannot be cached if the proxy is not a twoway or there is an invocation timeout set.
 959        if (!proxy_.ice_isTwoway() || proxy_.iceReference().getInvocationTimeout() > TimeSpan.Zero)
 960        {
 961            // Disable caching by marking the streams as cached!
 962            state_ |= StateCachedBuffers;
 963        }
 964        return handler.invokeAsyncRequest(this, 0, synchronous_);
 965    }
 966
 967    public new void abort(Ice.Exception ex)
 968    {
 969        if (proxy_.iceReference().isBatch)
 970        {
 971            proxy_.iceReference().batchRequestQueue.abortBatchRequest(os_);
 972        }
 973
 974        base.abort(ex);
 975    }
 976
 977    protected void invoke(string operation, bool synchronous)
 978    {
 979        synchronous_ = synchronous;
 980        if (proxy_.iceReference().isBatch)
 981        {
 982            sentSynchronously_ = true;
 983            proxy_.iceReference().batchRequestQueue.finishBatchRequest(os_, proxy_, operation);
 984            responseImpl(true, true, false); // Don't call sent/completed callback for batch AMI requests
 985            return;
 986        }
 987
 988        // invokeImpl can throw
 989        invokeImpl(true); // userThread = true
 990    }
 991
 992    public void invoke(
 993        string operation,
 994        Ice.OperationMode mode,
 995        Ice.FormatType? format,
 996        Dictionary<string, string> context,
 997        bool synchronous,
 998        System.Action<Ice.OutputStream> write)
 999    {
 1000        try
 1001        {
 1002            prepare(operation, mode, context);
 1003            if (write != null)
 1004            {
 1005                os_.startEncapsulation(encoding_, format);
 1006                write(os_);
 1007                os_.endEncapsulation();
 1008            }
 1009            else
 1010            {
 1011                os_.writeEmptyEncapsulation(encoding_);
 1012            }
 1013            invoke(operation, synchronous);
 1014        }
 1015        catch (Ice.Exception ex)
 1016        {
 1017            abort(ex);
 1018        }
 1019    }
 1020
 1021    public override void throwUserException()
 1022    {
 1023        try
 1024        {
 1025            is_.startEncapsulation();
 1026            is_.throwException();
 1027        }
 1028        catch (UserException ex)
 1029        {
 1030            is_.endEncapsulation();
 1031            userException_?.Invoke(ex);
 1032            throw UnknownUserException.fromTypeId(ex.ice_id());
 1033        }
 1034    }
 1035
 1036    public override void cacheMessageBuffers()
 1037    {
 1038        if (proxy_.iceReference().getInstance().cacheMessageBuffers() > 0)
 1039        {
 1040            lock (mutex_)
 1041            {
 1042                if ((state_ & StateCachedBuffers) > 0)
 1043                {
 1044                    return;
 1045                }
 1046                state_ |= StateCachedBuffers;
 1047            }
 1048
 1049            is_?.reset();
 1050            os_.reset();
 1051
 1052            proxy_.cacheMessageBuffers(is_, os_);
 1053
 1054            is_ = null;
 1055            os_ = null;
 1056        }
 1057    }
 1058
 1059    protected readonly Ice.EncodingVersion encoding_;
 1060    protected System.Action<Ice.UserException> userException_;
 1061}
 1062
 1063public class OutgoingAsyncT<T> : OutgoingAsync
 1064{
 1065    public OutgoingAsyncT(
 1066        Ice.ObjectPrxHelperBase prx,
 1067        OutgoingAsyncCompletionCallback completionCallback,
 1068        Ice.OutputStream os = null,
 1069        Ice.InputStream iss = null)
 1070        : base(prx, completionCallback, os, iss)
 1071    {
 1072    }
 1073
 1074    public void invoke(
 1075        string operation,
 1076        Ice.OperationMode mode,
 1077        Ice.FormatType? format,
 1078        Dictionary<string, string> context,
 1079        bool synchronous,
 1080        System.Action<Ice.OutputStream> write = null,
 1081        System.Action<Ice.UserException> userException = null,
 1082        System.Func<Ice.InputStream, T> read = null)
 1083    {
 1084        read_ = read;
 1085        userException_ = userException;
 1086        base.invoke(operation, mode, format, context, synchronous, write);
 1087    }
 1088
 1089    public T getResult(bool ok)
 1090    {
 1091        try
 1092        {
 1093            if (ok)
 1094            {
 1095                if (read_ == null)
 1096                {
 1097                    if (is_ == null || is_.isEmpty())
 1098                    {
 1099                        //
 1100                        // If there's no response (oneway, batch-oneway proxies), we just set the result
 1101                        // on completion without reading anything from the input stream. This is required for
 1102                        // batch invocations.
 1103                        //
 1104                    }
 1105                    else
 1106                    {
 1107                        is_.skipEmptyEncapsulation();
 1108                    }
 1109                    return default;
 1110                }
 1111                else
 1112                {
 1113                    is_.startEncapsulation();
 1114                    T r = read_(is_);
 1115                    is_.endEncapsulation();
 1116                    return r;
 1117                }
 1118            }
 1119            else
 1120            {
 1121                throwUserException();
 1122                return default; // make compiler happy
 1123            }
 1124        }
 1125        finally
 1126        {
 1127            cacheMessageBuffers();
 1128        }
 1129    }
 1130
 1131    protected System.Func<Ice.InputStream, T> read_;
 1132}
 1133
 1134//
 1135// Class for handling the proxy's begin_ice_flushBatchRequest request.
 1136//
 1137internal class ProxyFlushBatchAsync : ProxyOutgoingAsyncBase
 1138{
 1139    public ProxyFlushBatchAsync(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback)
 1140        : base(prx, completionCallback)
 1141    {
 1142    }
 1143
 1144    public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
 1145    {
 1146        if (_batchRequestNum == 0)
 1147        {
 1148            if (sent())
 1149            {
 1150                return AsyncStatusSent | AsyncStatusInvokeSentCallback;
 1151            }
 1152            else
 1153            {
 1154                return AsyncStatusSent;
 1155            }
 1156        }
 1157        cachedConnection_ = connection;
 1158        return connection.sendAsyncRequest(this, compress, false, _batchRequestNum);
 1159    }
 1160
 1161    public override int invokeCollocated(CollocatedRequestHandler handler)
 1162    {
 1163        if (_batchRequestNum == 0)
 1164        {
 1165            if (sent())
 1166            {
 1167                return AsyncStatusSent | AsyncStatusInvokeSentCallback;
 1168            }
 1169            else
 1170            {
 1171                return AsyncStatusSent;
 1172            }
 1173        }
 1174        return handler.invokeAsyncRequest(this, _batchRequestNum, false);
 1175    }
 1176
 1177    public void invoke(string operation, bool synchronous)
 1178    {
 1179        if (proxy_.iceReference().getProtocol().major != Protocol.currentProtocol.major)
 1180        {
 1181            throw new FeatureNotSupportedException(
 1182                $"Cannot send request using protocol version {proxy_.iceReference().getProtocol()}.");
 1183        }
 1184        try
 1185        {
 1186            synchronous_ = synchronous;
 1187            observer_ = ObserverHelper.get(proxy_, operation, null);
 1188            // Not used for proxy flush batch requests.
 1189            _batchRequestNum = proxy_.iceReference().batchRequestQueue.swap(os_, out _);
 1190            invokeImpl(true); // userThread = true
 1191        }
 1192        catch (Ice.Exception ex)
 1193        {
 1194            abort(ex);
 1195        }
 1196    }
 1197
 1198    private int _batchRequestNum;
 1199}
 1200
 1201//
 1202// Class for handling the proxy's begin_ice_getConnection request.
 1203//
 1204internal class ProxyGetConnection : ProxyOutgoingAsyncBase
 1205{
 1206    public ProxyGetConnection(Ice.ObjectPrxHelperBase prx, OutgoingAsyncCompletionCallback completionCallback)
 1207        : base(prx, completionCallback)
 1208    {
 1209    }
 1210
 1211    public override int invokeRemote(Ice.ConnectionI connection, bool compress, bool response)
 1212    {
 1213        cachedConnection_ = connection;
 1214        if (responseImpl(false, true, true))
 1215        {
 1216            invokeResponseAsync();
 1217        }
 1218        return AsyncStatusSent;
 1219    }
 1220
 1221    public override int invokeCollocated(CollocatedRequestHandler handler)
 1222    {
 1223        if (responseImpl(false, true, true))
 1224        {
 1225            invokeResponseAsync();
 1226        }
 1227        return AsyncStatusSent;
 1228    }
 1229
 1230    public Ice.Connection getConnection() => cachedConnection_;
 1231
 1232    public void invoke(string operation, bool synchronous)
 1233    {
 1234        try
 1235        {
 1236            synchronous_ = synchronous;
 1237            observer_ = ObserverHelper.get(proxy_, operation, null);
 1238            invokeImpl(true); // userThread = true
 1239        }
 1240        catch (Ice.Exception ex)
 1241        {
 1242            abort(ex);
 1243        }
 1244    }
 1245}
 1246
 1247internal class ConnectionFlushBatchAsync : OutgoingAsyncBase
 1248{
 1249    public ConnectionFlushBatchAsync(
 1250        Ice.ConnectionI connection,
 1251        Instance instance,
 1252        OutgoingAsyncCompletionCallback completionCallback)
 1253        : base(instance, completionCallback) => _connection = connection;
 1254
 1255    public void invoke(string operation, Ice.CompressBatch compressBatch, bool synchronous)
 1256    {
 1257        synchronous_ = synchronous;
 1258        observer_ = ObserverHelper.get(instance_, operation);
 1259        try
 1260        {
 1261            int status;
 1262            int batchRequestNum = _connection.getBatchRequestQueue().swap(os_, out bool compress);
 1263            if (batchRequestNum == 0)
 1264            {
 1265                status = AsyncStatusSent;
 1266                if (sent())
 1267                {
 1268                    status |= AsyncStatusInvokeSentCallback;
 1269                }
 1270            }
 1271            else
 1272            {
 1273                bool comp;
 1274                if (compressBatch == Ice.CompressBatch.Yes)
 1275                {
 1276                    comp = true;
 1277                }
 1278                else if (compressBatch == Ice.CompressBatch.No)
 1279                {
 1280                    comp = false;
 1281                }
 1282                else
 1283                {
 1284                    comp = compress;
 1285                }
 1286                status = _connection.sendAsyncRequest(this, comp, false, batchRequestNum);
 1287            }
 1288
 1289            if ((status & AsyncStatusSent) != 0)
 1290            {
 1291                sentSynchronously_ = true;
 1292                if ((status & AsyncStatusInvokeSentCallback) != 0)
 1293                {
 1294                    invokeSent();
 1295                }
 1296            }
 1297        }
 1298        catch (RetryException ex)
 1299        {
 1300            try
 1301            {
 1302                throw ex.get();
 1303            }
 1304            catch (Ice.LocalException ee)
 1305            {
 1306                if (exception(ee))
 1307                {
 1308                    invokeExceptionAsync();
 1309                }
 1310            }
 1311        }
 1312        catch (Ice.Exception ex)
 1313        {
 1314            if (exception(ex))
 1315            {
 1316                invokeExceptionAsync();
 1317            }
 1318        }
 1319    }
 1320
 1321    private readonly Ice.ConnectionI _connection;
 1322}
 1323
 1324public class CommunicatorFlushBatchAsync : OutgoingAsyncBase
 1325{
 1326    private class FlushBatch : OutgoingAsyncBase
 1327    {
 1328        public FlushBatch(
 1329            CommunicatorFlushBatchAsync outAsync,
 1330            Instance instance,
 1331            Ice.Instrumentation.InvocationObserver observer)
 1332            : base(instance, null)
 1333        {
 1334            _outAsync = outAsync;
 1335            _observer = observer;
 1336        }
 1337
 1338        public override bool
 1339        sent()
 1340        {
 1341            childObserver_?.detach();
 1342            childObserver_ = null;
 1343            _outAsync.check(false);
 1344            return false;
 1345        }
 1346
 1347        public override bool
 1348        exception(Ice.Exception ex)
 1349        {
 1350            if (childObserver_ != null)
 1351            {
 1352                childObserver_.failed(ex.ice_id());
 1353                childObserver_.detach();
 1354                childObserver_ = null;
 1355            }
 1356            _outAsync.check(false);
 1357            return false;
 1358        }
 1359
 1360        protected override Ice.Instrumentation.InvocationObserver
 1361        getObserver() => _observer;
 1362
 1363        private readonly CommunicatorFlushBatchAsync _outAsync;
 1364        private readonly Ice.Instrumentation.InvocationObserver _observer;
 1365    }
 1366
 1367    public CommunicatorFlushBatchAsync(Instance instance, OutgoingAsyncCompletionCallback callback)
 1368        : base(instance, callback) =>
 1369        //
 1370        // _useCount is initialized to 1 to prevent premature callbacks.
 1371        // The caller must invoke ready() after all flush requests have
 1372        // been initiated.
 1373        //
 1374        _useCount = 1;
 1375
 1376    internal void flushConnection(Ice.ConnectionI con, Ice.CompressBatch compressBatch)
 1377    {
 1378        lock (mutex_)
 1379        {
 1380            ++_useCount;
 1381        }
 1382
 1383        try
 1384        {
 1385            var flushBatch = new FlushBatch(this, instance_, observer_);
 1386            int batchRequestNum = con.getBatchRequestQueue().swap(flushBatch.getOs(), out bool compress);
 1387            if (batchRequestNum == 0)
 1388            {
 1389                flushBatch.sent();
 1390            }
 1391            else
 1392            {
 1393                bool comp;
 1394                if (compressBatch == Ice.CompressBatch.Yes)
 1395                {
 1396                    comp = true;
 1397                }
 1398                else if (compressBatch == Ice.CompressBatch.No)
 1399                {
 1400                    comp = false;
 1401                }
 1402                else
 1403                {
 1404                    comp = compress;
 1405                }
 1406                con.sendAsyncRequest(flushBatch, comp, false, batchRequestNum);
 1407            }
 1408        }
 1409        catch (Ice.LocalException)
 1410        {
 1411            check(false);
 1412            throw;
 1413        }
 1414    }
 1415
 1416    public void invoke(string operation, Ice.CompressBatch compressBatch, bool synchronous)
 1417    {
 1418        synchronous_ = synchronous;
 1419        observer_ = ObserverHelper.get(instance_, operation);
 1420        instance_.outgoingConnectionFactory().flushAsyncBatchRequests(compressBatch, this);
 1421        instance_.objectAdapterFactory().flushAsyncBatchRequests(compressBatch, this);
 1422        check(true);
 1423    }
 1424
 1425    public void check(bool userThread)
 1426    {
 1427        lock (mutex_)
 1428        {
 1429            Debug.Assert(_useCount > 0);
 1430            if (--_useCount > 0)
 1431            {
 1432                return;
 1433            }
 1434        }
 1435
 1436        if (sentImpl(true))
 1437        {
 1438            if (userThread)
 1439            {
 1440                sentSynchronously_ = true;
 1441                invokeSent();
 1442            }
 1443            else
 1444            {
 1445                invokeSentAsync();
 1446            }
 1447        }
 1448    }
 1449
 1450    private int _useCount;
 1451}
 1452
 1453public abstract class TaskCompletionCallback<T> : TaskCompletionSource<T>, OutgoingAsyncCompletionCallback
 1454{
 1455    protected TaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken)
 1456        : base(TaskCreationOptions.RunContinuationsAsynchronously)
 1457    {
 1458        progress_ = progress;
 1459        _cancellationToken = cancellationToken;
 1460    }
 1461
 1462    public void init(OutgoingAsyncBase og)
 1463    {
 1464        if (_cancellationToken.CanBeCanceled)
 1465        {
 1466            _cancellationToken.Register(og.cancel);
 1467        }
 1468    }
 1469
 1470    public bool handleSent(bool done, bool alreadySent, OutgoingAsyncBase og)
 1471    {
 1472        if (done && og.isSynchronous())
 1473        {
 1474            Debug.Assert(progress_ == null);
 1475            handleInvokeSent(false, done, alreadySent, og);
 1476            return false;
 1477        }
 1478        return done || (progress_ != null && !alreadySent); // Invoke the sent callback only if not already invoked.
 1479    }
 1480
 1481    public bool handleException(Ice.Exception ex, OutgoingAsyncBase og)
 1482    {
 1483        //
 1484        // If this is a synchronous call, we can notify the task from this thread to avoid
 1485        // the thread context switch. We know there aren't any continuations setup with the
 1486        // task.
 1487        //
 1488        if (og.isSynchronous())
 1489        {
 1490            handleInvokeException(ex, og);
 1491            return false;
 1492        }
 1493        else
 1494        {
 1495            return true;
 1496        }
 1497    }
 1498
 1499    public bool handleResponse(bool userThread, bool ok, OutgoingAsyncBase og)
 1500    {
 1501        //
 1502        // If called from the user thread (only the case for batch requests) or if this
 1503        // is a synchronous call, we can notify the task from this thread to avoid the
 1504        // thread context switch. We know there aren't any continuations setup with the
 1505        // task.
 1506        //
 1507        if (userThread || og.isSynchronous())
 1508        {
 1509            handleInvokeResponse(ok, og);
 1510            return false;
 1511        }
 1512        else
 1513        {
 1514            return true;
 1515        }
 1516    }
 1517
 1518    public virtual void handleInvokeSent(bool sentSynchronously, bool done, bool alreadySent, OutgoingAsyncBase og)
 1519    {
 1520        if (progress_ != null && !alreadySent)
 1521        {
 1522            progress_.Report(sentSynchronously);
 1523        }
 1524        if (done)
 1525        {
 1526            SetResult(default);
 1527        }
 1528    }
 1529
 1530    public void handleInvokeException(Ice.Exception ex, OutgoingAsyncBase og) => SetException(ex);
 1531
 1532    public abstract void handleInvokeResponse(bool ok, OutgoingAsyncBase og);
 1533
 1534    private readonly CancellationToken _cancellationToken;
 1535
 1536    protected readonly System.IProgress<bool> progress_;
 1537}
 1538
 1539public class OperationTaskCompletionCallback<T> : TaskCompletionCallback<T>
 1540{
 1541    public OperationTaskCompletionCallback(System.IProgress<bool> progress, CancellationToken cancellationToken)
 1542        : base(progress, cancellationToken)
 1543    {
 1544    }
 1545
 1546    public override void handleInvokeResponse(bool ok, OutgoingAsyncBase og) =>
 1547        SetResult(((OutgoingAsyncT<T>)og).getResult(ok));
 1548}
 1549
 1550public class FlushBatchTaskCompletionCallback : TaskCompletionCallback<object>
 1551{
 1552    public FlushBatchTaskCompletionCallback(
 1553        IProgress<bool> progress = null,
 1554        CancellationToken cancellationToken = default)
 1555        : base(progress, cancellationToken)
 1556    {
 1557    }
 1558
 1559    public override void handleInvokeResponse(bool ok, OutgoingAsyncBase og) => SetResult(null);
 1560}