Ice 3.8
C++ API Reference
Loading...
Searching...
No Matches
OutgoingAsync.h
1// Copyright (c) ZeroC, Inc.
2
3#ifndef ICE_OUTGOING_ASYNC_H
4#define ICE_OUTGOING_ASYNC_H
5
6#include "CommunicatorF.h"
7#include "ConnectionF.h"
8#include "ConnectionIF.h"
9#include "InputStream.h"
10#include "LocalExceptions.h"
11#include "ObjectAdapterF.h"
12#include "ObserverHelper.h"
13#include "OutputStream.h"
14#include "Proxy.h"
15#include "RequestHandlerF.h"
16#include "TimerTask.h"
17
18#include <cassert>
19#include <exception>
20#include <string_view>
21
22#if defined(__clang__)
23# pragma clang diagnostic push
24// See #2747
25# pragma clang diagnostic ignored "-Wshadow-uncaptured-local"
26# pragma clang diagnostic ignored "-Wweak-vtables"
27#endif
28
29namespace IceInternal
30{
31 class OutgoingAsyncBase;
32 class RetryException;
33 class CollocatedRequestHandler;
34
// Status returned by ProxyOutgoingAsyncBase::invokeRemote and invokeCollocated.
// NOTE(review): the per-enumerator meanings below are inferred from the names only —
// confirm against the implementation.
35 enum AsyncStatus
36 {
37 AsyncStatusQueued = 0, // presumably: the request was queued and is not sent yet
38 AsyncStatusSent = 1, // presumably: the request was sent
39 AsyncStatusInvokeSentCallback = 2 // presumably: the caller must invoke the sent callback
40 };
41
// Completion-callback interface for asynchronous invocations. It is inherited
// virtually by OutgoingAsyncBase and by the LambdaInvoke / PromiseInvoke classes
// declared later in this header.
42 class ICE_API OutgoingAsyncCompletionCallback
43 {
44 public:
45 virtual ~OutgoingAsyncCompletionCallback();
46
47 protected:
48 // Returns true if handleInvokeSent handles sent callbacks.
49 virtual bool handleSent(bool done, bool alreadySent) noexcept = 0;
50
51 // Returns true if handleInvokeException handles exception callbacks.
52 virtual bool handleException(std::exception_ptr) noexcept = 0;
53
54 // Returns true if handleInvokeResponse handles response callbacks.
55 // This function can unmarshal the response and throw an exception.
56 virtual bool handleResponse(bool) = 0;
57
// Second-stage callbacks paired with the handle* functions above. PromiseInvoke
// returns false from every handle* function and asserts that these are never reached.
// NOTE(review): presumably they run only after the matching handle* returned true — confirm.
58 virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const = 0;
59 virtual void handleInvokeException(std::exception_ptr, OutgoingAsyncBase*) const = 0;
60 virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const = 0;
61 };
62
63 //
64 // Base class for handling asynchronous invocations. This class is
65 // responsible for the handling of the output stream and the child
66 // invocation observer.
67 //
// Inherits the completion-callback interface virtually so that a derived invocation
// class can combine it with a callback mix-in (see LambdaOutgoing and PromiseOutgoing,
// which also derive from LambdaInvoke / PromiseInvoke).
68 class ICE_API OutgoingAsyncBase : public virtual OutgoingAsyncCompletionCallback,
69 public std::enable_shared_from_this<OutgoingAsyncBase>
70 {
71 public:
// Completion notifications; ProxyOutgoingAsyncBase and OutgoingAsync override some of them.
72 virtual bool sent();
73 virtual bool exception(std::exception_ptr);
74 virtual bool response();
75
76 void invokeSentAsync();
77 void invokeExceptionAsync();
78 void invokeResponseAsync();
79
80 void invokeSent();
81 void invokeException();
82 void invokeResponse();
83
84 virtual void cancelable(const IceInternal::CancellationHandlerPtr&);
85 void cancel(); // Called by the cancellation function returned from makeLambdaOutgoing.
86
87 void
88 attachRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& endpt, std::int32_t requestId);
89
90 void attachCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, std::int32_t requestId);
91
92 Ice::OutputStream* getOs() { return &_os; } // Non-owning pointer to the member output stream.
93
94 Ice::InputStream* getIs() { return &_is; } // Non-owning pointer to the member input stream.
95
96 protected:
97 OutgoingAsyncBase(const InstancePtr&);
98
99 bool sentImpl(bool);
100 bool exceptionImpl(std::exception_ptr);
101 bool responseImpl(bool, bool);
102
103 void cancel(std::exception_ptr);
104
// LambdaOutgoing calls this with "response" when the user's response callback throws.
105 void warning(std::string_view callbackName, std::exception_ptr eptr) const;
106
107 //
108 // This virtual method is necessary for the communicator flush
109 // batch requests implementation.
110 //
111 virtual IceInternal::InvocationObserver& getObserver() { return _observer; }
112
113 const InstancePtr _instance;
114 Ice::ConnectionPtr _cachedConnection;
115 bool _sentSynchronously{false};
116 bool _doneInSent{false};
117 unsigned char _state{0}; // NOTE(review): presumably a combination of the OK / Sent constants below — confirm.
118
119 std::mutex _m;
120 using Lock = std::lock_guard<std::mutex>; // Scoped-lock alias matching the type of _m.
121
122 std::exception_ptr _ex;
123 std::exception_ptr _cancellationException;
124
125 InvocationObserver _observer;
126 ObserverHelperT<Ice::Instrumentation::ChildInvocationObserver> _childObserver;
127
128 Ice::OutputStream _os;
129 Ice::InputStream _is;
130
131 CancellationHandlerPtr _cancellationHandler;
132
// Defined out of line; the values are not visible in this header.
133 static const unsigned char OK;
134 static const unsigned char Sent;
135 };
136
137 using OutgoingAsyncBasePtr = std::shared_ptr<OutgoingAsyncBase>;
138
139 //
140 // Base class for proxy-based invocations. This class handles
141 // retries for proxy invocations. It also ensures the child observer is
142 // correctly notified of failures and makes sure the retry task is
143 // correctly canceled when the invocation completes.
144 //
// Invocation bound to a proxy (_proxy). It is also a TimerTask: runTimerTask is overridden below.
145 class ICE_API ProxyOutgoingAsyncBase : public OutgoingAsyncBase, public TimerTask
146 {
147 public:
// Implemented by OutgoingAsync; each returns an AsyncStatus.
148 virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool) = 0;
149 virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*) = 0;
150
151 bool exception(std::exception_ptr) override;
152
153 void retryException();
154 void retry();
155 void abort(std::exception_ptr);
156
// Hides the base-class shared_from_this and downcasts its result to this type.
157 std::shared_ptr<ProxyOutgoingAsyncBase> shared_from_this()
158 {
159 return std::static_pointer_cast<ProxyOutgoingAsyncBase>(OutgoingAsyncBase::shared_from_this());
160 }
161
162 protected:
163 ProxyOutgoingAsyncBase(Ice::ObjectPrx);
164 ~ProxyOutgoingAsyncBase() override;
165
166 void invokeImpl(bool);
// These three are non-virtual and hide the OutgoingAsyncBase members of the same name.
167 bool sentImpl(bool);
168 bool exceptionImpl(std::exception_ptr);
169 bool responseImpl(bool, bool);
170
171 void runTimerTask() override;
172
173 const Ice::ObjectPrx _proxy;
174 RequestHandlerPtr _handler;
// NOTE(review): this listing jumps from line 174 to line 176; a member declaration may
// be missing here — confirm against the original header.
176
177 private:
178 int handleRetryAfterException(std::exception_ptr);
179 int checkRetryAfterException(std::exception_ptr);
180
181 int _cnt{0}; // NOTE(review): presumably the retry count — confirm in the .cpp.
182 bool _sent{false};
183 };
184
185 using ProxyOutgoingAsyncBasePtr = std::shared_ptr<ProxyOutgoingAsyncBase>;
186
187 //
188 // Class for handling Slice operation invocations
189 //
190 class ICE_API OutgoingAsync : public ProxyOutgoingAsyncBase
191 {
192 public:
193 OutgoingAsync(Ice::ObjectPrx, bool);
194
195 void prepare(std::string_view operation, Ice::OperationMode mode, const Ice::Context& context);
196
197 bool sent() override;
198 bool response() override;
199
200 AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool) override;
201 AsyncStatus invokeCollocated(CollocatedRequestHandler*) override;
202
203 void abort(std::exception_ptr);
204 void invoke(std::string_view);
205 void invoke(
206 std::string_view,
208 std::optional<Ice::FormatType>,
209 const Ice::Context&,
210 const std::function<void(Ice::OutputStream*)>&);
211 void throwUserException();
212
213 Ice::OutputStream* startWriteParams(std::optional<Ice::FormatType> format)
214 {
215 _os.startEncapsulation(_encoding, format);
216 return &_os;
217 }
218 void endWriteParams() { _os.endEncapsulation(); }
219 void writeEmptyParams() { _os.writeEmptyEncapsulation(_encoding); }
220 void writeParamEncaps(const std::byte* encaps, std::int32_t size)
221 {
222 if (size == 0)
223 {
224 _os.writeEmptyEncapsulation(_encoding);
225 }
226 else
227 {
228 _os.writeEncapsulation(encaps, size);
229 }
230 }
231
232 protected:
233 const Ice::EncodingVersion _encoding;
234 std::function<void(const Ice::UserException&)> _userException;
235 bool _synchronous;
236 };
237
238 using OutgoingAsyncPtr = std::shared_ptr<OutgoingAsync>;
239
// Completion callback backed by std::function objects. The constructor stores the
// exception and sent callbacks; _response is left empty here and is assigned by the
// LambdaOutgoing constructors.
240 class ICE_API LambdaInvoke : public virtual OutgoingAsyncCompletionCallback
241 {
242 public:
243 LambdaInvoke(std::function<void(std::exception_ptr)> exception, std::function<void(bool)> sent)
244 : _exception(std::move(exception)),
245 _sent(std::move(sent))
246 {
247 }
248
249 protected:
// All overrides are final: classes deriving from LambdaInvoke cannot customize them.
250 bool handleSent(bool, bool) noexcept final;
251 bool handleException(std::exception_ptr) noexcept final;
252 bool handleResponse(bool) final;
253
254 void handleInvokeSent(bool, OutgoingAsyncBase*) const final;
255 void handleInvokeException(std::exception_ptr, OutgoingAsyncBase*) const final;
256 void handleInvokeResponse(bool, OutgoingAsyncBase*) const final;
257
258 std::function<void(std::exception_ptr)> _exception;
259 std::function<void(bool)> _sent;
260 std::function<void(bool)> _response; // Takes the "ok" flag of the reply; set by LambdaOutgoing.
261 };
262
263 template<typename R> class PromiseInvoke : public virtual OutgoingAsyncCompletionCallback
264 {
265 public:
266 [[nodiscard]] std::future<R> getFuture() { return _promise.get_future(); }
267
268 protected:
269 bool handleSent(bool, bool) noexcept override { return false; }
270
271 bool handleException(std::exception_ptr ex) noexcept final
272 {
273 _promise.set_exception(ex);
274 return false;
275 }
276
277 bool handleResponse(bool ok) final
278 {
279 _response(ok);
280 return false;
281 }
282
283 void handleInvokeSent(bool, OutgoingAsyncBase*) const final { assert(false); }
284
285 void handleInvokeException(std::exception_ptr, OutgoingAsyncBase*) const final { assert(false); }
286
287 void handleInvokeResponse(bool, OutgoingAsyncBase*) const final { assert(false); }
288
289 std::promise<R> _promise;
290 std::function<void(bool)> _response;
291 };
292
293 template<typename T> class OutgoingAsyncT : public OutgoingAsync
294 {
295 public:
296 using OutgoingAsync::OutgoingAsync;
297
298 void invoke(
299 std::string_view operation,
301 std::optional<Ice::FormatType> format,
302 const Ice::Context& ctx,
303 const std::function<void(Ice::OutputStream*)>& write,
304 std::function<void(const Ice::UserException&)> userException)
305 {
306 _read = [](Ice::InputStream* stream)
307 {
308 T v;
309 stream->read(v);
310 return v;
311 };
312 _userException = std::move(userException);
313 OutgoingAsync::invoke(operation, mode, format, ctx, write);
314 }
315
316 void invoke(
317 std::string_view operation,
319 std::optional<Ice::FormatType> format,
320 const Ice::Context& ctx,
321 const std::function<void(Ice::OutputStream*)>& write,
322 std::function<void(const Ice::UserException&)> userException,
323 std::function<T(Ice::InputStream*)> read)
324 {
325 _read = std::move(read);
326 _userException = std::move(userException);
327 OutgoingAsync::invoke(operation, mode, format, ctx, write);
328 }
329
330 protected:
331 std::function<T(Ice::InputStream*)> _read;
332 };
333
334 template<> class OutgoingAsyncT<void> : public OutgoingAsync
335 {
336 public:
337 using OutgoingAsync::OutgoingAsync;
338
339 void invoke(
340 std::string_view operation,
342 std::optional<Ice::FormatType> format,
343 const Ice::Context& ctx,
344 const std::function<void(Ice::OutputStream*)>& write,
345 std::function<void(const Ice::UserException&)> userException)
346 {
347 _userException = std::move(userException);
348 OutgoingAsync::invoke(operation, mode, format, ctx, write);
349 }
350 };
351
352 template<typename R> class LambdaOutgoing : public OutgoingAsyncT<R>, public LambdaInvoke
353 {
354 public:
355 LambdaOutgoing(
356 Ice::ObjectPrx proxy,
357 std::function<void(R)> response,
358 std::function<void(std::exception_ptr)> ex,
359 std::function<void(bool)> sent)
360 : OutgoingAsyncT<R>(std::move(proxy), false),
361 LambdaInvoke(std::move(ex), std::move(sent))
362 {
363 _response = [this, response = std::move(response)](bool ok)
364 {
365 if (!ok)
366 {
367 this->throwUserException();
368 }
369 else if (response)
370 {
371 assert(this->_read);
372 this->_is.startEncapsulation();
373 R v = this->_read(&this->_is);
374 this->_is.endEncapsulation();
375 try
376 {
377 response(std::move(v));
378 }
379 catch (...)
380 {
381 this->warning("response", std::current_exception());
382 }
383 }
384 };
385 }
386 };
387
388 template<> class LambdaOutgoing<void> : public OutgoingAsyncT<void>, public LambdaInvoke
389 {
390 public:
391 LambdaOutgoing(
392 Ice::ObjectPrx proxy,
393 std::function<void()> response,
394 std::function<void(std::exception_ptr)> ex,
395 std::function<void(bool)> sent)
396 : OutgoingAsyncT<void>(std::move(proxy), false),
397 LambdaInvoke(std::move(ex), std::move(sent))
398 {
399 _response = [this, response = std::move(response)](bool ok)
400 {
401 if (!ok)
402 {
403 this->throwUserException();
404 }
405 else if (response)
406 {
407 if (!this->_is.b.empty())
408 {
409 this->_is.skipEmptyEncapsulation();
410 }
411 try
412 {
413 response();
414 }
415 catch (...)
416 {
417 this->warning("response", std::current_exception());
418 }
419 }
420 };
421 }
422 };
423
424 template<typename R> class PromiseOutgoing : public OutgoingAsyncT<R>, public PromiseInvoke<R>
425 {
426 public:
427 PromiseOutgoing(Ice::ObjectPrx proxy, bool sync) : OutgoingAsyncT<R>(std::move(proxy), sync)
428 {
429 this->_response = [this](bool ok)
430 {
431 if (ok)
432 {
433 assert(this->_read);
434 this->_is.startEncapsulation();
435 R v = this->_read(&this->_is);
436 this->_is.endEncapsulation();
437 this->_promise.set_value(std::move(v));
438 }
439 else
440 {
441 this->throwUserException();
442 }
443 };
444 }
445 };
446
447 template<> class PromiseOutgoing<void> : public OutgoingAsyncT<void>, public PromiseInvoke<void>
448 {
449 public:
450 PromiseOutgoing(Ice::ObjectPrx proxy, bool sync) : OutgoingAsyncT<void>(std::move(proxy), sync)
451 {
452 this->_response = [&](bool ok)
453 {
454 if (this->_is.b.empty())
455 {
456 //
457 // If there's no response (oneway, batch-oneway proxies), we just set the promise
458 // on completion without reading anything from the input stream. This is required for
459 // batch invocations.
460 //
461 this->_promise.set_value();
462 }
463 else if (ok)
464 {
465 this->_is.skipEmptyEncapsulation();
466 this->_promise.set_value();
467 }
468 else
469 {
470 this->throwUserException();
471 }
472 };
473 }
474
475 bool handleSent(bool done, bool) noexcept final
476 {
477 if (done)
478 {
479 PromiseInvoke<void>::_promise.set_value();
480 }
481 return false;
482 }
483 };
484
485 template<typename R, typename Obj, typename Fn, typename... Args>
486 [[nodiscard]] inline std::future<R> makePromiseOutgoing(bool sync, Obj obj, Fn fn, Args&&... args)
487 {
488 auto outAsync = std::make_shared<PromiseOutgoing<R>>(*obj, sync);
489 (obj->*fn)(outAsync, std::forward<Args>(args)...);
490 return outAsync->getFuture();
491 }
492
493 template<typename R, typename Re, typename E, typename S, typename Obj, typename Fn, typename... Args>
494 [[nodiscard]] inline std::function<void()> makeLambdaOutgoing(Re r, E e, S s, Obj obj, Fn fn, Args&&... args)
495 {
496 auto outAsync = std::make_shared<LambdaOutgoing<R>>(*obj, std::move(r), std::move(e), std::move(s));
497 (obj->*fn)(outAsync, std::forward<Args>(args)...);
498 return [outAsync]() { outAsync->cancel(); };
499 }
500}
501
502#if defined(__clang__)
503# pragma clang diagnostic pop
504#endif
505
506#endif
std::shared_ptr< ConnectionInfo > ConnectionInfoPtr
A shared pointer to a ConnectionInfo.
Definition ConnectionF.h:21
std::shared_ptr< ObjectAdapter > ObjectAdapterPtr
A shared pointer to an ObjectAdapter.
std::shared_ptr< Endpoint > EndpointPtr
A shared pointer to an Endpoint.
Definition EndpointF.h:20
OperationMode
Specifies if an operation is idempotent, which affects the retry behavior of the Ice client runtime.
@ Normal
A non-idempotent operation (the default).
std::shared_ptr< Connection > ConnectionPtr
A shared pointer to a Connection.
Definition ConnectionF.h:18
@ UserException
The dispatch completed with a Slice user exception.
Definition ReplyStatus.h:33
std::map< std::string, std::string, std::less<> > Context
Represents additional information carried by an Ice request.
Definition Context.h:28