< Summary

Information
Class: IceDiscovery.LookupI
Assembly: IceDiscovery
File(s): /_/csharp/src/IceDiscovery/LookupI.cs
Tag: 99_23991109993
Line coverage
67%
Covered lines: 112
Uncovered lines: 53
Coverable lines: 165
Total lines: 537
Line coverage: 67.8%
Branch coverage
69%
Covered branches: 47
Total branches: 68
Branch coverage: 69.1%
Method coverage
85%
Covered methods: 12
Total methods: 14
Method coverage: 85.7%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%22100%
setLookupReply(...)91.67%13.421278.57%
findObjectById(...)75%4.43470%
findAdapterById(...)75%4.43470%
findObject(...)100%4.3473.33%
findAdapter(...)100%4.3473.33%
foundObject(...)100%44100%
foundAdapter(...)100%66100%
objectRequestTimedOut(...)0%4260%
objectRequestException(...)0%7280%
adapterRequestTimedOut(...)66.67%6.35678.57%
adapterRequestException(...)75%8.01894.44%
timer()100%11100%
latencyMultiplier()100%11100%

File(s)

/_/csharp/src/IceDiscovery/LookupI.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4using System.Text;
 5
 6namespace IceDiscovery;
 7
 8internal abstract class Request<T>
 9{
 10    protected Request(LookupI lookup, T id, int retryCount)
 11    {
 12        lookup_ = lookup;
 13        retryCount_ = retryCount;
 14        _id = id;
 15        _requestId = Guid.NewGuid().ToString();
 16    }
 17
 18    public T getId() => _id;
 19
 20    public bool addCallback(TaskCompletionSource<Ice.ObjectPrx> cb)
 21    {
 22        callbacks_.Add(cb);
 23        return callbacks_.Count == 1;
 24    }
 25
 26    public virtual bool retry() => --retryCount_ >= 0;
 27
 28    public void invoke(string domainId, Dictionary<LookupPrx, LookupReplyPrx> lookups)
 29    {
 30        _lookupCount = lookups.Count;
 31        _failureCount = 0;
 32        var id = new Ice.Identity(_requestId, "");
 33        foreach (KeyValuePair<LookupPrx, LookupReplyPrx> entry in lookups)
 34        {
 35            invokeWithLookup(
 36                domainId,
 37                entry.Key,
 38                LookupReplyPrxHelper.uncheckedCast(entry.Value.ice_identity(id)));
 39        }
 40    }
 41
 42    public bool exception()
 43    {
 44        if (++_failureCount == _lookupCount)
 45        {
 46            finished(null);
 47            return true;
 48        }
 49        return false;
 50    }
 51
 52    public string getRequestId() => _requestId;
 53
 54    public abstract void finished(Ice.ObjectPrx proxy);
 55
 56    protected abstract void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply);
 57
 58    private readonly string _requestId;
 59
 60    protected LookupI lookup_;
 61    protected int retryCount_;
 62    protected int _lookupCount;
 63    protected int _failureCount;
 64    protected List<TaskCompletionSource<Ice.ObjectPrx>> callbacks_ = new List<TaskCompletionSource<Ice.ObjectPrx>>();
 65
 66    protected T _id;
 67}
 68
 69internal class AdapterRequest : Request<string>, Ice.Internal.TimerTask
 70{
 71    public AdapterRequest(LookupI lookup, string id, int retryCount)
 72        : base(lookup, id, retryCount) => _start = DateTime.Now.Ticks;
 73
 74    public override bool retry() => _proxies.Count == 0 && --retryCount_ >= 0;
 75
 76    public bool response(Ice.ObjectPrx proxy, bool isReplicaGroup)
 77    {
 78        if (isReplicaGroup)
 79        {
 80            _proxies.Add(proxy);
 81            if (_latency == 0)
 82            {
 83                _latency = (long)((DateTime.Now.Ticks - _start) * lookup_.latencyMultiplier() / 10000.0);
 84                if (_latency == 0)
 85                {
 86                    _latency = 1; // 1ms
 87                }
 88                lookup_.timer().cancel(this);
 89                lookup_.timer().schedule(this, _latency);
 90            }
 91            return false;
 92        }
 93        finished(proxy);
 94        return true;
 95    }
 96
 97    public override void finished(Ice.ObjectPrx proxy)
 98    {
 99        if (proxy != null || _proxies.Count == 0)
 100        {
 101            sendResponse(proxy);
 102        }
 103        else if (_proxies.Count == 1)
 104        {
 105            sendResponse(_proxies.First());
 106        }
 107        else
 108        {
 109            var endpoints = new List<Ice.Endpoint>();
 110            Ice.ObjectPrx result = null;
 111            foreach (Ice.ObjectPrx prx in _proxies)
 112            {
 113                result ??= prx;
 114                endpoints.AddRange(prx.ice_getEndpoints());
 115            }
 116            sendResponse(result.ice_endpoints(endpoints.ToArray()));
 117        }
 118    }
 119
 120    public void runTimerTask() => lookup_.adapterRequestTimedOut(this);
 121
 122    protected override void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply)
 123    {
 124        lookup.findAdapterByIdAsync(domainId, _id, lookupReply).ContinueWith(
 125            task =>
 126            {
 127                try
 128                {
 129                    task.Wait();
 130                }
 131                catch (AggregateException ex)
 132                {
 133                    lookup_.adapterRequestException(this, ex.InnerException);
 134                }
 135            },
 136            lookup.ice_scheduler());
 137    }
 138
 139    private void sendResponse(Ice.ObjectPrx proxy)
 140    {
 141        foreach (TaskCompletionSource<Ice.ObjectPrx> cb in callbacks_)
 142        {
 143            cb.SetResult(proxy);
 144        }
 145        callbacks_.Clear();
 146    }
 147
 148    //
 149    // We use a HashSet because the same IceDiscovery plugin might return multiple times
 150    // the same proxy if it's accessible through multiple network interfaces and if we
 151    // also sent the request to multiple interfaces.
 152    //
 153    private readonly HashSet<Ice.ObjectPrx> _proxies = new HashSet<Ice.ObjectPrx>();
 154    private readonly long _start;
 155    private long _latency;
 156}
 157
 158internal class ObjectRequest : Request<Ice.Identity>, Ice.Internal.TimerTask
 159{
 160    public ObjectRequest(LookupI lookup, Ice.Identity id, int retryCount)
 161        : base(lookup, id, retryCount)
 162    {
 163    }
 164
 165    public void response(Ice.ObjectPrx proxy) => finished(proxy);
 166
 167    public override void finished(Ice.ObjectPrx proxy)
 168    {
 169        foreach (TaskCompletionSource<Ice.ObjectPrx> cb in callbacks_)
 170        {
 171            cb.SetResult(proxy);
 172        }
 173        callbacks_.Clear();
 174    }
 175
 176    public void runTimerTask() => lookup_.objectRequestTimedOut(this);
 177
 178    protected override void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply)
 179    {
 180        lookup.findObjectByIdAsync(domainId, _id, lookupReply).ContinueWith(
 181            task =>
 182            {
 183                try
 184                {
 185                    task.Wait();
 186                }
 187                catch (AggregateException ex)
 188                {
 189                    lookup_.objectRequestException(this, ex.InnerException);
 190                }
 191            },
 192            lookup.ice_scheduler());
 193    }
 194}
 195
 196internal class LookupI : LookupDisp_
 197{
 1198    public LookupI(LocatorRegistryI registry, LookupPrx lookup, Ice.Properties properties)
 199    {
 1200        _registry = registry;
 1201        _lookup = lookup;
 1202        _timeout = properties.getIcePropertyAsInt("IceDiscovery.Timeout");
 1203        _retryCount = properties.getIcePropertyAsInt("IceDiscovery.RetryCount");
 1204        _latencyMultiplier = properties.getIcePropertyAsInt("IceDiscovery.LatencyMultiplier");
 1205        _domainId = properties.getIceProperty("IceDiscovery.DomainId");
 1206        _timer = Ice.Internal.Util.getInstance(lookup.ice_getCommunicator()).timer();
 207
 208        //
 209        // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast
 210        // datagram on each endpoint.
 211        //
 1212        var single = new Ice.Endpoint[1];
 1213        foreach (Ice.Endpoint endpt in lookup.ice_getEndpoints())
 214        {
 1215            single[0] = endpt;
 1216            _lookups[(LookupPrx)lookup.ice_endpoints(single)] = null;
 217        }
 218        Debug.Assert(_lookups.Count > 0);
 1219    }
 220
 221    public void setLookupReply(LookupReplyPrx lookupReply)
 222    {
 223        //
 224        // Use a lookup reply proxy whose address matches the interface used to send multicast datagrams.
 225        //
 1226        var single = new Ice.Endpoint[1];
 1227        foreach (LookupPrx key in new List<LookupPrx>(_lookups.Keys))
 228        {
 1229            var info = (Ice.UDPEndpointInfo)key.ice_getEndpoints()[0].getInfo();
 1230            if (info.mcastInterface.Length > 0)
 231            {
 1232                foreach (Ice.Endpoint q in lookupReply.ice_getEndpoints())
 233                {
 1234                    Ice.EndpointInfo r = q.getInfo();
 1235                    if (r is Ice.IPEndpointInfo &&
 1236                        ((Ice.IPEndpointInfo)r).host.Equals(info.mcastInterface, StringComparison.Ordinal))
 237                    {
 0238                        single[0] = q;
 0239                        _lookups[key] = (LookupReplyPrx)lookupReply.ice_endpoints(single);
 0240                        break;
 241                    }
 242                }
 243            }
 244
 1245            if (_lookups[key] == null)
 246            {
 247                // Fallback: just use the given lookup reply proxy if no matching endpoint found.
 1248                _lookups[key] = lookupReply;
 249            }
 250        }
 1251    }
 252
 253    public override void findObjectById(string domainId, Ice.Identity id, LookupReplyPrx reply, Ice.Current current)
 254    {
 1255        Ice.ObjectPrx.checkNotNull(reply, current);
 1256        if (!domainId.Equals(_domainId, StringComparison.Ordinal))
 257        {
 0258            return; // Ignore
 259        }
 260
 1261        Ice.ObjectPrx proxy = _registry.findObject(id);
 1262        if (proxy != null)
 263        {
 264            //
 265            // Reply to the multicast request using the given proxy.
 266            //
 267            try
 268            {
 1269                reply.foundObjectByIdAsync(id, proxy);
 1270            }
 0271            catch (Ice.LocalException)
 272            {
 273                // Ignore.
 0274            }
 275        }
 1276    }
 277
 278    public override void findAdapterById(string domainId, string adapterId, LookupReplyPrx reply, Ice.Current current)
 279    {
 1280        Ice.ObjectPrx.checkNotNull(reply, current);
 1281        if (!domainId.Equals(_domainId, StringComparison.Ordinal))
 282        {
 0283            return; // Ignore
 284        }
 285
 1286        Ice.ObjectPrx proxy = _registry.findAdapter(adapterId, out bool isReplicaGroup);
 1287        if (proxy != null)
 288        {
 289            //
 290            // Reply to the multicast request using the given proxy.
 291            //
 292            try
 293            {
 1294                reply.foundAdapterByIdAsync(adapterId, proxy, isReplicaGroup);
 1295            }
 0296            catch (Ice.LocalException)
 297            {
 298                // Ignore.
 0299            }
 300        }
 1301    }
 302
 303    internal Task<Ice.ObjectPrx> findObject(Ice.Identity id)
 304    {
 1305        lock (_mutex)
 306        {
 1307            if (!_objectRequests.TryGetValue(id, out ObjectRequest request))
 308            {
 1309                request = new ObjectRequest(this, id, _retryCount);
 1310                _objectRequests.Add(id, request);
 311            }
 312
 1313            var task = new TaskCompletionSource<Ice.ObjectPrx>(TaskCreationOptions.RunContinuationsAsynchronously);
 1314            if (request.addCallback(task))
 315            {
 316                try
 317                {
 1318                    request.invoke(_domainId, _lookups);
 1319                    _timer.schedule(request, _timeout);
 1320                }
 0321                catch (Ice.LocalException)
 322                {
 0323                    request.finished(null);
 0324                    _objectRequests.Remove(id);
 0325                }
 326            }
 1327            return task.Task;
 328        }
 1329    }
 330
 331    internal Task<Ice.ObjectPrx> findAdapter(string adapterId)
 332    {
 1333        lock (_mutex)
 334        {
 1335            if (!_adapterRequests.TryGetValue(adapterId, out AdapterRequest request))
 336            {
 1337                request = new AdapterRequest(this, adapterId, _retryCount);
 1338                _adapterRequests.Add(adapterId, request);
 339            }
 340
 1341            var task = new TaskCompletionSource<Ice.ObjectPrx>(TaskCreationOptions.RunContinuationsAsynchronously);
 1342            if (request.addCallback(task))
 343            {
 344                try
 345                {
 1346                    request.invoke(_domainId, _lookups);
 1347                    _timer.schedule(request, _timeout);
 1348                }
 0349                catch (Ice.LocalException)
 350                {
 0351                    request.finished(null);
 0352                    _adapterRequests.Remove(adapterId);
 0353                }
 354            }
 1355            return task.Task;
 356        }
 1357    }
 358
 359    internal void foundObject(Ice.Identity id, string requestId, Ice.ObjectPrx proxy)
 360    {
 1361        lock (_mutex)
 362        {
 1363            if (_objectRequests.TryGetValue(id, out ObjectRequest request) && request.getRequestId() == requestId)
 364            {
 1365                request.response(proxy);
 1366                _timer.cancel(request);
 1367                _objectRequests.Remove(id);
 368            }
 369            // else ignore responses from old requests
 1370        }
 1371    }
 372
 373    internal void foundAdapter(string adapterId, string requestId, Ice.ObjectPrx proxy, bool isReplicaGroup)
 374    {
 1375        lock (_mutex)
 376        {
 1377            if (
 1378                _adapterRequests.TryGetValue(adapterId, out AdapterRequest request) &&
 1379                request.getRequestId() == requestId)
 380            {
 1381                if (request.response(proxy, isReplicaGroup))
 382                {
 1383                    _timer.cancel(request);
 1384                    _adapterRequests.Remove(request.getId());
 385                }
 386            }
 387            // else ignore responses from old requests
 1388        }
 1389    }
 390
 391    internal void objectRequestTimedOut(ObjectRequest request)
 392    {
 0393        lock (_mutex)
 394        {
 0395            if (!_objectRequests.TryGetValue(request.getId(), out ObjectRequest r) || r != request)
 396            {
 0397                return;
 398            }
 399
 0400            if (request.retry())
 401            {
 402                try
 403                {
 0404                    request.invoke(_domainId, _lookups);
 0405                    _timer.schedule(request, _timeout);
 0406                    return;
 407                }
 0408                catch (Ice.LocalException)
 409                {
 0410                }
 411            }
 412
 0413            request.finished(null);
 0414            _objectRequests.Remove(request.getId());
 0415            _timer.cancel(request);
 0416        }
 0417    }
 418
 419    internal void objectRequestException(ObjectRequest request, Exception ex)
 420    {
 0421        lock (_mutex)
 422        {
 0423            if (!_objectRequests.TryGetValue(request.getId(), out ObjectRequest r) || r != request)
 424            {
 0425                return;
 426            }
 427
 0428            if (request.exception())
 429            {
 0430                if (_warnOnce)
 431                {
 0432                    var s = new StringBuilder();
 0433                    s.Append("failed to lookup object `");
 0434                    s.Append(_lookup.ice_getCommunicator().identityToString(request.getId()));
 0435                    s.Append("' with lookup proxy `");
 0436                    s.Append(_lookup);
 0437                    s.Append("':\n");
 0438                    s.Append(ex.ToString());
 0439                    _lookup.ice_getCommunicator().getLogger().warning(s.ToString());
 0440                    _warnOnce = false;
 441                }
 0442                _timer.cancel(request);
 0443                _objectRequests.Remove(request.getId());
 444            }
 0445        }
 0446    }
 447
 448    internal void adapterRequestTimedOut(AdapterRequest request)
 449    {
 1450        lock (_mutex)
 451        {
 1452            if (!_adapterRequests.TryGetValue(request.getId(), out AdapterRequest r) || r != request)
 453            {
 0454                return;
 455            }
 456
 1457            if (request.retry())
 458            {
 459                try
 460                {
 1461                    request.invoke(_domainId, _lookups);
 1462                    _timer.schedule(request, _timeout);
 1463                    return;
 464                }
 0465                catch (Ice.LocalException)
 466                {
 0467                }
 468            }
 469
 1470            request.finished(null);
 1471            _adapterRequests.Remove(request.getId());
 1472            _timer.cancel(request);
 1473        }
 1474    }
 475
 476    internal void adapterRequestException(AdapterRequest request, Exception ex)
 477    {
 1478        lock (_mutex)
 479        {
 1480            if (!_adapterRequests.TryGetValue(request.getId(), out AdapterRequest r) || r != request)
 481            {
 0482                return;
 483            }
 484
 1485            if (request.exception())
 486            {
 1487                if (_warnOnce)
 488                {
 1489                    var s = new StringBuilder();
 1490                    s.Append("failed to lookup adapter `");
 1491                    s.Append(request.getId());
 1492                    s.Append("' with lookup proxy `");
 1493                    s.Append(_lookup);
 1494                    s.Append("':\n");
 1495                    s.Append(ex.ToString());
 1496                    _lookup.ice_getCommunicator().getLogger().warning(s.ToString());
 1497                    _warnOnce = false;
 498                }
 1499                _timer.cancel(request);
 1500                _adapterRequests.Remove(request.getId());
 501            }
 1502        }
 1503    }
 504
 1505    internal Ice.Internal.Timer timer() => _timer;
 506
 1507    internal int latencyMultiplier() => _latencyMultiplier;
 508
 509    private readonly LocatorRegistryI _registry;
 510    private readonly LookupPrx _lookup;
 1511    private readonly Dictionary<LookupPrx, LookupReplyPrx> _lookups = new Dictionary<LookupPrx, LookupReplyPrx>();
 512    private readonly int _timeout;
 513    private readonly int _retryCount;
 514    private readonly int _latencyMultiplier;
 515    private readonly string _domainId;
 516
 517    private readonly Ice.Internal.Timer _timer;
 1518    private bool _warnOnce = true;
 1519    private readonly Dictionary<Ice.Identity, ObjectRequest> _objectRequests =
 1520        new Dictionary<Ice.Identity, ObjectRequest>();
 521
 1522    private readonly Dictionary<string, AdapterRequest> _adapterRequests = new Dictionary<string, AdapterRequest>();
 1523    private readonly object _mutex = new();
 524}
 525
 526internal class LookupReplyI : LookupReplyDisp_
 527{
 528    public LookupReplyI(LookupI lookup) => _lookup = lookup;
 529
 530    public override void foundObjectById(Ice.Identity id, Ice.ObjectPrx proxy, Ice.Current c) =>
 531        _lookup.foundObject(id, c.id.name, proxy);
 532
 533    public override void foundAdapterById(string adapterId, Ice.ObjectPrx proxy, bool isReplicaGroup, Ice.Current c) =>
 534        _lookup.foundAdapter(adapterId, c.id.name, proxy, isReplicaGroup);
 535
 536    private readonly LookupI _lookup;
 537}