< Summary

Information
Class: Ice.ConnectionI
Assembly: Ice
File(s): /_/csharp/src/Ice/ConnectionI.cs
Tag: 105_25977636357
Line coverage
87%
Covered lines: 1059
Uncovered lines: 156
Coverable lines: 1215
Total lines: 2959
Line coverage: 87.1%
Branch coverage
82%
Covered branches: 604
Total branches: 732
Branch coverage: 82.5%
Method coverage
94%
Covered methods: 74
Fully covered methods: 47
Total methods: 78
Method coverage: 94.8%
Full method coverage: 60.2%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
start(...) | 87.5% | 8 | 8 | 94.44%
startAndWait() | 30% | 28 | 10 | 43.75%
activate() | 100% | 2 | 2 | 100%
hold() | 100% | 2 | 2 | 100%
destroy(...) | 50% | 4 | 4 | 100%
abort() | 100% | 1 | 1 | 100%
closeAsync() | 100% | 4 | 4 | 100%
isActiveOrHolding() | 100% | 2 | 2 | 100%
throwException() | 50% | 2 | 2 | 80%
waitUntilHolding() | 100% | 4 | 4 | 100%
waitUntilFinished() | 100% | 4 | 4 | 100%
updateObserver() | 66.67% | 6 | 6 | 92.86%
sendAsyncRequest(...) | 90% | 10 | 10 | 93.33%
getBatchRequestQueue() | 100% | 1 | 1 | 100%
flushBatchRequests(...) | 100% | 1 | 1 | 75%
flushBatchRequestsAsync(...) | 100% | 1 | 1 | 100%
disableInactivityCheck() | 100% | 1 | 1 | 100%
setCloseCallback(...) | 25% | 8 | 4 | 38.46%
asyncRequestCanceled(...) | 52.94% | 47 | 34 | 77.42%
endpoint() | 100% | 1 | 1 | 100%
connector() | 100% | 1 | 1 | 100%
setAdapter(...) | 62.5% | 9 | 8 | 80%
getAdapter() | 100% | 1 | 1 | 100%
getEndpoint() | 100% | 1 | 1 | 100%
createProxy(...) | 100% | 1 | 1 | 100%
setAdapterFromAdapter(...) | 50% | 4 | 4 | 85.71%
startAsync(...) | 50% | 5 | 4 | 66.67%
doIO() | 95% | 20 | 20 | 92.86%
finishAsync(...) | 100% | 24 | 24 | 100%
message(...) | 84.52% | 98 | 84 | 87.29%
upcall(...) | 80% | 34 | 30 | 83.78%
finished(...) | 100% | 8 | 8 | 100%
finish() | 91.67% | 62 | 60 | 91.43%
ToString() | 100% | 1 | 1 | 100%
type() | 100% | 1 | 1 | 100%
getInfo() | 100% | 2 | 2 | 100%
setBufferSize(...) | 50% | 2 | 2 | 85.71%
exception(...) | 100% | 1 | 1 | 100%
getThreadPool() | 100% | 2 | 1 | 0%
.ctor(...) | 81.25% | 16 | 16 | 90.32%
idleCheck(...) | 100% | 4 | 4 | 100%
sendHeartbeat() | 88.46% | 27 | 26 | 88.89%
isHeartbeat() | 100% | 2 | 1 | 0%
toConnectionState(...) | 100% | 1 | 1 | 100%
setState(...) | 75% | 20 | 20 | 92.31%
setState(...) | 88.57% | 84 | 70 | 85.94%
initiateShutdown() | 100% | 8 | 8 | 100%
initialize(...) | 100% | 2 | 2 | 100%
validate(...) | 76% | 62 | 50 | 83.33%
sendNextMessage(...) | 96.43% | 30 | 28 | 86.05%
sendMessage(...) | 100% | 10 | 10 | 100%
doCompress(...) | 100% | 8 | 8 | 100%
parseMessage(...) | 75% | 63 | 44 | 78.57%
dispatchAll(...) | 87.5% | 8 | 8 | 81.25%
dispatchAsync() | 100% | 2 | 2 | 72.73%
sendResponse(...) | 96.15% | 26 | 26 | 100%
dispatchException(...) | 0% | 156 | 12 | 0%
inactivityCheck(...) | 100% | 4 | 4 | 100%
connectTimedOut(...) | 100% | 2 | 2 | 100%
closeTimedOut(...) | 100% | 2 | 2 | 100%
initConnectionInfo() | 100% | 8 | 8 | 100%
warning(...) | 100% | 2 | 1 | 0%
observerStartRead(...) | 75% | 4 | 4 | 100%
observerFinishRead(...) | 50% | 2 | 2 | 80%
observerStartWrite(...) | 100% | 4 | 4 | 100%
observerFinishWrite(...) | 100% | 4 | 4 | 100%
read(...) | 83.33% | 6 | 6 | 93.33%
write(...) | 100% | 6 | 6 | 100%
scheduleInactivityTimer() | 100% | 1 | 1 | 100%
cancelInactivityTimer() | 100% | 2 | 2 | 100%
scheduleCloseTimer() | 100% | 2 | 2 | 100%
doApplicationClose() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 1 | 1 | 100%
canceled() | 100% | 1 | 1 | 100%
sent() | 100% | 4 | 4 | 100%
completed(...) | 100% | 4 | 4 | 100%
.cctor() | 100% | 1 | 1 | 100%

File(s)

/_/csharp/src/Ice/ConnectionI.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Ice.Instrumentation;
 4using Ice.Internal;
 5using System.Diagnostics;
 6using System.Text;
 7
 8namespace Ice;
 9
 10#pragma warning disable CA1001 // _inactivityTimer is disposed by cancelInactivityTimer.
 11public sealed class ConnectionI : Internal.EventHandler, CancellationHandler, Connection
 12#pragma warning restore CA1001
 13{
 14    internal interface StartCallback
 15    {
 16        void connectionStartCompleted(ConnectionI connection);
 17
 18        void connectionStartFailed(ConnectionI connection, LocalException ex);
 19    }
 20
 21    internal void start(StartCallback callback)
 22    {
 23        try
 24        {
 125            lock (_mutex)
 26            {
 27                //
 28                // The connection might already be closed if the communicator was destroyed.
 29                //
 130                if (_state >= StateClosed)
 31                {
 32                    Debug.Assert(_exception is not null);
 033                    throw _exception;
 34                }
 35
 136                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 37                {
 138                    if (_connectTimeout > TimeSpan.Zero)
 39                    {
 40#pragma warning disable CA2000 // connectTimer is disposed by connectTimedOut.
 141                        var connectTimer = new System.Threading.Timer(
 142                            timerObj => connectTimedOut((System.Threading.Timer)timerObj));
 43                        // schedule timer to run once; connectTimedOut disposes the timer too.
 144                        connectTimer.Change(_connectTimeout, Timeout.InfiniteTimeSpan);
 45#pragma warning restore CA2000
 46                    }
 47
 148                    _startCallback = callback;
 149                    return;
 50                }
 51
 52                // The connection starts in the holding state. It will be activated by the connection factory.
 153                setState(StateHolding);
 154            }
 155        }
 156        catch (LocalException ex)
 57        {
 158            exception(ex);
 159            callback.connectionStartFailed(this, _exception);
 160            return;
 61        }
 62
 163        callback.connectionStartCompleted(this);
 164    }
 65
 66    internal void startAndWait()
 67    {
 68        try
 69        {
 170            lock (_mutex)
 71            {
 72                //
 73                // The connection might already be closed if the communicator was destroyed.
 74                //
 175                if (_state >= StateClosed)
 76                {
 77                    Debug.Assert(_exception is not null);
 078                    throw _exception;
 79                }
 80
 181                if (!initialize(SocketOperation.None) || !validate(SocketOperation.None))
 82                {
 83                    //
 84                    // Wait for the connection to be validated.
 85                    //
 086                    while (_state <= StateNotValidated)
 87                    {
 088                        Monitor.Wait(_mutex);
 89                    }
 90
 091                    if (_state >= StateClosing)
 92                    {
 93                        Debug.Assert(_exception is not null);
 094                        throw _exception;
 95                    }
 96                }
 97
 98                //
 99                // We start out in holding state.
 100                //
 1101                setState(StateHolding);
 1102            }
 1103        }
 0104        catch (LocalException ex)
 105        {
 0106            exception(ex);
 0107            waitUntilFinished();
 0108            return;
 109        }
 1110    }
 111
 112    internal void activate()
 113    {
 1114        lock (_mutex)
 115        {
 1116            if (_state <= StateNotValidated)
 117            {
 1118                return;
 119            }
 120
 1121            setState(StateActive);
 1122        }
 1123    }
 124
 125    internal void hold()
 126    {
 1127        lock (_mutex)
 128        {
 1129            if (_state <= StateNotValidated)
 130            {
 1131                return;
 132            }
 133
 1134            setState(StateHolding);
 1135        }
 1136    }
 137
 138    // DestructionReason.
 139    public const int ObjectAdapterDeactivated = 0;
 140    public const int CommunicatorDestroyed = 1;
 141
 142    internal void destroy(int reason)
 143    {
 1144        lock (_mutex)
 145        {
 146            switch (reason)
 147            {
 148                case ObjectAdapterDeactivated:
 149                {
 1150                    setState(StateClosing, new ObjectAdapterDeactivatedException(_adapter?.getName() ?? ""));
 1151                    break;
 152                }
 153
 154                case CommunicatorDestroyed:
 155                {
 1156                    setState(StateClosing, new CommunicatorDestroyedException());
 1157                    break;
 158                }
 159            }
 160        }
 1161    }
 162
 163    public void abort()
 164    {
 1165        lock (_mutex)
 166        {
 1167            setState(
 1168                StateClosed,
 1169                new ConnectionAbortedException(
 1170                    "The connection was aborted by the application.",
 1171                    closedByApplication: true));
 1172        }
 1173    }
 174
 175    public Task closeAsync()
 176    {
 1177        lock (_mutex)
 178        {
 1179            if (_state < StateClosing)
 180            {
 1181                if (_asyncRequests.Count == 0)
 182                {
 1183                    doApplicationClose();
 184                }
 185                else
 186                {
 1187                    _closeRequested = true;
 1188                    scheduleCloseTimer(); // we don't wait forever for outstanding invocations to complete
 189                }
 190            }
 191            // else nothing to do, already closing or closed.
 1192        }
 193
 1194        return _closed.Task;
 195    }
 196
 197    internal bool isActiveOrHolding()
 198    {
 1199        lock (_mutex)
 200        {
 1201            return _state > StateNotValidated && _state < StateClosing;
 202        }
 1203    }
 204
 205    public void throwException()
 206    {
 1207        lock (_mutex)
 208        {
 1209            if (_exception is not null)
 210            {
 211                Debug.Assert(_state >= StateClosing);
 0212                throw _exception;
 213            }
 1214        }
 1215    }
 216
 217    internal void waitUntilHolding()
 218    {
 1219        lock (_mutex)
 220        {
 1221            while (_state < StateHolding || _upcallCount > 0)
 222            {
 1223                Monitor.Wait(_mutex);
 224            }
 1225        }
 1226    }
 227
 228    internal void waitUntilFinished()
 229    {
 1230        lock (_mutex)
 231        {
 232            //
 233            // We wait indefinitely until the connection is finished and all
 234            // outstanding requests are completed. Otherwise we couldn't
 235            // guarantee that there are no outstanding calls when deactivate()
 236            // is called on the servant locators.
 237            //
 1238            while (_state < StateFinished || _upcallCount > 0)
 239            {
 1240                Monitor.Wait(_mutex);
 241            }
 242
 243            Debug.Assert(_state == StateFinished);
 244
 245            //
 246            // Clear the OA. See bug 1673 for the details of why this is necessary.
 247            //
 1248            _adapter = null;
 1249        }
 1250    }
 251
 252    internal void updateObserver()
 253    {
 1254        lock (_mutex)
 255        {
 1256            if (_state < StateNotValidated || _state > StateClosed)
 257            {
 0258                return;
 259            }
 260
 261            Debug.Assert(_instance.initializationData().observer is not null);
 1262            _observer = _instance.initializationData().observer.getConnectionObserver(
 1263                initConnectionInfo(),
 1264                _endpoint,
 1265                toConnectionState(_state),
 1266                _observer);
 1267            if (_observer is not null)
 268            {
 1269                _observer.attach();
 270            }
 271            else
 272            {
 1273                _writeStreamPos = -1;
 1274                _readStreamPos = -1;
 275            }
 1276        }
 1277    }
 278
 279    internal int sendAsyncRequest(
 280        OutgoingAsyncBase og,
 281        bool compress,
 282        bool response,
 283        int batchRequestCount)
 284    {
 1285        OutputStream os = og.getOs();
 286
 1287        lock (_mutex)
 288        {
 289            //
 290            // If the connection is closed before we even have a chance
 291            // to send our request, we always try to send the request
 292            // again.
 293            //
 1294            if (_exception is not null)
 295            {
 1296                throw new RetryException(_exception);
 297            }
 298
 299            Debug.Assert(_state > StateNotValidated);
 300            Debug.Assert(_state < StateClosing);
 301
 302            //
 303            // Ensure the message isn't bigger than what we can send with the
 304            // transport.
 305            //
 1306            _transceiver.checkSendSize(os.getBuffer());
 307
 308            //
 309            // Notify the request that it's cancelable with this connection.
 310            // This will throw if the request is canceled.
 311            //
 1312            og.cancelable(this);
 1313            int requestId = 0;
 1314            if (response)
 315            {
 316                //
 317                // Create a new unique request ID.
 318                //
 1319                requestId = _nextRequestId++;
 1320                if (requestId <= 0)
 321                {
 0322                    _nextRequestId = 1;
 0323                    requestId = _nextRequestId++;
 324                }
 325
 326                //
 327                // Fill in the request ID.
 328                //
 1329                os.pos(Protocol.headerSize);
 1330                os.writeInt(requestId);
 331            }
 1332            else if (batchRequestCount > 0)
 333            {
 1334                os.pos(Protocol.headerSize);
 1335                os.writeInt(batchRequestCount);
 336            }
 337
 1338            og.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
 339
 340            // We're just about to send a request, so we are not inactive anymore.
 1341            cancelInactivityTimer();
 342
 1343            int status = OutgoingAsyncBase.AsyncStatusQueued;
 344            try
 345            {
 1346                var message = new OutgoingMessage(og, os, compress, requestId);
 1347                status = sendMessage(message);
 1348            }
 1349            catch (LocalException ex)
 350            {
 1351                setState(StateClosed, ex);
 352                Debug.Assert(_exception is not null);
 1353                throw _exception;
 354            }
 355
 1356            if (response)
 357            {
 358                //
 359                // Add to the async requests map.
 360                //
 1361                _asyncRequests[requestId] = og;
 362            }
 1363            return status;
 364        }
 1365    }
 366
 1367    internal BatchRequestQueue getBatchRequestQueue() => _batchRequestQueue;
 368
 369    public void flushBatchRequests(CompressBatch compress)
 370    {
 371        try
 372        {
 1373            var completed = new FlushBatchTaskCompletionCallback();
 1374            var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 1375            outgoing.invoke(_flushBatchRequests_name, compress, true);
 1376            completed.Task.Wait();
 1377        }
 0378        catch (AggregateException ex)
 379        {
 0380            throw ex.InnerException;
 381        }
 1382    }
 383
 384    public Task flushBatchRequestsAsync(
 385        CompressBatch compress,
 386        IProgress<bool> progress = null,
 387        CancellationToken cancel = default)
 388    {
 1389        var completed = new FlushBatchTaskCompletionCallback(progress, cancel);
 1390        var outgoing = new ConnectionFlushBatchAsync(this, _instance, completed);
 1391        outgoing.invoke(_flushBatchRequests_name, compress, false);
 1392        return completed.Task;
 393    }
 394
 395    private const string _flushBatchRequests_name = "flushBatchRequests";
 396
 397    public void disableInactivityCheck()
 398    {
 1399        lock (_mutex)
 400        {
 1401            cancelInactivityTimer();
 1402            _inactivityTimeout = TimeSpan.Zero;
 1403        }
 1404    }
 405
 406    public void setCloseCallback(CloseCallback callback)
 407    {
 1408        lock (_mutex)
 409        {
 1410            if (_state >= StateClosed)
 411            {
 0412                if (callback is not null)
 413                {
 0414                    _threadPool.execute(
 0415                        () =>
 0416                        {
 0417                            try
 0418                            {
 0419                                callback(this);
 0420                            }
 0421                            catch (System.Exception ex)
 0422                            {
 0423                                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
 0424                            }
 0425                        },
 0426                        this);
 427                }
 428            }
 429            else
 430            {
 1431                _closeCallback = callback;
 432            }
 1433        }
 1434    }
 435
 436    public void asyncRequestCanceled(OutgoingAsyncBase outAsync, LocalException ex)
 437    {
 438        //
 439        // NOTE: This isn't called from a thread pool thread.
 440        //
 441
 1442        lock (_mutex)
 443        {
 1444            if (_state >= StateClosed)
 445            {
 0446                return; // The request has already been or will be shortly notified of the failure.
 447            }
 448
 1449            OutgoingMessage o = _sendStreams.FirstOrDefault(m => m.outAsync == outAsync);
 1450            if (o is not null)
 451            {
 1452                if (o.requestId > 0)
 453                {
 1454                    _asyncRequests.Remove(o.requestId);
 455                }
 456
 1457                if (ex is ConnectionAbortedException)
 458                {
 0459                    setState(StateClosed, ex);
 460                }
 461                else
 462                {
 463                    //
 464                    // If the request is being sent, don't remove it from the send streams,
 465                    // it will be removed once the sending is finished.
 466                    //
 1467                    if (o == _sendStreams.First.Value)
 468                    {
 0469                        o.canceled();
 470                    }
 471                    else
 472                    {
 1473                        o.canceled();
 1474                        _sendStreams.Remove(o);
 475                    }
 1476                    if (outAsync.exception(ex))
 477                    {
 1478                        outAsync.invokeExceptionAsync();
 479                    }
 480                }
 481
 1482                if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 483                {
 0484                    doApplicationClose();
 485                }
 1486                return;
 487            }
 488
 1489            if (outAsync is OutgoingAsync)
 490            {
 1491                foreach (KeyValuePair<int, OutgoingAsyncBase> kvp in _asyncRequests)
 492                {
 1493                    if (kvp.Value == outAsync)
 494                    {
 1495                        if (ex is ConnectionAbortedException)
 496                        {
 0497                            setState(StateClosed, ex);
 498                        }
 499                        else
 500                        {
 1501                            _asyncRequests.Remove(kvp.Key);
 1502                            if (outAsync.exception(ex))
 503                            {
 1504                                outAsync.invokeExceptionAsync();
 505                            }
 506                        }
 507
 1508                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 509                        {
 0510                            doApplicationClose();
 511                        }
 1512                        return;
 513                    }
 514                }
 515            }
 0516        }
 1517    }
 518
 1519    internal EndpointI endpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 520
 1521    internal Connector connector() => _connector; // No mutex protection necessary, _endpoint is immutable.
 522
 523    public void setAdapter(ObjectAdapter adapter)
 524    {
 1525        if (_connector is null) // server connection
 526        {
 0527            throw new InvalidOperationException("setAdapter can only be called on a client connection");
 528        }
 529
 1530        if (adapter is not null)
 531        {
 532            // Go through the adapter to set the adapter and servant manager on this connection
 533            // to ensure the object adapter is still active.
 1534            adapter.setAdapterOnConnection(this);
 535        }
 536        else
 537        {
 1538            lock (_mutex)
 539            {
 1540                if (_state <= StateNotValidated || _state >= StateClosing)
 541                {
 0542                    return;
 543                }
 1544                _adapter = null;
 1545            }
 546        }
 547
 548        //
 549        // We never change the thread pool with which we were initially
 550        // registered, even if we add or remove an object adapter.
 551        //
 1552    }
 553
 554    public ObjectAdapter getAdapter()
 555    {
 1556        lock (_mutex)
 557        {
 1558            return _adapter;
 559        }
 1560    }
 561
 1562    public Endpoint getEndpoint() => _endpoint; // No mutex protection necessary, _endpoint is immutable.
 563
 564    public ObjectPrx createProxy(Identity id)
 565    {
 1566        ObjectAdapter.checkIdentity(id);
 1567        return new ObjectPrxHelper(_instance.referenceFactory().create(id, this));
 568    }
 569
 570    public void setAdapterFromAdapter(ObjectAdapter adapter)
 571    {
 1572        lock (_mutex)
 573        {
 1574            if (_state <= StateNotValidated || _state >= StateClosing)
 575            {
 0576                return;
 577            }
 578            Debug.Assert(adapter is not null); // Called by ObjectAdapter::setAdapterOnConnection
 1579            _adapter = adapter;
 580
 581            // Clear cached connection info (if any) as it's no longer accurate.
 1582            _info = null;
 1583        }
 1584    }
 585
 586    //
 587    // Operations from EventHandler
 588    //
 589    public override bool startAsync(int operation, Ice.Internal.AsyncCallback completedCallback)
 590    {
 1591        if (_state >= StateClosed)
 592        {
 0593            return false;
 594        }
 595
 1596        if (_threadHopRequired)
 597        {
 598            // Run the I/O on a .NET ThreadPool thread so it survives the initiating Ice worker exiting.
 599            // See ctor for when this is required.
 0600            Task.Run(doIO);
 601        }
 602        else
 603        {
 1604            doIO();
 605        }
 606
 1607        return true;
 608
 609        void doIO()
 610        {
 1611            lock (_mutex)
 612            {
 1613                if (_state >= StateClosed)
 614                {
 0615                    completedCallback(this);
 0616                    return;
 617                }
 618
 619                try
 620                {
 1621                    if ((operation & SocketOperation.Write) != 0)
 622                    {
 1623                        if (_observer != null)
 624                        {
 1625                            observerStartWrite(_writeStream.getBuffer());
 626                        }
 627
 1628                        bool completedSynchronously =
 1629                            _transceiver.startWrite(
 1630                                _writeStream.getBuffer(),
 1631                                completedCallback,
 1632                                this,
 1633                                out bool messageWritten);
 634
 635                        // If the startWrite call wrote the message, we assume the message is sent now for at-most-once
 636                        // semantics in the event the connection is closed while the message is still in _sendStreams.
 1637                        if (messageWritten && _sendStreams.Count > 0)
 638                        {
 639                            // See finish() code.
 1640                            _sendStreams.First.Value.isSent = true;
 641                        }
 642
 1643                        if (completedSynchronously)
 644                        {
 645                            // If the write completed synchronously, we need to call the completedCallback.
 1646                            completedCallback(this);
 647                        }
 648                    }
 1649                    else if ((operation & SocketOperation.Read) != 0)
 650                    {
 1651                        if (_observer != null && !_readHeader)
 652                        {
 1653                            observerStartRead(_readStream.getBuffer());
 654                        }
 655
 1656                        if (_transceiver.startRead(_readStream.getBuffer(), completedCallback, this))
 657                        {
 1658                            completedCallback(this);
 659                        }
 660                    }
 1661                }
 1662                catch (LocalException ex)
 663                {
 1664                    setState(StateClosed, ex);
 1665                    completedCallback(this);
 1666                }
 667            }
 1668        }
 669    }
 670
 671    public override bool finishAsync(int operation)
 672    {
 1673        if (_state >= StateClosed)
 674        {
 1675            return false;
 676        }
 677
 678        try
 679        {
 1680            if ((operation & SocketOperation.Write) != 0)
 681            {
 1682                Ice.Internal.Buffer buf = _writeStream.getBuffer();
 1683                int start = buf.b.position();
 1684                _transceiver.finishWrite(buf);
 1685                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 686                {
 1687                    var s = new StringBuilder("sent ");
 1688                    s.Append(buf.b.position() - start);
 1689                    if (!_endpoint.datagram())
 690                    {
 1691                        s.Append(" of ");
 1692                        s.Append(buf.b.limit() - start);
 693                    }
 1694                    s.Append(" bytes via ");
 1695                    s.Append(_endpoint.protocol());
 1696                    s.Append('\n');
 1697                    s.Append(ToString());
 1698                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 699                }
 700
 1701                if (_observer is not null)
 702                {
 1703                    observerFinishWrite(_writeStream.getBuffer());
 704                }
 705            }
 1706            else if ((operation & SocketOperation.Read) != 0)
 707            {
 1708                Ice.Internal.Buffer buf = _readStream.getBuffer();
 1709                int start = buf.b.position();
 1710                _transceiver.finishRead(buf);
 1711                if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 712                {
 1713                    var s = new StringBuilder("received ");
 1714                    if (_endpoint.datagram())
 715                    {
 1716                        s.Append(buf.b.limit());
 717                    }
 718                    else
 719                    {
 1720                        s.Append(buf.b.position() - start);
 1721                        s.Append(" of ");
 1722                        s.Append(buf.b.limit() - start);
 723                    }
 1724                    s.Append(" bytes via ");
 1725                    s.Append(_endpoint.protocol());
 1726                    s.Append('\n');
 1727                    s.Append(ToString());
 1728                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 729                }
 730
 1731                if (_observer is not null && !_readHeader)
 732                {
 1733                    observerFinishRead(_readStream.getBuffer());
 734                }
 735            }
 1736        }
 1737        catch (LocalException ex)
 738        {
 1739            setState(StateClosed, ex);
 1740        }
 1741        return _state < StateClosed;
 742    }
 743
 744    public override void message(ThreadPoolCurrent current)
 745    {
 1746        StartCallback startCB = null;
 1747        Queue<OutgoingMessage> sentCBs = null;
 1748        var info = new MessageInfo();
 1749        int upcallCount = 0;
 750
 1751        using var msg = new ThreadPoolMessage(current, _mutex);
 1752        lock (_mutex)
 753        {
 754            try
 755            {
 1756                if (!msg.startIOScope())
 757                {
 1758                    return;
 759                }
 760
 1761                if (_state >= StateClosed)
 762                {
 0763                    return;
 764                }
 765
 766                try
 767                {
 1768                    int writeOp = SocketOperation.None;
 1769                    int readOp = SocketOperation.None;
 770
 771                    // If writes are ready, write the data from the connection's write buffer (_writeStream)
 1772                    if ((current.operation & SocketOperation.Write) != 0)
 773                    {
 1774                        if (_observer is not null)
 775                        {
 1776                            observerStartWrite(_writeStream.getBuffer());
 777                        }
 1778                        writeOp = write(_writeStream.getBuffer());
 1779                        if (_observer is not null && (writeOp & SocketOperation.Write) == 0)
 780                        {
 1781                            observerFinishWrite(_writeStream.getBuffer());
 782                        }
 783                    }
 784
 785                    // If reads are ready, read the data into the connection's read buffer (_readStream). The data is
 786                    // read until:
 787                    // - the full message is read (the transport read returns SocketOperationNone) and
 788                    //   the read buffer is fully filled
 789                    // - the read operation on the transport can't continue without blocking
 1790                    if ((current.operation & SocketOperation.Read) != 0)
 791                    {
 792                        while (true)
 793                        {
 1794                            Ice.Internal.Buffer buf = _readStream.getBuffer();
 795
 1796                            if (_observer is not null && !_readHeader)
 797                            {
 1798                                observerStartRead(buf);
 799                            }
 800
 1801                            readOp = read(buf);
 1802                            if ((readOp & SocketOperation.Read) != 0)
 803                            {
 804                                // Can't continue without blocking, exit out of the loop.
 805                                break;
 806                            }
 1807                            if (_observer is not null && !_readHeader)
 808                            {
 809                                Debug.Assert(!buf.b.hasRemaining());
 1810                                observerFinishRead(buf);
 811                            }
 812
 813                            // If read header is true, we're reading a new Ice protocol message and we need to read
 814                            // the message header.
 1815                            if (_readHeader)
 816                            {
 817                                // The next read will read the remainder of the message.
 1818                                _readHeader = false;
 819
 1820                                _observer?.receivedBytes(Protocol.headerSize);
 821
 822                                //
 823                                // Connection is validated on first message. This is only used by
 824                                // setState() to check whether or not we can print a connection
 825                                // warning (a client might close the connection forcefully if the
 826                                // connection isn't validated, we don't want to print a warning
 827                                // in this case).
 828                                //
 1829                                _validated = true;
 830
 831                                // Full header should be read because the size of _readStream is always headerSize (14)
 832                                // when reading a new message (see the code that sets _readHeader = true).
 1833                                int pos = _readStream.pos();
 1834                                if (pos < Protocol.headerSize)
 835                                {
 836                                    //
 837                                    // This situation is possible for small UDP packets.
 838                                    //
 0839                                    throw new MarshalException("Received Ice message with too few bytes in header.");
 840                                }
 841
 842                                // Decode the header.
 1843                                _readStream.pos(0);
 1844                                byte[] m = new byte[4];
 1845                                m[0] = _readStream.readByte();
 1846                                m[1] = _readStream.readByte();
 1847                                m[2] = _readStream.readByte();
 1848                                m[3] = _readStream.readByte();
 1849                                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 1850                                m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 851                                {
 0852                                    throw new ProtocolException(
 0853                                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 854                                }
 855
 1856                                var pv = new ProtocolVersion(_readStream);
 1857                                if (pv != Protocol.currentProtocol)
 858                                {
 0859                                    throw new MarshalException(
 0860                                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 861                                }
 1862                                var ev = new EncodingVersion(_readStream);
 1863                                if (ev != Protocol.currentProtocolEncoding)
 864                                {
 0865                                    throw new MarshalException(
 0866                                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 867                                }
 868
 1869                                _readStream.readByte(); // messageType
 1870                                _readStream.readByte(); // compress
 1871                                int size = _readStream.readInt();
 1872                                if (size < Protocol.headerSize)
 873                                {
 0874                                    throw new MarshalException($"Received Ice message with unexpected size {size}.");
 875                                }
 876
 877                                // Resize the read buffer to the message size.
 1878                                if (size > _messageSizeMax)
 879                                {
 1880                                    Ex.throwMemoryLimitException(size, _messageSizeMax);
 881                                }
 1882                                if (size > _readStream.size())
 883                                {
 1884                                    _readStream.resize(size);
 885                                }
 1886                                _readStream.pos(pos);
 887                            }
 888
 1889                            if (buf.b.hasRemaining())
 890                            {
 1891                                if (_endpoint.datagram())
 892                                {
 1893                                    throw new DatagramLimitException(); // The message was truncated.
 894                                }
 895                                continue;
 896                            }
 897                            break;
 898                        }
 899                    }
 900
 901                    // readOp and writeOp are set to the operations that the transport read or write calls from above
 902                    // returned. They indicate which operations will need to be monitored by the thread pool's selector
 903                    // when this method returns.
 1904                    int newOp = readOp | writeOp;
 905
 906                    // Operations that are ready. For example, if message was called with SocketOperationRead and the
 907                    // transport read returned SocketOperationNone, reads are considered done: there's no additional
 908                    // data to read.
 1909                    int readyOp = current.operation & ~newOp;
 910
 1911                    if (_state <= StateNotValidated)
 912                    {
 913                        // If the connection is still not validated and there's still data to read or write, continue
 914                        // waiting for data to read or write.
 1915                        if (newOp != 0)
 916                        {
 1917                            _threadPool.update(this, current.operation, newOp);
 1918                            return;
 919                        }
 920
 921                        // Initialize the connection if it's not initialized yet.
 1922                        if (_state == StateNotInitialized && !initialize(current.operation))
 923                        {
 1924                            return;
 925                        }
 926
 927                        // Validate the connection if it's not validated yet.
 1928                        if (_state <= StateNotValidated && !validate(current.operation))
 929                        {
 1930                            return;
 931                        }
 932
 933                        // The connection is validated and doesn't need additional data to be read or written. So
 934                        // unregister it from the thread pool's selector.
 1935                        _threadPool.unregister(this, current.operation);
 936
 937                        //
 938                        // We start out in holding state.
 939                        //
 1940                        setState(StateHolding);
 1941                        if (_startCallback is not null)
 942                        {
 1943                            startCB = _startCallback;
 1944                            _startCallback = null;
 1945                            if (startCB is not null)
 946                            {
 1947                                ++upcallCount;
 948                            }
 949                        }
 950                    }
 951                    else
 952                    {
 953                        Debug.Assert(_state <= StateClosingPending);
 954
 955                        //
 956                        // We parse messages first, if we receive a close
 957                        // connection message we won't send more messages.
 958                        //
 1959                        if ((readyOp & SocketOperation.Read) != 0)
 960                        {
 961                            // At this point, the protocol message is fully read and can therefore be decoded by
 962                            // parseMessage. parseMessage returns the operation to wait for readiness next.
 1963                            newOp |= parseMessage(ref info);
 1964                            upcallCount += info.upcallCount;
 965                        }
 966
 1967                        if ((readyOp & SocketOperation.Write) != 0)
 968                        {
 969                            // At this point the message from _writeStream is fully written and the next message can be
 970                            // written.
 971
 1972                            newOp |= sendNextMessage(out sentCBs);
 1973                            if (sentCBs is not null)
 974                            {
 1975                                ++upcallCount;
 976                            }
 977                        }
 978
 979                        // If the connection is not closed yet, we can update the thread pool selector to wait for
 980                        // readiness of read, write or both operations.
 1981                        if (_state < StateClosed)
 982                        {
 1983                            _threadPool.update(this, current.operation, newOp);
 984                        }
 985                    }
 986
 1987                    if (upcallCount == 0)
 988                    {
 1989                        return; // Nothing to execute, we're done!
 990                    }
 991
 1992                    _upcallCount += upcallCount;
 993
 994                    // There's something to execute so we mark IO as completed to elect a new leader thread and let IO
 995                    // be performed on this new leader thread while this thread continues with executing the upcalls.
 1996                    msg.ioCompleted();
 1997                }
 1998                catch (DatagramLimitException) // Expected.
 999                {
 11000                    if (_warnUdp)
 1001                    {
 01002                        _logger.warning($"maximum datagram size of {_readStream.pos()} exceeded");
 1003                    }
 11004                    _readStream.resize(Protocol.headerSize);
 11005                    _readStream.pos(0);
 11006                    _readHeader = true;
 11007                    return;
 1008                }
 11009                catch (SocketException ex)
 1010                {
 11011                    setState(StateClosed, ex);
 11012                    return;
 1013                }
 11014                catch (LocalException ex)
 1015                {
 11016                    if (_endpoint.datagram())
 1017                    {
 01018                        if (_warn)
 1019                        {
 01020                            _logger.warning($"datagram connection exception:\n{ex}\n{_desc}");
 1021                        }
 01022                        _readStream.resize(Protocol.headerSize);
 01023                        _readStream.pos(0);
 01024                        _readHeader = true;
 1025                    }
 1026                    else
 1027                    {
 11028                        setState(StateClosed, ex);
 1029                    }
 11030                    return;
 1031                }
 1032            }
 1033            finally
 1034            {
 11035                msg.finishIOScope();
 11036            }
 1037        }
 1038
 11039        _threadPool.executeFromThisThread(() => upcall(startCB, sentCBs, info), this);
 11040    }
 1041
 1042    private void upcall(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)
 1043    {
             // Runs the callbacks that were collected for this I/O event: the connection start callback, the sent
             // callbacks, the AMI response and the incoming dispatches. Every completed callback group is then
             // subtracted from _upcallCount under the mutex.
 11044        int completedUpcallCount = 0;
 1045
 1046        //
 1047        // Notify the factory that the connection establishment and
 1048        // validation has completed.
 1049        //
 11050        if (startCB is not null)
 1051        {
 11052            startCB.connectionStartCompleted(this);
 11053            ++completedUpcallCount;
 1054        }
 1055
 1056        //
 1057        // Notify AMI calls that the message was sent.
 1058        //
 11059        if (sentCBs is not null)
 1060        {
 11061            foreach (OutgoingMessage m in sentCBs)
 1062            {
 11063                if (m.invokeSent)
 1064                {
 11065                    m.outAsync.invokeSent();
 1066                }
 11067                if (m.receivedReply)
 1068                {
 11069                    var outAsync = (OutgoingAsync)m.outAsync;
 11070                    if (outAsync.response())
 1071                    {
 11072                        outAsync.invokeResponse();
 1073                    }
 1074                }
 1075            }
                 // The whole queue counts as a single upcall, whatever the number of messages it holds.
 11076            ++completedUpcallCount;
 1077        }
 1078
 1079        //
 1080        // Asynchronous replies must be handled outside the thread
 1081        // synchronization, so that nested calls are possible.
 1082        //
 11083        if (info.outAsync is not null)
 1084        {
 11085            info.outAsync.invokeResponse();
 11086            ++completedUpcallCount;
 1087        }
 1088
 1089        //
 1090        // Method invocation (or multiple invocations for batch messages)
 1091        // must be done outside the thread synchronization, so that nested
 1092        // calls are possible.
 1093        //
             // NOTE(review): dispatches are not added to completedUpcallCount here; presumably the dispatch path
             // decrements _upcallCount itself once each dispatch completes - confirm in dispatchAll.
 11094        if (info.requestCount > 0)
 1095        {
 11096            dispatchAll(info.stream, info.requestCount, info.requestId, info.compress, info.adapter);
 1097        }
 1098
 1099        //
 1100        // Decrease the upcall count.
 1101        //
 11102        bool finished = false;
 11103        if (completedUpcallCount > 0)
 1104        {
 11105            lock (_mutex)
 1106            {
 11107                _upcallCount -= completedUpcallCount;
 11108                if (_upcallCount == 0)
 1109                {
 1110                    // Only initiate shutdown if not already initiated. It might have already been initiated if the sent
 1111                    // callback or AMI callback was called when the connection was in the closing state.
 11112                    if (_state == StateClosing)
 1113                    {
 1114                        try
 1115                        {
 11116                            initiateShutdown();
 11117                        }
 01118                        catch (Ice.LocalException ex)
 1119                        {
 01120                            setState(StateClosed, ex);
 01121                        }
 1122                    }
 11123                    else if (_state == StateFinished)
 1124                    {
 01125                        finished = true;
 01126                        _observer?.detach();
 1127                    }
                         // Wake up every thread waiting on _mutex now that no upcall is pending.
 11128                    Monitor.PulseAll(_mutex);
 1129                }
 11130            }
 1131        }
 1132
             // The factory callback is invoked after the mutex has been released.
 11133        if (finished && _removeFromFactory is not null)
 1134        {
 01135            _removeFromFactory(this);
 1136        }
 11137    }
 1138
 1139    public override void finished(ThreadPoolCurrent current)
 1140    {
             // Runs finish() for a closed connection, either inline when there is nothing to notify, or through
             // _threadPool.executeFromThisThread after current.ioCompleted() has been called.
 1141        // Lock the connection here to ensure setState() completes before the code below is executed. This method can
 1142        // be called by the thread pool as soon as setState() calls _threadPool.finish(...). There's no need to lock
 1143        // the mutex for the remainder of the code because the data members accessed by finish() are immutable once
 1144        // _state == StateClosed (and we don't want to hold the mutex when calling upcalls).
 11145        lock (_mutex)
 1146        {
 1147            Debug.Assert(_state == StateClosed);
 11148        }
 1149
 1150        //
 1151        // If there are no callbacks to call, we don't call ioCompleted() since we're not going
 1152        // to call code that will potentially block (this avoids promoting a new leader and
 1153        // unnecessary thread creation, especially if this is called on shutdown).
 1154        //
 11155        if (_startCallback is null && _sendStreams.Count == 0 && _asyncRequests.Count == 0 && _closeCallback is null)
 1156        {
 11157            finish();
 11158            return;
 1159        }
 1160
             // At least one callback is pending: signal I/O completion first, then run finish().
 11161        current.ioCompleted();
 11162        _threadPool.executeFromThisThread(finish, this);
 11163    }
 1164
 1165    private void finish()
 1166    {
             // Final cleanup of a closed connection, in this order: trace the closure, fail the pending start
             // callback, complete or fail the queued outgoing messages and pending requests with _exception, release
             // the stream buffers, complete the _closed task source, run the close callback and finally switch to
             // StateFinished.
 11167        if (!_initialized)
 1168        {
 11169            if (_instance.traceLevels().network >= 2)
 1170            {
 11171                var s = new StringBuilder("failed to ");
 11172                s.Append(_connector is not null ? "establish" : "accept");
 11173                s.Append(' ');
 11174                s.Append(_endpoint.protocol());
 11175                s.Append(" connection\n");
 11176                s.Append(ToString());
 11177                s.Append('\n');
 11178                s.Append(_exception);
 11179                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1180            }
 1181        }
 1182        else
 1183        {
 11184            if (_instance.traceLevels().network >= 1)
 1185            {
 11186                var s = new StringBuilder("closed ");
 11187                s.Append(_endpoint.protocol());
 11188                s.Append(" connection\n");
 11189                s.Append(ToString());
 1190
 1191                // Trace the cause of most connection closures.
 11192                if (!(_exception is CommunicatorDestroyedException || _exception is ObjectAdapterDeactivatedException))
 1193                {
 11194                    s.Append('\n');
 11195                    s.Append(_exception);
 1196                }
 1197
 11198                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1199            }
 1200        }
 1201
             // A start callback that is still set at this point is notified of the failure.
 11202        _startCallback?.connectionStartFailed(this, _exception);
 11203        _startCallback = null;
 1204
 11205        if (_sendStreams.Count > 0)
 1206        {
 11207            if (!_writeStream.isEmpty())
 1208            {
 1209                //
 1210                // Return the stream to the outgoing call. This is important for
 1211                // retriable AMI calls which are not marshaled again.
 1212                //
 11213                OutgoingMessage message = _sendStreams.First.Value;
 11214                _writeStream.swap(message.stream);
 1215
 1216                //
 1217                // The current message might be sent but not yet removed from _sendStreams. If
 1218                // the response has been received in the meantime, we remove the message from
 1219                // _sendStreams to not call finished on a message which is already done.
 1220                //
 11221                if (message.isSent || message.receivedReply)
 1222                {
 11223                    if (message.sent() && message.invokeSent)
 1224                    {
 11225                        message.outAsync.invokeSent();
 1226                    }
 11227                    if (message.receivedReply)
 1228                    {
 01229                        var outAsync = (OutgoingAsync)message.outAsync;
 01230                        if (outAsync.response())
 1231                        {
 01232                            outAsync.invokeResponse();
 1233                        }
 1234                    }
 11235                    _sendStreams.RemoveFirst();
 1236                }
 1237            }
 1238
                 // Every message still queued is completed with the connection's exception.
 11239            foreach (OutgoingMessage o in _sendStreams)
 1240            {
 11241                o.completed(_exception);
 11242                if (o.requestId > 0) // Make sure finished isn't called twice.
 1243                {
 11244                    _asyncRequests.Remove(o.requestId);
 1245                }
 1246            }
 11247            _sendStreams.Clear(); // Must be cleared before _asyncRequests because of Outgoing* references in OutgoingMessage
 1248        }
 1249
             // The requests that remain are failed with the connection's exception.
 11250        foreach (OutgoingAsyncBase o in _asyncRequests.Values)
 1251        {
 11252            if (o.exception(_exception))
 1253            {
 11254                o.invokeException();
 1255            }
 1256        }
 11257        _asyncRequests.Clear();
 1258
 1259        //
 1260        // Don't wait to be reaped to reclaim memory allocated by read/write streams.
 1261        //
 11262        _writeStream.clear();
 11263        _writeStream.getBuffer().clear();
 11264        _readStream.clear();
 11265        _readStream.getBuffer().clear();
 1266
             // These four exception types complete _closed successfully; any other exception is set on _closed.
 11267        if (_exception is ConnectionClosedException or
 11268            CloseConnectionException or
 11269            CommunicatorDestroyedException or
 11270            ObjectAdapterDeactivatedException)
 1271        {
 1272            // Can execute synchronously. Note that we're not within a lock (_mutex) here.
 11273            _closed.SetResult();
 1274        }
 1275        else
 1276        {
 1277            Debug.Assert(_exception is not null);
 11278            _closed.SetException(_exception);
 1279        }
 1280
 11281        if (_closeCallback is not null)
 1282        {
 1283            try
 1284            {
 11285                _closeCallback(this);
 11286            }
 01287            catch (System.Exception ex)
 1288            {
                     // An exception thrown by the close callback is logged and does not interrupt the cleanup.
 01289                _logger.error("connection callback exception:\n" + ex + '\n' + _desc);
 01290            }
 11291            _closeCallback = null;
 1292        }
 1293
 1294        //
 1295        // This must be done last as this will cause waitUntilFinished() to return (and communicator
 1296        // objects such as the timer might be destroyed too).
 1297        //
 11298        bool finished = false;
 11299        lock (_mutex)
 1300        {
 11301            setState(StateFinished);
 1302
                 // With upcalls still pending, this block leaves finished false; upcall() sets it when the count
                 // reaches zero in StateFinished.
 11303            if (_upcallCount == 0)
 1304            {
 11305                finished = true;
 11306                _observer?.detach();
 1307            }
 11308        }
 1309
 11310        if (finished && _removeFromFactory is not null)
 1311        {
 11312            _removeFromFactory(this);
 1313        }
 11314    }
 1315
 1316    /// <inheritdoc/>
 11317    public override string ToString() => _desc; // No mutex lock, _desc is immutable (assigned in the constructor from transceiver.ToString()).
 1318
 1319    /// <inheritdoc/>
 11320    public string type() => _type; // No mutex lock, _type is immutable (assigned in the constructor from transceiver.protocol()).
 1321
 1322    /// <inheritdoc/>
 1323    public ConnectionInfo getInfo()
 1324    {
 11325        lock (_mutex)
 1326        {
                 // Once the connection is closed or finished, the exception stored for the closure is thrown.
 11327            if (_state >= StateClosed)
 1328            {
 11329                throw _exception;
 1330            }
 11331            return initConnectionInfo();
 1332        }
 11333    }
 1334
 1335    /// <inheritdoc/>
 1336    public void setBufferSize(int rcvSize, int sndSize)
 1337    {
 11338        lock (_mutex)
 1339        {
                 // Once the connection is closed or finished, the exception stored for the closure is thrown.
 11340            if (_state >= StateClosed)
 1341            {
 01342                throw _exception;
 1343            }
                 // The sizes are forwarded to the transceiver as given; no validation is done here.
 11344            _transceiver.setBufferSize(rcvSize, sndSize);
 11345            _info = null; // Invalidate the cached connection info
 11346        }
 11347    }
 1348
 1349    public void exception(LocalException ex)
 1350    {
             // Moves the connection to StateClosed with the given exception, under the connection mutex.
 11351        lock (_mutex)
 1352        {
 11353            setState(StateClosed, ex);
 11354        }
 11355    }
 1356
 01357    public Ice.Internal.ThreadPool getThreadPool() => _threadPool; // Adapter's pool for incoming connections, client pool for outgoing ones (see the constructor).
 1358
 11359    internal ConnectionI(
 11360        Instance instance,
 11361        Transceiver transceiver,
 11362        Connector connector, // null for incoming connections, non-null for outgoing connections
 11363        EndpointI endpoint,
 11364        ObjectAdapter adapter,
 11365        Action<ConnectionI> removeFromFactory, // can be null
 11366        ConnectionOptions options)
 1367    {
             // Captures the collaborators and options, reads the warning and compression properties, optionally
             // wraps the transceiver with the idle-timeout decorator, then selects the thread pool and initializes
             // this connection with it. The connection starts in StateNotInitialized.
 11368        _instance = instance;
 11369        _desc = transceiver.ToString();
 11370        _type = transceiver.protocol();
 11371        _connector = connector;
 11372        _endpoint = endpoint;
 11373        _adapter = adapter;
 11374        InitializationData initData = instance.initializationData();
 11375        _logger = initData.logger; // Cached for better performance.
 11376        _traceLevels = instance.traceLevels(); // Cached for better performance.
 11377        _connectTimeout = options.connectTimeout;
 11378        _closeTimeout = options.closeTimeout; // not used for datagram connections
 1379        // suppress inactivity timeout for datagram connections
 11380        _inactivityTimeout = endpoint.datagram() ? TimeSpan.Zero : options.inactivityTimeout;
 11381        _maxDispatches = options.maxDispatches;
 11382        _removeFromFactory = removeFromFactory;
 11383        _warn = initData.properties.getIcePropertyAsInt("Ice.Warn.Connections") > 0;
 11384        _warnUdp = initData.properties.getIcePropertyAsInt("Ice.Warn.Datagrams") > 0;
 11385        _nextRequestId = 1;
             // Incoming connections (connector is null) use the adapter's message size limit, outgoing connections
             // use the instance's limit.
 11386        _messageSizeMax = connector is null ? adapter.messageSizeMax() : instance.messageSizeMax();
 11387        _batchRequestQueue = new BatchRequestQueue(instance, _endpoint.datagram());
 11388        _readStream = new InputStream(instance, Protocol.currentProtocolEncoding);
 11389        _readHeader = false;
 11390        _readStreamPos = -1;
 11391        _writeStream = new OutputStream(); // temporary stream
 11392        _writeStreamPos = -1;
 11393        _upcallCount = 0;
 11394        _state = StateNotInitialized;
 1395
             // The configured compression level is clamped to the range 1..9.
 11396        _compressionLevel = initData.properties.getIcePropertyAsInt("Ice.Compression.Level");
 11397        if (_compressionLevel < 1)
 1398        {
 01399            _compressionLevel = 1;
 1400        }
 11401        else if (_compressionLevel > 9)
 1402        {
 01403            _compressionLevel = 9;
 1404        }
 1405
             // The idle-timeout decorator is only installed for non-datagram connections with a positive idle
             // timeout; otherwise _idleTimeoutTransceiver is not assigned here.
 11406        if (options.idleTimeout > TimeSpan.Zero && !endpoint.datagram())
 1407        {
 11408            _idleTimeoutTransceiver = new IdleTimeoutTransceiverDecorator(
 11409                transceiver,
 11410                this,
 11411                options.idleTimeout,
 11412                options.enableIdleCheck);
 11413            transceiver = _idleTimeoutTransceiver;
 1414        }
 11415        _transceiver = transceiver;
 1416
 1417        try
 1418        {
 11419            if (connector is null)
 1420            {
 1421                // adapter is always set for incoming connections
 1422                Debug.Assert(adapter is not null);
 11423                _threadPool = adapter.getThreadPool();
 1424            }
 1425            else
 1426            {
 1427                // we use the client thread pool for outgoing connections, even if there is an
 1428                // object adapter with its own thread pool.
 11429                _threadPool = instance.clientThreadPool();
 1430            }
 1431            // On Windows, async socket I/O initiated on a thread that subsequently terminates is cancelled by the OS
 1432            // with SocketError.OperationAborted. The Ice thread pool can reap workers idle past ThreadIdleTime when
 1433            // SizeMax > 1, so in that combination we hop the I/O onto the .NET ThreadPool (whose threads are managed
 1434            // by the runtime and not reaped while owning pending I/O). Other platforms and fixed-size Ice pools don't
 1435            // need the hop. See startAsync.
 11436            _threadHopRequired = AssemblyUtil.isWindows && _threadPool.canShrink;
 11437            _threadPool.initialize(this);
 11438        }
 01439        catch (LocalException)
 1440        {
                 // Ice local exceptions propagate unchanged.
 01441            throw;
 1442        }
 01443        catch (System.Exception ex)
 1444        {
                 // Any other exception is wrapped in a SyscallException.
 01445            throw new SyscallException(ex);
 1446        }
 11447    }
 1448
 1449    /// <summary>
 1450    /// Aborts the connection with a <see cref="ConnectionAbortedException" /> if the connection is active and
 1451    /// does not receive a byte for some time. See the IdleTimeoutTransceiverDecorator.
 1452    /// </summary>
 1453    internal void idleCheck(TimeSpan idleTimeout)
 1454    {
 11455        lock (_mutex)
 1456        {
 11457            if (_state == StateActive && _idleTimeoutTransceiver!.idleCheckEnabled)
 1458            {
 11459                int idleTimeoutInSeconds = (int)idleTimeout.TotalSeconds;
 1460
 11461                setState(
 11462                    StateClosed,
 11463                    new ConnectionAbortedException(
 11464                        $"Connection aborted by the idle check because it did not receive any bytes for {idleTimeoutInSe
 11465                        closedByApplication: false));
 1466            }
 1467            // else nothing to do
 11468        }
 11469    }
 1470
 1471    internal void sendHeartbeat()
 1472    {
             // Under the mutex and only in the active, holding or closing states: schedules the inactivity timer
             // when the connection looks inactive, then queues a heartbeat (a ValidateConnection message made of a
             // header only) unless an outgoing message is already pending.
 1473        Debug.Assert(!_endpoint.datagram());
 1474
 11475        lock (_mutex)
 1476        {
 11477            if (_state == StateActive || _state == StateHolding || _state == StateClosing)
 1478            {
 1479                // We check if the connection has become inactive.
 11480                if (
 11481                    _inactivityTimer is null &&           // timer not already scheduled
 11482                    _inactivityTimeout > TimeSpan.Zero && // inactivity timeout is enabled
 11483                    _state == StateActive &&              // only schedule the timer if the connection is active
 11484                    _dispatchCount == 0 &&                // no pending dispatch
 11485                    _asyncRequests.Count == 0 &&          // no pending invocation
 11486                    _readHeader &&                        // we're not waiting for the remainder of an incoming message
 11487                    _sendStreams.Count <= 1)              // there is at most one pending outgoing message
 1488                {
 1489                    // We may become inactive while the peer is back-pressuring us. In this case, we only schedule the
 1490                    // inactivity timer if there is no pending outgoing message or the pending outgoing message is a
 1491                    // heartbeat.
 1492
 1493                    // The stream of the first _sendStreams message is in _writeStream.
 11494                    if (_sendStreams.Count == 0 || isHeartbeat(_writeStream))
 1495                    {
 11496                        scheduleInactivityTimer();
 1497                    }
 1498                }
 1499
 1500                // We send a heartbeat to the peer to generate a "write" on the connection. This write in turn creates
 1501                // a read on the peer, and resets the peer's idle check timer. When _sendStreams is not empty, there is
 1502                // already an outstanding write, so we don't need to send a heartbeat. It's possible the first message
 1503                // of _sendStreams was already sent but not yet removed from _sendStreams: it means the last write
 1504                // occurred very recently, which is good enough with respect to the idle check.
 1505                // As a result of this optimization, the only possible heartbeat in _sendStreams is the first
 1506                // _sendStreams message.
 11507                if (_sendStreams.Count == 0)
 1508                {
 11509                    var os = new OutputStream(Protocol.currentProtocolEncoding);
 11510                    os.writeBlob(Protocol.magic);
 11511                    ProtocolVersion.ice_write(os, Protocol.currentProtocol);
 11512                    EncodingVersion.ice_write(os, Protocol.currentProtocolEncoding);
 11513                    os.writeByte(Protocol.validateConnectionMsg);
 11514                    os.writeByte(0);
 11515                    os.writeInt(Protocol.headerSize); // Message size.
 1516                    try
 1517                    {
 11518                        _ = sendMessage(new OutgoingMessage(os, compress: false));
 11519                    }
 01520                    catch (LocalException ex)
 1521                    {
                             // A failure to send the heartbeat closes the connection with that exception.
 01522                        setState(StateClosed, ex);
 01523                    }
 1524                }
 1525            }
 1526            // else nothing to do
 11527        }
 1528
             // Compares the byte at offset 8 of the stream's buffer with the ValidateConnection message type.
             // NOTE(review): offset 8 is presumably the message-type byte that follows the 4-byte magic and the
             // protocol and encoding versions written above - confirm the header layout.
 1529        static bool isHeartbeat(OutputStream stream) =>
 01530            stream.getBuffer().b.get(8) == Protocol.validateConnectionMsg;
 11531    }
 1532
    // Internal connection states. The numeric order is significant: the code compares states with relational
    // operators (for example `state > StateActive` or `_state >= StateClosingPending`), and toConnectionState()
    // uses the value as an index into connectionStateMap.

    // setState(int) only enters StateNotValidated from this state.
    private const int StateNotInitialized = 0;
    // Entered by initialize() once the transceiver is initialized.
    private const int StateNotValidated = 1;
    // Entering this state registers the connection for reads with the thread pool, unless the _maxDispatches limit
    // is reached.
    private const int StateActive = 2;
    // Entering this state from StateActive unregisters the connection for reads.
    private const int StateHolding = 3;
    // Lowest state accepted by setState(int, LocalException). initiateShutdown() runs in this state.
    private const int StateClosing = 4;
    // Entered once the close connection message is sent, or when one is received from the peer.
    private const int StateClosingPending = 5;
    // Entering this state destroys the batch request queue, finishes the thread pool registration and closes the
    // transceiver.
    private const int StateClosed = 6;
    // Entering this state destroys the transceiver; only expected from StateClosed.
    private const int StateFinished = 7;
 1541
 11542    private static ConnectionState toConnectionState(int state) => connectionStateMap[state];
 1543
 1544    private void setState(int state, LocalException ex)
 1545    {
 1546        //
 1547        // If setState() is called with an exception, then only closed
 1548        // and closing states are permissible.
 1549        //
 1550        Debug.Assert(state >= StateClosing);
 1551
 11552        if (_state == state) // Don't switch twice.
 1553        {
 11554            return;
 1555        }
 1556
 11557        if (_exception is null)
 1558        {
 1559            //
 1560            // If we are in closed state, an exception must be set.
 1561            //
 1562            Debug.Assert(_state != StateClosed);
 1563
 11564            _exception = ex;
 1565
 1566            //
 1567            // We don't warn if we are not validated.
 1568            //
 11569            if (_warn && _validated)
 1570            {
 1571                //
 1572                // Don't warn about certain expected exceptions.
 1573                //
 11574                if (!(_exception is CloseConnectionException ||
 11575                     _exception is ConnectionClosedException ||
 11576                     _exception is CommunicatorDestroyedException ||
 11577                     _exception is ObjectAdapterDeactivatedException ||
 11578                     (_exception is ConnectionLostException && _state >= StateClosing)))
 1579                {
 01580                    warning("connection exception", _exception);
 1581                }
 1582            }
 1583        }
 1584
 1585        //
 1586        // We must set the new state before we notify requests of any
 1587        // exceptions. Otherwise new requests may retry on a
 1588        // connection that is not yet marked as closed or closing.
 1589        //
 11590        setState(state);
 11591    }
 1592
 1593    private void setState(int state)
 1594    {
 1595        //
 1596        // We don't want to send close connection messages if the endpoint
 1597        // only supports oneway transmission from client to server.
 1598        //
 11599        if (_endpoint.datagram() && state == StateClosing)
 1600        {
 11601            state = StateClosed;
 1602        }
 1603
 1604        //
 1605        // Skip graceful shutdown if we are destroyed before validation.
 1606        //
 11607        if (_state <= StateNotValidated && state == StateClosing)
 1608        {
 11609            state = StateClosed;
 1610        }
 1611
 11612        if (_state == state) // Don't switch twice.
 1613        {
 01614            return;
 1615        }
 1616
 11617        if (state > StateActive)
 1618        {
 1619            // Dispose the inactivity timer, if not null.
 11620            cancelInactivityTimer();
 1621        }
 1622
 1623        try
 1624        {
 1625            switch (state)
 1626            {
 1627                case StateNotInitialized:
 1628                {
 1629                    Debug.Assert(false);
 1630                    break;
 1631                }
 1632
 1633                case StateNotValidated:
 1634                {
 11635                    if (_state != StateNotInitialized)
 1636                    {
 1637                        Debug.Assert(_state == StateClosed);
 01638                        return;
 1639                    }
 1640                    break;
 1641                }
 1642
 1643                case StateActive:
 1644                {
 1645                    //
 1646                    // Can only switch to active from holding or not validated.
 1647                    //
 11648                    if (_state != StateHolding && _state != StateNotValidated)
 1649                    {
 01650                        return;
 1651                    }
 1652
 11653                    if (_maxDispatches <= 0 || _dispatchCount < _maxDispatches)
 1654                    {
 11655                        _threadPool.register(this, SocketOperation.Read);
 11656                        _idleTimeoutTransceiver?.enableIdleCheck();
 1657                    }
 1658                    // else don't resume reading since we're at or over the _maxDispatches limit.
 1659
 11660                    break;
 1661                }
 1662
 1663                case StateHolding:
 1664                {
 1665                    //
 1666                    // Can only switch to holding from active or not validated.
 1667                    //
 11668                    if (_state != StateActive && _state != StateNotValidated)
 1669                    {
 11670                        return;
 1671                    }
 1672
 11673                    if (_state == StateActive && (_maxDispatches <= 0 || _dispatchCount < _maxDispatches))
 1674                    {
 11675                        _threadPool.unregister(this, SocketOperation.Read);
 11676                        _idleTimeoutTransceiver?.disableIdleCheck();
 1677                    }
 1678                    // else reads are already disabled because the _maxDispatches limit is reached or exceeded.
 1679
 11680                    break;
 1681                }
 1682
 1683                case StateClosing:
 1684                case StateClosingPending:
 1685                {
 1686                    //
 1687                    // Can't change back from closing pending.
 1688                    //
 11689                    if (_state >= StateClosingPending)
 1690                    {
 11691                        return;
 1692                    }
 1693                    break;
 1694                }
 1695
 1696                case StateClosed:
 1697                {
 11698                    if (_state == StateFinished)
 1699                    {
 11700                        return;
 1701                    }
 1702
 11703                    _batchRequestQueue.destroy(_exception);
 11704                    _threadPool.finish(this);
 11705                    _transceiver.close();
 11706                    break;
 1707                }
 1708
 1709                case StateFinished:
 1710                {
 1711                    Debug.Assert(_state == StateClosed);
 11712                    _transceiver.destroy();
 1713                    break;
 1714                }
 1715            }
 11716        }
 01717        catch (LocalException ex)
 1718        {
 01719            _logger.error("unexpected connection exception:\n" + ex + "\n" + _transceiver.ToString());
 01720        }
 1721
 11722        if (_instance.initializationData().observer is not null)
 1723        {
 11724            ConnectionState oldState = toConnectionState(_state);
 11725            ConnectionState newState = toConnectionState(state);
 11726            if (oldState != newState)
 1727            {
 11728                _observer = _instance.initializationData().observer.getConnectionObserver(
 11729                    initConnectionInfo(),
 11730                    _endpoint,
 11731                    newState,
 11732                    _observer);
 11733                if (_observer is not null)
 1734                {
 11735                    _observer.attach();
 1736                }
 1737                else
 1738                {
 11739                    _writeStreamPos = -1;
 11740                    _readStreamPos = -1;
 1741                }
 1742            }
 11743            if (_observer is not null && state == StateClosed && _exception is not null)
 1744            {
 11745                if (!(_exception is CloseConnectionException ||
 11746                     _exception is ConnectionClosedException ||
 11747                     _exception is CommunicatorDestroyedException ||
 11748                     _exception is ObjectAdapterDeactivatedException ||
 11749                     (_exception is ConnectionLostException && _state >= StateClosing)))
 1750                {
 11751                    _observer.failed(_exception.ice_id());
 1752                }
 1753            }
 1754        }
 11755        _state = state;
 1756
 11757        Monitor.PulseAll(_mutex);
 1758
 11759        if (_state == StateClosing && _upcallCount == 0)
 1760        {
 1761            try
 1762            {
 11763                initiateShutdown();
 11764            }
 01765            catch (LocalException ex)
 1766            {
 01767                setState(StateClosed, ex);
 01768            }
 1769        }
 11770    }
 1771
 1772    private void initiateShutdown()
 1773    {
 1774        Debug.Assert(_state == StateClosing && _upcallCount == 0);
 1775
 11776        if (_shutdownInitiated)
 1777        {
 11778            return;
 1779        }
 11780        _shutdownInitiated = true;
 1781
 11782        if (!_endpoint.datagram())
 1783        {
 1784            //
 1785            // Before we shut down, we send a close connection message.
 1786            //
 11787            var os = new OutputStream(Protocol.currentProtocolEncoding);
 11788            os.writeBlob(Protocol.magic);
 11789            ProtocolVersion.ice_write(os, Protocol.currentProtocol);
 11790            EncodingVersion.ice_write(os, Protocol.currentProtocolEncoding);
 11791            os.writeByte(Protocol.closeConnectionMsg);
 11792            os.writeByte(0); // Compression status: always zero for close connection.
 11793            os.writeInt(Protocol.headerSize); // Message size.
 1794
 11795            scheduleCloseTimer();
 1796
 11797            if ((sendMessage(new OutgoingMessage(os, compress: false)) & OutgoingAsyncBase.AsyncStatusSent) != 0)
 1798            {
 11799                setState(StateClosingPending);
 1800
 1801                //
 1802                // Notify the transceiver of the graceful connection closure.
 1803                //
 11804                int op = _transceiver.closing(true, _exception);
 11805                if (op != 0)
 1806                {
 11807                    _threadPool.register(this, op);
 1808                }
 1809            }
 1810        }
 11811    }
 1812
 1813    private bool initialize(int operation)
 1814    {
 11815        int s = _transceiver.initialize(_readStream.getBuffer(), _writeStream.getBuffer(), ref _hasMoreData);
 11816        if (s != SocketOperation.None)
 1817        {
 11818            _threadPool.update(this, operation, s);
 11819            return false;
 1820        }
 1821
 1822        //
 1823        // Update the connection description once the transceiver is initialized.
 1824        //
 11825        _desc = _transceiver.ToString();
 11826        _initialized = true;
 11827        setState(StateNotValidated);
 1828
 11829        return true;
 1830    }
 1831
 1832    private bool validate(int operation)
 1833    {
 11834        if (!_endpoint.datagram()) // Datagram connections are always implicitly validated.
 1835        {
 11836            if (_connector is null) // The server side has the active role for connection validation.
 1837            {
 11838                if (_writeStream.size() == 0)
 1839                {
 11840                    _writeStream.writeBlob(Protocol.magic);
 11841                    ProtocolVersion.ice_write(_writeStream, Protocol.currentProtocol);
 11842                    EncodingVersion.ice_write(_writeStream, Protocol.currentProtocolEncoding);
 11843                    _writeStream.writeByte(Protocol.validateConnectionMsg);
 11844                    _writeStream.writeByte(0); // Compression status (always zero for validate connection).
 11845                    _writeStream.writeInt(Protocol.headerSize); // Message size.
 11846                    TraceUtil.traceSend(_writeStream, _instance, this, _logger, _traceLevels);
 11847                    _writeStream.prepareWrite();
 1848                }
 1849
 11850                if (_observer is not null)
 1851                {
 01852                    observerStartWrite(_writeStream.getBuffer());
 1853                }
 1854
 11855                if (_writeStream.pos() != _writeStream.size())
 1856                {
 11857                    int op = write(_writeStream.getBuffer());
 11858                    if (op != 0)
 1859                    {
 11860                        _threadPool.update(this, operation, op);
 11861                        return false;
 1862                    }
 1863                }
 1864
 11865                if (_observer is not null)
 1866                {
 01867                    observerFinishWrite(_writeStream.getBuffer());
 1868                }
 1869            }
 1870            else // The client side has the passive role for connection validation.
 1871            {
 11872                if (_readStream.size() == 0)
 1873                {
 11874                    _readStream.resize(Protocol.headerSize);
 11875                    _readStream.pos(0);
 1876                }
 1877
 11878                if (_observer is not null)
 1879                {
 01880                    observerStartRead(_readStream.getBuffer());
 1881                }
 1882
 11883                if (_readStream.pos() != _readStream.size())
 1884                {
 11885                    int op = read(_readStream.getBuffer());
 11886                    if (op != 0)
 1887                    {
 11888                        _threadPool.update(this, operation, op);
 11889                        return false;
 1890                    }
 1891                }
 1892
 11893                if (_observer is not null)
 1894                {
 01895                    observerFinishRead(_readStream.getBuffer());
 1896                }
 1897
 11898                _validated = true;
 1899
 1900                Debug.Assert(_readStream.pos() == Protocol.headerSize);
 11901                _readStream.pos(0);
 11902                byte[] m = _readStream.readBlob(4);
 11903                if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] ||
 11904                   m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
 1905                {
 01906                    throw new ProtocolException(
 01907                        $"Bad magic in message header: {m[0]:X2} {m[1]:X2} {m[2]:X2} {m[3]:X2}");
 1908                }
 1909
 11910                var pv = new ProtocolVersion(_readStream);
 11911                if (pv != Protocol.currentProtocol)
 1912                {
 01913                    throw new MarshalException(
 01914                        $"Invalid protocol version in message header: {pv.major}.{pv.minor}");
 1915                }
 11916                var ev = new EncodingVersion(_readStream);
 11917                if (ev != Protocol.currentProtocolEncoding)
 1918                {
 01919                    throw new MarshalException(
 01920                        $"Invalid protocol encoding version in message header: {ev.major}.{ev.minor}");
 1921                }
 1922
 11923                byte messageType = _readStream.readByte();
 11924                if (messageType != Protocol.validateConnectionMsg)
 1925                {
 01926                    throw new ProtocolException(
 01927                        $"Received message of type {messageType} over a connection that is not yet validated.");
 1928                }
 11929                _readStream.readByte(); // Ignore compression status for validate connection.
 11930                int size = _readStream.readInt();
 11931                if (size != Protocol.headerSize)
 1932                {
 01933                    throw new MarshalException($"Received ValidateConnection message with unexpected size {size}.");
 1934                }
 11935                TraceUtil.traceRecv(_readStream, this, _logger, _traceLevels);
 1936
 1937                // Client connection starts sending heartbeats once it's received the ValidateConnection message.
 11938                _idleTimeoutTransceiver?.scheduleHeartbeat();
 1939            }
 1940        }
 1941
 11942        _writeStream.resize(0);
 11943        _writeStream.pos(0);
 1944
 11945        _readStream.resize(Protocol.headerSize);
 11946        _readStream.pos(0);
 11947        _readHeader = true;
 1948
 11949        if (_instance.traceLevels().network >= 1)
 1950        {
 11951            var s = new StringBuilder();
 11952            if (_endpoint.datagram())
 1953            {
 11954                s.Append("starting to ");
 11955                s.Append(_connector is not null ? "send" : "receive");
 11956                s.Append(' ');
 11957                s.Append(_endpoint.protocol());
 11958                s.Append(" messages\n");
 11959                s.Append(_transceiver.toDetailedString());
 1960            }
 1961            else
 1962            {
 11963                s.Append(_connector is not null ? "established" : "accepted");
 11964                s.Append(' ');
 11965                s.Append(_endpoint.protocol());
 11966                s.Append(" connection\n");
 11967                s.Append(ToString());
 1968            }
 11969            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 1970        }
 1971
 11972        return true;
 1973    }
 1974
 1975    /// <summary>
 1976    /// Sends the next queued messages. This method is called by message() once the message which is being sent
 1977    /// (_sendStreams.First) is fully sent. Before sending the next message, this message is removed from _sendsStream
 1978    /// If any, its sent callback is also queued in given callback queue.
 1979    /// </summary>
 1980    /// <param name="callbacks">The sent callbacks to call for the messages that were sent.</param>
 1981    /// <returns>The socket operation to register with the thread pool's selector to send the remainder of the pending
 1982    /// message being sent (_sendStreams.First).</returns>
 1983    private int sendNextMessage(out Queue<OutgoingMessage> callbacks)
 1984    {
 11985        callbacks = null;
 1986
 11987        if (_sendStreams.Count == 0)
 1988        {
 1989            // This can occur if no message was being written and the socket write operation was registered with the
 1990            // thread pool (a transceiver read method can request writing data).
 11991            return SocketOperation.None;
 1992        }
 11993        else if (_state == StateClosingPending && _writeStream.pos() == 0)
 1994        {
 1995            // Message wasn't sent, empty the _writeStream, we're not going to send more data because the connection
 1996            // is being closed.
 01997            OutgoingMessage message = _sendStreams.First.Value;
 01998            _writeStream.swap(message.stream);
 01999            return SocketOperation.None;
 2000        }
 2001
 2002        // Assert that the message was fully written.
 2003        Debug.Assert(!_writeStream.isEmpty() && _writeStream.pos() == _writeStream.size());
 2004
 2005        try
 2006        {
 12007            while (true)
 2008            {
 2009                //
 2010                // The message that was being sent is sent. We can swap back the write stream buffer to the
 2011                // outgoing message (required for retry) and queue its sent callback (if any).
 2012                //
 12013                OutgoingMessage message = _sendStreams.First.Value;
 12014                _writeStream.swap(message.stream);
 12015                if (message.sent())
 2016                {
 12017                    callbacks ??= new Queue<OutgoingMessage>();
 12018                    callbacks.Enqueue(message);
 2019                }
 12020                _sendStreams.RemoveFirst();
 2021
 2022                //
 2023                // If there's nothing left to send, we're done.
 2024                //
 12025                if (_sendStreams.Count == 0)
 2026                {
 2027                    break;
 2028                }
 2029
 2030                //
 2031                // If we are in the closed state or if the close is pending, don't continue sending. This can occur if
 2032                // parseMessage (called before sendNextMessage by message()) closes the connection.
 2033                //
 12034                if (_state >= StateClosingPending)
 2035                {
 12036                    return SocketOperation.None;
 2037                }
 2038
 2039                //
 2040                // Otherwise, prepare the next message.
 2041                //
 12042                message = _sendStreams.First.Value;
 2043                Debug.Assert(!message.prepared);
 12044                OutputStream stream = message.stream;
 2045
 12046                message.stream = doCompress(message.stream, message.compress);
 12047                message.stream.prepareWrite();
 12048                message.prepared = true;
 2049
 12050                TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);
 2051
 2052                //
 2053                // Send the message.
 2054                //
 12055                _writeStream.swap(message.stream);
 12056                if (_observer is not null)
 2057                {
 12058                    observerStartWrite(_writeStream.getBuffer());
 2059                }
 12060                if (_writeStream.pos() != _writeStream.size())
 2061                {
 12062                    int op = write(_writeStream.getBuffer());
 12063                    if (op != 0)
 2064                    {
 12065                        return op;
 2066                    }
 2067                }
 12068                if (_observer is not null)
 2069                {
 12070                    observerFinishWrite(_writeStream.getBuffer());
 2071                }
 2072
 2073                // If the message was sent right away, loop to send the next queued message.
 2074            }
 2075
 2076            // Once the CloseConnection message is sent, we transition to the StateClosingPending state.
 12077            if (_state == StateClosing && _shutdownInitiated)
 2078            {
 12079                setState(StateClosingPending);
 12080                int op = _transceiver.closing(true, _exception);
 12081                if (op != 0)
 2082                {
 12083                    return op;
 2084                }
 2085            }
 12086        }
 02087        catch (LocalException ex)
 2088        {
 02089            setState(StateClosed, ex);
 02090        }
 12091        return SocketOperation.None;
 12092    }
 2093
 2094    /// <summary>
 2095    /// Sends or queues the given message.
 2096    /// </summary>
 2097    /// <param name="message">The message to send.</param>
 2098    /// <returns>The send status.</returns>
 2099    private int sendMessage(OutgoingMessage message)
 2100    {
 2101        Debug.Assert(_state >= StateActive);
 2102        Debug.Assert(_state < StateClosed);
 2103
 2104        // Some messages are queued for sending. Just adds the message to the send queue and tell the caller that
 2105        // the message was queued.
 12106        if (_sendStreams.Count > 0)
 2107        {
 12108            _sendStreams.AddLast(message);
 12109            return OutgoingAsyncBase.AsyncStatusQueued;
 2110        }
 2111
 2112        // Prepare the message for sending.
 2113        Debug.Assert(!message.prepared);
 2114
 12115        OutputStream stream = message.stream;
 2116
 12117        message.stream = doCompress(stream, message.compress);
 12118        message.stream.prepareWrite();
 12119        message.prepared = true;
 2120
 12121        TraceUtil.traceSend(stream, _instance, this, _logger, _traceLevels);
 2122
 2123        // Send the message without blocking.
 12124        if (_observer is not null)
 2125        {
 12126            observerStartWrite(message.stream.getBuffer());
 2127        }
 12128        int op = write(message.stream.getBuffer());
 12129        if (op == 0)
 2130        {
 2131            // The message was sent so we're done.
 2132
 12133            if (_observer is not null)
 2134            {
 12135                observerFinishWrite(message.stream.getBuffer());
 2136            }
 2137
 12138            int status = OutgoingAsyncBase.AsyncStatusSent;
 12139            if (message.sent())
 2140            {
 2141                // If there's a sent callback, indicate the caller that it should invoke the sent callback.
 12142                status |= OutgoingAsyncBase.AsyncStatusInvokeSentCallback;
 2143            }
 2144
 12145            return status;
 2146        }
 2147
 2148        // The message couldn't be sent right away so we add it to the send stream queue (which is empty) and swap its
 2149        // stream with `_writeStream`. The socket operation returned by the transceiver write is registered with the
 2150        // thread pool. At this point the message() method will take care of sending the whole message (held by
 2151        // _writeStream) when the transceiver is ready to write more of the message buffer.
 2152
 12153        _writeStream.swap(message.stream);
 12154        _sendStreams.AddLast(message);
 12155        _threadPool.register(this, op);
 12156        return OutgoingAsyncBase.AsyncStatusQueued;
 2157    }
 2158
 2159    private OutputStream doCompress(OutputStream decompressed, bool compress)
 2160    {
 12161        if (BZip2.isLoaded(_logger) && compress && decompressed.size() >= 100)
 2162        {
 2163            //
 2164            // Do compression.
 2165            //
 12166            Ice.Internal.Buffer cbuf = BZip2.compress(
 12167                decompressed.getBuffer(),
 12168                Protocol.headerSize,
 12169                _compressionLevel);
 12170            if (cbuf is not null)
 2171            {
 12172                var cstream = new OutputStream(new Internal.Buffer(cbuf, true), decompressed.getEncoding());
 2173
 2174                //
 2175                // Set compression status.
 2176                //
 12177                cstream.pos(9);
 12178                cstream.writeByte(2);
 2179
 2180                //
 2181                // Write the size of the compressed stream into the header.
 2182                //
 12183                cstream.pos(10);
 12184                cstream.writeInt(cstream.size());
 2185
 2186                //
 2187                // Write the compression status and size of the compressed stream into the header of the
 2188                // decompressed stream -- we need this to trace requests correctly.
 2189                //
 12190                decompressed.pos(9);
 12191                decompressed.writeByte(2);
 12192                decompressed.writeInt(cstream.size());
 2193
 12194                return cstream;
 2195            }
 2196        }
 2197
 2198        // Write the compression status. If BZip2 is loaded and compress is set to true, we write 1, to request a
 2199        // compressed reply. Otherwise, we write 0 either BZip2 is not loaded or we are sending an uncompressed reply.
 12200        decompressed.pos(9);
 12201        decompressed.writeByte((byte)((BZip2.isLoaded(_logger) && compress) ? 1 : 0));
 2202
 2203        //
 2204        // Not compressed, fill in the message size.
 2205        //
 12206        decompressed.pos(10);
 12207        decompressed.writeInt(decompressed.size());
 2208
 12209        return decompressed;
 2210    }
 2211
    // Describes a received message. parseMessage() fills in the fields that apply to the message type.
    private struct MessageInfo
    {
        // The stream holding the received message (the decompressed stream when the message was compressed).
        public InputStream stream;
        // The number of requests to dispatch: 1 for a request, the batch count for a batch request.
        public int requestCount;
        // The request ID read from a request or reply message.
        public int requestId;
        // The compression status byte read from the message header.
        public byte compress;
        // The object adapter (_adapter) captured when a request or batch request is received.
        public ObjectAdapter adapter;
        // NOTE(review): presumably the outgoing invocation matched to a reply; not set in the code visible here.
        public OutgoingAsyncBase outAsync;
        // Incremented by one for a request and by the batch count for a batch request.
        public int upcallCount;
    }
 2222
 2223    private int parseMessage(ref MessageInfo info)
 2224    {
 2225        Debug.Assert(_state > StateNotValidated && _state < StateClosed);
 2226
 12227        info.stream = new InputStream(_instance, Protocol.currentProtocolEncoding);
 12228        _readStream.swap(info.stream);
 12229        _readStream.resize(Protocol.headerSize);
 12230        _readStream.pos(0);
 12231        _readHeader = true;
 2232
 2233        Debug.Assert(info.stream.pos() == info.stream.size());
 2234
 2235        try
 2236        {
 2237            //
 2238            // The magic and version fields have already been checked.
 2239            //
 12240            info.stream.pos(8);
 12241            byte messageType = info.stream.readByte();
 12242            info.compress = info.stream.readByte();
 12243            if (info.compress == 2)
 2244            {
 12245                if (BZip2.isLoaded(_logger))
 2246                {
 12247                    Ice.Internal.Buffer ubuf = BZip2.decompress(
 12248                        info.stream.getBuffer(),
 12249                        Protocol.headerSize,
 12250                        _messageSizeMax);
 12251                    info.stream = new InputStream(info.stream.instance(), info.stream.getEncoding(), ubuf, true);
 2252                }
 2253                else
 2254                {
 02255                    throw new FeatureNotSupportedException(
 02256                        "Cannot decompress compressed message: BZip2 library is not loaded.");
 2257                }
 2258            }
 12259            info.stream.pos(Protocol.headerSize);
 2260
 2261            switch (messageType)
 2262            {
 2263                case Protocol.closeConnectionMsg:
 2264                {
 12265                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12266                    if (_endpoint.datagram())
 2267                    {
 02268                        if (_warn)
 2269                        {
 02270                            _logger.warning("ignoring close connection message for datagram connection:\n" + _desc);
 2271                        }
 2272                    }
 2273                    else
 2274                    {
 12275                        setState(StateClosingPending, new CloseConnectionException());
 2276
 2277                        //
 2278                        // Notify the transceiver of the graceful connection closure.
 2279                        //
 12280                        int op = _transceiver.closing(false, _exception);
 12281                        if (op != 0)
 2282                        {
 12283                            scheduleCloseTimer();
 12284                            return op;
 2285                        }
 12286                        setState(StateClosed);
 2287                    }
 12288                    break;
 2289                }
 2290
 2291                case Protocol.requestMsg:
 2292                {
 12293                    if (_state >= StateClosing)
 2294                    {
 12295                        TraceUtil.trace(
 12296                            "received request during closing\n(ignored by server, client will retry)",
 12297                            info.stream,
 12298                            this,
 12299                            _logger,
 12300                            _traceLevels);
 2301                    }
 2302                    else
 2303                    {
 12304                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12305                        info.requestId = info.stream.readInt();
 12306                        info.requestCount = 1;
 12307                        info.adapter = _adapter;
 12308                        ++info.upcallCount;
 2309
 12310                        cancelInactivityTimer();
 12311                        ++_dispatchCount;
 2312                    }
 12313                    break;
 2314                }
 2315
 2316                case Protocol.requestBatchMsg:
 2317                {
 12318                    if (_state >= StateClosing)
 2319                    {
 02320                        TraceUtil.trace(
 02321                            "received batch request during closing\n(ignored by server, client will retry)",
 02322                            info.stream,
 02323                            this,
 02324                            _logger,
 02325                            _traceLevels);
 2326                    }
 2327                    else
 2328                    {
 12329                        TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12330                        int requestCount = info.stream.readInt();
 12331                        if (requestCount < 0)
 2332                        {
 02333                            throw new MarshalException($"Received batch request with {requestCount} batches.");
 2334                        }
 12335                        info.requestCount = requestCount;
 12336                        info.adapter = _adapter;
 12337                        info.upcallCount += info.requestCount;
 2338
 12339                        cancelInactivityTimer();
 12340                        _dispatchCount += info.requestCount;
 2341                    }
 12342                    break;
 2343                }
 2344
 2345                case Protocol.replyMsg:
 2346                {
 12347                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12348                    info.requestId = info.stream.readInt();
 12349                    if (_asyncRequests.TryGetValue(info.requestId, out info.outAsync))
 2350                    {
 12351                        _asyncRequests.Remove(info.requestId);
 2352
 12353                        info.outAsync.getIs().swap(info.stream);
 2354
 2355                        //
 2356                        // If we just received the reply for a request which isn't acknowledged as
 2357                        // sent yet, we queue the reply instead of processing it right away. It
 2358                        // will be processed once the write callback is invoked for the message.
 2359                        //
 12360                        OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null;
 12361                        if (message is not null && message.outAsync == info.outAsync)
 2362                        {
 12363                            message.receivedReply = true;
 2364                        }
 12365                        else if (info.outAsync.response())
 2366                        {
 12367                            ++info.upcallCount;
 2368                        }
 2369                        else
 2370                        {
 12371                            info.outAsync = null;
 2372                        }
 12373                        if (_closeRequested && _state < StateClosing && _asyncRequests.Count == 0)
 2374                        {
 12375                            doApplicationClose();
 2376                        }
 2377                    }
 12378                    break;
 2379                }
 2380
 2381                case Protocol.validateConnectionMsg:
 2382                {
 12383                    TraceUtil.traceRecv(info.stream, this, _logger, _traceLevels);
 12384                    break;
 2385                }
 2386
 2387                default:
 2388                {
 02389                    TraceUtil.trace(
 02390                        "received unknown message\n(invalid, closing connection)",
 02391                        info.stream,
 02392                        this,
 02393                        _logger,
 02394                        _traceLevels);
 2395
 02396                    throw new ProtocolException($"Received Ice protocol message with unknown type: {messageType}");
 2397                }
 2398            }
 12399        }
 12400        catch (LocalException ex)
 2401        {
 12402            if (_endpoint.datagram())
 2403            {
 02404                if (_warn)
 2405                {
 02406                    _logger.warning("datagram connection exception:\n" + ex.ToString() + "\n" + _desc);
 2407                }
 2408            }
 2409            else
 2410            {
 12411                setState(StateClosed, ex);
 2412            }
 12413        }
 2414
 12415        if (_state == StateHolding)
 2416        {
 2417            // Don't continue reading if the connection is in the holding state.
 02418            return SocketOperation.None;
 2419        }
 12420        else if (_maxDispatches > 0 && _dispatchCount >= _maxDispatches)
 2421        {
 2422            // Don't continue reading if the _maxDispatches limit is reached or exceeded.
 12423            _idleTimeoutTransceiver?.disableIdleCheck();
 12424            return SocketOperation.None;
 2425        }
 2426        else
 2427        {
 2428            // Continue reading.
 12429            return SocketOperation.Read;
 2430        }
 12431    }
 2432
 2433    private void dispatchAll(
 2434        InputStream stream,
 2435        int requestCount,
 2436        int requestId,
 2437        byte compress,
 2438        ObjectAdapter adapter)
 2439    {
 2440        // Note: In contrast to other private or protected methods, this method must be called *without* the mutex
 2441        // locked.
 2442
 12443        Object dispatcher = adapter?.dispatchPipeline;
 2444
 2445        try
 2446        {
 12447            while (requestCount > 0)
 2448            {
 2449                // adapter can be null here, however the adapter set in current can't be null, and we never pass
 2450                // a null current.adapter to the application code. Once this file enables nullable, adapter should be
 2451                // adapter! below.
 12452                var request = new IncomingRequest(requestId, this, adapter, stream);
 2453
 12454                if (dispatcher is not null)
 2455                {
 2456                    // We don't and can't await the dispatchAsync: with batch requests, we want all the dispatches to
 2457                    // execute in the current Ice thread pool thread. If we awaited the dispatchAsync, we could
 2458                    // switch to a .NET thread pool thread.
 12459                    _ = dispatchAsync(request);
 2460                }
 2461                else
 2462                {
 2463                    // Received request on a connection without an object adapter.
 12464                    sendResponse(
 12465                        request.current.createOutgoingResponse(new ObjectNotExistException()),
 12466                        isTwoWay: !_endpoint.datagram() && requestId != 0,
 12467                        compress: 0);
 2468                }
 12469                --requestCount;
 2470            }
 2471
 12472            stream.clear();
 12473        }
 02474        catch (LocalException ex) // TODO: catch all exceptions
 2475        {
 2476            // Typically, the IncomingRequest constructor throws an exception, and we can't continue.
 02477            dispatchException(ex, requestCount);
 02478        }
 2479
 2480        async Task dispatchAsync(IncomingRequest request)
 2481        {
 2482            try
 2483            {
 2484                OutgoingResponse response;
 2485
 2486                try
 2487                {
 12488                    response = await dispatcher.dispatchAsync(request).ConfigureAwait(false);
 12489                }
 12490                catch (System.Exception ex)
 2491                {
 12492                    response = request.current.createOutgoingResponse(ex);
 12493                }
 2494
 12495                sendResponse(response, isTwoWay: !_endpoint.datagram() && requestId != 0, compress);
 12496            }
 02497            catch (LocalException ex) // TODO: catch all exceptions to avoid UnobservedTaskException
 2498            {
 02499                dispatchException(ex, requestCount: 1);
 02500            }
 12501        }
 12502    }
 2503
 2504    private void sendResponse(OutgoingResponse response, bool isTwoWay, byte compress)
 2505    {
 12506        bool finished = false;
 2507        try
 2508        {
 12509            lock (_mutex)
 2510            {
 2511                Debug.Assert(_state > StateNotValidated);
 2512
 2513                try
 2514                {
 12515                    if (--_upcallCount == 0)
 2516                    {
 12517                        if (_state == StateFinished)
 2518                        {
 12519                            finished = true;
 12520                            _observer?.detach();
 2521                        }
 12522                        Monitor.PulseAll(_mutex);
 2523                    }
 2524
 12525                    if (_state >= StateClosed)
 2526                    {
 2527                        Debug.Assert(_exception is not null);
 12528                        throw _exception;
 2529                    }
 2530
 12531                    if (isTwoWay)
 2532                    {
 12533                        sendMessage(new OutgoingMessage(response.outputStream, compress > 0));
 2534                    }
 2535
 12536                    if (_state == StateActive && _maxDispatches > 0 && _dispatchCount == _maxDispatches)
 2537                    {
 2538                        // Resume reading if the connection is active and the dispatch count is about to be less than
 2539                        // _maxDispatches.
 12540                        _threadPool.update(this, SocketOperation.None, SocketOperation.Read);
 12541                        _idleTimeoutTransceiver?.enableIdleCheck();
 2542                    }
 2543
 12544                    --_dispatchCount;
 2545
 12546                    if (_state == StateClosing && _upcallCount == 0)
 2547                    {
 12548                        initiateShutdown();
 2549                    }
 12550                }
 12551                catch (LocalException ex)
 2552                {
 12553                    setState(StateClosed, ex);
 12554                }
 2555            }
 2556        }
 2557        finally
 2558        {
 12559            if (finished && _removeFromFactory is not null)
 2560            {
 12561                _removeFromFactory(this);
 2562            }
 12563        }
 12564    }
 2565
 2566    private void dispatchException(LocalException ex, int requestCount)
 2567    {
 02568        bool finished = false;
 2569
 2570        // Fatal exception while dispatching a request. Since sendResponse isn't called in case of a fatal exception
 2571        // we decrement _upcallCount here.
 02572        lock (_mutex)
 2573        {
 02574            setState(StateClosed, ex);
 2575
 02576            if (requestCount > 0)
 2577            {
 2578                Debug.Assert(_upcallCount >= requestCount);
 02579                _upcallCount -= requestCount;
 02580                if (_upcallCount == 0)
 2581                {
 02582                    if (_state == StateFinished)
 2583                    {
 02584                        finished = true;
 02585                        _observer?.detach();
 2586                    }
 02587                    Monitor.PulseAll(_mutex);
 2588                }
 2589            }
 02590        }
 2591
 02592        if (finished && _removeFromFactory is not null)
 2593        {
 02594            _removeFromFactory(this);
 2595        }
 02596    }
 2597
 2598    private void inactivityCheck(System.Threading.Timer inactivityTimer)
 2599    {
 12600        lock (_mutex)
 2601        {
 2602            // If the timers are different, it means this inactivityTimer is no longer current.
 12603            if (inactivityTimer == _inactivityTimer)
 2604            {
 12605                _inactivityTimer = null;
 12606                inactivityTimer.Dispose(); // non-blocking
 2607
 12608                if (_state == StateActive)
 2609                {
 12610                    setState(
 12611                        StateClosing,
 12612                        new ConnectionClosedException(
 12613                            "Connection closed because it remained inactive for longer than the inactivity timeout.",
 12614                            closedByApplication: false));
 2615                }
 2616            }
 2617            // Else this timer was already canceled and disposed. Nothing to do.
 12618        }
 12619    }
 2620
 2621    private void connectTimedOut(System.Threading.Timer connectTimer)
 2622    {
 12623        lock (_mutex)
 2624        {
 12625            if (_state < StateActive)
 2626            {
 12627                setState(StateClosed, new ConnectTimeoutException());
 2628            }
 12629        }
 2630        // else ignore since we're already connected.
 12631        connectTimer.Dispose();
 12632    }
 2633
 2634    private void closeTimedOut(System.Threading.Timer closeTimer)
 2635    {
 12636        lock (_mutex)
 2637        {
 12638            if (_state < StateClosed)
 2639            {
 2640                // We don't use setState(state, exception) because we want to overwrite the exception set by a
 2641                // graceful closure.
 12642                _exception = new CloseTimeoutException();
 12643                setState(StateClosed);
 2644            }
 12645        }
 2646        // else ignore since we're already closed.
 12647        closeTimer.Dispose();
 12648    }
 2649
 2650    private ConnectionInfo initConnectionInfo()
 2651    {
 2652        // Called with _mutex locked.
 2653
 12654        if (_state > StateNotInitialized && _info is not null) // Update the connection info until it's initialized
 2655        {
 12656            return _info;
 2657        }
 2658
 12659        _info =
 12660            _transceiver.getInfo(incoming: _connector is null, _adapter?.getName() ?? "", _endpoint.connectionId());
 12661        return _info;
 2662    }
 2663
 02664    private void warning(string msg, System.Exception ex) => _logger.warning($"{msg}:\n{ex}\n{_transceiver}");
 2665
 2666    private void observerStartRead(Ice.Internal.Buffer buf)
 2667    {
 12668        if (_readStreamPos >= 0)
 2669        {
 2670            Debug.Assert(!buf.empty());
 12671            _observer.receivedBytes(buf.b.position() - _readStreamPos);
 2672        }
 12673        _readStreamPos = buf.empty() ? -1 : buf.b.position();
 12674    }
 2675
 2676    private void observerFinishRead(Ice.Internal.Buffer buf)
 2677    {
 12678        if (_readStreamPos == -1)
 2679        {
 02680            return;
 2681        }
 2682        Debug.Assert(buf.b.position() >= _readStreamPos);
 12683        _observer.receivedBytes(buf.b.position() - _readStreamPos);
 12684        _readStreamPos = -1;
 12685    }
 2686
 2687    private void observerStartWrite(Ice.Internal.Buffer buf)
 2688    {
 12689        if (_writeStreamPos >= 0)
 2690        {
 2691            Debug.Assert(!buf.empty());
 12692            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2693        }
 12694        _writeStreamPos = buf.empty() ? -1 : buf.b.position();
 12695    }
 2696
 2697    private void observerFinishWrite(Ice.Internal.Buffer buf)
 2698    {
 12699        if (_writeStreamPos == -1)
 2700        {
 12701            return;
 2702        }
 12703        if (buf.b.position() > _writeStreamPos)
 2704        {
 12705            _observer.sentBytes(buf.b.position() - _writeStreamPos);
 2706        }
 12707        _writeStreamPos = -1;
 12708    }
 2709
 2710    private int read(Ice.Internal.Buffer buf)
 2711    {
 12712        int start = buf.b.position();
 12713        int op = _transceiver.read(buf, ref _hasMoreData);
 12714        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2715        {
 12716            var s = new StringBuilder("received ");
 12717            if (_endpoint.datagram())
 2718            {
 02719                s.Append(buf.b.limit());
 2720            }
 2721            else
 2722            {
 12723                s.Append(buf.b.position() - start);
 12724                s.Append(" of ");
 12725                s.Append(buf.b.limit() - start);
 2726            }
 12727            s.Append(" bytes via ");
 12728            s.Append(_endpoint.protocol());
 12729            s.Append('\n');
 12730            s.Append(ToString());
 12731            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2732        }
 12733        return op;
 2734    }
 2735
 2736    private int write(Ice.Internal.Buffer buf)
 2737    {
 12738        int start = buf.b.position();
 12739        int op = _transceiver.write(buf);
 12740        if (_instance.traceLevels().network >= 3 && buf.b.position() != start)
 2741        {
 12742            var s = new StringBuilder("sent ");
 12743            s.Append(buf.b.position() - start);
 12744            if (!_endpoint.datagram())
 2745            {
 12746                s.Append(" of ");
 12747                s.Append(buf.b.limit() - start);
 2748            }
 12749            s.Append(" bytes via ");
 12750            s.Append(_endpoint.protocol());
 12751            s.Append('\n');
 12752            s.Append(ToString());
 12753            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat, s.ToString());
 2754        }
 12755        return op;
 2756    }
 2757
 2758    private void scheduleInactivityTimer()
 2759    {
 2760        // Called with the ConnectionI mutex locked.
 2761        Debug.Assert(_inactivityTimer is null);
 2762        Debug.Assert(_inactivityTimeout > TimeSpan.Zero);
 2763
 12764        _inactivityTimer = new System.Threading.Timer(
 12765            inactivityTimer => inactivityCheck((System.Threading.Timer)inactivityTimer));
 12766        _inactivityTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 12767    }
 2768
 2769    private void cancelInactivityTimer()
 2770    {
 2771        // Called with the ConnectionI mutex locked.
 12772        if (_inactivityTimer is not null)
 2773        {
 12774            _inactivityTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 12775            _inactivityTimer.Dispose();
 12776            _inactivityTimer = null;
 2777        }
 12778    }
 2779
 2780    private void scheduleCloseTimer()
 2781    {
 12782        if (_closeTimeout > TimeSpan.Zero)
 2783        {
 2784#pragma warning disable CA2000 // closeTimer is disposed by closeTimedOut.
 12785            var closeTimer = new System.Threading.Timer(
 12786                timerObj => closeTimedOut((System.Threading.Timer)timerObj));
 2787            // schedule timer to run once; closeTimedOut disposes the timer too.
 12788            closeTimer.Change(_closeTimeout, Timeout.InfiniteTimeSpan);
 2789#pragma warning restore CA2000
 2790        }
 12791    }
 2792
 2793    private void doApplicationClose()
 2794    {
 2795        // Called with the ConnectionI mutex locked.
 2796        Debug.Assert(_state < StateClosing);
 12797        setState(
 12798            StateClosing,
 12799            new ConnectionClosedException(
 12800                "The connection was closed gracefully by the application.",
 12801                closedByApplication: true));
 12802    }
 2803
 2804    private class OutgoingMessage
 2805    {
 12806        internal OutgoingMessage(OutputStream stream, bool compress)
 2807        {
 12808            this.stream = stream;
 12809            this.compress = compress;
 12810        }
 2811
 12812        internal OutgoingMessage(OutgoingAsyncBase outAsync, OutputStream stream, bool compress, int requestId)
 2813        {
 12814            this.outAsync = outAsync;
 12815            this.stream = stream;
 12816            this.compress = compress;
 12817            this.requestId = requestId;
 12818        }
 2819
 2820        internal void canceled()
 2821        {
 2822            Debug.Assert(outAsync is not null); // Only requests can timeout.
 12823            outAsync = null;
 12824        }
 2825
 2826        internal bool sent()
 2827        {
 12828            stream = null;
 12829            if (outAsync is not null)
 2830            {
 12831                invokeSent = outAsync.sent();
 12832                return invokeSent || receivedReply;
 2833            }
 12834            return false;
 2835        }
 2836
 2837        internal void completed(LocalException ex)
 2838        {
 12839            if (outAsync is not null)
 2840            {
 12841                if (outAsync.exception(ex))
 2842                {
 12843                    outAsync.invokeException();
 2844                }
 2845            }
 12846            stream = null;
 12847        }
 2848
 2849        internal OutputStream stream;
 2850        internal OutgoingAsyncBase outAsync;
 2851        internal bool compress;
 2852        internal int requestId;
 2853        internal bool prepared;
 2854        internal bool isSent;
 2855        internal bool invokeSent;
 2856        internal bool receivedReply;
 2857    }
 2858
 12859    private static readonly ConnectionState[] connectionStateMap = [
 12860        ConnectionState.ConnectionStateValidating,   // StateNotInitialized
 12861        ConnectionState.ConnectionStateValidating,   // StateNotValidated
 12862        ConnectionState.ConnectionStateActive,       // StateActive
 12863        ConnectionState.ConnectionStateHolding,      // StateHolding
 12864        ConnectionState.ConnectionStateClosing,      // StateClosing
 12865        ConnectionState.ConnectionStateClosing,      // StateClosingPending
 12866        ConnectionState.ConnectionStateClosed,       // StateClosed
 12867        ConnectionState.ConnectionStateClosed,       // StateFinished
 12868    ];
 2869
 2870    private readonly Instance _instance;
 2871    private readonly Transceiver _transceiver;
 2872    private readonly IdleTimeoutTransceiverDecorator _idleTimeoutTransceiver; // can be null
 2873
 2874    private string _desc;
 2875    private readonly string _type;
 2876    private readonly Connector _connector;
 2877    private readonly EndpointI _endpoint;
 2878
 2879    private ObjectAdapter _adapter;
 2880
 2881    private readonly Logger _logger;
 2882    private readonly TraceLevels _traceLevels;
 2883    private readonly Ice.Internal.ThreadPool _threadPool;
 2884    private readonly bool _threadHopRequired;
 2885
 2886    private readonly TimeSpan _connectTimeout;
 2887    private readonly TimeSpan _closeTimeout;
 2888    private TimeSpan _inactivityTimeout; // protected by _mutex
 2889
 2890    private System.Threading.Timer _inactivityTimer; // can be null
 2891
 2892    private StartCallback _startCallback;
 2893
 2894    // This action must be called outside the ConnectionI lock to avoid lock acquisition deadlocks.
 2895    private readonly Action<ConnectionI> _removeFromFactory;
 2896
 2897    private readonly bool _warn;
 2898    private readonly bool _warnUdp;
 2899
 2900    private readonly int _compressionLevel;
 2901
 2902    private int _nextRequestId;
 2903
 12904    private readonly Dictionary<int, OutgoingAsyncBase> _asyncRequests = new Dictionary<int, OutgoingAsyncBase>();
 2905
 2906    private LocalException _exception;
 2907
 2908    private readonly int _messageSizeMax;
 2909    private readonly BatchRequestQueue _batchRequestQueue;
 2910
 12911    private readonly LinkedList<OutgoingMessage> _sendStreams = new LinkedList<OutgoingMessage>();
 2912
 2913    // Contains the message which is being received. If the connection is waiting to receive a message (_readHeader ==
 2914    // true), its size is Protocol.headerSize. Otherwise, its size is the message size specified in the received message
 2915    // header.
 2916    private readonly InputStream _readStream;
 2917
 2918    // When _readHeader is true, the next bytes we'll read are the header of a new message. When false, we're reading
 2919    // next the remainder of a message that was already partially received.
 2920    private bool _readHeader;
 2921
 2922    // Contains the message which is being sent. The write stream buffer is empty if no message is being sent.
 2923    private readonly OutputStream _writeStream;
 2924
 2925    private ConnectionObserver _observer;
 2926    private int _readStreamPos;
 2927    private int _writeStreamPos;
 2928
 2929    // The upcall count keeps track of the number of dispatches, AMI (response) continuations, sent callbacks and
 2930    // connection establishment callbacks that have been started (or are about to be started) by a thread of the thread
 2931    // pool associated with this connection, and have not completed yet. All these operations except the connection
 2932    // establishment callbacks execute application code or code generated from Slice definitions.
 2933    private int _upcallCount;
 2934
 2935    // The number of outstanding dispatches. Maintained only while state is StateActive or StateHolding.
 2936    // _dispatchCount can be greater than a non-0 _maxDispatches when a receive a batch with multiples requests.
 2937    private int _dispatchCount;
 2938
 2939    // When we dispatch _maxDispatches concurrent requests, we stop reading the connection to back-pressure the peer.
 2940    // _maxDispatches <= 0 means no limit.
 2941    private readonly int _maxDispatches;
 2942
 2943    private int _state; // The current state.
 2944    private bool _shutdownInitiated;
 2945    private bool _initialized;
 2946    private bool _validated;
 2947
 2948    // When true, the application called close and Connection must close the connection when it receives the reply
 2949    // for the last outstanding invocation.
 2950    private bool _closeRequested;
 2951
 2952    private ConnectionInfo _info;
 2953
 2954    private CloseCallback _closeCallback;
 2955
 2956    // We need to run the continuation asynchronously since it can be completed by an Ice thread pool thread.
 12957    private readonly TaskCompletionSource _closed = new(TaskCreationOptions.RunContinuationsAsynchronously);
 12958    private readonly object _mutex = new();
 2959}

Methods/Properties

start(Ice.ConnectionI.StartCallback)
startAndWait()
activate()
hold()
destroy(int)
abort()
closeAsync()
isActiveOrHolding()
throwException()
waitUntilHolding()
waitUntilFinished()
updateObserver()
sendAsyncRequest(Ice.Internal.OutgoingAsyncBase, bool, bool, int)
getBatchRequestQueue()
flushBatchRequests(Ice.CompressBatch)
flushBatchRequestsAsync(Ice.CompressBatch, System.IProgress<bool>, System.Threading.CancellationToken)
disableInactivityCheck()
setCloseCallback(Ice.CloseCallback)
asyncRequestCanceled(Ice.Internal.OutgoingAsyncBase, Ice.LocalException)
endpoint()
connector()
setAdapter(Ice.ObjectAdapter)
getAdapter()
getEndpoint()
createProxy(Ice.Identity)
setAdapterFromAdapter(Ice.ObjectAdapter)
startAsync(int, Ice.Internal.AsyncCallback)
doIO()
finishAsync(int)
message(Ice.Internal.ThreadPoolCurrent)
upcall(Ice.ConnectionI.StartCallback, System.Collections.Generic.Queue<Ice.ConnectionI.OutgoingMessage>, Ice.ConnectionI.MessageInfo)
finished(Ice.Internal.ThreadPoolCurrent)
finish()
ToString()
type()
getInfo()
setBufferSize(int, int)
exception(Ice.LocalException)
getThreadPool()
.ctor(Ice.Internal.Instance, Ice.Internal.Transceiver, Ice.Internal.Connector, Ice.Internal.EndpointI, Ice.ObjectAdapter, System.Action<Ice.ConnectionI>, Ice.ConnectionOptions)
idleCheck(System.TimeSpan)
sendHeartbeat()
isHeartbeat()
toConnectionState(int)
setState(int, Ice.LocalException)
setState(int)
initiateShutdown()
initialize(int)
validate(int)
sendNextMessage(out System.Collections.Generic.Queue<Ice.ConnectionI.OutgoingMessage>)
sendMessage(Ice.ConnectionI.OutgoingMessage)
doCompress(Ice.OutputStream, bool)
parseMessage(ref Ice.ConnectionI.MessageInfo)
dispatchAll(Ice.InputStream, int, int, byte, Ice.ObjectAdapter)
dispatchAsync()
sendResponse(Ice.OutgoingResponse, bool, byte)
dispatchException(Ice.LocalException, int)
inactivityCheck(System.Threading.Timer)
connectTimedOut(System.Threading.Timer)
closeTimedOut(System.Threading.Timer)
initConnectionInfo()
warning(string, System.Exception)
observerStartRead(Ice.Internal.Buffer)
observerFinishRead(Ice.Internal.Buffer)
observerStartWrite(Ice.Internal.Buffer)
observerFinishWrite(Ice.Internal.Buffer)
read(Ice.Internal.Buffer)
write(Ice.Internal.Buffer)
scheduleInactivityTimer()
cancelInactivityTimer()
scheduleCloseTimer()
doApplicationClose()
.ctor(Ice.OutputStream, bool)
.ctor(Ice.Internal.OutgoingAsyncBase, Ice.OutputStream, bool, int)
canceled()
sent()
completed(Ice.LocalException)
.cctor()