< Summary

Information
Class: Ice.Internal.BatchRequestI
Assembly: Ice
File(s): /home/runner/work/ice/ice/csharp/src/Ice/Internal/BatchRequestQueue.cs
Tag: 71_18251537082
Line coverage
100%
Covered lines: 9
Uncovered lines: 0
Coverable lines: 9
Total lines: 222
Line coverage: 100%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage
100%
Covered methods: 6
Total methods: 6
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
reset(...)100%11100%
enqueue()100%11100%
getProxy()100%11100%
getOperation()100%11100%
getSize()100%11100%

File(s)

/home/runner/work/ice/ice/csharp/src/Ice/Internal/BatchRequestQueue.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4
 5namespace Ice.Internal;
 6
 7internal sealed class BatchRequestI : BatchRequest
 8{
 19    internal BatchRequestI(BatchRequestQueue queue) => _queue = queue;
 10
 11    internal void reset(ObjectPrx proxy, string operation, int size)
 12    {
 113        _proxy = proxy;
 114        _operation = operation;
 115        _size = size;
 116    }
 17
 118    public void enqueue() => _queue.enqueueBatchRequest(_proxy);
 19
 120    public ObjectPrx getProxy() => _proxy;
 21
 122    public string getOperation() => _operation;
 23
 124    public int getSize() => _size;
 25
 26    private readonly BatchRequestQueue _queue;
 27    private ObjectPrx _proxy;
 28    private string _operation;
 29    private int _size;
 30}
 31
 32internal sealed class BatchRequestQueue
 33{
 34    internal BatchRequestQueue(Instance instance, bool datagram)
 35    {
 36        InitializationData initData = instance.initializationData();
 37        _interceptor = initData.batchRequestInterceptor;
 38        _batchStreamInUse = false;
 39        _batchRequestNum = 0;
 40        _batchStream =
 41            new OutputStream(Ice.Util.currentProtocolEncoding, instance.defaultsAndOverrides().defaultFormat);
 42        _batchStream.writeBlob(Protocol.requestBatchHdr);
 43        _batchMarker = _batchStream.size();
 44        _request = new BatchRequestI(this);
 45
 46        _maxSize = instance.batchAutoFlushSize();
 47        if (_maxSize > 0 && datagram)
 48        {
 49            int udpSndSize = initData.properties.getPropertyAsIntWithDefault(
 50                "Ice.UDP.SndSize",
 51                65535 - _udpOverhead);
 52            if (udpSndSize < _maxSize)
 53            {
 54                _maxSize = udpSndSize;
 55            }
 56        }
 57    }
 58
 59    internal void prepareBatchRequest(OutputStream os)
 60    {
 61        lock (_mutex)
 62        {
 63            if (_exception != null)
 64            {
 65                throw _exception;
 66            }
 67            waitStreamInUse(false);
 68            _batchStreamInUse = true;
 69            _batchStream.swap(os);
 70        }
 71    }
 72
 73    internal void finishBatchRequest(OutputStream os, ObjectPrx proxy, string operation)
 74    {
 75        //
 76        // No need for synchronization, no other threads are supposed
 77        // to modify the queue since we set _batchStreamInUse to true.
 78        //
 79        Debug.Assert(_batchStreamInUse);
 80        _batchStream.swap(os);
 81
 82        try
 83        {
 84            _batchStreamCanFlush = true; // Allow flush to proceed even if the stream is marked in use.
 85
 86            if (_maxSize > 0 && _batchStream.size() >= _maxSize)
 87            {
 88                _ = proxy.ice_flushBatchRequestsAsync(); // Auto flush
 89            }
 90
 91            Debug.Assert(_batchMarker < _batchStream.size());
 92            if (_interceptor != null)
 93            {
 94                _request.reset(proxy, operation, _batchStream.size() - _batchMarker);
 95                _interceptor(_request, _batchRequestNum, _batchMarker);
 96            }
 97            else
 98            {
 99                bool? compress = ((ObjectPrxHelperBase)proxy).iceReference().getCompressOverride();
 100                if (compress is not null)
 101                {
 102                    _batchCompress |= compress.Value;
 103                }
 104                _batchMarker = _batchStream.size();
 105                ++_batchRequestNum;
 106            }
 107        }
 108        finally
 109        {
 110            lock (_mutex)
 111            {
 112                _batchStream.resize(_batchMarker);
 113                _batchStreamInUse = false;
 114                _batchStreamCanFlush = false;
 115                Monitor.PulseAll(_mutex);
 116            }
 117        }
 118    }
 119
 120    internal void abortBatchRequest(OutputStream os)
 121    {
 122        lock (_mutex)
 123        {
 124            if (_batchStreamInUse)
 125            {
 126                _batchStream.swap(os);
 127                _batchStream.resize(_batchMarker);
 128                _batchStreamInUse = false;
 129                Monitor.PulseAll(_mutex);
 130            }
 131        }
 132    }
 133
 134    internal int swap(OutputStream os, out bool compress)
 135    {
 136        lock (_mutex)
 137        {
 138            if (_batchRequestNum == 0)
 139            {
 140                compress = false;
 141                return 0;
 142            }
 143
 144            waitStreamInUse(true);
 145
 146            byte[] lastRequest = null;
 147            if (_batchMarker < _batchStream.size())
 148            {
 149                lastRequest = new byte[_batchStream.size() - _batchMarker];
 150                Buffer buffer = _batchStream.getBuffer();
 151                buffer.b.position(_batchMarker);
 152                buffer.b.get(lastRequest);
 153                _batchStream.resize(_batchMarker);
 154            }
 155
 156            int requestNum = _batchRequestNum;
 157            compress = _batchCompress;
 158            _batchStream.swap(os);
 159
 160            //
 161            // Reset the batch.
 162            //
 163            _batchRequestNum = 0;
 164            _batchCompress = false;
 165            _batchStream.writeBlob(Protocol.requestBatchHdr);
 166            _batchMarker = _batchStream.size();
 167            if (lastRequest != null)
 168            {
 169                _batchStream.writeBlob(lastRequest);
 170            }
 171            return requestNum;
 172        }
 173    }
 174
 175    internal void destroy(LocalException ex)
 176    {
 177        lock (_mutex)
 178        {
 179            _exception = ex;
 180        }
 181    }
 182
 183    private void waitStreamInUse(bool flush)
 184    {
 185        //
 186        // This is similar to a mutex lock in that the stream is
 187        // only "locked" while marshaling. As such we don't permit the wait
 188        // to be interrupted. Instead the interrupted status is saved and
 189        // restored.
 190        //
 191        while (_batchStreamInUse && !(flush && _batchStreamCanFlush))
 192        {
 193            Monitor.Wait(_mutex);
 194        }
 195    }
 196
 197    internal void enqueueBatchRequest(ObjectPrx proxy)
 198    {
 199        Debug.Assert(_batchMarker < _batchStream.size());
 200        bool? compress = ((ObjectPrxHelperBase)proxy).iceReference().getCompressOverride();
 201        if (compress is not null)
 202        {
 203            _batchCompress |= compress.Value;
 204        }
 205        _batchMarker = _batchStream.size();
 206        ++_batchRequestNum;
 207    }
 208
 209    private readonly object _mutex = new();
 210
 211    private readonly System.Action<BatchRequest, int, int> _interceptor;
 212    private readonly OutputStream _batchStream;
 213    private bool _batchStreamInUse;
 214    private bool _batchStreamCanFlush;
 215    private int _batchRequestNum;
 216    private int _batchMarker;
 217    private bool _batchCompress;
 218    private readonly BatchRequestI _request;
 219    private LocalException _exception;
 220    private readonly int _maxSize;
 221    private const int _udpOverhead = 20 + 8;
 222}