6#include "Ice/Demangle.h"
12# pragma clang diagnostic push
13# pragma clang diagnostic ignored "-Wshadow-field-in-constructor"
14#elif defined(__GNUC__)
15# pragma GCC diagnostic push
16# pragma GCC diagnostic ignored "-Wshadow"
// Forward declaration of the class template Sample, which takes three type parameters (K, V, U).
// Only the declaration is visible here; the definition lives elsewhere.
21 template<
typename K,
typename V,
typename U>
class Sample;
// Compile-time trait. The member `value` is true only when both probes below succeed:
//  - DataStorm::Encoder<T>::encode is callable with (Ice::CommunicatorPtr&, T&), and
//  - DataStorm::Decoder<T>::decode is callable with (Ice::CommunicatorPtr&, Ice::ByteSeq).
// NOTE(review): this listing is missing lines (braces, access specifiers); the comments
// describe only what is visible.
26 template<
typename T>
class has_communicator_parameter
// Encoder probe: this overload participates only when the decltype expression is well-formed,
// in which case its return type is std::true_type.
28 template<
typename TT,
typename SS>
29 static auto testE(
int)
noexcept
30 ->
decltype(TT::encode(std::declval<Ice::CommunicatorPtr&>(), std::declval<SS&>()), std::true_type());
// Encoder fallback: the variadic overload is always viable and yields std::false_type.
32 template<
typename,
typename>
static auto testE(...) -> std::false_type;
// Decoder probe: same technique, checking decode(Ice::CommunicatorPtr&, Ice::ByteSeq).
// SS is not used in this expression.
34 template<
typename TT,
typename SS>
35 static auto testD(
int)
noexcept
36 ->
decltype(TT::decode(std::declval<Ice::CommunicatorPtr&>(),
Ice::ByteSeq()), std::true_type());
// Decoder fallback yielding std::false_type.
38 template<
typename,
typename>
static auto testD(...) -> std::false_type;
// Passing the literal 0 prefers the int overloads over the variadic fallbacks when they are viable.
41 static constexpr bool value =
42 decltype(testE<DataStorm::Encoder<T>, T>(0))::value &&
decltype(testD<DataStorm::Decoder<T>, T>(0))::value;
// Primary template of EncoderT. The Enabler parameter defaults to void so that a partial
// specialization can be selected through std::enable_if_t (see the specialization on
// has_communicator_parameter later in this file).
// NOTE(review): the body of this struct is not visible in this listing.
45 template<
typename T,
typename Enabler =
void>
struct EncoderT
// Primary template of DecoderT. The Enabler parameter defaults to void so that a partial
// specialization can be selected through std::enable_if_t.
// NOTE(review): the body of this struct is not visible in this listing.
53 template<
typename T,
typename Enabler =
void>
struct DecoderT
// Partial specialization of EncoderT, selected when has_communicator_parameter<T>::value is true.
// NOTE(review): the body is not visible in this listing.
61 template<
typename T>
struct EncoderT<T, std::enable_if_t<has_communicator_parameter<T>::value>>
// Partial specialization of DecoderT, selected when has_communicator_parameter<T>::value is true.
// NOTE(review): the body is not visible in this listing.
69 template<
typename T>
struct DecoderT<T, std::enable_if_t<has_communicator_parameter<T>::value>>
// Compile-time trait. The member `value` is true when a T can be inserted into a std::ostream
// with operator<<.
77 template<
typename T>
class is_streamable
// Probe: participates only when `SS& << TT` is a well-formed expression; yields std::true_type.
79 template<
typename TT,
typename SS>
80 static auto test(
int)
noexcept ->
decltype(std::declval<SS&>() << std::declval<TT>(), std::true_type());
// Fallback: always viable, yields std::false_type.
82 template<
typename,
typename>
static auto test(...) noexcept -> std::false_type;
// The probe is instantiated with TT = T and SS = std::ostream; the literal 0 prefers the int overload.
85 static constexpr
bool value = decltype(test<T, std::ostream>(0))::value;
// Primary Stringifier template (Enabler defaults to void); a specialization for streamable
// types follows in this file.
// toString writes, into a std::ostringstream, the demangled typeid name of value followed by
// the address of value in parentheses.
// NOTE(review): the return statement is not visible in this listing; presumably os.str().
88 template<typename T, typename Enabler =
void> struct Stringifier
90 static std::string toString(
const T& value)
92 std::ostringstream os;
93 os << IceInternal::demangle(
typeid(value).name()) <<
'(' << &value <<
')';
// Partial specialization of Stringifier, selected when is_streamable<T>::value is true.
// NOTE(review): only the creation of the std::ostringstream is visible in this listing;
// presumably value is streamed into it and os.str() is returned. Confirm against the full source.
98 template<
typename T>
struct Stringifier<T, std::enable_if_t<is_streamable<T>::value>>
100 static std::string toString(
const T& value)
102 std::ostringstream os;
// Element implementation that stores a value of type T together with a 64-bit id.
// Inherits virtually from Element.
// NOTE(review): this listing is missing lines (braces, some signatures, the _value member
// declaration); comments describe only what is visible.
108 template<
typename T>
class AbstractElementT :
public virtual Element
// Perfect-forwards v into _value and stores id in _id.
111 template<
typename TT> AbstractElementT(TT&& v, std::int64_t
id) : _value(std::forward<TT>(v)), _id(id) {}
// Writes the id, a colon, then Stringifier<T>::toString(_value) into a string stream.
// The return statement is not visible here; presumably os.str().
113 [[nodiscard]] std::string toString()
const override
115 std::ostringstream os;
116 os << _id <<
':' << Stringifier<T>::toString(_value);
// Body of an encoding member whose signature is not visible: delegates to EncoderT<T>::encode
// with a communicator and the stored value.
122 return EncoderT<T>::encode(communicator, _value);
// Returns the id given at construction.
125 [[nodiscard]] std::int64_t getId()
const override {
return _id; }
// Returns a const reference to the stored value.
127 [[nodiscard]]
const T& get()
const {
return _value; }
// The id is immutable after construction.
131 const std::int64_t _id;
// Factory that creates elements of type V keyed by values of type K. Elements are tracked
// through weak pointers in two maps (by value and by id), and access is guarded by _mutex.
// Derives from std::enable_shared_from_this so init() can hand a reference to itself to the deleter.
// NOTE(review): this listing is missing lines (braces, the Deleter struct header, the remove
// signature, several returns); comments describe only what is visible.
134 template<
typename K,
typename V>
135 class AbstractFactoryT :
public std::enable_shared_from_this<AbstractFactoryT<K, V>>
// Call operator of what appears to be the Deleter struct (befriended below and constructed in init):
// if the factory is still alive, asks it to remove obj. What happens to obj afterwards is not visible.
141 void operator()(V* obj)
143 if (
auto factory = _factory.lock())
145 factory->remove(obj);
// Weak reference back to the factory, so the deleter does not keep the factory alive.
150 std::weak_ptr<AbstractFactoryT<K, V>> _factory;
155 AbstractFactoryT() =
default;
// Builds the deleter from shared_from_this(); the object must therefore already be owned by a
// std::shared_ptr when init() is called.
157 void init() { _deleter = Deleter{std::enable_shared_from_this<AbstractFactoryT<K, V>>::shared_from_this()}; }
// Creates (or looks up) a single element: takes the lock and forwards all arguments to createImpl.
159 template<
typename F,
typename... Args>
160 [[nodiscard]] std::shared_ptr<typename V::BaseClassType> create(F&& value, Args&&... args)
162 std::lock_guard<std::mutex> lock(_mutex);
163 return createImpl(std::forward<F>(value), std::forward<Args>(args)...);
// Batch variant: takes the lock once, reserves the output, and moves each input value into createImpl.
// The return of seq is not visible in this listing.
166 [[nodiscard]] std::vector<std::shared_ptr<typename V::BaseClassType>> create(std::vector<K> values)
168 std::lock_guard<std::mutex> lock(_mutex);
169 std::vector<std::shared_ptr<typename V::BaseClassType>> seq;
170 seq.reserve(values.size());
171 for (
auto& v : values)
173 seq.push_back(createImpl(std::move(v)));
// Lets Deleter call remove.
179 friend struct Deleter;
// Looks up an element by id under the lock; when the id is known, returns the result of locking
// the weak pointer (which is null if the element has expired). The not-found path is not visible here.
181 [[nodiscard]] std::shared_ptr<typename V::BaseClassType> getImpl(std::int64_t
id)
const
183 std::lock_guard<std::mutex> lock(_mutex);
184 auto p = _elementsById.find(
id);
185 if (p != _elementsById.end())
187 return p->second.lock();
// Does not lock _mutex itself; both create overloads above hold the lock before calling it.
// First looks for an existing element with the same value; the handling of a live or expired
// entry is not visible in this listing.
192 template<
typename F,
typename... Args> [[nodiscard]] std::shared_ptr<V> createImpl(F&& value, Args&&... args)
196 auto p = _elements.find(value);
197 if (p != _elements.end())
199 auto k = p->second.lock();
// Creates a new element owned by a shared_ptr that uses _deleter, passing ++_nextId as the last
// constructor argument, then registers it in both maps.
// NOTE(review): with _nextId starting at 1 and a pre-increment, the first id handed out is 2,
// as far as this listing shows.
211 std::shared_ptr<V>(
new V(std::forward<F>(value), std::forward<Args>(args)..., ++_nextId), _deleter);
212 _elements[k->get()] = k;
213 _elementsById[k->getId()] = k;
// Body of the removal routine (signature not visible; Deleter calls remove(obj)).
// e is declared before the lock_guard, so it is destroyed after the lock is released;
// presumably this avoids dropping a last reference while _mutex is held -- confirm.
221 std::shared_ptr<V> e;
222 std::lock_guard<std::mutex> lock(_mutex);
223 auto p = _elements.find(v->get());
224 if (p != _elements.end())
226 e = p->second.lock();
// Only acts on the by-value entry when it still refers to this exact object; the action taken
// is not visible in this listing.
227 if (e && e.get() == v)
232 _elementsById.erase(v->getId());
// mutable so that the const member getImpl can lock it.
235 mutable std::mutex _mutex;
// Lookup tables; they hold weak pointers, so the maps do not keep elements alive.
236 std::map<K, std::weak_ptr<V>> _elements;
237 std::map<std::int64_t, std::weak_ptr<V>> _elementsById;
238 std::int64_t _nextId{1};
// Key element holding a value of type K; combines the Key interface with AbstractElementT<K>.
241 template<
typename K>
class KeyT final :
public Key,
public AbstractElementT<K>
// Same text as AbstractElementT<K>::toString(), prefixed with the letter k.
244 [[nodiscard]] std::string toString() const final {
return "k" + AbstractElementT<K>::toString(); }
// Inherits the AbstractElementT constructors.
246 using AbstractElementT<K>::AbstractElementT;
// Type returned by AbstractFactoryT::create for this element (used there as V::BaseClassType).
247 using BaseClassType = Key;
// KeyFactory implementation built on AbstractFactoryT<K, KeyT<K>>.
// NOTE(review): this listing is missing lines (braces, part of one signature, the end of
// createFactory); comments describe only what is visible.
250 template<
typename K>
class KeyFactoryT final :
public KeyFactory,
public AbstractFactoryT<K, KeyT<K>>
// Inherits the AbstractFactoryT constructors.
253 using AbstractFactoryT<K, KeyT<K>>::AbstractFactoryT;
// Looks up a key by id through AbstractFactoryT::getImpl.
255 [[nodiscard]] std::shared_ptr<Key> get(std::int64_t
id)
const final
257 return AbstractFactoryT<K, KeyT<K>>::getImpl(
id);
// Member whose name and parameters are not visible: decodes a K from data with DecoderT<K>
// and creates (or looks up) the corresponding key.
260 [[nodiscard]] std::shared_ptr<Key>
263 return AbstractFactoryT<K, KeyT<K>>::create(DecoderT<K>::decode(communicator, data));
// Creates the factory with std::make_shared. The remaining statements are not visible;
// presumably init() is called before f is returned, since init() needs shared ownership -- confirm.
266 [[nodiscard]]
static std::shared_ptr<KeyFactoryT<K>> createFactory()
268 auto f = std::make_shared<KeyFactoryT<K>>();
// Tag element holding a value of type T; combines the Tag interface with AbstractElementT<T>.
274 template<
typename T>
class TagT final :
public Tag,
public AbstractElementT<T>
// Same text as AbstractElementT<T>::toString(), prefixed with the letter t.
277 [[nodiscard]] std::string toString() const final {
return "t" + AbstractElementT<T>::toString(); }
// Inherits the AbstractElementT constructors.
279 using AbstractElementT<T>::AbstractElementT;
// Type returned by AbstractFactoryT::create for this element (used there as V::BaseClassType).
280 using BaseClassType = Tag;
// TagFactory implementation built on AbstractFactoryT<T, TagT<T>>.
// NOTE(review): this listing is missing lines (braces, part of one signature, the end of
// createFactory); comments describe only what is visible.
283 template<
typename T>
class TagFactoryT final :
public TagFactory,
public AbstractFactoryT<T, TagT<T>>
// Inherits the AbstractFactoryT constructors.
286 using AbstractFactoryT<T, TagT<T>>::AbstractFactoryT;
// Looks up a tag by id through AbstractFactoryT::getImpl.
288 [[nodiscard]] std::shared_ptr<Tag> get(std::int64_t
id)
const final
290 return AbstractFactoryT<T, TagT<T>>::getImpl(
id);
// Member whose name and parameters are not visible: decodes a T from data with DecoderT<T>
// and creates (or looks up) the corresponding tag.
293 [[nodiscard]] std::shared_ptr<Tag>
296 return AbstractFactoryT<T, TagT<T>>::create(DecoderT<T>::decode(communicator, data));
// Creates the factory with std::make_shared. The remaining statements are not visible;
// presumably init() is called before f is returned -- confirm.
299 [[nodiscard]]
static std::shared_ptr<TagFactoryT<T>> createFactory()
301 auto f = std::make_shared<TagFactoryT<T>>();
// Typed Sample implementation. Holds a decoded value (_value), an encoded form (_encodedValue)
// and a flag telling whether a value is present (_hasValue).
// Derives from std::enable_shared_from_this so get() can wrap this object in a DataStorm::Sample.
// NOTE(review): this listing is missing many lines (braces, parts of signatures, member
// declarations); comments describe only what is visible.
307 template<
typename Key,
typename Value,
typename UpdateTag>
308 class SampleT final :
public Sample,
public std::enable_shared_from_this<SampleT<Key, Value, UpdateTag>>
// Tail of a constructor whose first parameters are not visible; it forwards everything to the
// Sample base constructor, moving session, origin and value.
316 const std::shared_ptr<DataStormI::Key>& key,
317 const std::shared_ptr<DataStormI::Tag>& tag,
319 std::int64_t timestamp)
320 : Sample(std::move(session), std::move(origin), id, event, key, tag, std::move(value), timestamp),
// Constructs a sample from an event and an already-decoded value; marks the value as present.
327 SampleT(
DataStorm::SampleEvent event, Value value) : Sample(event), _hasValue(true), _value(std::move(value)) {}
// Constructs a sample from an encoded value and a tag; the bytes are stored in _encodedValue.
// The member initializer list is not visible in this listing.
329 SampleT(
Ice::ByteSeq value,
const std::shared_ptr<Tag>& tag)
333 _encodedValue = std::move(value);
// Wraps this object in the public DataStorm::Sample type; requires this object to be owned
// by a std::shared_ptr (shared_from_this).
336 [[nodiscard]] DataStorm::Sample<Key, Value, UpdateTag> get()
338 auto impl = std::enable_shared_from_this<SampleT<Key, Value, UpdateTag>>::shared_from_this();
339 return DataStorm::Sample<Key, Value, UpdateTag>(impl);
// Returns the typed key. Uses an unchecked static_pointer_cast, so key is assumed to point
// to a KeyT<Key>.
342 [[nodiscard]]
const Key& getKey()
345 return std::static_pointer_cast<KeyT<Key>>(key)->get();
// Returns a const reference to the decoded value.
348 [[nodiscard]]
const Value& getValue()
const {
return _value; }
// Returns the typed tag by value, or a default-constructed UpdateTag when tag is null.
350 [[nodiscard]] UpdateTag getTag()
const
352 return tag ? std::static_pointer_cast<TagT<UpdateTag>>(tag)->get() : UpdateTag();
// Replaces the decoded value. Any further statements in this member are not visible here.
355 void setValue(Value value)
357 _value = std::move(value);
361 [[nodiscard]]
bool hasValue() const final {
return _hasValue; }
// Takes the value from another sample. The visible line is the argument of a call whose start is
// not shown: it casts sample (unchecked) to the same SampleT instantiation and reads getValue().
363 void setValue(
const std::shared_ptr<Sample>& sample)
final
368 std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(sample)->getValue());
// Body of a member whose signature is not visible: encodes lazily, only when no encoded form
// is stored yet, then returns the stored bytes.
379 if (_encodedValue.empty())
381 _encodedValue = encodeValue(communicator);
383 return _encodedValue;
// Body of a member whose signature is not visible (presumably encodeValue): encodes _value
// with EncoderT<Value>.
389 return EncoderT<Value>::encode(communicator, _value);
// Body of a member whose signature is not visible: when encoded bytes are present, decodes them
// into _value and then discards the bytes.
394 if (!_encodedValue.empty())
397 _value = DecoderT<Value>::decode(communicator, _encodedValue);
398 _encodedValue.clear();
// SampleFactory implementation that produces SampleT<Key, Value, UpdateTag> instances.
// NOTE(review): most of the create parameter list and the make_shared arguments are not
// visible in this listing.
407 template<
typename Key,
typename Value,
typename UpdateTag>
class SampleFactoryT final :
public SampleFactory
// Allocates the typed sample with std::make_shared and returns it as a shared_ptr to Sample.
410 [[nodiscard]] std::shared_ptr<Sample> create(
415 const std::shared_ptr<DataStormI::Key>& key,
416 const std::shared_ptr<DataStormI::Tag>& tag,
418 std::int64_t timestamp)
final
420 return std::make_shared<SampleT<Key, Value, UpdateTag>>(
// Filter element: stores criteria of type C (through AbstractElementT<C>), a name, and a
// predicate applied to values obtained from objects of type V.
// NOTE(review): this listing is missing lines (braces, the _name member declaration).
432 template<
typename C,
typename V>
class FilterT final :
public Filter,
public AbstractElementT<C>
// Forwards criteria and id to AbstractElementT and moves name and lambda into the members.
435 template<
typename CC,
typename FF>
436 FilterT(CC&& criteria, std::string name, FF lambda, std::int64_t
id)
437 : AbstractElementT<C>::AbstractElementT(std::forward<CC>(criteria), id),
438 _name(std::move(name)),
439 _lambda(std::move(lambda))
// Same text as AbstractElementT<C>::toString(), prefixed with the letter f.
443 [[nodiscard]] std::string toString() const final {
return "f" + AbstractElementT<C>::toString(); }
// Applies the predicate to value->get(). Uses an unchecked static_pointer_cast, so value is
// assumed to point to a V.
445 [[nodiscard]]
bool match(
const std::shared_ptr<Filterable>& value)
const final
447 return _lambda(std::static_pointer_cast<V>(value)->get());
450 [[nodiscard]]
const std::string& getName() const final {
return _name; }
// Type returned by AbstractFactoryT::create for this element (used there as V::BaseClassType).
452 using BaseClassType = Filter;
// Predicate taking a const reference to whatever V::get() returns, with any reference removed.
456 std::function<bool(const typename std::remove_reference<decltype(std::declval<V>().get())>::type&)> _lambda;
// FilterFactory implementation built on AbstractFactoryT<C, FilterT<C, V>>.
// NOTE(review): this listing is missing lines (braces, the end of createFactory).
459 template<
typename C,
typename V>
460 class FilterFactoryT final :
public FilterFactory,
public AbstractFactoryT<C, FilterT<C, V>>
463 FilterFactoryT() =
default;
// Looks up a filter by id through AbstractFactoryT::getImpl.
465 [[nodiscard]] std::shared_ptr<Filter> get(std::int64_t
id)
const final
467 return AbstractFactoryT<C, FilterT<C, V>>::getImpl(
id);
// Creates the factory with std::make_shared. The remaining statements are not visible;
// presumably init() is called before f is returned -- confirm.
470 [[nodiscard]]
static std::shared_ptr<FilterFactoryT<C, V>> createFactory()
472 auto f = std::make_shared<FilterFactoryT<C, V>>();
// FilterManager implementation: a registry of named filter factories (_factories) for values
// obtained from ValueT::get().
// NOTE(review): this listing is missing many lines (braces, the Factory struct header, parts of
// several signatures, some conditions and throw statements); comments describe only what is visible.
478 template<
typename ValueT>
class FilterManagerT final :
public FilterManager
// Type returned by ValueT::get(), with any reference removed.
480 using Value = std::remove_reference_t<decltype(std::declval<ValueT>().get())>;
// Members of the type-erased Factory base: a virtual destructor and a pure virtual lookup by id.
484 virtual ~Factory() =
default;
486 [[nodiscard]]
virtual std::shared_ptr<Filter> get(std::int64_t)
const = 0;
// Factory for one criteria type. Holds the filter name, a function that turns criteria into a
// predicate over Value, and the FilterFactoryT that creates and tracks the filters.
491 template<
typename Criteria>
struct FactoryT final : Factory
// The parameters intentionally share names with the members they initialize.
493 FactoryT(std::string name, std::function<std::function<
bool(
const Value&)>(
const Criteria&)> lambda)
494 : name(std::move(name)),
495 lambda(std::move(lambda))
// Builds the predicate for criteria and creates (or looks up) the filter through filterFactory.
499 [[nodiscard]] std::shared_ptr<Filter> create(
const Criteria& criteria)
501 return std::static_pointer_cast<FilterT<Criteria, ValueT>>(
502 filterFactory.create(criteria, name, lambda(criteria)));
505 [[nodiscard]] std::shared_ptr<Filter> get(std::int64_t
id)
const final {
return filterFactory.get(
id); }
// Member whose name and parameters are not visible: decodes the criteria with DecoderT<Criteria>
// and delegates to create.
507 [[nodiscard]] std::shared_ptr<Filter>
510 return create(DecoderT<Criteria>::decode(communicator, data));
513 const std::string name;
514 std::function<std::function<bool(const Value&)>(
const Criteria&)> lambda;
515 FilterFactoryT<Criteria, ValueT> filterFactory;
// Creates a filter by name from typed criteria.
// Throws std::invalid_argument when name is not registered. A second std::invalid_argument
// reports a criteria type mismatch; its guarding condition is not visible here, presumably a
// null result from the dynamic_cast.
519 template<
typename Criteria>
520 [[nodiscard]] std::shared_ptr<Filter> create(
const std::string& name,
const Criteria& criteria)
522 auto p = _factories.find(name);
523 if (p == _factories.end())
525 throw std::invalid_argument(
"unknown filter '" + name +
"'");
// Checked downcast from the type-erased Factory to the factory for this Criteria type.
528 auto factory =
dynamic_cast<FactoryT<Criteria>*
>(p->second.get());
531 throw std::invalid_argument(
"filter '" + name +
"' type doesn't match");
534 return factory->create(criteria);
// Member whose name and parameters are not visible: looks up the factory by name and lets it
// decode the criteria from data. The handling of an unknown name is not visible.
537 [[nodiscard]] std::shared_ptr<Filter>
540 auto p = _factories.find(name);
541 if (p == _factories.end())
546 return p->second->decode(communicator, data);
// Looks up a filter by factory name and id. The handling of an unknown name is not visible.
549 [[nodiscard]] std::shared_ptr<Filter> get(
const std::string& name, std::int64_t
id)
const final
551 auto p = _factories.find(name);
552 if (p == _factories.end())
557 return p->second->get(
id);
// Registers a factory under name. An erase of name is also visible below; the condition that
// chooses between registering and erasing is not.
// NOTE(review): std::map::emplace does not replace an existing entry, so registering a name
// that is already present keeps the previous factory. Confirm that is intended.
560 template<
typename Criteria>
561 void set(std::string name, std::function<std::function<
bool(
const Value&)>(
const Criteria&)> lambda)
// name is copied into the factory here because it is moved into the map key on the next line.
565 auto factory = std::make_unique<FactoryT<Criteria>>(name, std::move(lambda));
566 _factories.emplace(std::move(name), std::move(factory));
570 _factories.erase(name);
// Registered factories, keyed by filter name and owned by the manager.
576 std::map<std::string, std::unique_ptr<Factory>> _factories;
580#if defined(__clang__)
581# pragma clang diagnostic pop
582#elif defined(__GNUC__)
583# pragma GCC diagnostic pop
A sample provides information about a data element update.
SampleEvent
Describes the operation used by a data writer to update a data element.
@ Remove
The data writer removed the element.
@ PartialUpdate
The data writer partially updated the element.
Data-centric, broker-less publish/subscribe framework. C++ only.
std::shared_ptr< Communicator > CommunicatorPtr
A shared pointer to a Communicator.
std::vector< std::byte > ByteSeq
A sequence of bytes.
static T clone(const T &value) noexcept
Clones the given value.
static T decode(const Ice::CommunicatorPtr &communicator, const Ice::ByteSeq &value) noexcept
Decodes a value.
static Ice::ByteSeq encode(const Ice::CommunicatorPtr &communicator, const T &value) noexcept
Encodes the given value.