Ice 3.8
C++ API Reference
Loading...
Searching...
No Matches
DataStorm.h
1// Copyright (c) ZeroC, Inc.
2
3#ifndef DATASTORM_DATASTORM_H
4#define DATASTORM_DATASTORM_H
5
6#include "Config.h"
7#include "DataStorm/SampleEvent.h"
8#include "InternalI.h"
9#include "InternalT.h"
10#include "Node.h"
11#include "Types.h"
12
13#include <regex>
14
15#if defined(__clang__)
16# pragma clang diagnostic push
17# pragma clang diagnostic ignored "-Wshadow-field-in-constructor"
18#elif defined(__GNUC__)
19# pragma GCC diagnostic push
20# pragma GCC diagnostic ignored "-Wshadow"
21#endif
22
23namespace DataStorm
24{
25 /// A sample provides information about a data element update.
26 /// The Sample template provides access to the key, value as well as additional information such as the event,
27 /// timestamp, update tag. Samples are generated and published by writers and received by readers.
28 /// @headerfile DataStorm/DataStorm.h
29 template<typename Key, typename Value, typename UpdateTag = std::string> class Sample
30 {
31 public:
32 /// The type of the sample key.
33 using KeyType = Key;
34
35 /// The type of the sample value.
36 using ValueType = Value;
37
38 /// The type of the update tag. The update tag type defaults to string if it's not explicitly specified
39 /// with the Sample template parameters.
40 using UpdateTagType = UpdateTag;
41
42 /// Gets the event associated with the sample.
43 /// @return The sample event.
44 [[nodiscard]] SampleEvent getEvent() const noexcept;
45
46 /// Gets the key of the sample.
47 /// @return The sample key.
48 [[nodiscard]] const Key& getKey() const noexcept;
49
50 /// Gets the value of the sample.
51 /// Depending on the sample event, the sample value might not always be available. It's the case if the
52 /// sample event is Remove where this method will return a default value.
53 /// @return The sample value.
54 [[nodiscard]] const Value& getValue() const noexcept;
55
56 /// Gets the update tag for the partial update.
57 /// This method should only be called if the sample event is PartialUpdate.
58 /// @return The update tag.
59 [[nodiscard]] UpdateTag getUpdateTag() const;
60
61 /// Gets the timestamp of the sample.
62 /// The timestamp is generated by the writer and corresponds to the time of sending.
63 /// @return The timestamp.
64 [[nodiscard]] std::chrono::time_point<std::chrono::system_clock> getTimeStamp() const noexcept;
65
66 /// Gets the origin of the sample.
67 /// The origin of the sample identifies uniquely on the node the writer that created the sample. It's the
68 /// name of the writer if a name was explicitly provided on creation of the writer. Otherwise, if no name
69 /// was provided, an unique identifier is generated by DataStorm.
70 /// @return The origin of the sample.
71 [[nodiscard]] const std::string& getOrigin() const noexcept;
72
73 /// Gets the session identifier of the session that received this sample.
74 /// This session identifier can be used to retrieve the Ice connection with the node.
75 /// @return The session identifier.
76 [[nodiscard]] const std::string& getSession() const noexcept;
77
78 /// @private
79 Sample(const std::shared_ptr<DataStormI::Sample>&) noexcept;
80
81 private:
82 std::shared_ptr<DataStormI::SampleT<Key, Value, UpdateTag>> _impl;
83 };
84
85 /// Converts the given sample type vector to a string and add it to the stream.
86 /// @param os The output stream
87 /// @param types The sample type vector to add to the stream
88 /// @return The output stream
89 inline std::ostream& operator<<(std::ostream& os, const SampleEventSeq& types)
90 {
91 Ice::print(os, types);
92 return os;
93 }
94
95 /// Converts the given sample to a string and add it to the stream. The implementation outputs the sample value.
96 /// @param os The output stream
97 /// @param sample The sample to add to the stream
98 /// @return The output stream
99 template<typename K, typename V, typename U>
100 std::ostream& operator<<(std::ostream& os, const Sample<K, V, U>& sample)
101 {
102 os << sample.getValue();
103 return os;
104 }
105
106 /// The Reader class is used to retrieve samples for a data element.
107 /// @headerfile DataStorm/DataStorm.h
108 template<typename Key, typename Value, typename UpdateTag> class Reader
109 {
110 public:
111 /// The key type.
112 using KeyType = Key;
113
114 /// The value type.
115 using ValueType = Value;
116
117 /// Move constructor.
118 /// @param reader The reader to move from.
119 Reader(Reader&& reader) noexcept;
120
121 /// Destructor.
122 /// The destruction of the reader disconnects the reader from the writers.
123 ~Reader();
124
125 /// Move assignment operator.
126 /// @param reader The reader to remove from.
127 /// @return A reference to this reader.
128 Reader& operator=(Reader&& reader) noexcept;
129
130 /// Indicates whether or not writers are online.
131 /// @return `true` if writers are connected, `false` otherwise.
132 [[nodiscard]] bool hasWriters() const noexcept;
133
134 /// Waits for the given number of writers to be online.
135 /// @param count The number of writers to wait for.
136 /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
137 void waitForWriters(unsigned int count = 1) const;
138
139 /// Waits for writers to be offline.
140 /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
141 void waitForNoWriters() const;
142
143 /// Gets the connected writers.
144 /// @return The names of the connected writers.
145 [[nodiscard]] std::vector<std::string> getConnectedWriters() const;
146
147 /// Gets the keys for which writers are connected to this reader.
148 /// @return The keys for which we have writers connected.
149 [[nodiscard]] std::vector<Key> getConnectedKeys() const;
150
151 /// Returns all the unread samples.
152 /// @return The unread samples.
153 [[nodiscard]] std::vector<Sample<Key, Value, UpdateTag>> getAllUnread();
154
155 /// Waits for the given number of unread samples to be available.
156 /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
157 void waitForUnread(unsigned int count = 1) const;
158
159 /// Returns whether or not unread samples are available.
160 /// @return `true` if there unread samples are queued, `false` otherwise.
161 [[nodiscard]] bool hasUnread() const noexcept;
162
163 /// Returns the next unread sample.
164 /// @return The unread sample.
165 /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
166 [[nodiscard]] Sample<Key, Value, UpdateTag> getNextUnread();
167
168 /// Calls the given functions to provide the initial set of connected keys and when a key is added or
169 /// removed from the set of connected keys. If callback functions are already set, they will be replaced.
170 /// The connected keys represent the set of keys for which writers are connected to this reader.
171 /// The @p init callback is always called after this method returns to provide the initial set of connected
172 /// keys. The @p update callback is called when new keys are added or removed from the set of connected keys.
173 /// @param init The function to call with the initial set of connected keys.
174 /// @param update The function to call when a key is added or removed from the set.
175 void onConnectedKeys(
176 std::function<void(std::vector<Key>)> init,
177 std::function<void(CallbackReason, Key)> update) noexcept;
178
179 /// Calls the given functions to provide the initial set of connected writers and when a new writer
180 /// connects or disconnects. If callback functions are already set, they will be replaced.
181 /// The @p init callback is always called after this method returns to provide the initial set of connected
182 /// writers. The @p update callback is called when new writers connect or disconnect.
183 /// @param init The function to call with the initial set of connected writers.
184 /// @param update The function to call when a new writer connects or disconnects.
186 std::function<void(std::vector<std::string>)> init,
187 std::function<void(CallbackReason, std::string)> update) noexcept;
188
189 /// Calls the given function to provide the initial set of unread samples and when new samples are queued.
190 /// If a function is already set, it will be replaced.
191 /// The @p init callback is always called after this method returns to provide the initial set of unread
192 /// samples. The @p queue callback is called when a new sample is received.
193 /// @param init The function to call with the initial set of unread samples.
194 /// @param queue The function to call when a new sample is received.
195 void onSamples(
196 std::function<void(std::vector<Sample<Key, Value, UpdateTag>>)> init,
197 std::function<void(Sample<Key, Value, UpdateTag>)> queue) noexcept;
198
199 protected:
200 /// @private
201 Reader(const std::shared_ptr<DataStormI::DataReader>& impl) noexcept : _impl(impl) {}
202
203 /// @private
204 std::shared_ptr<DataStormI::DataReader> _impl;
205 };
206
207 /// The Writer class is used to write samples for a data element.
208 /// @headerfile DataStorm/DataStorm.h
209 template<typename Key, typename Value, typename UpdateTag> class Writer
210 {
211 public:
212 /// The key type.
213 using KeyType = Key;
214
215 /// The value type.
216 using ValueType = Value;
217
218 /// Move constructor.
219 /// @param writer The writer to move from.
220 Writer(Writer&& writer) noexcept;
221
222 /// Move assignment operator.
223 /// @param writer The writer to move from.
224 /// @return A reference to this writer.
225 Writer& operator=(Writer&& writer) noexcept;
226
227 /// Destructor.
228 /// The destruction of the writer disconnects the writer from the readers.
229 ~Writer();
230
231 /// Indicates whether or not readers are online.
232 /// @return `true` if readers are connected, `false` otherwise.
233 [[nodiscard]] bool hasReaders() const noexcept;
234
235 /// Waits for the given number of readers to be online.
236 /// @param count The number of readers to wait.
237 /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
238 void waitForReaders(unsigned int count = 1) const;
239
240 /// Waits for readers to be offline.
241 /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
242 void waitForNoReaders() const;
243
244 /// Gets the connected readers.
245 /// @return The names of the connected readers.
246 [[nodiscard]] std::vector<std::string> getConnectedReaders() const;
247
248 /// Gets the keys for which readers are connected to this writer.
249 /// @return The keys for which we have writers connected.
250 [[nodiscard]] std::vector<Key> getConnectedKeys() const;
251
252 /// Gets the last written sample.
253 /// @return The last written sample.
254 /// @throws std::logic_error If there's no sample.
255 [[nodiscard]] Sample<Key, Value, UpdateTag> getLast();
256
257 /// Gets all the written sample kept in the writer history.
258 /// @return The sample history.
259 [[nodiscard]] std::vector<Sample<Key, Value, UpdateTag>> getAll();
260
261 /// Calls the given functions to provide the initial set of connected keys and when a key is added or
262 /// removed from the set of connected keys. If callback functions are already set, they will be replaced.
263 /// The connected keys represent the set of keys for which writers are connected to this reader.
264 /// The @p init callback is always called after this method returns to provide the initial set of connected
265 /// keys. The @p update callback is called when new keys are added or removed from the set of connected keys.
266 /// @param init The function to call with the initial set of connected keys.
267 /// @param update The function to call when a key is added or removed from the set.
268 void onConnectedKeys(
269 std::function<void(std::vector<Key>)> init,
270 std::function<void(CallbackReason, Key)> update) noexcept;
271
272 /// Calls the given functions to provide the initial set of connected readers and when a new reader
273 /// connects or disconnects. If callback functions are already set, they will be replaced.
274 /// The @p init callback is always called after this method returns to provide the initial set of connected
275 /// readers. The @p update callback is called when new readers connect or disconnect.
276 /// @param init The function to call with the initial set of connected readers.
277 /// @param update The function to call when a new reader connects or disconnects.
279 std::function<void(std::vector<std::string>)> init,
280 std::function<void(CallbackReason, std::string)> update) noexcept;
281
282 protected:
283 /// @private
284 Writer(const std::shared_ptr<DataStormI::DataWriter>& impl) noexcept : _impl(impl) {}
285
286 /// @private
287 std::shared_ptr<DataStormI::DataWriter> _impl;
288 };
289
290 /// The Topic class.
291 /// This class allows constructing reader and writer objects. It's also used to setup filter and updater
292 /// functions.
293 /// @headerfile DataStorm/DataStorm.h
294 template<typename Key, typename Value, typename UpdateTag = std::string> class Topic
295 {
296 public:
297 /// The topic's key type.
298 using KeyType = Key;
299
300 /// The topic's value type.
301 using ValueType = Value;
302
303 /// The topic's update tag type (defaults to std::string if not specified).
304 using UpdateTagType = UpdateTag;
305
306 /// The topic's writer type.
308
309 /// The topic's reader type.
311
312 /// The topic's sample type.
314
315 /// Constructs a new Topic for the topic with the given name.
316 /// @param node The node.
317 /// @param name The name of the topic.
318 Topic(const Node& node, std::string name) noexcept;
319
320 /// Move constructor.
321 /// @param topic The topic to move from.
322 Topic(Topic&& topic) noexcept
323 : _name(std::move(topic._name)),
324 _topicFactory(std::move(topic._topicFactory)),
325 _keyFactory(std::move(topic._keyFactory)),
326 _tagFactory(std::move(topic._tagFactory)),
327 _keyFilterFactories(std::move(topic._keyFilterFactories)),
328 _sampleFilterFactories(std::move(topic._sampleFilterFactories)),
329 _reader(std::move(topic._reader)),
330 _writer(std::move(topic._writer)),
331 _updaters(std::move(topic._updaters))
332 {
333 }
334
335 /// Destructor.
336 /// The destructor disconnects the topic from peers.
337 ~Topic();
338
339 /// Move assignment operator.
340 /// @param topic The topic to move from.
341 /// @return A reference to this topic.
342 Topic& operator=(Topic&& topic) noexcept;
343
344 /// Indicates whether or not data writers are online.
345 /// @return `true` if data writers are connected, `false` otherwise.
346 [[nodiscard]] bool hasWriters() const noexcept;
347
348 /// Waits for the given number of data writers to be online.
349 /// @param count The number of data writers to wait for.
350 /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
351 void waitForWriters(unsigned int count = 1) const;
352
353 /// Waits for data writers to be offline.
354 /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
355 void waitForNoWriters() const;
356
357 /// Sets the default configuration used to construct readers.
358 /// @param config The default writer configuration.
359 void setWriterDefaultConfig(const WriterConfig& config) noexcept;
360
361 /// Indicates whether or not data readers are online.
362 /// @return `true` if data readers are connected, `false` otherwise.
363 [[nodiscard]] bool hasReaders() const noexcept;
364
365 /// Waits for the given number of data readers to be online.
366 /// @param count The number of data readers to wait.
367 /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
368 void waitForReaders(unsigned int count = 1) const;
369
370 /// Waits for data readers to be offline.
371 /// @throws NodeShutdownException Thrown when the node is shut down while waiting.
372 void waitForNoReaders() const;
373
374 /// Sets the default configuration used to construct readers.
375 /// @param config The default reader configuration.
376 void setReaderDefaultConfig(const ReaderConfig& config) noexcept;
377
378 /// Sets an updater function for the given update tag. The function is called when a partial update is
379 /// received or sent to compute the new value. The function is provided the latest value and the partial
380 /// update. It should return the new value.
381 /// @param tag The update tag.
382 /// @param updater The updater function.
383 template<typename UpdateValue>
384 void setUpdater(const UpdateTag& tag, std::function<void(Value&, UpdateValue)> updater) noexcept;
385
386 /// Sets a key filter factory. The given factory function must return a filter function that returns `true` if
387 /// the key matches the filter criteria, `false` otherwise.
388 /// @param name The name of the key filter.
389 /// @param factory The filter factory function.
390 template<typename Criteria>
391 void setKeyFilter(
392 std::string name,
393 std::function<std::function<bool(const Key&)>(const Criteria&)> factory) noexcept;
394
395 /// Sets a sample filter factory. The given factory function must return a filter function that returns `true`
396 /// if the sample matches the filter criteria, `false` otherwise.
397 /// @param name The name of the sample filter.
398 /// @param factory The filter factory function.
399 template<typename Criteria>
400 void setSampleFilter(
401 std::string name,
402 std::function<std::function<bool(const SampleType&)>(const Criteria&)> factory) noexcept;
403
404 private:
405 [[nodiscard]] std::shared_ptr<DataStormI::TopicReader> getReader() const;
406 [[nodiscard]] std::shared_ptr<DataStormI::TopicWriter> getWriter() const;
407 [[nodiscard]] Ice::CommunicatorPtr getCommunicator() const noexcept;
408
409 template<typename, typename, typename> friend class SingleKeyWriter;
410 template<typename, typename, typename> friend class MultiKeyWriter;
411 template<typename, typename, typename> friend class SingleKeyReader;
412 template<typename, typename, typename> friend class MultiKeyReader;
413 template<typename, typename, typename> friend class FilteredKeyReader;
414
415 // These fields are non-const because we move them in the move-assignment operator.
416 std::string _name;
417 std::shared_ptr<DataStormI::TopicFactory> _topicFactory;
418 std::shared_ptr<DataStormI::KeyFactoryT<Key>> _keyFactory;
419 std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
420 std::shared_ptr<DataStormI::FilterManagerT<DataStormI::KeyT<Key>>> _keyFilterFactories;
421 std::shared_ptr<DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>> _sampleFilterFactories;
422
423 mutable std::mutex _mutex;
424 mutable std::shared_ptr<DataStormI::TopicReader> _reader;
425 mutable std::shared_ptr<DataStormI::TopicWriter> _writer;
426 mutable std::map<std::shared_ptr<DataStormI::Tag>, DataStormI::Topic::Updater> _updaters;
427 };
428
429 /// Filter structure to specify the filter name and criteria value.
430 /// @headerfile DataStorm/DataStorm.h
431 template<typename T> struct Filter
432 {
433 /// Constructs a filter structure with the given name and criteria.
434 /// @param name The filter name
435 /// @param criteria The criteria
436 template<typename TT>
437 Filter(std::string name, TT&& criteria) noexcept : name(std::move(name)),
438 criteria(std::forward<TT>(criteria))
439 {
440 }
441
442 /// The filter name.
443 std::string name;
444
445 /// The filter criteria value.
447 };
448
449 /// The key reader to read the data element associated with a given key.
450 /// @headerfile DataStorm/DataStorm.h
451 template<typename Key, typename Value, typename UpdateTag = std::string>
452 class SingleKeyReader : public Reader<Key, Value, UpdateTag>
453 {
454 public:
455 /// Constructs a new reader for the given key. The construction of the reader connects the reader to writers
456 /// with a matching key.
457 /// @param topic The topic.
458 /// @param key The key of the data element to read.
459 /// @param name The optional reader name.
460 /// @param config The reader configuration.
462 const Topic<Key, Value, UpdateTag>& topic,
463 const Key& key,
464 std::string name = std::string(),
465 const ReaderConfig& config = ReaderConfig());
466
467 /// Constructs a new reader for the given key and sample filter criteria. The construction of the reader
468 /// connects the reader to writers with a matching key. The writer will only send samples matching the
469 /// given sample filter criteria to the reader.
470 /// @param topic The topic.
471 /// @param key The key of the data element to read.
472 /// @param sampleFilter The sample filter.
473 /// @param name The optional reader name.
474 /// @param config The reader configuration.
475 template<typename SampleFilterCriteria>
477 const Topic<Key, Value, UpdateTag>& topic,
478 const Key& key,
479 const Filter<SampleFilterCriteria>& sampleFilter,
480 std::string name = std::string(),
481 const ReaderConfig& config = ReaderConfig());
482
483 /// Move constructor.
484 /// @param reader The reader to move from.
485 SingleKeyReader(SingleKeyReader&& reader) noexcept;
486
487 /// Move assignment operator.
488 /// @param reader The reader to move from.
489 /// @return A reference to this reader.
490 SingleKeyReader& operator=(SingleKeyReader&& reader) noexcept;
491 };
492
493 /// The key reader to read the data element associated with a given set of keys.
494 /// @headerfile DataStorm/DataStorm.h
495 template<typename Key, typename Value, typename UpdateTag = std::string>
496 class MultiKeyReader : public Reader<Key, Value, UpdateTag>
497 {
498 public:
499 /// Constructs a new reader for the given keys. The construction of the reader connects the reader to
500 /// writers with matching keys. If an empty vector of keys is provided, the reader will connect to all the
501 /// available writers.
502 /// @param topic The topic.
503 /// @param keys The keys of the data elements to read.
504 /// @param name The optional reader name.
505 /// @param config The reader configuration.
507 const Topic<Key, Value, UpdateTag>& topic,
508 const std::vector<Key>& keys,
509 std::string name = std::string(),
510 const ReaderConfig& config = ReaderConfig());
511
512 /// Constructs a new reader for the given keys and sample filter criteria. The construction of the reader
513 /// connects the reader to writers with matching keys. If an empty vector of keys is provided, the reader
514 /// will connect to all the available writers. The writer will only send samples matching the given sample
515 /// filter criteria to the reader.
516 /// @param topic The topic.
517 /// @param keys The keys of the data elements to read.
518 /// @param sampleFilter The sample filter.
519 /// @param name The optional reader name.
520 /// @param config The reader configuration.
521 template<typename SampleFilterCriteria>
523 const Topic<Key, Value, UpdateTag>& topic,
524 const std::vector<Key>& keys,
525 const Filter<SampleFilterCriteria>& sampleFilter,
526 std::string name = std::string(),
527 const ReaderConfig& config = ReaderConfig());
528
529 /// Move constructor.
530 /// @param reader The reader to move from.
531 MultiKeyReader(MultiKeyReader&& reader) noexcept;
532
533 /// Move assignment operator.
534 /// @param reader The reader to move from.
535 /// @return A reference to this reader.
536 MultiKeyReader& operator=(MultiKeyReader&& reader) noexcept;
537 };
538
539 /// Creates a key reader for the given topic and key. This helper method deduces the topic Key, Value and
540 /// UpdateTag types from the topic argument.
541 /// @param topic The topic.
542 /// @param key The key.
543 /// @param name The optional reader name.
544 /// @param config The optional reader configuration.
545 template<typename K, typename V, typename UT>
547 const Topic<K, V, UT>& topic,
548 const typename Topic<K, V, UT>::KeyType& key,
549 std::string name = std::string(),
550 const ReaderConfig& config = ReaderConfig())
551 {
552 return SingleKeyReader<K, V, UT>(topic, key, std::move(name), config);
553 }
554
555 /// Creates a key reader for the given topic, key and sample filter. This helper method deduces the topic Key
556 /// and Value types from the topic argument.
557 /// @param topic The topic.
558 /// @param key The key.
559 /// @param sampleFilter The sample filter.
560 /// @param name The optional reader name.
561 /// @param config The optional reader configuration.
562 template<typename SFC, typename K, typename V, typename UT>
564 const Topic<K, V, UT>& topic,
565 const typename Topic<K, V, UT>::KeyType& key,
566 const Filter<SFC>& sampleFilter,
567 std::string name = std::string(),
568 const ReaderConfig& config = ReaderConfig())
569 {
570 return SingleKeyReader<K, V, UT>(topic, key, sampleFilter, std::move(name), config);
571 }
572
573 /// Creates a multi-key reader for the given topic. This helper method deduces the topic Key, Value and
574 /// UpdateTag types from the topic argument.
575 /// The reader will only receive samples for the given set of keys.
576 /// @param topic The topic.
577 /// @param keys The keys.
578 /// @param name The optional reader name.
579 /// @param config The optional reader configuration.
580 template<typename K, typename V, typename UT>
582 const Topic<K, V, UT>& topic,
583 const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
584 std::string name = std::string(),
585 const ReaderConfig& config = ReaderConfig())
586 {
587 return MultiKeyReader<K, V, UT>(topic, keys, std::move(name), config);
588 }
589
590 /// Creates a multi-key reader for the given topic, keys and sample filter. This helper method deduces the
591 /// topic Key and Value types from the topic argument.
592 /// The reader will only receive samples for the given set of keys.
593 /// @param topic The topic.
594 /// @param keys The keys.
595 /// @param sampleFilter The sample filter.
596 /// @param name The optional reader name.
597 /// @param config The optional reader configuration.
598 template<typename SFC, typename K, typename V, typename UT>
600 const Topic<K, V, UT>& topic,
601 const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
602 const Filter<SFC>& sampleFilter,
603 std::string name = std::string(),
604 const ReaderConfig& config = ReaderConfig())
605 {
606 return MultiKeyReader<K, V, UT>(topic, keys, sampleFilter, std::move(name), config);
607 }
608
609 /// Creates an any-key reader for the given topic. This helper method deduces the topic Key, Value and
610 /// UpdateTag types from the topic argument.
611 /// The reader will receive samples for any keys from the topic.
612 /// @param topic The topic.
613 /// @param name The optional reader name.
614 /// @param config The optional reader configuration.
615 template<typename K, typename V, typename UT>
617 const Topic<K, V, UT>& topic,
618 std::string name = std::string(),
619 const ReaderConfig& config = ReaderConfig())
620 {
621 return MultiKeyReader<K, V, UT>(topic, {}, std::move(name), config);
622 }
623
624 /// Creates an any-key reader for the given topic and sample filter. This helper method deduces the topic Key
625 /// and Value types from the topic argument.
626 /// The reader will receive samples for the keys from the topic.
627 /// @param topic The topic.
628 /// @param sampleFilter The sample filter.
629 /// @param name The optional reader name.
630 /// @param config The optional reader configuration.
631 template<typename SFC, typename K, typename V, typename UT>
633 const Topic<K, V, UT>& topic,
634 const Filter<SFC>& sampleFilter,
635 std::string name = std::string(),
636 const ReaderConfig& config = ReaderConfig())
637 {
638 return MultiKeyReader<K, V, UT>(topic, {}, sampleFilter, std::move(name), config);
639 }
640
641 /// The filtered reader to read data elements whose key match a given filter.
642 /// @headerfile DataStorm/DataStorm.h
643 template<typename Key, typename Value, typename UpdateTag = std::string>
644 class FilteredKeyReader : public Reader<Key, Value, UpdateTag>
645 {
646 public:
647 /// Constructs a new reader for the given key filter. The construction of the reader connects the reader to
648 /// writers whose key matches the key filter criteria.
649 /// @param topic The topic.
650 /// @param keyFilter The key filter.
651 /// @param name The optional reader name.
652 /// @param config The reader configuration.
653 /// @throws std::invalid_argument Thrown when the key filter is not registered with the topic or the filter is
654 /// invalid.
655 template<typename KeyFilterCriteria>
657 const Topic<Key, Value, UpdateTag>& topic,
658 const Filter<KeyFilterCriteria>& keyFilter,
659 std::string name = std::string(),
660 const ReaderConfig& config = ReaderConfig());
661
662 /// Constructs a new reader for the given key filter and sample filter criteria. The construction of the
663 /// reader connects the reader to writers whose key matches the key filter criteria.
664 /// @param topic The topic.
665 /// @param keyFilter The key filter.
666 /// @param sampleFilter The sample filter.
667 /// @param name The optional reader name.
668 /// @param config The reader configuration.
669 /// @throws std::invalid_argument Thrown when the key filter is not registered with the topic or the filter is
670 /// invalid.
671 template<typename KeyFilterCriteria, typename SampleFilterCriteria>
673 const Topic<Key, Value, UpdateTag>& topic,
674 const Filter<KeyFilterCriteria>& keyFilter,
675 const Filter<SampleFilterCriteria>& sampleFilter,
676 std::string name = std::string(),
677 const ReaderConfig& config = ReaderConfig());
678
679 /// Move constructor
680 /// @param reader The reader to move from.
681 FilteredKeyReader(FilteredKeyReader&& reader) noexcept;
682
683 /// Move assignment operator.
684 /// @param reader The reader to move from.
685 /// @return A reference to this reader.
687 };
688
689 /// Creates a new filtered reader for the given topic and key filter. This helper method deduces the topic Key,
690 /// Value and UpdateTag types from the topic argument.
691 /// @param topic The topic.
692 /// @param filter The key filter.
693 /// @param name The optional reader name.
694 /// @param config The optional reader configuration.
695 template<typename KFC, typename K, typename V, typename UT>
697 const Topic<K, V, UT>& topic,
698 const Filter<KFC>& filter,
699 std::string name = std::string(),
700 const ReaderConfig& config = ReaderConfig())
701 {
702 return FilteredKeyReader<K, V, UT>(topic, filter, std::move(name), config);
703 }
704
705 /// Creates a new filter reader for the given topic, key filter and sample filter. This helper method deduces
706 /// the topic Key, Value and UpdateTag types from the topic argument.
707 /// @param topic The topic.
708 /// @param keyFilter The key filter.
709 /// @param sampleFilter The sample filter.
710 /// @param name The optional reader name.
711 /// @param config The optional reader configuration.
712 template<typename KFC, typename SFC, typename K, typename V, typename UT>
714 const Topic<K, V, UT>& topic,
715 const Filter<KFC>& keyFilter,
716 const Filter<SFC>& sampleFilter,
717 std::string name = std::string(),
718 const ReaderConfig& config = ReaderConfig())
719 {
720 return FilteredKeyReader<K, V, UT>(topic, keyFilter, sampleFilter, std::move(name), config);
721 }
722
723 /// The key writer to write the data element associated with a given key.
724 /// @headerfile DataStorm/DataStorm.h
725 template<typename Key, typename Value, typename UpdateTag = std::string>
726 class SingleKeyWriter : public Writer<Key, Value, UpdateTag>
727 {
728 public:
729 /// Constructs a new writer for the given key. The construction of the writer connects the writer to readers
730 /// with a matching key.
731 /// @param topic The topic.
732 /// @param key The key of the data element to write.
733 /// @param name The optional writer name.
734 /// @param config The writer configuration.
736 const Topic<Key, Value, UpdateTag>& topic,
737 const Key& key,
738 std::string name = std::string(),
739 const WriterConfig& config = WriterConfig());
740
741 /// Move constructor.
742 /// @param writer The writer to move from.
743 SingleKeyWriter(SingleKeyWriter&& writer) noexcept;
744
745 /// Move assignment operator.
746 /// @param writer The writer to move from.
747 /// @return A reference to this writer.
748 SingleKeyWriter& operator=(SingleKeyWriter&& writer) noexcept;
749
750 /// Adds the data element. This generates a SampleEvent::Add sample with the given value.
751 /// @param value The data element value.
752 void add(const Value& value);
753
754 /// Updates the data element. This generates a SampleEvent::Update sample with the given value.
755 /// @param value The data element value.
756 void update(const Value& value);
757
758 /// Gets a partial update generator function for the given partial update tag. When called, the returned
759 /// function generates a SampleEvent::PartialUpdate sample with the given partial update value.
760 /// The UpdateValue template parameter must match the UpdateValue type used to register the updater with
761 /// the Topic::setUpdater method.
762 /// @param tag The partial update tag.
763 template<typename UpdateValue>
764 [[nodiscard]] std::function<void(const UpdateValue&)> partialUpdate(const UpdateTag& tag);
765
766 /// Removes the data element. This generates a SampleEvent::Remove sample.
767 void remove() noexcept;
768
769 private:
770 const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
771 };
772
773 /// The key writer to write data elements associated with a given set of keys.
774 /// @headerfile DataStorm/DataStorm.h
775 template<typename Key, typename Value, typename UpdateTag = std::string>
776 class MultiKeyWriter : public Writer<Key, Value, UpdateTag>
777 {
778 public:
779 /// Constructs a new writer for the given keys. The construction of the writer connects the writer to
780 /// readers with matching keys. If an empty vector of keys is provided, the writer will connect to all the
781 /// available readers.
782 /// @param topic The topic.
783 /// @param keys The keys.
784 /// @param name The optional writer name.
785 /// @param config The writer configuration.
787 const Topic<Key, Value, UpdateTag>& topic,
788 const std::vector<Key>& keys,
789 std::string name = std::string(),
790 const WriterConfig& config = WriterConfig());
791
792 /// Move constructor
793 /// @param writer The writer to move from.
794 MultiKeyWriter(MultiKeyWriter&& writer) noexcept;
795
796 /// Move assignment operator.
797 /// @param writer The writer to move from.
798 /// @return A reference to this writer.
799 MultiKeyWriter& operator=(MultiKeyWriter&& writer) noexcept;
800
801 /// Adds the data element. This generates a SampleEvent::Add sample with the given value.
802 /// @param key The key
803 /// @param value The data element value.
804 void add(const Key& key, const Value& value);
805
806 /// Updates the data element. This generates a SampleEvent::Update sample with the given value.
807 /// @param key The key
808 /// @param value The data element value.
809 void update(const Key& key, const Value& value);
810
811 /// Gets a partial update generator function for the given partial update tag. When called, the returned
812 /// function generates a SampleEvent::PartialUpdate sample with the given partial update value.
813 /// The UpdateValue template parameter must match the UpdateValue type used to register the updater with
814 /// the Topic::setUpdater method.
815 /// @param tag The partial update tag.
816 template<typename UpdateValue>
817 [[nodiscard]] std::function<void(const Key&, const UpdateValue&)> partialUpdate(const UpdateTag& tag);
818
819 /// Removes the data element. This generates a TopicEvent::Remove sample.
820 /// @param key The key
821 void remove(const Key& key) noexcept;
822
823 private:
824 const std::shared_ptr<DataStormI::KeyFactoryT<Key>> _keyFactory;
825 const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
826 };
827
828 /// Creates a key writer for the given topic and key. This helper method deduces the topic Key, Value and
829 /// UpdateTag types from the topic argument.
830 /// @param topic The topic.
831 /// @param key The key.
832 /// @param name The optional writer name.
833 /// @param config The optional writer configuration.
834 template<typename K, typename V, typename UT>
836 const Topic<K, V, UT>& topic,
837 const typename Topic<K, V, UT>::KeyType& key,
838 std::string name = std::string(),
839 const WriterConfig& config = WriterConfig())
840 {
841 return SingleKeyWriter<K, V, UT>(topic, key, std::move(name), config);
842 }
843
844 /// Creates a multi-key writer for the given topic and keys. This helper method deduces the topic Key, Value
845 /// and UpdateTag types from the topic argument.
846 /// @param topic The topic.
847 /// @param keys The keys.
848 /// @param name The optional writer name.
849 /// @param config The optional writer configuration.
850 template<typename K, typename V, typename UT>
852 const Topic<K, V, UT>& topic,
853 const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
854 std::string name = std::string(),
855 const WriterConfig& config = WriterConfig())
856 {
857 return MultiKeyWriter<K, V, UT>(topic, keys, std::move(name), config);
858 }
859
860 /// Creates an any-key writer for the given topic. This helper method deduces the topic Key, Value and
861 /// UpdateTag types from the topic argument.
862 /// @param topic The topic.
863 /// @param name The optional writer name.
864 /// @param config The optional writer configuration.
865 template<typename K, typename V, typename UT>
867 const Topic<K, V, UT>& topic,
868 std::string name = std::string(),
869 const WriterConfig& config = WriterConfig())
870 {
871 return MultiKeyWriter<K, V, UT>(topic, {}, std::move(name), config);
872 }
873
874 //
875 // Public template based API implementation
876 //
877
878 //
879 // Sample template implementation
880 //
881 template<typename Key, typename Value, typename UpdateTag>
883 {
884 return _impl->event;
885 }
886
887 template<typename Key, typename Value, typename UpdateTag>
888 const Key& Sample<Key, Value, UpdateTag>::getKey() const noexcept
889 {
890 return _impl->getKey();
891 }
892
893 template<typename Key, typename Value, typename UpdateTag>
894 const Value& Sample<Key, Value, UpdateTag>::getValue() const noexcept
895 {
896 return _impl->getValue();
897 }
898
899 template<typename Key, typename Value, typename UpdateTag>
901 {
902 return _impl->getTag();
903 }
904
905 template<typename Key, typename Value, typename UpdateTag>
906 std::chrono::time_point<std::chrono::system_clock> Sample<Key, Value, UpdateTag>::getTimeStamp() const noexcept
907 {
908 return _impl->timestamp;
909 }
910
911 template<typename Key, typename Value, typename UpdateTag>
912 const std::string& Sample<Key, Value, UpdateTag>::getOrigin() const noexcept
913 {
914 return _impl->origin;
915 }
916
917 template<typename Key, typename Value, typename UpdateTag>
918 const std::string& Sample<Key, Value, UpdateTag>::getSession() const noexcept
919 {
920 return _impl->session;
921 }
922
923 template<typename Key, typename Value, typename UpdateTag>
924 Sample<Key, Value, UpdateTag>::Sample(const std::shared_ptr<DataStormI::Sample>& impl) noexcept
925 : _impl(std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(impl))
926 {
927 }
928
929 //
930 // Reader template implementation
931 //
932 template<typename Key, typename Value, typename UpdateTag>
934 : _impl(std::move(reader._impl))
935 {
936 }
937
938 template<typename Key, typename Value, typename UpdateTag> Reader<Key, Value, UpdateTag>::~Reader()
939 {
940 if (_impl)
941 {
942 _impl->destroy();
943 }
944 }
945
946 template<typename Key, typename Value, typename UpdateTag>
948 {
949 if (_impl)
950 {
951 _impl->destroy();
952 }
953 _impl = std::move(reader._impl);
954 return *this;
955 }
956
957 template<typename Key, typename Value, typename UpdateTag>
959 {
960 return _impl->hasWriters();
961 }
962
963 template<typename Key, typename Value, typename UpdateTag>
965 {
966 _impl->waitForWriters(static_cast<int>(count));
967 }
968
969 template<typename Key, typename Value, typename UpdateTag>
971 {
972 _impl->waitForWriters(-1);
973 }
974
975 template<typename Key, typename Value, typename UpdateTag>
977 {
978 return _impl->getConnectedElements();
979 }
980
981 template<typename Key, typename Value, typename UpdateTag>
983 {
984 std::vector<Key> keys;
985 auto connectedKeys = _impl->getConnectedKeys();
986 keys.reserve(connectedKeys.size());
987 for (const auto& k : connectedKeys)
988 {
989 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
990 }
991 return keys;
992 }
993
994 template<typename Key, typename Value, typename UpdateTag>
995 std::vector<Sample<Key, Value, UpdateTag>> Reader<Key, Value, UpdateTag>::getAllUnread()
996 {
997 auto unread = _impl->getAllUnread();
998 std::vector<Sample<Key, Value, UpdateTag>> samples;
999 samples.reserve(unread.size());
1000 for (const auto& sample : unread)
1001 {
1002 samples.push_back(sample);
1003 }
1004 return samples;
1005 }
1006
1007 template<typename Key, typename Value, typename UpdateTag>
1009 {
1010 _impl->waitForUnread(count);
1011 }
1012
1013 template<typename Key, typename Value, typename UpdateTag>
1015 {
1016 return _impl->hasUnread();
1017 }
1018
1019 template<typename Key, typename Value, typename UpdateTag>
1024
1025 template<typename Key, typename Value, typename UpdateTag>
1027 std::function<void(std::vector<Key>)> init,
1028 std::function<void(CallbackReason, Key)> update) noexcept
1029 {
1030 _impl->onConnectedKeys(
1031 init ?
1032 [init = std::move(init)](const std::vector<std::shared_ptr<DataStormI::Key>>& connectedKeys)
1033 {
1034 std::vector<Key> keys;
1035 keys.reserve(connectedKeys.size());
1036 for(const auto& k : connectedKeys)
1037 {
1038 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
1039 }
1040 init(std::move(keys));
1041 } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>{},
1042 update ?
1043 [update = std::move(update)](CallbackReason action, const std::shared_ptr<DataStormI::Key>& key)
1044 {
1045 update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->get());
1046 } : std::function<void(CallbackReason, std::shared_ptr<DataStormI::Key>)>{});
1047 }
1048
1049 template<typename Key, typename Value, typename UpdateTag>
1051 std::function<void(std::vector<std::string>)> init,
1052 std::function<void(CallbackReason, std::string)> update) noexcept
1053 {
1054 _impl->onConnectedElements(std::move(init), std::move(update));
1055 }
1056
1057 template<typename Key, typename Value, typename UpdateTag>
1059 std::function<void(std::vector<Sample<Key, Value, UpdateTag>>)> init,
1060 std::function<void(Sample<Key, Value, UpdateTag>)> update) noexcept
1061 {
1062 auto communicator = _impl->getCommunicator();
1063 _impl->onSamples(
1064 init ?
1065 [communicator, init = std::move(init)](const std::vector<std::shared_ptr<DataStormI::Sample>>& samplesI)
1066 {
1067 std::vector<Sample<Key, Value, UpdateTag>> samples;
1068 samples.reserve(samplesI.size());
1069 for(const auto& s : samplesI)
1070 {
1071 samples.emplace_back(s);
1072 }
1073 init(std::move(samples));
1074 } : std::function<void(const std::vector<std::shared_ptr<DataStormI::Sample>>&)>(),
1075 update ?
1076 [communicator, update = std::move(update)](const std::shared_ptr<DataStormI::Sample>& sampleI)
1077 {
1078 update(sampleI);
1079 } : std::function<void(const std::shared_ptr<DataStormI::Sample>&)>{});
1080 }
1081
1082 template<typename Key, typename Value, typename UpdateTag>
1084 const Topic<Key, Value, UpdateTag>& topic,
1085 const Key& key,
1086 std::string name,
1087 const ReaderConfig& config)
1088 : Reader<Key, Value, UpdateTag>(
1089 topic.getReader()->create({topic._keyFactory->create(key)}, std::move(name), config))
1090 {
1091 }
1092
1093 template<typename Key, typename Value, typename UpdateTag>
1094 template<typename SampleFilterCriteria>
1096 const Topic<Key, Value, UpdateTag>& topic,
1097 const Key& key,
1098 const Filter<SampleFilterCriteria>& sampleFilter,
1099 std::string name,
1100 const ReaderConfig& config)
1101 : Reader<Key, Value, UpdateTag>(topic.getReader()->create(
1102 {topic._keyFactory->create(key)},
1103 std::move(name),
1104 config,
1105 sampleFilter.name,
1106 DataStormI::EncoderT<SampleFilterCriteria>::encode(topic.getCommunicator(), sampleFilter.criteria)))
1107 {
1108 }
1109
1110 template<typename Key, typename Value, typename UpdateTag>
1115
1116 template<typename Key, typename Value, typename UpdateTag>
1119 {
1121 return *this;
1122 }
1123
1124 template<typename Key, typename Value, typename UpdateTag>
1126 const Topic<Key, Value, UpdateTag>& topic,
1127 const std::vector<Key>& keys,
1128 std::string name,
1129 const ReaderConfig& config)
1130 : Reader<Key, Value, UpdateTag>(
1131 topic.getReader()->create(topic._keyFactory->create(keys), std::move(name), config))
1132 {
1133 }
1134
1135 template<typename Key, typename Value, typename UpdateTag>
1136 template<typename SampleFilterCriteria>
1138 const Topic<Key, Value, UpdateTag>& topic,
1139 const std::vector<Key>& keys,
1140 const Filter<SampleFilterCriteria>& sampleFilter,
1141 std::string name,
1142 const ReaderConfig& config)
1143 : Reader<Key, Value, UpdateTag>(topic.getReader()->create(
1144 topic._keyFactory->create(keys),
1145 std::move(name),
1146 config,
1147 sampleFilter.name,
1148 Encoder<SampleFilterCriteria>::encode(topic.getCommunicator(), sampleFilter.criteria)))
1149 {
1150 }
1151
1152 template<typename Key, typename Value, typename UpdateTag>
1157
1158 template<typename Key, typename Value, typename UpdateTag>
1161 {
1163 return *this;
1164 }
1165
1166 template<typename Key, typename Value, typename UpdateTag>
1167 template<typename KeyFilterCriteria>
1169 const Topic<Key, Value, UpdateTag>& topic,
1170 const Filter<KeyFilterCriteria>& filter,
1171 std::string name,
1172 const ReaderConfig& config)
1173 : Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(
1174 topic._keyFilterFactories->create(filter.name, filter.criteria),
1175 std::move(name),
1176 config))
1177 {
1178 }
1179
1180 template<typename Key, typename Value, typename UpdateTag>
1181 template<typename KeyFilterCriteria, typename SampleFilterCriteria>
1183 const Topic<Key, Value, UpdateTag>& topic,
1184 const Filter<KeyFilterCriteria>& keyFilter,
1185 const Filter<SampleFilterCriteria>& sampleFilter,
1186 std::string name,
1187 const ReaderConfig& config)
1188 : Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(
1189 topic._keyFilterFactories->create(keyFilter.name, keyFilter.criteria),
1190 std::move(name),
1191 config,
1192 sampleFilter.name,
1193 Encoder<SampleFilterCriteria>::encode(topic.getCommunicator(), sampleFilter.criteria)))
1194 {
1195 }
1196
1197 template<typename Key, typename Value, typename UpdateTag>
1203
1204 template<typename Key, typename Value, typename UpdateTag>
1207 {
1209 return *this;
1210 }
1211
1212 //
1213 // Writer template implementation
1214 //
1215 template<typename Key, typename Value, typename UpdateTag>
1216 Writer<Key, Value, UpdateTag>::Writer(Writer&& writer) noexcept : _impl(std::move(writer._impl))
1217 {
1218 }
1219
1220 template<typename Key, typename Value, typename UpdateTag> Writer<Key, Value, UpdateTag>::~Writer()
1221 {
1222 if (_impl)
1223 {
1224 _impl->destroy();
1225 }
1226 }
1227
1228 template<typename Key, typename Value, typename UpdateTag>
1230 {
1231 if (_impl)
1232 {
1233 _impl->destroy();
1234 }
1235 _impl = std::move(writer._impl);
1236 return *this;
1237 }
1238
1239 template<typename Key, typename Value, typename UpdateTag>
1241 {
1242 return _impl->hasReaders();
1243 }
1244
1245 template<typename Key, typename Value, typename UpdateTag>
1247 {
1248 return _impl->waitForReaders(static_cast<int>(count));
1249 }
1250
1251 template<typename Key, typename Value, typename UpdateTag>
1253 {
1254 return _impl->waitForReaders(-1);
1255 }
1256
1257 template<typename Key, typename Value, typename UpdateTag>
1259 {
1260 return _impl->getConnectedElements();
1261 }
1262
1263 template<typename Key, typename Value, typename UpdateTag>
1265 {
1266 std::vector<Key> keys;
1267 auto connectedKeys = _impl->getConnectedKeys();
1268 keys.reserve(connectedKeys.size());
1269 for (const auto& k : connectedKeys)
1270 {
1271 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
1272 }
1273 return keys;
1274 }
1275
1276 template<typename Key, typename Value, typename UpdateTag>
1278 {
1279 auto sample = _impl->getLast();
1280 if (!sample)
1281 {
1282 throw std::logic_error("no sample");
1283 }
1284 return Sample<Key, Value, UpdateTag>(sample);
1285 }
1286
1287 template<typename Key, typename Value, typename UpdateTag>
1288 std::vector<Sample<Key, Value, UpdateTag>> Writer<Key, Value, UpdateTag>::getAll()
1289 {
1290 auto all = _impl->getAll();
1291 std::vector<Sample<Key, Value, UpdateTag>> samples;
1292 samples.reserve(all.size());
1293 for (const auto& sample : all)
1294 {
1295 samples.push_back(sample);
1296 }
1297 return samples;
1298 }
1299
1300 template<typename Key, typename Value, typename UpdateTag>
1302 std::function<void(std::vector<Key>)> init,
1303 std::function<void(CallbackReason, Key)> update) noexcept
1304 {
1305 _impl->onConnectedKeys(
1306 init ?
1307 [init = std::move(init)](const std::vector<std::shared_ptr<DataStormI::Key>>& connectedKeys)
1308 {
1309 std::vector<Key> keys;
1310 keys.reserve(connectedKeys.size());
1311 for(const auto& k : connectedKeys)
1312 {
1313 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
1314 }
1315 init(std::move(keys));
1316 } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>{},
1317 update ?
1318 [update = std::move(update)](CallbackReason action, const std::shared_ptr<DataStormI::Key>& key)
1319 {
1320 update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->get());
1321 } : std::function<void(CallbackReason, std::shared_ptr<DataStormI::Key>)>{});
1322 }
1323
1324 template<typename Key, typename Value, typename UpdateTag>
1326 std::function<void(std::vector<std::string>)> init,
1327 std::function<void(CallbackReason, std::string)> update) noexcept
1328 {
1329 _impl->onConnectedElements(std::move(init), std::move(update));
1330 }
1331
1332 template<typename Key, typename Value, typename UpdateTag>
1334 const Topic<Key, Value, UpdateTag>& topic,
1335 const Key& key,
1336 std::string name,
1337 const WriterConfig& config)
1338 : Writer<Key, Value, UpdateTag>(
1339 topic.getWriter()->create({topic._keyFactory->create(key)}, std::move(name), config)),
1340 _tagFactory(topic._tagFactory)
1341 {
1342 }
1343
1344 template<typename Key, typename Value, typename UpdateTag>
1346 : Writer<Key, Value, UpdateTag>(std::move(writer)),
1347 _tagFactory(std::move(writer._tagFactory))
1348 {
1349 }
1350
1351 template<typename Key, typename Value, typename UpdateTag>
1354 {
1356 return *this;
1357 }
1358
1359 template<typename Key, typename Value, typename UpdateTag>
1361 {
1362 Writer<Key, Value, UpdateTag>::_impl->publish(
1363 nullptr,
1364 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Add, value));
1365 }
1366
1367 template<typename Key, typename Value, typename UpdateTag>
1369 {
1370 Writer<Key, Value, UpdateTag>::_impl->publish(
1371 nullptr,
1372 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Update, value));
1373 }
1374
1375 template<typename Key, typename Value, typename UpdateTag>
1376 template<typename UpdateValue>
1377 std::function<void(const UpdateValue&)> SingleKeyWriter<Key, Value, UpdateTag>::partialUpdate(const UpdateTag& tag)
1378 {
1379 auto impl = Writer<Key, Value, UpdateTag>::_impl;
1380 auto updateTag = _tagFactory->create(tag);
1381 return [impl, updateTag](const UpdateValue& value)
1382 {
1383 auto encoded = Encoder<UpdateValue>::encode(impl->getCommunicator(), value);
1384 impl->publish(nullptr, std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
1385 };
1386 }
1387
1388 template<typename Key, typename Value, typename UpdateTag>
1390 {
1391 Writer<Key, Value, UpdateTag>::_impl->publish(
1392 nullptr,
1393 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Remove));
1394 }
1395
1396 template<typename Key, typename Value, typename UpdateTag>
1398 const Topic<Key, Value, UpdateTag>& topic,
1399 const std::vector<Key>& keys,
1400 std::string name,
1401 const WriterConfig& config)
1402 : Writer<Key, Value, UpdateTag>(
1403 topic.getWriter()->create(topic._keyFactory->create(keys), std::move(name), config)),
1404 _keyFactory(topic._keyFactory),
1405 _tagFactory(topic._tagFactory)
1406 {
1407 }
1408
1409 template<typename Key, typename Value, typename UpdateTag>
1411 : Writer<Key, Value, UpdateTag>(std::move(writer)),
1412 _keyFactory(std::move(writer._keyFactory)),
1413 _tagFactory(std::move(writer._tagFactory))
1414 {
1415 }
1416
1417 template<typename Key, typename Value, typename UpdateTag>
1420 {
1422 return *this;
1423 }
1424
1425 template<typename Key, typename Value, typename UpdateTag>
1426 void MultiKeyWriter<Key, Value, UpdateTag>::add(const Key& key, const Value& value)
1427 {
1428 Writer<Key, Value, UpdateTag>::_impl->publish(
1429 _keyFactory->create(key),
1430 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Add, value));
1431 }
1432
1433 template<typename Key, typename Value, typename UpdateTag>
1434 void MultiKeyWriter<Key, Value, UpdateTag>::update(const Key& key, const Value& value)
1435 {
1436 Writer<Key, Value, UpdateTag>::_impl->publish(
1437 _keyFactory->create(key),
1438 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Update, value));
1439 }
1440
1441 template<typename Key, typename Value, typename UpdateTag>
1442 template<typename UpdateValue>
1443 std::function<void(const Key&, const UpdateValue&)>
1445 {
1446 auto impl = Writer<Key, Value, UpdateTag>::_impl;
1447 auto updateTag = _tagFactory->create(tag);
1448 auto keyFactory = _keyFactory;
1449 return [impl, updateTag, keyFactory](const Key& key, const UpdateValue& value)
1450 {
1451 auto encoded = Encoder<UpdateValue>::encode(impl->getCommunicator(), value);
1452 impl->publish(
1453 keyFactory->create(key),
1454 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
1455 };
1456 }
1457
1458 template<typename Key, typename Value, typename UpdateTag>
1460 {
1461 Writer<Key, Value, UpdateTag>::_impl->publish(
1462 _keyFactory->create(key),
1463 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Remove));
1464 }
1465
1466 /// @private
1467 template<typename Value> std::function<std::function<bool(const Value&)>(const std::string&)> makeRegexFilter()
1468 {
1469 // std::regex's constructor accepts a const string&; it does not accept a string_view.
1470 return [](const std::string& criteria)
1471 {
1472 std::regex expr(criteria);
1473 return [expr = std::move(expr)](const Value& value)
1474 {
1475 std::ostringstream os;
1476 os << value;
1477 return std::regex_match(os.str(), expr);
1478 };
1479 };
1480 }
1481
1482 /// @private
1483 template<typename Key, typename Value, typename UpdateTag>
1484 std::function<std::function<bool(const Sample<Key, Value, UpdateTag>&)>(const SampleEventSeq&)>
1485 makeSampleEventFilter(const Topic<Key, Value, UpdateTag>&)
1486 {
1487 return [](const SampleEventSeq& criteria)
1488 {
1489 return [criteria](const Sample<Key, Value, UpdateTag>& sample)
1490 { return std::find(criteria.begin(), criteria.end(), sample.getEvent()) != criteria.end(); };
1491 };
1492 }
1493
1494 /// @private
1495 template<typename T, typename V, typename Enabler = void> struct RegexFilter
1496 {
1497 template<typename F> static void add(const F&) {}
1498 };
1499
1500 /// @private
1501 template<typename T, typename V> struct RegexFilter<T, V, std::enable_if_t<DataStormI::is_streamable<V>::value>>
1502 {
1503 template<typename F> static void add(const F& factory)
1504 {
1505 factory->set("_regex", makeRegexFilter<T>()); // Only set the _regex filter if the value is streamable
1506 }
1507 };
1508
1509 //
1510 // Topic template implementation
1511 //
1512 template<typename Key, typename Value, typename UpdateTag>
1513 Topic<Key, Value, UpdateTag>::Topic(const Node& node, std::string name) noexcept
1514 : _name(std::move(name)),
1515 _topicFactory(node._factory),
1516 _keyFactory(DataStormI::KeyFactoryT<Key>::createFactory()),
1517 _tagFactory(DataStormI::TagFactoryT<UpdateTag>::createFactory()),
1518 _keyFilterFactories(std::make_shared<DataStormI::FilterManagerT<DataStormI::KeyT<Key>>>()),
1519 _sampleFilterFactories(
1520 std::make_shared<DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>>())
1521 {
1522 RegexFilter<Key, Key>::add(_keyFilterFactories);
1523 RegexFilter<Sample<Key, Value, UpdateTag>, Value>::add(_sampleFilterFactories);
1524 _sampleFilterFactories->set("_event", makeSampleEventFilter(*this));
1525 }
1526
1527 template<typename Key, typename Value, typename UpdateTag> Topic<Key, Value, UpdateTag>::~Topic()
1528 {
1529 std::lock_guard<std::mutex> lock(_mutex);
1530 if (_reader)
1531 {
1532 _reader->destroy();
1533 }
1534 if (_writer)
1535 {
1536 _writer->destroy();
1537 }
1538 }
1539
1540 template<typename Key, typename Value, typename UpdateTag>
1542 {
1543 std::lock_guard<std::mutex> lock(_mutex);
1544 if (_reader)
1545 {
1546 _reader->destroy();
1547 }
1548 if (_writer)
1549 {
1550 _writer->destroy();
1551 }
1552 _name = std::move(topic._name);
1553 _topicFactory = std::move(topic._topicFactory);
1554 _keyFactory = std::move(topic._keyFactory);
1555 _tagFactory = std::move(topic._tagFactory);
1556 _keyFilterFactories = std::move(topic._keyFilterFactories);
1557 _sampleFilterFactories = std::move(topic._sampleFilterFactories);
1558 _reader = std::move(topic._reader);
1559 _writer = std::move(topic._writer);
1560 _updaters = std::move(topic._updaters);
1561 return *this;
1562 }
1563
1564 template<typename Key, typename Value, typename UpdateTag>
1566 {
1567 return getReader()->hasWriters();
1568 }
1569
1570 template<typename Key, typename Value, typename UpdateTag>
1572 {
1573 getReader()->waitForWriters(static_cast<int>(count));
1574 }
1575
1576 template<typename Key, typename Value, typename UpdateTag>
1578 {
1579 getReader()->waitForWriters(-1);
1580 }
1581
1582 template<typename Key, typename Value, typename UpdateTag>
1584 {
1585 getReader()->setDefaultConfig(config);
1586 }
1587
1588 template<typename Key, typename Value, typename UpdateTag>
1590 {
1591 return getWriter()->hasReaders();
1592 }
1593
1594 template<typename Key, typename Value, typename UpdateTag>
1596 {
1597 getWriter()->waitForReaders(static_cast<int>(count));
1598 }
1599
1600 template<typename Key, typename Value, typename UpdateTag>
1602 {
1603 getWriter()->waitForReaders(-1);
1604 }
1605
1606 template<typename Key, typename Value, typename UpdateTag>
1608 {
1609 getWriter()->setDefaultConfig(config);
1610 }
1611
1612 template<typename Key, typename Value, typename UpdateTag>
1613 template<typename UpdateValue>
1615 const UpdateTag& tag,
1616 std::function<void(Value&, UpdateValue)> updater) noexcept
1617 {
1618 std::lock_guard<std::mutex> lock(_mutex);
1619 auto tagI = _tagFactory->create(std::move(tag));
1620 auto updaterImpl =
1621 updater ?
1622 [updater = std::move(updater)](const std::shared_ptr<DataStormI::Sample>& previous,
1623 const std::shared_ptr<DataStormI::Sample>& next,
1624 const Ice::CommunicatorPtr& communicator)
1625 {
1626 Value value;
1627 if (previous)
1628 {
1629 value = Cloner<Value>::clone(
1630 std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(previous)->getValue());
1631 }
1632 updater(value, Decoder<UpdateValue>::decode(communicator, next->getEncodedValue()));
1633 std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(next)->setValue(std::move(value));
1634 } : std::function<void(const std::shared_ptr<DataStormI::Sample>&,
1635 const std::shared_ptr<DataStormI::Sample>&,
1636 const Ice::CommunicatorPtr&)>{};
1637
1638 if (_reader && !_writer)
1639 {
1640 _reader->setUpdater(tagI, updaterImpl);
1641 }
1642 else if (_writer && !_reader)
1643 {
1644 _writer->setUpdater(tagI, updaterImpl);
1645 }
1646 else if (_reader && _writer)
1647 {
1648 _reader->setUpdater(tagI, updaterImpl);
1649 _writer->setUpdater(tagI, updaterImpl);
1650 }
1651 else
1652 {
1653 _updaters[tagI] = updaterImpl;
1654 }
1655 }
1656
1657 template<typename Key, typename Value, typename UpdateTag>
1658 template<typename Criteria>
1660 std::string name,
1661 std::function<std::function<bool(const Key&)>(const Criteria&)> factory) noexcept
1662 {
1663 std::lock_guard<std::mutex> lock(_mutex);
1664 _keyFilterFactories->set(std::move(name), std::move(factory));
1665 }
1666
1667 template<typename Key, typename Value, typename UpdateTag>
1668 template<typename Criteria>
1670 std::string name,
1671 std::function<std::function<bool(const SampleType&)>(const Criteria&)> factory) noexcept
1672 {
1673 std::lock_guard<std::mutex> lock(_mutex);
1674 _sampleFilterFactories->set(std::move(name), std::move(factory));
1675 }
1676
1677 template<typename Key, typename Value, typename UpdateTag>
1678 std::shared_ptr<DataStormI::TopicReader> Topic<Key, Value, UpdateTag>::getReader() const
1679 {
1680 std::lock_guard<std::mutex> lock(_mutex);
1681 if (!_reader)
1682 {
1683 auto sampleFactory = std::make_shared<DataStormI::SampleFactoryT<Key, Value, UpdateTag>>();
1684 _reader = _topicFactory->createTopicReader(
1685 _name,
1686 _keyFactory,
1687 _tagFactory,
1688 std::move(sampleFactory),
1689 _keyFilterFactories,
1690 _sampleFilterFactories);
1691 _reader->setUpdaters(_writer ? _writer->getUpdaters() : _updaters);
1692 _updaters.clear();
1693 }
1694 return _reader;
1695 }
1696
1697 template<typename Key, typename Value, typename UpdateTag>
1698 std::shared_ptr<DataStormI::TopicWriter> Topic<Key, Value, UpdateTag>::getWriter() const
1699 {
1700 std::lock_guard<std::mutex> lock(_mutex);
1701 if (!_writer)
1702 {
1703 _writer = _topicFactory->createTopicWriter(
1704 _name,
1705 _keyFactory,
1706 _tagFactory,
1707 nullptr,
1708 _keyFilterFactories,
1709 _sampleFilterFactories);
1710 _writer->setUpdaters(_reader ? _reader->getUpdaters() : _updaters);
1711 _updaters.clear();
1712 }
1713 return _writer;
1714 }
1715
1716 template<typename Key, typename Value, typename UpdateTag>
1717 Ice::CommunicatorPtr Topic<Key, Value, UpdateTag>::getCommunicator() const noexcept
1718 {
1719 return _topicFactory->getCommunicator();
1720 }
1721}
1722
1723#if defined(__clang__)
1724# pragma clang diagnostic pop
1725#elif defined(__GNUC__)
1726# pragma GCC diagnostic pop
1727#endif
1728
1729#endif
FilteredKeyReader & operator=(FilteredKeyReader &&reader) noexcept
Move assignment operator.
Definition DataStorm.h:1206
FilteredKeyReader(const Topic< Key, Value, UpdateTag > &topic, const Filter< KeyFilterCriteria > &keyFilter, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Constructs a new reader for the given key filter.
Definition DataStorm.h:1168
The filtered reader to read data elements whose key match a given filter.
Definition DataStorm.h:645
MultiKeyReader(const Topic< Key, Value, UpdateTag > &topic, const std::vector< Key > &keys, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Constructs a new reader for the given keys.
Definition DataStorm.h:1125
MultiKeyReader & operator=(MultiKeyReader &&reader) noexcept
Move assignment operator.
Definition DataStorm.h:1160
The key reader to read the data element associated with a given set of keys.
Definition DataStorm.h:497
std::function< void(const Key &, const UpdateValue &)> partialUpdate(const UpdateTag &tag)
Gets a partial update generator function for the given partial update tag.
Definition DataStorm.h:1444
void add(const Key &key, const Value &value)
Adds the data element.
Definition DataStorm.h:1426
MultiKeyWriter(const Topic< Key, Value, UpdateTag > &topic, const std::vector< Key > &keys, std::string name=std::string(), const WriterConfig &config=WriterConfig())
Constructs a new writer for the given keys.
Definition DataStorm.h:1397
void remove(const Key &key) noexcept
Removes the data element.
Definition DataStorm.h:1459
void update(const Key &key, const Value &value)
Updates the data element.
Definition DataStorm.h:1434
MultiKeyWriter & operator=(MultiKeyWriter &&writer) noexcept
Move assignment operator.
Definition DataStorm.h:1419
The key writer to write data elements associated with a given set of keys.
Definition DataStorm.h:777
The Node class allows creating topic readers and writers.
Definition Node.h:27
The ReaderConfig class specifies configuration options specific to readers.
Definition Types.h:92
std::vector< Key > getConnectedKeys() const
Gets the keys for which writers are connected to this reader.
Definition DataStorm.h:982
void onConnectedWriters(std::function< void(std::vector< std::string >)> init, std::function< void(CallbackReason, std::string)> update) noexcept
Calls the given functions to provide the initial set of connected writers and when a new writer conne...
Definition DataStorm.h:1050
std::vector< std::string > getConnectedWriters() const
Gets the connected writers.
Definition DataStorm.h:976
Reader & operator=(Reader &&reader) noexcept
Move assignment operator.
Definition DataStorm.h:947
void waitForNoWriters() const
Waits for writers to be offline.
Definition DataStorm.h:970
Value ValueType
The value type.
Definition DataStorm.h:115
Reader(Reader &&reader) noexcept
Move constructor.
Definition DataStorm.h:933
void waitForWriters(unsigned int count=1) const
Waits for the given number of writers to be online.
Definition DataStorm.h:964
bool hasUnread() const noexcept
Returns whether or not unread samples are available.
Definition DataStorm.h:1014
void onConnectedKeys(std::function< void(std::vector< Key >)> init, std::function< void(CallbackReason, Key)> update) noexcept
Calls the given functions to provide the initial set of connected keys and when a key is added or rem...
Definition DataStorm.h:1026
std::vector< Sample< Key, Value, UpdateTag > > getAllUnread()
Returns all the unread samples.
Definition DataStorm.h:995
Sample< Key, Value, UpdateTag > getNextUnread()
Returns the next unread sample.
Definition DataStorm.h:1020
Key KeyType
The key type.
Definition DataStorm.h:112
bool hasWriters() const noexcept
Indicates whether or not writers are online.
Definition DataStorm.h:958
void waitForUnread(unsigned int count=1) const
Waits for the given number of unread samples to be available.
Definition DataStorm.h:1008
void onSamples(std::function< void(std::vector< Sample< Key, Value, UpdateTag > >)> init, std::function< void(Sample< Key, Value, UpdateTag >)> queue) noexcept
Calls the given function to provide the initial set of unread samples and when new samples are queued...
Definition DataStorm.h:1058
~Reader()
Destructor.
Definition DataStorm.h:938
The Reader class is used to retrieve samples for a data element.
Definition DataStorm.h:109
const Value & getValue() const noexcept
Value ValueType
The type of the sample value.
Definition DataStorm.h:36
const std::string & getSession() const noexcept
const std::string & getOrigin() const noexcept
UpdateTag UpdateTagType
The type of the update tag.
Definition DataStorm.h:40
Key KeyType
The type of the sample key.
Definition DataStorm.h:33
SampleEvent getEvent() const noexcept
Gets the event associated with the sample.
Definition DataStorm.h:882
std::chrono::time_point< std::chrono::system_clock > getTimeStamp() const noexcept
const Key & getKey() const noexcept
A sample provides information about a data element update.
Definition DataStorm.h:30
SingleKeyReader & operator=(SingleKeyReader &&reader) noexcept
Move assignment operator.
Definition DataStorm.h:1118
SingleKeyReader(const Topic< Key, Value, UpdateTag > &topic, const Key &key, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Constructs a new reader for the given key.
Definition DataStorm.h:1083
The key reader to read the data element associated with a given key.
Definition DataStorm.h:453
std::function< void(const UpdateValue &)> partialUpdate(const UpdateTag &tag)
Gets a partial update generator function for the given partial update tag.
Definition DataStorm.h:1377
void add(const Value &value)
Adds the data element.
Definition DataStorm.h:1360
void update(const Value &value)
Updates the data element.
Definition DataStorm.h:1368
void remove() noexcept
Removes the data element. This generates a SampleEvent::Remove sample.
Definition DataStorm.h:1389
SingleKeyWriter & operator=(SingleKeyWriter &&writer) noexcept
Move assignment operator.
Definition DataStorm.h:1353
SingleKeyWriter(const Topic< Key, Value, UpdateTag > &topic, const Key &key, std::string name=std::string(), const WriterConfig &config=WriterConfig())
Constructs a new writer for the given key.
Definition DataStorm.h:1333
The key writer to write the data element associated with a given key.
Definition DataStorm.h:727
void setUpdater(const UpdateTag &tag, std::function< void(Value &, UpdateValue)> updater) noexcept
Sets an updater function for the given update tag.
Definition DataStorm.h:1614
Topic(const Node &node, std::string name) noexcept
Constructs a new Topic for the topic with the given name.
Definition DataStorm.h:1513
void setKeyFilter(std::string name, std::function< std::function< bool(const Key &)>(const Criteria &)> factory) noexcept
Sets a key filter factory.
Definition DataStorm.h:1659
UpdateTag UpdateTagType
The topic's update tag type (defaults to std::string if not specified).
Definition DataStorm.h:304
Key KeyType
The topic's key type.
Definition DataStorm.h:298
void waitForReaders(unsigned int count=1) const
Waits for the given number of data readers to be online.
Definition DataStorm.h:1595
void waitForNoWriters() const
Waits for data writers to be offline.
Definition DataStorm.h:1577
Value ValueType
The topic's value type.
Definition DataStorm.h:301
bool hasReaders() const noexcept
Indicates whether or not data readers are online.
Definition DataStorm.h:1589
Sample< Key, Value, UpdateTag > SampleType
The topic's sample type.
Definition DataStorm.h:313
void setSampleFilter(std::string name, std::function< std::function< bool(const SampleType &)>(const Criteria &)> factory) noexcept
Sets a sample filter factory.
Definition DataStorm.h:1669
Reader< Key, Value, UpdateTag > ReaderType
The topic's reader type.
Definition DataStorm.h:310
bool hasWriters() const noexcept
Indicates whether or not data writers are online.
Definition DataStorm.h:1565
void setWriterDefaultConfig(const WriterConfig &config) noexcept
Sets the default configuration used to construct readers.
Definition DataStorm.h:1607
Topic & operator=(Topic &&topic) noexcept
Move assignment operator.
Definition DataStorm.h:1541
Writer< Key, Value, UpdateTag > WriterType
The topic's writer type.
Definition DataStorm.h:307
void setReaderDefaultConfig(const ReaderConfig &config) noexcept
Sets the default configuration used to construct readers.
Definition DataStorm.h:1583
Topic(Topic &&topic) noexcept
Move constructor.
Definition DataStorm.h:322
void waitForWriters(unsigned int count=1) const
Waits for the given number of data writers to be online.
Definition DataStorm.h:1571
void waitForNoReaders() const
Waits for data readers to be offline.
Definition DataStorm.h:1601
~Topic()
Destructor.
Definition DataStorm.h:1527
The Topic class.
Definition DataStorm.h:295
The WriterConfig class specifies configuration options specific to writers.
Definition Types.h:118
Sample< Key, Value, UpdateTag > getLast()
Gets the last written sample.
Definition DataStorm.h:1277
Writer & operator=(Writer &&writer) noexcept
Move assignment operator.
Definition DataStorm.h:1229
std::vector< Sample< Key, Value, UpdateTag > > getAll()
Gets all the written sample kept in the writer history.
Definition DataStorm.h:1288
std::vector< std::string > getConnectedReaders() const
Gets the connected readers.
Definition DataStorm.h:1258
void onConnectedReaders(std::function< void(std::vector< std::string >)> init, std::function< void(CallbackReason, std::string)> update) noexcept
Calls the given functions to provide the initial set of connected readers and when a new reader conne...
Definition DataStorm.h:1325
void waitForReaders(unsigned int count=1) const
Waits for the given number of readers to be online.
Definition DataStorm.h:1246
~Writer()
Destructor.
Definition DataStorm.h:1220
void onConnectedKeys(std::function< void(std::vector< Key >)> init, std::function< void(CallbackReason, Key)> update) noexcept
Calls the given functions to provide the initial set of connected keys and when a key is added or rem...
Definition DataStorm.h:1301
bool hasReaders() const noexcept
Indicates whether or not readers are online.
Definition DataStorm.h:1240
std::vector< Key > getConnectedKeys() const
Gets the keys for which readers are connected to this writer.
Definition DataStorm.h:1264
Key KeyType
The key type.
Definition DataStorm.h:213
Value ValueType
The value type.
Definition DataStorm.h:216
Writer(Writer &&writer) noexcept
Move constructor.
Definition DataStorm.h:1216
void waitForNoReaders() const
Waits for readers to be offline.
Definition DataStorm.h:1252
The Writer class is used to write samples for a data element.
Definition DataStorm.h:210
SampleEvent
Describes the operation used by a data writer to update a data element.
Definition SampleEvent.h:34
@ Update
The data writer updated the element.
Definition SampleEvent.h:39
@ Remove
The data writer removed the element.
Definition SampleEvent.h:45
@ Add
The data writer added the element.
Definition SampleEvent.h:36
SingleKeyReader< K, V, UT > makeSingleKeyReader(const Topic< K, V, UT > &topic, const typename Topic< K, V, UT >::KeyType &key, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Creates a key reader for the given topic and key.
Definition DataStorm.h:546
FilteredKeyReader< K, V, UT > makeFilteredKeyReader(const Topic< K, V, UT > &topic, const Filter< KFC > &filter, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Creates a new filtered reader for the given topic and key filter.
Definition DataStorm.h:696
MultiKeyReader< K, V, UT > makeAnyKeyReader(const Topic< K, V, UT > &topic, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Creates an any-key reader for the given topic.
Definition DataStorm.h:616
std::vector< SampleEvent > SampleEventSeq
A sequence of sample events.
Definition SampleEvent.h:55
MultiKeyWriter< K, V, UT > makeMultiKeyWriter(const Topic< K, V, UT > &topic, const std::vector< typename Topic< K, V, UT >::KeyType > &keys, std::string name=std::string(), const WriterConfig &config=WriterConfig())
Creates a multi-key writer for the given topic and keys.
Definition DataStorm.h:851
SingleKeyWriter< K, V, UT > makeSingleKeyWriter(const Topic< K, V, UT > &topic, const typename Topic< K, V, UT >::KeyType &key, std::string name=std::string(), const WriterConfig &config=WriterConfig())
Creates a key writer for the given topic and key.
Definition DataStorm.h:835
MultiKeyReader< K, V, UT > makeMultiKeyReader(const Topic< K, V, UT > &topic, const std::vector< typename Topic< K, V, UT >::KeyType > &keys, std::string name=std::string(), const ReaderConfig &config=ReaderConfig())
Creates a multi-key reader for the given topic.
Definition DataStorm.h:581
std::ostream & operator<<(std::ostream &os, const SampleEventSeq &types)
Converts the given sample type vector to a string and add it to the stream.
Definition DataStorm.h:89
MultiKeyWriter< K, V, UT > makeAnyKeyWriter(const Topic< K, V, UT > &topic, std::string name=std::string(), const WriterConfig &config=WriterConfig())
Creates an any-key writer for the given topic.
Definition DataStorm.h:866
CallbackReason
The callback action enumerator specifies the reason why a callback is called.
Definition Types.h:141
Data-centric, broker-less publish/subscribe framework. C++ only.
Definition DataStorm.h:24
std::shared_ptr< Communicator > CommunicatorPtr
A shared pointer to a Communicator.
void print(std::ostream &stream, T v)
Prints a value to a stream.
The Ice RPC framework.
Definition SampleEvent.h:59
static T clone(const T &value) noexcept
Clones the given value.
Definition Types.h:186
static T decode(const Ice::CommunicatorPtr &communicator, const Ice::ByteSeq &value) noexcept
Decodes a value.
Definition Types.h:208
static Ice::ByteSeq encode(const Ice::CommunicatorPtr &communicator, const T &value) noexcept
Encodes the given value.
Definition Types.h:197
The Encoder template provides a method to encode decode user types.
Definition Types.h:154
Filter(std::string name, TT &&criteria) noexcept
Constructs a filter structure with the given name and criteria.
Definition DataStorm.h:437
std::string name
The filter name.
Definition DataStorm.h:443
T criteria
The filter criteria value.
Definition DataStorm.h:446
Filter structure to specify the filter name and criteria value.
Definition DataStorm.h:432