AMI in Python with Futures
On this page:
Basic Asynchronous API in Python
Consider the following simple Slice definition:
module Demo { interface Employees { string getName(int number); } }
Asynchronous Proxy Methods in Python
In addition to the synchronous proxy method, the Python mapping generates the following asynchronous proxy method:
def getNameAsync(self, number, context=None)
As you can see, the getName
operation generates a getNameAsync
method, which optionally accepts a per-invocation context. getNameAsync
sends (or queues) an invocation of getName
, and does not block the calling thread. It returns an instance of Ice.InvocationFuture
that you can use in a number of ways, including blocking to obtain the result, configuring an action to be executed when the result becomes available, and canceling the invocation.
Here's an example that calls getNameAsync
:
e = EmployeePrx.checkedCast(...) f = e.getNameAsync(99) # Continue to do other things here... name = f.result()
Because getNameAsync
does not block, the calling thread can do other things while the operation is in progress.
An asynchronous proxy method uses the same parameter mapping as for synchronous operations; the only difference is that the result (if any) is returned via an InvocationFuture
. For example, consider the following operation:
double op(int inp1, string inp2, out bool outp1, out long outp2);
The generated code looks like this:
def opAsync(self, inp1, inp2, context=None)
Now let's call add_done_callback
to demonstrate one way of asynchronously executing an action when the invocation completes:
p.opAsync(42, "value for inp2").add_done_callback(lambda future: ret, outp1, outp2 = future.result())
As with the synchronous mapping, an operation that returns multiple values supplies its result as a tuple. The completion callback, in this case a lambda function, receives the future as its argument and extracts the values from the result tuple.
Asynchronous Exception Semantics in Python
If an invocation raises an exception, the exception can be obtained from the future in several ways:
- Call
result
on the future;result
raises the exception directly - Call
exception
on the future;exception
returns the exception object
The exception is provided by the future, even if the actual error condition for the exception was encountered during the call to the opAsync
method ("on the way out"). The advantage of this behavior is that all exception handling is located with the code that handles the future (instead of being present twice, once where the opAsync
method is called, and again where the future is handled).
There are two exceptions to this rule:
- if you destroy the communicator and then make an asynchronous invocation, the
opAsync
method throwsCommunicatorDestroyedException
directly. - a call to an
Async
function can throwTwowayOnlyException
. AnAsync
function throws this exception if you call an operation that has a return value or out-parameters on a oneway proxy.
Future
Classes in Python
Ice provides two future classes: Ice.Future
and Ice.InvocationFuture
. Asynchronous proxy invocations return an instance of InvocationFuture
, which is a subclass of Future
. The API for Ice.Future
is similar to that of Python's asyncio.Future
and concurrent.futures.Future
classes, while InvocationFuture
adds some Ice-specific methods that clients may find useful.
class Future(...): def cancel(self) def cancelled(self) def running(self) def done(self) def add_done_callback(self, fn) def result(self, timeout=None) def exception(self, timeout=None) def set_result(self, result) def set_exception(self, ex) def completed(result) completed = staticmethod(completed) class InvocationFuture(Future): def add_done_callback_async(self, fn) def is_sent(self) def is_sent_synchronously(self) def add_sent_callback(self, fn) def add_sent_callback_async(self, fn) def sent(self, timeout=None) def set_sent(self, sentSynchronously) def communicator(self) def connection(self) def proxy(self) def operation(self)
The Future
methods have the following semantics:
cancel(self)
This method prevents a queued invocation from being sent or, if the invocation has already been sent, ignores a reply if the server sends one.cancel
is a local operation and has no effect on the server. A canceled invocation is considered to be completed, meaningdone
returns true, and the result of the invocation is anIce.InvocationCanceledException
.
cancelled(self)
This method returns frue if the invocation was cancelled via a call tocancel
, or false otherwise.
running(self)
This method returns true if the invocation has not yet completed or been cancelled, or false otherwise.
done(self)
This method returns true if the invocation has completed (either successfully or exceptionally) or has been cancelled, or false otherwise.
add_done_callback(self, fn)
This method registers a callback to be executed when the invocation completes, either successfully or exceptionally. The callback function receives the future as its only argument. If the invocation is already completed at the timeadd_done_callback
is called, the callback method is invoked recursively from the calling thread, otherwise the callback method is invoked in the thread that completes the invocation.
result(self, timeout=None)
This method returns the result of the invocation. If an optional timeout is provided, the method will block for up to the given timeout waiting for the invocation to complete and raisesIce.TimeoutException
if the timeout expires without completion. If no timeout is provided, the method blocks indefinitely. If the invocation completes with an exception, the method raises the exception directly. For a Slice operation declared with avoid
return type, the method returnsNone
upon successful completion.
exception(self, timeout=None)
This method returns the exception that completed the invocation, orNone
if the invocation completed successfully. If an optional timeout is provided, the method will block for up to the given timeout waiting for the invocation to complete and raisesIce.TimeoutException
if the timeout expires without completion. If no timeout is provided, the method blocks indefinitely.
set_result(self, result)
This method completes the invocation successfully using the given result. Calling this method has no effect if the invocation is already completed.
set_exception(self, ex)
This method completes the invocation exceptionally using the given exception. Calling this method has no effect if the invocation is already completed.
completed(result)
This static convenience method returns an instance ofIce.Future
that is already completed successfully with the given result.
The InvocationFuture
methods have the following semantics:
add_done_callback_async(self, fn)
This method's semantics differ from that ofadd_done_callback
in the situation where the future is already completed. When you calladd_done_callback_async
and the future is already completed, the callback will be invoked by an Ice thread (or by a dispatcher if one is configured).
is_sent(self)
When you call an asynchronous proxy method, the Ice run time attempts to write the corresponding request to the client-side transport. If the transport cannot accept the request, the Ice run time queues the request for later transmission.is_sent
returns true if, at the time it is called, the request has been written to the local transport (whether it was initially queued or not). Otherwise, if the request is still queued or an exception occurred before the request could be sent,is_sent
returns false.
is_sent_synchronously(self)
This method returns true if a request was written to the client-side transport without first being queued. If the request was initially queued,is_sent_synchronously
returns false (independent of whether the request is still in the queue or has since been written to the client-side transport).
add_sent_callback(self, fn)
This method registers a callback to be executed when the invocation has been sent. The callback function receives two arguments: the future object and a boolean indicating whether the invocation was sent synchronously. If the invocation is already sent at the timeadd_sent_callback
is called, the callback method is invoked recursively from the calling thread. Otherwise, the callback method is invoked by an Ice thread (or by a dispatcher if one is configured).
add_sent_callback_async(self, fn)
This method's semantics differ from that ofadd_sent_callback
in the situation where the invocation is already sent. When you calladd_sent_callback_async
and the invocation is already sent, the callback will be invoked by an Ice thread (or by a dispatcher if one is configured).
sent(self, timeout=None)
This method waits for the invocation to be sent and returns a boolean indicating whether the invocation was sent synchronously. If an optional timeout is provided, the method will block for up to the given timeout waiting for the invocation to be sent and raisesIce.TimeoutException
if the timeout expires beforehand. If no timeout is provided, the method blocks indefinitely. If the invocation completes with an exception, the method raises the exception directly.
set_sent(self, sentSynchronously)
This method marks the invocation as sent, and the boolean argument indicates whether it was sent synchronously.
communicator(self)
This method returns the communicator that sent the invocation.
connection(self)
This method returns the connection that was used for the invocation. Note that, for typical asynchronous proxy invocations, this method returns a nil value because the possibility of automatic retries means the connection that is currently in use could change unexpectedly. ThegetConnection
method only returns a non-nil value when theAsyncResult
object is obtained by callingbegin_flushBatchRequests
on aConnection
object.
proxy(self)
This method returns the proxy that was used to call the asynchronous proxy method, orNone
if the future was not obtained via an asynchronous proxy invocation.
operation(self)
This method returns the name of the operation.
Python 3.5 Features
Ice's future types provide some additional features when using Python 3.5 or later.
asyncio
Integration
The Ice.wrap_future
function wraps an Ice future object with an instance of asyncio.Future
. The function accepts an Ice.Future
object and returns an asyncio.Future
object. Since Ice.Future
objects support use in multi-threaded applications, wrap_future
ensures that the resulting asyncio.Future
object is completed in a thread-safe manner.
Awaitable Objects
Ice.Future
is an awaitable object, meaning an instance can be used as the target of the await
keyword. Note however that your chosen event loop implementation must also support Ice.Future
objects. For example, attempting to call await
on an Ice.Future
while using the asyncio
event loop will result in an error because asyncio
's event loop doesn't support "foreign" future types.
One situation where Ice.Future
objects can be awaited is in a servant dispatch method that is implemented as a coroutine.
Polling for Completion in Python
The InvocationFuture
methods allow you to poll for call completion. Polling is useful in a variety of cases. As an example, consider the following simple interface to transfer files from client to server:
interface FileTransfer { void send(int offset, ByteSeq bytes); }
The client repeatedly calls send
to send a chunk of the file, indicating at which offset in the file the chunk belongs. A naïve way to transmit a file would be along the following lines:
file = open(...) ft = FileTransferPrx.checkedCast(...) chunkSize = ... offset = 0 while not file.eof(): bytes = file.read(chunkSize) # Read a chunk ft.send(offset, bytes) # Send the chunk offset += len(bytes.length)
This works, but not very well: because the client makes synchronous calls, it writes each chunk on the wire and then waits for the server to receive the data, process it, and return a reply before writing the next chunk. This means that both client and server spend much of their time doing nothing — the client does nothing while the server processes the data, and the server does nothing while it waits for the client to send the next chunk.
Using asynchronous calls, we can improve on this considerably:
file = open(...) ft = FileTransferPrx.checkedCast(...) chunkSize = ... offset = 0 results = [] numRequests = 5 while not file.eof(): bytes = file.read(chunkSize) # Read a chunk # Send up to numRequests + 1 chunks asynchronously. f = ft.sendAsync(offset, bytes) offset += len(bytes) # Wait until this request has been passed to the transport. f.sent() results.append(f) # Once there are more than numRequests, wait for the least # recent one to complete. while len(results) > numRequests: f = results[0] del results[0] f.result() # Wait for any remaining requests to complete. while len(results) > 0: f = results[0] del results[0] f.result()
With this code, the client sends up to numRequests + 1
chunks before it waits for the least recent one of these requests to complete. In other words, the client sends the next request without waiting for the preceding request to complete, up to the limit set by numRequests
. In effect, this allows the client to "keep the pipe to the server full of data": the client keeps sending data, so both client and server continuously do work.
Obviously, the correct chunk size and value of numRequests
depend on the bandwidth of the network as well as the amount of time taken by the server to process each request. However, with a little testing, you can quickly zoom in on the point where making the requests larger or queuing more requests no longer improves performance. With this technique, you can realize the full bandwidth of the link to within a percent or two of the theoretical bandwidth limit of a native socket connection.
Asynchronous Oneway Invocations in Python
You can invoke operations via oneway proxies asynchronously, provided the operation has void
return type, does not have any out-parameters, and does not raise user exceptions. If you call an asynchronous proxy method on a oneway proxy for an operation that returns values or raises a user exception, the method throws TwowayOnlyException
.
The future returned for a oneway invocation completes as soon as the request is successfully written to the client-side transport. The future completes exceptionally if an error occurs before the request is successfully written.
Flow Control in Python
Asynchronous method invocations never block the thread that calls the begin_
method: the Ice run time checks to see whether it can write the request to the local transport. If it can, it does so immediately in the caller's thread. (In that case, InvocationFuture.is_sent_synchronously
returns true.) Alternatively, if the local transport does not have sufficient buffer space to accept the request, the Ice run time queues the request internally for later transmission in the background. (In that case, InvocationFuture.is_sent_synchronously
returns false.)
This creates a potential problem: if a client sends many asynchronous requests at the time the server is too busy to keep up with them, the requests pile up in the client-side run time until, eventually, the client runs out of memory.
The API provides a way for you to implement flow control by counting the number of requests that are queued so, if that number exceeds some threshold, the client stops invoking more operations until some of the queued operations have drained out of the local transport.
You can supply a sent callback to be notified when the request was successfully sent:
def sentCallback(future, sentSynchronously): # The request was sent, send another! proxy = ... future = proxy.doSomethingAsync() future.add_sent_callback(sentCallback)
The add_sent_callback
method has the following semantics:
- If the Ice run time was able to pass the entire request to the local transport immediately, the action will be invoked from the current thread and the
sentSynchronously
argument will be true. - If Ice wasn't able to write the entire request without blocking, the action will eventually be invoked from an Ice thread pool thread and the
sentSynchronously
argument will be false.
Asynchronous Batch Requests in Python
You can invoke operations via batch oneway proxies asynchronously, provided the operation has void
return type, does not have any out-parameters, and does not raise user exceptions. If you call an asynchronous proxy method on a oneway proxy for an operation that returns values or raises a user exception, the method throws TwowayOnlyException
.
The future returned for a batch oneway invocation is always completed and indicates the successful queuing of the batch invocation. The future completes exceptionally if an error occurs before the request is queued.
Applications that send batched requests can either flush a batch explicitly or allow the Ice run time to flush automatically. The proxy method ice_flushBatchRequests
performs an immediate flush using the synchronous invocation model and may block the calling thread until the entire message can be sent. Ice also provides an asynchronous version of this method so you can flush batch requests asynchronously.
ice_flushBatchRequestsAsync
is a proxy method that flushes any batch requests queued by that proxy, without blocking the calling thread.
In addition, similar methods are available on the communicator and the Connection
object that is returned by InvocationFuture.connection
. These methods flush batch requests sent via the same communicator and via the same connection, respectively.
Concurrency Semantics for AMI in Python
For the InvocationFuture
returned by an asynchronous proxy method, the Ice run time invokes set_result
or set_exception
from an Ice thread pool thread. When you register an action with add_done_callback
, the thread in which your action executes depends on the completion status of the future. If the future is already complete at the time you call add_done_callback
, the callback function will be invoked immediately in the calling thread. If the future is not yet complete when you call add_done_callback
, the action will eventually execute in an Ice thread pool thread.
The semantics are slightly different when you register an action with add_done_callback_async
: the action is always executed in an Ice thread pool thread regardless of the completion status of the future at the time of the call.
If a dispatcher is configured, the Ice thread pool thread delegates the execution of the action to the dispatcher.
Refer to the flow control discussion for information about the concurrency semantics of the flow control methods.