< Summary

Information
Class: Ice.ConnectionI
Assembly: Ice
File(s): /_/csharp/src/Ice/ConnectionI.cs
Tag: 91_21789722663
Line coverage
87%
Covered lines: 1090
Uncovered lines: 153
Coverable lines: 1243
Total lines: 2944
Line coverage: 87.6%
Branch coverage
83%
Covered branches: 605
Total branches: 728
Branch coverage: 83.1%
Method coverage
94%
Covered methods: 73
Total methods: 77
Method coverage: 94.8%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
start(...) | 87.5% | 8.01 | 8 | 94.44%
startAndWait() | 30% | 27.8 | 10 | 43.75%
activate() | 50% | 2.02 | 2 | 83.33%
hold() | 50% | 2.02 | 2 | 83.33%
destroy(...) | 50% | 4 | 4 | 100%
abort() | 100% | 1 | 1 | 100%
closeAsync() | 100% | 4 | 4 | 100%
isActiveOrHolding() | 100% | 2 | 2 | 100%
throwException() | 50% | 2.03 | 2 | 80%
waitUntilHolding() | 100% | 4 | 4 | 100%
waitUntilFinished() | 100% | 4 | 4 | 100%
updateObserver() | 66.67% | 6.01 | 6 | 92.86%
sendAsyncRequest(...) | 90% | 10.03 | 10 | 93.33%
getBatchRequestQueue() | 100% | 1 | 1 | 100%
flushBatchRequests(...) | 100% | 1.02 | 1 | 75%
flushBatchRequestsAsync(...) | 100% | 1 | 1 | 100%
disableInactivityCheck() | 100% | 1 | 1 | 100%
setCloseCallback(...) | 25% | 7.73 | 4 | 38.46%
asyncRequestCanceled(...) | 52.94% | 47.31 | 34 | 77.42%
endpoint() | 100% | 1 | 1 | 100%
connector() | 100% | 1 | 1 | 100%
setAdapter(...) | 62.5% | 8.51 | 8 | 80%
getAdapter() | 100% | 1 | 1 | 100%
getEndpoint() | 100% | 1 | 1 | 100%
createProxy(...) | 100% | 1 | 1 | 100%
setAdapterFromAdapter(...) | 50% | 4.05 | 4 | 85.71%
startAsync(...) | 50% | 2 | 2 | 97.14%
finishAsync(...) | 100% | 24 | 24 | 100%
message(...) | 84.52% | 98.49 | 84 | 87.29%
upcall(...) | 96.67% | 30.48 | 30 | 91.89%
finished(...) | 100% | 8 | 8 | 100%
finish() | 91.67% | 62.27 | 60 | 91.43%
ToString() | 100% | 1 | 1 | 100%
type() | 100% | 1 | 1 | 100%
getInfo() | 100% | 2 | 2 | 100%
setBufferSize(...) | 50% | 2.01 | 2 | 85.71%
exception(...) | 100% | 1 | 1 | 100%
getThreadPool() | 100% | 2 | 1 | 0%
.ctor(...) | 85.71% | 14.19 | 14 | 90.16%
idleCheck(...) | 100% | 4 | 4 | 100%
sendHeartbeat() | 88.46% | 26.93 | 26 | 88.89%
isHeartbeat() | 100% | 2 | 1 | 0%
toConnectionState(...) | 100% | 1 | 1 | 100%
setState(...) | 75% | 20.18 | 20 | 92.31%
setState(...) | 88.57% | 83.62 | 70 | 85.94%
initiateShutdown() | 100% | 8 | 8 | 100%
initialize(...) | 100% | 2 | 2 | 100%
validate(...) | 76% | 61.58 | 50 | 83.33%
sendNextMessage(...) | 92.86% | 31.38 | 28 | 83.72%
sendMessage(...) | 100% | 10 | 10 | 100%
doCompress(...) | 100% | 8 | 8 | 100%
parseMessage(...) | 75% | 63.05 | 44 | 78.57%
dispatchAll(...) | 87.5% | 8.42 | 8 | 81.25%
dispatchAsync() | 100% | 2.08 | 2 | 72.73%
sendResponse(...) | 96.15% | 26 | 26 | 100%
dispatchException(...) | 0% | 156 | 12 | 0%
inactivityCheck(...) | 100% | 4 | 4 | 100%
connectTimedOut(...) | 100% | 2 | 2 | 100%
closeTimedOut(...) | 100% | 2 | 2 | 100%
initConnectionInfo() | 100% | 8 | 8 | 100%
warning(...) | 100% | 2 | 1 | 0%
observerStartRead(...) | 75% | 4 | 4 | 100%
observerFinishRead(...) | 50% | 2.03 | 2 | 80%
observerStartWrite(...) | 100% | 4 | 4 | 100%
observerFinishWrite(...) | 100% | 4 | 4 | 100%
read(...) | 83.33% | 6.01 | 6 | 93.33%
write(...) | 100% | 6 | 6 | 100%
scheduleInactivityTimer() | 100% | 1 | 1 | 100%
cancelInactivityTimer() | 100% | 2 | 2 | 100%
scheduleCloseTimer() | 100% | 2 | 2 | 100%
doApplicationClose() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
canceled() | 100% | 1 | 1 | 100%
sent() | 100% | 4 | 4 | 100%
completed(...) | 100% | 4 | 4 | 100%
.cctor() | 100% | 1 | 1 | 100%

File(s)

/_/csharp/src/Ice/ConnectionI.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Ice.Instrumentation;
 4using Ice.Internal;
 5using System.Diagnostics;
 6using System.Text;
 7
 8namespace Ice;
 9
 10#pragma warning disable CA1001 // _inactivityTimer is disposed by cancelInactivityTimer.
 11public sealed class ConnectionI : Internal.EventHandler, CancellationHandler, Connection
 12#pragma warning restore CA1001
 13{
 14    internal interface StartCallback
 15    {
 16        void connectionStartCompleted(ConnectionI connection);
 17
 18        void connectionStartFailed(ConnectionI connection, LocalException ex);
 19    }
 20
 21    internal void start(StartCallback callback)
 22    {
 23        try
 24        {
 125            lock (_mutex)
 26            {
 27                //
 28                // The connection might already be closed if the communicator was destroyed.
 29                //
 130                if (_state >= StateClosed)
 31                {
 32                    Debug.Assert(_exception is not null);
 033                    throw _exception;
 34                }
 35
 136                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 37                {
 138                    if (_connectTimeout > TimeSpan.Zero)
 39                    {
 40#pragma warning disable CA2000 // connectTimer is disposed by connectTimedOut.
 141                        var connectTimer = new System.Threading.Timer(
 142                            timerObj => connectTimedOut((System.Threading.Timer)timerObj));
 43                        // schedule timer to run once; connectTimedOut disposes the timer too.
 144                        connectTimer.Change(_connectTimeout, Timeout.InfiniteTimeSpan);
 45#pragma warning restore CA2000
 46                    }
 47
 148                    _startCallback = callback;
 149                    return;
 50                }
 51
 52                // The connection starts in the holding state. It will be activated by the connection factory.
 153                setState(StateHolding);
 154            }
 155        }
 156        catch (LocalException ex)
 57        {
 158            exception(ex);
 159            callback.connectionStartFailed(this, _exception);
 160            return;
 61        }
 62
 163        callback.connectionStartCompleted(this);
 164    }
 65
 66    internal void startAndWait()
 67    {
 68        try
 69        {
 170            lock (_mutex)
 71            {
 72                //
 73                // The connection might already be closed if the communicator was destroyed.
 74                //
 175                if (_state >= StateClosed)
 76                {
 77                    Debug.Assert(_exception is not null);
 078                    throw _exception;
 79                }
 80
 181                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 82                {
 83                    //
 84                    // Wait for the connection to be validated.
 85                    //
 086                    while (_state <= StateNotValidated)
 87                    {
 088                        Monitor.Wait(_mutex);
 89                    }
 90
 091                    if (_state >= StateClosing)
 92                    {
 93                        Debug.Assert(_exception is not null);
 094                        throw _exception;
 95                    }
 96                }
 97
 98                //
 99                // We start out in holding state.
 100                //
 1101                setState(StateHolding);
 1102            }
 1103        }
 0104        catch (LocalException ex)
 105        {
 0106            exception(ex);
 0107            waitUntilFinished();
 0108            return;
 109        }
 1110    }
 111
 112    internal void activate()
 113    {
 1114        lock (_mutex)
 115        {
 1116            if (_state <= StateNotValidated)
 117            {
 0118                return;
 119            }
 120
 1121            setState(StateActive);
 1122        }
 1123    }
 124
 125    internal void hold()
 126    {
 1127        lock (_mutex)
 128        {
 1129            if (_state <= StateNotValidated)
 130            {
 0131                return;
 132            }
 133
 1134            setState(StateHolding);
 1135        }
 1136    }
 137
 138    // DestructionReason.
 139    public const int ObjectAdapterDeactivated = 0;
 140    public const int CommunicatorDestroyed = 1;
 141
 142    internal void destroy(int reason)
 143    {
 1144        lock (_mutex)
 145        {
 146            switch (reason)
 147            {
 148                case ObjectAdapterDeactivated:
 149                {
 1150                    setState(StateClosing, new ObjectAdapterDeactivatedException(_adapter?.getName() ?? ""));
 1151                    break;
 152                }
 153
 154                case CommunicatorDestroyed:
 155                {
 1156                    setState(StateClosing, new CommunicatorDestroyedException());
 1157                    break;
 158                }
 159            }
 160        }
 1161    }
 162
 163    public void abort()
 164    {
 1165        lock (_mutex)
 166        {
 1167            setState(
 1168                StateClosed,
 1169                new ConnectionAbortedException(
 1170                    "The connection was aborted by the application.",
 1171                    closedByApplication: true));
 1172        }
 1173    }
 174
 175    public Task closeAsync()
 176    {
 1177        lock (_mutex)
 178        {
 1179            if (_state < StateClosing)
 180            {
 1181                if (_asyncRequests.Count == 0)
 182                {
 1183                    doApplicationClose();
 184                }
 185                else
 186                {
 1187                    _closeRequested = true;
 1188                    scheduleCloseTimer(); // we don't wait forever for outstanding invocations to complete
 189                }
 190            }
 191            // else nothing to do, already closing or closed.
 1192        }
 193
 1194        return _closed.Task;
 195    }
 196
 197    internal bool isActiveOrHolding()
 198    {
 1199        lock (_mutex)
 200        {
 1201            return _state > StateNotValidated && _state < StateClosing;
 202        }
 1203    }
 204
 205    public void throwException()
 206    {
 1207        lock (_mutex)
 208        {
 1209            if (_exception is not null)
 210            {
 211                Debug.Assert(_state >= StateClosing);
 0212                throw _exception;
 213            }
 1214        }
 1215    }
 216
 217    internal void waitUntilHolding()
 218    {
 1219        lock (_mutex)
 220        {
 1221            while (_state < StateHolding || _upcallCount > 0)
 222            {
 1223                Monitor.Wait(_mutex);
 224            }
 1225        }
 1226    }
 227
 228    internal void waitUntilFinished()
 229    {
 1230        lock (_mutex)
 231        {
 232            //
 233            // We wait indefinitely until the connection is finished and all
 234            // outstanding requests are completed. Otherwise we couldn't
 235            // guarantee that there are no outstanding calls when deactivate()
 236            // is called on the servant locators.
 237            //
 1238            while (_state < StateFinished || _upcallCount > 0)
 239            {
 1240                Monitor.Wait(_mutex);
 241            }
 242
 243            Debug.Assert(_state == StateFinished);
 244
 245            //
 246            // Clear the OA. See bug 1673 for the details of why this is necessary.
 247            //
 1248            _adapter = null;
 1249        }
 1250    }
 251
 252    internal void updateObserver()
 253    {
 1254        lock (_mutex)
 255        {
 1256            if (_state < StateNotValidated || _state > StateClosed)
 257            {
 0258                return;
 259            }
 260
 261            Debug.Assert(_instance.initializationData().observer is not null);
 1262            _observer = _instance.initializationData().observer.getConnectionObserver(
 1263                initConnectionInfo(),
 1264                _endpoint,
 1265                toConnectionState(_state),
 1266                _observer);
 1267            if (_observer is not null)
 268            {
 1269                _observer.attach();
 270            }
 271            else
 272            {
 1273                _writeStreamPos = -1;
 1274                _readStreamPos = -1;
 275            }
 1276        }
 1277    }
 278
 279    internal int sendAsyncRequest(
 280        OutgoingAsyncBase og,
 281        bool compress,
 282        bool response,
 283        int batchRequestCount)
 284    {
 1285        OutputStream os = og.getOs();
 286
 1287        lock (_mutex)
 288        {
 289            //
 290            // If the connection is closed before we even have a chance
 291            // to send our request, we always try to send the request
 292            // again.
 293            //
 1294            if (_exception is not null)
 295            {
 1296                throw new RetryException(_exception);
 297            }
 298
 299            Debug.Assert(_state > StateNotValidated);
 300            Debug.Assert(_state < StateClosing);
 301
 302            //
 303            // Ensure the message isn't bigger than what we can send with the
 304            // transport.
 305            //
 1306            _transceiver.checkSendSize(os.getBuffer());
 307
 308            //
 309            // Notify the request that it's cancelable with this connection.
 310            // This will throw if the request is canceled.
 311            //
 1312            og.cancelable(this);
 1313            int requestId = 0;
 1314            if (response)
 315            {
 316                //
 317                // Create a new unique request ID.
 318                //
 1319                requestId = _nextRequestId++;
 1320                if (requestId <= 0)
 321                {
 0322                    _nextRequestId = 1;
 0323                    requestId = _nextRequestId++;
 324                }
 325
 326                //
 327                // Fill in the request ID.
 328                //
 1329                os.pos(Protocol.headerSize);
 1330                os.writeInt(requestId);
 331            }
 1332            else if (batchRequestCount > 0)
 333            {
 1334                os.pos(Protocol.headerSize);
 1335                os.writeInt(batchRequestCount);
 336            }
 337
 1338            og.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
 339
 340            // We're just about to send a request, so we are not inactive anymore.
 1341            cancelInactivityTimer();
 342
 1343            int status = OutgoingAsyncBase.AsyncStatusQueued;
 344            try
 345            {
 1346                var message = new OutgoingMessage(og, os, compress, requestId);
 1347                status = sendMessage(message);
 1348            }
 1349            catch (LocalException ex)
 350            {
 1351                setState(StateClosed, ex);
 352                Debug.Assert(_exception is not null);
 1353                throw _exception;
 354            }
 355
 1356            if (response)
 357            {
 358                //
 359                // Add to the async requests map.
 360                //
 1361                _asyncRequests[requestId] = og;
 362            }
 1363            return status;
 364        }
 1365    }
 366
 1367    internal BatchRequestQueue getBatchRequestQueue() => _batchRequestQueue;
 368
 369    public void flushBatchRequests(CompressBatch compress)
 370    {
 371        try
 372        {
 1373            var completed = new FlushBatchTaskCompletionCallback();
 1374            var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 1375            outgoing.invoke(_flushBatchRequests_name, compress, true);
 1376            completed.Task.Wait();
 1377        }
 0378        catch (AggregateException ex)
 379        {
 0380            throw ex.InnerException;
 381        }
 1382    }
 383
 384    public Task flushBatchRequestsAsync(
 385        CompressBatch compress,
 386        IProgress<bool> progress = null,
 387        CancellationToken cancel = default)
 388    {
 1389        var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
 1390        var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 1391        outgoing.invoke(_flushBatchRequests_name, compress, false);
 1392        return completed.Task;
 393    }
 394
 395    private const string _flushBatchRequests_name = "flushBatchRequests";
 396
 397    public void disableInactivityCheck()
 398    {
 1399        lock (_mutex)
 400        {
 1401            cancelInactivityTimer();
 1402            _inactivityTimeout = TimeSpan.Zero;
 1403        }
 1404    }
 405
 406    public void setCloseCallback(CloseCallback callback)
 407    {
 1408        lock (_mutex)
 409        {
 1410            if (_state >= StateClosed)
 411            {
 0412                if (callback is not null)
 413                {
 0414                    _threadPool.execute(
 0415                        () =>
 0416                        {
 0417                            try
 0418                            {
 0419                                callback(this);
 0420                            }
 0421                            catch (System.Exception ex)
 0422                            {
 0423                                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
 0424                            }
 0425                        },
 0426                        this);
 427                }
 428            }
 429            else
 430            {
 1431                _closeCallback = callback;
 432            }
 1433        }
 1434    }
 435
 436    public void asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex)
 437    {
 438        //
 439        // NOTE: This isn't called from a thread pool thread.
 440        //
 441
 1442        lock (_mutex)
 443        {
 1444            if (_state >= StateClosed)
 445            {
 0446                return; // The request has already been or will be shortly notified of the failure.
 447            }
 448
 1449            OutgoingMessage o = _sendStreams.FirstOrDefault(m => m.outAsync == outAsync);
 1450            if (o is not null)
 451            {
 1452                if (o.requestId > 0)
 453                {
 1454                    _asyncRequests.Remove(o.requestId);
 455                }
 456
 1457                if (ex is ConnectionAbortedException)
 458                {
 0459                    setState(StateClosed, ex);
 460                }
 461                else
 462                {
 463                    //
 464                    // If the request is being sent, don't remove it from the send streams,
 465                    // it will be removed once the sending is finished.
 466                    //
 1467                    if (o == _sendStreams.First.Value)
 468                    {
 0469                        o.canceled();
 470                    }
 471                    else
 472                    {
 1473                        o.canceled();
 1474                        _sendStreams.Remove(o);
 475                    }
 1476                    if (outAsync.exception(ex))
 477                    {
 1478                        outAsync.invokeExceptionAsync();
 479                    }
 480                }
 481
 1482                if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 483                {
 0484                    doApplicationClose();
 485                }
 1486                return;
 487            }
 488
 1489            if (outAsync is OutgoingAsync)
 490            {
 1491                foreach (KeyValuePair<int, OutgoingAsyncBase> kvp in _asyncRequests)
 492                {
 1493                    if (kvp.Value == outAsync)
 494                    {
 1495                        if (ex is ConnectionAbortedException)
 496                        {
 0497                            setState(StateClosed, ex);
 498                        }
 499                        else
 500                        {
 1501                            _asyncRequests.Remove(kvp.Key);
 1502                            if (outAsync.exception(ex))
 503                            {
 1504                                outAsync.invokeExceptionAsync();
 505                            }
 506                        }
 507
 1508                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 509                        {
 0510                            doApplicationClose();
 511                        }
 1512                        return;
 513                    }
 514                }
 515            }
 0516        }
 1517    }
 518
 1519    internal EndpointI endpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 520
 1521    internal Connector connector() => _connector; // No mutex protection necessary, _connector is immutable.
 522
 523    public void setAdapter(ObjectAdapter adapter)
 524    {
 1525        if (_connector is null) // server connection
 526        {
 0527            throw new InvalidOperationException("setAdapter can only be called on a client connection");
 528        }
 529
 1530        if (adapter is not null)
 531        {
 532            // Go through the adapter to set the adapter and servant manager on this connection
 533            // to ensure the object adapter is still active.
 1534            adapter.setAdapterOnConnection(this);
 535        }
 536        else
 537        {
 1538            lock (_mutex)
 539            {
 1540                if (_state <= StateNotValidated || _state >= StateClosing)
 541                {
 0542                    return;
 543                }
 1544                _adapter = null;
 1545            }
 546        }
 547
 548        //
 549        // We never change the thread pool with which we were initially
 550        // registered, even if we add or remove an object adapter.
 551        //
 1552    }
 553
 554    public ObjectAdapter getAdapter()
 555    {
 1556        lock (_mutex)
 557        {
 1558            return _adapter;
 559        }
 1560    }
 561
 1562    public Endpoint getEndpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 563
 564    public ObjectPrx createProxy(Identity id)
 565    {
 1566        ObjectAdapter.checkIdentity(id);
 1567        return new ObjectPrxHelper(_instance.referenceFactory().create(id, this));
 568    }
 569
 570    public void setAdapterFromAdapter(ObjectAdapter adapter)
 571    {
 1572        lock (_mutex)
 573        {
 1574            if (_state <= StateNotValidated || _state >= StateClosing)
 575            {
 0576                return;
 577            }
 578            Debug.Assert(adapter is not null); // Called by ObjectAdapter::setAdapterOnConnection
 1579            _adapter = adapter;
 580
 581            // Clear cached connection info (if any) as it's no longer accurate.
 1582            _info = null;
 1583        }
 1584    }
 585
 586    //
 587    // Operations from EventHandler
 588    //
 589    public override bool startAsync(int operation, Ice.Internal.AsyncCallback completedCallback)
 590    {
 1591        if (_state >= StateClosed)
 592        {
 0593            return false;
 594        }
 595
 596        // Run the IO operation on a .NET thread pool thread to ensure the IO operation won't be interrupted if the
 597        // Ice thread pool thread is terminated (.NET Socket read/write fail with a SocketError.OperationAborted
 598        // error if started from a thread which is later terminated).
 1599        Task.Run(() =>
 1600        {
 1601            lock (_mutex)
 1602            {
 1603                if (_state >= StateClosed)
 1604                {
 1605                    completedCallback(this);
 1606                    return;
 1607                }
 1608
 1609                try
 1610                {
 1611                    if ((operation & SocketOperation.Write) != 0)
 1612                    {
 1613                        if (_observer != null)
 1614                        {
 1615                            observerStartWrite(_writeStream.getBuffer());
 1616                        }
 1617
 1618                        bool completedSynchronously =
 1619                            _transceiver.startWrite(
 1620                                _writeStream.getBuffer(),
 1621                                completedCallback,
 1622                                this,
 1623                                out bool messageWritten);
 1624
 1625                        // If the startWrite call wrote the message, we assume the message is sent now for at-most-once
 1626                        // semantics in the event the connection is closed while the message is still in _sendStreams.
 1627                        if (messageWritten && _sendStreams.Count > 0)
 1628                        {
 1629                            // See finish() code.
 1630                            _sendStreams.First.Value.isSent = true;
 1631                        }
 1632
 1633                        if (completedSynchronously)
 1634                        {
 1635                            // If the write completed synchronously, we need to call the completedCallback.
 1636                            completedCallback(this);
 1637                        }
 1638                    }
 1639                    else if ((operation & SocketOperation.Read) != 0)
 1640                    {
 1641                        if (_observer != null && !_readHeader)
 1642                        {
 1643                            observerStartRead(_readStream.getBuffer());
 1644                        }
 1645
 1646                        if (_transceiver.startRead(_readStream.getBuffer(), completedCallback, this))
 1647                        {
 1648                            completedCallback(this);
 1649                        }
 1650                    }
 1651                }
 1652                catch (LocalException ex)
 1653                {
 1654                    setState(StateClosed, ex);
 1655                    completedCallback(this);
 1656                }
 1657            }
 1658        });
 659
 1660        return true;
 661    }
 662
 663    public override bool finishAsync(int operation)
 664    {
 1665        if (_state >= StateClosed)
 666        {
 1667            return false;
 668        }
 669
 670        try
 671        {
 1672            if ((operation & SocketOperation.Write) != 0)
 673            {
 1674                Ice.Internal.Buffer buf = _writeStream.getBuffer();
 1675                int start = buf.b.position();
 1676                _transceiver.finishWrite(buf);
 1677                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 678                {
 1679                    var s = new StringBuilder("sent ");
 1680                    s.Append(buf.b.position() - start);
 1681                    if (!_endpoint.datagram())
 682                    {
 1683                        s.Append(" of ");
 1684                        s.Append(buf.b.limit() - start);
 685                    }
 1686                    s.Append(" bytes via ");
 1687                    s.Append(_endpoint.protocol());
 1688                    s.Append('\n');
 1689                    s.Append(ToString());
 1690                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 691                }
 692
 1693                if (_observer is not null)
 694                {
 1695                    observerFinishWrite(_writeStream.getBuffer());
 696                }
 697            }
 1698            else if ((operation & SocketOperation.Read) != 0)
 699            {
 1700                Ice.Internal.Buffer buf = _readStream.getBuffer();
 1701                int start = buf.b.position();
 1702                _transceiver.finishRead(buf);
 1703                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 704                {
 1705                    var s = new StringBuilder("received ");
 1706                    if (_endpoint.datagram())
 707                    {
 1708                        s.Append(buf.b.limit());
 709                    }
 710                    else
 711                    {
 1712                        s.Append(buf.b.position() - start);
 1713                        s.Append(" of ");
 1714                        s.Append(buf.b.limit() - start);
 715                    }
 1716                    s.Append(" bytes via ");
 1717                    s.Append(_endpoint.protocol());
 1718                    s.Append('\n');
 1719                    s.Append(ToString());
 1720                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 721                }
 722
 1723                if (_observer is not null && !_readHeader)
 724                {
 1725                    observerFinishRead(_readStream.getBuffer());
 726                }
 727            }
 1728        }
 1729        catch (LocalException ex)
 730        {
 1731            setState(StateClosed, ex);
 1732        }
 1733        return _state < StateClosed;
 734    }
 735
 736    public override void message(ThreadPoolCurrent current)
 737    {
 1738        StartCallback startCB = null;
 1739        Queue<OutgoingMessage> sentCBs = null;
 1740        var info = new MessageInfo();
 1741        int upcallCount = 0;
 742
 1743        using var msg = new ThreadPoolMessage(current, _mutex);
 1744        lock (_mutex)
 745        {
 746            try
 747            {
 1748                if (!msg.startIOScope())
 749                {
 1750                    return;
 751                }
 752
 1753                if (_state >= StateClosed)
 754                {
 0755                    return;
 756                }
 757
 758                try
 759                {
 1760                    int writeOp = SocketOperation.None;
 1761                    int readOp = SocketOperation.None;
 762
 763                    // If writes are ready, write the data from the connection's write buffer (_writeStream)
 1764                    if ((current.operation & SocketOperation.Write) != 0)
 765                    {
 1766                        if (_observer is not null)
 767                        {
 1768                            observerStartWrite(_writeStream.getBuffer());
 769                        }
 1770                        writeOp = write(_writeStream.getBuffer());
 1771                        if (_observer is not null && (writeOp & SocketOperation.Write) == 0)
 772                        {
 1773                            observerFinishWrite(_writeStream.getBuffer());
 774                        }
 775                    }
 776
 777                    // If reads are ready, read the data into the connection's read buffer (_readStream). The data is
 778                    // read until:
 779                    // - the full message is read (the transport read returns SocketOperationNone) and
 780                    //   the read buffer is fully filled
 781                    // - the read operation on the transport can't continue without blocking
 1782                    if ((current.operation & SocketOperation.Read) != 0)
 783                    {
 784                        while (true)
 785                        {
 1786                            Ice.Internal.Buffer buf = _readStream.getBuffer();
 787
 1788                            if (_observer is not null && !_readHeader)
 789                            {
 1790                                observerStartRead(buf);
 791                            }
 792
 1793                            readOp = read(buf);
 1794                            if ((readOp & SocketOperation.Read) != 0)
 795                            {
 796                                // Can't continue without blocking, exit out of the loop.
 797                                break;
 798                            }
 1799                            if (_observer is not null && !_readHeader)
 800                            {
 801                                Debug.Assert(!buf.b.hasRemaining());
 1802                                observerFinishRead(buf);
 803                            }
 804
 805                            // If read header is true, we're reading a new Ice protocol message and we need to read
 806                            // the message header.
 1807                            if (_readHeader)
 808                            {
 809                                // The next read will read the remainder of the message.
 1810                                _readHeader = false;
 811
 1812                                _observer?.receivedBytes(Protocol.headerSize);
 813
 814                                //
 815                                // Connection is validated on first message. This is only used by
 816                                // setState() to check whether or not we can print a connection
 817                                // warning (a client might close the connection forcefully if the
 818                                // connection isn't validated, we don't want to print a warning
 819                                // in this case).
 820                                //
 1821                                _validated = true;
 822
 823                                // Full header should be read because the size of _readStream is always headerSize (14)
 824                                // when reading a new message (see the code that sets _readHeader = true).
 1825                                int pos = _readStream.pos();
 1826                                if (pos < Protocol.headerSize)
 827                                {
 828                                    //
 829                                    // This situation is possible for small UDP packets.
 830                                    //
 0831                                    throw new MarshalException("Received Ice message with too few bytes in header.");
 832                                }
 833
 834                                // Decode the header.
 1835                                _readStream.pos(0);
 1836                                byte[] m = new byte[4];
 1837                                m[0] = _readStream.readByte();
 1838                                m[1] = _readStream.readByte();
 1839                                m[2] = _readStream.readByte();
 1840                                m[3] = _readStream.readByte();
 1841                                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 1842                                m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 843                                {
 0844                                    throw new ProtocolException(
 0845                                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 846                                }
 847
 1848                                var pv = new ProtocolVersion(_readStream);
 1849                                if (pv != Protocol.currentProtocol)
 850                                {
 0851                                    throw new MarshalException(
 0852                                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 853                                }
 1854                                var ev = new EncodingVersion(_readStream);
 1855                                if (ev != Protocol.currentProtocolEncoding)
 856                                {
 0857                                    throw new MarshalException(
 0858                                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 859                                }
 860
 1861                                _readStream.readByte(); // messageType
 1862                                _readStream.readByte(); // compress
 1863                                int size = _readStream.readInt();
 1864                                if (size < Protocol.headerSize)
 865                                {
 0866                                    throw new MarshalException($"Received Ice message with unexpected size {size}.");
 867                                }
 868
 869                                // Resize the read buffer to the message size.
 1870                                if (size > _messageSizeMax)
 871                                {
 1872                                    Ex.throwMemoryLimitException(size, _messageSizeMax);
 873                                }
 1874                                if (size > _readStream.size())
 875                                {
 1876                                    _readStream.resize(size);
 877                                }
 1878                                _readStream.pos(pos);
 879                            }
 880
 1881                            if (buf.b.hasRemaining())
 882                            {
 1883                                if (_endpoint.datagram())
 884                                {
 1885                                    throw new DatagramLimitException(); // The message was truncated.
 886                                }
 887                                continue;
 888                            }
 889                            break;
 890                        }
 891                    }
 892
 893                    // readOp and writeOp are set to the operations that the transport read or write calls from above
 894                    // returned. They indicate which operations will need to be monitored by the thread pool's selector
 895                    // when this method returns.
 1896                    int newOp = readOp | writeOp;
 897
 898                    // Operations that are ready. For example, if message was called with SocketOperationRead and the
 899                    // transport read returned SocketOperationNone, reads are considered done: there's no additional
 900                    // data to read.
 1901                    int readyOp = current.operation & ~newOp;
 902
 1903                    if (_state <= StateNotValidated)
 904                    {
 905                        // If the connection is still not validated and there's still data to read or write, continue
 906                        // waiting for data to read or write.
 1907                        if (newOp != 0)
 908                        {
 1909                            _threadPool.update(this, current.operation, newOp);
 1910                            return;
 911                        }
 912
 913                        // Initialize the connection if it's not initialized yet.
 1914                        if (_state == StateNotInitialized && !initialize(current.operation))
 915                        {
 1916                            return;
 917                        }
 918
 919                        // Validate the connection if it's not validated yet.
 1920                        if (_state <= StateNotValidated && !validate(current.operation))
 921                        {
 1922                            return;
 923                        }
 924
 925                        // The connection is validated and doesn't need additional data to be read or written. So
 926                        // unregister it from the thread pool's selector.
 1927                        _threadPool.unregister(this, current.operation);
 928
 929                        //
 930                        // We start out in holding state.
 931                        //
 1932                        setState(StateHolding);
 1933                        if (_startCallback is not null)
 934                        {
 1935                            startCB = _startCallback;
 1936                            _startCallback = null;
 1937                            if (startCB is not null)
 938                            {
 1939                                ++upcallCount;
 940                            }
 941                        }
 942                    }
 943                    else
 944                    {
 945                        Debug.Assert(_state <= StateClosingPending);
 946
 947                        //
 948                        // We parse messages first, if we receive a close
 949                        // connection message we won't send more messages.
 950                        //
 1951                        if ((readyOp & SocketOperation.Read) != 0)
 952                        {
 953                            // At this point, the protocol message is fully read and can therefore be decoded by
 954                            // parseMessage. parseMessage returns the operation to wait for readiness next.
 1955                            newOp |= parseMessage(ref info);
 1956                            upcallCount += info.upcallCount;
 957                        }
 958
 1959                        if ((readyOp & SocketOperation.Write) != 0)
 960                        {
 961                            // At this point the message from _writeStream is fully written and the next message can be
 962                            // written.
 963
 1964                            newOp |= sendNextMessage(out sentCBs);
 1965                            if (sentCBs is not null)
 966                            {
 1967                                ++upcallCount;
 968                            }
 969                        }
 970
 971                        // If the connection is not closed yet, we can update the thread pool selector to wait for
 972                        // readiness of read, write or both operations.
 1973                        if (_state < StateClosed)
 974                        {
 1975                            _threadPool.update(this, current.operation, newOp);
 976                        }
 977                    }
 978
 1979                    if (upcallCount == 0)
 980                    {
 1981                        return; // Nothing to execute, we're done!
 982                    }
 983
 1984                    _upcallCount += upcallCount;
 985
 986                    // There's something to execute so we mark IO as completed to elect a new leader thread and let IO
 987                    // be performed on this new leader thread while this thread continues with executing the upcalls.
 1988                    msg.ioCompleted();
 1989                }
 1990                catch (DatagramLimitException) // Expected.
 991                {
 1992                    if (_warnUdp)
 993                    {
 0994                        _logger.warning($"maximum datagram size of {_readStream.pos()} exceeded");
 995                    }
 1996                    _readStream.resize(Protocol.headerSize);
 1997                    _readStream.pos(0);
 1998                    _readHeader = true;
 1999                    return;
 1000                }
 11001                catch (SocketException ex)
 1002                {
 11003                    setState(StateClosed, ex);
 11004                    return;
 1005                }
 11006                catch (LocalException ex)
 1007                {
 11008                    if (_endpoint.datagram())
 1009                    {
 01010                        if (_warn)
 1011                        {
 01012                            _logger.warning($"datagram connection exception:\n{ex}\n{_desc}");
 1013                        }
 01014                        _readStream.resize(Protocol.headerSize);
 01015                        _readStream.pos(0);
 01016                        _readHeader = true;
 1017                    }
 1018                    else
 1019                    {
 11020                        setState(StateClosed, ex);
 1021                    }
 11022                    return;
 1023                }
 1024            }
 1025            finally
 1026            {
 11027                msg.finishIOScope();
 11028            }
 1029        }
 1030
 11031        _threadPool.executeFromThisThread(() => upcall(startCB, sentCBs, info), this);
 11032    }
 1033
 1034    private void upcall(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)
 1035    {
             // Runs the upcalls handed over by the caller, in order: the start callback, the sent callbacks, the
             // AMI response and the dispatch of incoming requests. The upcalls completed here are then subtracted
             // from _upcallCount while holding _mutex.
 11036        int completedUpcallCount = 0;
 1037
 1038        //
 1039        // Notify the factory that the connection establishment and
 1040        // validation has completed.
 1041        //
 11042        if (startCB is not null)
 1043        {
 11044            startCB.connectionStartCompleted(this);
 11045            ++completedUpcallCount;
 1046        }
 1047
 1048        //
 1049        // Notify AMI calls that the message was sent.
 1050        //
 11051        if (sentCBs is not null)
 1052        {
 11053            foreach (OutgoingMessage m in sentCBs)
 1054            {
 11055                if (m.invokeSent)
 1056                {
 11057                    m.outAsync.invokeSent();
 1058                }
 11059                if (m.receivedReply)
 1060                {
 11061                    var outAsync = (OutgoingAsync)m.outAsync;
 11062                    if (outAsync.response())
 1063                    {
 11064                        outAsync.invokeResponse();
 1065                    }
 1066                }
 1067            }
                 // The whole queue of sent callbacks counts as a single upcall.
 11068            ++completedUpcallCount;
 1069        }
 1070
 1071        //
 1072        // Asynchronous replies must be handled outside the thread
 1073        // synchronization, so that nested calls are possible.
 1074        //
 11075        if (info.outAsync is not null)
 1076        {
 11077            info.outAsync.invokeResponse();
 11078            ++completedUpcallCount;
 1079        }
 1080
 1081        //
 1082        // Method invocation (or multiple invocations for batch messages)
 1083        // must be done outside the thread synchronization, so that nested
 1084        // calls are possible.
 1085        //
             // NOTE(review): dispatches are not added to completedUpcallCount here; presumably their share of
             // _upcallCount is released by the dispatch path itself - confirm.
 11086        if (info.requestCount > 0)
 1087        {
 11088            dispatchAll(info.stream, info.requestCount, info.requestId, info.compress, info.adapter);
 1089        }
 1090
 1091        //
 1092        // Decrease the upcall count.
 1093        //
 11094        bool finished = false;
 11095        if (completedUpcallCount > 0)
 1096        {
 11097            lock (_mutex)
 1098            {
 11099                _upcallCount -= completedUpcallCount;
 11100                if (_upcallCount == 0)
 1101                {
 1102                    // Only initiate shutdown if not already initiated. It might have already been initiated if the sent
 1103                    // callback or AMI callback was called when the connection was in the closing state.
 11104                    if (_state == StateClosing)
 1105                    {
 1106                        try
 1107                        {
 11108                            initiateShutdown();
 11109                        }
 01110                        catch (Ice.LocalException ex)
 1111                        {
 01112                            setState(StateClosed, ex);
 01113                        }
 1114                    }
 11115                    else if (_state == StateFinished)
 1116                    {
 11117                        finished = true;
 11118                        _observer?.detach();
 1119                    }
                         // Wake up any thread waiting on _mutex now that no upcall is pending.
 11120                    Monitor.PulseAll(_mutex);
 1121                }
 11122            }
 1123        }
 1124
             // The removal callback is invoked after _mutex has been released.
 11125        if (finished && _removeFromFactory is not null)
 1126        {
 11127            _removeFromFactory(this);
 1128        }
 11129    }
 1130
 1131    public override void finished(ThreadPoolCurrent current)
 1132    {
             // Thread pool callback for a closed connection: runs finish(), either directly or through
             // _threadPool.executeFromThisThread when there are callbacks to call.
             //
 1133        // Lock the connection here to ensure setState() completes before the code below is executed. This method can
 1134        // be called by the thread pool as soon as setState() calls _threadPool->finish(...). There's no need to lock
 1135        // the mutex for the remainder of the code because the data members accessed by finish() are immutable once
 1136        // _state == StateClosed (and we don't want to hold the mutex when calling upcalls).
 11137        lock (_mutex)
 1138        {
 1139            Debug.Assert(_state == StateClosed);
 11140        }
 1141
 1142        //
 1143        // If there are no callbacks to call, we don't call ioCompleted() since we're not going
 1144        // to call code that will potentially block (this avoids promoting a new leader and
 1145        // unnecessary thread creation, especially if this is called on shutdown).
 1146        //
 11147        if (_startCallback is null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && _closeCallback is null)
 1148        {
 11149            finish();
 11150            return;
 1151        }
 1152
             // finish() will call at least one callback: mark the IO as completed first, then run finish().
 11153        current.ioCompleted();
 11154        _threadPool.executeFromThisThread(finish, this);
 11155    }
 1156
 1157    private void finish()
 1158    {
             // Final cleanup of a closed connection: traces the closure, fails the pending start callback,
             // outgoing messages and async requests with _exception, releases the stream buffers, completes
             // _closed, calls the close callback and finally moves the connection to StateFinished.
 11159        if (!_initialized)
 1160        {
                 // The connection never completed initialization: trace it as a failed establish/accept.
 11161            if (_instance.traceLevels().network >= 2)
 1162            {
 11163                var s = new StringBuilder("failed to ");
 11164                s.Append(_connector is not null ? "establish" : "accept");
 11165                s.Append(' ');
 11166                s.Append(_endpoint.protocol());
 11167                s.Append(" connection\n");
 11168                s.Append(ToString());
 11169                s.Append('\n');
 11170                s.Append(_exception);
 11171                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1172            }
 1173        }
 1174        else
 1175        {
 11176            if (_instance.traceLevels().network >= 1)
 1177            {
 11178                var s = new StringBuilder("closed ");
 11179                s.Append(_endpoint.protocol());
 11180                s.Append(" connection\n");
 11181                s.Append(ToString());
 1182
 1183                // Trace the cause of most connection closures.
 11184                if (!(_exception is CommunicatorDestroyedException || _exception is ObjectAdapterDeactivatedException))
 1185                {
 11186                    s.Append('\n');
 11187                    s.Append(_exception);
 1188                }
 1189
 11190                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1191            }
 1192        }
 1193
             // Report the failure to a start callback that is still pending, then drop it.
 11194        _startCallback?.connectionStartFailed(this, _exception);
 11195        _startCallback = null;
 1196
 11197        if (_sendStreams.Count > 0)
 1198        {
 11199            if (!_writeStream.isEmpty())
 1200            {
 1201                //
 1202                // Return the stream to the outgoing call. This is important for
 1203                // retriable AMI calls which are not marshaled again.
 1204                //
 11205                OutgoingMessage message = _sendStreams.First.Value;
 11206                _writeStream.swap(message.stream);
 1207
 1208                //
 1209                // The current message might be sent but not yet removed from _sendStreams. If
 1210                // the response has been received in the meantime, we remove the message from
 1211                // _sendStreams to not call finished on a message which is already done.
 1212                //
 11213                if (message.isSent || message.receivedReply)
 1214                {
 11215                    if (message.sent() && message.invokeSent)
 1216                    {
 11217                        message.outAsync.invokeSent();
 1218                    }
 11219                    if (message.receivedReply)
 1220                    {
 01221                        var outAsync = (OutgoingAsync)message.outAsync;
 01222                        if (outAsync.response())
 1223                        {
 01224                            outAsync.invokeResponse();
 1225                        }
 1226                    }
 11227                    _sendStreams.RemoveFirst();
 1228                }
 1229            }
 1230
                 // Complete every message still queued with the connection's exception.
 11231            foreach (OutgoingMessage o in _sendStreams)
 1232            {
 11233                o.completed(_exception);
 11234                if (o.requestId > 0) // Make sure finished isn't called twice.
 1235                {
 11236                    _asyncRequests.Remove(o.requestId);
 1237                }
 1238            }
 11239            _sendStreams.Clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
 1240        }
 1241
             // Fail the async requests that remain (those not already completed through _sendStreams above).
 11242        foreach (OutgoingAsyncBase o in _asyncRequests.Values)
 1243        {
 11244            if (o.exception(_exception))
 1245            {
 11246                o.invokeException();
 1247            }
 1248        }
 11249        _asyncRequests.Clear();
 1250
 1251        //
 1252        // Don't wait to be reaped to reclaim memory allocated by read/write streams.
 1253        //
 11254        _writeStream.clear();
 11255        _writeStream.getBuffer().clear();
 11256        _readStream.clear();
 11257        _readStream.getBuffer().clear();
 1258
             // _closed completes successfully for the four exception types listed below, and is faulted with
             // _exception for any other exception.
 11259        if (_exception is ConnectionClosedException or
 11260            CloseConnectionException or
 11261            CommunicatorDestroyedException or
 11262            ObjectAdapterDeactivatedException)
 1263        {
 1264            // Can execute synchronously. Note that we're not within a lock(this) here.
 11265            _closed.SetResult();
 1266        }
 1267        else
 1268        {
 1269            Debug.Assert(_exception is not null);
 11270            _closed.SetException(_exception);
 1271        }
 1272
 11273        if (_closeCallback is not null)
 1274        {
                 // An exception thrown by the close callback is logged and otherwise ignored.
 1275            try
 1276            {
 11277                _closeCallback(this);
 11278            }
 01279            catch (System.Exception ex)
 1280            {
 01281                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
 01282            }
 11283            _closeCallback = null;
 1284        }
 1285
 1286        //
 1287        // This must be done last as this will cause waitUntilFinished() to return (and communicator
 1288        // objects such as the timer might be destroyed too).
 1289        //
 11290        bool finished = false;
 11291        lock (_mutex)
 1292        {
 11293            setState(StateFinished);
 1294
                 // When upcalls are still pending, upcall() performs the detach/removal once the count reaches 0.
 11295            if (_upcallCount == 0)
 1296            {
 11297                finished = true;
 11298                _observer?.detach();
 1299            }
 11300        }
 1301
 11302        if (finished && _removeFromFactory is not null)
 1303        {
 11304            _removeFromFactory(this);
 1305        }
 11306    }
 1307
 1308    /// <inheritdoc/>
 11309    public override string ToString() => _desc; // No mutex lock needed: _desc is immutable (assigned in the constructor).
 1310
 1311    /// <inheritdoc/>
 11312    public string type() => _type; // No mutex lock needed: _type is immutable (assigned in the constructor).
 1313
 1314    /// <inheritdoc/>
 1315    public ConnectionInfo getInfo()
 1316    {
             // Returns the result of initConnectionInfo() while holding _mutex.
 11317        lock (_mutex)
 1318        {
                 // Once the connection is closed (or finished), rethrow the exception stored in _exception.
 11319            if (_state >= StateClosed)
 1320            {
 11321                throw _exception;
 1322            }
 11323            return initConnectionInfo();
 1324        }
 11325    }
 1326
 1327    /// <inheritdoc/>
 1328    public void setBufferSize(int rcvSize, int sndSize)
 1329    {
             // Forwards the receive and send buffer sizes to the transceiver while holding _mutex.
 11330        lock (_mutex)
 1331        {
                 // Once the connection is closed (or finished), rethrow the exception stored in _exception.
 11332            if (_state >= StateClosed)
 1333            {
 01334                throw _exception;
 1335            }
 11336            _transceiver.setBufferSize(rcvSize, sndSize);
 11337            _info = null; // Invalidate the cached connection info
 11338        }
 11339    }
 1340
 1341    public void exception(LocalException ex)
 1342    {
             // Moves the connection to StateClosed with the given exception, while holding _mutex.
 11343        lock (_mutex)
 1344        {
 11345            setState(StateClosed, ex);
 11346        }
 11347    }
 1348
 01349    public Ice.Internal.ThreadPool getThreadPool() => _threadPool; // The thread pool selected in the constructor.
 1350
 11351    internal ConnectionI(
 11352        Instance instance,
 11353        Transceiver transceiver,
 11354        Connector connector, // null for incoming connections, non-null for outgoing connections
 11355        EndpointI endpoint,
 11356        ObjectAdapter adapter,
 11357        Action<ConnectionI> removeFromFactory, // can be null
 11358        ConnectionOptions options)
 1359    {
             // Initializes the connection in StateNotInitialized, caches settings read from the instance, the
             // properties and the connection options, and registers the connection with its thread pool.
 11360        _instance = instance;
 11361        _desc = transceiver.ToString();
 11362        _type = transceiver.protocol();
 11363        _connector = connector;
 11364        _endpoint = endpoint;
 11365        _adapter = adapter;
 11366        InitializationData initData = instance.initializationData();
 11367        _logger = initData.logger; // Cached for better performance.
 11368        _traceLevels = instance.traceLevels(); // Cached for better performance.
 11369        _connectTimeout = options.connectTimeout;
 11370        _closeTimeout = options.closeTimeout; // not used for datagram connections
 1371        // suppress inactivity timeout for datagram connections
 11372        _inactivityTimeout = endpoint.datagram() ? TimeSpan.Zero : options.inactivityTimeout;
 11373        _maxDispatches = options.maxDispatches;
 11374        _removeFromFactory = removeFromFactory;
 11375        _warn = initData.properties.getIcePropertyAsInt("Ice.Warn.Connections") > 0;
 11376        _warnUdp = initData.properties.getIcePropertyAsInt("Ice.Warn.Datagrams") > 0;
 11377        _nextRequestId = 1;
             // Incoming connections (null connector) take the limit from the adapter, outgoing ones from the instance.
 11378        _messageSizeMax = connector is null ? adapter.messageSizeMax() : instance.messageSizeMax();
 11379        _batchRequestQueue = new BatchRequestQueue(instance, _endpoint.datagram());
 11380        _readStream = new InputStream(instance, Protocol.currentProtocolEncoding);
 11381        _readHeader = false;
 11382        _readStreamPos = -1;
 11383        _writeStream = new OutputStream(); // temporary stream
 11384        _writeStreamPos = -1;
 11385        _upcallCount = 0;
 11386        _state = StateNotInitialized;
 1387
             // Clamp the configured compression level to the range [1, 9].
 11388        _compressionLevel = initData.properties.getIcePropertyAsInt("Ice.Compression.Level");
 11389        if (_compressionLevel < 1)
 1390        {
 01391            _compressionLevel = 1;
 1392        }
 11393        else if (_compressionLevel > 9)
 1394        {
 01395            _compressionLevel = 9;
 1396        }
 1397
             // With a positive idle timeout on a non-datagram endpoint, wrap the transceiver in the idle timeout
             // decorator; _transceiver then refers to the decorator.
 11398        if (options.idleTimeout > TimeSpan.Zero && !endpoint.datagram())
 1399        {
 11400            _idleTimeoutTransceiver = new IdleTimeoutTransceiverDecorator(
 11401                transceiver,
 11402                this,
 11403                options.idleTimeout,
 11404                options.enableIdleCheck);
 11405            transceiver = _idleTimeoutTransceiver;
 1406        }
 11407        _transceiver = transceiver;
 1408
 1409        try
 1410        {
 11411            if (connector is null)
 1412            {
 1413                // adapter is always set for incoming connections
 1414                Debug.Assert(adapter is not null);
 11415                _threadPool = adapter.getThreadPool();
 1416            }
 1417            else
 1418            {
 1419                // we use the client thread pool for outgoing connections, even if there is an
 1420                // object adapter with its own thread pool.
 11421                _threadPool = instance.clientThreadPool();
 1422            }
 11423            _threadPool.initialize(this);
 11424        }
 01425        catch (LocalException)
 1426        {
                 // Local exceptions propagate unchanged.
 01427            throw;
 1428        }
 01429        catch (System.Exception ex)
 1430        {
                 // Any other exception is wrapped in a SyscallException.
 01431            throw new SyscallException(ex);
 1432        }
 11433    }
 1434
 1435    /// <summary>
 1436    /// Aborts the connection with a <see cref="ConnectionAbortedException" /> if the connection is active and
 1437    /// does not receive a byte for some time. See the IdleTimeoutTransceiverDecorator.
 1438    /// </summary>
 1439    internal void idleCheck(TimeSpan idleTimeout)
 1440    {
 11441        lock (_mutex)
 1442        {
                 // NOTE(review): the null-forgiving operator assumes _idleTimeoutTransceiver is set whenever this
                 // method is called (the constructor only creates it when the idle timeout is enabled) - confirm.
 11443            if (_state == StateActive && _idleTimeoutTransceiver!.idleCheckEnabled)
 1444            {
                     // The cast truncates the timeout to whole seconds for the exception message.
 11445                int idleTimeoutInSeconds = (int)idleTimeout.TotalSeconds;
 1446
                     // NOTE(review): the interpolated string below is cut off in this listing; left untouched.
 11447                setState(
 11448                    StateClosed,
 11449                    new ConnectionAbortedException(
 11450                        $"Connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSe
 11451                        closedByApplication: false));
 1452            }
 1453            // else nothing to do
 11454        }
 11455    }
 1456
 1457    internal void sendHeartbeat()
 1458    {
             // While the connection is active, holding or closing: schedules the inactivity timer when the
             // connection looks inactive, and sends a heartbeat (a header-only ValidateConnection message) when no
             // outgoing message is pending. Must not be called for datagram connections.
 1459        Debug.Assert(!_endpoint.datagram());
 1460
 11461        lock (_mutex)
 1462        {
 11463            if (_state == StateActive || _state == StateHolding || _state == StateClosing)
 1464            {
 1465                // We check if the connection has become inactive.
 11466                if (
 11467                    _inactivityTimer is null &&           // timer not already scheduled
 11468                    _inactivityTimeout > TimeSpan.Zero && // inactivity timeout is enabled
 11469                    _state == StateActive &&              // only schedule the timer if the connection is active
 11470                    _dispatchCount == 0 &&                // no pending dispatch
 11471                    _asyncRequests.Count == 0 &&          // no pending invocation
 11472                    _readHeader &&                        // we're not waiting for the remainder of an incoming message
 11473                    _sendStreams.Count <= 1)              // there is at most one pending outgoing message
 1474                {
 1475                    // We may become inactive while the peer is back-pressuring us. In this case, we only schedule the
 1476                    // inactivity timer if there is no pending outgoing message or the pending outgoing message is a
 1477                    // heartbeat.
 1478
 1479                    // The stream of the first _sendStreams message is in _writeStream.
 11480                    if (_sendStreams.Count == 0 || isHeartbeat(_writeStream))
 1481                    {
 11482                        scheduleInactivityTimer();
 1483                    }
 1484                }
 1485
 1486                // We send a heartbeat to the peer to generate a "write" on the connection. This write in turn creates
 1487                // a read on the peer, and resets the peer's idle check timer. When _sendStreams is not empty, there is
 1488                // already an outstanding write, so we don't need to send a heartbeat. It's possible the first message
 1489                // of _sendStreams was already sent but not yet removed from _sendStreams: it means the last write
 1490                // occurred very recently, which is good enough with respect to the idle check.
 1491                // As a result of this optimization, the only possible heartbeat in _sendStreams is the first
 1492                // _sendStreams message.
 11493                if (_sendStreams.Count == 0)
 1494                {
                         // The heartbeat consists of a message header only: its size field is Protocol.headerSize.
 11495                    var os = new OutputStream(Protocol.currentProtocolEncoding);
 11496                    os.writeBlob(Protocol.magic);
 11497                    ProtocolVersion.ice_write(os, Protocol.currentProtocol);
 11498                    EncodingVersion.ice_write(os, Protocol.currentProtocolEncoding);
 11499                    os.writeByte(Protocol.validateConnectionMsg);
 11500                    os.writeByte(0);
 11501                    os.writeInt(Protocol.headerSize); // Message size.
 1502                    try
 1503                    {
 11504                        _ = sendMessage(new OutgoingMessage(os, compress: false));
 11505                    }
 01506                    catch (LocalException ex)
 1507                    {
 01508                        setState(StateClosed, ex);
 01509                    }
 1510                }
 1511            }
 1512            // else nothing to do
 11513        }
 1514
             // Checks the byte at offset 8 of the stream's buffer, which is where the header written above places
             // the message type (after the magic bytes and the protocol and encoding versions).
 1515        static bool isHeartbeat(OutputStream stream) =>
 01516            stream.getBuffer().b.get(8) == Protocol.validateConnectionMsg;
 11517    }
 1518
 1519    private const int StateNotInitialized = 0; // Initial value of _state, assigned in the constructor.
 1520    private const int StateNotValidated = 1;
 1521    private const int StateActive = 2;
 1522    private const int StateHolding = 3;
 1523    private const int StateClosing = 4;
 1524    private const int StateClosingPending = 5;
 1525    private const int StateClosed = 6;
 1526    private const int StateFinished = 7; // Set by finish() once its cleanup is done.
         // The numeric order of these values matters: the code compares states with relational operators
         // (for example _state >= StateClosed).
 1527
 11528    private static ConnectionState toConnectionState(int state) => connectionStateMap[state];
 1529
 1530    private void setState(int state, LocalException ex)
 1531    {
 1532        //
 1533        // If setState() is called with an exception, then only closed
 1534        // and closing states are permissible.
 1535        //
 1536        Debug.Assert(state >= StateClosing);
 1537
 11538        if (_state == state) // Don't switch twice.
 1539        {
 11540            return;
 1541        }
 1542
 11543        if (_exception is null)
 1544        {
 1545            //
 1546            // If we are in closed state, an exception must be set.
 1547            //
 1548            Debug.Assert(_state != StateClosed);
 1549
 11550            _exception = ex;
 1551
 1552            //
 1553            // We don't warn if we are not validated.
 1554            //
 11555            if (_warn && _validated)
 1556            {
 1557                //
 1558                // Don't warn about certain expected exceptions.
 1559                //
 11560                if (!(_exception is CloseConnectionException ||
 11561                     _exception is ConnectionClosedException ||
 11562                     _exception is CommunicatorDestroyedException ||
 11563                     _exception is ObjectAdapterDeactivatedException ||
 11564                     (_exception is ConnectionLostException && _state >= StateClosing)))
 1565                {
 01566                    warning("connection exception", _exception);
 1567                }
 1568            }
 1569        }
 1570
 1571        //
 1572        // We must set the new state before we notify requests of any
 1573        // exceptions. Otherwise new requests may retry on a
 1574        // connection that is not yet marked as closed or closing.
 1575        //
 11576        setState(state);
 11577    }
 1578
 1579    private void setState(int state)
 1580    {
 1581        //
 1582        // We don't want to send close connection messages if the endpoint
 1583        // only supports oneway transmission from client to server.
 1584        //
 11585        if (_endpoint.datagram() && state == StateClosing)
 1586        {
 11587            state = StateClosed;
 1588        }
 1589
 1590        //
 1591        // Skip graceful shutdown if we are destroyed before validation.
 1592        //
 11593        if (_state <= StateNotValidated && state == StateClosing)
 1594        {
 11595            state = StateClosed;
 1596        }
 1597
 11598        if (_state == state) // Don't switch twice.
 1599        {
 01600            return;
 1601        }
 1602
 11603        if (state > StateActive)
 1604        {
 1605            // Dispose the inactivity timer, if not null.
 11606            cancelInactivityTimer();
 1607        }
 1608
 1609        try
 1610        {
 1611            switch (state)
 1612            {
 1613                case StateNotInitialized:
 1614                {
 1615                    Debug.Assert(false);
 1616                    break;
 1617                }
 1618
 1619                case StateNotValidated:
 1620                {
 11621                    if (_state != StateNotInitialized)
 1622                    {
 1623                        Debug.Assert(_state == StateClosed);
 01624                        return;
 1625                    }
 1626                    break;
 1627                }
 1628
 1629                case StateActive:
 1630                {
 1631                    //
 1632                    // Can only switch to active from holding or not validated.
 1633                    //
 11634                    if (_state != StateHolding && _state != StateNotValidated)
 1635                    {
 01636                        return;
 1637                    }
 1638
 11639                    if (_maxDispatches <= 0 || _dispatchCount < _maxDispatches)
 1640                    {
 11641                        _threadPool.register(this, SocketOperation.Read);
 11642                        _idleTimeoutTransceiver?.enableIdleCheck();
 1643                    }
 1644                    // else don't resume reading since we're at or over the _maxDispatches limit.
 1645
 11646                    break;
 1647                }
 1648
 1649                case StateHolding:
 1650                {
 1651                    //
 1652                    // Can only switch to holding from active or not validated.
 1653                    //
 11654                    if (_state != StateActive && _state != StateNotValidated)
 1655                    {
 11656                        return;
 1657                    }
 1658
 11659                    if (_state == StateActive && (_maxDispatches <= 0 || _dispatchCount < _maxDispatches))
 1660                    {
 11661                        _threadPool.unregister(this, SocketOperation.Read);
 11662                        _idleTimeoutTransceiver?.disableIdleCheck();
 1663                    }
 1664                    // else reads are already disabled because the _maxDispatches limit is reached or exceeded.
 1665
 11666                    break;
 1667                }
 1668
 1669                case StateClosing:
 1670                case StateClosingPending:
 1671                {
 1672                    //
 1673                    // Can't change back from closing pending.
 1674                    //
 11675                    if (_state >= StateClosingPending)
 1676                    {
 11677                        return;
 1678                    }
 1679                    break;
 1680                }
 1681
 1682                case StateClosed:
 1683                {
 11684                    if (_state == StateFinished)
 1685                    {
 11686                        return;
 1687                    }
 1688
 11689                    _batchRequestQueue.destroy(_exception);
 11690                    _threadPool.finish(this);
 11691                    _transceiver.close();
 11692                    break;
 1693                }
 1694
 1695                case StateFinished:
 1696                {
 1697                    Debug.Assert(_state == StateClosed);
 11698                    _transceiver.destroy();
 1699                    break;
 1700                }
 1701            }
 11702        }
 01703        catch (LocalException ex)
 1704        {
 01705            _logger.error("unexpected connection exception:\n" + ex + "\n" + _transceiver.ToString());
 01706        }
 1707
 11708        if (_instance.initializationData().observer is not null)
 1709        {
 11710            ConnectionState oldState = toConnectionState(_state);
 11711            ConnectionState newState = toConnectionState(state);
 11712            if (oldState != newState)
 1713            {
 11714                _observer = _instance.initializationData().observer.getConnectionObserver(
 11715                    initConnectionInfo(),
 11716                    _endpoint,
 11717                    newState,
 11718                    _observer);
 11719                if (_observer is not null)
 1720                {
 11721                    _observer.attach();
 1722                }
 1723                else
 1724                {
 11725                    _writeStreamPos = -1;
 11726                    _readStreamPos = -1;
 1727                }
 1728            }
 11729            if (_observer is not null && state == StateClosed && _exception is not null)
 1730            {
 11731                if (!(_exception is CloseConnectionException ||
 11732                     _exception is ConnectionClosedException ||
 11733                     _exception is CommunicatorDestroyedException ||
 11734                     _exception is ObjectAdapterDeactivatedException ||
 11735                     (_exception is ConnectionLostException && _state >= StateClosing)))
 1736                {
 11737                    _observer.failed(_exception.ice_id());
 1738                }
 1739            }
 1740        }
 11741        _state = state;
 1742
 11743        Monitor.PulseAll(_mutex);
 1744
 11745        if (_state == StateClosing && _upcallCount == 0)
 1746        {
 1747            try
 1748            {
 11749                initiateShutdown();
 11750            }
 01751            catch (LocalException ex)
 1752            {
 01753                setState(StateClosed, ex);
 01754            }
 1755        }
 11756    }
 1757
 1758    private void initiateShutdown()
 1759    {
 1760        Debug.Assert(_state == StateClosing && _upcallCount == 0);
 1761
 11762        if (_shutdownInitiated)
 1763        {
 11764            return;
 1765        }
 11766        _shutdownInitiated = true;
 1767
 11768        if (!_endpoint.datagram())
 1769        {
 1770            //
 1771            // Before we shut down, we send a close connection message.
 1772            //
 11773            var os = new OutputStream(Protocol.currentProtocolEncoding);
 11774            os.writeBlob(Protocol.magic);
 11775            ProtocolVersion.ice_write(os, Protocol.currentProtocol);
 11776            EncodingVersion.ice_write(os, Protocol.currentProtocolEncoding);
 11777            os.writeByte(Protocol.closeConnectionMsg);
 11778            os.writeByte(0); // Compression status: always zero for close connection.
 11779            os.writeInt(Protocol.headerSize); // Message size.
 1780
 11781            scheduleCloseTimer();
 1782
 11783            if ((sendMessage(new OutgoingMessage(os, compress: false)) & OutgoingAsyncBase.AsyncStatusSent) != 0)
 1784            {
 11785                setState(StateClosingPending);
 1786
 1787                //
 1788                // Notify the transceiver of the graceful connection closure.
 1789                //
 11790                int op = _transceiver.closing(true, _exception);
 11791                if (op != 0)
 1792                {
 11793                    _threadPool.register(this, op);
 1794                }
 1795            }
 1796        }
 11797    }
 1798
 1799    private bool initialize(int operation)
 1800    {
 11801        int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), ref _hasMoreData);
 11802        if (s != SocketOperation.None)
 1803        {
 11804            _threadPool.update(this, operation, s);
 11805            return false;
 1806        }
 1807
 1808        //
 1809        // Update the connection description once the transceiver is initialized.
 1810        //
 11811        _desc = _transceiver.ToString();
 11812        _initialized = true;
 11813        setState(StateNotValidated);
 1814
 11815        return true;
 1816    }
 1817
 1818    private bool validate(int operation)
 1819    {
 11820        if (!_endpoint.datagram()) // Datagram connections are always implicitly validated.
 1821        {
 11822            if (_connector is null) // The server side has the active role for connection validation.
 1823            {
 11824                if (_writeStream.size() == 0)
 1825                {
 11826                    _writeStream.writeBlob(Protocol.magic);
 11827                    ProtocolVersion.ice_write(_writeStream, Protocol.currentProtocol);
 11828                    EncodingVersion.ice_write(_writeStream, Protocol.currentProtocolEncoding);
 11829                    _writeStream.writeByte(Protocol.validateConnectionMsg);
 11830                    _writeStream.writeByte(0); // Compression status (always zero for validate connection).
 11831                    _writeStream.writeInt(Protocol.headerSize); // Message size.
 11832                    TraceUtil.traceSend(_writeStream, _instance, this, _logger, _traceLevels);
 11833                    _writeStream.prepareWrite();
 1834                }
 1835
 11836                if (_observer is not null)
 1837                {
 01838                    observerStartWrite(_writeStream.getBuffer());
 1839                }
 1840
 11841                if (_writeStream.pos() != _writeStream.size())
 1842                {
 11843                    int op = write(_writeStream.getBuffer());
 11844                    if (op != 0)
 1845                    {
 11846                        _threadPool.update(this, operation, op);
 11847                        return false;
 1848                    }
 1849                }
 1850
 11851                if (_observer is not null)
 1852                {
 01853                    observerFinishWrite(_writeStream.getBuffer());
 1854                }
 1855            }
 1856            else // The client side has the passive role for connection validation.
 1857            {
 11858                if (_readStream.size() == 0)
 1859                {
 11860                    _readStream.resize(Protocol.headerSize);
 11861                    _readStream.pos(0);
 1862                }
 1863
 11864                if (_observer is not null)
 1865                {
 01866                    observerStartRead(_readStream.getBuffer());
 1867                }
 1868
 11869                if (_readStream.pos() != _readStream.size())
 1870                {
 11871                    int op = read(_readStream.getBuffer());
 11872                    if (op != 0)
 1873                    {
 11874                        _threadPool.update(this, operation, op);
 11875                        return false;
 1876                    }
 1877                }
 1878
 11879                if (_observer is not null)
 1880                {
 01881                    observerFinishRead(_readStream.getBuffer());
 1882                }
 1883
 11884                _validated = true;
 1885
 1886                Debug.Assert(_readStream.pos() == Protocol.headerSize);
 11887                _readStream.pos(0);
 11888                byte[] m = _readStream.readBlob(4);
 11889                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 11890                   m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 1891                {
 01892                    throw new ProtocolException(
 01893                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 1894                }
 1895
 11896                var pv = new ProtocolVersion(_readStream);
 11897                if (pv != Protocol.currentProtocol)
 1898                {
 01899                    throw new MarshalException(
 01900                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 1901                }
 11902                var ev = new EncodingVersion(_readStream);
 11903                if (ev != Protocol.currentProtocolEncoding)
 1904                {
 01905                    throw new MarshalException(
 01906                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 1907                }
 1908
 11909                byte messageType = _readStream.readByte();
 11910                if (messageType != Protocol.validateConnectionMsg)
 1911                {
 01912                    throw new ProtocolException(
 01913                        $"Received message of type {messageType} over a connection that is not yet validated.");
 1914                }
 11915                _readStream.readByte(); // Ignore compression status for validate connection.
 11916                int size = _readStream.readInt();
 11917                if (size != Protocol.headerSize)
 1918                {
 01919                    throw new MarshalException($"Received ValidateConnection message with unexpected size {size}.");
 1920                }
 11921                TraceUtil.traceRecv(_readStream, this, _logger, _traceLevels);
 1922
 1923                // Client connection starts sending heartbeats once it's received the ValidateConnection message.
 11924                _idleTimeoutTransceiver?.scheduleHeartbeat();
 1925            }
 1926        }
 1927
 11928        _writeStream.resize(0);
 11929        _writeStream.pos(0);
 1930
 11931        _readStream.resize(Protocol.headerSize);
 11932        _readStream.pos(0);
 11933        _readHeader = true;
 1934
 11935        if (_instance.traceLevels().network >= 1)
 1936        {
 11937            var s = new StringBuilder();
 11938            if (_endpoint.datagram())
 1939            {
 11940                s.Append("starting to ");
 11941                s.Append(_connector is not null ? "send" : "receive");
 11942                s.Append(' ');
 11943                s.Append(_endpoint.protocol());
 11944                s.Append(" messages\n");
 11945                s.Append(_transceiver.toDetailedString());
 1946            }
 1947            else
 1948            {
 11949                s.Append(_connector is not null ? "established" : "accepted");
 11950                s.Append(' ');
 11951                s.Append(_endpoint.protocol());
 11952                s.Append(" connection\n");
 11953                s.Append(ToString());
 1954            }
 11955            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1956        }
 1957
 11958        return true;
 1959    }
 1960
 1961    /// <summary>
 1962    /// Sends the next queued messages. This method is called by message() once the message which is being sent
 1963    /// (_sendStreams.First) is fully sent. Before sending the next message, this message is removed from _sendsStream
 1964    /// If any, its sent callback is also queued in given callback queue.
 1965    /// </summary>
 1966    /// <param name="callbacks">The sent callbacks to call for the messages that were sent.</param>
 1967    /// <returns>The socket operation to register with the thread pool's selector to send the remainder of the pending
 1968    /// message being sent (_sendStreams.First).</returns>
 1969    private int sendNextMessage(out Queue<OutgoingMessage> callbacks)
 1970    {
 11971        callbacks = null;
 1972
 11973        if (_sendStreams.Count == 0)
 1974        {
 1975            // This can occur if no message was being written and the socket write operation was registered with the
 1976            // thread pool (a transceiver read method can request writing data).
 11977            return SocketOperation.None;
 1978        }
 11979        else if (_state == StateClosingPending && _writeStream.pos() == 0)
 1980        {
 1981            // Message wasn't sent, empty the _writeStream, we're not going to send more data because the connection
 1982            // is being closed.
 01983            OutgoingMessage message = _sendStreams.First.Value;
 01984            _writeStream.swap(message.stream);
 01985            return SocketOperation.None;
 1986        }
 1987
 1988        // Assert that the message was fully written.
 1989        Debug.Assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
 1990
 1991        try
 1992        {
 11993            while (true)
 1994            {
 1995                //
 1996                // The message that was being sent is sent. We can swap back the write stream buffer to the
 1997                // outgoing message (required for retry) and queue its sent callback (if any).
 1998                //
 11999                OutgoingMessage message = _sendStreams.First.Value;
 12000                _writeStream.swap(message.stream);
 12001                if (message.sent())
 2002                {
 12003                    callbacks ??= new Queue<OutgoingMessage>();
 12004                    callbacks.Enqueue(message);
 2005                }
 12006                _sendStreams.RemoveFirst();
 2007
 2008                //
 2009                // If there's nothing left to send, we're done.
 2010                //
 12011                if (_sendStreams.Count == 0)
 2012                {
 2013                    break;
 2014                }
 2015
 2016                //
 2017                // If we are in the closed state or if the close is pending, don't continue sending. This can occur if
 2018                // parseMessage (called before sendNextMessage by message()) closes the connection.
 2019                //
 12020                if (_state >= StateClosingPending)
 2021                {
 02022                    return SocketOperation.None;
 2023                }
 2024
 2025                //
 2026                // Otherwise, prepare the next message.
 2027                //
 12028                message = _sendStreams.First.Value;
 2029                Debug.Assert(!message.prepared);
 12030                OutputStream stream = message.stream;
 2031
 12032                message.stream = doCompress(message.stream, message.compress);
 12033                message.stream.prepareWrite();
 12034                message.prepared = true;
 2035
 12036                TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);
 2037
 2038                //
 2039                // Send the message.
 2040                //
 12041                _writeStream.swap(message.stream);
 12042                if (_observer is not null)
 2043                {
 12044                    observerStartWrite(_writeStream.getBuffer());
 2045                }
 12046                if (_writeStream.pos() != _writeStream.size())
 2047                {
 12048                    int op = write(_writeStream.getBuffer());
 12049                    if (op != 0)
 2050                    {
 12051                        return op;
 2052                    }
 2053                }
 12054                if (_observer is not null)
 2055                {
 12056                    observerFinishWrite(_writeStream.getBuffer());
 2057                }
 2058
 2059                // If the message was sent right away, loop to send the next queued message.
 2060            }
 2061
 2062            // Once the CloseConnection message is sent, we transition to the StateClosingPending state.
 12063            if (_state == StateClosing && _shutdownInitiated)
 2064            {
 12065                setState(StateClosingPending);
 12066                int op = _transceiver.closing(true, _exception);
 12067                if (op != 0)
 2068                {
 12069                    return op;
 2070                }
 2071            }
 12072        }
 02073        catch (LocalException ex)
 2074        {
 02075            setState(StateClosed, ex);
 02076        }
 12077        return SocketOperation.None;
 12078    }
 2079
 2080    /// <summary>
 2081    /// Sends or queues the given message.
 2082    /// </summary>
 2083    /// <param name="message">The message to send.</param>
 2084    /// <returns>The send status.</returns>
 2085    private int sendMessage(OutgoingMessage message)
 2086    {
 2087        Debug.Assert(_state >= StateActive);
 2088        Debug.Assert(_state < StateClosed);
 2089
 2090        // Some messages are queued for sending. Just adds the message to the send queue and tell the caller that
 2091        // the message was queued.
 12092        if (_sendStreams.Count > 0)
 2093        {
 12094            _sendStreams.AddLast(message);
 12095            return OutgoingAsyncBase.AsyncStatusQueued;
 2096        }
 2097
 2098        // Prepare the message for sending.
 2099        Debug.Assert(!message.prepared);
 2100
 12101        OutputStream stream = message.stream;
 2102
 12103        message.stream = doCompress(stream, message.compress);
 12104        message.stream.prepareWrite();
 12105        message.prepared = true;
 2106
 12107        TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);
 2108
 2109        // Send the message without blocking.
 12110        if (_observer is not null)
 2111        {
 12112            observerStartWrite(message.stream.getBuffer());
 2113        }
 12114        int op = write(message.stream.getBuffer());
 12115        if (op == 0)
 2116        {
 2117            // The message was sent so we're done.
 2118
 12119            if (_observer is not null)
 2120            {
 12121                observerFinishWrite(message.stream.getBuffer());
 2122            }
 2123
 12124            int status = OutgoingAsyncBase.AsyncStatusSent;
 12125            if (message.sent())
 2126            {
 2127                // If there's a sent callback, indicate the caller that it should invoke the sent callback.
 12128                status |= OutgoingAsyncBase.AsyncStatusInvokeSentCallback;
 2129            }
 2130
 12131            return status;
 2132        }
 2133
 2134        // The message couldn't be sent right away so we add it to the send stream queue (which is empty) and swap its
 2135        // stream with `_writeStream`. The socket operation returned by the transceiver write is registered with the
 2136        // thread pool. At this point the message() method will take care of sending the whole message (held by
 2137        // _writeStream) when the transceiver is ready to write more of the message buffer.
 2138
 12139        _writeStream.swap(message.stream);
 12140        _sendStreams.AddLast(message);
 12141        _threadPool.register(this, op);
 12142        return OutgoingAsyncBase.AsyncStatusQueued;
 2143    }
 2144
 2145    private OutputStream doCompress(OutputStream decompressed, bool compress)
 2146    {
 12147        if (BZip2.isLoaded(_logger) && compress && decompressed.size() >= 100)
 2148        {
 2149            //
 2150            // Do compression.
 2151            //
 12152            Ice.Internal.Buffer cbuf = BZip2.compress(
 12153                decompressed.getBuffer(),
 12154                Protocol.headerSize,
 12155                _compressionLevel);
 12156            if (cbuf is not null)
 2157            {
 12158                var cstream = new OutputStream(new Internal.Buffer(cbuf, true), decompressed.getEncoding());
 2159
 2160                //
 2161                // Set compression status.
 2162                //
 12163                cstream.pos(9);
 12164                cstream.writeByte(2);
 2165
 2166                //
 2167                // Write the size of the compressed stream into the header.
 2168                //
 12169                cstream.pos(10);
 12170                cstream.writeInt(cstream.size());
 2171
 2172                //
 2173                // Write the compression status and size of the compressed stream into the header of the
 2174                // decompressed stream -- we need this to trace requests correctly.
 2175                //
 12176                decompressed.pos(9);
 12177                decompressed.writeByte(2);
 12178                decompressed.writeInt(cstream.size());
 2179
 12180                return cstream;
 2181            }
 2182        }
 2183
 2184        // Write the compression status. If BZip2 is loaded and compress is set to true, we write 1, to request a
 2185        // compressed reply. Otherwise, we write 0 either BZip2 is not loaded or we are sending an uncompressed reply.
 12186        decompressed.pos(9);
 12187        decompressed.writeByte((byte)((BZip2.isLoaded(_logger) && compress) ? 1 : 0));
 2188
 2189        //
 2190        // Not compressed, fill in the message size.
 2191        //
 12192        decompressed.pos(10);
 12193        decompressed.writeInt(decompressed.size());
 2194
 12195        return decompressed;
 2196    }
 2197
 2198    private struct MessageInfo
 2199    {
 2200        public InputStream stream;
 2201        public int requestCount;
 2202        public int requestId;
 2203        public byte compress;
 2204        public ObjectAdapter adapter;
 2205        public OutgoingAsyncBase outAsync;
 2206        public int upcallCount;
 2207    }
 2208
 2209    private int parseMessage(ref MessageInfo info)
 2210    {
 2211        Debug.Assert(_state > StateNotValidated && _state < StateClosed);
 2212
 12213        info.stream = new InputStream(_instance, Protocol.currentProtocolEncoding);
 12214        _readStream.swap(info.stream);
 12215        _readStream.resize(Protocol.headerSize);
 12216        _readStream.pos(0);
 12217        _readHeader = true;
 2218
 2219        Debug.Assert(info.stream.pos() == info.stream.size());
 2220
 2221        try
 2222        {
 2223            //
 2224            // The magic and version fields have already been checked.
 2225            //
 12226            info.stream.pos(8);
 12227            byte messageType = info.stream.readByte();
 12228            info.compress = info.stream.readByte();
 12229            if (info.compress == 2)
 2230            {
 12231                if (BZip2.isLoaded(_logger))
 2232                {
 12233                    Ice.Internal.Buffer ubuf = BZip2.decompress(
 12234                        info.stream.getBuffer(),
 12235                        Protocol.headerSize,
 12236                        _messageSizeMax);
 12237                    info.stream = new InputStream(info.stream.instance(), info.stream.getEncoding(), ubuf, true);
 2238                }
 2239                else
 2240                {
 02241                    throw new FeatureNotSupportedException(
 02242                        "Cannot decompress compressed message: BZip2 library is not loaded.");
 2243                }
 2244            }
 12245            info.stream.pos(Protocol.headerSize);
 2246
 2247            switch (messageType)
 2248            {
 2249                case Protocol.closeConnectionMsg:
 2250                {
 12251                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12252                    if (_endpoint.datagram())
 2253                    {
 02254                        if (_warn)
 2255                        {
 02256                            _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
 2257                        }
 2258                    }
 2259                    else
 2260                    {
 12261                        setState(StateClosingPending, new CloseConnectionException());
 2262
 2263                        //
 2264                        // Notify the transceiver of the graceful connection closure.
 2265                        //
 12266                        int op = _transceiver.closing(false, _exception);
 12267                        if (op != 0)
 2268                        {
 12269                            scheduleCloseTimer();
 12270                            return op;
 2271                        }
 12272                        setState(StateClosed);
 2273                    }
 12274                    break;
 2275                }
 2276
 2277                case Protocol.requestMsg:
 2278                {
 12279                    if (_state >= StateClosing)
 2280                    {
 12281                        TraceUtil.trace(
 12282                            "received request during closing\n(ignored by server, client will retry)",
 12283                            info.stream,
 12284                            this,
 12285                            _logger,
 12286                            _traceLevels);
 2287                    }
 2288                    else
 2289                    {
 12290                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12291                        info.requestId = info.stream.readInt();
 12292                        info.requestCount = 1;
 12293                        info.adapter = _adapter;
 12294                        ++info.upcallCount;
 2295
 12296                        cancelInactivityTimer();
 12297                        ++_dispatchCount;
 2298                    }
 12299                    break;
 2300                }
 2301
 2302                case Protocol.requestBatchMsg:
 2303                {
 12304                    if (_state >= StateClosing)
 2305                    {
 02306                        TraceUtil.trace(
 02307                            "received batch request during closing\n(ignored by server, client will retry)",
 02308                            info.stream,
 02309                            this,
 02310                            _logger,
 02311                            _traceLevels);
 2312                    }
 2313                    else
 2314                    {
 12315                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12316                        int requestCount = info.stream.readInt();
 12317                        if (requestCount < 0)
 2318                        {
 02319                            throw new MarshalException($"Received batch request with {requestCount} batches.");
 2320                        }
 12321                        info.requestCount = requestCount;
 12322                        info.adapter = _adapter;
 12323                        info.upcallCount += info.requestCount;
 2324
 12325                        cancelInactivityTimer();
 12326                        _dispatchCount += info.requestCount;
 2327                    }
 12328                    break;
 2329                }
 2330
 2331                case Protocol.replyMsg:
 2332                {
 12333                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12334                    info.requestId = info.stream.readInt();
 12335                    if (_asyncRequests.TryGetValue(info.requestId, out info.outAsync))
 2336                    {
 12337                        _asyncRequests.Remove(info.requestId);
 2338
 12339                        info.outAsync.getIs().swap(info.stream);
 2340
 2341                        //
 2342                        // If we just received the reply for a request which isn't acknowledge as
 2343                        // sent yet, we queue the reply instead of processing it right away. It
 2344                        // will be processed once the write callback is invoked for the message.
 2345                        //
 12346                        OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null;
 12347                        if (message is not null && message.outAsync == info.outAsync)
 2348                        {
 12349                            message.receivedReply = true;
 2350                        }
 12351                        else if (info.outAsync.response())
 2352                        {
 12353                            ++info.upcallCount;
 2354                        }
 2355                        else
 2356                        {
 12357                            info.outAsync = null;
 2358                        }
 12359                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 2360                        {
 12361                            doApplicationClose();
 2362                        }
 2363                    }
 12364                    break;
 2365                }
 2366
 2367                case Protocol.validateConnectionMsg:
 2368                {
 12369                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12370                    break;
 2371                }
 2372
 2373                default:
 2374                {
 02375                    TraceUtil.trace(
 02376                        "received unknown message\n(invalid, closing connection)",
 02377                        info.stream,
 02378                        this,
 02379                        _logger,
 02380                        _traceLevels);
 2381
 02382                    throw new ProtocolException($"Received Ice protocol message with unknown type: {messageType}");
 2383                }
 2384            }
 12385        }
 12386        catch (LocalException ex)
 2387        {
 12388            if (_endpoint.datagram())
 2389            {
 02390                if (_warn)
 2391                {
 02392                    _logger.warning("datagram connection exception:\n" + ex.ToString() + "\n" + _desc);
 2393                }
 2394            }
 2395            else
 2396            {
 12397                setState(StateClosed, ex);
 2398            }
 12399        }
 2400
 12401        if (_state == StateHolding)
 2402        {
 2403            // Don't continue reading if the connection is in the holding state.
 02404            return SocketOperation.None;
 2405        }
 12406        else if (_maxDispatches > 0 && _dispatchCount >= _maxDispatches)
 2407        {
 2408            // Don't continue reading if the _maxDispatches limit is reached or exceeded.
 12409            _idleTimeoutTransceiver?.disableIdleCheck();
 12410            return SocketOperation.None;
 2411        }
 2412        else
 2413        {
 2414            // Continue reading.
 12415            return SocketOperation.Read;
 2416        }
 12417    }
 2418
 2419    private void dispatchAll(
 2420        InputStream stream,
 2421        int requestCount,
 2422        int requestId,
 2423        byte compress,
 2424        ObjectAdapter adapter)
 2425    {
 2426        // Note: In contrast to other private or protected methods, this method must be called *without* the mutex
 2427        // locked.
 2428
 12429        Object dispatcher = adapter?.dispatchPipeline;
 2430
 2431        try
 2432        {
 12433            while (requestCount > 0)
 2434            {
 2435                // adapter can be null here, however the adapter set in current can't be null, and we never pass
 2436                // a null current.adapter to the application code. Once this file enables nullable, adapter should be
 2437                // adapter! below.
 12438                var request = new IncomingRequest(requestId, this, adapter, stream);
 2439
 12440                if (dispatcher is not null)
 2441                {
 2442                    // We don't and can't await the dispatchAsync: with batch requests, we want all the dispatches to
 2443                    // execute in the current Ice thread pool thread. If we awaited the dispatchAsync, we could
 2444                    // switch to a .NET thread pool thread.
 12445                    _ = dispatchAsync(request);
 2446                }
 2447                else
 2448                {
 2449                    // Received request on a connection without an object adapter.
 12450                    sendResponse(
 12451                        request.current.createOutgoingResponse(new ObjectNotExistException()),
 12452                        isTwoWay: !_endpoint.datagram() && requestId != 0,
 12453                        compress: 0);
 2454                }
 12455                --requestCount;
 2456            }
 2457
 12458            stream.clear();
 12459        }
 02460        catch (LocalException ex) // TODO: catch all exceptions
 2461        {
 2462            // Typically, the IncomingRequest constructor throws an exception, and we can't continue.
 02463            dispatchException(ex, requestCount);
 02464        }
 2465
 2466        async Task dispatchAsync(IncomingRequest request)
 2467        {
 2468            try
 2469            {
 2470                OutgoingResponse response;
 2471
 2472                try
 2473                {
 12474                    response = await dispatcher.dispatchAsync(request).ConfigureAwait(false);
 12475                }
 12476                catch (System.Exception ex)
 2477                {
 12478                    response = request.current.createOutgoingResponse(ex);
 12479                }
 2480
 12481                sendResponse(response, isTwoWay: !_endpoint.datagram() && requestId != 0, compress);
 12482            }
 02483            catch (LocalException ex) // TODO: catch all exceptions to avoid UnobservedTaskException
 2484            {
 02485                dispatchException(ex, requestCount: 1);
 02486            }
 12487        }
 12488    }
 2489
 2490    private void sendResponse(OutgoingResponse response, bool isTwoWay, byte compress)
 2491    {
 12492        bool finished = false;
 2493        try
 2494        {
 12495            lock (_mutex)
 2496            {
 2497                Debug.Assert(_state > StateNotValidated);
 2498
 2499                try
 2500                {
 12501                    if (--_upcallCount == 0)
 2502                    {
 12503                        if (_state == StateFinished)
 2504                        {
 12505                            finished = true;
 12506                            _observer?.detach();
 2507                        }
 12508                        Monitor.PulseAll(_mutex);
 2509                    }
 2510
 12511                    if (_state >= StateClosed)
 2512                    {
 2513                        Debug.Assert(_exception is not null);
 12514                        throw _exception;
 2515                    }
 2516
 12517                    if (isTwoWay)
 2518                    {
 12519                        sendMessage(new OutgoingMessage(response.outputStream, compress > 0));
 2520                    }
 2521
 12522                    if (_state == StateActive && _maxDispatches > 0 && _dispatchCount == _maxDispatches)
 2523                    {
 2524                        // Resume reading if the connection is active and the dispatch count is about to be less than
 2525                        // _maxDispatches.
 12526                        _threadPool.update(this, SocketOperation.None, SocketOperation.Read);
 12527                        _idleTimeoutTransceiver?.enableIdleCheck();
 2528                    }
 2529
 12530                    --_dispatchCount;
 2531
 12532                    if (_state == StateClosing && _upcallCount == 0)
 2533                    {
 12534                        initiateShutdown();
 2535                    }
 12536                }
 12537                catch (LocalException ex)
 2538                {
 12539                    setState(StateClosed, ex);
 12540                }
 2541            }
 2542        }
 2543        finally
 2544        {
 12545            if (finished && _removeFromFactory is not null)
 2546            {
 12547                _removeFromFactory(this);
 2548            }
 12549        }
 12550    }
 2551
 2552    private void dispatchException(LocalException ex, int requestCount)
 2553    {
 02554        bool finished = false;
 2555
 2556        // Fatal exception while dispatching a request. Since sendResponse isn't called in case of a fatal exception
 2557        // we decrement _upcallCount here.
 02558        lock (_mutex)
 2559        {
 02560            setState(StateClosed, ex);
 2561
 02562            if (requestCount > 0)
 2563            {
 2564                Debug.Assert(_upcallCount >= requestCount);
 02565                _upcallCount -= requestCount;
 02566                if (_upcallCount == 0)
 2567                {
 02568                    if (_state == StateFinished)
 2569                    {
 02570                        finished = true;
 02571                        _observer?.detach();
 2572                    }
 02573                    Monitor.PulseAll(_mutex);
 2574                }
 2575            }
 02576        }
 2577
 02578        if (finished && _removeFromFactory is not null)
 2579        {
 02580            _removeFromFactory(this);
 2581        }
 02582    }
 2583
 2584    private void inactivityCheck(System.Threading.Timer inactivityTimer)
 2585    {
 12586        lock (_mutex)
 2587        {
 2588            // If the timers are different, it means this inactivityTimer is no longer current.
 12589            if (inactivityTimer == _inactivityTimer)
 2590            {
 12591                _inactivityTimer = null;
 12592                inactivityTimer.Dispose(); // non-blocking
 2593
 12594                if (_state == StateActive)
 2595                {
 12596                    setState(
 12597                        StateClosing,
 12598                        new ConnectionClosedException(
 12599                            "Connection closed because it remained inactive for longer than the inactivity timeout.",
 12600                            closedByApplication: false));
 2601                }
 2602            }
 2603            // Else this timer was already canceled and disposed. Nothing to do.
 12604        }
 12605    }
 2606
 2607    private void connectTimedOut(System.Threading.Timer connectTimer)
 2608    {
 12609        lock (_mutex)
 2610        {
 12611            if (_state < StateActive)
 2612            {
 12613                setState(StateClosed, new ConnectTimeoutException());
 2614            }
 12615        }
 2616        // else ignore since we're already connected.
 12617        connectTimer.Dispose();
 12618    }
 2619
 2620    private void closeTimedOut(System.Threading.Timer closeTimer)
 2621    {
 12622        lock (_mutex)
 2623        {
 12624            if (_state < StateClosed)
 2625            {
 2626                // We don't use setState(state, exception) because we want to overwrite the exception set by a
 2627                // graceful closure.
 12628                _exception = new CloseTimeoutException();
 12629                setState(StateClosed);
 2630            }
 12631        }
 2632        // else ignore since we're already closed.
 12633        closeTimer.Dispose();
 12634    }
 2635
 2636    private ConnectionInfo initConnectionInfo()
 2637    {
 2638        // Called with _mutex locked.
 2639
 12640        if (_state > StateNotInitialized && _info is not null) // Update the connection info until it's initialized
 2641        {
 12642            return _info;
 2643        }
 2644
 12645        _info =
 12646            _transceiver.getInfo(incoming: _connector is null, _adapter?.getName() ?? "", _endpoint.connectionId());
 12647        return _info;
 2648    }
 2649
 02650    private void warning(string msg, System.Exception ex) => _logger.warning($"{msg}:\n{ex}\n{_transceiver}");
 2651
 2652    private void observerStartRead(Ice.Internal.Buffer buf)
 2653    {
 12654        if (_readStreamPos >= 0)
 2655        {
 2656            Debug.Assert(!buf.empty());
 12657            _observer.receivedBytes(buf.b.position() - _readStreamPos);
 2658        }
 12659        _readStreamPos = buf.empty() ? -1 : buf.b.position();
 12660    }
 2661
 2662    private void observerFinishRead(Ice.Internal.Buffer buf)
 2663    {
 12664        if (_readStreamPos == -1)
 2665        {
 02666            return;
 2667        }
 2668        Debug.Assert(buf.b.position() >= _readStreamPos);
 12669        _observer.receivedBytes(buf.b.position() - _readStreamPos);
 12670        _readStreamPos = -1;
 12671    }
 2672
 2673    private void observerStartWrite(Ice.Internal.Buffer buf)
 2674    {
 12675        if (_writeStreamPos >= 0)
 2676        {
 2677            Debug.Assert(!buf.empty());
 12678            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2679        }
 12680        _writeStreamPos = buf.empty() ? -1 : buf.b.position();
 12681    }
 2682
 2683    private void observerFinishWrite(Ice.Internal.Buffer buf)
 2684    {
 12685        if (_writeStreamPos == -1)
 2686        {
 12687            return;
 2688        }
 12689        if (buf.b.position() > _writeStreamPos)
 2690        {
 12691            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2692        }
 12693        _writeStreamPos = -1;
 12694    }
 2695
 2696    private int read(Ice.Internal.Buffer buf)
 2697    {
 12698        int start = buf.b.position();
 12699        int op = _transceiver.read(buf, ref _hasMoreData);
 12700        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2701        {
 12702            var s = new StringBuilder("received ");
 12703            if (_endpoint.datagram())
 2704            {
 02705                s.Append(buf.b.limit());
 2706            }
 2707            else
 2708            {
 12709                s.Append(buf.b.position() - start);
 12710                s.Append(" of ");
 12711                s.Append(buf.b.limit() - start);
 2712            }
 12713            s.Append(" bytes via ");
 12714            s.Append(_endpoint.protocol());
 12715            s.Append('\n');
 12716            s.Append(ToString());
 12717            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2718        }
 12719        return op;
 2720    }
 2721
 2722    private int write(Ice.Internal.Buffer buf)
 2723    {
 12724        int start = buf.b.position();
 12725        int op = _transceiver.write(buf);
 12726        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2727        {
 12728            var s = new StringBuilder("sent ");
 12729            s.Append(buf.b.position() - start);
 12730            if (!_endpoint.datagram())
 2731            {
 12732                s.Append(" of ");
 12733                s.Append(buf.b.limit() - start);
 2734            }
 12735            s.Append(" bytes via ");
 12736            s.Append(_endpoint.protocol());
 12737            s.Append('\n');
 12738            s.Append(ToString());
 12739            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2740        }
 12741        return op;
 2742    }
 2743
 2744    private void scheduleInactivityTimer()
 2745    {
 2746        // Called with the ConnectionI mutex locked.
 2747        Debug.Assert(_inactivityTimer is null);
 2748        Debug.Assert(_inactivityTimeout > TimeSpan.Zero);
 2749
 12750        _inactivityTimer = new System.Threading.Timer(
 12751            inactivityTimer => inactivityCheck((System.Threading.Timer)inactivityTimer));
 12752        _inactivityTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 12753    }
 2754
 2755    private void cancelInactivityTimer()
 2756    {
 2757        // Called with the ConnectionI mutex locked.
 12758        if (_inactivityTimer is not null)
 2759        {
 12760            _inactivityTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 12761            _inactivityTimer.Dispose();
 12762            _inactivityTimer = null;
 2763        }
 12764    }
 2765
 2766    private void scheduleCloseTimer()
 2767    {
 12768        if (_closeTimeout > TimeSpan.Zero)
 2769        {
 2770#pragma warning disable CA2000 // closeTimer is disposed by closeTimedOut.
 12771            var closeTimer = new System.Threading.Timer(
 12772                timerObj => closeTimedOut((System.Threading.Timer)timerObj));
 2773            // schedule timer to run once; closeTimedOut disposes the timer too.
 12774            closeTimer.Change(_closeTimeout, Timeout.InfiniteTimeSpan);
 2775#pragma warning restore CA2000
 2776        }
 12777    }
 2778
 2779    private void doApplicationClose()
 2780    {
 2781        // Called with the ConnectionI mutex locked.
 2782        Debug.Assert(_state < StateClosing);
 12783        setState(
 12784            StateClosing,
 12785            new ConnectionClosedException(
 12786                "The connection was closed gracefully by the application.",
 12787                closedByApplication: true));
 12788    }
 2789
 2790    private class OutgoingMessage
 2791    {
 12792        internal OutgoingMessage(OutputStream stream, bool compress)
 2793        {
 12794            this.stream = stream;
 12795            this.compress = compress;
 12796        }
 2797
 12798        internal OutgoingMessage(OutgoingAsyncBase outAsync, OutputStream stream, bool compress, int requestId)
 2799        {
 12800            this.outAsync = outAsync;
 12801            this.stream = stream;
 12802            this.compress = compress;
 12803            this.requestId = requestId;
 12804        }
 2805
 2806        internal void canceled()
 2807        {
 2808            Debug.Assert(outAsync is not null); // Only requests can timeout.
 12809            outAsync = null;
 12810        }
 2811
 2812        internal bool sent()
 2813        {
 12814            stream = null;
 12815            if (outAsync is not null)
 2816            {
 12817                invokeSent = outAsync.sent();
 12818                return invokeSent || receivedReply;
 2819            }
 12820            return false;
 2821        }
 2822
 2823        internal void completed(LocalException ex)
 2824        {
 12825            if (outAsync is not null)
 2826            {
 12827                if (outAsync.exception(ex))
 2828                {
 12829                    outAsync.invokeException();
 2830                }
 2831            }
 12832            stream = null;
 12833        }
 2834
 2835        internal OutputStream stream;
 2836        internal OutgoingAsyncBase outAsync;
 2837        internal bool compress;
 2838        internal int requestId;
 2839        internal bool prepared;
 2840        internal bool isSent;
 2841        internal bool invokeSent;
 2842        internal bool receivedReply;
 2843    }
 2844
 12845    private static readonly ConnectionState[] connectionStateMap = [
 12846        ConnectionState.ConnectionStateValidating,   // StateNotInitialized
 12847        ConnectionState.ConnectionStateValidating,   // StateNotValidated
 12848        ConnectionState.ConnectionStateActive,       // StateActive
 12849        ConnectionState.ConnectionStateHolding,      // StateHolding
 12850        ConnectionState.ConnectionStateClosing,      // StateClosing
 12851        ConnectionState.ConnectionStateClosing,      // StateClosingPending
 12852        ConnectionState.ConnectionStateClosed,       // StateClosed
 12853        ConnectionState.ConnectionStateClosed,       // StateFinished
 12854    ];
 2855
 2856    private readonly Instance _instance;
 2857    private readonly Transceiver _transceiver;
 2858    private readonly IdleTimeoutTransceiverDecorator _idleTimeoutTransceiver; // can be null
 2859
 2860    private string _desc;
 2861    private readonly string _type;
 2862    private readonly Connector _connector;
 2863    private readonly EndpointI _endpoint;
 2864
 2865    private ObjectAdapter _adapter;
 2866
 2867    private readonly Logger _logger;
 2868    private readonly TraceLevels _traceLevels;
 2869    private readonly Ice.Internal.ThreadPool _threadPool;
 2870
 2871    private readonly TimeSpan _connectTimeout;
 2872    private readonly TimeSpan _closeTimeout;
 2873    private TimeSpan _inactivityTimeout; // protected by _mutex
 2874
 2875    private System.Threading.Timer _inactivityTimer; // can be null
 2876
 2877    private StartCallback _startCallback;
 2878
 2879    // This action must be called outside the ConnectionI lock to avoid lock acquisition deadlocks.
 2880    private readonly Action<ConnectionI> _removeFromFactory;
 2881
 2882    private readonly bool _warn;
 2883    private readonly bool _warnUdp;
 2884
 2885    private readonly int _compressionLevel;
 2886
 2887    private int _nextRequestId;
 2888
 12889    private readonly Dictionary<int, OutgoingAsyncBase> _asyncRequests = new Dictionary<int, OutgoingAsyncBase>();
 2890
 2891    private LocalException _exception;
 2892
 2893    private readonly int _messageSizeMax;
 2894    private readonly BatchRequestQueue _batchRequestQueue;
 2895
 12896    private readonly LinkedList<OutgoingMessage> _sendStreams = new LinkedList<OutgoingMessage>();
 2897
 2898    // Contains the message which is being received. If the connection is waiting to receive a message (_readHeader ==
 2899    // true), its size is Protocol.headerSize. Otherwise, its size is the message size specified in the received message
 2900    // header.
 2901    private readonly InputStream _readStream;
 2902
 2903    // When _readHeader is true, the next bytes we'll read are the header of a new message. When false, we're reading
 2904    // next the remainder of a message that was already partially received.
 2905    private bool _readHeader;
 2906
 2907    // Contains the message which is being sent. The write stream buffer is empty if no message is being sent.
 2908    private readonly OutputStream _writeStream;
 2909
 2910    private ConnectionObserver _observer;
 2911    private int _readStreamPos;
 2912    private int _writeStreamPos;
 2913
 2914    // The upcall count keeps track of the number of dispatches, AMI (response) continuations, sent callbacks and
 2915    // connection establishment callbacks that have been started (or are about to be started) by a thread of the thread
 2916    // pool associated with this connection, and have not completed yet. All these operations except the connection
 2917    // establishment callbacks execute application code or code generated from Slice definitions.
 2918    private int _upcallCount;
 2919
 2920    // The number of outstanding dispatches. Maintained only while state is StateActive or StateHolding.
 2921    // _dispatchCount can be greater than a non-0 _maxDispatches when a receive a batch with multiples requests.
 2922    private int _dispatchCount;
 2923
 2924    // When we dispatch _maxDispatches concurrent requests, we stop reading the connection to back-pressure the peer.
 2925    // _maxDispatches <= 0 means no limit.
 2926    private readonly int _maxDispatches;
 2927
 2928    private int _state; // The current state.
 2929    private bool _shutdownInitiated;
 2930    private bool _initialized;
 2931    private bool _validated;
 2932
 2933    // When true, the application called close and Connection must close the connection when it receives the reply
 2934    // for the last outstanding invocation.
 2935    private bool _closeRequested;
 2936
 2937    private ConnectionInfo _info;
 2938
 2939    private CloseCallback _closeCallback;
 2940
 2941    // We need to run the continuation asynchronously since it can be completed by an Ice thread pool thread.
 12942    private readonly TaskCompletionSource _closed = new(TaskCreationOptions.RunContinuationsAsynchronously);
 12943    private readonly object _mutex = new();
 2944}

Methods/Properties

start(Ice.ConnectionI.StartCallback)
startAndWait()
activate()
hold()
destroy(int)
abort()
closeAsync()
isActiveOrHolding()
throwException()
waitUntilHolding()
waitUntilFinished()
updateObserver()
sendAsyncRequest(Ice.Internal.OutgoingAsyncBase, bool, bool, int)
getBatchRequestQueue()
flushBatchRequests(Ice.CompressBatch)
flushBatchRequestsAsync(Ice.CompressBatch, System.IProgress<bool>, System.Threading.CancellationToken)
disableInactivityCheck()
setCloseCallback(Ice.CloseCallback)
asyncRequestCanceled(Ice.Internal.OutgoingAsyncBase, Ice.LocalException)
endpoint()
connector()
setAdapter(Ice.ObjectAdapter)
getAdapter()
getEndpoint()
createProxy(Ice.Identity)
setAdapterFromAdapter(Ice.ObjectAdapter)
startAsync(int, Ice.Internal.AsyncCallback)
finishAsync(int)
message(Ice.Internal.ThreadPoolCurrent)
upcall(Ice.ConnectionI.StartCallback, System.Collections.Generic.Queue<Ice.ConnectionI.OutgoingMessage>, Ice.ConnectionI.MessageInfo)
finished(Ice.Internal.ThreadPoolCurrent)
finish()
ToString()
type()
getInfo()
setBufferSize(int, int)
exception(Ice.LocalException)
getThreadPool()
.ctor(Ice.Internal.Instance, Ice.Internal.Transceiver, Ice.Internal.Connector, Ice.Internal.EndpointI, Ice.ObjectAdapter, System.Action<Ice.ConnectionI>, Ice.ConnectionOptions)
idleCheck(System.TimeSpan)
sendHeartbeat()
isHeartbeat()
toConnectionState(int)
setState(int, Ice.LocalException)
setState(int)
initiateShutdown()
initialize(int)
validate(int)
sendNextMessage(out System.Collections.Generic.Queue<Ice.ConnectionI.OutgoingMessage>)
sendMessage(Ice.ConnectionI.OutgoingMessage)
doCompress(Ice.OutputStream, bool)
parseMessage(ref Ice.ConnectionI.MessageInfo)
dispatchAll(Ice.InputStream, int, int, byte, Ice.ObjectAdapter)
dispatchAsync()
sendResponse(Ice.OutgoingResponse, bool, byte)
dispatchException(Ice.LocalException, int)
inactivityCheck(System.Threading.Timer)
connectTimedOut(System.Threading.Timer)
closeTimedOut(System.Threading.Timer)
initConnectionInfo()
warning(string, System.Exception)
observerStartRead(Ice.Internal.Buffer)
observerFinishRead(Ice.Internal.Buffer)
observerStartWrite(Ice.Internal.Buffer)
observerFinishWrite(Ice.Internal.Buffer)
read(Ice.Internal.Buffer)
write(Ice.Internal.Buffer)
scheduleInactivityTimer()
cancelInactivityTimer()
scheduleCloseTimer()
doApplicationClose()
.ctor(Ice.OutputStream, bool)
.ctor(Ice.Internal.OutgoingAsyncBase, Ice.OutputStream, bool, int)
canceled()
sent()
completed(Ice.LocalException)
.cctor()