3#ifndef DATASTORM_DATASTORM_H
4#define DATASTORM_DATASTORM_H
7#include "DataStorm/SampleEvent.h"
16# pragma clang diagnostic push
17# pragma clang diagnostic ignored "-Wshadow-field-in-constructor"
18#elif defined(__GNUC__)
19# pragma GCC diagnostic push
20# pragma GCC diagnostic ignored "-Wshadow"
29 template<
typename Key,
typename Value,
typename UpdateTag = std::
string>
class Sample
48 [[nodiscard]] const Key&
getKey() const noexcept;
54 [[nodiscard]] const Value&
getValue() const noexcept;
64 [[nodiscard]] std::chrono::time_point<std::chrono::system_clock>
getTimeStamp() const noexcept;
71 [[nodiscard]] const std::
string&
getOrigin() const noexcept;
76 [[nodiscard]] const std::
string&
getSession() const noexcept;
79 Sample(const std::shared_ptr<DataStormI::Sample>&) noexcept;
82 std::shared_ptr<DataStormI::SampleT<Key, Value, UpdateTag>> _impl;
89 inline std::ostream& operator<<(std::ostream& os, const
SampleEventSeq& types)
99 template<
typename K,
typename V,
typename U>
108 template<
typename Key,
typename Value,
typename UpdateTag>
class Reader
132 [[nodiscard]]
bool hasWriters() const noexcept;
161 [[nodiscard]]
bool hasUnread() const noexcept;
176 std::function<
void(std::vector<Key>)> init,
186 std::function<
void(std::vector<std::
string>)> init,
187 std::function<
void(
CallbackReason, std::
string)> update) noexcept;
196 std::function<
void(std::vector<
Sample<Key, Value, UpdateTag>>)> init,
197 std::function<
void(
Sample<Key, Value, UpdateTag>)> queue) noexcept;
201 Reader(const std::shared_ptr<DataStormI::DataReader>& impl) noexcept : _impl(impl) {}
204 std::shared_ptr<DataStormI::DataReader> _impl;
209 template<
typename Key,
typename Value,
typename UpdateTag>
class Writer
233 [[nodiscard]]
bool hasReaders() const noexcept;
259 [[nodiscard]] std::vector<
Sample<Key, Value, UpdateTag>>
getAll();
269 std::function<
void(std::vector<Key>)> init,
279 std::function<
void(std::vector<std::
string>)> init,
280 std::function<
void(
CallbackReason, std::
string)> update) noexcept;
284 Writer(const std::shared_ptr<DataStormI::DataWriter>& impl) noexcept : _impl(impl) {}
287 std::shared_ptr<DataStormI::DataWriter> _impl;
294 template<
typename Key,
typename Value,
typename UpdateTag = std::
string>
class Topic
318 Topic(
const Node& node, std::string name)
noexcept;
323 : _name(std::move(topic._name)),
324 _topicFactory(std::move(topic._topicFactory)),
325 _keyFactory(std::move(topic._keyFactory)),
326 _tagFactory(std::move(topic._tagFactory)),
327 _keyFilterFactories(std::move(topic._keyFilterFactories)),
328 _sampleFilterFactories(std::move(topic._sampleFilterFactories)),
329 _reader(std::move(topic._reader)),
330 _writer(std::move(topic._writer)),
331 _updaters(std::move(topic._updaters))
346 [[nodiscard]]
bool hasWriters() const noexcept;
351 void waitForWriters(
unsigned int count = 1) const;
355 void waitForNoWriters() const;
359 void setWriterDefaultConfig(const
WriterConfig& config) noexcept;
363 [[nodiscard]]
bool hasReaders() const noexcept;
368 void waitForReaders(
unsigned int count = 1) const;
372 void waitForNoReaders() const;
376 void setReaderDefaultConfig(const
ReaderConfig& config) noexcept;
383 template<typename UpdateValue>
384 void setUpdater(const UpdateTag& tag, std::function<
void(Value&, UpdateValue)> updater) noexcept;
390 template<typename Criteria>
393 std::function<std::function<
bool(const Key&)>(const Criteria&)> factory) noexcept;
399 template<typename Criteria>
400 void setSampleFilter(
402 std::function<std::function<
bool(const SampleType&)>(const Criteria&)> factory) noexcept;
405 [[nodiscard]] std::shared_ptr<DataStormI::TopicReader> getReader() const;
406 [[nodiscard]] std::shared_ptr<DataStormI::TopicWriter> getWriter() const;
407 [[nodiscard]]
Ice::CommunicatorPtr getCommunicator() const noexcept;
410 template<typename, typename, typename> friend class
MultiKeyWriter;
412 template<typename, typename, typename> friend class
MultiKeyReader;
417 std::shared_ptr<DataStormI::TopicFactory> _topicFactory;
418 std::shared_ptr<DataStormI::KeyFactoryT<Key>> _keyFactory;
419 std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
420 std::shared_ptr<DataStormI::FilterManagerT<DataStormI::KeyT<Key>>> _keyFilterFactories;
421 std::shared_ptr<DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>> _sampleFilterFactories;
423 mutable std::mutex _mutex;
424 mutable std::shared_ptr<DataStormI::TopicReader> _reader;
425 mutable std::shared_ptr<DataStormI::TopicWriter> _writer;
426 mutable std::map<std::shared_ptr<DataStormI::Tag>, DataStormI::
Topic::Updater> _updaters;
436 template<
typename TT>
451 template<
typename Key,
typename Value,
typename UpdateTag = std::
string>
464 std::string name = std::string(),
475 template<
typename SampleFilterCriteria>
480 std::string name = std::string(),
495 template<
typename Key,
typename Value,
typename UpdateTag = std::
string>
508 const std::vector<Key>& keys,
509 std::string name = std::string(),
521 template<
typename SampleFilterCriteria>
524 const std::vector<Key>& keys,
526 std::string name = std::string(),
545 template<
typename K,
typename V,
typename UT>
549 std::string name = std::string(),
562 template<
typename SFC,
typename K,
typename V,
typename UT>
567 std::string name = std::string(),
580 template<
typename K,
typename V,
typename UT>
584 std::string name = std::string(),
598 template<
typename SFC,
typename K,
typename V,
typename UT>
603 std::string name = std::string(),
615 template<
typename K,
typename V,
typename UT>
618 std::string name = std::string(),
631 template<
typename SFC,
typename K,
typename V,
typename UT>
635 std::string name = std::string(),
643 template<
typename Key,
typename Value,
typename UpdateTag = std::
string>
655 template<
typename KeyFilterCriteria>
659 std::string name = std::string(),
671 template<
typename KeyFilterCriteria,
typename SampleFilterCriteria>
676 std::string name = std::string(),
695 template<
typename KFC,
typename K,
typename V,
typename UT>
699 std::string name = std::string(),
712 template<
typename KFC,
typename SFC,
typename K,
typename V,
typename UT>
717 std::string name = std::string(),
725 template<
typename Key,
typename Value,
typename UpdateTag = std::
string>
738 std::string name = std::string(),
752 void add(
const Value& value);
756 void update(
const Value& value);
763 template<
typename UpdateValue>
764 [[nodiscard]] std::function<void(
const UpdateValue&)>
partialUpdate(
const UpdateTag& tag);
770 const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
775 template<
typename Key,
typename Value,
typename UpdateTag = std::
string>
788 const std::vector<Key>& keys,
789 std::string name = std::string(),
804 void add(
const Key& key,
const Value& value);
809 void update(
const Key& key,
const Value& value);
816 template<
typename UpdateValue>
817 [[nodiscard]] std::function<void(
const Key&,
const UpdateValue&)>
partialUpdate(
const UpdateTag& tag);
821 void remove(
const Key& key)
noexcept;
824 const std::shared_ptr<DataStormI::KeyFactoryT<Key>> _keyFactory;
825 const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
834 template<
typename K,
typename V,
typename UT>
838 std::string name = std::string(),
850 template<
typename K,
typename V,
typename UT>
854 std::string name = std::string(),
865 template<
typename K,
typename V,
typename UT>
868 std::string name = std::string(),
881 template<
typename Key,
typename Value,
typename UpdateTag>
887 template<
typename Key,
typename Value,
typename UpdateTag>
890 return _impl->getKey();
893 template<
typename Key,
typename Value,
typename UpdateTag>
896 return _impl->getValue();
899 template<
typename Key,
typename Value,
typename UpdateTag>
902 return _impl->getTag();
905 template<
typename Key,
typename Value,
typename UpdateTag>
908 return _impl->timestamp;
911 template<
typename Key,
typename Value,
typename UpdateTag>
914 return _impl->origin;
917 template<
typename Key,
typename Value,
typename UpdateTag>
920 return _impl->session;
923 template<
typename Key,
typename Value,
typename UpdateTag>
924 Sample<Key, Value, UpdateTag>::Sample(
const std::shared_ptr<DataStormI::Sample>& impl) noexcept
925 : _impl(std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(impl))
932 template<
typename Key,
typename Value,
typename UpdateTag>
934 : _impl(std::move(reader._impl))
946 template<
typename Key,
typename Value,
typename UpdateTag>
953 _impl = std::move(reader._impl);
957 template<
typename Key,
typename Value,
typename UpdateTag>
960 return _impl->hasWriters();
963 template<
typename Key,
typename Value,
typename UpdateTag>
966 _impl->waitForWriters(
static_cast<int>(count));
969 template<
typename Key,
typename Value,
typename UpdateTag>
972 _impl->waitForWriters(-1);
975 template<
typename Key,
typename Value,
typename UpdateTag>
978 return _impl->getConnectedElements();
981 template<
typename Key,
typename Value,
typename UpdateTag>
984 std::vector<Key> keys;
985 auto connectedKeys = _impl->getConnectedKeys();
986 keys.reserve(connectedKeys.size());
987 for (
const auto& k : connectedKeys)
989 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
994 template<
typename Key,
typename Value,
typename UpdateTag>
997 auto unread = _impl->getAllUnread();
998 std::vector<Sample<Key, Value, UpdateTag>> samples;
999 samples.reserve(unread.size());
1000 for (
const auto& sample : unread)
1002 samples.push_back(sample);
1007 template<
typename Key,
typename Value,
typename UpdateTag>
1010 _impl->waitForUnread(count);
1013 template<
typename Key,
typename Value,
typename UpdateTag>
1016 return _impl->hasUnread();
1019 template<
typename Key,
typename Value,
typename UpdateTag>
1025 template<
typename Key,
typename Value,
typename UpdateTag>
1027 std::function<
void(std::vector<Key>)> init,
1030 _impl->onConnectedKeys(
1032 [init = std::move(init)](
const std::vector<std::shared_ptr<DataStormI::Key>>& connectedKeys)
1034 std::vector<Key> keys;
1035 keys.reserve(connectedKeys.size());
1036 for(
const auto& k : connectedKeys)
1038 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
1040 init(std::move(keys));
1041 } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>{},
1043 [update = std::move(update)](
CallbackReason action,
const std::shared_ptr<DataStormI::Key>& key)
1045 update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->get());
1046 } : std::function<void(
CallbackReason, std::shared_ptr<DataStormI::Key>)>{});
1049 template<
typename Key,
typename Value,
typename UpdateTag>
1051 std::function<
void(std::vector<std::string>)> init,
1052 std::function<
void(
CallbackReason, std::string)> update)
noexcept
1054 _impl->onConnectedElements(std::move(init), std::move(update));
1057 template<
typename Key,
typename Value,
typename UpdateTag>
1062 auto communicator = _impl->getCommunicator();
1065 [communicator, init = std::move(init)](
const std::vector<std::shared_ptr<DataStormI::Sample>>& samplesI)
1067 std::vector<Sample<Key, Value, UpdateTag>> samples;
1068 samples.reserve(samplesI.size());
1069 for(
const auto& s : samplesI)
1071 samples.emplace_back(s);
1073 init(std::move(samples));
1074 } : std::function<void(
const std::vector<std::shared_ptr<DataStormI::Sample>>&)>(),
1076 [communicator, update = std::move(update)](
const std::shared_ptr<DataStormI::Sample>& sampleI)
1079 } : std::function<void(
const std::shared_ptr<DataStormI::Sample>&)>{});
1082 template<
typename Key,
typename Value,
typename UpdateTag>
1088 :
Reader<Key, Value, UpdateTag>(
1089 topic.getReader()->create({topic._keyFactory->create(key)}, std::move(name), config))
1093 template<
typename Key,
typename Value,
typename UpdateTag>
1094 template<
typename SampleFilterCriteria>
1101 :
Reader<Key, Value, UpdateTag>(topic.getReader()->create(
1102 {topic._keyFactory->create(key)},
1106 DataStormI::EncoderT<SampleFilterCriteria>::encode(topic.getCommunicator(), sampleFilter.
criteria)))
1110 template<
typename Key,
typename Value,
typename UpdateTag>
1116 template<
typename Key,
typename Value,
typename UpdateTag>
1124 template<
typename Key,
typename Value,
typename UpdateTag>
1127 const std::vector<Key>& keys,
1130 :
Reader<Key, Value, UpdateTag>(
1131 topic.getReader()->create(topic._keyFactory->create(keys), std::move(name), config))
1135 template<
typename Key,
typename Value,
typename UpdateTag>
1136 template<
typename SampleFilterCriteria>
1139 const std::vector<Key>& keys,
1143 :
Reader<Key, Value, UpdateTag>(topic.getReader()->create(
1144 topic._keyFactory->create(keys),
1148 Encoder<SampleFilterCriteria>::encode(topic.getCommunicator(), sampleFilter.criteria)))
1152 template<
typename Key,
typename Value,
typename UpdateTag>
1158 template<
typename Key,
typename Value,
typename UpdateTag>
1166 template<
typename Key,
typename Value,
typename UpdateTag>
1167 template<
typename KeyFilterCriteria>
1173 :
Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(
1174 topic._keyFilterFactories->create(filter.name, filter.criteria),
1180 template<
typename Key,
typename Value,
typename UpdateTag>
1181 template<
typename KeyFilterCriteria,
typename SampleFilterCriteria>
1188 :
Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(
1189 topic._keyFilterFactories->create(keyFilter.name, keyFilter.criteria),
1193 Encoder<SampleFilterCriteria>::encode(topic.getCommunicator(), sampleFilter.criteria)))
1197 template<
typename Key,
typename Value,
typename UpdateTag>
1204 template<
typename Key,
typename Value,
typename UpdateTag>
1215 template<
typename Key,
typename Value,
typename UpdateTag>
1228 template<
typename Key,
typename Value,
typename UpdateTag>
1235 _impl = std::move(writer._impl);
1239 template<
typename Key,
typename Value,
typename UpdateTag>
1245 template<
typename Key,
typename Value,
typename UpdateTag>
1248 return _impl->waitForReaders(
static_cast<int>(count));
1251 template<
typename Key,
typename Value,
typename UpdateTag>
1254 return _impl->waitForReaders(-1);
1257 template<
typename Key,
typename Value,
typename UpdateTag>
1260 return _impl->getConnectedElements();
1263 template<
typename Key,
typename Value,
typename UpdateTag>
1266 std::vector<Key> keys;
1267 auto connectedKeys = _impl->getConnectedKeys();
1268 keys.reserve(connectedKeys.size());
1269 for (
const auto& k : connectedKeys)
1271 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
1276 template<
typename Key,
typename Value,
typename UpdateTag>
1279 auto sample = _impl->getLast();
1282 throw std::logic_error(
"no sample");
1287 template<
typename Key,
typename Value,
typename UpdateTag>
1290 auto all = _impl->getAll();
1291 std::vector<Sample<Key, Value, UpdateTag>> samples;
1292 samples.reserve(all.size());
1293 for (
const auto& sample : all)
1295 samples.push_back(sample);
1300 template<
typename Key,
typename Value,
typename UpdateTag>
1302 std::function<
void(std::vector<Key>)> init,
1305 _impl->onConnectedKeys(
1307 [init = std::move(init)](
const std::vector<std::shared_ptr<DataStormI::Key>>& connectedKeys)
1309 std::vector<Key> keys;
1310 keys.reserve(connectedKeys.size());
1311 for(
const auto& k : connectedKeys)
1313 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
1315 init(std::move(keys));
1316 } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>{},
1318 [update = std::move(update)](
CallbackReason action,
const std::shared_ptr<DataStormI::Key>& key)
1320 update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->get());
1321 } : std::function<void(
CallbackReason, std::shared_ptr<DataStormI::Key>)>{});
1324 template<
typename Key,
typename Value,
typename UpdateTag>
1326 std::function<
void(std::vector<std::string>)> init,
1327 std::function<
void(
CallbackReason, std::string)> update)
noexcept
1329 _impl->onConnectedElements(std::move(init), std::move(update));
1332 template<
typename Key,
typename Value,
typename UpdateTag>
1338 :
Writer<Key, Value, UpdateTag>(
1339 topic.getWriter()->create({topic._keyFactory->create(key)}, std::move(name), config)),
1340 _tagFactory(topic._tagFactory)
1344 template<
typename Key,
typename Value,
typename UpdateTag>
1347 _tagFactory(std::move(writer._tagFactory))
1351 template<
typename Key,
typename Value,
typename UpdateTag>
1359 template<
typename Key,
typename Value,
typename UpdateTag>
1362 Writer<Key, Value, UpdateTag>::_impl->publish(
1364 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(
SampleEvent::Add, value));
1367 template<
typename Key,
typename Value,
typename UpdateTag>
1370 Writer<Key, Value, UpdateTag>::_impl->publish(
1372 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(
SampleEvent::Update, value));
1375 template<
typename Key,
typename Value,
typename UpdateTag>
1376 template<
typename UpdateValue>
1379 auto impl = Writer<Key, Value, UpdateTag>::_impl;
1380 auto updateTag = _tagFactory->create(tag);
1381 return [impl, updateTag](
const UpdateValue& value)
1384 impl->publish(
nullptr, std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
1388 template<
typename Key,
typename Value,
typename UpdateTag>
1391 Writer<Key, Value, UpdateTag>::_impl->publish(
1396 template<
typename Key,
typename Value,
typename UpdateTag>
1399 const std::vector<Key>& keys,
1402 :
Writer<Key, Value, UpdateTag>(
1403 topic.getWriter()->create(topic._keyFactory->create(keys), std::move(name), config)),
1404 _keyFactory(topic._keyFactory),
1405 _tagFactory(topic._tagFactory)
1409 template<
typename Key,
typename Value,
typename UpdateTag>
1412 _keyFactory(std::move(writer._keyFactory)),
1413 _tagFactory(std::move(writer._tagFactory))
1417 template<
typename Key,
typename Value,
typename UpdateTag>
1425 template<
typename Key,
typename Value,
typename UpdateTag>
1428 Writer<Key, Value, UpdateTag>::_impl->publish(
1429 _keyFactory->create(key),
1430 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(
SampleEvent::Add, value));
1433 template<
typename Key,
typename Value,
typename UpdateTag>
1436 Writer<Key, Value, UpdateTag>::_impl->publish(
1437 _keyFactory->create(key),
1438 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(
SampleEvent::Update, value));
1441 template<
typename Key,
typename Value,
typename UpdateTag>
1442 template<
typename UpdateValue>
1443 std::function<void(
const Key&,
const UpdateValue&)>
1446 auto impl = Writer<Key, Value, UpdateTag>::_impl;
1447 auto updateTag = _tagFactory->create(tag);
1448 auto keyFactory = _keyFactory;
1449 return [impl, updateTag, keyFactory](
const Key& key,
const UpdateValue& value)
1453 keyFactory->create(key),
1454 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
1458 template<
typename Key,
typename Value,
typename UpdateTag>
1461 Writer<Key, Value, UpdateTag>::_impl->publish(
1462 _keyFactory->create(key),
1467 template<
typename Value> std::function<std::function<bool(const Value&)>(
const std::string&)> makeRegexFilter()
1470 return [](
const std::string& criteria)
1472 std::regex expr(criteria);
1473 return [expr = std::move(expr)](
const Value& value)
1475 std::ostringstream os;
1477 return std::regex_match(os.str(), expr);
1483 template<
typename Key,
typename Value,
typename UpdateTag>
1490 {
return std::find(criteria.begin(), criteria.end(), sample.getEvent()) != criteria.end(); };
1495 template<
typename T,
typename V,
typename Enabler =
void>
struct RegexFilter
1497 template<
typename F>
static void add(
const F&) {}
1501 template<
typename T,
typename V>
struct RegexFilter<T, V, std::enable_if_t<DataStormI::is_streamable<V>::value>>
1503 template<
typename F>
static void add(
const F& factory)
1505 factory->set(
"_regex", makeRegexFilter<T>());
1512 template<
typename Key,
typename Value,
typename UpdateTag>
1514 : _name(std::move(name)),
1515 _topicFactory(node._factory),
1516 _keyFactory(DataStormI::KeyFactoryT<Key>::createFactory()),
1517 _tagFactory(DataStormI::TagFactoryT<UpdateTag>::createFactory()),
1518 _keyFilterFactories(std::make_shared<DataStormI::FilterManagerT<DataStormI::KeyT<Key>>>()),
1519 _sampleFilterFactories(
1520 std::make_shared<DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>>())
1522 RegexFilter<Key, Key>::add(_keyFilterFactories);
1523 RegexFilter<Sample<Key, Value, UpdateTag>, Value>::add(_sampleFilterFactories);
1524 _sampleFilterFactories->set(
"_event", makeSampleEventFilter(*
this));
1529 std::lock_guard<std::mutex> lock(_mutex);
1540 template<
typename Key,
typename Value,
typename UpdateTag>
1543 std::lock_guard<std::mutex> lock(_mutex);
1552 _name = std::move(topic._name);
1553 _topicFactory = std::move(topic._topicFactory);
1554 _keyFactory = std::move(topic._keyFactory);
1555 _tagFactory = std::move(topic._tagFactory);
1556 _keyFilterFactories = std::move(topic._keyFilterFactories);
1557 _sampleFilterFactories = std::move(topic._sampleFilterFactories);
1558 _reader = std::move(topic._reader);
1559 _writer = std::move(topic._writer);
1560 _updaters = std::move(topic._updaters);
1564 template<
typename Key,
typename Value,
typename UpdateTag>
1570 template<
typename Key,
typename Value,
typename UpdateTag>
1573 getReader()->waitForWriters(
static_cast<int>(count));
1576 template<
typename Key,
typename Value,
typename UpdateTag>
1579 getReader()->waitForWriters(-1);
1582 template<
typename Key,
typename Value,
typename UpdateTag>
1585 getReader()->setDefaultConfig(config);
1588 template<
typename Key,
typename Value,
typename UpdateTag>
1591 return getWriter()->hasReaders();
1594 template<
typename Key,
typename Value,
typename UpdateTag>
1597 getWriter()->waitForReaders(
static_cast<int>(count));
1600 template<
typename Key,
typename Value,
typename UpdateTag>
1603 getWriter()->waitForReaders(-1);
1606 template<
typename Key,
typename Value,
typename UpdateTag>
1609 getWriter()->setDefaultConfig(config);
1612 template<
typename Key,
typename Value,
typename UpdateTag>
1613 template<
typename UpdateValue>
1615 const UpdateTag& tag,
1616 std::function<
void(Value&, UpdateValue)> updater)
noexcept
1618 std::lock_guard<std::mutex> lock(_mutex);
1619 auto tagI = _tagFactory->create(std::move(tag));
1622 [updater = std::move(updater)](
const std::shared_ptr<DataStormI::Sample>& previous,
1623 const std::shared_ptr<DataStormI::Sample>& next,
1630 std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(previous)->getValue());
1633 std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(next)->setValue(std::move(value));
1634 } : std::function<void(
const std::shared_ptr<DataStormI::Sample>&,
1635 const std::shared_ptr<DataStormI::Sample>&,
1638 if (_reader && !_writer)
1640 _reader->setUpdater(tagI, updaterImpl);
1642 else if (_writer && !_reader)
1644 _writer->setUpdater(tagI, updaterImpl);
1646 else if (_reader && _writer)
1648 _reader->setUpdater(tagI, updaterImpl);
1649 _writer->setUpdater(tagI, updaterImpl);
1653 _updaters[tagI] = updaterImpl;
1657 template<
typename Key,
typename Value,
typename UpdateTag>
1658 template<
typename Criteria>
1661 std::function<std::function<
bool(
const Key&)>(
const Criteria&)> factory)
noexcept
1663 std::lock_guard<std::mutex> lock(_mutex);
1664 _keyFilterFactories->set(std::move(name), std::move(factory));
1667 template<
typename Key,
typename Value,
typename UpdateTag>
1668 template<
typename Criteria>
1671 std::function<std::function<
bool(
const SampleType&)>(
const Criteria&)> factory)
noexcept
1673 std::lock_guard<std::mutex> lock(_mutex);
1674 _sampleFilterFactories->set(std::move(name), std::move(factory));
1677 template<
typename Key,
typename Value,
typename UpdateTag>
1678 std::shared_ptr<DataStormI::TopicReader> Topic<Key, Value, UpdateTag>::getReader()
const
1680 std::lock_guard<std::mutex> lock(_mutex);
1683 auto sampleFactory = std::make_shared<DataStormI::SampleFactoryT<Key, Value, UpdateTag>>();
1684 _reader = _topicFactory->createTopicReader(
1688 std::move(sampleFactory),
1689 _keyFilterFactories,
1690 _sampleFilterFactories);
1691 _reader->setUpdaters(_writer ? _writer->getUpdaters() : _updaters);
1697 template<
typename Key,
typename Value,
typename UpdateTag>
1698 std::shared_ptr<DataStormI::TopicWriter> Topic<Key, Value, UpdateTag>::getWriter()
const
1700 std::lock_guard<std::mutex> lock(_mutex);
1703 _writer = _topicFactory->createTopicWriter(
1708 _keyFilterFactories,
1709 _sampleFilterFactories);
1710 _writer->setUpdaters(_reader ? _reader->getUpdaters() : _updaters);
1716 template<
typename Key,
typename Value,
typename UpdateTag>
1719 return _topicFactory->getCommunicator();
1723#if defined(__clang__)
1724# pragma clang diagnostic pop
1725#elif defined(__GNUC__)
1726# pragma GCC diagnostic pop
FilteredKeyReader & operator=(FilteredKeyReader &&reader) noexcept
Move assignment operator.
FilteredKeyReader(const Topic< Key, Value, UpdateTag > &topic, const Filter< KeyFilterCriteria > &keyFilter, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Constructs a new reader for the given key filter.
The filtered reader to read data elements whose keys match a given filter.
MultiKeyReader(const Topic< Key, Value, UpdateTag > &topic, const std::vector< Key > &keys, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Constructs a new reader for the given keys.
MultiKeyReader & operator=(MultiKeyReader &&reader) noexcept
Move assignment operator.
The key reader to read the data elements associated with a given set of keys.
std::function< void(const Key &, const UpdateValue &)> partialUpdate(const UpdateTag &tag)
Gets a partial update generator function for the given partial update tag.
void add(const Key &key, const Value &value)
Adds the data element.
MultiKeyWriter(const Topic< Key, Value, UpdateTag > &topic, const std::vector< Key > &keys, std::string name=std::string(), const WriterConfig &config=WriterConfig())
Constructs a new writer for the given keys.
void remove(const Key &key) noexcept
Removes the data element.
void update(const Key &key, const Value &value)
Updates the data element.
MultiKeyWriter & operator=(MultiKeyWriter &&writer) noexcept
Move assignment operator.
The key writer to write data elements associated with a given set of keys.
The Node class allows creating topic readers and writers.
The ReaderConfig class specifies configuration options specific to readers.
std::vector< Key > getConnectedKeys() const
Gets the keys for which writers are connected to this reader.
void onConnectedWriters(std::function< void(std::vector< std::string >)> init, std::function< void(CallbackReason, std::string)> update) noexcept
Calls the given functions to provide the initial set of connected writers and when a new writer connects or disconnects.
std::vector< std::string > getConnectedWriters() const
Gets the connected writers.
Reader & operator=(Reader &&reader) noexcept
Move assignment operator.
void waitForNoWriters() const
Waits for writers to be offline.
Value ValueType
The value type.
Reader(Reader &&reader) noexcept
Move constructor.
void waitForWriters(unsigned int count=1) const
Waits for the given number of writers to be online.
bool hasUnread() const noexcept
Returns whether or not unread samples are available.
void onConnectedKeys(std::function< void(std::vector< Key >)> init, std::function< void(CallbackReason, Key)> update) noexcept
Calls the given functions to provide the initial set of connected keys and when a key is added or removed.
std::vector< Sample< Key, Value, UpdateTag > > getAllUnread()
Returns all the unread samples.
Sample< Key, Value, UpdateTag > getNextUnread()
Returns the next unread sample.
bool hasWriters() const noexcept
Indicates whether or not writers are online.
void waitForUnread(unsigned int count=1) const
Waits for the given number of unread samples to be available.
void onSamples(std::function< void(std::vector< Sample< Key, Value, UpdateTag > >)> init, std::function< void(Sample< Key, Value, UpdateTag >)> queue) noexcept
Calls the given function to provide the initial set of unread samples and when new samples are queued.
The Reader class is used to retrieve samples for a data element.
const Value & getValue() const noexcept
Value ValueType
The type of the sample value.
const std::string & getSession() const noexcept
UpdateTag getUpdateTag() const
const std::string & getOrigin() const noexcept
UpdateTag UpdateTagType
The type of the update tag.
Key KeyType
The type of the sample key.
SampleEvent getEvent() const noexcept
Gets the event associated with the sample.
std::chrono::time_point< std::chrono::system_clock > getTimeStamp() const noexcept
const Key & getKey() const noexcept
A sample provides information about a data element update.
SingleKeyReader & operator=(SingleKeyReader &&reader) noexcept
Move assignment operator.
SingleKeyReader(const Topic< Key, Value, UpdateTag > &topic, const Key &key, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Constructs a new reader for the given key.
The key reader to read the data element associated with a given key.
std::function< void(const UpdateValue &)> partialUpdate(const UpdateTag &tag)
Gets a partial update generator function for the given partial update tag.
void add(const Value &value)
Adds the data element.
void update(const Value &value)
Updates the data element.
void remove() noexcept
Removes the data element. This generates a SampleEvent::Remove sample.
SingleKeyWriter & operator=(SingleKeyWriter &&writer) noexcept
Move assignment operator.
SingleKeyWriter(const Topic< Key, Value, UpdateTag > &topic, const Key &key, std::string name=std::string(), const WriterConfig &config=WriterConfig())
Constructs a new writer for the given key.
The key writer to write the data element associated with a given key.
void setUpdater(const UpdateTag &tag, std::function< void(Value &, UpdateValue)> updater) noexcept
Sets an updater function for the given update tag.
Topic(const Node &node, std::string name) noexcept
Constructs a new Topic for the topic with the given name.
void setKeyFilter(std::string name, std::function< std::function< bool(const Key &)>(const Criteria &)> factory) noexcept
Sets a key filter factory.
UpdateTag UpdateTagType
The topic's update tag type (defaults to std::string if not specified).
Key KeyType
The topic's key type.
void waitForReaders(unsigned int count=1) const
Waits for the given number of data readers to be online.
void waitForNoWriters() const
Waits for data writers to be offline.
Value ValueType
The topic's value type.
bool hasReaders() const noexcept
Indicates whether or not data readers are online.
Sample< Key, Value, UpdateTag > SampleType
The topic's sample type.
void setSampleFilter(std::string name, std::function< std::function< bool(const SampleType &)>(const Criteria &)> factory) noexcept
Sets a sample filter factory.
Reader< Key, Value, UpdateTag > ReaderType
The topic's reader type.
bool hasWriters() const noexcept
Indicates whether or not data writers are online.
void setWriterDefaultConfig(const WriterConfig &config) noexcept
Sets the default configuration used to construct writers.
Topic & operator=(Topic &&topic) noexcept
Move assignment operator.
Writer< Key, Value, UpdateTag > WriterType
The topic's writer type.
void setReaderDefaultConfig(const ReaderConfig &config) noexcept
Sets the default configuration used to construct readers.
Topic(Topic &&topic) noexcept
Move constructor.
void waitForWriters(unsigned int count=1) const
Waits for the given number of data writers to be online.
void waitForNoReaders() const
Waits for data readers to be offline.
The WriterConfig class specifies configuration options specific to writers.
Sample< Key, Value, UpdateTag > getLast()
Gets the last written sample.
Writer & operator=(Writer &&writer) noexcept
Move assignment operator.
std::vector< Sample< Key, Value, UpdateTag > > getAll()
Gets all the written samples kept in the writer history.
std::vector< std::string > getConnectedReaders() const
Gets the connected readers.
void onConnectedReaders(std::function< void(std::vector< std::string >)> init, std::function< void(CallbackReason, std::string)> update) noexcept
Calls the given functions to provide the initial set of connected readers and when a new reader conne...
void waitForReaders(unsigned int count=1) const
Waits for the given number of readers to be online.
void onConnectedKeys(std::function< void(std::vector< Key >)> init, std::function< void(CallbackReason, Key)> update) noexcept
Calls the given functions to provide the initial set of connected keys and when a key is added or rem...
bool hasReaders() const noexcept
Indicates whether or not readers are online.
std::vector< Key > getConnectedKeys() const
Gets the keys for which readers are connected to this writer.
Value ValueType
The value type.
Writer(Writer &&writer) noexcept
Move constructor.
void waitForNoReaders() const
Waits for readers to be offline.
The Writer class is used to write samples for a data element.
SampleEvent
Describes the operation used by a data writer to update a data element.
@ Update
The data writer updated the element.
@ Remove
The data writer removed the element.
@ Add
The data writer added the element.
SingleKeyReader< K, V, UT > makeSingleKeyReader(const Topic< K, V, UT > &topic, const typename Topic< K, V, UT >::KeyType &key, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Creates a key reader for the given topic and key.
FilteredKeyReader< K, V, UT > makeFilteredKeyReader(const Topic< K, V, UT > &topic, const Filter< KFC > &filter, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Creates a new filtered reader for the given topic and key filter.
MultiKeyReader< K, V, UT > makeAnyKeyReader(const Topic< K, V, UT > &topic, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Creates an any-key reader for the given topic.
std::vector< SampleEvent > SampleEventSeq
A sequence of sample events.
MultiKeyWriter< K, V, UT > makeMultiKeyWriter(const Topic< K, V, UT > &topic, const std::vector< typename Topic< K, V, UT >::KeyType > &keys, std::string name=std::string(), const WriterConfig &config=WriterConfig())
Creates a multi-key writer for the given topic and keys.
SingleKeyWriter< K, V, UT > makeSingleKeyWriter(const Topic< K, V, UT > &topic, const typename Topic< K, V, UT >::KeyType &key, std::string name=std::string(), const WriterConfig &config=WriterConfig())
Creates a key writer for the given topic and key.
MultiKeyReader< K, V, UT > makeMultiKeyReader(const Topic< K, V, UT > &topic, const std::vector< typename Topic< K, V, UT >::KeyType > &keys, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Creates a multi-key reader for the given topic and keys.
std::ostream & operator<<(std::ostream &os, const SampleEventSeq &types)
Converts the given sequence of sample events to a string and adds it to the stream.
MultiKeyWriter< K, V, UT > makeAnyKeyWriter(const Topic< K, V, UT > &topic, std::string name=std::string(), const WriterConfig &config=WriterConfig())
Creates an any-key writer for the given topic.
CallbackReason
The callback action enumerator specifies the reason why a callback is called.
Data-centric, broker-less publish/subscribe framework. C++ only.
std::shared_ptr< Communicator > CommunicatorPtr
A shared pointer to a Communicator.
void print(std::ostream &stream, T v)
Prints a value to a stream.
static T clone(const T &value) noexcept
Clones the given value.
static T decode(const Ice::CommunicatorPtr &communicator, const Ice::ByteSeq &value) noexcept
Decodes a value.
static Ice::ByteSeq encode(const Ice::CommunicatorPtr &communicator, const T &value) noexcept
Encodes the given value.
The Encoder template provides methods to encode and decode user types.
Filter(std::string name, TT &&criteria) noexcept
Constructs a filter structure with the given name and criteria.
std::string name
The filter name.
T criteria
The filter criteria value.
Filter structure to specify the filter name and criteria value.