BatchRequestQueue.java

// Copyright (c) ZeroC, Inc.

package com.zeroc.Ice;

import java.util.Optional;

class BatchRequestQueue {
    class BatchRequestI implements BatchRequest {
        public void reset(ObjectPrx proxy, String operation, int size) {
            _proxy = proxy;
            _operation = operation;
            _size = size;
        }

        @Override
        public void enqueue() {
            enqueueBatchRequest(_proxy);
        }

        @Override
        public ObjectPrx getProxy() {
            return _proxy;
        }

        @Override
        public String getOperation() {
            return _operation;
        }

        @Override
        public int getSize() {
            return _size;
        }

        private ObjectPrx _proxy;
        private String _operation;
        private int _size;
    }

    public BatchRequestQueue(Instance instance, boolean datagram) {
        InitializationData initData = instance.initializationData();
        _interceptor = initData.batchRequestInterceptor;
        _batchStreamInUse = false;
        _batchRequestNum = 0;
        _batchStream =
            new OutputStream(
                Protocol.currentProtocolEncoding,
                instance.defaultsAndOverrides().defaultFormat,
                instance.cacheMessageBuffers() > 1);
        _batchStream.writeBlob(Protocol.requestBatchHdr);
        _batchMarker = _batchStream.size();
        _batchCompress = false;
        _request = new BatchRequestI();

        _maxSize = instance.batchAutoFlushSize();
        if (_maxSize > 0 && datagram) {
            int udpSndSize =
                initData.properties.getPropertyAsIntWithDefault(
                    "Ice.UDP.SndSize", 65535 - _udpOverhead);
            if (udpSndSize < _maxSize) {
                _maxSize = udpSndSize;
            }
        }
    }

    public synchronized void prepareBatchRequest(OutputStream os) {
        if (_exception != null) {
            throw (LocalException) _exception.fillInStackTrace();
        }

        waitStreamInUse(false);
        _batchStreamInUse = true;
        _batchStream.swap(os);
    }

    public void finishBatchRequest(OutputStream os, ObjectPrx proxy, String operation) {
        //
        // No need for synchronization, no other threads are supposed to modify the queue since we
        // set _batchStreamInUse to true.
        //
        assert _batchStreamInUse;
        _batchStream.swap(os);

        try {
            _batchStreamCanFlush =
                true; // Allow flush to proceed even if the stream is marked in use.

            if (_maxSize > 0 && _batchStream.size() >= _maxSize) {
                proxy.ice_flushBatchRequestsAsync(); // Auto flush
            }

            assert (_batchMarker < _batchStream.size());
            if (_interceptor != null) {
                _request.reset(proxy, operation, _batchStream.size() - _batchMarker);
                _interceptor.enqueue(_request, _batchRequestNum, _batchMarker);
            } else {
                Optional<Boolean> compress = proxy._getReference().getCompressOverride();
                if (compress.isPresent()) {
                    _batchCompress |= compress.get();
                }
                _batchMarker = _batchStream.size();
                ++_batchRequestNum;
            }
        } finally {
            synchronized (this) {
                _batchStream.resize(_batchMarker);
                _batchStreamInUse = false;
                _batchStreamCanFlush = false;
                notifyAll();
            }
        }
    }

    public synchronized void abortBatchRequest(OutputStream os) {
        if (_batchStreamInUse) {
            _batchStream.swap(os);
            _batchStream.resize(_batchMarker);
            _batchStreamInUse = false;
            notifyAll();
        }
    }

    public class SwapResult {
        public int batchRequestNum;
        public boolean compress;
    }

    public synchronized SwapResult swap(OutputStream os) {
        if (_batchRequestNum == 0) {
            return null;
        }

        waitStreamInUse(true);

        byte[] lastRequest = null;
        if (_batchMarker < _batchStream.size()) {
            lastRequest = new byte[_batchStream.size() - _batchMarker];
            Buffer buffer = _batchStream.getBuffer();
            buffer.position(_batchMarker);
            buffer.b.get(lastRequest);
            _batchStream.resize(_batchMarker);
        }

        SwapResult result = new SwapResult();
        result.batchRequestNum = _batchRequestNum;
        result.compress = _batchCompress;
        _batchStream.swap(os);

        //
        // Reset the batch.
        //
        _batchRequestNum = 0;
        _batchCompress = false;
        _batchStream.writeBlob(Protocol.requestBatchHdr);
        _batchMarker = _batchStream.size();
        if (lastRequest != null) {
            _batchStream.writeBlob(lastRequest);
        }
        return result;
    }

    public synchronized void destroy(LocalException ex) {
        _exception = ex;
    }

    private void waitStreamInUse(boolean flush) {
        //
        // This is similar to a mutex lock in that the stream is
        // only "locked" while marshaling. As such we don't permit the wait
        // to be interrupted. Instead the interrupted status is saved and
        // restored.
        //
        boolean interrupted = false;
        while (_batchStreamInUse && !(flush && _batchStreamCanFlush)) {
            try {
                wait();
            } catch (InterruptedException ex) {
                interrupted = true;
            }
        }
        //
        // Restore the interrupted flag if we were interrupted.
        //
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    private void enqueueBatchRequest(ObjectPrx proxy) {
        assert (_batchMarker < _batchStream.size());
        Optional<Boolean> compress = proxy._getReference().getCompressOverride();
        if (compress.isPresent()) {
            _batchCompress |= compress.get();
        }
        _batchMarker = _batchStream.size();
        ++_batchRequestNum;
    }

    private final BatchRequestInterceptor _interceptor;
    private final OutputStream _batchStream;
    private boolean _batchStreamInUse;
    private boolean _batchStreamCanFlush;
    private int _batchRequestNum;
    private int _batchMarker;
    private boolean _batchCompress;
    private final BatchRequestI _request;
    private LocalException _exception;
    private int _maxSize;

    private static final int _udpOverhead = 20 + 8;
}