Asynchronous Method Invocation (AMI) in Swift

Asynchronous Method Invocation (AMI) is the term used to describe the client-side support for the asynchronous programming model. AMI supports both oneway and twoway requests, but unlike their synchronous counterparts, AMI requests never block the calling thread. When a client issues an AMI request, the Ice run time hands the message off to the local transport buffer or, if the buffer is currently full, queues the request for later delivery. The application can then continue its activities and poll or wait for completion of the invocation, or receive a callback when the invocation completes.

AMI is transparent to the server: there is no way for the server to tell whether a client sent a request synchronously or asynchronously.

On this page:

Asynchronous API in Swift

Consider the following simple Slice definition:

Slice
module Demo
{ 
    interface Employees
    {
        string getName(int number);
    }
}

Asynchronous Proxy Methods in Swift

In addition to the synchronous proxy method, the Slice compiler generates the following asynchronous proxy method:

Swift
public extension EmployeesPrx {
    func getNameAsync(_ number: Int32,
                      context: Ice.Context? = nil,
                      sentOn: DispatchQueue? = PromiseKit.conf.Q.return,
                      sentFlags: DispatchWorkItemFlags? = nil,
                      sent: ((Bool) -> Void)? = nil) -> PromiseKit.Promise<String> {
        // ...
    }
}

As you can see, the getName Slice operation generates a getNameAsync method that sends (or queues) an invocation of getName. This method does not block the calling thread. It returns a promise (from PromiseKit) that you can use in a number of ways, including blocking to obtain the result or configuring an action to be executed when the result becomes available.

Here's an example that calls getNameAsync:

Swift
let e: EmployeesPrx = ...
let p = e.getNameAsync(99)
// Continue to do other things here...

let name = try p.wait()

Because getNameAsync does not block, the calling thread can do other things while the operation is in progress.

An asynchronous proxy method uses the same parameter mapping as for synchronous operations; the only difference is that the result (if any) is returned in a Promise. An operation that returns no values maps to an asynchronous proxy method that returns Promise<Void>. For example, consider the following operation:

Slice
interface Example
{
    double op(int inp1, string inp2, out bool outp1, out long outp2);
}

The generated code looks like this:

Swift
public extension ExamplePrx {
    func opAsync(inp1: Int32,
                 inp2: String, 
                 context: Ice.Context? = nil, 
                 sentOn: DispatchQueue? = PromiseKit.conf.Q.return,
                 sentFlags: DispatchWorkItemFlags? = nil,
                 sent: ((Bool) -> Void)? = nil) -> PromiseKit.Promise<(returnValue: Double, outp1: Bool, outp2: Int64)> {
        // ...
    }
}

Now let's call then to demonstrate one way of asynchronously executing an action when the invocation completes:

Swift
let e: ExamplePrx = ...
firstly {
    e.opAsync(5, "demo")
}.done {
    print("returnValue = \($0.returnValue)")
    print("outp1 = \($0.outp1)")
    print("outp2 = \($0.outp2)")
}.catch { error in
    // handle exception
}

Asynchronous Exception Semantics in Swift

If an invocation raises an exception, the exception can be obtained from the promise in several ways:

  • Read the error property of the promise
  • Call wait on the promise; wait throws the exception if the promise completed with an error
  • Use chaining methods such as catch to execute custom actions

The exception is always provided by the promise, even if the actual error condition for the exception was encountered during the call to the opAsync method ("on the way out"). The advantage of this behavior is that all exception handling is located with the code that handles the promise, and as a result, the Async method does not throw.

 

Polling for Completion in Swift

The asynchronous API allows you to poll for call completion, which can be useful in a variety of cases. As an example, consider the following simple interface to transfer files from client to server:

Slice
interface FileTransfer
{
    void send(int offset, ByteSeq bytes);
}

The client repeatedly calls send to send a chunk of the file, indicating at which offset in the file the chunk belongs. A naïve way to transmit a file would be along the following lines:

Swift
import Foundation
import Ice
let stream: Foundation.InputStream = ...
let bufferSize = 1024
let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: bufferSize)
defer {
    buffer.deallocate()
}

let ft: FileTransferPrx = ...

var offset = 0
while stream.hasBytesAvailable {
    let read = stream.read(buffer, maxLength: bufferSize)
    let data = Data(bytesNoCopy: buffer, count: read, deallocator: .none)
    try ft.send(offset: offset, bytes: data)
    offset += read
}

This works, but not very well: because the client makes synchronous calls, it writes each chunk on the wire and then waits for the server to receive the data, process it, and return a reply before writing the next chunk. This means that both client and server spend much of their time doing nothing — the client does nothing while the server processes the data, and the server does nothing while it waits for the client to send the next chunk.

Using asynchronous calls, we can improve on this considerably:

Swift
import Foundation
import Ice

let stream: Foundation.InputStream = ...
let bufferSize = 1024
let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: bufferSize)
defer {
    buffer.deallocate()
}
let ft: FileTransferPrx = ...
var offset = 0
var results = [PromiseKit.Promise<Void>]()
let numRequests = 5
while stream.hasBytesAvailable {
    let read = stream.read(buffer, maxLength: bufferSize)
    let data = Data(bytes: buffer, count: read, deallocator: .none)

    results.append(ft.sendAsync(offset: offset, bytes: data))
    offset += read

    // Once there are more than numRequests, wait for the least
    // recent one to complete.
    while results.count > numRequests {
        try results.removeFirst().wait()
    }
}

// Wait for any remaining requests to complete.
try when(fulfilled: results).wait()

With this code, the client sends up to numRequests + 1 chunks before it waits for the least recent one of these requests to complete. In other words, the client sends the next request without waiting for the preceding request to complete, up to the limit set by numRequests. In effect, this allows the client to "keep the pipe to the server full of data": the client keeps sending data, so both client and server continuously do work.

Obviously, the correct chunk size and value of numRequests depend on the bandwidth of the network as well as the amount of time taken by the server to process each request. However, with a little testing, you can quickly zoom in on the point where making the requests larger or queuing more requests no longer improves performance. With this technique, you can realize the full bandwidth of the link to within a percent or two of the theoretical bandwidth limit of a native socket connection.

 

Asynchronous Oneway Invocations in Swift

You can invoke operations via oneway proxies asynchronously, provided the operation has void return type, does not have any out-parameters, and does not raise user exceptions. If you call an asynchronous method on a oneway proxy for an operation that returns values or raises a user exception, the proxy method throws TwowayOnlyException.

The promise returned for a oneway invocation completes as soon as the request is successfully written to the client-side transport. The promise completes with an exception if an error occurs before the request is successfully written.


 

Flow Control in Swift

Asynchronous method invocations never block the thread that calls the asynchronous proxy method. The Ice run time checks to see whether it can write the request to the local transport. If it can, it does so immediately in the caller's thread. Alternatively, if the local transport does not have sufficient buffer space to accept the request, the Ice run time queues the request internally for later transmission in the background.

This creates a potential problem: if a client sends many asynchronous requests at the time the server is too busy to keep up with them, the requests pile up in the client-side run time until, eventually, the client runs out of memory.

The API provides a way for you to implement flow control by counting the number of requests that are queued so, if that number exceeds some threshold, the client stops invoking more operations until some of the queued operations have drained out of the local transport. One of the optional parameters to every asynchronous proxy invocation is a sent closure. If you set this sent parameter, the Ice run time will invoke it when the request has been sent and provide a boolean parameter indicating whether the request was sent synchronously. This parameter is true if the entire request could be transferred to the local transport in the caller's thread without blocking, otherwise the parameter is false.

Here's a simple example to demonstrate the flow control feature:

Swift
let proxy: ExamplePrx = ...
proxy.doSomethingAsync { sentSynchronously in 
    if sentSynchronously {
        // Entire request was accepted by the transport,
        // called recursively from this thread
    } else {
        // Request was queued but has now been sent,
        // called from a separate thread
    }
}

Using this feature, you can limit the number of queued requests by counting the number of requests that are queued and decrementing the count when the Ice run time passes a request to the local transport.

 

Asynchronous Batch Requests in Swift

Applications that send batched requests can either flush a batch explicitly or allow the Ice run time to flush automatically. The proxy method ice_flushBatchRequests performs an immediate flush using the synchronous invocation model and may block the calling thread until the entire message can be sent. Ice also provides asynchronous versions of this method so you can flush batch requests asynchronously.

The proxy method ice_flushBatchRequestsAsync flushes any batch requests queued by that proxy. In addition, similar methods are available on the communicator and the Connection objects. These methods flush batch requests sent via the same communicator and via the same connection, respectively.


 

Concurrency Semantics for AMI in Swift

PromiseKit always dispatches your handlers on the dispatch queue you specify with the on parameter. The default value for on is PromiseKit.conf.Q.map or PromiseKit.conf.Q.return depending on the handler. In a similar fashion, Ice executes the sent callback on the dispatch queue you specify with the sentOn parameter. 

If you pass nil for this on parameter, the thread that completes the promise executes the handler. Likewise, if you pass nil for the sentOn parameter, the thread that sent the request executes your sent callback.

See Also