DataStorm  0.1
Data Distribution Service
DataStorm.h
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2018-present ZeroC, Inc. All rights reserved.
4 //
5 // **********************************************************************
6 
7 #pragma once
8 
9 #include <DataStorm/Config.h>
10 
11 #include <Ice/Ice.h>
12 
13 #include <DataStorm/Types.h>
14 #include <DataStorm/Sample.h>
15 #include <DataStorm/Node.h>
16 #include <DataStorm/InternalI.h>
17 #include <DataStorm/InternalT.h>
18 #include <DataStorm/CtrlCHandler.h>
19 
20 #include <regex>
21 
29 namespace DataStorm
30 {
31 
/**
 * Typed, read-only view of a sample.
 *
 * The object holds a shared pointer to a DataStormI::SampleT<Key, Value, UpdateTag>
 * and every accessor is const and noexcept.
 *
 * Key is the key type, Value the value type and UpdateTag the tag type used
 * for partial updates (std::string unless specified otherwise).
 */
42 template<typename Key, typename Value, typename UpdateTag=std::string> class Sample
43 {
44 public:
45 
    /** The key type. */
49  using KeyType = Key;
50 
    /** The value type. */
54  using ValueType = Value;
55 
    /** The tag type used for partial updates. */
60  using UpdateTagType = UpdateTag;
61 
    /** Returns the event (see SampleEvent) carried by this sample. */
67  SampleEvent getEvent() const noexcept;
68 
    /** Returns a reference to the key of this sample. */
74  const Key& getKey() const noexcept;
75 
    /** Returns a reference to the value of this sample. */
85  const Value& getValue() const noexcept;
86 
    /** Returns the update tag, by value. */
94  UpdateTag getUpdateTag() const noexcept;
95 
    /** Returns the sample time stamp as a system_clock time point. */
104  std::chrono::time_point<std::chrono::system_clock> getTimeStamp() const noexcept;
105 
    /** Returns the origin of the sample as a string (presumably the writer that produced it -- confirm). */
116  std::string getOrigin() const noexcept;
117 
    /** Returns the session of the sample as a string. */
126  std::string getSession() const noexcept;
127 
    // Builds the typed view from the untyped implementation pointer. The
    // constructor is intentionally not explicit: callbacks in this header pass
    // a std::shared_ptr<DataStormI::Sample> where a Sample is expected.
129  Sample(const std::shared_ptr<DataStormI::Sample>&) noexcept;
130 
131 private:
132 
    // Shared, typed implementation object backing all accessors.
133  std::shared_ptr<DataStormI::SampleT<Key, Value, UpdateTag>> _impl;
134 };
135 
143 std::ostream&
144 operator<<(std::ostream& os, SampleEvent sampleType)
145 {
146  switch(sampleType)
147  {
148  case SampleEvent::Add:
149  os << "Add";
150  break;
151  case SampleEvent::Update:
152  os << "Update";
153  break;
154  case SampleEvent::Remove:
155  os << "Remove";
156  break;
157  case SampleEvent::PartialUpdate:
158  os << "PartialUpdate";
159  break;
160  default:
161  os << static_cast<int>(sampleType);
162  break;
163  }
164  return os;
165 }
166 
174 std::ostream&
175 operator<<(std::ostream& os, const std::vector<SampleEvent>& types)
176 {
177  os << "[";
178  for(auto p = types.begin(); p != types.end(); ++p)
179  {
180  if(p != types.begin())
181  {
182  os << ',';
183  }
184  os << *p;
185  }
186  os << "]";
187  return os;
188 }
189 
198 template<typename K, typename V, typename U>
199 std::ostream&
200 operator<<(std::ostream& os, const Sample<K, V, U>& sample)
201 {
202  os << sample.getValue();
203  return os;
204 }
205 
/**
 * Base class of the typed readers declared in this header.
 *
 * A Reader holds a shared pointer to a DataStormI::DataReader. Only move
 * construction and move assignment are declared, and the constructor taking
 * the implementation is protected, so instances are created through derived
 * classes.
 */
211 template<typename Key, typename Value, typename UpdateTag>
212 class Reader
213 {
214 public:
215 
    /** The key type. */
219  using KeyType = Key;
220 
    /** The value type. */
224  using ValueType = Value;
225 
    /** Move constructor: takes over the implementation held by the given reader. */
231  Reader(Reader&& reader) noexcept;
232 
    /** Destructor. */
237  ~Reader();
238 
    /** Move assignment: takes over the implementation held by the given reader. */
244  Reader& operator=(Reader&& reader) noexcept;
245 
    /** Indicates whether writers are connected. */
251  bool hasWriters() const noexcept;
252 
    // NOTE(review): presumably blocks until at least `count` writers are
    // connected -- confirm against DataStormI::DataReader.
259  void waitForWriters(unsigned int count = 1) const;
260 
    // NOTE(review): presumably blocks until no writer is connected -- confirm.
265  void waitForNoWriters() const;
266 
    /** Returns the connected writers as a list of strings. */
272  std::vector<std::string> getConnectedWriters() const noexcept;
273 
    /** Returns the connected keys. */
279  std::vector<Key> getConnectedKeys() const noexcept;
280 
    /** Returns all the unread samples. */
286  std::vector<Sample<Key, Value, UpdateTag>> getAllUnread() noexcept;
287 
    // NOTE(review): presumably blocks until at least `count` unread samples
    // are available -- confirm.
292  void waitForUnread(unsigned int count = 1) const;
293 
    /** Indicates whether unread samples are available. */
299  bool hasUnread() const noexcept;
300 
    /** Returns the next unread sample. */
307  Sample<Key, Value, UpdateTag> getNextUnread();
308 
    // Registers callbacks for connected keys. `init` receives a list of keys
    // and `update` receives a CallbackReason together with a single key.
    // Either callback may be empty.
324  void onConnectedKeys(std::function<void(std::vector<Key>)> init,
325  std::function<void(CallbackReason, Key)> update) noexcept;
326 
    // Registers callbacks for connected writers. `init` receives a list of
    // strings and `update` receives a CallbackReason together with one string.
339  void onConnectedWriters(std::function<void(std::vector<std::string>)> init,
340  std::function<void(CallbackReason, std::string)> update) noexcept;
341 
    // Registers callbacks for samples. `init` receives a list of samples and
    // `queue` receives a single sample.
355  void onSamples(std::function<void(std::vector<Sample<Key, Value, UpdateTag>>)> init,
356  std::function<void(Sample<Key, Value, UpdateTag>)> queue) noexcept;
357 
358 protected:
359 
    // Wraps an existing implementation; used by the derived reader classes.
361  Reader(const std::shared_ptr<DataStormI::DataReader>& impl) noexcept : _impl(impl)
362  {
363  }
364 
    // Shared implementation object; empty after the reader has been moved from.
366  std::shared_ptr<DataStormI::DataReader> _impl;
367 };
368 
/**
 * Base class of the typed writers declared in this header.
 *
 * A Writer holds a shared pointer to a DataStormI::DataWriter. Only move
 * construction and move assignment are declared, and the constructor taking
 * the implementation is protected, so instances are created through derived
 * classes.
 */
374 template<typename Key, typename Value, typename UpdateTag>
375 class Writer
376 {
377 public:
378 
    /** The key type. */
382  using KeyType = Key;
383 
    /** The value type. */
387  using ValueType = Value;
388 
    /** Move constructor: takes over the implementation held by the given writer. */
394  Writer(Writer&& writer) noexcept;
395 
    /** Move assignment: takes over the implementation held by the given writer. */
401  Writer& operator=(Writer&& writer) noexcept;
402 
    /** Destructor. */
407  ~Writer();
408 
    /** Indicates whether readers are connected. */
414  bool hasReaders() const noexcept;
415 
    // NOTE(review): presumably blocks until at least `count` readers are
    // connected -- confirm against DataStormI::DataWriter.
422  void waitForReaders(unsigned int count = 1) const;
423 
    // NOTE(review): presumably blocks until no reader is connected -- confirm.
428  void waitForNoReaders() const;
429 
    /** Returns the connected readers as a list of strings. */
435  std::vector<std::string> getConnectedReaders() const noexcept;
436 
    /** Returns the connected keys. */
442  std::vector<Key> getConnectedKeys() const noexcept;
443 
    /** Returns the last sample. Not noexcept, unlike getAll(). */
450  Sample<Key, Value, UpdateTag> getLast();
451 
    /** Returns all the samples held by the writer. */
457  std::vector<Sample<Key, Value, UpdateTag>> getAll() noexcept;
458 
    // Registers callbacks for connected keys. `init` receives a list of keys
    // and `update` receives a CallbackReason together with a single key.
    // Either callback may be empty.
474  void onConnectedKeys(std::function<void(std::vector<Key>)> init,
475  std::function<void(CallbackReason, Key)> update) noexcept;
476 
    // Registers callbacks for connected readers. `init` receives a list of
    // strings and `update` receives a CallbackReason together with one string.
489  void onConnectedReaders(std::function<void(std::vector<std::string>)> init,
490  std::function<void(CallbackReason, std::string)> update) noexcept;
491 
492 protected:
493 
    // Wraps an existing implementation; used by the derived writer classes.
495  Writer(const std::shared_ptr<DataStormI::DataWriter>& impl) noexcept : _impl(impl)
496  {
497  }
498 
    // Shared implementation object; empty after the writer has been moved from.
500  std::shared_ptr<DataStormI::DataWriter> _impl;
501 };
502 
/**
 * A named topic, parameterized by key, value and update tag types.
 *
 * The reader and writer class templates declared in this header are friends
 * of Topic and use its private accessors and factories. Only move construction
 * and move assignment are declared.
 *
 * NOTE(review): setSampleFilter() below refers to `SampleType`, but no alias
 * with that name is visible in this listing (the lines numbered 535, 540 and
 * 545 are empty) -- confirm against the original header.
 */
511 template<typename Key, typename Value, typename UpdateTag=std::string>
512 class Topic
513 {
514 public:
515 
    /** The key type. */
519  using KeyType = Key;
520 
    /** The value type. */
524  using ValueType = Value;
525 
    /** The tag type used for partial updates. */
529  using UpdateTagType = UpdateTag;
530 
535 
540 
545 
    /** Constructs a topic with the given name on the given node. */
552  Topic(const Node& node, const std::string& name) noexcept;
553 
    /** Move constructor. */
559  Topic(Topic&& topic) noexcept;
560 
    /** Destructor. */
564  ~Topic();
565 
    /** Move assignment. */
571  Topic& operator=(Topic&& topic) noexcept;
572 
    /** Indicates whether writers are present. */
578  bool hasWriters() const noexcept;
579 
    // NOTE(review): presumably blocks until at least `count` writers are
    // present -- confirm.
586  void waitForWriters(unsigned int count = 1) const;
587 
    // NOTE(review): presumably blocks until no writer is present -- confirm.
592  void waitForNoWriters() const;
593 
    /** Sets the default configuration applied to writers. */
599  void setWriterDefaultConfig(const WriterConfig& config) noexcept;
600 
    /** Indicates whether readers are present. */
606  bool hasReaders() const noexcept;
607 
    // NOTE(review): presumably blocks until at least `count` readers are
    // present -- confirm.
614  void waitForReaders(unsigned int count = 1) const;
615 
    // NOTE(review): presumably blocks until no reader is present -- confirm.
620  void waitForNoReaders() const;
621 
    /** Sets the default configuration applied to readers. */
627  void setReaderDefaultConfig(const ReaderConfig& config) noexcept;
628 
    // Associates an updater function with an update tag. The updater receives
    // the value by mutable reference together with an UpdateValue.
638  template<typename UpdateValue>
639  void setUpdater(const UpdateTag& tag, std::function<void (Value&, UpdateValue)> updater) noexcept;
640 
    // Registers a named key filter factory: given a Criteria, the factory
    // returns a predicate over keys.
649  template<typename Criteria>
650  void setKeyFilter(const std::string& name,
651  std::function<std::function<bool (const Key&)> (const Criteria&)> factory) noexcept;
652 
    // Registers a named sample filter factory: given a Criteria, the factory
    // returns a predicate over samples.
661  template<typename Criteria>
662  void setSampleFilter(const std::string& name,
663  std::function<std::function<bool (const SampleType&)> (const Criteria&)> factory) noexcept;
664 
665 private:
666 
    // Accessors used by the friend reader/writer classes below.
667  std::shared_ptr<DataStormI::TopicReader> getReader() const noexcept;
668  std::shared_ptr<DataStormI::TopicWriter> getWriter() const noexcept;
669  std::shared_ptr<Ice::Communicator> getCommunicator() const noexcept;
670 
671  template<typename, typename, typename> friend class SingleKeyWriter;
672  template<typename, typename, typename> friend class MultiKeyWriter;
673  template<typename, typename, typename> friend class SingleKeyReader;
674  template<typename, typename, typename> friend class MultiKeyReader;
675  template<typename, typename, typename> friend class FilteredKeyReader;
676 
    // Immutable state: topic name and the factories shared with readers/writers.
677  const std::string _name;
678  const std::shared_ptr<DataStormI::TopicFactory> _topicFactory;
679  const std::shared_ptr<DataStormI::KeyFactoryT<Key>> _keyFactory;
680  const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
681 
682  const std::shared_ptr<DataStormI::FilterManagerT<DataStormI::KeyT<Key>>> _keyFilterFactories;
683  const std::shared_ptr<DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>> _sampleFilterFactories;
684 
    // Mutable state, modifiable from const member functions.
    // NOTE(review): _mutex presumably guards _reader, _writer and _updaters -- confirm.
685  mutable std::mutex _mutex;
686  mutable std::shared_ptr<DataStormI::TopicReader> _reader;
687  mutable std::shared_ptr<DataStormI::TopicWriter> _writer;
688  mutable std::map<std::shared_ptr<DataStormI::Tag>, DataStormI::Topic::Updater> _updaters;
689 };
690 
696 template<typename T> struct Filter
697 {
704  template<typename TT>
705  Filter(const std::string& name, TT&& criteria) noexcept : name(name), criteria(std::forward<TT>(criteria))
706  {
707  }
708 
710  std::string name;
711 
714 };
715 
721 template<typename Key, typename Value, typename UpdateTag=std::string>
722 class SingleKeyReader : public Reader<Key, Value, UpdateTag>
723 {
724 public:
725 
736  const Key& key,
737  const std::string& name = std::string(),
738  const ReaderConfig& config = ReaderConfig()) noexcept;
739 
752  template<typename SampleFilterCriteria>
754  const Key& key,
755  const Filter<SampleFilterCriteria>& sampleFilter,
756  const std::string& name = std::string(),
757  const ReaderConfig& config = ReaderConfig()) noexcept;
758 
764  SingleKeyReader(SingleKeyReader&& reader) noexcept;
765 
771  SingleKeyReader& operator=(SingleKeyReader&& reader) noexcept;
772 };
773 
779 template<typename Key, typename Value, typename UpdateTag=std::string>
780 class MultiKeyReader : public Reader<Key, Value, UpdateTag>
781 {
782 public:
783 
795  const std::vector<Key>& keys,
796  const std::string& name = std::string(),
797  const ReaderConfig& config = ReaderConfig()) noexcept;
798 
812  template<typename SampleFilterCriteria>
814  const std::vector<Key>& keys,
815  const Filter<SampleFilterCriteria>& sampleFilter,
816  const std::string& name = std::string(),
817  const ReaderConfig& config = ReaderConfig()) noexcept;
818 
824  MultiKeyReader(MultiKeyReader&& reader) noexcept;
825 
831  MultiKeyReader& operator=(MultiKeyReader&& reader) noexcept;
832 };
833 
843 template<typename K, typename V, typename UT>
846  const typename Topic<K, V, UT>::KeyType& key,
847  const std::string& name = std::string(),
848  const ReaderConfig& config = ReaderConfig()) noexcept
849 {
850  return SingleKeyReader<K, V, UT>(topic, key, name, config);
851 }
852 
863 template<typename SFC, typename K, typename V, typename UT>
864 SingleKeyReader<K, V, UT>
866  const typename Topic<K, V, UT>::KeyType& key,
867  const Filter<SFC>& sampleFilter,
868  const std::string& name = std::string(),
869  const ReaderConfig& config = ReaderConfig()) noexcept
870 {
871  return SingleKeyReader<K, V, UT>(topic, key, sampleFilter, name, config);
872 }
873 
885 template<typename K, typename V, typename UT> MultiKeyReader<K, V, UT>
887  const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
888  const std::string& name = std::string(),
889  const ReaderConfig& config = ReaderConfig()) noexcept
890 {
891  return MultiKeyReader<K, V, UT>(topic, keys, name, config);
892 }
893 
906 template<typename SFC, typename K, typename V, typename UT> MultiKeyReader<K, V, UT>
908  const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
909  const Filter<SFC>& sampleFilter,
910  const std::string& name = std::string(),
911  const ReaderConfig& config = ReaderConfig()) noexcept
912 {
913  return MultiKeyReader<K, V, UT>(topic, keys, sampleFilter, name, config);
914 }
915 
926 template<typename K, typename V, typename UT> MultiKeyReader<K, V, UT>
928  const std::string& name = std::string(),
929  const ReaderConfig& config = ReaderConfig()) noexcept
930 {
931  return MultiKeyReader<K, V, UT>(topic, {}, name, config);
932 }
933 
945 template<typename SFC, typename K, typename V, typename UT> MultiKeyReader<K, V, UT>
947  const Filter<SFC>& sampleFilter,
948  const std::string& name = std::string(),
949  const ReaderConfig& config = ReaderConfig()) noexcept
950 {
951  return MultiKeyReader<K, V, UT>(topic, {}, sampleFilter, name, config);
952 }
953 
959 template<typename Key, typename Value, typename UpdateTag=std::string>
960 class FilteredKeyReader : public Reader<Key, Value, UpdateTag>
961 {
962 public:
963 
976  template<typename KeyFilterCriteria>
978  const Filter<KeyFilterCriteria>& keyFilter,
979  const std::string& name = std::string(),
980  const ReaderConfig& config = ReaderConfig());
981 
996  template<typename KeyFilterCriteria, typename SampleFilterCriteria>
998  const Filter<KeyFilterCriteria>& keyFilter,
999  const Filter<SampleFilterCriteria>& sampleFilter,
1000  const std::string& name = std::string(),
1001  const ReaderConfig& config = ReaderConfig());
1002 
1008  FilteredKeyReader(FilteredKeyReader&& reader) noexcept;
1009 
1015  FilteredKeyReader& operator=(FilteredKeyReader&& reader) noexcept;
1016 };
1017 
1027 template<typename KFC, typename K, typename V, typename UT> FilteredKeyReader<K, V, UT>
1029  const Filter<KFC>& filter,
1030  const std::string& name = std::string(),
1031  const ReaderConfig& config = ReaderConfig())
1032 {
1033  return FilteredKeyReader<K, V, UT>(topic, filter, name, config);
1034 }
1035 
1046 template<typename KFC, typename SFC, typename K, typename V, typename UT> FilteredKeyReader<K, V, UT>
1048  const Filter<KFC>& keyFilter,
1049  const Filter<SFC>& sampleFilter,
1050  const std::string& name = std::string(),
1051  const ReaderConfig& config = ReaderConfig())
1052 {
1053  return FilteredKeyReader<K, V, UT>(topic, keyFilter, sampleFilter, name, config);
1054 }
1055 
1061 template<typename Key, typename Value, typename UpdateTag=std::string>
1062 class SingleKeyWriter : public Writer<Key, Value, UpdateTag>
1063 {
1064 public:
1065 
1076  const Key& key,
1077  const std::string& name = std::string(),
1078  const WriterConfig& config = WriterConfig()) noexcept;
1079 
1085  SingleKeyWriter(SingleKeyWriter&& writer) noexcept;
1086 
1092  SingleKeyWriter& operator=(SingleKeyWriter&& writer) noexcept;
1093 
1100  void add(const Value& value) noexcept;
1101 
1108  void update(const Value& value) noexcept;
1109 
1120  template<typename UpdateValue> std::function<void(const UpdateValue&)> partialUpdate(const UpdateTag& tag) noexcept;
1121 
1125  void remove() noexcept;
1126 
1127 private:
1128 
1129  const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
1130 };
1131 
1137 template<typename Key, typename Value, typename UpdateTag=std::string>
1138 class MultiKeyWriter : public Writer<Key, Value, UpdateTag>
1139 {
1140 public:
1141 
1153  const std::vector<Key>& keys,
1154  const std::string& name = std::string(),
1155  const WriterConfig& config = WriterConfig()) noexcept;
1156 
1162  MultiKeyWriter(MultiKeyWriter&& writer) noexcept;
1163 
1169  MultiKeyWriter& operator=(MultiKeyWriter&& writer) noexcept;
1170 
1178  void add(const Key& key, const Value& value) noexcept;
1179 
1187  void update(const Key& key, const Value& value) noexcept;
1188 
1199  template<typename UpdateValue> std::function<void(const Key&, const UpdateValue&)>
1200  partialUpdate(const UpdateTag& tag) noexcept;
1201 
1207  void remove(const Key& key) noexcept;
1208 
1209 private:
1210 
1211  const std::shared_ptr<DataStormI::KeyFactoryT<Key>> _keyFactory;
1212  const std::shared_ptr<DataStormI::TagFactoryT<UpdateTag>> _tagFactory;
1213 };
1214 
1224 template<typename K, typename V, typename UT> SingleKeyWriter<K, V, UT>
1226  const typename Topic<K, V, UT>::KeyType& key,
1227  const std::string& name = std::string(),
1228  const WriterConfig& config = WriterConfig()) noexcept
1229 {
1230  return SingleKeyWriter<K, V, UT>(topic, key, name, config);
1231 }
1232 
1242 template<typename K, typename V, typename UT> MultiKeyWriter<K, V, UT>
1244  const std::vector<typename Topic<K, V, UT>::KeyType>& keys,
1245  const std::string& name = std::string(),
1246  const WriterConfig& config = WriterConfig()) noexcept
1247 {
1248  return MultiKeyWriter<K, V, UT>(topic, keys, name, config);
1249 }
1250 
1259 template<typename K, typename V, typename UT> MultiKeyWriter<K, V, UT>
1261  const std::string& name = std::string(),
1262  const WriterConfig& config = WriterConfig()) noexcept
1263 {
1264  return MultiKeyWriter<K, V, UT>(topic, {}, name, config);
1265 }
1266 
1267 }
1268 
1269 //
1270 // Public template based API implementation
1271 //
1272 
1273 namespace DataStorm
1274 {
1275 
1276 //
1277 // Sample template implementation
1278 //
1279 template<typename Key, typename Value, typename UpdateTag> SampleEvent
1281 {
1282  return _impl->event;
1283 }
1284 
1285 template<typename Key, typename Value, typename UpdateTag> const Key&
1287 {
1288  return _impl->getKey();
1289 }
1290 
1291 template<typename Key, typename Value, typename UpdateTag> const Value&
1293 {
1294  return _impl->getValue();
1295 }
1296 
1297 template<typename Key, typename Value, typename UpdateTag> UpdateTag
1299 {
1300  return _impl->getTag();
1301 }
1302 
1303 template<typename Key, typename Value, typename UpdateTag> std::chrono::time_point<std::chrono::system_clock>
1305 {
1306  return _impl->timestamp;
1307 }
1308 
1309 template<typename Key, typename Value, typename UpdateTag> std::string
1311 {
1312  return _impl->origin;
1313 }
1314 
1315 template<typename Key, typename Value, typename UpdateTag> std::string
1317 {
1318  return _impl->session;
1319 }
1320 
1321 template<typename Key, typename Value, typename UpdateTag> Sample<Key, Value, UpdateTag>::Sample(
1322  const std::shared_ptr<DataStormI::Sample>& impl) noexcept :
1323  _impl(std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(impl))
1324 {
1325 }
1326 
1327 //
1328 // Reader template implementation
1329 //
1330 template<typename Key, typename Value, typename UpdateTag>
1331 Reader<Key, Value, UpdateTag>::Reader(Reader<Key, Value, UpdateTag>&& reader) noexcept : _impl(std::move(reader._impl))
1332 {
1333 }
1334 
1335 template<typename Key, typename Value, typename UpdateTag>
1337 {
1338  if(_impl)
1339  {
1340  _impl->destroy();
1341  }
1342 }
1343 
1344 template<typename Key, typename Value, typename UpdateTag> Reader<Key, Value, UpdateTag>&
1346 {
1347  _impl = std::move(reader._impl);
1348  return *this;
1349 }
1350 
1351 template<typename Key, typename Value, typename UpdateTag> bool
1353 {
1354  return _impl->hasWriters();
1355 }
1356 
1357 template<typename Key, typename Value, typename UpdateTag> void
1359 {
1360  _impl->waitForWriters(count);
1361 }
1362 
1363 template<typename Key, typename Value, typename UpdateTag> void
1365 {
1366  _impl->waitForWriters(-1);
1367 }
1368 
1369 template<typename Key, typename Value, typename UpdateTag> std::vector<std::string>
1371 {
1372  return _impl->getConnectedElements();
1373 }
1374 
1375 template<typename Key, typename Value, typename UpdateTag> std::vector<Key>
1377 {
1378  std::vector<Key> keys;
1379  auto connectedKeys = _impl->getConnectedKeys();
1380  keys.reserve(connectedKeys.size());
1381  for(const auto& k : connectedKeys)
1382  {
1383  keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
1384  }
1385  return keys;
1386 }
1387 
1388 template<typename Key, typename Value, typename UpdateTag> std::vector<Sample<Key, Value, UpdateTag>>
1390 {
1391  auto unread = _impl->getAllUnread();
1392  std::vector<Sample<Key, Value, UpdateTag>> samples;
1393  samples.reserve(unread.size());
1394  for(auto sample : unread)
1395  {
1396  samples.emplace_back(sample);
1397  }
1398  return samples;
1399 }
1400 
1401 template<typename Key, typename Value, typename UpdateTag> void
1403 {
1404  _impl->waitForUnread(count);
1405 }
1406 
1407 template<typename Key, typename Value, typename UpdateTag> bool
1409 {
1410  return _impl->hasUnread();
1411 }
1412 
1413 template<typename Key, typename Value, typename UpdateTag> Sample<Key, Value, UpdateTag>
1415 {
1416  return Sample<Key, Value, UpdateTag>(_impl->getNextUnread());
1417 }
1418 
1419 template<typename Key, typename Value, typename UpdateTag> void
1420 Reader<Key, Value, UpdateTag>::onConnectedKeys(std::function<void(std::vector<Key>)> init,
1421  std::function<void(CallbackReason, Key)> update) noexcept
1422 {
1423  _impl->onConnectedKeys(init ? [init](std::vector<std::shared_ptr<DataStormI::Key>> connectedKeys)
1424  {
1425  std::vector<Key> keys;
1426  keys.reserve(connectedKeys.size());
1427  for(const auto& k : connectedKeys)
1428  {
1429  keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
1430  }
1431  init(move(keys));
1432  } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>(),
1433  update ? [update](CallbackReason action, std::shared_ptr<DataStormI::Key> key)
1434  {
1435  update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->get());
1436  } : std::function<void(CallbackReason, std::shared_ptr<DataStormI::Key>)>());
1437 }
1438 
1439 template<typename Key, typename Value, typename UpdateTag> void
1440 Reader<Key, Value, UpdateTag>::onConnectedWriters(std::function<void(std::vector<std::string>)> init,
1441  std::function<void(CallbackReason, std::string)> update) noexcept
1442 {
1443  _impl->onConnectedElements(init, update);
1444 }
1445 
1446 template<typename Key, typename Value, typename UpdateTag> void
1448  std::function<void(Sample<Key, Value, UpdateTag>)> update) noexcept
1449 {
1450  auto communicator = _impl->getCommunicator();
1451  _impl->onSamples(init ? [communicator, init](const std::vector<std::shared_ptr<DataStormI::Sample>>& samplesI)
1452  {
1453  std::vector<Sample<Key, Value, UpdateTag>> samples;
1454  samples.reserve(samplesI.size());
1455  for(const auto& s : samplesI)
1456  {
1457  samples.emplace_back(s);
1458  }
1459  init(move(samples));
1460  } : std::function<void(const std::vector<std::shared_ptr<DataStormI::Sample>>&)>(),
1461  update ? [communicator, update](const std::shared_ptr<DataStormI::Sample>& sampleI)
1462  {
1463  update(sampleI);
1464  } : std::function<void(const std::shared_ptr<DataStormI::Sample>&)>());
1465 }
1466 
1467 template<typename Key, typename Value, typename UpdateTag>
1469  const Key& key,
1470  const std::string& name,
1471  const ReaderConfig& config) noexcept :
1472  Reader<Key, Value, UpdateTag>(topic.getReader()->create({ topic._keyFactory->create(key) }, name, config))
1473 {
1474 }
1475 
1476 template<typename Key, typename Value, typename UpdateTag> template<typename SFC>
1478  const Key& key,
1479  const Filter<SFC>& sampleFilter,
1480  const std::string& name,
1481  const ReaderConfig& config) noexcept :
1482  Reader<Key, Value, UpdateTag>(topic.getReader()->create({ topic._keyFactory->create(key) },
1483  name,
1484  config,
1485  sampleFilter.name,
1486  DataStormI::EncoderT<SFC>::encode(topic.getCommunicator(),
1487  sampleFilter.criteria)))
1488 {
1489 }
1490 
1491 template<typename Key, typename Value, typename UpdateTag>
1493  Reader<Key, Value, UpdateTag>(std::move(reader))
1494 {
1495 }
1496 
1497 template<typename Key, typename Value, typename UpdateTag> SingleKeyReader<Key, Value, UpdateTag>&
1499 {
1500  Reader<Key, Value, UpdateTag>::operator=(std::move(reader));
1501  return *this;
1502 }
1503 
1504 template<typename Key, typename Value, typename UpdateTag>
1506  const std::vector<Key>& keys,
1507  const std::string& name,
1508  const ReaderConfig& config) noexcept :
1509  Reader<Key, Value, UpdateTag>(topic.getReader()->create(topic._keyFactory->create(keys),
1510  name,
1511  config))
1512 {
1513 }
1514 
1515 template<typename Key, typename Value, typename UpdateTag> template<typename SFC>
1517  const std::vector<Key>& keys,
1518  const Filter<SFC>& sampleFilter,
1519  const std::string& name,
1520  const ReaderConfig& config) noexcept :
1521  Reader<Key, Value, UpdateTag>(topic.getReader()->create(topic._keyFactory->create(keys),
1522  name,
1523  config,
1524  sampleFilter.name,
1525  Encoder<SFC>::encode(topic.getCommunicator(),
1526  sampleFilter.criteria)))
1527 {
1528 }
1529 
1530 template<typename Key, typename Value, typename UpdateTag>
1532  Reader<Key, Value, UpdateTag>(std::move(reader))
1533 {
1534 }
1535 
1536 template<typename Key, typename Value, typename UpdateTag>
1539 {
1540  Reader<Key, Value, UpdateTag>::operator=(std::move(reader));
1541  return *this;
1542 }
1543 
1544 template<typename Key, typename Value, typename UpdateTag> template<typename KFC>
1546  const Filter<KFC>& filter,
1547  const std::string& name,
1548  const ReaderConfig& config) :
1549  Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(topic._keyFilterFactories->create(filter.name,
1550  filter.criteria),
1551  name,
1552  config))
1553 {
1554 }
1555 
1556 template<typename Key, typename Value, typename UpdateTag> template<typename KFC, typename SFC>
1558  const Filter<KFC>& keyFilter,
1559  const Filter<SFC>& sampleFilter,
1560  const std::string& name,
1561  const ReaderConfig& config) :
1562  Reader<Key, Value, UpdateTag>(topic.getReader()->createFiltered(topic._keyFilterFactories->create(keyFilter.name,
1563  keyFilter.criteria),
1564  name,
1565  config,
1566  sampleFilter.name,
1567  Encoder<SFC>::encode(topic.getCommunicator(),
1568  sampleFilter.criteria)))
1569 {
1570 }
1571 
1572 template<typename Key, typename Value, typename UpdateTag>
1574  Reader<Key, Value, UpdateTag>(std::move(reader))
1575 {
1576 }
1577 
1578 template<typename Key, typename Value, typename UpdateTag> FilteredKeyReader<Key, Value, UpdateTag>&
1580 {
1581  Reader<Key, Value, UpdateTag>::operator=(std::move(reader));
1582  return *this;
1583 }
1584 
1585 //
1586 // Writer template implementation
1587 //
1588 template<typename Key, typename Value, typename UpdateTag>
1589 Writer<Key, Value, UpdateTag>::Writer(Writer&& writer) noexcept : _impl(std::move(writer._impl))
1590 {
1591 }
1592 
1593 template<typename Key, typename Value, typename UpdateTag> Writer<Key, Value, UpdateTag>&
1595 {
1596  _impl = std::move(writer._impl);
1597  return *this;
1598 }
1599 
1600 template<typename Key, typename Value, typename UpdateTag>
1602 {
1603  if(_impl)
1604  {
1605  _impl->destroy();
1606  }
1607 }
1608 
1609 template<typename Key, typename Value, typename UpdateTag> bool
1611 {
1612  return _impl->hasReaders();
1613 }
1614 
1615 template<typename Key, typename Value, typename UpdateTag> void
1617 {
1618  return _impl->waitForReaders(count);
1619 }
1620 
1621 template<typename Key, typename Value, typename UpdateTag> void
1623 {
1624  return _impl->waitForReaders(-1);
1625 }
1626 
1627 template<typename Key, typename Value, typename UpdateTag> std::vector<std::string>
1629 {
1630  return _impl->getConnectedElements();
1631 }
1632 
1633 template<typename Key, typename Value, typename UpdateTag> std::vector<Key>
1635 {
1636  std::vector<Key> keys;
1637  auto connectedKeys = _impl->getConnectedKeys();
1638  keys.reserve(connectedKeys.size());
1639  for(const auto& k : connectedKeys)
1640  {
1641  keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
1642  }
1643  return keys;
1644 }
1645 
1646 template<typename Key, typename Value, typename UpdateTag> Sample<Key, Value, UpdateTag>
1648 {
1649  auto sample = _impl->getLast();
1650  if(!sample)
1651  {
1652  throw std::logic_error("no sample");
1653  }
1654  return Sample<Key, Value, UpdateTag>(sample);
1655 }
1656 
1657 template<typename Key, typename Value, typename UpdateTag> std::vector<Sample<Key, Value, UpdateTag>>
1659 {
1660  auto all = _impl->getAll();
1661  std::vector<Sample<Key, Value, UpdateTag>> samples;
1662  samples.reserve(all.size());
1663  for(auto sample : all)
1664  {
1665  samples.emplace_back(sample);
1666  }
1667  return samples;
1668 }
1669 
1670 template<typename Key, typename Value, typename UpdateTag> void
1671 Writer<Key, Value, UpdateTag>::onConnectedKeys(std::function<void(std::vector<Key>)> init,
1672  std::function<void(CallbackReason, Key)> update) noexcept
1673 {
1674  _impl->onConnectedKeys(init ? [init](std::vector<std::shared_ptr<DataStormI::Key>> connectedKeys)
1675  {
1676  std::vector<Key> keys;
1677  keys.reserve(connectedKeys.size());
1678  for(const auto& k : connectedKeys)
1679  {
1680  keys.push_back(std::static_pointer_cast<DataStormI::KeyT<Key>>(k)->get());
1681  }
1682  init(move(keys));
1683  } : std::function<void(std::vector<std::shared_ptr<DataStormI::Key>>)>(),
1684  update ? [update](CallbackReason action, std::shared_ptr<DataStormI::Key> key)
1685  {
1686  update(action, std::static_pointer_cast<DataStormI::KeyT<Key>>(key)->get());
1687  } : std::function<void(CallbackReason, std::shared_ptr<DataStormI::Key>)>());
1688 }
1689 
1690 template<typename Key, typename Value, typename UpdateTag> void
1691 Writer<Key, Value, UpdateTag>::onConnectedReaders(std::function<void(std::vector<std::string>)> init,
1692  std::function<void(CallbackReason, std::string)> update) noexcept
1693 {
1694  _impl->onConnectedElements(init, update);
1695 }
1696 
1697 template<typename Key, typename Value, typename UpdateTag>
1699  const Key& key,
1700  const std::string& name,
1701  const WriterConfig& config) noexcept :
1702  Writer<Key, Value, UpdateTag>(topic.getWriter()->create({ topic._keyFactory->create(key) },
1703  name,
1704  config)),
1705  _tagFactory(topic._tagFactory)
1706 {
1707 }
1708 
1709 template<typename Key, typename Value, typename UpdateTag>
1711  Writer<Key, Value, UpdateTag>(std::move(writer)),
1712  _tagFactory(std::move(writer._tagFactory))
1713 {
1714 }
1715 
1716 template<typename Key, typename Value, typename UpdateTag> SingleKeyWriter<Key, Value, UpdateTag>&
1718 {
1719  Writer<Key, Value, UpdateTag>::operator=(std::move(writer));
1720  return *this;
1721 }
1722 
1723 template<typename Key, typename Value, typename UpdateTag> void
1725 {
1726  Writer<Key, Value, UpdateTag>::_impl->publish(nullptr,
1727  std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Add, value));
1728 }
1729 
1730 template<typename Key, typename Value, typename UpdateTag> void
1732 {
1733  Writer<Key, Value, UpdateTag>::_impl->publish(nullptr,
1734  std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Update, value));
1735 }
1736 
1737 template<typename Key, typename Value, typename UpdateTag>
1738 template<typename UpdateValue> std::function<void(const UpdateValue&)>
1740 {
1742  auto updateTag = _tagFactory->create(tag);
1743  return [impl, updateTag](const UpdateValue& value) {
1744  auto encoded = Encoder<UpdateValue>::encode(impl->getCommunicator(), value);
1745  impl->publish(nullptr, std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
1746  };
1747 }
1748 
1749 template<typename Key, typename Value, typename UpdateTag> void
1751 {
1752  Writer<Key, Value, UpdateTag>::_impl->publish(nullptr,
1753  std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Remove));
1754 }
1755 
1756 template<typename Key, typename Value, typename UpdateTag>
1758  const std::vector<Key>& keys,
1759  const std::string& name,
1760  const WriterConfig& config) noexcept :
1761  Writer<Key, Value, UpdateTag>(topic.getWriter()->create(topic._keyFactory->create(keys),
1762  name,
1763  config)),
1764  _keyFactory(topic._keyFactory),
1765  _tagFactory(topic._tagFactory)
1766 {
1767 }
1768 
1769 template<typename Key, typename Value, typename UpdateTag>
1771  Writer<Key, Value, UpdateTag>(std::move(writer)),
1772  _keyFactory(std::move(writer._keyFactory)),
1773  _tagFactory(std::move(writer._tagFactory))
1774 {
1775 }
1776 
1777 template<typename Key, typename Value, typename UpdateTag> MultiKeyWriter<Key, Value, UpdateTag>&
1779 {
1780  Writer<Key, Value, UpdateTag>::operator=(std::move(writer));
1781  return *this;
1782 }
1783 
1784 template<typename Key, typename Value, typename UpdateTag> void
1785 MultiKeyWriter<Key, Value, UpdateTag>::add(const Key& key, const Value& value) noexcept
1786 {
1787  Writer<Key, Value, UpdateTag>::_impl->publish(_keyFactory->create(key),
1788  std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Add, value));
1789 }
1790 
1791 template<typename Key, typename Value, typename UpdateTag> void
1792 MultiKeyWriter<Key, Value, UpdateTag>::update(const Key& key, const Value& value) noexcept
1793 {
1794  Writer<Key, Value, UpdateTag>::_impl->publish(_keyFactory->create(key),
1795  std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Update, value));
1796 }
1797 
// MultiKeyWriter::partialUpdate (listing fragment). Listing lines 1800 and 1802
// are missing from this extract. The reference index lists line 1800 as
//   partialUpdate(const UpdateTag& tag) noexcept.
// Line 1802 presumably defines the `impl` captured by the lambda below -- TODO confirm.
// Returns a callable that, for a key and an update value, encodes the value and
// publishes a sample built from the encoded bytes and the update tag element.
1798 template<typename Key, typename Value, typename UpdateTag>
1799 template<typename UpdateValue> std::function<void(const Key&, const UpdateValue&)>
1801 {
// Resolve the tag element once, outside the returned callable.
1803  auto updateTag = _tagFactory->create(tag);
// Copy the factory so the callable captures it by value rather than through `this`.
1804  auto keyFactory = _keyFactory;
1805  return [impl, updateTag, keyFactory](const Key& key, const UpdateValue& value) {
1806  auto encoded = Encoder<UpdateValue>::encode(impl->getCommunicator(), value);
1807  impl->publish(keyFactory->create(key),
1808  std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(encoded, updateTag));
1809  };
1810 }
1811 
// MultiKeyWriter::remove (listing fragment). The declarator line (listing line
// 1813) is missing from this extract; the reference index lists it as
//   void remove(const Key& key) noexcept.
// Publishes a Remove sample, which carries no value, for the given key.
1812 template<typename Key, typename Value, typename UpdateTag> void
1814 {
1815  Writer<Key, Value, UpdateTag>::_impl->publish(_keyFactory->create(key),
1816  std::make_shared<DataStormI::SampleT<Key, Value, UpdateTag>>(SampleEvent::Remove));
1817 }
1818 
1819 #if !defined(__clang__) && defined(__GNUC__) && ((__GNUC__* 100) + __GNUC_MINOR__) < 490
1820 
1821 #include <regex.h>
1822 
/**
 * Owning wrapper around a compiled POSIX extended regular expression. It is
 * only compiled in for GCC releases older than 4.9, where it stands in for
 * std::regex.
 *
 * The wrapper is non-copyable: it owns the regex_t and releases it with
 * regfree() in its destructor, so a copy would free the same state twice.
 */
class RegExp
{
public:

    /**
     * Compile the given pattern as a POSIX extended regular expression.
     *
     * @param criteria The pattern to compile.
     * @throw std::invalid_argument carrying the pattern if regcomp rejects it.
     */
    explicit RegExp(const std::string& criteria)
    {
        // REG_NOSUB: match() only asks for a yes/no answer, never for offsets.
        if(regcomp(&_expr, criteria.c_str(), REG_EXTENDED | REG_NOSUB) != 0)
        {
            throw std::invalid_argument(criteria);
        }
    }

    RegExp(const RegExp&) = delete;
    RegExp& operator=(const RegExp&) = delete;

    ~RegExp()
    {
        regfree(&_expr);
    }

    /**
     * Run the compiled expression against the given string.
     *
     * @param value The string to test.
     * @return True if regexec reports a match, false otherwise.
     */
    bool match(const std::string& value) const
    {
        return regexec(&_expr, value.c_str(), 0, nullptr, 0) == 0;
    }

private:

    regex_t _expr;
};
1850 
1851 #endif
1852 
/**
 * Build the factory behind regular-expression filters.
 *
 * The returned factory takes the criteria string, compiles it once, and yields
 * a predicate. The predicate streams a value to text with operator<< and tests
 * that text against the compiled expression.
 *
 * On GCC older than 4.9 the expression is compiled through RegExp and tested
 * with RegExp::match; on every other compiler std::regex and std::regex_match
 * are used.
 *
 * @return The filter factory.
 */
template<typename Value> std::function<std::function<bool (const Value&)> (const std::string&)>
makeRegexFilter() noexcept
{
    return [](const std::string& pattern)
    {
#if !defined(__clang__) && defined(__GNUC__) && ((__GNUC__* 100) + __GNUC_MINOR__) < 490
        // Shared so that copies of the predicate reuse one compiled expression.
        auto compiled = std::make_shared<RegExp>(pattern);
        return [compiled](const Value& candidate)
        {
            std::ostringstream text;
            text << candidate;
            return compiled->match(text.str());
        };
#else
        std::regex compiled(pattern);
        return [compiled](const Value& candidate)
        {
            std::ostringstream text;
            text << candidate;
            return std::regex_match(text.str(), compiled);
        };
#endif
    };
}
1878 
1880 template<typename Key, typename Value, typename UpdateTag>
1881 std::function<std::function<bool (const Sample<Key, Value, UpdateTag>&)> (const std::vector<SampleEvent>&)>
1882 makeSampleEventFilter(const Topic<Key, Value, UpdateTag>& topic) noexcept
1883 {
1884  return [](const std::vector<SampleEvent>& criteria)
1885  {
1886  return [criteria](const Sample<Key, Value, UpdateTag>& sample)
1887  {
1888  return std::find(criteria.begin(), criteria.end(), sample.getEvent()) != criteria.end();
1889  };
1890  };
1891 }
1892 
1894 template<typename T, typename V, typename Enabler=void>
1895 struct RegexFilter
1896 {
1897  template<typename F> static void
1898  add(F factory)
1899  {
1900  }
1901 };
1902 
1904 template<typename T, typename V>
1905 struct RegexFilter<T, V, typename std::enable_if<DataStormI::is_streamable<V>::value>::type>
1906 {
1907  template<typename F> static void
1908  add(F factory)
1909  {
1910  factory->set("_regex", makeRegexFilter<T>()); // Only set the _regex filter if the value is streamable
1911  }
1912 };
1913 
1914 //
1915 // Topic template implementation
1916 //
1917 template<typename Key, typename Value, typename UpdateTag>
1918 Topic<Key, Value, UpdateTag>::Topic(const Node& node, const std::string& name) noexcept :
1919  _name(name),
1920  _topicFactory(node._factory),
1921  _keyFactory(DataStormI::KeyFactoryT<Key>::createFactory()),
1922  _tagFactory(DataStormI::TagFactoryT<UpdateTag>::createFactory()),
1923  _keyFilterFactories(DataStormI::FilterManagerT<DataStormI::KeyT<Key>>::create()),
1924  _sampleFilterFactories(DataStormI::FilterManagerT<DataStormI::SampleT<Key, Value, UpdateTag>>::create())
1925 {
1926  RegexFilter<Key, Key>::add(_keyFilterFactories);
1927  RegexFilter<Sample<Key, Value, UpdateTag>, Value>::add(_sampleFilterFactories);
1928  _sampleFilterFactories->set("_event", makeSampleEventFilter(*this));
1929 }
1930 
// Listing fragment: the declarator line (listing line 1932) is missing from this
// extract. Presumably the Topic move constructor taking `topic` by rvalue
// reference -- TODO confirm against the header.
// Moves each listed member out of `topic`. _mutex does not appear in the
// initialiser list.
1931 template<typename Key, typename Value, typename UpdateTag>
1933  _name(std::move(topic._name)),
1934  _topicFactory(std::move(topic._topicFactory)),
1935  _keyFactory(std::move(topic._keyFactory)),
1936  _tagFactory(std::move(topic._tagFactory)),
1937  _keyFilterFactories(std::move(topic._keyFilterFactories)),
1938  _sampleFilterFactories(std::move(topic._sampleFilterFactories)),
1939  _reader(std::move(topic._reader)),
1940  _writer(std::move(topic._writer)),
1941  _updaters(std::move(topic._updaters))
1942 {
1943 }
1944 
// Topic destructor (listing fragment). The declarator line (listing line 1946)
// is missing from this extract; the reference index lists it as ~Topic().
// While holding _mutex, calls destroy() on the topic reader and on the topic
// writer if they are set.
1945 template<typename Key, typename Value, typename UpdateTag>
1947 {
1948  std::lock_guard<std::mutex> lock(_mutex);
1949  if(_reader)
1950  {
1951  _reader->destroy();
1952  }
1953  if(_writer)
1954  {
1955  _writer->destroy();
1956  }
1957 }
1958 
// Topic move assignment (listing fragment). The declarator line (listing line
// 1960) is missing from this extract; the reference index lists it as
//   Topic& operator=(Topic&& topic) noexcept.
// Move-assigns each member from `topic` and returns *this.
// NOTE(review): the reader and writer held before the assignment are replaced
// without the destroy() calls the destructor makes, and _mutex is not taken
// here -- confirm both are intended.
1959 template<typename Key, typename Value, typename UpdateTag> Topic<Key, Value, UpdateTag>&
1961 {
1962  _name = std::move(topic._name);
1963  _topicFactory = std::move(topic._topicFactory);
1964  _keyFactory = std::move(topic._keyFactory);
1965  _tagFactory = std::move(topic._tagFactory);
1966  _keyFilterFactories = std::move(topic._keyFilterFactories);
1967  _sampleFilterFactories = std::move(topic._sampleFilterFactories);
1968  _reader = std::move(topic._reader);
1969  _writer = std::move(topic._writer);
1970  _updaters = std::move(topic._updaters);
1971  return *this;
1972 }
1973 
// Topic::hasWriters (listing fragment). The declarator line (listing line 1975)
// is missing from this extract; the reference index lists it as
//   bool hasWriters() const noexcept.
// Returns what the topic reader obtained from getReader() reports from hasWriters().
1974 template<typename Key, typename Value, typename UpdateTag> bool
1976 {
1977  return getReader()->hasWriters();
1978 }
1979 
// Topic::waitForWriters (listing fragment). The declarator line (listing line
// 1981) is missing from this extract; the reference index lists it as
//   void waitForWriters(unsigned int count = 1) const.
// Forwards `count` to the topic reader's waitForWriters.
1980 template<typename Key, typename Value, typename UpdateTag> void
1982 {
1983  getReader()->waitForWriters(count);
1984 }
1985 
// Topic::waitForNoWriters (listing fragment). The declarator line (listing line
// 1987) is missing from this extract; the reference index lists it as
//   void waitForNoWriters() const.
// Calls the topic reader's waitForWriters with -1. Presumably -1 is the
// "wait until no writer remains" sentinel -- confirm in TopicReader.
1986 template<typename Key, typename Value, typename UpdateTag> void
1988 {
1989  getReader()->waitForWriters(-1);
1990 }
1991 
// Topic::setReaderDefaultConfig (listing fragment). The declarator line (listing
// line 1993) is missing from this extract; the reference index lists it as
//   void setReaderDefaultConfig(const ReaderConfig& config) noexcept.
// Forwards `config` to the topic reader's setDefaultConfig.
1992 template<typename Key, typename Value, typename UpdateTag> void
1994 {
1995  getReader()->setDefaultConfig(config);
1996 }
1997 
// Topic::hasReaders (listing fragment). The declarator line (listing line 1999)
// is missing from this extract; the reference index lists it as
//   bool hasReaders() const noexcept.
// Returns what the topic writer obtained from getWriter() reports from hasReaders().
1998 template<typename Key, typename Value, typename UpdateTag> bool
2000 {
2001  return getWriter()->hasReaders();
2002 }
2003 
// Topic::waitForReaders (listing fragment). The declarator line (listing line
// 2005) is missing from this extract; the reference index lists it as
//   void waitForReaders(unsigned int count = 1) const.
// Forwards `count` to the topic writer's waitForReaders.
2004 template<typename Key, typename Value, typename UpdateTag> void
2006 {
2007  getWriter()->waitForReaders(count);
2008 }
2009 
// Topic::waitForNoReaders (listing fragment). The declarator line (listing line
// 2011) is missing from this extract; the reference index lists it as
//   void waitForNoReaders() const.
// Calls the topic writer's waitForReaders with -1. Presumably -1 is the
// "wait until no reader remains" sentinel -- confirm in TopicWriter.
2010 template<typename Key, typename Value, typename UpdateTag> void
2012 {
2013  getWriter()->waitForReaders(-1);
2014 }
2015 
// Topic::setWriterDefaultConfig (listing fragment). The declarator line (listing
// line 2017) is missing from this extract; the reference index lists it as
//   void setWriterDefaultConfig(const WriterConfig& config) noexcept.
// Forwards `config` to the topic writer's setDefaultConfig.
2016 template<typename Key, typename Value, typename UpdateTag> void
2018 {
2019  getWriter()->setDefaultConfig(config);
2020 }
2021 
2022 template<typename Key, typename Value, typename UpdateTag> template<typename UpdateValue> void
2023 Topic<Key, Value, UpdateTag>::setUpdater(const UpdateTag& tag, std::function<void (Value&, UpdateValue)> updater) noexcept
2024 {
2025  std::lock_guard<std::mutex> lock(_mutex);
2026  auto tagI = _tagFactory->create(std::move(tag));
2027  auto updaterImpl = updater ? [updater](const std::shared_ptr<DataStormI::Sample>& previous,
2028  const std::shared_ptr<DataStormI::Sample>& next,
2029  const std::shared_ptr<Ice::Communicator>& communicator)
2030  {
2031  Value value;
2032  if(previous)
2033  {
2034  value = Cloner<Value>::clone(
2035  std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(previous)->getValue());
2036  }
2037  updater(value, Decoder<UpdateValue>::decode(communicator, next->getEncodedValue()));
2038  std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(next)->setValue(std::move(value));
2039  } : std::function<void(const std::shared_ptr<DataStormI::Sample>&,
2040  const std::shared_ptr<DataStormI::Sample>&,
2041  const std::shared_ptr<Ice::Communicator>&)>();
2042 
2043  if(_reader && !_writer)
2044  {
2045  _reader->setUpdater(tagI, updaterImpl);
2046  }
2047  else if(_writer && !_reader)
2048  {
2049  _writer->setUpdater(tagI, updaterImpl);
2050  }
2051  else if(_reader && _writer)
2052  {
2053  _reader->setUpdater(tagI, updaterImpl);
2054  _writer->setUpdater(tagI, updaterImpl);
2055  }
2056  else
2057  {
2058  _updaters[tagI] = updaterImpl;
2059  }
2060 }
2061 
// Topic::setKeyFilter (listing fragment). The declarator line (listing line
// 2063) is missing from this extract; the reference index lists it as
//   void setKeyFilter(const std::string& name, std::function<...> factory) noexcept.
// While holding _mutex, registers `factory` under `name` on the key filter manager.
2062 template<typename Key, typename Value, typename UpdateTag> template<typename Criteria> void
2064  std::function<std::function<bool (const Key&)>(const Criteria&)> factory) noexcept
2065 {
2066  std::lock_guard<std::mutex> lock(_mutex);
2067  _keyFilterFactories->set(name, factory);
2068 }
2069 
// Topic::setSampleFilter (listing fragment). The declarator line (listing line
// 2071) is missing from this extract; the reference index lists it as
//   void setSampleFilter(const std::string& name, std::function<...> factory) noexcept.
// While holding _mutex, registers `factory` under `name` on the sample filter manager.
2070 template<typename Key, typename Value, typename UpdateTag> template<typename Criteria> void
2072  std::function<std::function<bool (const Sample<Key, Value, UpdateTag>&)>(const Criteria&)> factory) noexcept
2073 {
2074  std::lock_guard<std::mutex> lock(_mutex);
2075  _sampleFilterFactories->set(name, factory);
2076 }
2077 
// Listing fragment: the declarator line (listing line 2079) is missing from this
// extract. Presumably Topic::getReader() const noexcept, the counterpart of
// getWriter() -- TODO confirm against the header.
// While holding _mutex, creates the topic reader on first use and returns it.
2078 template<typename Key, typename Value, typename UpdateTag> std::shared_ptr<DataStormI::TopicReader>
2080 {
2081  std::lock_guard<std::mutex> lock(_mutex);
2082  if(!_reader)
2083  {
2084  auto sampleFactory = std::make_shared<DataStormI::SampleFactoryT<Key, Value, UpdateTag>>();
2085  _reader = _topicFactory->createTopicReader(_name, _keyFactory, _tagFactory, sampleFactory, _keyFilterFactories,
2086  _sampleFilterFactories);
// Seed the reader with the writer's updaters if a writer exists, otherwise with
// the updaters held in _updaters; _updaters is emptied either way.
2087  _reader->setUpdaters(_writer ? _writer->getUpdaters() : _updaters);
2088  _updaters.clear();
2089  }
2090  return _reader;
2091 }
2092 
2093 template<typename Key, typename Value, typename UpdateTag> std::shared_ptr<DataStormI::TopicWriter>
2094 Topic<Key, Value, UpdateTag>::getWriter() const noexcept
2095 {
2096  std::lock_guard<std::mutex> lock(_mutex);
2097  if(!_writer)
2098  {
2099  _writer = _topicFactory->createTopicWriter(_name, _keyFactory, _tagFactory, nullptr, _keyFilterFactories,
2100  _sampleFilterFactories);
2101  _writer->setUpdaters(_reader ? _reader->getUpdaters() : _updaters);
2102  _updaters.clear();
2103  }
2104  return _writer;
2105 }
2106 
2107 template<typename Key, typename Value, typename UpdateTag> std::shared_ptr<Ice::Communicator>
2108 Topic<Key, Value, UpdateTag>::getCommunicator() const noexcept
2109 {
2110  return _topicFactory->getCommunicator();
2111 }
2112 
2113 }
UpdateTag UpdateTagType
The topic's update tag type (defaults to std::string if not specified).
Definition: DataStorm.h:529
Sample< Key, Value, UpdateTag > getLast()
Get the last written sample.
Definition: DataStorm.h:1647
void waitForWriters(unsigned int count=1) const
Wait for given number of data writers to be online.
Definition: DataStorm.h:1981
bool hasWriters() const noexcept
Indicates whether or not data writers are online.
Definition: DataStorm.h:1975
std::string name
The filter name.
Definition: DataStorm.h:710
std::string getOrigin() const noexcept
The origin of the sample.
Definition: DataStorm.h:1310
void remove(const Key &key) noexcept
Remove the data element.
Definition: DataStorm.h:1813
Topic(const Node &node, const std::string &name) noexcept
Construct a new Topic for the topic with the given name.
Definition: DataStorm.h:1918
MultiKeyReader(const Topic< Key, Value, UpdateTag > &topic, const std::vector< Key > &keys, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig()) noexcept
Construct a new reader for the given keys.
Definition: DataStorm.h:1505
void update(const Key &key, const Value &value) noexcept
Update the data element.
Definition: DataStorm.h:1792
MultiKeyWriter(const Topic< Key, Value, UpdateTag > &topic, const std::vector< Key > &keys, const std::string &name=std::string(), const WriterConfig &config=WriterConfig()) noexcept
Construct a new writer for the given keys.
Definition: DataStorm.h:1757
Value ValueType
The type of the sample value.
Definition: DataStorm.h:54
std::function< void(const Key &, const UpdateValue &)> partialUpdate(const UpdateTag &tag) noexcept
Get a partial update generator function for the given partial update tag.
Definition: DataStorm.h:1800
bool hasWriters() const noexcept
Indicates whether or not writers are online.
Definition: DataStorm.h:1352
Value ValueType
The value type.
Definition: DataStorm.h:224
FilteredKeyReader(const Topic< Key, Value, UpdateTag > &topic, const Filter< KeyFilterCriteria > &keyFilter, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig())
Construct a new reader for the given key filter.
void setUpdater(const UpdateTag &tag, std::function< void(Value &, UpdateValue)> updater) noexcept
Set an updater function for the given update tag.
Definition: DataStorm.h:2023
Key KeyType
The key type.
Definition: DataStorm.h:382
const Value & getValue() const noexcept
The value of the sample.
Definition: DataStorm.h:1292
void add(const Key &key, const Value &value) noexcept
Add the data element.
Definition: DataStorm.h:1785
Value ValueType
The topic's value type.
Definition: DataStorm.h:524
SingleKeyReader(const Topic< Key, Value, UpdateTag > &topic, const Key &key, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig()) noexcept
Construct a new reader for the given key.
Definition: DataStorm.h:1468
T criteria
The filter criteria value.
Definition: DataStorm.h:713
std::chrono::time_point< std::chrono::system_clock > getTimeStamp() const noexcept
The timestamp of the sample.
Definition: DataStorm.h:1304
Reader(Reader &&reader) noexcept
Transfers the given reader to this reader.
Definition: DataStorm.h:1331
MultiKeyWriter & operator=(MultiKeyWriter &&writer) noexcept
Move assignment operator.
Definition: DataStorm.h:1778
The filtered reader to read data elements whose key match a given filter.
Definition: DataStorm.h:960
std::function< void(const UpdateValue &)> partialUpdate(const UpdateTag &tag) noexcept
Get a partial update generator function for the given partial update tag.
Definition: DataStorm.h:1739
Topic & operator=(Topic &&topic) noexcept
Move assignment operator.
Definition: DataStorm.h:1960
std::vector< Key > getConnectedKeys() const noexcept
Get the keys for which writers are connected to this reader.
Definition: DataStorm.h:1376
static T clone(const T &value) noexcept
Clone the given value.
Definition: Types.h:277
Value ValueType
The value type.
Definition: DataStorm.h:387
~Topic()
Destruct the Topic.
Definition: DataStorm.h:1946
The Decoder template provides a method to decode user types.
Definition: Types.h:241
void onConnectedKeys(std::function< void(std::vector< Key >)> init, std::function< void(CallbackReason, Key)> update) noexcept
Calls the given functions to provide the initial set of connected keys and when a key is added or rem...
Definition: DataStorm.h:1671
bool hasUnread() const noexcept
Returns whether or not unread samples are available.
Definition: DataStorm.h:1408
Key KeyType
The type of the sample key.
Definition: DataStorm.h:49
MultiKeyWriter< K, V, UT > makeAnyKeyWriter(const Topic< K, V, UT > &topic, const std::string &name=std::string(), const WriterConfig &config=WriterConfig()) noexcept
Creates an any-key writer for the given topic.
Definition: DataStorm.h:1260
The WriterConfig class specifies configuration options specific to writers.
Definition: Types.h:161
~Reader()
Destruct the reader.
Definition: DataStorm.h:1336
FilteredKeyReader< K, V, UT > makeFilteredKeyReader(const Topic< K, V, UT > &topic, const Filter< KFC > &filter, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig())
Creates a new filtered reader for the given topic and key filter.
Definition: DataStorm.h:1028
Writer(Writer &&writer) noexcept
Transfers the given writer to this writer.
Definition: DataStorm.h:1589
The Topic class.
Definition: DataStorm.h:512
std::vector< std::string > getConnectedReaders() const noexcept
Get the connected readers.
Definition: DataStorm.h:1628
bool hasReaders() const noexcept
Indicates whether or not readers are online.
Definition: DataStorm.h:1610
void onConnectedWriters(std::function< void(std::vector< std::string >)> init, std::function< void(CallbackReason, std::string)> update) noexcept
Calls the given functions to provide the initial set of connected writers and when a new writer conne...
Definition: DataStorm.h:1440
The Reader class is used to retrieve samples for a data element.
Definition: DataStorm.h:212
MultiKeyReader< K, V, UT > makeMultiKeyReader(const Topic< K, V, UT > &topic, const std::vector< typename Topic< K, V, UT >::KeyType > &keys, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig()) noexcept
Creates a multi-key reader for the given topic.
Definition: DataStorm.h:886
The writer class is used to write samples for a data element.
Definition: DataStorm.h:375
MultiKeyWriter< K, V, UT > makeMultiKeyWriter(const Topic< K, V, UT > &topic, const std::vector< typename Topic< K, V, UT >::KeyType > &keys, const std::string &name=std::string(), const WriterConfig &config=WriterConfig()) noexcept
Creates a multi-key writer for the given topic and keys.
Definition: DataStorm.h:1243
void waitForReaders(unsigned int count=1) const
Wait for given number of data readers to be online.
Definition: DataStorm.h:2005
void remove() noexcept
Remove the data element.
Definition: DataStorm.h:1750
void setSampleFilter(const std::string &name, std::function< std::function< bool(const SampleType &)>(const Criteria &)> factory) noexcept
Set a sample filter factory.
Definition: DataStorm.h:2071
MultiKeyReader< K, V, UT > makeAnyKeyReader(const Topic< K, V, UT > &topic, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig()) noexcept
Creates an any-key reader for the given topic.
Definition: DataStorm.h:927
The Encoder template provides a method to encode user types.
Definition: Types.h:214
void onConnectedKeys(std::function< void(std::vector< Key >)> init, std::function< void(CallbackReason, Key)> update) noexcept
Calls the given functions to provide the initial set of connected keys and when a key is added or rem...
Definition: DataStorm.h:1420
The key writer to write the data element associated with a given key.
Definition: DataStorm.h:1062
Key KeyType
The topic's key type.
Definition: DataStorm.h:519
std::vector< Key > getConnectedKeys() const noexcept
Get the keys for which readers are connected to this writer.
Definition: DataStorm.h:1634
MultiKeyReader & operator=(MultiKeyReader &&reader) noexcept
Move assignment operator.
Definition: DataStorm.h:1538
UpdateTag getUpdateTag() const noexcept
The update tag for the partial update.
Definition: DataStorm.h:1298
CallbackReason
The callback action enumerator specifies the reason why a callback is called.
Definition: Types.h:195
SampleEvent getEvent() const noexcept
The event associated with the sample.
Definition: DataStorm.h:1280
Filter(const std::string &name, TT &&criteria) noexcept
Construct a filter structure with the given name and criteria.
Definition: DataStorm.h:705
void waitForNoWriters() const
Wait for writers to be offline.
Definition: DataStorm.h:1364
void waitForUnread(unsigned int count=1) const
Wait for given number of unread samples to be available.
Definition: DataStorm.h:1402
void onSamples(std::function< void(std::vector< Sample< Key, Value, UpdateTag >>)> init, std::function< void(Sample< Key, Value, UpdateTag >)> queue) noexcept
Calls the given function to provide the initial set of unread samples and when new samples are queued...
Definition: DataStorm.h:1447
FilteredKeyReader & operator=(FilteredKeyReader &&reader) noexcept
Move assignment operator.
Definition: DataStorm.h:1579
void add(const Value &value) noexcept
Add the data element.
Definition: DataStorm.h:1724
SingleKeyReader & operator=(SingleKeyReader &&reader) noexcept
Move assignment operator.
Definition: DataStorm.h:1498
Definition: CtrlCHandler.h:13
bool hasReaders() const noexcept
Indicates whether or not data readers are online.
Definition: DataStorm.h:1999
SingleKeyReader< K, V, UT > makeSingleKeyReader(const Topic< K, V, UT > &topic, const typename Topic< K, V, UT >::KeyType &key, const std::string &name=std::string(), const ReaderConfig &config=ReaderConfig()) noexcept
Creates a key reader for the given topic and key.
Definition: DataStorm.h:845
SingleKeyWriter< K, V, UT > makeSingleKeyWriter(const Topic< K, V, UT > &topic, const typename Topic< K, V, UT >::KeyType &key, const std::string &name=std::string(), const WriterConfig &config=WriterConfig()) noexcept
Creates a key writer for the given topic and key.
Definition: DataStorm.h:1225
void waitForWriters(unsigned int count=1) const
Wait for given number of writers to be online.
Definition: DataStorm.h:1358
The key reader to read the data element associated with a given key.
Definition: DataStorm.h:722
SingleKeyWriter(const Topic< Key, Value, UpdateTag > &topic, const Key &key, const std::string &name=std::string(), const WriterConfig &config=WriterConfig()) noexcept
Construct a new writer for the given key.
Definition: DataStorm.h:1698
const Key & getKey() const noexcept
The key of the sample.
Definition: DataStorm.h:1286
std::vector< Sample< Key, Value, UpdateTag > > getAllUnread() noexcept
Returns all the unread samples.
Definition: DataStorm.h:1389
void setReaderDefaultConfig(const ReaderConfig &config) noexcept
Set the default configuration used to construct readers.
Definition: DataStorm.h:1993
A sample provides information about a data element update.
Definition: DataStorm.h:42
std::string getSession() const noexcept
Get the session identifier of the session that received this sample.
Definition: DataStorm.h:1316
void waitForReaders(unsigned int count=1) const
Wait for given number of readers to be online.
Definition: DataStorm.h:1616
void waitForNoReaders() const
Wait for data readers to be offline.
Definition: DataStorm.h:2011
The Node class allows creating topic readers and writers.
Definition: Node.h:39
Key KeyType
The key type.
Definition: DataStorm.h:219
UpdateTag UpdateTagType
The type of the update tag.
Definition: DataStorm.h:60
The ReaderConfig class specifies configuration options specific to readers.
Definition: Types.h:122
The key reader to read the data element associated with a given set of keys.
Definition: DataStorm.h:780
void waitForNoReaders() const
Wait for readers to be offline.
Definition: DataStorm.h:1622
void setKeyFilter(const std::string &name, std::function< std::function< bool(const Key &)>(const Criteria &)> factory) noexcept
Set a key filter factory.
Definition: DataStorm.h:2063
void onConnectedReaders(std::function< void(std::vector< std::string >)> init, std::function< void(CallbackReason, std::string)> update) noexcept
Calls the given functions to provide the initial set of connected readers and when a new reader conne...
Definition: DataStorm.h:1691
std::vector< std::string > getConnectedWriters() const noexcept
Get the connected writers.
Definition: DataStorm.h:1370
Definition: InternalI.h:18
void setWriterDefaultConfig(const WriterConfig &config) noexcept
Set the default configuration used to construct writers.
Definition: DataStorm.h:2017
static std::vector< unsigned char > encode(const std::shared_ptr< Ice::Communicator > &communicator, const T &value) noexcept
Encode the given value.
Definition: Types.h:326
Writer & operator=(Writer &&writer) noexcept
Move assignment operator.
Definition: DataStorm.h:1594
SingleKeyWriter & operator=(SingleKeyWriter &&writer) noexcept
Move assignment operator.
Definition: DataStorm.h:1717
void waitForNoWriters() const
Wait for data writers to be offline.
Definition: DataStorm.h:1987
The key writer to write data elements associated with a given set of keys.
Definition: DataStorm.h:1138
Filter structure to specify the filter name and criteria value.
Definition: DataStorm.h:696
void update(const Value &value) noexcept
Update the data element.
Definition: DataStorm.h:1731
Reader & operator=(Reader &&reader) noexcept
Move assignment operator.
Definition: DataStorm.h:1345
std::vector< Sample< Key, Value, UpdateTag > > getAll() noexcept
Get all the written samples kept in the writer history.
Definition: DataStorm.h:1658
~Writer()
Destruct the writer.
Definition: DataStorm.h:1601
Sample< Key, Value, UpdateTag > getNextUnread()
Returns the next unread sample.
Definition: DataStorm.h:1414