< Summary

Information
Class: IceDiscovery.AdapterRequest
Assembly: IceDiscovery
File(s): /_/csharp/src/IceDiscovery/LookupI.cs
Tag: 99_23991109993
Line coverage
100%
Covered lines: 44
Uncovered lines: 0
Coverable lines: 44
Total lines: 537
Line coverage: 100%
Branch coverage
100%
Covered branches: 20
Total branches: 20
Branch coverage: 100%
Method coverage
100%
Covered methods: 7
Total methods: 7
Method coverage: 100%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
retry() | 100% | 2 | 2 | 100%
response(...) | 100% | 6 | 6 | 100%
finished(...) | 100% | 10 | 10 | 100%
runTimerTask() | 100% | 1 | 1 | 100%
invokeWithLookup(...) | 100% | 1 | 1 | 100%
sendResponse(...) | 100% | 2 | 2 | 100%

File(s)

/_/csharp/src/IceDiscovery/LookupI.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4using System.Text;
 5
 6namespace IceDiscovery;
 7
 8internal abstract class Request<T>
 9{
 10    protected Request(LookupI lookup, T id, int retryCount)
 11    {
 12        lookup_ = lookup;
 13        retryCount_ = retryCount;
 14        _id = id;
 15        _requestId = Guid.NewGuid().ToString();
 16    }
 17
 18    public T getId() => _id;
 19
 20    public bool addCallback(TaskCompletionSource<Ice.ObjectPrx> cb)
 21    {
 22        callbacks_.Add(cb);
 23        return callbacks_.Count == 1;
 24    }
 25
 26    public virtual bool retry() => --retryCount_ >= 0;
 27
 28    public void invoke(string domainId, Dictionary<LookupPrx, LookupReplyPrx> lookups)
 29    {
 30        _lookupCount = lookups.Count;
 31        _failureCount = 0;
 32        var id = new Ice.Identity(_requestId, "");
 33        foreach (KeyValuePair<LookupPrx, LookupReplyPrx> entry in lookups)
 34        {
 35            invokeWithLookup(
 36                domainId,
 37                entry.Key,
 38                LookupReplyPrxHelper.uncheckedCast(entry.Value.ice_identity(id)));
 39        }
 40    }
 41
 42    public bool exception()
 43    {
 44        if (++_failureCount == _lookupCount)
 45        {
 46            finished(null);
 47            return true;
 48        }
 49        return false;
 50    }
 51
 52    public string getRequestId() => _requestId;
 53
 54    public abstract void finished(Ice.ObjectPrx proxy);
 55
 56    protected abstract void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply);
 57
 58    private readonly string _requestId;
 59
 60    protected LookupI lookup_;
 61    protected int retryCount_;
 62    protected int _lookupCount;
 63    protected int _failureCount;
 64    protected List<TaskCompletionSource<Ice.ObjectPrx>> callbacks_ = new List<TaskCompletionSource<Ice.ObjectPrx>>();
 65
 66    protected T _id;
 67}
 68
 69internal class AdapterRequest : Request<string>, Ice.Internal.TimerTask
 70{
 71    public AdapterRequest(LookupI lookup, string id, int retryCount)
 172        : base(lookup, id, retryCount) => _start = DateTime.Now.Ticks;
 73
 174    public override bool retry() => _proxies.Count == 0 && --retryCount_ >= 0;
 75
 76    public bool response(Ice.ObjectPrx proxy, bool isReplicaGroup)
 77    {
 178        if (isReplicaGroup)
 79        {
 180            _proxies.Add(proxy);
 181            if (_latency == 0)
 82            {
 183                _latency = (long)((DateTime.Now.Ticks - _start) * lookup_.latencyMultiplier() / 10000.0);
 184                if (_latency == 0)
 85                {
 186                    _latency = 1; // 1ms
 87                }
 188                lookup_.timer().cancel(this);
 189                lookup_.timer().schedule(this, _latency);
 90            }
 191            return false;
 92        }
 193        finished(proxy);
 194        return true;
 95    }
 96
 97    public override void finished(Ice.ObjectPrx proxy)
 98    {
 199        if (proxy != null || _proxies.Count == 0)
 100        {
 1101            sendResponse(proxy);
 102        }
 1103        else if (_proxies.Count == 1)
 104        {
 1105            sendResponse(_proxies.First());
 106        }
 107        else
 108        {
 1109            var endpoints = new List<Ice.Endpoint>();
 1110            Ice.ObjectPrx result = null;
 1111            foreach (Ice.ObjectPrx prx in _proxies)
 112            {
 1113                result ??= prx;
 1114                endpoints.AddRange(prx.ice_getEndpoints());
 115            }
 1116            sendResponse(result.ice_endpoints(endpoints.ToArray()));
 117        }
 1118    }
 119
 1120    public void runTimerTask() => lookup_.adapterRequestTimedOut(this);
 121
 122    protected override void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply)
 123    {
 1124        lookup.findAdapterByIdAsync(domainId, _id, lookupReply).ContinueWith(
 1125            task =>
 1126            {
 1127                try
 1128                {
 1129                    task.Wait();
 1130                }
 1131                catch (AggregateException ex)
 1132                {
 1133                    lookup_.adapterRequestException(this, ex.InnerException);
 1134                }
 1135            },
 1136            lookup.ice_scheduler());
 1137    }
 138
 139    private void sendResponse(Ice.ObjectPrx proxy)
 140    {
 1141        foreach (TaskCompletionSource<Ice.ObjectPrx> cb in callbacks_)
 142        {
 1143            cb.SetResult(proxy);
 144        }
 1145        callbacks_.Clear();
 1146    }
 147
 148    //
 149    // We use a HashSet because the same IceDiscovery plugin might return multiple times
 150    // the same proxy if it's accessible through multiple network interfaces and if we
 151    // also sent the request to multiple interfaces.
 152    //
 1153    private readonly HashSet<Ice.ObjectPrx> _proxies = new HashSet<Ice.ObjectPrx>();
 154    private readonly long _start;
 155    private long _latency;
 156}
 157
 158internal class ObjectRequest : Request<Ice.Identity>, Ice.Internal.TimerTask
 159{
 160    public ObjectRequest(LookupI lookup, Ice.Identity id, int retryCount)
 161        : base(lookup, id, retryCount)
 162    {
 163    }
 164
 165    public void response(Ice.ObjectPrx proxy) => finished(proxy);
 166
 167    public override void finished(Ice.ObjectPrx proxy)
 168    {
 169        foreach (TaskCompletionSource<Ice.ObjectPrx> cb in callbacks_)
 170        {
 171            cb.SetResult(proxy);
 172        }
 173        callbacks_.Clear();
 174    }
 175
 176    public void runTimerTask() => lookup_.objectRequestTimedOut(this);
 177
 178    protected override void invokeWithLookup(string domainId, LookupPrx lookup, LookupReplyPrx lookupReply)
 179    {
 180        lookup.findObjectByIdAsync(domainId, _id, lookupReply).ContinueWith(
 181            task =>
 182            {
 183                try
 184                {
 185                    task.Wait();
 186                }
 187                catch (AggregateException ex)
 188                {
 189                    lookup_.objectRequestException(this, ex.InnerException);
 190                }
 191            },
 192            lookup.ice_scheduler());
 193    }
 194}
 195
 196internal class LookupI : LookupDisp_
 197{
 198    public LookupI(LocatorRegistryI registry, LookupPrx lookup, Ice.Properties properties)
 199    {
 200        _registry = registry;
 201        _lookup = lookup;
 202        _timeout = properties.getIcePropertyAsInt("IceDiscovery.Timeout");
 203        _retryCount = properties.getIcePropertyAsInt("IceDiscovery.RetryCount");
 204        _latencyMultiplier = properties.getIcePropertyAsInt("IceDiscovery.LatencyMultiplier");
 205        _domainId = properties.getIceProperty("IceDiscovery.DomainId");
 206        _timer = Ice.Internal.Util.getInstance(lookup.ice_getCommunicator()).timer();
 207
 208        //
 209        // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast
 210        // datagram on each endpoint.
 211        //
 212        var single = new Ice.Endpoint[1];
 213        foreach (Ice.Endpoint endpt in lookup.ice_getEndpoints())
 214        {
 215            single[0] = endpt;
 216            _lookups[(LookupPrx)lookup.ice_endpoints(single)] = null;
 217        }
 218        Debug.Assert(_lookups.Count > 0);
 219    }
 220
 221    public void setLookupReply(LookupReplyPrx lookupReply)
 222    {
 223        //
 224        // Use a lookup reply proxy whose address matches the interface used to send multicast datagrams.
 225        //
 226        var single = new Ice.Endpoint[1];
 227        foreach (LookupPrx key in new List<LookupPrx>(_lookups.Keys))
 228        {
 229            var info = (Ice.UDPEndpointInfo)key.ice_getEndpoints()[0].getInfo();
 230            if (info.mcastInterface.Length > 0)
 231            {
 232                foreach (Ice.Endpoint q in lookupReply.ice_getEndpoints())
 233                {
 234                    Ice.EndpointInfo r = q.getInfo();
 235                    if (r is Ice.IPEndpointInfo &&
 236                        ((Ice.IPEndpointInfo)r).host.Equals(info.mcastInterface, StringComparison.Ordinal))
 237                    {
 238                        single[0] = q;
 239                        _lookups[key] = (LookupReplyPrx)lookupReply.ice_endpoints(single);
 240                        break;
 241                    }
 242                }
 243            }
 244
 245            if (_lookups[key] == null)
 246            {
 247                // Fallback: just use the given lookup reply proxy if no matching endpoint found.
 248                _lookups[key] = lookupReply;
 249            }
 250        }
 251    }
 252
 253    public override void findObjectById(string domainId, Ice.Identity id, LookupReplyPrx reply, Ice.Current current)
 254    {
 255        Ice.ObjectPrx.checkNotNull(reply, current);
 256        if (!domainId.Equals(_domainId, StringComparison.Ordinal))
 257        {
 258            return; // Ignore
 259        }
 260
 261        Ice.ObjectPrx proxy = _registry.findObject(id);
 262        if (proxy != null)
 263        {
 264            //
 265            // Reply to the multicast request using the given proxy.
 266            //
 267            try
 268            {
 269                reply.foundObjectByIdAsync(id, proxy);
 270            }
 271            catch (Ice.LocalException)
 272            {
 273                // Ignore.
 274            }
 275        }
 276    }
 277
 278    public override void findAdapterById(string domainId, string adapterId, LookupReplyPrx reply, Ice.Current current)
 279    {
 280        Ice.ObjectPrx.checkNotNull(reply, current);
 281        if (!domainId.Equals(_domainId, StringComparison.Ordinal))
 282        {
 283            return; // Ignore
 284        }
 285
 286        Ice.ObjectPrx proxy = _registry.findAdapter(adapterId, out bool isReplicaGroup);
 287        if (proxy != null)
 288        {
 289            //
 290            // Reply to the multicast request using the given proxy.
 291            //
 292            try
 293            {
 294                reply.foundAdapterByIdAsync(adapterId, proxy, isReplicaGroup);
 295            }
 296            catch (Ice.LocalException)
 297            {
 298                // Ignore.
 299            }
 300        }
 301    }
 302
 303    internal Task<Ice.ObjectPrx> findObject(Ice.Identity id)
 304    {
 305        lock (_mutex)
 306        {
 307            if (!_objectRequests.TryGetValue(id, out ObjectRequest request))
 308            {
 309                request = new ObjectRequest(this, id, _retryCount);
 310                _objectRequests.Add(id, request);
 311            }
 312
 313            var task = new TaskCompletionSource<Ice.ObjectPrx>(TaskCreationOptions.RunContinuationsAsynchronously);
 314            if (request.addCallback(task))
 315            {
 316                try
 317                {
 318                    request.invoke(_domainId, _lookups);
 319                    _timer.schedule(request, _timeout);
 320                }
 321                catch (Ice.LocalException)
 322                {
 323                    request.finished(null);
 324                    _objectRequests.Remove(id);
 325                }
 326            }
 327            return task.Task;
 328        }
 329    }
 330
 331    internal Task<Ice.ObjectPrx> findAdapter(string adapterId)
 332    {
 333        lock (_mutex)
 334        {
 335            if (!_adapterRequests.TryGetValue(adapterId, out AdapterRequest request))
 336            {
 337                request = new AdapterRequest(this, adapterId, _retryCount);
 338                _adapterRequests.Add(adapterId, request);
 339            }
 340
 341            var task = new TaskCompletionSource<Ice.ObjectPrx>(TaskCreationOptions.RunContinuationsAsynchronously);
 342            if (request.addCallback(task))
 343            {
 344                try
 345                {
 346                    request.invoke(_domainId, _lookups);
 347                    _timer.schedule(request, _timeout);
 348                }
 349                catch (Ice.LocalException)
 350                {
 351                    request.finished(null);
 352                    _adapterRequests.Remove(adapterId);
 353                }
 354            }
 355            return task.Task;
 356        }
 357    }
 358
 359    internal void foundObject(Ice.Identity id, string requestId, Ice.ObjectPrx proxy)
 360    {
 361        lock (_mutex)
 362        {
 363            if (_objectRequests.TryGetValue(id, out ObjectRequest request) && request.getRequestId() == requestId)
 364            {
 365                request.response(proxy);
 366                _timer.cancel(request);
 367                _objectRequests.Remove(id);
 368            }
 369            // else ignore responses from old requests
 370        }
 371    }
 372
 373    internal void foundAdapter(string adapterId, string requestId, Ice.ObjectPrx proxy, bool isReplicaGroup)
 374    {
 375        lock (_mutex)
 376        {
 377            if (
 378                _adapterRequests.TryGetValue(adapterId, out AdapterRequest request) &&
 379                request.getRequestId() == requestId)
 380            {
 381                if (request.response(proxy, isReplicaGroup))
 382                {
 383                    _timer.cancel(request);
 384                    _adapterRequests.Remove(request.getId());
 385                }
 386            }
 387            // else ignore responses from old requests
 388        }
 389    }
 390
 391    internal void objectRequestTimedOut(ObjectRequest request)
 392    {
 393        lock (_mutex)
 394        {
 395            if (!_objectRequests.TryGetValue(request.getId(), out ObjectRequest r) || r != request)
 396            {
 397                return;
 398            }
 399
 400            if (request.retry())
 401            {
 402                try
 403                {
 404                    request.invoke(_domainId, _lookups);
 405                    _timer.schedule(request, _timeout);
 406                    return;
 407                }
 408                catch (Ice.LocalException)
 409                {
 410                }
 411            }
 412
 413            request.finished(null);
 414            _objectRequests.Remove(request.getId());
 415            _timer.cancel(request);
 416        }
 417    }
 418
 419    internal void objectRequestException(ObjectRequest request, Exception ex)
 420    {
 421        lock (_mutex)
 422        {
 423            if (!_objectRequests.TryGetValue(request.getId(), out ObjectRequest r) || r != request)
 424            {
 425                return;
 426            }
 427
 428            if (request.exception())
 429            {
 430                if (_warnOnce)
 431                {
 432                    var s = new StringBuilder();
 433                    s.Append("failed to lookup object `");
 434                    s.Append(_lookup.ice_getCommunicator().identityToString(request.getId()));
 435                    s.Append("' with lookup proxy `");
 436                    s.Append(_lookup);
 437                    s.Append("':\n");
 438                    s.Append(ex.ToString());
 439                    _lookup.ice_getCommunicator().getLogger().warning(s.ToString());
 440                    _warnOnce = false;
 441                }
 442                _timer.cancel(request);
 443                _objectRequests.Remove(request.getId());
 444            }
 445        }
 446    }
 447
 448    internal void adapterRequestTimedOut(AdapterRequest request)
 449    {
 450        lock (_mutex)
 451        {
 452            if (!_adapterRequests.TryGetValue(request.getId(), out AdapterRequest r) || r != request)
 453            {
 454                return;
 455            }
 456
 457            if (request.retry())
 458            {
 459                try
 460                {
 461                    request.invoke(_domainId, _lookups);
 462                    _timer.schedule(request, _timeout);
 463                    return;
 464                }
 465                catch (Ice.LocalException)
 466                {
 467                }
 468            }
 469
 470            request.finished(null);
 471            _adapterRequests.Remove(request.getId());
 472            _timer.cancel(request);
 473        }
 474    }
 475
 476    internal void adapterRequestException(AdapterRequest request, Exception ex)
 477    {
 478        lock (_mutex)
 479        {
 480            if (!_adapterRequests.TryGetValue(request.getId(), out AdapterRequest r) || r != request)
 481            {
 482                return;
 483            }
 484
 485            if (request.exception())
 486            {
 487                if (_warnOnce)
 488                {
 489                    var s = new StringBuilder();
 490                    s.Append("failed to lookup adapter `");
 491                    s.Append(request.getId());
 492                    s.Append("' with lookup proxy `");
 493                    s.Append(_lookup);
 494                    s.Append("':\n");
 495                    s.Append(ex.ToString());
 496                    _lookup.ice_getCommunicator().getLogger().warning(s.ToString());
 497                    _warnOnce = false;
 498                }
 499                _timer.cancel(request);
 500                _adapterRequests.Remove(request.getId());
 501            }
 502        }
 503    }
 504
 505    internal Ice.Internal.Timer timer() => _timer;
 506
 507    internal int latencyMultiplier() => _latencyMultiplier;
 508
 509    private readonly LocatorRegistryI _registry;
 510    private readonly LookupPrx _lookup;
 511    private readonly Dictionary<LookupPrx, LookupReplyPrx> _lookups = new Dictionary<LookupPrx, LookupReplyPrx>();
 512    private readonly int _timeout;
 513    private readonly int _retryCount;
 514    private readonly int _latencyMultiplier;
 515    private readonly string _domainId;
 516
 517    private readonly Ice.Internal.Timer _timer;
 518    private bool _warnOnce = true;
 519    private readonly Dictionary<Ice.Identity, ObjectRequest> _objectRequests =
 520        new Dictionary<Ice.Identity, ObjectRequest>();
 521
 522    private readonly Dictionary<string, AdapterRequest> _adapterRequests = new Dictionary<string, AdapterRequest>();
 523    private readonly object _mutex = new();
 524}
 525
 526internal class LookupReplyI : LookupReplyDisp_
 527{
 528    public LookupReplyI(LookupI lookup) => _lookup = lookup;
 529
 530    public override void foundObjectById(Ice.Identity id, Ice.ObjectPrx proxy, Ice.Current c) =>
 531        _lookup.foundObject(id, c.id.name, proxy);
 532
 533    public override void foundAdapterById(string adapterId, Ice.ObjectPrx proxy, bool isReplicaGroup, Ice.Current c) =>
 534        _lookup.foundAdapter(adapterId, c.id.name, proxy, isReplicaGroup);
 535
 536    private readonly LookupI _lookup;
 537}