EndpointHostResolver.java
// Copyright (c) ZeroC, Inc.
package com.zeroc.Ice;
import com.zeroc.Ice.Instrumentation.CommunicatorObserver;
import com.zeroc.Ice.Instrumentation.Observer;
import com.zeroc.Ice.Instrumentation.ThreadObserver;
import com.zeroc.Ice.Instrumentation.ThreadState;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class EndpointHostResolver {
EndpointHostResolver(Instance instance) {
_instance = instance;
_protocol = instance.protocolSupport();
_preferIPv6 = instance.preferIPv6();
try {
_threadName =
Util.createThreadName(
_instance.initializationData().properties, "Ice.HostResolver");
_executor =
Executors.newFixedThreadPool(
1,
Util.createThreadFactory(
_instance.initializationData().properties, _threadName));
updateObserver();
} catch (RuntimeException ex) {
String s =
"cannot create thread for endpoint host resolver thread:\n" + Ex.toString(ex);
_instance.initializationData().logger.error(s);
throw ex;
}
}
synchronized void resolve(
final String host,
final int port,
final IPEndpointI endpoint,
final EndpointI_connectors callback) {
//
// TODO: Optimize to avoid the lookup if the given host is a textual IPv4 or IPv6 address.
// This requires implementing parsing of IPv4/IPv6 addresses (Java does not provide such
// methods).
//
assert (!_destroyed);
NetworkProxy networkProxy = _instance.networkProxy();
if (networkProxy == null) {
List<InetSocketAddress> addrs =
Network.getAddresses(host, port, _protocol, _preferIPv6, false);
if (addrs != null) {
callback.connectors(endpoint.connectors(addrs, networkProxy));
return;
}
}
final ThreadObserver threadObserver = _observer;
final Observer observer = getObserver(endpoint);
if (observer != null) {
observer.attach();
}
_executor.execute(
() -> {
synchronized (EndpointHostResolver.this) {
if (_destroyed) {
var ex = new CommunicatorDestroyedException();
if (observer != null) {
observer.failed(ex.ice_id());
observer.detach();
}
callback.exception(ex);
return;
}
}
if (threadObserver != null) {
threadObserver.stateChanged(
ThreadState.ThreadStateIdle, ThreadState.ThreadStateInUseForOther);
}
Observer obsv = observer;
try {
int protocol = _protocol;
NetworkProxy np = _instance.networkProxy();
if (np != null) {
np = np.resolveHost(_protocol);
if (np != null) {
protocol = np.getProtocolSupport();
}
}
List<InetSocketAddress> addresses =
Network.getAddresses(host, port, protocol, _preferIPv6, true);
if (obsv != null) {
obsv.detach();
obsv = null;
}
callback.connectors(endpoint.connectors(addresses, np));
} catch (LocalException ex) {
if (obsv != null) {
obsv.failed(ex.ice_id());
obsv.detach();
}
callback.exception(ex);
} finally {
if (threadObserver != null) {
threadObserver.stateChanged(
ThreadState.ThreadStateInUseForOther,
ThreadState.ThreadStateIdle);
}
}
});
}
synchronized void destroy() {
if (!_destroyed) {
_destroyed = true;
//
// Shutdown the executor. No new tasks will be accepted. Existing tasks will execute.
//
_executor.shutdown();
}
}
void joinWithThread() throws InterruptedException {
// Wait for the executor to terminate.
try {
while (!_executor.isTerminated()) {
// A very long time.
_executor.awaitTermination(100000, TimeUnit.SECONDS);
}
} finally {
if (_observer != null) {
_observer.detach();
}
}
}
synchronized void updateObserver() {
CommunicatorObserver obsv =
_instance.initializationData().observer;
if (obsv != null) {
_observer =
obsv.getThreadObserver(
"Communicator", _threadName, ThreadState.ThreadStateIdle, _observer);
if (_observer != null) {
_observer.attach();
}
}
}
private Observer getObserver(IPEndpointI endpoint) {
CommunicatorObserver obsv =
_instance.initializationData().observer;
if (obsv != null) {
return obsv.getEndpointLookupObserver(endpoint);
}
return null;
}
private final Instance _instance;
private final int _protocol;
private final boolean _preferIPv6;
private boolean _destroyed;
private ThreadObserver _observer;
private String _threadName;
private ExecutorService _executor;
}