Asynchronous Method Dispatch (AMD) in C++11

The number of simultaneous synchronous requests a server is capable of supporting is determined by the number of threads in the server's thread pool. If all of the threads are busy dispatching long-running operations, then no threads are available to process new requests and therefore clients may experience an unacceptable lack of responsiveness.

Asynchronous Method Dispatch (AMD), the server-side equivalent of AMI, addresses this scalability issue. Using AMD, a server can receive a request but then suspend its processing in order to release the dispatch thread as soon as possible. When processing resumes and the results are available, the server sends a response explicitly using a callback object provided by the Ice run time.

AMD is transparent to the client, that is, there is no way for a client to distinguish a request that, in the server, is processed synchronously from a request that is processed asynchronously.

In practical terms, an AMD operation typically queues the request data (i.e., the callback object and operation arguments) for later processing by an application thread (or thread pool). In this way, the server minimizes the use of dispatch threads and becomes capable of efficiently supporting thousands of simultaneous clients.

An alternate use case for AMD is an operation that requires further processing after completing the client's request. In order to minimize the client's delay, the operation returns the results while still in the dispatch thread, and then continues using the dispatch thread for additional work.

On this page:

Enabling AMD with Metadata in C++

To enable asynchronous dispatch, you must add an ["amd"] metadata directive to your Slice definitions. The directive applies at the interface and the operation level. If you specify ["amd"] at the interface level, all operations in that interface use asynchronous dispatch; if you specify ["amd"] for an individual operation, only that operation uses asynchronous dispatch. In either case, the metadata directive replaces synchronous dispatch, that is, a particular operation implementation must use synchronous or asynchronous dispatch and cannot use both.

Consider the following Slice definitions:

Slice
["amd"] interface I
{
    bool isValid();
    float computeRate();
}

interface J
{
    ["amd"] void startProcess();
    int endProcess();
}

In this example, both operations of interface I use asynchronous dispatch, whereas, for interface J, startProcess uses asynchronous dispatch and endProcess uses synchronous dispatch.

Specifying metadata at the operation level (rather than at the interface or class level) minimizes the amount of generated code and, more importantly, minimizes complexity: although the asynchronous model is more flexible, it is also more complicated to use. It is therefore in your best interest to limit the use of the asynchronous model to those operations that need it, while using the simpler synchronous model for the rest.

AMD Mapping in C++

The ["amd"] metadata changes the name of the dispatch pure virtual function to <operation-name>Async. This dispatch function returns void and accepts the operation's in-parameters by value, followed by two callback parameters provided by the Ice run time.

For example, suppose we have defined the following operation:

Slice
interface Example
{
    ["amd"] string op(short s, out long l);
}

The dispatch method for asynchronous invocation of operation foo is generated as follows:

C++
void opAsync(short, std::function<void(const std::string&, long long)>, std::function<void(std::exception_ptr)>, 
              const Ice::Current&) = 0;

The AMD Async function looks like very much the AMI Async function with callbacks, but these functions are not identical. The table below highlights their differences:


AMIAMD
In parameters

Passed by value or by const reference,
depending on the parameter type 

Passed by value
Return value and
out parameters 
Passed by value to the response callbackPassed by value or by const reference,
depending on the parameter type
Callback functions3 callbacks: response, exception and sent2 callbacks: response and exception
Last parameterIce::ContextIce::Current

AMD Exceptions in C++

There are two processing contexts in which the logical implementation of an AMD operation may need to report an exception: the dispatch thread (the thread that receives the invocation), and the response thread (the thread that sends the response).

These are not necessarily two different threads: it is legal to send the response from the dispatch thread.

Although we recommend that the exception callback be used to report all exceptions to the client, it is legal for the implementation to raise an exception instead, but only from the dispatch thread.

As you would expect, an exception raised from a response thread cannot be caught by the Ice run time; the application's run-time environment determines how such an exception is handled. Therefore, a response thread must ensure that it traps all exceptions and sends the appropriate response using the exception callback. Otherwise, if a response thread is terminated by an uncaught exception, the request may never be completed and the client might wait indefinitely for a response.

Whether raised in a dispatch thread or reported via the exception callback, local exceptions may undergo translation.

Chaining AMI and AMD Invocations in C++

Since the asynchronous proxy API and the asynchronous dispatch API are similar, it is possible to implement an asynchronous dispatch by sending an asynchronous request to a proxy.

Continuing our example from the previous section, suppose our servant also holds a proxy to another object of the same type:

C++
class ExampleI : public Example
{
public:
    ExampleI(std::shared_ptr<ExamplePrx>&& o) : _other(o)
    {
    }

    virtual void opAsync(short s, std::function<void(const std::string&, long long)> response, std::function<void(std::exception_ptr)> error, 
                         const Ice::Current&) override
    {
        // Ice-supplied AMD response and exception callbacks are passed as AMI callbacks
        _other->opAsync(s, response, error);
    }

private:
    const std::shared_ptr<ExamplePrx> _other;
};

Oneway Proxy

If your AMD implementation uses a oneway proxy, remember that the AMI response callback is not called: you need to call the AMD response from the AMI sent callback.


AMD Example in C++

To demonstrate the use of AMD in Ice, let us define the Slice interface for a simple computational engine:

Slice
module Demo
{
    sequence<float> Row;
    sequence<Row> Grid;

    exception RangeError {}

    interface Model
    {
        ["amd"] Grid interpolate(Grid data, float factor)
            throws RangeError;
    }
}

Given a two-dimensional grid of floating point values and a factor, the interpolate operation returns a new grid of the same size with the values interpolated in some interesting (but unspecified) way.

Our servant class derives from Demo::Model and supplies a definition for the interpolateAsync method:

C++
class ModelI : public Demo::Model
{
public:
    virtual void interpolateAsync(Demo::Grid, float,
                                  std::function<void(const Demo::Grid&)>,
                                  std::function<void(std::exception_ptr)>,
                                  const Ice::Current&);
 
private:
    std::deque<Job> _jobs;
    std::mutex _mutex;
};

The implementation of interpolateAsync uses synchronization to safely record the callback functions and arguments in a Job that is added to a queue:

C++
void 
ModelI::interpolateAsync(Demo::Grid data, float factor,
                         std::function<void(const Demo::Grid&)> response,
                         std::function<void(std::exception_ptr)> exception,
                         const Ice::Current&)
{
    std::lock_guard<std::mutex> lock(_mutex);
    _jobs.emplace_back(std::move(data), factor, std::move(response), std::move(exception));
}

After queuing the information, the operation returns control to the Ice run time, making the dispatch thread available to process another request. An application thread removes the next Job from the queue and invokes execute to perform the interpolation. Job is defined as follows:

C++
class Job
{
public:
    
    Job(Demo::Grid&&, float,
        std::function<void(const Demo::Grid&)>&&,
        std::function<void(std::exception_ptr)>&&);
   
    void execute();
 
private:
   
    void interpolateGrid(); // can throw RangeError

    Demo::Grid _data;
    const float _factor;
    const std::function<void(const Demo::Grid&)> _response;
    const std::function<void(std::exception_ptr)> _exception;
};

The implementation of execute uses interpolateGrid (not shown) to perform the computational work:

C++
void Job::execute()
{
    try
    {
        interpolateGrid();
        _response(_data);
    }
    catch(...)
    {
        _exception(std::current_exception());
    }
}

If interpolateGrid throws an exception such as range error, we capture this exception and pass it to the _exception callback. If the interpolation was successful, _response is called to send the modified grid back to the client.

See Also