Reader

Readers are used to receive samples published by writers. Four reader types are supported:

  • single-key reader: a single key reader subscribes for receiving samples for a single data element identified by the key provided on creation of the reader
  • multi-key reader: a multi-key reader subscribes for receiving samples for multiple data elements identified by a set of keys provided on creation of the reader
  • any-key reader: an any-key reader subscribes for receiving samples for any data element. No keys are provided when creating the reader
  • filtered-key reader: a filtered-key reader subscribes for receiving samples for multiple data elements whose key matches a filter provided on creation of the reader

A reader is created using a topic plus optionally a configuration structure. Each reader keeps the history of the unread received samples. As soon as a sample has been read, it's removed from the history. The application can be notified when writers connect to or disconnect from a reader by registering listener functions with this reader. 

A name can also be assigned to the reader on creation. Writers will use this name for the listener notification.

An optional sample filter name and criteria value can also be provided when creating a reader. Sample filtering can be specified for all the reader types.

On this page:

Creation

Single-key reader

A single-key reader enables the subscriber to receive samples for a single key. The key is provided when the reader is created. A single-key reader object is created by calling one of the SingleKeyReader constructors or by using one of the makeSingleKeyReader factory functions:

C++
template<typename Key, typename Value, typename UpdateTag=std::string>
class SingleKeyReader : public Reader<Key, Value, UpdateTag>
{
public:
    SingleKeyReader(const Topic<Key, Value, UpdateTag>& topic, const Key& key, const string& name = string(), const ReaderConfig& config = ReaderConfig());
    template<typename SampleFilterCriteria> SingleKeyReader(const Topic<Key, Value, UpdateTag>& topic, const Key& key, const Filter<SampleFilterCriteria>& sampleFilter, const string& name = string(), const ReaderConfig& config = ReaderConfig());
};

template<typename K, typename V, typename UT> KeyReader<K, V, UT>
makeSingleKeyReader(const Topic<K, V, UT>& topic, const typename Topic<K, V, UT>::KeyType& key, const string& name = string(), const ReaderConfig& config = ReaderConfig());

template<typename SFC, typename K, typename V, typename UT> KeyReader<K, V, UT>
makeSingleKeyReader(const Topic<K, V, UT>& topic, const typename Topic<K, V, UT>::KeyType& key, const string& name = string(), const Filter<SampleFilterCriteria>& sampleFilter, const ReaderConfig& config = ReaderConfig());

The first constructor accepts a topic parameter, a key and an optional name and reader configuration structure. The second constructor accepts the same parameters and also accept a sample filter parameter. Refer to the sample filtering section below for additional information on sample filters.

The makeSingleKeyReader functions accepts the same parameters. The key, value and update tag template parameters (K, V, UT) don't need to be provided when using this function, they are automatically deduced by the compiler from the topic parameter.

For example, to create a reader to read temperatures for the room "kitchen":

C++
DataStorm::Topic<string, float> temperatures = ...;
SingleKeyReader<string, float> reader1(temperatures, "kitchen"); // using the constructor
auto reader2 = makeSingleKeyReader(temperatures, "kitchen");
auto reader3 = make_shared<SingleKeyReader<string, float>>(temperatures, "kitchen"); // on the heap, using the constructor

When a single-key reader is created, DataStorm notifies connected peers. If these peers have writers matching the reader's key, the writers connect to the reader in order to publish samples to it.

Multi-key reader

A multi-key reader enables the subscriber to receive samples for multiple keys. The keys are provided when the reader is created. A multi-key reader object is created by calling one of the MultiKeyReader constructors or by using one of the makeMultiKeyReader factory functions:

C++
template<typename Key, typename Value, typename UpdateTag=std::string>
class MultiKeyReader : public Reader<Key, Value, UpdateTag>
{
public:
    MultiKeyReader(const Topic<Key, Value, UpdateTag>& topic, const std::vector<Key>& keys, const string& name = string(), const ReaderConfig& config = ReaderConfig());
    template<typename SampleFilterCriteria> MultiKeyReader(const Topic<Key, Value, UpdateTag>& topic, const std::vector<Key>& keys, const Filter<SampleFilterCriteria>& sampleFilter, const string& name = string(), const ReaderConfig& config = ReaderConfig());
};

template<typename K, typename V, typename UT> MultiKeyReader<K, V, UT>
makeMultiKeyReader(const Topic<K, V, UT>& topic, const std::vector<typename Topic<K, V, UT>::KeyType>& keys, const string& name = string(), const ReaderConfig& config = ReaderConfig())

template<typename SFC, typename K, typename V, typename UT> MultiKeyReader<K, V, UT>
makeMultiKeyReader(const Topic<K, V, UT>& topic, const std::vector<typename Topic<K, V, UT>::KeyType>& keys, const Filter<SampleFilterCriteria>& sampleFilter, const string& name = string(), const ReaderConfig& config = ReaderConfig())

The first constructor accepts a topic parameter, a collection of keys and an optional name andreader configuration structure. The second constructor accepts the same parameters and also accept a sample filter parameter. Refer to the sample filtering section below for additional information on sample filters.

The makeMultiKeyReader functions accepts the same parameters. The key, value and update tag template parameters (K, V, UT) don't need to be provided when using this function, they are automatically deduced by the compiler from the topic parameter.

For example, to create a reader to receive temperatures for the room "kitchen" and "living room":

C++
DataStorm::Topic<string, float> temperatures = ...;
MultiKeyReader<string, float> reader1(temperatures, {"kitchen", "living room"}); // using the constructor
auto reader2 = makeMultiKeyReader(temperatures, {"kitchen", "living room"}); // using the factory function
auto reader = make_shared<MultiKeyReader<string, float>>>(temperatures, vector<string> {"kitchen", "living room"}); // on the heap, using the constructor

When a multi-key reader is created, DataStorm notifies connected peers. If these peers have writers matching at least one of the reader's keys, the writers connect to the reader in order to sent samples to it.

Any-key reader

An any-key reader enables the subscriber to receive samples for any keys. Contrary to single-key or multi-key readers, no keys are provided when the reader is created. An any-key reader object is created by calling one of the MultiKeyReader constructors described above. An empty key vector is provided for the keys parameter. Two makeAnyKeyReader factory functions are also provided:

C++
template<typename K, typename V, typename UT> MultiKeyReader<K, V, UT>
makeAnyKeyReader(const Topic<K, V, UT>& topic, const string& name = string(), const ReaderConfig& config = ReaderConfig())

template<typename SFC, typename K, typename V, typename UT> MultiKeyReader<K, V, UT>
makeAnyKeyReader(const Topic<K, V, UT>& topic, const Filter<SampleFilterCriteria>& sampleFilter, const string& name = string(), const ReaderConfig& config = ReaderConfig())

The makeAnyKeyReader functions accept the same parameters as the MultiKeyReader constructors except for the keys parameter which is missing. The key, value and update tag template parameters (K, V, UT) don't need to be provided when using this function, they are automatically deduced by the compiler from the topic parameter.

When an any-key reader is created, DataStorm notifies connected peers. Since an any-key reader might receive samples for any keys, all the writers will connect to the any-key reader to send samples to it.

Filtered-key reader

A filtered-key reader enables the subscriber to receive samples for a set of keys matching a given filter criteria value. To create a filtered-key reader, the subscriber must provide a key filter name and criteria value. The key filter name must match the name used to register the key filter function with the topic on the subscriber and publisher side. A filtered-key reader object is created by calling one of the FilteredKeyReader constructor or by using one of the makeFilteredKeyReader factory functions:

C++
template<typename Key, typename Value, typename UpdateTag=std::string>
class FilteredKeyReader : public Reader<Key, Value, UpdateTag>
{
public:
    template<typename KeyFilterCriteria> FilteredKeyReader(const Topic<Key, Value, UpdateTag>& topic, const Filter<KeyFilterCriteria>& keyFilter, const string& name = string(), const ReaderConfig& config = ReaderConfig());
    template<typename KeyFilterCriteria, typename SampleFilterCriteria>
    FilteredKeyReader(const Topic<Key, Value, UpdateTag>& topic, const Filter<KeyFilterCriteria>& keyFilter, const Filter<SampleFilterCriteria>& sampleFilter, const string& name = string(), const ReaderConfig& config = ReaderConfig());
};

template<typename KFC, typename K, typename V, typename UT> FilteredReader<K, V, UT>
makeFilteredReader(const Topic<K, V, UT>& topic, const Filter<KFC>&, const string& name = string(), const ReaderConfig& config = ReaderConfig())

template<typename KFC, typename SFC, typename K, typename V, typename UT> FilteredReader<K, V, UT>
makeFilteredReader(const Topic<K, V, UT>& topic, const Filter<KFC>&, const Filter<SFC>&, const string& name = string(), const ReaderConfig& config = ReaderConfig())

The first constructor accepts a topic parameter, the filter structure and an optional reader configuration structure. The second constructor accepts the same parameters and also accept a sample filter parameter. Refer to the sample filtering section below for additional information on sample filters.

The makeFilteredKeyReader functions accept the same parameters. The key, value and update tag template parameters (K, V, UT) don't need to be provided when using this function, they are automatically deduced by the compiler from the topic parameter.

The Filter structure definition is shown below:

C++
template<typename T> struct Filter
{
    template<typename TT> Filter(const std::string& name, TT&& criteria);
    std::string name;
    T criteria;
};

The key filter criteria type (KFC) needs to match exactly the type specified to register the key filter function with the topic.

For example, to create a reader to receive temperatures for rooms starting with the "house-" string:

C++
DataStorm::Topic<string, float> temperatures = ...;
DataStorm::FilteredReader<string, float> reader1(temperatures, Filter<string>("startswith", "house-")); // using the constructor
auto reader2 = makeFilteredReader(temperatures, Filter<string>("startswith", "house-")); // using the factory function
auto reader3 = make_shared<DataStorm::FilteredReader<string, float>>(temperatures, Filter<string>("startswith", "house-")); // on the heap, using the constructor

When a filtered-key reader is created, DataStorm notifies connected peers. Writers evaluate the key filter using the criteria value and connect to the filtered-key reader if the key matches.

Receiving samples

To retrieve received samples, the Reader base-class provides several methods:

  • getAllUnread
  • waitForUnread
  • hasUnread
  • getNextUnread

An application can use these methods as follow to receive samples:

C++
auto single = makeSingleKeyReader(temperatures, "kitchen");
auto sample = single.getNextUnread();

The reader maintains a queue of unread samples. When unread samples are obtained with getAllUnread or getNextUnread, they are removed from the queue. The waitForUnread and getNextUnread methods will block until a new unread sample is available. If the node is shutdown, they will raise NodeShutdownException.

The application can also register a callback to be notified when samples are received using the onSamples method. If unread samples are queued with the reader the callback will be called immediately with these samples.

Sample filtering

Sample filtering can be enabled on reader creation by providing a filter structure.

Even though the sample filtering is specified on the reader, the sample filtering is performed on the writer side. Sample filtering can be used to minimize the number of samples sent to readers and reduce network traffic. Sample filter functions are therefore registered on the publisher's topic.

The sample filter name must match one of the sample filter registered on the publisher's topic. The criteria value type must also match the type used to register the sample filter. If the types aren't the same, the writer won't be able to unmarshal the criteria value. The filter criteria type is specified with the Filter template parameter SFC.

For example, to receive only temperatures inferior to 30º, a subscriber could create the following reader:

C++
DataStorm::Topic<string, float> temperatures = ...;
auto reader = makeSingleKeyReader(temperatures, "kitchen", Filter<float>("inferior", 30.0f));

In this example, "inferior" is the sample filter name and 30.0f is the sample filter criteria value. The "inferior" sample filter must be registered with the publisher's topic. The filter criteria type is specified using the the Filter template parameter, it's a float in the example above.

You can also combine key and sample filtering by using a filtered-key reader:

C++
auto reader = makeFilteredReader(temperatures, Filter<string>("_regex", ".*sleepingroom.*"), Filter<float>("inferior", 0.0f));

This reader will only receive temperatures from sleeping rooms when they are below 0º.

Configuration

A reader supports several configuration parameters:

C++
class Config
{
    Ice::optional<int> sampleCount;
    Ice::optional<int> sampleLifetime;
    Ice::optional<ClearHistoryPolicy> clearHistory;
};
class ReaderConfig : public Config
{
    Ice::optional<DiscardPolicy> discardPolicy;
};

The parameters are specified as optional values. You can only configure the parameters you need. If a parameter is not set, the default value will be used.

Configuration parameters for readers can be specified and overriden at several levels:

  • with configuration properties prefixed with DataStorm.Topic (e.g.: DataStorm.Topic.SampleCount).
  • with configuration properties prefixed with DataStorm.Topic.<topic name> (e.g.: DataStorm.Topic.temperatures.ClearHistory).
  • the topic's reader configuration structure set with the setReaderDefaultConfig method of the DataStorm::Topic class.
  • when the writer is created using the last parameter of the reader make factory function presented above.

A parameter set at a lower level (configuration provided to the reader make factory function) overrides the one set at a higher level (e.g.: configuration properties).

Sample count

The sample count configuration specifies the number of samples that will be kept in the queue of unread samples. When the reader queue is full, the oldest samples are removed to make room for the newly received samples.

Sample lifetime

The sample lifetime configuration specifies how long a sample will be kept in the unread queue. Samples older than the sample lifetime are removed from the queue.

Clear history

The history queue of unread samples can be automatically cleared when a new sample is received and depending on the clear history policy configuration:

C++
enum struct ClearHistoryPolicy
{
    OnAdd,
    OnRemove,
    OnAll,
    OnAllExceptPartialUpdate,
    Never
};

The clear history policy is based on the sample event. It can be cleared for all events or only for Add or Remove. The history can also be cleared for all events except if it's a partial update. This can be useful if you want the reader to keep all the partial updates between full updates.

Discard policy

You can set a discard policy to discard samples based on different policies configured with the following enumeration:

enum struct DiscardPolicy
{
    None,
    SendTime,
    Priority
};

The SendTime discard policy will reject samples with a timestamp that is older than the last received sample. The Priority policy will reject samples from writers configured with a lower priority than the highest priority connected writer.

Listeners

Connected keys

The reader maintains a set of connected keys. This set represents all the keys for which writers are connected. The two listener functions registered with the writer's onConnectedKeys method enable the application to monitor the changes to this set.

C++
enum struct CallbackReason
{
    Connect,
    Disconnect
};

template<typename Key, typename Value, typename UpdateTag> class Writer
{
    void onConnectedKeys(std::function<void(std::vector<Key>)> init, std::function<void(CallbackReason, Key)> update);
};

The init callback is called with the initial set of connected keys. It's called immediately after the onConnectedKeys method returns. The update callback accepts two parameters: an enumeration to specify if the callback is called following the connection or disconnection of a key and the key.

Connected writers

Writers connected with the reader can be monitored by registering two callback functions with the onConnectedWriters method:

C++
template<typename Key, typename Value, typename UpdateTag> class Writer
{
    void onConnectedWriters(std::function<void(std::vector<std::string>)> init, std::function<void(CallbackReason, std::string)> update);
};

The init callback is called with the initial set of connected readers. It's called immediately after the onConnectedWriters method returns. The update callback accepts two parameters: an enumeration to specify if the callback is called following the connection or disconnection of a writer and the name of the writer.

Writers names are specified when the writer is created. If no name is provided, a default name is computed by DataStorm. This name is guaranteed to be unique within the scope of the DataStorm node.

Samples

The onSamples method enables the application to register a callback function to be notified when samples are queued:

C++
template<typename Key, typename Value, typename UpdateTag> class Reader
{
    void onSamples(std::function<void(std::vector<Sample<Key, Value, UpdateTag>>)>);
};

Once onSamples returns, the callback is called if there are unread samples queued with the reader.

Coordination

The reader class supports methods to coordinate with readers:

  • hasWriters to check if writers are connected
  • waitWriters to wait for a given number of writers to connect
  • waitForNoWriters to wait for all the writers to disconnect

In addition, the getConnectedKeys method returns the set of keys for which writers are connected to and getConnectedWriters returns the names of the connected writers. The wait methods will raise NodeShutdownException if the node is shutdown.