DataStorm  0.1
Data Distribution Service
InternalI.h
Go to the documentation of this file.
1 // **********************************************************************
2 //
3 // Copyright (c) 2018-present ZeroC, Inc. All rights reserved.
4 //
5 // **********************************************************************
6 
7 #pragma once
8 
9 #include <DataStorm/Config.h>
10 #include <DataStorm/Sample.h>
11 #include <DataStorm/Types.h>
12 
13 #include <Ice/Ice.h>
14 
15 //
16 // Private abstract API used by the template based API and the internal DataStorm implementation.
17 //
18 namespace DataStormI
19 {
20 
21 class Instance;
22 
// Base class that declares nothing but a virtual destructor. In this header,
// Key and Sample derive from it and Filter::match() accepts a
// std::shared_ptr<Filterable>.
23 class Filterable
24 {
25 public:
26 
27  virtual ~Filterable() = default;
28 };
29 
// Abstract interface exposing a string form, an encoded byte form and a
// numeric id. Key, Tag and Filter inherit from it virtually in this header.
30 class Element
31 {
32 public:
33 
34  virtual ~Element() = default;
    // Returns a std::string for this element.
35  virtual std::string toString() const = 0;
    // Returns a byte vector for this element, given an Ice communicator.
    // NOTE(review): presumably the communicator provides the marshaling
    // context - confirm against the implementations.
36  virtual std::vector<unsigned char> encode(const std::shared_ptr<Ice::Communicator>&) const = 0;
    // Returns the element's id as a long long.
37  virtual long long int getId() const = 0;
38 };
39 
// A Filterable that is also an Element (Element is inherited virtually).
// Declares no members of its own.
40 class Key : public Filterable, virtual public Element
41 {
42 };
43 
// Abstract factory returning shared Key instances, either from a numeric
// argument or from an encoded byte vector.
44 class KeyFactory
45 {
46 public:
47 
48  virtual ~KeyFactory() = default;
    // Returns the Key associated with the given long long value.
    // NOTE(review): presumably the value is a Key id as returned by
    // Element::getId() - confirm.
49  virtual std::shared_ptr<Key> get(long long int) const = 0;
    // Returns a Key built from the given bytes, using the given communicator.
    // Unlike get(), this member is not const.
50  virtual std::shared_ptr<Key> decode(const std::shared_ptr<Ice::Communicator>&, const std::vector<unsigned char>&) = 0;
51 };
52 
// An Element (inherited virtually) with no additional members. Unlike Key,
// it does not derive from Filterable.
53 class Tag : virtual public Element
54 {
55 };
56 
// Abstract factory returning shared Tag instances; its two members have the
// same parameter lists as KeyFactory's, with Tag as the result type.
57 class TagFactory
58 {
59 public:
60 
61  virtual ~TagFactory() = default;
    // Returns the Tag associated with the given long long value.
    // NOTE(review): presumably the value is a Tag id - confirm.
62  virtual std::shared_ptr<Tag> get(long long int) const = 0;
    // Returns a Tag built from the given bytes, using the given communicator.
    // Unlike get(), this member is not const.
63  virtual std::shared_ptr<Tag> decode(const std::shared_ptr<Ice::Communicator>&, const std::vector<unsigned char>&) = 0;
64 };
65 
// Abstract sample record: public metadata fields plus a protected buffer
// holding an encoded value. Derived classes implement the value handling and
// the encode/decode members declared pure virtual below.
66 class Sample : public Filterable
67 {
68 public:
69 
    // Builds a sample from explicit field values. `value` is taken by value
    // and moved into _encodedValue. `timestamp` is interpreted as a count of
    // microseconds when constructing the system_clock time_point.
70  Sample(const std::string& session,
71  const std::string& origin,
72  long long int id,
73  DataStorm::SampleEvent event,
74  const std::shared_ptr<Key>& key,
75  const std::shared_ptr<Tag>& tag,
76  std::vector<unsigned char> value,
77  long long int timestamp) :
78  session(session), origin(origin), id(id), event(event), key(key), tag(tag),
79  timestamp(std::chrono::microseconds(timestamp)),
80  _encodedValue(std::move(value))
81  {
82  }
83 
    // Builds a sample from an event and an optional tag only. The strings,
    // key, timestamp and encoded value are default-constructed. `id` is a
    // scalar and would otherwise hold an indeterminate value, so it is set
    // to 0 explicitly (listed first to follow member declaration order).
84  Sample(DataStorm::SampleEvent event, const std::shared_ptr<Tag>& tag = nullptr) : id(0), event(event), tag(tag)
85  {
86  }
87 
    // Reports whether the sample has a value; defined by derived classes.
88  virtual bool hasValue() const = 0;
    // Sets this sample's value from another sample; defined by derived classes.
89  virtual void setValue(const std::shared_ptr<Sample>&) = 0;
90 
    // Decoding/encoding hooks that take an Ice communicator. encode() returns
    // a reference to a byte vector while encodeValue() returns one by value.
    // NOTE(review): presumably encode() caches into _encodedValue - confirm
    // against the derived implementation.
91  virtual void decode(const std::shared_ptr<Ice::Communicator>&) = 0;
92  virtual const std::vector<unsigned char>& encode(const std::shared_ptr<Ice::Communicator>&) = 0;
93  virtual std::vector<unsigned char> encodeValue(const std::shared_ptr<Ice::Communicator>&) = 0;
94 
    // Read-only access to the stored encoded bytes.
95  const std::vector<unsigned char>& getEncodedValue() const
96  {
97  return _encodedValue;
98  }
99 
    // Public data members, initialized by the constructors above.
100  std::string session;
101  std::string origin;
102  long long int id;
103  DataStorm::SampleEvent event;
104  std::shared_ptr<Key> key;
105  std::shared_ptr<Tag> tag;
106  std::chrono::time_point<std::chrono::system_clock> timestamp;
107 
108 protected:
109 
     // Encoded value bytes; filled by the eight-argument constructor and
     // accessible to derived classes.
110  std::vector<unsigned char> _encodedValue;
111 };
112 
// Abstract factory for Sample objects.
113 class SampleFactory
114 {
115 public:
116 
117  virtual ~SampleFactory() = default;
118 
     // Returns a shared Sample. The parameter types match, in order, those of
     // the eight-argument Sample constructor (session, origin, id, event,
     // key, tag, value, timestamp).
119  virtual std::shared_ptr<Sample> create(const std::string&,
120  const std::string&,
121  long long int,
122  DataStorm::SampleEvent,
123  const std::shared_ptr<Key>&,
124  const std::shared_ptr<Tag>&,
125  std::vector<unsigned char>,
126  long long int) = 0;
127 };
128 
// An Element (inherited virtually) that can test a Filterable and that
// carries a name.
129 class Filter : virtual public Element
130 {
131 public:
132 
     // Returns whether the given Filterable matches this filter.
133  virtual bool match(const std::shared_ptr<Filterable>&) const = 0;
     // Returns the filter's name by const reference.
134  virtual const std::string& getName() const = 0;
135 };
136 
// Abstract factory returning shared Filter instances, either from a numeric
// argument or from an encoded byte vector.
137 class FilterFactory
138 {
139 public:
140 
141  virtual ~FilterFactory() = default;
142 
     // Returns the Filter associated with the given long long value.
     // NOTE(review): presumably the value is a Filter id - confirm.
143  virtual std::shared_ptr<Filter> get(long long int) const = 0;
     // Returns a Filter built from the given bytes, using the given
     // communicator. Unlike get(), this member is not const.
144  virtual std::shared_ptr<Filter> decode(const std::shared_ptr<Ice::Communicator>&, const std::vector<unsigned char>&) = 0;
145 };
146 
// Abstract interface with the same two operations as FilterFactory, each
// taking an additional std::string argument.
// NOTE(review): presumably the string is a filter name used to select the
// factory - confirm against the implementation.
147 class FilterManager
148 {
149 public:
150 
151  virtual ~FilterManager() = default;
152 
     // Returns the Filter for the given string and long long value.
153  virtual std::shared_ptr<Filter> get(const std::string&, long long int) const = 0;
154 
     // Returns a Filter built from the given string and bytes, using the
     // given communicator. Unlike get(), this member is not const.
155  virtual std::shared_ptr<Filter>
156  decode(const std::shared_ptr<Ice::Communicator>&, const std::string&, const std::vector<unsigned char>&) = 0;
157 };
158 
// Abstract base shared by DataReader and DataWriter (both inherit from it
// virtually). Declares queries and callbacks for connected keys/elements,
// plus destroy() and communicator access.
159 class DataElement
160 {
161 public:
162 
163  virtual ~DataElement() = default;
164 
     // Identifier type: a (string, long long, long long) tuple.
     // NOTE(review): the meaning of each component is not visible here.
165  using Id = std::tuple<std::string, long long int, long long int>;
166 
     // Return the connected elements (as strings) and connected keys.
167  virtual std::vector<std::string> getConnectedElements() const = 0;
168  virtual std::vector<std::shared_ptr<Key>> getConnectedKeys() const = 0;
     // Register two callbacks: the first takes a vector of keys/elements, the
     // second takes a DataStorm::CallbackReason and a single key/element.
     // NOTE(review): presumably the first delivers the initial set and the
     // second delivers later changes - confirm.
169  virtual void onConnectedKeys(std::function<void(std::vector<std::shared_ptr<Key>>)>,
170  std::function<void(DataStorm::CallbackReason, std::shared_ptr<Key>)>) = 0;
171  virtual void onConnectedElements(std::function<void(std::vector<std::string>)>,
172  std::function<void(DataStorm::CallbackReason, std::string)>) = 0;
173 
174  virtual void destroy() = 0;
     // Returns the Ice communicator as a shared_ptr.
175  virtual std::shared_ptr<Ice::Communicator> getCommunicator() const = 0;
176 };
177 
// Reader-side DataElement: writer presence queries and access to unread
// samples.
178 class DataReader : virtual public DataElement
179 {
180 public:
181 
     // Writer presence. Note these two members are non-const here, whereas
     // DataWriter::hasReaders()/waitForReaders() are const.
     // NOTE(review): the meaning of the int argument (likely a count) is not
     // visible in this header.
182  virtual bool hasWriters() = 0;
183  virtual void waitForWriters(int) = 0;
184  virtual int getInstanceCount() const = 0;
185 
     // Unread sample access. getAllUnread() and getNextUnread() are non-const;
     // waitForUnread() and hasUnread() are const.
186  virtual std::vector<std::shared_ptr<Sample>> getAllUnread() = 0;
187  virtual void waitForUnread(unsigned int) const = 0;
188  virtual bool hasUnread() const = 0;
189  virtual std::shared_ptr<Sample> getNextUnread() = 0;
190 
     // Registers two callbacks: one taking a vector of samples and one taking
     // a single sample.
191  virtual void onSamples(std::function<void(const std::vector<std::shared_ptr<Sample>>&)>,
192  std::function<void(const std::shared_ptr<Sample>&)>) = 0;
193 };
194 
// Writer-side DataElement: reader presence queries, access to previously
// written samples, and publication.
195 class DataWriter : virtual public DataElement
196 {
197 public:
198 
     // Reader presence.
     // NOTE(review): the meaning of the int argument (likely a count) is not
     // visible in this header.
199  virtual bool hasReaders() const = 0;
200  virtual void waitForReaders(int) const = 0;
201 
     // Return the last sample and all samples, respectively.
202  virtual std::shared_ptr<Sample> getLast() const = 0;
203  virtual std::vector<std::shared_ptr<Sample>> getAll() const = 0;
204 
     // Publishes the given sample for the given key.
205  virtual void publish(const std::shared_ptr<Key>&, const std::shared_ptr<Sample>&) = 0;
206 };
207 
// Abstract base of TopicReader and TopicWriter (both inherit from it
// virtually). Holds per-Tag updater functions and exposes the topic name.
208 class Topic
209 {
210 public:
211 
212  virtual ~Topic() = default;
213 
     // Callable taking two samples and an Ice communicator.
     // NOTE(review): presumably (previous sample, next sample, communicator)
     // - confirm against the callers.
214  using Updater = std::function<void(const std::shared_ptr<Sample>&,
215  const std::shared_ptr<Sample>&,
216  const std::shared_ptr<Ice::Communicator>&)>;
217 
     // Sets the updater for one tag.
218  virtual void setUpdater(const std::shared_ptr<Tag>&, Updater) = 0;
219 
     // Bulk set/get of updaters. The map is keyed by std::shared_ptr<Tag>, so
     // with the default comparator entries are ordered by pointer value.
220  virtual void setUpdaters(std::map<std::shared_ptr<Tag>, Updater>) = 0;
221  virtual std::map<std::shared_ptr<Tag>, Updater> getUpdaters() const = 0;
222 
223  virtual std::string getName() const = 0;
224  virtual void destroy() = 0;
225 };
226 
// Reader-side Topic: creates DataReader objects and exposes writer presence.
// NOTE(review): the embedded listing numbers below jump from 232 to 234 and
// from 238 to 240, so one parameter line of each factory declaration appears
// to be absent from this listing - confirm against the original header before
// relying on these signatures.
227 class TopicReader : virtual public Topic
228 {
229 public:
230 
     // Returns a DataReader for a Filter; trailing parameters default to an
     // empty string and an empty byte vector.
231  virtual std::shared_ptr<DataReader> createFiltered(const std::shared_ptr<Filter>&,
232  const std::string&,
234  const std::string& = std::string(),
235  std::vector<unsigned char> = {}) = 0;
236 
     // Returns a DataReader for a list of keys; same trailing defaults as
     // createFiltered().
237  virtual std::shared_ptr<DataReader> create(const std::vector<std::shared_ptr<Key>>&,
238  const std::string&,
240  const std::string& = std::string(),
241  std::vector<unsigned char> = {}) = 0;
242 
     // Sets the default reader configuration (passed by value).
243  virtual void setDefaultConfig(DataStorm::ReaderConfig) = 0;
244  virtual bool hasWriters() const = 0;
245  virtual void waitForWriters(int) const = 0;
246 };
247 
// Writer-side Topic: creates DataWriter objects and exposes reader presence.
// NOTE(review): the embedded listing numbers below jump from 253 to 255 and
// the create() declaration is not terminated here, so its final parameter
// line appears to be absent from this listing - confirm against the original
// header before relying on this signature.
248 class TopicWriter : virtual public Topic
249 {
250 public:
251 
     // Returns a DataWriter for a list of keys.
252  virtual std::shared_ptr<DataWriter> create(const std::vector<std::shared_ptr<Key>>&,
253  const std::string&,
255 
     // Sets the default writer configuration (passed by value).
256  virtual void setDefaultConfig(DataStorm::WriterConfig) = 0;
257  virtual bool hasReaders() const = 0;
258  virtual void waitForReaders(int) const = 0;
259 };
260 
// Abstract factory for TopicReader and TopicWriter objects.
261 class TopicFactory
262 {
263 public:
264 
265  virtual ~TopicFactory() = default;
266 
     // Returns a TopicReader from a string plus key, tag and sample factories
     // and two FilterManager instances.
     // NOTE(review): presumably the string is the topic name and the two
     // managers handle key filters and sample filters respectively - the
     // signature does not distinguish them; confirm.
267  virtual std::shared_ptr<TopicReader> createTopicReader(const std::string&,
268  const std::shared_ptr<KeyFactory>&,
269  const std::shared_ptr<TagFactory>&,
270  const std::shared_ptr<SampleFactory>&,
271  const std::shared_ptr<FilterManager>&,
272  const std::shared_ptr<FilterManager>&) = 0;
273 
     // Returns a TopicWriter; same parameter list as createTopicReader().
274  virtual std::shared_ptr<TopicWriter> createTopicWriter(const std::string&,
275  const std::shared_ptr<KeyFactory>&,
276  const std::shared_ptr<TagFactory>&,
277  const std::shared_ptr<SampleFactory>&,
278  const std::shared_ptr<FilterManager>&,
279  const std::shared_ptr<FilterManager>&) = 0;
280 
     // Returns the Ice communicator as a shared_ptr.
281  virtual std::shared_ptr<Ice::Communicator> getCommunicator() const = 0;
282 };
283 
284 }
The WriterConfig class specifies configuration options specific to writers.
Definition: Types.h:161
CallbackReason
The callback action enumerator specifies the reason why a callback is called.
Definition: Types.h:195
The ReaderConfig class specifies configuration options specific to readers.
Definition: Types.h:122
Definition: InternalI.h:18