Ice 3.8
C++ API Reference
Loading...
Searching...
No Matches
OutgoingAsync.h
1// Copyright (c) ZeroC, Inc.
2
3#ifndef ICE_OUTGOING_ASYNC_H
4#define ICE_OUTGOING_ASYNC_H
5
6#include "CommunicatorF.h"
7#include "ConnectionF.h"
8#include "ConnectionIF.h"
9#include "InputStream.h"
10#include "LocalExceptions.h"
11#include "ObjectAdapterF.h"
12#include "ObserverHelper.h"
13#include "OutputStream.h"
14#include "Proxy.h"
15#include "RequestHandlerF.h"
16#include "TimerTask.h"
17
18#include <cassert>
19#include <exception>
20#include <string_view>
21
22#if defined(_MSC_VER)
23# pragma warning(push)
24# pragma warning(disable : 4250) // ... : inherits ... via dominance
25# pragma warning(disable : 4251) // class ... needs to have dll-interface to be used by clients of class ...
26#elif defined(__clang__)
27# pragma clang diagnostic push
28// See #2747
29# pragma clang diagnostic ignored "-Wshadow-uncaptured-local"
30# pragma clang diagnostic ignored "-Wweak-vtables"
31#endif
32
33namespace IceInternal
34{
35 class OutgoingAsyncBase;
36 class RetryException;
37 class CollocatedRequestHandler;
38
// Status code returned by invokeRemote() and invokeCollocated() (declared further down in
// this header). Unscoped enum with explicit values 0, 1 and 2.
// NOTE(review): the values look like they may be combined as flags - confirm in the .cpp.
39 enum AsyncStatus
40 {
41 AsyncStatusQueued = 0,
42 AsyncStatusSent = 1,
43 AsyncStatusInvokeSentCallback = 2
44 };
45
// Abstract completion interface: three pairs of pure virtual hooks (sent, exception, response).
// In this header it is implemented by LambdaInvoke and PromiseInvoke, and OutgoingAsyncBase
// inherits it virtually.
46 class ICE_API OutgoingAsyncCompletionCallback
47 {
48 public:
49 virtual ~OutgoingAsyncCompletionCallback();
50
51 protected:
52 // Returns true if handleInvokeSent handles sent callbacks.
53 virtual bool handleSent(bool done, bool alreadySent) noexcept = 0;
54
55 // Returns true if handleInvokeException handles exception callbacks.
56 virtual bool handleException(std::exception_ptr) noexcept = 0;
57
58 // Returns true if handleInvokeResponse handles response callbacks.
59 // This function can unmarshal the response and throw an exception
// (it is the only one of the three handleXxx hooks that is not noexcept).
60 virtual bool handleResponse(bool) = 0;
61
// Second-stage hooks. Per the comments above, these deliver the callback when the matching
// handleXxx hook returned true. They are const and receive the invocation as a raw pointer.
62 virtual void handleInvokeSent(bool, OutgoingAsyncBase*) const = 0;
63 virtual void handleInvokeException(std::exception_ptr, OutgoingAsyncBase*) const = 0;
64 virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const = 0;
65 };
66
67 //
68 // Base class for handling asynchronous invocations. This class is
69 // responsible for the handling of the output stream and the child
70 // invocation observer. It owns the request output stream (_os) and the
// reply input stream (_is), and derives from enable_shared_from_this.
71 //
72 class ICE_API OutgoingAsyncBase : public virtual OutgoingAsyncCompletionCallback,
73 public std::enable_shared_from_this<OutgoingAsyncBase>
74 {
75 public:
// Overridable completion notifications; defined out of line.
76 virtual bool sent();
77 virtual bool exception(std::exception_ptr);
78 virtual bool response();
79
// NOTE(review): the *Async variants presumably dispatch the matching invokeXxx() on another
// thread instead of running it inline - confirm in the .cpp.
80 void invokeSentAsync();
81 void invokeExceptionAsync();
82 void invokeResponseAsync();
83
84 void invokeSent();
85 void invokeException();
86 void invokeResponse();
87
88 virtual void cancelable(const IceInternal::CancellationHandlerPtr&);
// Public, argument-less cancel; the protected overload below takes the exception to cancel with.
89 void cancel();
90
91 void
92 attachRemoteObserver(const Ice::ConnectionInfoPtr& c, const Ice::EndpointPtr& endpt, std::int32_t requestId);
93
94 void attachCollocatedObserver(const Ice::ObjectAdapterPtr& adapter, std::int32_t requestId);
95
// Non-owning pointer to the embedded output stream; valid only while this object is alive.
96 Ice::OutputStream* getOs() { return &_os; }
97
// Non-owning pointer to the embedded input stream; valid only while this object is alive.
98 Ice::InputStream* getIs() { return &_is; }
99
100 protected:
101 OutgoingAsyncBase(const InstancePtr&);
102
// Non-virtual helpers; ProxyOutgoingAsyncBase declares helpers with the same names.
103 bool sentImpl(bool);
104 bool exceptionImpl(std::exception_ptr);
105 bool responseImpl(bool, bool);
106
107 void cancel(std::exception_ptr);
108
// The derived lambda-based classes in this header call this with callbackName "response" when
// a user response callback throws.
109 void warning(std::string_view callbackName, std::exception_ptr eptr) const;
110
111 //
112 // This virtual method is necessary for the communicator flush
113 // batch requests implementation.
114 //
115 virtual IceInternal::InvocationObserver& getObserver() { return _observer; }
116
117 const InstancePtr _instance;
118 Ice::ConnectionPtr _cachedConnection;
119 bool _sentSynchronously{false};
120 bool _doneInSent{false};
// NOTE(review): presumably holds the OK / Sent constants declared at the end of the class - confirm.
121 unsigned char _state{0};
122
// NOTE(review): presumably guards the mutable invocation state; which members it covers is
// not visible from this header.
123 std::mutex _m;
124 using Lock = std::lock_guard<std::mutex>;
125
126 std::exception_ptr _ex;
127 std::exception_ptr _cancellationException;
128
129 InvocationObserver _observer;
130 ObserverHelperT<Ice::Instrumentation::ChildInvocationObserver> _childObserver;
131
132 Ice::OutputStream _os;
133 Ice::InputStream _is;
134
135 CancellationHandlerPtr _cancellationHandler;
136
// Declared here without initializers; the values are defined out of line.
137 static const unsigned char OK;
138 static const unsigned char Sent;
139 };
140
141 using OutgoingAsyncBasePtr = std::shared_ptr<OutgoingAsyncBase>;
142
143 //
144 // Base class for proxy based invocations. This class handles the
145 // retry for proxy invocations. It also ensures the child observer is
146 // correctly notified of failures and makes sure the retry task is
147 // correctly canceled when the invocation completes.
148 //
149 class ICE_API ProxyOutgoingAsyncBase : public OutgoingAsyncBase, public TimerTask
150 {
151 public:
// Transport-specific send operations, implemented by OutgoingAsync below.
152 virtual AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool) = 0;
153 virtual AsyncStatus invokeCollocated(CollocatedRequestHandler*) = 0;
154
155 bool exception(std::exception_ptr) override;
156
157 void retryException();
158 void retry();
// Non-virtual; OutgoingAsync declares its own abort(std::exception_ptr), which hides this one.
159 void abort(std::exception_ptr);
160
// Hides enable_shared_from_this<OutgoingAsyncBase>::shared_from_this() so callers get a pointer
// typed as ProxyOutgoingAsyncBase; the static downcast is valid because *this is one.
161 std::shared_ptr<ProxyOutgoingAsyncBase> shared_from_this()
162 {
163 return std::static_pointer_cast<ProxyOutgoingAsyncBase>(OutgoingAsyncBase::shared_from_this());
164 }
165
166 protected:
// Takes the proxy by value; it is kept in the const member _proxy.
167 ProxyOutgoingAsyncBase(Ice::ObjectPrx);
168 ~ProxyOutgoingAsyncBase() override;
169
170 void invokeImpl(bool);
// Same names as the non-virtual helpers in OutgoingAsyncBase, so these hide the base versions.
171 bool sentImpl(bool);
172 bool exceptionImpl(std::exception_ptr);
173 bool responseImpl(bool, bool);
174
// TimerTask override. NOTE(review): per the class comment this is presumably the retry task - confirm.
175 void runTimerTask() override;
176
177 const Ice::ObjectPrx _proxy;
178 RequestHandlerPtr _handler;
// NOTE(review): the listing's own numbering jumps from 178 to 180, so one declaration line was
// dropped by the page extraction. It is presumably an Ice::OperationMode member (the page's
// cross-reference list mentions OperationMode and its Normal enumerator) - restore from upstream.
180
181 private:
182 int handleRetryAfterException(std::exception_ptr);
183 int checkRetryAfterException(std::exception_ptr);
184
// NOTE(review): presumably the retry counter - confirm in the .cpp.
185 int _cnt{0};
186 bool _sent{false};
187 };
188
189 using ProxyOutgoingAsyncBasePtr = std::shared_ptr<ProxyOutgoingAsyncBase>;
190
191 //
192 // Class for handling Slice operation invocations. Implements the transport-specific
// invokeRemote / invokeCollocated operations and provides helpers for writing the
// request parameters into the output stream (_os).
193 //
194 class ICE_API OutgoingAsync : public ProxyOutgoingAsyncBase
195 {
196 public:
// The bool argument is passed as `sync` by PromiseOutgoing and as false by LambdaOutgoing.
197 OutgoingAsync(Ice::ObjectPrx, bool);
198
199 void prepare(std::string_view operation, Ice::OperationMode mode, const Ice::Context& context);
200
201 bool sent() override;
202 bool response() override;
203
204 AsyncStatus invokeRemote(const Ice::ConnectionIPtr&, bool, bool) override;
205 AsyncStatus invokeCollocated(CollocatedRequestHandler*) override;
206
// Hides the non-virtual ProxyOutgoingAsyncBase::abort with the same signature.
207 void abort(std::exception_ptr);
208 void invoke(std::string_view);
// Full invocation entry point: operation name, operation mode, optional format, request
// context, and a callback that writes the parameters. The operation-mode parameter
// (listing line 211) was dropped by the page extraction; it is restored here because every
// caller in this header passes five arguments (operation, mode, format, ctx, write).
209 void invoke(
210 std::string_view,
211 Ice::OperationMode,
212 std::optional<Ice::FormatType>,
213 const Ice::Context&,
214 const std::function<void(Ice::OutputStream*)>&);
215 void throwUserException();
216
// Opens the parameter encapsulation with this invocation's encoding and returns the output
// stream (non-owning pointer) for the caller to write into; pair with endWriteParams().
217 Ice::OutputStream* startWriteParams(std::optional<Ice::FormatType> format)
218 {
219 _os.startEncapsulation(_encoding, format);
220 return &_os;
221 }
222 void endWriteParams() { _os.endEncapsulation(); }
223 void writeEmptyParams() { _os.writeEmptyEncapsulation(_encoding); }
// Writes an already-encoded parameter encapsulation. A size of 0 means "no parameters" and
// produces an empty encapsulation in this invocation's encoding instead of copying bytes.
224 void writeParamEncaps(const std::byte* encaps, std::int32_t size)
225 {
226 if (size == 0)
227 {
228 _os.writeEmptyEncapsulation(_encoding);
229 }
230 else
231 {
232 _os.writeEncapsulation(encaps, size);
233 }
234 }
235
236 protected:
237 const Ice::EncodingVersion _encoding;
// Assigned by the OutgoingAsyncT<T>::invoke overloads before the invocation starts.
238 std::function<void(const Ice::UserException&)> _userException;
239 bool _synchronous;
240 };
241
242 using OutgoingAsyncPtr = std::shared_ptr<OutgoingAsync>;
243
// Completion callback implementation backed by std::function objects. The overrides are all
// `final` and defined out of line.
244 class ICE_API LambdaInvoke : public virtual OutgoingAsyncCompletionCallback
245 {
246 public:
// Stores the exception and sent callbacks. _response is left empty here; the LambdaOutgoing
// constructors in this header assign it.
247 LambdaInvoke(std::function<void(std::exception_ptr)> exception, std::function<void(bool)> sent)
248 : _exception(std::move(exception)),
249 _sent(std::move(sent))
250 {
251 }
252
253 protected:
254 bool handleSent(bool, bool) noexcept final;
255 bool handleException(std::exception_ptr) noexcept final;
256 bool handleResponse(bool) final;
257
258 void handleInvokeSent(bool, OutgoingAsyncBase*) const final;
259 void handleInvokeException(std::exception_ptr, OutgoingAsyncBase*) const final;
260 void handleInvokeResponse(bool, OutgoingAsyncBase*) const final;
261
262 std::function<void(std::exception_ptr)> _exception;
263 std::function<void(bool)> _sent;
// Invoked with the reply's ok flag (see the lambdas installed by LambdaOutgoing).
264 std::function<void(bool)> _response;
265 };
266
// Completion callback implementation backed by a std::promise<R>. Every handleXxx hook returns
// false, so (per the OutgoingAsyncCompletionCallback comments) the handleInvokeXxx hooks are
// not expected to be reached; they assert(false).
267 template<typename R> class PromiseInvoke : public virtual OutgoingAsyncCompletionCallback
268 {
269 public:
// Returns the future associated with the promise. std::promise::get_future may only be
// called once per promise.
270 [[nodiscard]] std::future<R> getFuture() { return _promise.get_future(); }
271
272 protected:
// Not final: PromiseOutgoing<void> overrides this to fulfil the promise on send.
273 bool handleSent(bool, bool) noexcept override { return false; }
274
// Stores the failure in the promise so the future rethrows it.
275 bool handleException(std::exception_ptr ex) noexcept final
276 {
277 _promise.set_exception(ex);
278 return false;
279 }
280
// Delegates to _response, which the derived PromiseOutgoing constructors assign; it is empty
// until then.
281 bool handleResponse(bool ok) final
282 {
283 _response(ok);
284 return false;
285 }
286
287 void handleInvokeSent(bool, OutgoingAsyncBase*) const final { assert(false); }
288
289 void handleInvokeException(std::exception_ptr, OutgoingAsyncBase*) const final { assert(false); }
290
291 void handleInvokeResponse(bool, OutgoingAsyncBase*) const final { assert(false); }
292
293 std::promise<R> _promise;
294 std::function<void(bool)> _response;
295 };
296
// OutgoingAsync with a typed return value: remembers how to unmarshal a T from the reply
// stream (_read) and which user-exception handler to use before forwarding to
// OutgoingAsync::invoke.
//
// The `Ice::OperationMode mode` parameter lines (listing lines 304 and 322) were dropped by
// the page extraction, which left `mode` undeclared in both bodies; they are restored here.
297 template<typename T> class OutgoingAsyncT : public OutgoingAsync
298 {
299 public:
300 using OutgoingAsync::OutgoingAsync;
301
// Invokes `operation` with a default reader that value-initializes a T, fills it with
// stream->read(v) and returns it.
//   operation     - name of the operation to invoke
//   mode          - operation mode, forwarded unchanged
//   format        - optional format, forwarded unchanged
//   ctx           - request context
//   write         - callback that writes the parameters
//   userException - stored in _userException
302 void invoke(
303 std::string_view operation,
304 Ice::OperationMode mode,
305 std::optional<Ice::FormatType> format,
306 const Ice::Context& ctx,
307 const std::function<void(Ice::OutputStream*)>& write,
308 std::function<void(const Ice::UserException&)> userException)
309 {
310 _read = [](Ice::InputStream* stream)
311 {
312 T v{};
313 stream->read(v);
314 return v;
315 };
316 _userException = std::move(userException);
317 OutgoingAsync::invoke(operation, mode, format, ctx, write);
318 }
319
// Same as above, but with a caller-supplied reader that produces the T from the reply stream.
320 void invoke(
321 std::string_view operation,
322 Ice::OperationMode mode,
323 std::optional<Ice::FormatType> format,
324 const Ice::Context& ctx,
325 const std::function<void(Ice::OutputStream*)>& write,
326 std::function<void(const Ice::UserException&)> userException,
327 std::function<T(Ice::InputStream*)> read)
328 {
329 _read = std::move(read);
330 _userException = std::move(userException);
331 OutgoingAsync::invoke(operation, mode, format, ctx, write);
332 }
333
334 protected:
// Set by invoke(); the LambdaOutgoing / PromiseOutgoing response handlers call it between
// startEncapsulation() and endEncapsulation().
335 std::function<T(Ice::InputStream*)> _read;
336 };
337
// Specialization for operations without a return value: there is nothing to unmarshal, so no
// reader is kept.
//
// The `Ice::OperationMode mode` parameter line (listing line 345) was dropped by the page
// extraction, which left `mode` undeclared in the body; it is restored here.
338 template<> class OutgoingAsyncT<void> : public OutgoingAsync
339 {
340 public:
341 using OutgoingAsync::OutgoingAsync;
342
// Stores the user-exception handler, then forwards every other argument unchanged to
// OutgoingAsync::invoke.
343 void invoke(
344 std::string_view operation,
345 Ice::OperationMode mode,
346 std::optional<Ice::FormatType> format,
347 const Ice::Context& ctx,
348 const std::function<void(Ice::OutputStream*)>& write,
349 std::function<void(const Ice::UserException&)> userException)
350 {
351 _userException = std::move(userException);
352 OutgoingAsync::invoke(operation, mode, format, ctx, write);
353 }
354 };
355
// Invocation that returns an R and reports completion through user-supplied callbacks.
// Always constructed with false as the OutgoingAsync bool argument.
356 template<typename R> class LambdaOutgoing : public OutgoingAsyncT<R>, public LambdaInvoke
357 {
358 public:
359 LambdaOutgoing(
360 Ice::ObjectPrx proxy,
361 std::function<void(R)> response,
362 std::function<void(std::exception_ptr)> ex,
363 std::function<void(bool)> sent)
364 : OutgoingAsyncT<R>(std::move(proxy), false),
365 LambdaInvoke(std::move(ex), std::move(sent))
366 {
// The handler is stored in this object and captures `this`, so it must not outlive it.
367 _response = [this, response = std::move(response)](bool ok)
368 {
369 if (!ok)
370 {
371 this->throwUserException();
372 }
// If no response callback was supplied, the reply is not unmarshaled at all.
373 else if (response)
374 {
375 assert(this->_read);
376 this->_is.startEncapsulation();
377 R v = this->_read(&this->_is);
378 this->_is.endEncapsulation();
// Only the user callback is guarded: anything it throws is reported through warning()
// and not propagated. Unmarshaling errors above are not caught here.
379 try
380 {
381 response(std::move(v));
382 }
383 catch (...)
384 {
385 this->warning("response", std::current_exception());
386 }
387 }
388 };
389 }
390 };
391
// Specialization for operations without a return value, reporting completion through
// user-supplied callbacks.
392 template<> class LambdaOutgoing<void> : public OutgoingAsyncT<void>, public LambdaInvoke
393 {
394 public:
395 LambdaOutgoing(
396 Ice::ObjectPrx proxy,
397 std::function<void()> response,
398 std::function<void(std::exception_ptr)> ex,
399 std::function<void(bool)> sent)
400 : OutgoingAsyncT<void>(std::move(proxy), false),
401 LambdaInvoke(std::move(ex), std::move(sent))
402 {
// The handler is stored in this object and captures `this`, so it must not outlive it.
403 _response = [this, response = std::move(response)](bool ok)
404 {
405 if (!ok)
406 {
407 this->throwUserException();
408 }
409 else if (response)
410 {
// An empty input buffer means there is no reply payload to skip
// (PromiseOutgoing<void> documents this as the oneway / batch-oneway case).
411 if (!this->_is.b.empty())
412 {
413 this->_is.skipEmptyEncapsulation();
414 }
// Exceptions thrown by the user callback are reported through warning() and not propagated.
415 try
416 {
417 response();
418 }
419 catch (...)
420 {
421 this->warning("response", std::current_exception());
422 }
423 }
424 };
425 }
426 };
427
// Invocation that returns an R and reports completion through a std::future<R>.
428 template<typename R> class PromiseOutgoing : public OutgoingAsyncT<R>, public PromiseInvoke<R>
429 {
430 public:
// Installs the response handler. A failed reply is handed to throwUserException(); a
// successful one is unmarshaled with _read and moved into the promise.
431 PromiseOutgoing(Ice::ObjectPrx proxy, bool sync) : OutgoingAsyncT<R>(std::move(proxy), sync)
432 {
433 this->_response = [this](bool succeeded)
434 {
435 if (!succeeded)
436 {
437 this->throwUserException();
438 }
439 else
440 {
441 assert(this->_read);
442 this->_is.startEncapsulation();
443 R result = this->_read(&this->_is);
444 this->_is.endEncapsulation();
445 this->_promise.set_value(std::move(result));
446 }
447 };
448 }
449 };
450
// Specialization for operations without a return value, reporting completion through a
// std::future<void>.
451 template<> class PromiseOutgoing<void> : public OutgoingAsyncT<void>, public PromiseInvoke<void>
452 {
453 public:
454 PromiseOutgoing(Ice::ObjectPrx proxy, bool sync) : OutgoingAsyncT<void>(std::move(proxy), sync)
455 {
// Capture `this` explicitly instead of using a by-reference default. The handler is stored in
// a member and outlives this constructor, so it must never capture `proxy` or `sync` by
// reference. This also matches the primary PromiseOutgoing template.
456 this->_response = [this](bool ok)
457 {
458 if (this->_is.b.empty())
459 {
460 //
461 // If there's no response (oneway, batch-oneway proxies), we just set the promise
462 // on completion without reading anything from the input stream. This is required for
463 // batch invocations.
464 //
465 this->_promise.set_value();
466 }
467 else if (ok)
468 {
469 this->_is.skipEmptyEncapsulation();
470 this->_promise.set_value();
471 }
472 else
473 {
474 this->throwUserException();
475 }
476 };
477 }
478
// Overrides PromiseInvoke<void>::handleSent: when `done` is true the promise is fulfilled as
// soon as the request is sent. Always returns false, like the base implementation.
479 bool handleSent(bool done, bool) noexcept final
480 {
481 if (done)
482 {
483 PromiseInvoke<void>::_promise.set_value();
484 }
485 return false;
486 }
487 };
488
// Creates a PromiseOutgoing<R> for the proxy `*obj`, starts it by calling the member function
// `fn` on `obj` with the new invocation followed by `args`, and returns the invocation's future.
489 template<typename R, typename Obj, typename Fn, typename... Args>
490 [[nodiscard]] inline std::future<R> makePromiseOutgoing(bool sync, Obj obj, Fn fn, Args&&... args)
491 {
492 auto invocation = std::make_shared<PromiseOutgoing<R>>(*obj, sync);
493 (obj->*fn)(invocation, std::forward<Args>(args)...);
494 return invocation->getFuture();
495 }
496
// Creates a LambdaOutgoing<R> for the proxy `*obj` with the response / exception / sent
// callbacks, and starts it by calling the member function `fn` on `obj` with the new invocation
// followed by `args`. Returns a function that cancels the invocation; that function shares
// ownership of the invocation and keeps it alive.
497 template<typename R, typename Re, typename E, typename S, typename Obj, typename Fn, typename... Args>
498 [[nodiscard]] inline std::function<void()> makeLambdaOutgoing(Re r, E e, S s, Obj obj, Fn fn, Args&&... args)
499 {
500 auto invocation = std::make_shared<LambdaOutgoing<R>>(*obj, std::move(r), std::move(e), std::move(s));
501 (obj->*fn)(invocation, std::forward<Args>(args)...);
502 return [invocation] { invocation->cancel(); };
503 }
504}
505
506#if defined(_MSC_VER)
507# pragma warning(pop)
508#elif defined(__clang__)
509# pragma clang diagnostic pop
510#endif
511
512#endif
std::shared_ptr< ConnectionInfo > ConnectionInfoPtr
A shared pointer to a ConnectionInfo.
Definition ConnectionF.h:21
std::shared_ptr< ObjectAdapter > ObjectAdapterPtr
A shared pointer to an ObjectAdapter.
std::shared_ptr< Endpoint > EndpointPtr
A shared pointer to an Endpoint.
Definition EndpointF.h:20
OperationMode
Specifies if an operation is idempotent, which affects the retry behavior of the Ice client runtime.
@ Normal
A non-idempotent operation (the default).
std::shared_ptr< Connection > ConnectionPtr
A shared pointer to a Connection.
Definition ConnectionF.h:18
@ UserException
The dispatch completed with a Slice user exception.
Definition ReplyStatus.h:40
std::map< std::string, std::string, std::less<> > Context
Represents additional information carried by an Ice request.
Definition Context.h:34