< Summary

Information
Class: Ice.Internal.CollocatedRequestHandler
Assembly: Ice
File(s): /home/runner/work/ice/ice/csharp/src/Ice/Internal/CollocatedRequestHandler.cs
Tag: 71_18251537082
Line coverage
90%
Covered lines: 147
Uncovered lines: 16
Coverable lines: 163
Total lines: 349
Line coverage: 90.1%
Branch coverage
92%
Covered branches: 65
Total branches: 70
Branch coverage: 92.8%
Method coverage
91%
Covered methods: 11
Total methods: 12
Method coverage: 91.6%

Metrics

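| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| fillInValue(...) | 100% | 1 | 1 | 100% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| sendAsyncRequest(...) | 100% | 1 | 1 | 100% |
| asyncRequestCanceled(...) | 85.71% | 14.03 | 14 | 94.74% |
| getConnection() | 100% | 1 | 1 | 100% |
| invokeAsyncRequest(...) | 100% | 12.13 | 12 | 90.32% |
| sentAsync(...) | 100% | 4 | 4 | 100% |
| dispatchAll(...) | 100% | 12.18 | 12 | 89.29% |
| dispatchAsync() | 100% | 1.01 | 1 | 78.57% |
| sendResponse(...) | 100% | 14 | 14 | 100% |
| dispatchException(...) | 100% | 2 | 1 | 0% |
| handleException(...) | 70% | 11.23 | 10 | 76.92% |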

File(s)

/home/runner/work/ice/ice/csharp/src/Ice/Internal/CollocatedRequestHandler.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4
 5namespace Ice.Internal;
 6
 7public class CollocatedRequestHandler : RequestHandler
 8{
 19    private static void fillInValue(Ice.OutputStream os, int pos, int value) => os.rewriteInt(value, pos);
 10
 111    internal CollocatedRequestHandler(Reference reference, Ice.ObjectAdapter adapter)
 12    {
 113        _reference = reference;
 114        _executor = _reference.getInstance().initializationData().executor != null;
 115        _response = _reference.isTwoway;
 116        _adapter = adapter;
 17
 118        _logger = _reference.getInstance().initializationData().logger; // Cached for better performance.
 119        _traceLevels = _reference.getInstance().traceLevels(); // Cached for better performance.
 120        _requestId = 0;
 121    }
 22
 123    public int sendAsyncRequest(ProxyOutgoingAsyncBase outAsync) => outAsync.invokeCollocated(this);
 24
 25    public void asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex)
 26    {
 127        lock (_mutex)
 28        {
 129            if (_sendAsyncRequests.TryGetValue(outAsync, out int requestId))
 30            {
 131                if (requestId > 0)
 32                {
 133                    _asyncRequests.Remove(requestId);
 34                }
 135                _sendAsyncRequests.Remove(outAsync);
 136                if (outAsync.exception(ex))
 37                {
 138                    outAsync.invokeExceptionAsync();
 39                }
 140                _adapter.decDirectCount(); // invokeAll won't be called, decrease the direct count.
 141                return;
 42            }
 143            if (outAsync is OutgoingAsync)
 44            {
 145                var o = (OutgoingAsync)outAsync;
 46                Debug.Assert(o != null);
 147                foreach (KeyValuePair<int, OutgoingAsyncBase> e in _asyncRequests)
 48                {
 149                    if (e.Value == o)
 50                    {
 151                        _asyncRequests.Remove(e.Key);
 152                        if (outAsync.exception(ex))
 53                        {
 154                            outAsync.invokeExceptionAsync();
 55                        }
 156                        return;
 57                    }
 58                }
 59            }
 060        }
 161    }
 62
 163    public ConnectionI getConnection() => null;
 64
 65    internal int invokeAsyncRequest(OutgoingAsyncBase outAsync, int batchRequestCount, bool synchronous)
 66    {
 67        //
 68        // Increase the direct count to prevent the thread pool from being destroyed before
 69        // invokeAll is called. This will also throw if the object adapter has been destroyed.
 70        //
 171        _adapter.incDirectCount();
 72
 173        int requestId = 0;
 74        try
 75        {
 176            lock (_mutex)
 77            {
 178                outAsync.cancelable(this); // This will throw if the request is canceled
 79
 180                if (_response)
 81                {
 182                    requestId = ++_requestId;
 183                    _asyncRequests.Add(requestId, outAsync);
 84                }
 85
 186                _sendAsyncRequests.Add(outAsync, requestId);
 187            }
 88
 189            outAsync.attachCollocatedObserver(_adapter, requestId);
 190            if (!synchronous || !_response || _reference.getInvocationTimeout() > TimeSpan.Zero)
 91            {
 92                // Don't invoke from the user thread if async or invocation timeout is set
 193                _adapter.getThreadPool().execute(
 194                    () =>
 195                    {
 196                        if (sentAsync(outAsync))
 197                        {
 198                            dispatchAll(outAsync.getOs(), requestId, batchRequestCount);
 199                        }
 1100                    },
 1101                    null);
 102            }
 1103            else if (_executor)
 104            {
 1105                _adapter.getThreadPool().executeFromThisThread(
 1106                    () =>
 1107                    {
 1108                        if (sentAsync(outAsync))
 1109                        {
 1110                            dispatchAll(outAsync.getOs(), requestId, batchRequestCount);
 1111                        }
 1112                    },
 1113                    null);
 114            }
 115            else // Optimization: directly call invokeAll if there's no executor.
 116            {
 1117                if (sentAsync(outAsync))
 118                {
 1119                    dispatchAll(outAsync.getOs(), requestId, batchRequestCount);
 120                }
 121            }
 1122        }
 0123        catch
 124        {
 125            // Decrement the direct count if any exception is thrown synchronously.
 0126            _adapter.decDirectCount();
 0127            throw;
 128        }
 1129        return OutgoingAsyncBase.AsyncStatusQueued;
 130    }
 131
 132    private bool sentAsync(OutgoingAsyncBase outAsync)
 133    {
 1134        lock (_mutex)
 135        {
 1136            if (!_sendAsyncRequests.Remove(outAsync))
 137            {
 1138                return false; // The request timed-out.
 139            }
 140
 1141            if (!outAsync.sent())
 142            {
 1143                return true;
 144            }
 1145        }
 1146        outAsync.invokeSent();
 1147        return true;
 1148    }
 149
 150    private void dispatchAll(Ice.OutputStream os, int requestId, int requestCount)
 151    {
 1152        if (_traceLevels.protocol >= 1)
 153        {
 1154            fillInValue(os, 10, os.size());
 1155            if (requestId > 0)
 156            {
 1157                fillInValue(os, Protocol.headerSize, requestId);
 158            }
 1159            else if (requestCount > 0)
 160            {
 1161                fillInValue(os, Protocol.headerSize, requestCount);
 162            }
 1163            TraceUtil.traceSend(os, _reference.getInstance(), connection: null, _logger, _traceLevels);
 164        }
 165
 1166        var iss = new Ice.InputStream(_reference.getInstance(), os.getEncoding(), os.getBuffer(), false);
 167
 1168        if (requestCount > 0)
 169        {
 1170            iss.pos(Protocol.requestBatchHdr.Length);
 171        }
 172        else
 173        {
 1174            iss.pos(Protocol.requestHdr.Length);
 175        }
 176
 1177        int dispatchCount = requestCount > 0 ? requestCount : 1;
 178        Debug.Assert(!_response || dispatchCount == 1);
 179
 1180        Object dispatcher = _adapter.dispatchPipeline;
 181        try
 182        {
 1183            while (dispatchCount > 0)
 184            {
 185                // Increase the direct count for the dispatch. We increase it again here for
 186                // each dispatch. It's important for the direct count to be > 0 until the last
 187                // collocated request response is sent to make sure the thread pool isn't
 188                // destroyed before. It's decremented when processing the response.
 189                try
 190                {
 1191                    _adapter.incDirectCount();
 1192                }
 1193                catch (Ice.ObjectAdapterDestroyedException ex)
 194                {
 1195                    handleException(ex, requestId, amd: false);
 1196                    break;
 197                }
 198
 1199                var request = new IncomingRequest(requestId, connection: null, _adapter, iss);
 200                // See comment in ConnectionI
 1201                _ = dispatchAsync(request);
 1202                --dispatchCount;
 203            }
 1204        }
 0205        catch (Ice.LocalException ex)
 206        {
 0207            dispatchException(ex, requestId, amd: false); // Fatal invocation exception
 0208        }
 209
 1210        _adapter.decDirectCount();
 211
 212        async Task dispatchAsync(IncomingRequest request)
 213        {
 1214            bool amd = false;
 215
 216            try
 217            {
 218                OutgoingResponse response;
 219
 220                try
 221                {
 1222                    ValueTask<OutgoingResponse> valueTask = dispatcher.dispatchAsync(request);
 1223                    amd = !valueTask.IsCompleted;
 1224                    response = await valueTask.ConfigureAwait(false);
 1225                }
 1226                catch (System.Exception ex)
 227                {
 1228                    response = request.current.createOutgoingResponse(ex);
 1229                }
 230
 1231                sendResponse(response, requestId, amd);
 1232            }
 0233            catch (Ice.LocalException ex) // TODO: catch all exceptions to avoid UnobservedTaskException
 234            {
 0235                dispatchException(ex, requestId, amd);
 0236            }
 1237        }
 1238    }
 239
 240    private void sendResponse(OutgoingResponse response, int requestId, bool amd)
 241    {
 1242        if (_response)
 243        {
 244            OutgoingAsyncBase outAsync;
 1245            OutputStream outputStream = response.outputStream;
 1246            lock (_mutex)
 247            {
 1248                if (_traceLevels.protocol >= 1)
 249                {
 1250                    fillInValue(outputStream, 10, outputStream.size());
 251                }
 252
 253                // Adopt the OutputStream's buffer.
 1254                var inputStream = new InputStream(
 1255                    _reference.getInstance(),
 1256                    outputStream.getEncoding(),
 1257                    outputStream.getBuffer(),
 1258                    adopt: true);
 259
 1260                inputStream.pos(Protocol.replyHdr.Length + 4);
 261
 1262                if (_traceLevels.protocol >= 1)
 263                {
 1264                    TraceUtil.traceRecv(inputStream, connection: null, _logger, _traceLevels);
 265                }
 266
 1267                if (_asyncRequests.TryGetValue(requestId, out outAsync))
 268                {
 1269                    outAsync.getIs().swap(inputStream);
 1270                    if (!outAsync.response())
 271                    {
 1272                        outAsync = null;
 273                    }
 1274                    _asyncRequests.Remove(requestId);
 275                }
 1276            }
 277
 1278            if (outAsync is not null)
 279            {
 1280                if (amd)
 281                {
 1282                    outAsync.invokeResponseAsync();
 283                }
 284                else
 285                {
 1286                    outAsync.invokeResponse();
 287                }
 288            }
 289        }
 1290        _adapter.decDirectCount();
 1291    }
 292
 293    private void dispatchException(Ice.LocalException ex, int requestId, bool amd)
 294    {
 0295        handleException(ex, requestId, amd);
 0296        _adapter.decDirectCount();
 0297    }
 298
 299    private void handleException(Ice.Exception ex, int requestId, bool amd)
 300    {
 1301        if (!_response)
 302        {
 0303            return; // Ignore exception for oneway proxies.
 304        }
 305
 306        OutgoingAsyncBase outAsync;
 1307        lock (_mutex)
 308        {
 1309            if (_asyncRequests.TryGetValue(requestId, out outAsync))
 310            {
 1311                if (!outAsync.exception(ex))
 312                {
 0313                    outAsync = null;
 314                }
 1315                _asyncRequests.Remove(requestId);
 316            }
 1317        }
 318
 1319        if (outAsync != null)
 320        {
 321            //
 322            // If called from an AMD dispatch, invoke asynchronously
 323            // the completion callback since this might be called from
 324            // the user code.
 325            //
 1326            if (amd)
 327            {
 0328                outAsync.invokeExceptionAsync();
 329            }
 330            else
 331            {
 1332                outAsync.invokeException();
 333            }
 334        }
 1335    }
 336
 337    private readonly Reference _reference;
 338    private readonly bool _executor;
 339    private readonly bool _response;
 340    private readonly Ice.ObjectAdapter _adapter;
 341    private readonly Ice.Logger _logger;
 342    private readonly TraceLevels _traceLevels;
 343
 344    private int _requestId;
 345
 1346    private readonly Dictionary<OutgoingAsyncBase, int> _sendAsyncRequests = new Dictionary<OutgoingAsyncBase, int>();
 1347    private readonly Dictionary<int, OutgoingAsyncBase> _asyncRequests = new Dictionary<int, OutgoingAsyncBase>();
 1348    private readonly object _mutex = new();
 349}