< Summary

Information
Class: Ice.Internal.OutgoingConnectionFactory.ConnectCallback
Assembly: Ice
File(s): /home/runner/work/ice/ice/csharp/src/Ice/Internal/ConnectionFactory.cs
Tag: 71_18251537082
Line coverage
87%
Covered lines: 96
Uncovered lines: 14
Coverable lines: 110
Total lines: 1556
Line coverage: 87.2%
Branch coverage
84%
Covered branches: 32
Total branches: 38
Branch coverage: 84.2%
Method coverage
100%
Covered methods: 15
Total methods: 15
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
connectionStartCompleted(...)100%22100%
connectionStartFailed(...)100%22100%
connectors(...)100%44100%
exception(...)50%7.33666.67%
setConnection(...)100%11100%
setException(...)100%11100%
hasConnector(...)100%11100%
removeConnectors(...)100%11100%
removeFromPending()100%11100%
getConnectors()100%1.08157.14%
nextEndpoint()100%1.08157.14%
getConnection()100%2.19263.64%
nextConnector()90%101096.88%
connectionStartFailedImpl(...)83.33%1212100%

File(s)

/home/runner/work/ice/ice/csharp/src/Ice/Internal/ConnectionFactory.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4using System.Text;
 5
 6#nullable enable
 7
 8namespace Ice.Internal;
 9
 10internal class MultiDictionary<K, V> : Dictionary<K, ICollection<V>> where K : class
 11{
 12    internal void Add(K key, V value)
 13    {
 14        if (!TryGetValue(key, out ICollection<V>? list))
 15        {
 16            list = [];
 17            Add(key, list);
 18        }
 19        list.Add(value);
 20    }
 21
 22    internal void Remove(K key, V value)
 23    {
 24        ICollection<V> list = this[key];
 25        list.Remove(value);
 26        if (list.Count == 0)
 27        {
 28            Remove(key);
 29        }
 30    }
 31}
 32
 33internal sealed class OutgoingConnectionFactory
 34{
 35    internal interface CreateConnectionCallback
 36    {
 37        void setConnection(ConnectionI connection, bool compress);
 38
 39        void setException(LocalException ex);
 40    }
 41
 42    internal void destroy()
 43    {
 44        lock (_mutex)
 45        {
 46            if (_destroyed)
 47            {
 48                return;
 49            }
 50
 51            foreach (ICollection<ConnectionI> connections in _connections.Values)
 52            {
 53                foreach (ConnectionI c in connections)
 54                {
 55                    c.destroy(ConnectionI.CommunicatorDestroyed);
 56                }
 57            }
 58
 59            _destroyed = true;
 60            _defaultObjectAdapter = null;
 61            Monitor.PulseAll(_mutex);
 62        }
 63    }
 64
 65    internal void updateConnectionObservers()
 66    {
 67        lock (_mutex)
 68        {
 69            foreach (ICollection<ConnectionI> connections in _connections.Values)
 70            {
 71                foreach (ConnectionI c in connections)
 72                {
 73                    c.updateObserver();
 74                }
 75            }
 76        }
 77    }
 78
 79    internal void waitUntilFinished()
 80    {
 81        Dictionary<Connector, ICollection<ConnectionI>> connections;
 82        lock (_mutex)
 83        {
 84            // First we wait until the factory is destroyed. We also wait until there are no pending connections
 85            // anymore. Only then we can be sure the _connections contains all connections.
 86            while (!_destroyed || _pending.Count > 0 || _pendingConnectCount > 0)
 87            {
 88                Monitor.Wait(_mutex);
 89            }
 90
 91            // We want to wait until all connections are finished outside the thread synchronization.
 92            connections = new Dictionary<Connector, ICollection<ConnectionI>>(_connections);
 93        }
 94
 95        // Now we wait until the destruction of each connection is finished.
 96        foreach (ICollection<ConnectionI> cl in connections.Values)
 97        {
 98            foreach (ConnectionI c in cl)
 99            {
 100                c.waitUntilFinished();
 101            }
 102        }
 103    }
 104
 105    internal void create(List<EndpointI> endpoints, bool hasMore, CreateConnectionCallback callback)
 106    {
 107        Debug.Assert(endpoints.Count > 0);
 108
 109        // Try to find a connection to one of the given endpoints.
 110        try
 111        {
 112            if (findConnection(endpoints, out bool compress) is ConnectionI connection)
 113            {
 114                callback.setConnection(connection, compress);
 115                return;
 116            }
 117        }
 118        catch (LocalException ex)
 119        {
 120            callback.setException(ex);
 121            return;
 122        }
 123
 124        var cb = new ConnectCallback(this, endpoints, hasMore, callback);
 125        cb.getConnectors();
 126    }
 127
 128    internal void setRouterInfo(RouterInfo routerInfo)
 129    {
 130        Debug.Assert(routerInfo is not null);
 131        ObjectAdapter adapter = routerInfo.getAdapter();
 132        EndpointI[] endpoints = routerInfo.getClientEndpoints(); // Must be called outside the synchronization
 133
 134        lock (_mutex)
 135        {
 136            if (_destroyed)
 137            {
 138                throw new CommunicatorDestroyedException();
 139            }
 140
 141            // Search for connections to the router's client proxy endpoints, and update the object adapter for such
 142            // connections, so that callbacks from the router can be received over such connections.
 143            for (int i = 0; i < endpoints.Length; i++)
 144            {
 145                EndpointI endpoint = endpoints[i];
 146
 147                // The Ice.ConnectionI object does not take the compression flag of endpoints into account, but instead
 148                // gets the information about whether messages should be compressed or not fro other sources. In order
 149                // to allow connection sharing for endpoints that differ in the value of the compression flag only, we
 150                // always set the compression flag to false here in this connection factory. We also clear the timeout
 151                // as it is no longer used for Ice 3.8.
 152                endpoint = endpoint.compress(false).timeout(-1);
 153
 154                foreach (ICollection<ConnectionI> connections in _connections.Values)
 155                {
 156                    foreach (ConnectionI connection in connections)
 157                    {
 158                        if (connection.endpoint().Equals(endpoint))
 159                        {
 160                            connection.setAdapter(adapter);
 161                        }
 162                    }
 163                }
 164            }
 165        }
 166    }
 167
 168    internal void removeAdapter(ObjectAdapter adapter)
 169    {
 170        lock (_mutex)
 171        {
 172            if (_destroyed)
 173            {
 174                return;
 175            }
 176
 177            foreach (ICollection<ConnectionI> connectionList in _connections.Values)
 178            {
 179                foreach (ConnectionI connection in connectionList)
 180                {
 181                    if (connection.getAdapter() == adapter)
 182                    {
 183                        connection.setAdapter(null);
 184                    }
 185                }
 186            }
 187        }
 188    }
 189
 190    internal void flushAsyncBatchRequests(CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
 191    {
 192        var c = new List<ConnectionI>();
 193
 194        lock (_mutex)
 195        {
 196            if (!_destroyed)
 197            {
 198                foreach (ICollection<ConnectionI> connectionList in _connections.Values)
 199                {
 200                    foreach (ConnectionI conn in connectionList)
 201                    {
 202                        if (conn.isActiveOrHolding())
 203                        {
 204                            c.Add(conn);
 205                        }
 206                    }
 207                }
 208            }
 209        }
 210
 211        foreach (ConnectionI conn in c)
 212        {
 213            try
 214            {
 215                outAsync.flushConnection(conn, compressBatch);
 216            }
 217            catch (LocalException)
 218            {
 219                // Ignore.
 220            }
 221        }
 222    }
 223
 224    // Only for use by Instance.
 225    internal OutgoingConnectionFactory(Instance instance)
 226    {
 227        _instance = instance;
 228        _connectionOptions = instance.clientConnectionOptions;
 229        _destroyed = false;
 230        _pendingConnectCount = 0;
 231    }
 232
 233    internal void setDefaultObjectAdapter(ObjectAdapter? adapter)
 234    {
 235        lock (_mutex)
 236        {
 237            _defaultObjectAdapter = adapter;
 238        }
 239    }
 240
 241    internal ObjectAdapter? getDefaultObjectAdapter()
 242    {
 243        lock (_mutex)
 244        {
 245            return _defaultObjectAdapter;
 246        }
 247    }
 248
 249    private ConnectionI? findConnection(List<EndpointI> endpoints, out bool compress)
 250    {
 251        lock (_mutex)
 252        {
 253            if (_destroyed)
 254            {
 255                throw new CommunicatorDestroyedException();
 256            }
 257
 258            DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
 259            Debug.Assert(endpoints.Count > 0);
 260
 261            foreach (EndpointI proxyEndpoint in endpoints)
 262            {
 263                EndpointI endpoint = proxyEndpoint.timeout(-1); // Clear the timeout
 264
 265                if (!_connectionsByEndpoint.TryGetValue(endpoint, out ICollection<ConnectionI>? connectionList))
 266                {
 267                    continue;
 268                }
 269
 270                foreach (ConnectionI connection in connectionList)
 271                {
 272                    if (connection.isActiveOrHolding()) // Don't return destroyed or un-validated connections
 273                    {
 274                        compress = defaultsAndOverrides.overrideCompress ?? endpoint.compress();
 275                        return connection;
 276                    }
 277                }
 278            }
 279
 280            compress = false; // Satisfy the compiler
 281            return null;
 282        }
 283    }
 284
 285    // Must be called while synchronized.
 286    private ConnectionI? findConnection(List<ConnectorInfo> connectors, out bool compress)
 287    {
 288        DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
 289        foreach (ConnectorInfo ci in connectors)
 290        {
 291            if (_pending.ContainsKey(ci.connector))
 292            {
 293                continue;
 294            }
 295
 296            if (!_connections.TryGetValue(ci.connector, out ICollection<ConnectionI>? connectionList))
 297            {
 298                continue;
 299            }
 300
 301            foreach (ConnectionI connection in connectionList)
 302            {
 303                if (connection.isActiveOrHolding()) // Don't return destroyed or un-validated connections
 304                {
 305                    compress = defaultsAndOverrides.overrideCompress ?? ci.endpoint.compress();
 306                    return connection;
 307                }
 308            }
 309        }
 310
 311        compress = false; // Satisfy the compiler
 312        return null;
 313    }
 314
 315    internal void incPendingConnectCount()
 316    {
 317        // Keep track of the number of pending connects. The outgoing connection factory waitUntilFinished() method
 318        // waits for all the pending connects to terminate before return. This ensures that the communicator client
 319        // thread pool isn't destroyed too soon and will still be available to execute the ice_exception() callbacks
 320        // for the asynchronous requests waiting on a connection to be established.
 321
 322        lock (_mutex)
 323        {
 324            if (_destroyed)
 325            {
 326                throw new CommunicatorDestroyedException();
 327            }
 328            ++_pendingConnectCount;
 329        }
 330    }
 331
 332    internal void decPendingConnectCount()
 333    {
 334        lock (_mutex)
 335        {
 336            --_pendingConnectCount;
 337            Debug.Assert(_pendingConnectCount >= 0);
 338            if (_destroyed && _pendingConnectCount == 0)
 339            {
 340                Monitor.PulseAll(_mutex);
 341            }
 342        }
 343    }
 344
 345    private ConnectionI? getConnection(List<ConnectorInfo> connectors, ConnectCallback cb, out bool compress)
 346    {
 347        lock (_mutex)
 348        {
 349            if (_destroyed)
 350            {
 351                throw new CommunicatorDestroyedException();
 352            }
 353
 354            // Search for an existing connections matching one of the given endpoints.
 355            if (findConnection(connectors, out compress) is ConnectionI connection)
 356            {
 357                return connection;
 358            }
 359
 360            if (addToPending(cb, connectors))
 361            {
 362                // A connection to one of our endpoints is pending. The callback will be notified once the connection
 363                // is established. Returning null indicates that the connection is still pending.
 364                return null;
 365            }
 366        }
 367
 368        // No connection is pending. Call nextConnector to initiate connection establishment. Return null to indicate
 369        // that the connection is still pending.
 370        cb.nextConnector();
 371        return null;
 372    }
 373
 374    private ConnectionI createConnection(Transceiver transceiver, ConnectorInfo ci)
 375    {
 376        lock (_mutex)
 377        {
 378            Debug.Assert(_pending.ContainsKey(ci.connector) && transceiver != null);
 379
 380            // Create and add the connection to the connection map. Adding the connection to the map is necessary to
 381            // support the interruption of the connection initialization and validation in case the communicator is
 382            // destroyed.
 383            ConnectionI connection;
 384            try
 385            {
 386                if (_destroyed)
 387                {
 388                    throw new CommunicatorDestroyedException();
 389                }
 390
 391                connection = new ConnectionI(
 392                    _instance,
 393                    transceiver,
 394                    ci.connector,
 395                    ci.endpoint.compress(false).timeout(-1),
 396                    adapter: _defaultObjectAdapter,
 397                    removeConnection,
 398                    _connectionOptions);
 399            }
 400            catch (LocalException)
 401            {
 402                try
 403                {
 404                    transceiver.close();
 405                }
 406                catch (LocalException)
 407                {
 408                    // Ignore
 409                }
 410                throw;
 411            }
 412
 413            _connections.Add(ci.connector, connection);
 414            _connectionsByEndpoint.Add(connection.endpoint(), connection);
 415            _connectionsByEndpoint.Add(connection.endpoint().compress(true), connection);
 416            return connection;
 417        }
 418    }
 419
 420    private void finishGetConnection(
 421        List<ConnectorInfo> connectors,
 422        ConnectorInfo ci,
 423        ConnectionI connection,
 424        ConnectCallback cb)
 425    {
 426        var connectionCallbacks = new HashSet<ConnectCallback>
 427        {
 428            cb
 429        };
 430
 431        var callbacks = new HashSet<ConnectCallback>();
 432        lock (_mutex)
 433        {
 434            foreach (ConnectorInfo c in connectors)
 435            {
 436                if (_pending.TryGetValue(c.connector, out HashSet<ConnectCallback>? s))
 437                {
 438                    foreach (ConnectCallback cc in s)
 439                    {
 440                        if (cc.hasConnector(ci))
 441                        {
 442                            connectionCallbacks.Add(cc);
 443                        }
 444                        else
 445                        {
 446                            callbacks.Add(cc);
 447                        }
 448                    }
 449                    _pending.Remove(c.connector);
 450                }
 451            }
 452
 453            foreach (ConnectCallback cc in connectionCallbacks)
 454            {
 455                cc.removeFromPending();
 456                callbacks.Remove(cc);
 457            }
 458
 459            foreach (ConnectCallback cc in callbacks)
 460            {
 461                cc.removeFromPending();
 462            }
 463            Monitor.PulseAll(_mutex);
 464        }
 465
 466        DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
 467        bool compress = defaultsAndOverrides.overrideCompress ?? ci.endpoint.compress();
 468
 469        foreach (ConnectCallback cc in callbacks)
 470        {
 471            cc.getConnection();
 472        }
 473
 474        foreach (ConnectCallback cc in connectionCallbacks)
 475        {
 476            cc.setConnection(connection, compress);
 477        }
 478    }
 479
 480    private void finishGetConnection(List<ConnectorInfo> connectors, LocalException ex, ConnectCallback cb)
 481    {
 482        var failedCallbacks = new HashSet<ConnectCallback>
 483        {
 484            cb
 485        };
 486
 487        var callbacks = new HashSet<ConnectCallback>();
 488        lock (_mutex)
 489        {
 490            foreach (ConnectorInfo c in connectors)
 491            {
 492                if (_pending.TryGetValue(c.connector, out HashSet<ConnectCallback>? s))
 493                {
 494                    foreach (ConnectCallback cc in s)
 495                    {
 496                        if (cc.removeConnectors(connectors))
 497                        {
 498                            failedCallbacks.Add(cc);
 499                        }
 500                        else
 501                        {
 502                            callbacks.Add(cc);
 503                        }
 504                    }
 505                    _pending.Remove(c.connector);
 506                }
 507            }
 508
 509            foreach (ConnectCallback cc in callbacks)
 510            {
 511                Debug.Assert(!failedCallbacks.Contains(cc));
 512                cc.removeFromPending();
 513            }
 514            Monitor.PulseAll(_mutex);
 515        }
 516
 517        foreach (ConnectCallback cc in callbacks)
 518        {
 519            cc.getConnection();
 520        }
 521
 522        foreach (ConnectCallback cc in failedCallbacks)
 523        {
 524            cc.setException(ex);
 525        }
 526    }
 527
 528    private void handleConnectionException(LocalException ex, bool hasMore)
 529    {
 530        TraceLevels traceLevels = _instance.traceLevels();
 531        if (traceLevels.network >= 2)
 532        {
 533            var s = new StringBuilder();
 534            s.Append("connection to endpoint failed");
 535            if (ex is CommunicatorDestroyedException)
 536            {
 537                s.Append('\n');
 538            }
 539            else
 540            {
 541                if (hasMore)
 542                {
 543                    s.Append(", trying next endpoint\n");
 544                }
 545                else
 546                {
 547                    s.Append(" and no more endpoints to try\n");
 548                }
 549            }
 550            s.Append(ex);
 551            _instance.initializationData().logger!.trace(traceLevels.networkCat, s.ToString());
 552        }
 553    }
 554
 555    private bool addToPending(ConnectCallback cb, List<ConnectorInfo> connectors)
 556    {
 557        // Add the callback to each connector pending list.
 558        bool found = false;
 559        foreach (ConnectorInfo ci in connectors)
 560        {
 561            if (_pending.TryGetValue(ci.connector, out HashSet<ConnectCallback>? cbs))
 562            {
 563                found = true;
 564                cbs.Add(cb); // Add the callback to each pending connector.
 565            }
 566        }
 567
 568        if (found)
 569        {
 570            return true;
 571        }
 572
 573        // If no pending connection exists for the specified connectors, the caller is responsible for initiating
 574        // connection establishment. An empty pending list is added, and any additional callbacks for the same
 575        // connectors will be queued.
 576        foreach (ConnectorInfo ci in connectors)
 577        {
 578            if (!_pending.ContainsKey(ci.connector))
 579            {
 580                _pending.Add(ci.connector, []);
 581            }
 582        }
 583        return false;
 584    }
 585
 586    private void removeFromPending(ConnectCallback cb, List<ConnectorInfo> connectors)
 587    {
 588        foreach (ConnectorInfo ci in connectors)
 589        {
 590            if (_pending.TryGetValue(ci.connector, out HashSet<ConnectCallback>? cbs))
 591            {
 592                cbs.Remove(cb);
 593            }
 594        }
 595    }
 596
 597    private void removeConnection(ConnectionI connection)
 598    {
 599        lock (_mutex)
 600        {
 601            if (_destroyed)
 602            {
 603                return;
 604            }
 605
 606            _connections.Remove(connection.connector(), connection);
 607            _connectionsByEndpoint.Remove(connection.endpoint(), connection);
 608            _connectionsByEndpoint.Remove(connection.endpoint().compress(true), connection);
 609        }
 610    }
 611
 612    internal void handleException(LocalException ex, bool hasMore)
 613    {
 614        TraceLevels traceLevels = _instance.traceLevels();
 615        if (traceLevels.network >= 2)
 616        {
 617            var s = new StringBuilder();
 618            s.Append("couldn't resolve endpoint host");
 619            if (ex is CommunicatorDestroyedException)
 620            {
 621                s.Append('\n');
 622            }
 623            else
 624            {
 625                if (hasMore)
 626                {
 627                    s.Append(", trying next endpoint\n");
 628                }
 629                else
 630                {
 631                    s.Append(" and no more endpoints to try\n");
 632                }
 633            }
 634            s.Append(ex);
 635            _instance.initializationData().logger!.trace(traceLevels.networkCat, s.ToString());
 636        }
 637    }
 638
 639    private class ConnectorInfo
 640    {
 641        internal ConnectorInfo(Connector c, EndpointI e)
 642        {
 643            connector = c;
 644            endpoint = e;
 645        }
 646
 647        public override bool Equals(object? obj) =>
 648            (obj is ConnectorInfo r) && connector.Equals(r.connector);
 649
 650        public override int GetHashCode() => connector.GetHashCode();
 651
 652        internal Connector connector;
 653        internal EndpointI endpoint;
 654    }
 655
 656    private class ConnectCallback : ConnectionI.StartCallback, EndpointI_connectors
 657    {
 1658        internal ConnectCallback(
 1659            OutgoingConnectionFactory factory,
 1660            List<EndpointI> endpoints,
 1661            bool more,
 1662            CreateConnectionCallback cb)
 663        {
 1664            _factory = factory;
 1665            _endpoints = endpoints;
 1666            _hasMore = more;
 1667            _callback = cb;
 1668            _endpointsIter = 0;
 1669        }
 670
 671        //
 672        // Methods from ConnectionI.StartCallback
 673        //
 674        public void connectionStartCompleted(ConnectionI connection)
 675        {
 1676            _observer?.detach();
 1677            connection.activate();
 678            Debug.Assert(_current is not null);
 1679            _factory.finishGetConnection(_connectors, _current, connection, this);
 1680        }
 681
 682        public void connectionStartFailed(ConnectionI connection, LocalException ex)
 683        {
 1684            if (connectionStartFailedImpl(ex))
 685            {
 1686                nextConnector();
 687            }
 1688        }
 689
 690        //
 691        // Methods from EndpointI_connectors
 692        //
 693        public void connectors(List<Connector> cons)
 694        {
 695            Debug.Assert(_currentEndpoint is not null);
 1696            foreach (Connector connector in cons)
 697            {
 1698                _connectors.Add(new ConnectorInfo(connector, _currentEndpoint));
 699            }
 700
 1701            if (_endpointsIter < _endpoints.Count)
 702            {
 1703                nextEndpoint();
 704            }
 705            else
 706            {
 707                Debug.Assert(_connectors.Count > 0);
 708
 709                //
 710                // We now have all the connectors for the given endpoints. We can try to obtain the
 711                // connection.
 712                //
 1713                _iter = 0;
 1714                getConnection();
 715            }
 1716        }
 717
 718        public void exception(LocalException ex)
 719        {
 1720            _factory.handleException(ex, _hasMore || _endpointsIter < _endpoints.Count);
 1721            if (_endpointsIter < _endpoints.Count)
 722            {
 0723                nextEndpoint();
 724            }
 1725            else if (_connectors.Count > 0)
 726            {
 727                //
 728                // We now have all the connectors for the given endpoints. We can try to obtain the
 729                // connection.
 730                //
 0731                _iter = 0;
 0732                getConnection();
 733            }
 734            else
 735            {
 1736                _callback.setException(ex);
 1737                _factory.decPendingConnectCount(); // Must be called last.
 738            }
 1739        }
 740
 741        internal void setConnection(ConnectionI connection, bool compress)
 742        {
 743            //
 744            // Callback from the factory: the connection to one of the callback
 745            // connectors has been established.
 746            //
 1747            _callback.setConnection(connection, compress);
 1748            _factory.decPendingConnectCount(); // Must be called last.
 1749        }
 750
 751        internal void setException(LocalException ex)
 752        {
 753            //
 754            // Callback from the factory: connection establishment failed.
 755            //
 1756            _callback.setException(ex);
 1757            _factory.decPendingConnectCount(); // Must be called last.
 1758        }
 759
 1760        internal bool hasConnector(ConnectorInfo ci) => _connectors.Contains(ci);
 761
 762        internal bool removeConnectors(List<ConnectorInfo> connectors)
 763        {
 1764            _connectors.RemoveAll(ci => connectors.Contains(ci));
 1765            return _connectors.Count == 0;
 766        }
 767
 1768        internal void removeFromPending() => _factory.removeFromPending(this, _connectors);
 769
 770        internal void getConnectors()
 771        {
 772            try
 773            {
 774                // Notify the factory that there's an async connect pending. This is necessary
 775                // to prevent the outgoing connection factory to be destroyed before all the
 776                // pending asynchronous connects are finished.
 1777                _factory.incPendingConnectCount();
 1778            }
 0779            catch (LocalException ex)
 780            {
 0781                _callback.setException(ex);
 0782                return;
 783            }
 784
 1785            nextEndpoint();
 1786        }
 787
 788        private void nextEndpoint()
 789        {
 790            try
 791            {
 792                Debug.Assert(_endpointsIter < _endpoints.Count);
 1793                _currentEndpoint = _endpoints[_endpointsIter++];
 1794                _currentEndpoint.connectors_async(this);
 1795            }
 0796            catch (LocalException ex)
 797            {
 0798                exception(ex);
 0799            }
 1800        }
 801
 802        internal void getConnection()
 803        {
 804            try
 805            {
 806                // If all the connectors have been created, we ask the factory to get a connection.
 1807                ConnectionI? connection = _factory.getConnection(_connectors, this, out bool compress);
 1808                if (connection is null)
 809                {
 810                    // A null return value from getConnection indicates that the connection is being established and
 811                    // the callback will be notified when the connection establishment is done.
 1812                    return;
 813                }
 814
 1815                _callback.setConnection(connection, compress);
 1816                _factory.decPendingConnectCount(); // Must be called last.
 1817            }
 0818            catch (LocalException ex)
 819            {
 0820                _callback.setException(ex);
 0821                _factory.decPendingConnectCount(); // Must be called last.
 0822            }
 1823        }
 824
 825        internal void nextConnector()
 826        {
 827            while (true)
 828            {
 829                try
 830                {
 831                    Debug.Assert(_iter < _connectors.Count);
 1832                    _current = _connectors[_iter++];
 833                    Debug.Assert(_current is not null);
 834
 1835                    if (
 1836                        _factory._instance.initializationData().observer is
 1837                        Instrumentation.CommunicatorObserver observer)
 838                    {
 1839                        _observer = observer.getConnectionEstablishmentObserver(
 1840                            _current.endpoint,
 1841                            _current.connector.ToString()!);
 842
 1843                        _observer?.attach();
 844                    }
 845
 1846                    if (_factory._instance.traceLevels().network >= 2)
 847                    {
 1848                        var s = new StringBuilder("trying to establish ");
 1849                        s.Append(_current.endpoint.protocol());
 1850                        s.Append(" connection to ");
 1851                        s.Append(_current.connector.ToString());
 1852                        _factory._instance.initializationData().logger!.trace(
 1853                            _factory._instance.traceLevels().networkCat, s.ToString());
 854                    }
 855
 1856                    ConnectionI connection = _factory.createConnection(_current.connector.connect(), _current);
 1857                    connection.start(this);
 1858                }
 1859                catch (LocalException ex)
 860                {
 1861                    if (_factory._instance.traceLevels().network >= 2)
 862                    {
 863                        Debug.Assert(_current is not null);
 1864                        var s = new StringBuilder("failed to establish ");
 1865                        s.Append(_current.endpoint.protocol());
 1866                        s.Append(" connection to ");
 1867                        s.Append(_current.connector.ToString());
 1868                        s.Append('\n');
 1869                        s.Append(ex);
 1870                        _factory._instance.initializationData().logger!.trace(
 1871                            _factory._instance.traceLevels().networkCat, s.ToString());
 872                    }
 873
 1874                    if (connectionStartFailedImpl(ex))
 875                    {
 0876                        continue;
 877                    }
 1878                }
 879                break;
 880            }
 1881        }
 882
 883        private bool connectionStartFailedImpl(LocalException ex)
 884        {
 1885            if (_observer != null)
 886            {
 1887                _observer.failed(ex.ice_id());
 1888                _observer.detach();
 889            }
 1890            _factory.handleConnectionException(ex, _hasMore || _iter < _connectors.Count); // just logging
 891
 892            // We stop on ConnectTimeoutException to fail reasonably fast when the endpoint has many connectors
 893            // (IP addresses).
 1894            if (_iter < _connectors.Count && !(ex is CommunicatorDestroyedException or ConnectTimeoutException))
 895            {
 1896                return true; // keep going
 897            }
 898
 1899            _factory.finishGetConnection(_connectors, ex, this);
 1900            return false;
 901        }
 902
 903        private readonly OutgoingConnectionFactory _factory;
 904        private readonly bool _hasMore;
 905        private readonly CreateConnectionCallback _callback;
 906        private readonly List<EndpointI> _endpoints;
 907        private int _endpointsIter;
 908        private EndpointI? _currentEndpoint;
 1909        private readonly List<ConnectorInfo> _connectors = [];
 910        private int _iter;
 911        private ConnectorInfo? _current;
 912        private Instrumentation.Observer? _observer;
 913    }
 914
 915    private readonly Instance _instance;
 916    private readonly ConnectionOptions _connectionOptions;
 917    private bool _destroyed;
 918
 919    private readonly MultiDictionary<Connector, ConnectionI> _connections = [];
 920    private readonly MultiDictionary<EndpointI, ConnectionI> _connectionsByEndpoint = [];
 921    private readonly Dictionary<Connector, HashSet<ConnectCallback>> _pending = [];
 922    private int _pendingConnectCount;
 923
 924    private ObjectAdapter? _defaultObjectAdapter;
 925    private readonly object _mutex = new();
 926}
 927
 928internal sealed class IncomingConnectionFactory : EventHandler, ConnectionI.StartCallback
 929{
 930    internal void activate()
 931    {
 932        lock (_mutex)
 933        {
 934            setState(StateActive);
 935        }
 936    }
 937
 938    internal void hold()
 939    {
 940        lock (_mutex)
 941        {
 942            setState(StateHolding);
 943        }
 944    }
 945
 946    internal void destroy()
 947    {
 948        lock (_mutex)
 949        {
 950            setState(StateClosed);
 951        }
 952    }
 953
 954    internal void updateConnectionObservers()
 955    {
 956        lock (_mutex)
 957        {
 958            foreach (ConnectionI connection in _connections)
 959            {
 960                connection.updateObserver();
 961            }
 962        }
 963    }
 964
 965    internal void waitUntilHolding()
 966    {
 967        ICollection<ConnectionI> connections;
 968
 969        lock (_mutex)
 970        {
 971            // First we wait until the connection factory itself is in holding state.
 972            while (_state < StateHolding)
 973            {
 974                Monitor.Wait(_mutex);
 975            }
 976
 977            // We want to wait until all connections are in holding state outside the thread synchronization.
 978            connections = new List<ConnectionI>(_connections);
 979        }
 980
 981        // Now we wait until each connection is in holding state.
 982        foreach (ConnectionI connection in connections)
 983        {
 984            connection.waitUntilHolding();
 985        }
 986    }
 987
 988    internal void waitUntilFinished()
 989    {
 990        ICollection<ConnectionI> connections;
 991
 992        lock (_mutex)
 993        {
 994            // First we wait until the factory is destroyed. If we are using an acceptor, we also wait for it to be
 995            // closed.
 996            while (_state != StateFinished)
 997            {
 998                Monitor.Wait(_mutex);
 999            }
 1000
 1001            // We want to wait until all connections are finished outside the thread synchronization.
 1002            connections = new List<ConnectionI>(_connections);
 1003        }
 1004
 1005        foreach (ConnectionI connection in connections)
 1006        {
 1007            connection.waitUntilFinished();
 1008        }
 1009
 1010        lock (_mutex)
 1011        {
 1012            _connections.Clear();
 1013        }
 1014    }
 1015
 1016    internal EndpointI endpoint()
 1017    {
 1018        lock (_mutex)
 1019        {
 1020            return _endpoint;
 1021        }
 1022    }
 1023
 1024    internal ICollection<ConnectionI> connections()
 1025    {
 1026        lock (_mutex)
 1027        {
 1028            ICollection<ConnectionI> connections = [];
 1029
 1030            // Only copy connections which have not been destroyed.
 1031            foreach (ConnectionI connection in _connections)
 1032            {
 1033                if (connection.isActiveOrHolding())
 1034                {
 1035                    connections.Add(connection);
 1036                }
 1037            }
 1038
 1039            return connections;
 1040        }
 1041    }
 1042
 1043    internal void flushAsyncBatchRequests(CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
 1044    {
 1045        // connections() is synchronized, no need to synchronize here.
 1046        foreach (ConnectionI connection in connections())
 1047        {
 1048            try
 1049            {
 1050                outAsync.flushConnection(connection, compressBatch);
 1051            }
 1052            catch (LocalException)
 1053            {
 1054                // Ignore.
 1055            }
 1056        }
 1057    }
 1058
 1059    //
 1060    // Operations from EventHandler.
 1061    //
 1062    public override bool startAsync(int operation, AsyncCallback completedCallback)
 1063    {
 1064        if (_state >= StateClosed)
 1065        {
 1066            return false;
 1067        }
 1068
 1069        // Run the IO operation on a .NET thread pool thread to ensure the IO operation won't be interrupted if the
 1070        // Ice thread pool thread is terminated.
 1071        Task.Run(() =>
 1072        {
 1073            lock (_mutex)
 1074            {
 1075                if (_state >= StateClosed)
 1076                {
 1077                    completedCallback(this);
 1078                    return;
 1079                }
 1080
 1081                try
 1082                {
 1083                    Debug.Assert(_acceptor is not null);
 1084                    if (_acceptor.startAccept(completedCallback, this))
 1085                    {
 1086                        completedCallback(this);
 1087                    }
 1088                }
 1089                catch (LocalException ex)
 1090                {
 1091                    _acceptorException = ex;
 1092                    completedCallback(this);
 1093                }
 1094            }
 1095        });
 1096
 1097        return true;
 1098    }
 1099
 1100    public override bool finishAsync(int operation)
 1101    {
 1102        try
 1103        {
 1104            if (_acceptorException != null)
 1105            {
 1106                throw _acceptorException;
 1107            }
 1108            Debug.Assert(_acceptor is not null);
 1109            _acceptor.finishAccept();
 1110        }
 1111        catch (LocalException ex)
 1112        {
 1113            _acceptorException = null;
 1114            if (_warn)
 1115            {
 1116                _instance.initializationData().logger!.warning($"error accepting connection:\n{ex}\n{_acceptor}");
 1117            }
 1118        }
 1119        return _state < StateClosed;
 1120    }
 1121
 1122    public override void message(ThreadPoolCurrent current)
 1123    {
 1124        ConnectionI? connection = null;
 1125
 1126        using var msg = new ThreadPoolMessage(current, this);
 1127
 1128        lock (_mutex)
 1129        {
 1130            if (!msg.startIOScope())
 1131            {
 1132                return;
 1133            }
 1134
 1135            try
 1136            {
 1137                if (_state >= StateClosed)
 1138                {
 1139                    return;
 1140                }
 1141                else if (_state == StateHolding)
 1142                {
 1143                    return;
 1144                }
 1145
 1146                if (!_acceptorStarted)
 1147                {
 1148                    return;
 1149                }
 1150
 1151                //
 1152                // Now accept a new connection.
 1153                //
 1154                Transceiver? transceiver = null;
 1155                try
 1156                {
 1157                    Debug.Assert(_acceptor is not null);
 1158                    transceiver = _acceptor.accept();
 1159
 1160                    if (_maxConnections > 0 && _connections.Count == _maxConnections)
 1161                    {
 1162                        // Can't accept more connections, so we abort this transport connection.
 1163
 1164                        if (_instance.traceLevels().network >= 2)
 1165                        {
 1166                            _instance.initializationData().logger!.trace(
 1167                                _instance.traceLevels().networkCat,
 1168                                $"rejecting new {_endpoint.protocol()} connection\n{transceiver}\nbecause the maximum nu
 1169                        }
 1170
 1171                        try
 1172                        {
 1173                            transceiver.close();
 1174                        }
 1175                        catch
 1176                        {
 1177                            // Ignore
 1178                        }
 1179                        transceiver.destroy();
 1180                        return;
 1181                    }
 1182
 1183                    if (_instance.traceLevels().network >= 2)
 1184                    {
 1185                        var s = new StringBuilder("trying to accept ");
 1186                        s.Append(_endpoint.protocol());
 1187                        s.Append(" connection\n");
 1188                        s.Append(transceiver.ToString());
 1189                        _instance.initializationData().logger!.trace(_instance.traceLevels().networkCat, s.ToString());
 1190                    }
 1191                }
 1192                catch (LocalException ex)
 1193                {
 1194                    if (_warn)
 1195                    {
 1196                        _instance.initializationData().logger!.warning(
 1197                            $"error accepting connection:\n{ex}\n{_acceptor}");
 1198                    }
 1199                    return;
 1200                }
 1201
 1202                Debug.Assert(transceiver is not null);
 1203
 1204                try
 1205                {
 1206                    connection = new ConnectionI(
 1207                        _instance,
 1208                        transceiver,
 1209                        connector: null,
 1210                        _endpoint,
 1211                        _adapter,
 1212                        removeConnection,
 1213                        _connectionOptions);
 1214                }
 1215                catch (LocalException ex)
 1216                {
 1217                    try
 1218                    {
 1219                        transceiver.close();
 1220                    }
 1221                    catch (LocalException)
 1222                    {
 1223                        // Ignore
 1224                    }
 1225
 1226                    if (_warn)
 1227                    {
 1228                        _instance.initializationData().logger!.warning(
 1229                            $"error accepting connection:\n{ex}\n{_acceptor}");
 1230                    }
 1231                    return;
 1232                }
 1233
 1234                _connections.Add(connection);
 1235            }
 1236            finally
 1237            {
 1238                msg.finishIOScope();
 1239            }
 1240        }
 1241
 1242        Debug.Assert(connection != null);
 1243        connection.start(this);
 1244    }
 1245
 1246    public override void finished(ThreadPoolCurrent current)
 1247    {
 1248        lock (_mutex)
 1249        {
 1250            if (_state < StateClosed)
 1251            {
 1252                return;
 1253            }
 1254
 1255            Debug.Assert(_state >= StateClosed);
 1256            setState(StateFinished);
 1257        }
 1258    }
 1259
 1260    public override string ToString() => _transceiver?.ToString() ?? _acceptor?.ToString() ?? "";
 1261
 1262    //
 1263    // Operations from ConnectionI.StartCallback
 1264    //
 1265    public void connectionStartCompleted(ConnectionI connection)
 1266    {
 1267        lock (_mutex)
 1268        {
 1269            // Initially, connections are in the holding state. If the factory is active we activate the connection.
 1270            if (_state == StateActive)
 1271            {
 1272                connection.activate();
 1273            }
 1274        }
 1275    }
 1276
 1277    public void connectionStartFailed(ConnectionI connection, LocalException ex)
 1278    {
 1279        // Do not warn about connection exceptions here. The connection is not yet validated.
 1280    }
 1281
 1282    internal IncomingConnectionFactory(Instance instance, EndpointI endpoint, ObjectAdapter adapter)
 1283    {
 1284        _instance = instance;
 1285        _connectionOptions = instance.serverConnectionOptions(adapter.getName());
 1286
 1287        // Meaningful only for non-datagram (non-UDP) connections.
 1288        _maxConnections = endpoint.datagram() ? 0 :
 1289            instance.initializationData().properties!.getPropertyAsInt($"{adapter.getName()}.MaxConnections");
 1290
 1291        _endpoint = endpoint;
 1292        _adapter = adapter;
 1293        _warn = _instance.initializationData().properties!.getIcePropertyAsInt("Ice.Warn.Connections") > 0;
 1294        _connections = [];
 1295        _state = StateHolding;
 1296        _acceptorStarted = false;
 1297
 1298        DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
 1299
 1300        if (defaultsAndOverrides.overrideCompress is not null)
 1301        {
 1302            _endpoint = _endpoint.compress(defaultsAndOverrides.overrideCompress.Value);
 1303        }
 1304
 1305        try
 1306        {
 1307            _transceiver = _endpoint.transceiver();
 1308            if (_transceiver != null)
 1309            {
 1310                // All this is for UDP "connections".
 1311
 1312                if (_instance.traceLevels().network >= 2)
 1313                {
 1314                    var s = new StringBuilder("attempting to bind to ");
 1315                    s.Append(_endpoint.protocol());
 1316                    s.Append(" socket\n");
 1317                    s.Append(_transceiver.ToString());
 1318                    _instance.initializationData().logger!.trace(_instance.traceLevels().networkCat, s.ToString());
 1319                }
 1320                _endpoint = _transceiver.bind();
 1321
 1322                var connection = new ConnectionI(
 1323                    _instance,
 1324                    _transceiver,
 1325                    connector: null,
 1326                    _endpoint,
 1327                    adapter,
 1328                    removeFromFactory: null,
 1329                    _connectionOptions);
 1330                connection.startAndWait();
 1331                _connections.Add(connection);
 1332                Debug.Assert(_maxConnections == 0); // UDP so no max connections.
 1333            }
 1334            else
 1335            {
 1336                createAcceptor();
 1337            }
 1338        }
 1339        catch (System.Exception ex)
 1340        {
 1341            //
 1342            // Clean up.
 1343            //
 1344            if (_transceiver != null)
 1345            {
 1346                try
 1347                {
 1348                    _transceiver.close();
 1349                }
 1350                catch (LocalException)
 1351                {
 1352                    // Ignore
 1353                }
 1354            }
 1355
 1356            _state = StateFinished;
 1357            _connections.Clear();
 1358
 1359            if (ex is LocalException)
 1360            {
 1361                throw;
 1362            }
 1363            else
 1364            {
 1365                throw new SyscallException(ex);
 1366            }
 1367        }
 1368    }
 1369
 1370    private const int StateActive = 0;
 1371    private const int StateHolding = 1;
 1372    private const int StateClosed = 2;
 1373    private const int StateFinished = 3;
 1374
 1375    private void setState(int state)
 1376    {
 1377        if (_state == state) // Don't switch twice.
 1378        {
 1379            return;
 1380        }
 1381
 1382        switch (state)
 1383        {
 1384            case StateActive:
 1385            {
 1386                if (_state != StateHolding) // Can only switch from holding to active.
 1387                {
 1388                    return;
 1389                }
 1390
 1391                if (_acceptor is not null)
 1392                {
 1393                    if (_instance.traceLevels().network >= 1)
 1394                    {
 1395                        _instance.initializationData().logger!.trace(
 1396                            _instance.traceLevels().networkCat,
 1397                            $"accepting {_endpoint.protocol()} connections at {_acceptor}");
 1398                    }
 1399                    _adapter.getThreadPool().register(this, SocketOperation.Read);
 1400                }
 1401
 1402                foreach (ConnectionI connection in _connections)
 1403                {
 1404                    connection.activate();
 1405                }
 1406                break;
 1407            }
 1408
 1409            case StateHolding:
 1410            {
 1411                if (_state != StateActive) // Can only switch from active to holding.
 1412                {
 1413                    return;
 1414                }
 1415
 1416                // Stop accepting new connections.
 1417                if (_acceptor is not null)
 1418                {
 1419                    if (_instance.traceLevels().network >= 1)
 1420                    {
 1421                        _instance.initializationData().logger!.trace(
 1422                            _instance.traceLevels().networkCat,
 1423                            $"holding {_endpoint.protocol()} connections at {_acceptor}");
 1424                    }
 1425                    _adapter.getThreadPool().unregister(this, SocketOperation.Read);
 1426                }
 1427
 1428                foreach (ConnectionI connection in _connections)
 1429                {
 1430                    connection.hold();
 1431                }
 1432                break;
 1433            }
 1434
 1435            case StateClosed:
 1436            {
 1437                if (_acceptorStarted)
 1438                {
 1439                    _acceptorStarted = false;
 1440                    _adapter.getThreadPool().finish(this);
 1441                    closeAcceptor();
 1442                }
 1443                else
 1444                {
 1445                    state = StateFinished;
 1446                }
 1447
 1448                foreach (ConnectionI connection in _connections)
 1449                {
 1450                    connection.destroy(ConnectionI.ObjectAdapterDeactivated);
 1451                }
 1452                break;
 1453            }
 1454
 1455            case StateFinished:
 1456            {
 1457                Debug.Assert(_state == StateClosed);
 1458                break;
 1459            }
 1460        }
 1461
 1462        _state = state;
 1463        System.Threading.Monitor.PulseAll(_mutex);
 1464    }
 1465
 1466    private void createAcceptor()
 1467    {
 1468        try
 1469        {
 1470            Debug.Assert(!_acceptorStarted);
 1471            _acceptor = _endpoint.acceptor(_adapter.getName(), _adapter.getServerAuthenticationOptions());
 1472            Debug.Assert(_acceptor != null);
 1473
 1474            if (_instance.traceLevels().network >= 2)
 1475            {
 1476                var s = new StringBuilder("attempting to bind to ");
 1477                s.Append(_endpoint.protocol());
 1478                s.Append(" socket ");
 1479                s.Append(_acceptor.ToString());
 1480                _instance.initializationData().logger!.trace(_instance.traceLevels().networkCat, s.ToString());
 1481            }
 1482            _endpoint = _acceptor.listen();
 1483
 1484            if (_instance.traceLevels().network >= 1)
 1485            {
 1486                var s = new StringBuilder("listening for ");
 1487                s.Append(_endpoint.protocol());
 1488                s.Append(" connections\n");
 1489                s.Append(_acceptor.toDetailedString());
 1490                _instance.initializationData().logger!.trace(_instance.traceLevels().networkCat, s.ToString());
 1491            }
 1492
 1493            _adapter.getThreadPool().initialize(this);
 1494
 1495            if (_state == StateActive)
 1496            {
 1497                _adapter.getThreadPool().register(this, SocketOperation.Read);
 1498            }
 1499
 1500            _acceptorStarted = true;
 1501        }
 1502        catch (SystemException)
 1503        {
 1504            _acceptor?.close();
 1505            throw;
 1506        }
 1507    }
 1508
 1509    private void closeAcceptor()
 1510    {
 1511        Debug.Assert(_acceptor != null);
 1512
 1513        if (_instance.traceLevels().network >= 1)
 1514        {
 1515            var s = new StringBuilder("stopping to accept ");
 1516            s.Append(_endpoint.protocol());
 1517            s.Append(" connections at ");
 1518            s.Append(_acceptor.ToString());
 1519            _instance.initializationData().logger!.trace(_instance.traceLevels().networkCat, s.ToString());
 1520        }
 1521
 1522        Debug.Assert(!_acceptorStarted);
 1523        _acceptor.close();
 1524    }
 1525
 1526    private void removeConnection(ConnectionI connection)
 1527    {
 1528        lock (_mutex)
 1529        {
 1530            if (_state is StateActive or StateHolding)
 1531            {
 1532                _connections.Remove(connection);
 1533            }
 1534            // else it's already being cleaned up.
 1535        }
 1536    }
 1537
 1538    private readonly Instance _instance;
 1539    private readonly ConnectionOptions _connectionOptions;
 1540
 1541    private readonly int _maxConnections;
 1542
 1543    private Acceptor? _acceptor;
 1544    private readonly Transceiver? _transceiver;
 1545    private EndpointI _endpoint;
 1546    private readonly ObjectAdapter _adapter;
 1547
 1548    private readonly bool _warn;
 1549
 1550    private readonly HashSet<ConnectionI> _connections;
 1551
 1552    private int _state;
 1553    private bool _acceptorStarted;
 1554    private LocalException? _acceptorException;
 1555    private readonly object _mutex = new();
 1556}