DataStorm  0.1
Data Distribution Service
InternalT.h
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2018-present ZeroC, Inc. All rights reserved.
4 //
5 // **********************************************************************
6 
7 #pragma once
8 
#include <DataStorm/Config.h>
#include <DataStorm/Types.h>
#include <DataStorm/InternalI.h>

#include <Ice/Ice.h>

#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <typeinfo>
#include <utility>
#include <vector>
14 
15 namespace DataStorm
16 {
17 
18 template<typename K, typename V, typename U> class Sample;
19 
20 }
21 
22 namespace DataStormI
23 {
24 
25 template<typename T>
26 class has_communicator_parameter
27 {
28  template<typename TT, typename SS>
29  static auto testE(int) -> decltype(TT::encode(std::declval<std::shared_ptr<Ice::Communicator>&>(),
30  std::declval<SS&>()), std::true_type());
31 
32  template<typename, typename>
33  static auto testE(...) -> std::false_type;
34 
35  template<typename TT, typename SS>
36  static auto testD(int) -> decltype(TT::decode(std::declval<std::shared_ptr<Ice::Communicator>&>(),
37  std::vector<unsigned char>()), std::true_type());
38 
39  template<typename, typename>
40  static auto testD(...) -> std::false_type;
41 
42 public:
43 
44  static const bool value = decltype(testE<DataStorm::Encoder<T>, T>(0))::value && decltype(testD<DataStorm::Decoder<T>, T>(0))::value;
45 };
46 
47 template<typename T, typename Enabler=void>
48 struct EncoderT
49 {
50  static std::vector<unsigned char> encode(const std::shared_ptr<Ice::Communicator>&, const T& value)
51  {
52  return DataStorm::Encoder<T>::encode(value);
53  }
54 };
55 
56 template<typename T, typename Enabler=void>
57 struct DecoderT
58 {
59  static T decode(const std::shared_ptr<Ice::Communicator>&, const std::vector<unsigned char>& data)
60  {
61  return DataStorm::Decoder<T>::decode(data);
62  }
63 };
64 
65 template<typename T>
66 struct EncoderT<T, typename std::enable_if<has_communicator_parameter<T>::value>::type>
67 {
68  static std::vector<unsigned char> encode(const std::shared_ptr<Ice::Communicator>& communicator, const T& value)
69  {
70  return DataStorm::Encoder<T>::encode(communicator, value);
71  }
72 };
73 
74 template<typename T>
75 struct DecoderT<T, typename std::enable_if<has_communicator_parameter<T>::value>::type>
76 {
77  static T decode(const std::shared_ptr<Ice::Communicator>& communicator, const std::vector<unsigned char>& data)
78  {
79  return DataStorm::Decoder<T>::decode(communicator, data);
80  }
81 };
82 
//
// Compile-time trait. `value` is true when a `const T&` can be inserted into
// a std::ostream with operator<<, false otherwise.
//
template<typename T>
class is_streamable
{
    // Probe with a const lvalue: that is how Stringifier streams the value, so
    // the trait must not report true for types that only stream as rvalues.
    template<typename TT, typename SS>
    static auto test(int) -> decltype(std::declval<SS&>() << std::declval<const TT&>(), std::true_type());

    // Fallback when the streaming expression is ill-formed.
    template<typename, typename>
    static auto test(...) -> std::false_type;

public:

    static const bool value = decltype(test<T, std::ostream>(0))::value;
};
96 
//
// Fallback stringifier used when no more specialized version applies.
// Produces "<typeid name>(<address>)" for the given value.
//
template<typename T, typename Enabler=void> struct Stringifier
{
    static std::string
    toString(const T& value)
    {
        std::ostringstream os;
        // std::addressof bypasses any user-defined operator&; the cast to
        // const void* guarantees the pointer is printed as an address.
        os << typeid(value).name() << '(' << static_cast<const void*>(std::addressof(value)) << ')';
        return os.str();
    }
};
107 
108 template<typename T> struct Stringifier<T, typename std::enable_if<is_streamable<T>::value>::type>
109 {
110  static std::string
111  toString(const T& value)
112  {
113  std::ostringstream os;
114  os << value;
115  return os.str();
116  }
117 };
118 
119 template<typename T> class AbstractElementT : virtual public Element
120 {
121 public:
122 
123  template<typename TT>
124  AbstractElementT(TT&& v, long long int id) : _value(std::forward<TT>(v)), _id(id)
125  {
126  }
127 
128  virtual std::string toString() const override
129  {
130  std::ostringstream os;
131  os << _id << ':' << Stringifier<T>::toString(_value);
132  return os.str();
133  }
134 
135  virtual std::vector<unsigned char> encode(const std::shared_ptr<Ice::Communicator>& communicator) const override
136  {
137  return EncoderT<T>::encode(communicator, _value);
138  }
139 
140  virtual long long int getId() const override
141  {
142  return _id;
143  }
144 
145  const T& get() const
146  {
147  return _value;
148  }
149 
150 protected:
151 
152  const T _value;
153  const long long int _id;
154 };
155 
//
// Factory that hands out shared elements of type V, indexed both by value (K)
// and by numeric id. Only weak references are stored, so an element lives as
// long as callers hold the returned shared_ptr. All public operations take the
// internal mutex.
//
// V must provide: a constructor taking (value, extra args..., id), get(),
// getId() and a nested BaseClassType.
//
template<typename K, typename V> class AbstractFactoryT : public std::enable_shared_from_this<AbstractFactoryT<K, V>>
{
    // Custom shared_ptr deleter: unregisters the element from the factory (if
    // the factory is still alive) and then frees it.
    struct Deleter
    {
        void operator()(V* obj)
        {
            auto factory = _factory.lock();
            if(factory)
            {
                factory->remove(obj);
            }
            // A custom deleter replaces the default `delete`, so the object must
            // be destroyed here; remove() above still needs it alive.
            delete obj;
        }

        std::weak_ptr<AbstractFactoryT<K, V>> _factory;

    } _deleter;

public:

    AbstractFactoryT() : _nextId(1)
    {
    }

    // Binds the deleter to this factory. Must be called once the factory is
    // owned by a shared_ptr (shared_from_this() is used).
    void
    init()
    {
        _deleter = { std::enable_shared_from_this<AbstractFactoryT<K, V>>::shared_from_this() };
    }

    // Returns the live element for `value`, creating it (with the extra
    // constructor arguments) if none exists.
    template<typename F, typename... Args> std::shared_ptr<typename V::BaseClassType>
    create(F&& value, Args&&... args)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return createImpl(std::forward<F>(value), std::forward<Args>(args)...);
    }

    // Batch variant: returns one element per entry of `values`, in order.
    std::vector<std::shared_ptr<typename V::BaseClassType>>
    create(std::vector<K> values)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        std::vector<std::shared_ptr<typename V::BaseClassType>> seq;
        seq.reserve(values.size());
        for(auto& v : values)
        {
            seq.push_back(createImpl(std::move(v)));
        }
        return seq;
    }

protected:

    friend struct Deleter;

    // Looks up a live element by id; returns nullptr if unknown or expired.
    std::shared_ptr<typename V::BaseClassType>
    getImpl(long long id) const
    {
        std::lock_guard<std::mutex> lock(_mutex);
        auto p = _elementsById.find(id);
        if(p != _elementsById.end())
        {
            auto k = p->second.lock();
            if(k)
            {
                return k;
            }
        }
        return nullptr;
    }

    // Caller must hold _mutex.
    template<typename F, typename... Args> std::shared_ptr<V>
    createImpl(F&& value, Args&&... args)
    {
        auto p = _elements.find(value);
        if(p != _elements.end())
        {
            auto k = p->second.lock();
            if(k)
            {
                return k;
            }

            //
            // The key is being removed concurrently by the deleter, remove it now
            // to allow the insertion of a new key. The deleter won't remove the
            // new key.
            //
            _elements.erase(p);
        }

        auto k = std::shared_ptr<V>(new V(std::forward<F>(value), std::forward<Args>(args)..., ++_nextId), _deleter);
        _elements[k->get()] = k;
        _elementsById[k->getId()] = k;
        return k;
    }

    // Called by the deleter while `v` is still alive but no longer owned.
    void remove(V* v)
    {
        std::lock_guard<std::mutex> lock(_mutex);
        auto p = _elements.find(v->get());
        //
        // When the deleter runs, the weak reference registered for `v` is already
        // expired, so locking it can never yield `v`. Erase the entry only if it
        // is expired: a live entry belongs to a newer element with the same key
        // and must be kept.
        //
        if(p != _elements.end() && p->second.expired())
        {
            _elements.erase(p);
        }
        _elementsById.erase(v->getId());
    }

    mutable std::mutex _mutex;
    std::map<K, std::weak_ptr<V>> _elements;
    std::map<long long int, std::weak_ptr<V>> _elementsById;
    long long int _nextId;
};
270 
271 template<typename K> class KeyT : public Key, public AbstractElementT<K>
272 {
273 public:
274 
275  virtual std::string toString() const override
276  {
277  return "k" + AbstractElementT<K>::toString();
278  }
279 
280  using AbstractElementT<K>::AbstractElementT;
281  using BaseClassType = Key;
282 };
283 
284 template<typename K> class KeyFactoryT : public KeyFactory, public AbstractFactoryT<K, KeyT<K>>
285 {
286 public:
287 
288  using AbstractFactoryT<K, KeyT<K>>::AbstractFactoryT;
289 
290  virtual std::shared_ptr<Key>
291  get(long long int id) const override
292  {
293  return AbstractFactoryT<K, KeyT<K>>::getImpl(id);
294  }
295 
296  virtual std::shared_ptr<Key>
297  decode(const std::shared_ptr<Ice::Communicator>& communicator, const std::vector<unsigned char>& data) override
298  {
299  return AbstractFactoryT<K, KeyT<K>>::create(DecoderT<K>::decode(communicator, data));
300  }
301 
302  static std::shared_ptr<KeyFactoryT<K>> createFactory()
303  {
304  auto f = std::make_shared<KeyFactoryT<K>>();
305  f->init();
306  return f;
307  }
308 };
309 
310 template<typename T> class TagT : public Tag, public AbstractElementT<T>
311 {
312 public:
313 
314  virtual std::string toString() const override
315  {
316  return "t" + AbstractElementT<T>::toString();
317  }
318 
319  using AbstractElementT<T>::AbstractElementT;
320  using BaseClassType = Tag;
321 };
322 
323 template<typename T> class TagFactoryT : public TagFactory, public AbstractFactoryT<T, TagT<T>>
324 {
325 public:
326 
327  using AbstractFactoryT<T, TagT<T>>::AbstractFactoryT;
328 
329  virtual std::shared_ptr<Tag>
330  get(long long int id) const override
331  {
332  return AbstractFactoryT<T, TagT<T>>::getImpl(id);
333  }
334 
335  virtual std::shared_ptr<Tag>
336  decode(const std::shared_ptr<Ice::Communicator>& communicator, const std::vector<unsigned char>& data) override
337  {
338  return AbstractFactoryT<T, TagT<T>>::create(DecoderT<T>::decode(communicator, data));
339  }
340 
341  static std::shared_ptr<TagFactoryT<T>> createFactory()
342  {
343  auto f = std::make_shared<TagFactoryT<T>>();
344  f->init();
345  return f;
346  }
347 };
348 
349 template<typename Key, typename Value, typename UpdateTag> class SampleT :
350  public Sample, public std::enable_shared_from_this<SampleT<Key, Value, UpdateTag>>
351 {
352 public:
353 
354  SampleT(const std::string& session,
355  const std::string& origin,
356  long long int id,
357  DataStorm::SampleEvent event,
358  const std::shared_ptr<DataStormI::Key>& key,
359  const std::shared_ptr<DataStormI::Tag>& tag,
360  std::vector<unsigned char> value,
361  long long int timestamp) :
362  Sample(session, origin, id, event, key, tag, value, timestamp), _hasValue(false)
363  {
364  }
365 
366  SampleT(DataStorm::SampleEvent event) : Sample(event), _hasValue(false)
367  {
368  }
369 
370  SampleT(DataStorm::SampleEvent event, Value value) : Sample(event), _hasValue(true), _value(std::move(value))
371  {
372  }
373 
374  SampleT(std::vector<unsigned char> value, const std::shared_ptr<Tag>& tag) :
375  Sample(DataStorm::SampleEvent::PartialUpdate, tag),
376  _hasValue(false)
377  {
378  _encodedValue = std::move(value);
379  }
380 
382  get()
383  {
384  auto impl = std::enable_shared_from_this<SampleT<Key, Value, UpdateTag>>::shared_from_this();
386  }
387 
388  const Key& getKey()
389  {
390  assert(key);
391  return std::static_pointer_cast<KeyT<Key>>(key)->get();
392  }
393 
394  const Value& getValue() const
395  {
396  return _value;
397  }
398 
399  UpdateTag getTag() const
400  {
401  return tag ? std::static_pointer_cast<TagT<UpdateTag>>(tag)->get() : UpdateTag();
402  }
403 
404  void setValue(Value value)
405  {
406  _value = std::move(value);
407  _hasValue = true;
408  }
409 
410  virtual bool hasValue() const override
411  {
412  return _hasValue;
413  }
414 
415  virtual void setValue(const std::shared_ptr<Sample>& sample) override
416  {
417  if(sample)
418  {
420  std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(sample)->getValue());
421  }
422  else
423  {
424  _value = Value();
425  }
426  _hasValue = true;
427  }
428 
429  virtual const std::vector<unsigned char>& encode(const std::shared_ptr<Ice::Communicator>& communicator) override
430  {
431  if(_encodedValue.empty())
432  {
433  _encodedValue = encodeValue(communicator);
434  }
435  return _encodedValue;
436  }
437 
438  virtual std::vector<unsigned char> encodeValue(const std::shared_ptr<Ice::Communicator>& communicator) override
439  {
440  assert(_hasValue || event == DataStorm::SampleEvent::Remove);
441  return EncoderT<Value>::encode(communicator, _value);
442  }
443 
444  virtual void decode(const std::shared_ptr<Ice::Communicator>& communicator) override
445  {
446  if(!_encodedValue.empty())
447  {
448  _hasValue = true;
449  _value = DecoderT<Value>::decode(communicator, _encodedValue);
450  _encodedValue.clear();
451  }
452  }
453 
454 private:
455 
456  bool _hasValue;
457  Value _value;
458 };
459 
460 template<typename Key, typename Value, typename UpdateTag> class SampleFactoryT : public SampleFactory
461 {
462 public:
463 
464  virtual std::shared_ptr<Sample> create(const std::string& session,
465  const std::string& origin,
466  long long int id,
467  DataStorm::SampleEvent type,
468  const std::shared_ptr<DataStormI::Key>& key,
469  const std::shared_ptr<DataStormI::Tag>& tag,
470  std::vector<unsigned char> value,
471  long long int timestamp)
472  {
473  return std::make_shared<SampleT<Key, Value, UpdateTag>>(session,
474  origin,
475  id,
476  type,
477  key,
478  tag,
479  std::move(value),
480  timestamp);
481  }
482 };
483 
484 template<typename C, typename V> class FilterT : public Filter, public AbstractElementT<C>
485 {
486 public:
487 
488  template<typename CC>
489  FilterT(CC&& criteria, long long int id) : AbstractElementT<C>::AbstractElementT(std::forward<CC>(criteria), id)
490  {
491  }
492 
493  virtual std::string toString() const override
494  {
495  return "f" + AbstractElementT<C>::toString();
496  }
497 
498  virtual bool match(const std::shared_ptr<Filterable>& value) const override
499  {
500  return _lambda(std::static_pointer_cast<V>(value)->get());
501  }
502 
503  virtual const std::string& getName() const override
504  {
505  return _name;
506  }
507 
508  template<typename FF> void
509  init(const std::string& name, FF&& lambda)
510  {
511  _name = name;
512  _lambda = std::forward<FF>(lambda);
513  }
514 
515  using BaseClassType = Filter;
516 
517 private:
518 
519  std::string _name;
520  std::function<bool(const typename std::remove_reference<decltype(std::declval<V>().get())>::type&)> _lambda;
521 };
522 
523 template<typename C, typename V> class FilterFactoryT : public FilterFactory, public AbstractFactoryT<C, FilterT<C, V>>
524 {
525 public:
526 
527  FilterFactoryT()
528  {
529  }
530 
531  virtual std::shared_ptr<Filter>
532  get(long long int id) const override
533  {
534  return AbstractFactoryT<C, FilterT<C, V>>::getImpl(id);
535  }
536 
537  virtual std::shared_ptr<Filter>
538  decode(const std::shared_ptr<Ice::Communicator>& communicator, const std::vector<unsigned char>& data) override
539  {
540  return AbstractFactoryT<C, FilterT<C, V>>::create(DecoderT<C>::decode(communicator, data));
541  }
542 
543  static std::shared_ptr<FilterFactoryT<C, V>> createFactory()
544  {
545  auto f = std::make_shared<FilterFactoryT<C, V>>();
546  f->init();
547  return f;
548  }
549 };
550 
551 template<typename ValueT> class FilterManagerT : public FilterManager
552 {
553  using Value = typename std::remove_reference<decltype(std::declval<ValueT>().get())>::type;
554 
555  struct Factory
556  {
557  virtual ~Factory() = default;
558 
559  virtual std::shared_ptr<Filter> get(long long int) const = 0;
560 
561  virtual std::shared_ptr<Filter>
562  decode(const std::shared_ptr<Ice::Communicator>&, const std::vector<unsigned char>&) = 0;
563  };
564 
565  template<typename Criteria> struct FactoryT : Factory
566  {
567  FactoryT(const std::string& name, std::function<std::function<bool(const Value&)>(const Criteria&)> lambda) :
568  name(name), lambda(std::move(lambda))
569  {
570  }
571 
572  std::shared_ptr<Filter> create(Criteria criteria)
573  {
574  auto filter = std::static_pointer_cast<FilterT<Criteria, ValueT>>(filterFactory.create(criteria));
575  filter->init(name, lambda(filter->get()));
576  return filter;
577  }
578 
579  virtual std::shared_ptr<Filter>
580  get(long long int id) const
581  {
582  return filterFactory.get(id);
583  }
584 
585  virtual std::shared_ptr<Filter>
586  decode(const std::shared_ptr<Ice::Communicator>& communicator, const std::vector<unsigned char>& data)
587  {
588  return create(DecoderT<Criteria>::decode(communicator, data));
589  }
590 
591  const std::string name;
592  std::function<std::function<bool(const Value&)>(const Criteria&)> lambda;
593  FilterFactoryT<Criteria, ValueT> filterFactory;
594  };
595 
596 public:
597 
598  FilterManagerT()
599  {
600  }
601 
602  template<typename Criteria> std::shared_ptr<Filter>
603  create(const std::string& name, const Criteria& criteria)
604  {
605  auto p = _factories.find(name);
606  if(p == _factories.end())
607  {
608  throw std::invalid_argument("unknown filter `" + name + "'");
609  }
610 
611  auto factory = dynamic_cast<FactoryT<Criteria>*>(p->second.get());
612  if(!factory)
613  {
614  throw std::invalid_argument("filter `" + name + "' type doesn't match");
615  }
616 
617  return factory->create(criteria);
618  }
619 
620  virtual std::shared_ptr<Filter>
621  decode(const std::shared_ptr<Ice::Communicator>& communicator,
622  const std::string& name,
623  const std::vector<unsigned char>& data) override
624  {
625  auto p = _factories.find(name);
626  if(p == _factories.end())
627  {
628  return nullptr;
629  }
630 
631  return p->second->decode(communicator, data);
632  }
633 
634  virtual std::shared_ptr<Filter>
635  get(const std::string& name, long long int id) const override
636  {
637  auto p = _factories.find(name);
638  if(p == _factories.end())
639  {
640  return nullptr;
641  }
642 
643  return p->second->get(id);
644  }
645 
646  template<typename Criteria> void
647  set(const std::string& name, std::function<std::function<bool(const Value&)>(const Criteria&)> lambda)
648  {
649  if(lambda)
650  {
651  _factories[name] = std::unique_ptr<Factory>(new FactoryT<Criteria>(name, std::move(lambda)));
652  }
653  else
654  {
655  _factories.erase(name);
656  }
657  }
658 
659  static std::shared_ptr<FilterManagerT<ValueT>> create()
660  {
661  return std::make_shared<FilterManagerT<ValueT>>();
662  }
663 
664 private:
665 
666  std::map<std::string, std::unique_ptr<Factory>> _factories;
667 };
668 
669 }
static T decode(const std::shared_ptr< Ice::Communicator > &communicator, const std::vector< unsigned char > &value) noexcept
Decode a value.
Definition: Types.h:339
static T clone(const T &value) noexcept
Clone the given value.
Definition: Types.h:277
The Encoder template provides a method to encode user types.
Definition: Types.h:214
Definition: CtrlCHandler.h:13
A sample provides information about a data element update.
Definition: DataStorm.h:42
Definition: InternalI.h:18
static std::vector< unsigned char > encode(const std::shared_ptr< Ice::Communicator > &communicator, const T &value) noexcept
Encode the given value.
Definition: Types.h:326