< Summary

Information
Class: Ice.ConnectionI.OutgoingMessage
Assembly: Ice
File(s): /_/csharp/src/Ice/ConnectionI.cs
Tag: 105_25977636357
Line coverage
100%
Covered lines: 22
Uncovered lines: 0
Coverable lines: 22
Total lines: 2959
Line coverage: 100%
Branch coverage
100%
Covered branches: 8
Total branches: 8
Branch coverage: 100%
Method coverage
100%
Covered methods: 5
Fully covered methods: 5
Total methods: 5
Method coverage: 100%
Full method coverage: 100%

Metrics

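| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| canceled() | 100% | 1 | 1 | 100% |
| sent() | 100% | 4 | 4 | 100% |
| completed(...) | 100% | 4 | 4 | 100% |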

File(s)

/_/csharp/src/Ice/ConnectionI.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Ice.Instrumentation;
 4using Ice.Internal;
 5using System.Diagnostics;
 6using System.Text;
 7
 8namespace Ice;
 9
 10#pragma warning disable CA1001 // _inactivityTimer is disposed by cancelInactivityTimer.
 11public sealed class ConnectionI : Internal.EventHandler, CancellationHandler, Connection
 12#pragma warning restore CA1001
 13{
       /// <summary>
       /// Callback through which <c>start</c> reports the outcome of starting a connection.
       /// </summary>
 14    internal interface StartCallback
 15    {
           /// <summary>Called by <c>start</c> once the connection has been started successfully.</summary>
           /// <param name="connection">The connection that was started.</param>
 16        void connectionStartCompleted(ConnectionI connection);
 17
           /// <summary>Called by <c>start</c> when starting the connection failed.</summary>
           /// <param name="connection">The connection that failed to start.</param>
           /// <param name="ex">The exception describing the failure.</param>
 18        void connectionStartFailed(ConnectionI connection, LocalException ex);
 19    }
 20
 21    internal void start(StartCallback callback)
 22    {
 23        try
 24        {
 25            lock (_mutex)
 26            {
 27                //
 28                // The connection might already be closed if the communicator was destroyed.
 29                //
 30                if (_state >= StateClosed)
 31                {
 32                    Debug.Assert(_exception is not null);
 33                    throw _exception;
 34                }
 35
 36                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 37                {
 38                    if (_connectTimeout > TimeSpan.Zero)
 39                    {
 40#pragma warning disable CA2000 // connectTimer is disposed by connectTimedOut.
 41                        var connectTimer = new System.Threading.Timer(
 42                            timerObj => connectTimedOut((System.Threading.Timer)timerObj));
 43                        // schedule timer to run once; connectTimedOut disposes the timer too.
 44                        connectTimer.Change(_connectTimeout, Timeout.InfiniteTimeSpan);
 45#pragma warning restore CA2000
 46                    }
 47
 48                    _startCallback = callback;
 49                    return;
 50                }
 51
 52                // The connection starts in the holding state. It will be activated by the connection factory.
 53                setState(StateHolding);
 54            }
 55        }
 56        catch (LocalException ex)
 57        {
 58            exception(ex);
 59            callback.connectionStartFailed(this, _exception);
 60            return;
 61        }
 62
 63        callback.connectionStartCompleted(this);
 64    }
 65
 66    internal void startAndWait()
 67    {
 68        try
 69        {
 70            lock (_mutex)
 71            {
 72                //
 73                // The connection might already be closed if the communicator was destroyed.
 74                //
 75                if (_state >= StateClosed)
 76                {
 77                    Debug.Assert(_exception is not null);
 78                    throw _exception;
 79                }
 80
 81                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 82                {
 83                    //
 84                    // Wait for the connection to be validated.
 85                    //
 86                    while (_state <= StateNotValidated)
 87                    {
 88                        Monitor.Wait(_mutex);
 89                    }
 90
 91                    if (_state >= StateClosing)
 92                    {
 93                        Debug.Assert(_exception is not null);
 94                        throw _exception;
 95                    }
 96                }
 97
 98                //
 99                // We start out in holding state.
 100                //
 101                setState(StateHolding);
 102            }
 103        }
 104        catch (LocalException ex)
 105        {
 106            exception(ex);
 107            waitUntilFinished();
 108            return;
 109        }
 110    }
 111
 112    internal void activate()
 113    {
 114        lock (_mutex)
 115        {
 116            if (_state <= StateNotValidated)
 117            {
 118                return;
 119            }
 120
 121            setState(StateActive);
 122        }
 123    }
 124
 125    internal void hold()
 126    {
 127        lock (_mutex)
 128        {
 129            if (_state <= StateNotValidated)
 130            {
 131                return;
 132            }
 133
 134            setState(StateHolding);
 135        }
 136    }
 137
 138    // DestructionReason.
 139    public const int ObjectAdapterDeactivated = 0;
 140    public const int CommunicatorDestroyed = 1;
 141
 142    internal void destroy(int reason)
 143    {
 144        lock (_mutex)
 145        {
 146            switch (reason)
 147            {
 148                case ObjectAdapterDeactivated:
 149                {
 150                    setState(StateClosing, new ObjectAdapterDeactivatedException(_adapter?.getName() ?? ""));
 151                    break;
 152                }
 153
 154                case CommunicatorDestroyed:
 155                {
 156                    setState(StateClosing, new CommunicatorDestroyedException());
 157                    break;
 158                }
 159            }
 160        }
 161    }
 162
 163    public void abort()
 164    {
 165        lock (_mutex)
 166        {
 167            setState(
 168                StateClosed,
 169                new ConnectionAbortedException(
 170                    "The connection was aborted by the application.",
 171                    closedByApplication: true));
 172        }
 173    }
 174
 175    public Task closeAsync()
 176    {
 177        lock (_mutex)
 178        {
 179            if (_state < StateClosing)
 180            {
 181                if (_asyncRequests.Count == 0)
 182                {
 183                    doApplicationClose();
 184                }
 185                else
 186                {
 187                    _closeRequested = true;
 188                    scheduleCloseTimer(); // we don't wait forever for outstanding invocations to complete
 189                }
 190            }
 191            // else nothing to do, already closing or closed.
 192        }
 193
 194        return _closed.Task;
 195    }
 196
 197    internal bool isActiveOrHolding()
 198    {
 199        lock (_mutex)
 200        {
 201            return _state > StateNotValidated && _state < StateClosing;
 202        }
 203    }
 204
 205    public void throwException()
 206    {
 207        lock (_mutex)
 208        {
 209            if (_exception is not null)
 210            {
 211                Debug.Assert(_state >= StateClosing);
 212                throw _exception;
 213            }
 214        }
 215    }
 216
 217    internal void waitUntilHolding()
 218    {
 219        lock (_mutex)
 220        {
 221            while (_state < StateHolding || _upcallCount > 0)
 222            {
 223                Monitor.Wait(_mutex);
 224            }
 225        }
 226    }
 227
 228    internal void waitUntilFinished()
 229    {
 230        lock (_mutex)
 231        {
 232            //
 233            // We wait indefinitely until the connection is finished and all
 234            // outstanding requests are completed. Otherwise we couldn't
 235            // guarantee that there are no outstanding calls when deactivate()
 236            // is called on the servant locators.
 237            //
 238            while (_state < StateFinished || _upcallCount > 0)
 239            {
 240                Monitor.Wait(_mutex);
 241            }
 242
 243            Debug.Assert(_state == StateFinished);
 244
 245            //
 246            // Clear the OA. See bug 1673 for the details of why this is necessary.
 247            //
 248            _adapter = null;
 249        }
 250    }
 251
 252    internal void updateObserver()
 253    {
 254        lock (_mutex)
 255        {
 256            if (_state < StateNotValidated || _state > StateClosed)
 257            {
 258                return;
 259            }
 260
 261            Debug.Assert(_instance.initializationData().observer is not null);
 262            _observer = _instance.initializationData().observer.getConnectionObserver(
 263                initConnectionInfo(),
 264                _endpoint,
 265                toConnectionState(_state),
 266                _observer);
 267            if (_observer is not null)
 268            {
 269                _observer.attach();
 270            }
 271            else
 272            {
 273                _writeStreamPos = -1;
 274                _readStreamPos = -1;
 275            }
 276        }
 277    }
 278
 279    internal int sendAsyncRequest(
 280        OutgoingAsyncBase og,
 281        bool compress,
 282        bool response,
 283        int batchRequestCount)
 284    {
 285        OutputStream os = og.getOs();
 286
 287        lock (_mutex)
 288        {
 289            //
 290            // If the exception is closed before we even have a chance
 291            // to send our request, we always try to send the request
 292            // again.
 293            //
 294            if (_exception is not null)
 295            {
 296                throw new RetryException(_exception);
 297            }
 298
 299            Debug.Assert(_state > StateNotValidated);
 300            Debug.Assert(_state < StateClosing);
 301
 302            //
 303            // Ensure the message isn't bigger than what we can send with the
 304            // transport.
 305            //
 306            _transceiver.checkSendSize(os.getBuffer());
 307
 308            //
 309            // Notify the request that it's cancelable with this connection.
 310            // This will throw if the request is canceled.
 311            //
 312            og.cancelable(this);
 313            int requestId = 0;
 314            if (response)
 315            {
 316                //
 317                // Create a new unique request ID.
 318                //
 319                requestId = _nextRequestId++;
 320                if (requestId <= 0)
 321                {
 322                    _nextRequestId = 1;
 323                    requestId = _nextRequestId++;
 324                }
 325
 326                //
 327                // Fill in the request ID.
 328                //
 329                os.pos(Protocol.headerSize);
 330                os.writeInt(requestId);
 331            }
 332            else if (batchRequestCount > 0)
 333            {
 334                os.pos(Protocol.headerSize);
 335                os.writeInt(batchRequestCount);
 336            }
 337
 338            og.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
 339
 340            // We're just about to send a request, so we are not inactive anymore.
 341            cancelInactivityTimer();
 342
 343            int status = OutgoingAsyncBase.AsyncStatusQueued;
 344            try
 345            {
 346                var message = new OutgoingMessage(og, os, compress, requestId);
 347                status = sendMessage(message);
 348            }
 349            catch (LocalException ex)
 350            {
 351                setState(StateClosed, ex);
 352                Debug.Assert(_exception is not null);
 353                throw _exception;
 354            }
 355
 356            if (response)
 357            {
 358                //
 359                // Add to the async requests map.
 360                //
 361                _asyncRequests[requestId] = og;
 362            }
 363            return status;
 364        }
 365    }
 366
 367    internal BatchRequestQueue getBatchRequestQueue() => _batchRequestQueue;
 368
 369    public void flushBatchRequests(CompressBatch compress)
 370    {
 371        try
 372        {
 373            var completed = new FlushBatchTaskCompletionCallback();
 374            var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 375            outgoing.invoke(_flushBatchRequests_name, compress, true);
 376            completed.Task.Wait();
 377        }
 378        catch (AggregateException ex)
 379        {
 380            throw ex.InnerException;
 381        }
 382    }
 383
 384    public Task flushBatchRequestsAsync(
 385        CompressBatch compress,
 386        IProgress<bool> progress = null,
 387        CancellationToken cancel = default)
 388    {
 389        var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
 390        var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 391        outgoing.invoke(_flushBatchRequests_name, compress, false);
 392        return completed.Task;
 393    }
 394
 395    private const string _flushBatchRequests_name = "flushBatchRequests";
 396
        /// <summary>
        /// Disables the inactivity check on this connection: cancels the inactivity timer and sets the
        /// inactivity timeout to <see cref="TimeSpan.Zero"/>, both while holding the connection mutex.
        /// </summary>
 397    public void disableInactivityCheck()
 398    {
 399        lock (_mutex)
 400        {
 401            cancelInactivityTimer();
                // NOTE(review): a zero timeout presumably keeps the timer from being rescheduled - confirm
                // against the code that schedules the inactivity timer.
 402            _inactivityTimeout = TimeSpan.Zero;
 403        }
 404    }
 405
 406    public void setCloseCallback(CloseCallback callback)
 407    {
 408        lock (_mutex)
 409        {
 410            if (_state >= StateClosed)
 411            {
 412                if (callback is not null)
 413                {
 414                    _threadPool.execute(
 415                        () =>
 416                        {
 417                            try
 418                            {
 419                                callback(this);
 420                            }
 421                            catch (System.Exception ex)
 422                            {
 423                                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
 424                            }
 425                        },
 426                        this);
 427                }
 428            }
 429            else
 430            {
 431                _closeCallback = callback;
 432            }
 433        }
 434    }
 435
 436    public void asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex)
 437    {
 438        //
 439        // NOTE: This isn't called from a thread pool thread.
 440        //
 441
 442        lock (_mutex)
 443        {
 444            if (_state >= StateClosed)
 445            {
 446                return; // The request has already been or will be shortly notified of the failure.
 447            }
 448
 449            OutgoingMessage o = _sendStreams.FirstOrDefault(m => m.outAsync == outAsync);
 450            if (o is not null)
 451            {
 452                if (o.requestId > 0)
 453                {
 454                    _asyncRequests.Remove(o.requestId);
 455                }
 456
 457                if (ex is ConnectionAbortedException)
 458                {
 459                    setState(StateClosed, ex);
 460                }
 461                else
 462                {
 463                    //
 464                    // If the request is being sent, don't remove it from the send streams,
 465                    // it will be removed once the sending is finished.
 466                    //
 467                    if (o == _sendStreams.First.Value)
 468                    {
 469                        o.canceled();
 470                    }
 471                    else
 472                    {
 473                        o.canceled();
 474                        _sendStreams.Remove(o);
 475                    }
 476                    if (outAsync.exception(ex))
 477                    {
 478                        outAsync.invokeExceptionAsync();
 479                    }
 480                }
 481
 482                if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 483                {
 484                    doApplicationClose();
 485                }
 486                return;
 487            }
 488
 489            if (outAsync is OutgoingAsync)
 490            {
 491                foreach (KeyValuePair<int, OutgoingAsyncBase> kvp in _asyncRequests)
 492                {
 493                    if (kvp.Value == outAsync)
 494                    {
 495                        if (ex is ConnectionAbortedException)
 496                        {
 497                            setState(StateClosed, ex);
 498                        }
 499                        else
 500                        {
 501                            _asyncRequests.Remove(kvp.Key);
 502                            if (outAsync.exception(ex))
 503                            {
 504                                outAsync.invokeExceptionAsync();
 505                            }
 506                        }
 507
 508                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 509                        {
 510                            doApplicationClose();
 511                        }
 512                        return;
 513                    }
 514                }
 515            }
 516        }
 517    }
 518
 519    internal EndpointI endpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 520
 521    internal Connector connector() => _connector; // No mutex protection necessary, _endpoint is immutable.
 522
 523    public void setAdapter(ObjectAdapter adapter)
 524    {
 525        if (_connector is null) // server connection
 526        {
 527            throw new InvalidOperationException("setAdapter can only be called on a client connection");
 528        }
 529
 530        if (adapter is not null)
 531        {
 532            // Go through the adapter to set the adapter and servant manager on this connection
 533            // to ensure the object adapter is still active.
 534            adapter.setAdapterOnConnection(this);
 535        }
 536        else
 537        {
 538            lock (_mutex)
 539            {
 540                if (_state <= StateNotValidated || _state >= StateClosing)
 541                {
 542                    return;
 543                }
 544                _adapter = null;
 545            }
 546        }
 547
 548        //
 549        // We never change the thread pool with which we were initially
 550        // registered, even if we add or remove an object adapter.
 551        //
 552    }
 553
 554    public ObjectAdapter getAdapter()
 555    {
 556        lock (_mutex)
 557        {
 558            return _adapter;
 559        }
 560    }
 561
 562    public Endpoint getEndpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 563
 564    public ObjectPrx createProxy(Identity id)
 565    {
 566        ObjectAdapter.checkIdentity(id);
 567        return new ObjectPrxHelper(_instance.referenceFactory().create(id, this));
 568    }
 569
 570    public void setAdapterFromAdapter(ObjectAdapter adapter)
 571    {
 572        lock (_mutex)
 573        {
 574            if (_state <= StateNotValidated || _state >= StateClosing)
 575            {
 576                return;
 577            }
 578            Debug.Assert(adapter is not null); // Called by ObjectAdapter::setAdapterOnConnection
 579            _adapter = adapter;
 580
 581            // Clear cached connection info (if any) as it's no longer accurate.
 582            _info = null;
 583        }
 584    }
 585
 586    //
 587    // Operations from EventHandler
 588    //
 589    public override bool startAsync(int operation, Ice.Internal.AsyncCallback completedCallback)
 590    {
 591        if (_state >= StateClosed)
 592        {
 593            return false;
 594        }
 595
 596        if (_threadHopRequired)
 597        {
 598            // Run the I/O on a .NET ThreadPool thread so it survives the initiating Ice worker exiting.
 599            // See ctor for when this is required.
 600            Task.Run(doIO);
 601        }
 602        else
 603        {
 604            doIO();
 605        }
 606
 607        return true;
 608
 609        void doIO()
 610        {
 611            lock (_mutex)
 612            {
 613                if (_state >= StateClosed)
 614                {
 615                    completedCallback(this);
 616                    return;
 617                }
 618
 619                try
 620                {
 621                    if ((operation & SocketOperation.Write) != 0)
 622                    {
 623                        if (_observer != null)
 624                        {
 625                            observerStartWrite(_writeStream.getBuffer());
 626                        }
 627
 628                        bool completedSynchronously =
 629                            _transceiver.startWrite(
 630                                _writeStream.getBuffer(),
 631                                completedCallback,
 632                                this,
 633                                out bool messageWritten);
 634
 635                        // If the startWrite call wrote the message, we assume the message is sent now for at-most-once
 636                        // semantics in the event the connection is closed while the message is still in _sendStreams.
 637                        if (messageWritten && _sendStreams.Count > 0)
 638                        {
 639                            // See finish() code.
 640                            _sendStreams.First.Value.isSent = true;
 641                        }
 642
 643                        if (completedSynchronously)
 644                        {
 645                            // If the write completed synchronously, we need to call the completedCallback.
 646                            completedCallback(this);
 647                        }
 648                    }
 649                    else if ((operation & SocketOperation.Read) != 0)
 650                    {
 651                        if (_observer != null && !_readHeader)
 652                        {
 653                            observerStartRead(_readStream.getBuffer());
 654                        }
 655
 656                        if (_transceiver.startRead(_readStream.getBuffer(), completedCallback, this))
 657                        {
 658                            completedCallback(this);
 659                        }
 660                    }
 661                }
 662                catch (LocalException ex)
 663                {
 664                    setState(StateClosed, ex);
 665                    completedCallback(this);
 666                }
 667            }
 668        }
 669    }
 670
 671    public override bool finishAsync(int operation)
 672    {
 673        if (_state >= StateClosed)
 674        {
 675            return false;
 676        }
 677
 678        try
 679        {
 680            if ((operation & SocketOperation.Write) != 0)
 681            {
 682                Ice.Internal.Buffer buf = _writeStream.getBuffer();
 683                int start = buf.b.position();
 684                _transceiver.finishWrite(buf);
 685                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 686                {
 687                    var s = new StringBuilder("sent ");
 688                    s.Append(buf.b.position() - start);
 689                    if (!_endpoint.datagram())
 690                    {
 691                        s.Append(" of ");
 692                        s.Append(buf.b.limit() - start);
 693                    }
 694                    s.Append(" bytes via ");
 695                    s.Append(_endpoint.protocol());
 696                    s.Append('\n');
 697                    s.Append(ToString());
 698                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 699                }
 700
 701                if (_observer is not null)
 702                {
 703                    observerFinishWrite(_writeStream.getBuffer());
 704                }
 705            }
 706            else if ((operation & SocketOperation.Read) != 0)
 707            {
 708                Ice.Internal.Buffer buf = _readStream.getBuffer();
 709                int start = buf.b.position();
 710                _transceiver.finishRead(buf);
 711                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 712                {
 713                    var s = new StringBuilder("received ");
 714                    if (_endpoint.datagram())
 715                    {
 716                        s.Append(buf.b.limit());
 717                    }
 718                    else
 719                    {
 720                        s.Append(buf.b.position() - start);
 721                        s.Append(" of ");
 722                        s.Append(buf.b.limit() - start);
 723                    }
 724                    s.Append(" bytes via ");
 725                    s.Append(_endpoint.protocol());
 726                    s.Append('\n');
 727                    s.Append(ToString());
 728                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 729                }
 730
 731                if (_observer is not null && !_readHeader)
 732                {
 733                    observerFinishRead(_readStream.getBuffer());
 734                }
 735            }
 736        }
 737        catch (LocalException ex)
 738        {
 739            setState(StateClosed, ex);
 740        }
 741        return _state < StateClosed;
 742    }
 743
 744    public override void message(ThreadPoolCurrent current)
 745    {
 746        StartCallback startCB = null;
 747        Queue<OutgoingMessage> sentCBs = null;
 748        var info = new MessageInfo();
 749        int upcallCount = 0;
 750
 751        using var msg = new ThreadPoolMessage(current, _mutex);
 752        lock (_mutex)
 753        {
 754            try
 755            {
 756                if (!msg.startIOScope())
 757                {
 758                    return;
 759                }
 760
 761                if (_state >= StateClosed)
 762                {
 763                    return;
 764                }
 765
 766                try
 767                {
 768                    int writeOp = SocketOperation.None;
 769                    int readOp = SocketOperation.None;
 770
 771                    // If writes are ready, write the data from the connection's write buffer (_writeStream)
 772                    if ((current.operation & SocketOperation.Write) != 0)
 773                    {
 774                        if (_observer is not null)
 775                        {
 776                            observerStartWrite(_writeStream.getBuffer());
 777                        }
 778                        writeOp = write(_writeStream.getBuffer());
 779                        if (_observer is not null && (writeOp & SocketOperation.Write) == 0)
 780                        {
 781                            observerFinishWrite(_writeStream.getBuffer());
 782                        }
 783                    }
 784
 785                    // If reads are ready, read the data into the connection's read buffer (_readStream). The data is
 786                    // read until:
 787                    // - the full message is read (the transport read returns SocketOperationNone) and
 788                    //   the read buffer is fully filled
 789                    // - the read operation on the transport can't continue without blocking
 790                    if ((current.operation & SocketOperation.Read) != 0)
 791                    {
 792                        while (true)
 793                        {
 794                            Ice.Internal.Buffer buf = _readStream.getBuffer();
 795
 796                            if (_observer is not null && !_readHeader)
 797                            {
 798                                observerStartRead(buf);
 799                            }
 800
 801                            readOp = read(buf);
 802                            if ((readOp & SocketOperation.Read) != 0)
 803                            {
 804                                // Can't continue without blocking, exit out of the loop.
 805                                break;
 806                            }
 807                            if (_observer is not null && !_readHeader)
 808                            {
 809                                Debug.Assert(!buf.b.hasRemaining());
 810                                observerFinishRead(buf);
 811                            }
 812
 813                            // If read header is true, we're reading a new Ice protocol message and we need to read
 814                            // the message header.
 815                            if (_readHeader)
 816                            {
 817                                // The next read will read the remainder of the message.
 818                                _readHeader = false;
 819
 820                                _observer?.receivedBytes(Protocol.headerSize);
 821
 822                                //
 823                                // Connection is validated on first message. This is only used by
 824                                // setState() to check whether or not we can print a connection
 825                                // warning (a client might close the connection forcefully if the
 826                                // connection isn't validated, we don't want to print a warning
 827                                // in this case).
 828                                //
 829                                _validated = true;
 830
 831                                // Full header should be read because the size of _readStream is always headerSize (14)
 832                                // when reading a new message (see the code that sets _readHeader = true).
 833                                int pos = _readStream.pos();
 834                                if (pos < Protocol.headerSize)
 835                                {
 836                                    //
 837                                    // This situation is possible for small UDP packets.
 838                                    //
 839                                    throw new MarshalException("Received Ice message with too few bytes in header.");
 840                                }
 841
 842                                // Decode the header.
 843                                _readStream.pos(0);
 844                                byte[] m = new byte[4];
 845                                m[0] = _readStream.readByte();
 846                                m[1] = _readStream.readByte();
 847                                m[2] = _readStream.readByte();
 848                                m[3] = _readStream.readByte();
 849                                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 850                                m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 851                                {
 852                                    throw new ProtocolException(
 853                                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 854                                }
 855
 856                                var pv = new ProtocolVersion(_readStream);
 857                                if (pv != Protocol.currentProtocol)
 858                                {
 859                                    throw new MarshalException(
 860                                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 861                                }
 862                                var ev = new EncodingVersion(_readStream);
 863                                if (ev != Protocol.currentProtocolEncoding)
 864                                {
 865                                    throw new MarshalException(
 866                                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 867                                }
 868
 869                                _readStream.readByte(); // messageType
 870                                _readStream.readByte(); // compress
 871                                int size = _readStream.readInt();
 872                                if (size < Protocol.headerSize)
 873                                {
 874                                    throw new MarshalException($"Received Ice message with unexpected size {size}.");
 875                                }
 876
 877                                // Resize the read buffer to the message size.
 878                                if (size > _messageSizeMax)
 879                                {
 880                                    Ex.throwMemoryLimitException(size, _messageSizeMax);
 881                                }
 882                                if (size > _readStream.size())
 883                                {
 884                                    _readStream.resize(size);
 885                                }
 886                                _readStream.pos(pos);
 887                            }
 888
 889                            if (buf.b.hasRemaining())
 890                            {
 891                                if (_endpoint.datagram())
 892                                {
 893                                    throw new DatagramLimitException(); // The message was truncated.
 894                                }
 895                                continue;
 896                            }
 897                            break;
 898                        }
 899                    }
 900
 901                    // readOp and writeOp are set to the operations that the transport read or write calls from above
 902                    // returned. They indicate which operations will need to be monitored by the thread pool's selector
 903                    // when this method returns.
 904                    int newOp = readOp | writeOp;
 905
 906                    // Operations that are ready. For example, if message was called with SocketOperationRead and the
 907                    // transport read returned SocketOperationNone, reads are considered done: there's no additional
 908                    // data to read.
 909                    int readyOp = current.operation & ~newOp;
 910
 911                    if (_state <= StateNotValidated)
 912                    {
 913                        // If the connection is still not validated and there's still data to read or write, continue
 914                        // waiting for data to read or write.
 915                        if (newOp != 0)
 916                        {
 917                            _threadPool.update(this, current.operation, newOp);
 918                            return;
 919                        }
 920
 921                        // Initialize the connection if it's not initialized yet.
 922                        if (_state == StateNotInitialized && !initialize(current.operation))
 923                        {
 924                            return;
 925                        }
 926
 927                        // Validate the connection if it's not validated yet.
 928                        if (_state <= StateNotValidated && !validate(current.operation))
 929                        {
 930                            return;
 931                        }
 932
 933                        // The connection is validated and doesn't need additional data to be read or written. So
 934                        // unregister it from the thread pool's selector.
 935                        _threadPool.unregister(this, current.operation);
 936
 937                        //
 938                        // We start out in holding state.
 939                        //
 940                        setState(StateHolding);
 941                        if (_startCallback is not null)
 942                        {
 943                            startCB = _startCallback;
 944                            _startCallback = null;
 945                            if (startCB is not null)
 946                            {
 947                                ++upcallCount;
 948                            }
 949                        }
 950                    }
 951                    else
 952                    {
 953                        Debug.Assert(_state <= StateClosingPending);
 954
 955                        //
 956                        // We parse messages first, if we receive a close
 957                        // connection message we won't send more messages.
 958                        //
 959                        if ((readyOp & SocketOperation.Read) != 0)
 960                        {
 961                            // At this point, the protocol message is fully read and can therefore be decoded by
 962                            // parseMessage. parseMessage returns the operation to wait for readiness next.
 963                            newOp |= parseMessage(ref info);
 964                            upcallCount += info.upcallCount;
 965                        }
 966
 967                        if ((readyOp & SocketOperation.Write) != 0)
 968                        {
 969                            // At this point the message from _writeStream is fully written and the next message can be
 970                            // written.
 971
 972                            newOp |= sendNextMessage(out sentCBs);
 973                            if (sentCBs is not null)
 974                            {
 975                                ++upcallCount;
 976                            }
 977                        }
 978
 979                        // If the connection is not closed yet, we can update the thread pool selector to wait for
 980                        // readiness of read, write or both operations.
 981                        if (_state < StateClosed)
 982                        {
 983                            _threadPool.update(this, current.operation, newOp);
 984                        }
 985                    }
 986
 987                    if (upcallCount == 0)
 988                    {
 989                        return; // Nothing to execute, we're done!
 990                    }
 991
 992                    _upcallCount += upcallCount;
 993
 994                    // There's something to execute so we mark IO as completed to elect a new leader thread and let IO
 995                    // be performed on this new leader thread while this thread continues with executing the upcalls.
 996                    msg.ioCompleted();
 997                }
 998                catch (DatagramLimitException) // Expected.
 999                {
 1000                    if (_warnUdp)
 1001                    {
 1002                        _logger.warning($"maximum datagram size of {_readStream.pos()} exceeded");
 1003                    }
 1004                    _readStream.resize(Protocol.headerSize);
 1005                    _readStream.pos(0);
 1006                    _readHeader = true;
 1007                    return;
 1008                }
 1009                catch (SocketException ex)
 1010                {
 1011                    setState(StateClosed, ex);
 1012                    return;
 1013                }
 1014                catch (LocalException ex)
 1015                {
 1016                    if (_endpoint.datagram())
 1017                    {
 1018                        if (_warn)
 1019                        {
 1020                            _logger.warning($"datagram connection exception:\n{ex}\n{_desc}");
 1021                        }
 1022                        _readStream.resize(Protocol.headerSize);
 1023                        _readStream.pos(0);
 1024                        _readHeader = true;
 1025                    }
 1026                    else
 1027                    {
 1028                        setState(StateClosed, ex);
 1029                    }
 1030                    return;
 1031                }
 1032            }
 1033            finally
 1034            {
 1035                msg.finishIOScope();
 1036            }
 1037        }
 1038
 1039        _threadPool.executeFromThisThread(() => upcall(startCB, sentCBs, info), this);
 1040    }
 1041
 1042    private void upcall(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)
 1043    {
             // Runs the callbacks that were collected while an I/O event was processed: the connection-start
             // notification (startCB), the "sent" callbacks of outgoing messages (sentCBs), the response callback of
             // a received reply (info.outAsync) and the dispatch of received requests (info.requestCount).
             // startCB, sentCBs and info.outAsync may each be null. Every non-null one adds 1 to
             // completedUpcallCount, and _upcallCount is decremented by that total under _mutex at the end.
 1044        int completedUpcallCount = 0;
 1045
 1046        //
 1047        // Notify the factory that the connection establishment and
 1048        // validation has completed.
 1049        //
 1050        if (startCB is not null)
 1051        {
 1052            startCB.connectionStartCompleted(this);
 1053            ++completedUpcallCount;
 1054        }
 1055
 1056        //
 1057        // Notify AMI calls that the message was sent.
 1058        //
 1059        if (sentCBs is not null)
 1060        {
 1061            foreach (OutgoingMessage m in sentCBs)
 1062            {
 1063                if (m.invokeSent)
 1064                {
 1065                    m.outAsync.invokeSent();
 1066                }
                     // receivedReply is set when the reply was already received for this message; in that case the
                     // response callback is invoked here as well, but only if response() returns true.
 1067                if (m.receivedReply)
 1068                {
 1069                    var outAsync = (OutgoingAsync)m.outAsync;
 1070                    if (outAsync.response())
 1071                    {
 1072                        outAsync.invokeResponse();
 1073                    }
 1074                }
 1075            }
                 // The whole queue counts as one upcall, regardless of how many messages it holds.
 1076            ++completedUpcallCount;
 1077        }
 1078
 1079        //
 1080        // Asynchronous replies must be handled outside the thread
 1081        // synchronization, so that nested calls are possible.
 1082        //
 1083        if (info.outAsync is not null)
 1084        {
 1085            info.outAsync.invokeResponse();
 1086            ++completedUpcallCount;
 1087        }
 1088
 1089        //
 1090        // Method invocation (or multiple invocations for batch messages)
 1091        // must be done outside the thread synchronization, so that nested
 1092        // calls are possible.
 1093        //
             // NOTE(review): dispatches do not add to completedUpcallCount here; presumably dispatchAll (not visible
             // in this part of the file) accounts for them itself - confirm.
 1094        if (info.requestCount > 0)
 1095        {
 1096            dispatchAll(info.stream, info.requestCount, info.requestId, info.compress, info.adapter);
 1097        }
 1098
 1099        //
 1100        // Decrease the upcall count.
 1101        //
 1102        bool finished = false;
 1103        if (completedUpcallCount > 0)
 1104        {
 1105            lock (_mutex)
 1106            {
 1107                _upcallCount -= completedUpcallCount;
 1108                if (_upcallCount == 0)
 1109                {
 1110                    // Only initiate shutdown if not already initiated. It might have already been initiated if the sent
 1111                    // callback or AMI callback was called when the connection was in the closing state.
 1112                    if (_state == StateClosing)
 1113                    {
 1114                        try
 1115                        {
 1116                            initiateShutdown();
 1117                        }
 1118                        catch (Ice.LocalException ex)
 1119                        {
 1120                            setState(StateClosed, ex);
 1121                        }
 1122                    }
 1123                    else if (_state == StateFinished)
 1124                    {
 1125                        finished = true;
 1126                        _observer?.detach();
 1127                    }
                         // Wake up any thread waiting on _mutex now that no upcall is outstanding.
 1128                    Monitor.PulseAll(_mutex);
 1129                }
 1130            }
 1131        }
 1132
             // The factory callback is invoked after _mutex has been released.
 1133        if (finished && _removeFromFactory is not null)
 1134        {
 1135            _removeFromFactory(this);
 1136        }
 1137    }
 1138
 1139    public override void finished(ThreadPoolCurrent current)
 1140    {
 1141        // Lock the connection here to ensure setState() completes before the code below is executed. This method can
 1142        // be called by the thread pool as soon as setState() calls _threadPool->finish(...). There's no need to lock
 1143        // the mutex for the remainder of the code because the data members accessed by finish() are immutable once
 1144        // _state == StateClosed (and we don't want to hold the mutex when calling upcalls).
 1145        lock (_mutex)
 1146        {
 1147            Debug.Assert(_state == StateClosed);
 1148        }
 1149
 1150        //
 1151        // If there are no callbacks to call, we don't call ioCompleted() since we're not going
 1152        // to call code that will potentially block (this avoids promoting a new leader and
 1153        // unnecessary thread creation, especially if this is called on shutdown).
 1154        //
 1155        if (_startCallback is null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && _closeCallback is null)
 1156        {
 1157            finish();
 1158            return;
 1159        }
 1160
 1161        current.ioCompleted();
 1162        _threadPool.executeFromThisThread(finish, this);
 1163    }
 1164
 1165    private void finish()
 1166    {
             // Final clean-up of a closed connection (finished() asserts _state == StateClosed before calling this).
             // In order: traces the closure, reports the failure to a pending start callback, completes every queued
             // outgoing message and pending request with _exception, releases the stream buffers, completes _closed,
             // runs the close callback and finally switches to StateFinished.

             // A connection that never finished initialization is traced at network level 2 as a failed
             // establish/accept; an initialized one is traced at level 1 as closed.
 1167        if (!_initialized)
 1168        {
 1169            if (_instance.traceLevels().network >= 2)
 1170            {
 1171                var s = new StringBuilder("failed to ");
                     // A non-null connector means an outgoing connection, null an incoming one.
 1172                s.Append(_connector is not null ? "establish" : "accept");
 1173                s.Append(' ');
 1174                s.Append(_endpoint.protocol());
 1175                s.Append(" connection\n");
 1176                s.Append(ToString());
 1177                s.Append('\n');
 1178                s.Append(_exception);
 1179                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1180            }
 1181        }
 1182        else
 1183        {
 1184            if (_instance.traceLevels().network >= 1)
 1185            {
 1186                var s = new StringBuilder("closed ");
 1187                s.Append(_endpoint.protocol());
 1188                s.Append(" connection\n");
 1189                s.Append(ToString());
 1190
 1191                // Trace the cause of most connection closures.
 1192                if (!(_exception is CommunicatorDestroyedException || _exception is ObjectAdapterDeactivatedException))
 1193                {
 1194                    s.Append('\n');
 1195                    s.Append(_exception);
 1196                }
 1197
 1198                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1199            }
 1200        }
 1201
             // A start callback that is still set at this point is told that the connection start failed.
 1202        _startCallback?.connectionStartFailed(this, _exception);
 1203        _startCallback = null;
 1204
 1205        if (_sendStreams.Count > 0)
 1206        {
 1207            if (!_writeStream.isEmpty())
 1208            {
 1209                //
 1210                // Return the stream to the outgoing call. This is important for
 1211                // retriable AMI calls which are not marshaled again.
 1212                //
 1213                OutgoingMessage message = _sendStreams.First.Value;
 1214                _writeStream.swap(message.stream);
 1215
 1216                //
 1217                // The current message might be sent but not yet removed from _sendStreams. If
 1218                // the response has been received in the meantime, we remove the message from
 1219                // _sendStreams to not call finished on a message which is already done.
 1220                //
 1221                if (message.isSent || message.receivedReply)
 1222                {
 1223                    if (message.sent() && message.invokeSent)
 1224                    {
 1225                        message.outAsync.invokeSent();
 1226                    }
 1227                    if (message.receivedReply)
 1228                    {
 1229                        var outAsync = (OutgoingAsync)message.outAsync;
 1230                        if (outAsync.response())
 1231                        {
 1232                            outAsync.invokeResponse();
 1233                        }
 1234                    }
 1235                    _sendStreams.RemoveFirst();
 1236                }
 1237            }
 1238
                 // Every message still queued is completed with the exception that closed the connection.
 1239            foreach (OutgoingMessage o in _sendStreams)
 1240            {
 1241                o.completed(_exception);
 1242                if (o.requestId > 0) // Make sure finished isn't called twice.
 1243                {
 1244                    _asyncRequests.Remove(o.requestId);
 1245                }
 1246            }
 1247            _sendStreams.Clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
 1248        }
 1249
             // The remaining pending requests (those not removed above) are failed with the same exception.
 1250        foreach (OutgoingAsyncBase o in _asyncRequests.Values)
 1251        {
 1252            if (o.exception(_exception))
 1253            {
 1254                o.invokeException();
 1255            }
 1256        }
 1257        _asyncRequests.Clear();
 1258
 1259        //
 1260        // Don't wait to be reaped to reclaim memory allocated by read/write streams.
 1261        //
 1262        _writeStream.clear();
 1263        _writeStream.getBuffer().clear();
 1264        _readStream.clear();
 1265        _readStream.getBuffer().clear();
 1266
             // These four exception types complete _closed successfully; any other exception is set on _closed.
 1267        if (_exception is ConnectionClosedException or
 1268            CloseConnectionException or
 1269            CommunicatorDestroyedException or
 1270            ObjectAdapterDeactivatedException)
 1271        {
 1272            // Can execute synchronously. Note that we're not within a lock (_mutex) here.
 1273            _closed.SetResult();
 1274        }
 1275        else
 1276        {
 1277            Debug.Assert(_exception is not null);
 1278            _closed.SetException(_exception);
 1279        }
 1280
 1281        if (_closeCallback is not null)
 1282        {
                 // An exception thrown by the close callback is logged as an error and not propagated.
 1283            try
 1284            {
 1285                _closeCallback(this);
 1286            }
 1287            catch (System.Exception ex)
 1288            {
 1289                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
 1290            }
 1291            _closeCallback = null;
 1292        }
 1293
 1294        //
 1295        // This must be done last as this will cause waitUntilFinished() to return (and communicator
 1296        // objects such as the timer might be destroyed too).
 1297        //
 1298        bool finished = false;
 1299        lock (_mutex)
 1300        {
 1301            setState(StateFinished);
 1302
                 // With upcalls still outstanding, the last upcall() to complete performs these final steps instead.
 1303            if (_upcallCount == 0)
 1304            {
 1305                finished = true;
 1306                _observer?.detach();
 1307            }
 1308        }
 1309
             // The factory callback is invoked after _mutex has been released.
 1310        if (finished && _removeFromFactory is not null)
 1311        {
 1312            _removeFromFactory(this);
 1313        }
 1314    }
 1315
 1316    /// <inheritdoc/>
 1317    public override string ToString() => _desc; // No mutex lock, _desc is immutable.
 1318
 1319    /// <inheritdoc/>
 1320    public string type() => _type; // No mutex lock, _type is immutable.
 1321
 1322    /// <inheritdoc/>
 1323    public ConnectionInfo getInfo()
 1324    {
 1325        lock (_mutex)
 1326        {
 1327            if (_state >= StateClosed)
 1328            {
 1329                throw _exception;
 1330            }
 1331            return initConnectionInfo();
 1332        }
 1333    }
 1334
 1335    /// <inheritdoc/>
 1336    public void setBufferSize(int rcvSize, int sndSize)
 1337    {
 1338        lock (_mutex)
 1339        {
 1340            if (_state >= StateClosed)
 1341            {
 1342                throw _exception;
 1343            }
 1344            _transceiver.setBufferSize(rcvSize, sndSize);
 1345            _info = null; // Invalidate the cached connection info
 1346        }
 1347    }
 1348
 1349    public void exception(LocalException ex)
 1350    {
             // Moves the connection to StateClosed with ex as the reason. The state change is made while holding
             // _mutex; setState ignores the request if the connection is already in StateClosed.
 1351        lock (_mutex)
 1352        {
 1353            setState(StateClosed, ex);
 1354        }
 1355    }
 1356
 1357    public Ice.Internal.ThreadPool getThreadPool() => _threadPool;
 1358
 1359    internal ConnectionI(
 1360        Instance instance,
 1361        Transceiver transceiver,
 1362        Connector connector, // null for incoming connections, non-null for outgoing connections
 1363        EndpointI endpoint,
 1364        ObjectAdapter adapter,
 1365        Action<ConnectionI> removeFromFactory, // can be null
 1366        ConnectionOptions options)
 1367    {
             // Caches configuration taken from instance and options, creates the read and write streams, optionally
             // wraps the transceiver in an idle-timeout decorator, and selects and initializes the thread pool
             // (_threadPool.initialize). The connection starts in StateNotInitialized.
             // Throws a LocalException unchanged, or a SyscallException wrapping any other exception, if the
             // thread pool set-up at the end fails.
 1368        _instance = instance;
 1369        _desc = transceiver.ToString();
 1370        _type = transceiver.protocol();
 1371        _connector = connector;
 1372        _endpoint = endpoint;
 1373        _adapter = adapter;
 1374        InitializationData initData = instance.initializationData();
 1375        _logger = initData.logger; // Cached for better performance.
 1376        _traceLevels = instance.traceLevels(); // Cached for better performance.
 1377        _connectTimeout = options.connectTimeout;
 1378        _closeTimeout = options.closeTimeout; // not used for datagram connections
 1379        // suppress inactivity timeout for datagram connections
 1380        _inactivityTimeout = endpoint.datagram() ? TimeSpan.Zero : options.inactivityTimeout;
 1381        _maxDispatches = options.maxDispatches;
 1382        _removeFromFactory = removeFromFactory;
 1383        _warn = initData.properties.getIcePropertyAsInt("Ice.Warn.Connections") > 0;
 1384        _warnUdp = initData.properties.getIcePropertyAsInt("Ice.Warn.Datagrams") > 0;
 1385        _nextRequestId = 1;
             // Incoming connections (connector is null) take the limit from the adapter, outgoing ones from instance.
             // NOTE(review): adapter is dereferenced here for incoming connections, before the Debug.Assert on
             // adapter further down.
 1386        _messageSizeMax = connector is null ? adapter.messageSizeMax() : instance.messageSizeMax();
 1387        _batchRequestQueue = new BatchRequestQueue(instance, _endpoint.datagram());
 1388        _readStream = new InputStream(instance, Protocol.currentProtocolEncoding);
 1389        _readHeader = false;
 1390        _readStreamPos = -1;
 1391        _writeStream = new OutputStream(); // temporary stream
 1392        _writeStreamPos = -1;
 1393        _upcallCount = 0;
 1394        _state = StateNotInitialized;
 1395
             // The configured compression level is clamped to the range [1, 9].
 1396        _compressionLevel = initData.properties.getIcePropertyAsInt("Ice.Compression.Level");
 1397        if (_compressionLevel < 1)
 1398        {
 1399            _compressionLevel = 1;
 1400        }
 1401        else if (_compressionLevel > 9)
 1402        {
 1403            _compressionLevel = 9;
 1404        }
 1405
             // With a positive idle timeout on a non-datagram endpoint, all transceiver calls go through the
             // idle-timeout decorator; otherwise _idleTimeoutTransceiver is left unassigned.
 1406        if (options.idleTimeout > TimeSpan.Zero && !endpoint.datagram())
 1407        {
 1408            _idleTimeoutTransceiver = new IdleTimeoutTransceiverDecorator(
 1409                transceiver,
 1410                this,
 1411                options.idleTimeout,
 1412                options.enableIdleCheck);
 1413            transceiver = _idleTimeoutTransceiver;
 1414        }
 1415        _transceiver = transceiver;
 1416
 1417        try
 1418        {
 1419            if (connector is null)
 1420            {
 1421                // adapter is always set for incoming connections
 1422                Debug.Assert(adapter is not null);
 1423                _threadPool = adapter.getThreadPool();
 1424            }
 1425            else
 1426            {
 1427                // we use the client thread pool for outgoing connections, even if there is an
 1428                // object adapter with its own thread pool.
 1429                _threadPool = instance.clientThreadPool();
 1430            }
 1431            // On Windows, async socket I/O initiated on a thread that subsequently terminates is cancelled by the OS
 1432            // with SocketError.OperationAborted. The Ice thread pool can reap workers idle past ThreadIdleTime when
 1433            // SizeMax > 1, so in that combination we hop the I/O onto the .NET ThreadPool (whose threads are managed
 1434            // by the runtime and not reaped while owning pending I/O). Other platforms and fixed-size Ice pools don't
 1435            // need the hop. See startAsync.
 1436            _threadHopRequired = AssemblyUtil.isWindows && _threadPool.canShrink;
 1437            _threadPool.initialize(this);
 1438        }
 1439        catch (LocalException)
 1440        {
                 // Ice local exceptions propagate unchanged.
 1441            throw;
 1442        }
 1443        catch (System.Exception ex)
 1444        {
                 // Any other exception is wrapped in a SyscallException.
 1445            throw new SyscallException(ex);
 1446        }
 1447    }
 1448
 1449    /// <summary>
 1450    /// Aborts the connection with a <see cref="ConnectionAbortedException" /> if the connection is active and
 1451    /// does not receive a byte for some time. See the IdleTimeoutTransceiverDecorator.
 1452    /// </summary>
 1453    internal void idleCheck(TimeSpan idleTimeout)
 1454    {
 1455        lock (_mutex)
 1456        {
 1457            if (_state == StateActive && _idleTimeoutTransceiver!.idleCheckEnabled)
 1458            {
 1459                int idleTimeoutInSeconds = (int)idleTimeout.TotalSeconds;
 1460
 1461                setState(
 1462                    StateClosed,
 1463                    new ConnectionAbortedException(
 1464                        $"Connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSe
 1465                        closedByApplication: false));
 1466            }
 1467            // else nothing to do
 1468        }
 1469    }
 1470
 1471    internal void sendHeartbeat()
 1472    {
             // Does two things, under _mutex, when the connection is active, holding or closing:
             //  1. schedules the inactivity timer if the connection currently looks inactive;
             //  2. sends a ValidateConnection message as a heartbeat when no outgoing message is queued.
             // Must not be called for datagram connections (see the assertion below).
 1473        Debug.Assert(!_endpoint.datagram());
 1474
 1475        lock (_mutex)
 1476        {
 1477            if (_state == StateActive || _state == StateHolding || _state == StateClosing)
 1478            {
 1479                // We check if the connection has become inactive.
 1480                if (
 1481                    _inactivityTimer is null &&           // timer not already scheduled
 1482                    _inactivityTimeout > TimeSpan.Zero && // inactivity timeout is enabled
 1483                    _state == StateActive &&              // only schedule the timer if the connection is active
 1484                    _dispatchCount == 0 &&                // no pending dispatch
 1485                    _asyncRequests.Count == 0 &&          // no pending invocation
 1486                    _readHeader &&                        // we're not waiting for the remainder of an incoming message
 1487                    _sendStreams.Count <= 1)              // there is at most one pending outgoing message
 1488                {
 1489                    // We may become inactive while the peer is back-pressuring us. In this case, we only schedule the
 1490                    // inactivity timer if there is no pending outgoing message or the pending outgoing message is a
 1491                    // heartbeat.
 1492
 1493                    // The stream of the first _sendStreams message is in _writeStream.
 1494                    if (_sendStreams.Count == 0 || isHeartbeat(_writeStream))
 1495                    {
 1496                        scheduleInactivityTimer();
 1497                    }
 1498                }
 1499
 1500                // We send a heartbeat to the peer to generate a "write" on the connection. This write in turn creates
 1501                // a read on the peer, and resets the peer's idle check timer. When _sendStreams is not empty, there is
 1502                // already an outstanding write, so we don't need to send a heartbeat. It's possible the first message
 1503                // of _sendStreams was already sent but not yet removed from _sendStreams: it means the last write
 1504                // occurred very recently, which is good enough with respect to the idle check.
 1505                // As a result of this optimization, the only possible heartbeat in _sendStreams is the first
 1506                // _sendStreams message.
 1507                if (_sendStreams.Count == 0)
 1508                {
                         // The heartbeat is a header-only message: magic, protocol version, encoding version,
                         // message type (ValidateConnection), compression flag (0) and the message size, which is
                         // the header size itself.
 1509                    var os = new OutputStream(Protocol.currentProtocolEncoding);
 1510                    os.writeBlob(Protocol.magic);
 1511                    ProtocolVersion.ice_write(os, Protocol.currentProtocol);
 1512                    EncodingVersion.ice_write(os, Protocol.currentProtocolEncoding);
 1513                    os.writeByte(Protocol.validateConnectionMsg);
 1514                    os.writeByte(0);
 1515                    os.writeInt(Protocol.headerSize); // Message size.
 1516                    try
 1517                    {
                             // The value returned by sendMessage is deliberately discarded.
 1518                        _ = sendMessage(new OutgoingMessage(os, compress: false));
 1519                    }
 1520                    catch (LocalException ex)
 1521                    {
                             // A failure to send the heartbeat closes the connection.
 1522                        setState(StateClosed, ex);
 1523                    }
 1524                }
 1525            }
 1526            // else nothing to do
 1527        }
 1528
             // Returns true when the byte at offset 8 of the stream's buffer is the ValidateConnection message type.
             // Offset 8 is where the message-type byte of the header written above lands, assuming the protocol and
             // encoding versions take 2 bytes each after the 4 magic bytes - TODO confirm.
 1529        static bool isHeartbeat(OutputStream stream) =>
 1530            stream.getBuffer().b.get(8) == Protocol.validateConnectionMsg;
 1531    }
 1532
 1533    private const int StateNotInitialized = 0; // State assigned by the constructor.
 1534    private const int StateNotValidated = 1;
 1535    private const int StateActive = 2;
 1536    private const int StateHolding = 3; // State entered right after the connection has been validated.
 1537    private const int StateClosing = 4;
 1538    private const int StateClosingPending = 5;
 1539    private const int StateClosed = 6; // An exception is always set once this state is reached.
 1540    private const int StateFinished = 7; // State set at the very end of finish().
         // The numeric order of these constants is significant: the code compares states with relational operators,
         // for example "_state >= StateClosed", "_state <= StateNotValidated" and "state > StateActive".
 1541
 1542    private static ConnectionState toConnectionState(int state) => connectionStateMap[state];
 1543
 1544    private void setState(int state, LocalException ex)
 1545    {
 1546        //
 1547        // If setState() is called with an exception, then only closed
 1548        // and closing states are permissible.
 1549        //
 1550        Debug.Assert(state >= StateClosing);
 1551
 1552        if (_state == state) // Don't switch twice.
 1553        {
 1554            return;
 1555        }
 1556
 1557        if (_exception is null)
 1558        {
 1559            //
 1560            // If we are in closed state, an exception must be set.
 1561            //
 1562            Debug.Assert(_state != StateClosed);
 1563
 1564            _exception = ex;
 1565
 1566            //
 1567            // We don't warn if we are not validated.
 1568            //
 1569            if (_warn && _validated)
 1570            {
 1571                //
 1572                // Don't warn about certain expected exceptions.
 1573                //
 1574                if (!(_exception is CloseConnectionException ||
 1575                     _exception is ConnectionClosedException ||
 1576                     _exception is CommunicatorDestroyedException ||
 1577                     _exception is ObjectAdapterDeactivatedException ||
 1578                     (_exception is ConnectionLostException && _state >= StateClosing)))
 1579                {
 1580                    warning("connection exception", _exception);
 1581                }
 1582            }
 1583        }
 1584
 1585        //
 1586        // We must set the new state before we notify requests of any
 1587        // exceptions. Otherwise new requests may retry on a
 1588        // connection that is not yet marked as closed or closing.
 1589        //
 1590        setState(state);
 1591    }
 1592
    /// <summary>
    /// Performs the transition of the connection to the given state: adjusts the requested state for datagram
    /// and not-yet-validated connections, runs the per-state side effects (thread pool registration, transceiver
    /// close/destroy, ...), updates the connection observer, stores the new state and wakes up the threads
    /// waiting on _mutex. Transitions that are not permitted from the current state are silently ignored.
    /// </summary>
    /// <param name="state">The requested state. A request for StateClosing is turned into StateClosed for
    /// datagram endpoints and for connections that are not validated yet.</param>
    private void setState(int state)
    {
        //
        // We don't want to send close connection messages if the endpoint
        // only supports oneway transmission from client to server.
        //
        if (_endpoint.datagram() && state == StateClosing)
        {
            state = StateClosed;
        }

        //
        // Skip graceful shutdown if we are destroyed before validation.
        //
        if (_state <= StateNotValidated && state == StateClosing)
        {
            state = StateClosed;
        }

        if (_state == state) // Don't switch twice.
        {
            return;
        }

        if (state > StateActive)
        {
            // Dispose the inactivity timer, if not null.
            cancelInactivityTimer();
        }

        // Per-state side effects. A `return` inside the switch abandons the transition and leaves _state
        // untouched; a LocalException is only logged, and the transition still completes below.
        try
        {
            switch (state)
            {
                case StateNotInitialized:
                {
                    // Nothing ever transitions back to the initial state.
                    Debug.Assert(false);
                    break;
                }

                case StateNotValidated:
                {
                    // Only reachable from the initial state; otherwise the connection is expected to be
                    // closed already and the request is ignored.
                    if (_state != StateNotInitialized)
                    {
                        Debug.Assert(_state == StateClosed);
                        return;
                    }
                    break;
                }

                case StateActive:
                {
                    //
                    // Can only switch to active from holding or not validated.
                    //
                    if (_state != StateHolding && _state != StateNotValidated)
                    {
                        return;
                    }

                    // A non-positive _maxDispatches means there is no dispatch limit.
                    if (_maxDispatches <= 0 || _dispatchCount < _maxDispatches)
                    {
                        _threadPool.register(this, SocketOperation.Read);
                        _idleTimeoutTransceiver?.enableIdleCheck();
                    }
                    // else don't resume reading since we're at or over the _maxDispatches limit.

                    break;
                }

                case StateHolding:
                {
                    //
                    // Can only switch to holding from active or not validated.
                    //
                    if (_state != StateActive && _state != StateNotValidated)
                    {
                        return;
                    }

                    // Mirror of the StateActive case: reads are only unregistered when they were registered.
                    if (_state == StateActive && (_maxDispatches <= 0 || _dispatchCount < _maxDispatches))
                    {
                        _threadPool.unregister(this, SocketOperation.Read);
                        _idleTimeoutTransceiver?.disableIdleCheck();
                    }
                    // else reads are already disabled because the _maxDispatches limit is reached or exceeded.

                    break;
                }

                case StateClosing:
                case StateClosingPending:
                {
                    //
                    // Can't change back from closing pending.
                    //
                    if (_state >= StateClosingPending)
                    {
                        return;
                    }
                    break;
                }

                case StateClosed:
                {
                    // A finished connection cannot go back to closed.
                    if (_state == StateFinished)
                    {
                        return;
                    }

                    // Destroy the batch request queue with the recorded exception, then tell the thread pool
                    // to finish this handler and close the transceiver.
                    _batchRequestQueue.destroy(_exception);
                    _threadPool.finish(this);
                    _transceiver.close();
                    break;
                }

                case StateFinished:
                {
                    // Finished is only entered from closed.
                    Debug.Assert(_state == StateClosed);
                    _transceiver.destroy();
                    break;
                }
            }
        }
        catch (LocalException ex)
        {
            _logger.error("unexpected connection exception:\n" + ex + "\n" + _transceiver.ToString());
        }

        // Observer bookkeeping. _state still holds the OLD state here; it is only assigned further below.
        if (_instance.initializationData().observer is not null)
        {
            ConnectionState oldState = toConnectionState(_state);
            ConnectionState newState = toConnectionState(state);
            if (oldState != newState)
            {
                // Ask the communicator observer for the connection observer matching the new state, passing
                // in the current one.
                _observer = _instance.initializationData().observer.getConnectionObserver(
                    initConnectionInfo(),
                    _endpoint,
                    newState,
                    _observer);
                if (_observer is not null)
                {
                    _observer.attach();
                }
                else
                {
                    // No observer anymore: reset the stream positions tracked for the observer.
                    _writeStreamPos = -1;
                    _readStreamPos = -1;
                }
            }
            if (_observer is not null && state == StateClosed && _exception is not null)
            {
                // Report a failure to the observer unless the exception is one of the expected closure
                // exceptions (same filter as the warning in setState(int, LocalException)).
                if (!(_exception is CloseConnectionException ||
                     _exception is ConnectionClosedException ||
                     _exception is CommunicatorDestroyedException ||
                     _exception is ObjectAdapterDeactivatedException ||
                     (_exception is ConnectionLostException && _state >= StateClosing)))
                {
                    _observer.failed(_exception.ice_id());
                }
            }
        }
        _state = state;

        // Wake up every thread waiting on _mutex. Monitor.PulseAll requires the calling thread to own the
        // lock on _mutex.
        Monitor.PulseAll(_mutex);

        // Once closing with no upcall in progress, start the graceful shutdown right away. A failure to do so
        // closes the connection with the corresponding exception.
        if (_state == StateClosing && _upcallCount == 0)
        {
            try
            {
                initiateShutdown();
            }
            catch (LocalException ex)
            {
                setState(StateClosed, ex);
            }
        }
    }
 1771
 1772    private void initiateShutdown()
 1773    {
 1774        Debug.Assert(_state == StateClosing && _upcallCount == 0);
 1775
 1776        if (_shutdownInitiated)
 1777        {
 1778            return;
 1779        }
 1780        _shutdownInitiated = true;
 1781
 1782        if (!_endpoint.datagram())
 1783        {
 1784            //
 1785            // Before we shut down, we send a close connection message.
 1786            //
 1787            var os = new OutputStream(Protocol.currentProtocolEncoding);
 1788            os.writeBlob(Protocol.magic);
 1789            ProtocolVersion.ice_write(os, Protocol.currentProtocol);
 1790            EncodingVersion.ice_write(os, Protocol.currentProtocolEncoding);
 1791            os.writeByte(Protocol.closeConnectionMsg);
 1792            os.writeByte(0); // Compression status: always zero for close connection.
 1793            os.writeInt(Protocol.headerSize); // Message size.
 1794
 1795            scheduleCloseTimer();
 1796
 1797            if ((sendMessage(new OutgoingMessage(os, compress: false)) & OutgoingAsyncBase.AsyncStatusSent) != 0)
 1798            {
 1799                setState(StateClosingPending);
 1800
 1801                //
 1802                // Notify the transceiver of the graceful connection closure.
 1803                //
 1804                int op = _transceiver.closing(true, _exception);
 1805                if (op != 0)
 1806                {
 1807                    _threadPool.register(this, op);
 1808                }
 1809            }
 1810        }
 1811    }
 1812
 1813    private bool initialize(int operation)
 1814    {
 1815        int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), ref _hasMoreData);
 1816        if (s != SocketOperation.None)
 1817        {
 1818            _threadPool.update(this, operation, s);
 1819            return false;
 1820        }
 1821
 1822        //
 1823        // Update the connection description once the transceiver is initialized.
 1824        //
 1825        _desc = _transceiver.ToString();
 1826        _initialized = true;
 1827        setState(StateNotValidated);
 1828
 1829        return true;
 1830    }
 1831
 1832    private bool validate(int operation)
 1833    {
 1834        if (!_endpoint.datagram()) // Datagram connections are always implicitly validated.
 1835        {
 1836            if (_connector is null) // The server side has the active role for connection validation.
 1837            {
 1838                if (_writeStream.size() == 0)
 1839                {
 1840                    _writeStream.writeBlob(Protocol.magic);
 1841                    ProtocolVersion.ice_write(_writeStream, Protocol.currentProtocol);
 1842                    EncodingVersion.ice_write(_writeStream, Protocol.currentProtocolEncoding);
 1843                    _writeStream.writeByte(Protocol.validateConnectionMsg);
 1844                    _writeStream.writeByte(0); // Compression status (always zero for validate connection).
 1845                    _writeStream.writeInt(Protocol.headerSize); // Message size.
 1846                    TraceUtil.traceSend(_writeStream, _instance, this, _logger, _traceLevels);
 1847                    _writeStream.prepareWrite();
 1848                }
 1849
 1850                if (_observer is not null)
 1851                {
 1852                    observerStartWrite(_writeStream.getBuffer());
 1853                }
 1854
 1855                if (_writeStream.pos() != _writeStream.size())
 1856                {
 1857                    int op = write(_writeStream.getBuffer());
 1858                    if (op != 0)
 1859                    {
 1860                        _threadPool.update(this, operation, op);
 1861                        return false;
 1862                    }
 1863                }
 1864
 1865                if (_observer is not null)
 1866                {
 1867                    observerFinishWrite(_writeStream.getBuffer());
 1868                }
 1869            }
 1870            else // The client side has the passive role for connection validation.
 1871            {
 1872                if (_readStream.size() == 0)
 1873                {
 1874                    _readStream.resize(Protocol.headerSize);
 1875                    _readStream.pos(0);
 1876                }
 1877
 1878                if (_observer is not null)
 1879                {
 1880                    observerStartRead(_readStream.getBuffer());
 1881                }
 1882
 1883                if (_readStream.pos() != _readStream.size())
 1884                {
 1885                    int op = read(_readStream.getBuffer());
 1886                    if (op != 0)
 1887                    {
 1888                        _threadPool.update(this, operation, op);
 1889                        return false;
 1890                    }
 1891                }
 1892
 1893                if (_observer is not null)
 1894                {
 1895                    observerFinishRead(_readStream.getBuffer());
 1896                }
 1897
 1898                _validated = true;
 1899
 1900                Debug.Assert(_readStream.pos() == Protocol.headerSize);
 1901                _readStream.pos(0);
 1902                byte[] m = _readStream.readBlob(4);
 1903                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 1904                   m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 1905                {
 1906                    throw new ProtocolException(
 1907                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 1908                }
 1909
 1910                var pv = new ProtocolVersion(_readStream);
 1911                if (pv != Protocol.currentProtocol)
 1912                {
 1913                    throw new MarshalException(
 1914                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 1915                }
 1916                var ev = new EncodingVersion(_readStream);
 1917                if (ev != Protocol.currentProtocolEncoding)
 1918                {
 1919                    throw new MarshalException(
 1920                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 1921                }
 1922
 1923                byte messageType = _readStream.readByte();
 1924                if (messageType != Protocol.validateConnectionMsg)
 1925                {
 1926                    throw new ProtocolException(
 1927                        $"Received message of type {messageType} over a connection that is not yet validated.");
 1928                }
 1929                _readStream.readByte(); // Ignore compression status for validate connection.
 1930                int size = _readStream.readInt();
 1931                if (size != Protocol.headerSize)
 1932                {
 1933                    throw new MarshalException($"Received ValidateConnection message with unexpected size {size}.");
 1934                }
 1935                TraceUtil.traceRecv(_readStream, this, _logger, _traceLevels);
 1936
 1937                // Client connection starts sending heartbeats once it's received the ValidateConnection message.
 1938                _idleTimeoutTransceiver?.scheduleHeartbeat();
 1939            }
 1940        }
 1941
 1942        _writeStream.resize(0);
 1943        _writeStream.pos(0);
 1944
 1945        _readStream.resize(Protocol.headerSize);
 1946        _readStream.pos(0);
 1947        _readHeader = true;
 1948
 1949        if (_instance.traceLevels().network >= 1)
 1950        {
 1951            var s = new StringBuilder();
 1952            if (_endpoint.datagram())
 1953            {
 1954                s.Append("starting to ");
 1955                s.Append(_connector is not null ? "send" : "receive");
 1956                s.Append(' ');
 1957                s.Append(_endpoint.protocol());
 1958                s.Append(" messages\n");
 1959                s.Append(_transceiver.toDetailedString());
 1960            }
 1961            else
 1962            {
 1963                s.Append(_connector is not null ? "established" : "accepted");
 1964                s.Append(' ');
 1965                s.Append(_endpoint.protocol());
 1966                s.Append(" connection\n");
 1967                s.Append(ToString());
 1968            }
 1969            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1970        }
 1971
 1972        return true;
 1973    }
 1974
    /// <summary>
    /// Sends the next queued messages. This method is called by message() once the message which is being sent
    /// (_sendStreams.First) is fully sent. Before sending the next message, this message is removed from
    /// _sendStreams. If any, its sent callback is also queued in the given callback queue.
    /// </summary>
    /// <param name="callbacks">The sent callbacks to call for the messages that were sent. Set to null when no
    /// sent callback needs to be called.</param>
    /// <returns>The socket operation to register with the thread pool's selector to send the remainder of the pending
    /// message being sent (_sendStreams.First), or the operation requested by the transceiver after the close
    /// connection message was sent; SocketOperation.None when there is nothing more to wait for.</returns>
    private int sendNextMessage(out Queue<OutgoingMessage> callbacks)
    {
        callbacks = null;

        if (_sendStreams.Count == 0)
        {
            // This can occur if no message was being written and the socket write operation was registered with the
            // thread pool (a transceiver read method can request writing data).
            return SocketOperation.None;
        }
        else if (_state == StateClosingPending && _writeStream.pos() == 0)
        {
            // Message wasn't sent, empty the _writeStream, we're not going to send more data because the connection
            // is being closed.
            OutgoingMessage message = _sendStreams.First.Value;
            _writeStream.swap(message.stream);
            return SocketOperation.None;
        }

        // Assert that the message was fully written.
        Debug.Assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());

        try
        {
            while (true)
            {
                //
                // The message that was being sent is sent. We can swap back the write stream buffer to the
                // outgoing message (required for retry) and queue its sent callback (if any).
                //
                OutgoingMessage message = _sendStreams.First.Value;
                _writeStream.swap(message.stream);
                if (message.sent())
                {
                    // The callback queue is only allocated when at least one sent callback must be called.
                    callbacks ??= new Queue<OutgoingMessage>();
                    callbacks.Enqueue(message);
                }
                _sendStreams.RemoveFirst();

                //
                // If there's nothing left to send, we're done.
                //
                if (_sendStreams.Count == 0)
                {
                    break;
                }

                //
                // If we are in the closed state or if the close is pending, don't continue sending. This can occur if
                // parseMessage (called before sendNextMessage by message()) closes the connection.
                //
                if (_state >= StateClosingPending)
                {
                    return SocketOperation.None;
                }

                //
                // Otherwise, prepare the next message.
                //
                message = _sendStreams.First.Value;
                Debug.Assert(!message.prepared);
                // Keep a reference to the stream as queued: doCompress may return a different stream, and the
                // trace below is given this one.
                OutputStream stream = message.stream;

                message.stream = doCompress(message.stream, message.compress);
                message.stream.prepareWrite();
                message.prepared = true;

                TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);

                //
                // Send the message.
                //
                _writeStream.swap(message.stream);
                if (_observer is not null)
                {
                    observerStartWrite(_writeStream.getBuffer());
                }
                if (_writeStream.pos() != _writeStream.size())
                {
                    int op = write(_writeStream.getBuffer());
                    if (op != 0)
                    {
                        // The write could not complete: return the operation to wait for. The message stays at
                        // the front of _sendStreams with its buffer held by _writeStream.
                        return op;
                    }
                }
                if (_observer is not null)
                {
                    observerFinishWrite(_writeStream.getBuffer());
                }

                // If the message was sent right away, loop to send the next queued message.
            }

            // Once the CloseConnection message is sent, we transition to the StateClosingPending state.
            if (_state == StateClosing && _shutdownInitiated)
            {
                setState(StateClosingPending);
                // Notify the transceiver of the graceful connection closure; it may request a socket operation.
                int op = _transceiver.closing(true, _exception);
                if (op != 0)
                {
                    return op;
                }
            }
        }
        catch (LocalException ex)
        {
            // Any failure while sending closes the connection with that exception.
            setState(StateClosed, ex);
        }
        return SocketOperation.None;
    }
 2093
 2094    /// <summary>
 2095    /// Sends or queues the given message.
 2096    /// </summary>
 2097    /// <param name="message">The message to send.</param>
 2098    /// <returns>The send status.</returns>
 2099    private int sendMessage(OutgoingMessage message)
 2100    {
 2101        Debug.Assert(_state >= StateActive);
 2102        Debug.Assert(_state < StateClosed);
 2103
 2104        // Some messages are queued for sending. Just adds the message to the send queue and tell the caller that
 2105        // the message was queued.
 2106        if (_sendStreams.Count > 0)
 2107        {
 2108            _sendStreams.AddLast(message);
 2109            return OutgoingAsyncBase.AsyncStatusQueued;
 2110        }
 2111
 2112        // Prepare the message for sending.
 2113        Debug.Assert(!message.prepared);
 2114
 2115        OutputStream stream = message.stream;
 2116
 2117        message.stream = doCompress(stream, message.compress);
 2118        message.stream.prepareWrite();
 2119        message.prepared = true;
 2120
 2121        TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);
 2122
 2123        // Send the message without blocking.
 2124        if (_observer is not null)
 2125        {
 2126            observerStartWrite(message.stream.getBuffer());
 2127        }
 2128        int op = write(message.stream.getBuffer());
 2129        if (op == 0)
 2130        {
 2131            // The message was sent so we're done.
 2132
 2133            if (_observer is not null)
 2134            {
 2135                observerFinishWrite(message.stream.getBuffer());
 2136            }
 2137
 2138            int status = OutgoingAsyncBase.AsyncStatusSent;
 2139            if (message.sent())
 2140            {
 2141                // If there's a sent callback, indicate the caller that it should invoke the sent callback.
 2142                status |= OutgoingAsyncBase.AsyncStatusInvokeSentCallback;
 2143            }
 2144
 2145            return status;
 2146        }
 2147
 2148        // The message couldn't be sent right away so we add it to the send stream queue (which is empty) and swap its
 2149        // stream with `_writeStream`. The socket operation returned by the transceiver write is registered with the
 2150        // thread pool. At this point the message() method will take care of sending the whole message (held by
 2151        // _writeStream) when the transceiver is ready to write more of the message buffer.
 2152
 2153        _writeStream.swap(message.stream);
 2154        _sendStreams.AddLast(message);
 2155        _threadPool.register(this, op);
 2156        return OutgoingAsyncBase.AsyncStatusQueued;
 2157    }
 2158
 2159    private OutputStream doCompress(OutputStream decompressed, bool compress)
 2160    {
 2161        if (BZip2.isLoaded(_logger) && compress && decompressed.size() >= 100)
 2162        {
 2163            //
 2164            // Do compression.
 2165            //
 2166            Ice.Internal.Buffer cbuf = BZip2.compress(
 2167                decompressed.getBuffer(),
 2168                Protocol.headerSize,
 2169                _compressionLevel);
 2170            if (cbuf is not null)
 2171            {
 2172                var cstream = new OutputStream(new Internal.Buffer(cbuf, true), decompressed.getEncoding());
 2173
 2174                //
 2175                // Set compression status.
 2176                //
 2177                cstream.pos(9);
 2178                cstream.writeByte(2);
 2179
 2180                //
 2181                // Write the size of the compressed stream into the header.
 2182                //
 2183                cstream.pos(10);
 2184                cstream.writeInt(cstream.size());
 2185
 2186                //
 2187                // Write the compression status and size of the compressed stream into the header of the
 2188                // decompressed stream -- we need this to trace requests correctly.
 2189                //
 2190                decompressed.pos(9);
 2191                decompressed.writeByte(2);
 2192                decompressed.writeInt(cstream.size());
 2193
 2194                return cstream;
 2195            }
 2196        }
 2197
 2198        // Write the compression status. If BZip2 is loaded and compress is set to true, we write 1, to request a
 2199        // compressed reply. Otherwise, we write 0 either BZip2 is not loaded or we are sending an uncompressed reply.
 2200        decompressed.pos(9);
 2201        decompressed.writeByte((byte)((BZip2.isLoaded(_logger) && compress) ? 1 : 0));
 2202
 2203        //
 2204        // Not compressed, fill in the message size.
 2205        //
 2206        decompressed.pos(10);
 2207        decompressed.writeInt(decompressed.size());
 2208
 2209        return decompressed;
 2210    }
 2211
    /// <summary>
    /// Holds what parseMessage extracts from a received message.
    /// </summary>
    private struct MessageInfo
    {
        // The received message: parseMessage swaps the connection's read stream into it, and replaces it with
        // the decompressed stream when the message is compressed.
        public InputStream stream;

        // Number of requests carried by the message: 1 for a request, the batch count for a batch request.
        public int requestCount;

        // The request ID read from a request or reply message.
        public int requestId;

        // The compression status byte read from the message header (2 means the message is compressed).
        public byte compress;

        // Set to the connection's _adapter for request and batch request messages.
        public ObjectAdapter adapter;

        // For a reply, the matching entry found in _asyncRequests; reset to null when its response() call
        // returns false.
        public OutgoingAsyncBase outAsync;

        // Incremented by parseMessage: by one for a request, by the batch count for a batch request, and by
        // one for a reply whose response() call returns true.
        public int upcallCount;
    }
 2222
 2223    private int parseMessage(ref MessageInfo info)
 2224    {
 2225        Debug.Assert(_state > StateNotValidated && _state < StateClosed);
 2226
 2227        info.stream = new InputStream(_instance, Protocol.currentProtocolEncoding);
 2228        _readStream.swap(info.stream);
 2229        _readStream.resize(Protocol.headerSize);
 2230        _readStream.pos(0);
 2231        _readHeader = true;
 2232
 2233        Debug.Assert(info.stream.pos() == info.stream.size());
 2234
 2235        try
 2236        {
 2237            //
 2238            // The magic and version fields have already been checked.
 2239            //
 2240            info.stream.pos(8);
 2241            byte messageType = info.stream.readByte();
 2242            info.compress = info.stream.readByte();
 2243            if (info.compress == 2)
 2244            {
 2245                if (BZip2.isLoaded(_logger))
 2246                {
 2247                    Ice.Internal.Buffer ubuf = BZip2.decompress(
 2248                        info.stream.getBuffer(),
 2249                        Protocol.headerSize,
 2250                        _messageSizeMax);
 2251                    info.stream = new InputStream(info.stream.instance(), info.stream.getEncoding(), ubuf, true);
 2252                }
 2253                else
 2254                {
 2255                    throw new FeatureNotSupportedException(
 2256                        "Cannot decompress compressed message: BZip2 library is not loaded.");
 2257                }
 2258            }
 2259            info.stream.pos(Protocol.headerSize);
 2260
 2261            switch (messageType)
 2262            {
 2263                case Protocol.closeConnectionMsg:
 2264                {
 2265                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2266                    if (_endpoint.datagram())
 2267                    {
 2268                        if (_warn)
 2269                        {
 2270                            _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
 2271                        }
 2272                    }
 2273                    else
 2274                    {
 2275                        setState(StateClosingPending, new CloseConnectionException());
 2276
 2277                        //
 2278                        // Notify the transceiver of the graceful connection closure.
 2279                        //
 2280                        int op = _transceiver.closing(false, _exception);
 2281                        if (op != 0)
 2282                        {
 2283                            scheduleCloseTimer();
 2284                            return op;
 2285                        }
 2286                        setState(StateClosed);
 2287                    }
 2288                    break;
 2289                }
 2290
 2291                case Protocol.requestMsg:
 2292                {
 2293                    if (_state >= StateClosing)
 2294                    {
 2295                        TraceUtil.trace(
 2296                            "received request during closing\n(ignored by server, client will retry)",
 2297                            info.stream,
 2298                            this,
 2299                            _logger,
 2300                            _traceLevels);
 2301                    }
 2302                    else
 2303                    {
 2304                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2305                        info.requestId = info.stream.readInt();
 2306                        info.requestCount = 1;
 2307                        info.adapter = _adapter;
 2308                        ++info.upcallCount;
 2309
 2310                        cancelInactivityTimer();
 2311                        ++_dispatchCount;
 2312                    }
 2313                    break;
 2314                }
 2315
 2316                case Protocol.requestBatchMsg:
 2317                {
 2318                    if (_state >= StateClosing)
 2319                    {
 2320                        TraceUtil.trace(
 2321                            "received batch request during closing\n(ignored by server, client will retry)",
 2322                            info.stream,
 2323                            this,
 2324                            _logger,
 2325                            _traceLevels);
 2326                    }
 2327                    else
 2328                    {
 2329                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2330                        int requestCount = info.stream.readInt();
 2331                        if (requestCount < 0)
 2332                        {
 2333                            throw new MarshalException($"Received batch request with {requestCount} batches.");
 2334                        }
 2335                        info.requestCount = requestCount;
 2336                        info.adapter = _adapter;
 2337                        info.upcallCount += info.requestCount;
 2338
 2339                        cancelInactivityTimer();
 2340                        _dispatchCount += info.requestCount;
 2341                    }
 2342                    break;
 2343                }
 2344
 2345                case Protocol.replyMsg:
 2346                {
 2347                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2348                    info.requestId = info.stream.readInt();
 2349                    if (_asyncRequests.TryGetValue(info.requestId, out info.outAsync))
 2350                    {
 2351                        _asyncRequests.Remove(info.requestId);
 2352
 2353                        info.outAsync.getIs().swap(info.stream);
 2354
 2355                        //
 2356                        // If we just received the reply for a request which isn't acknowledge as
 2357                        // sent yet, we queue the reply instead of processing it right away. It
 2358                        // will be processed once the write callback is invoked for the message.
 2359                        //
 2360                        OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null;
 2361                        if (message is not null && message.outAsync == info.outAsync)
 2362                        {
 2363                            message.receivedReply = true;
 2364                        }
 2365                        else if (info.outAsync.response())
 2366                        {
 2367                            ++info.upcallCount;
 2368                        }
 2369                        else
 2370                        {
 2371                            info.outAsync = null;
 2372                        }
 2373                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 2374                        {
 2375                            doApplicationClose();
 2376                        }
 2377                    }
 2378                    break;
 2379                }
 2380
 2381                case Protocol.validateConnectionMsg:
 2382                {
 2383                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 2384                    break;
 2385                }
 2386
 2387                default:
 2388                {
 2389                    TraceUtil.trace(
 2390                        "received unknown message\n(invalid, closing connection)",
 2391                        info.stream,
 2392                        this,
 2393                        _logger,
 2394                        _traceLevels);
 2395
 2396                    throw new ProtocolException($"Received Ice protocol message with unknown type: {messageType}");
 2397                }
 2398            }
 2399        }
 2400        catch (LocalException ex)
 2401        {
 2402            if (_endpoint.datagram())
 2403            {
 2404                if (_warn)
 2405                {
 2406                    _logger.warning("datagram connection exception:\n" + ex.ToString() + "\n" + _desc);
 2407                }
 2408            }
 2409            else
 2410            {
 2411                setState(StateClosed, ex);
 2412            }
 2413        }
 2414
 2415        if (_state == StateHolding)
 2416        {
 2417            // Don't continue reading if the connection is in the holding state.
 2418            return SocketOperation.None;
 2419        }
 2420        else if (_maxDispatches > 0 && _dispatchCount >= _maxDispatches)
 2421        {
 2422            // Don't continue reading if the _maxDispatches limit is reached or exceeded.
 2423            _idleTimeoutTransceiver?.disableIdleCheck();
 2424            return SocketOperation.None;
 2425        }
 2426        else
 2427        {
 2428            // Continue reading.
 2429            return SocketOperation.Read;
 2430        }
 2431    }
 2432
 2433    private void dispatchAll(
 2434        InputStream stream,
 2435        int requestCount,
 2436        int requestId,
 2437        byte compress,
 2438        ObjectAdapter adapter)
 2439    {
             // Dispatches the requestCount request(s) carried by stream. Each request is wrapped in an IncomingRequest
             // and passed to the adapter's dispatch pipeline; when no dispatcher is available (for example when adapter
             // is null), sendResponse is called with an ObjectNotExistException response instead.
             //
 2440        // Note: In contrast to other private or protected methods, this method must be called *without* the mutex
 2441        // locked.
 2442
 2443        Object dispatcher = adapter?.dispatchPipeline;
 2444
 2445        try
 2446        {
 2447            while (requestCount > 0)
 2448            {
 2449                // adapter can be null here, however the adapter set in current can't be null, and we never pass
 2450                // a null current.adapter to the application code. Once this file enables nullable, adapter should be
 2451                // adapter! below.
 2452                var request = new IncomingRequest(requestId, this, adapter, stream);
 2453
 2454                if (dispatcher is not null)
 2455                {
 2456                    // We don't and can't await the dispatchAsync: with batch requests, we want all the dispatches to
 2457                    // execute in the current Ice thread pool thread. If we awaited the dispatchAsync, we could
 2458                    // switch to a .NET thread pool thread.
 2459                    _ = dispatchAsync(request);
 2460                }
 2461                else
 2462                {
 2463                    // Received request on a connection without an object adapter.
 2464                    sendResponse(
 2465                        request.current.createOutgoingResponse(new ObjectNotExistException()),
 2466                        isTwoWay: !_endpoint.datagram() && requestId != 0,
 2467                        compress: 0);
 2468                }
                     // requestCount now holds the number of requests that still have to be dispatched; the catch block
                     // below passes this remaining count to dispatchException.
 2469                --requestCount;
 2470            }
 2471
 2472            stream.clear();
 2473        }
 2474        catch (LocalException ex) // TODO: catch all exceptions
 2475        {
 2476            // Typically, the IncomingRequest constructor throws an exception, and we can't continue.
 2477            dispatchException(ex, requestCount);
 2478        }
 2479
             // Local function: runs one dispatch and sends its response. An exception thrown by the dispatcher is not
             // propagated; it is converted into a response with createOutgoingResponse(ex).
 2480        async Task dispatchAsync(IncomingRequest request)
 2481        {
 2482            try
 2483            {
 2484                OutgoingResponse response;
 2485
 2486                try
 2487                {
 2488                    response = await dispatcher.dispatchAsync(request).ConfigureAwait(false);
 2489                }
 2490                catch (System.Exception ex)
 2491                {
 2492                    response = request.current.createOutgoingResponse(ex);
 2493                }
 2494
                     // sendResponse only queues the reply when isTwoWay is true.
                     // NOTE(review): a requestId of 0 presumably identifies a oneway request - confirm.
 2495                sendResponse(response, isTwoWay: !_endpoint.datagram() && requestId != 0, compress);
 2496            }
 2497            catch (LocalException ex) // TODO: catch all exceptions to avoid UnobservedTaskException
 2498            {
                     // This local function handles exactly one request, hence requestCount: 1.
 2499                dispatchException(ex, requestCount: 1);
 2500            }
 2501        }
 2502    }
 2503
 2504    private void sendResponse(OutgoingResponse response, bool isTwoWay, byte compress)
 2505    {
             // Completes one dispatch: decrements _upcallCount and _dispatchCount under _mutex and, when isTwoWay is
             // true, passes response.outputStream to sendMessage (compress > 0 sets the message's compress flag).
             //
             // finished is set when this was the last outstanding upcall of a connection already in StateFinished;
             // _removeFromFactory is then invoked from the finally block, after _mutex has been released.
 2506        bool finished = false;
 2507        try
 2508        {
 2509            lock (_mutex)
 2510            {
 2511                Debug.Assert(_state > StateNotValidated);
 2512
 2513                try
 2514                {
 2515                    if (--_upcallCount == 0)
 2516                    {
 2517                        if (_state == StateFinished)
 2518                        {
 2519                            finished = true;
 2520                            _observer?.detach();
 2521                        }
 2522                        Monitor.PulseAll(_mutex);
 2523                    }
 2524
 2525                    if (_state >= StateClosed)
 2526                    {
 2527                        Debug.Assert(_exception is not null);
                             // Caught by the catch (LocalException) below, which calls setState(StateClosed, ex). The
                             // statements in between, including --_dispatchCount, are skipped on this path.
 2528                        throw _exception;
 2529                    }
 2530
 2531                    if (isTwoWay)
 2532                    {
 2533                        sendMessage(new OutgoingMessage(response.outputStream, compress > 0));
 2534                    }
 2535
 2536                    if (_state == StateActive && _maxDispatches > 0 && _dispatchCount == _maxDispatches)
 2537                    {
 2538                        // Resume reading if the connection is active and the dispatch count is about to be less than
 2539                        // _maxDispatches.
 2540                        _threadPool.update(this, SocketOperation.None, SocketOperation.Read);
 2541                        _idleTimeoutTransceiver?.enableIdleCheck();
 2542                    }
 2543
 2544                    --_dispatchCount;
 2545
 2546                    if (_state == StateClosing && _upcallCount == 0)
 2547                    {
 2548                        initiateShutdown();
 2549                    }
 2550                }
 2551                catch (LocalException ex)
 2552                {
 2553                    setState(StateClosed, ex);
 2554                }
 2555            }
 2556        }
 2557        finally
 2558        {
 2559            if (finished && _removeFromFactory is not null)
 2560            {
 2561                _removeFromFactory(this);
 2562            }
 2563        }
 2564    }
 2565
 2566    private void dispatchException(LocalException ex, int requestCount)
 2567    {
             // Closes the connection with ex and releases requestCount upcalls. When this brings _upcallCount to 0 on a
             // connection in StateFinished, the observer is detached and _removeFromFactory is invoked once _mutex has
             // been released.
 2568        bool finished = false;
 2569
 2570        // Fatal exception while dispatching a request. Since sendResponse isn't called in case of a fatal exception
 2571        // we decrement _upcallCount here.
 2572        lock (_mutex)
 2573        {
 2574            setState(StateClosed, ex);
 2575
 2576            if (requestCount > 0)
 2577            {
 2578                Debug.Assert(_upcallCount >= requestCount);
 2579                _upcallCount -= requestCount;
 2580                if (_upcallCount == 0)
 2581                {
 2582                    if (_state == StateFinished)
 2583                    {
 2584                        finished = true;
 2585                        _observer?.detach();
 2586                    }
                         // Wake up any thread waiting on _mutex now that no upcall is outstanding.
 2587                    Monitor.PulseAll(_mutex);
 2588                }
 2589            }
 2590        }
 2591
 2592        if (finished && _removeFromFactory is not null)
 2593        {
 2594            _removeFromFactory(this);
 2595        }
 2596    }
 2597
 2598    private void inactivityCheck(System.Threading.Timer inactivityTimer)
 2599    {
             // Callback of the timer created by scheduleInactivityTimer; inactivityTimer is the timer that fired. When
             // it is still the current timer, it is cleared and disposed, and an active connection is moved to
             // StateClosing with a ConnectionClosedException.
 2600        lock (_mutex)
 2601        {
 2602            // If the timers are different, it means this inactivityTimer is no longer current.
 2603            if (inactivityTimer == _inactivityTimer)
 2604            {
 2605                _inactivityTimer = null;
 2606                inactivityTimer.Dispose(); // non-blocking
 2607
 2608                if (_state == StateActive)
 2609                {
 2610                    setState(
 2611                        StateClosing,
 2612                        new ConnectionClosedException(
 2613                            "Connection closed because it remained inactive for longer than the inactivity timeout.",
 2614                            closedByApplication: false));
 2615                }
 2616            }
 2617            // Else this timer was already canceled and disposed. Nothing to do.
 2618        }
 2619    }
 2620
 2621    private void connectTimedOut(System.Threading.Timer connectTimer)
 2622    {
             // Callback of the connect timer: closes the connection with a ConnectTimeoutException when it has not
             // reached StateActive yet. The timer is disposed in all cases.
 2623        lock (_mutex)
 2624        {
 2625            if (_state < StateActive)
 2626            {
 2627                setState(StateClosed, new ConnectTimeoutException());
 2628            }
 2629        }
 2630        // else ignore since we're already connected.
 2631        connectTimer.Dispose();
 2632    }
 2633
 2634    private void closeTimedOut(System.Threading.Timer closeTimer)
 2635    {
             // Callback of the timer created by scheduleCloseTimer: forces the connection to StateClosed with a
             // CloseTimeoutException when it is not closed yet. The timer is disposed in all cases.
 2636        lock (_mutex)
 2637        {
 2638            if (_state < StateClosed)
 2639            {
 2640                // We don't use setState(state, exception) because we want to overwrite the exception set by a
 2641                // graceful closure.
 2642                _exception = new CloseTimeoutException();
 2643                setState(StateClosed);
 2644            }
 2645        }
 2646        // else ignore since we're already closed.
 2647        closeTimer.Dispose();
 2648    }
 2649
 2650    private ConnectionInfo initConnectionInfo()
 2651    {
             // Returns the cached _info once the state is past StateNotInitialized and _info has been set. Otherwise
             // the info is (re)queried from the transceiver and cached in _info before being returned.
 2652        // Called with _mutex locked.
 2653
 2654        if (_state > StateNotInitialized && _info is not null) // Update the connection info until it's initialized
 2655        {
 2656            return _info;
 2657        }
 2658
             // The connection is reported as incoming when it has no connector; the adapter name defaults to "" when
             // there is no adapter.
 2659        _info =
 2660            _transceiver.getInfo(incoming: _connector is null, _adapter?.getName() ?? "", _endpoint.connectionId());
 2661        return _info;
 2662    }
 2663
         // Logs a warning made of msg, the exception and the transceiver description, each on its own line.
 2664    private void warning(string msg, System.Exception ex) => _logger.warning($"{msg}:\n{ex}\n{_transceiver}");
 2665
 2666    private void observerStartRead(Ice.Internal.Buffer buf)
 2667    {
             // When a read is already being tracked (_readStreamPos >= 0), reports to _observer the bytes received
             // since that position. Then records the current buffer position as the new starting point, or -1 when buf
             // is empty.
             // NOTE(review): _observer is dereferenced without a null check; presumably callers only call this method
             // when an observer is attached - confirm.
 2668        if (_readStreamPos >= 0)
 2669        {
 2670            Debug.Assert(!buf.empty());
 2671            _observer.receivedBytes(buf.b.position() - _readStreamPos);
 2672        }
 2673        _readStreamPos = buf.empty() ? -1 : buf.b.position();
 2674    }
 2675
 2676    private void observerFinishRead(Ice.Internal.Buffer buf)
 2677    {
             // Ends the tracking started by observerStartRead: reports to _observer the bytes received since
             // _readStreamPos and resets _readStreamPos to -1. Does nothing when no read is being tracked.
 2678        if (_readStreamPos == -1)
 2679        {
 2680            return;
 2681        }
 2682        Debug.Assert(buf.b.position() >= _readStreamPos);
 2683        _observer.receivedBytes(buf.b.position() - _readStreamPos);
 2684        _readStreamPos = -1;
 2685    }
 2686
 2687    private void observerStartWrite(Ice.Internal.Buffer buf)
 2688    {
             // When a write is already being tracked (_writeStreamPos >= 0), reports to _observer the bytes sent since
             // that position. Then records the current buffer position as the new starting point, or -1 when buf is
             // empty.
             // NOTE(review): _observer is dereferenced without a null check; presumably callers only call this method
             // when an observer is attached - confirm.
 2689        if (_writeStreamPos >= 0)
 2690        {
 2691            Debug.Assert(!buf.empty());
 2692            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2693        }
 2694        _writeStreamPos = buf.empty() ? -1 : buf.b.position();
 2695    }
 2696
 2697    private void observerFinishWrite(Ice.Internal.Buffer buf)
 2698    {
             // Ends the tracking started by observerStartWrite and resets _writeStreamPos to -1. Does nothing when no
             // write is being tracked.
 2699        if (_writeStreamPos == -1)
 2700        {
 2701            return;
 2702        }
             // Unlike observerFinishRead, nothing is reported (and nothing is asserted) when the position has not
             // moved past _writeStreamPos.
 2703        if (buf.b.position() > _writeStreamPos)
 2704        {
 2705            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2706        }
 2707        _writeStreamPos = -1;
 2708    }
 2709
 2710    private int read(Ice.Internal.Buffer buf)
 2711    {
             // Reads into buf through _transceiver and returns the value returned by _transceiver.read. When the
             // network trace level is 3 or higher and the buffer position moved, a "received ..." trace is logged.
 2712        int start = buf.b.position();
 2713        int op = _transceiver.read(buf, ref _hasMoreData);
 2714        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2715        {
 2716            var s = new StringBuilder("received ");
 2717            if (_endpoint.datagram())
 2718            {
                     // Datagram endpoints: only the buffer limit is traced.
 2719                s.Append(buf.b.limit());
 2720            }
 2721            else
 2722            {
                     // Other endpoints: "<bytes read by this call> of <bytes between start and the buffer limit>".
 2723                s.Append(buf.b.position() - start);
 2724                s.Append(" of ");
 2725                s.Append(buf.b.limit() - start);
 2726            }
 2727            s.Append(" bytes via ");
 2728            s.Append(_endpoint.protocol());
 2729            s.Append('\n');
 2730            s.Append(ToString());
 2731            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2732        }
 2733        return op;
 2734    }
 2735
 2736    private int write(Ice.Internal.Buffer buf)
 2737    {
             // Writes buf through _transceiver and returns the value returned by _transceiver.write. When the network
             // trace level is 3 or higher and the buffer position moved, a "sent ..." trace is logged.
 2738        int start = buf.b.position();
 2739        int op = _transceiver.write(buf);
 2740        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2741        {
 2742            var s = new StringBuilder("sent ");
 2743            s.Append(buf.b.position() - start);
                 // The " of <total>" part is omitted for datagram endpoints.
 2744            if (!_endpoint.datagram())
 2745            {
 2746                s.Append(" of ");
 2747                s.Append(buf.b.limit() - start);
 2748            }
 2749            s.Append(" bytes via ");
 2750            s.Append(_endpoint.protocol());
 2751            s.Append('\n');
 2752            s.Append(ToString());
 2753            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2754        }
 2755        return op;
 2756    }
 2757
 2758    private void scheduleInactivityTimer()
 2759    {
             // Creates _inactivityTimer and arms it to fire once after _inactivityTimeout (the period is infinite). The
             // callback casts its state argument back to the timer and passes it to inactivityCheck.
 2760        // Called with the ConnectionI mutex locked.
 2761        Debug.Assert(_inactivityTimer is null);
 2762        Debug.Assert(_inactivityTimeout > TimeSpan.Zero);
 2763
 2764        _inactivityTimer = new System.Threading.Timer(
 2765            inactivityTimer => inactivityCheck((System.Threading.Timer)inactivityTimer));
 2766        _inactivityTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 2767    }
 2768
 2769    private void cancelInactivityTimer()
 2770    {
             // Stops and disposes the current inactivity timer, if any, and clears _inactivityTimer. A callback that
             // still runs afterwards no longer matches _inactivityTimer, so inactivityCheck ignores it.
 2771        // Called with the ConnectionI mutex locked.
 2772        if (_inactivityTimer is not null)
 2773        {
 2774            _inactivityTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 2775            _inactivityTimer.Dispose();
 2776            _inactivityTimer = null;
 2777        }
 2778    }
 2779
 2780    private void scheduleCloseTimer()
 2781    {
             // Starts a one-shot timer that calls closeTimedOut after _closeTimeout. No timer is created when
             // _closeTimeout is zero or negative.
 2782        if (_closeTimeout > TimeSpan.Zero)
 2783        {
 2784#pragma warning disable CA2000 // closeTimer is disposed by closeTimedOut.
 2785            var closeTimer = new System.Threading.Timer(
 2786                timerObj => closeTimedOut((System.Threading.Timer)timerObj));
 2787            // schedule timer to run once; closeTimedOut disposes the timer too.
 2788            closeTimer.Change(_closeTimeout, Timeout.InfiniteTimeSpan);
 2789#pragma warning restore CA2000
 2790        }
 2791    }
 2792
 2793    private void doApplicationClose()
 2794    {
             // Moves the connection to StateClosing with a ConnectionClosedException flagged closedByApplication. The
             // connection must not have reached StateClosing yet.
 2795        // Called with the ConnectionI mutex locked.
 2796        Debug.Assert(_state < StateClosing);
 2797        setState(
 2798            StateClosing,
 2799            new ConnectionClosedException(
 2800                "The connection was closed gracefully by the application.",
 2801                closedByApplication: true));
 2802    }
 2803
 2804    private class OutgoingMessage
 2805    {
             // A message to send on the connection. outAsync is null for messages created with the two-argument
             // constructor (sendResponse creates replies this way) and after canceled() has been called.
 12806        internal OutgoingMessage(OutputStream stream, bool compress)
 2807        {
 12808            this.stream = stream;
 12809            this.compress = compress;
 12810        }
 2811
             // Creates a message attached to the invocation outAsync, with the given request ID.
 12812        internal OutgoingMessage(OutgoingAsyncBase outAsync, OutputStream stream, bool compress, int requestId)
 2813        {
 12814            this.outAsync = outAsync;
 12815            this.stream = stream;
 12816            this.compress = compress;
 12817            this.requestId = requestId;
 12818        }
 2819
             // Detaches the invocation: after this call, sent() and completed() no longer notify it.
 2820        internal void canceled()
 2821        {
 2822            Debug.Assert(outAsync is not null); // Only requests can timeout.
 12823            outAsync = null;
 12824        }
 2825
             // Drops the stream reference and, when an invocation is still attached, stores the result of
             // outAsync.sent() in invokeSent. Returns true when invokeSent or receivedReply is set; always returns
             // false when no invocation is attached.
             // NOTE(review): a true return presumably tells the caller that a callback still has to be run for this
             // message - confirm at the call site.
 2826        internal bool sent()
 2827        {
 12828            stream = null;
 12829            if (outAsync is not null)
 2830            {
 12831                invokeSent = outAsync.sent();
 12832                return invokeSent || receivedReply;
 2833            }
 12834            return false;
 2835        }
 2836
             // Reports ex to the attached invocation, if any; invokeException is called only when
             // outAsync.exception(ex) returns true. The stream reference is dropped in all cases.
 2837        internal void completed(LocalException ex)
 2838        {
 12839            if (outAsync is not null)
 2840            {
 12841                if (outAsync.exception(ex))
 2842                {
 12843                    outAsync.invokeException();
 2844                }
 2845            }
 12846            stream = null;
 12847        }
 2848
 2849        internal OutputStream stream;
 2850        internal OutgoingAsyncBase outAsync;
 2851        internal bool compress;
 2852        internal int requestId;
 2853        internal bool prepared;
 2854        internal bool isSent;
             // Result of the last outAsync.sent() call made by sent().
 2855        internal bool invokeSent;
             // Set by the reply handler when the reply arrives while this message is still the first entry of
             // _sendStreams, i.e. before the message has been acknowledged as sent.
 2856        internal bool receivedReply;
 2857    }
 2858
 2859    private static readonly ConnectionState[] connectionStateMap = [
             // One ConnectionState per internal state, in the order named by the trailing comments.
             // NOTE(review): presumably indexed by the value of _state - confirm at the use site.
 2860        ConnectionState.ConnectionStateValidating,   // StateNotInitialized
 2861        ConnectionState.ConnectionStateValidating,   // StateNotValidated
 2862        ConnectionState.ConnectionStateActive,       // StateActive
 2863        ConnectionState.ConnectionStateHolding,      // StateHolding
 2864        ConnectionState.ConnectionStateClosing,      // StateClosing
 2865        ConnectionState.ConnectionStateClosing,      // StateClosingPending
 2866        ConnectionState.ConnectionStateClosed,       // StateClosed
 2867        ConnectionState.ConnectionStateClosed,       // StateFinished
 2868    ];
 2869
 2870    private readonly Instance _instance;
 2871    private readonly Transceiver _transceiver;
 2872    private readonly IdleTimeoutTransceiverDecorator _idleTimeoutTransceiver; // can be null
 2873
 2874    private string _desc;
 2875    private readonly string _type;
         // A null connector marks the connection as incoming (see initConnectionInfo).
 2876    private readonly Connector _connector;
 2877    private readonly EndpointI _endpoint;
 2878
 2879    private ObjectAdapter _adapter;
 2880
 2881    private readonly Logger _logger;
 2882    private readonly TraceLevels _traceLevels;
 2883    private readonly Ice.Internal.ThreadPool _threadPool;
 2884    private readonly bool _threadHopRequired;
 2885
 2886    private readonly TimeSpan _connectTimeout;
 2887    private readonly TimeSpan _closeTimeout;
 2888    private TimeSpan _inactivityTimeout; // protected by _mutex
 2889
 2890    private System.Threading.Timer _inactivityTimer; // can be null
 2891
 2892    private StartCallback _startCallback;
 2893
 2894    // This action must be called outside the ConnectionI lock to avoid lock acquisition deadlocks.
 2895    private readonly Action<ConnectionI> _removeFromFactory;
 2896
 2897    private readonly bool _warn;
 2898    private readonly bool _warnUdp;
 2899
 2900    private readonly int _compressionLevel;
 2901
 2902    private int _nextRequestId;
 2903
         // Outstanding invocations keyed by request ID. The reply handler looks up and removes the entry matching the
         // request ID read from a reply message.
 2904    private readonly Dictionary<int, OutgoingAsyncBase> _asyncRequests = new Dictionary<int, OutgoingAsyncBase>();
 2905
         // Asserted to be non-null once _state >= StateClosed (see sendResponse).
 2906    private LocalException _exception;
 2907
 2908    private readonly int _messageSizeMax;
 2909    private readonly BatchRequestQueue _batchRequestQueue;
 2910
         // Outgoing messages. The reply handler inspects the first entry to detect a reply that arrives before its
         // request has been acknowledged as sent.
 2911    private readonly LinkedList<OutgoingMessage> _sendStreams = new LinkedList<OutgoingMessage>();
 2912
 2913    // Contains the message which is being received. If the connection is waiting to receive a message (_readHeader ==
 2914    // true), its size is Protocol.headerSize. Otherwise, its size is the message size specified in the received message
 2915    // header.
 2916    private readonly InputStream _readStream;
 2917
 2918    // When _readHeader is true, the next bytes we'll read are the header of a new message. When false, we're reading
 2919    // next the remainder of a message that was already partially received.
 2920    private bool _readHeader;
 2921
 2922    // Contains the message which is being sent. The write stream buffer is empty if no message is being sent.
 2923    private readonly OutputStream _writeStream;
 2924
 2925    private ConnectionObserver _observer;
         // Buffer positions recorded by observerStartRead / observerStartWrite; -1 when no read / write is tracked.
 2926    private int _readStreamPos;
 2927    private int _writeStreamPos;
 2928
 2929    // The upcall count keeps track of the number of dispatches, AMI (response) continuations, sent callbacks and
 2930    // connection establishment callbacks that have been started (or are about to be started) by a thread of the thread
 2931    // pool associated with this connection, and have not completed yet. All these operations except the connection
 2932    // establishment callbacks execute application code or code generated from Slice definitions.
 2933    private int _upcallCount;
 2934
 2935    // The number of outstanding dispatches. Maintained only while state is StateActive or StateHolding.
 2936    // _dispatchCount can be greater than a non-0 _maxDispatches when we receive a batch with multiple requests.
 2937    private int _dispatchCount;
 2938
 2939    // When we dispatch _maxDispatches concurrent requests, we stop reading the connection to back-pressure the peer.
 2940    // _maxDispatches <= 0 means no limit.
 2941    private readonly int _maxDispatches;
 2942
 2943    private int _state; // The current state.
 2944    private bool _shutdownInitiated;
 2945    private bool _initialized;
 2946    private bool _validated;
 2947
 2948    // When true, the application called close and Connection must close the connection when it receives the reply
 2949    // for the last outstanding invocation.
 2950    private bool _closeRequested;
 2951
         // Cached by initConnectionInfo.
 2952    private ConnectionInfo _info;
 2953
 2954    private CloseCallback _closeCallback;
 2955
 2956    // We need to run the continuation asynchronously since it can be completed by an Ice thread pool thread.
 2957    private readonly TaskCompletionSource _closed = new(TaskCreationOptions.RunContinuationsAsynchronously);
 2958    private readonly object _mutex = new();
 2959}