< Summary

Information
Class: IceDiscovery.Request<T>
Assembly: IceDiscovery
File(s): /home/runner/work/ice/ice/csharp/src/IceDiscovery/LookupI.cs
Tag: 71_18251537082
Line coverage
96%
Covered lines: 24
Uncovered lines: 1
Coverable lines: 25
Total lines: 534
Line coverage: 96%
Branch coverage
100%
Covered branches: 4
Total branches: 4
Branch coverage: 100%
Method coverage
85%
Covered methods: 6
Total methods: 7
Method coverage: 85.7%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
getId() | 100% | 1 | 1 | 100%
addCallback(...) | 100% | 1 | 1 | 100%
retry() | 100% | 2 | 1 | 0%
invoke(...) | 100% | 2 | 2 | 100%
exception() | 100% | 2 | 2 | 100%
getRequestId() | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/ice/ice/csharp/src/IceDiscovery/LookupI.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4using System.Text;
 5
 6namespace IceDiscovery;
 7
 8internal abstract class Request<T>
 9{
 110    protected Request(LookupI lookup, T id, int retryCount)
 11    {
 112        lookup_ = lookup;
 113        retryCount_ = retryCount;
 114        _id = id;
 115        _requestId = Guid.NewGuid().ToString();
 116    }
 17
 118    public T getId() => _id;
 19
 20    public bool addCallback(TaskCompletionSource<Ice.ObjectPrx> cb)
 21    {
 122        callbacks_.Add(cb);
 123        return callbacks_.Count == 1;
 24    }
 25
 026    public virtual bool retry() => --retryCount_ >= 0;
 27
 28    public void invoke(string domainId, Dictionary<LookupPrx, LookupReplyPrx> lookups)
 29    {
 130        _lookupCount = lookups.Count;
 131        _failureCount = 0;
 132        var id = new Ice.Identity(_requestId, "");
 133        foreach (KeyValuePair<LookupPrx, LookupReplyPrx> entry in lookups)
 34        {
 135            invokeWithLookup(
 136                domainId,
 137                entry.Key,
 138                LookupReplyPrxHelper.uncheckedCast(entry.Value.ice_identity(id)));
 39        }
 140    }
 41
 42    public bool exception()
 43    {
 144        if (++_failureCount == _lookupCount)
 45        {
 146            finished(null);
 147            return true;
 48        }
 149        return false;
 50    }
 51
 152    public string getRequestId() => _requestId;
 53
 54    public abstract void finished(Ice.ObjectPrx proxy);
 55
 56    protected abstract void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply);
 57
 58    private readonly string _requestId;
 59
 60    protected LookupI lookup_;
 61    protected int retryCount_;
 62    protected int _lookupCount;
 63    protected int _failureCount;
 164    protected List<TaskCompletionSource<Ice.ObjectPrx>> callbacks_ = new List<TaskCompletionSource<Ice.ObjectPrx>>();
 65
 66    protected T _id;
 67}
 68
 69internal class AdapterRequest : Request<string>, Ice.Internal.TimerTask
 70{
 71    public AdapterRequest(LookupI lookup, string id, int retryCount)
 72        : base(lookup, id, retryCount) => _start = DateTime.Now.Ticks;
 73
 74    public override bool retry() => _proxies.Count == 0 && --retryCount_ >= 0;
 75
 76    public bool response(Ice.ObjectPrx proxy, bool isReplicaGroup)
 77    {
 78        if (isReplicaGroup)
 79        {
 80            _proxies.Add(proxy);
 81            if (_latency == 0)
 82            {
 83                _latency = (long)((DateTime.Now.Ticks - _start) * lookup_.latencyMultiplier() / 10000.0);
 84                if (_latency == 0)
 85                {
 86                    _latency = 1; // 1ms
 87                }
 88                lookup_.timer().cancel(this);
 89                lookup_.timer().schedule(this, _latency);
 90            }
 91            return false;
 92        }
 93        finished(proxy);
 94        return true;
 95    }
 96
 97    public override void finished(Ice.ObjectPrx proxy)
 98    {
 99        if (proxy != null || _proxies.Count == 0)
 100        {
 101            sendResponse(proxy);
 102        }
 103        else if (_proxies.Count == 1)
 104        {
 105            sendResponse(_proxies.First());
 106        }
 107        else
 108        {
 109            var endpoints = new List<Ice.Endpoint>();
 110            Ice.ObjectPrx result = null;
 111            foreach (Ice.ObjectPrx prx in _proxies)
 112            {
 113                result ??= prx;
 114                endpoints.AddRange(prx.ice_getEndpoints());
 115            }
 116            sendResponse(result.ice_endpoints(endpoints.ToArray()));
 117        }
 118    }
 119
 120    public void runTimerTask() => lookup_.adapterRequestTimedOut(this);
 121
 122    protected override void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply)
 123    {
 124        lookup.findAdapterByIdAsync(domainId, _id, lookupReply).ContinueWith(
 125            task =>
 126            {
 127                try
 128                {
 129                    task.Wait();
 130                }
 131                catch (AggregateException ex)
 132                {
 133                    lookup_.adapterRequestException(this, ex.InnerException);
 134                }
 135            },
 136            lookup.ice_scheduler());
 137    }
 138
 139    private void sendResponse(Ice.ObjectPrx proxy)
 140    {
 141        foreach (TaskCompletionSource<Ice.ObjectPrx> cb in callbacks_)
 142        {
 143            cb.SetResult(proxy);
 144        }
 145        callbacks_.Clear();
 146    }
 147
 148    //
 149    // We use a HashSet because the same IceDiscovery plugin might return multiple times
 150    // the same proxy if it's accessible through multiple network interfaces and if we
 151    // also sent the request to multiple interfaces.
 152    //
 153    private readonly HashSet<Ice.ObjectPrx> _proxies = new HashSet<Ice.ObjectPrx>();
 154    private readonly long _start;
 155    private long _latency;
 156}
 157
 158internal class ObjectRequest : Request<Ice.Identity>, Ice.Internal.TimerTask
 159{
 160    public ObjectRequest(LookupI lookup, Ice.Identity id, int retryCount)
 161        : base(lookup, id, retryCount)
 162    {
 163    }
 164
 165    public void response(Ice.ObjectPrx proxy) => finished(proxy);
 166
 167    public override void finished(Ice.ObjectPrx proxy)
 168    {
 169        foreach (TaskCompletionSource<Ice.ObjectPrx> cb in callbacks_)
 170        {
 171            cb.SetResult(proxy);
 172        }
 173        callbacks_.Clear();
 174    }
 175
 176    public void runTimerTask() => lookup_.objectRequestTimedOut(this);
 177
 178    protected override void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply)
 179    {
 180        lookup.findObjectByIdAsync(domainId, _id, lookupReply).ContinueWith(
 181            task =>
 182            {
 183                try
 184                {
 185                    task.Wait();
 186                }
 187                catch (AggregateException ex)
 188                {
 189                    lookup_.objectRequestException(this, ex.InnerException);
 190                }
 191            },
 192            lookup.ice_scheduler());
 193    }
 194}
 195
 196internal class LookupI : LookupDisp_
 197{
 198    public LookupI(LocatorRegistryI registry, LookupPrx lookup, Ice.Properties properties)
 199    {
 200        _registry = registry;
 201        _lookup = lookup;
 202        _timeout = properties.getIcePropertyAsInt("IceDiscovery.Timeout");
 203        _retryCount = properties.getIcePropertyAsInt("IceDiscovery.RetryCount");
 204        _latencyMultiplier = properties.getIcePropertyAsInt("IceDiscovery.LatencyMultiplier");
 205        _domainId = properties.getIceProperty("IceDiscovery.DomainId");
 206        _timer = Ice.Internal.Util.getInstance(lookup.ice_getCommunicator()).timer();
 207
 208        //
 209        // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast
 210        // datagram on each endpoint.
 211        //
 212        var single = new Ice.Endpoint[1];
 213        foreach (Ice.Endpoint endpt in lookup.ice_getEndpoints())
 214        {
 215            single[0] = endpt;
 216            _lookups[(LookupPrx)lookup.ice_endpoints(single)] = null;
 217        }
 218        Debug.Assert(_lookups.Count > 0);
 219    }
 220
 221    public void setLookupReply(LookupReplyPrx lookupReply)
 222    {
 223        //
 224        // Use a lookup reply proxy whose address matches the interface used to send multicast datagrams.
 225        //
 226        var single = new Ice.Endpoint[1];
 227        foreach (LookupPrx key in new List<LookupPrx>(_lookups.Keys))
 228        {
 229            var info = (Ice.UDPEndpointInfo)key.ice_getEndpoints()[0].getInfo();
 230            if (info.mcastInterface.Length > 0)
 231            {
 232                foreach (Ice.Endpoint q in lookupReply.ice_getEndpoints())
 233                {
 234                    Ice.EndpointInfo r = q.getInfo();
 235                    if (r is Ice.IPEndpointInfo &&
 236                        ((Ice.IPEndpointInfo)r).host.Equals(info.mcastInterface, StringComparison.Ordinal))
 237                    {
 238                        single[0] = q;
 239                        _lookups[key] = (LookupReplyPrx)lookupReply.ice_endpoints(single);
 240                    }
 241                }
 242            }
 243
 244            if (_lookups[key] == null)
 245            {
 246                // Fallback: just use the given lookup reply proxy if no matching endpoint found.
 247                _lookups[key] = lookupReply;
 248            }
 249        }
 250    }
 251
 252    public override void findObjectById(string domainId, Ice.Identity id, LookupReplyPrx reply, Ice.Current current)
 253    {
 254        if (!domainId.Equals(_domainId, StringComparison.Ordinal))
 255        {
 256            return; // Ignore
 257        }
 258
 259        Ice.ObjectPrx proxy = _registry.findObject(id);
 260        if (proxy != null)
 261        {
 262            //
 263            // Reply to the multicast request using the given proxy.
 264            //
 265            try
 266            {
 267                reply.foundObjectByIdAsync(id, proxy);
 268            }
 269            catch (Ice.LocalException)
 270            {
 271                // Ignore.
 272            }
 273        }
 274    }
 275
 276    public override void findAdapterById(string domainId, string adapterId, LookupReplyPrx reply, Ice.Current current)
 277    {
 278        if (!domainId.Equals(_domainId, StringComparison.Ordinal))
 279        {
 280            return; // Ignore
 281        }
 282
 283        Ice.ObjectPrx proxy = _registry.findAdapter(adapterId, out bool isReplicaGroup);
 284        if (proxy != null)
 285        {
 286            //
 287            // Reply to the multicast request using the given proxy.
 288            //
 289            try
 290            {
 291                reply.foundAdapterByIdAsync(adapterId, proxy, isReplicaGroup);
 292            }
 293            catch (Ice.LocalException)
 294            {
 295                // Ignore.
 296            }
 297        }
 298    }
 299
 300    internal Task<Ice.ObjectPrx> findObject(Ice.Identity id)
 301    {
 302        lock (_mutex)
 303        {
 304            if (!_objectRequests.TryGetValue(id, out ObjectRequest request))
 305            {
 306                request = new ObjectRequest(this, id, _retryCount);
 307                _objectRequests.Add(id, request);
 308            }
 309
 310            var task = new TaskCompletionSource<Ice.ObjectPrx>(TaskCreationOptions.RunContinuationsAsynchronously);
 311            if (request.addCallback(task))
 312            {
 313                try
 314                {
 315                    request.invoke(_domainId, _lookups);
 316                    _timer.schedule(request, _timeout);
 317                }
 318                catch (Ice.LocalException)
 319                {
 320                    request.finished(null);
 321                    _objectRequests.Remove(id);
 322                }
 323            }
 324            return task.Task;
 325        }
 326    }
 327
 328    internal Task<Ice.ObjectPrx> findAdapter(string adapterId)
 329    {
 330        lock (_mutex)
 331        {
 332            if (!_adapterRequests.TryGetValue(adapterId, out AdapterRequest request))
 333            {
 334                request = new AdapterRequest(this, adapterId, _retryCount);
 335                _adapterRequests.Add(adapterId, request);
 336            }
 337
 338            var task = new TaskCompletionSource<Ice.ObjectPrx>(TaskCreationOptions.RunContinuationsAsynchronously);
 339            if (request.addCallback(task))
 340            {
 341                try
 342                {
 343                    request.invoke(_domainId, _lookups);
 344                    _timer.schedule(request, _timeout);
 345                }
 346                catch (Ice.LocalException)
 347                {
 348                    request.finished(null);
 349                    _adapterRequests.Remove(adapterId);
 350                }
 351            }
 352            return task.Task;
 353        }
 354    }
 355
 356    internal void foundObject(Ice.Identity id, string requestId, Ice.ObjectPrx proxy)
 357    {
 358        lock (_mutex)
 359        {
 360            if (_objectRequests.TryGetValue(id, out ObjectRequest request) && request.getRequestId() == requestId)
 361            {
 362                request.response(proxy);
 363                _timer.cancel(request);
 364                _objectRequests.Remove(id);
 365            }
 366            // else ignore responses from old requests
 367        }
 368    }
 369
 370    internal void foundAdapter(string adapterId, string requestId, Ice.ObjectPrx proxy, bool isReplicaGroup)
 371    {
 372        lock (_mutex)
 373        {
 374            if (
 375                _adapterRequests.TryGetValue(adapterId, out AdapterRequest request) &&
 376                request.getRequestId() == requestId)
 377            {
 378                if (request.response(proxy, isReplicaGroup))
 379                {
 380                    _timer.cancel(request);
 381                    _adapterRequests.Remove(request.getId());
 382                }
 383            }
 384            // else ignore responses from old requests
 385        }
 386    }
 387
 388    internal void objectRequestTimedOut(ObjectRequest request)
 389    {
 390        lock (_mutex)
 391        {
 392            if (!_objectRequests.TryGetValue(request.getId(), out ObjectRequest r) || r != request)
 393            {
 394                return;
 395            }
 396
 397            if (request.retry())
 398            {
 399                try
 400                {
 401                    request.invoke(_domainId, _lookups);
 402                    _timer.schedule(request, _timeout);
 403                    return;
 404                }
 405                catch (Ice.LocalException)
 406                {
 407                }
 408            }
 409
 410            request.finished(null);
 411            _objectRequests.Remove(request.getId());
 412            _timer.cancel(request);
 413        }
 414    }
 415
 416    internal void objectRequestException(ObjectRequest request, Exception ex)
 417    {
 418        lock (_mutex)
 419        {
 420            if (!_objectRequests.TryGetValue(request.getId(), out ObjectRequest r) || r != request)
 421            {
 422                return;
 423            }
 424
 425            if (request.exception())
 426            {
 427                if (_warnOnce)
 428                {
 429                    var s = new StringBuilder();
 430                    s.Append("failed to lookup object `");
 431                    s.Append(_lookup.ice_getCommunicator().identityToString(request.getId()));
 432                    s.Append("' with lookup proxy `");
 433                    s.Append(_lookup);
 434                    s.Append("':\n");
 435                    s.Append(ex.ToString());
 436                    _lookup.ice_getCommunicator().getLogger().warning(s.ToString());
 437                    _warnOnce = false;
 438                }
 439                _timer.cancel(request);
 440                _objectRequests.Remove(request.getId());
 441            }
 442        }
 443    }
 444
 445    internal void adapterRequestTimedOut(AdapterRequest request)
 446    {
 447        lock (_mutex)
 448        {
 449            if (!_adapterRequests.TryGetValue(request.getId(), out AdapterRequest r) || r != request)
 450            {
 451                return;
 452            }
 453
 454            if (request.retry())
 455            {
 456                try
 457                {
 458                    request.invoke(_domainId, _lookups);
 459                    _timer.schedule(request, _timeout);
 460                    return;
 461                }
 462                catch (Ice.LocalException)
 463                {
 464                }
 465            }
 466
 467            request.finished(null);
 468            _adapterRequests.Remove(request.getId());
 469            _timer.cancel(request);
 470        }
 471    }
 472
 473    internal void adapterRequestException(AdapterRequest request, Exception ex)
 474    {
 475        lock (_mutex)
 476        {
 477            if (!_adapterRequests.TryGetValue(request.getId(), out AdapterRequest r) || r != request)
 478            {
 479                return;
 480            }
 481
 482            if (request.exception())
 483            {
 484                if (_warnOnce)
 485                {
 486                    var s = new StringBuilder();
 487                    s.Append("failed to lookup adapter `");
 488                    s.Append(request.getId());
 489                    s.Append("' with lookup proxy `");
 490                    s.Append(_lookup);
 491                    s.Append("':\n");
 492                    s.Append(ex.ToString());
 493                    _lookup.ice_getCommunicator().getLogger().warning(s.ToString());
 494                    _warnOnce = false;
 495                }
 496                _timer.cancel(request);
 497                _adapterRequests.Remove(request.getId());
 498            }
 499        }
 500    }
 501
 502    internal Ice.Internal.Timer timer() => _timer;
 503
 504    internal int latencyMultiplier() => _latencyMultiplier;
 505
 506    private readonly LocatorRegistryI _registry;
 507    private readonly LookupPrx _lookup;
 508    private readonly Dictionary<LookupPrx, LookupReplyPrx> _lookups = new Dictionary<LookupPrx, LookupReplyPrx>();
 509    private readonly int _timeout;
 510    private readonly int _retryCount;
 511    private readonly int _latencyMultiplier;
 512    private readonly string _domainId;
 513
 514    private readonly Ice.Internal.Timer _timer;
 515    private bool _warnOnce = true;
 516    private readonly Dictionary<Ice.Identity, ObjectRequest> _objectRequests =
 517        new Dictionary<Ice.Identity, ObjectRequest>();
 518
 519    private readonly Dictionary<string, AdapterRequest> _adapterRequests = new Dictionary<string, AdapterRequest>();
 520    private readonly object _mutex = new();
 521}
 522
 523internal class LookupReplyI : LookupReplyDisp_
 524{
 525    public LookupReplyI(LookupI lookup) => _lookup = lookup;
 526
 527    public override void foundObjectById(Ice.Identity id, Ice.ObjectPrx proxy, Ice.Current c) =>
 528        _lookup.foundObject(id, c.id.name, proxy);
 529
 530    public override void foundAdapterById(string adapterId, Ice.ObjectPrx proxy, bool isReplicaGroup, Ice.Current c) =>
 531        _lookup.foundAdapter(adapterId, c.id.name, proxy, isReplicaGroup);
 532
 533    private readonly LookupI _lookup;
 534}