18 template<
typename K,
typename V,
typename U>
class Sample;
26 class has_communicator_parameter
28 template<
typename TT,
typename SS>
29 static auto testE(
int) -> decltype(TT::encode(std::declval<std::shared_ptr<Ice::Communicator>&>(),
30 std::declval<SS&>()), std::true_type());
32 template<
typename,
typename>
33 static auto testE(...) -> std::false_type;
35 template<
typename TT,
typename SS>
36 static auto testD(
int) -> decltype(TT::decode(std::declval<std::shared_ptr<Ice::Communicator>&>(),
37 std::vector<unsigned char>()), std::true_type());
39 template<
typename,
typename>
40 static auto testD(...) -> std::false_type;
47 template<
typename T,
typename Enabler=
void>
50 static std::vector<unsigned char> encode(
const std::shared_ptr<Ice::Communicator>&,
const T& value)
56 template<
typename T,
typename Enabler=
void>
59 static T decode(
const std::shared_ptr<Ice::Communicator>&,
const std::vector<unsigned char>& data)
66 struct EncoderT<T, typename std::enable_if<has_communicator_parameter<T>::value>::type>
68 static std::vector<unsigned char> encode(
const std::shared_ptr<Ice::Communicator>& communicator,
const T& value)
75 struct DecoderT<T, typename std::enable_if<has_communicator_parameter<T>::value>::type>
77 static T decode(
const std::shared_ptr<Ice::Communicator>& communicator,
const std::vector<unsigned char>& data)
86 template<
typename TT,
typename SS>
87 static auto test(
int) -> decltype(std::declval<SS&>() << std::declval<TT>(), std::true_type());
89 template<
typename,
typename>
90 static auto test(...) -> std::false_type;
94 static const bool value = decltype(test<T, std::ostream>(0))::value;
97 template<
typename T,
typename Enabler=
void>
struct Stringifier
100 toString(
const T& value)
102 std::ostringstream os;
103 os <<
typeid(value).name() <<
'(' << &value <<
')';
108 template<
typename T>
struct Stringifier<T, typename std::enable_if<is_streamable<T>::value>::type>
111 toString(
const T& value)
113 std::ostringstream os;
119 template<
typename T>
class AbstractElementT :
virtual public Element
123 template<
typename TT>
124 AbstractElementT(TT&& v,
long long int id) : _value(std::forward<TT>(v)), _id(id)
128 virtual std::string toString()
const override 130 std::ostringstream os;
131 os << _id << ':' << Stringifier<T>::toString(_value);
135 virtual std::vector<unsigned char> encode(
const std::shared_ptr<Ice::Communicator>& communicator)
const override 137 return EncoderT<T>::encode(communicator, _value);
140 virtual long long int getId()
const override 153 const long long int _id;
156 template<
typename K,
typename V>
class AbstractFactoryT :
public std::enable_shared_from_this<AbstractFactoryT<K, V>>
160 void operator()(V* obj)
162 auto factory = _factory.lock();
165 factory->remove(obj);
169 std::weak_ptr<AbstractFactoryT<K, V>> _factory;
175 AbstractFactoryT() : _nextId(1)
182 _deleter = { std::enable_shared_from_this<AbstractFactoryT<K, V>>::shared_from_this() };
185 template<
typename F,
typename... Args> std::shared_ptr<typename V::BaseClassType>
186 create(F&& value, Args&&... args)
188 std::lock_guard<std::mutex> lock(_mutex);
189 return createImpl(std::forward<F>(value), std::forward<Args>(args)...);
192 std::vector<std::shared_ptr<typename V::BaseClassType>>
193 create(std::vector<K> values)
195 std::lock_guard<std::mutex> lock(_mutex);
196 std::vector<std::shared_ptr<typename V::BaseClassType>> seq;
197 for(
auto& v : values)
199 seq.push_back(createImpl(std::move(v)));
206 friend struct Deleter;
208 std::shared_ptr<typename V::BaseClassType>
209 getImpl(
long long id)
const 211 std::lock_guard<std::mutex> lock(_mutex);
212 auto p = _elementsById.find(
id);
213 if(p != _elementsById.end())
215 auto k = p->second.lock();
224 template<
typename F,
typename... Args> std::shared_ptr<V>
225 createImpl(F&& value, Args&&... args)
227 auto p = _elements.find(value);
228 if(p != _elements.end())
230 auto k = p->second.lock();
244 auto k = std::shared_ptr<V>(
new V(std::forward<F>(value), std::forward<Args>(args)..., ++_nextId), _deleter);
245 _elements[k->get()] = k;
246 _elementsById[k->getId()] = k;
252 std::lock_guard<std::mutex> lock(_mutex);
253 auto p = _elements.find(v->get());
254 if(p != _elements.end())
256 auto e = p->second.lock();
257 if(e && e.get() == v)
262 _elementsById.erase(v->getId());
265 mutable std::mutex _mutex;
266 std::map<K, std::weak_ptr<V>> _elements;
267 std::map<long long int, std::weak_ptr<V>> _elementsById;
268 long long int _nextId;
271 template<
typename K>
class KeyT :
public Key,
public AbstractElementT<K>
275 virtual std::string toString()
const override 277 return "k" + AbstractElementT<K>::toString();
280 using AbstractElementT<K>::AbstractElementT;
281 using BaseClassType = Key;
284 template<
typename K>
class KeyFactoryT :
public KeyFactory,
public AbstractFactoryT<K, KeyT<K>>
288 using AbstractFactoryT<K, KeyT<K>>::AbstractFactoryT;
290 virtual std::shared_ptr<Key>
291 get(
long long int id)
const override 293 return AbstractFactoryT<K, KeyT<K>>::getImpl(
id);
296 virtual std::shared_ptr<Key>
297 decode(
const std::shared_ptr<Ice::Communicator>& communicator,
const std::vector<unsigned char>& data)
override 299 return AbstractFactoryT<K, KeyT<K>>::create(DecoderT<K>::decode(communicator, data));
302 static std::shared_ptr<KeyFactoryT<K>> createFactory()
304 auto f = std::make_shared<KeyFactoryT<K>>();
310 template<
typename T>
class TagT :
public Tag,
public AbstractElementT<T>
314 virtual std::string toString()
const override 316 return "t" + AbstractElementT<T>::toString();
319 using AbstractElementT<T>::AbstractElementT;
320 using BaseClassType = Tag;
323 template<
typename T>
class TagFactoryT :
public TagFactory,
public AbstractFactoryT<T, TagT<T>>
327 using AbstractFactoryT<T, TagT<T>>::AbstractFactoryT;
329 virtual std::shared_ptr<Tag>
330 get(
long long int id)
const override 332 return AbstractFactoryT<T, TagT<T>>::getImpl(
id);
335 virtual std::shared_ptr<Tag>
336 decode(
const std::shared_ptr<Ice::Communicator>& communicator,
const std::vector<unsigned char>& data)
override 338 return AbstractFactoryT<T, TagT<T>>::create(DecoderT<T>::decode(communicator, data));
341 static std::shared_ptr<TagFactoryT<T>> createFactory()
343 auto f = std::make_shared<TagFactoryT<T>>();
349 template<
typename Key,
typename Value,
typename UpdateTag>
class SampleT :
350 public Sample,
public std::enable_shared_from_this<SampleT<Key, Value, UpdateTag>>
354 SampleT(
const std::string& session,
355 const std::string& origin,
357 DataStorm::SampleEvent event,
358 const std::shared_ptr<DataStormI::Key>& key,
359 const std::shared_ptr<DataStormI::Tag>& tag,
360 std::vector<unsigned char> value,
361 long long int timestamp) :
362 Sample(session, origin, id, event, key, tag, value, timestamp), _hasValue(false)
366 SampleT(DataStorm::SampleEvent event) : Sample(event), _hasValue(false)
370 SampleT(DataStorm::SampleEvent event, Value value) : Sample(event), _hasValue(true), _value(std::move(value))
374 SampleT(std::vector<unsigned char> value,
const std::shared_ptr<Tag>& tag) :
375 Sample(
DataStorm::SampleEvent::PartialUpdate, tag),
378 _encodedValue = std::move(value);
384 auto impl = std::enable_shared_from_this<SampleT<Key, Value, UpdateTag>>::shared_from_this();
391 return std::static_pointer_cast<KeyT<Key>>(key)->
get();
394 const Value& getValue()
const 399 UpdateTag getTag()
const 401 return tag ? std::static_pointer_cast<TagT<UpdateTag>>(tag)->
get() : UpdateTag();
404 void setValue(Value value)
406 _value = std::move(value);
410 virtual bool hasValue()
const override 415 virtual void setValue(
const std::shared_ptr<Sample>& sample)
override 420 std::static_pointer_cast<DataStormI::SampleT<Key, Value, UpdateTag>>(sample)->getValue());
429 virtual const std::vector<unsigned char>& encode(
const std::shared_ptr<Ice::Communicator>& communicator)
override 431 if(_encodedValue.empty())
433 _encodedValue = encodeValue(communicator);
435 return _encodedValue;
438 virtual std::vector<unsigned char> encodeValue(
const std::shared_ptr<Ice::Communicator>& communicator)
override 440 assert(_hasValue || event == DataStorm::SampleEvent::Remove);
441 return EncoderT<Value>::encode(communicator, _value);
444 virtual void decode(
const std::shared_ptr<Ice::Communicator>& communicator)
override 446 if(!_encodedValue.empty())
449 _value = DecoderT<Value>::decode(communicator, _encodedValue);
450 _encodedValue.clear();
460 template<
typename Key,
typename Value,
typename UpdateTag>
class SampleFactoryT :
public SampleFactory
464 virtual std::shared_ptr<Sample> create(
const std::string& session,
465 const std::string& origin,
467 DataStorm::SampleEvent type,
468 const std::shared_ptr<DataStormI::Key>& key,
469 const std::shared_ptr<DataStormI::Tag>& tag,
470 std::vector<unsigned char> value,
471 long long int timestamp)
473 return std::make_shared<SampleT<Key, Value, UpdateTag>>(session,
484 template<
typename C,
typename V>
class FilterT :
public Filter,
public AbstractElementT<C>
488 template<
typename CC>
489 FilterT(CC&& criteria,
long long int id) : AbstractElementT<C>::AbstractElementT(std::forward<CC>(criteria), id)
493 virtual std::string toString()
const override 495 return "f" + AbstractElementT<C>::toString();
498 virtual bool match(
const std::shared_ptr<Filterable>& value)
const override 500 return _lambda(std::static_pointer_cast<V>(value)->
get());
503 virtual const std::string& getName()
const override 508 template<
typename FF>
void 509 init(
const std::string& name, FF&& lambda)
512 _lambda = std::forward<FF>(lambda);
515 using BaseClassType = Filter;
520 std::function<bool(const typename std::remove_reference<decltype(std::declval<V>().
get())>::type&)> _lambda;
523 template<
typename C,
typename V>
class FilterFactoryT :
public FilterFactory,
public AbstractFactoryT<C, FilterT<C, V>>
531 virtual std::shared_ptr<Filter>
532 get(
long long int id)
const override 534 return AbstractFactoryT<C, FilterT<C, V>>::getImpl(
id);
537 virtual std::shared_ptr<Filter>
538 decode(
const std::shared_ptr<Ice::Communicator>& communicator,
const std::vector<unsigned char>& data)
override 540 return AbstractFactoryT<C, FilterT<C, V>>::create(DecoderT<C>::decode(communicator, data));
543 static std::shared_ptr<FilterFactoryT<C, V>> createFactory()
545 auto f = std::make_shared<FilterFactoryT<C, V>>();
551 template<
typename ValueT>
class FilterManagerT :
public FilterManager
553 using Value =
typename std::remove_reference<decltype(std::declval<ValueT>().
get())>::type;
557 virtual ~Factory() =
default;
559 virtual std::shared_ptr<Filter>
get(
long long int)
const = 0;
561 virtual std::shared_ptr<Filter>
562 decode(
const std::shared_ptr<Ice::Communicator>&,
const std::vector<unsigned char>&) = 0;
565 template<
typename Criteria>
struct FactoryT : Factory
567 FactoryT(
const std::string& name, std::function<std::function<
bool(
const Value&)>(
const Criteria&)> lambda) :
568 name(name), lambda(std::move(lambda))
572 std::shared_ptr<Filter> create(Criteria criteria)
574 auto filter = std::static_pointer_cast<FilterT<Criteria, ValueT>>(filterFactory.create(criteria));
575 filter->init(name, lambda(filter->get()));
579 virtual std::shared_ptr<Filter>
580 get(
long long int id)
const 582 return filterFactory.get(
id);
585 virtual std::shared_ptr<Filter>
586 decode(
const std::shared_ptr<Ice::Communicator>& communicator,
const std::vector<unsigned char>& data)
588 return create(DecoderT<Criteria>::decode(communicator, data));
591 const std::string name;
592 std::function<std::function<bool(const Value&)>(
const Criteria&)> lambda;
593 FilterFactoryT<Criteria, ValueT> filterFactory;
602 template<
typename Criteria> std::shared_ptr<Filter>
603 create(
const std::string& name,
const Criteria& criteria)
605 auto p = _factories.find(name);
606 if(p == _factories.end())
608 throw std::invalid_argument(
"unknown filter `" + name +
"'");
611 auto factory =
dynamic_cast<FactoryT<Criteria>*
>(p->second.get());
614 throw std::invalid_argument(
"filter `" + name +
"' type doesn't match");
617 return factory->create(criteria);
620 virtual std::shared_ptr<Filter>
621 decode(
const std::shared_ptr<Ice::Communicator>& communicator,
622 const std::string& name,
623 const std::vector<unsigned char>& data)
override 625 auto p = _factories.find(name);
626 if(p == _factories.end())
631 return p->second->decode(communicator, data);
634 virtual std::shared_ptr<Filter>
635 get(
const std::string& name,
long long int id)
const override 637 auto p = _factories.find(name);
638 if(p == _factories.end())
643 return p->second->get(
id);
646 template<
typename Criteria>
void 647 set(
const std::string& name, std::function<std::function<bool(const Value&)>(
const Criteria&)> lambda)
651 _factories[name] = std::unique_ptr<Factory>(
new FactoryT<Criteria>(name, std::move(lambda)));
655 _factories.erase(name);
659 static std::shared_ptr<FilterManagerT<ValueT>> create()
661 return std::make_shared<FilterManagerT<ValueT>>();
666 std::map<std::string, std::unique_ptr<Factory>> _factories;
static T decode(const std::shared_ptr< Ice::Communicator > &communicator, const std::vector< unsigned char > &value) noexcept
Decode a value.
Definition: Types.h:339
static T clone(const T &value) noexcept
Clone the given value.
Definition: Types.h:277
The Encoder template provides a method to encode decode user types.
Definition: Types.h:214
Definition: CtrlCHandler.h:13
A sample provides information about a data element update.
Definition: DataStorm.h:42
Definition: InternalI.h:18
static std::vector< unsigned char > encode(const std::shared_ptr< Ice::Communicator > &communicator, const T &value) noexcept
Encode the given value.
Definition: Types.h:326