< Summary

Information
Class: Ice.Internal.BatchRequestQueue
Assembly: Ice
File(s): /home/runner/work/ice/ice/csharp/src/Ice/Internal/BatchRequestQueue.cs
Tag: 71_18251537082
Line coverage
89%
Covered lines: 82
Uncovered lines: 10
Coverable lines: 92
Total lines: 222
Line coverage: 89.1%
Branch coverage
83%
Covered branches: 25
Total branches: 30
Branch coverage: 83.3%
Method coverage
87%
Covered methods: 7
Total methods: 8
Method coverage: 87.5%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%44100%
prepareBatchRequest(...)50%2.01287.5%
finishBatchRequest(...)100%88100%
abortBatchRequest(...)0%620%
swap(...)100%66100%
destroy(...)100%11100%
waitStreamInUse(...)66.67%7.33666.67%
enqueueBatchRequest(...)100%22100%

File(s)

/home/runner/work/ice/ice/csharp/src/Ice/Internal/BatchRequestQueue.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4
 5namespace Ice.Internal;
 6
 7internal sealed class BatchRequestI : BatchRequest
 8{
 9    internal BatchRequestI(BatchRequestQueue queue) => _queue = queue;
 10
 11    internal void reset(ObjectPrx proxy, string operation, int size)
 12    {
 13        _proxy = proxy;
 14        _operation = operation;
 15        _size = size;
 16    }
 17
 18    public void enqueue() => _queue.enqueueBatchRequest(_proxy);
 19
 20    public ObjectPrx getProxy() => _proxy;
 21
 22    public string getOperation() => _operation;
 23
 24    public int getSize() => _size;
 25
 26    private readonly BatchRequestQueue _queue;
 27    private ObjectPrx _proxy;
 28    private string _operation;
 29    private int _size;
 30}
 31
 32internal sealed class BatchRequestQueue
 33{
 134    internal BatchRequestQueue(Instance instance, bool datagram)
 35    {
 136        InitializationData initData = instance.initializationData();
 137        _interceptor = initData.batchRequestInterceptor;
 138        _batchStreamInUse = false;
 139        _batchRequestNum = 0;
 140        _batchStream =
 141            new OutputStream(Ice.Util.currentProtocolEncoding, instance.defaultsAndOverrides().defaultFormat);
 142        _batchStream.writeBlob(Protocol.requestBatchHdr);
 143        _batchMarker = _batchStream.size();
 144        _request = new BatchRequestI(this);
 45
 146        _maxSize = instance.batchAutoFlushSize();
 147        if (_maxSize > 0 && datagram)
 48        {
 149            int udpSndSize = initData.properties.getPropertyAsIntWithDefault(
 150                "Ice.UDP.SndSize",
 151                65535 - _udpOverhead);
 152            if (udpSndSize < _maxSize)
 53            {
 154                _maxSize = udpSndSize;
 55            }
 56        }
 157    }
 58
 59    internal void prepareBatchRequest(OutputStream os)
 60    {
 161        lock (_mutex)
 62        {
 163            if (_exception != null)
 64            {
 065                throw _exception;
 66            }
 167            waitStreamInUse(false);
 168            _batchStreamInUse = true;
 169            _batchStream.swap(os);
 170        }
 171    }
 72
 73    internal void finishBatchRequest(OutputStream os, ObjectPrx proxy, string operation)
 74    {
 75        //
 76        // No need for synchronization, no other threads are supposed
 77        // to modify the queue since we set _batchStreamInUse to true.
 78        //
 79        Debug.Assert(_batchStreamInUse);
 180        _batchStream.swap(os);
 81
 82        try
 83        {
 184            _batchStreamCanFlush = true; // Allow flush to proceed even if the stream is marked in use.
 85
 186            if (_maxSize > 0 && _batchStream.size() >= _maxSize)
 87            {
 188                _ = proxy.ice_flushBatchRequestsAsync(); // Auto flush
 89            }
 90
 91            Debug.Assert(_batchMarker < _batchStream.size());
 192            if (_interceptor != null)
 93            {
 194                _request.reset(proxy, operation, _batchStream.size() - _batchMarker);
 195                _interceptor(_request, _batchRequestNum, _batchMarker);
 96            }
 97            else
 98            {
 199                bool? compress = ((ObjectPrxHelperBase)proxy).iceReference().getCompressOverride();
 1100                if (compress is not null)
 101                {
 1102                    _batchCompress |= compress.Value;
 103                }
 1104                _batchMarker = _batchStream.size();
 1105                ++_batchRequestNum;
 106            }
 1107        }
 108        finally
 109        {
 1110            lock (_mutex)
 111            {
 1112                _batchStream.resize(_batchMarker);
 1113                _batchStreamInUse = false;
 1114                _batchStreamCanFlush = false;
 1115                Monitor.PulseAll(_mutex);
 1116            }
 1117        }
 1118    }
 119
 120    internal void abortBatchRequest(OutputStream os)
 121    {
 0122        lock (_mutex)
 123        {
 0124            if (_batchStreamInUse)
 125            {
 0126                _batchStream.swap(os);
 0127                _batchStream.resize(_batchMarker);
 0128                _batchStreamInUse = false;
 0129                Monitor.PulseAll(_mutex);
 130            }
 0131        }
 0132    }
 133
 134    internal int swap(OutputStream os, out bool compress)
 135    {
 1136        lock (_mutex)
 137        {
 1138            if (_batchRequestNum == 0)
 139            {
 1140                compress = false;
 1141                return 0;
 142            }
 143
 1144            waitStreamInUse(true);
 145
 1146            byte[] lastRequest = null;
 1147            if (_batchMarker < _batchStream.size())
 148            {
 1149                lastRequest = new byte[_batchStream.size() - _batchMarker];
 1150                Buffer buffer = _batchStream.getBuffer();
 1151                buffer.b.position(_batchMarker);
 1152                buffer.b.get(lastRequest);
 1153                _batchStream.resize(_batchMarker);
 154            }
 155
 1156            int requestNum = _batchRequestNum;
 1157            compress = _batchCompress;
 1158            _batchStream.swap(os);
 159
 160            //
 161            // Reset the batch.
 162            //
 1163            _batchRequestNum = 0;
 1164            _batchCompress = false;
 1165            _batchStream.writeBlob(Protocol.requestBatchHdr);
 1166            _batchMarker = _batchStream.size();
 1167            if (lastRequest != null)
 168            {
 1169                _batchStream.writeBlob(lastRequest);
 170            }
 1171            return requestNum;
 172        }
 1173    }
 174
 175    internal void destroy(LocalException ex)
 176    {
 1177        lock (_mutex)
 178        {
 1179            _exception = ex;
 1180        }
 1181    }
 182
 183    private void waitStreamInUse(bool flush)
 184    {
 185        //
 186        // This is similar to a mutex lock in that the stream is
 187        // only "locked" while marshaling. As such we don't permit the wait
 188        // to be interrupted. Instead the interrupted status is saved and
 189        // restored.
 190        //
 1191        while (_batchStreamInUse && !(flush && _batchStreamCanFlush))
 192        {
 0193            Monitor.Wait(_mutex);
 194        }
 1195    }
 196
 197    internal void enqueueBatchRequest(ObjectPrx proxy)
 198    {
 199        Debug.Assert(_batchMarker < _batchStream.size());
 1200        bool? compress = ((ObjectPrxHelperBase)proxy).iceReference().getCompressOverride();
 1201        if (compress is not null)
 202        {
 1203            _batchCompress |= compress.Value;
 204        }
 1205        _batchMarker = _batchStream.size();
 1206        ++_batchRequestNum;
 1207    }
 208
 1209    private readonly object _mutex = new();
 210
 211    private readonly System.Action<BatchRequest, int, int> _interceptor;
 212    private readonly OutputStream _batchStream;
 213    private bool _batchStreamInUse;
 214    private bool _batchStreamCanFlush;
 215    private int _batchRequestNum;
 216    private int _batchMarker;
 217    private bool _batchCompress;
 218    private readonly BatchRequestI _request;
 219    private LocalException _exception;
 220    private readonly int _maxSize;
 221    private const int _udpOverhead = 20 + 8;
 222}