ThreadPoolWorkQueue.java

// Copyright (c) ZeroC, Inc.

package com.zeroc.Ice;

import java.nio.channels.SelectableChannel;
import java.util.LinkedList;

final class ThreadPoolWorkQueue extends EventHandler {
    ThreadPoolWorkQueue(Instance instance, ThreadPool threadPool, Selector selector) {
        _threadPool = threadPool;
        _selector = selector;
        _destroyed = false;
        _registered = SocketOperation.Read;
    }

    @SuppressWarnings({"nofinalizer", "deprecation"})
    @Override
    protected synchronized void finalize() throws Throwable {
        try {
            Assert.FinalizerAssert(_destroyed);
        } catch (Exception ex) {} finally {
            super.finalize();
        }
    }

    void destroy() {
        // Called with the thread pool locked
        assert (!_destroyed);
        _destroyed = true;
        _selector.ready(this, SocketOperation.Read, true);
    }

    void queue(ThreadPoolWorkItem item) {
        // Called with the thread pool locked
        assert (item != null);
        _workItems.add(item);
        if (_workItems.size() == 1) {
            _selector.ready(this, SocketOperation.Read, true);
        }
    }

    @Override
    public void message(ThreadPoolCurrent current) {
        ThreadPoolWorkItem workItem = null;
        synchronized (_threadPool) {
            if (!_workItems.isEmpty()) {
                workItem = _workItems.removeFirst();
                assert (workItem != null);
            }
            if (_workItems.isEmpty() && !_destroyed) {
                _selector.ready(this, SocketOperation.Read, false);
            }
        }

        if (workItem != null) {
            workItem.execute(current);
        } else {
            assert _destroyed;
            _threadPool.ioCompleted(current);
            throw new ThreadPool.DestroyedException();
        }
    }

    @Override
    public void finished(ThreadPoolCurrent current, boolean close) {
        assert false;
    }

    @Override
    public String toString() {
        return "work queue";
    }

    @Override
    public SelectableChannel fd() {
        return null;
    }

    @Override
    public void setReadyCallback(ReadyCallback callback) {
        // Ignore, we don't use the ready callback.
    }

    private final ThreadPool _threadPool;
    private boolean _destroyed;
    private final Selector _selector;
    private final LinkedList<ThreadPoolWorkItem> _workItems = new LinkedList<>();
}