Asynchronous Method Dispatch (AMD) in C++11
The number of simultaneous synchronous requests a server is capable of supporting is determined by the number of threads in the server's thread pool. If all of the threads are busy dispatching long-running operations, then no threads are available to process new requests and therefore clients may experience an unacceptable lack of responsiveness.
Asynchronous Method Dispatch (AMD), the server-side equivalent of AMI, addresses this scalability issue. Using AMD, a server can receive a request but then suspend its processing in order to release the dispatch thread as soon as possible. When processing resumes and the results are available, the server sends a response explicitly using a callback object provided by the Ice run time.
AMD is transparent to the client, that is, there is no way for a client to distinguish a request that, in the server, is processed synchronously from a request that is processed asynchronously.
In practical terms, an AMD operation typically queues the request data (i.e., the callback object and operation arguments) for later processing by an application thread (or thread pool). In this way, the server minimizes the use of dispatch threads and becomes capable of efficiently supporting thousands of simultaneous clients.
An alternate use case for AMD is an operation that requires further processing after completing the client's request. In order to minimize the client's delay, the operation returns the results while still in the dispatch thread, and then continues using the dispatch thread for additional work.
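Here is a minimal sketch of this second pattern; the `AuditI` servant, its `["amd"]` operation `record`, and the `archive` helper are hypothetical names used for illustration:

```cpp
// A minimal sketch of the "respond first, keep working" pattern.
// AuditI, record and archive() are hypothetical, not part of the Ice API.
void
AuditI::recordAsync(std::string event,
                    std::function<void()> response,
                    std::function<void(std::exception_ptr)>,
                    const Ice::Current&)
{
    response(); // complete the client's request immediately

    // The client is no longer waiting; keep using the dispatch thread.
    archive(event); // hypothetical post-processing
}
```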
Enabling AMD with Metadata in C++
To enable asynchronous dispatch, you must add an `["amd"]` metadata directive to your Slice definitions. The directive applies at the interface level and at the operation level. If you specify `["amd"]` at the interface level, all operations in that interface use asynchronous dispatch; if you specify `["amd"]` for an individual operation, only that operation uses asynchronous dispatch. In either case, the metadata directive replaces synchronous dispatch, that is, a particular operation implementation must use synchronous or asynchronous dispatch and cannot use both.
Consider the following Slice definitions:
["amd"] interface I { bool isValid(); float computeRate(); } interface J { ["amd"] void startProcess(); int endProcess(); }
In this example, both operations of interface `I` use asynchronous dispatch, whereas, for interface `J`, `startProcess` uses asynchronous dispatch and `endProcess` uses synchronous dispatch.
Specifying metadata at the operation level (rather than at the interface or class level) minimizes the amount of generated code and, more importantly, minimizes complexity: although the asynchronous model is more flexible, it is also more complicated to use. It is therefore in your best interest to limit the use of the asynchronous model to those operations that need it, while using the simpler synchronous model for the rest.
AMD Mapping in C++
The ["amd"
] metadata changes the name of the dispatch pure virtual function to <operation-name>Async
.
This dispatch function returns void
and accepts the operation's in-parameters by value, followed by two callback parameters provided by the Ice run time.
For example, suppose we have defined the following operation:
```slice
interface Example
{
    ["amd"] string op(short s, out long l);
}
```
The dispatch function for asynchronous invocation of operation `op` is generated as follows:
```cpp
virtual void opAsync(short,
                     std::function<void(const std::string&, long long)>,
                     std::function<void(std::exception_ptr)>,
                     const Ice::Current&) = 0;
```
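For illustration, here is a minimal sketch of a servant implementing this dispatch function and completing the request directly in the dispatch thread; the computed values are made up:

```cpp
// A minimal sketch (the computed values are made up): the response callback
// marshals the return value (string) followed by the out-parameter (long long).
void
ExampleI::opAsync(short s,
                  std::function<void(const std::string&, long long)> response,
                  std::function<void(std::exception_ptr)>,
                  const Ice::Current&)
{
    response("done", s * 100LL);
}
```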
The AMD `Async` function looks very much like the AMI `Async` function with callbacks, but the two are not identical. The table below highlights their differences:
| | AMI | AMD |
|---|---|---|
| In parameters | Passed by value or by const reference, depending on the parameter type | Passed by value |
| Return value and out parameters | Passed by value to the response callback | Passed by value or by const reference, depending on the parameter type |
| Callback functions | 3 callbacks: response, exception and sent | 2 callbacks: response and exception |
| Last parameter | `Ice::Context` | `Ice::Current` |
AMD Exceptions in C++
There are two processing contexts in which the logical implementation of an AMD operation may need to report an exception: the dispatch thread (the thread that receives the invocation), and the response thread (the thread that sends the response).
These are not necessarily two different threads: it is legal to send the response from the dispatch thread.
Although we recommend that the exception callback be used to report all exceptions to the client, it is legal for the implementation to raise an exception instead, but only from the dispatch thread.
As you would expect, an exception raised from a response thread cannot be caught by the Ice run time; the application's run-time environment determines how such an exception is handled. Therefore, a response thread must ensure that it traps all exceptions and sends the appropriate response using the exception callback. Otherwise, if a response thread is terminated by an uncaught exception, the request may never be completed and the client might wait indefinitely for a response.
Whether raised in a dispatch thread or reported via the exception callback, local exceptions may undergo translation.
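To illustrate, here is a minimal sketch (requires `<thread>`; `doWork` is a hypothetical helper that may throw) in which the work runs in a separate response thread that traps every exception:

```cpp
// A minimal sketch: the response thread must trap all exceptions and report
// them through the exception callback. doWork() is a hypothetical helper.
void
ExampleI::opAsync(short s,
                  std::function<void(const std::string&, long long)> response,
                  std::function<void(std::exception_ptr)> exception,
                  const Ice::Current&)
{
    // Throwing here, in the dispatch thread, would also be legal: the Ice
    // run time catches the exception and sends it back to the client.
    std::thread([s, response, exception]()
    {
        try
        {
            response(doWork(s), s); // doWork may throw
        }
        catch(...)
        {
            exception(std::current_exception());
        }
    }).detach();
}
```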
Chaining AMI and AMD Invocations in C++
Since the asynchronous proxy API and the asynchronous dispatch API are similar, it is possible to implement an asynchronous dispatch by sending an asynchronous request to a proxy.
Continuing our example from the previous section, suppose our servant also holds a proxy to another object of the same type:
```cpp
class ExampleI : public Example
{
public:

    ExampleI(std::shared_ptr<ExamplePrx>&& o) : _other(std::move(o))
    {
    }

    virtual void opAsync(short s,
                         std::function<void(const std::string&, long long)> response,
                         std::function<void(std::exception_ptr)> error,
                         const Ice::Current&) override
    {
        // Ice-supplied AMD response and exception callbacks are passed as AMI callbacks
        _other->opAsync(s, response, error);
    }

private:

    const std::shared_ptr<ExamplePrx> _other;
};
```
Oneway Proxy
If your AMD implementation uses a oneway proxy, remember that the AMI response callback is not called: you need to call the AMD response from the AMI `sent` callback.
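Here is a minimal sketch of this pattern; the void operation `notify` and the `_oneway` proxy member are hypothetical:

```cpp
// A minimal sketch (hypothetical operation and member): the AMI response
// callback is never called for a oneway invocation, so the AMD response is
// sent from the AMI sent callback instead.
void
ServiceI::notifyAsync(std::function<void()> response,
                      std::function<void(std::exception_ptr)> exception,
                      const Ice::Current&)
{
    _oneway->notifyAsync(
        nullptr,                           // response: never called for oneway
        exception,                         // forward failures to the AMD exception callback
        [response](bool) { response(); }); // complete the dispatch once the request is sent
}
```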
AMD Example in C++
To demonstrate the use of AMD in Ice, let us define the Slice interface for a simple computational engine:
```slice
module Demo
{
    sequence<float> Row;
    sequence<Row> Grid;

    exception RangeError {}

    interface Model
    {
        ["amd"] Grid interpolate(Grid data, float factor) throws RangeError;
    }
}
```
Given a two-dimensional grid of floating point values and a factor, the `interpolate` operation returns a new grid of the same size with the values interpolated in some interesting (but unspecified) way.
Our servant class derives from `Demo::Model` and supplies a definition for the `interpolateAsync` method:
```cpp
class ModelI : public Demo::Model
{
public:

    virtual void interpolateAsync(Demo::Grid,
                                  float,
                                  std::function<void(const Demo::Grid&)>,
                                  std::function<void(std::exception_ptr)>,
                                  const Ice::Current&);

private:

    std::deque<Job> _jobs;
    std::mutex _mutex;
};
```
The implementation of `interpolateAsync` uses synchronization to safely record the callback functions and arguments in a `Job` that is added to a queue:
```cpp
void
ModelI::interpolateAsync(Demo::Grid data,
                         float factor,
                         std::function<void(const Demo::Grid&)> response,
                         std::function<void(std::exception_ptr)> exception,
                         const Ice::Current&)
{
    std::lock_guard<std::mutex> lock(_mutex);
    _jobs.emplace_back(std::move(data), factor, std::move(response), std::move(exception));
}
```
After queuing the information, the operation returns control to the Ice run time, making the dispatch thread available to process another request. An application thread removes the next `Job` from the queue and invokes `execute` to perform the interpolation (a sketch of such a worker thread follows the class definition below). `Job` is defined as follows:
```cpp
class Job
{
public:

    Job(Demo::Grid&&, float,
        std::function<void(const Demo::Grid&)>&&,
        std::function<void(std::exception_ptr)>&&);

    void execute();

private:

    void interpolateGrid(); // can throw RangeError

    Demo::Grid _data;
    const float _factor;
    const std::function<void(const Demo::Grid&)> _response;
    const std::function<void(std::exception_ptr)> _exception;
};
```
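The worker thread itself is not part of the original example; here is a minimal sketch, assuming a `run` member function and a `std::condition_variable _condition` are added to `ModelI`, with `interpolateAsync` calling `_condition.notify_one()` after queuing a `Job`:

```cpp
// A minimal sketch of an application worker thread; run() and _condition
// are assumed additions to ModelI.
void
ModelI::run()
{
    for(;;)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _condition.wait(lock, [this] { return !_jobs.empty(); });
        Job job = std::move(_jobs.front());
        _jobs.pop_front();
        lock.unlock();

        job.execute(); // interpolate and send the response outside the lock
    }
}
```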
The implementation of `execute` uses `interpolateGrid` (not shown) to perform the computational work:
```cpp
void
Job::execute()
{
    try
    {
        interpolateGrid();
        _response(_data);
    }
    catch(...)
    {
        _exception(std::current_exception());
    }
}
```
If `interpolateGrid` throws an exception such as `RangeError`, we capture it and pass it to the `_exception` callback. If the interpolation was successful, `_response` is called to send the modified grid back to the client.
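Because AMD is transparent to the client, a caller invokes `interpolate` exactly as it would a synchronously dispatched operation. A minimal client-side sketch, assuming a proxy variable `model` and an input grid `data`:

```cpp
// The server's use of AMD is invisible here: an ordinary twoway invocation.
Demo::Grid result = model->interpolate(data, 0.5f);
```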