< Summary

Information
Class: Ice.ConnectionI
Assembly: Ice
File(s): /home/runner/work/ice/ice/csharp/src/Ice/ConnectionI.cs
Tag: 71_18251537082
Line coverage
87%
Covered lines: 1093
Uncovered lines: 151
Coverable lines: 1244
Total lines: 2945
Line coverage: 87.8%
Branch coverage
83%
Covered branches: 607
Total branches: 728
Branch coverage: 83.3%
Method coverage
94%
Covered methods: 73
Total methods: 77
Method coverage: 94.8%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
start(...)87.5%8.01894.44%
startAndWait()30%27.81043.75%
activate()100%22100%
hold()100%22100%
destroy(...)50%44100%
abort()100%11100%
closeAsync()100%44100%
isActiveOrHolding()100%22100%
throwException()50%2.03280%
waitUntilHolding()100%44100%
waitUntilFinished()100%44100%
updateObserver()66.67%6.01692.86%
sendAsyncRequest(...)90%10.031093.33%
getBatchRequestQueue()100%11100%
flushBatchRequests(...)100%1.02175%
flushBatchRequestsAsync(...)100%11100%
disableInactivityCheck()100%11100%
setCloseCallback(...)25%7.73438.46%
asyncRequestCanceled(...)52.94%47.313477.42%
endpoint()100%11100%
connector()100%11100%
setAdapter(...)62.5%8.51880%
getAdapter()100%11100%
getEndpoint()100%11100%
createProxy(...)100%11100%
setAdapterFromAdapter(...)50%4.05485.71%
startAsync(...)50%2297.14%
finishAsync(...)100%2424100%
message(...)84.52%98.498487.29%
upcall(...)96.67%30.483091.89%
finished(...)100%88100%
finish()91.67%62.176091.55%
ToString()100%11100%
type()100%11100%
getInfo()100%22100%
setBufferSize(...)50%2.01285.71%
exception(...)100%11100%
getThreadPool()100%210%
.ctor(...)85.71%14.191490.16%
idleCheck(...)100%44100%
sendHeartbeat()88.46%26.932688.89%
isHeartbeat()100%210%
toConnectionState(...)100%11100%
setState(...)75%20.182092.31%
setState(...)88.57%83.627085.94%
initiateShutdown()100%88100%
initialize(...)100%22100%
validate(...)76%61.585083.33%
sendNextMessage(...)92.86%31.382883.72%
sendMessage(...)100%1010100%
doCompress(...)100%88100%
parseMessage(...)75%63.054478.57%
dispatchAll(...)87.5%8.42881.25%
dispatchAsync()100%2.08272.73%
sendResponse(...)96.15%2626100%
dispatchException(...)0%156120%
inactivityCheck(...)100%44100%
connectTimedOut(...)100%22100%
closeTimedOut(...)100%22100%
initConnectionInfo()100%88100%
warning(...)100%210%
observerStartRead(...)75%44100%
observerFinishRead(...)50%2.03280%
observerStartWrite(...)100%44100%
observerFinishWrite(...)100%44100%
read(...)83.33%6.01693.33%
write(...)100%66100%
scheduleInactivityTimer()100%11100%
cancelInactivityTimer()100%22100%
scheduleCloseTimer()100%22100%
doApplicationClose()100%11100%
.ctor(...)100%11100%
.ctor(...)100%11100%
canceled()100%11100%
sent()100%44100%
completed(...)100%44100%
.cctor()100%11100%

File(s)

/home/runner/work/ice/ice/csharp/src/Ice/ConnectionI.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Ice.Instrumentation;
 4using Ice.Internal;
 5using System.Diagnostics;
 6using System.Text;
 7
 8namespace Ice;
 9
 10#pragma warning disable CA1001 // _inactivityTimer is disposed by cancelInactivityTimer.
 11public sealed class ConnectionI : Internal.EventHandler, CancellationHandler, Connection
 12#pragma warning restore CA1001
 13{
 14    internal interface StartCallback
 15    {
 16        void connectionStartCompleted(ConnectionI connection);
 17
 18        void connectionStartFailed(ConnectionI connection, LocalException ex);
 19    }
 20
 21    internal void start(StartCallback callback)
 22    {
 23        try
 24        {
 125            lock (_mutex)
 26            {
 27                //
 28                // The connection might already be closed if the communicator was destroyed.
 29                //
 130                if (_state >= StateClosed)
 31                {
 32                    Debug.Assert(_exception is not null);
 033                    throw _exception;
 34                }
 35
 136                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 37                {
 138                    if (_connectTimeout > TimeSpan.Zero)
 39                    {
 40#pragma warning disable CA2000 // connectTimer is disposed by connectTimedOut.
 141                        var connectTimer = new System.Threading.Timer(
 142                            timerObj => connectTimedOut((System.Threading.Timer)timerObj));
 43                        // schedule timer to run once; connectTimedOut disposes the timer too.
 144                        connectTimer.Change(_connectTimeout, Timeout.InfiniteTimeSpan);
 45#pragma warning restore CA2000
 46                    }
 47
 148                    _startCallback = callback;
 149                    return;
 50                }
 51
 52                // The connection starts in the holding state. It will be activated by the connection factory.
 153                setState(StateHolding);
 154            }
 155        }
 156        catch (LocalException ex)
 57        {
 158            exception(ex);
 159            callback.connectionStartFailed(this, _exception);
 160            return;
 61        }
 62
 163        callback.connectionStartCompleted(this);
 164    }
 65
 66    internal void startAndWait()
 67    {
 68        try
 69        {
 170            lock (_mutex)
 71            {
 72                //
 73                // The connection might already be closed if the communicator was destroyed.
 74                //
 175                if (_state >= StateClosed)
 76                {
 77                    Debug.Assert(_exception is not null);
 078                    throw _exception;
 79                }
 80
 181                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 82                {
 83                    //
 84                    // Wait for the connection to be validated.
 85                    //
 086                    while (_state <= StateNotValidated)
 87                    {
 088                        Monitor.Wait(_mutex);
 89                    }
 90
 091                    if (_state >= StateClosing)
 92                    {
 93                        Debug.Assert(_exception is not null);
 094                        throw _exception;
 95                    }
 96                }
 97
 98                //
 99                // We start out in holding state.
 100                //
 1101                setState(StateHolding);
 1102            }
 1103        }
 0104        catch (LocalException ex)
 105        {
 0106            exception(ex);
 0107            waitUntilFinished();
 0108            return;
 109        }
 1110    }
 111
 112    internal void activate()
 113    {
 1114        lock (_mutex)
 115        {
 1116            if (_state <= StateNotValidated)
 117            {
 1118                return;
 119            }
 120
 1121            setState(StateActive);
 1122        }
 1123    }
 124
 125    internal void hold()
 126    {
 1127        lock (_mutex)
 128        {
 1129            if (_state <= StateNotValidated)
 130            {
 1131                return;
 132            }
 133
 1134            setState(StateHolding);
 1135        }
 1136    }
 137
 138    // DestructionReason.
 139    public const int ObjectAdapterDeactivated = 0;
 140    public const int CommunicatorDestroyed = 1;
 141
 142    internal void destroy(int reason)
 143    {
 1144        lock (_mutex)
 145        {
 146            switch (reason)
 147            {
 148                case ObjectAdapterDeactivated:
 149                {
 1150                    setState(StateClosing, new ObjectAdapterDeactivatedException(_adapter?.getName() ?? ""));
 1151                    break;
 152                }
 153
 154                case CommunicatorDestroyed:
 155                {
 1156                    setState(StateClosing, new CommunicatorDestroyedException());
 1157                    break;
 158                }
 159            }
 160        }
 1161    }
 162
 163    public void abort()
 164    {
 1165        lock (_mutex)
 166        {
 1167            setState(
 1168                StateClosed,
 1169                new ConnectionAbortedException(
 1170                    "The connection was aborted by the application.",
 1171                    closedByApplication: true));
 1172        }
 1173    }
 174
 175    public Task closeAsync()
 176    {
 1177        lock (_mutex)
 178        {
 1179            if (_state < StateClosing)
 180            {
 1181                if (_asyncRequests.Count == 0)
 182                {
 1183                    doApplicationClose();
 184                }
 185                else
 186                {
 1187                    _closeRequested = true;
 1188                    scheduleCloseTimer(); // we don't wait forever for outstanding invocations to complete
 189                }
 190            }
 191            // else nothing to do, already closing or closed.
 1192        }
 193
 1194        return _closed.Task;
 195    }
 196
 197    internal bool isActiveOrHolding()
 198    {
 1199        lock (_mutex)
 200        {
 1201            return _state > StateNotValidated && _state < StateClosing;
 202        }
 1203    }
 204
 205    public void throwException()
 206    {
 1207        lock (_mutex)
 208        {
 1209            if (_exception is not null)
 210            {
 211                Debug.Assert(_state >= StateClosing);
 0212                throw _exception;
 213            }
 1214        }
 1215    }
 216
 217    internal void waitUntilHolding()
 218    {
 1219        lock (_mutex)
 220        {
 1221            while (_state < StateHolding || _upcallCount > 0)
 222            {
 1223                Monitor.Wait(_mutex);
 224            }
 1225        }
 1226    }
 227
 228    internal void waitUntilFinished()
 229    {
 1230        lock (_mutex)
 231        {
 232            //
 233            // We wait indefinitely until the connection is finished and all
 234            // outstanding requests are completed. Otherwise we couldn't
 235            // guarantee that there are no outstanding calls when deactivate()
 236            // is called on the servant locators.
 237            //
 1238            while (_state < StateFinished || _upcallCount > 0)
 239            {
 1240                Monitor.Wait(_mutex);
 241            }
 242
 243            Debug.Assert(_state == StateFinished);
 244
 245            //
 246            // Clear the OA. See bug 1673 for the details of why this is necessary.
 247            //
 1248            _adapter = null;
 1249        }
 1250    }
 251
 252    internal void updateObserver()
 253    {
 1254        lock (_mutex)
 255        {
 1256            if (_state < StateNotValidated || _state > StateClosed)
 257            {
 0258                return;
 259            }
 260
 261            Debug.Assert(_instance.initializationData().observer is not null);
 1262            _observer = _instance.initializationData().observer.getConnectionObserver(
 1263                initConnectionInfo(),
 1264                _endpoint,
 1265                toConnectionState(_state),
 1266                _observer);
 1267            if (_observer is not null)
 268            {
 1269                _observer.attach();
 270            }
 271            else
 272            {
 1273                _writeStreamPos = -1;
 1274                _readStreamPos = -1;
 275            }
 1276        }
 1277    }
 278
 279    internal int sendAsyncRequest(
 280        OutgoingAsyncBase og,
 281        bool compress,
 282        bool response,
 283        int batchRequestCount)
 284    {
 1285        OutputStream os = og.getOs();
 286
 1287        lock (_mutex)
 288        {
 289            //
 290            // If the exception is closed before we even have a chance
 291            // to send our request, we always try to send the request
 292            // again.
 293            //
 1294            if (_exception is not null)
 295            {
 1296                throw new RetryException(_exception);
 297            }
 298
 299            Debug.Assert(_state > StateNotValidated);
 300            Debug.Assert(_state < StateClosing);
 301
 302            //
 303            // Ensure the message isn't bigger than what we can send with the
 304            // transport.
 305            //
 1306            _transceiver.checkSendSize(os.getBuffer());
 307
 308            //
 309            // Notify the request that it's cancelable with this connection.
 310            // This will throw if the request is canceled.
 311            //
 1312            og.cancelable(this);
 1313            int requestId = 0;
 1314            if (response)
 315            {
 316                //
 317                // Create a new unique request ID.
 318                //
 1319                requestId = _nextRequestId++;
 1320                if (requestId <= 0)
 321                {
 0322                    _nextRequestId = 1;
 0323                    requestId = _nextRequestId++;
 324                }
 325
 326                //
 327                // Fill in the request ID.
 328                //
 1329                os.pos(Protocol.headerSize);
 1330                os.writeInt(requestId);
 331            }
 1332            else if (batchRequestCount > 0)
 333            {
 1334                os.pos(Protocol.headerSize);
 1335                os.writeInt(batchRequestCount);
 336            }
 337
 1338            og.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
 339
 340            // We're just about to send a request, so we are not inactive anymore.
 1341            cancelInactivityTimer();
 342
 1343            int status = OutgoingAsyncBase.AsyncStatusQueued;
 344            try
 345            {
 1346                var message = new OutgoingMessage(og, os, compress, requestId);
 1347                status = sendMessage(message);
 1348            }
 1349            catch (LocalException ex)
 350            {
 1351                setState(StateClosed, ex);
 352                Debug.Assert(_exception is not null);
 1353                throw _exception;
 354            }
 355
 1356            if (response)
 357            {
 358                //
 359                // Add to the async requests map.
 360                //
 1361                _asyncRequests[requestId] = og;
 362            }
 1363            return status;
 364        }
 1365    }
 366
 1367    internal BatchRequestQueue getBatchRequestQueue() => _batchRequestQueue;
 368
 369    public void flushBatchRequests(CompressBatch compress)
 370    {
 371        try
 372        {
 1373            var completed = new FlushBatchTaskCompletionCallback();
 1374            var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 1375            outgoing.invoke(_flushBatchRequests_name, compress, true);
 1376            completed.Task.Wait();
 1377        }
 0378        catch (AggregateException ex)
 379        {
 0380            throw ex.InnerException;
 381        }
 1382    }
 383
 384    public Task flushBatchRequestsAsync(
 385        CompressBatch compress,
 386        IProgress<bool> progress = null,
 387        CancellationToken cancel = default)
 388    {
 1389        var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
 1390        var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 1391        outgoing.invoke(_flushBatchRequests_name, compress, false);
 1392        return completed.Task;
 393    }
 394
 395    private const string _flushBatchRequests_name = "flushBatchRequests";
 396
 397    public void disableInactivityCheck()
 398    {
 1399        lock (_mutex)
 400        {
 1401            cancelInactivityTimer();
 1402            _inactivityTimeout = TimeSpan.Zero;
 1403        }
 1404    }
 405
 406    public void setCloseCallback(CloseCallback callback)
 407    {
 1408        lock (_mutex)
 409        {
 1410            if (_state >= StateClosed)
 411            {
 0412                if (callback is not null)
 413                {
 0414                    _threadPool.execute(
 0415                        () =>
 0416                        {
 0417                            try
 0418                            {
 0419                                callback(this);
 0420                            }
 0421                            catch (System.Exception ex)
 0422                            {
 0423                                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
 0424                            }
 0425                        },
 0426                        this);
 427                }
 428            }
 429            else
 430            {
 1431                _closeCallback = callback;
 432            }
 1433        }
 1434    }
 435
 436    public void asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex)
 437    {
 438        //
 439        // NOTE: This isn't called from a thread pool thread.
 440        //
 441
 1442        lock (_mutex)
 443        {
 1444            if (_state >= StateClosed)
 445            {
 0446                return; // The request has already been or will be shortly notified of the failure.
 447            }
 448
 1449            OutgoingMessage o = _sendStreams.FirstOrDefault(m => m.outAsync == outAsync);
 1450            if (o is not null)
 451            {
 1452                if (o.requestId > 0)
 453                {
 1454                    _asyncRequests.Remove(o.requestId);
 455                }
 456
 1457                if (ex is ConnectionAbortedException)
 458                {
 0459                    setState(StateClosed, ex);
 460                }
 461                else
 462                {
 463                    //
 464                    // If the request is being sent, don't remove it from the send streams,
 465                    // it will be removed once the sending is finished.
 466                    //
 1467                    if (o == _sendStreams.First.Value)
 468                    {
 0469                        o.canceled();
 470                    }
 471                    else
 472                    {
 1473                        o.canceled();
 1474                        _sendStreams.Remove(o);
 475                    }
 1476                    if (outAsync.exception(ex))
 477                    {
 1478                        outAsync.invokeExceptionAsync();
 479                    }
 480                }
 481
 1482                if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 483                {
 0484                    doApplicationClose();
 485                }
 1486                return;
 487            }
 488
 1489            if (outAsync is OutgoingAsync)
 490            {
 1491                foreach (KeyValuePair<int, OutgoingAsyncBase> kvp in _asyncRequests)
 492                {
 1493                    if (kvp.Value == outAsync)
 494                    {
 1495                        if (ex is ConnectionAbortedException)
 496                        {
 0497                            setState(StateClosed, ex);
 498                        }
 499                        else
 500                        {
 1501                            _asyncRequests.Remove(kvp.Key);
 1502                            if (outAsync.exception(ex))
 503                            {
 1504                                outAsync.invokeExceptionAsync();
 505                            }
 506                        }
 507
 1508                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 509                        {
 0510                            doApplicationClose();
 511                        }
 1512                        return;
 513                    }
 514                }
 515            }
 0516        }
 1517    }
 518
 1519    internal EndpointI endpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 520
 1521    internal Connector connector() => _connector; // No mutex protection necessary, _endpoint is immutable.
 522
 523    public void setAdapter(ObjectAdapter adapter)
 524    {
 1525        if (_connector is null) // server connection
 526        {
 0527            throw new InvalidOperationException("setAdapter can only be called on a client connection");
 528        }
 529
 1530        if (adapter is not null)
 531        {
 532            // Go through the adapter to set the adapter and servant manager on this connection
 533            // to ensure the object adapter is still active.
 1534            adapter.setAdapterOnConnection(this);
 535        }
 536        else
 537        {
 1538            lock (_mutex)
 539            {
 1540                if (_state <= StateNotValidated || _state >= StateClosing)
 541                {
 0542                    return;
 543                }
 1544                _adapter = null;
 1545            }
 546        }
 547
 548        //
 549        // We never change the thread pool with which we were initially
 550        // registered, even if we add or remove an object adapter.
 551        //
 1552    }
 553
 554    public ObjectAdapter getAdapter()
 555    {
 1556        lock (_mutex)
 557        {
 1558            return _adapter;
 559        }
 1560    }
 561
 1562    public Endpoint getEndpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 563
 564    public ObjectPrx createProxy(Identity id)
 565    {
 1566        ObjectAdapter.checkIdentity(id);
 1567        return new ObjectPrxHelper(_instance.referenceFactory().create(id, this));
 568    }
 569
 570    public void setAdapterFromAdapter(ObjectAdapter adapter)
 571    {
 1572        lock (_mutex)
 573        {
 1574            if (_state <= StateNotValidated || _state >= StateClosing)
 575            {
 0576                return;
 577            }
 578            Debug.Assert(adapter is not null); // Called by ObjectAdapter::setAdapterOnConnection
 1579            _adapter = adapter;
 580
 581            // Clear cached connection info (if any) as it's no longer accurate.
 1582            _info = null;
 1583        }
 1584    }
 585
 586    //
 587    // Operations from EventHandler
 588    //
 589    public override bool startAsync(int operation, Ice.Internal.AsyncCallback completedCallback)
 590    {
 1591        if (_state >= StateClosed)
 592        {
 0593            return false;
 594        }
 595
 596        // Run the IO operation on a .NET thread pool thread to ensure the IO operation won't be interrupted if the
 597        // Ice thread pool thread is terminated (.NET Socket read/write fail with a SocketError.OperationAborted
 598        // error if started from a thread which is later terminated).
 1599        Task.Run(() =>
 1600        {
 1601            lock (_mutex)
 1602            {
 1603                if (_state >= StateClosed)
 1604                {
 1605                    completedCallback(this);
 1606                    return;
 1607                }
 1608
 1609                try
 1610                {
 1611                    if ((operation & SocketOperation.Write) != 0)
 1612                    {
 1613                        if (_observer != null)
 1614                        {
 1615                            observerStartWrite(_writeStream.getBuffer());
 1616                        }
 1617
 1618                        bool completedSynchronously =
 1619                            _transceiver.startWrite(
 1620                                _writeStream.getBuffer(),
 1621                                completedCallback,
 1622                                this,
 1623                                out bool messageWritten);
 1624
 1625                        // If the startWrite call wrote the message, we assume the message is sent now for at-most-once
 1626                        // semantics in the event the connection is closed while the message is still in _sendStreams.
 1627                        if (messageWritten && _sendStreams.Count > 0)
 1628                        {
 1629                            // See finish() code.
 1630                            _sendStreams.First.Value.isSent = true;
 1631                        }
 1632
 1633                        if (completedSynchronously)
 1634                        {
 1635                            // If the write completed synchronously, we need to call the completedCallback.
 1636                            completedCallback(this);
 1637                        }
 1638                    }
 1639                    else if ((operation & SocketOperation.Read) != 0)
 1640                    {
 1641                        if (_observer != null && !_readHeader)
 1642                        {
 1643                            observerStartRead(_readStream.getBuffer());
 1644                        }
 1645
 1646                        if (_transceiver.startRead(_readStream.getBuffer(), completedCallback, this))
 1647                        {
 1648                            completedCallback(this);
 1649                        }
 1650                    }
 1651                }
 1652                catch (LocalException ex)
 1653                {
 1654                    setState(StateClosed, ex);
 1655                    completedCallback(this);
 1656                }
 1657            }
 1658        });
 659
 1660        return true;
 661    }
 662
 663    public override bool finishAsync(int operation)
 664    {
 1665        if (_state >= StateClosed)
 666        {
 1667            return false;
 668        }
 669
 670        try
 671        {
 1672            if ((operation & SocketOperation.Write) != 0)
 673            {
 1674                Ice.Internal.Buffer buf = _writeStream.getBuffer();
 1675                int start = buf.b.position();
 1676                _transceiver.finishWrite(buf);
 1677                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 678                {
 1679                    var s = new StringBuilder("sent ");
 1680                    s.Append(buf.b.position() - start);
 1681                    if (!_endpoint.datagram())
 682                    {
 1683                        s.Append(" of ");
 1684                        s.Append(buf.b.limit() - start);
 685                    }
 1686                    s.Append(" bytes via ");
 1687                    s.Append(_endpoint.protocol());
 1688                    s.Append('\n');
 1689                    s.Append(ToString());
 1690                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 691                }
 692
 1693                if (_observer is not null)
 694                {
 1695                    observerFinishWrite(_writeStream.getBuffer());
 696                }
 697            }
 1698            else if ((operation & SocketOperation.Read) != 0)
 699            {
 1700                Ice.Internal.Buffer buf = _readStream.getBuffer();
 1701                int start = buf.b.position();
 1702                _transceiver.finishRead(buf);
 1703                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 704                {
 1705                    var s = new StringBuilder("received ");
 1706                    if (_endpoint.datagram())
 707                    {
 1708                        s.Append(buf.b.limit());
 709                    }
 710                    else
 711                    {
 1712                        s.Append(buf.b.position() - start);
 1713                        s.Append(" of ");
 1714                        s.Append(buf.b.limit() - start);
 715                    }
 1716                    s.Append(" bytes via ");
 1717                    s.Append(_endpoint.protocol());
 1718                    s.Append('\n');
 1719                    s.Append(ToString());
 1720                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 721                }
 722
 1723                if (_observer is not null && !_readHeader)
 724                {
 1725                    observerFinishRead(_readStream.getBuffer());
 726                }
 727            }
 1728        }
 1729        catch (LocalException ex)
 730        {
 1731            setState(StateClosed, ex);
 1732        }
 1733        return _state < StateClosed;
 734    }
 735
 736    public override void message(ThreadPoolCurrent current)
 737    {
 1738        StartCallback startCB = null;
 1739        Queue<OutgoingMessage> sentCBs = null;
 1740        var info = new MessageInfo();
 1741        int upcallCount = 0;
 742
 1743        using var msg = new ThreadPoolMessage(current, _mutex);
 1744        lock (_mutex)
 745        {
 746            try
 747            {
 1748                if (!msg.startIOScope())
 749                {
 1750                    return;
 751                }
 752
 1753                if (_state >= StateClosed)
 754                {
 0755                    return;
 756                }
 757
 758                try
 759                {
 1760                    int writeOp = SocketOperation.None;
 1761                    int readOp = SocketOperation.None;
 762
 763                    // If writes are ready, write the data from the connection's write buffer (_writeStream)
 1764                    if ((current.operation & SocketOperation.Write) != 0)
 765                    {
 1766                        if (_observer is not null)
 767                        {
 1768                            observerStartWrite(_writeStream.getBuffer());
 769                        }
 1770                        writeOp = write(_writeStream.getBuffer());
 1771                        if (_observer is not null && (writeOp & SocketOperation.Write) == 0)
 772                        {
 1773                            observerFinishWrite(_writeStream.getBuffer());
 774                        }
 775                    }
 776
 777                    // If reads are ready, read the data into the connection's read buffer (_readStream). The data is
 778                    // read until:
 779                    // - the full message is read (the transport read returns SocketOperationNone) and
 780                    //   the read buffer is fully filled
 781                    // - the read operation on the transport can't continue without blocking
 1782                    if ((current.operation & SocketOperation.Read) != 0)
 783                    {
 784                        while (true)
 785                        {
 1786                            Ice.Internal.Buffer buf = _readStream.getBuffer();
 787
 1788                            if (_observer is not null && !_readHeader)
 789                            {
 1790                                observerStartRead(buf);
 791                            }
 792
 1793                            readOp = read(buf);
 1794                            if ((readOp & SocketOperation.Read) != 0)
 795                            {
 796                                // Can't continue without blocking, exit out of the loop.
 797                                break;
 798                            }
 1799                            if (_observer is not null && !_readHeader)
 800                            {
 801                                Debug.Assert(!buf.b.hasRemaining());
 1802                                observerFinishRead(buf);
 803                            }
 804
 805                            // If read header is true, we're reading a new Ice protocol message and we need to read
 806                            // the message header.
 1807                            if (_readHeader)
 808                            {
 809                                // The next read will read the remainder of the message.
 1810                                _readHeader = false;
 811
 1812                                _observer?.receivedBytes(Protocol.headerSize);
 813
 814                                //
 815                                // Connection is validated on first message. This is only used by
 816                                // setState() to check whether or not we can print a connection
 817                                // warning (a client might close the connection forcefully if the
 818                                // connection isn't validated, we don't want to print a warning
 819                                // in this case).
 820                                //
 1821                                _validated = true;
 822
 823                                // Full header should be read because the size of _readStream is always headerSize (14)
 824                                // when reading a new message (see the code that sets _readHeader = true).
 1825                                int pos = _readStream.pos();
 1826                                if (pos < Protocol.headerSize)
 827                                {
 828                                    //
 829                                    // This situation is possible for small UDP packets.
 830                                    //
 0831                                    throw new MarshalException("Received Ice message with too few bytes in header.");
 832                                }
 833
 834                                // Decode the header.
 1835                                _readStream.pos(0);
 1836                                byte[] m = new byte[4];
 1837                                m[0] = _readStream.readByte();
 1838                                m[1] = _readStream.readByte();
 1839                                m[2] = _readStream.readByte();
 1840                                m[3] = _readStream.readByte();
 1841                                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 1842                                m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 843                                {
 0844                                    throw new ProtocolException(
 0845                                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 846                                }
 847
 1848                                var pv = new ProtocolVersion(_readStream);
 1849                                if (pv != Util.currentProtocol)
 850                                {
 0851                                    throw new MarshalException(
 0852                                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 853                                }
 1854                                var ev = new EncodingVersion(_readStream);
 1855                                if (ev != Util.currentProtocolEncoding)
 856                                {
 0857                                    throw new MarshalException(
 0858                                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 859                                }
 860
 1861                                _readStream.readByte(); // messageType
 1862                                _readStream.readByte(); // compress
 1863                                int size = _readStream.readInt();
 1864                                if (size < Protocol.headerSize)
 865                                {
 0866                                    throw new MarshalException($"Received Ice message with unexpected size {size}.");
 867                                }
 868
 869                                // Resize the read buffer to the message size.
 1870                                if (size > _messageSizeMax)
 871                                {
 1872                                    Ex.throwMemoryLimitException(size, _messageSizeMax);
 873                                }
 1874                                if (size > _readStream.size())
 875                                {
 1876                                    _readStream.resize(size);
 877                                }
 1878                                _readStream.pos(pos);
 879                            }
 880
 1881                            if (buf.b.hasRemaining())
 882                            {
 1883                                if (_endpoint.datagram())
 884                                {
 1885                                    throw new DatagramLimitException(); // The message was truncated.
 886                                }
 887                                continue;
 888                            }
 889                            break;
 890                        }
 891                    }
 892
 893                    // readOp and writeOp are set to the operations that the transport read or write calls from above
 894                    // returned. They indicate which operations will need to be monitored by the thread pool's selector
 895                    // when this method returns.
 1896                    int newOp = readOp | writeOp;
 897
 898                    // Operations that are ready. For example, if message was called with SocketOperationRead and the
 899                    // transport read returned SocketOperationNone, reads are considered done: there's no additional
 900                    // data to read.
 1901                    int readyOp = current.operation & ~newOp;
 902
 1903                    if (_state <= StateNotValidated)
 904                    {
 905                        // If the connection is still not validated and there's still data to read or write, continue
 906                        // waiting for data to read or write.
 1907                        if (newOp != 0)
 908                        {
 1909                            _threadPool.update(this, current.operation, newOp);
 1910                            return;
 911                        }
 912
 913                        // Initialize the connection if it's not initialized yet.
 1914                        if (_state == StateNotInitialized && !initialize(current.operation))
 915                        {
 1916                            return;
 917                        }
 918
 919                        // Validate the connection if it's not validated yet.
 1920                        if (_state <= StateNotValidated && !validate(current.operation))
 921                        {
 1922                            return;
 923                        }
 924
 925                        // The connection is validated and doesn't need additional data to be read or written. So
 926                        // unregister it from the thread pool's selector.
 1927                        _threadPool.unregister(this, current.operation);
 928
 929                        //
 930                        // We start out in holding state.
 931                        //
 1932                        setState(StateHolding);
 1933                        if (_startCallback is not null)
 934                        {
 1935                            startCB = _startCallback;
 1936                            _startCallback = null;
 1937                            if (startCB is not null)
 938                            {
 1939                                ++upcallCount;
 940                            }
 941                        }
 942                    }
 943                    else
 944                    {
 945                        Debug.Assert(_state <= StateClosingPending);
 946
 947                        //
 948                        // We parse messages first, if we receive a close
 949                        // connection message we won't send more messages.
 950                        //
 1951                        if ((readyOp & SocketOperation.Read) != 0)
 952                        {
 953                            // At this point, the protocol message is fully read and can therefore be decoded by
 954                            // parseMessage. parseMessage returns the operation to wait for readiness next.
 1955                            newOp |= parseMessage(ref info);
 1956                            upcallCount += info.upcallCount;
 957                        }
 958
 1959                        if ((readyOp & SocketOperation.Write) != 0)
 960                        {
 961                            // At this point the message from _writeStream is fully written and the next message can be
 962                            // written.
 963
 1964                            newOp |= sendNextMessage(out sentCBs);
 1965                            if (sentCBs is not null)
 966                            {
 1967                                ++upcallCount;
 968                            }
 969                        }
 970
 971                        // If the connection is not closed yet, we can update the thread pool selector to wait for
 972                        // readiness of read, write or both operations.
 1973                        if (_state < StateClosed)
 974                        {
 1975                            _threadPool.update(this, current.operation, newOp);
 976                        }
 977                    }
 978
 1979                    if (upcallCount == 0)
 980                    {
 1981                        return; // Nothing to execute, we're done!
 982                    }
 983
 1984                    _upcallCount += upcallCount;
 985
 986                    // There's something to execute so we mark IO as completed to elect a new leader thread and let IO
 987                    // be performed on this new leader thread while this thread continues with executing the upcalls.
 1988                    msg.ioCompleted();
 1989                }
 1990                catch (DatagramLimitException) // Expected.
 991                {
 1992                    if (_warnUdp)
 993                    {
 0994                        _logger.warning($"maximum datagram size of {_readStream.pos()} exceeded");
 995                    }
 1996                    _readStream.resize(Protocol.headerSize);
 1997                    _readStream.pos(0);
 1998                    _readHeader = true;
 1999                    return;
 1000                }
 11001                catch (SocketException ex)
 1002                {
 11003                    setState(StateClosed, ex);
 11004                    return;
 1005                }
 11006                catch (LocalException ex)
 1007                {
 11008                    if (_endpoint.datagram())
 1009                    {
 01010                        if (_warn)
 1011                        {
 01012                            _logger.warning($"datagram connection exception:\n{ex}\n{_desc}");
 1013                        }
 01014                        _readStream.resize(Protocol.headerSize);
 01015                        _readStream.pos(0);
 01016                        _readHeader = true;
 1017                    }
 1018                    else
 1019                    {
 11020                        setState(StateClosed, ex);
 1021                    }
 11022                    return;
 1023                }
 1024            }
 1025            finally
 1026            {
 11027                msg.finishIOScope();
 11028            }
 1029        }
 1030
 11031        _threadPool.executeFromThisThread(() => upcall(startCB, sentCBs, info), this);
 11032    }
 1033
 1034    private void upcall(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)
 1035    {
 11036        int completedUpcallCount = 0;
 1037
 1038        //
 1039        // Notify the factory that the connection establishment and
 1040        // validation has completed.
 1041        //
 11042        if (startCB is not null)
 1043        {
 11044            startCB.connectionStartCompleted(this);
 11045            ++completedUpcallCount;
 1046        }
 1047
 1048        //
 1049        // Notify AMI calls that the message was sent.
 1050        //
 11051        if (sentCBs is not null)
 1052        {
 11053            foreach (OutgoingMessage m in sentCBs)
 1054            {
 11055                if (m.invokeSent)
 1056                {
 11057                    m.outAsync.invokeSent();
 1058                }
 11059                if (m.receivedReply)
 1060                {
 11061                    var outAsync = (OutgoingAsync)m.outAsync;
 11062                    if (outAsync.response())
 1063                    {
 11064                        outAsync.invokeResponse();
 1065                    }
 1066                }
 1067            }
 11068            ++completedUpcallCount;
 1069        }
 1070
 1071        //
 1072        // Asynchronous replies must be handled outside the thread
 1073        // synchronization, so that nested calls are possible.
 1074        //
 11075        if (info.outAsync is not null)
 1076        {
 11077            info.outAsync.invokeResponse();
 11078            ++completedUpcallCount;
 1079        }
 1080
 1081        //
 1082        // Method invocation (or multiple invocations for batch messages)
 1083        // must be done outside the thread synchronization, so that nested
 1084        // calls are possible.
 1085        //
 11086        if (info.requestCount > 0)
 1087        {
 11088            dispatchAll(info.stream, info.requestCount, info.requestId, info.compress, info.adapter);
 1089        }
 1090
 1091        //
 1092        // Decrease the upcall count.
 1093        //
 11094        bool finished = false;
 11095        if (completedUpcallCount > 0)
 1096        {
 11097            lock (_mutex)
 1098            {
 11099                _upcallCount -= completedUpcallCount;
 11100                if (_upcallCount == 0)
 1101                {
 1102                    // Only initiate shutdown if not already initiated. It might have already been initiated if the sent
 1103                    // callback or AMI callback was called when the connection was in the closing state.
 11104                    if (_state == StateClosing)
 1105                    {
 1106                        try
 1107                        {
 11108                            initiateShutdown();
 11109                        }
 01110                        catch (Ice.LocalException ex)
 1111                        {
 01112                            setState(StateClosed, ex);
 01113                        }
 1114                    }
 11115                    else if (_state == StateFinished)
 1116                    {
 11117                        finished = true;
 11118                        _observer?.detach();
 1119                    }
 11120                    Monitor.PulseAll(_mutex);
 1121                }
 11122            }
 1123        }
 1124
 11125        if (finished && _removeFromFactory is not null)
 1126        {
 11127            _removeFromFactory(this);
 1128        }
 11129    }
 1130
 1131    public override void finished(ThreadPoolCurrent current)
 1132    {
 1133        // Lock the connection here to ensure setState() completes before the code below is executed. This method can
 1134        // be called by the thread pool as soon as setState() calls _threadPool->finish(...). There's no need to lock
 1135        // the mutex for the remainder of the code because the data members accessed by finish() are immutable once
 1136        // _state == StateClosed (and we don't want to hold the mutex when calling upcalls).
 11137        lock (_mutex)
 1138        {
 1139            Debug.Assert(_state == StateClosed);
 11140        }
 1141
 1142        //
 1143        // If there are no callbacks to call, we don't call ioCompleted() since we're not going
 1144        // to call code that will potentially block (this avoids promoting a new leader and
 1145        // unnecessary thread creation, especially if this is called on shutdown).
 1146        //
 11147        if (_startCallback is null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && _closeCallback is null)
 1148        {
 11149            finish();
 11150            return;
 1151        }
 1152
 11153        current.ioCompleted();
 11154        _threadPool.executeFromThisThread(finish, this);
 11155    }
 1156
 1157    private void finish()
 1158    {
 11159        if (!_initialized)
 1160        {
 11161            if (_instance.traceLevels().network >= 2)
 1162            {
 11163                var s = new StringBuilder("failed to ");
 11164                s.Append(_connector is not null ? "establish" : "accept");
 11165                s.Append(' ');
 11166                s.Append(_endpoint.protocol());
 11167                s.Append(" connection\n");
 11168                s.Append(ToString());
 11169                s.Append('\n');
 11170                s.Append(_exception);
 11171                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1172            }
 1173        }
 1174        else
 1175        {
 11176            if (_instance.traceLevels().network >= 1)
 1177            {
 11178                var s = new StringBuilder("closed ");
 11179                s.Append(_endpoint.protocol());
 11180                s.Append(" connection\n");
 11181                s.Append(ToString());
 1182
 1183                // Trace the cause of most connection closures.
 11184                if (!(_exception is CommunicatorDestroyedException || _exception is ObjectAdapterDeactivatedException))
 1185                {
 11186                    s.Append('\n');
 11187                    s.Append(_exception);
 1188                }
 1189
 11190                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1191            }
 1192        }
 1193
 11194        if (_startCallback is not null)
 1195        {
 11196            _startCallback.connectionStartFailed(this, _exception);
 11197            _startCallback = null;
 1198        }
 1199
 11200        if (_sendStreams.Count > 0)
 1201        {
 11202            if (!_writeStream.isEmpty())
 1203            {
 1204                //
 1205                // Return the stream to the outgoing call. This is important for
 1206                // retriable AMI calls which are not marshaled again.
 1207                //
 11208                OutgoingMessage message = _sendStreams.First.Value;
 11209                _writeStream.swap(message.stream);
 1210
 1211                //
 1212                // The current message might be sent but not yet removed from _sendStreams. If
 1213                // the response has been received in the meantime, we remove the message from
 1214                // _sendStreams to not call finished on a message which is already done.
 1215                //
 11216                if (message.isSent || message.receivedReply)
 1217                {
 11218                    if (message.sent() && message.invokeSent)
 1219                    {
 11220                        message.outAsync.invokeSent();
 1221                    }
 11222                    if (message.receivedReply)
 1223                    {
 01224                        var outAsync = (OutgoingAsync)message.outAsync;
 01225                        if (outAsync.response())
 1226                        {
 01227                            outAsync.invokeResponse();
 1228                        }
 1229                    }
 11230                    _sendStreams.RemoveFirst();
 1231                }
 1232            }
 1233
 11234            foreach (OutgoingMessage o in _sendStreams)
 1235            {
 11236                o.completed(_exception);
 11237                if (o.requestId > 0) // Make sure finished isn't called twice.
 1238                {
 11239                    _asyncRequests.Remove(o.requestId);
 1240                }
 1241            }
 11242            _sendStreams.Clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
 1243        }
 1244
 11245        foreach (OutgoingAsyncBase o in _asyncRequests.Values)
 1246        {
 11247            if (o.exception(_exception))
 1248            {
 11249                o.invokeException();
 1250            }
 1251        }
 11252        _asyncRequests.Clear();
 1253
 1254        //
 1255        // Don't wait to be reaped to reclaim memory allocated by read/write streams.
 1256        //
 11257        _writeStream.clear();
 11258        _writeStream.getBuffer().clear();
 11259        _readStream.clear();
 11260        _readStream.getBuffer().clear();
 1261
 11262        if (_exception is ConnectionClosedException or
 11263            CloseConnectionException or
 11264            CommunicatorDestroyedException or
 11265            ObjectAdapterDeactivatedException)
 1266        {
 1267            // Can execute synchronously. Note that we're not within a lock(this) here.
 11268            _closed.SetResult();
 1269        }
 1270        else
 1271        {
 1272            Debug.Assert(_exception is not null);
 11273            _closed.SetException(_exception);
 1274        }
 1275
 11276        if (_closeCallback is not null)
 1277        {
 1278            try
 1279            {
 11280                _closeCallback(this);
 11281            }
 01282            catch (System.Exception ex)
 1283            {
 01284                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
 01285            }
 11286            _closeCallback = null;
 1287        }
 1288
 1289        //
 1290        // This must be done last as this will cause waitUntilFinished() to return (and communicator
 1291        // objects such as the timer might be destroyed too).
 1292        //
 11293        bool finished = false;
 11294        lock (_mutex)
 1295        {
 11296            setState(StateFinished);
 1297
 11298            if (_upcallCount == 0)
 1299            {
 11300                finished = true;
 11301                _observer?.detach();
 1302            }
 11303        }
 1304
 11305        if (finished && _removeFromFactory is not null)
 1306        {
 11307            _removeFromFactory(this);
 1308        }
 11309    }
 1310
 1311    /// <inheritdoc/>
 11312    public override string ToString() => _desc; // No mutex lock, _desc is immutable.
 1313
 1314    /// <inheritdoc/>
 11315    public string type() => _type; // No mutex lock, _type is immutable.
 1316
 1317    /// <inheritdoc/>
 1318    public ConnectionInfo getInfo()
 1319    {
 11320        lock (_mutex)
 1321        {
 11322            if (_state >= StateClosed)
 1323            {
 11324                throw _exception;
 1325            }
 11326            return initConnectionInfo();
 1327        }
 11328    }
 1329
 1330    /// <inheritdoc/>
 1331    public void setBufferSize(int rcvSize, int sndSize)
 1332    {
 11333        lock (_mutex)
 1334        {
 11335            if (_state >= StateClosed)
 1336            {
 01337                throw _exception;
 1338            }
 11339            _transceiver.setBufferSize(rcvSize, sndSize);
 11340            _info = null; // Invalidate the cached connection info
 11341        }
 11342    }
 1343
 1344    public void exception(LocalException ex)
 1345    {
 11346        lock (_mutex)
 1347        {
 11348            setState(StateClosed, ex);
 11349        }
 11350    }
 1351
 01352    public Ice.Internal.ThreadPool getThreadPool() => _threadPool;
 1353
 11354    internal ConnectionI(
 11355        Instance instance,
 11356        Transceiver transceiver,
 11357        Connector connector, // null for incoming connections, non-null for outgoing connections
 11358        EndpointI endpoint,
 11359        ObjectAdapter adapter,
 11360        Action<ConnectionI> removeFromFactory, // can be null
 11361        ConnectionOptions options)
 1362    {
 11363        _instance = instance;
 11364        _desc = transceiver.ToString();
 11365        _type = transceiver.protocol();
 11366        _connector = connector;
 11367        _endpoint = endpoint;
 11368        _adapter = adapter;
 11369        InitializationData initData = instance.initializationData();
 11370        _logger = initData.logger; // Cached for better performance.
 11371        _traceLevels = instance.traceLevels(); // Cached for better performance.
 11372        _connectTimeout = options.connectTimeout;
 11373        _closeTimeout = options.closeTimeout; // not used for datagram connections
 1374        // suppress inactivity timeout for datagram connections
 11375        _inactivityTimeout = endpoint.datagram() ? TimeSpan.Zero : options.inactivityTimeout;
 11376        _maxDispatches = options.maxDispatches;
 11377        _removeFromFactory = removeFromFactory;
 11378        _warn = initData.properties.getIcePropertyAsInt("Ice.Warn.Connections") > 0;
 11379        _warnUdp = initData.properties.getIcePropertyAsInt("Ice.Warn.Datagrams") > 0;
 11380        _nextRequestId = 1;
 11381        _messageSizeMax = connector is null ? adapter.messageSizeMax() : instance.messageSizeMax();
 11382        _batchRequestQueue = new BatchRequestQueue(instance, _endpoint.datagram());
 11383        _readStream = new InputStream(instance, Util.currentProtocolEncoding);
 11384        _readHeader = false;
 11385        _readStreamPos = -1;
 11386        _writeStream = new OutputStream(); // temporary stream
 11387        _writeStreamPos = -1;
 11388        _upcallCount = 0;
 11389        _state = StateNotInitialized;
 1390
 11391        _compressionLevel = initData.properties.getIcePropertyAsInt("Ice.Compression.Level");
 11392        if (_compressionLevel < 1)
 1393        {
 01394            _compressionLevel = 1;
 1395        }
 11396        else if (_compressionLevel > 9)
 1397        {
 01398            _compressionLevel = 9;
 1399        }
 1400
 11401        if (options.idleTimeout > TimeSpan.Zero && !endpoint.datagram())
 1402        {
 11403            _idleTimeoutTransceiver = new IdleTimeoutTransceiverDecorator(
 11404                transceiver,
 11405                this,
 11406                options.idleTimeout,
 11407                options.enableIdleCheck);
 11408            transceiver = _idleTimeoutTransceiver;
 1409        }
 11410        _transceiver = transceiver;
 1411
 1412        try
 1413        {
 11414            if (connector is null)
 1415            {
 1416                // adapter is always set for incoming connections
 1417                Debug.Assert(adapter is not null);
 11418                _threadPool = adapter.getThreadPool();
 1419            }
 1420            else
 1421            {
 1422                // we use the client thread pool for outgoing connections, even if there is an
 1423                // object adapter with its own thread pool.
 11424                _threadPool = instance.clientThreadPool();
 1425            }
 11426            _threadPool.initialize(this);
 11427        }
 01428        catch (LocalException)
 1429        {
 01430            throw;
 1431        }
 01432        catch (System.Exception ex)
 1433        {
 01434            throw new SyscallException(ex);
 1435        }
 11436    }
 1437
 1438    /// <summary>Aborts the connection with a <see cref="ConnectionAbortedException" /> if the connection is active and
 1439    /// does not receive a byte for some time. See the IdleTimeoutTransceiverDecorator.</summary>
 1440    internal void idleCheck(TimeSpan idleTimeout)
 1441    {
 11442        lock (_mutex)
 1443        {
 11444            if (_state == StateActive && _idleTimeoutTransceiver!.idleCheckEnabled)
 1445            {
 11446                int idleTimeoutInSeconds = (int)idleTimeout.TotalSeconds;
 1447
 11448                setState(
 11449                    StateClosed,
 11450                    new ConnectionAbortedException(
 11451                        $"Connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSe
 11452                        closedByApplication: false));
 1453            }
 1454            // else nothing to do
 11455        }
 11456    }
 1457
 1458    internal void sendHeartbeat()
 1459    {
 1460        Debug.Assert(!_endpoint.datagram());
 1461
 11462        lock (_mutex)
 1463        {
 11464            if (_state == StateActive || _state == StateHolding || _state == StateClosing)
 1465            {
 1466                // We check if the connection has become inactive.
 11467                if (
 11468                    _inactivityTimer is null &&           // timer not already scheduled
 11469                    _inactivityTimeout > TimeSpan.Zero && // inactivity timeout is enabled
 11470                    _state == StateActive &&              // only schedule the timer if the connection is active
 11471                    _dispatchCount == 0 &&                // no pending dispatch
 11472                    _asyncRequests.Count == 0 &&          // no pending invocation
 11473                    _readHeader &&                        // we're not waiting for the remainder of an incoming message
 11474                    _sendStreams.Count <= 1)              // there is at most one pending outgoing message
 1475                {
 1476                    // We may become inactive while the peer is back-pressuring us. In this case, we only schedule the
 1477                    // inactivity timer if there is no pending outgoing message or the pending outgoing message is a
 1478                    // heartbeat.
 1479
 1480                    // The stream of the first _sendStreams message is in _writeStream.
 11481                    if (_sendStreams.Count == 0 || isHeartbeat(_writeStream))
 1482                    {
 11483                        scheduleInactivityTimer();
 1484                    }
 1485                }
 1486
 1487                // We send a heartbeat to the peer to generate a "write" on the connection. This write in turns creates
 1488                // a read on the peer, and resets the peer's idle check timer. When _sendStream is not empty, there is
 1489                // already an outstanding write, so we don't need to send a heartbeat. It's possible the first message
 1490                // of _sendStreams was already sent but not yet removed from _sendStreams: it means the last write
 1491                // occurred very recently, which is good enough with respect to the idle check.
 1492                // As a result of this optimization, the only possible heartbeat in _sendStreams is the first
 1493                // _sendStreams message.
 11494                if (_sendStreams.Count == 0)
 1495                {
 11496                    var os = new OutputStream(Util.currentProtocolEncoding);
 11497                    os.writeBlob(Protocol.magic);
 11498                    ProtocolVersion.ice_write(os, Util.currentProtocol);
 11499                    EncodingVersion.ice_write(os, Util.currentProtocolEncoding);
 11500                    os.writeByte(Protocol.validateConnectionMsg);
 11501                    os.writeByte(0);
 11502                    os.writeInt(Protocol.headerSize); // Message size.
 1503                    try
 1504                    {
 11505                        _ = sendMessage(new OutgoingMessage(os, compress: false));
 11506                    }
 01507                    catch (LocalException ex)
 1508                    {
 01509                        setState(StateClosed, ex);
 01510                    }
 1511                }
 1512            }
 1513            // else nothing to do
 11514        }
 1515
 1516        static bool isHeartbeat(OutputStream stream) =>
 01517            stream.getBuffer().b.get(8) == Protocol.validateConnectionMsg;
 11518    }
 1519
 1520    private const int StateNotInitialized = 0;
 1521    private const int StateNotValidated = 1;
 1522    private const int StateActive = 2;
 1523    private const int StateHolding = 3;
 1524    private const int StateClosing = 4;
 1525    private const int StateClosingPending = 5;
 1526    private const int StateClosed = 6;
 1527    private const int StateFinished = 7;
 1528
 11529    private static ConnectionState toConnectionState(int state) => connectionStateMap[state];
 1530
 1531    private void setState(int state, LocalException ex)
 1532    {
 1533        //
 1534        // If setState() is called with an exception, then only closed
 1535        // and closing states are permissible.
 1536        //
 1537        Debug.Assert(state >= StateClosing);
 1538
 11539        if (_state == state) // Don't switch twice.
 1540        {
 11541            return;
 1542        }
 1543
 11544        if (_exception is null)
 1545        {
 1546            //
 1547            // If we are in closed state, an exception must be set.
 1548            //
 1549            Debug.Assert(_state != StateClosed);
 1550
 11551            _exception = ex;
 1552
 1553            //
 1554            // We don't warn if we are not validated.
 1555            //
 11556            if (_warn && _validated)
 1557            {
 1558                //
 1559                // Don't warn about certain expected exceptions.
 1560                //
 11561                if (!(_exception is CloseConnectionException ||
 11562                     _exception is ConnectionClosedException ||
 11563                     _exception is CommunicatorDestroyedException ||
 11564                     _exception is ObjectAdapterDeactivatedException ||
 11565                     (_exception is ConnectionLostException && _state >= StateClosing)))
 1566                {
 01567                    warning("connection exception", _exception);
 1568                }
 1569            }
 1570        }
 1571
 1572        //
 1573        // We must set the new state before we notify requests of any
 1574        // exceptions. Otherwise new requests may retry on a
 1575        // connection that is not yet marked as closed or closing.
 1576        //
 11577        setState(state);
 11578    }
 1579
 1580    private void setState(int state)
 1581    {
 1582        //
 1583        // We don't want to send close connection messages if the endpoint
 1584        // only supports oneway transmission from client to server.
 1585        //
 11586        if (_endpoint.datagram() && state == StateClosing)
 1587        {
 11588            state = StateClosed;
 1589        }
 1590
 1591        //
 1592        // Skip graceful shutdown if we are destroyed before validation.
 1593        //
 11594        if (_state <= StateNotValidated && state == StateClosing)
 1595        {
 11596            state = StateClosed;
 1597        }
 1598
 11599        if (_state == state) // Don't switch twice.
 1600        {
 01601            return;
 1602        }
 1603
 11604        if (state > StateActive)
 1605        {
 1606            // Dispose the inactivity timer, if not null.
 11607            cancelInactivityTimer();
 1608        }
 1609
 1610        try
 1611        {
 1612            switch (state)
 1613            {
 1614                case StateNotInitialized:
 1615                {
 1616                    Debug.Assert(false);
 1617                    break;
 1618                }
 1619
 1620                case StateNotValidated:
 1621                {
 11622                    if (_state != StateNotInitialized)
 1623                    {
 1624                        Debug.Assert(_state == StateClosed);
 01625                        return;
 1626                    }
 1627                    break;
 1628                }
 1629
 1630                case StateActive:
 1631                {
 1632                    //
 1633                    // Can only switch to active from holding or not validated.
 1634                    //
 11635                    if (_state != StateHolding && _state != StateNotValidated)
 1636                    {
 01637                        return;
 1638                    }
 1639
 11640                    if (_maxDispatches <= 0 || _dispatchCount < _maxDispatches)
 1641                    {
 11642                        _threadPool.register(this, SocketOperation.Read);
 11643                        _idleTimeoutTransceiver?.enableIdleCheck();
 1644                    }
 1645                    // else don't resume reading since we're at or over the _maxDispatches limit.
 1646
 11647                    break;
 1648                }
 1649
 1650                case StateHolding:
 1651                {
 1652                    //
 1653                    // Can only switch to holding from active or not validated.
 1654                    //
 11655                    if (_state != StateActive && _state != StateNotValidated)
 1656                    {
 11657                        return;
 1658                    }
 1659
 11660                    if (_state == StateActive && (_maxDispatches <= 0 || _dispatchCount < _maxDispatches))
 1661                    {
 11662                        _threadPool.unregister(this, SocketOperation.Read);
 11663                        _idleTimeoutTransceiver?.disableIdleCheck();
 1664                    }
 1665                    // else reads are already disabled because the _maxDispatches limit is reached or exceeded.
 1666
 11667                    break;
 1668                }
 1669
 1670                case StateClosing:
 1671                case StateClosingPending:
 1672                {
 1673                    //
 1674                    // Can't change back from closing pending.
 1675                    //
 11676                    if (_state >= StateClosingPending)
 1677                    {
 11678                        return;
 1679                    }
 1680                    break;
 1681                }
 1682
 1683                case StateClosed:
 1684                {
 11685                    if (_state == StateFinished)
 1686                    {
 11687                        return;
 1688                    }
 1689
 11690                    _batchRequestQueue.destroy(_exception);
 11691                    _threadPool.finish(this);
 11692                    _transceiver.close();
 11693                    break;
 1694                }
 1695
 1696                case StateFinished:
 1697                {
 1698                    Debug.Assert(_state == StateClosed);
 11699                    _transceiver.destroy();
 1700                    break;
 1701                }
 1702            }
 11703        }
 01704        catch (LocalException ex)
 1705        {
 01706            _logger.error("unexpected connection exception:\n" + ex + "\n" + _transceiver.ToString());
 01707        }
 1708
 11709        if (_instance.initializationData().observer is not null)
 1710        {
 11711            ConnectionState oldState = toConnectionState(_state);
 11712            ConnectionState newState = toConnectionState(state);
 11713            if (oldState != newState)
 1714            {
 11715                _observer = _instance.initializationData().observer.getConnectionObserver(
 11716                    initConnectionInfo(),
 11717                    _endpoint,
 11718                    newState,
 11719                    _observer);
 11720                if (_observer is not null)
 1721                {
 11722                    _observer.attach();
 1723                }
 1724                else
 1725                {
 11726                    _writeStreamPos = -1;
 11727                    _readStreamPos = -1;
 1728                }
 1729            }
 11730            if (_observer is not null && state == StateClosed && _exception is not null)
 1731            {
 11732                if (!(_exception is CloseConnectionException ||
 11733                     _exception is ConnectionClosedException ||
 11734                     _exception is CommunicatorDestroyedException ||
 11735                     _exception is ObjectAdapterDeactivatedException ||
 11736                     (_exception is ConnectionLostException && _state >= StateClosing)))
 1737                {
 11738                    _observer.failed(_exception.ice_id());
 1739                }
 1740            }
 1741        }
 11742        _state = state;
 1743
 11744        Monitor.PulseAll(_mutex);
 1745
 11746        if (_state == StateClosing && _upcallCount == 0)
 1747        {
 1748            try
 1749            {
 11750                initiateShutdown();
 11751            }
 01752            catch (LocalException ex)
 1753            {
 01754                setState(StateClosed, ex);
 01755            }
 1756        }
 11757    }
 1758
 1759    private void initiateShutdown()
 1760    {
 1761        Debug.Assert(_state == StateClosing && _upcallCount == 0);
 1762
 11763        if (_shutdownInitiated)
 1764        {
 11765            return;
 1766        }
 11767        _shutdownInitiated = true;
 1768
 11769        if (!_endpoint.datagram())
 1770        {
 1771            //
 1772            // Before we shut down, we send a close connection message.
 1773            //
 11774            var os = new OutputStream(Util.currentProtocolEncoding);
 11775            os.writeBlob(Protocol.magic);
 11776            ProtocolVersion.ice_write(os, Util.currentProtocol);
 11777            EncodingVersion.ice_write(os, Util.currentProtocolEncoding);
 11778            os.writeByte(Protocol.closeConnectionMsg);
 11779            os.writeByte(0); // Compression status: always zero for close connection.
 11780            os.writeInt(Protocol.headerSize); // Message size.
 1781
 11782            scheduleCloseTimer();
 1783
 11784            if ((sendMessage(new OutgoingMessage(os, compress: false)) & OutgoingAsyncBase.AsyncStatusSent) != 0)
 1785            {
 11786                setState(StateClosingPending);
 1787
 1788                //
 1789                // Notify the transceiver of the graceful connection closure.
 1790                //
 11791                int op = _transceiver.closing(true, _exception);
 11792                if (op != 0)
 1793                {
 11794                    _threadPool.register(this, op);
 1795                }
 1796            }
 1797        }
 11798    }
 1799
 1800    private bool initialize(int operation)
 1801    {
 11802        int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), ref _hasMoreData);
 11803        if (s != SocketOperation.None)
 1804        {
 11805            _threadPool.update(this, operation, s);
 11806            return false;
 1807        }
 1808
 1809        //
 1810        // Update the connection description once the transceiver is initialized.
 1811        //
 11812        _desc = _transceiver.ToString();
 11813        _initialized = true;
 11814        setState(StateNotValidated);
 1815
 11816        return true;
 1817    }
 1818
 1819    private bool validate(int operation)
 1820    {
 11821        if (!_endpoint.datagram()) // Datagram connections are always implicitly validated.
 1822        {
 11823            if (_connector is null) // The server side has the active role for connection validation.
 1824            {
 11825                if (_writeStream.size() == 0)
 1826                {
 11827                    _writeStream.writeBlob(Protocol.magic);
 11828                    ProtocolVersion.ice_write(_writeStream, Util.currentProtocol);
 11829                    EncodingVersion.ice_write(_writeStream, Util.currentProtocolEncoding);
 11830                    _writeStream.writeByte(Protocol.validateConnectionMsg);
 11831                    _writeStream.writeByte(0); // Compression status (always zero for validate connection).
 11832                    _writeStream.writeInt(Protocol.headerSize); // Message size.
 11833                    TraceUtil.traceSend(_writeStream, _instance, this, _logger, _traceLevels);
 11834                    _writeStream.prepareWrite();
 1835                }
 1836
 11837                if (_observer is not null)
 1838                {
 01839                    observerStartWrite(_writeStream.getBuffer());
 1840                }
 1841
 11842                if (_writeStream.pos() != _writeStream.size())
 1843                {
 11844                    int op = write(_writeStream.getBuffer());
 11845                    if (op != 0)
 1846                    {
 11847                        _threadPool.update(this, operation, op);
 11848                        return false;
 1849                    }
 1850                }
 1851
 11852                if (_observer is not null)
 1853                {
 01854                    observerFinishWrite(_writeStream.getBuffer());
 1855                }
 1856            }
 1857            else // The client side has the passive role for connection validation.
 1858            {
 11859                if (_readStream.size() == 0)
 1860                {
 11861                    _readStream.resize(Protocol.headerSize);
 11862                    _readStream.pos(0);
 1863                }
 1864
 11865                if (_observer is not null)
 1866                {
 01867                    observerStartRead(_readStream.getBuffer());
 1868                }
 1869
 11870                if (_readStream.pos() != _readStream.size())
 1871                {
 11872                    int op = read(_readStream.getBuffer());
 11873                    if (op != 0)
 1874                    {
 11875                        _threadPool.update(this, operation, op);
 11876                        return false;
 1877                    }
 1878                }
 1879
 11880                if (_observer is not null)
 1881                {
 01882                    observerFinishRead(_readStream.getBuffer());
 1883                }
 1884
 11885                _validated = true;
 1886
 1887                Debug.Assert(_readStream.pos() == Protocol.headerSize);
 11888                _readStream.pos(0);
 11889                byte[] m = _readStream.readBlob(4);
 11890                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 11891                   m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 1892                {
 01893                    throw new ProtocolException(
 01894                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 1895                }
 1896
 11897                var pv = new ProtocolVersion(_readStream);
 11898                if (pv != Util.currentProtocol)
 1899                {
 01900                    throw new MarshalException(
 01901                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 1902                }
 11903                var ev = new EncodingVersion(_readStream);
 11904                if (ev != Util.currentProtocolEncoding)
 1905                {
 01906                    throw new MarshalException(
 01907                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 1908                }
 1909
 11910                byte messageType = _readStream.readByte();
 11911                if (messageType != Protocol.validateConnectionMsg)
 1912                {
 01913                    throw new ProtocolException(
 01914                        $"Received message of type {messageType} over a connection that is not yet validated.");
 1915                }
 11916                _readStream.readByte(); // Ignore compression status for validate connection.
 11917                int size = _readStream.readInt();
 11918                if (size != Protocol.headerSize)
 1919                {
 01920                    throw new MarshalException($"Received ValidateConnection message with unexpected size {size}.");
 1921                }
 11922                TraceUtil.traceRecv(_readStream, this, _logger, _traceLevels);
 1923
 1924                // Client connection starts sending heartbeats once it's received the ValidateConnection message.
 11925                _idleTimeoutTransceiver?.scheduleHeartbeat();
 1926            }
 1927        }
 1928
 11929        _writeStream.resize(0);
 11930        _writeStream.pos(0);
 1931
 11932        _readStream.resize(Protocol.headerSize);
 11933        _readStream.pos(0);
 11934        _readHeader = true;
 1935
 11936        if (_instance.traceLevels().network >= 1)
 1937        {
 11938            var s = new StringBuilder();
 11939            if (_endpoint.datagram())
 1940            {
 11941                s.Append("starting to ");
 11942                s.Append(_connector is not null ? "send" : "receive");
 11943                s.Append(' ');
 11944                s.Append(_endpoint.protocol());
 11945                s.Append(" messages\n");
 11946                s.Append(_transceiver.toDetailedString());
 1947            }
 1948            else
 1949            {
 11950                s.Append(_connector is not null ? "established" : "accepted");
 11951                s.Append(' ');
 11952                s.Append(_endpoint.protocol());
 11953                s.Append(" connection\n");
 11954                s.Append(ToString());
 1955            }
 11956            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1957        }
 1958
 11959        return true;
 1960    }
 1961
 1962    /// <summary>
 1963    /// Sends the next queued messages. This method is called by message() once the message which is being sent
 1964    /// (_sendStreams.First) is fully sent. Before sending the next message, this message is removed from _sendsStream
 1965    /// If any, its sent callback is also queued in given callback queue.
 1966    /// </summary>
 1967    /// <param name="callbacks">The sent callbacks to call for the messages that were sent.</param>
 1968    /// <returns>The socket operation to register with the thread pool's selector to send the remainder of the pending
 1969    /// message being sent (_sendStreams.First).</returns>
 1970    private int sendNextMessage(out Queue<OutgoingMessage> callbacks)
 1971    {
 11972        callbacks = null;
 1973
 11974        if (_sendStreams.Count == 0)
 1975        {
 1976            // This can occur if no message was being written and the socket write operation was registered with the
 1977            // thread pool (a transceiver read method can request writing data).
 11978            return SocketOperation.None;
 1979        }
 11980        else if (_state == StateClosingPending && _writeStream.pos() == 0)
 1981        {
 1982            // Message wasn't sent, empty the _writeStream, we're not going to send more data because the connection
 1983            // is being closed.
 01984            OutgoingMessage message = _sendStreams.First.Value;
 01985            _writeStream.swap(message.stream);
 01986            return SocketOperation.None;
 1987        }
 1988
 1989        // Assert that the message was fully written.
 1990        Debug.Assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
 1991
 1992        try
 1993        {
 11994            while (true)
 1995            {
 1996                //
 1997                // The message that was being sent is sent. We can swap back the write stream buffer to the
 1998                // outgoing message (required for retry) and queue its sent callback (if any).
 1999                //
 12000                OutgoingMessage message = _sendStreams.First.Value;
 12001                _writeStream.swap(message.stream);
 12002                if (message.sent())
 2003                {
 12004                    callbacks ??= new Queue<OutgoingMessage>();
 12005                    callbacks.Enqueue(message);
 2006                }
 12007                _sendStreams.RemoveFirst();
 2008
 2009                //
 2010                // If there's nothing left to send, we're done.
 2011                //
 12012                if (_sendStreams.Count == 0)
 2013                {
 2014                    break;
 2015                }
 2016
 2017                //
 2018                // If we are in the closed state or if the close is pending, don't continue sending. This can occur if
 2019                // parseMessage (called before sendNextMessage by message()) closes the connection.
 2020                //
 12021                if (_state >= StateClosingPending)
 2022                {
 02023                    return SocketOperation.None;
 2024                }
 2025
 2026                //
 2027                // Otherwise, prepare the next message.
 2028                //
 12029                message = _sendStreams.First.Value;
 2030                Debug.Assert(!message.prepared);
 12031                OutputStream stream = message.stream;
 2032
 12033                message.stream = doCompress(message.stream, message.compress);
 12034                message.stream.prepareWrite();
 12035                message.prepared = true;
 2036
 12037                TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);
 2038
 2039                //
 2040                // Send the message.
 2041                //
 12042                _writeStream.swap(message.stream);
 12043                if (_observer is not null)
 2044                {
 12045                    observerStartWrite(_writeStream.getBuffer());
 2046                }
 12047                if (_writeStream.pos() != _writeStream.size())
 2048                {
 12049                    int op = write(_writeStream.getBuffer());
 12050                    if (op != 0)
 2051                    {
 12052                        return op;
 2053                    }
 2054                }
 12055                if (_observer is not null)
 2056                {
 12057                    observerFinishWrite(_writeStream.getBuffer());
 2058                }
 2059
 2060                // If the message was sent right away, loop to send the next queued message.
 2061            }
 2062
 2063            // Once the CloseConnection message is sent, we transition to the StateClosingPending state.
 12064            if (_state == StateClosing && _shutdownInitiated)
 2065            {
 12066                setState(StateClosingPending);
 12067                int op = _transceiver.closing(true, _exception);
 12068                if (op != 0)
 2069                {
 12070                    return op;
 2071                }
 2072            }
 12073        }
 02074        catch (LocalException ex)
 2075        {
 02076            setState(StateClosed, ex);
 02077        }
 12078        return SocketOperation.None;
 12079    }
 2080
 2081    /// <summary>
 2082    /// Sends or queues the given message.
 2083    /// </summary>
 2084    /// <param name="message">The message to send.</param>
 2085    /// <returns>The send status.</returns>
 2086    private int sendMessage(OutgoingMessage message)
 2087    {
 2088        Debug.Assert(_state >= StateActive);
 2089        Debug.Assert(_state < StateClosed);
 2090
 2091        // Some messages are queued for sending. Just adds the message to the send queue and tell the caller that
 2092        // the message was queued.
 12093        if (_sendStreams.Count > 0)
 2094        {
 12095            _sendStreams.AddLast(message);
 12096            return OutgoingAsyncBase.AsyncStatusQueued;
 2097        }
 2098
 2099        // Prepare the message for sending.
 2100        Debug.Assert(!message.prepared);
 2101
 12102        OutputStream stream = message.stream;
 2103
 12104        message.stream = doCompress(stream, message.compress);
 12105        message.stream.prepareWrite();
 12106        message.prepared = true;
 2107
 12108        TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);
 2109
 2110        // Send the message without blocking.
 12111        if (_observer is not null)
 2112        {
 12113            observerStartWrite(message.stream.getBuffer());
 2114        }
 12115        int op = write(message.stream.getBuffer());
 12116        if (op == 0)
 2117        {
 2118            // The message was sent so we're done.
 2119
 12120            if (_observer is not null)
 2121            {
 12122                observerFinishWrite(message.stream.getBuffer());
 2123            }
 2124
 12125            int status = OutgoingAsyncBase.AsyncStatusSent;
 12126            if (message.sent())
 2127            {
 2128                // If there's a sent callback, indicate the caller that it should invoke the sent callback.
 12129                status |= OutgoingAsyncBase.AsyncStatusInvokeSentCallback;
 2130            }
 2131
 12132            return status;
 2133        }
 2134
 2135        // The message couldn't be sent right away so we add it to the send stream queue (which is empty) and swap its
 2136        // stream with `_writeStream`. The socket operation returned by the transceiver write is registered with the
 2137        // thread pool. At this point the message() method will take care of sending the whole message (held by
 2138        // _writeStream) when the transceiver is ready to write more of the message buffer.
 2139
 12140        _writeStream.swap(message.stream);
 12141        _sendStreams.AddLast(message);
 12142        _threadPool.register(this, op);
 12143        return OutgoingAsyncBase.AsyncStatusQueued;
 2144    }
 2145
 2146    private OutputStream doCompress(OutputStream decompressed, bool compress)
 2147    {
 12148        if (BZip2.isLoaded(_logger) && compress && decompressed.size() >= 100)
 2149        {
 2150            //
 2151            // Do compression.
 2152            //
 12153            Ice.Internal.Buffer cbuf = BZip2.compress(
 12154                decompressed.getBuffer(),
 12155                Protocol.headerSize,
 12156                _compressionLevel);
 12157            if (cbuf is not null)
 2158            {
 12159                var cstream = new OutputStream(new Internal.Buffer(cbuf, true), decompressed.getEncoding());
 2160
 2161                //
 2162                // Set compression status.
 2163                //
 12164                cstream.pos(9);
 12165                cstream.writeByte(2);
 2166
 2167                //
 2168                // Write the size of the compressed stream into the header.
 2169                //
 12170                cstream.pos(10);
 12171                cstream.writeInt(cstream.size());
 2172
 2173                //
 2174                // Write the compression status and size of the compressed stream into the header of the
 2175                // decompressed stream -- we need this to trace requests correctly.
 2176                //
 12177                decompressed.pos(9);
 12178                decompressed.writeByte(2);
 12179                decompressed.writeInt(cstream.size());
 2180
 12181                return cstream;
 2182            }
 2183        }
 2184
 2185        // Write the compression status. If BZip2 is loaded and compress is set to true, we write 1, to request a
 2186        // compressed reply. Otherwise, we write 0 either BZip2 is not loaded or we are sending an uncompressed reply.
 12187        decompressed.pos(9);
 12188        decompressed.writeByte((byte)((BZip2.isLoaded(_logger) && compress) ? 1 : 0));
 2189
 2190        //
 2191        // Not compressed, fill in the message size.
 2192        //
 12193        decompressed.pos(10);
 12194        decompressed.writeInt(decompressed.size());
 2195
 12196        return decompressed;
 2197    }
 2198
 2199    private struct MessageInfo
 2200    {
 2201        public InputStream stream;
 2202        public int requestCount;
 2203        public int requestId;
 2204        public byte compress;
 2205        public ObjectAdapter adapter;
 2206        public OutgoingAsyncBase outAsync;
 2207        public int upcallCount;
 2208    }
 2209
 2210    private int parseMessage(ref MessageInfo info)
 2211    {
 2212        Debug.Assert(_state > StateNotValidated && _state < StateClosed);
 2213
 12214        info.stream = new InputStream(_instance, Util.currentProtocolEncoding);
 12215        _readStream.swap(info.stream);
 12216        _readStream.resize(Protocol.headerSize);
 12217        _readStream.pos(0);
 12218        _readHeader = true;
 2219
 2220        Debug.Assert(info.stream.pos() == info.stream.size());
 2221
 2222        try
 2223        {
 2224            //
 2225            // The magic and version fields have already been checked.
 2226            //
 12227            info.stream.pos(8);
 12228            byte messageType = info.stream.readByte();
 12229            info.compress = info.stream.readByte();
 12230            if (info.compress == 2)
 2231            {
 12232                if (BZip2.isLoaded(_logger))
 2233                {
 12234                    Ice.Internal.Buffer ubuf = BZip2.decompress(
 12235                        info.stream.getBuffer(),
 12236                        Protocol.headerSize,
 12237                        _messageSizeMax);
 12238                    info.stream = new InputStream(info.stream.instance(), info.stream.getEncoding(), ubuf, true);
 2239                }
 2240                else
 2241                {
 02242                    throw new FeatureNotSupportedException(
 02243                        "Cannot decompress compressed message: BZip2 library is not loaded.");
 2244                }
 2245            }
 12246            info.stream.pos(Protocol.headerSize);
 2247
 2248            switch (messageType)
 2249            {
 2250                case Protocol.closeConnectionMsg:
 2251                {
 12252                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12253                    if (_endpoint.datagram())
 2254                    {
 02255                        if (_warn)
 2256                        {
 02257                            _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
 2258                        }
 2259                    }
 2260                    else
 2261                    {
 12262                        setState(StateClosingPending, new CloseConnectionException());
 2263
 2264                        //
 2265                        // Notify the transceiver of the graceful connection closure.
 2266                        //
 12267                        int op = _transceiver.closing(false, _exception);
 12268                        if (op != 0)
 2269                        {
 12270                            scheduleCloseTimer();
 12271                            return op;
 2272                        }
 12273                        setState(StateClosed);
 2274                    }
 12275                    break;
 2276                }
 2277
 2278                case Protocol.requestMsg:
 2279                {
 12280                    if (_state >= StateClosing)
 2281                    {
 12282                        TraceUtil.trace(
 12283                            "received request during closing\n(ignored by server, client will retry)",
 12284                            info.stream,
 12285                            this,
 12286                            _logger,
 12287                            _traceLevels);
 2288                    }
 2289                    else
 2290                    {
 12291                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12292                        info.requestId = info.stream.readInt();
 12293                        info.requestCount = 1;
 12294                        info.adapter = _adapter;
 12295                        ++info.upcallCount;
 2296
 12297                        cancelInactivityTimer();
 12298                        ++_dispatchCount;
 2299                    }
 12300                    break;
 2301                }
 2302
 2303                case Protocol.requestBatchMsg:
 2304                {
 12305                    if (_state >= StateClosing)
 2306                    {
 02307                        TraceUtil.trace(
 02308                            "received batch request during closing\n(ignored by server, client will retry)",
 02309                            info.stream,
 02310                            this,
 02311                            _logger,
 02312                            _traceLevels);
 2313                    }
 2314                    else
 2315                    {
 12316                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12317                        int requestCount = info.stream.readInt();
 12318                        if (requestCount < 0)
 2319                        {
 02320                            throw new MarshalException($"Received batch request with {requestCount} batches.");
 2321                        }
 12322                        info.requestCount = requestCount;
 12323                        info.adapter = _adapter;
 12324                        info.upcallCount += info.requestCount;
 2325
 12326                        cancelInactivityTimer();
 12327                        _dispatchCount += info.requestCount;
 2328                    }
 12329                    break;
 2330                }
 2331
 2332                case Protocol.replyMsg:
 2333                {
 12334                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12335                    info.requestId = info.stream.readInt();
 12336                    if (_asyncRequests.TryGetValue(info.requestId, out info.outAsync))
 2337                    {
 12338                        _asyncRequests.Remove(info.requestId);
 2339
 12340                        info.outAsync.getIs().swap(info.stream);
 2341
 2342                        //
 2343                        // If we just received the reply for a request which isn't acknowledge as
 2344                        // sent yet, we queue the reply instead of processing it right away. It
 2345                        // will be processed once the write callback is invoked for the message.
 2346                        //
 12347                        OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null;
 12348                        if (message is not null && message.outAsync == info.outAsync)
 2349                        {
 12350                            message.receivedReply = true;
 2351                        }
 12352                        else if (info.outAsync.response())
 2353                        {
 12354                            ++info.upcallCount;
 2355                        }
 2356                        else
 2357                        {
 12358                            info.outAsync = null;
 2359                        }
 12360                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 2361                        {
 12362                            doApplicationClose();
 2363                        }
 2364                    }
 12365                    break;
 2366                }
 2367
 2368                case Protocol.validateConnectionMsg:
 2369                {
 12370                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12371                    break;
 2372                }
 2373
 2374                default:
 2375                {
 02376                    TraceUtil.trace(
 02377                        "received unknown message\n(invalid, closing connection)",
 02378                        info.stream,
 02379                        this,
 02380                        _logger,
 02381                        _traceLevels);
 2382
 02383                    throw new ProtocolException($"Received Ice protocol message with unknown type: {messageType}");
 2384                }
 2385            }
 12386        }
 12387        catch (LocalException ex)
 2388        {
 12389            if (_endpoint.datagram())
 2390            {
 02391                if (_warn)
 2392                {
 02393                    _logger.warning("datagram connection exception:\n" + ex.ToString() + "\n" + _desc);
 2394                }
 2395            }
 2396            else
 2397            {
 12398                setState(StateClosed, ex);
 2399            }
 12400        }
 2401
 12402        if (_state == StateHolding)
 2403        {
 2404            // Don't continue reading if the connection is in the holding state.
 02405            return SocketOperation.None;
 2406        }
 12407        else if (_maxDispatches > 0 && _dispatchCount >= _maxDispatches)
 2408        {
 2409            // Don't continue reading if the _maxDispatches limit is reached or exceeded.
 12410            _idleTimeoutTransceiver?.disableIdleCheck();
 12411            return SocketOperation.None;
 2412        }
 2413        else
 2414        {
 2415            // Continue reading.
 12416            return SocketOperation.Read;
 2417        }
 12418    }
 2419
 2420    private void dispatchAll(
 2421        InputStream stream,
 2422        int requestCount,
 2423        int requestId,
 2424        byte compress,
 2425        ObjectAdapter adapter)
 2426    {
 2427        // Note: In contrast to other private or protected methods, this method must be called *without* the mutex
 2428        // locked.
 2429
 12430        Object dispatcher = adapter?.dispatchPipeline;
 2431
 2432        try
 2433        {
 12434            while (requestCount > 0)
 2435            {
 2436                // adapter can be null here, however the adapter set in current can't be null, and we never pass
 2437                // a null current.adapter to the application code. Once this file enables nullable, adapter should be
 2438                // adapter! below.
 12439                var request = new IncomingRequest(requestId, this, adapter, stream);
 2440
 12441                if (dispatcher is not null)
 2442                {
 2443                    // We don't and can't await the dispatchAsync: with batch requests, we want all the dispatches to
 2444                    // execute in the current Ice thread pool thread. If we awaited the dispatchAsync, we could
 2445                    // switch to a .NET thread pool thread.
 12446                    _ = dispatchAsync(request);
 2447                }
 2448                else
 2449                {
 2450                    // Received request on a connection without an object adapter.
 12451                    sendResponse(
 12452                        request.current.createOutgoingResponse(new ObjectNotExistException()),
 12453                        isTwoWay: !_endpoint.datagram() && requestId != 0,
 12454                        compress: 0);
 2455                }
 12456                --requestCount;
 2457            }
 2458
 12459            stream.clear();
 12460        }
 02461        catch (LocalException ex) // TODO: catch all exceptions
 2462        {
 2463            // Typically, the IncomingRequest constructor throws an exception, and we can't continue.
 02464            dispatchException(ex, requestCount);
 02465        }
 2466
 2467        async Task dispatchAsync(IncomingRequest request)
 2468        {
 2469            try
 2470            {
 2471                OutgoingResponse response;
 2472
 2473                try
 2474                {
 12475                    response = await dispatcher.dispatchAsync(request).ConfigureAwait(false);
 12476                }
 12477                catch (System.Exception ex)
 2478                {
 12479                    response = request.current.createOutgoingResponse(ex);
 12480                }
 2481
 12482                sendResponse(response, isTwoWay: !_endpoint.datagram() && requestId != 0, compress);
 12483            }
 02484            catch (LocalException ex) // TODO: catch all exceptions to avoid UnobservedTaskException
 2485            {
 02486                dispatchException(ex, requestCount: 1);
 02487            }
 12488        }
 12489    }
 2490
 2491    private void sendResponse(OutgoingResponse response, bool isTwoWay, byte compress)
 2492    {
 12493        bool finished = false;
 2494        try
 2495        {
 12496            lock (_mutex)
 2497            {
 2498                Debug.Assert(_state > StateNotValidated);
 2499
 2500                try
 2501                {
 12502                    if (--_upcallCount == 0)
 2503                    {
 12504                        if (_state == StateFinished)
 2505                        {
 12506                            finished = true;
 12507                            _observer?.detach();
 2508                        }
 12509                        Monitor.PulseAll(_mutex);
 2510                    }
 2511
 12512                    if (_state >= StateClosed)
 2513                    {
 2514                        Debug.Assert(_exception is not null);
 12515                        throw _exception;
 2516                    }
 2517
 12518                    if (isTwoWay)
 2519                    {
 12520                        sendMessage(new OutgoingMessage(response.outputStream, compress > 0));
 2521                    }
 2522
 12523                    if (_state == StateActive && _maxDispatches > 0 && _dispatchCount == _maxDispatches)
 2524                    {
 2525                        // Resume reading if the connection is active and the dispatch count is about to be less than
 2526                        // _maxDispatches.
 12527                        _threadPool.update(this, SocketOperation.None, SocketOperation.Read);
 12528                        _idleTimeoutTransceiver?.enableIdleCheck();
 2529                    }
 2530
 12531                    --_dispatchCount;
 2532
 12533                    if (_state == StateClosing && _upcallCount == 0)
 2534                    {
 12535                        initiateShutdown();
 2536                    }
 12537                }
 12538                catch (LocalException ex)
 2539                {
 12540                    setState(StateClosed, ex);
 12541                }
 2542            }
 2543        }
 2544        finally
 2545        {
 12546            if (finished && _removeFromFactory is not null)
 2547            {
 12548                _removeFromFactory(this);
 2549            }
 12550        }
 12551    }
 2552
 2553    private void dispatchException(LocalException ex, int requestCount)
 2554    {
 02555        bool finished = false;
 2556
 2557        // Fatal exception while dispatching a request. Since sendResponse isn't called in case of a fatal exception
 2558        // we decrement _upcallCount here.
 02559        lock (_mutex)
 2560        {
 02561            setState(StateClosed, ex);
 2562
 02563            if (requestCount > 0)
 2564            {
 2565                Debug.Assert(_upcallCount >= requestCount);
 02566                _upcallCount -= requestCount;
 02567                if (_upcallCount == 0)
 2568                {
 02569                    if (_state == StateFinished)
 2570                    {
 02571                        finished = true;
 02572                        _observer?.detach();
 2573                    }
 02574                    Monitor.PulseAll(_mutex);
 2575                }
 2576            }
 02577        }
 2578
 02579        if (finished && _removeFromFactory is not null)
 2580        {
 02581            _removeFromFactory(this);
 2582        }
 02583    }
 2584
 2585    private void inactivityCheck(System.Threading.Timer inactivityTimer)
 2586    {
 12587        lock (_mutex)
 2588        {
 2589            // If the timers are different, it means this inactivityTimer is no longer current.
 12590            if (inactivityTimer == _inactivityTimer)
 2591            {
 12592                _inactivityTimer = null;
 12593                inactivityTimer.Dispose(); // non-blocking
 2594
 12595                if (_state == StateActive)
 2596                {
 12597                    setState(
 12598                        StateClosing,
 12599                        new ConnectionClosedException(
 12600                            "Connection closed because it remained inactive for longer than the inactivity timeout.",
 12601                            closedByApplication: false));
 2602                }
 2603            }
 2604            // Else this timer was already canceled and disposed. Nothing to do.
 12605        }
 12606    }
 2607
 2608    private void connectTimedOut(System.Threading.Timer connectTimer)
 2609    {
 12610        lock (_mutex)
 2611        {
 12612            if (_state < StateActive)
 2613            {
 12614                setState(StateClosed, new ConnectTimeoutException());
 2615            }
 12616        }
 2617        // else ignore since we're already connected.
 12618        connectTimer.Dispose();
 12619    }
 2620
 2621    private void closeTimedOut(System.Threading.Timer closeTimer)
 2622    {
 12623        lock (_mutex)
 2624        {
 12625            if (_state < StateClosed)
 2626            {
 2627                // We don't use setState(state, exception) because we want to overwrite the exception set by a
 2628                // graceful closure.
 12629                _exception = new CloseTimeoutException();
 12630                setState(StateClosed);
 2631            }
 12632        }
 2633        // else ignore since we're already closed.
 12634        closeTimer.Dispose();
 12635    }
 2636
 2637    private ConnectionInfo initConnectionInfo()
 2638    {
 2639        // Called with _mutex locked.
 2640
 12641        if (_state > StateNotInitialized && _info is not null) // Update the connection info until it's initialized
 2642        {
 12643            return _info;
 2644        }
 2645
 12646        _info =
 12647            _transceiver.getInfo(incoming: _connector is null, _adapter?.getName() ?? "", _endpoint.connectionId());
 12648        return _info;
 2649    }
 2650
 02651    private void warning(string msg, System.Exception ex) => _logger.warning($"{msg}:\n{ex}\n{_transceiver}");
 2652
 2653    private void observerStartRead(Ice.Internal.Buffer buf)
 2654    {
 12655        if (_readStreamPos >= 0)
 2656        {
 2657            Debug.Assert(!buf.empty());
 12658            _observer.receivedBytes(buf.b.position() - _readStreamPos);
 2659        }
 12660        _readStreamPos = buf.empty() ? -1 : buf.b.position();
 12661    }
 2662
 2663    private void observerFinishRead(Ice.Internal.Buffer buf)
 2664    {
 12665        if (_readStreamPos == -1)
 2666        {
 02667            return;
 2668        }
 2669        Debug.Assert(buf.b.position() >= _readStreamPos);
 12670        _observer.receivedBytes(buf.b.position() - _readStreamPos);
 12671        _readStreamPos = -1;
 12672    }
 2673
 2674    private void observerStartWrite(Ice.Internal.Buffer buf)
 2675    {
 12676        if (_writeStreamPos >= 0)
 2677        {
 2678            Debug.Assert(!buf.empty());
 12679            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2680        }
 12681        _writeStreamPos = buf.empty() ? -1 : buf.b.position();
 12682    }
 2683
 2684    private void observerFinishWrite(Ice.Internal.Buffer buf)
 2685    {
 12686        if (_writeStreamPos == -1)
 2687        {
 12688            return;
 2689        }
 12690        if (buf.b.position() > _writeStreamPos)
 2691        {
 12692            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2693        }
 12694        _writeStreamPos = -1;
 12695    }
 2696
 2697    private int read(Ice.Internal.Buffer buf)
 2698    {
 12699        int start = buf.b.position();
 12700        int op = _transceiver.read(buf, ref _hasMoreData);
 12701        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2702        {
 12703            var s = new StringBuilder("received ");
 12704            if (_endpoint.datagram())
 2705            {
 02706                s.Append(buf.b.limit());
 2707            }
 2708            else
 2709            {
 12710                s.Append(buf.b.position() - start);
 12711                s.Append(" of ");
 12712                s.Append(buf.b.limit() - start);
 2713            }
 12714            s.Append(" bytes via ");
 12715            s.Append(_endpoint.protocol());
 12716            s.Append('\n');
 12717            s.Append(ToString());
 12718            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2719        }
 12720        return op;
 2721    }
 2722
 2723    private int write(Ice.Internal.Buffer buf)
 2724    {
 12725        int start = buf.b.position();
 12726        int op = _transceiver.write(buf);
 12727        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2728        {
 12729            var s = new StringBuilder("sent ");
 12730            s.Append(buf.b.position() - start);
 12731            if (!_endpoint.datagram())
 2732            {
 12733                s.Append(" of ");
 12734                s.Append(buf.b.limit() - start);
 2735            }
 12736            s.Append(" bytes via ");
 12737            s.Append(_endpoint.protocol());
 12738            s.Append('\n');
 12739            s.Append(ToString());
 12740            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2741        }
 12742        return op;
 2743    }
 2744
 2745    private void scheduleInactivityTimer()
 2746    {
 2747        // Called with the ConnectionI mutex locked.
 2748        Debug.Assert(_inactivityTimer is null);
 2749        Debug.Assert(_inactivityTimeout > TimeSpan.Zero);
 2750
 12751        _inactivityTimer = new System.Threading.Timer(
 12752            inactivityTimer => inactivityCheck((System.Threading.Timer)inactivityTimer));
 12753        _inactivityTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 12754    }
 2755
 2756    private void cancelInactivityTimer()
 2757    {
 2758        // Called with the ConnectionI mutex locked.
 12759        if (_inactivityTimer is not null)
 2760        {
 12761            _inactivityTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 12762            _inactivityTimer.Dispose();
 12763            _inactivityTimer = null;
 2764        }
 12765    }
 2766
 2767    private void scheduleCloseTimer()
 2768    {
 12769        if (_closeTimeout > TimeSpan.Zero)
 2770        {
 2771#pragma warning disable CA2000 // closeTimer is disposed by closeTimedOut.
 12772            var closeTimer = new System.Threading.Timer(
 12773                timerObj => closeTimedOut((System.Threading.Timer)timerObj));
 2774            // schedule timer to run once; closeTimedOut disposes the timer too.
 12775            closeTimer.Change(_closeTimeout, Timeout.InfiniteTimeSpan);
 2776#pragma warning restore CA2000
 2777        }
 12778    }
 2779
 2780    private void doApplicationClose()
 2781    {
 2782        // Called with the ConnectionI mutex locked.
 2783        Debug.Assert(_state < StateClosing);
 12784        setState(
 12785            StateClosing,
 12786            new ConnectionClosedException(
 12787                "The connection was closed gracefully by the application.",
 12788                closedByApplication: true));
 12789    }
 2790
 2791    private class OutgoingMessage
 2792    {
 12793        internal OutgoingMessage(OutputStream stream, bool compress)
 2794        {
 12795            this.stream = stream;
 12796            this.compress = compress;
 12797        }
 2798
 12799        internal OutgoingMessage(OutgoingAsyncBase outAsync, OutputStream stream, bool compress, int requestId)
 2800        {
 12801            this.outAsync = outAsync;
 12802            this.stream = stream;
 12803            this.compress = compress;
 12804            this.requestId = requestId;
 12805        }
 2806
 2807        internal void canceled()
 2808        {
 2809            Debug.Assert(outAsync is not null); // Only requests can timeout.
 12810            outAsync = null;
 12811        }
 2812
 2813        internal bool sent()
 2814        {
 12815            stream = null;
 12816            if (outAsync is not null)
 2817            {
 12818                invokeSent = outAsync.sent();
 12819                return invokeSent || receivedReply;
 2820            }
 12821            return false;
 2822        }
 2823
 2824        internal void completed(LocalException ex)
 2825        {
 12826            if (outAsync is not null)
 2827            {
 12828                if (outAsync.exception(ex))
 2829                {
 12830                    outAsync.invokeException();
 2831                }
 2832            }
 12833            stream = null;
 12834        }
 2835
 2836        internal OutputStream stream;
 2837        internal OutgoingAsyncBase outAsync;
 2838        internal bool compress;
 2839        internal int requestId;
 2840        internal bool prepared;
 2841        internal bool isSent;
 2842        internal bool invokeSent;
 2843        internal bool receivedReply;
 2844    }
 2845
 12846    private static readonly ConnectionState[] connectionStateMap = [
 12847        ConnectionState.ConnectionStateValidating,   // StateNotInitialized
 12848        ConnectionState.ConnectionStateValidating,   // StateNotValidated
 12849        ConnectionState.ConnectionStateActive,       // StateActive
 12850        ConnectionState.ConnectionStateHolding,      // StateHolding
 12851        ConnectionState.ConnectionStateClosing,      // StateClosing
 12852        ConnectionState.ConnectionStateClosing,      // StateClosingPending
 12853        ConnectionState.ConnectionStateClosed,       // StateClosed
 12854        ConnectionState.ConnectionStateClosed,       // StateFinished
 12855    ];
 2856
 2857    private readonly Instance _instance;
 2858    private readonly Transceiver _transceiver;
 2859    private readonly IdleTimeoutTransceiverDecorator _idleTimeoutTransceiver; // can be null
 2860
 2861    private string _desc;
 2862    private readonly string _type;
 2863    private readonly Connector _connector;
 2864    private readonly EndpointI _endpoint;
 2865
 2866    private ObjectAdapter _adapter;
 2867
 2868    private readonly Logger _logger;
 2869    private readonly TraceLevels _traceLevels;
 2870    private readonly Ice.Internal.ThreadPool _threadPool;
 2871
 2872    private readonly TimeSpan _connectTimeout;
 2873    private readonly TimeSpan _closeTimeout;
 2874    private TimeSpan _inactivityTimeout; // protected by _mutex
 2875
 2876    private System.Threading.Timer _inactivityTimer; // can be null
 2877
 2878    private StartCallback _startCallback;
 2879
 2880    // This action must be called outside the ConnectionI lock to avoid lock acquisition deadlocks.
 2881    private readonly Action<ConnectionI> _removeFromFactory;
 2882
 2883    private readonly bool _warn;
 2884    private readonly bool _warnUdp;
 2885
 2886    private readonly int _compressionLevel;
 2887
 2888    private int _nextRequestId;
 2889
 12890    private readonly Dictionary<int, OutgoingAsyncBase> _asyncRequests = new Dictionary<int, OutgoingAsyncBase>();
 2891
 2892    private LocalException _exception;
 2893
 2894    private readonly int _messageSizeMax;
 2895    private readonly BatchRequestQueue _batchRequestQueue;
 2896
 12897    private readonly LinkedList<OutgoingMessage> _sendStreams = new LinkedList<OutgoingMessage>();
 2898
 2899    // Contains the message which is being received. If the connection is waiting to receive a message (_readHeader ==
 2900    // true), its size is Protocol.headerSize. Otherwise, its size is the message size specified in the received message
 2901    // header.
 2902    private readonly InputStream _readStream;
 2903
 2904    // When _readHeader is true, the next bytes we'll read are the header of a new message. When false, we're reading
 2905    // next the remainder of a message that was already partially received.
 2906    private bool _readHeader;
 2907
 2908    // Contains the message which is being sent. The write stream buffer is empty if no message is being sent.
 2909    private readonly OutputStream _writeStream;
 2910
 2911    private ConnectionObserver _observer;
 2912    private int _readStreamPos;
 2913    private int _writeStreamPos;
 2914
 2915    // The upcall count keeps track of the number of dispatches, AMI (response) continuations, sent callbacks and
 2916    // connection establishment callbacks that have been started (or are about to be started) by a thread of the thread
 2917    // pool associated with this connection, and have not completed yet. All these operations except the connection
 2918    // establishment callbacks execute application code or code generated from Slice definitions.
 2919    private int _upcallCount;
 2920
 2921    // The number of outstanding dispatches. Maintained only while state is StateActive or StateHolding.
 2922    // _dispatchCount can be greater than a non-0 _maxDispatches when a receive a batch with multiples requests.
 2923    private int _dispatchCount;
 2924
 2925    // When we dispatch _maxDispatches concurrent requests, we stop reading the connection to back-pressure the peer.
 2926    // _maxDispatches <= 0 means no limit.
 2927    private readonly int _maxDispatches;
 2928
 2929    private int _state; // The current state.
 2930    private bool _shutdownInitiated;
 2931    private bool _initialized;
 2932    private bool _validated;
 2933
 2934    // When true, the application called close and Connection must close the connection when it receives the reply
 2935    // for the last outstanding invocation.
 2936    private bool _closeRequested;
 2937
 2938    private ConnectionInfo _info;
 2939
 2940    private CloseCallback _closeCallback;
 2941
 2942    // We need to run the continuation asynchronously since it can be completed by an Ice thread pool thread.
 12943    private readonly TaskCompletionSource _closed = new(TaskCreationOptions.RunContinuationsAsynchronously);
 12944    private readonly object _mutex = new();
 2945}

Methods/Properties

start(Ice.ConnectionI.StartCallback)
startAndWait()
activate()
hold()
destroy(int)
abort()
closeAsync()
isActiveOrHolding()
throwException()
waitUntilHolding()
waitUntilFinished()
updateObserver()
sendAsyncRequest(Ice.Internal.OutgoingAsyncBase, bool, bool, int)
getBatchRequestQueue()
flushBatchRequests(Ice.CompressBatch)
flushBatchRequestsAsync(Ice.CompressBatch, System.IProgress<bool>, System.Threading.CancellationToken)
disableInactivityCheck()
setCloseCallback(Ice.CloseCallback)
asyncRequestCanceled(Ice.Internal.OutgoingAsyncBase, Ice.LocalException)
endpoint()
connector()
setAdapter(Ice.ObjectAdapter)
getAdapter()
getEndpoint()
createProxy(Ice.Identity)
setAdapterFromAdapter(Ice.ObjectAdapter)
startAsync(int, Ice.Internal.AsyncCallback)
finishAsync(int)
message(Ice.Internal.ThreadPoolCurrent)
upcall(Ice.ConnectionI.StartCallback, System.Collections.Generic.Queue<Ice.ConnectionI.OutgoingMessage>, Ice.ConnectionI.MessageInfo)
finished(Ice.Internal.ThreadPoolCurrent)
finish()
ToString()
type()
getInfo()
setBufferSize(int, int)
exception(Ice.LocalException)
getThreadPool()
.ctor(Ice.Internal.Instance, Ice.Internal.Transceiver, Ice.Internal.Connector, Ice.Internal.EndpointI, Ice.ObjectAdapter, System.Action<Ice.ConnectionI>, Ice.ConnectionOptions)
idleCheck(System.TimeSpan)
sendHeartbeat()
isHeartbeat()
toConnectionState(int)
setState(int, Ice.LocalException)
setState(int)
initiateShutdown()
initialize(int)
validate(int)
sendNextMessage(out System.Collections.Generic.Queue<Ice.ConnectionI.OutgoingMessage>)
sendMessage(Ice.ConnectionI.OutgoingMessage)
doCompress(Ice.OutputStream, bool)
parseMessage(ref Ice.ConnectionI.MessageInfo)
dispatchAll(Ice.InputStream, int, int, byte, Ice.ObjectAdapter)
dispatchAsync()
sendResponse(Ice.OutgoingResponse, bool, byte)
dispatchException(Ice.LocalException, int)
inactivityCheck(System.Threading.Timer)
connectTimedOut(System.Threading.Timer)
closeTimedOut(System.Threading.Timer)
initConnectionInfo()
warning(string, System.Exception)
observerStartRead(Ice.Internal.Buffer)
observerFinishRead(Ice.Internal.Buffer)
observerStartWrite(Ice.Internal.Buffer)
observerFinishWrite(Ice.Internal.Buffer)
read(Ice.Internal.Buffer)
write(Ice.Internal.Buffer)
scheduleInactivityTimer()
cancelInactivityTimer()
scheduleCloseTimer()
doApplicationClose()
.ctor(Ice.OutputStream, bool)
.ctor(Ice.Internal.OutgoingAsyncBase, Ice.OutputStream, bool, int)
canceled()
sent()
completed(Ice.LocalException)
.cctor()