< Summary

Information
Class: Ice.ConnectionI.OutgoingMessage
Assembly: Ice
File(s): /_/csharp/src/Ice/ConnectionI.cs
Tag: 91_21789722663
Line coverage
100%
Covered lines: 22
Uncovered lines: 0
Coverable lines: 22
Total lines: 2944
Line coverage: 100%
Branch coverage
100%
Covered branches: 8
Total branches: 8
Branch coverage: 100%
Method coverage
100%
Covered methods: 5
Total methods: 5
Method coverage: 100%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
canceled() | 100% | 1 | 1 | 100%
sent() | 100% | 4 | 4 | 100%
completed(...) | 100% | 4 | 4 | 100%

File(s)

/_/csharp/src/Ice/ConnectionI.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Ice.Instrumentation;
 4using Ice.Internal;
 5using System.Diagnostics;
 6using System.Text;
 7
 8namespace Ice;
 9
 10#pragma warning disable CA1001 // _inactivityTimer is disposed by cancelInactivityTimer.
 11public sealed class ConnectionI : Internal.EventHandler, CancellationHandler, Connection
 12#pragma warning restore CA1001
 13{
 14    internal interface StartCallback
 15    {
 16        void connectionStartCompleted(ConnectionI connection);
 17
 18        void connectionStartFailed(ConnectionI connection, LocalException ex);
 19    }
 20
 21    internal void start(StartCallback callback)
 22    {
 23        try
 24        {
 25            lock (_mutex)
 26            {
 27                //
 28                // The connection might already be closed if the communicator was destroyed.
 29                //
 30                if (_state >= StateClosed)
 31                {
 32                    Debug.Assert(_exception is not null);
 33                    throw _exception;
 34                }
 35
 36                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 37                {
 38                    if (_connectTimeout > TimeSpan.Zero)
 39                    {
 40#pragma warning disable CA2000 // connectTimer is disposed by connectTimedOut.
 41                        var connectTimer = new System.Threading.Timer(
 42                            timerObj => connectTimedOut((System.Threading.Timer)timerObj));
 43                        // schedule timer to run once; connectTimedOut disposes the timer too.
 44                        connectTimer.Change(_connectTimeout, Timeout.InfiniteTimeSpan);
 45#pragma warning restore CA2000
 46                    }
 47
 48                    _startCallback = callback;
 49                    return;
 50                }
 51
 52                // The connection starts in the holding state. It will be activated by the connection factory.
 53                setState(StateHolding);
 54            }
 55        }
 56        catch (LocalException ex)
 57        {
 58            exception(ex);
 59            callback.connectionStartFailed(this, _exception);
 60            return;
 61        }
 62
 63        callback.connectionStartCompleted(this);
 64    }
 65
 66    internal void startAndWait()
 67    {
 68        try
 69        {
 70            lock (_mutex)
 71            {
 72                //
 73                // The connection might already be closed if the communicator was destroyed.
 74                //
 75                if (_state >= StateClosed)
 76                {
 77                    Debug.Assert(_exception is not null);
 78                    throw _exception;
 79                }
 80
 81                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 82                {
 83                    //
 84                    // Wait for the connection to be validated.
 85                    //
 86                    while (_state <= StateNotValidated)
 87                    {
 88                        Monitor.Wait(_mutex);
 89                    }
 90
 91                    if (_state >= StateClosing)
 92                    {
 93                        Debug.Assert(_exception is not null);
 94                        throw _exception;
 95                    }
 96                }
 97
 98                //
 99                // We start out in holding state.
 100                //
 101                setState(StateHolding);
 102            }
 103        }
 104        catch (LocalException ex)
 105        {
 106            exception(ex);
 107            waitUntilFinished();
 108            return;
 109        }
 110    }
 111
 112    internal void activate()
 113    {
 114        lock (_mutex)
 115        {
 116            if (_state <= StateNotValidated)
 117            {
 118                return;
 119            }
 120
 121            setState(StateActive);
 122        }
 123    }
 124
 125    internal void hold()
 126    {
 127        lock (_mutex)
 128        {
 129            if (_state <= StateNotValidated)
 130            {
 131                return;
 132            }
 133
 134            setState(StateHolding);
 135        }
 136    }
 137
 138    // DestructionReason.
 139    public const int ObjectAdapterDeactivated = 0;
 140    public const int CommunicatorDestroyed = 1;
 141
 142    internal void destroy(int reason)
 143    {
 144        lock (_mutex)
 145        {
 146            switch (reason)
 147            {
 148                case ObjectAdapterDeactivated:
 149                {
 150                    setState(StateClosing, new ObjectAdapterDeactivatedException(_adapter?.getName() ?? ""));
 151                    break;
 152                }
 153
 154                case CommunicatorDestroyed:
 155                {
 156                    setState(StateClosing, new CommunicatorDestroyedException());
 157                    break;
 158                }
 159            }
 160        }
 161    }
 162
 163    public void abort()
 164    {
 165        lock (_mutex)
 166        {
 167            setState(
 168                StateClosed,
 169                new ConnectionAbortedException(
 170                    "The connection was aborted by the application.",
 171                    closedByApplication: true));
 172        }
 173    }
 174
 175    public Task closeAsync()
 176    {
 177        lock (_mutex)
 178        {
 179            if (_state < StateClosing)
 180            {
 181                if (_asyncRequests.Count == 0)
 182                {
 183                    doApplicationClose();
 184                }
 185                else
 186                {
 187                    _closeRequested = true;
 188                    scheduleCloseTimer(); // we don't wait forever for outstanding invocations to complete
 189                }
 190            }
 191            // else nothing to do, already closing or closed.
 192        }
 193
 194        return _closed.Task;
 195    }
 196
 197    internal bool isActiveOrHolding()
 198    {
 199        lock (_mutex)
 200        {
 201            return _state > StateNotValidated && _state < StateClosing;
 202        }
 203    }
 204
 205    public void throwException()
 206    {
 207        lock (_mutex)
 208        {
 209            if (_exception is not null)
 210            {
 211                Debug.Assert(_state >= StateClosing);
 212                throw _exception;
 213            }
 214        }
 215    }
 216
 217    internal void waitUntilHolding()
 218    {
 219        lock (_mutex)
 220        {
 221            while (_state < StateHolding || _upcallCount > 0)
 222            {
 223                Monitor.Wait(_mutex);
 224            }
 225        }
 226    }
 227
 228    internal void waitUntilFinished()
 229    {
 230        lock (_mutex)
 231        {
 232            //
 233            // We wait indefinitely until the connection is finished and all
 234            // outstanding requests are completed. Otherwise we couldn't
 235            // guarantee that there are no outstanding calls when deactivate()
 236            // is called on the servant locators.
 237            //
 238            while (_state < StateFinished || _upcallCount > 0)
 239            {
 240                Monitor.Wait(_mutex);
 241            }
 242
 243            Debug.Assert(_state == StateFinished);
 244
 245            //
 246            // Clear the OA. See bug 1673 for the details of why this is necessary.
 247            //
 248            _adapter = null;
 249        }
 250    }
 251
 252    internal void updateObserver()
 253    {
 254        lock (_mutex)
 255        {
 256            if (_state < StateNotValidated || _state > StateClosed)
 257            {
 258                return;
 259            }
 260
 261            Debug.Assert(_instance.initializationData().observer is not null);
 262            _observer = _instance.initializationData().observer.getConnectionObserver(
 263                initConnectionInfo(),
 264                _endpoint,
 265                toConnectionState(_state),
 266                _observer);
 267            if (_observer is not null)
 268            {
 269                _observer.attach();
 270            }
 271            else
 272            {
 273                _writeStreamPos = -1;
 274                _readStreamPos = -1;
 275            }
 276        }
 277    }
 278
 279    internal int sendAsyncRequest(
 280        OutgoingAsyncBase og,
 281        bool compress,
 282        bool response,
 283        int batchRequestCount)
 284    {
 285        OutputStream os = og.getOs();
 286
 287        lock (_mutex)
 288        {
 289            //
 290            // If the exception is closed before we even have a chance
 291            // to send our request, we always try to send the request
 292            // again.
 293            //
 294            if (_exception is not null)
 295            {
 296                throw new RetryException(_exception);
 297            }
 298
 299            Debug.Assert(_state > StateNotValidated);
 300            Debug.Assert(_state < StateClosing);
 301
 302            //
 303            // Ensure the message isn't bigger than what we can send with the
 304            // transport.
 305            //
 306            _transceiver.checkSendSize(os.getBuffer());
 307
 308            //
 309            // Notify the request that it's cancelable with this connection.
 310            // This will throw if the request is canceled.
 311            //
 312            og.cancelable(this);
 313            int requestId = 0;
 314            if (response)
 315            {
 316                //
 317                // Create a new unique request ID.
 318                //
 319                requestId = _nextRequestId++;
 320                if (requestId <= 0)
 321                {
 322                    _nextRequestId = 1;
 323                    requestId = _nextRequestId++;
 324                }
 325
 326                //
 327                // Fill in the request ID.
 328                //
 329                os.pos(Protocol.headerSize);
 330                os.writeInt(requestId);
 331            }
 332            else if (batchRequestCount > 0)
 333            {
 334                os.pos(Protocol.headerSize);
 335                os.writeInt(batchRequestCount);
 336            }
 337
 338            og.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
 339
 340            // We're just about to send a request, so we are not inactive anymore.
 341            cancelInactivityTimer();
 342
 343            int status = OutgoingAsyncBase.AsyncStatusQueued;
 344            try
 345            {
 346                var message = new OutgoingMessage(og, os, compress, requestId);
 347                status = sendMessage(message);
 348            }
 349            catch (LocalException ex)
 350            {
 351                setState(StateClosed, ex);
 352                Debug.Assert(_exception is not null);
 353                throw _exception;
 354            }
 355
 356            if (response)
 357            {
 358                //
 359                // Add to the async requests map.
 360                //
 361                _asyncRequests[requestId] = og;
 362            }
 363            return status;
 364        }
 365    }
 366
 367    internal BatchRequestQueue getBatchRequestQueue() => _batchRequestQueue;
 368
 369    public void flushBatchRequests(CompressBatch compress)
 370    {
 371        try
 372        {
 373            var completed = new FlushBatchTaskCompletionCallback();
 374            var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 375            outgoing.invoke(_flushBatchRequests_name, compress, true);
 376            completed.Task.Wait();
 377        }
 378        catch (AggregateException ex)
 379        {
 380            throw ex.InnerException;
 381        }
 382    }
 383
 384    public Task flushBatchRequestsAsync(
 385        CompressBatch compress,
 386        IProgress<bool> progress = null,
 387        CancellationToken cancel = default)
 388    {
 389        var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
 390        var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 391        outgoing.invoke(_flushBatchRequests_name, compress, false);
 392        return completed.Task;
 393    }
 394
 395    private const string _flushBatchRequests_name = "flushBatchRequests";
 396
 397    public void disableInactivityCheck()
 398    {
 399        lock (_mutex)
 400        {
 401            cancelInactivityTimer();
 402            _inactivityTimeout = TimeSpan.Zero;
 403        }
 404    }
 405
 406    public void setCloseCallback(CloseCallback callback)
 407    {
 408        lock (_mutex)
 409        {
 410            if (_state >= StateClosed)
 411            {
 412                if (callback is not null)
 413                {
 414                    _threadPool.execute(
 415                        () =>
 416                        {
 417                            try
 418                            {
 419                                callback(this);
 420                            }
 421                            catch (System.Exception ex)
 422                            {
 423                                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
 424                            }
 425                        },
 426                        this);
 427                }
 428            }
 429            else
 430            {
 431                _closeCallback = callback;
 432            }
 433        }
 434    }
 435
 436    public void asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex)
 437    {
 438        //
 439        // NOTE: This isn't called from a thread pool thread.
 440        //
 441
 442        lock (_mutex)
 443        {
 444            if (_state >= StateClosed)
 445            {
 446                return; // The request has already been or will be shortly notified of the failure.
 447            }
 448
 449            OutgoingMessage o = _sendStreams.FirstOrDefault(m => m.outAsync == outAsync);
 450            if (o is not null)
 451            {
 452                if (o.requestId > 0)
 453                {
 454                    _asyncRequests.Remove(o.requestId);
 455                }
 456
 457                if (ex is ConnectionAbortedException)
 458                {
 459                    setState(StateClosed, ex);
 460                }
 461                else
 462                {
 463                    //
 464                    // If the request is being sent, don't remove it from the send streams,
 465                    // it will be removed once the sending is finished.
 466                    //
 467                    if (o == _sendStreams.First.Value)
 468                    {
 469                        o.canceled();
 470                    }
 471                    else
 472                    {
 473                        o.canceled();
 474                        _sendStreams.Remove(o);
 475                    }
 476                    if (outAsync.exception(ex))
 477                    {
 478                        outAsync.invokeExceptionAsync();
 479                    }
 480                }
 481
 482                if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 483                {
 484                    doApplicationClose();
 485                }
 486                return;
 487            }
 488
 489            if (outAsync is OutgoingAsync)
 490            {
 491                foreach (KeyValuePair<int, OutgoingAsyncBase> kvp in _asyncRequests)
 492                {
 493                    if (kvp.Value == outAsync)
 494                    {
 495                        if (ex is ConnectionAbortedException)
 496                        {
 497                            setState(StateClosed, ex);
 498                        }
 499                        else
 500                        {
 501                            _asyncRequests.Remove(kvp.Key);
 502                            if (outAsync.exception(ex))
 503                            {
 504                                outAsync.invokeExceptionAsync();
 505                            }
 506                        }
 507
 508                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 509                        {
 510                            doApplicationClose();
 511                        }
 512                        return;
 513                    }
 514                }
 515            }
 516        }
 517    }
 518
 519    internal EndpointI endpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 520
 521    internal Connector connector() => _connector; // No mutex protection necessary, _endpoint is immutable.
 522
 523    public void setAdapter(ObjectAdapter adapter)
 524    {
 525        if (_connector is null) // server connection
 526        {
 527            throw new InvalidOperationException("setAdapter can only be called on a client connection");
 528        }
 529
 530        if (adapter is not null)
 531        {
 532            // Go through the adapter to set the adapter and servant manager on this connection
 533            // to ensure the object adapter is still active.
 534            adapter.setAdapterOnConnection(this);
 535        }
 536        else
 537        {
 538            lock (_mutex)
 539            {
 540                if (_state <= StateNotValidated || _state >= StateClosing)
 541                {
 542                    return;
 543                }
 544                _adapter = null;
 545            }
 546        }
 547
 548        //
 549        // We never change the thread pool with which we were initially
 550        // registered, even if we add or remove an object adapter.
 551        //
 552    }
 553
 554    public ObjectAdapter getAdapter()
 555    {
 556        lock (_mutex)
 557        {
 558            return _adapter;
 559        }
 560    }
 561
 562    public Endpoint getEndpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 563
 564    public ObjectPrx createProxy(Identity id)
 565    {
 566        ObjectAdapter.checkIdentity(id);
 567        return new ObjectPrxHelper(_instance.referenceFactory().create(id, this));
 568    }
 569
 570    public void setAdapterFromAdapter(ObjectAdapter adapter)
 571    {
 572        lock (_mutex)
 573        {
 574            if (_state <= StateNotValidated || _state >= StateClosing)
 575            {
 576                return;
 577            }
 578            Debug.Assert(adapter is not null); // Called by ObjectAdapter::setAdapterOnConnection
 579            _adapter = adapter;
 580
 581            // Clear cached connection info (if any) as it's no longer accurate.
 582            _info = null;
 583        }
 584    }
 585
 586    //
 587    // Operations from EventHandler
 588    //
 589    public override bool startAsync(int operation, Ice.Internal.AsyncCallback completedCallback)
 590    {
 591        if (_state >= StateClosed)
 592        {
 593            return false;
 594        }
 595
 596        // Run the IO operation on a .NET thread pool thread to ensure the IO operation won't be interrupted if the
 597        // Ice thread pool thread is terminated (.NET Socket read/write fail with a SocketError.OperationAborted
 598        // error if started from a thread which is later terminated).
 599        Task.Run(() =>
 600        {
 601            lock (_mutex)
 602            {
 603                if (_state >= StateClosed)
 604                {
 605                    completedCallback(this);
 606                    return;
 607                }
 608
 609                try
 610                {
 611                    if ((operation & SocketOperation.Write) != 0)
 612                    {
 613                        if (_observer != null)
 614                        {
 615                            observerStartWrite(_writeStream.getBuffer());
 616                        }
 617
 618                        bool completedSynchronously =
 619                            _transceiver.startWrite(
 620                                _writeStream.getBuffer(),
 621                                completedCallback,
 622                                this,
 623                                out bool messageWritten);
 624
 625                        // If the startWrite call wrote the message, we assume the message is sent now for at-most-once
 626                        // semantics in the event the connection is closed while the message is still in _sendStreams.
 627                        if (messageWritten && _sendStreams.Count > 0)
 628                        {
 629                            // See finish() code.
 630                            _sendStreams.First.Value.isSent = true;
 631                        }
 632
 633                        if (completedSynchronously)
 634                        {
 635                            // If the write completed synchronously, we need to call the completedCallback.
 636                            completedCallback(this);
 637                        }
 638                    }
 639                    else if ((operation & SocketOperation.Read) != 0)
 640                    {
 641                        if (_observer != null && !_readHeader)
 642                        {
 643                            observerStartRead(_readStream.getBuffer());
 644                        }
 645
 646                        if (_transceiver.startRead(_readStream.getBuffer(), completedCallback, this))
 647                        {
 648                            completedCallback(this);
 649                        }
 650                    }
 651                }
 652                catch (LocalException ex)
 653                {
 654                    setState(StateClosed, ex);
 655                    completedCallback(this);
 656                }
 657            }
 658        });
 659
 660        return true;
 661    }
 662
 663    public override bool finishAsync(int operation)
 664    {
 665        if (_state >= StateClosed)
 666        {
 667            return false;
 668        }
 669
 670        try
 671        {
 672            if ((operation & SocketOperation.Write) != 0)
 673            {
 674                Ice.Internal.Buffer buf = _writeStream.getBuffer();
 675                int start = buf.b.position();
 676                _transceiver.finishWrite(buf);
 677                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 678                {
 679                    var s = new StringBuilder("sent ");
 680                    s.Append(buf.b.position() - start);
 681                    if (!_endpoint.datagram())
 682                    {
 683                        s.Append(" of ");
 684                        s.Append(buf.b.limit() - start);
 685                    }
 686                    s.Append(" bytes via ");
 687                    s.Append(_endpoint.protocol());
 688                    s.Append('\n');
 689                    s.Append(ToString());
 690                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 691                }
 692
 693                if (_observer is not null)
 694                {
 695                    observerFinishWrite(_writeStream.getBuffer());
 696                }
 697            }
 698            else if ((operation & SocketOperation.Read) != 0)
 699            {
 700                Ice.Internal.Buffer buf = _readStream.getBuffer();
 701                int start = buf.b.position();
 702                _transceiver.finishRead(buf);
 703                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 704                {
 705                    var s = new StringBuilder("received ");
 706                    if (_endpoint.datagram())
 707                    {
 708                        s.Append(buf.b.limit());
 709                    }
 710                    else
 711                    {
 712                        s.Append(buf.b.position() - start);
 713                        s.Append(" of ");
 714                        s.Append(buf.b.limit() - start);
 715                    }
 716                    s.Append(" bytes via ");
 717                    s.Append(_endpoint.protocol());
 718                    s.Append('\n');
 719                    s.Append(ToString());
 720                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 721                }
 722
 723                if (_observer is not null && !_readHeader)
 724                {
 725                    observerFinishRead(_readStream.getBuffer());
 726                }
 727            }
 728        }
 729        catch (LocalException ex)
 730        {
 731            setState(StateClosed, ex);
 732        }
 733        return _state < StateClosed;
 734    }
 735
 736    public override void message(ThreadPoolCurrent current)
 737    {
 738        StartCallback startCB = null;
 739        Queue<OutgoingMessage> sentCBs = null;
 740        var info = new MessageInfo();
 741        int upcallCount = 0;
 742
 743        using var msg = new ThreadPoolMessage(current, _mutex);
 744        lock (_mutex)
 745        {
 746            try
 747            {
 748                if (!msg.startIOScope())
 749                {
 750                    return;
 751                }
 752
 753                if (_state >= StateClosed)
 754                {
 755                    return;
 756                }
 757
 758                try
 759                {
 760                    int writeOp = SocketOperation.None;
 761                    int readOp = SocketOperation.None;
 762
 763                    // If writes are ready, write the data from the connection's write buffer (_writeStream)
 764                    if ((current.operation & SocketOperation.Write) != 0)
 765                    {
 766                        if (_observer is not null)
 767                        {
 768                            observerStartWrite(_writeStream.getBuffer());
 769                        }
 770                        writeOp = write(_writeStream.getBuffer());
 771                        if (_observer is not null && (writeOp & SocketOperation.Write) == 0)
 772                        {
 773                            observerFinishWrite(_writeStream.getBuffer());
 774                        }
 775                    }
 776
 777                    // If reads are ready, read the data into the connection's read buffer (_readStream). The data is
 778                    // read until:
 779                    // - the full message is read (the transport read returns SocketOperationNone) and
 780                    //   the read buffer is fully filled
 781                    // - the read operation on the transport can't continue without blocking
 782                    if ((current.operation & SocketOperation.Read) != 0)
 783                    {
 784                        while (true)
 785                        {
 786                            Ice.Internal.Buffer buf = _readStream.getBuffer();
 787
 788                            if (_observer is not null && !_readHeader)
 789                            {
 790                                observerStartRead(buf);
 791                            }
 792
 793                            readOp = read(buf);
 794                            if ((readOp & SocketOperation.Read) != 0)
 795                            {
 796                                // Can't continue without blocking, exit out of the loop.
 797                                break;
 798                            }
 799                            if (_observer is not null && !_readHeader)
 800                            {
 801                                Debug.Assert(!buf.b.hasRemaining());
 802                                observerFinishRead(buf);
 803                            }
 804
 805                            // If read header is true, we're reading a new Ice protocol message and we need to read
 806                            // the message header.
 807                            if (_readHeader)
 808                            {
 809                                // The next read will read the remainder of the message.
 810                                _readHeader = false;
 811
 812                                _observer?.receivedBytes(Protocol.headerSize);
 813
 814                                //
 815                                // Connection is validated on first message. This is only used by
 816                                // setState() to check whether or not we can print a connection
 817                                // warning (a client might close the connection forcefully if the
 818                                // connection isn't validated, we don't want to print a warning
 819                                // in this case).
 820                                //
 821                                _validated = true;
 822
 823                                // Full header should be read because the size of _readStream is always headerSize (14)
 824                                // when reading a new message (see the code that sets _readHeader = true).
 825                                int pos = _readStream.pos();
 826                                if (pos < Protocol.headerSize)
 827                                {
 828                                    //
 829                                    // This situation is possible for small UDP packets.
 830                                    //
 831                                    throw new MarshalException("Received Ice message with too few bytes in header.");
 832                                }
 833
 834                                // Decode the header.
 835                                _readStream.pos(0);
 836                                byte[] m = new byte[4];
 837                                m[0] = _readStream.readByte();
 838                                m[1] = _readStream.readByte();
 839                                m[2] = _readStream.readByte();
 840                                m[3] = _readStream.readByte();
 841                                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 842                                m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 843                                {
 844                                    throw new ProtocolException(
 845                                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 846                                }
 847
 848                                var pv = new ProtocolVersion(_readStream);
 849                                if (pv != Protocol.currentProtocol)
 850                                {
 851                                    throw new MarshalException(
 852                                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 853                                }
 854                                var ev = new EncodingVersion(_readStream);
 855                                if (ev != Protocol.currentProtocolEncoding)
 856                                {
 857                                    throw new MarshalException(
 858                                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 859                                }
 860
 861                                _readStream.readByte(); // messageType
 862                                _readStream.readByte(); // compress
 863                                int size = _readStream.readInt();
 864                                if (size < Protocol.headerSize)
 865                                {
 866                                    throw new MarshalException($"Received Ice message with unexpected size {size}.");
 867                                }
 868
 869                                // Resize the read buffer to the message size.
 870                                if (size > _messageSizeMax)
 871                                {
 872                                    Ex.throwMemoryLimitException(size, _messageSizeMax);
 873                                }
 874                                if (size > _readStream.size())
 875                                {
 876                                    _readStream.resize(size);
 877                                }
 878                                _readStream.pos(pos);
 879                            }
 880
 881                            if (buf.b.hasRemaining())
 882                            {
 883                                if (_endpoint.datagram())
 884                                {
 885                                    throw new DatagramLimitException(); // The message was truncated.
 886                                }
 887                                continue;
 888                            }
 889                            break;
 890                        }
 891                    }
 892
 893                    // readOp and writeOp are set to the operations that the transport read or write calls from above
 894                    // returned. They indicate which operations will need to be monitored by the thread pool's selector
 895                    // when this method returns.
 896                    int newOp = readOp | writeOp;
 897
 898                    // Operations that are ready. For example, if message was called with SocketOperationRead and the
 899                    // transport read returned SocketOperationNone, reads are considered done: there's no additional
 900                    // data to read.
 901                    int readyOp = current.operation & ~newOp;
 902
 903                    if (_state <= StateNotValidated)
 904                    {
 905                        // If the connection is still not validated and there's still data to read or write, continue
 906                        // waiting for data to read or write.
 907                        if (newOp != 0)
 908                        {
 909                            _threadPool.update(this, current.operation, newOp);
 910                            return;
 911                        }
 912
 913                        // Initialize the connection if it's not initialized yet.
 914                        if (_state == StateNotInitialized && !initialize(current.operation))
 915                        {
 916                            return;
 917                        }
 918
 919                        // Validate the connection if it's not validated yet.
 920                        if (_state <= StateNotValidated && !validate(current.operation))
 921                        {
 922                            return;
 923                        }
 924
 925                        // The connection is validated and doesn't need additional data to be read or written. So
 926                        // unregister it from the thread pool's selector.
 927                        _threadPool.unregister(this, current.operation);
 928
 929                        //
 930                        // We start out in holding state.
 931                        //
 932                        setState(StateHolding);
 933                        if (_startCallback is not null)
 934                        {
 935                            startCB = _startCallback;
 936                            _startCallback = null;
 937                            if (startCB is not null)
 938                            {
 939                                ++upcallCount;
 940                            }
 941                        }
 942                    }
 943                    else
 944                    {
 945                        Debug.Assert(_state <= StateClosingPending);
 946
 947                        //
 948                        // We parse messages first, if we receive a close
 949                        // connection message we won't send more messages.
 950                        //
 951                        if ((readyOp & SocketOperation.Read) != 0)
 952                        {
 953                            // At this point, the protocol message is fully read and can therefore be decoded by
 954                            // parseMessage. parseMessage returns the operation to wait for readiness next.
 955                            newOp |= parseMessage(ref info);
 956                            upcallCount += info.upcallCount;
 957                        }
 958
 959                        if ((readyOp & SocketOperation.Write) != 0)
 960                        {
 961                            // At this point the message from _writeStream is fully written and the next message can be
 962                            // written.
 963
 964                            newOp |= sendNextMessage(out sentCBs);
 965                            if (sentCBs is not null)
 966                            {
 967                                ++upcallCount;
 968                            }
 969                        }
 970
 971                        // If the connection is not closed yet, we can update the thread pool selector to wait for
 972                        // readiness of read, write or both operations.
 973                        if (_state < StateClosed)
 974                        {
 975                            _threadPool.update(this, current.operation, newOp);
 976                        }
 977                    }
 978
 979                    if (upcallCount == 0)
 980                    {
 981                        return; // Nothing to execute, we're done!
 982                    }
 983
 984                    _upcallCount += upcallCount;
 985
 986                    // There's something to execute so we mark IO as completed to elect a new leader thread and let IO
 987                    // be performed on this new leader thread while this thread continues with executing the upcalls.
 988                    msg.ioCompleted();
 989                }
 990                catch (DatagramLimitException) // Expected.
 991                {
 992                    if (_warnUdp)
 993                    {
 994                        _logger.warning($"maximum datagram size of {_readStream.pos()} exceeded");
 995                    }
 996                    _readStream.resize(Protocol.headerSize);
 997                    _readStream.pos(0);
 998                    _readHeader = true;
 999                    return;
 1000                }
 1001                catch (SocketException ex)
 1002                {
 1003                    setState(StateClosed, ex);
 1004                    return;
 1005                }
 1006                catch (LocalException ex)
 1007                {
 1008                    if (_endpoint.datagram())
 1009                    {
 1010                        if (_warn)
 1011                        {
 1012                            _logger.warning($"datagram connection exception:\n{ex}\n{_desc}");
 1013                        }
 1014                        _readStream.resize(Protocol.headerSize);
 1015                        _readStream.pos(0);
 1016                        _readHeader = true;
 1017                    }
 1018                    else
 1019                    {
 1020                        setState(StateClosed, ex);
 1021                    }
 1022                    return;
 1023                }
 1024            }
 1025            finally
 1026            {
 1027                msg.finishIOScope();
 1028            }
 1029        }
 1030
 1031        _threadPool.executeFromThisThread(() => upcall(startCB, sentCBs, info), this);
 1032    }
 1033
 1034    private void upcall(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)
 1035    {
 1036        int completedUpcallCount = 0;
 1037
 1038        //
 1039        // Notify the factory that the connection establishment and
 1040        // validation has completed.
 1041        //
 1042        if (startCB is not null)
 1043        {
 1044            startCB.connectionStartCompleted(this);
 1045            ++completedUpcallCount;
 1046        }
 1047
 1048        //
 1049        // Notify AMI calls that the message was sent.
 1050        //
 1051        if (sentCBs is not null)
 1052        {
 1053            foreach (OutgoingMessage m in sentCBs)
 1054            {
 1055                if (m.invokeSent)
 1056                {
 1057                    m.outAsync.invokeSent();
 1058                }
 1059                if (m.receivedReply)
 1060                {
 1061                    var outAsync = (OutgoingAsync)m.outAsync;
 1062                    if (outAsync.response())
 1063                    {
 1064                        outAsync.invokeResponse();
 1065                    }
 1066                }
 1067            }
 1068            ++completedUpcallCount;
 1069        }
 1070
 1071        //
 1072        // Asynchronous replies must be handled outside the thread
 1073        // synchronization, so that nested calls are possible.
 1074        //
 1075        if (info.outAsync is not null)
 1076        {
 1077            info.outAsync.invokeResponse();
 1078            ++completedUpcallCount;
 1079        }
 1080
 1081        //
 1082        // Method invocation (or multiple invocations for batch messages)
 1083        // must be done outside the thread synchronization, so that nested
 1084        // calls are possible.
 1085        //
 1086        if (info.requestCount > 0)
 1087        {
 1088            dispatchAll(info.stream, info.requestCount, info.requestId, info.compress, info.adapter);
 1089        }
 1090
 1091        //
 1092        // Decrease the upcall count.
 1093        //
 1094        bool finished = false;
 1095        if (completedUpcallCount > 0)
 1096        {
 1097            lock (_mutex)
 1098            {
 1099                _upcallCount -= completedUpcallCount;
 1100                if (_upcallCount == 0)
 1101                {
 1102                    // Only initiate shutdown if not already initiated. It might have already been initiated if the sent
 1103                    // callback or AMI callback was called when the connection was in the closing state.
 1104                    if (_state == StateClosing)
 1105                    {
 1106                        try
 1107                        {
 1108                            initiateShutdown();
 1109                        }
 1110                        catch (Ice.LocalException ex)
 1111                        {
 1112                            setState(StateClosed, ex);
 1113                        }
 1114                    }
 1115                    else if (_state == StateFinished)
 1116                    {
 1117                        finished = true;
 1118                        _observer?.detach();
 1119                    }
 1120                    Monitor.PulseAll(_mutex);
 1121                }
 1122            }
 1123        }
 1124
 1125        if (finished && _removeFromFactory is not null)
 1126        {
 1127            _removeFromFactory(this);
 1128        }
 1129    }
 1130
 1131    public override void finished(ThreadPoolCurrent current)
 1132    {
 1133        // Lock the connection here to ensure setState() completes before the code below is executed. This method can
 1134        // be called by the thread pool as soon as setState() calls _threadPool->finish(...). There's no need to lock
 1135        // the mutex for the remainder of the code because the data members accessed by finish() are immutable once
 1136        // _state == StateClosed (and we don't want to hold the mutex when calling upcalls).
 1137        lock (_mutex)
 1138        {
 1139            Debug.Assert(_state == StateClosed);
 1140        }
 1141
 1142        //
 1143        // If there are no callbacks to call, we don't call ioCompleted() since we're not going
 1144        // to call code that will potentially block (this avoids promoting a new leader and
 1145        // unnecessary thread creation, especially if this is called on shutdown).
 1146        //
 1147        if (_startCallback is null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && _closeCallback is null)
 1148        {
 1149            finish();
 1150            return;
 1151        }
 1152
 1153        current.ioCompleted();
 1154        _threadPool.executeFromThisThread(finish, this);
 1155    }
 1156
    /// <summary>
    /// Completes the closure of this connection: traces the closure, notifies the start callback of the failure,
    /// completes the queued outgoing messages and the pending asynchronous requests with <c>_exception</c>,
    /// clears the read and write streams, completes <c>_closed</c>, invokes the close callback and finally
    /// switches to <c>StateFinished</c>.
    /// </summary>
    /// <remarks>Called by <c>finished</c>, either directly or through the thread pool's
    /// <c>executeFromThisThread</c>.</remarks>
    private void finish()
    {
        // Trace the outcome: "failed to establish/accept" (network trace level >= 2) when the connection was never
        // initialized, "closed" (network trace level >= 1) otherwise.
        if (!_initialized)
        {
            if (_instance.traceLevels().network >= 2)
            {
                var s = new StringBuilder("failed to ");
                s.Append(_connector is not null ? "establish" : "accept");
                s.Append(' ');
                s.Append(_endpoint.protocol());
                s.Append(" connection\n");
                s.Append(ToString());
                s.Append('\n');
                s.Append(_exception);
                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
            }
        }
        else
        {
            if (_instance.traceLevels().network >= 1)
            {
                var s = new StringBuilder("closed ");
                s.Append(_endpoint.protocol());
                s.Append(" connection\n");
                s.Append(ToString());

                // Trace the cause of most connection closures.
                if (!(_exception is CommunicatorDestroyedException || _exception is ObjectAdapterDeactivatedException))
                {
                    s.Append('\n');
                    s.Append(_exception);
                }

                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
            }
        }

        // Report the failure to the start callback, if one is still set, then clear it.
        _startCallback?.connectionStartFailed(this, _exception);
        _startCallback = null;

        if (_sendStreams.Count > 0)
        {
            if (!_writeStream.isEmpty())
            {
                //
                // Return the stream to the outgoing call. This is important for
                // retriable AMI calls which are not marshaled again.
                //
                OutgoingMessage message = _sendStreams.First.Value;
                _writeStream.swap(message.stream);

                //
                // The current message might be sent but not yet removed from _sendStreams. If
                // the response has been received in the meantime, we remove the message from
                // _sendStreams to not call finished on a message which is already done.
                //
                if (message.isSent || message.receivedReply)
                {
                    if (message.sent() && message.invokeSent)
                    {
                        message.outAsync.invokeSent();
                    }
                    if (message.receivedReply)
                    {
                        var outAsync = (OutgoingAsync)message.outAsync;
                        if (outAsync.response())
                        {
                            outAsync.invokeResponse();
                        }
                    }
                    _sendStreams.RemoveFirst();
                }
            }

            // Complete every message still queued with the connection's exception.
            foreach (OutgoingMessage o in _sendStreams)
            {
                o.completed(_exception);
                if (o.requestId > 0) // Make sure finished isn't called twice.
                {
                    _asyncRequests.Remove(o.requestId);
                }
            }
            _sendStreams.Clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
        }

        // Complete the remaining pending requests with the connection's exception. invokeException() is only
        // called when exception() returns true.
        foreach (OutgoingAsyncBase o in _asyncRequests.Values)
        {
            if (o.exception(_exception))
            {
                o.invokeException();
            }
        }
        _asyncRequests.Clear();

        //
        // Don't wait to be reaped to reclaim memory allocated by read/write streams.
        //
        _writeStream.clear();
        _writeStream.getBuffer().clear();
        _readStream.clear();
        _readStream.getBuffer().clear();

        // Complete _closed: with a result for the four closure causes listed below, with _exception otherwise.
        if (_exception is ConnectionClosedException or
            CloseConnectionException or
            CommunicatorDestroyedException or
            ObjectAdapterDeactivatedException)
        {
            // Can execute synchronously. Note that we're not within a lock(this) here.
            _closed.SetResult();
        }
        else
        {
            Debug.Assert(_exception is not null);
            _closed.SetException(_exception);
        }

        // Invoke the close callback, if any. An exception thrown by the callback is logged, not propagated.
        if (_closeCallback is not null)
        {
            try
            {
                _closeCallback(this);
            }
            catch (System.Exception ex)
            {
                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
            }
            _closeCallback = null;
        }

        //
        // This must be done last as this will cause waitUntilFinished() to return (and communicator
        // objects such as the timer might be destroyed too).
        //
        bool finished = false;
        lock (_mutex)
        {
            setState(StateFinished);

            // With no upcall pending, the connection is done here; otherwise upcall() handles it once
            // _upcallCount drops to zero in StateFinished.
            if (_upcallCount == 0)
            {
                finished = true;
                _observer?.detach();
            }
        }

        // Call _removeFromFactory outside the lock.
        if (finished && _removeFromFactory is not null)
        {
            _removeFromFactory(this);
        }
    }
 1307
 1308    /// <inheritdoc/>
 1309    public override string ToString() => _desc; // No mutex lock, _desc is immutable.
 1310
 1311    /// <inheritdoc/>
 1312    public string type() => _type; // No mutex lock, _type is immutable.
 1313
 1314    /// <inheritdoc/>
 1315    public ConnectionInfo getInfo()
 1316    {
 1317        lock (_mutex)
 1318        {
 1319            if (_state >= StateClosed)
 1320            {
 1321                throw _exception;
 1322            }
 1323            return initConnectionInfo();
 1324        }
 1325    }
 1326
 1327    /// <inheritdoc/>
 1328    public void setBufferSize(int rcvSize, int sndSize)
 1329    {
 1330        lock (_mutex)
 1331        {
 1332            if (_state >= StateClosed)
 1333            {
 1334                throw _exception;
 1335            }
 1336            _transceiver.setBufferSize(rcvSize, sndSize);
 1337            _info = null; // Invalidate the cached connection info
 1338        }
 1339    }
 1340
 1341    public void exception(LocalException ex)
 1342    {
 1343        lock (_mutex)
 1344        {
 1345            setState(StateClosed, ex);
 1346        }
 1347    }
 1348
 1349    public Ice.Internal.ThreadPool getThreadPool() => _threadPool;
 1350
 1351    internal ConnectionI(
 1352        Instance instance,
 1353        Transceiver transceiver,
 1354        Connector connector, // null for incoming connections, non-null for outgoing connections
 1355        EndpointI endpoint,
 1356        ObjectAdapter adapter,
 1357        Action<ConnectionI> removeFromFactory, // can be null
 1358        ConnectionOptions options)
 1359    {
 1360        _instance = instance;
 1361        _desc = transceiver.ToString();
 1362        _type = transceiver.protocol();
 1363        _connector = connector;
 1364        _endpoint = endpoint;
 1365        _adapter = adapter;
 1366        InitializationData initData = instance.initializationData();
 1367        _logger = initData.logger; // Cached for better performance.
 1368        _traceLevels = instance.traceLevels(); // Cached for better performance.
 1369        _connectTimeout = options.connectTimeout;
 1370        _closeTimeout = options.closeTimeout; // not used for datagram connections
 1371        // suppress inactivity timeout for datagram connections
 1372        _inactivityTimeout = endpoint.datagram() ? TimeSpan.Zero : options.inactivityTimeout;
 1373        _maxDispatches = options.maxDispatches;
 1374        _removeFromFactory = removeFromFactory;
 1375        _warn = initData.properties.getIcePropertyAsInt("Ice.Warn.Connections") > 0;
 1376        _warnUdp = initData.properties.getIcePropertyAsInt("Ice.Warn.Datagrams") > 0;
 1377        _nextRequestId = 1;
 1378        _messageSizeMax = connector is null ? adapter.messageSizeMax() : instance.messageSizeMax();
 1379        _batchRequestQueue = new BatchRequestQueue(instance, _endpoint.datagram());
 1380        _readStream = new InputStream(instance, Protocol.currentProtocolEncoding);
 1381        _readHeader = false;
 1382        _readStreamPos = -1;
 1383        _writeStream = new OutputStream(); // temporary stream
 1384        _writeStreamPos = -1;
 1385        _upcallCount = 0;
 1386        _state = StateNotInitialized;
 1387
 1388        _compressionLevel = initData.properties.getIcePropertyAsInt("Ice.Compression.Level");
 1389        if (_compressionLevel < 1)
 1390        {
 1391            _compressionLevel = 1;
 1392        }
 1393        else if (_compressionLevel > 9)
 1394        {
 1395            _compressionLevel = 9;
 1396        }
 1397
 1398        if (options.idleTimeout > TimeSpan.Zero && !endpoint.datagram())
 1399        {
 1400            _idleTimeoutTransceiver = new IdleTimeoutTransceiverDecorator(
 1401                transceiver,
 1402                this,
 1403                options.idleTimeout,
 1404                options.enableIdleCheck);
 1405            transceiver = _idleTimeoutTransceiver;
 1406        }
 1407        _transceiver = transceiver;
 1408
 1409        try
 1410        {
 1411            if (connector is null)
 1412            {
 1413                // adapter is always set for incoming connections
 1414                Debug.Assert(adapter is not null);
 1415                _threadPool = adapter.getThreadPool();
 1416            }
 1417            else
 1418            {
 1419                // we use the client thread pool for outgoing connections, even if there is an
 1420                // object adapter with its own thread pool.
 1421                _threadPool = instance.clientThreadPool();
 1422            }
 1423            _threadPool.initialize(this);
 1424        }
 1425        catch (LocalException)
 1426        {
 1427            throw;
 1428        }
 1429        catch (System.Exception ex)
 1430        {
 1431            throw new SyscallException(ex);
 1432        }
 1433    }
 1434
 1435    /// <summary>
 1436    /// Aborts the connection with a <see cref="ConnectionAbortedException" /> if the connection is active and
 1437    /// does not receive a byte for some time. See the IdleTimeoutTransceiverDecorator.
 1438    /// </summary>
 1439    internal void idleCheck(TimeSpan idleTimeout)
 1440    {
 1441        lock (_mutex)
 1442        {
 1443            if (_state == StateActive && _idleTimeoutTransceiver!.idleCheckEnabled)
 1444            {
 1445                int idleTimeoutInSeconds = (int)idleTimeout.TotalSeconds;
 1446
 1447                setState(
 1448                    StateClosed,
 1449                    new ConnectionAbortedException(
 1450                        $"Connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSe
 1451                        closedByApplication: false));
 1452            }
 1453            // else nothing to do
 1454        }
 1455    }
 1456
 1457    internal void sendHeartbeat()
 1458    {
 1459        Debug.Assert(!_endpoint.datagram());
 1460
 1461        lock (_mutex)
 1462        {
 1463            if (_state == StateActive || _state == StateHolding || _state == StateClosing)
 1464            {
 1465                // We check if the connection has become inactive.
 1466                if (
 1467                    _inactivityTimer is null &&           // timer not already scheduled
 1468                    _inactivityTimeout > TimeSpan.Zero && // inactivity timeout is enabled
 1469                    _state == StateActive &&              // only schedule the timer if the connection is active
 1470                    _dispatchCount == 0 &&                // no pending dispatch
 1471                    _asyncRequests.Count == 0 &&          // no pending invocation
 1472                    _readHeader &&                        // we're not waiting for the remainder of an incoming message
 1473                    _sendStreams.Count <= 1)              // there is at most one pending outgoing message
 1474                {
 1475                    // We may become inactive while the peer is back-pressuring us. In this case, we only schedule the
 1476                    // inactivity timer if there is no pending outgoing message or the pending outgoing message is a
 1477                    // heartbeat.
 1478
 1479                    // The stream of the first _sendStreams message is in _writeStream.
 1480                    if (_sendStreams.Count == 0 || isHeartbeat(_writeStream))
 1481                    {
 1482                        scheduleInactivityTimer();
 1483                    }
 1484                }
 1485
 1486                // We send a heartbeat to the peer to generate a "write" on the connection. This write in turns creates
 1487                // a read on the peer, and resets the peer's idle check timer. When _sendStream is not empty, there is
 1488                // already an outstanding write, so we don't need to send a heartbeat. It's possible the first message
 1489                // of _sendStreams was already sent but not yet removed from _sendStreams: it means the last write
 1490                // occurred very recently, which is good enough with respect to the idle check.
 1491                // As a result of this optimization, the only possible heartbeat in _sendStreams is the first
 1492                // _sendStreams message.
 1493                if (_sendStreams.Count == 0)
 1494                {
 1495                    var os = new OutputStream(Protocol.currentProtocolEncoding);
 1496                    os.writeBlob(Protocol.magic);
 1497                    ProtocolVersion.ice_write(os, Protocol.currentProtocol);
 1498                    EncodingVersion.ice_write(os, Protocol.currentProtocolEncoding);
 1499                    os.writeByte(Protocol.validateConnectionMsg);
 1500                    os.writeByte(0);
 1501                    os.writeInt(Protocol.headerSize); // Message size.
 1502                    try
 1503                    {
 1504                        _ = sendMessage(new OutgoingMessage(os, compress: false));
 1505                    }
 1506                    catch (LocalException ex)
 1507                    {
 1508                        setState(StateClosed, ex);
 1509                    }
 1510                }
 1511            }
 1512            // else nothing to do
 1513        }
 1514
 1515        static bool isHeartbeat(OutputStream stream) =>
 1516            stream.getBuffer().b.get(8) == Protocol.validateConnectionMsg;
 1517    }
 1518
    // Connection states. The numeric order is significant: the code compares states with relational operators,
    // for example "_state >= StateClosed" and "_state <= StateNotValidated".
    private const int StateNotInitialized = 0; // initial state, set by the constructor
    private const int StateNotValidated = 1;
    private const int StateActive = 2;
    private const int StateHolding = 3;
    private const int StateClosing = 4;
    private const int StateClosingPending = 5;
    private const int StateClosed = 6;
    private const int StateFinished = 7; // set at the end of finish()
 1527
 1528    private static ConnectionState toConnectionState(int state) => connectionStateMap[state];
 1529
 1530    private void setState(int state, LocalException ex)
 1531    {
 1532        //
 1533        // If setState() is called with an exception, then only closed
 1534        // and closing states are permissible.
 1535        //
 1536        Debug.Assert(state >= StateClosing);
 1537
 1538        if (_state == state) // Don't switch twice.
 1539        {
 1540            return;
 1541        }
 1542
 1543        if (_exception is null)
 1544        {
 1545            //
 1546            // If we are in closed state, an exception must be set.
 1547            //
 1548            Debug.Assert(_state != StateClosed);
 1549
 1550            _exception = ex;
 1551
 1552            //
 1553            // We don't warn if we are not validated.
 1554            //
 1555            if (_warn && _validated)
 1556            {
 1557                //
 1558                // Don't warn about certain expected exceptions.
 1559                //
 1560                if (!(_exception is CloseConnectionException ||
 1561                     _exception is ConnectionClosedException ||
 1562                     _exception is CommunicatorDestroyedException ||
 1563                     _exception is ObjectAdapterDeactivatedException ||
 1564                     (_exception is ConnectionLostException && _state >= StateClosing)))
 1565                {
 1566                    warning("connection exception", _exception);
 1567                }
 1568            }
 1569        }
 1570
 1571        //
 1572        // We must set the new state before we notify requests of any
 1573        // exceptions. Otherwise new requests may retry on a
 1574        // connection that is not yet marked as closed or closing.
 1575        //
 1576        setState(state);
 1577    }
 1578
 1579    private void setState(int state)
 1580    {
             // Switches the connection to the requested state.
             //
             // The request may first be adjusted: StateClosing becomes StateClosed for datagram endpoints and for
             // connections that are not past StateNotValidated. It may also be ignored: the method returns without
             // touching _state when the requested transition is not allowed from the current state.
             //
             // Otherwise the state-specific actions run, the connection observer (if any) is updated, _state is
             // assigned, the threads waiting on _mutex are pulsed and, when StateClosing is entered with
             // _upcallCount == 0, the graceful shutdown is initiated.
             //
             // Monitor.PulseAll(_mutex) below requires the calling thread to own the lock on _mutex.
 1581        //
 1582        // We don't want to send close connection messages if the endpoint
 1583        // only supports oneway transmission from client to server.
 1584        //
 1585        if (_endpoint.datagram() && state == StateClosing)
 1586        {
 1587            state = StateClosed;
 1588        }
 1589
 1590        //
 1591        // Skip graceful shutdown if we are destroyed before validation.
 1592        //
 1593        if (_state <= StateNotValidated && state == StateClosing)
 1594        {
 1595            state = StateClosed;
 1596        }
 1597
 1598        if (_state == state) // Don't switch twice.
 1599        {
 1600            return;
 1601        }
 1602
 1603        if (state > StateActive)
 1604        {
 1605            // Dispose the inactivity timer, if not null.
 1606            cancelInactivityTimer();
 1607        }
 1608
             // Validate the transition and run the actions tied to the target state. A "return" inside the switch
             // abandons the transition: _state is left unchanged and nobody is notified.
 1609        try
 1610        {
 1611            switch (state)
 1612            {
 1613                case StateNotInitialized:
 1614                {
                         // Nothing ever switches back to the initial state.
 1615                    Debug.Assert(false);
 1616                    break;
 1617                }
 1618
 1619                case StateNotValidated:
 1620                {
                         // Only reachable from StateNotInitialized; the request is ignored otherwise.
 1621                    if (_state != StateNotInitialized)
 1622                    {
 1623                        Debug.Assert(_state == StateClosed);
 1624                        return;
 1625                    }
 1626                    break;
 1627                }
 1628
 1629                case StateActive:
 1630                {
 1631                    //
 1632                    // Can only switch to active from holding or not validated.
 1633                    //
 1634                    if (_state != StateHolding && _state != StateNotValidated)
 1635                    {
 1636                        return;
 1637                    }
 1638
                         // A non-positive _maxDispatches means that no dispatch limit applies.
 1639                    if (_maxDispatches <= 0 || _dispatchCount < _maxDispatches)
 1640                    {
 1641                        _threadPool.register(this, SocketOperation.Read);
 1642                        _idleTimeoutTransceiver?.enableIdleCheck();
 1643                    }
 1644                    // else don't resume reading since we're at or over the _maxDispatches limit.
 1645
 1646                    break;
 1647                }
 1648
 1649                case StateHolding:
 1650                {
 1651                    //
 1652                    // Can only switch to holding from active or not validated.
 1653                    //
 1654                    if (_state != StateActive && _state != StateNotValidated)
 1655                    {
 1656                        return;
 1657                    }
 1658
                         // Mirror of the StateActive case: undo the read registration only if it was made.
 1659                    if (_state == StateActive && (_maxDispatches <= 0 || _dispatchCount < _maxDispatches))
 1660                    {
 1661                        _threadPool.unregister(this, SocketOperation.Read);
 1662                        _idleTimeoutTransceiver?.disableIdleCheck();
 1663                    }
 1664                    // else reads are already disabled because the _maxDispatches limit is reached or exceeded.
 1665
 1666                    break;
 1667                }
 1668
 1669                case StateClosing:
 1670                case StateClosingPending:
 1671                {
 1672                    //
 1673                    // Can't change back from closing pending.
 1674                    //
 1675                    if (_state >= StateClosingPending)
 1676                    {
 1677                        return;
 1678                    }
 1679                    break;
 1680                }
 1681
 1682                case StateClosed:
 1683                {
                         // A finished connection cannot be closed again.
 1684                    if (_state == StateFinished)
 1685                    {
 1686                        return;
 1687                    }
 1688
                         // Destroy the batch request queue with the closure exception, hand the connection over to
                         // the thread pool for finishing and close the transceiver.
 1689                    _batchRequestQueue.destroy(_exception);
 1690                    _threadPool.finish(this);
 1691                    _transceiver.close();
 1692                    break;
 1693                }
 1694
 1695                case StateFinished:
 1696                {
 1697                    Debug.Assert(_state == StateClosed);
 1698                    _transceiver.destroy();
 1699                    break;
 1700                }
 1701            }
 1702        }
 1703        catch (LocalException ex)
 1704        {
                 // The failure is only logged: the state change below still takes place.
 1705            _logger.error("unexpected connection exception:\n" + ex + "\n" + _transceiver.ToString());
 1706        }
 1707
             // Update the connection observer. _state still holds the previous state at this point.
 1708        if (_instance.initializationData().observer is not null)
 1709        {
 1710            ConnectionState oldState = toConnectionState(_state);
 1711            ConnectionState newState = toConnectionState(state);
 1712            if (oldState != newState)
 1713            {
                     // Ask the communicator observer for the connection observer matching the new state; the
                     // previous connection observer is passed along as the last argument.
 1714                _observer = _instance.initializationData().observer.getConnectionObserver(
 1715                    initConnectionInfo(),
 1716                    _endpoint,
 1717                    newState,
 1718                    _observer);
 1719                if (_observer is not null)
 1720                {
 1721                    _observer.attach();
 1722                }
 1723                else
 1724                {
                         // NOTE(review): -1 presumably means "no observed write/read in progress" - confirm
                         // against observerStartWrite/observerStartRead.
 1725                    _writeStreamPos = -1;
 1726                    _readStreamPos = -1;
 1727                }
 1728            }
 1729            if (_observer is not null && state == StateClosed && _exception is not null)
 1730            {
                     // Report the closure as a failure to the observer, except for the exceptions listed here. A
                     // ConnectionLostException is exempt only when the previous state (_state) was already
                     // StateClosing or later.
 1731                if (!(_exception is CloseConnectionException ||
 1732                     _exception is ConnectionClosedException ||
 1733                     _exception is CommunicatorDestroyedException ||
 1734                     _exception is ObjectAdapterDeactivatedException ||
 1735                     (_exception is ConnectionLostException && _state >= StateClosing)))
 1736                {
 1737                    _observer.failed(_exception.ice_id());
 1738                }
 1739            }
 1740        }
 1741        _state = state;
 1742
             // Wake up every thread waiting on _mutex so that it can re-check the connection state.
 1743        Monitor.PulseAll(_mutex);
 1744
             // Shutdown is initiated here only when _upcallCount is zero.
 1745        if (_state == StateClosing && _upcallCount == 0)
 1746        {
 1747            try
 1748            {
 1749                initiateShutdown();
 1750            }
 1751            catch (LocalException ex)
 1752            {
                     // The graceful closure could not be started: close the connection with this exception.
 1753                setState(StateClosed, ex);
 1754            }
 1755        }
 1756    }
 1757
 1758    private void initiateShutdown()
 1759    {
             // Starts the graceful closure of the connection. Must be called in StateClosing with _upcallCount == 0.
             // Only the first call has an effect (guarded by _shutdownInitiated). Datagram endpoints send nothing.
 1760        Debug.Assert(_state == StateClosing && _upcallCount == 0);
 1761
 1762        if (_shutdownInitiated)
 1763        {
 1764            return;
 1765        }
 1766        _shutdownInitiated = true;
 1767
 1768        if (!_endpoint.datagram())
 1769        {
 1770            //
 1771            // Before we shut down, we send a close connection message.
 1772            //
                 // The message is a bare header: magic, protocol version, protocol encoding version, message type,
                 // compression status and message size (which is therefore Protocol.headerSize).
 1773            var os = new OutputStream(Protocol.currentProtocolEncoding);
 1774            os.writeBlob(Protocol.magic);
 1775            ProtocolVersion.ice_write(os, Protocol.currentProtocol);
 1776            EncodingVersion.ice_write(os, Protocol.currentProtocolEncoding);
 1777            os.writeByte(Protocol.closeConnectionMsg);
 1778            os.writeByte(0); // Compression status: always zero for close connection.
 1779            os.writeInt(Protocol.headerSize); // Message size.
 1780
 1781            scheduleCloseTimer();
 1782
                 // If the message was written in full right away, move to StateClosingPending now. If it was only
                 // queued, sendNextMessage performs this transition once the send queue has been flushed.
 1783            if ((sendMessage(new OutgoingMessage(os, compress: false)) & OutgoingAsyncBase.AsyncStatusSent) != 0)
 1784            {
 1785                setState(StateClosingPending);
 1786
 1787                //
 1788                // Notify the transceiver of the graceful connection closure.
 1789                //
                     // A non-zero result is a socket operation that is registered with the thread pool.
 1790                int op = _transceiver.closing(true, _exception);
 1791                if (op != 0)
 1792                {
 1793                    _threadPool.register(this, op);
 1794                }
 1795            }
 1796        }
 1797    }
 1798
 1799    private bool initialize(int operation)
 1800    {
             // Runs the transceiver initialization. Returns false when the transceiver asks for a socket operation
             // before it can complete, and true once it is done; in that case the connection description is
             // refreshed and the connection switches to StateNotValidated.
             //
             // `operation` is forwarded to _threadPool.update together with the operation requested by the
             // transceiver. NOTE(review): presumably it is the operation currently registered with the thread
             // pool - confirm against ThreadPool.update.
 1801        int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), ref _hasMoreData);
 1802        if (s != SocketOperation.None)
 1803        {
 1804            _threadPool.update(this, operation, s);
 1805            return false;
 1806        }
 1807
 1808        //
 1809        // Update the connection description once the transceiver is initialized.
 1810        //
 1811        _desc = _transceiver.ToString();
 1812        _initialized = true;
 1813        setState(StateNotValidated);
 1814
 1815        return true;
 1816    }
 1817
 1818    private bool validate(int operation)
 1819    {
             // Performs the connection validation handshake.
             //
             // - Datagram endpoints skip the handshake.
             // - Without a connector (server side), a header-only ValidateConnection message is written.
             // - With a connector (client side), a header is read and checked; a ProtocolException or
             //   MarshalException is thrown when it is not the expected ValidateConnection message.
             //
             // Returns false when the write or read could not complete and a socket operation was handed to the
             // thread pool. In that case _writeStream/_readStream keep their content and position, so a later call
             // resumes where this one stopped. Returns true once validation is complete; the streams are then
             // reset for regular message exchange.
 1820        if (!_endpoint.datagram()) // Datagram connections are always implicitly validated.
 1821        {
 1822            if (_connector is null) // The server side has the active role for connection validation.
 1823            {
                     // An empty write stream means the message has not been marshaled yet (first call).
 1824                if (_writeStream.size() == 0)
 1825                {
 1826                    _writeStream.writeBlob(Protocol.magic);
 1827                    ProtocolVersion.ice_write(_writeStream, Protocol.currentProtocol);
 1828                    EncodingVersion.ice_write(_writeStream, Protocol.currentProtocolEncoding);
 1829                    _writeStream.writeByte(Protocol.validateConnectionMsg);
 1830                    _writeStream.writeByte(0); // Compression status (always zero for validate connection).
 1831                    _writeStream.writeInt(Protocol.headerSize); // Message size.
 1832                    TraceUtil.traceSend(_writeStream, _instance, this, _logger, _traceLevels);
 1833                    _writeStream.prepareWrite();
 1834                }
 1835
 1836                if (_observer is not null)
 1837                {
 1838                    observerStartWrite(_writeStream.getBuffer());
 1839                }
 1840
 1841                if (_writeStream.pos() != _writeStream.size())
 1842                {
                         // A non-zero result means the write is incomplete: wait for the socket operation.
 1843                    int op = write(_writeStream.getBuffer());
 1844                    if (op != 0)
 1845                    {
 1846                        _threadPool.update(this, operation, op);
 1847                        return false;
 1848                    }
 1849                }
 1850
 1851                if (_observer is not null)
 1852                {
 1853                    observerFinishWrite(_writeStream.getBuffer());
 1854                }
 1855            }
 1856            else // The client side has the passive role for connection validation.
 1857            {
                     // An empty read stream means nothing was read yet (first call): make room for one header.
 1858                if (_readStream.size() == 0)
 1859                {
 1860                    _readStream.resize(Protocol.headerSize);
 1861                    _readStream.pos(0);
 1862                }
 1863
 1864                if (_observer is not null)
 1865                {
 1866                    observerStartRead(_readStream.getBuffer());
 1867                }
 1868
 1869                if (_readStream.pos() != _readStream.size())
 1870                {
                         // A non-zero result means the read is incomplete: wait for the socket operation.
 1871                    int op = read(_readStream.getBuffer());
 1872                    if (op != 0)
 1873                    {
 1874                        _threadPool.update(this, operation, op);
 1875                        return false;
 1876                    }
 1877                }
 1878
 1879                if (_observer is not null)
 1880                {
 1881                    observerFinishRead(_readStream.getBuffer());
 1882                }
 1883
                     // NOTE(review): _validated is set as soon as a full header has been read, before the header
                     // content is checked below, and only in this (client) branch of this method - confirm that
                     // this is intended.
 1884                _validated = true;
 1885
                     // Check the header field by field: magic, protocol version, protocol encoding version,
                     // message type, compression status (ignored) and message size.
 1886                Debug.Assert(_readStream.pos() == Protocol.headerSize);
 1887                _readStream.pos(0);
 1888                byte[] m = _readStream.readBlob(4);
 1889                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 1890                   m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 1891                {
 1892                    throw new ProtocolException(
 1893                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 1894                }
 1895
 1896                var pv = new ProtocolVersion(_readStream);
 1897                if (pv != Protocol.currentProtocol)
 1898                {
 1899                    throw new MarshalException(
 1900                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 1901                }
 1902                var ev = new EncodingVersion(_readStream);
 1903                if (ev != Protocol.currentProtocolEncoding)
 1904                {
 1905                    throw new MarshalException(
 1906                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 1907                }
 1908
 1909                byte messageType = _readStream.readByte();
 1910                if (messageType != Protocol.validateConnectionMsg)
 1911                {
 1912                    throw new ProtocolException(
 1913                        $"Received message of type {messageType} over a connection that is not yet validated.");
 1914                }
 1915                _readStream.readByte(); // Ignore compression status for validate connection.
 1916                int size = _readStream.readInt();
 1917                if (size != Protocol.headerSize)
 1918                {
 1919                    throw new MarshalException($"Received ValidateConnection message with unexpected size {size}.");
 1920                }
 1921                TraceUtil.traceRecv(_readStream, this, _logger, _traceLevels);
 1922
 1923                // Client connection starts sending heartbeats once it's received the ValidateConnection message.
 1924                _idleTimeoutTransceiver?.scheduleHeartbeat();
 1925            }
 1926        }
 1927
             // Validation is complete: empty the write stream and get the read stream ready for the header of the
             // first regular message.
 1928        _writeStream.resize(0);
 1929        _writeStream.pos(0);
 1930
 1931        _readStream.resize(Protocol.headerSize);
 1932        _readStream.pos(0);
 1933        _readHeader = true;
 1934
             // Network tracing: report the new connection (or, for datagrams, the start of sending/receiving).
 1935        if (_instance.traceLevels().network >= 1)
 1936        {
 1937            var s = new StringBuilder();
 1938            if (_endpoint.datagram())
 1939            {
 1940                s.Append("starting to ");
 1941                s.Append(_connector is not null ? "send" : "receive");
 1942                s.Append(' ');
 1943                s.Append(_endpoint.protocol());
 1944                s.Append(" messages\n");
 1945                s.Append(_transceiver.toDetailedString());
 1946            }
 1947            else
 1948            {
 1949                s.Append(_connector is not null ? "established" : "accepted");
 1950                s.Append(' ');
 1951                s.Append(_endpoint.protocol());
 1952                s.Append(" connection\n");
 1953                s.Append(ToString());
 1954            }
 1955            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1956        }
 1957
 1958        return true;
 1959    }
 1960
 1961    /// <summary>
 1962    /// Sends the next queued messages. This method is called by message() once the message which is being sent
 1963    /// (_sendStreams.First) is fully sent. Before sending the next message, this message is removed from _sendStreams.
 1964    /// If it has a sent callback, the message is also queued in the given callback queue.
 1965    /// </summary>
 1966    /// <param name="callbacks">The sent callbacks to call for the messages that were sent.</param>
 1967    /// <returns>The socket operation to register with the thread pool's selector to send the remainder of the pending
 1968    /// message being sent (_sendStreams.First).</returns>
 1969    private int sendNextMessage(out Queue<OutgoingMessage> callbacks)
 1970    {
             // The queue is allocated lazily: callbacks stays null when no sent callback has to be invoked.
 1971        callbacks = null;
 1972
 1973        if (_sendStreams.Count == 0)
 1974        {
 1975            // This can occur if no message was being written and the socket write operation was registered with the
 1976            // thread pool (a transceiver read method can request writing data).
 1977            return SocketOperation.None;
 1978        }
 1979        else if (_state == StateClosingPending && _writeStream.pos() == 0)
 1980        {
 1981            // Message wasn't sent, empty the _writeStream, we're not going to send more data because the connection
 1982            // is being closed.
 1983            OutgoingMessage message = _sendStreams.First.Value;
 1984            _writeStream.swap(message.stream);
 1985            return SocketOperation.None;
 1986        }
 1987
 1988        // Assert that the message was fully written.
 1989        Debug.Assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
 1990
 1991        try
 1992        {
                 // Each iteration retires the message at the head of the queue, then starts writing the next one.
                 // The loop ends when the queue is empty, when the connection is closing or closed, or when a
                 // write cannot complete right away.
 1993            while (true)
 1994            {
 1995                //
 1996                // The message that was being sent is sent. We can swap back the write stream buffer to the
 1997                // outgoing message (required for retry) and queue its sent callback (if any).
 1998                //
 1999                OutgoingMessage message = _sendStreams.First.Value;
 2000                _writeStream.swap(message.stream);
 2001                if (message.sent())
 2002                {
 2003                    callbacks ??= new Queue<OutgoingMessage>();
 2004                    callbacks.Enqueue(message);
 2005                }
 2006                _sendStreams.RemoveFirst();
 2007
 2008                //
 2009                // If there's nothing left to send, we're done.
 2010                //
 2011                if (_sendStreams.Count == 0)
 2012                {
 2013                    break;
 2014                }
 2015
 2016                //
 2017                // If we are in the closed state or if the close is pending, don't continue sending. This can occur if
 2018                // parseMessage (called before sendNextMessage by message()) closes the connection.
 2019                //
 2020                if (_state >= StateClosingPending)
 2021                {
 2022                    return SocketOperation.None;
 2023                }
 2024
 2025                //
 2026                // Otherwise, prepare the next message.
 2027                //
 2028                message = _sendStreams.First.Value;
 2029                Debug.Assert(!message.prepared);
                     // Keep a reference to the original stream: doCompress may return a different (compressed)
                     // stream, and it is the original one that is traced below.
 2030                OutputStream stream = message.stream;
 2031
 2032                message.stream = doCompress(message.stream, message.compress);
 2033                message.stream.prepareWrite();
 2034                message.prepared = true;
 2035
 2036                TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);
 2037
 2038                //
 2039                // Send the message.
 2040                //
 2041                _writeStream.swap(message.stream);
 2042                if (_observer is not null)
 2043                {
 2044                    observerStartWrite(_writeStream.getBuffer());
 2045                }
 2046                if (_writeStream.pos() != _writeStream.size())
 2047                {
                         // A non-zero result means the write is incomplete: the message stays at the head of the
                         // queue and the caller gets the socket operation to wait for.
 2048                    int op = write(_writeStream.getBuffer());
 2049                    if (op != 0)
 2050                    {
 2051                        return op;
 2052                    }
 2053                }
 2054                if (_observer is not null)
 2055                {
 2056                    observerFinishWrite(_writeStream.getBuffer());
 2057                }
 2058
 2059                // If the message was sent right away, loop to send the next queued message.
 2060            }
 2061
 2062            // Once the CloseConnection message is sent, we transition to the StateClosingPending state.
 2063            if (_state == StateClosing && _shutdownInitiated)
 2064            {
 2065                setState(StateClosingPending);
 2066                int op = _transceiver.closing(true, _exception);
 2067                if (op != 0)
 2068                {
 2069                    return op;
 2070                }
 2071            }
 2072        }
 2073        catch (LocalException ex)
 2074        {
                 // Any failure while sending closes the connection with that exception.
 2075            setState(StateClosed, ex);
 2076        }
 2077        return SocketOperation.None;
 2078    }
 2079
 2080    /// <summary>
 2081    /// Sends or queues the given message.
 2082    /// </summary>
 2083    /// <param name="message">The message to send.</param>
 2084    /// <returns>The send status: AsyncStatusQueued when the message was queued, otherwise AsyncStatusSent, combined
         /// with AsyncStatusInvokeSentCallback when message.sent() returns true.</returns>
 2085    private int sendMessage(OutgoingMessage message)
 2086    {
 2087        Debug.Assert(_state >= StateActive);
 2088        Debug.Assert(_state < StateClosed);
 2089
 2090        // Some messages are already queued for sending. Just add the message to the send queue and tell the
 2091        // caller that the message was queued.
 2092        if (_sendStreams.Count > 0)
 2093        {
 2094            _sendStreams.AddLast(message);
 2095            return OutgoingAsyncBase.AsyncStatusQueued;
 2096        }
 2097
 2098        // Prepare the message for sending.
 2099        Debug.Assert(!message.prepared);
 2100
             // Keep a reference to the original stream: doCompress may return a different (compressed) stream, and
             // it is the original one that is traced below.
 2101        OutputStream stream = message.stream;
 2102
 2103        message.stream = doCompress(stream, message.compress);
 2104        message.stream.prepareWrite();
 2105        message.prepared = true;
 2106
 2107        TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);
 2108
 2109        // Send the message without blocking.
 2110        if (_observer is not null)
 2111        {
 2112            observerStartWrite(message.stream.getBuffer());
 2113        }
 2114        int op = write(message.stream.getBuffer());
 2115        if (op == 0)
 2116        {
 2117            // The message was sent so we're done.
 2118
 2119            if (_observer is not null)
 2120            {
 2121                observerFinishWrite(message.stream.getBuffer());
 2122            }
 2123
 2124            int status = OutgoingAsyncBase.AsyncStatusSent;
 2125            if (message.sent())
 2126            {
 2127                // If there's a sent callback, tell the caller that it should invoke the sent callback.
 2128                status |= OutgoingAsyncBase.AsyncStatusInvokeSentCallback;
 2129            }
 2130
 2131            return status;
 2132        }
 2133
 2134        // The message couldn't be sent right away so we add it to the send stream queue (which is empty) and swap its
 2135        // stream with `_writeStream`. The socket operation returned by the transceiver write is registered with the
 2136        // thread pool. At this point the message() method will take care of sending the whole message (held by
 2137        // _writeStream) when the transceiver is ready to write more of the message buffer.
 2138
 2139        _writeStream.swap(message.stream);
 2140        _sendStreams.AddLast(message);
 2141        _threadPool.register(this, op);
 2142        return OutgoingAsyncBase.AsyncStatusQueued;
 2143    }
 2144
 2145    private OutputStream doCompress(OutputStream decompressed, bool compress)
 2146    {
             // Finalizes the header of an outgoing message and returns the stream to put on the wire.
             //
             // When BZip2 is loaded, compression is requested and the message is at least 100 bytes long, the
             // method attempts to compress it and, on success, returns a new stream holding the compressed message
             // (compression status 2). Otherwise it returns `decompressed` itself.
             //
             // In both cases the header of `decompressed` is updated in place. Offset 9 holds the compression
             // status byte and offset 10 the message size; the message type sits at offset 8, see parseMessage.
 2147        if (BZip2.isLoaded(_logger) && compress && decompressed.size() >= 100)
 2148        {
 2149            //
 2150            // Do compression.
 2151            //
 2152            Ice.Internal.Buffer cbuf = BZip2.compress(
 2153                decompressed.getBuffer(),
 2154                Protocol.headerSize,
 2155                _compressionLevel);
                 // NOTE(review): a null result presumably means that compression was not worthwhile - confirm
                 // against BZip2.compress. The message is then sent uncompressed (fall through).
 2156            if (cbuf is not null)
 2157            {
 2158                var cstream = new OutputStream(new Internal.Buffer(cbuf, true), decompressed.getEncoding());
 2159
 2160                //
 2161                // Set compression status.
 2162                //
 2163                cstream.pos(9);
 2164                cstream.writeByte(2);
 2165
 2166                //
 2167                // Write the size of the compressed stream into the header.
 2168                //
 2169                cstream.pos(10);
 2170                cstream.writeInt(cstream.size());
 2171
 2172                //
 2173                // Write the compression status and size of the compressed stream into the header of the
 2174                // decompressed stream -- we need this to trace requests correctly.
 2175                //
 2176                decompressed.pos(9);
 2177                decompressed.writeByte(2);
 2178                decompressed.writeInt(cstream.size());
 2179
 2180                return cstream;
 2181            }
 2182        }
 2183
 2184        // Write the compression status. If BZip2 is loaded and compress is set to true, we write 1, to request a
 2185        // compressed reply. Otherwise we write 0, because either BZip2 is not loaded or compress is false.
 2186        decompressed.pos(9);
 2187        decompressed.writeByte((byte)((BZip2.isLoaded(_logger) && compress) ? 1 : 0));
 2188
 2189        //
 2190        // Not compressed, fill in the message size.
 2191        //
 2192        decompressed.pos(10);
 2193        decompressed.writeInt(decompressed.size());
 2194
 2195        return decompressed;
 2196    }
 2197
 2198    private struct MessageInfo
 2199    {
             // Holds what parseMessage extracts from one incoming message.

             // The received message. parseMessage swaps it out of _readStream and replaces it with the decompressed
             // stream when the message is compressed.
 2200        public InputStream stream;
             // 1 for a request message, the number of batched requests for a batch request message.
 2201        public int requestCount;
             // The request ID read from a request or reply message.
 2202        public int requestId;
             // The compression status byte read from the message header.
 2203        public byte compress;
             // Set to the connection's _adapter for request and batch request messages.
 2204        public ObjectAdapter adapter;
             // For a reply message, the entry of _asyncRequests with the same request ID, if any. parseMessage
             // resets it to null when outAsync.response() returns false.
 2205        public OutgoingAsyncBase outAsync;
             // Incremented by parseMessage for each request it accepts (by requestCount for a batch) and for a
             // reply whose outAsync.response() returns true.
 2206        public int upcallCount;
 2207    }
 2208
 2209    private int parseMessage(ref MessageInfo info)
 2210    {
             // Processes the message that was just read into _readStream and records in `info` what must be done
             // with it. The message is moved into info.stream and _readStream is reset to receive the next header.
             //
             // A LocalException raised while processing closes the connection with that exception; for datagram
             // connections it is only logged as a warning (when _warn is set).
             //
             // Returns SocketOperation.Read to continue reading, or SocketOperation.None when the connection is
             // holding or the _maxDispatches limit is reached. For a CloseConnection message, it returns the
             // operation requested by _transceiver.closing when that operation is non-zero.
 2211        Debug.Assert(_state > StateNotValidated && _state < StateClosed);
 2212
 2213        info.stream = new InputStream(_instance, Protocol.currentProtocolEncoding);
 2214        _readStream.swap(info.stream);
 2215        _readStream.resize(Protocol.headerSize);
 2216        _readStream.pos(0);
 2217        _readHeader = true;
 2218
 2219        Debug.Assert(info.stream.pos() == info.stream.size());
 2220
 2221        try
 2222        {
 2223            //
 2224            // The magic and version fields have already been checked.
 2225            //
                 // Offset 8 is the message type, followed by the compression status byte.
 2226            info.stream.pos(8);
 2227            byte messageType = info.stream.readByte();
 2228            info.compress = info.stream.readByte();
                 // Compression status 2 means that the message is compressed (see doCompress).
 2229            if (info.compress == 2)
 2230            {
 2231                if (BZip2.isLoaded(_logger))
 2232                {
 2233                    Ice.Internal.Buffer ubuf = BZip2.decompress(
 2234                        info.stream.getBuffer(),
 2235                        Protocol.headerSize,
 2236                        _messageSizeMax);
 2237                    info.stream = new InputStream(info.stream.instance(), info.stream.getEncoding(), ubuf, true);
 2238                }
 2239                else
 2240                {
 2241                    throw new FeatureNotSupportedException(
 2242                        "Cannot decompress compressed message: BZip2 library is not loaded.");
 2243                }
 2244            }
                 // Position the stream right after the header.
 2245            info.stream.pos(Protocol.headerSize);
 2246
 2247            switch (messageType)
 2248            {
 2249                case Protocol.closeConnectionMsg:
 2250                {
 2251                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2252                    if (_endpoint.datagram())
 2253                    {
 2254                        if (_warn)
 2255                        {
 2256                            _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
 2257                        }
 2258                    }
 2259                    else
 2260                    {
 2261                        setState(StateClosingPending, new CloseConnectionException());
 2262
 2263                        //
 2264                        // Notify the transceiver of the graceful connection closure.
 2265                        //
                             // If the transceiver requests a socket operation, schedule the close timer and return
                             // that operation; otherwise the connection is closed right away.
 2266                        int op = _transceiver.closing(false, _exception);
 2267                        if (op != 0)
 2268                        {
 2269                            scheduleCloseTimer();
 2270                            return op;
 2271                        }
 2272                        setState(StateClosed);
 2273                    }
 2274                    break;
 2275                }
 2276
 2277                case Protocol.requestMsg:
 2278                {
 2279                    if (_state >= StateClosing)
 2280                    {
 2281                        TraceUtil.trace(
 2282                            "received request during closing\n(ignored by server, client will retry)",
 2283                            info.stream,
 2284                            this,
 2285                            _logger,
 2286                            _traceLevels);
 2287                    }
 2288                    else
 2289                    {
 2290                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2291                        info.requestId = info.stream.readInt();
 2292                        info.requestCount = 1;
 2293                        info.adapter = _adapter;
 2294                        ++info.upcallCount;
 2295
                             // A request was accepted: cancel the inactivity timer and count the dispatch.
 2296                        cancelInactivityTimer();
 2297                        ++_dispatchCount;
 2298                    }
 2299                    break;
 2300                }
 2301
 2302                case Protocol.requestBatchMsg:
 2303                {
 2304                    if (_state >= StateClosing)
 2305                    {
 2306                        TraceUtil.trace(
 2307                            "received batch request during closing\n(ignored by server, client will retry)",
 2308                            info.stream,
 2309                            this,
 2310                            _logger,
 2311                            _traceLevels);
 2312                    }
 2313                    else
 2314                    {
 2315                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
                             // The count comes from the peer: reject a negative value before using it.
 2316                        int requestCount = info.stream.readInt();
 2317                        if (requestCount < 0)
 2318                        {
 2319                            throw new MarshalException($"Received batch request with {requestCount} batches.");
 2320                        }
 2321                        info.requestCount = requestCount;
 2322                        info.adapter = _adapter;
 2323                        info.upcallCount += info.requestCount;
 2324
                             // Each batched request counts as one dispatch.
 2325                        cancelInactivityTimer();
 2326                        _dispatchCount += info.requestCount;
 2327                    }
 2328                    break;
 2329                }
 2330
 2331                case Protocol.replyMsg:
 2332                {
 2333                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2334                    info.requestId = info.stream.readInt();
                         // A reply whose request ID is not in _asyncRequests is ignored.
 2335                    if (_asyncRequests.TryGetValue(info.requestId, out info.outAsync))
 2336                    {
 2337                        _asyncRequests.Remove(info.requestId);
 2338
                             // Hand the reply data over to the invocation's input stream.
 2339                        info.outAsync.getIs().swap(info.stream);
 2340
 2341                        //
 2342                        // If we just received the reply for a request which isn't acknowledged as
 2343                        // sent yet, we queue the reply instead of processing it right away. It
 2344                        // will be processed once the write callback is invoked for the message.
 2345                        //
 2346                        OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null;
 2347                        if (message is not null && message.outAsync == info.outAsync)
 2348                        {
 2349                            message.receivedReply = true;
 2350                        }
 2351                        else if (info.outAsync.response())
 2352                        {
 2353                            ++info.upcallCount;
 2354                        }
 2355                        else
 2356                        {
 2357                            info.outAsync = null;
 2358                        }
                             // A close was requested and this was the last outstanding request in _asyncRequests.
 2359                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 2360                        {
 2361                            doApplicationClose();
 2362                        }
 2363                    }
 2364                    break;
 2365                }
 2366
 2367                case Protocol.validateConnectionMsg:
 2368                {
                         // Only traced; no other action is taken here.
 2369                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2370                    break;
 2371                }
 2372
 2373                default:
 2374                {
 2375                    TraceUtil.trace(
 2376                        "received unknown message\n(invalid, closing connection)",
 2377                        info.stream,
 2378                        this,
 2379                        _logger,
 2380                        _traceLevels);
 2381
 2382                    throw new ProtocolException($"Received Ice protocol message with unknown type: {messageType}");
 2383                }
 2384            }
 2385        }
 2386        catch (LocalException ex)
 2387        {
                 // A datagram connection is not closed by the failure; any other connection is.
 2388            if (_endpoint.datagram())
 2389            {
 2390                if (_warn)
 2391                {
 2392                    _logger.warning("datagram connection exception:\n" + ex.ToString() + "\n" + _desc);
 2393                }
 2394            }
 2395            else
 2396            {
 2397                setState(StateClosed, ex);
 2398            }
 2399        }
 2400
 2401        if (_state == StateHolding)
 2402        {
 2403            // Don't continue reading if the connection is in the holding state.
 2404            return SocketOperation.None;
 2405        }
 2406        else if (_maxDispatches > 0 && _dispatchCount >= _maxDispatches)
 2407        {
 2408            // Don't continue reading if the _maxDispatches limit is reached or exceeded.
 2409            _idleTimeoutTransceiver?.disableIdleCheck();
 2410            return SocketOperation.None;
 2411        }
 2412        else
 2413        {
 2414            // Continue reading.
 2415            return SocketOperation.Read;
 2416        }
 2417    }
 2418
 2419    private void dispatchAll(
 2420        InputStream stream,
 2421        int requestCount,
 2422        int requestId,
 2423        byte compress,
 2424        ObjectAdapter adapter)
 2425    {
 2426        // Note: In contrast to other private or protected methods, this method must be called *without* the mutex
 2427        // locked.
 2428
 2429        Object dispatcher = adapter?.dispatchPipeline;
 2430
 2431        try
 2432        {
 2433            while (requestCount > 0)
 2434            {
 2435                // adapter can be null here, however the adapter set in current can't be null, and we never pass
 2436                // a null current.adapter to the application code. Once this file enables nullable, adapter should be
 2437                // adapter! below.
 2438                var request = new IncomingRequest(requestId, this, adapter, stream);
 2439
 2440                if (dispatcher is not null)
 2441                {
 2442                    // We don't and can't await the dispatchAsync: with batch requests, we want all the dispatches to
 2443                    // execute in the current Ice thread pool thread. If we awaited the dispatchAsync, we could
 2444                    // switch to a .NET thread pool thread.
 2445                    _ = dispatchAsync(request);
 2446                }
 2447                else
 2448                {
 2449                    // Received request on a connection without an object adapter.
 2450                    sendResponse(
 2451                        request.current.createOutgoingResponse(new ObjectNotExistException()),
 2452                        isTwoWay: !_endpoint.datagram() && requestId != 0,
 2453                        compress: 0);
 2454                }
 2455                --requestCount;
 2456            }
 2457
 2458            stream.clear();
 2459        }
 2460        catch (LocalException ex) // TODO: catch all exceptions
 2461        {
 2462            // Typically, the IncomingRequest constructor throws an exception, and we can't continue.
 2463            dispatchException(ex, requestCount);
 2464        }
 2465
 2466        async Task dispatchAsync(IncomingRequest request)
 2467        {
 2468            try
 2469            {
 2470                OutgoingResponse response;
 2471
 2472                try
 2473                {
 2474                    response = await dispatcher.dispatchAsync(request).ConfigureAwait(false);
 2475                }
 2476                catch (System.Exception ex)
 2477                {
 2478                    response = request.current.createOutgoingResponse(ex);
 2479                }
 2480
 2481                sendResponse(response, isTwoWay: !_endpoint.datagram() && requestId != 0, compress);
 2482            }
 2483            catch (LocalException ex) // TODO: catch all exceptions to avoid UnobservedTaskException
 2484            {
 2485                dispatchException(ex, requestCount: 1);
 2486            }
 2487        }
 2488    }
 2489
 2490    private void sendResponse(OutgoingResponse response, bool isTwoWay, byte compress)
 2491    {
 2492        bool finished = false;
 2493        try
 2494        {
 2495            lock (_mutex)
 2496            {
 2497                Debug.Assert(_state > StateNotValidated);
 2498
 2499                try
 2500                {
 2501                    if (--_upcallCount == 0)
 2502                    {
 2503                        if (_state == StateFinished)
 2504                        {
 2505                            finished = true;
 2506                            _observer?.detach();
 2507                        }
 2508                        Monitor.PulseAll(_mutex);
 2509                    }
 2510
 2511                    if (_state >= StateClosed)
 2512                    {
 2513                        Debug.Assert(_exception is not null);
 2514                        throw _exception;
 2515                    }
 2516
 2517                    if (isTwoWay)
 2518                    {
 2519                        sendMessage(new OutgoingMessage(response.outputStream, compress > 0));
 2520                    }
 2521
 2522                    if (_state == StateActive && _maxDispatches > 0 && _dispatchCount == _maxDispatches)
 2523                    {
 2524                        // Resume reading if the connection is active and the dispatch count is about to be less than
 2525                        // _maxDispatches.
 2526                        _threadPool.update(this, SocketOperation.None, SocketOperation.Read);
 2527                        _idleTimeoutTransceiver?.enableIdleCheck();
 2528                    }
 2529
 2530                    --_dispatchCount;
 2531
 2532                    if (_state == StateClosing && _upcallCount == 0)
 2533                    {
 2534                        initiateShutdown();
 2535                    }
 2536                }
 2537                catch (LocalException ex)
 2538                {
 2539                    setState(StateClosed, ex);
 2540                }
 2541            }
 2542        }
 2543        finally
 2544        {
 2545            if (finished && _removeFromFactory is not null)
 2546            {
 2547                _removeFromFactory(this);
 2548            }
 2549        }
 2550    }
 2551
 2552    private void dispatchException(LocalException ex, int requestCount)
 2553    {
 2554        bool finished = false;
 2555
 2556        // Fatal exception while dispatching a request. Since sendResponse isn't called in case of a fatal exception
 2557        // we decrement _upcallCount here.
 2558        lock (_mutex)
 2559        {
 2560            setState(StateClosed, ex);
 2561
 2562            if (requestCount > 0)
 2563            {
 2564                Debug.Assert(_upcallCount >= requestCount);
 2565                _upcallCount -= requestCount;
 2566                if (_upcallCount == 0)
 2567                {
 2568                    if (_state == StateFinished)
 2569                    {
 2570                        finished = true;
 2571                        _observer?.detach();
 2572                    }
 2573                    Monitor.PulseAll(_mutex);
 2574                }
 2575            }
 2576        }
 2577
 2578        if (finished && _removeFromFactory is not null)
 2579        {
 2580            _removeFromFactory(this);
 2581        }
 2582    }
 2583
 2584    private void inactivityCheck(System.Threading.Timer inactivityTimer)
 2585    {
 2586        lock (_mutex)
 2587        {
 2588            // If the timers are different, it means this inactivityTimer is no longer current.
 2589            if (inactivityTimer == _inactivityTimer)
 2590            {
 2591                _inactivityTimer = null;
 2592                inactivityTimer.Dispose(); // non-blocking
 2593
 2594                if (_state == StateActive)
 2595                {
 2596                    setState(
 2597                        StateClosing,
 2598                        new ConnectionClosedException(
 2599                            "Connection closed because it remained inactive for longer than the inactivity timeout.",
 2600                            closedByApplication: false));
 2601                }
 2602            }
 2603            // Else this timer was already canceled and disposed. Nothing to do.
 2604        }
 2605    }
 2606
 2607    private void connectTimedOut(System.Threading.Timer connectTimer)
 2608    {
 2609        lock (_mutex)
 2610        {
 2611            if (_state < StateActive)
 2612            {
 2613                setState(StateClosed, new ConnectTimeoutException());
 2614            }
 2615        }
 2616        // else ignore since we're already connected.
 2617        connectTimer.Dispose();
 2618    }
 2619
 2620    private void closeTimedOut(System.Threading.Timer closeTimer)
 2621    {
 2622        lock (_mutex)
 2623        {
 2624            if (_state < StateClosed)
 2625            {
 2626                // We don't use setState(state, exception) because we want to overwrite the exception set by a
 2627                // graceful closure.
 2628                _exception = new CloseTimeoutException();
 2629                setState(StateClosed);
 2630            }
 2631        }
 2632        // else ignore since we're already closed.
 2633        closeTimer.Dispose();
 2634    }
 2635
 2636    private ConnectionInfo initConnectionInfo()
 2637    {
 2638        // Called with _mutex locked.
 2639
 2640        if (_state > StateNotInitialized && _info is not null) // Update the connection info until it's initialized
 2641        {
 2642            return _info;
 2643        }
 2644
 2645        _info =
 2646            _transceiver.getInfo(incoming: _connector is null, _adapter?.getName() ?? "", _endpoint.connectionId());
 2647        return _info;
 2648    }
 2649
 2650    private void warning(string msg, System.Exception ex) => _logger.warning($"{msg}:\n{ex}\n{_transceiver}");
 2651
 2652    private void observerStartRead(Ice.Internal.Buffer buf)
 2653    {
 2654        if (_readStreamPos >= 0)
 2655        {
 2656            Debug.Assert(!buf.empty());
 2657            _observer.receivedBytes(buf.b.position() - _readStreamPos);
 2658        }
 2659        _readStreamPos = buf.empty() ? -1 : buf.b.position();
 2660    }
 2661
 2662    private void observerFinishRead(Ice.Internal.Buffer buf)
 2663    {
 2664        if (_readStreamPos == -1)
 2665        {
 2666            return;
 2667        }
 2668        Debug.Assert(buf.b.position() >= _readStreamPos);
 2669        _observer.receivedBytes(buf.b.position() - _readStreamPos);
 2670        _readStreamPos = -1;
 2671    }
 2672
 2673    private void observerStartWrite(Ice.Internal.Buffer buf)
 2674    {
 2675        if (_writeStreamPos >= 0)
 2676        {
 2677            Debug.Assert(!buf.empty());
 2678            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2679        }
 2680        _writeStreamPos = buf.empty() ? -1 : buf.b.position();
 2681    }
 2682
 2683    private void observerFinishWrite(Ice.Internal.Buffer buf)
 2684    {
 2685        if (_writeStreamPos == -1)
 2686        {
 2687            return;
 2688        }
 2689        if (buf.b.position() > _writeStreamPos)
 2690        {
 2691            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2692        }
 2693        _writeStreamPos = -1;
 2694    }
 2695
 2696    private int read(Ice.Internal.Buffer buf)
 2697    {
 2698        int start = buf.b.position();
 2699        int op = _transceiver.read(buf, ref _hasMoreData);
 2700        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2701        {
 2702            var s = new StringBuilder("received ");
 2703            if (_endpoint.datagram())
 2704            {
 2705                s.Append(buf.b.limit());
 2706            }
 2707            else
 2708            {
 2709                s.Append(buf.b.position() - start);
 2710                s.Append(" of ");
 2711                s.Append(buf.b.limit() - start);
 2712            }
 2713            s.Append(" bytes via ");
 2714            s.Append(_endpoint.protocol());
 2715            s.Append('\n');
 2716            s.Append(ToString());
 2717            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2718        }
 2719        return op;
 2720    }
 2721
 2722    private int write(Ice.Internal.Buffer buf)
 2723    {
 2724        int start = buf.b.position();
 2725        int op = _transceiver.write(buf);
 2726        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2727        {
 2728            var s = new StringBuilder("sent ");
 2729            s.Append(buf.b.position() - start);
 2730            if (!_endpoint.datagram())
 2731            {
 2732                s.Append(" of ");
 2733                s.Append(buf.b.limit() - start);
 2734            }
 2735            s.Append(" bytes via ");
 2736            s.Append(_endpoint.protocol());
 2737            s.Append('\n');
 2738            s.Append(ToString());
 2739            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2740        }
 2741        return op;
 2742    }
 2743
 2744    private void scheduleInactivityTimer()
 2745    {
 2746        // Called with the ConnectionI mutex locked.
 2747        Debug.Assert(_inactivityTimer is null);
 2748        Debug.Assert(_inactivityTimeout > TimeSpan.Zero);
 2749
 2750        _inactivityTimer = new System.Threading.Timer(
 2751            inactivityTimer => inactivityCheck((System.Threading.Timer)inactivityTimer));
 2752        _inactivityTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 2753    }
 2754
 2755    private void cancelInactivityTimer()
 2756    {
 2757        // Called with the ConnectionI mutex locked.
 2758        if (_inactivityTimer is not null)
 2759        {
 2760            _inactivityTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 2761            _inactivityTimer.Dispose();
 2762            _inactivityTimer = null;
 2763        }
 2764    }
 2765
 2766    private void scheduleCloseTimer()
 2767    {
 2768        if (_closeTimeout > TimeSpan.Zero)
 2769        {
 2770#pragma warning disable CA2000 // closeTimer is disposed by closeTimedOut.
 2771            var closeTimer = new System.Threading.Timer(
 2772                timerObj => closeTimedOut((System.Threading.Timer)timerObj));
 2773            // schedule timer to run once; closeTimedOut disposes the timer too.
 2774            closeTimer.Change(_closeTimeout, Timeout.InfiniteTimeSpan);
 2775#pragma warning restore CA2000
 2776        }
 2777    }
 2778
 2779    private void doApplicationClose()
 2780    {
 2781        // Called with the ConnectionI mutex locked.
 2782        Debug.Assert(_state < StateClosing);
 2783        setState(
 2784            StateClosing,
 2785            new ConnectionClosedException(
 2786                "The connection was closed gracefully by the application.",
 2787                closedByApplication: true));
 2788    }
 2789
 2790    private class OutgoingMessage
 2791    {
 12792        internal OutgoingMessage(OutputStream stream, bool compress)
 2793        {
 12794            this.stream = stream;
 12795            this.compress = compress;
 12796        }
 2797
 12798        internal OutgoingMessage(OutgoingAsyncBase outAsync, OutputStream stream, bool compress, int requestId)
 2799        {
 12800            this.outAsync = outAsync;
 12801            this.stream = stream;
 12802            this.compress = compress;
 12803            this.requestId = requestId;
 12804        }
 2805
 2806        internal void canceled()
 2807        {
 2808            Debug.Assert(outAsync is not null); // Only requests can timeout.
 12809            outAsync = null;
 12810        }
 2811
 2812        internal bool sent()
 2813        {
 12814            stream = null;
 12815            if (outAsync is not null)
 2816            {
 12817                invokeSent = outAsync.sent();
 12818                return invokeSent || receivedReply;
 2819            }
 12820            return false;
 2821        }
 2822
 2823        internal void completed(LocalException ex)
 2824        {
 12825            if (outAsync is not null)
 2826            {
 12827                if (outAsync.exception(ex))
 2828                {
 12829                    outAsync.invokeException();
 2830                }
 2831            }
 12832            stream = null;
 12833        }
 2834
 2835        internal OutputStream stream;
 2836        internal OutgoingAsyncBase outAsync;
 2837        internal bool compress;
 2838        internal int requestId;
 2839        internal bool prepared;
 2840        internal bool isSent;
 2841        internal bool invokeSent;
 2842        internal bool receivedReply;
 2843    }
 2844
 2845    private static readonly ConnectionState[] connectionStateMap = [
 2846        ConnectionState.ConnectionStateValidating,   // StateNotInitialized
 2847        ConnectionState.ConnectionStateValidating,   // StateNotValidated
 2848        ConnectionState.ConnectionStateActive,       // StateActive
 2849        ConnectionState.ConnectionStateHolding,      // StateHolding
 2850        ConnectionState.ConnectionStateClosing,      // StateClosing
 2851        ConnectionState.ConnectionStateClosing,      // StateClosingPending
 2852        ConnectionState.ConnectionStateClosed,       // StateClosed
 2853        ConnectionState.ConnectionStateClosed,       // StateFinished
 2854    ];
 2855
 2856    private readonly Instance _instance;
 2857    private readonly Transceiver _transceiver;
 2858    private readonly IdleTimeoutTransceiverDecorator _idleTimeoutTransceiver; // can be null
 2859
 2860    private string _desc;
 2861    private readonly string _type;
 2862    private readonly Connector _connector;
 2863    private readonly EndpointI _endpoint;
 2864
 2865    private ObjectAdapter _adapter;
 2866
 2867    private readonly Logger _logger;
 2868    private readonly TraceLevels _traceLevels;
 2869    private readonly Ice.Internal.ThreadPool _threadPool;
 2870
 2871    private readonly TimeSpan _connectTimeout;
 2872    private readonly TimeSpan _closeTimeout;
 2873    private TimeSpan _inactivityTimeout; // protected by _mutex
 2874
 2875    private System.Threading.Timer _inactivityTimer; // can be null
 2876
 2877    private StartCallback _startCallback;
 2878
 2879    // This action must be called outside the ConnectionI lock to avoid lock acquisition deadlocks.
 2880    private readonly Action<ConnectionI> _removeFromFactory;
 2881
 2882    private readonly bool _warn;
 2883    private readonly bool _warnUdp;
 2884
 2885    private readonly int _compressionLevel;
 2886
 2887    private int _nextRequestId;
 2888
 2889    private readonly Dictionary<int, OutgoingAsyncBase> _asyncRequests = new Dictionary<int, OutgoingAsyncBase>();
 2890
 2891    private LocalException _exception;
 2892
 2893    private readonly int _messageSizeMax;
 2894    private readonly BatchRequestQueue _batchRequestQueue;
 2895
 2896    private readonly LinkedList<OutgoingMessage> _sendStreams = new LinkedList<OutgoingMessage>();
 2897
 2898    // Contains the message which is being received. If the connection is waiting to receive a message (_readHeader ==
 2899    // true), its size is Protocol.headerSize. Otherwise, its size is the message size specified in the received message
 2900    // header.
 2901    private readonly InputStream _readStream;
 2902
 2903    // When _readHeader is true, the next bytes we'll read are the header of a new message. When false, we're reading
 2904    // next the remainder of a message that was already partially received.
 2905    private bool _readHeader;
 2906
 2907    // Contains the message which is being sent. The write stream buffer is empty if no message is being sent.
 2908    private readonly OutputStream _writeStream;
 2909
 2910    private ConnectionObserver _observer;
 2911    private int _readStreamPos;
 2912    private int _writeStreamPos;
 2913
 2914    // The upcall count keeps track of the number of dispatches, AMI (response) continuations, sent callbacks and
 2915    // connection establishment callbacks that have been started (or are about to be started) by a thread of the thread
 2916    // pool associated with this connection, and have not completed yet. All these operations except the connection
 2917    // establishment callbacks execute application code or code generated from Slice definitions.
 2918    private int _upcallCount;
 2919
 2920    // The number of outstanding dispatches. Maintained only while state is StateActive or StateHolding.
 2921    // _dispatchCount can be greater than a non-0 _maxDispatches when we receive a batch with multiple requests.
 2922    private int _dispatchCount;
 2923
 2924    // When we dispatch _maxDispatches concurrent requests, we stop reading the connection to back-pressure the peer.
 2925    // _maxDispatches <= 0 means no limit.
 2926    private readonly int _maxDispatches;
 2927
 2928    private int _state; // The current state.
 2929    private bool _shutdownInitiated;
 2930    private bool _initialized;
 2931    private bool _validated;
 2932
 2933    // When true, the application called close and Connection must close the connection when it receives the reply
 2934    // for the last outstanding invocation.
 2935    private bool _closeRequested;
 2936
 2937    private ConnectionInfo _info;
 2938
 2939    private CloseCallback _closeCallback;
 2940
 2941    // We need to run the continuation asynchronously since it can be completed by an Ice thread pool thread.
 2942    private readonly TaskCompletionSource _closed = new(TaskCreationOptions.RunContinuationsAsynchronously);
 2943    private readonly object _mutex = new();
 2944}