// TransceiverI.java

// Copyright (c) ZeroC, Inc.

package com.zeroc.IceBT;

import com.zeroc.Ice.Buffer;
import com.zeroc.Ice.ConnectFailedException;
import com.zeroc.Ice.EndpointI;
import com.zeroc.Ice.LocalException;
import com.zeroc.Ice.ReadyCallback;
import com.zeroc.Ice.SocketException;
import com.zeroc.Ice.SocketOperation;
import com.zeroc.Ice.Transceiver;

import android.bluetooth.BluetoothAdapter;
import android.bluetooth.BluetoothDevice;
import android.bluetooth.BluetoothSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.util.UUID;

final class TransceiverI implements Transceiver {
    /**
     * Returns the selectable channel backing this transceiver.
     *
     * @return always {@code null}; this transceiver exposes no channel
     */
    @Override
    public SelectableChannel fd() {
        // Android doesn't provide non-blocking APIs for Bluetooth.
        return null;
    }

    /**
     * Stores the callback that this transceiver and its helper threads use to report read and
     * write readiness.
     *
     * <p>The field is assigned here without synchronization.
     *
     * @param callback the callback to store
     */
    @Override
    public void setReadyCallback(ReadyCallback callback) {
        _readyCallback = callback;
    }

    /**
     * Reports how far connection establishment has progressed.
     *
     * <p>A failure recorded by one of the helper threads is rethrown. While the connect thread is
     * still running, the caller is asked to wait for Read readiness. Once connected, the Read
     * ready state is refreshed from the amount of buffered incoming data.
     *
     * @param readBuffer not used by this implementation
     * @param writeBuffer not used by this implementation
     * @return {@code SocketOperation.Read} while connecting, otherwise {@code
     *     SocketOperation.None}
     * @throws LocalException the failure recorded earlier, if any
     */
    @Override
    public synchronized int initialize(Buffer readBuffer, Buffer writeBuffer) {
        final LocalException failure = _exception;
        if (failure != null) {
            throw failure;
        }

        switch (_state) {
            case StateConnecting:
                // The connect thread has not finished yet; it signals Read readiness when done.
                return SocketOperation.Read;

            case StateConnected:
                // Tell the run time whether buffered data is already waiting to be consumed.
                _readyCallback.ready(SocketOperation.Read, _readBuffer.b.position() > 0);
                return SocketOperation.None;

            default:
                return SocketOperation.None;
        }
    }

    /**
     * Decides what to wait for once connection closure has started.
     *
     * @param initiator {@code true} when this side initiated the closure
     * @param ex the reason for the closure; not used by this implementation
     * @return {@code SocketOperation.Read} when this side initiated the closure, otherwise {@code
     *     SocketOperation.None}
     */
    @Override
    public int closing(boolean initiator, LocalException ex) {
        if (!initiator) {
            // The peer started the closure, so we can close immediately.
            return SocketOperation.None;
        }

        // We started the closure: wait for the peer to close the connection.
        return SocketOperation.Read;
    }

    /**
     * Closes the Bluetooth socket, marks the transceiver as closed and waits for the read and
     * write helper threads to terminate.
     *
     * <p>If the calling thread is interrupted while waiting for a helper thread, the wait for that
     * thread is abandoned, the remaining thread is still joined, and the interrupt status is
     * restored before returning so that the caller can observe it.
     */
    @Override
    public void close() {
        final Thread readThread;
        final Thread writeThread;

        synchronized (this) {
            // Close the socket first in order to interrupt the helper threads.
            if (_socket != null) {
                try {
                    _socket.close();
                } catch (IOException ignored) {
                    // Best effort: there is nothing useful to do if closing fails.
                }
                _socket = null;
            }

            readThread = _readThread;
            _readThread = null;
            writeThread = _writeThread;
            _writeThread = null;

            _state = StateClosed;

            if (writeThread != null) {
                notifyAll(); // Wake up the read/write threads.
            }
        }

        // Join outside of the lock: the helper threads need the monitor to observe StateClosed.
        boolean interrupted = false;
        for (Thread helper : new Thread[] {readThread, writeThread}) {
            if (helper == null) {
                continue;
            }
            try {
                helper.join();
            } catch (InterruptedException ex) {
                // Remember the interrupt; it is restored once both threads have been handled.
                interrupted = true;
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Not expected to be called on this transceiver: trips an assertion when assertions are
     * enabled.
     *
     * @return {@code null} when assertions are disabled
     */
    @Override
    public EndpointI bind() {
        assert false;
        return null;
    }

    /**
     * Queues outgoing data for the write thread.
     *
     * <p>Bytes are copied from {@code buf} into the internal write buffer until that buffer holds
     * {@code _sndSize} bytes; the position of {@code buf} advances by the number of bytes copied
     * and its limit is left unchanged.
     *
     * @param buf the data to send
     * @return {@code SocketOperation.Write} if {@code buf} still has unsent bytes, otherwise
     *     {@code SocketOperation.None}
     * @throws LocalException the failure recorded earlier by a helper thread, if any
     */
    @Override
    public synchronized int write(Buffer buf) {
        if (_exception != null) {
            throw _exception;
        }

        // Room left before the internal buffer reaches _sndSize bytes.
        final int room = _sndSize - _writeBuffer.b.position();
        if (room > 0) {
            final int count = Math.min(room, buf.b.remaining());
            final int savedLimit = buf.b.limit();

            _writeBuffer.expand(count);

            // Narrow the source window to exactly 'count' bytes for the bulk copy, then restore it.
            buf.limit(buf.b.position() + count);
            _writeBuffer.b.put(buf.b);
            buf.limit(savedLimit);

            // New data is available, so wake up the write thread.
            notifyAll();
        }

        if (buf.b.hasRemaining()) {
            return SocketOperation.Write;
        }
        return SocketOperation.None;
    }

    /**
     * Moves data received by the read thread from the internal buffer into {@code buf}.
     *
     * <p>At most {@code buf.b.remaining()} bytes are transferred. Afterwards the threads waiting
     * on this object are notified if the internal buffer is below {@code _rcvSize}, and the Read
     * ready state is updated to reflect whether buffered data remains.
     *
     * @param buf the destination buffer
     * @return {@code SocketOperation.Read} if {@code buf} still has room, otherwise {@code
     *     SocketOperation.None}
     * @throws LocalException the failure recorded earlier by a helper thread, if any
     */
    @Override
    public synchronized int read(Buffer buf) {
        if (_exception != null) {
            throw _exception;
        }

        final ByteBuffer src = _readBuffer.b;
        final ByteBuffer dst = buf.b;

        // Switch the internal buffer to draining mode for the duration of the copy.
        src.flip();
        if (src.hasRemaining()) {
            final int count = Math.min(src.remaining(), dst.remaining());

            if (dst.hasArray()) {
                // Write straight into the destination's backing array, then advance it by hand.
                final byte[] target = dst.array();
                src.get(target, dst.arrayOffset() + dst.position(), count);
                buf.position(dst.position() + count);
            } else if (src.hasArray()) {
                // Read straight from the source's backing array, then advance it by hand.
                final byte[] origin = src.array();
                dst.put(origin, src.arrayOffset() + src.position(), count);
                src.position(src.position() + count);
            } else {
                // Neither side exposes an array: go through a temporary one.
                final byte[] scratch = new byte[count];
                src.get(scratch);
                dst.put(scratch);
            }
        }
        // Back to filling mode, keeping whatever was not consumed.
        src.compact();

        // The read thread pauses when too much data is buffered; let it resume if there is room.
        if (src.position() < _rcvSize) {
            notifyAll();
        }

        // Tell the run time whether more buffered data is waiting to be read.
        _readyCallback.ready(SocketOperation.Read, src.position() > 0);

        if (dst.hasRemaining()) {
            return SocketOperation.Read;
        }
        return SocketOperation.None;
    }

    /**
     * Returns the protocol name reported by the owning instance.
     *
     * @return the value of {@code _instance.protocol()}
     */
    @Override
    public String protocol() {
        return _instance.protocol();
    }

    /**
     * Returns the description assembled by {@code init()}: the local address followed by the
     * remote address and service UUID when they are non-empty.
     *
     * @return the transceiver description
     */
    @Override
    public String toString() {
        return _desc;
    }

    /**
     * Returns the same text as {@link #toString()}; no additional detail is provided.
     *
     * @return the transceiver description
     */
    @Override
    public String toDetailedString() {
        return toString();
    }

    /**
     * Builds the connection information for this transceiver.
     *
     * <p>Channel numbers are reported as -1 on both sides because they are not available.
     *
     * @param incoming whether the connection was accepted rather than established; must agree
     *     with whether an adapter name was supplied at construction
     * @param adapterName the object adapter name to report
     * @param connectionId the connection ID to report; must equal the ID given at construction
     * @return a new {@code ConnectionInfo} describing this connection
     */
    @Override
    public com.zeroc.Ice.ConnectionInfo getInfo(
            boolean incoming, String adapterName, String connectionId) {
        assert incoming == (_adapterName != null);
        assert connectionId.equals(_connectionId);

        final String localAddr = _instance.bluetoothAdapter().getAddress();
        final int unknownChannel = -1; // Channel numbers are not available.

        return new ConnectionInfo(
            incoming,
            adapterName,
            connectionId,
            localAddr,
            unknownChannel,
            _remoteAddr,
            unknownChannel,
            _uuid,
            _rcvSize,
            _sndSize);
    }

    /**
     * Changes the limits applied to the internal read and write buffers.
     *
     * @param rcvSize the requested receive limit in bytes; values below 1024 are raised to 1024
     * @param sndSize the requested send limit in bytes; values below 1024 are raised to 1024
     */
    @Override
    public synchronized void setBufferSize(int rcvSize, int sndSize) {
        final int floor = 1024;
        _rcvSize = rcvSize < floor ? floor : rcvSize;
        _sndSize = sndSize < floor ? floor : sndSize;
    }

    /**
     * Does nothing: this transceiver performs no check on the size of an outgoing buffer.
     *
     * @param buf the buffer about to be sent; ignored
     */
    @Override
    public void checkSendSize(Buffer buf) {}

    /**
     * Creates an outgoing transceiver and starts a daemon thread that performs the blocking
     * Bluetooth connect in the background. Used by ConnectorI.
     *
     * @param instance the owning IceBT instance
     * @param remoteAddr the address of the remote device
     * @param uuid the UUID of the remote service
     * @param connectionId the connection ID reported by {@code getInfo}
     */
    TransceiverI(Instance instance, String remoteAddr, String uuid, String connectionId) {
        _instance = instance;
        _remoteAddr = remoteAddr;
        _uuid = uuid;
        _connectionId = connectionId;
        _state = StateConnecting;

        init();

        final Thread connector =
            new Thread() {
                @Override
                public void run() {
                    // Name the thread after the peer to make it easy to spot in thread dumps.
                    final StringBuilder label = new StringBuilder("IceBT.ConnectThread");
                    if (_remoteAddr != null && !_remoteAddr.isEmpty()) {
                        label.append("-").append(_remoteAddr);
                    }
                    if (!_uuid.isEmpty()) {
                        label.append("-").append(_uuid);
                    }
                    setName(label.toString());

                    runConnectThread();
                }
            };
        connector.setDaemon(true);
        connector.start();
    }

    // Used by AcceptorI.
    TransceiverI(Instance instance, BluetoothSocket socket, String uuid, String adapterName) {
        _instance = instance;
        _remoteAddr = socket.getRemoteDevice().getAddress();
        _uuid = uuid;
        _connectionId = "";
        _adapterName = adapterName;
        _socket = socket;
        _state = StateConnected;

        init();

        startReadWriteThreads();
    }

    /**
     * Performs the initialization shared by both constructors: builds the description returned by
     * {@code toString()}, reads the buffer limits from the {@code IceBT.RcvSize} and {@code
     * IceBT.SndSize} properties (default 128 KiB each) and allocates the internal buffers.
     *
     * <p>The configured limits are raised to at least 1024 bytes, the same floor that {@code
     * setBufferSize} applies. Without it, a zero or negative property value would give the read
     * thread an unusable array size and would stop {@code write()} from ever accepting data.
     */
    private void init() {
        _desc = "local address = " + _instance.bluetoothAdapter().getAddress();
        if (_remoteAddr != null && !_remoteAddr.isEmpty()) {
            _desc += "\nremote address = " + _remoteAddr;
        }
        if (!_uuid.isEmpty()) {
            _desc += "\nservice uuid = " + _uuid;
        }

        final int defaultBufSize = 128 * 1024;
        final int minBufSize = 1024;
        _rcvSize =
            Math.max(
                minBufSize,
                _instance
                    .properties()
                    .getPropertyAsIntWithDefault("IceBT.RcvSize", defaultBufSize));
        _sndSize =
            Math.max(
                minBufSize,
                _instance
                    .properties()
                    .getPropertyAsIntWithDefault("IceBT.SndSize", defaultBufSize));

        _readBuffer = new Buffer(false);
        _writeBuffer = new Buffer(false);
    }

    /**
     * Records a failure so that the next call to {@code initialize}, {@code read} or {@code write}
     * rethrows it. Only the first failure is kept; later ones are dropped.
     *
     * @param ex the failure to record
     */
    private synchronized void exception(LocalException ex) {
        if (_exception != null) {
            return; // An earlier failure is already recorded.
        }
        _exception = ex;
    }

    /**
     * Body of the connect thread: establishes the RFCOMM connection to the remote service and, on
     * success, switches the transceiver to the connected state and starts the read and write
     * threads.
     *
     * <p>An {@code IOException} is recorded as a {@code ConnectFailedException}; any other
     * exception is recorded as a {@code SocketException}. A socket that was created but not
     * adopted by the transceiver is always closed before returning. This covers a failed connect
     * and the case where the transceiver was closed while the connect was in progress. In every
     * case Read readiness is signaled at the end so that {@code initialize()} is invoked again.
     */
    private void runConnectThread() {
        // Non-null only while this method is responsible for closing the socket.
        BluetoothSocket socket = null;

        try {
            BluetoothAdapter adapter = _instance.bluetoothAdapter();
            assert (adapter != null);

            // Always cancel discovery prior to a connect attempt. This is done inside the try
            // block so that a failure here is recorded and reported instead of silently
            // terminating the thread.
            adapter.cancelDiscovery();

            BluetoothDevice device = adapter.getRemoteDevice(_remoteAddr);

            socket = device.createRfcommSocketToServiceRecord(UUID.fromString(_uuid));

            // This can block for several seconds.
            socket.connect();

            synchronized (this) {
                if (_state != StateClosed) {
                    // Connect succeeded.
                    assert (_exception == null);
                    _state = StateConnected;
                    _socket = socket;
                    socket = null; // Ownership transferred; close() releases it from now on.
                    startReadWriteThreads();
                }
            }
        } catch (IOException ex) {
            exception(new ConnectFailedException(ex));
        } catch (Exception ex) {
            exception(new SocketException(ex));
        } finally {
            if (socket != null) {
                // Connect failed or the transceiver was closed in the meantime.
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // Best effort: the socket is being discarded anyway.
                }
            }

            // This causes the Ice run time to invoke initialize() again.
            _readyCallback.ready(SocketOperation.Read, true);
        }
    }

    /**
     * Creates and starts the read thread and then the write thread, storing them in {@code
     * _readThread} and {@code _writeThread}. Each thread names itself "IceBT.ReadThread" or
     * "IceBT.WriteThread", followed by the remote address and service UUID when they are
     * non-empty.
     */
    private void startReadWriteThreads() {
        final StringBuilder tail = new StringBuilder();
        if (_remoteAddr != null && !_remoteAddr.isEmpty()) {
            tail.append("-").append(_remoteAddr);
        }
        if (!_uuid.isEmpty()) {
            tail.append("-").append(_uuid);
        }
        final String suffix = tail.toString();

        final Thread reader =
            new Thread() {
                @Override
                public void run() {
                    setName("IceBT.ReadThread" + suffix);
                    runReadThread();
                }
            };
        _readThread = reader;
        reader.start();

        final Thread writer =
            new Thread() {
                @Override
                public void run() {
                    setName("IceBT.WriteThread" + suffix);
                    runWriteThread();
                }
            };
        _writeThread = writer;
        writer.start();
    }

    /**
     * Body of the read thread: repeatedly performs a blocking read on the socket's input stream
     * and appends the received bytes to the internal read buffer, signaling Read readiness after
     * each successful read.
     *
     * <p>Reading pauses while the internal buffer holds more than {@code _rcvSize} bytes. The loop
     * ends when the transceiver leaves the connected state or a failure has been recorded. An
     * {@code IOException}, including the end of the stream, is recorded as a {@code
     * SocketException}, and Read readiness is signaled so that the failure is reported by {@code
     * read()}. The input stream is closed on exit.
     */
    private void runReadThread() {
        InputStream in = null;

        try {
            byte[] buf = null;

            synchronized (this) {
                if (_socket == null) {
                    // The transceiver was closed before this thread got to run.
                    return;
                }
                in = _socket.getInputStream();

                buf = new byte[_rcvSize];
            }

            while (true) {
                synchronized (this) {
                    // If we've read too much data, wait until the application consumes some before
                    // we read again.
                    while (_state == StateConnected
                        && _exception == null
                        && _readBuffer.b.position() > _rcvSize) {
                        try {
                            wait();
                        } catch (InterruptedException ex) {
                            break;
                        }
                    }

                    if (_state != StateConnected || _exception != null) {
                        break;
                    }
                }

                // Blocking read, performed without holding the lock.
                int num = in.read(buf);
                if (num < 0) {
                    // End of stream: no more data will ever arrive. Report it through the regular
                    // failure path instead of calling read() again in a tight loop.
                    throw new IOException("end of stream reached");
                }
                if (num > 0) {
                    synchronized (this) {
                        _readBuffer.expand(num);
                        _readBuffer.b.put(buf, 0, num);
                        _readyCallback.ready(SocketOperation.Read, true);

                        if (buf.length != _rcvSize) {
                            // Application must have called setBufferSize.
                            buf = new byte[_rcvSize];
                        }
                    }
                }
            }
        } catch (IOException ex) {
            exception(new SocketException(ex));
            // Mark as ready for reading so that the Ice run time will invoke read() and we can
            // report the exception.
            _readyCallback.ready(SocketOperation.Read, true);
        } catch (LocalException ex) {
            exception(ex);
            // Mark as ready for reading so that the Ice run time will invoke read() and we can
            // report the exception.
            _readyCallback.ready(SocketOperation.Read, true);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException ignored) {
                    // Best effort: the stream is being discarded anyway.
                }
            }
        }
    }

    /**
     * Body of the write thread: waits until {@code write()} has queued data in the internal write
     * buffer, takes that data and writes it to the socket's output stream with a blocking call,
     * then reports whether more data can be accepted.
     *
     * <p>The loop ends when the transceiver leaves the connected state or a failure has been
     * recorded; data taken in that final iteration is not written. An {@code IOException} is
     * recorded as a {@code SocketException}. The output stream is closed on exit.
     */
    private void runWriteThread() {
        OutputStream out = null;

        try {
            synchronized (this) {
                if (_socket == null) {
                    // The transceiver was closed before this thread got to run.
                    return;
                }
                out = _socket.getOutputStream();
            }

            boolean done = false;
            while (!done) {
                ByteBuffer b = null;

                synchronized (this) {
                    // Sleep until there is something to send, the transceiver is closed, or a
                    // failure has been recorded. write() and close() call notifyAll().
                    while (_state == StateConnected
                        && _exception == null
                        && _writeBuffer.b.position() == 0) {
                        try {
                            wait();
                        } catch (InterruptedException ex) {
                            break;
                        }
                    }

                    if (_state != StateConnected || _exception != null) {
                        done = true;
                    }

                    // Take the pending data while still holding the lock.
                    // NOTE(review): clear() presumably detaches the adopted ByteBuffer so that
                    // write() starts filling a fresh one - confirm against Buffer.clear().
                    b = _writeBuffer.b; // Adopt the ByteBuffer.
                    _writeBuffer.clear();
                }

                assert (b != null && b.hasArray());
                b.flip();
                // Nothing is written in the final iteration (done == true).
                if (b.hasRemaining() && !done) {
                    // write() blocks until all the data has been written.
                    out.write(b.array(), b.arrayOffset(), b.remaining());
                }

                // TODO: TBD: Recycle the buffer?

                synchronized (this) {
                    // After the write is complete, indicate whether we can accept more data.
                    _readyCallback.ready(
                        SocketOperation.Write, _writeBuffer.b.position() < _sndSize);
                }
            }
        } catch (IOException ex) {
            // NOTE(review): unlike the read thread, no readiness is signaled here, so the failure
            // only surfaces on the next initialize()/read()/write() call - confirm this is
            // intended.
            exception(new SocketException(ex));
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException ex) {
                    // Ignore.
                }
            }
        }
    }

    // The owning IceBT instance; provides the Bluetooth adapter, the properties and the protocol
    // name.
    private final Instance _instance;
    // Address of the remote device.
    private String _remoteAddr;
    // UUID of the Bluetooth service.
    private String _uuid;
    // Connection ID; the empty string for transceivers created by the acceptor constructor.
    private String _connectionId;
    // Object adapter name; only assigned by the acceptor constructor, so it is non-null exactly
    // for incoming connections (see the assertion in getInfo).
    private String _adapterName;

    // The underlying socket; null until connected and again after close().
    private BluetoothSocket _socket;

    // Connection life cycle: Connecting -> Connected -> Closed.
    private static final int StateConnecting = 0;
    private static final int StateConnected = 1;
    private static final int StateClosed = 2;
    private int _state;

    // Helper threads created by startReadWriteThreads() and joined by close().
    private Thread _readThread;
    private Thread _writeThread;

    // First failure recorded by a helper thread; rethrown by initialize(), read() and write().
    private LocalException _exception;

    // Limits, in bytes, applied to the internal read and write buffers.
    private int _rcvSize;
    private int _sndSize;

    // Data received by the read thread and not yet consumed by read().
    private Buffer _readBuffer;
    // Data queued by write() and not yet taken by the write thread.
    private Buffer _writeBuffer;

    // Description returned by toString(); built by init().
    private String _desc;

    // Callback used to report read/write readiness; assigned by setReadyCallback().
    private ReadyCallback _readyCallback;
}