Implementing an IceStorm Subscriber
Our weather measurement subscriber implementation takes the following steps:
- Obtain a proxy for the TopicManager. This is the primary IceStorm object, used by both publishers and subscribers.
- Create an object adapter to host our Monitor servant.
- Instantiate the Monitor servant and activate it with the object adapter.
- Subscribe to the Weather topic.
- Process report messages until shutdown.
- Unsubscribe from the Weather topic.
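The subscribe, receive, unsubscribe part of the steps above can be pictured with a small in-memory stand-in. This is only a sketch in plain Java: ToyTopic, ToyMonitor, and SubscriberLifecycleSketch are hypothetical names invented for illustration, the tower name and wind speeds are made-up sample data, and nothing here uses or imitates the Ice or IceStorm APIs. It shows only that a report published after unsubscribing no longer reaches the monitor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a subscriber callback; NOT the generated Monitor interface.
interface ToyMonitor {
    void report(String tower, float windSpeed);
}

// Hypothetical in-memory topic; a real topic lives inside the IceStorm service.
final class ToyTopic {
    private final List<ToyMonitor> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(ToyMonitor m) { subscribers.add(m); }

    void unsubscribe(ToyMonitor m) { subscribers.remove(m); }

    // Fan one report out to every current subscriber.
    void publish(String tower, float windSpeed) {
        for (ToyMonitor m : subscribers) {
            m.report(tower, windSpeed);
        }
    }
}

final class SubscriberLifecycleSketch {
    // Runs subscribe -> receive -> unsubscribe and returns what the monitor saw.
    static List<String> run() {
        ToyTopic weather = new ToyTopic();
        List<String> seen = new ArrayList<>();
        ToyMonitor monitor = (tower, windSpeed) -> seen.add(tower + ":" + windSpeed);

        weather.subscribe(monitor);           // subscribe to the topic
        weather.publish("PalmBeach", 12.5f);  // delivered to the monitor
        weather.unsubscribe(monitor);         // clean up on shutdown
        weather.publish("PalmBeach", 14.0f);  // not delivered: no longer subscribed
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the real subscriber, the topic is reached through a proxy and the monitor is a servant hosted by an object adapter, so the first three steps in the list have no counterpart in this toy version.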
We present monitor implementations in C++ and Java below.
Subscriber Example in C++
Our C++ monitor implementation begins by including the necessary header files. The interesting ones are IceStorm/IceStorm.h, which is generated from the IceStorm Slice definitions, and Monitor.h, containing the generated code for our monitor definitions:
C++11 mapping:

    #include <Ice/Ice.h>
    #include <IceStorm/IceStorm.h>
    #include <Monitor.h>
    #include <iostream>

    using namespace std;

    class MonitorI : public Monitor
    {
    public:

        virtual void report(Measurement m, const Ice::Current&) override
        {
            cout << "Measurement report:" << endl
                 << " Tower: " << m.tower << endl
                 << " W Spd: " << m.windSpeed << endl
                 << " W Dir: " << m.windDirection << endl
                 << " Temp: " << m.temperature << endl
                 << endl;
        }
    };

    int main(int argc, char* argv[])
    {
        ...
        auto obj = communicator->stringToProxy("IceStorm/TopicManager:tcp -p 9999");
        auto topicManager = Ice::checkedCast<IceStorm::TopicManagerPrx>(obj);

        auto adapter = communicator->createObjectAdapter("MonitorAdapter");

        auto monitor = make_shared<MonitorI>();
        auto proxy = adapter->addWithUUID(monitor)->ice_oneway();
        adapter->activate();

        shared_ptr<IceStorm::TopicPrx> topic;
        try
        {
            topic = topicManager->retrieve("Weather");
            IceStorm::QoS qos;
            topic->subscribeAndGetPublisher(qos, proxy);
        }
        catch(const IceStorm::NoSuchTopic&)
        {
            // Error! No topic found!
            ...
        }

        communicator->waitForShutdown();

        topic->unsubscribe(proxy);
        ...
    }

C++98 mapping:

    #include <Ice/Ice.h>
    #include <IceStorm/IceStorm.h>
    #include <Monitor.h>
    #include <iostream>

    using namespace std;

    class MonitorI : public Monitor
    {
    public:

        virtual void report(const Measurement& m, const Ice::Current&)
        {
            cout << "Measurement report:" << endl
                 << " Tower: " << m.tower << endl
                 << " W Spd: " << m.windSpeed << endl
                 << " W Dir: " << m.windDirection << endl
                 << " Temp: " << m.temperature << endl
                 << endl;
        }
    };

    int main(int argc, char* argv[])
    {
        ...
        Ice::ObjectPrx obj = communicator->stringToProxy("IceStorm/TopicManager:tcp -p 9999");
        IceStorm::TopicManagerPrx topicManager = IceStorm::TopicManagerPrx::checkedCast(obj);

        Ice::ObjectAdapterPtr adapter = communicator->createObjectAdapter("MonitorAdapter");

        MonitorPtr monitor = new MonitorI;
        Ice::ObjectPrx proxy = adapter->addWithUUID(monitor)->ice_oneway();
        adapter->activate();

        IceStorm::TopicPrx topic;
        try
        {
            topic = topicManager->retrieve("Weather");
            IceStorm::QoS qos;
            topic->subscribeAndGetPublisher(qos, proxy);
        }
        catch(const IceStorm::NoSuchTopic&)
        {
            // Error! No topic found!
            ...
        }

        communicator->waitForShutdown();

        topic->unsubscribe(proxy);
        ...
    }

Our implementation of the Monitor servant is currently quite simple. A real implementation might update a graphical display, or incorporate the measurements into an ongoing calculation.
C++11 mapping:

    class MonitorI : public Monitor
    {
    public:

        virtual void report(Measurement m, const Ice::Current&) override
        {
            cout << "Measurement report:" << endl
                 << " Tower: " << m.tower << endl
                 << " W Spd: " << m.windSpeed << endl
                 << " W Dir: " << m.windDirection << endl
                 << " Temp: " << m.temperature << endl
                 << endl;
        }
    };

C++98 mapping:

    class MonitorI : public Monitor
    {
    public:

        virtual void report(const Measurement& m, const Ice::Current&)
        {
            cout << "Measurement report:" << endl
                 << " Tower: " << m.tower << endl
                 << " W Spd: " << m.windSpeed << endl
                 << " W Dir: " << m.windDirection << endl
                 << " Temp: " << m.temperature << endl
                 << endl;
        }
    };

After obtaining a proxy for the topic manager, the program creates an object adapter, instantiates the Monitor servant and activates it:
C++11 mapping:

    auto adapter = communicator->createObjectAdapter("MonitorAdapter");

    auto monitor = make_shared<MonitorI>();
    auto proxy = adapter->addWithUUID(monitor)->ice_oneway();
    adapter->activate();

C++98 mapping:

    Ice::ObjectAdapterPtr adapter = communicator->createObjectAdapter("MonitorAdapter");

    MonitorPtr monitor = new MonitorI;
    Ice::ObjectPrx proxy = adapter->addWithUUID(monitor)->ice_oneway();
    adapter->activate();

Note that the code creates a oneway proxy for the Monitor servant. This is for efficiency reasons: when a subscriber registers a oneway proxy, IceStorm delivers events to it via oneway messages instead of twoway messages. We also activate the object adapter at this time, which means the servant can now begin receiving invocations.
Next, the monitor subscribes to the topic:
C++11 mapping:

    shared_ptr<IceStorm::TopicPrx> topic;
    try
    {
        topic = topicManager->retrieve("Weather");
        IceStorm::QoS qos;
        topic->subscribeAndGetPublisher(qos, proxy);
    }
    catch(const IceStorm::NoSuchTopic&)
    {
        // Error! No topic found!
        ...
    }

C++98 mapping:

    IceStorm::TopicPrx topic;
    try
    {
        topic = topicManager->retrieve("Weather");
        IceStorm::QoS qos;
        topic->subscribeAndGetPublisher(qos, proxy);
    }
    catch(const IceStorm::NoSuchTopic&)
    {
        // Error! No topic found!
        ...
    }

Finally, the monitor blocks until the communicator is shut down. After waitForShutdown returns, the monitor cleans up by unsubscribing from the topic:

    communicator->waitForShutdown();

    topic->unsubscribe(proxy);

Subscriber Example in Java
The Java implementation of the monitor is shown below:

    class MonitorI implements Monitor
    {
        @Override
        public void report(Measurement m, com.zeroc.Ice.Current curr)
        {
            System.out.println(
                "Measurement report:\n" +
                " Tower: " + m.tower + "\n" +
                " W Spd: " + m.windSpeed + "\n" +
                " W Dir: " + m.windDirection + "\n" +
                " Temp: " + m.temperature + "\n");
        }
    }

    public static void main(String[] args)
    {
        ...
        com.zeroc.Ice.ObjectPrx obj = communicator.stringToProxy("IceStorm/TopicManager:tcp -p 9999");
        com.zeroc.IceStorm.TopicManagerPrx topicManager = com.zeroc.IceStorm.TopicManagerPrx.checkedCast(obj);

        com.zeroc.Ice.ObjectAdapter adapter = communicator.createObjectAdapter("MonitorAdapter");

        Monitor monitor = new MonitorI();
        com.zeroc.Ice.ObjectPrx proxy = adapter.addWithUUID(monitor).ice_oneway();
        adapter.activate();

        com.zeroc.IceStorm.TopicPrx topic = null;
        try
        {
            topic = topicManager.retrieve("Weather");
            java.util.Map<String, String> qos = null;
            topic.subscribeAndGetPublisher(qos, proxy);
        }
        catch(com.zeroc.IceStorm.NoSuchTopic ex)
        {
            // Error! No topic found!
            ...
        }

        communicator.waitForShutdown();

        topic.unsubscribe(proxy);
        ...
    }

Our implementation of the Monitor servant is currently quite simple. A real implementation might update a graphical display, or incorporate the measurements into an ongoing calculation.

    class MonitorI implements Monitor
    {
        @Override
        public void report(Measurement m, com.zeroc.Ice.Current curr)
        {
            System.out.println(
                "Measurement report:\n" +
                " Tower: " + m.tower + "\n" +
                " W Spd: " + m.windSpeed + "\n" +
                " W Dir: " + m.windDirection + "\n" +
                " Temp: " + m.temperature + "\n");
        }
    }

After obtaining a proxy for the topic manager, the program creates an object adapter, instantiates the Monitor servant and activates it:

    com.zeroc.Ice.ObjectAdapter adapter = communicator.createObjectAdapter("MonitorAdapter");

    Monitor monitor = new MonitorI();
    com.zeroc.Ice.ObjectPrx proxy = adapter.addWithUUID(monitor).ice_oneway();
    adapter.activate();

Note that the code creates a oneway proxy for the Monitor servant. This is for efficiency reasons: when a subscriber registers a oneway proxy, IceStorm delivers events to it via oneway messages instead of twoway messages. We also activate the object adapter at this time, which means the servant can now begin receiving invocations.
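As a rough analogy for why oneway delivery is cheaper, the following plain-Java sketch contrasts a sender that waits for a reply with one that hands off the event and moves on. It is an illustration only: DeliveryModeSketch and its methods are hypothetical names, the "receiver" is just a single-threaded executor, and none of this reflects how Ice encodes or transports oneway and twoway invocations.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration; not part of Ice or IceStorm.
final class DeliveryModeSketch {

    // Twoway analogy: the sender blocks until the receiver has handled the event.
    static boolean twowaySenderSeesResult() {
        ExecutorService receiver = Executors.newSingleThreadExecutor();
        AtomicBoolean handled = new AtomicBoolean(false);
        try {
            Future<?> reply = receiver.submit(() -> handled.set(true));
            reply.get();              // wait for the "reply"
            return handled.get();     // true: handling finished before we continued
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            receiver.shutdown();
        }
    }

    // Oneway analogy: the sender hands the event off and continues immediately.
    static boolean onewaySenderReturnsFirst() {
        ExecutorService receiver = Executors.newSingleThreadExecutor();
        CountDownLatch senderMovedOn = new CountDownLatch(1);
        AtomicBoolean handled = new AtomicBoolean(false);
        try {
            receiver.execute(() -> {
                try {
                    // The latch holds the receiver back so the demo's ordering is deterministic.
                    senderMovedOn.await();
                    handled.set(true);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            boolean handledAtReturn = handled.get(); // sender did not wait for handling
            senderMovedOn.countDown();
            return !handledAtReturn;
        } finally {
            receiver.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("twoway sender saw result: " + twowaySenderSeesResult());
        System.out.println("oneway sender returned first: " + onewaySenderReturnsFirst());
    }
}
```

The trade-off is the usual one for fire-and-forget messaging: the sender does less waiting per event, but it also learns nothing about whether the receiver processed the event.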
Next, the monitor subscribes to the topic:

    com.zeroc.IceStorm.TopicPrx topic = null;
    try
    {
        topic = topicManager.retrieve("Weather");
        java.util.Map<String, String> qos = null;
        topic.subscribeAndGetPublisher(qos, proxy);
    }
    catch(com.zeroc.IceStorm.NoSuchTopic ex)
    {
        // Error! No topic found!
        ...
    }

Finally, the monitor blocks until the communicator is shut down. After waitForShutdown returns, the monitor cleans up by unsubscribing from the topic:

    communicator.waitForShutdown();

    topic.unsubscribe(proxy);