UdpMulticastClientTransceiver.java

// Copyright (c) ZeroC, Inc.

package com.zeroc.Ice;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.nio.channels.SelectableChannel;
import java.util.LinkedList;
import java.util.List;

//
// This class is only used on Android, where the java.nio.channels.MulticastChannel interface is not
// supported.
//
// NOTE: Most of the important methods on java.net.MulticastSocket are synchronized.
//
final class UdpMulticastClientTransceiver implements Transceiver {
    @Override
    public SelectableChannel fd() {
        //
        // Android doesn't provide non-blocking APIs for UDP multicast.
        //
        return null;
    }

    @Override
    public void setReadyCallback(ReadyCallback callback) {
        assert (_readyCallback == null && callback != null);
        _readyCallback = callback;
        _thread.start();
    }

    @Override
    public int initialize(Buffer readBuffer, Buffer writeBuffer) {
        //
        // Nothing to do.
        //
        return SocketOperation.None;
    }

    @Override
    public int closing(boolean initiator, LocalException ex) {
        //
        // Nothing to do.
        //
        return SocketOperation.None;
    }

    @Override
    public void close() {
        MulticastSocket socket;
        Thread thread;

        synchronized (this) {
            socket = _socket;
            _socket = null;

            thread = _thread;
            _thread = null;

            if (thread != null) {
                notifyAll(); // Wake up the thread.
            }
        }

        if (thread != null) {
            try {
                thread.join();
            } catch (InterruptedException ex) {
                // Ignore.
            }
        }

        if (socket != null) {
            socket.close();
        }
    }

    @Override
    public EndpointI bind() {
        //
        // Nothing to do for a client transceiver.
        //
        return null;
    }

    @Override
    public synchronized int write(Buffer buf) {
        if (_exception != null) {
            throw _exception;
            // throw (LocalException) _exception.fillInStackTrace();
        }

        if (!buf.b.hasRemaining()) {
            return SocketOperation.None;
        }

        assert (buf.b.position() == 0);
        assert (_socket != null);

        //
        // The caller is supposed to check the send size before by calling checkSendSize.
        //
        assert (java.lang.Math.min(_maxPacketSize, _size - _udpOverhead) >= buf.size());

        //
        // Queue the buffer for processing by the write thread.
        //
        _buffers.add(new Buffer(buf, true));
        notifyAll();

        return SocketOperation.None;
    }

    @Override
    public synchronized int read(Buffer buf) {
        //
        // This transceiver can only write.
        //
        throw new SocketException();
    }

    @Override
    public String protocol() {
        return _instance.protocol();
    }

    @Override
    public synchronized String toString() {
        if (_socket == null) {
            return "<closed>";
        }

        return "multicast address = " + Network.addrToString(_addr);
    }

    @Override
    public String toDetailedString() {
        StringBuilder s = new StringBuilder(toString());
        List<String> intfs =
            Network.getInterfacesForMulticast(
                _mcastInterface, Network.getProtocolSupport(_addr));
        if (!intfs.isEmpty()) {
            s.append("\nlocal interfaces = ");
            s.append(String.join(", ", intfs));
        }
        return s.toString();
    }

    @Override
    public synchronized ConnectionInfo getInfo(
            boolean incoming, String adapterName, String connectionId) {
        return new UDPConnectionInfo(
            incoming,
            adapterName,
            connectionId,
            _socket != null ? _socket.getLocalAddress().getHostAddress() : "",
            _socket != null ? _socket.getLocalPort() : -1,
            "",
            -1,
            _socket != null ? _addr.getAddress().getHostAddress() : "",
            _socket != null ? _addr.getPort() : -1,
            0,
            _size);
    }

    @Override
    public synchronized void checkSendSize(Buffer buf) {
        //
        // The maximum packetSize is either the maximum allowable UDP packet size, or the UDP send
        // buffer size (whichever is smaller).
        //
        final int packetSize = java.lang.Math.min(_maxPacketSize, _size - _udpOverhead);
        if (packetSize < buf.size()) {
            throw new DatagramLimitException();
        }
    }

    @Override
    public synchronized void setBufferSize(int rcvSize, int sndSize) {
        setBufSize(sndSize);
    }

    //
    // Only for use by UdpConnector
    //
    UdpMulticastClientTransceiver(
            ProtocolInstance instance,
            InetSocketAddress addr,
            String mcastInterface,
            int mcastTtl) {
        assert (addr.getAddress().isMulticastAddress());

        _instance = instance;
        _addr = addr;
        _mcastInterface = mcastInterface;

        try {
            _socket = new MulticastSocket();

            //
            // Configure the send buffer size.
            //
            _size = _socket.getSendBufferSize();
            _newSize = -1;
            setBufSize(-1);
            if (_newSize != -1) {
                updateBufSize();
            }

            //
            // NOTE: Setting the multicast interface before performing the connect is important for
            // some systems such as macOS.
            //
            if (!mcastInterface.isEmpty()) {
                _socket.setNetworkInterface(Network.getInterface(mcastInterface));
            }
            if (mcastTtl != -1) {
                _socket.setTimeToLive(mcastTtl);
            }

            _socket.connect(addr); // Does not block

            _thread =
                new Thread() {
                    public void run() {
                        setName("IceUDPMulticast.WriteThread");
                        runWriteThread();
                    }
                };
        } catch (Exception ex) {
            if (_socket != null) {
                _socket.close();
            }
            _socket = null;
            if (ex instanceof LocalException) {
                throw (LocalException) ex;
            } else {
                throw new SocketException(ex);
            }
        }
    }

    private synchronized void exception(LocalException ex) {
        if (_exception == null) {
            _exception = ex;
        }
    }

    private void setBufSize(int sz) {
        assert (_socket != null);

        //
        // Get property for buffer size if size not passed in.
        //
        if (sz == -1) {
            sz = _instance.properties().getPropertyAsIntWithDefault("Ice.UDP.SndSize", _size);
        }

        //
        // Check for sanity.
        //
        if (sz < (_udpOverhead + Protocol.headerSize)) {
            _instance
                .logger()
                .warning("Invalid Ice.UDP.SndSize value of " + sz + " adjusted to " + _size);
        } else if (sz != _size) {
            _newSize = sz;
        }

        //
        // Defer the actual modification of the buffer size to the helper thread.
        //
    }

    private void updateBufSize() {
        //
        // Must be called without any other threads holding the lock to the MulticastSocket!
        //

        try {
            //
            // Try to set the buffer size. The kernel will silently adjust the size to an acceptable
            // value. Then read the size back to get the size that was actually set.
            //
            _socket.setSendBufferSize(_newSize);
            _size = _socket.getSendBufferSize();

            //
            // Warn if the size that was set is less than the requested size and we have not already
            // warned.
            //
            if (_size < _newSize) {
                BufSizeWarnInfo winfo = _instance.getBufSizeWarn(UDPEndpointType.value);
                if (!winfo.sndWarn || winfo.sndSize != _newSize) {
                    _instance
                        .logger()
                        .warning(
                            "UDP send buffer size: requested size of "
                                + _newSize
                                + " adjusted to "
                                + _size);
                    _instance.setSndBufSizeWarn(UDPEndpointType.value, _newSize);
                }
            }
        } catch (IOException ex) {
            if (_socket != null) {
                _socket.close();
            }
            _socket = null;
            throw new SocketException(ex);
        }
    }

    @SuppressWarnings({"nofinalizer", "deprecation"})
    @Override
    protected synchronized void finalize() throws Throwable {
        try {
            Assert.FinalizerAssert(_socket == null);
        } catch (Exception ex) {} finally {
            super.finalize();
        }
    }

    private void runWriteThread() {
        try {
            DatagramPacket p = new DatagramPacket(new byte[0], 0);
            p.setSocketAddress(_addr);

            while (true) {
                MulticastSocket socket;
                Buffer buf;

                synchronized (this) {
                    //
                    // Wait until the socket is closed, an exception occurs, or we have something to
                    // write.
                    //
                    while (_socket != null && _exception == null && _buffers.isEmpty()) {
                        try {
                            wait();
                        } catch (InterruptedException ex) {
                            break;
                        }
                    }

                    if (_socket == null || _exception != null) {
                        break;
                    }

                    if (_newSize != -1) {
                        //
                        // Application must have called setBufferSize.
                        //
                        updateBufSize();
                        _newSize = -1;
                    }

                    socket = _socket;
                    buf = _buffers.removeFirst();
                }

                assert (buf != null);
                if (buf.b.hasRemaining()) {
                    byte[] arr;
                    int offset;
                    if (buf.b.hasArray()) {
                        arr = buf.b.array();
                        offset = buf.b.arrayOffset();
                    } else {
                        //
                        // If the buffer doesn't have a backing array, we'll have to make a copy of
                        // the data.
                        //
                        arr = new byte[buf.b.limit()];
                        offset = 0;
                        buf.b.get(arr);
                    }

                    p.setData(arr, offset, buf.b.limit());
                    socket.send(p);
                }

                synchronized (this) {
                    //
                    // After the write is complete, indicate whether we can accept more data.
                    //
                    _readyCallback.ready(SocketOperation.Write, !_buffers.isEmpty());
                }
            }
        } catch (IOException ex) {
            exception(new SocketException(ex));
        }
    }

    private final ProtocolInstance _instance;
    private final InetSocketAddress _addr;
    private final String _mcastInterface;

    private MulticastSocket _socket;
    private int _size;
    private int _newSize;

    //
    // The maximum IP datagram size is 65535. Subtract 20 bytes for the IP header and 8 bytes for
    // the UDP header to get the maximum payload.
    //
    private static final int _udpOverhead = 20 + 8;
    private static final int _maxPacketSize = 65535 - _udpOverhead;

    private Thread _thread;

    private final LinkedList<Buffer> _buffers = new LinkedList<>();

    private LocalException _exception;
    private ReadyCallback _readyCallback;
}