LookupI.java
// Copyright (c) ZeroC, Inc.
package com.zeroc.IceDiscovery;
import com.zeroc.Ice.Current;
import com.zeroc.Ice.Endpoint;
import com.zeroc.Ice.EndpointInfo;
import com.zeroc.Ice.IPEndpointInfo;
import com.zeroc.Ice.Identity;
import com.zeroc.Ice.LocalException;
import com.zeroc.Ice.ObjectPrx;
import com.zeroc.Ice.Properties;
import com.zeroc.Ice.UDPEndpointInfo;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
class LookupI implements Lookup {
private abstract class Request<T, Ret> implements Runnable {
Request(T id, int retryCount) {
_id = id;
_requestId = UUID.randomUUID().toString();
_retryCount = retryCount;
}
T getId() {
return _id;
}
boolean addFuture(CompletableFuture<Ret> f) {
_futures.add(f);
return _futures.size() == 1;
}
boolean retry() {
return --_retryCount >= 0;
}
void invoke(String domainId, Map<LookupPrx, LookupReplyPrx> lookups) {
_lookupCount = lookups.size();
_failureCount = 0;
final Identity id = new Identity(_requestId, "");
for (Map.Entry<LookupPrx, LookupReplyPrx> entry : lookups.entrySet()) {
invokeWithLookup(
domainId,
entry.getKey(),
LookupReplyPrx.uncheckedCast(entry.getValue().ice_identity(id)));
}
}
boolean exception() {
if (++_failureCount == _lookupCount) {
finished(null);
return true;
}
return false;
}
String getRequestId() {
return _requestId;
}
void scheduleTimer(long timeout) {
_future = _timer.schedule(this, timeout, TimeUnit.MILLISECONDS);
}
void cancelTimer() {
assert _future != null;
_future.cancel(false);
_future = null;
}
abstract void finished(ObjectPrx proxy);
protected abstract void invokeWithLookup(
String domainId, LookupPrx lookup, LookupReplyPrx lookupReply);
private final String _requestId;
protected int _retryCount;
protected int _lookupCount;
protected int _failureCount;
protected List<CompletableFuture<Ret>> _futures = new ArrayList<>();
protected T _id;
protected Future<?> _future;
}
private class AdapterRequest extends Request<String, ObjectPrx> {
AdapterRequest(String id, int retryCount) {
super(id, retryCount);
_start = System.nanoTime();
_latency = 0;
}
@Override
boolean retry() {
return _proxies.isEmpty() && --_retryCount >= 0;
}
boolean response(ObjectPrx proxy, boolean isReplicaGroup) {
if (isReplicaGroup) {
_proxies.add(proxy);
if (_latency == 0) {
_latency =
(long) ((System.nanoTime() - _start) * _latencyMultiplier / 100000.0);
if (_latency == 0) {
_latency = 1; // 1ms
}
cancelTimer();
scheduleTimer(_latency);
}
return false;
}
finished(proxy);
return true;
}
@Override
void finished(ObjectPrx proxy) {
if (proxy != null || _proxies.isEmpty()) {
sendResponse(proxy);
} else if (_proxies.size() == 1) {
sendResponse(_proxies.toArray(new ObjectPrx[1])[0]);
} else {
List<Endpoint> endpoints = new ArrayList<>();
ObjectPrx result = null;
for (ObjectPrx prx : _proxies) {
if (result == null) {
result = prx;
}
endpoints.addAll(Arrays.asList(prx.ice_getEndpoints()));
}
sendResponse(
result.ice_endpoints(
endpoints.toArray(new Endpoint[endpoints.size()])));
}
}
@Override
public void run() {
adapterRequestTimedOut(this);
}
@Override
protected void invokeWithLookup(
String domainId, LookupPrx lookup, LookupReplyPrx lookupReply) {
lookup.findAdapterByIdAsync(domainId, _id, lookupReply)
.whenCompleteAsync(
(v, ex) -> {
if (ex != null) {
adapterRequestException(AdapterRequest.this, ex);
}
},
lookup.ice_executor());
}
private void sendResponse(ObjectPrx proxy) {
for (CompletableFuture<ObjectPrx> f : _futures) {
f.complete(proxy);
}
_futures.clear();
}
// We use a set because the same IceDiscovery plugin might return multiple times
// the same proxy if it's accessible through multiple network interfaces and if we
// also sent the request to multiple interfaces.
private Set<ObjectPrx> _proxies = new HashSet<>();
private long _start;
private long _latency;
}
private class ObjectRequest extends Request<Identity, ObjectPrx> {
ObjectRequest(Identity id, int retryCount) {
super(id, retryCount);
}
void response(ObjectPrx proxy) {
finished(proxy);
}
@Override
void finished(ObjectPrx proxy) {
for (CompletableFuture<ObjectPrx> f : _futures) {
f.complete(proxy);
}
_futures.clear();
}
@Override
public void run() {
objectRequestTimedOut(this);
}
@Override
protected void invokeWithLookup(
String domainId, LookupPrx lookup, LookupReplyPrx lookupReply) {
lookup.findObjectByIdAsync(domainId, _id, lookupReply)
.whenCompleteAsync(
(v, ex) -> {
if (ex != null) {
objectRequestException(ObjectRequest.this, ex);
}
},
lookup.ice_executor());
}
}
public LookupI(
LocatorRegistryI registry, LookupPrx lookup, Properties properties) {
_registry = registry;
_lookup = lookup;
_timeout = properties.getIcePropertyAsInt("IceDiscovery.Timeout");
_retryCount = properties.getIcePropertyAsInt("IceDiscovery.RetryCount");
_latencyMultiplier = properties.getIcePropertyAsInt("IceDiscovery.LatencyMultiplier");
_domainId = properties.getIceProperty("IceDiscovery.DomainId");
_timer = lookup.ice_getCommunicator().getInstance().timer();
Endpoint[] single = new Endpoint[1];
for (Endpoint endpt : lookup.ice_getEndpoints()) {
single[0] = endpt;
_lookups.put((LookupPrx) lookup.ice_endpoints(single), null);
}
assert (!_lookups.isEmpty());
}
void setLookupReply(LookupReplyPrx lookupReply) {
// Use a lookup reply proxy whose address matches the interface used to send multicast
// datagrams.
Endpoint[] single = new Endpoint[1];
for (Map.Entry<LookupPrx, LookupReplyPrx> entry : _lookups.entrySet()) {
UDPEndpointInfo info =
(UDPEndpointInfo) entry.getKey().ice_getEndpoints()[0].getInfo();
if (!info.mcastInterface.isEmpty()) {
for (Endpoint q : lookupReply.ice_getEndpoints()) {
EndpointInfo r = q.getInfo();
if (r instanceof IPEndpointInfo
&& ((IPEndpointInfo) r)
.host.equals(info.mcastInterface)) {
single[0] = q;
entry.setValue((LookupReplyPrx) lookupReply.ice_endpoints(single));
}
}
}
if (entry.getValue() == null) {
// Fallback: just use the given lookup reply proxy if no matching endpoint found.
entry.setValue(lookupReply);
}
}
}
@Override
public void findObjectById(
String domainId,
Identity id,
LookupReplyPrx reply,
Current c) {
if (!domainId.equals(_domainId)) {
return; // Ignore.
}
ObjectPrx proxy = _registry.findObject(id);
if (proxy != null) {
// Reply to the multicast request using the given proxy.
try {
reply.foundObjectByIdAsync(id, proxy);
} catch (LocalException ex) {
// Ignore
}
}
}
@Override
public void findAdapterById(
String domainId,
String adapterId,
LookupReplyPrx reply,
Current c) {
if (!domainId.equals(_domainId)) {
return; // Ignore.
}
LocatorRegistryI.FindAdapterResult r = _registry.findAdapter(adapterId);
if (r.returnValue != null) {
// Reply to the multicast request using the given proxy.
try {
reply.foundAdapterByIdAsync(adapterId, r.returnValue, r.isReplicaGroup);
} catch (LocalException ex) {
// Ignore
}
}
}
synchronized void findObject(
CompletableFuture<ObjectPrx> f, Identity id) {
ObjectRequest request = _objectRequests.get(id);
if (request == null) {
request = new ObjectRequest(id, _retryCount);
_objectRequests.put(id, request);
}
if (request.addFuture(f)) {
try {
request.invoke(_domainId, _lookups);
request.scheduleTimer(_timeout);
} catch (LocalException ex) {
request.finished(null);
_objectRequests.remove(id);
}
}
}
synchronized void findAdapter(CompletableFuture<ObjectPrx> f, String adapterId) {
AdapterRequest request = _adapterRequests.get(adapterId);
if (request == null) {
request = new AdapterRequest(adapterId, _retryCount);
_adapterRequests.put(adapterId, request);
}
if (request.addFuture(f)) {
try {
request.invoke(_domainId, _lookups);
request.scheduleTimer(_timeout);
} catch (LocalException ex) {
request.finished(null);
_adapterRequests.remove(adapterId);
}
}
}
synchronized void foundObject(Identity id, String requestId, ObjectPrx proxy) {
// Ignore responses from old requests
ObjectRequest request = _objectRequests.get(id);
if (request != null && request.getRequestId().equals(requestId)) {
request.response(proxy);
request.cancelTimer();
_objectRequests.remove(id);
}
}
synchronized void foundAdapter(String adapterId, String requestId, ObjectPrx proxy, boolean isReplicaGroup) {
// Ignore responses from old requests
AdapterRequest request = _adapterRequests.get(adapterId);
if (request != null && request.getRequestId().equals(requestId)) {
if (request.response(proxy, isReplicaGroup)) {
request.cancelTimer();
_adapterRequests.remove(adapterId);
}
}
}
synchronized void objectRequestTimedOut(ObjectRequest request) {
ObjectRequest r = _objectRequests.get(request.getId());
if (r == null || request != r) {
return;
}
if (request.retry()) {
try {
request.invoke(_domainId, _lookups);
request.scheduleTimer(_timeout);
return;
} catch (LocalException ex) {}
}
request.finished(null);
_objectRequests.remove(request.getId());
}
synchronized void objectRequestException(ObjectRequest request, Throwable ex) {
ObjectRequest r = _objectRequests.get(request.getId());
if (r == null || r != request) {
return;
}
if (request.exception()) {
if (_warnOnce) {
StringBuilder s = new StringBuilder();
s.append("failed to lookup object `");
s.append(_lookup.ice_getCommunicator().identityToString(request.getId()));
s.append("' with lookup proxy `");
s.append(_lookup);
s.append("':\n");
s.append(ex.toString());
_lookup.ice_getCommunicator().getLogger().warning(s.toString());
_warnOnce = false;
}
request.cancelTimer();
_objectRequests.remove(request.getId());
}
}
synchronized void adapterRequestTimedOut(AdapterRequest request) {
AdapterRequest r = _adapterRequests.get(request.getId());
if (r == null || r != request) {
return;
}
if (request.retry()) {
try {
request.invoke(_domainId, _lookups);
request.scheduleTimer(_timeout);
return;
} catch (LocalException ex) {}
}
request.finished(null);
_adapterRequests.remove(request.getId());
}
synchronized void adapterRequestException(AdapterRequest request, Throwable ex) {
AdapterRequest r = _adapterRequests.get(request.getId());
if (r == null || r != request) {
return;
}
if (request.exception()) {
if (_warnOnce) {
StringBuilder s = new StringBuilder();
s.append("failed to lookup adapter `");
s.append(request.getId());
s.append("' with lookup proxy `");
s.append(_lookup);
s.append("':\n");
s.append(ex.toString());
_lookup.ice_getCommunicator().getLogger().warning(s.toString());
_warnOnce = false;
}
request.cancelTimer();
_adapterRequests.remove(request.getId());
}
}
private final LocatorRegistryI _registry;
private final LookupPrx _lookup;
private final Map<LookupPrx, LookupReplyPrx> _lookups = new HashMap<>();
private final int _timeout;
private final int _retryCount;
private final int _latencyMultiplier;
private final String _domainId;
private final ScheduledExecutorService _timer;
private boolean _warnOnce = true;
private final Map<Identity, ObjectRequest> _objectRequests = new HashMap<>();
private final Map<String, AdapterRequest> _adapterRequests = new HashMap<>();
}