< Summary

Information
Class: Ice.Internal.IncomingConnectionFactory
Assembly: Ice
File(s): /home/runner/work/ice/ice/csharp/src/Ice/Internal/ConnectionFactory.cs
Tag: 71_18251537082
Line coverage
82%
Covered lines: 229
Uncovered lines: 49
Coverable lines: 278
Total lines: 1556
Line coverage: 82.3%
Branch coverage
70%
Covered branches: 75
Total branches: 106
Branch coverage: 70.7%
Method coverage
95%
Covered methods: 20
Total methods: 21
Method coverage: 95.2%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
activate()100%11100%
hold()100%11100%
destroy()100%11100%
updateConnectionObservers()100%22100%
waitUntilHolding()100%44100%
waitUntilFinished()100%44100%
endpoint()100%11100%
connections()25%4.37471.43%
flushAsyncBatchRequests(...)50%3.19233.33%
startAsync(...)50%2293.75%
finishAsync(...)25%7.46440%
message(...)65%32.712068.33%
finished(...)50%2.02283.33%
ToString()0%7280%
connectionStartCompleted(...)100%22100%
connectionStartFailed(...)100%11100%
.ctor(...)83.33%12.161289.58%
setState(...)90.91%22.122293.75%
createAcceptor()62.5%8.34882.61%
closeAcceptor()100%22100%
removeConnection(...)100%44100%

File(s)

/home/runner/work/ice/ice/csharp/src/Ice/Internal/ConnectionFactory.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4using System.Text;
 5
 6#nullable enable
 7
 8namespace Ice.Internal;
 9
 10internal class MultiDictionary<K, V> : Dictionary<K, ICollection<V>> where K : class
 11{
 12    internal void Add(K key, V value)
 13    {
 14        if (!TryGetValue(key, out ICollection<V>? list))
 15        {
 16            list = [];
 17            Add(key, list);
 18        }
 19        list.Add(value);
 20    }
 21
 22    internal void Remove(K key, V value)
 23    {
 24        ICollection<V> list = this[key];
 25        list.Remove(value);
 26        if (list.Count == 0)
 27        {
 28            Remove(key);
 29        }
 30    }
 31}
 32
 33internal sealed class OutgoingConnectionFactory
 34{
 35    internal interface CreateConnectionCallback
 36    {
 37        void setConnection(ConnectionI connection, bool compress);
 38
 39        void setException(LocalException ex);
 40    }
 41
 42    internal void destroy()
 43    {
 44        lock (_mutex)
 45        {
 46            if (_destroyed)
 47            {
 48                return;
 49            }
 50
 51            foreach (ICollection<ConnectionI> connections in _connections.Values)
 52            {
 53                foreach (ConnectionI c in connections)
 54                {
 55                    c.destroy(ConnectionI.CommunicatorDestroyed);
 56                }
 57            }
 58
 59            _destroyed = true;
 60            _defaultObjectAdapter = null;
 61            Monitor.PulseAll(_mutex);
 62        }
 63    }
 64
 65    internal void updateConnectionObservers()
 66    {
 67        lock (_mutex)
 68        {
 69            foreach (ICollection<ConnectionI> connections in _connections.Values)
 70            {
 71                foreach (ConnectionI c in connections)
 72                {
 73                    c.updateObserver();
 74                }
 75            }
 76        }
 77    }
 78
 79    internal void waitUntilFinished()
 80    {
 81        Dictionary<Connector, ICollection<ConnectionI>> connections;
 82        lock (_mutex)
 83        {
 84            // First we wait until the factory is destroyed. We also wait until there are no pending connections
 85            // anymore. Only then we can be sure the _connections contains all connections.
 86            while (!_destroyed || _pending.Count > 0 || _pendingConnectCount > 0)
 87            {
 88                Monitor.Wait(_mutex);
 89            }
 90
 91            // We want to wait until all connections are finished outside the thread synchronization.
 92            connections = new Dictionary<Connector, ICollection<ConnectionI>>(_connections);
 93        }
 94
 95        // Now we wait until the destruction of each connection is finished.
 96        foreach (ICollection<ConnectionI> cl in connections.Values)
 97        {
 98            foreach (ConnectionI c in cl)
 99            {
 100                c.waitUntilFinished();
 101            }
 102        }
 103    }
 104
 105    internal void create(List<EndpointI> endpoints, bool hasMore, CreateConnectionCallback callback)
 106    {
 107        Debug.Assert(endpoints.Count > 0);
 108
 109        // Try to find a connection to one of the given endpoints.
 110        try
 111        {
 112            if (findConnection(endpoints, out bool compress) is ConnectionI connection)
 113            {
 114                callback.setConnection(connection, compress);
 115                return;
 116            }
 117        }
 118        catch (LocalException ex)
 119        {
 120            callback.setException(ex);
 121            return;
 122        }
 123
 124        var cb = new ConnectCallback(this, endpoints, hasMore, callback);
 125        cb.getConnectors();
 126    }
 127
 128    internal void setRouterInfo(RouterInfo routerInfo)
 129    {
 130        Debug.Assert(routerInfo is not null);
 131        ObjectAdapter adapter = routerInfo.getAdapter();
 132        EndpointI[] endpoints = routerInfo.getClientEndpoints(); // Must be called outside the synchronization
 133
 134        lock (_mutex)
 135        {
 136            if (_destroyed)
 137            {
 138                throw new CommunicatorDestroyedException();
 139            }
 140
 141            // Search for connections to the router's client proxy endpoints, and update the object adapter for such
 142            // connections, so that callbacks from the router can be received over such connections.
 143            for (int i = 0; i < endpoints.Length; i++)
 144            {
 145                EndpointI endpoint = endpoints[i];
 146
 147                // The Ice.ConnectionI object does not take the compression flag of endpoints into account, but instead
 148                // gets the information about whether messages should be compressed or not fro other sources. In order
 149                // to allow connection sharing for endpoints that differ in the value of the compression flag only, we
 150                // always set the compression flag to false here in this connection factory. We also clear the timeout
 151                // as it is no longer used for Ice 3.8.
 152                endpoint = endpoint.compress(false).timeout(-1);
 153
 154                foreach (ICollection<ConnectionI> connections in _connections.Values)
 155                {
 156                    foreach (ConnectionI connection in connections)
 157                    {
 158                        if (connection.endpoint().Equals(endpoint))
 159                        {
 160                            connection.setAdapter(adapter);
 161                        }
 162                    }
 163                }
 164            }
 165        }
 166    }
 167
 168    internal void removeAdapter(ObjectAdapter adapter)
 169    {
 170        lock (_mutex)
 171        {
 172            if (_destroyed)
 173            {
 174                return;
 175            }
 176
 177            foreach (ICollection<ConnectionI> connectionList in _connections.Values)
 178            {
 179                foreach (ConnectionI connection in connectionList)
 180                {
 181                    if (connection.getAdapter() == adapter)
 182                    {
 183                        connection.setAdapter(null);
 184                    }
 185                }
 186            }
 187        }
 188    }
 189
 190    internal void flushAsyncBatchRequests(CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
 191    {
 192        var c = new List<ConnectionI>();
 193
 194        lock (_mutex)
 195        {
 196            if (!_destroyed)
 197            {
 198                foreach (ICollection<ConnectionI> connectionList in _connections.Values)
 199                {
 200                    foreach (ConnectionI conn in connectionList)
 201                    {
 202                        if (conn.isActiveOrHolding())
 203                        {
 204                            c.Add(conn);
 205                        }
 206                    }
 207                }
 208            }
 209        }
 210
 211        foreach (ConnectionI conn in c)
 212        {
 213            try
 214            {
 215                outAsync.flushConnection(conn, compressBatch);
 216            }
 217            catch (LocalException)
 218            {
 219                // Ignore.
 220            }
 221        }
 222    }
 223
 224    // Only for use by Instance.
 225    internal OutgoingConnectionFactory(Instance instance)
 226    {
 227        _instance = instance;
 228        _connectionOptions = instance.clientConnectionOptions;
 229        _destroyed = false;
 230        _pendingConnectCount = 0;
 231    }
 232
 233    internal void setDefaultObjectAdapter(ObjectAdapter? adapter)
 234    {
 235        lock (_mutex)
 236        {
 237            _defaultObjectAdapter = adapter;
 238        }
 239    }
 240
 241    internal ObjectAdapter? getDefaultObjectAdapter()
 242    {
 243        lock (_mutex)
 244        {
 245            return _defaultObjectAdapter;
 246        }
 247    }
 248
 249    private ConnectionI? findConnection(List<EndpointI> endpoints, out bool compress)
 250    {
 251        lock (_mutex)
 252        {
 253            if (_destroyed)
 254            {
 255                throw new CommunicatorDestroyedException();
 256            }
 257
 258            DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
 259            Debug.Assert(endpoints.Count > 0);
 260
 261            foreach (EndpointI proxyEndpoint in endpoints)
 262            {
 263                EndpointI endpoint = proxyEndpoint.timeout(-1); // Clear the timeout
 264
 265                if (!_connectionsByEndpoint.TryGetValue(endpoint, out ICollection<ConnectionI>? connectionList))
 266                {
 267                    continue;
 268                }
 269
 270                foreach (ConnectionI connection in connectionList)
 271                {
 272                    if (connection.isActiveOrHolding()) // Don't return destroyed or un-validated connections
 273                    {
 274                        compress = defaultsAndOverrides.overrideCompress ?? endpoint.compress();
 275                        return connection;
 276                    }
 277                }
 278            }
 279
 280            compress = false; // Satisfy the compiler
 281            return null;
 282        }
 283    }
 284
 285    // Must be called while synchronized.
 286    private ConnectionI? findConnection(List<ConnectorInfo> connectors, out bool compress)
 287    {
 288        DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
 289        foreach (ConnectorInfo ci in connectors)
 290        {
 291            if (_pending.ContainsKey(ci.connector))
 292            {
 293                continue;
 294            }
 295
 296            if (!_connections.TryGetValue(ci.connector, out ICollection<ConnectionI>? connectionList))
 297            {
 298                continue;
 299            }
 300
 301            foreach (ConnectionI connection in connectionList)
 302            {
 303                if (connection.isActiveOrHolding()) // Don't return destroyed or un-validated connections
 304                {
 305                    compress = defaultsAndOverrides.overrideCompress ?? ci.endpoint.compress();
 306                    return connection;
 307                }
 308            }
 309        }
 310
 311        compress = false; // Satisfy the compiler
 312        return null;
 313    }
 314
 315    internal void incPendingConnectCount()
 316    {
 317        // Keep track of the number of pending connects. The outgoing connection factory waitUntilFinished() method
 318        // waits for all the pending connects to terminate before return. This ensures that the communicator client
 319        // thread pool isn't destroyed too soon and will still be available to execute the ice_exception() callbacks
 320        // for the asynchronous requests waiting on a connection to be established.
 321
 322        lock (_mutex)
 323        {
 324            if (_destroyed)
 325            {
 326                throw new CommunicatorDestroyedException();
 327            }
 328            ++_pendingConnectCount;
 329        }
 330    }
 331
 332    internal void decPendingConnectCount()
 333    {
 334        lock (_mutex)
 335        {
 336            --_pendingConnectCount;
 337            Debug.Assert(_pendingConnectCount >= 0);
 338            if (_destroyed && _pendingConnectCount == 0)
 339            {
 340                Monitor.PulseAll(_mutex);
 341            }
 342        }
 343    }
 344
 345    private ConnectionI? getConnection(List<ConnectorInfo> connectors, ConnectCallback cb, out bool compress)
 346    {
 347        lock (_mutex)
 348        {
 349            if (_destroyed)
 350            {
 351                throw new CommunicatorDestroyedException();
 352            }
 353
 354            // Search for an existing connections matching one of the given endpoints.
 355            if (findConnection(connectors, out compress) is ConnectionI connection)
 356            {
 357                return connection;
 358            }
 359
 360            if (addToPending(cb, connectors))
 361            {
 362                // A connection to one of our endpoints is pending. The callback will be notified once the connection
 363                // is established. Returning null indicates that the connection is still pending.
 364                return null;
 365            }
 366        }
 367
 368        // No connection is pending. Call nextConnector to initiate connection establishment. Return null to indicate
 369        // that the connection is still pending.
 370        cb.nextConnector();
 371        return null;
 372    }
 373
 374    private ConnectionI createConnection(Transceiver transceiver, ConnectorInfo ci)
 375    {
 376        lock (_mutex)
 377        {
 378            Debug.Assert(_pending.ContainsKey(ci.connector) && transceiver != null);
 379
 380            // Create and add the connection to the connection map. Adding the connection to the map is necessary to
 381            // support the interruption of the connection initialization and validation in case the communicator is
 382            // destroyed.
 383            ConnectionI connection;
 384            try
 385            {
 386                if (_destroyed)
 387                {
 388                    throw new CommunicatorDestroyedException();
 389                }
 390
 391                connection = new ConnectionI(
 392                    _instance,
 393                    transceiver,
 394                    ci.connector,
 395                    ci.endpoint.compress(false).timeout(-1),
 396                    adapter: _defaultObjectAdapter,
 397                    removeConnection,
 398                    _connectionOptions);
 399            }
 400            catch (LocalException)
 401            {
 402                try
 403                {
 404                    transceiver.close();
 405                }
 406                catch (LocalException)
 407                {
 408                    // Ignore
 409                }
 410                throw;
 411            }
 412
 413            _connections.Add(ci.connector, connection);
 414            _connectionsByEndpoint.Add(connection.endpoint(), connection);
 415            _connectionsByEndpoint.Add(connection.endpoint().compress(true), connection);
 416            return connection;
 417        }
 418    }
 419
 420    private void finishGetConnection(
 421        List<ConnectorInfo> connectors,
 422        ConnectorInfo ci,
 423        ConnectionI connection,
 424        ConnectCallback cb)
 425    {
 426        var connectionCallbacks = new HashSet<ConnectCallback>
 427        {
 428            cb
 429        };
 430
 431        var callbacks = new HashSet<ConnectCallback>();
 432        lock (_mutex)
 433        {
 434            foreach (ConnectorInfo c in connectors)
 435            {
 436                if (_pending.TryGetValue(c.connector, out HashSet<ConnectCallback>? s))
 437                {
 438                    foreach (ConnectCallback cc in s)
 439                    {
 440                        if (cc.hasConnector(ci))
 441                        {
 442                            connectionCallbacks.Add(cc);
 443                        }
 444                        else
 445                        {
 446                            callbacks.Add(cc);
 447                        }
 448                    }
 449                    _pending.Remove(c.connector);
 450                }
 451            }
 452
 453            foreach (ConnectCallback cc in connectionCallbacks)
 454            {
 455                cc.removeFromPending();
 456                callbacks.Remove(cc);
 457            }
 458
 459            foreach (ConnectCallback cc in callbacks)
 460            {
 461                cc.removeFromPending();
 462            }
 463            Monitor.PulseAll(_mutex);
 464        }
 465
 466        DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
 467        bool compress = defaultsAndOverrides.overrideCompress ?? ci.endpoint.compress();
 468
 469        foreach (ConnectCallback cc in callbacks)
 470        {
 471            cc.getConnection();
 472        }
 473
 474        foreach (ConnectCallback cc in connectionCallbacks)
 475        {
 476            cc.setConnection(connection, compress);
 477        }
 478    }
 479
 480    private void finishGetConnection(List<ConnectorInfo> connectors, LocalException ex, ConnectCallback cb)
 481    {
 482        var failedCallbacks = new HashSet<ConnectCallback>
 483        {
 484            cb
 485        };
 486
 487        var callbacks = new HashSet<ConnectCallback>();
 488        lock (_mutex)
 489        {
 490            foreach (ConnectorInfo c in connectors)
 491            {
 492                if (_pending.TryGetValue(c.connector, out HashSet<ConnectCallback>? s))
 493                {
 494                    foreach (ConnectCallback cc in s)
 495                    {
 496                        if (cc.removeConnectors(connectors))
 497                        {
 498                            failedCallbacks.Add(cc);
 499                        }
 500                        else
 501                        {
 502                            callbacks.Add(cc);
 503                        }
 504                    }
 505                    _pending.Remove(c.connector);
 506                }
 507            }
 508
 509            foreach (ConnectCallback cc in callbacks)
 510            {
 511                Debug.Assert(!failedCallbacks.Contains(cc));
 512                cc.removeFromPending();
 513            }
 514            Monitor.PulseAll(_mutex);
 515        }
 516
 517        foreach (ConnectCallback cc in callbacks)
 518        {
 519            cc.getConnection();
 520        }
 521
 522        foreach (ConnectCallback cc in failedCallbacks)
 523        {
 524            cc.setException(ex);
 525        }
 526    }
 527
 528    private void handleConnectionException(LocalException ex, bool hasMore)
 529    {
 530        TraceLevels traceLevels = _instance.traceLevels();
 531        if (traceLevels.network >= 2)
 532        {
 533            var s = new StringBuilder();
 534            s.Append("connection to endpoint failed");
 535            if (ex is CommunicatorDestroyedException)
 536            {
 537                s.Append('\n');
 538            }
 539            else
 540            {
 541                if (hasMore)
 542                {
 543                    s.Append(", trying next endpoint\n");
 544                }
 545                else
 546                {
 547                    s.Append(" and no more endpoints to try\n");
 548                }
 549            }
 550            s.Append(ex);
 551            _instance.initializationData().logger!.trace(traceLevels.networkCat, s.ToString());
 552        }
 553    }
 554
 555    private bool addToPending(ConnectCallback cb, List<ConnectorInfo> connectors)
 556    {
 557        // Add the callback to each connector pending list.
 558        bool found = false;
 559        foreach (ConnectorInfo ci in connectors)
 560        {
 561            if (_pending.TryGetValue(ci.connector, out HashSet<ConnectCallback>? cbs))
 562            {
 563                found = true;
 564                cbs.Add(cb); // Add the callback to each pending connector.
 565            }
 566        }
 567
 568        if (found)
 569        {
 570            return true;
 571        }
 572
 573        // If no pending connection exists for the specified connectors, the caller is responsible for initiating
 574        // connection establishment. An empty pending list is added, and any additional callbacks for the same
 575        // connectors will be queued.
 576        foreach (ConnectorInfo ci in connectors)
 577        {
 578            if (!_pending.ContainsKey(ci.connector))
 579            {
 580                _pending.Add(ci.connector, []);
 581            }
 582        }
 583        return false;
 584    }
 585
 586    private void removeFromPending(ConnectCallback cb, List<ConnectorInfo> connectors)
 587    {
 588        foreach (ConnectorInfo ci in connectors)
 589        {
 590            if (_pending.TryGetValue(ci.connector, out HashSet<ConnectCallback>? cbs))
 591            {
 592                cbs.Remove(cb);
 593            }
 594        }
 595    }
 596
 597    private void removeConnection(ConnectionI connection)
 598    {
 599        lock (_mutex)
 600        {
 601            if (_destroyed)
 602            {
 603                return;
 604            }
 605
 606            _connections.Remove(connection.connector(), connection);
 607            _connectionsByEndpoint.Remove(connection.endpoint(), connection);
 608            _connectionsByEndpoint.Remove(connection.endpoint().compress(true), connection);
 609        }
 610    }
 611
 612    internal void handleException(LocalException ex, bool hasMore)
 613    {
 614        TraceLevels traceLevels = _instance.traceLevels();
 615        if (traceLevels.network >= 2)
 616        {
 617            var s = new StringBuilder();
 618            s.Append("couldn't resolve endpoint host");
 619            if (ex is CommunicatorDestroyedException)
 620            {
 621                s.Append('\n');
 622            }
 623            else
 624            {
 625                if (hasMore)
 626                {
 627                    s.Append(", trying next endpoint\n");
 628                }
 629                else
 630                {
 631                    s.Append(" and no more endpoints to try\n");
 632                }
 633            }
 634            s.Append(ex);
 635            _instance.initializationData().logger!.trace(traceLevels.networkCat, s.ToString());
 636        }
 637    }
 638
 639    private class ConnectorInfo
 640    {
 641        internal ConnectorInfo(Connector c, EndpointI e)
 642        {
 643            connector = c;
 644            endpoint = e;
 645        }
 646
 647        public override bool Equals(object? obj) =>
 648            (obj is ConnectorInfo r) && connector.Equals(r.connector);
 649
 650        public override int GetHashCode() => connector.GetHashCode();
 651
 652        internal Connector connector;
 653        internal EndpointI endpoint;
 654    }
 655
 656    private class ConnectCallback : ConnectionI.StartCallback, EndpointI_connectors
 657    {
 658        internal ConnectCallback(
 659            OutgoingConnectionFactory factory,
 660            List<EndpointI> endpoints,
 661            bool more,
 662            CreateConnectionCallback cb)
 663        {
 664            _factory = factory;
 665            _endpoints = endpoints;
 666            _hasMore = more;
 667            _callback = cb;
 668            _endpointsIter = 0;
 669        }
 670
 671        //
 672        // Methods from ConnectionI.StartCallback
 673        //
 674        public void connectionStartCompleted(ConnectionI connection)
 675        {
 676            _observer?.detach();
 677            connection.activate();
 678            Debug.Assert(_current is not null);
 679            _factory.finishGetConnection(_connectors, _current, connection, this);
 680        }
 681
 682        public void connectionStartFailed(ConnectionI connection, LocalException ex)
 683        {
 684            if (connectionStartFailedImpl(ex))
 685            {
 686                nextConnector();
 687            }
 688        }
 689
 690        //
 691        // Methods from EndpointI_connectors
 692        //
 693        public void connectors(List<Connector> cons)
 694        {
 695            Debug.Assert(_currentEndpoint is not null);
 696            foreach (Connector connector in cons)
 697            {
 698                _connectors.Add(new ConnectorInfo(connector, _currentEndpoint));
 699            }
 700
 701            if (_endpointsIter < _endpoints.Count)
 702            {
 703                nextEndpoint();
 704            }
 705            else
 706            {
 707                Debug.Assert(_connectors.Count > 0);
 708
 709                //
 710                // We now have all the connectors for the given endpoints. We can try to obtain the
 711                // connection.
 712                //
 713                _iter = 0;
 714                getConnection();
 715            }
 716        }
 717
 718        public void exception(LocalException ex)
 719        {
 720            _factory.handleException(ex, _hasMore || _endpointsIter < _endpoints.Count);
 721            if (_endpointsIter < _endpoints.Count)
 722            {
 723                nextEndpoint();
 724            }
 725            else if (_connectors.Count > 0)
 726            {
 727                //
 728                // We now have all the connectors for the given endpoints. We can try to obtain the
 729                // connection.
 730                //
 731                _iter = 0;
 732                getConnection();
 733            }
 734            else
 735            {
 736                _callback.setException(ex);
 737                _factory.decPendingConnectCount(); // Must be called last.
 738            }
 739        }
 740
 741        internal void setConnection(ConnectionI connection, bool compress)
 742        {
 743            //
 744            // Callback from the factory: the connection to one of the callback
 745            // connectors has been established.
 746            //
 747            _callback.setConnection(connection, compress);
 748            _factory.decPendingConnectCount(); // Must be called last.
 749        }
 750
 751        internal void setException(LocalException ex)
 752        {
 753            //
 754            // Callback from the factory: connection establishment failed.
 755            //
 756            _callback.setException(ex);
 757            _factory.decPendingConnectCount(); // Must be called last.
 758        }
 759
 760        internal bool hasConnector(ConnectorInfo ci) => _connectors.Contains(ci);
 761
 762        internal bool removeConnectors(List<ConnectorInfo> connectors)
 763        {
 764            _connectors.RemoveAll(ci => connectors.Contains(ci));
 765            return _connectors.Count == 0;
 766        }
 767
 768        internal void removeFromPending() => _factory.removeFromPending(this, _connectors);
 769
 770        internal void getConnectors()
 771        {
 772            try
 773            {
 774                // Notify the factory that there's an async connect pending. This is necessary
 775                // to prevent the outgoing connection factory to be destroyed before all the
 776                // pending asynchronous connects are finished.
 777                _factory.incPendingConnectCount();
 778            }
 779            catch (LocalException ex)
 780            {
 781                _callback.setException(ex);
 782                return;
 783            }
 784
 785            nextEndpoint();
 786        }
 787
 788        private void nextEndpoint()
 789        {
 790            try
 791            {
 792                Debug.Assert(_endpointsIter < _endpoints.Count);
 793                _currentEndpoint = _endpoints[_endpointsIter++];
 794                _currentEndpoint.connectors_async(this);
 795            }
 796            catch (LocalException ex)
 797            {
 798                exception(ex);
 799            }
 800        }
 801
 802        internal void getConnection()
 803        {
 804            try
 805            {
 806                // If all the connectors have been created, we ask the factory to get a connection.
 807                ConnectionI? connection = _factory.getConnection(_connectors, this, out bool compress);
 808                if (connection is null)
 809                {
 810                    // A null return value from getConnection indicates that the connection is being established and
 811                    // the callback will be notified when the connection establishment is done.
 812                    return;
 813                }
 814
 815                _callback.setConnection(connection, compress);
 816                _factory.decPendingConnectCount(); // Must be called last.
 817            }
 818            catch (LocalException ex)
 819            {
 820                _callback.setException(ex);
 821                _factory.decPendingConnectCount(); // Must be called last.
 822            }
 823        }
 824
 825        internal void nextConnector()
 826        {
 827            while (true)
 828            {
 829                try
 830                {
 831                    Debug.Assert(_iter < _connectors.Count);
 832                    _current = _connectors[_iter++];
 833                    Debug.Assert(_current is not null);
 834
 835                    if (
 836                        _factory._instance.initializationData().observer is
 837                        Instrumentation.CommunicatorObserver observer)
 838                    {
 839                        _observer = observer.getConnectionEstablishmentObserver(
 840                            _current.endpoint,
 841                            _current.connector.ToString()!);
 842
 843                        _observer?.attach();
 844                    }
 845
 846                    if (_factory._instance.traceLevels().network >= 2)
 847                    {
 848                        var s = new StringBuilder("trying to establish ");
 849                        s.Append(_current.endpoint.protocol());
 850                        s.Append(" connection to ");
 851                        s.Append(_current.connector.ToString());
 852                        _factory._instance.initializationData().logger!.trace(
 853                            _factory._instance.traceLevels().networkCat, s.ToString());
 854                    }
 855
 856                    ConnectionI connection = _factory.createConnection(_current.connector.connect(), _current);
 857                    connection.start(this);
 858                }
 859                catch (LocalException ex)
 860                {
 861                    if (_factory._instance.traceLevels().network >= 2)
 862                    {
 863                        Debug.Assert(_current is not null);
 864                        var s = new StringBuilder("failed to establish ");
 865                        s.Append(_current.endpoint.protocol());
 866                        s.Append(" connection to ");
 867                        s.Append(_current.connector.ToString());
 868                        s.Append('\n');
 869                        s.Append(ex);
 870                        _factory._instance.initializationData().logger!.trace(
 871                            _factory._instance.traceLevels().networkCat, s.ToString());
 872                    }
 873
 874                    if (connectionStartFailedImpl(ex))
 875                    {
 876                        continue;
 877                    }
 878                }
 879                break;
 880            }
 881        }
 882
 883        private bool connectionStartFailedImpl(LocalException ex)
 884        {
 885            if (_observer != null)
 886            {
 887                _observer.failed(ex.ice_id());
 888                _observer.detach();
 889            }
 890            _factory.handleConnectionException(ex, _hasMore || _iter < _connectors.Count); // just logging
 891
 892            // We stop on ConnectTimeoutException to fail reasonably fast when the endpoint has many connectors
 893            // (IP addresses).
 894            if (_iter < _connectors.Count && !(ex is CommunicatorDestroyedException or ConnectTimeoutException))
 895            {
 896                return true; // keep going
 897            }
 898
 899            _factory.finishGetConnection(_connectors, ex, this);
 900            return false;
 901        }
 902
 903        private readonly OutgoingConnectionFactory _factory;
 904        private readonly bool _hasMore;
 905        private readonly CreateConnectionCallback _callback;
 906        private readonly List<EndpointI> _endpoints;
 907        private int _endpointsIter;
 908        private EndpointI? _currentEndpoint;
 909        private readonly List<ConnectorInfo> _connectors = [];
 910        private int _iter;
 911        private ConnectorInfo? _current;
 912        private Instrumentation.Observer? _observer;
 913    }
 914
 915    private readonly Instance _instance;
 916    private readonly ConnectionOptions _connectionOptions;
 917    private bool _destroyed;
 918
 919    private readonly MultiDictionary<Connector, ConnectionI> _connections = [];
 920    private readonly MultiDictionary<EndpointI, ConnectionI> _connectionsByEndpoint = [];
 921    private readonly Dictionary<Connector, HashSet<ConnectCallback>> _pending = [];
 922    private int _pendingConnectCount;
 923
 924    private ObjectAdapter? _defaultObjectAdapter;
 925    private readonly object _mutex = new();
 926}
 927
 928internal sealed class IncomingConnectionFactory : EventHandler, ConnectionI.StartCallback
 929{
 930    internal void activate()
 931    {
 1932        lock (_mutex)
 933        {
 1934            setState(StateActive);
 1935        }
 1936    }
 937
 938    internal void hold()
 939    {
 1940        lock (_mutex)
 941        {
 1942            setState(StateHolding);
 1943        }
 1944    }
 945
 946    internal void destroy()
 947    {
 1948        lock (_mutex)
 949        {
 1950            setState(StateClosed);
 1951        }
 1952    }
 953
 954    internal void updateConnectionObservers()
 955    {
 1956        lock (_mutex)
 957        {
 1958            foreach (ConnectionI connection in _connections)
 959            {
 1960                connection.updateObserver();
 961            }
 962        }
 1963    }
 964
 965    internal void waitUntilHolding()
 966    {
 967        ICollection<ConnectionI> connections;
 968
 1969        lock (_mutex)
 970        {
 971            // First we wait until the connection factory itself is in holding state.
 1972            while (_state < StateHolding)
 973            {
 1974                Monitor.Wait(_mutex);
 975            }
 976
 977            // We want to wait until all connections are in holding state outside the thread synchronization.
 1978            connections = new List<ConnectionI>(_connections);
 1979        }
 980
 981        // Now we wait until each connection is in holding state.
 1982        foreach (ConnectionI connection in connections)
 983        {
 1984            connection.waitUntilHolding();
 985        }
 1986    }
 987
 988    internal void waitUntilFinished()
 989    {
 990        ICollection<ConnectionI> connections;
 991
 1992        lock (_mutex)
 993        {
 994            // First we wait until the factory is destroyed. If we are using an acceptor, we also wait for it to be
 995            // closed.
 1996            while (_state != StateFinished)
 997            {
 1998                Monitor.Wait(_mutex);
 999            }
 1000
 1001            // We want to wait until all connections are finished outside the thread synchronization.
 11002            connections = new List<ConnectionI>(_connections);
 11003        }
 1004
 11005        foreach (ConnectionI connection in connections)
 1006        {
 11007            connection.waitUntilFinished();
 1008        }
 1009
 11010        lock (_mutex)
 1011        {
 11012            _connections.Clear();
 11013        }
 11014    }
 1015
 1016    internal EndpointI endpoint()
 1017    {
 11018        lock (_mutex)
 1019        {
 11020            return _endpoint;
 1021        }
 11022    }
 1023
 1024    internal ICollection<ConnectionI> connections()
 1025    {
 11026        lock (_mutex)
 1027        {
 11028            ICollection<ConnectionI> connections = [];
 1029
 1030            // Only copy connections which have not been destroyed.
 11031            foreach (ConnectionI connection in _connections)
 1032            {
 01033                if (connection.isActiveOrHolding())
 1034                {
 01035                    connections.Add(connection);
 1036                }
 1037            }
 1038
 11039            return connections;
 1040        }
 11041    }
 1042
 1043    internal void flushAsyncBatchRequests(CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
 1044    {
 1045        // connections() is synchronized, no need to synchronize here.
 11046        foreach (ConnectionI connection in connections())
 1047        {
 1048            try
 1049            {
 01050                outAsync.flushConnection(connection, compressBatch);
 01051            }
 01052            catch (LocalException)
 1053            {
 1054                // Ignore.
 01055            }
 1056        }
 11057    }
 1058
 1059    //
 1060    // Operations from EventHandler.
 1061    //
 1062    public override bool startAsync(int operation, AsyncCallback completedCallback)
 1063    {
 11064        if (_state >= StateClosed)
 1065        {
 01066            return false;
 1067        }
 1068
 1069        // Run the IO operation on a .NET thread pool thread to ensure the IO operation won't be interrupted if the
 1070        // Ice thread pool thread is terminated.
 11071        Task.Run(() =>
 11072        {
 11073            lock (_mutex)
 11074            {
 11075                if (_state >= StateClosed)
 11076                {
 11077                    completedCallback(this);
 11078                    return;
 11079                }
 11080
 11081                try
 11082                {
 11083                    Debug.Assert(_acceptor is not null);
 11084                    if (_acceptor.startAccept(completedCallback, this))
 11085                    {
 11086                        completedCallback(this);
 11087                    }
 11088                }
 01089                catch (LocalException ex)
 11090                {
 01091                    _acceptorException = ex;
 01092                    completedCallback(this);
 01093                }
 11094            }
 11095        });
 1096
 11097        return true;
 1098    }
 1099
 1100    public override bool finishAsync(int operation)
 1101    {
 1102        try
 1103        {
 11104            if (_acceptorException != null)
 1105            {
 01106                throw _acceptorException;
 1107            }
 1108            Debug.Assert(_acceptor is not null);
 11109            _acceptor.finishAccept();
 11110        }
 01111        catch (LocalException ex)
 1112        {
 01113            _acceptorException = null;
 01114            if (_warn)
 1115            {
 01116                _instance.initializationData().logger!.warning($"error accepting connection:\n{ex}\n{_acceptor}");
 1117            }
 01118        }
 11119        return _state < StateClosed;
 1120    }
 1121
 1122    public override void message(ThreadPoolCurrent current)
 1123    {
 11124        ConnectionI? connection = null;
 1125
 11126        using var msg = new ThreadPoolMessage(current, this);
 1127
 11128        lock (_mutex)
 1129        {
 11130            if (!msg.startIOScope())
 1131            {
 11132                return;
 1133            }
 1134
 1135            try
 1136            {
 11137                if (_state >= StateClosed)
 1138                {
 01139                    return;
 1140                }
 11141                else if (_state == StateHolding)
 1142                {
 01143                    return;
 1144                }
 1145
 11146                if (!_acceptorStarted)
 1147                {
 01148                    return;
 1149                }
 1150
 1151                //
 1152                // Now accept a new connection.
 1153                //
 11154                Transceiver? transceiver = null;
 1155                try
 1156                {
 1157                    Debug.Assert(_acceptor is not null);
 11158                    transceiver = _acceptor.accept();
 1159
 11160                    if (_maxConnections > 0 && _connections.Count == _maxConnections)
 1161                    {
 1162                        // Can't accept more connections, so we abort this transport connection.
 1163
 11164                        if (_instance.traceLevels().network >= 2)
 1165                        {
 11166                            _instance.initializationData().logger!.trace(
 11167                                _instance.traceLevels().networkCat,
 11168                                $"rejecting new {_endpoint.protocol()} connection\n{transceiver}\nbecause the maximum nu
 1169                        }
 1170
 1171                        try
 1172                        {
 11173                            transceiver.close();
 11174                        }
 01175                        catch
 1176                        {
 1177                            // Ignore
 01178                        }
 11179                        transceiver.destroy();
 11180                        return;
 1181                    }
 1182
 11183                    if (_instance.traceLevels().network >= 2)
 1184                    {
 11185                        var s = new StringBuilder("trying to accept ");
 11186                        s.Append(_endpoint.protocol());
 11187                        s.Append(" connection\n");
 11188                        s.Append(transceiver.ToString());
 11189                        _instance.initializationData().logger!.trace(_instance.traceLevels().networkCat, s.ToString());
 1190                    }
 11191                }
 01192                catch (LocalException ex)
 1193                {
 01194                    if (_warn)
 1195                    {
 01196                        _instance.initializationData().logger!.warning(
 01197                            $"error accepting connection:\n{ex}\n{_acceptor}");
 1198                    }
 01199                    return;
 1200                }
 1201
 1202                Debug.Assert(transceiver is not null);
 1203
 1204                try
 1205                {
 11206                    connection = new ConnectionI(
 11207                        _instance,
 11208                        transceiver,
 11209                        connector: null,
 11210                        _endpoint,
 11211                        _adapter,
 11212                        removeConnection,
 11213                        _connectionOptions);
 11214                }
 01215                catch (LocalException ex)
 1216                {
 1217                    try
 1218                    {
 01219                        transceiver.close();
 01220                    }
 01221                    catch (LocalException)
 1222                    {
 1223                        // Ignore
 01224                    }
 1225
 01226                    if (_warn)
 1227                    {
 01228                        _instance.initializationData().logger!.warning(
 01229                            $"error accepting connection:\n{ex}\n{_acceptor}");
 1230                    }
 01231                    return;
 1232                }
 1233
 11234                _connections.Add(connection);
 11235            }
 1236            finally
 1237            {
 11238                msg.finishIOScope();
 11239            }
 1240        }
 1241
 1242        Debug.Assert(connection != null);
 11243        connection.start(this);
 11244    }
 1245
 1246    public override void finished(ThreadPoolCurrent current)
 1247    {
 11248        lock (_mutex)
 1249        {
 11250            if (_state < StateClosed)
 1251            {
 01252                return;
 1253            }
 1254
 1255            Debug.Assert(_state >= StateClosed);
 11256            setState(StateFinished);
 11257        }
 11258    }
 1259
 01260    public override string ToString() => _transceiver?.ToString() ?? _acceptor?.ToString() ?? "";
 1261
 1262    //
 1263    // Operations from ConnectionI.StartCallback
 1264    //
 1265    public void connectionStartCompleted(ConnectionI connection)
 1266    {
 11267        lock (_mutex)
 1268        {
 1269            // Initially, connections are in the holding state. If the factory is active we activate the connection.
 11270            if (_state == StateActive)
 1271            {
 11272                connection.activate();
 1273            }
 11274        }
 11275    }
 1276
 1277    public void connectionStartFailed(ConnectionI connection, LocalException ex)
 1278    {
 1279        // Do not warn about connection exceptions here. The connection is not yet validated.
 11280    }
 1281
 11282    internal IncomingConnectionFactory(Instance instance, EndpointI endpoint, ObjectAdapter adapter)
 1283    {
 11284        _instance = instance;
 11285        _connectionOptions = instance.serverConnectionOptions(adapter.getName());
 1286
 1287        // Meaningful only for non-datagram (non-UDP) connections.
 11288        _maxConnections = endpoint.datagram() ? 0 :
 11289            instance.initializationData().properties!.getPropertyAsInt($"{adapter.getName()}.MaxConnections");
 1290
 11291        _endpoint = endpoint;
 11292        _adapter = adapter;
 11293        _warn = _instance.initializationData().properties!.getIcePropertyAsInt("Ice.Warn.Connections") > 0;
 11294        _connections = [];
 11295        _state = StateHolding;
 11296        _acceptorStarted = false;
 1297
 11298        DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
 1299
 11300        if (defaultsAndOverrides.overrideCompress is not null)
 1301        {
 11302            _endpoint = _endpoint.compress(defaultsAndOverrides.overrideCompress.Value);
 1303        }
 1304
 1305        try
 1306        {
 11307            _transceiver = _endpoint.transceiver();
 11308            if (_transceiver != null)
 1309            {
 1310                // All this is for UDP "connections".
 1311
 11312                if (_instance.traceLevels().network >= 2)
 1313                {
 11314                    var s = new StringBuilder("attempting to bind to ");
 11315                    s.Append(_endpoint.protocol());
 11316                    s.Append(" socket\n");
 11317                    s.Append(_transceiver.ToString());
 11318                    _instance.initializationData().logger!.trace(_instance.traceLevels().networkCat, s.ToString());
 1319                }
 11320                _endpoint = _transceiver.bind();
 1321
 11322                var connection = new ConnectionI(
 11323                    _instance,
 11324                    _transceiver,
 11325                    connector: null,
 11326                    _endpoint,
 11327                    adapter,
 11328                    removeFromFactory: null,
 11329                    _connectionOptions);
 11330                connection.startAndWait();
 11331                _connections.Add(connection);
 1332                Debug.Assert(_maxConnections == 0); // UDP so no max connections.
 1333            }
 1334            else
 1335            {
 11336                createAcceptor();
 1337            }
 11338        }
 11339        catch (System.Exception ex)
 1340        {
 1341            //
 1342            // Clean up.
 1343            //
 11344            if (_transceiver != null)
 1345            {
 1346                try
 1347                {
 01348                    _transceiver.close();
 01349                }
 01350                catch (LocalException)
 1351                {
 1352                    // Ignore
 01353                }
 1354            }
 1355
 11356            _state = StateFinished;
 11357            _connections.Clear();
 1358
 11359            if (ex is LocalException)
 1360            {
 11361                throw;
 1362            }
 1363            else
 1364            {
 01365                throw new SyscallException(ex);
 1366            }
 1367        }
 11368    }
 1369
 1370    private const int StateActive = 0;
 1371    private const int StateHolding = 1;
 1372    private const int StateClosed = 2;
 1373    private const int StateFinished = 3;
 1374
 1375    private void setState(int state)
 1376    {
 11377        if (_state == state) // Don't switch twice.
 1378        {
 11379            return;
 1380        }
 1381
 1382        switch (state)
 1383        {
 1384            case StateActive:
 1385            {
 11386                if (_state != StateHolding) // Can only switch from holding to active.
 1387                {
 01388                    return;
 1389                }
 1390
 11391                if (_acceptor is not null)
 1392                {
 11393                    if (_instance.traceLevels().network >= 1)
 1394                    {
 11395                        _instance.initializationData().logger!.trace(
 11396                            _instance.traceLevels().networkCat,
 11397                            $"accepting {_endpoint.protocol()} connections at {_acceptor}");
 1398                    }
 11399                    _adapter.getThreadPool().register(this, SocketOperation.Read);
 1400                }
 1401
 11402                foreach (ConnectionI connection in _connections)
 1403                {
 11404                    connection.activate();
 1405                }
 1406                break;
 1407            }
 1408
 1409            case StateHolding:
 1410            {
 11411                if (_state != StateActive) // Can only switch from active to holding.
 1412                {
 01413                    return;
 1414                }
 1415
 1416                // Stop accepting new connections.
 11417                if (_acceptor is not null)
 1418                {
 11419                    if (_instance.traceLevels().network >= 1)
 1420                    {
 11421                        _instance.initializationData().logger!.trace(
 11422                            _instance.traceLevels().networkCat,
 11423                            $"holding {_endpoint.protocol()} connections at {_acceptor}");
 1424                    }
 11425                    _adapter.getThreadPool().unregister(this, SocketOperation.Read);
 1426                }
 1427
 11428                foreach (ConnectionI connection in _connections)
 1429                {
 11430                    connection.hold();
 1431                }
 1432                break;
 1433            }
 1434
 1435            case StateClosed:
 1436            {
 11437                if (_acceptorStarted)
 1438                {
 11439                    _acceptorStarted = false;
 11440                    _adapter.getThreadPool().finish(this);
 11441                    closeAcceptor();
 1442                }
 1443                else
 1444                {
 11445                    state = StateFinished;
 1446                }
 1447
 11448                foreach (ConnectionI connection in _connections)
 1449                {
 11450                    connection.destroy(ConnectionI.ObjectAdapterDeactivated);
 1451                }
 1452                break;
 1453            }
 1454
 1455            case StateFinished:
 1456            {
 1457                Debug.Assert(_state == StateClosed);
 1458                break;
 1459            }
 1460        }
 1461
 11462        _state = state;
 11463        System.Threading.Monitor.PulseAll(_mutex);
 11464    }
 1465
 1466    private void createAcceptor()
 1467    {
 1468        try
 1469        {
 1470            Debug.Assert(!_acceptorStarted);
 11471            _acceptor = _endpoint.acceptor(_adapter.getName(), _adapter.getServerAuthenticationOptions());
 1472            Debug.Assert(_acceptor != null);
 1473
 11474            if (_instance.traceLevels().network >= 2)
 1475            {
 11476                var s = new StringBuilder("attempting to bind to ");
 11477                s.Append(_endpoint.protocol());
 11478                s.Append(" socket ");
 11479                s.Append(_acceptor.ToString());
 11480                _instance.initializationData().logger!.trace(_instance.traceLevels().networkCat, s.ToString());
 1481            }
 11482            _endpoint = _acceptor.listen();
 1483
 11484            if (_instance.traceLevels().network >= 1)
 1485            {
 11486                var s = new StringBuilder("listening for ");
 11487                s.Append(_endpoint.protocol());
 11488                s.Append(" connections\n");
 11489                s.Append(_acceptor.toDetailedString());
 11490                _instance.initializationData().logger!.trace(_instance.traceLevels().networkCat, s.ToString());
 1491            }
 1492
 11493            _adapter.getThreadPool().initialize(this);
 1494
 11495            if (_state == StateActive)
 1496            {
 01497                _adapter.getThreadPool().register(this, SocketOperation.Read);
 1498            }
 1499
 11500            _acceptorStarted = true;
 11501        }
 01502        catch (SystemException)
 1503        {
 01504            _acceptor?.close();
 01505            throw;
 1506        }
 11507    }
 1508
 1509    private void closeAcceptor()
 1510    {
 1511        Debug.Assert(_acceptor != null);
 1512
 11513        if (_instance.traceLevels().network >= 1)
 1514        {
 11515            var s = new StringBuilder("stopping to accept ");
 11516            s.Append(_endpoint.protocol());
 11517            s.Append(" connections at ");
 11518            s.Append(_acceptor.ToString());
 11519            _instance.initializationData().logger!.trace(_instance.traceLevels().networkCat, s.ToString());
 1520        }
 1521
 1522        Debug.Assert(!_acceptorStarted);
 11523        _acceptor.close();
 11524    }
 1525
 1526    private void removeConnection(ConnectionI connection)
 1527    {
 11528        lock (_mutex)
 1529        {
 11530            if (_state is StateActive or StateHolding)
 1531            {
 11532                _connections.Remove(connection);
 1533            }
 1534            // else it's already being cleaned up.
 11535        }
 11536    }
 1537
 1538    private readonly Instance _instance;
 1539    private readonly ConnectionOptions _connectionOptions;
 1540
 1541    private readonly int _maxConnections;
 1542
 1543    private Acceptor? _acceptor;
 1544    private readonly Transceiver? _transceiver;
 1545    private EndpointI _endpoint;
 1546    private readonly ObjectAdapter _adapter;
 1547
 1548    private readonly bool _warn;
 1549
 1550    private readonly HashSet<ConnectionI> _connections;
 1551
 1552    private int _state;
 1553    private bool _acceptorStarted;
 1554    private LocalException? _acceptorException;
 11555    private readonly object _mutex = new();
 1556}