< Summary

Information
Class: Ice.ConnectionI.OutgoingMessage
Assembly: Ice
File(s): /home/runner/work/ice/ice/csharp/src/Ice/ConnectionI.cs
Tag: 71_18251537082
Line coverage
100%
Covered lines: 22
Uncovered lines: 0
Coverable lines: 22
Total lines: 2945
Line coverage: 100%
Branch coverage
100%
Covered branches: 8
Total branches: 8
Branch coverage: 100%
Method coverage
100%
Covered methods: 5
Total methods: 5
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
.ctor(...)100%11100%
canceled()100%11100%
sent()100%44100%
completed(...)100%44100%

File(s)

/home/runner/work/ice/ice/csharp/src/Ice/ConnectionI.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Ice.Instrumentation;
 4using Ice.Internal;
 5using System.Diagnostics;
 6using System.Text;
 7
 8namespace Ice;
 9
 10#pragma warning disable CA1001 // _inactivityTimer is disposed by cancelInactivityTimer.
 11public sealed class ConnectionI : Internal.EventHandler, CancellationHandler, Connection
 12#pragma warning restore CA1001
 13{
 14    internal interface StartCallback
 15    {
 16        void connectionStartCompleted(ConnectionI connection);
 17
 18        void connectionStartFailed(ConnectionI connection, LocalException ex);
 19    }
 20
 21    internal void start(StartCallback callback)
 22    {
 23        try
 24        {
 25            lock (_mutex)
 26            {
 27                //
 28                // The connection might already be closed if the communicator was destroyed.
 29                //
 30                if (_state >= StateClosed)
 31                {
 32                    Debug.Assert(_exception is not null);
 33                    throw _exception;
 34                }
 35
 36                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 37                {
 38                    if (_connectTimeout > TimeSpan.Zero)
 39                    {
 40#pragma warning disable CA2000 // connectTimer is disposed by connectTimedOut.
 41                        var connectTimer = new System.Threading.Timer(
 42                            timerObj => connectTimedOut((System.Threading.Timer)timerObj));
 43                        // schedule timer to run once; connectTimedOut disposes the timer too.
 44                        connectTimer.Change(_connectTimeout, Timeout.InfiniteTimeSpan);
 45#pragma warning restore CA2000
 46                    }
 47
 48                    _startCallback = callback;
 49                    return;
 50                }
 51
 52                // The connection starts in the holding state. It will be activated by the connection factory.
 53                setState(StateHolding);
 54            }
 55        }
 56        catch (LocalException ex)
 57        {
 58            exception(ex);
 59            callback.connectionStartFailed(this, _exception);
 60            return;
 61        }
 62
 63        callback.connectionStartCompleted(this);
 64    }
 65
 66    internal void startAndWait()
 67    {
 68        try
 69        {
 70            lock (_mutex)
 71            {
 72                //
 73                // The connection might already be closed if the communicator was destroyed.
 74                //
 75                if (_state >= StateClosed)
 76                {
 77                    Debug.Assert(_exception is not null);
 78                    throw _exception;
 79                }
 80
 81                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 82                {
 83                    //
 84                    // Wait for the connection to be validated.
 85                    //
 86                    while (_state <= StateNotValidated)
 87                    {
 88                        Monitor.Wait(_mutex);
 89                    }
 90
 91                    if (_state >= StateClosing)
 92                    {
 93                        Debug.Assert(_exception is not null);
 94                        throw _exception;
 95                    }
 96                }
 97
 98                //
 99                // We start out in holding state.
 100                //
 101                setState(StateHolding);
 102            }
 103        }
 104        catch (LocalException ex)
 105        {
 106            exception(ex);
 107            waitUntilFinished();
 108            return;
 109        }
 110    }
 111
 112    internal void activate()
 113    {
 114        lock (_mutex)
 115        {
 116            if (_state <= StateNotValidated)
 117            {
 118                return;
 119            }
 120
 121            setState(StateActive);
 122        }
 123    }
 124
 125    internal void hold()
 126    {
 127        lock (_mutex)
 128        {
 129            if (_state <= StateNotValidated)
 130            {
 131                return;
 132            }
 133
 134            setState(StateHolding);
 135        }
 136    }
 137
 138    // DestructionReason.
 139    public const int ObjectAdapterDeactivated = 0;
 140    public const int CommunicatorDestroyed = 1;
 141
 142    internal void destroy(int reason)
 143    {
 144        lock (_mutex)
 145        {
 146            switch (reason)
 147            {
 148                case ObjectAdapterDeactivated:
 149                {
 150                    setState(StateClosing, new ObjectAdapterDeactivatedException(_adapter?.getName() ?? ""));
 151                    break;
 152                }
 153
 154                case CommunicatorDestroyed:
 155                {
 156                    setState(StateClosing, new CommunicatorDestroyedException());
 157                    break;
 158                }
 159            }
 160        }
 161    }
 162
 163    public void abort()
 164    {
 165        lock (_mutex)
 166        {
 167            setState(
 168                StateClosed,
 169                new ConnectionAbortedException(
 170                    "The connection was aborted by the application.",
 171                    closedByApplication: true));
 172        }
 173    }
 174
 175    public Task closeAsync()
 176    {
 177        lock (_mutex)
 178        {
 179            if (_state < StateClosing)
 180            {
 181                if (_asyncRequests.Count == 0)
 182                {
 183                    doApplicationClose();
 184                }
 185                else
 186                {
 187                    _closeRequested = true;
 188                    scheduleCloseTimer(); // we don't wait forever for outstanding invocations to complete
 189                }
 190            }
 191            // else nothing to do, already closing or closed.
 192        }
 193
 194        return _closed.Task;
 195    }
 196
 197    internal bool isActiveOrHolding()
 198    {
 199        lock (_mutex)
 200        {
 201            return _state > StateNotValidated && _state < StateClosing;
 202        }
 203    }
 204
 205    public void throwException()
 206    {
 207        lock (_mutex)
 208        {
 209            if (_exception is not null)
 210            {
 211                Debug.Assert(_state >= StateClosing);
 212                throw _exception;
 213            }
 214        }
 215    }
 216
 217    internal void waitUntilHolding()
 218    {
 219        lock (_mutex)
 220        {
 221            while (_state < StateHolding || _upcallCount > 0)
 222            {
 223                Monitor.Wait(_mutex);
 224            }
 225        }
 226    }
 227
 228    internal void waitUntilFinished()
 229    {
 230        lock (_mutex)
 231        {
 232            //
 233            // We wait indefinitely until the connection is finished and all
 234            // outstanding requests are completed. Otherwise we couldn't
 235            // guarantee that there are no outstanding calls when deactivate()
 236            // is called on the servant locators.
 237            //
 238            while (_state < StateFinished || _upcallCount > 0)
 239            {
 240                Monitor.Wait(_mutex);
 241            }
 242
 243            Debug.Assert(_state == StateFinished);
 244
 245            //
 246            // Clear the OA. See bug 1673 for the details of why this is necessary.
 247            //
 248            _adapter = null;
 249        }
 250    }
 251
 252    internal void updateObserver()
 253    {
 254        lock (_mutex)
 255        {
 256            if (_state < StateNotValidated || _state > StateClosed)
 257            {
 258                return;
 259            }
 260
 261            Debug.Assert(_instance.initializationData().observer is not null);
 262            _observer = _instance.initializationData().observer.getConnectionObserver(
 263                initConnectionInfo(),
 264                _endpoint,
 265                toConnectionState(_state),
 266                _observer);
 267            if (_observer is not null)
 268            {
 269                _observer.attach();
 270            }
 271            else
 272            {
 273                _writeStreamPos = -1;
 274                _readStreamPos = -1;
 275            }
 276        }
 277    }
 278
 279    internal int sendAsyncRequest(
 280        OutgoingAsyncBase og,
 281        bool compress,
 282        bool response,
 283        int batchRequestCount)
 284    {
 285        OutputStream os = og.getOs();
 286
 287        lock (_mutex)
 288        {
 289            //
 290            // If the exception is closed before we even have a chance
 291            // to send our request, we always try to send the request
 292            // again.
 293            //
 294            if (_exception is not null)
 295            {
 296                throw new RetryException(_exception);
 297            }
 298
 299            Debug.Assert(_state > StateNotValidated);
 300            Debug.Assert(_state < StateClosing);
 301
 302            //
 303            // Ensure the message isn't bigger than what we can send with the
 304            // transport.
 305            //
 306            _transceiver.checkSendSize(os.getBuffer());
 307
 308            //
 309            // Notify the request that it's cancelable with this connection.
 310            // This will throw if the request is canceled.
 311            //
 312            og.cancelable(this);
 313            int requestId = 0;
 314            if (response)
 315            {
 316                //
 317                // Create a new unique request ID.
 318                //
 319                requestId = _nextRequestId++;
 320                if (requestId <= 0)
 321                {
 322                    _nextRequestId = 1;
 323                    requestId = _nextRequestId++;
 324                }
 325
 326                //
 327                // Fill in the request ID.
 328                //
 329                os.pos(Protocol.headerSize);
 330                os.writeInt(requestId);
 331            }
 332            else if (batchRequestCount > 0)
 333            {
 334                os.pos(Protocol.headerSize);
 335                os.writeInt(batchRequestCount);
 336            }
 337
 338            og.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
 339
 340            // We're just about to send a request, so we are not inactive anymore.
 341            cancelInactivityTimer();
 342
 343            int status = OutgoingAsyncBase.AsyncStatusQueued;
 344            try
 345            {
 346                var message = new OutgoingMessage(og, os, compress, requestId);
 347                status = sendMessage(message);
 348            }
 349            catch (LocalException ex)
 350            {
 351                setState(StateClosed, ex);
 352                Debug.Assert(_exception is not null);
 353                throw _exception;
 354            }
 355
 356            if (response)
 357            {
 358                //
 359                // Add to the async requests map.
 360                //
 361                _asyncRequests[requestId] = og;
 362            }
 363            return status;
 364        }
 365    }
 366
 367    internal BatchRequestQueue getBatchRequestQueue() => _batchRequestQueue;
 368
 369    public void flushBatchRequests(CompressBatch compress)
 370    {
 371        try
 372        {
 373            var completed = new FlushBatchTaskCompletionCallback();
 374            var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 375            outgoing.invoke(_flushBatchRequests_name, compress, true);
 376            completed.Task.Wait();
 377        }
 378        catch (AggregateException ex)
 379        {
 380            throw ex.InnerException;
 381        }
 382    }
 383
 384    public Task flushBatchRequestsAsync(
 385        CompressBatch compress,
 386        IProgress<bool> progress = null,
 387        CancellationToken cancel = default)
 388    {
 389        var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
 390        var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 391        outgoing.invoke(_flushBatchRequests_name, compress, false);
 392        return completed.Task;
 393    }
 394
 395    private const string _flushBatchRequests_name = "flushBatchRequests";
 396
 397    public void disableInactivityCheck()
 398    {
 399        lock (_mutex)
 400        {
 401            cancelInactivityTimer();
 402            _inactivityTimeout = TimeSpan.Zero;
 403        }
 404    }
 405
 406    public void setCloseCallback(CloseCallback callback)
 407    {
 408        lock (_mutex)
 409        {
 410            if (_state >= StateClosed)
 411            {
 412                if (callback is not null)
 413                {
 414                    _threadPool.execute(
 415                        () =>
 416                        {
 417                            try
 418                            {
 419                                callback(this);
 420                            }
 421                            catch (System.Exception ex)
 422                            {
 423                                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
 424                            }
 425                        },
 426                        this);
 427                }
 428            }
 429            else
 430            {
 431                _closeCallback = callback;
 432            }
 433        }
 434    }
 435
 436    public void asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex)
 437    {
 438        //
 439        // NOTE: This isn't called from a thread pool thread.
 440        //
 441
 442        lock (_mutex)
 443        {
 444            if (_state >= StateClosed)
 445            {
 446                return; // The request has already been or will be shortly notified of the failure.
 447            }
 448
 449            OutgoingMessage o = _sendStreams.FirstOrDefault(m => m.outAsync == outAsync);
 450            if (o is not null)
 451            {
 452                if (o.requestId > 0)
 453                {
 454                    _asyncRequests.Remove(o.requestId);
 455                }
 456
 457                if (ex is ConnectionAbortedException)
 458                {
 459                    setState(StateClosed, ex);
 460                }
 461                else
 462                {
 463                    //
 464                    // If the request is being sent, don't remove it from the send streams,
 465                    // it will be removed once the sending is finished.
 466                    //
 467                    if (o == _sendStreams.First.Value)
 468                    {
 469                        o.canceled();
 470                    }
 471                    else
 472                    {
 473                        o.canceled();
 474                        _sendStreams.Remove(o);
 475                    }
 476                    if (outAsync.exception(ex))
 477                    {
 478                        outAsync.invokeExceptionAsync();
 479                    }
 480                }
 481
 482                if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 483                {
 484                    doApplicationClose();
 485                }
 486                return;
 487            }
 488
 489            if (outAsync is OutgoingAsync)
 490            {
 491                foreach (KeyValuePair<int, OutgoingAsyncBase> kvp in _asyncRequests)
 492                {
 493                    if (kvp.Value == outAsync)
 494                    {
 495                        if (ex is ConnectionAbortedException)
 496                        {
 497                            setState(StateClosed, ex);
 498                        }
 499                        else
 500                        {
 501                            _asyncRequests.Remove(kvp.Key);
 502                            if (outAsync.exception(ex))
 503                            {
 504                                outAsync.invokeExceptionAsync();
 505                            }
 506                        }
 507
 508                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 509                        {
 510                            doApplicationClose();
 511                        }
 512                        return;
 513                    }
 514                }
 515            }
 516        }
 517    }
 518
 519    internal EndpointI endpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 520
 521    internal Connector connector() => _connector; // No mutex protection necessary, _endpoint is immutable.
 522
 523    public void setAdapter(ObjectAdapter adapter)
 524    {
 525        if (_connector is null) // server connection
 526        {
 527            throw new InvalidOperationException("setAdapter can only be called on a client connection");
 528        }
 529
 530        if (adapter is not null)
 531        {
 532            // Go through the adapter to set the adapter and servant manager on this connection
 533            // to ensure the object adapter is still active.
 534            adapter.setAdapterOnConnection(this);
 535        }
 536        else
 537        {
 538            lock (_mutex)
 539            {
 540                if (_state <= StateNotValidated || _state >= StateClosing)
 541                {
 542                    return;
 543                }
 544                _adapter = null;
 545            }
 546        }
 547
 548        //
 549        // We never change the thread pool with which we were initially
 550        // registered, even if we add or remove an object adapter.
 551        //
 552    }
 553
 554    public ObjectAdapter getAdapter()
 555    {
 556        lock (_mutex)
 557        {
 558            return _adapter;
 559        }
 560    }
 561
 562    public Endpoint getEndpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 563
 564    public ObjectPrx createProxy(Identity id)
 565    {
 566        ObjectAdapter.checkIdentity(id);
 567        return new ObjectPrxHelper(_instance.referenceFactory().create(id, this));
 568    }
 569
 570    public void setAdapterFromAdapter(ObjectAdapter adapter)
 571    {
 572        lock (_mutex)
 573        {
 574            if (_state <= StateNotValidated || _state >= StateClosing)
 575            {
 576                return;
 577            }
 578            Debug.Assert(adapter is not null); // Called by ObjectAdapter::setAdapterOnConnection
 579            _adapter = adapter;
 580
 581            // Clear cached connection info (if any) as it's no longer accurate.
 582            _info = null;
 583        }
 584    }
 585
 586    //
 587    // Operations from EventHandler
 588    //
 589    public override bool startAsync(int operation, Ice.Internal.AsyncCallback completedCallback)
 590    {
 591        if (_state >= StateClosed)
 592        {
 593            return false;
 594        }
 595
 596        // Run the IO operation on a .NET thread pool thread to ensure the IO operation won't be interrupted if the
 597        // Ice thread pool thread is terminated (.NET Socket read/write fail with a SocketError.OperationAborted
 598        // error if started from a thread which is later terminated).
 599        Task.Run(() =>
 600        {
 601            lock (_mutex)
 602            {
 603                if (_state >= StateClosed)
 604                {
 605                    completedCallback(this);
 606                    return;
 607                }
 608
 609                try
 610                {
 611                    if ((operation & SocketOperation.Write) != 0)
 612                    {
 613                        if (_observer != null)
 614                        {
 615                            observerStartWrite(_writeStream.getBuffer());
 616                        }
 617
 618                        bool completedSynchronously =
 619                            _transceiver.startWrite(
 620                                _writeStream.getBuffer(),
 621                                completedCallback,
 622                                this,
 623                                out bool messageWritten);
 624
 625                        // If the startWrite call wrote the message, we assume the message is sent now for at-most-once
 626                        // semantics in the event the connection is closed while the message is still in _sendStreams.
 627                        if (messageWritten && _sendStreams.Count > 0)
 628                        {
 629                            // See finish() code.
 630                            _sendStreams.First.Value.isSent = true;
 631                        }
 632
 633                        if (completedSynchronously)
 634                        {
 635                            // If the write completed synchronously, we need to call the completedCallback.
 636                            completedCallback(this);
 637                        }
 638                    }
 639                    else if ((operation & SocketOperation.Read) != 0)
 640                    {
 641                        if (_observer != null && !_readHeader)
 642                        {
 643                            observerStartRead(_readStream.getBuffer());
 644                        }
 645
 646                        if (_transceiver.startRead(_readStream.getBuffer(), completedCallback, this))
 647                        {
 648                            completedCallback(this);
 649                        }
 650                    }
 651                }
 652                catch (LocalException ex)
 653                {
 654                    setState(StateClosed, ex);
 655                    completedCallback(this);
 656                }
 657            }
 658        });
 659
 660        return true;
 661    }
 662
 663    public override bool finishAsync(int operation)
 664    {
 665        if (_state >= StateClosed)
 666        {
 667            return false;
 668        }
 669
 670        try
 671        {
 672            if ((operation & SocketOperation.Write) != 0)
 673            {
 674                Ice.Internal.Buffer buf = _writeStream.getBuffer();
 675                int start = buf.b.position();
 676                _transceiver.finishWrite(buf);
 677                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 678                {
 679                    var s = new StringBuilder("sent ");
 680                    s.Append(buf.b.position() - start);
 681                    if (!_endpoint.datagram())
 682                    {
 683                        s.Append(" of ");
 684                        s.Append(buf.b.limit() - start);
 685                    }
 686                    s.Append(" bytes via ");
 687                    s.Append(_endpoint.protocol());
 688                    s.Append('\n');
 689                    s.Append(ToString());
 690                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 691                }
 692
 693                if (_observer is not null)
 694                {
 695                    observerFinishWrite(_writeStream.getBuffer());
 696                }
 697            }
 698            else if ((operation & SocketOperation.Read) != 0)
 699            {
 700                Ice.Internal.Buffer buf = _readStream.getBuffer();
 701                int start = buf.b.position();
 702                _transceiver.finishRead(buf);
 703                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 704                {
 705                    var s = new StringBuilder("received ");
 706                    if (_endpoint.datagram())
 707                    {
 708                        s.Append(buf.b.limit());
 709                    }
 710                    else
 711                    {
 712                        s.Append(buf.b.position() - start);
 713                        s.Append(" of ");
 714                        s.Append(buf.b.limit() - start);
 715                    }
 716                    s.Append(" bytes via ");
 717                    s.Append(_endpoint.protocol());
 718                    s.Append('\n');
 719                    s.Append(ToString());
 720                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 721                }
 722
 723                if (_observer is not null && !_readHeader)
 724                {
 725                    observerFinishRead(_readStream.getBuffer());
 726                }
 727            }
 728        }
 729        catch (LocalException ex)
 730        {
 731            setState(StateClosed, ex);
 732        }
 733        return _state < StateClosed;
 734    }
 735
 736    public override void message(ThreadPoolCurrent current)
 737    {
 738        StartCallback startCB = null;
 739        Queue<OutgoingMessage> sentCBs = null;
 740        var info = new MessageInfo();
 741        int upcallCount = 0;
 742
 743        using var msg = new ThreadPoolMessage(current, _mutex);
 744        lock (_mutex)
 745        {
 746            try
 747            {
 748                if (!msg.startIOScope())
 749                {
 750                    return;
 751                }
 752
 753                if (_state >= StateClosed)
 754                {
 755                    return;
 756                }
 757
 758                try
 759                {
 760                    int writeOp = SocketOperation.None;
 761                    int readOp = SocketOperation.None;
 762
 763                    // If writes are ready, write the data from the connection's write buffer (_writeStream)
 764                    if ((current.operation & SocketOperation.Write) != 0)
 765                    {
 766                        if (_observer is not null)
 767                        {
 768                            observerStartWrite(_writeStream.getBuffer());
 769                        }
 770                        writeOp = write(_writeStream.getBuffer());
 771                        if (_observer is not null && (writeOp & SocketOperation.Write) == 0)
 772                        {
 773                            observerFinishWrite(_writeStream.getBuffer());
 774                        }
 775                    }
 776
 777                    // If reads are ready, read the data into the connection's read buffer (_readStream). The data is
 778                    // read until:
 779                    // - the full message is read (the transport read returns SocketOperationNone) and
 780                    //   the read buffer is fully filled
 781                    // - the read operation on the transport can't continue without blocking
 782                    if ((current.operation & SocketOperation.Read) != 0)
 783                    {
 784                        while (true)
 785                        {
 786                            Ice.Internal.Buffer buf = _readStream.getBuffer();
 787
 788                            if (_observer is not null && !_readHeader)
 789                            {
 790                                observerStartRead(buf);
 791                            }
 792
 793                            readOp = read(buf);
 794                            if ((readOp & SocketOperation.Read) != 0)
 795                            {
 796                                // Can't continue without blocking, exit out of the loop.
 797                                break;
 798                            }
 799                            if (_observer is not null && !_readHeader)
 800                            {
 801                                Debug.Assert(!buf.b.hasRemaining());
 802                                observerFinishRead(buf);
 803                            }
 804
 805                            // If read header is true, we're reading a new Ice protocol message and we need to read
 806                            // the message header.
 807                            if (_readHeader)
 808                            {
 809                                // The next read will read the remainder of the message.
 810                                _readHeader = false;
 811
 812                                _observer?.receivedBytes(Protocol.headerSize);
 813
 814                                //
 815                                // Connection is validated on first message. This is only used by
 816                                // setState() to check whether or not we can print a connection
 817                                // warning (a client might close the connection forcefully if the
 818                                // connection isn't validated, we don't want to print a warning
 819                                // in this case).
 820                                //
 821                                _validated = true;
 822
 823                                // Full header should be read because the size of _readStream is always headerSize (14)
 824                                // when reading a new message (see the code that sets _readHeader = true).
 825                                int pos = _readStream.pos();
 826                                if (pos < Protocol.headerSize)
 827                                {
 828                                    //
 829                                    // This situation is possible for small UDP packets.
 830                                    //
 831                                    throw new MarshalException("Received Ice message with too few bytes in header.");
 832                                }
 833
 834                                // Decode the header.
 835                                _readStream.pos(0);
 836                                byte[] m = new byte[4];
 837                                m[0] = _readStream.readByte();
 838                                m[1] = _readStream.readByte();
 839                                m[2] = _readStream.readByte();
 840                                m[3] = _readStream.readByte();
 841                                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 842                                m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 843                                {
 844                                    throw new ProtocolException(
 845                                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 846                                }
 847
 848                                var pv = new ProtocolVersion(_readStream);
 849                                if (pv != Util.currentProtocol)
 850                                {
 851                                    throw new MarshalException(
 852                                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 853                                }
 854                                var ev = new EncodingVersion(_readStream);
 855                                if (ev != Util.currentProtocolEncoding)
 856                                {
 857                                    throw new MarshalException(
 858                                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 859                                }
 860
 861                                _readStream.readByte(); // messageType
 862                                _readStream.readByte(); // compress
 863                                int size = _readStream.readInt();
 864                                if (size < Protocol.headerSize)
 865                                {
 866                                    throw new MarshalException($"Received Ice message with unexpected size {size}.");
 867                                }
 868
 869                                // Resize the read buffer to the message size.
 870                                if (size > _messageSizeMax)
 871                                {
 872                                    Ex.throwMemoryLimitException(size, _messageSizeMax);
 873                                }
 874                                if (size > _readStream.size())
 875                                {
 876                                    _readStream.resize(size);
 877                                }
 878                                _readStream.pos(pos);
 879                            }
 880
 881                            if (buf.b.hasRemaining())
 882                            {
 883                                if (_endpoint.datagram())
 884                                {
 885                                    throw new DatagramLimitException(); // The message was truncated.
 886                                }
 887                                continue;
 888                            }
 889                            break;
 890                        }
 891                    }
 892
 893                    // readOp and writeOp are set to the operations that the transport read or write calls from above
 894                    // returned. They indicate which operations will need to be monitored by the thread pool's selector
 895                    // when this method returns.
 896                    int newOp = readOp | writeOp;
 897
 898                    // Operations that are ready. For example, if message was called with SocketOperationRead and the
 899                    // transport read returned SocketOperationNone, reads are considered done: there's no additional
 900                    // data to read.
 901                    int readyOp = current.operation & ~newOp;
 902
 903                    if (_state <= StateNotValidated)
 904                    {
 905                        // If the connection is still not validated and there's still data to read or write, continue
 906                        // waiting for data to read or write.
 907                        if (newOp != 0)
 908                        {
 909                            _threadPool.update(this, current.operation, newOp);
 910                            return;
 911                        }
 912
 913                        // Initialize the connection if it's not initialized yet.
 914                        if (_state == StateNotInitialized && !initialize(current.operation))
 915                        {
 916                            return;
 917                        }
 918
 919                        // Validate the connection if it's not validated yet.
 920                        if (_state <= StateNotValidated && !validate(current.operation))
 921                        {
 922                            return;
 923                        }
 924
 925                        // The connection is validated and doesn't need additional data to be read or written. So
 926                        // unregister it from the thread pool's selector.
 927                        _threadPool.unregister(this, current.operation);
 928
 929                        //
 930                        // We start out in holding state.
 931                        //
 932                        setState(StateHolding);
 933                        if (_startCallback is not null)
 934                        {
 935                            startCB = _startCallback;
 936                            _startCallback = null;
 937                            if (startCB is not null)
 938                            {
 939                                ++upcallCount;
 940                            }
 941                        }
 942                    }
 943                    else
 944                    {
 945                        Debug.Assert(_state <= StateClosingPending);
 946
 947                        //
 948                        // We parse messages first, if we receive a close
 949                        // connection message we won't send more messages.
 950                        //
 951                        if ((readyOp & SocketOperation.Read) != 0)
 952                        {
 953                            // At this point, the protocol message is fully read and can therefore be decoded by
 954                            // parseMessage. parseMessage returns the operation to wait for readiness next.
 955                            newOp |= parseMessage(ref info);
 956                            upcallCount += info.upcallCount;
 957                        }
 958
 959                        if ((readyOp & SocketOperation.Write) != 0)
 960                        {
 961                            // At this point the message from _writeStream is fully written and the next message can be
 962                            // written.
 963
 964                            newOp |= sendNextMessage(out sentCBs);
 965                            if (sentCBs is not null)
 966                            {
 967                                ++upcallCount;
 968                            }
 969                        }
 970
 971                        // If the connection is not closed yet, we can update the thread pool selector to wait for
 972                        // readiness of read, write or both operations.
 973                        if (_state < StateClosed)
 974                        {
 975                            _threadPool.update(this, current.operation, newOp);
 976                        }
 977                    }
 978
 979                    if (upcallCount == 0)
 980                    {
 981                        return; // Nothing to execute, we're done!
 982                    }
 983
 984                    _upcallCount += upcallCount;
 985
 986                    // There's something to execute so we mark IO as completed to elect a new leader thread and let IO
 987                    // be performed on this new leader thread while this thread continues with executing the upcalls.
 988                    msg.ioCompleted();
 989                }
 990                catch (DatagramLimitException) // Expected.
 991                {
 992                    if (_warnUdp)
 993                    {
 994                        _logger.warning($"maximum datagram size of {_readStream.pos()} exceeded");
 995                    }
 996                    _readStream.resize(Protocol.headerSize);
 997                    _readStream.pos(0);
 998                    _readHeader = true;
 999                    return;
 1000                }
 1001                catch (SocketException ex)
 1002                {
 1003                    setState(StateClosed, ex);
 1004                    return;
 1005                }
 1006                catch (LocalException ex)
 1007                {
 1008                    if (_endpoint.datagram())
 1009                    {
 1010                        if (_warn)
 1011                        {
 1012                            _logger.warning($"datagram connection exception:\n{ex}\n{_desc}");
 1013                        }
 1014                        _readStream.resize(Protocol.headerSize);
 1015                        _readStream.pos(0);
 1016                        _readHeader = true;
 1017                    }
 1018                    else
 1019                    {
 1020                        setState(StateClosed, ex);
 1021                    }
 1022                    return;
 1023                }
 1024            }
 1025            finally
 1026            {
 1027                msg.finishIOScope();
 1028            }
 1029        }
 1030
 1031        _threadPool.executeFromThisThread(() => upcall(startCB, sentCBs, info), this);
 1032    }
 1033
 1034    private void upcall(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)
 1035    {
 1036        int completedUpcallCount = 0;
 1037
 1038        //
 1039        // Notify the factory that the connection establishment and
 1040        // validation has completed.
 1041        //
 1042        if (startCB is not null)
 1043        {
 1044            startCB.connectionStartCompleted(this);
 1045            ++completedUpcallCount;
 1046        }
 1047
 1048        //
 1049        // Notify AMI calls that the message was sent.
 1050        //
 1051        if (sentCBs is not null)
 1052        {
 1053            foreach (OutgoingMessage m in sentCBs)
 1054            {
 1055                if (m.invokeSent)
 1056                {
 1057                    m.outAsync.invokeSent();
 1058                }
 1059                if (m.receivedReply)
 1060                {
 1061                    var outAsync = (OutgoingAsync)m.outAsync;
 1062                    if (outAsync.response())
 1063                    {
 1064                        outAsync.invokeResponse();
 1065                    }
 1066                }
 1067            }
 1068            ++completedUpcallCount;
 1069        }
 1070
 1071        //
 1072        // Asynchronous replies must be handled outside the thread
 1073        // synchronization, so that nested calls are possible.
 1074        //
 1075        if (info.outAsync is not null)
 1076        {
 1077            info.outAsync.invokeResponse();
 1078            ++completedUpcallCount;
 1079        }
 1080
 1081        //
 1082        // Method invocation (or multiple invocations for batch messages)
 1083        // must be done outside the thread synchronization, so that nested
 1084        // calls are possible.
 1085        //
 1086        if (info.requestCount > 0)
 1087        {
 1088            dispatchAll(info.stream, info.requestCount, info.requestId, info.compress, info.adapter);
 1089        }
 1090
 1091        //
 1092        // Decrease the upcall count.
 1093        //
 1094        bool finished = false;
 1095        if (completedUpcallCount > 0)
 1096        {
 1097            lock (_mutex)
 1098            {
 1099                _upcallCount -= completedUpcallCount;
 1100                if (_upcallCount == 0)
 1101                {
 1102                    // Only initiate shutdown if not already initiated. It might have already been initiated if the sent
 1103                    // callback or AMI callback was called when the connection was in the closing state.
 1104                    if (_state == StateClosing)
 1105                    {
 1106                        try
 1107                        {
 1108                            initiateShutdown();
 1109                        }
 1110                        catch (Ice.LocalException ex)
 1111                        {
 1112                            setState(StateClosed, ex);
 1113                        }
 1114                    }
 1115                    else if (_state == StateFinished)
 1116                    {
 1117                        finished = true;
 1118                        _observer?.detach();
 1119                    }
 1120                    Monitor.PulseAll(_mutex);
 1121                }
 1122            }
 1123        }
 1124
 1125        if (finished && _removeFromFactory is not null)
 1126        {
 1127            _removeFromFactory(this);
 1128        }
 1129    }
 1130
 1131    public override void finished(ThreadPoolCurrent current)
 1132    {
 1133        // Lock the connection here to ensure setState() completes before the code below is executed. This method can
 1134        // be called by the thread pool as soon as setState() calls _threadPool->finish(...). There's no need to lock
 1135        // the mutex for the remainder of the code because the data members accessed by finish() are immutable once
 1136        // _state == StateClosed (and we don't want to hold the mutex when calling upcalls).
 1137        lock (_mutex)
 1138        {
 1139            Debug.Assert(_state == StateClosed);
 1140        }
 1141
 1142        //
 1143        // If there are no callbacks to call, we don't call ioCompleted() since we're not going
 1144        // to call code that will potentially block (this avoids promoting a new leader and
 1145        // unnecessary thread creation, especially if this is called on shutdown).
 1146        //
 1147        if (_startCallback is null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && _closeCallback is null)
 1148        {
 1149            finish();
 1150            return;
 1151        }
 1152
 1153        current.ioCompleted();
 1154        _threadPool.executeFromThisThread(finish, this);
 1155    }
 1156
 1157    private void finish()
 1158    {
 1159        if (!_initialized)
 1160        {
 1161            if (_instance.traceLevels().network >= 2)
 1162            {
 1163                var s = new StringBuilder("failed to ");
 1164                s.Append(_connector is not null ? "establish" : "accept");
 1165                s.Append(' ');
 1166                s.Append(_endpoint.protocol());
 1167                s.Append(" connection\n");
 1168                s.Append(ToString());
 1169                s.Append('\n');
 1170                s.Append(_exception);
 1171                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1172            }
 1173        }
 1174        else
 1175        {
 1176            if (_instance.traceLevels().network >= 1)
 1177            {
 1178                var s = new StringBuilder("closed ");
 1179                s.Append(_endpoint.protocol());
 1180                s.Append(" connection\n");
 1181                s.Append(ToString());
 1182
 1183                // Trace the cause of most connection closures.
 1184                if (!(_exception is CommunicatorDestroyedException || _exception is ObjectAdapterDeactivatedException))
 1185                {
 1186                    s.Append('\n');
 1187                    s.Append(_exception);
 1188                }
 1189
 1190                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1191            }
 1192        }
 1193
 1194        if (_startCallback is not null)
 1195        {
 1196            _startCallback.connectionStartFailed(this, _exception);
 1197            _startCallback = null;
 1198        }
 1199
 1200        if (_sendStreams.Count > 0)
 1201        {
 1202            if (!_writeStream.isEmpty())
 1203            {
 1204                //
 1205                // Return the stream to the outgoing call. This is important for
 1206                // retriable AMI calls which are not marshaled again.
 1207                //
 1208                OutgoingMessage message = _sendStreams.First.Value;
 1209                _writeStream.swap(message.stream);
 1210
 1211                //
 1212                // The current message might be sent but not yet removed from _sendStreams. If
 1213                // the response has been received in the meantime, we remove the message from
 1214                // _sendStreams to not call finished on a message which is already done.
 1215                //
 1216                if (message.isSent || message.receivedReply)
 1217                {
 1218                    if (message.sent() && message.invokeSent)
 1219                    {
 1220                        message.outAsync.invokeSent();
 1221                    }
 1222                    if (message.receivedReply)
 1223                    {
 1224                        var outAsync = (OutgoingAsync)message.outAsync;
 1225                        if (outAsync.response())
 1226                        {
 1227                            outAsync.invokeResponse();
 1228                        }
 1229                    }
 1230                    _sendStreams.RemoveFirst();
 1231                }
 1232            }
 1233
 1234            foreach (OutgoingMessage o in _sendStreams)
 1235            {
 1236                o.completed(_exception);
 1237                if (o.requestId > 0) // Make sure finished isn't called twice.
 1238                {
 1239                    _asyncRequests.Remove(o.requestId);
 1240                }
 1241            }
 1242            _sendStreams.Clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
 1243        }
 1244
 1245        foreach (OutgoingAsyncBase o in _asyncRequests.Values)
 1246        {
 1247            if (o.exception(_exception))
 1248            {
 1249                o.invokeException();
 1250            }
 1251        }
 1252        _asyncRequests.Clear();
 1253
 1254        //
 1255        // Don't wait to be reaped to reclaim memory allocated by read/write streams.
 1256        //
 1257        _writeStream.clear();
 1258        _writeStream.getBuffer().clear();
 1259        _readStream.clear();
 1260        _readStream.getBuffer().clear();
 1261
 1262        if (_exception is ConnectionClosedException or
 1263            CloseConnectionException or
 1264            CommunicatorDestroyedException or
 1265            ObjectAdapterDeactivatedException)
 1266        {
 1267            // Can execute synchronously. Note that we're not within a lock(this) here.
 1268            _closed.SetResult();
 1269        }
 1270        else
 1271        {
 1272            Debug.Assert(_exception is not null);
 1273            _closed.SetException(_exception);
 1274        }
 1275
 1276        if (_closeCallback is not null)
 1277        {
 1278            try
 1279            {
 1280                _closeCallback(this);
 1281            }
 1282            catch (System.Exception ex)
 1283            {
 1284                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
 1285            }
 1286            _closeCallback = null;
 1287        }
 1288
 1289        //
 1290        // This must be done last as this will cause waitUntilFinished() to return (and communicator
 1291        // objects such as the timer might be destroyed too).
 1292        //
 1293        bool finished = false;
 1294        lock (_mutex)
 1295        {
 1296            setState(StateFinished);
 1297
 1298            if (_upcallCount == 0)
 1299            {
 1300                finished = true;
 1301                _observer?.detach();
 1302            }
 1303        }
 1304
 1305        if (finished && _removeFromFactory is not null)
 1306        {
 1307            _removeFromFactory(this);
 1308        }
 1309    }
 1310
 1311    /// <inheritdoc/>
 1312    public override string ToString() => _desc; // No mutex lock, _desc is immutable.
 1313
 1314    /// <inheritdoc/>
 1315    public string type() => _type; // No mutex lock, _type is immutable.
 1316
 1317    /// <inheritdoc/>
 1318    public ConnectionInfo getInfo()
 1319    {
 1320        lock (_mutex)
 1321        {
 1322            if (_state >= StateClosed)
 1323            {
 1324                throw _exception;
 1325            }
 1326            return initConnectionInfo();
 1327        }
 1328    }
 1329
 1330    /// <inheritdoc/>
 1331    public void setBufferSize(int rcvSize, int sndSize)
 1332    {
 1333        lock (_mutex)
 1334        {
 1335            if (_state >= StateClosed)
 1336            {
 1337                throw _exception;
 1338            }
 1339            _transceiver.setBufferSize(rcvSize, sndSize);
 1340            _info = null; // Invalidate the cached connection info
 1341        }
 1342    }
 1343
 1344    public void exception(LocalException ex)
 1345    {
 1346        lock (_mutex)
 1347        {
 1348            setState(StateClosed, ex);
 1349        }
 1350    }
 1351
 1352    public Ice.Internal.ThreadPool getThreadPool() => _threadPool;
 1353
 1354    internal ConnectionI(
 1355        Instance instance,
 1356        Transceiver transceiver,
 1357        Connector connector, // null for incoming connections, non-null for outgoing connections
 1358        EndpointI endpoint,
 1359        ObjectAdapter adapter,
 1360        Action<ConnectionI> removeFromFactory, // can be null
 1361        ConnectionOptions options)
 1362    {
 1363        _instance = instance;
 1364        _desc = transceiver.ToString();
 1365        _type = transceiver.protocol();
 1366        _connector = connector;
 1367        _endpoint = endpoint;
 1368        _adapter = adapter;
 1369        InitializationData initData = instance.initializationData();
 1370        _logger = initData.logger; // Cached for better performance.
 1371        _traceLevels = instance.traceLevels(); // Cached for better performance.
 1372        _connectTimeout = options.connectTimeout;
 1373        _closeTimeout = options.closeTimeout; // not used for datagram connections
 1374        // suppress inactivity timeout for datagram connections
 1375        _inactivityTimeout = endpoint.datagram() ? TimeSpan.Zero : options.inactivityTimeout;
 1376        _maxDispatches = options.maxDispatches;
 1377        _removeFromFactory = removeFromFactory;
 1378        _warn = initData.properties.getIcePropertyAsInt("Ice.Warn.Connections") > 0;
 1379        _warnUdp = initData.properties.getIcePropertyAsInt("Ice.Warn.Datagrams") > 0;
 1380        _nextRequestId = 1;
 1381        _messageSizeMax = connector is null ? adapter.messageSizeMax() : instance.messageSizeMax();
 1382        _batchRequestQueue = new BatchRequestQueue(instance, _endpoint.datagram());
 1383        _readStream = new InputStream(instance, Util.currentProtocolEncoding);
 1384        _readHeader = false;
 1385        _readStreamPos = -1;
 1386        _writeStream = new OutputStream(); // temporary stream
 1387        _writeStreamPos = -1;
 1388        _upcallCount = 0;
 1389        _state = StateNotInitialized;
 1390
 1391        _compressionLevel = initData.properties.getIcePropertyAsInt("Ice.Compression.Level");
 1392        if (_compressionLevel < 1)
 1393        {
 1394            _compressionLevel = 1;
 1395        }
 1396        else if (_compressionLevel > 9)
 1397        {
 1398            _compressionLevel = 9;
 1399        }
 1400
 1401        if (options.idleTimeout > TimeSpan.Zero && !endpoint.datagram())
 1402        {
 1403            _idleTimeoutTransceiver = new IdleTimeoutTransceiverDecorator(
 1404                transceiver,
 1405                this,
 1406                options.idleTimeout,
 1407                options.enableIdleCheck);
 1408            transceiver = _idleTimeoutTransceiver;
 1409        }
 1410        _transceiver = transceiver;
 1411
 1412        try
 1413        {
 1414            if (connector is null)
 1415            {
 1416                // adapter is always set for incoming connections
 1417                Debug.Assert(adapter is not null);
 1418                _threadPool = adapter.getThreadPool();
 1419            }
 1420            else
 1421            {
 1422                // we use the client thread pool for outgoing connections, even if there is an
 1423                // object adapter with its own thread pool.
 1424                _threadPool = instance.clientThreadPool();
 1425            }
 1426            _threadPool.initialize(this);
 1427        }
 1428        catch (LocalException)
 1429        {
 1430            throw;
 1431        }
 1432        catch (System.Exception ex)
 1433        {
 1434            throw new SyscallException(ex);
 1435        }
 1436    }
 1437
 1438    /// <summary>Aborts the connection with a <see cref="ConnectionAbortedException" /> if the connection is active and
 1439    /// does not receive a byte for some time. See the IdleTimeoutTransceiverDecorator.</summary>
 1440    internal void idleCheck(TimeSpan idleTimeout)
 1441    {
 1442        lock (_mutex)
 1443        {
 1444            if (_state == StateActive && _idleTimeoutTransceiver!.idleCheckEnabled)
 1445            {
 1446                int idleTimeoutInSeconds = (int)idleTimeout.TotalSeconds;
 1447
 1448                setState(
 1449                    StateClosed,
 1450                    new ConnectionAbortedException(
 1451                        $"Connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSe
 1452                        closedByApplication: false));
 1453            }
 1454            // else nothing to do
 1455        }
 1456    }
 1457
 1458    internal void sendHeartbeat()
 1459    {
 1460        Debug.Assert(!_endpoint.datagram());
 1461
 1462        lock (_mutex)
 1463        {
 1464            if (_state == StateActive || _state == StateHolding || _state == StateClosing)
 1465            {
 1466                // We check if the connection has become inactive.
 1467                if (
 1468                    _inactivityTimer is null &&           // timer not already scheduled
 1469                    _inactivityTimeout > TimeSpan.Zero && // inactivity timeout is enabled
 1470                    _state == StateActive &&              // only schedule the timer if the connection is active
 1471                    _dispatchCount == 0 &&                // no pending dispatch
 1472                    _asyncRequests.Count == 0 &&          // no pending invocation
 1473                    _readHeader &&                        // we're not waiting for the remainder of an incoming message
 1474                    _sendStreams.Count <= 1)              // there is at most one pending outgoing message
 1475                {
 1476                    // We may become inactive while the peer is back-pressuring us. In this case, we only schedule the
 1477                    // inactivity timer if there is no pending outgoing message or the pending outgoing message is a
 1478                    // heartbeat.
 1479
 1480                    // The stream of the first _sendStreams message is in _writeStream.
 1481                    if (_sendStreams.Count == 0 || isHeartbeat(_writeStream))
 1482                    {
 1483                        scheduleInactivityTimer();
 1484                    }
 1485                }
 1486
 1487                // We send a heartbeat to the peer to generate a "write" on the connection. This write in turns creates
 1488                // a read on the peer, and resets the peer's idle check timer. When _sendStream is not empty, there is
 1489                // already an outstanding write, so we don't need to send a heartbeat. It's possible the first message
 1490                // of _sendStreams was already sent but not yet removed from _sendStreams: it means the last write
 1491                // occurred very recently, which is good enough with respect to the idle check.
 1492                // As a result of this optimization, the only possible heartbeat in _sendStreams is the first
 1493                // _sendStreams message.
 1494                if (_sendStreams.Count == 0)
 1495                {
 1496                    var os = new OutputStream(Util.currentProtocolEncoding);
 1497                    os.writeBlob(Protocol.magic);
 1498                    ProtocolVersion.ice_write(os, Util.currentProtocol);
 1499                    EncodingVersion.ice_write(os, Util.currentProtocolEncoding);
 1500                    os.writeByte(Protocol.validateConnectionMsg);
 1501                    os.writeByte(0);
 1502                    os.writeInt(Protocol.headerSize); // Message size.
 1503                    try
 1504                    {
 1505                        _ = sendMessage(new OutgoingMessage(os, compress: false));
 1506                    }
 1507                    catch (LocalException ex)
 1508                    {
 1509                        setState(StateClosed, ex);
 1510                    }
 1511                }
 1512            }
 1513            // else nothing to do
 1514        }
 1515
 1516        static bool isHeartbeat(OutputStream stream) =>
 1517            stream.getBuffer().b.get(8) == Protocol.validateConnectionMsg;
 1518    }
 1519
 1520    private const int StateNotInitialized = 0;
 1521    private const int StateNotValidated = 1;
 1522    private const int StateActive = 2;
 1523    private const int StateHolding = 3;
 1524    private const int StateClosing = 4;
 1525    private const int StateClosingPending = 5;
 1526    private const int StateClosed = 6;
 1527    private const int StateFinished = 7;
 1528
 1529    private static ConnectionState toConnectionState(int state) => connectionStateMap[state];
 1530
 1531    private void setState(int state, LocalException ex)
 1532    {
 1533        //
 1534        // If setState() is called with an exception, then only closed
 1535        // and closing states are permissible.
 1536        //
 1537        Debug.Assert(state >= StateClosing);
 1538
 1539        if (_state == state) // Don't switch twice.
 1540        {
 1541            return;
 1542        }
 1543
 1544        if (_exception is null)
 1545        {
 1546            //
 1547            // If we are in closed state, an exception must be set.
 1548            //
 1549            Debug.Assert(_state != StateClosed);
 1550
 1551            _exception = ex;
 1552
 1553            //
 1554            // We don't warn if we are not validated.
 1555            //
 1556            if (_warn && _validated)
 1557            {
 1558                //
 1559                // Don't warn about certain expected exceptions.
 1560                //
 1561                if (!(_exception is CloseConnectionException ||
 1562                     _exception is ConnectionClosedException ||
 1563                     _exception is CommunicatorDestroyedException ||
 1564                     _exception is ObjectAdapterDeactivatedException ||
 1565                     (_exception is ConnectionLostException && _state >= StateClosing)))
 1566                {
 1567                    warning("connection exception", _exception);
 1568                }
 1569            }
 1570        }
 1571
 1572        //
 1573        // We must set the new state before we notify requests of any
 1574        // exceptions. Otherwise new requests may retry on a
 1575        // connection that is not yet marked as closed or closing.
 1576        //
 1577        setState(state);
 1578    }
 1579
 1580    private void setState(int state)
 1581    {
 1582        //
 1583        // We don't want to send close connection messages if the endpoint
 1584        // only supports oneway transmission from client to server.
 1585        //
 1586        if (_endpoint.datagram() && state == StateClosing)
 1587        {
 1588            state = StateClosed;
 1589        }
 1590
 1591        //
 1592        // Skip graceful shutdown if we are destroyed before validation.
 1593        //
 1594        if (_state <= StateNotValidated && state == StateClosing)
 1595        {
 1596            state = StateClosed;
 1597        }
 1598
 1599        if (_state == state) // Don't switch twice.
 1600        {
 1601            return;
 1602        }
 1603
 1604        if (state > StateActive)
 1605        {
 1606            // Dispose the inactivity timer, if not null.
 1607            cancelInactivityTimer();
 1608        }
 1609
 1610        try
 1611        {
 1612            switch (state)
 1613            {
 1614                case StateNotInitialized:
 1615                {
 1616                    Debug.Assert(false);
 1617                    break;
 1618                }
 1619
 1620                case StateNotValidated:
 1621                {
 1622                    if (_state != StateNotInitialized)
 1623                    {
 1624                        Debug.Assert(_state == StateClosed);
 1625                        return;
 1626                    }
 1627                    break;
 1628                }
 1629
 1630                case StateActive:
 1631                {
 1632                    //
 1633                    // Can only switch to active from holding or not validated.
 1634                    //
 1635                    if (_state != StateHolding && _state != StateNotValidated)
 1636                    {
 1637                        return;
 1638                    }
 1639
 1640                    if (_maxDispatches <= 0 || _dispatchCount < _maxDispatches)
 1641                    {
 1642                        _threadPool.register(this, SocketOperation.Read);
 1643                        _idleTimeoutTransceiver?.enableIdleCheck();
 1644                    }
 1645                    // else don't resume reading since we're at or over the _maxDispatches limit.
 1646
 1647                    break;
 1648                }
 1649
 1650                case StateHolding:
 1651                {
 1652                    //
 1653                    // Can only switch to holding from active or not validated.
 1654                    //
 1655                    if (_state != StateActive && _state != StateNotValidated)
 1656                    {
 1657                        return;
 1658                    }
 1659
 1660                    if (_state == StateActive && (_maxDispatches <= 0 || _dispatchCount < _maxDispatches))
 1661                    {
 1662                        _threadPool.unregister(this, SocketOperation.Read);
 1663                        _idleTimeoutTransceiver?.disableIdleCheck();
 1664                    }
 1665                    // else reads are already disabled because the _maxDispatches limit is reached or exceeded.
 1666
 1667                    break;
 1668                }
 1669
 1670                case StateClosing:
 1671                case StateClosingPending:
 1672                {
 1673                    //
 1674                    // Can't change back from closing pending.
 1675                    //
 1676                    if (_state >= StateClosingPending)
 1677                    {
 1678                        return;
 1679                    }
 1680                    break;
 1681                }
 1682
 1683                case StateClosed:
 1684                {
 1685                    if (_state == StateFinished)
 1686                    {
 1687                        return;
 1688                    }
 1689
 1690                    _batchRequestQueue.destroy(_exception);
 1691                    _threadPool.finish(this);
 1692                    _transceiver.close();
 1693                    break;
 1694                }
 1695
 1696                case StateFinished:
 1697                {
 1698                    Debug.Assert(_state == StateClosed);
 1699                    _transceiver.destroy();
 1700                    break;
 1701                }
 1702            }
 1703        }
 1704        catch (LocalException ex)
 1705        {
 1706            _logger.error("unexpected connection exception:\n" + ex + "\n" + _transceiver.ToString());
 1707        }
 1708
 1709        if (_instance.initializationData().observer is not null)
 1710        {
 1711            ConnectionState oldState = toConnectionState(_state);
 1712            ConnectionState newState = toConnectionState(state);
 1713            if (oldState != newState)
 1714            {
 1715                _observer = _instance.initializationData().observer.getConnectionObserver(
 1716                    initConnectionInfo(),
 1717                    _endpoint,
 1718                    newState,
 1719                    _observer);
 1720                if (_observer is not null)
 1721                {
 1722                    _observer.attach();
 1723                }
 1724                else
 1725                {
 1726                    _writeStreamPos = -1;
 1727                    _readStreamPos = -1;
 1728                }
 1729            }
 1730            if (_observer is not null && state == StateClosed && _exception is not null)
 1731            {
 1732                if (!(_exception is CloseConnectionException ||
 1733                     _exception is ConnectionClosedException ||
 1734                     _exception is CommunicatorDestroyedException ||
 1735                     _exception is ObjectAdapterDeactivatedException ||
 1736                     (_exception is ConnectionLostException && _state >= StateClosing)))
 1737                {
 1738                    _observer.failed(_exception.ice_id());
 1739                }
 1740            }
 1741        }
 1742        _state = state;
 1743
 1744        Monitor.PulseAll(_mutex);
 1745
 1746        if (_state == StateClosing && _upcallCount == 0)
 1747        {
 1748            try
 1749            {
 1750                initiateShutdown();
 1751            }
 1752            catch (LocalException ex)
 1753            {
 1754                setState(StateClosed, ex);
 1755            }
 1756        }
 1757    }
 1758
 1759    private void initiateShutdown()
 1760    {
 1761        Debug.Assert(_state == StateClosing && _upcallCount == 0);
 1762
 1763        if (_shutdownInitiated)
 1764        {
 1765            return;
 1766        }
 1767        _shutdownInitiated = true;
 1768
 1769        if (!_endpoint.datagram())
 1770        {
 1771            //
 1772            // Before we shut down, we send a close connection message.
 1773            //
 1774            var os = new OutputStream(Util.currentProtocolEncoding);
 1775            os.writeBlob(Protocol.magic);
 1776            ProtocolVersion.ice_write(os, Util.currentProtocol);
 1777            EncodingVersion.ice_write(os, Util.currentProtocolEncoding);
 1778            os.writeByte(Protocol.closeConnectionMsg);
 1779            os.writeByte(0); // Compression status: always zero for close connection.
 1780            os.writeInt(Protocol.headerSize); // Message size.
 1781
 1782            scheduleCloseTimer();
 1783
 1784            if ((sendMessage(new OutgoingMessage(os, compress: false)) & OutgoingAsyncBase.AsyncStatusSent) != 0)
 1785            {
 1786                setState(StateClosingPending);
 1787
 1788                //
 1789                // Notify the transceiver of the graceful connection closure.
 1790                //
 1791                int op = _transceiver.closing(true, _exception);
 1792                if (op != 0)
 1793                {
 1794                    _threadPool.register(this, op);
 1795                }
 1796            }
 1797        }
 1798    }
 1799
 1800    private bool initialize(int operation)
 1801    {
 1802        int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), ref _hasMoreData);
 1803        if (s != SocketOperation.None)
 1804        {
 1805            _threadPool.update(this, operation, s);
 1806            return false;
 1807        }
 1808
 1809        //
 1810        // Update the connection description once the transceiver is initialized.
 1811        //
 1812        _desc = _transceiver.ToString();
 1813        _initialized = true;
 1814        setState(StateNotValidated);
 1815
 1816        return true;
 1817    }
 1818
 1819    private bool validate(int operation)
 1820    {
 1821        if (!_endpoint.datagram()) // Datagram connections are always implicitly validated.
 1822        {
 1823            if (_connector is null) // The server side has the active role for connection validation.
 1824            {
 1825                if (_writeStream.size() == 0)
 1826                {
 1827                    _writeStream.writeBlob(Protocol.magic);
 1828                    ProtocolVersion.ice_write(_writeStream, Util.currentProtocol);
 1829                    EncodingVersion.ice_write(_writeStream, Util.currentProtocolEncoding);
 1830                    _writeStream.writeByte(Protocol.validateConnectionMsg);
 1831                    _writeStream.writeByte(0); // Compression status (always zero for validate connection).
 1832                    _writeStream.writeInt(Protocol.headerSize); // Message size.
 1833                    TraceUtil.traceSend(_writeStream, _instance, this, _logger, _traceLevels);
 1834                    _writeStream.prepareWrite();
 1835                }
 1836
 1837                if (_observer is not null)
 1838                {
 1839                    observerStartWrite(_writeStream.getBuffer());
 1840                }
 1841
 1842                if (_writeStream.pos() != _writeStream.size())
 1843                {
 1844                    int op = write(_writeStream.getBuffer());
 1845                    if (op != 0)
 1846                    {
 1847                        _threadPool.update(this, operation, op);
 1848                        return false;
 1849                    }
 1850                }
 1851
 1852                if (_observer is not null)
 1853                {
 1854                    observerFinishWrite(_writeStream.getBuffer());
 1855                }
 1856            }
 1857            else // The client side has the passive role for connection validation.
 1858            {
 1859                if (_readStream.size() == 0)
 1860                {
 1861                    _readStream.resize(Protocol.headerSize);
 1862                    _readStream.pos(0);
 1863                }
 1864
 1865                if (_observer is not null)
 1866                {
 1867                    observerStartRead(_readStream.getBuffer());
 1868                }
 1869
 1870                if (_readStream.pos() != _readStream.size())
 1871                {
 1872                    int op = read(_readStream.getBuffer());
 1873                    if (op != 0)
 1874                    {
 1875                        _threadPool.update(this, operation, op);
 1876                        return false;
 1877                    }
 1878                }
 1879
 1880                if (_observer is not null)
 1881                {
 1882                    observerFinishRead(_readStream.getBuffer());
 1883                }
 1884
 1885                _validated = true;
 1886
 1887                Debug.Assert(_readStream.pos() == Protocol.headerSize);
 1888                _readStream.pos(0);
 1889                byte[] m = _readStream.readBlob(4);
 1890                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 1891                   m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 1892                {
 1893                    throw new ProtocolException(
 1894                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 1895                }
 1896
 1897                var pv = new ProtocolVersion(_readStream);
 1898                if (pv != Util.currentProtocol)
 1899                {
 1900                    throw new MarshalException(
 1901                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 1902                }
 1903                var ev = new EncodingVersion(_readStream);
 1904                if (ev != Util.currentProtocolEncoding)
 1905                {
 1906                    throw new MarshalException(
 1907                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 1908                }
 1909
 1910                byte messageType = _readStream.readByte();
 1911                if (messageType != Protocol.validateConnectionMsg)
 1912                {
 1913                    throw new ProtocolException(
 1914                        $"Received message of type {messageType} over a connection that is not yet validated.");
 1915                }
 1916                _readStream.readByte(); // Ignore compression status for validate connection.
 1917                int size = _readStream.readInt();
 1918                if (size != Protocol.headerSize)
 1919                {
 1920                    throw new MarshalException($"Received ValidateConnection message with unexpected size {size}.");
 1921                }
 1922                TraceUtil.traceRecv(_readStream, this, _logger, _traceLevels);
 1923
 1924                // Client connection starts sending heartbeats once it's received the ValidateConnection message.
 1925                _idleTimeoutTransceiver?.scheduleHeartbeat();
 1926            }
 1927        }
 1928
 1929        _writeStream.resize(0);
 1930        _writeStream.pos(0);
 1931
 1932        _readStream.resize(Protocol.headerSize);
 1933        _readStream.pos(0);
 1934        _readHeader = true;
 1935
 1936        if (_instance.traceLevels().network >= 1)
 1937        {
 1938            var s = new StringBuilder();
 1939            if (_endpoint.datagram())
 1940            {
 1941                s.Append("starting to ");
 1942                s.Append(_connector is not null ? "send" : "receive");
 1943                s.Append(' ');
 1944                s.Append(_endpoint.protocol());
 1945                s.Append(" messages\n");
 1946                s.Append(_transceiver.toDetailedString());
 1947            }
 1948            else
 1949            {
 1950                s.Append(_connector is not null ? "established" : "accepted");
 1951                s.Append(' ');
 1952                s.Append(_endpoint.protocol());
 1953                s.Append(" connection\n");
 1954                s.Append(ToString());
 1955            }
 1956            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1957        }
 1958
 1959        return true;
 1960    }
 1961
 1962    /// <summary>
 1963    /// Sends the next queued messages. This method is called by message() once the message which is being sent
 1964    /// (_sendStreams.First) is fully sent. Before sending the next message, this message is removed from _sendsStream
 1965    /// If any, its sent callback is also queued in given callback queue.
 1966    /// </summary>
 1967    /// <param name="callbacks">The sent callbacks to call for the messages that were sent.</param>
 1968    /// <returns>The socket operation to register with the thread pool's selector to send the remainder of the pending
 1969    /// message being sent (_sendStreams.First).</returns>
 1970    private int sendNextMessage(out Queue<OutgoingMessage> callbacks)
 1971    {
 1972        callbacks = null;
 1973
 1974        if (_sendStreams.Count == 0)
 1975        {
 1976            // This can occur if no message was being written and the socket write operation was registered with the
 1977            // thread pool (a transceiver read method can request writing data).
 1978            return SocketOperation.None;
 1979        }
 1980        else if (_state == StateClosingPending && _writeStream.pos() == 0)
 1981        {
 1982            // Message wasn't sent, empty the _writeStream, we're not going to send more data because the connection
 1983            // is being closed.
 1984            OutgoingMessage message = _sendStreams.First.Value;
 1985            _writeStream.swap(message.stream);
 1986            return SocketOperation.None;
 1987        }
 1988
 1989        // Assert that the message was fully written.
 1990        Debug.Assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
 1991
 1992        try
 1993        {
 1994            while (true)
 1995            {
 1996                //
 1997                // The message that was being sent is sent. We can swap back the write stream buffer to the
 1998                // outgoing message (required for retry) and queue its sent callback (if any).
 1999                //
 2000                OutgoingMessage message = _sendStreams.First.Value;
 2001                _writeStream.swap(message.stream);
 2002                if (message.sent())
 2003                {
 2004                    callbacks ??= new Queue<OutgoingMessage>();
 2005                    callbacks.Enqueue(message);
 2006                }
 2007                _sendStreams.RemoveFirst();
 2008
 2009                //
 2010                // If there's nothing left to send, we're done.
 2011                //
 2012                if (_sendStreams.Count == 0)
 2013                {
 2014                    break;
 2015                }
 2016
 2017                //
 2018                // If we are in the closed state or if the close is pending, don't continue sending. This can occur if
 2019                // parseMessage (called before sendNextMessage by message()) closes the connection.
 2020                //
 2021                if (_state >= StateClosingPending)
 2022                {
 2023                    return SocketOperation.None;
 2024                }
 2025
 2026                //
 2027                // Otherwise, prepare the next message.
 2028                //
 2029                message = _sendStreams.First.Value;
 2030                Debug.Assert(!message.prepared);
 2031                OutputStream stream = message.stream;
 2032
 2033                message.stream = doCompress(message.stream, message.compress);
 2034                message.stream.prepareWrite();
 2035                message.prepared = true;
 2036
 2037                TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);
 2038
 2039                //
 2040                // Send the message.
 2041                //
 2042                _writeStream.swap(message.stream);
 2043                if (_observer is not null)
 2044                {
 2045                    observerStartWrite(_writeStream.getBuffer());
 2046                }
 2047                if (_writeStream.pos() != _writeStream.size())
 2048                {
 2049                    int op = write(_writeStream.getBuffer());
 2050                    if (op != 0)
 2051                    {
 2052                        return op;
 2053                    }
 2054                }
 2055                if (_observer is not null)
 2056                {
 2057                    observerFinishWrite(_writeStream.getBuffer());
 2058                }
 2059
 2060                // If the message was sent right away, loop to send the next queued message.
 2061            }
 2062
 2063            // Once the CloseConnection message is sent, we transition to the StateClosingPending state.
 2064            if (_state == StateClosing && _shutdownInitiated)
 2065            {
 2066                setState(StateClosingPending);
 2067                int op = _transceiver.closing(true, _exception);
 2068                if (op != 0)
 2069                {
 2070                    return op;
 2071                }
 2072            }
 2073        }
 2074        catch (LocalException ex)
 2075        {
 2076            setState(StateClosed, ex);
 2077        }
 2078        return SocketOperation.None;
 2079    }
 2080
 2081    /// <summary>
 2082    /// Sends or queues the given message.
 2083    /// </summary>
 2084    /// <param name="message">The message to send.</param>
 2085    /// <returns>The send status.</returns>
 2086    private int sendMessage(OutgoingMessage message)
 2087    {
 2088        Debug.Assert(_state >= StateActive);
 2089        Debug.Assert(_state < StateClosed);
 2090
 2091        // Some messages are queued for sending. Just adds the message to the send queue and tell the caller that
 2092        // the message was queued.
 2093        if (_sendStreams.Count > 0)
 2094        {
 2095            _sendStreams.AddLast(message);
 2096            return OutgoingAsyncBase.AsyncStatusQueued;
 2097        }
 2098
 2099        // Prepare the message for sending.
 2100        Debug.Assert(!message.prepared);
 2101
 2102        OutputStream stream = message.stream;
 2103
 2104        message.stream = doCompress(stream, message.compress);
 2105        message.stream.prepareWrite();
 2106        message.prepared = true;
 2107
 2108        TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);
 2109
 2110        // Send the message without blocking.
 2111        if (_observer is not null)
 2112        {
 2113            observerStartWrite(message.stream.getBuffer());
 2114        }
 2115        int op = write(message.stream.getBuffer());
 2116        if (op == 0)
 2117        {
 2118            // The message was sent so we're done.
 2119
 2120            if (_observer is not null)
 2121            {
 2122                observerFinishWrite(message.stream.getBuffer());
 2123            }
 2124
 2125            int status = OutgoingAsyncBase.AsyncStatusSent;
 2126            if (message.sent())
 2127            {
 2128                // If there's a sent callback, indicate the caller that it should invoke the sent callback.
 2129                status |= OutgoingAsyncBase.AsyncStatusInvokeSentCallback;
 2130            }
 2131
 2132            return status;
 2133        }
 2134
 2135        // The message couldn't be sent right away so we add it to the send stream queue (which is empty) and swap its
 2136        // stream with `_writeStream`. The socket operation returned by the transceiver write is registered with the
 2137        // thread pool. At this point the message() method will take care of sending the whole message (held by
 2138        // _writeStream) when the transceiver is ready to write more of the message buffer.
 2139
 2140        _writeStream.swap(message.stream);
 2141        _sendStreams.AddLast(message);
 2142        _threadPool.register(this, op);
 2143        return OutgoingAsyncBase.AsyncStatusQueued;
 2144    }
 2145
 2146    private OutputStream doCompress(OutputStream decompressed, bool compress)
 2147    {
 2148        if (BZip2.isLoaded(_logger) && compress && decompressed.size() >= 100)
 2149        {
 2150            //
 2151            // Do compression.
 2152            //
 2153            Ice.Internal.Buffer cbuf = BZip2.compress(
 2154                decompressed.getBuffer(),
 2155                Protocol.headerSize,
 2156                _compressionLevel);
 2157            if (cbuf is not null)
 2158            {
 2159                var cstream = new OutputStream(new Internal.Buffer(cbuf, true), decompressed.getEncoding());
 2160
 2161                //
 2162                // Set compression status.
 2163                //
 2164                cstream.pos(9);
 2165                cstream.writeByte(2);
 2166
 2167                //
 2168                // Write the size of the compressed stream into the header.
 2169                //
 2170                cstream.pos(10);
 2171                cstream.writeInt(cstream.size());
 2172
 2173                //
 2174                // Write the compression status and size of the compressed stream into the header of the
 2175                // decompressed stream -- we need this to trace requests correctly.
 2176                //
 2177                decompressed.pos(9);
 2178                decompressed.writeByte(2);
 2179                decompressed.writeInt(cstream.size());
 2180
 2181                return cstream;
 2182            }
 2183        }
 2184
 2185        // Write the compression status. If BZip2 is loaded and compress is set to true, we write 1, to request a
 2186        // compressed reply. Otherwise, we write 0 either BZip2 is not loaded or we are sending an uncompressed reply.
 2187        decompressed.pos(9);
 2188        decompressed.writeByte((byte)((BZip2.isLoaded(_logger) && compress) ? 1 : 0));
 2189
 2190        //
 2191        // Not compressed, fill in the message size.
 2192        //
 2193        decompressed.pos(10);
 2194        decompressed.writeInt(decompressed.size());
 2195
 2196        return decompressed;
 2197    }
 2198
 2199    private struct MessageInfo
 2200    {
 2201        public InputStream stream;
 2202        public int requestCount;
 2203        public int requestId;
 2204        public byte compress;
 2205        public ObjectAdapter adapter;
 2206        public OutgoingAsyncBase outAsync;
 2207        public int upcallCount;
 2208    }
 2209
 2210    private int parseMessage(ref MessageInfo info)
 2211    {
 2212        Debug.Assert(_state > StateNotValidated && _state < StateClosed);
 2213
 2214        info.stream = new InputStream(_instance, Util.currentProtocolEncoding);
 2215        _readStream.swap(info.stream);
 2216        _readStream.resize(Protocol.headerSize);
 2217        _readStream.pos(0);
 2218        _readHeader = true;
 2219
 2220        Debug.Assert(info.stream.pos() == info.stream.size());
 2221
 2222        try
 2223        {
 2224            //
 2225            // The magic and version fields have already been checked.
 2226            //
 2227            info.stream.pos(8);
 2228            byte messageType = info.stream.readByte();
 2229            info.compress = info.stream.readByte();
 2230            if (info.compress == 2)
 2231            {
 2232                if (BZip2.isLoaded(_logger))
 2233                {
 2234                    Ice.Internal.Buffer ubuf = BZip2.decompress(
 2235                        info.stream.getBuffer(),
 2236                        Protocol.headerSize,
 2237                        _messageSizeMax);
 2238                    info.stream = new InputStream(info.stream.instance(), info.stream.getEncoding(), ubuf, true);
 2239                }
 2240                else
 2241                {
 2242                    throw new FeatureNotSupportedException(
 2243                        "Cannot decompress compressed message: BZip2 library is not loaded.");
 2244                }
 2245            }
 2246            info.stream.pos(Protocol.headerSize);
 2247
 2248            switch (messageType)
 2249            {
 2250                case Protocol.closeConnectionMsg:
 2251                {
 2252                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2253                    if (_endpoint.datagram())
 2254                    {
 2255                        if (_warn)
 2256                        {
 2257                            _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
 2258                        }
 2259                    }
 2260                    else
 2261                    {
 2262                        setState(StateClosingPending, new CloseConnectionException());
 2263
 2264                        //
 2265                        // Notify the transceiver of the graceful connection closure.
 2266                        //
 2267                        int op = _transceiver.closing(false, _exception);
 2268                        if (op != 0)
 2269                        {
 2270                            scheduleCloseTimer();
 2271                            return op;
 2272                        }
 2273                        setState(StateClosed);
 2274                    }
 2275                    break;
 2276                }
 2277
 2278                case Protocol.requestMsg:
 2279                {
 2280                    if (_state >= StateClosing)
 2281                    {
 2282                        TraceUtil.trace(
 2283                            "received request during closing\n(ignored by server, client will retry)",
 2284                            info.stream,
 2285                            this,
 2286                            _logger,
 2287                            _traceLevels);
 2288                    }
 2289                    else
 2290                    {
 2291                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2292                        info.requestId = info.stream.readInt();
 2293                        info.requestCount = 1;
 2294                        info.adapter = _adapter;
 2295                        ++info.upcallCount;
 2296
 2297                        cancelInactivityTimer();
 2298                        ++_dispatchCount;
 2299                    }
 2300                    break;
 2301                }
 2302
 2303                case Protocol.requestBatchMsg:
 2304                {
 2305                    if (_state >= StateClosing)
 2306                    {
 2307                        TraceUtil.trace(
 2308                            "received batch request during closing\n(ignored by server, client will retry)",
 2309                            info.stream,
 2310                            this,
 2311                            _logger,
 2312                            _traceLevels);
 2313                    }
 2314                    else
 2315                    {
 2316                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2317                        int requestCount = info.stream.readInt();
 2318                        if (requestCount < 0)
 2319                        {
 2320                            throw new MarshalException($"Received batch request with {requestCount} batches.");
 2321                        }
 2322                        info.requestCount = requestCount;
 2323                        info.adapter = _adapter;
 2324                        info.upcallCount += info.requestCount;
 2325
 2326                        cancelInactivityTimer();
 2327                        _dispatchCount += info.requestCount;
 2328                    }
 2329                    break;
 2330                }
 2331
 2332                case Protocol.replyMsg:
 2333                {
 2334                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2335                    info.requestId = info.stream.readInt();
 2336                    if (_asyncRequests.TryGetValue(info.requestId, out info.outAsync))
 2337                    {
 2338                        _asyncRequests.Remove(info.requestId);
 2339
 2340                        info.outAsync.getIs().swap(info.stream);
 2341
 2342                        //
 2343                        // If we just received the reply for a request which isn't acknowledge as
 2344                        // sent yet, we queue the reply instead of processing it right away. It
 2345                        // will be processed once the write callback is invoked for the message.
 2346                        //
 2347                        OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null;
 2348                        if (message is not null && message.outAsync == info.outAsync)
 2349                        {
 2350                            message.receivedReply = true;
 2351                        }
 2352                        else if (info.outAsync.response())
 2353                        {
 2354                            ++info.upcallCount;
 2355                        }
 2356                        else
 2357                        {
 2358                            info.outAsync = null;
 2359                        }
 2360                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 2361                        {
 2362                            doApplicationClose();
 2363                        }
 2364                    }
 2365                    break;
 2366                }
 2367
 2368                case Protocol.validateConnectionMsg:
 2369                {
 2370                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2371                    break;
 2372                }
 2373
 2374                default:
 2375                {
 2376                    TraceUtil.trace(
 2377                        "received unknown message\n(invalid, closing connection)",
 2378                        info.stream,
 2379                        this,
 2380                        _logger,
 2381                        _traceLevels);
 2382
 2383                    throw new ProtocolException($"Received Ice protocol message with unknown type: {messageType}");
 2384                }
 2385            }
 2386        }
 2387        catch (LocalException ex)
 2388        {
 2389            if (_endpoint.datagram())
 2390            {
 2391                if (_warn)
 2392                {
 2393                    _logger.warning("datagram connection exception:\n" + ex.ToString() + "\n" + _desc);
 2394                }
 2395            }
 2396            else
 2397            {
 2398                setState(StateClosed, ex);
 2399            }
 2400        }
 2401
 2402        if (_state == StateHolding)
 2403        {
 2404            // Don't continue reading if the connection is in the holding state.
 2405            return SocketOperation.None;
 2406        }
 2407        else if (_maxDispatches > 0 && _dispatchCount >= _maxDispatches)
 2408        {
 2409            // Don't continue reading if the _maxDispatches limit is reached or exceeded.
 2410            _idleTimeoutTransceiver?.disableIdleCheck();
 2411            return SocketOperation.None;
 2412        }
 2413        else
 2414        {
 2415            // Continue reading.
 2416            return SocketOperation.Read;
 2417        }
 2418    }
 2419
 2420    private void dispatchAll(
 2421        InputStream stream,
 2422        int requestCount,
 2423        int requestId,
 2424        byte compress,
 2425        ObjectAdapter adapter)
 2426    {
 2427        // Note: In contrast to other private or protected methods, this method must be called *without* the mutex
 2428        // locked.
 2429
 2430        Object dispatcher = adapter?.dispatchPipeline;
 2431
 2432        try
 2433        {
 2434            while (requestCount > 0)
 2435            {
 2436                // adapter can be null here, however the adapter set in current can't be null, and we never pass
 2437                // a null current.adapter to the application code. Once this file enables nullable, adapter should be
 2438                // adapter! below.
 2439                var request = new IncomingRequest(requestId, this, adapter, stream);
 2440
 2441                if (dispatcher is not null)
 2442                {
 2443                    // We don't and can't await the dispatchAsync: with batch requests, we want all the dispatches to
 2444                    // execute in the current Ice thread pool thread. If we awaited the dispatchAsync, we could
 2445                    // switch to a .NET thread pool thread.
 2446                    _ = dispatchAsync(request);
 2447                }
 2448                else
 2449                {
 2450                    // Received request on a connection without an object adapter.
 2451                    sendResponse(
 2452                        request.current.createOutgoingResponse(new ObjectNotExistException()),
 2453                        isTwoWay: !_endpoint.datagram() && requestId != 0,
 2454                        compress: 0);
 2455                }
 2456                --requestCount;
 2457            }
 2458
 2459            stream.clear();
 2460        }
 2461        catch (LocalException ex) // TODO: catch all exceptions
 2462        {
 2463            // Typically, the IncomingRequest constructor throws an exception, and we can't continue.
 2464            dispatchException(ex, requestCount);
 2465        }
 2466
 2467        async Task dispatchAsync(IncomingRequest request)
 2468        {
 2469            try
 2470            {
 2471                OutgoingResponse response;
 2472
 2473                try
 2474                {
 2475                    response = await dispatcher.dispatchAsync(request).ConfigureAwait(false);
 2476                }
 2477                catch (System.Exception ex)
 2478                {
 2479                    response = request.current.createOutgoingResponse(ex);
 2480                }
 2481
 2482                sendResponse(response, isTwoWay: !_endpoint.datagram() && requestId != 0, compress);
 2483            }
 2484            catch (LocalException ex) // TODO: catch all exceptions to avoid UnobservedTaskException
 2485            {
 2486                dispatchException(ex, requestCount: 1);
 2487            }
 2488        }
 2489    }
 2490
 2491    private void sendResponse(OutgoingResponse response, bool isTwoWay, byte compress)
 2492    {
 2493        bool finished = false;
 2494        try
 2495        {
 2496            lock (_mutex)
 2497            {
 2498                Debug.Assert(_state > StateNotValidated);
 2499
 2500                try
 2501                {
 2502                    if (--_upcallCount == 0)
 2503                    {
 2504                        if (_state == StateFinished)
 2505                        {
 2506                            finished = true;
 2507                            _observer?.detach();
 2508                        }
 2509                        Monitor.PulseAll(_mutex);
 2510                    }
 2511
 2512                    if (_state >= StateClosed)
 2513                    {
 2514                        Debug.Assert(_exception is not null);
 2515                        throw _exception;
 2516                    }
 2517
 2518                    if (isTwoWay)
 2519                    {
 2520                        sendMessage(new OutgoingMessage(response.outputStream, compress > 0));
 2521                    }
 2522
 2523                    if (_state == StateActive && _maxDispatches > 0 && _dispatchCount == _maxDispatches)
 2524                    {
 2525                        // Resume reading if the connection is active and the dispatch count is about to be less than
 2526                        // _maxDispatches.
 2527                        _threadPool.update(this, SocketOperation.None, SocketOperation.Read);
 2528                        _idleTimeoutTransceiver?.enableIdleCheck();
 2529                    }
 2530
 2531                    --_dispatchCount;
 2532
 2533                    if (_state == StateClosing && _upcallCount == 0)
 2534                    {
 2535                        initiateShutdown();
 2536                    }
 2537                }
 2538                catch (LocalException ex)
 2539                {
 2540                    setState(StateClosed, ex);
 2541                }
 2542            }
 2543        }
 2544        finally
 2545        {
 2546            if (finished && _removeFromFactory is not null)
 2547            {
 2548                _removeFromFactory(this);
 2549            }
 2550        }
 2551    }
 2552
 2553    private void dispatchException(LocalException ex, int requestCount)
 2554    {
 2555        bool finished = false;
 2556
 2557        // Fatal exception while dispatching a request. Since sendResponse isn't called in case of a fatal exception
 2558        // we decrement _upcallCount here.
 2559        lock (_mutex)
 2560        {
 2561            setState(StateClosed, ex);
 2562
 2563            if (requestCount > 0)
 2564            {
 2565                Debug.Assert(_upcallCount >= requestCount);
 2566                _upcallCount -= requestCount;
 2567                if (_upcallCount == 0)
 2568                {
 2569                    if (_state == StateFinished)
 2570                    {
 2571                        finished = true;
 2572                        _observer?.detach();
 2573                    }
 2574                    Monitor.PulseAll(_mutex);
 2575                }
 2576            }
 2577        }
 2578
 2579        if (finished && _removeFromFactory is not null)
 2580        {
 2581            _removeFromFactory(this);
 2582        }
 2583    }
 2584
 2585    private void inactivityCheck(System.Threading.Timer inactivityTimer)
 2586    {
 2587        lock (_mutex)
 2588        {
 2589            // If the timers are different, it means this inactivityTimer is no longer current.
 2590            if (inactivityTimer == _inactivityTimer)
 2591            {
 2592                _inactivityTimer = null;
 2593                inactivityTimer.Dispose(); // non-blocking
 2594
 2595                if (_state == StateActive)
 2596                {
 2597                    setState(
 2598                        StateClosing,
 2599                        new ConnectionClosedException(
 2600                            "Connection closed because it remained inactive for longer than the inactivity timeout.",
 2601                            closedByApplication: false));
 2602                }
 2603            }
 2604            // Else this timer was already canceled and disposed. Nothing to do.
 2605        }
 2606    }
 2607
 2608    private void connectTimedOut(System.Threading.Timer connectTimer)
 2609    {
 2610        lock (_mutex)
 2611        {
 2612            if (_state < StateActive)
 2613            {
 2614                setState(StateClosed, new ConnectTimeoutException());
 2615            }
 2616        }
 2617        // else ignore since we're already connected.
 2618        connectTimer.Dispose();
 2619    }
 2620
 2621    private void closeTimedOut(System.Threading.Timer closeTimer)
 2622    {
 2623        lock (_mutex)
 2624        {
 2625            if (_state < StateClosed)
 2626            {
 2627                // We don't use setState(state, exception) because we want to overwrite the exception set by a
 2628                // graceful closure.
 2629                _exception = new CloseTimeoutException();
 2630                setState(StateClosed);
 2631            }
 2632        }
 2633        // else ignore since we're already closed.
 2634        closeTimer.Dispose();
 2635    }
 2636
 2637    private ConnectionInfo initConnectionInfo()
 2638    {
 2639        // Called with _mutex locked.
 2640
 2641        if (_state > StateNotInitialized && _info is not null) // Update the connection info until it's initialized
 2642        {
 2643            return _info;
 2644        }
 2645
 2646        _info =
 2647            _transceiver.getInfo(incoming: _connector is null, _adapter?.getName() ?? "", _endpoint.connectionId());
 2648        return _info;
 2649    }
 2650
 2651    private void warning(string msg, System.Exception ex) => _logger.warning($"{msg}:\n{ex}\n{_transceiver}");
 2652
 2653    private void observerStartRead(Ice.Internal.Buffer buf)
 2654    {
 2655        if (_readStreamPos >= 0)
 2656        {
 2657            Debug.Assert(!buf.empty());
 2658            _observer.receivedBytes(buf.b.position() - _readStreamPos);
 2659        }
 2660        _readStreamPos = buf.empty() ? -1 : buf.b.position();
 2661    }
 2662
 2663    private void observerFinishRead(Ice.Internal.Buffer buf)
 2664    {
 2665        if (_readStreamPos == -1)
 2666        {
 2667            return;
 2668        }
 2669        Debug.Assert(buf.b.position() >= _readStreamPos);
 2670        _observer.receivedBytes(buf.b.position() - _readStreamPos);
 2671        _readStreamPos = -1;
 2672    }
 2673
 2674    private void observerStartWrite(Ice.Internal.Buffer buf)
 2675    {
 2676        if (_writeStreamPos >= 0)
 2677        {
 2678            Debug.Assert(!buf.empty());
 2679            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2680        }
 2681        _writeStreamPos = buf.empty() ? -1 : buf.b.position();
 2682    }
 2683
 2684    private void observerFinishWrite(Ice.Internal.Buffer buf)
 2685    {
 2686        if (_writeStreamPos == -1)
 2687        {
 2688            return;
 2689        }
 2690        if (buf.b.position() > _writeStreamPos)
 2691        {
 2692            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2693        }
 2694        _writeStreamPos = -1;
 2695    }
 2696
 2697    private int read(Ice.Internal.Buffer buf)
 2698    {
 2699        int start = buf.b.position();
 2700        int op = _transceiver.read(buf, ref _hasMoreData);
 2701        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2702        {
 2703            var s = new StringBuilder("received ");
 2704            if (_endpoint.datagram())
 2705            {
 2706                s.Append(buf.b.limit());
 2707            }
 2708            else
 2709            {
 2710                s.Append(buf.b.position() - start);
 2711                s.Append(" of ");
 2712                s.Append(buf.b.limit() - start);
 2713            }
 2714            s.Append(" bytes via ");
 2715            s.Append(_endpoint.protocol());
 2716            s.Append('\n');
 2717            s.Append(ToString());
 2718            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2719        }
 2720        return op;
 2721    }
 2722
 2723    private int write(Ice.Internal.Buffer buf)
 2724    {
 2725        int start = buf.b.position();
 2726        int op = _transceiver.write(buf);
 2727        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2728        {
 2729            var s = new StringBuilder("sent ");
 2730            s.Append(buf.b.position() - start);
 2731            if (!_endpoint.datagram())
 2732            {
 2733                s.Append(" of ");
 2734                s.Append(buf.b.limit() - start);
 2735            }
 2736            s.Append(" bytes via ");
 2737            s.Append(_endpoint.protocol());
 2738            s.Append('\n');
 2739            s.Append(ToString());
 2740            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2741        }
 2742        return op;
 2743    }
 2744
 2745    private void scheduleInactivityTimer()
 2746    {
 2747        // Called with the ConnectionI mutex locked.
 2748        Debug.Assert(_inactivityTimer is null);
 2749        Debug.Assert(_inactivityTimeout > TimeSpan.Zero);
 2750
 2751        _inactivityTimer = new System.Threading.Timer(
 2752            inactivityTimer => inactivityCheck((System.Threading.Timer)inactivityTimer));
 2753        _inactivityTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 2754    }
 2755
 2756    private void cancelInactivityTimer()
 2757    {
 2758        // Called with the ConnectionI mutex locked.
 2759        if (_inactivityTimer is not null)
 2760        {
 2761            _inactivityTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 2762            _inactivityTimer.Dispose();
 2763            _inactivityTimer = null;
 2764        }
 2765    }
 2766
 2767    private void scheduleCloseTimer()
 2768    {
 2769        if (_closeTimeout > TimeSpan.Zero)
 2770        {
 2771#pragma warning disable CA2000 // closeTimer is disposed by closeTimedOut.
 2772            var closeTimer = new System.Threading.Timer(
 2773                timerObj => closeTimedOut((System.Threading.Timer)timerObj));
 2774            // schedule timer to run once; closeTimedOut disposes the timer too.
 2775            closeTimer.Change(_closeTimeout, Timeout.InfiniteTimeSpan);
 2776#pragma warning restore CA2000
 2777        }
 2778    }
 2779
 2780    private void doApplicationClose()
 2781    {
 2782        // Called with the ConnectionI mutex locked.
 2783        Debug.Assert(_state < StateClosing);
 2784        setState(
 2785            StateClosing,
 2786            new ConnectionClosedException(
 2787                "The connection was closed gracefully by the application.",
 2788                closedByApplication: true));
 2789    }
 2790
 2791    private class OutgoingMessage
 2792    {
 12793        internal OutgoingMessage(OutputStream stream, bool compress)
 2794        {
 12795            this.stream = stream;
 12796            this.compress = compress;
 12797        }
 2798
 12799        internal OutgoingMessage(OutgoingAsyncBase outAsync, OutputStream stream, bool compress, int requestId)
 2800        {
 12801            this.outAsync = outAsync;
 12802            this.stream = stream;
 12803            this.compress = compress;
 12804            this.requestId = requestId;
 12805        }
 2806
 2807        internal void canceled()
 2808        {
 2809            Debug.Assert(outAsync is not null); // Only requests can timeout.
 12810            outAsync = null;
 12811        }
 2812
 2813        internal bool sent()
 2814        {
 12815            stream = null;
 12816            if (outAsync is not null)
 2817            {
 12818                invokeSent = outAsync.sent();
 12819                return invokeSent || receivedReply;
 2820            }
 12821            return false;
 2822        }
 2823
 2824        internal void completed(LocalException ex)
 2825        {
 12826            if (outAsync is not null)
 2827            {
 12828                if (outAsync.exception(ex))
 2829                {
 12830                    outAsync.invokeException();
 2831                }
 2832            }
 12833            stream = null;
 12834        }
 2835
 2836        internal OutputStream stream;
 2837        internal OutgoingAsyncBase outAsync;
 2838        internal bool compress;
 2839        internal int requestId;
 2840        internal bool prepared;
 2841        internal bool isSent;
 2842        internal bool invokeSent;
 2843        internal bool receivedReply;
 2844    }
 2845
 2846    private static readonly ConnectionState[] connectionStateMap = [
 2847        ConnectionState.ConnectionStateValidating,   // StateNotInitialized
 2848        ConnectionState.ConnectionStateValidating,   // StateNotValidated
 2849        ConnectionState.ConnectionStateActive,       // StateActive
 2850        ConnectionState.ConnectionStateHolding,      // StateHolding
 2851        ConnectionState.ConnectionStateClosing,      // StateClosing
 2852        ConnectionState.ConnectionStateClosing,      // StateClosingPending
 2853        ConnectionState.ConnectionStateClosed,       // StateClosed
 2854        ConnectionState.ConnectionStateClosed,       // StateFinished
 2855    ];
 2856
 2857    private readonly Instance _instance;
 2858    private readonly Transceiver _transceiver;
 2859    private readonly IdleTimeoutTransceiverDecorator _idleTimeoutTransceiver; // can be null
 2860
 2861    private string _desc;
 2862    private readonly string _type;
 2863    private readonly Connector _connector;
 2864    private readonly EndpointI _endpoint;
 2865
 2866    private ObjectAdapter _adapter;
 2867
 2868    private readonly Logger _logger;
 2869    private readonly TraceLevels _traceLevels;
 2870    private readonly Ice.Internal.ThreadPool _threadPool;
 2871
 2872    private readonly TimeSpan _connectTimeout;
 2873    private readonly TimeSpan _closeTimeout;
 2874    private TimeSpan _inactivityTimeout; // protected by _mutex
 2875
 2876    private System.Threading.Timer _inactivityTimer; // can be null
 2877
 2878    private StartCallback _startCallback;
 2879
 2880    // This action must be called outside the ConnectionI lock to avoid lock acquisition deadlocks.
 2881    private readonly Action<ConnectionI> _removeFromFactory;
 2882
 2883    private readonly bool _warn;
 2884    private readonly bool _warnUdp;
 2885
 2886    private readonly int _compressionLevel;
 2887
 2888    private int _nextRequestId;
 2889
 2890    private readonly Dictionary<int, OutgoingAsyncBase> _asyncRequests = new Dictionary<int, OutgoingAsyncBase>();
 2891
 2892    private LocalException _exception;
 2893
 2894    private readonly int _messageSizeMax;
 2895    private readonly BatchRequestQueue _batchRequestQueue;
 2896
 2897    private readonly LinkedList<OutgoingMessage> _sendStreams = new LinkedList<OutgoingMessage>();
 2898
 2899    // Contains the message which is being received. If the connection is waiting to receive a message (_readHeader ==
 2900    // true), its size is Protocol.headerSize. Otherwise, its size is the message size specified in the received message
 2901    // header.
 2902    private readonly InputStream _readStream;
 2903
 2904    // When _readHeader is true, the next bytes we'll read are the header of a new message. When false, we're reading
 2905    // next the remainder of a message that was already partially received.
 2906    private bool _readHeader;
 2907
 2908    // Contains the message which is being sent. The write stream buffer is empty if no message is being sent.
 2909    private readonly OutputStream _writeStream;
 2910
 2911    private ConnectionObserver _observer;
 2912    private int _readStreamPos;
 2913    private int _writeStreamPos;
 2914
 2915    // The upcall count keeps track of the number of dispatches, AMI (response) continuations, sent callbacks and
 2916    // connection establishment callbacks that have been started (or are about to be started) by a thread of the thread
 2917    // pool associated with this connection, and have not completed yet. All these operations except the connection
 2918    // establishment callbacks execute application code or code generated from Slice definitions.
 2919    private int _upcallCount;
 2920
 2921    // The number of outstanding dispatches. Maintained only while state is StateActive or StateHolding.
 2922    // _dispatchCount can be greater than a non-0 _maxDispatches when a receive a batch with multiples requests.
 2923    private int _dispatchCount;
 2924
 2925    // When we dispatch _maxDispatches concurrent requests, we stop reading the connection to back-pressure the peer.
 2926    // _maxDispatches <= 0 means no limit.
 2927    private readonly int _maxDispatches;
 2928
 2929    private int _state; // The current state.
 2930    private bool _shutdownInitiated;
 2931    private bool _initialized;
 2932    private bool _validated;
 2933
 2934    // When true, the application called close and Connection must close the connection when it receives the reply
 2935    // for the last outstanding invocation.
 2936    private bool _closeRequested;
 2937
 2938    private ConnectionInfo _info;
 2939
 2940    private CloseCallback _closeCallback;
 2941
 2942    // We need to run the continuation asynchronously since it can be completed by an Ice thread pool thread.
 2943    private readonly TaskCompletionSource _closed = new(TaskCreationOptions.RunContinuationsAsynchronously);
 2944    private readonly object _mutex = new();
 2945}