14 #include <DataStorm/Sample.h> 42 template<
typename Key,
typename Value,
typename UpdateTag=std::
string>
class Sample 67 SampleEvent
getEvent() const noexcept;
74 const Key&
getKey() const noexcept;
85 const Value&
getValue() const noexcept;
104 std::chrono::time_point<std::chrono::system_clock>
getTimeStamp() const noexcept;
133 std::shared_ptr<
DataStormI::SampleT<Key, Value, UpdateTag>> _impl;
144 operator<<(std::ostream& os, SampleEvent sampleType)
148 case SampleEvent::Add:
151 case SampleEvent::Update:
154 case SampleEvent::Remove:
157 case SampleEvent::PartialUpdate:
158 os <<
"PartialUpdate";
161 os << static_cast<int>(sampleType);
175 operator<<(std::ostream& os, const std::vector<SampleEvent>& types)
178 for(
auto p = types.begin(); p != types.end(); ++p)
180 if(p != types.begin())
198 template<
typename K,
typename V,
typename U>
200 operator<<(std::ostream& os, const Sample<K, V, U>& sample)
202 os << sample.getValue();
211 template<
typename Key,
typename Value,
typename UpdateTag>
251 bool hasWriters() const noexcept;
259 void waitForWriters(
unsigned int count = 1) const;
265 void waitForNoWriters() const;
272 std::vector<std::
string> getConnectedWriters() const noexcept;
279 std::vector<Key> getConnectedKeys() const noexcept;
286 std::vector<
Sample<Key, Value, UpdateTag>> getAllUnread() noexcept;
292 void waitForUnread(
unsigned int count = 1) const;
299 bool hasUnread() const noexcept;
307 Sample<Key, Value, UpdateTag> getNextUnread();
324 void onConnectedKeys(std::function<
void(std::vector<Key>)> init,
339 void onConnectedWriters(std::function<
void(std::vector<std::
string>)> init,
340 std::function<
void(
CallbackReason, std::
string)> update) noexcept;
355 void onSamples(std::function<
void(std::vector<
Sample<Key, Value, UpdateTag>>)> init,
356 std::function<
void(
Sample<Key, Value, UpdateTag>)> queue) noexcept;
361 Reader(const std::shared_ptr<
DataStormI::DataReader>& impl) noexcept : _impl(impl)
366 std::shared_ptr<DataStormI::DataReader> _impl;
374 template<
typename Key,
typename Value,
typename UpdateTag>
414 bool hasReaders() const noexcept;
422 void waitForReaders(
unsigned int count = 1) const;
428 void waitForNoReaders() const;
435 std::vector<std::
string> getConnectedReaders() const noexcept;
442 std::vector<Key> getConnectedKeys() const noexcept;
450 Sample<Key, Value, UpdateTag> getLast();
457 std::vector<
Sample<Key, Value, UpdateTag>> getAll() noexcept;
474 void onConnectedKeys(std::function<
void(std::vector<Key>)> init,
489 void onConnectedReaders(std::function<
void(std::vector<std::
string>)> init,
490 std::function<
void(
CallbackReason, std::
string)> update) noexcept;
495 Writer(const std::shared_ptr<
DataStormI::DataWriter>& impl) noexcept : _impl(impl)
500 std::shared_ptr<DataStormI::DataWriter> _impl;
511 template<
typename Key,
typename Value,
typename UpdateTag=std::
string>
552 Topic(
const Node& node,
const std::string& name) noexcept;
578 bool hasWriters() const noexcept;
586 void waitForWriters(
unsigned int count = 1) const;
592 void waitForNoWriters() const;
599 void setWriterDefaultConfig(const
WriterConfig& config) noexcept;
606 bool hasReaders() const noexcept;
614 void waitForReaders(
unsigned int count = 1) const;
620 void waitForNoReaders() const;
627 void setReaderDefaultConfig(const
ReaderConfig& config) noexcept;
638 template<typename UpdateValue>
639 void setUpdater(const UpdateTag& tag, std::function<
void (Value&, UpdateValue)> updater) noexcept;
649 template<typename Criteria>
650 void setKeyFilter(const std::
string& name,
651 std::function<std::function<
bool (const Key&)> (const Criteria&)> factory) noexcept;
661 template<typename Criteria>
662 void setSampleFilter(const std::
string& name,
663 std::function<std::function<
bool (const
SampleType&)> (const Criteria&)> factory) noexcept;
667 std::shared_ptr<
DataStormI::TopicReader> getReader() const noexcept;
668 std::shared_ptr<
DataStormI::TopicWriter> getWriter() const noexcept;
669 std::shared_ptr<Ice::Communicator> getCommunicator() const noexcept;
677 const std::
string _name;
678 const std::shared_ptr<
DataStormI::TopicFactory> _topicFactory;
679 const std::shared_ptr<
DataStormI::KeyFactoryT<Key>> _keyFactory;
680 const std::shared_ptr<
DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
683 const std::shared_ptr<
DataStormI::FilterManagerT<
DataStormI::SampleT<Key, Value, UpdateTag>>> _sampleFilterFactories;
685 mutable std::mutex _mutex;
686 mutable std::shared_ptr<
DataStormI::TopicReader> _reader;
687 mutable std::shared_ptr<
DataStormI::TopicWriter> _writer;
704 template<
typename TT>
705 Filter(
const std::string& name, TT&& criteria) noexcept : name(name), criteria(std::forward<TT>(criteria))
721 template<
typename Key,
typename Value,
typename UpdateTag=std::
string>
737 const std::string& name = std::string(),
752 template<
typename SampleFilterCriteria>
756 const std::string& name = std::string(),
779 template<
typename Key,
typename Value,
typename UpdateTag=std::
string>
795 const std::vector<Key>& keys,
796 const std::string& name = std::string(),
812 template<
typename SampleFilterCriteria>
814 const std::vector<Key>& keys,
816 const std::string& name = std::string(),
843 template<
typename K,
typename V,
typename UT>
847 const std::string& name = std::string(),
863 template<
typename SFC,
typename K,
typename V,
typename UT>
864 SingleKeyReader<K, V, UT>
868 const std::string& name = std::string(),
885 template<
typename K,
typename V,
typename UT> MultiKeyReader<K, V, UT>
888 const std::string& name = std::string(),
906 template<
typename SFC,
typename K,
typename V,
typename UT> MultiKeyReader<K, V, UT>
910 const std::string& name = std::string(),
926 template<
typename K,
typename V,
typename UT> MultiKeyReader<K, V, UT>
928 const std::string& name = std::string(),
945 template<
typename SFC,
typename K,
typename V,
typename UT> MultiKeyReader<K, V, UT>
948 const std::string& name = std::string(),
959 template<
typename Key,
typename Value,
typename UpdateTag=std::
string>
976 template<
typename KeyFilterCriteria>
979 const std::string& name = std::string(),
996 template<
typename KeyFilterCriteria,
typename SampleFilterCriteria>
1000 const std::string& name = std::string(),
1030 const std::string& name = std::string(),
1046 template<
typename KFC,
typename SFC,
typename K,
typename V,
typename UT> FilteredKeyReader<K, V, UT>
1050 const std::string& name = std::string(),
1061 template<
typename Key,
typename Value,
typename UpdateTag=std::
string>
1077 const std::string& name = std::string(),
1100 void add(
const Value& value) noexcept;
1108 void update(
const Value& value) noexcept;
1120 template<
typename UpdateValue> std::function<void(const UpdateValue&)> partialUpdate(
const UpdateTag& tag) noexcept;
1125 void remove() noexcept;
1129 const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
1137 template<
typename Key,
typename Value,
typename UpdateTag=std::
string>
1153 const std::vector<Key>& keys,
1154 const std::string& name = std::string(),
1178 void add(
const Key& key,
const Value& value) noexcept;
1187 void update(
const Key& key,
const Value& value) noexcept;
1199 template<
typename UpdateValue> std::function<void(const Key&, const UpdateValue&)>
1200 partialUpdate(
const UpdateTag& tag) noexcept;
1207 void remove(
const Key& key) noexcept;
1211 const std::shared_ptr<DataStormI::KeyFactoryT<Key>> _keyFactory;
1212 const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
1227 const std::string& name = std::string(),
1242 template<
typename K,
typename V,
typename UT> MultiKeyWriter<K, V, UT>
1245 const std::string& name = std::string(),
1259 template<
typename K,
typename V,
typename UT> MultiKeyWriter<K, V, UT>
1261 const std::string& name = std::string(),
1279 template<
typename Key,
typename Value,
typename UpdateTag> SampleEvent
1282 return _impl->event;
1285 template<
typename Key,
typename Value,
typename UpdateTag>
const Key&
1288 return _impl->getKey();
1291 template<
typename Key,
typename Value,
typename UpdateTag>
const Value&
1294 return _impl->getValue();
1297 template<
typename Key,
typename Value,
typename UpdateTag> UpdateTag
1300 return _impl->getTag();
1303 template<
typename Key,
typename Value,
typename UpdateTag> std::chrono::time_point<std::chrono::system_clock>
1306 return _impl->timestamp;
1309 template<
typename Key,
typename Value,
typename UpdateTag> std::string
1312 return _impl->origin;
1315 template<
typename Key,
typename Value,
typename UpdateTag> std::string
1318 return _impl->session;
1322 const std::shared_ptr<DataStormI::Sample>& impl) noexcept :
1323 _impl(std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(impl))
1330 template<
typename Key,
typename Value,
typename UpdateTag>
1335 template<
typename Key,
typename Value,
typename UpdateTag>
1347 _impl = std::move(reader._impl);
1351 template<
typename Key,
typename Value,
typename UpdateTag>
bool 1354 return _impl->hasWriters();
1357 template<
typename Key,
typename Value,
typename UpdateTag>
void 1360 _impl->waitForWriters(count);
1363 template<
typename Key,
typename Value,
typename UpdateTag>
void 1366 _impl->waitForWriters(-1);
1369 template<
typename Key,
typename Value,
typename UpdateTag> std::vector<std::string>
1372 return _impl->getConnectedElements();
1375 template<
typename Key,
typename Value,
typename UpdateTag> std::vector<Key>
1378 std::vector<Key> keys;
1379 auto connectedKeys = _impl->getConnectedKeys();
1380 keys.reserve(connectedKeys.size());
1381 for(
const auto& k : connectedKeys)
1383 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->
get());
1388 template<
typename Key,
typename Value,
typename UpdateTag> std::vector<Sample<Key, Value, UpdateTag>>
1391 auto unread = _impl->getAllUnread();
1392 std::vector<Sample<Key, Value, UpdateTag>> samples;
1393 samples.reserve(unread.size());
1394 for(
auto sample : unread)
1396 samples.emplace_back(sample);
1401 template<
typename Key,
typename Value,
typename UpdateTag>
void 1404 _impl->waitForUnread(count);
1407 template<
typename Key,
typename Value,
typename UpdateTag>
bool 1410 return _impl->hasUnread();
1419 template<
typename Key,
typename Value,
typename UpdateTag>
void 1423 _impl->onConnectedKeys(init ? [init](std::vector<std::shared_ptr<DataStormI::Key>> connectedKeys)
1425 std::vector<Key> keys;
1426 keys.reserve(connectedKeys.size());
1427 for(
const auto& k : connectedKeys)
1429 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->
get());
1432 } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>(),
1433 update ? [update](
CallbackReason action, std::shared_ptr<DataStormI::Key> key)
1435 update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->
get());
1436 } : std::function<void(CallbackReason, std::shared_ptr<DataStormI::Key>)>());
1439 template<
typename Key,
typename Value,
typename UpdateTag>
void 1441 std::function<
void(
CallbackReason, std::string)> update) noexcept
1443 _impl->onConnectedElements(init, update);
1446 template<
typename Key,
typename Value,
typename UpdateTag>
void 1450 auto communicator = _impl->getCommunicator();
1451 _impl->onSamples(init ? [communicator, init](
const std::vector<std::shared_ptr<DataStormI::Sample>>& samplesI)
1453 std::vector<Sample<Key, Value, UpdateTag>> samples;
1454 samples.reserve(samplesI.size());
1455 for(
const auto& s : samplesI)
1457 samples.emplace_back(s);
1459 init(move(samples));
1460 } : std::function<void(const std::vector<std::shared_ptr<DataStormI::Sample>>&)>(),
1461 update ? [communicator, update](
const std::shared_ptr<DataStormI::Sample>& sampleI)
1464 } : std::function<void(const std::shared_ptr<DataStormI::Sample>&)>());
1467 template<
typename Key,
typename Value,
typename UpdateTag>
1470 const std::string& name,
1476 template<
typename Key,
typename Value,
typename UpdateTag>
template<
typename SFC>
1480 const std::string& name,
1486 DataStormI::EncoderT<SFC>::encode(topic.getCommunicator(),
1491 template<
typename Key,
typename Value,
typename UpdateTag>
1504 template<
typename Key,
typename Value,
typename UpdateTag>
1506 const std::vector<Key>& keys,
1507 const std::string& name,
1515 template<
typename Key,
typename Value,
typename UpdateTag>
template<
typename SFC>
1517 const std::vector<Key>& keys,
1519 const std::string& name,
1530 template<
typename Key,
typename Value,
typename UpdateTag>
1536 template<
typename Key,
typename Value,
typename UpdateTag>
1544 template<
typename Key,
typename Value,
typename UpdateTag>
template<
typename KFC>
1547 const std::string& name,
1549 Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(topic._keyFilterFactories->create(filter.name,
1556 template<
typename Key,
typename Value,
typename UpdateTag>
template<
typename KFC,
typename SFC>
1560 const std::string& name,
1562 Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(topic._keyFilterFactories->create(keyFilter.name,
1563 keyFilter.criteria),
1567 Encoder<SFC>::encode(topic.getCommunicator(),
1568 sampleFilter.criteria)))
1572 template<
typename Key,
typename Value,
typename UpdateTag>
1588 template<
typename Key,
typename Value,
typename UpdateTag>
1596 _impl = std::move(writer._impl);
1600 template<
typename Key,
typename Value,
typename UpdateTag>
1609 template<
typename Key,
typename Value,
typename UpdateTag>
bool 1612 return _impl->hasReaders();
1615 template<
typename Key,
typename Value,
typename UpdateTag>
void 1618 return _impl->waitForReaders(count);
1621 template<
typename Key,
typename Value,
typename UpdateTag>
void 1624 return _impl->waitForReaders(-1);
1627 template<
typename Key,
typename Value,
typename UpdateTag> std::vector<std::string>
1630 return _impl->getConnectedElements();
1633 template<
typename Key,
typename Value,
typename UpdateTag> std::vector<Key>
1636 std::vector<Key> keys;
1637 auto connectedKeys = _impl->getConnectedKeys();
1638 keys.reserve(connectedKeys.size());
1639 for(
const auto& k : connectedKeys)
1641 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->
get());
1649 auto sample = _impl->getLast();
1652 throw std::logic_error(
"no sample");
1657 template<
typename Key,
typename Value,
typename UpdateTag> std::vector<Sample<Key, Value, UpdateTag>>
1660 auto all = _impl->getAll();
1661 std::vector<Sample<Key, Value, UpdateTag>> samples;
1662 samples.reserve(all.size());
1663 for(
auto sample : all)
1665 samples.emplace_back(sample);
1670 template<
typename Key,
typename Value,
typename UpdateTag>
void 1674 _impl->onConnectedKeys(init ? [init](std::vector<std::shared_ptr<DataStormI::Key>> connectedKeys)
1676 std::vector<Key> keys;
1677 keys.reserve(connectedKeys.size());
1678 for(
const auto& k : connectedKeys)
1680 keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->
get());
1683 } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>(),
1684 update ? [update](
CallbackReason action, std::shared_ptr<DataStormI::Key> key)
1686 update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->
get());
1687 } : std::function<void(CallbackReason, std::shared_ptr<DataStormI::Key>)>());
1690 template<
typename Key,
typename Value,
typename UpdateTag>
void 1692 std::function<
void(
CallbackReason, std::string)> update) noexcept
1694 _impl->onConnectedElements(init, update);
1697 template<
typename Key,
typename Value,
typename UpdateTag>
1700 const std::string& name,
1705 _tagFactory(topic._tagFactory)
1709 template<
typename Key,
typename Value,
typename UpdateTag>
1712 _tagFactory(std::move(writer._tagFactory))
1723 template<
typename Key,
typename Value,
typename UpdateTag>
void 1727 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Add, value));
1730 template<
typename Key,
typename Value,
typename UpdateTag>
void 1734 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Update, value));
1737 template<
typename Key,
typename Value,
typename UpdateTag>
1738 template<
typename UpdateValue> std::function<void(const UpdateValue&)>
1742 auto updateTag = _tagFactory->create(tag);
1743 return [impl, updateTag](
const UpdateValue& value) {
1745 impl->publish(
nullptr, std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
1749 template<
typename Key,
typename Value,
typename UpdateTag>
void 1753 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Remove));
1756 template<
typename Key,
typename Value,
typename UpdateTag>
1758 const std::vector<Key>& keys,
1759 const std::string& name,
1764 _keyFactory(topic._keyFactory),
1765 _tagFactory(topic._tagFactory)
1769 template<
typename Key,
typename Value,
typename UpdateTag>
1772 _keyFactory(std::move(writer._keyFactory)),
1773 _tagFactory(std::move(writer._tagFactory))
1784 template<
typename Key,
typename Value,
typename UpdateTag>
void 1788 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Add, value));
1791 template<
typename Key,
typename Value,
typename UpdateTag>
void 1795 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Update, value));
1798 template<
typename Key,
typename Value,
typename UpdateTag>
1799 template<
typename UpdateValue> std::function<void(const Key&, const UpdateValue&)>
1803 auto updateTag = _tagFactory->create(tag);
1804 auto keyFactory = _keyFactory;
1805 return [impl, updateTag, keyFactory](
const Key& key,
const UpdateValue& value) {
1807 impl->publish(keyFactory->create(key),
1808 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
1812 template<
typename Key,
typename Value,
typename UpdateTag>
void 1816 std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Remove));
1819 #if !defined(__clang__) && defined(__GNUC__) && ((__GNUC__* 100) + __GNUC_MINOR__) < 490 1828 RegExp(
const std::string& criteria)
1830 if(regcomp(&_expr, criteria.c_str(), REG_EXTENDED) != 0)
1832 throw std::invalid_argument(criteria);
1841 bool match(
const std::string& value)
const 1843 return regexec(&_expr, value.c_str(), 0, 0, 0) == 0;
1854 template<
typename Value> std::function<std::function<bool (const Value&)> (
const std::string&)>
1855 makeRegexFilter() noexcept
1857 return [](
const std::string& criteria)
1859 #if !defined(__clang__) && defined(__GNUC__) && ((__GNUC__* 100) + __GNUC_MINOR__) < 490 1860 auto expr = std::make_shared<RegExp>(criteria);
1861 return [expr](
const Value& value)
1863 std::ostringstream os;
1865 return expr->match(os.str());
1868 std::regex expr(criteria);
1869 return [expr](
const Value& value)
1871 std::ostringstream os;
1873 return std::regex_match(os.str(), expr);
1880 template<
typename Key,
typename Value,
typename UpdateTag>
1881 std::function<std::function<bool (const Sample<Key, Value, UpdateTag>&)> (
const std::vector<SampleEvent>&)>
1882 makeSampleEventFilter(
const Topic<Key, Value, UpdateTag>& topic) noexcept
1884 return [](
const std::vector<SampleEvent>& criteria)
1886 return [criteria](
const Sample<Key, Value, UpdateTag>& sample)
1888 return std::find(criteria.begin(), criteria.end(), sample.getEvent()) != criteria.end();
1894 template<
typename T,
typename V,
typename Enabler=
void>
1897 template<
typename F>
static void 1904 template<
typename T,
typename V>
1905 struct RegexFilter<T, V, typename std::enable_if<DataStormI::is_streamable<V>::value>::type>
1907 template<
typename F>
static void 1910 factory->set(
"_regex", makeRegexFilter<T>());
1917 template<
typename Key,
typename Value,
typename UpdateTag>
1920 _topicFactory(node._factory),
1921 _keyFactory(DataStormI::KeyFactoryT<Key>::createFactory()),
1922 _tagFactory(DataStormI::TagFactoryT<UpdateTag>::createFactory()),
1923 _keyFilterFactories(DataStormI::FilterManagerT<DataStormI::KeyT<Key>>::create()),
1924 _sampleFilterFactories(DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>::create())
1926 RegexFilter<Key, Key>::add(_keyFilterFactories);
1927 RegexFilter<Sample<Key, Value, UpdateTag>, Value>::add(_sampleFilterFactories);
1928 _sampleFilterFactories->set(
"_event", makeSampleEventFilter(*
this));
1931 template<
typename Key,
typename Value,
typename UpdateTag>
1933 _name(std::move(topic._name)),
1934 _topicFactory(std::move(topic._topicFactory)),
1935 _keyFactory(std::move(topic._keyFactory)),
1936 _tagFactory(std::move(topic._tagFactory)),
1937 _keyFilterFactories(std::move(topic._keyFilterFactories)),
1938 _sampleFilterFactories(std::move(topic._sampleFilterFactories)),
1939 _reader(std::move(topic._reader)),
1940 _writer(std::move(topic._writer)),
1941 _updaters(std::move(topic._updaters))
1945 template<
typename Key,
typename Value,
typename UpdateTag>
1948 std::lock_guard<std::mutex> lock(_mutex);
1962 _name = std::move(topic._name);
1963 _topicFactory = std::move(topic._topicFactory);
1964 _keyFactory = std::move(topic._keyFactory);
1965 _tagFactory = std::move(topic._tagFactory);
1966 _keyFilterFactories = std::move(topic._keyFilterFactories);
1967 _sampleFilterFactories = std::move(topic._sampleFilterFactories);
1968 _reader = std::move(topic._reader);
1969 _writer = std::move(topic._writer);
1970 _updaters = std::move(topic._updaters);
1974 template<
typename Key,
typename Value,
typename UpdateTag>
bool 1977 return getReader()->hasWriters();
1980 template<
typename Key,
typename Value,
typename UpdateTag>
void 1983 getReader()->waitForWriters(count);
1986 template<
typename Key,
typename Value,
typename UpdateTag>
void 1989 getReader()->waitForWriters(-1);
1992 template<
typename Key,
typename Value,
typename UpdateTag>
void 1995 getReader()->setDefaultConfig(config);
1998 template<
typename Key,
typename Value,
typename UpdateTag>
bool 2001 return getWriter()->hasReaders();
2004 template<
typename Key,
typename Value,
typename UpdateTag>
void 2007 getWriter()->waitForReaders(count);
2010 template<
typename Key,
typename Value,
typename UpdateTag>
void 2013 getWriter()->waitForReaders(-1);
2016 template<
typename Key,
typename Value,
typename UpdateTag>
void 2019 getWriter()->setDefaultConfig(config);
2022 template<
typename Key,
typename Value,
typename UpdateTag>
template<
typename UpdateValue>
void 2025 std::lock_guard<std::mutex> lock(_mutex);
2026 auto tagI = _tagFactory->create(std::move(tag));
2027 auto updaterImpl = updater ? [updater](
const std::shared_ptr<DataStormI::Sample>& previous,
2028 const std::shared_ptr<DataStormI::Sample>& next,
2029 const std::shared_ptr<Ice::Communicator>& communicator)
2035 std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(previous)->getValue());
2038 std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(next)->setValue(std::move(value));
2039 } : std::function<void(const std::shared_ptr<DataStormI::Sample>&,
2040 const std::shared_ptr<DataStormI::Sample>&,
2041 const std::shared_ptr<Ice::Communicator>&)>();
2043 if(_reader && !_writer)
2045 _reader->setUpdater(tagI, updaterImpl);
2047 else if(_writer && !_reader)
2049 _writer->setUpdater(tagI, updaterImpl);
2051 else if(_reader && _writer)
2053 _reader->setUpdater(tagI, updaterImpl);
2054 _writer->setUpdater(tagI, updaterImpl);
2058 _updaters[tagI] = updaterImpl;
2062 template<
typename Key,
typename Value,
typename UpdateTag>
template<
typename Criteria>
void 2064 std::function<std::function<
bool (
const Key&)>(
const Criteria&)> factory) noexcept
2066 std::lock_guard<std::mutex> lock(_mutex);
2067 _keyFilterFactories->set(name, factory);
2070 template<
typename Key,
typename Value,
typename UpdateTag>
template<
typename Criteria>
void 2074 std::lock_guard<std::mutex> lock(_mutex);
2075 _sampleFilterFactories->set(name, factory);
2078 template<
typename Key,
typename Value,
typename UpdateTag> std::shared_ptr<DataStormI::TopicReader>
2081 std::lock_guard<std::mutex> lock(_mutex);
2084 auto sampleFactory = std::make_shared<DataStormI::SampleFactoryT<Key, Value, UpdateTag>>();
2085 _reader = _topicFactory->createTopicReader(_name, _keyFactory, _tagFactory, sampleFactory, _keyFilterFactories,
2086 _sampleFilterFactories);
2087 _reader->setUpdaters(_writer ? _writer->getUpdaters() : _updaters);
2093 template<
typename Key,
typename Value,
typename UpdateTag> std::shared_ptr<DataStormI::TopicWriter>
2094 Topic<Key, Value, UpdateTag>::getWriter() const noexcept
2096 std::lock_guard<std::mutex> lock(_mutex);
2099 _writer = _topicFactory->createTopicWriter(_name, _keyFactory, _tagFactory,
nullptr, _keyFilterFactories,
2100 _sampleFilterFactories);
2101 _writer->setUpdaters(_reader ? _reader->getUpdaters() : _updaters);
2107 template<
typename Key,
typename Value,
typename UpdateTag> std::shared_ptr<Ice::Communicator>
2108 Topic<Key, Value, UpdateTag>::getCommunicator() const noexcept
2110 return _topicFactory->getCommunicator();
UpdateTag UpdateTagType
The topic's update tag type (defaults to std::string if not specified).
Definition: DataStorm.h:529
Sample< Key, Value, UpdateTag > getLast()
Get the last written sample.
Definition: DataStorm.h:1647
void waitForWriters(unsigned int count=1) const
Wait for given number of data writers to be online.
Definition: DataStorm.h:1981
bool hasWriters() const noexcept
Indicates whether or not data writers are online.
Definition: DataStorm.h:1975
std::string name
The filter name.
Definition: DataStorm.h:710
std::string getOrigin() const noexcept
The origin of the sample.
Definition: DataStorm.h:1310
void remove(const Key &key) noexcept
Remove the data element.
Definition: DataStorm.h:1813
Topic(const Node &node, const std::string &name) noexcept
Construct a new Topic for the topic with the given name.
Definition: DataStorm.h:1918
MultiKeyReader(const Topic< Key, Value, UpdateTag > &topic, const std::vector< Key > &keys, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig()) noexcept
Construct a new reader for the given keys.
Definition: DataStorm.h:1505
void update(const Key &key, const Value &value) noexcept
Update the data element.
Definition: DataStorm.h:1792
MultiKeyWriter(const Topic< Key, Value, UpdateTag > &topic, const std::vector< Key > &keys, const std::string &name=std::string(), const WriterConfig &config=WriterConfig()) noexcept
Construct a new writer for the given keys.
Definition: DataStorm.h:1757
Value ValueType
The type of the sample value.
Definition: DataStorm.h:54
std::function< void(const Key &, const UpdateValue &)> partialUpdate(const UpdateTag &tag) noexcept
Get a partial update generator function for the given partial update tag.
Definition: DataStorm.h:1800
bool hasWriters() const noexcept
Indicates whether or not writers are online.
Definition: DataStorm.h:1352
Value ValueType
The value type.
Definition: DataStorm.h:224
FilteredKeyReader(const Topic< Key, Value, UpdateTag > &topic, const Filter< KeyFilterCriteria > &keyFilter, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig())
Construct a new reader for the given key filter.
void setUpdater(const UpdateTag &tag, std::function< void(Value &, UpdateValue)> updater) noexcept
Set an updater function for the given update tag.
Definition: DataStorm.h:2023
Key KeyType
The key type.
Definition: DataStorm.h:382
const Value & getValue() const noexcept
The value of the sample.
Definition: DataStorm.h:1292
void add(const Key &key, const Value &value) noexcept
Add the data element.
Definition: DataStorm.h:1785
Value ValueType
The topic's value type.
Definition: DataStorm.h:524
SingleKeyReader(const Topic< Key, Value, UpdateTag > &topic, const Key &key, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig()) noexcept
Construct a new reader for the given key.
Definition: DataStorm.h:1468
T criteria
The filter criteria value.
Definition: DataStorm.h:713
std::chrono::time_point< std::chrono::system_clock > getTimeStamp() const noexcept
The timestamp of the sample.
Definition: DataStorm.h:1304
Reader(Reader &&reader) noexcept
Transfers the given reader to this reader.
Definition: DataStorm.h:1331
MultiKeyWriter & operator=(MultiKeyWriter &&writer) noexcept
Move assignment operator.
Definition: DataStorm.h:1778
The filtered reader to read data elements whose key match a given filter.
Definition: DataStorm.h:960
std::function< void(const UpdateValue &)> partialUpdate(const UpdateTag &tag) noexcept
Get a partial update generator function for the given partial update tag.
Definition: DataStorm.h:1739
Topic & operator=(Topic &&topic) noexcept
Move assignment operator.
Definition: DataStorm.h:1960
std::vector< Key > getConnectedKeys() const noexcept
Get the keys for which writers are connected to this reader.
Definition: DataStorm.h:1376
static T clone(const T &value) noexcept
Clone the given value.
Definition: Types.h:277
Value ValueType
The value type.
Definition: DataStorm.h:387
~Topic()
Destruct the Topic.
Definition: DataStorm.h:1946
The Decoder template provides a method to decode user types.
Definition: Types.h:241
void onConnectedKeys(std::function< void(std::vector< Key >)> init, std::function< void(CallbackReason, Key)> update) noexcept
Calls the given functions to provide the initial set of connected keys and when a key is added or rem...
Definition: DataStorm.h:1671
bool hasUnread() const noexcept
Returns whether or not unread samples are available.
Definition: DataStorm.h:1408
Key KeyType
The type of the sample key.
Definition: DataStorm.h:49
MultiKeyWriter< K, V, UT > makeAnyKeyWriter(const Topic< K, V, UT > &topic, const std::string &name=std::string(), const WriterConfig &config=WriterConfig()) noexcept
Creates an any-key writer for the given topic.
Definition: DataStorm.h:1260
The WriterConfig class specifies configuration options specific to writers.
Definition: Types.h:161
~Reader()
Destruct the reader.
Definition: DataStorm.h:1336
FilteredKeyReader< K, V, UT > makeFilteredKeyReader(const Topic< K, V, UT > &topic, const Filter< KFC > &filter, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig())
Creates a new filtered reader for the given topic and key filter.
Definition: DataStorm.h:1028
Writer(Writer &&writer) noexcept
Transfers the given writer to this writer.
Definition: DataStorm.h:1589
The Topic class.
Definition: DataStorm.h:512
std::vector< std::string > getConnectedReaders() const noexcept
Get the connected readers.
Definition: DataStorm.h:1628
bool hasReaders() const noexcept
Indicates whether or not readers are online.
Definition: DataStorm.h:1610
void onConnectedWriters(std::function< void(std::vector< std::string >)> init, std::function< void(CallbackReason, std::string)> update) noexcept
Calls the given functions to provide the initial set of connected writers and when a new writer conne...
Definition: DataStorm.h:1440
The Reader class is used to retrieve samples for a data element.
Definition: DataStorm.h:212
MultiKeyReader< K, V, UT > makeMultiKeyReader(const Topic< K, V, UT > &topic, const std::vector< typename Topic< K, V, UT >::KeyType > &keys, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig()) noexcept
Creates a multi-key reader for the given topic.
Definition: DataStorm.h:886
The Writer class is used to write samples for a data element.
Definition: DataStorm.h:375
MultiKeyWriter< K, V, UT > makeMultiKeyWriter(const Topic< K, V, UT > &topic, const std::vector< typename Topic< K, V, UT >::KeyType > &keys, const std::string &name=std::string(), const WriterConfig &config=WriterConfig()) noexcept
Creates a multi-key writer for the given topic and keys.
Definition: DataStorm.h:1243
void waitForReaders(unsigned int count=1) const
Wait for given number of data readers to be online.
Definition: DataStorm.h:2005
void remove() noexcept
Remove the data element.
Definition: DataStorm.h:1750
void setSampleFilter(const std::string &name, std::function< std::function< bool(const SampleType &)>(const Criteria &)> factory) noexcept
Set a sample filter factory.
Definition: DataStorm.h:2071
MultiKeyReader< K, V, UT > makeAnyKeyReader(const Topic< K, V, UT > &topic, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig()) noexcept
Creates an any-key reader for the given topic.
Definition: DataStorm.h:927
The Encoder template provides a method to encode and decode user types.
Definition: Types.h:214
void onConnectedKeys(std::function< void(std::vector< Key >)> init, std::function< void(CallbackReason, Key)> update) noexcept
Calls the given functions to provide the initial set of connected keys and when a key is added or rem...
Definition: DataStorm.h:1420
The key writer to write the data element associated with a given key.
Definition: DataStorm.h:1062
Key KeyType
The topic's key type.
Definition: DataStorm.h:519
std::vector< Key > getConnectedKeys() const noexcept
Get the keys for which readers are connected to this writer.
Definition: DataStorm.h:1634
MultiKeyReader & operator=(MultiKeyReader &&reader) noexcept
Move assignment operator.
Definition: DataStorm.h:1538
UpdateTag getUpdateTag() const noexcept
The update tag for the partial update.
Definition: DataStorm.h:1298
CallbackReason
The callback action enumerator specifies the reason why a callback is called.
Definition: Types.h:195
SampleEvent getEvent() const noexcept
The event associated with the sample.
Definition: DataStorm.h:1280
Filter(const std::string &name, TT &&criteria) noexcept
Construct a filter structure with the given name and criteria.
Definition: DataStorm.h:705
void waitForNoWriters() const
Wait for writers to be offline.
Definition: DataStorm.h:1364
void waitForUnread(unsigned int count=1) const
Wait for given number of unread samples to be available.
Definition: DataStorm.h:1402
void onSamples(std::function< void(std::vector< Sample< Key, Value, UpdateTag >>)> init, std::function< void(Sample< Key, Value, UpdateTag >)> queue) noexcept
Calls the given function to provide the initial set of unread samples and when new samples are queued...
Definition: DataStorm.h:1447
FilteredKeyReader & operator=(FilteredKeyReader &&reader) noexcept
Move assignment operator.
Definition: DataStorm.h:1579
void add(const Value &value) noexcept
Add the data element.
Definition: DataStorm.h:1724
SingleKeyReader & operator=(SingleKeyReader &&reader) noexcept
Move assignment operator.
Definition: DataStorm.h:1498
Definition: CtrlCHandler.h:13
bool hasReaders() const noexcept
Indicates whether or not data readers are online.
Definition: DataStorm.h:1999
SingleKeyReader< K, V, UT > makeSingleKeyReader(const Topic< K, V, UT > &topic, const typename Topic< K, V, UT >::KeyType &key, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig()) noexcept
Creates a key reader for the given topic and key.
Definition: DataStorm.h:845
SingleKeyWriter< K, V, UT > makeSingleKeyWriter(const Topic< K, V, UT > &topic, const typename Topic< K, V, UT >::KeyType &key, const std::string &name=std::string(), const WriterConfig &config=WriterConfig()) noexcept
Creates a key writer for the given topic and key.
Definition: DataStorm.h:1225
void waitForWriters(unsigned int count=1) const
Wait for given number of writers to be online.
Definition: DataStorm.h:1358
The key reader to read the data element associated with a given key.
Definition: DataStorm.h:722
SingleKeyWriter(const Topic< Key, Value, UpdateTag > &topic, const Key &key, const std::string &name=std::string(), const WriterConfig &config=WriterConfig()) noexcept
Construct a new writer for the given key.
Definition: DataStorm.h:1698
const Key & getKey() const noexcept
The key of the sample.
Definition: DataStorm.h:1286
std::vector< Sample< Key, Value, UpdateTag > > getAllUnread() noexcept
Returns all the unread samples.
Definition: DataStorm.h:1389
void setReaderDefaultConfig(const ReaderConfig &config) noexcept
Set the default configuration used to construct readers.
Definition: DataStorm.h:1993
A sample provides information about a data element update.
Definition: DataStorm.h:42
std::string getSession() const noexcept
Get the session identifier of the session that received this sample.
Definition: DataStorm.h:1316
void waitForReaders(unsigned int count=1) const
Wait for given number of readers to be online.
Definition: DataStorm.h:1616
void waitForNoReaders() const
Wait for data readers to be offline.
Definition: DataStorm.h:2011
The Node class allows creating topic readers and writers.
Definition: Node.h:39
Key KeyType
The key type.
Definition: DataStorm.h:219
UpdateTag UpdateTagType
The type of the update tag.
Definition: DataStorm.h:60
The ReaderConfig class specifies configuration options specific to readers.
Definition: Types.h:122
The key reader to read the data element associated with a given set of keys.
Definition: DataStorm.h:780
void waitForNoReaders() const
Wait for readers to be offline.
Definition: DataStorm.h:1622
void setKeyFilter(const std::string &name, std::function< std::function< bool(const Key &)>(const Criteria &)> factory) noexcept
Set a key filter factory.
Definition: DataStorm.h:2063
void onConnectedReaders(std::function< void(std::vector< std::string >)> init, std::function< void(CallbackReason, std::string)> update) noexcept
Calls the given functions to provide the initial set of connected readers and when a new reader conne...
Definition: DataStorm.h:1691
std::vector< std::string > getConnectedWriters() const noexcept
Get the connected writers.
Definition: DataStorm.h:1370
Definition: InternalI.h:18
void setWriterDefaultConfig(const WriterConfig &config) noexcept
Set the default configuration used to construct writers.
Definition: DataStorm.h:2017
static std::vector< unsigned char > encode(const std::shared_ptr< Ice::Communicator > &communicator, const T &value) noexcept
Encode the given value.
Definition: Types.h:326
Writer & operator=(Writer &&writer) noexcept
Move assignment operator.
Definition: DataStorm.h:1594
SingleKeyWriter & operator=(SingleKeyWriter &&writer) noexcept
Move assignment operator.
Definition: DataStorm.h:1717
void waitForNoWriters() const
Wait for data writers to be offline.
Definition: DataStorm.h:1987
The key writer to write data elements associated with a given set of keys.
Definition: DataStorm.h:1138
Filter structure to specify the filter name and criteria value.
Definition: DataStorm.h:696
void update(const Value &value) noexcept
Update the data element.
Definition: DataStorm.h:1731
Reader & operator=(Reader &&reader) noexcept
Move assignment operator.
Definition: DataStorm.h:1345
std::vector< Sample< Key, Value, UpdateTag > > getAll() noexcept
Get all the written samples kept in the writer history.
Definition: DataStorm.h:1658
~Writer()
Destruct the writer.
Definition: DataStorm.h:1601
Sample< Key, Value, UpdateTag > getNextUnread()
Returns the next unread sample.
Definition: DataStorm.h:1414