// PluginI.java

// Copyright (c) ZeroC, Inc.

package com.zeroc.IceLocatorDiscovery;

import com.zeroc.Ice.BlobjectAsync;
import com.zeroc.Ice.Communicator;
import com.zeroc.Ice.CommunicatorDestroyedException;
import com.zeroc.Ice.Current;
import com.zeroc.Ice.Endpoint;
import com.zeroc.Ice.EndpointInfo;
import com.zeroc.Ice.IPEndpointInfo;
import com.zeroc.Ice.Identity;
import com.zeroc.Ice.LocalException;
import com.zeroc.Ice.Locator;
import com.zeroc.Ice.LocatorPrx;
import com.zeroc.Ice.LocatorRegistryPrx;
import com.zeroc.Ice.Network;
import com.zeroc.Ice.NoEndpointException;
import com.zeroc.Ice.Object;
import com.zeroc.Ice.ObjectAdapter;
import com.zeroc.Ice.ObjectAdapterDeactivatedException;
import com.zeroc.Ice.ObjectAdapterDestroyedException;
import com.zeroc.Ice.ObjectNotExistException;
import com.zeroc.Ice.ObjectPrx;
import com.zeroc.Ice.OperationInterruptedException;
import com.zeroc.Ice.OperationMode;
import com.zeroc.Ice.Properties;
import com.zeroc.Ice.RequestFailedException;
import com.zeroc.Ice.Time;
import com.zeroc.Ice.UDPEndpointInfo;
import com.zeroc.Ice.UnknownException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PluginI implements Plugin {
    /**
     * A single locator invocation forwarded on behalf of a caller. It records the operation name,
     * mode, encoded in-parameters and context of the call together with the future that must be
     * completed with the outcome, so the call can be re-issued when the locator proxy changes.
     */
    private static class Request {
        /**
         * @param locator the servant that owns this request and decides which proxy to retry with
         * @param operation the name of the operation to forward
         * @param mode the operation mode to forward
         * @param inParams the encoded in-parameters to forward
         * @param context the request context to forward
         * @param f the future completed with the result or the failure of the forwarded call
         */
        Request(
                LocatorI locator,
                String operation,
                OperationMode mode,
                byte[] inParams,
                Map<String, String> context,
                CompletableFuture<Object.Ice_invokeResult> f) {
            _locator = locator;
            _operation = operation;
            _mode = mode;
            _inParams = inParams;
            _context = context;
            _future = f;
        }

        /**
         * Forwards this request to the given locator proxy.
         *
         * <p>If {@code l} equals the proxy used by the previous attempt, nothing is sent: the
         * future is failed with the exception recorded for that attempt.
         *
         * @param l the locator proxy to invoke on
         */
        void invoke(LocatorPrx l) {
            if (_locatorPrx == null || !_locatorPrx.equals(l)) {
                _locatorPrx = l;
                try {
                    l.ice_invokeAsync(_operation, _mode, _inParams, _context)
                        .whenComplete(
                            (result, ex) -> {
                                if (ex == null) {
                                    _future.complete(result);
                                } else if (ex instanceof LocalException) {
                                    exception((LocalException) ex);
                                } else {
                                    // Not an Ice local exception: an unchecked cast would throw
                                    // inside this callback and leave the future uncompleted.
                                    _future.completeExceptionally(ex);
                                }
                            });
                } catch (LocalException ex) {
                    exception(ex);
                }
            } else {
                // Don't retry if the proxy didn't change. Going through exception() here would
                // hand the request back to LocatorI.invoke, which compares proxies by reference
                // and would call straight back with the same (equal) proxy, recursing forever.
                assert (_exception != null);
                _future.completeExceptionally(_exception);
            }
        }

        /**
         * Maps a failure of the forwarded call onto the future, or asks the owning servant to
         * retry with another locator proxy.
         *
         * @param ex the failure reported for the last attempt
         */
        private void exception(LocalException ex) {
            if (ex instanceof RequestFailedException || ex instanceof UnknownException) {
                // Reported to the caller unchanged.
                _future.completeExceptionally(ex);
            } else if (ex instanceof NoEndpointException
                || ex instanceof ObjectAdapterDeactivatedException
                || ex instanceof ObjectAdapterDestroyedException
                || ex instanceof CommunicatorDestroyedException) {
                // Reported to the caller as ObjectNotExistException.
                _future.completeExceptionally(new ObjectNotExistException());
            } else {
                // Any other local exception: remember it and retry with a new locator proxy.
                _exception = ex;
                _locator.invoke(_locatorPrx, this);
            }
        }

        private final LocatorI _locator;
        // Failure of the last attempt that triggered a retry; reported if the proxy did not change.
        private LocalException _exception;
        private final String _operation;
        private final OperationMode _mode;
        private final Map<String, String> _context;
        private final byte[] _inParams;
        private final CompletableFuture<Object.Ice_invokeResult> _future;

        // Proxy used by the most recent attempt, or null before the first attempt.
        private LocatorPrx _locatorPrx;
    }

    private static class VoidLocatorI implements Locator {
        @Override
        public CompletionStage<ObjectPrx> findObjectByIdAsync(
                Identity id, Current current) {
            return CompletableFuture.completedFuture((ObjectPrx) null);
        }

        @Override
        public CompletionStage<ObjectPrx> findAdapterByIdAsync(
                String id, Current current) {
            return CompletableFuture.completedFuture((ObjectPrx) null);
        }

        @Override
        public LocatorRegistryPrx getRegistry(Current current) {
            return null;
        }
    }

    private static class LocatorI implements BlobjectAsync {
        /**
         * Creates the discovery locator servant.
         *
         * @param lookup the lookup proxy; one single-endpoint proxy is derived from each of its
         *     endpoints
         * @param properties the source of the {@code IceLocatorDiscovery.*} settings read here
         * @param instanceName the locator instance name to look for (may be empty)
         * @param voidLocator the proxy that requests are sent to when no locator could be found
         */
        LocatorI(
                LookupPrx lookup,
                Properties properties,
                String instanceName,
                LocatorPrx voidLocator) {
            _lookup = lookup;

            // Negative settings are replaced: 300 for the timeout, 0 for retry count and delay.
            final int configuredTimeout =
                properties.getIcePropertyAsInt("IceLocatorDiscovery.Timeout");
            _timeout = configuredTimeout < 0 ? 300 : configuredTimeout;

            final int configuredRetryCount =
                properties.getIcePropertyAsInt("IceLocatorDiscovery.RetryCount");
            _retryCount = configuredRetryCount < 0 ? 0 : configuredRetryCount;

            final int configuredRetryDelay =
                properties.getIcePropertyAsInt("IceLocatorDiscovery.RetryDelay");
            _retryDelay = configuredRetryDelay < 0 ? 0 : configuredRetryDelay;

            _timer = lookup.ice_getCommunicator().getInstance().timer();
            _traceLevel = properties.getIcePropertyAsInt("IceLocatorDiscovery.Trace.Lookup");
            _instanceName = instanceName;
            _locator = lookup.ice_getCommunicator().getDefaultLocator();
            _voidLocator = voidLocator;

            // Initial state: no lookup in progress, no warning issued yet.
            _warned = false;
            _warnOnce = true;
            _pending = false;
            _pendingRetryCount = 0;
            _failureCount = 0;

            // Create one lookup proxy per endpoint of the given proxy. We want to send a
            // multicast datagram on each endpoint.
            final Endpoint[] endpointHolder = new Endpoint[1];
            for (Endpoint lookupEndpoint : lookup.ice_getEndpoints()) {
                endpointHolder[0] = lookupEndpoint;
                _lookups.put((LookupPrx) lookup.ice_endpoints(endpointHolder), null);
            }
            assert (!_lookups.isEmpty());
        }

        /**
         * Assigns a reply proxy to every lookup proxy.
         *
         * <p>When a lookup endpoint names a multicast interface, the reply proxy is narrowed to
         * the reply endpoint whose host equals that interface (the last such endpoint wins).
         * Lookup proxies left without a reply proxy get {@code lookupReply} itself.
         *
         * @param lookupReply the reply proxy to hand out
         */
        public void setLookupReply(LookupReplyPrx lookupReply) {
            final Endpoint[] replyEndpointHolder = new Endpoint[1];
            for (Map.Entry<LookupPrx, LookupReplyPrx> lookupEntry : _lookups.entrySet()) {
                final Endpoint lookupEndpoint = lookupEntry.getKey().ice_getEndpoints()[0];
                final UDPEndpointInfo lookupInfo = (UDPEndpointInfo) lookupEndpoint.getInfo();
                final String mcastInterface = lookupInfo.mcastInterface;

                if (!mcastInterface.isEmpty()) {
                    // Use a lookup reply proxy whose address matches the interface used to send
                    // multicast datagrams.
                    for (Endpoint candidate : lookupReply.ice_getEndpoints()) {
                        final EndpointInfo candidateInfo = candidate.getInfo();
                        if (!(candidateInfo instanceof IPEndpointInfo)) {
                            continue;
                        }
                        if (!((IPEndpointInfo) candidateInfo).host.equals(mcastInterface)) {
                            continue;
                        }
                        replyEndpointHolder[0] = candidate;
                        lookupEntry.setValue(
                            (LookupReplyPrx) lookupReply.ice_endpoints(replyEndpointHolder));
                    }
                }

                // Fallback: just use the given lookup reply proxy if no matching endpoint found.
                if (lookupEntry.getValue() == null) {
                    lookupEntry.setValue(lookupReply);
                }
            }
        }

        /**
         * Wraps the incoming call in a {@code Request} and hands it to
         * {@link #invoke(LocatorPrx, Request)} with no previously tried locator.
         *
         * @param inParams the encoded in-parameters of the incoming call
         * @param current the dispatch information; its operation, mode and context are forwarded
         * @return the future that the request completes with the result of the forwarded call
         */
        @Override
        public CompletionStage<Object.Ice_invokeResult> ice_invokeAsync(
                byte[] inParams, Current current) {
            final CompletableFuture<Object.Ice_invokeResult> outcome = new CompletableFuture<>();
            final Request forwarded =
                new Request(
                    this, current.operation, current.mode, inParams, current.ctx, outcome);
            invoke(null, forwarded);
            return outcome;
        }

        /**
         * Starts a fresh locator search and returns the locators that replied.
         *
         * <p>With an empty {@code instanceName} the calling thread sleeps for {@code waitTime}
         * milliseconds. Otherwise it waits until a locator with that instance name has been
         * recorded or the lookup is no longer pending.
         *
         * @param instanceName the instance name to wait for, or an empty string
         * @param waitTime the time to wait, in milliseconds
         * @return a new list holding the locators recorded by this search
         * @throws OperationInterruptedException if the calling thread is interrupted while waiting
         */
        public List<LocatorPrx> getLocators(String instanceName, int waitTime) {
            // Forget the locators found by a previous search.
            synchronized (this) {
                _locators.clear();
            }

            // Trigger a lookup that is not tied to any request.
            invoke(null, null);

            // Give the locators time to answer.
            try {
                if (!instanceName.isEmpty()) {
                    synchronized (this) {
                        while (_pending && !_locators.containsKey(instanceName)) {
                            wait(waitTime);
                        }
                    }
                } else {
                    Thread.sleep(waitTime);
                }
            } catch (InterruptedException interrupted) {
                throw new OperationInterruptedException(interrupted);
            }

            // Hand back a snapshot of what was found.
            synchronized (this) {
                final List<LocatorPrx> found = new ArrayList<>(_locators.values());
                return found;
            }
        }

        /**
         * Handles a locator proxy received in reply to a lookup.
         *
         * <p>The reply is ignored when the proxy is null, when its identity category differs from
         * the configured instance name, or when requests are pending and its category differs
         * from the locator already in use. Otherwise any running lookup timer is cancelled and
         * the proxy is merged (endpoints appended) with the locator already known under that
         * name. With no pending requests the result is recorded for {@code getLocators} and a
         * waiter is notified; with pending requests it becomes the current locator and the
         * pending requests are sent to it.
         *
         * @param locator the locator proxy carried by the reply, possibly null
         */
        public synchronized void foundLocator(LocatorPrx locator) {
            if (locator == null) {
                if (_traceLevel > 2) {
                    _lookup.ice_getCommunicator()
                        .getLogger()
                        .trace("Lookup", "ignoring locator reply: (null locator)");
                }
                return;
            }

            final String category = locator.ice_getIdentity().category;

            if (!_instanceName.isEmpty() && !category.equals(_instanceName)) {
                if (_traceLevel > 2) {
                    StringBuilder s =
                        new StringBuilder(
                            "ignoring locator reply: instance name doesn't match\n");
                    s.append("expected = ").append(_instanceName);
                    // Separate the two values; they used to run together on one line.
                    s.append("\nreceived = ").append(category);
                    _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.toString());
                }
                return;
            }

            // If we already have a locator assigned, ensure the given locator has the same
            // identity, otherwise ignore it.
            if (!_pendingRequests.isEmpty()
                && _locator != null
                && !category.equals(_locator.ice_getIdentity().category)) {
                if (!_warned) {
                    _warned = true; // Only warn once

                    locator.ice_getCommunicator()
                        .getLogger()
                        .warning(
                            "received Ice locator with different instance name:\n"
                                + "using = `"
                                + _locator.ice_getIdentity().category
                                + "'\n"
                                + "received = `"
                                + category
                                + "'\n"
                                + "This is typically the case if multiple Ice locators"
                                + " with different instance names are deployed and the"
                                + " property `IceLocatorDiscovery.InstanceName' is not"
                                + " set.");
                }
                return;
            }

            if (_pending) {
                // No need to continue, we found a locator
                _future.cancel(false);
                _future = null;
                _pendingRetryCount = 0;
                _pending = false;
            }

            if (_traceLevel > 0) {
                StringBuilder s = new StringBuilder("locator lookup succeeded:\nlocator = ");
                s.append(locator);
                if (!_instanceName.isEmpty()) {
                    s.append("\ninstance name = ").append(_instanceName);
                }
                _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.toString());
            }

            // The proxy to merge with: the one recorded for getLocators when no request is
            // pending, the current locator otherwise.
            LocatorPrx l = _pendingRequests.isEmpty() ? _locators.get(category) : _locator;
            if (l != null) {
                // We found another locator replica, append its endpoints to the
                // current locator proxy endpoints.
                List<Endpoint> newEndpoints =
                    new ArrayList<>(Arrays.asList(l.ice_getEndpoints()));
                for (Endpoint p : locator.ice_getEndpoints()) {
                    // Only add endpoints if not already in the locator proxy endpoints
                    if (!newEndpoints.contains(p)) {
                        newEndpoints.add(p);
                    }
                }
                l =
                    (LocatorPrx)
                        l.ice_endpoints(
                            newEndpoints.toArray(new Endpoint[newEndpoints.size()]));
            } else {
                l = locator;
            }

            if (_pendingRequests.isEmpty()) {
                _locators.put(category, l);
                notify();
            } else {
                _locator = l;
                if (_instanceName.isEmpty()) {
                    _instanceName =
                        _locator.ice_getIdentity().category; // Stick to the first locator
                }

                // Send pending requests if any.
                for (Request req : _pendingRequests) {
                    req.invoke(_locator);
                }
                _pendingRequests.clear();
            }
        }

        /**
         * Routes a request to a locator, starting a lookup when none is usable.
         *
         * <p>A non-null request is sent to the current locator when one is set and it is not the
         * very proxy object the caller passed in; failing that, it is sent to the void locator
         * while the retry delay has not expired. In every other case the current locator is
         * dropped, the request (if any) is queued, and a lookup is started unless one is already
         * pending.
         *
         * @param locator the locator proxy the caller already tried, or null
         * @param request the request to route, or null to only trigger a lookup
         */
        public synchronized void invoke(LocatorPrx locator, Request request) {
            if (request != null) {
                if (_locator != null && _locator != locator) {
                    request.invoke(_locator);
                    return;
                }
                if (Time.currentMonotonicTimeMillis() < _nextRetry) {
                    // Don't retry to find a locator before the retry delay expires
                    request.invoke(_voidLocator);
                    return;
                }
            }

            _locator = null;
            if (request != null) {
                _pendingRequests.add(request);
            }

            if (_pending) {
                return; // A lookup is already in progress.
            }

            _pending = true;
            _pendingRetryCount = _retryCount;
            _failureCount = 0;
            try {
                if (_traceLevel > 1) {
                    String message = "looking up locator:\nlookup = " + _lookup;
                    if (!_instanceName.isEmpty()) {
                        message += "\ninstance name = " + _instanceName;
                    }
                    _lookup.ice_getCommunicator().getLogger().trace("Lookup", message);
                }

                // Send the multicast request on every lookup proxy; failures are counted by
                // exception(Throwable).
                for (Map.Entry<LookupPrx, LookupReplyPrx> lookupEntry : _lookups.entrySet()) {
                    final LookupPrx target = lookupEntry.getKey();
                    target.findLocatorAsync(_instanceName, lookupEntry.getValue())
                        .whenCompleteAsync(
                            (ignored, failure) -> {
                                if (failure != null) {
                                    exception(failure);
                                }
                            },
                            target.ice_executor());
                }
                _future = _timer.schedule(_retryTask, _timeout, TimeUnit.MILLISECONDS);
            } catch (LocalException lookupFailure) {
                if (_traceLevel > 0) {
                    String message = "locator lookup failed:\nlookup = " + _lookup;
                    if (!_instanceName.isEmpty()) {
                        message += "\ninstance name = " + _instanceName;
                    }
                    message += "\n" + lookupFailure;
                    _lookup.ice_getCommunicator().getLogger().trace("Lookup", message);
                }

                // The lookup could not be sent: answer the queued requests via the void locator.
                for (Request queued : _pendingRequests) {
                    queued.invoke(_voidLocator);
                }
                _pendingRequests.clear();
                _pending = false;
                _pendingRetryCount = 0;
            }
        }

        /**
         * Records the failure of one lookup call. Once as many failures as there are lookup
         * proxies have been counted while a lookup is pending, the timer is cancelled, the failure
         * is logged, and the queued requests are sent to the void locator (or, with no queued
         * request, a waiter is notified).
         *
         * @param ex the failure reported by the lookup call
         */
        synchronized void exception(Throwable ex) {
            _failureCount++;
            if (_failureCount != _lookups.size() || !_pending) {
                return;
            }

            // All the lookup calls failed, cancel the timer and propagate the error to the
            // requests.
            _future.cancel(false);
            _future = null;
            _pendingRetryCount = 0;
            _pending = false;

            if (_warnOnce) {
                final String warning =
                    "failed to lookup locator with lookup proxy `" + _lookup + "':\n" + ex;
                _lookup.ice_getCommunicator().getLogger().warning(warning);
                _warnOnce = false;
            }

            if (_traceLevel > 0) {
                String message = "locator lookup failed:\nlookup = " + _lookup;
                if (!_instanceName.isEmpty()) {
                    message += "\ninstance name = " + _instanceName;
                }
                message += "\n" + ex;
                _lookup.ice_getCommunicator().getLogger().trace("Lookup", message);
            }

            if (_pendingRequests.isEmpty()) {
                notify();
                return;
            }

            for (Request queued : _pendingRequests) {
                queued.invoke(_voidLocator);
            }
            _pendingRequests.clear();
        }

        /**
         * Timer task scheduled {@code _timeout} milliseconds after lookups are sent. While retries
         * remain it re-sends the lookups and reschedules itself; otherwise it marks the lookup as
         * timed out, answers queued requests through the void locator (or notifies a waiter when
         * none is queued) and records the earliest time of the next lookup.
         */
        private Runnable _retryTask =
            new Runnable() {
                @Override
                public void run() {
                    synchronized (LocatorI.this) {
                        if (!_pending) {
                            assert (_pendingRequests.isEmpty());
                            return; // Request failed
                        }

                        if (_pendingRetryCount > 0) {
                            --_pendingRetryCount;
                            try {
                                if (_traceLevel > 1) {
                                    StringBuilder s =
                                        new StringBuilder(
                                            "retrying locator lookup:\nlookup = ");
                                    s.append(_lookup);
                                    s.append("\nretry count = ").append(_retryCount);
                                    if (!_instanceName.isEmpty()) {
                                        s.append("\ninstance name = ").append(_instanceName);
                                    }
                                    _lookup.ice_getCommunicator()
                                        .getLogger()
                                        .trace("Lookup", s.toString());
                                }

                                _failureCount = 0;
                                for (Map.Entry<LookupPrx, LookupReplyPrx> entry :
                                            _lookups.entrySet()) {
                                    // Send multicast request.
                                    entry.getKey()
                                        .findLocatorAsync(_instanceName, entry.getValue())
                                        .whenCompleteAsync(
                                            (v, ex) -> {
                                                if (ex != null) {
                                                    exception(ex);
                                                }
                                            },
                                            entry.getKey().ice_executor());
                                }
                                _future =
                                    _timer.schedule(
                                        _retryTask,
                                        _timeout,
                                        TimeUnit.MILLISECONDS);
                                return;
                            } catch (LocalException ex) {
                                // Deliberately ignored: a retry that cannot be sent is handled
                                // below exactly like a lookup that ran out of retries.
                            }
                            _pendingRetryCount = 0;
                        }

                        assert (_pendingRetryCount == 0);
                        _pending = false;

                        if (_traceLevel > 0) {
                            StringBuilder s =
                                new StringBuilder("locator lookup timed out:\nlookup = ");
                            s.append(_lookup);
                            if (!_instanceName.isEmpty()) {
                                s.append("\ninstance name = ").append(_instanceName);
                            }
                            _lookup.ice_getCommunicator()
                                .getLogger()
                                .trace("Lookup", s.toString());
                        }

                        if (_pendingRequests.isEmpty()) {
                            // Must be qualified: inside this anonymous class a bare notify()
                            // targets the Runnable itself, whose monitor is not held here, and
                            // would throw IllegalMonitorStateException. The waiter in
                            // getLocators waits on the LocatorI instance.
                            LocatorI.this.notify();
                        } else {
                            for (Request req : _pendingRequests) {
                                req.invoke(_voidLocator);
                            }
                            _pendingRequests.clear();
                        }
                        _nextRetry =
                            Time.currentMonotonicTimeMillis() + _retryDelay;
                    }
                }
            };

        // Lookup proxy given at construction; used to reach the communicator's logger and
        // printed in trace and warning messages.
        private final LookupPrx _lookup;
        // One single-endpoint lookup proxy per endpoint of _lookup, mapped to the reply proxy
        // passed along with findLocator (assigned by setLookupReply).
        private final Map<LookupPrx, LookupReplyPrx> _lookups = new HashMap<>();
        // Delay, in milliseconds, before the retry task runs after lookups have been sent.
        private int _timeout;
        // Handle of the scheduled retry task; cancelled and cleared when a locator is found or
        // when all lookup calls failed.
        private Future<?> _future;
        private final ScheduledExecutorService _timer;
        // Value of IceLocatorDiscovery.Trace.Lookup; higher values enable more trace output.
        private final int _traceLevel;
        // Number of retries granted to each new lookup (copied into _pendingRetryCount).
        private int _retryCount;
        // Added to the current monotonic time to compute _nextRetry after a lookup timed out.
        private int _retryDelay;

        // Instance name replies must match; when empty it is set to the category of the first
        // locator adopted for pending requests.
        private String _instanceName;
        // True once the "different instance name" warning has been logged.
        private boolean _warned;
        // Locator currently used to serve requests, or null while none is known.
        private LocatorPrx _locator;
        // Proxy that requests are sent to when no locator could be found.
        private LocatorPrx _voidLocator;
        // Locators found on behalf of getLocators, keyed by identity category.
        private Map<String, LocatorPrx> _locators = new HashMap<>();

        // True while a lookup is in progress.
        private boolean _pending;
        // Retries left for the lookup in progress.
        private int _pendingRetryCount;
        // Number of lookup calls that failed since the lookups were last sent.
        private int _failureCount;
        // True until the "failed to lookup locator" warning has been logged once.
        private boolean _warnOnce;
        // Requests waiting for a locator to be found.
        private List<Request> _pendingRequests = new ArrayList<>();
        // Monotonic time, in milliseconds, before which requests go to the void locator instead
        // of triggering a new lookup.
        private long _nextRetry;
    }

    private class LookupReplyI implements LookupReply {
        LookupReplyI(LocatorI locator) {
            _locator = locator;
        }

        @Override
        public void foundLocator(LocatorPrx locator, Current curr) {
            _locator.foundLocator(locator);
        }

        private final LocatorI _locator;
    }

    /**
     * Creates the plug-in. Nothing is set up here; the adapters and the locator are created by
     * {@link #initialize()}.
     *
     * @param communicator the communicator this plug-in operates on
     */
    public PluginI(Communicator communicator) {
        this._communicator = communicator;
    }

    /**
     * Sets up locator discovery: builds the lookup proxy, creates the reply and locator object
     * adapters, installs the discovery locator as the communicator's default locator (remembering
     * the previous one) and activates both adapters.
     */
    @Override
    public void initialize() {
        Properties properties = _communicator.getProperties();

        boolean ipv4 = properties.getIcePropertyAsInt("Ice.IPv4") > 0;
        boolean preferIPv6 = properties.getIcePropertyAsInt("Ice.PreferIPv6Address") > 0;
        String address = properties.getIceProperty("IceLocatorDiscovery.Address");
        if (address.isEmpty()) {
            address = ipv4 && !preferIPv6 ? "239.255.0.1" : "ff15::1";
        }
        int port = properties.getIcePropertyAsInt("IceLocatorDiscovery.Port");
        String intf = properties.getIceProperty("IceLocatorDiscovery.Interface");

        String lookupEndpoints = properties.getIceProperty("IceLocatorDiscovery.Lookup");
        if (lookupEndpoints.isEmpty()) {
            // No explicit lookup endpoints: build one UDP endpoint per multicast interface.
            int protocol = ipv4 && !preferIPv6 ? Network.EnableIPv4 : Network.EnableIPv6;
            List<String> interfaces = Network.getInterfacesForMulticast(intf, protocol);
            StringBuilder endpoints = new StringBuilder();
            for (String p : interfaces) {
                // Separate endpoints with ':'. The decision is based on what has been appended
                // so far rather than on comparing string references with the first element.
                if (endpoints.length() > 0) {
                    endpoints.append(':');
                }
                endpoints
                    .append("udp -h \"")
                    .append(address)
                    .append("\" -p ")
                    .append(port)
                    .append(" --interface \"")
                    .append(p)
                    .append('"');
            }
            lookupEndpoints = endpoints.toString();
        }

        if (properties.getIceProperty("IceLocatorDiscovery.Reply.Endpoints").isEmpty()) {
            properties.setProperty(
                "IceLocatorDiscovery.Reply.Endpoints",
                "udp -h " + (intf.isEmpty() ? "*" : "\"" + intf + "\""));
        }

        if (properties.getIceProperty("IceLocatorDiscovery.Locator.Endpoints").isEmpty()) {
            properties.setProperty(
                "IceLocatorDiscovery.Locator.AdapterId",
                UUID.randomUUID().toString());
        }

        _replyAdapter = _communicator.createObjectAdapter("IceLocatorDiscovery.Reply");
        _locatorAdapter = _communicator.createObjectAdapter("IceLocatorDiscovery.Locator");

        // We don't want those adapters to be registered with the locator so clear their locator.
        _replyAdapter.setLocator(null);
        _locatorAdapter.setLocator(null);

        ObjectPrx lookupPrx =
            ObjectPrx.createProxy(_communicator, "IceLocatorDiscovery/Lookup -d:" + lookupEndpoints);
        lookupPrx = lookupPrx.ice_router(null);

        LocatorPrx voidLoc =
            LocatorPrx.uncheckedCast(
                _locatorAdapter.addWithUUID(new VoidLocatorI()));

        // The discovery locator is registered under the configured instance name, or under a
        // random category when none is configured.
        String instanceName = properties.getIceProperty("IceLocatorDiscovery.InstanceName");
        Identity id = new Identity();
        id.name = "Locator";
        id.category =
            !instanceName.isEmpty() ? instanceName : UUID.randomUUID().toString();
        _locator =
            new LocatorI(LookupPrx.uncheckedCast(lookupPrx), properties, instanceName, voidLoc);
        _defaultLocator = _communicator.getDefaultLocator();
        _locatorPrx = LocatorPrx.uncheckedCast(_locatorAdapter.add(_locator, id));
        _communicator.setDefaultLocator(_locatorPrx);

        ObjectPrx lookupReply =
            _replyAdapter.addWithUUID(new LookupReplyI(_locator)).ice_datagram();
        _locator.setLookupReply(LookupReplyPrx.uncheckedCast(lookupReply));

        _replyAdapter.activate();
        _locatorAdapter.activate();
    }

    /**
     * Destroys the adapters created by {@link #initialize()} and, when the communicator's default
     * locator is still the discovery locator, restores the default locator that was in place
     * before.
     */
    @Override
    public void destroy() {
        if (_replyAdapter != null) {
            _replyAdapter.destroy();
        }
        if (_locatorAdapter != null) {
            _locatorAdapter.destroy();
        }

        // The default locator can be null (cleared in the meantime, or never set because
        // initialize() did not run); guard against it instead of dereferencing it.
        LocatorPrx currentDefault = _communicator.getDefaultLocator();
        if (currentDefault != null && currentDefault.equals(_locatorPrx)) {
            // Restore original default locator proxy, if the user didn't change it in the meantime
            _communicator.setDefaultLocator(_defaultLocator);
        }
    }

    /**
     * Searches for locators by delegating to the discovery locator servant.
     *
     * @param instanceName the instance name to wait for, or an empty string
     * @param waitTime the time to wait for replies, in milliseconds
     * @return the locators found by the search
     */
    public List<LocatorPrx> getLocators(String instanceName, int waitTime) {
        final List<LocatorPrx> discovered = _locator.getLocators(instanceName, waitTime);
        return discovered;
    }

    // Communicator this plug-in was created for.
    private final Communicator _communicator;
    // Adapter hosting the discovery locator and the void locator; created in initialize().
    private ObjectAdapter _locatorAdapter;
    // Adapter hosting the lookup reply servant; created in initialize().
    private ObjectAdapter _replyAdapter;
    // Discovery locator servant; created in initialize().
    private LocatorI _locator;
    // Proxy to _locator, installed as the communicator's default locator in initialize().
    private LocatorPrx _locatorPrx;
    // Default locator that was set before initialize() replaced it; restored by destroy().
    private LocatorPrx _defaultLocator;
}