Ice 3.8
C++ API Reference
Loading...
Searching...
No Matches
InternalT.h
1// Copyright (c) ZeroC, Inc.
2
3#pragma once
4
5#include "Config.h"
6#include "Ice/Demangle.h"
7#include "Ice/Ice.h"
8#include "InternalI.h"
9#include "Types.h"
10
11#if defined(__clang__)
12# pragma clang diagnostic push
13# pragma clang diagnostic ignored "-Wshadow-field-in-constructor"
14#elif defined(__GNUC__)
15# pragma GCC diagnostic push
16# pragma GCC diagnostic ignored "-Wshadow"
17#endif
18
19namespace DataStorm
20{
21 template<typename K, typename V, typename U> class Sample;
22}
23
24namespace DataStormI
25{
26 template<typename T> class has_communicator_parameter
27 {
28 template<typename TT, typename SS>
29 static auto testE(int) noexcept
30 -> decltype(TT::encode(std::declval<Ice::CommunicatorPtr&>(), std::declval<SS&>()), std::true_type());
31
32 template<typename, typename> static auto testE(...) -> std::false_type;
33
34 template<typename TT, typename SS>
35 static auto testD(int) noexcept
36 -> decltype(TT::decode(std::declval<Ice::CommunicatorPtr&>(), Ice::ByteSeq()), std::true_type());
37
38 template<typename, typename> static auto testD(...) -> std::false_type;
39
40 public:
41 static constexpr bool value =
42 decltype(testE<DataStorm::Encoder<T>, T>(0))::value && decltype(testD<DataStorm::Decoder<T>, T>(0))::value;
43 };
44
45 template<typename T, typename Enabler = void> struct EncoderT
46 {
47 static Ice::ByteSeq encode(const Ice::CommunicatorPtr&, const T& value)
48 {
50 }
51 };
52
53 template<typename T, typename Enabler = void> struct DecoderT
54 {
55 static T decode(const Ice::CommunicatorPtr&, const Ice::ByteSeq& data)
56 {
58 }
59 };
60
61 template<typename T> struct EncoderT<T, std::enable_if_t<has_communicator_parameter<T>::value>>
62 {
63 static Ice::ByteSeq encode(const Ice::CommunicatorPtr& communicator, const T& value)
64 {
65 return DataStorm::Encoder<T>::encode(communicator, value);
66 }
67 };
68
69 template<typename T> struct DecoderT<T, std::enable_if_t<has_communicator_parameter<T>::value>>
70 {
71 static T decode(const Ice::CommunicatorPtr& communicator, const Ice::ByteSeq& data)
72 {
73 return DataStorm::Decoder<T>::decode(communicator, data);
74 }
75 };
76
77 template<typename T> class is_streamable
78 {
79 template<typename TT, typename SS>
80 static auto test(int) noexcept -> decltype(std::declval<SS&>() << std::declval<TT>(), std::true_type());
81
82 template<typename, typename> static auto test(...) noexcept -> std::false_type;
83
84 public:
85 static constexpr bool value = decltype(test<T, std::ostream>(0))::value;
86 };
87
88 template<typename T, typename Enabler = void> struct Stringifier
89 {
90 static std::string toString(const T& value)
91 {
92 std::ostringstream os;
93 os << IceInternal::demangle(typeid(value).name()) << '(' << &value << ')';
94 return os.str();
95 }
96 };
97
98 template<typename T> struct Stringifier<T, std::enable_if_t<is_streamable<T>::value>>
99 {
100 static std::string toString(const T& value)
101 {
102 std::ostringstream os;
103 os << value;
104 return os.str();
105 }
106 };
107
108 template<typename T> class AbstractElementT : public virtual Element
109 {
110 public:
111 template<typename TT> AbstractElementT(TT&& v, std::int64_t id) : _value(std::forward<TT>(v)), _id(id) {}
112
113 [[nodiscard]] std::string toString() const override
114 {
115 std::ostringstream os;
116 os << _id << ':' << Stringifier<T>::toString(_value);
117 return os.str();
118 }
119
120 [[nodiscard]] Ice::ByteSeq encode(const Ice::CommunicatorPtr& communicator) const override
121 {
122 return EncoderT<T>::encode(communicator, _value);
123 }
124
125 [[nodiscard]] std::int64_t getId() const override { return _id; }
126
127 [[nodiscard]] const T& get() const { return _value; }
128
129 protected:
130 const T _value;
131 const std::int64_t _id;
132 };
133
134 template<typename K, typename V>
135 class AbstractFactoryT : public std::enable_shared_from_this<AbstractFactoryT<K, V>>
136 {
137 /// A custom deleter to remove the element from the factory when the shared_ptr is deleted.
138 /// The deleter is used by elements created by the factory.
139 struct Deleter
140 {
141 void operator()(V* obj)
142 {
143 if (auto factory = _factory.lock())
144 {
145 factory->remove(obj);
146 }
147 delete obj;
148 }
149
150 std::weak_ptr<AbstractFactoryT<K, V>> _factory;
151
152 } _deleter;
153
154 public:
155 AbstractFactoryT() = default;
156
157 void init() { _deleter = Deleter{std::enable_shared_from_this<AbstractFactoryT<K, V>>::shared_from_this()}; }
158
159 template<typename F, typename... Args>
160 [[nodiscard]] std::shared_ptr<typename V::BaseClassType> create(F&& value, Args&&... args)
161 {
162 std::lock_guard<std::mutex> lock(_mutex);
163 return createImpl(std::forward<F>(value), std::forward<Args>(args)...);
164 }
165
166 [[nodiscard]] std::vector<std::shared_ptr<typename V::BaseClassType>> create(std::vector<K> values)
167 {
168 std::lock_guard<std::mutex> lock(_mutex);
169 std::vector<std::shared_ptr<typename V::BaseClassType>> seq;
170 seq.reserve(values.size());
171 for (auto& v : values)
172 {
173 seq.push_back(createImpl(std::move(v)));
174 }
175 return seq;
176 }
177
178 protected:
179 friend struct Deleter;
180
181 [[nodiscard]] std::shared_ptr<typename V::BaseClassType> getImpl(std::int64_t id) const
182 {
183 std::lock_guard<std::mutex> lock(_mutex);
184 auto p = _elementsById.find(id);
185 if (p != _elementsById.end())
186 {
187 return p->second.lock();
188 }
189 return nullptr;
190 }
191
192 template<typename F, typename... Args> [[nodiscard]] std::shared_ptr<V> createImpl(F&& value, Args&&... args)
193 {
194 // Called with _mutex locked
195
196 auto p = _elements.find(value);
197 if (p != _elements.end())
198 {
199 auto k = p->second.lock();
200 if (k)
201 {
202 return k;
203 }
204
205 // The key is being removed concurrently by the deleter, remove it now to allow the insertion of a new
206 // key. The deleter won't remove the new key.
207 _elements.erase(p);
208 }
209
210 auto k =
211 std::shared_ptr<V>(new V(std::forward<F>(value), std::forward<Args>(args)..., ++_nextId), _deleter);
212 _elements[k->get()] = k;
213 _elementsById[k->getId()] = k;
214 return k;
215 }
216
217 void remove(V* v)
218 {
219 // Make sure to declare the variable outside the synchronization in case the element needs
220 // to be deleted if it's not the same.
221 std::shared_ptr<V> e;
222 std::lock_guard<std::mutex> lock(_mutex);
223 auto p = _elements.find(v->get());
224 if (p != _elements.end())
225 {
226 e = p->second.lock();
227 if (e && e.get() == v)
228 {
229 _elements.erase(p);
230 }
231 }
232 _elementsById.erase(v->getId());
233 }
234
235 mutable std::mutex _mutex;
236 std::map<K, std::weak_ptr<V>> _elements;
237 std::map<std::int64_t, std::weak_ptr<V>> _elementsById;
238 std::int64_t _nextId{1};
239 };
240
241 template<typename K> class KeyT final : public Key, public AbstractElementT<K>
242 {
243 public:
244 [[nodiscard]] std::string toString() const final { return "k" + AbstractElementT<K>::toString(); }
245
246 using AbstractElementT<K>::AbstractElementT;
247 using BaseClassType = Key;
248 };
249
250 template<typename K> class KeyFactoryT final : public KeyFactory, public AbstractFactoryT<K, KeyT<K>>
251 {
252 public:
253 using AbstractFactoryT<K, KeyT<K>>::AbstractFactoryT;
254
255 [[nodiscard]] std::shared_ptr<Key> get(std::int64_t id) const final
256 {
257 return AbstractFactoryT<K, KeyT<K>>::getImpl(id);
258 }
259
260 [[nodiscard]] std::shared_ptr<Key>
261 decode(const Ice::CommunicatorPtr& communicator, const Ice::ByteSeq& data) final
262 {
263 return AbstractFactoryT<K, KeyT<K>>::create(DecoderT<K>::decode(communicator, data));
264 }
265
266 [[nodiscard]] static std::shared_ptr<KeyFactoryT<K>> createFactory()
267 {
268 auto f = std::make_shared<KeyFactoryT<K>>();
269 f->init();
270 return f;
271 }
272 };
273
274 template<typename T> class TagT final : public Tag, public AbstractElementT<T>
275 {
276 public:
277 [[nodiscard]] std::string toString() const final { return "t" + AbstractElementT<T>::toString(); }
278
279 using AbstractElementT<T>::AbstractElementT;
280 using BaseClassType = Tag;
281 };
282
283 template<typename T> class TagFactoryT final : public TagFactory, public AbstractFactoryT<T, TagT<T>>
284 {
285 public:
286 using AbstractFactoryT<T, TagT<T>>::AbstractFactoryT;
287
288 [[nodiscard]] std::shared_ptr<Tag> get(std::int64_t id) const final
289 {
290 return AbstractFactoryT<T, TagT<T>>::getImpl(id);
291 }
292
293 [[nodiscard]] std::shared_ptr<Tag>
294 decode(const Ice::CommunicatorPtr& communicator, const Ice::ByteSeq& data) final
295 {
296 return AbstractFactoryT<T, TagT<T>>::create(DecoderT<T>::decode(communicator, data));
297 }
298
299 [[nodiscard]] static std::shared_ptr<TagFactoryT<T>> createFactory()
300 {
301 auto f = std::make_shared<TagFactoryT<T>>();
302 f->init();
303 return f;
304 }
305 };
306
307 template<typename Key, typename Value, typename UpdateTag>
308 class SampleT final : public Sample, public std::enable_shared_from_this<SampleT<Key, Value, UpdateTag>>
309 {
310 public:
311 SampleT(
312 std::string session,
313 std::string origin,
314 std::int64_t id,
316 const std::shared_ptr<DataStormI::Key>& key,
317 const std::shared_ptr<DataStormI::Tag>& tag,
318 Ice::ByteSeq value,
319 std::int64_t timestamp)
320 : Sample(std::move(session), std::move(origin), id, event, key, tag, std::move(value), timestamp),
321 _hasValue(false)
322 {
323 }
324
325 SampleT(DataStorm::SampleEvent event) : Sample(event), _hasValue(false) {}
326
327 SampleT(DataStorm::SampleEvent event, Value value) : Sample(event), _hasValue(true), _value(std::move(value)) {}
328
329 SampleT(Ice::ByteSeq value, const std::shared_ptr<Tag>& tag)
330 : Sample(DataStorm::SampleEvent::PartialUpdate, tag),
331 _hasValue(false)
332 {
333 _encodedValue = std::move(value);
334 }
335
336 [[nodiscard]] DataStorm::Sample<Key, Value, UpdateTag> get()
337 {
338 auto impl = std::enable_shared_from_this<SampleT<Key, Value, UpdateTag>>::shared_from_this();
339 return DataStorm::Sample<Key, Value, UpdateTag>(impl);
340 }
341
342 [[nodiscard]] const Key& getKey()
343 {
344 assert(key);
345 return std::static_pointer_cast<KeyT<Key>>(key)->get();
346 }
347
348 [[nodiscard]] const Value& getValue() const { return _value; }
349
350 [[nodiscard]] UpdateTag getTag() const
351 {
352 return tag ? std::static_pointer_cast<TagT<UpdateTag>>(tag)->get() : UpdateTag();
353 }
354
355 void setValue(Value value)
356 {
357 _value = std::move(value);
358 _hasValue = true;
359 }
360
361 [[nodiscard]] bool hasValue() const final { return _hasValue; }
362
363 void setValue(const std::shared_ptr<Sample>& sample) final
364 {
365 if (sample)
366 {
368 std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(sample)->getValue());
369 }
370 else
371 {
372 _value = Value{};
373 }
374 _hasValue = true;
375 }
376
377 [[nodiscard]] const Ice::ByteSeq& encode(const Ice::CommunicatorPtr& communicator) final
378 {
379 if (_encodedValue.empty())
380 {
381 _encodedValue = encodeValue(communicator);
382 }
383 return _encodedValue;
384 }
385
386 [[nodiscard]] Ice::ByteSeq encodeValue(const Ice::CommunicatorPtr& communicator) final
387 {
388 assert(_hasValue || event == DataStorm::SampleEvent::Remove);
389 return EncoderT<Value>::encode(communicator, _value);
390 }
391
392 void decode(const Ice::CommunicatorPtr& communicator) final
393 {
394 if (!_encodedValue.empty())
395 {
396 _hasValue = true;
397 _value = DecoderT<Value>::decode(communicator, _encodedValue);
398 _encodedValue.clear();
399 }
400 }
401
402 private:
403 bool _hasValue;
404 Value _value;
405 };
406
407 template<typename Key, typename Value, typename UpdateTag> class SampleFactoryT final : public SampleFactory
408 {
409 public:
410 [[nodiscard]] std::shared_ptr<Sample> create(
411 std::string session,
412 std::string origin,
413 std::int64_t id,
415 const std::shared_ptr<DataStormI::Key>& key,
416 const std::shared_ptr<DataStormI::Tag>& tag,
417 Ice::ByteSeq value,
418 std::int64_t timestamp) final
419 {
420 return std::make_shared<SampleT<Key, Value, UpdateTag>>(
421 std::move(session),
422 std::move(origin),
423 id,
424 type,
425 key,
426 tag,
427 std::move(value),
428 timestamp);
429 }
430 };
431
432 template<typename C, typename V> class FilterT final : public Filter, public AbstractElementT<C>
433 {
434 public:
435 template<typename CC, typename FF>
436 FilterT(CC&& criteria, std::string name, FF lambda, std::int64_t id)
437 : AbstractElementT<C>::AbstractElementT(std::forward<CC>(criteria), id),
438 _name(std::move(name)),
439 _lambda(std::move(lambda))
440 {
441 }
442
443 [[nodiscard]] std::string toString() const final { return "f" + AbstractElementT<C>::toString(); }
444
445 [[nodiscard]] bool match(const std::shared_ptr<Filterable>& value) const final
446 {
447 return _lambda(std::static_pointer_cast<V>(value)->get());
448 }
449
450 [[nodiscard]] const std::string& getName() const final { return _name; }
451
452 using BaseClassType = Filter;
453
454 private:
455 std::string _name;
456 std::function<bool(const typename std::remove_reference<decltype(std::declval<V>().get())>::type&)> _lambda;
457 };
458
459 template<typename C, typename V>
460 class FilterFactoryT final : public FilterFactory, public AbstractFactoryT<C, FilterT<C, V>>
461 {
462 public:
463 FilterFactoryT() = default;
464
465 [[nodiscard]] std::shared_ptr<Filter> get(std::int64_t id) const final
466 {
467 return AbstractFactoryT<C, FilterT<C, V>>::getImpl(id);
468 }
469
470 [[nodiscard]] static std::shared_ptr<FilterFactoryT<C, V>> createFactory()
471 {
472 auto f = std::make_shared<FilterFactoryT<C, V>>();
473 f->init();
474 return f;
475 }
476 };
477
478 template<typename ValueT> class FilterManagerT final : public FilterManager
479 {
480 using Value = std::remove_reference_t<decltype(std::declval<ValueT>().get())>;
481
482 struct Factory
483 {
484 virtual ~Factory() = default;
485
486 [[nodiscard]] virtual std::shared_ptr<Filter> get(std::int64_t) const = 0;
487
488 [[nodiscard]] virtual std::shared_ptr<Filter> decode(const Ice::CommunicatorPtr&, const Ice::ByteSeq&) = 0;
489 };
490
491 template<typename Criteria> struct FactoryT final : Factory
492 {
493 FactoryT(std::string name, std::function<std::function<bool(const Value&)>(const Criteria&)> lambda)
494 : name(std::move(name)),
495 lambda(std::move(lambda))
496 {
497 }
498
499 [[nodiscard]] std::shared_ptr<Filter> create(const Criteria& criteria)
500 {
501 return std::static_pointer_cast<FilterT<Criteria, ValueT>>(
502 filterFactory.create(criteria, name, lambda(criteria)));
503 }
504
505 [[nodiscard]] std::shared_ptr<Filter> get(std::int64_t id) const final { return filterFactory.get(id); }
506
507 [[nodiscard]] std::shared_ptr<Filter>
508 decode(const Ice::CommunicatorPtr& communicator, const Ice::ByteSeq& data) final
509 {
510 return create(DecoderT<Criteria>::decode(communicator, data));
511 }
512
513 const std::string name;
514 std::function<std::function<bool(const Value&)>(const Criteria&)> lambda;
515 FilterFactoryT<Criteria, ValueT> filterFactory;
516 };
517
518 public:
519 template<typename Criteria>
520 [[nodiscard]] std::shared_ptr<Filter> create(const std::string& name, const Criteria& criteria)
521 {
522 auto p = _factories.find(name);
523 if (p == _factories.end())
524 {
525 throw std::invalid_argument("unknown filter '" + name + "'");
526 }
527
528 auto factory = dynamic_cast<FactoryT<Criteria>*>(p->second.get());
529 if (!factory)
530 {
531 throw std::invalid_argument("filter '" + name + "' type doesn't match");
532 }
533
534 return factory->create(criteria);
535 }
536
537 [[nodiscard]] std::shared_ptr<Filter>
538 decode(const Ice::CommunicatorPtr& communicator, const std::string& name, const Ice::ByteSeq& data) final
539 {
540 auto p = _factories.find(name);
541 if (p == _factories.end())
542 {
543 return nullptr;
544 }
545
546 return p->second->decode(communicator, data);
547 }
548
549 [[nodiscard]] std::shared_ptr<Filter> get(const std::string& name, std::int64_t id) const final
550 {
551 auto p = _factories.find(name);
552 if (p == _factories.end())
553 {
554 return nullptr;
555 }
556
557 return p->second->get(id);
558 }
559
560 template<typename Criteria>
561 void set(std::string name, std::function<std::function<bool(const Value&)>(const Criteria&)> lambda)
562 {
563 if (lambda)
564 {
565 auto factory = std::make_unique<FactoryT<Criteria>>(name, std::move(lambda));
566 _factories.emplace(std::move(name), std::move(factory));
567 }
568 else
569 {
570 _factories.erase(name);
571 }
572 }
573
574 private:
575 // A map containing the filter factories, indexed by the filter name.
576 std::map<std::string, std::unique_ptr<Factory>> _factories;
577 };
578}
579
580#if defined(__clang__)
581# pragma clang diagnostic pop
582#elif defined(__GNUC__)
583# pragma GCC diagnostic pop
584#endif
A sample provides information about a data element update.
Definition DataStorm.h:30
SampleEvent
Describes the operation used by a data writer to update a data element.
Definition SampleEvent.h:34
@ Remove
The data writer removed the element.
Definition SampleEvent.h:45
@ PartialUpdate
The data writer partially updated the element.
Definition SampleEvent.h:42
Data-centric, broker-less publish/subscribe framework. C++ only.
Definition DataStorm.h:24
std::shared_ptr< Communicator > CommunicatorPtr
A shared pointer to a Communicator.
std::vector< std::byte > ByteSeq
A sequence of bytes.
static T clone(const T &value) noexcept
Clones the given value.
Definition Types.h:186
static T decode(const Ice::CommunicatorPtr &communicator, const Ice::ByteSeq &value) noexcept
Decodes a value.
Definition Types.h:208
static Ice::ByteSeq encode(const Ice::CommunicatorPtr &communicator, const T &value) noexcept
Encodes the given value.
Definition Types.h:197