2. Asynchronous Concurrent Programming (pycos)
pycos provides an API for asynchronous and concurrent programming with tasks, using Python’s generator functions. Tasks are like lightweight threads: creating and running tasks is very efficient. Moreover, unlike threads, a task continues to run until it voluntarily gives up control (when yield is used), so locking is not needed to protect critical sections that don’t contain yield.
Programs developed with pycos have the same logic and structure as programs with threads, except for a few syntactic changes. Although the API below has many methods, most of them are for additional features of pycos (such as message passing, hot swapping, monitoring etc.) and are not needed for simple programs that are similar to thread-based programs. The differences compared to threaded programming are:
- Instead of creating threads, tasks should be created with Task. The task function (first argument to Task) should be a generator function (i.e., a function with yield statements).
- Sockets, pipes etc. should be converted to asynchronous versions with Asynchronous Socket, Asynchronous Pipe etc. (e.g., with async_sock = AsyncSocket(socket.socket())).
- pycos’s locking primitives (pycos.Event, pycos.Condition, etc.) should be used in place of Python’s threading counterparts, with yield on blocking operations (e.g., as yield async_event.wait()).
- I/O operations, such as AsyncSocket’s send(), recv() and accept(), and blocking operations, such as task’s sleep() and Event’s wait(), are implemented with generator methods; these should be used with yield (e.g., as data = yield async_sock.recv(1024)).
- Task’s sleep() should be used in place of time.sleep() (e.g., as yield task.sleep(2)).
Tasks in pycos are essentially generator functions that suspend execution when yield is used and are resumed by pycos’s scheduler (Pycos) after the asynchronous operation is complete. Usually yield is used with an asynchronous call, such as socket’s connect() or send(), pipe’s read() or communicate(), waiting for a message etc. With such statements, the asynchronous call is initiated and control goes to the scheduler, which schedules another task if one is ready to execute. When the asynchronous operation is complete, the task that called the operation becomes ready to execute. Thus, the tasks in pycos are not strictly cooperative tasks that pass control to each other; instead, each yield statement transfers control to pycos’s scheduler, which manages them. However, pycos supports message passing, suspend/resume calls etc., so that tasks can cooperate in a way that is easier to program and understand.
Unlike with threads, there is no forced preemption with tasks: at any time at most one task is executing, and it continues to execute until it uses yield. Thus, with pycos there is no need to lock critical sections, as long as a critical section does not contain yield.
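As a rough illustration of this model, here is a toy round-robin scheduler written in plain Python. It is not pycos’s implementation; run and worker are hypothetical names. Each generator runs until its next yield, and because nothing can interrupt it between two yields, the shared counter is updated without a lock.

```python
from collections import deque

def run(tasks):
    """Toy round-robin scheduler: each yield hands control back here."""
    ready = deque(tasks)
    while ready:
        gen = ready.popleft()
        try:
            next(gen)            # run the task until its next yield
        except StopIteration:
            continue             # task finished; drop it
        ready.append(gen)        # task yielded; reschedule it

log = []
counter = {"value": 0}

def worker(name, steps):
    for i in range(steps):
        # No yield between the read and the write, so no other task can
        # run in between: this update needs no lock.
        current = counter["value"]
        counter["value"] = current + 1
        log.append(f"{name}{i}")
        yield                    # give control back to the scheduler

run([worker("a", 3), worker("b", 2)])
print(log)               # ['a0', 'b0', 'a1', 'b1', 'a2']
print(counter["value"])  # 5
```

With threads, the read and the write of the counter could be separated by preemption; in this model a switch can only happen at a yield.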
The pycos framework consists of the Pycos scheduler; Task, to create tasks from generator functions; Channel, to broadcast messages; Asynchronous Socket, to convert regular synchronous sockets to asynchronous (non-blocking) sockets; Asynchronous Pipe for pipes and Asynchronous File for files; Client and dispycos_server() for distributed / parallel computing; Lock and RLock for locking (although locking is not required with pycos); and Condition, Event and Semaphore primitives that are very similar to thread primitives (except for a few syntactic changes noted above).
2.1. Examples
See Asynchronous Concurrent Programming, Channels and Message Passing in the tutorial for examples. There are many illustrative use cases in the ‘examples’ directory under the directory where the pycos module is installed.
The following is a brief description of the included examples relevant to this section:
- examples/tasks.py creates a number of tasks that each suspend execution for a brief period. The number of tasks created can be increased to thousands or tens of thousands to show that pycos scales well.
- examples/client_server.py shows message passing (send() and receive() methods of tasks) between local client and server tasks. The remote version and local version are similar, except that remote versions register/locate tasks.
- examples/channel.py uses a broadcasting Channel to exchange messages among local tasks.
2.2. Configuration Parameters
The config.py file under the pycos installation defines the following configuration parameters:
- PickleProtocolVersion is the pickle (serialization) protocol version. Its default value of None implies pycos uses the highest protocol version supported by Python. If it is 0, the default protocol version is used. Otherwise, it should be a number that is the protocol version. This setting should be the same among all peers. This variable affects dispy as well.
- MsgTimeout is the maximum duration in seconds for sending a message (with send(), deliver()). If a message is not delivered within this time, it is considered an error. If more than MaxConnectionErrors errors happen in sending messages to a peer, that peer is considered not reachable and is closed.
- MaxConnectionErrors is the maximum number of errors allowed when communicating with a peer. See the MsgTimeout parameter above.
- IPV4_MULTICAST_GROUP is the multicast group used by netpycos when using IPv4. Its default value is 239.255.97.5.
- IPV6_MULTICAST_GROUP is the multicast group used by netpycos when using IPv6. Its default value is ff05::674f:48ba:b409:3171:9705.
- NetPort is the port used by netpycos for network communication. Its default value is 9705.
- MinPulseInterval is the minimum interval in seconds for sending pulse messages between dispycos client, scheduler and dispycosnode. Pulse messages are used for checking that peers are reachable. Its default value is MsgTimeout, defined above.
- MaxPulseInterval is the maximum interval in seconds for sending pulse messages between dispycos client, scheduler and dispycosnode. Its default value is 10 * MinPulseInterval.
- DispycosSchedulerPort is the port number used by the dispycos scheduler. This must be an expression (to be evaluated with eval). Its default value is 'pycos.config.NetPort', so by default 9705 is the port used by the dispycos scheduler.
- DispycosNodePort is the port number used by dispycosnode. This must be an expression (to be evaluated with eval). Its default value is 'pycos.config.DispycosSchedulerPort + 1', so by default 9706 is the port used by dispycosnode.
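The default ports follow from evaluating the two expressions with eval. The sketch below mimics that with a SimpleNamespace standing in for pycos.config; the helper resolve_ports is hypothetical, and it assumes (not confirmed by this document) that the scheduler port expression is evaluated and stored as a number before the node port expression is evaluated.

```python
from types import SimpleNamespace

def resolve_ports(net_port=9705,
                  scheduler_expr="pycos.config.NetPort",
                  node_expr="pycos.config.DispycosSchedulerPort + 1"):
    # Hypothetical stand-in for the pycos.config module
    config = SimpleNamespace(NetPort=net_port)
    pycos = SimpleNamespace(config=config)
    # Evaluate the scheduler port first, so the node expression sees an int
    config.DispycosSchedulerPort = eval(scheduler_expr, {"pycos": pycos})
    config.DispycosNodePort = eval(node_expr, {"pycos": pycos})
    return config.DispycosSchedulerPort, config.DispycosNodePort

print(resolve_ports())      # (9705, 9706)
print(resolve_ports(9800))  # (9800, 9801)
```

Because the node port is defined relative to the scheduler port, changing NetPort alone moves both ports together.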
2.3. Pycos scheduler
Pycos is a (singleton) scheduler that runs tasks similar to the way an operating system’s scheduler runs multiple processes. It is initialized automatically (for example, when a task is created), so for most purposes the scheduler is transparent. The scheduler in pycos manages tasks, message passing, I/O events, timeouts, wait/resume events etc., in a single concurrent program; it doesn’t provide distributed programming for message passing over the network. netpycos extends Pycos with features supporting distributed programming, remote execution of tasks etc. If the scheduler instance is needed, it can be obtained with either Pycos() or Pycos.instance().
Unlike in other asynchronous frameworks, in pycos there is no explicit event loop: the I/O events are processed by the scheduler and by methods in Asynchronous Socket, Asynchronous Pipe etc. For example, the recv() method of Asynchronous Socket (which must be used with yield) sets up an internal function to execute when the socket has data to read, and suspends the caller task. The scheduler can execute any other tasks that are ready while the I/O operation is pending. When the data has been read, the suspended task is resumed with the data read, so that Asynchronous Socket’s recv() works just like socket.recv(), except for using yield. Thus, programming with pycos is very similar to programming with threads, except for using yield with certain methods.
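The recv() mechanism just described can be modeled in a few lines: the task yields an operation, the operation only registers what to do when data arrives, and a later readiness event resumes the task with the data through generator.send(). ToyScheduler, ToySocket and feed() are hypothetical stand-ins, not pycos classes or methods.

```python
from collections import deque

class ToyScheduler:
    """Hypothetical mini scheduler that resumes tasks with values."""
    def __init__(self):
        self.ready = deque()          # (generator, value to send in)

    def spawn(self, gen):
        self.ready.append((gen, None))

    def resume(self, gen, value):
        # Called by an I/O completion callback, not by the task itself
        self.ready.append((gen, value))

    def run(self):
        while self.ready:
            gen, value = self.ready.popleft()
            try:
                op = gen.send(value)  # run task until it yields an operation
            except StopIteration:
                continue
            op(self, gen)             # the operation arranges a later resume

class ToySocket:
    """Stand-in for an asynchronous socket; data 'arrives' via feed()."""
    def __init__(self):
        self.waiting = None           # (scheduler, task, bufsize) in recv

    def recv(self, bufsize):
        def start(sched, task):
            # Register what to do when data arrives; the task stays suspended
            self.waiting = (sched, task, bufsize)
        return start

    def feed(self, data):
        sched, task, bufsize = self.waiting
        self.waiting = None
        sched.resume(task, data[:bufsize])

received = []

def reader(sock):
    data = yield sock.recv(4)         # suspends until feed() is called
    received.append(data)

sched = ToyScheduler()
sock = ToySocket()
sched.spawn(reader(sock))
sched.run()                # reader is now suspended inside recv
sock.feed(b"hello world")  # simulated "socket is readable" event
sched.run()
print(received)            # [b'hell']
```

Between the two run() calls the scheduler is free to run other tasks; the reader’s code after the yield looks the same as blocking code.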
-
class pycos.Pycos
Creates and returns the singleton scheduler. If a scheduler instance has already been created (such as when a task was created), a new instance won’t be created. netpycos extends Pycos for distributed programming, and its constructor has various options for customization. The scheduler has the following methods:
-
instance()
This static method returns the instance of the Pycos scheduler; use it as scheduler = Pycos.instance(). If the instance has not been created yet, it creates one and returns it.
-
cur_task()
This static method returns the task (instance of Task) being executed; use it as task = Pycos.cur_task(). As mentioned below, if a task’s generator function has a task=None parameter, the Task constructor initializes it to the task instance (which is also a way to document that the function is used for creating tasks).
-
join()
Note: This method must be called from the (main) thread only; calling it from a task will deadlock the entire task framework.
Waits for all scheduled non-daemon tasks to finish. After join returns, more tasks can be created (which are then added to the scheduler).
-
finish()
Note: This method must be called from the (main) thread only; calling it from a task will deadlock the entire task framework.
Waits for all non-daemon tasks to finish, then kills the daemon tasks and terminates the task scheduler. If necessary, a new scheduler instance may be created with Pycos() or Pycos.instance().
-
terminate()
Note: This method must be called from the (main) thread only; calling it from a task will deadlock the entire task framework.
Kills all scheduled tasks and then terminates the task scheduler. If necessary, a new scheduler instance may be created with Pycos() or Pycos.instance().
-
The scheduler runs in a separate thread from the user program. The scheduler terminates when all non-daemon tasks have terminated, similar to Python’s threading module.
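The finish() behavior (wait for non-daemon tasks, then kill daemon tasks) can be sketched with plain generators, assuming daemon tasks are terminated the way set_daemon() describes, by throwing GeneratorExit into them. In Python, generator.close() raises GeneratorExit at the paused yield, which gives the daemon a chance to clean up. job, heartbeat and finish here are hypothetical and run in the calling thread, unlike the real scheduler.

```python
from collections import deque

events = []

def job(n):
    for i in range(n):
        events.append(f"job {i}")
        yield

def heartbeat():
    try:
        while True:          # a daemon task that never finishes on its own
            events.append("beat")
            yield
    except GeneratorExit:
        events.append("heartbeat cleanup")
        raise                # re-raise so close() completes normally

def finish(tasks):
    """Toy model: run until every non-daemon task is done, then
    terminate the remaining daemon tasks with GeneratorExit."""
    ready = deque(tasks)     # (generator, is_daemon) pairs
    while any(not daemon for _, daemon in ready):
        gen, daemon = ready.popleft()
        try:
            next(gen)
        except StopIteration:
            continue
        ready.append((gen, daemon))
    for gen, _ in ready:
        gen.close()          # raises GeneratorExit at the paused yield

finish([(job(2), False), (heartbeat(), True)])
print(events)
# ['job 0', 'beat', 'job 1', 'beat', 'heartbeat cleanup']
```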
2.4. Task
pycos’s Task class creates tasks (lightweight processes). Tasks are similar to threads in regular Python programs, except for a few differences noted above.
-
class pycos.Task(target[, arg1, arg2, ...])
Creates a task, where target is a generator function (a function with yield statements), and arg1, arg2 etc. are arguments or keyword arguments to target. If the target generator function has a task=None keyword argument, the Task constructor replaces None with the instance of Task created, so the task can use it to invoke methods of the Task class (see below). Alternately, the instance can be obtained with the static method task = Pycos.cur_task().
Consider the following generator function (where sock is an asynchronous socket and all statements are asynchronous, so all are used with yield):
def get_reply(sock, msg, task=None):
    yield sock.sendall(msg)
    yield task.sleep(1)
    reply = yield sock.recv(1024)
    return reply
A task for processing the above function can be created with, for example, Task(get_reply, conn, "ping"). The Task constructor creates a task with the (generator) function get_reply with parameters sock=conn and msg="ping", and task set to the just-created task instance. (If the task=None argument is not used, the task instance can be obtained with task = Pycos.cur_task().) The task is then added to the Pycos scheduler, so it executes concurrently with other tasks; there is no need to start it explicitly, as is done with threads. Note that with Python versions before 3.7, a statement such as return v in a task’s generator function should be replaced with raise StopIteration(v) (Python 2.7 generators cannot return a value at all). Thus, with Python 3.6 or Python 2.7, the last statement in the above example should be raise StopIteration(reply). With Python 3.7 and later, return must be used instead, because a StopIteration raised inside a generator is converted to RuntimeError (PEP 479).
Blocking operations, such as socket.recv() and socket.connect(), are implemented as generator functions in the asynchronous implementation of sockets, Asynchronous Socket. These functions simply initiate the operation; yield should be used with them (as in the example above), so the scheduler can run other eligible tasks while the operation is pending. Calling these methods without yield simply returns a generator object, instead of the result of the method call, so care must be taken to use yield when calling generator functions. Using yield where it is not needed is not an error; e.g., the resume() method of tasks can be used without yield, but when it is used with yield, the caller gives control to the scheduler, which may execute the resumed task right away. In the rest of the documentation, methods that need to be called with yield are noted as such.
In the rest of the documentation we follow the convention of using a task=None keyword argument in generator methods, and use the task variable to refer to the task (i.e., the instance of Task) executing the generator function. This variable can be used to invoke methods of Task, or can be used in other tasks, for example, to send messages to it or to wake it up from sleep. A task has the following methods:
-
task.suspend(timeout=None, alarm_value=None)
task.sleep(timeout=None, alarm_value=None)
Note: This method must always be used with yield, as yield task.sleep().
Suspends task task until timeout. If timeout is a positive number (float or int), the scheduler suspends execution of task for that many seconds (or fractions of a second). If timeout is None, the task is not woken up by the scheduler; some other task needs to resume it. The value yielded by this method is the value it is resumed with, or alarm_value if it is resumed by the scheduler due to timeout. If timeout=0, this method returns alarm_value without suspending the task.
For example, if a task executes v = yield task.sleep(2.9), it is suspended for 2.9 seconds. If another task wakes up this task (with the resume() method) with a value before the timeout, v is set to that value. Otherwise, after 2.9 seconds, this task is resumed with None (the default alarm_value), so v is set to None and the task continues execution. While the task is suspended, the scheduler executes other scheduled tasks. Within a task, task.sleep (or task.suspend) must be used (with yield) instead of time.sleep; calling time.sleep blocks the entire pycos framework, so no other task can run during that time.
-
other.resume(update=None)
other.wakeup(update=None)
Wakes up (suspended) task other. As explained above, the suspended task gets update (any Python object) as the value of the yield statement that caused it to suspend. If sleep/resume synchronization is needed (so that resume waits until a specific suspend is ready to receive), the Event locking primitive can be used, so that the resuming task waits on an event variable and the suspending task sets the event before going to sleep.
-
other.send(msg)
Sends message msg to the task other on which this method is invoked. If the task is currently waiting for messages (with receive()), it is resumed with msg. If it is not currently waiting for messages, the message is queued, so that the next receive() returns the message without suspending.
A message can be any Python object when sender and recipient are in the same program/pycos (i.e., messages are not sent over the network). However, when sender and recipient are on different pycos instances (over the network), messages must be serializable at the sender and deserializable at the receiver. If a message includes any objects that have unserializable attributes, their classes have to provide a __getstate__() method to serialize the objects, and the remote program should have __setstate__() for those classes; see Pickle protocol.
If the recipient is in a remote pycos, send() simply queues messages for transfer over the network. A daemon task in pycos transfers the messages in the order they are queued. This task, by default, may transfer each message with a new connection. As creating sockets and making connections is expensive, this may be rather inefficient, especially if messages are sent frequently. See the peer() method in Distributed Programming (netpycos) for specifying that messages to peers should be sent as a stream, using the same connection.
-
other.deliver(msg, timeout=None)
Note: This method must always be used with yield, as recvd = yield other.deliver(msg).
Similar to send(), except that this method must be used with yield and it returns the status of delivering the message to the recipient. If it is 1, the message has been successfully placed in the recipient task’s message queue (when the recipient calls receive(), it gets the queued messages in the order they were put in the queue). If timeout is given and the message couldn’t be put in the recipient’s queue before timeout, the return value is 0. If timeout is None, delivery will not time out.
For local tasks (i.e., tasks executing in the same program), timeout has no effect: if the recipient is valid, the message is delivered successfully right away. However, if the recipient is a remote task (see Distributed Programming (netpycos)), network delays / failures may cause delivery to be delayed or to fail (i.e., delivery could otherwise wait forever); to avoid such issues, an appropriate timeout may be used.
-
task.receive(timeout=None, alarm_value=None)
task.recv(timeout=None, alarm_value=None)
Note: This method must always be used with yield, as msg = yield task.receive().
Returns the earliest queued message if there are pending messages, or suspends task until either a message is sent to it or timeout seconds elapse. If called with timeout=0, this method will not suspend the task; it returns either the earliest queued message, if there are messages in the queue, or alarm_value otherwise. recv is a synonym for receive.
-
task.set_daemon(flag=True)
Marks the task as a daemon (a task that may not terminate) if flag is True. Similar to the threading module, the Pycos scheduler waits for all non-daemon tasks to terminate before exiting. The daemon status can be toggled by calling set_daemon() with flag set to True or False. When the scheduler is terminating, daemon tasks are terminated by throwing a GeneratorExit exception; a daemon task can handle it and do any cleanup before exiting the function.
-
task.hot_swappable(flag)
Marks whether the task’s generator function can be replaced. This method can be used to set (with flag=True) or clear (with flag=False) the flag. With hot swapping, a task’s code can be updated (to new functionality) while the application is running.
-
task.hot_swap(target[, arg1, arg2, ...])
Requests Pycos to replace the task’s generator function with target([arg1, arg2, …]). Pycos then throws HotSwapException in the task when:
- the task has indicated that it can handle hot swap (i.e., it last called hot_swappable with flag=True),
- it is currently executing at top level in the call stack (i.e., it has not called other generator functions), and
- it has no pending asynchronous operations (socket I/O, tasks scheduled with AsyncThreadPool, etc.).
The new generator is set as args[0] of HotSwapException, so the task can inspect the new generator, if necessary, and can do any preparation for hot swapping, e.g., saving state (perhaps by sending state as a message to itself, which can be retrieved in the new generator with receive()), or even ignore the hot swap request. If/when it is ready for the swap, it must re-raise the same HotSwapException (with the new generator as args[0]). This causes Pycos to close the current generator function, replace it with the new generator function and schedule the new generator for execution (from the beginning). Any messages (i.e., resume updates) queued in the previous generator are not reset, so the new generator can process queued messages (e.g., use receive() in a loop with timeout=0 until receive() returns alarm_value). Note that hot_swap() changes the generator function of the particular task for which it is called. If there are many tasks using that generator function, hot_swap() may be called for each such task.
-
task.monitor(observe)
Note: This method must always be used with yield, as v = yield task.monitor(observe).
Sets task as the monitor of task observe. Then, when the task observe is finished (either because the task’s generator function finished execution or because it was terminated by Pycos due to an uncaught exception), Pycos sends the status as a message with MonitorStatus to task.
A task can be monitored by more than one monitor, and a monitor can monitor more than one task.
-
other.throw(*args)
Throws exception *args to task other (at the point where it is currently executing).
-
other.terminate()
Terminates the task other. This is useful, for example, to terminate server tasks that otherwise never terminate.
-
other()
other.finish()
Note: This method must always be used in a task with yield, as v = yield other() or v = yield other.finish().
Returns the result of task other; if other has not terminated yet, the caller waits until other terminates.
This method can also be called on remote tasks directly received from the dispycos scheduler or from RPS (Remote Pico/Pycos Service). However, this method should not be used if such tasks are then sent to other remote peers.
If a task fails, the result is an instance of MonitorStatus. This is useful to detect and diagnose faults.
-
task.value()
Note: This method must be called from a thread, not a task.
Returns the result of task, possibly waiting until task terminates. This method should not be called from a task, as this will cause the entire task framework to deadlock. This method is meant for the main thread in the user program to wait for the (main) task(s) it creates.
If a task fails, the result is an instance of MonitorStatus. This is useful to detect and diagnose faults.
-
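The send()/receive() rules described above (resume a recipient that is waiting in receive() right away, otherwise queue the message for its next receive()) can be sketched with a small mailbox class. ToyTask, its RECEIVE marker and the "paused" yield are hypothetical; in pycos a task would use msg = yield task.receive() and another task would call send().

```python
from collections import deque

class ToyTask:
    """Hypothetical task with a pycos-like mailbox (not pycos's Task)."""
    RECEIVE = object()        # marker a generator yields to wait for a message

    def __init__(self, target):
        self.mailbox = deque()    # messages sent while not receiving
        self.waiting = False      # True while blocked waiting for a message
        self.gen = target()
        self._step(None)          # run until the first suspension

    def _step(self, value):
        # Resume the generator and keep going until it really suspends
        while True:
            try:
                request = self.gen.send(value)
            except StopIteration:
                return
            if request is ToyTask.RECEIVE and self.mailbox:
                value = self.mailbox.popleft()   # queued: no suspension
                continue
            self.waiting = request is ToyTask.RECEIVE
            return

    def send(self, msg):
        if self.waiting:          # blocked in receive: resume right away
            self.waiting = False
            self._step(msg)
        else:                     # otherwise queue for the next receive
            self.mailbox.append(msg)

    def resume(self, value=None):
        # Wakes a task suspended by a non-receive yield (like sleep)
        self._step(value)

got = []

def worker():
    yield "paused"            # suspended until resume() is called
    while True:
        msg = yield ToyTask.RECEIVE
        if msg == "stop":
            return
        got.append(msg)

t = ToyTask(worker)
t.send("a")    # worker is paused, not receiving: "a" is queued
t.send("b")
t.resume()     # worker drains "a" and "b" without suspending
t.send("c")    # worker is blocked in receive: resumed immediately
t.send("stop")
print(got)     # ['a', 'b', 'c']
```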
2.5. MonitorStatus
MonitorStatus is used to indicate either the exit status / value of the task being monitored or an exception. If it is about exit status, then the MonitorStatus object’s info attribute refers to the task (being monitored), the type attribute refers to the type of exit value (e.g., StopIteration if the task exited without exceptions) and the value attribute is the exit value of the task. If the status is for an exception (e.g., a task couldn’t be started due to invalid arguments), the info attribute is contextual information as a string (e.g., “invalid arguments to task”), type is the type of exception and value is the traceback as a string.
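These fields map naturally onto how a generator ends: a normal return raises StopIteration carrying the return value, and a failure raises some other exception. The sketch below uses a hypothetical stand-in (ToyStatus, run_to_completion), not pycos’s MonitorStatus, that fills info, type and value that way; using the task name as info is a simplification.

```python
import traceback
from dataclasses import dataclass
from typing import Any

@dataclass
class ToyStatus:
    """Hypothetical stand-in mirroring MonitorStatus's info/type/value."""
    info: Any
    type: Any
    value: Any

def run_to_completion(name, gen):
    """Drive a generator task and report how it ended."""
    try:
        while True:
            next(gen)
    except StopIteration as exc:
        # Normal exit: type is StopIteration, value is the return value
        return ToyStatus(name, StopIteration, exc.value)
    except Exception as exc:
        # Failure: type is the exception type, value is the traceback text
        return ToyStatus(name, type(exc), traceback.format_exc())

def ok():
    yield
    return 42

def broken():
    yield
    raise ValueError("bad input")

good = run_to_completion("ok", ok())
bad = run_to_completion("broken", broken())
print(good.type.__name__, good.value)  # StopIteration 42
print(bad.type.__name__)               # ValueError
```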
2.6. Locking Primitives
-
class pycos.Lock
-
class pycos.RLock
-
class pycos.Semaphore
-
class pycos.Event
-
class pycos.Condition
Note: With pycos, locking is not needed, as there is no forced preemption: at any time at most one task is executing, and control is transferred to the scheduler only when a yield statement is encountered. (In fact, the implementation of asynchronous locking primitives in pycos updates lists and counters without locking.) So with pycos, Lock and RLock are optional.
pycos provides asynchronous implementations of Lock, RLock, Semaphore, Event and Condition primitives. They are similar to the versions in the threading module. Any operation that would block in the threading module must be called with yield. For example, acquiring a lock is a blocking operation, so it should be invoked as yield lock.acquire(). Similarly, Event’s wait method or Condition’s wait method must be used as yield event.wait() or yield condition.wait(). For example, a Condition variable cv in a client should be used as follows (compare to the example in the threading module):
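def consumer(cv, task=None):
    # an_item_is_available() and get_an_available_item() are placeholders,
    # as in the threading module's Condition example
    while True:
        yield cv.acquire()
        while not an_item_is_available():
            yield cv.wait()
        get_an_available_item()
        cv.release()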
See the documentation strings in the pycos module for more details on which methods should be used with yield and which need not be.
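The note that pycos’s primitives update lists without locking can be illustrated with a toy Event: wait() parks the calling generator in a plain list and set() moves every waiter back to the ready queue. Since only one generator runs at a time, these list updates cannot race. MiniScheduler and ToyEvent are hypothetical and much simpler than pycos.Event.

```python
from collections import deque

class MiniScheduler:
    def __init__(self):
        self.ready = deque()

    def run(self):
        while self.ready:
            task = self.ready.popleft()
            try:
                request = next(task)
            except StopIteration:
                continue
            if request and request[0] == "wait" and not request[1].flag:
                request[1].waiters.append(task)   # suspend until set()
            else:
                self.ready.append(task)

class ToyEvent:
    """Hypothetical event: plain list bookkeeping, no locks."""
    def __init__(self, sched):
        self.sched = sched
        self.flag = False
        self.waiters = []

    def wait(self):
        return ("wait", self)     # request object yielded by a task

    def set(self):
        self.flag = True
        for task in self.waiters:
            self.sched.ready.append(task)   # wake every waiter
        self.waiters.clear()

order = []

def consumer(name, event):
    yield event.wait()
    order.append(name + " woke")

def producer(event):
    order.append("producer sets")
    event.set()
    yield

sched = MiniScheduler()
ev = ToyEvent(sched)
sched.ready.extend([consumer("c1", ev), consumer("c2", ev), producer(ev)])
sched.run()
print(order)   # ['producer sets', 'c1 woke', 'c2 woke']
```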
2.7. Channel
Channel is a broadcast mechanism with which tasks can exchange messages. Messages sent to a Channel are sent to its subscribers (recipients). While a message can be sent one-to-one with the send() or deliver() methods of the receiving task, channels can be used to broadcast a message so that all subscribers get it.
-
class pycos.Channel(name, transform=None)
Creates a channel with name, which must be unique. If transform is given, it must be a function that is called before a message is sent to subscribers. The function is called with the name of the channel and the message. It should return the transformed message or None. If None is returned, the message is dropped: subscribers will not receive it. Otherwise, the transformed message is sent to subscribers.
A channel has the following methods:
-
subscribe(subscriber, timeout=None)
Note: This method must be used with yield, as yield channel.subscribe(task).
Subscribes subscriber (a task or even another channel) to the channel. Any messages sent to the channel are then sent to each subscriber; i.e., messages are broadcast to all subscribers. It is possible to chain or create hierarchical channels, with channels subscribing to other channels. If timeout is a positive number, the call fails if subscription is not successful (e.g., the channel couldn’t be located) within that many seconds.
-
send(message)
Calls the transform function of the channel (see above), if it has one. If the function returns None, the message is ignored. Otherwise, the message is sent to current subscribers. Messages sent over a channel are queued (buffered) at receiving tasks. A task task that has subscribed to the channel, for example, can receive messages with msg = yield task.receive().
-
deliver(message, timeout=None, n=0)
Note: This method must be used with yield, as recvd = yield channel.deliver(msg).
Similar to send(), except that it waits until at least n subscribers are subscribed. It returns the total number of end-point recipients (tasks) the message has been delivered to; in the case of hierarchical channels, it is the sum of recipients of all the channels. This may be less than n (e.g., delivering the message to a subscriber may fail due to a network error), or more (e.g., there are more subscribers, or some subscribers are channels with more than one subscriber). If n is 0, the message is delivered to all current subscribers. In any case, timeout is the maximum amount of time in seconds (or fraction of a second) for delivering the message. Thus, for example, if timeout occurs before n subscribers are subscribed to the channel, the method returns 0.
-
unsubscribe(subscriber, timeout=None)
Note: This method must be used with yield, as yield channel.unsubscribe(task).
Unsubscribes the subscriber (a task or another channel), so future messages to the channel are not sent to that subscriber. If timeout is a positive number, it is the maximum number of seconds for the unsubscribe request to complete.
-
close()
Closes the channel. The channel can’t be used for message passing after it is closed.
-
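A toy model of the channel semantics described above, including a transform that drops messages and hierarchical channels whose recipient counts add up. Inbox and ToyChannel are hypothetical; for brevity, send() here returns the end-point recipient count, which in pycos is what deliver() reports, and subscription is synchronous rather than used with yield.

```python
from collections import deque

class Inbox:
    """Hypothetical stand-in for a subscribed task's message queue."""
    def __init__(self):
        self.queue = deque()

    def send(self, msg):
        self.queue.append(msg)
        return 1                      # one end-point recipient

class ToyChannel:
    """Toy broadcast channel modeled on the description above."""
    def __init__(self, name, transform=None):
        self.name = name
        self.transform = transform
        self.subscribers = []

    def subscribe(self, subscriber):
        self.subscribers.append(subscriber)

    def unsubscribe(self, subscriber):
        self.subscribers.remove(subscriber)

    def send(self, msg):
        """Broadcast msg; return how many end-point recipients got it."""
        if self.transform is not None:
            msg = self.transform(self.name, msg)
            if msg is None:
                return 0              # dropped by the transform
        # A subscriber may itself be a channel, so counts add up recursively
        return sum(sub.send(msg) for sub in self.subscribers)

def only_ints(name, msg):
    return msg if isinstance(msg, int) else None

a, b, c = Inbox(), Inbox(), Inbox()
root = ToyChannel("root")
numbers = ToyChannel("numbers", transform=only_ints)
root.subscribe(a)
root.subscribe(numbers)   # hierarchical: a channel subscribing to a channel
numbers.subscribe(b)
numbers.subscribe(c)

first = root.send(7)         # reaches a, b and c
second = root.send("text")   # reaches only a; "numbers" drops it
print(first, second)         # 3 1
print(list(b.queue))         # [7]
```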
2.8. Message Passing
Task’s send(), receive() and deliver() offer one-to-one message passing, and Channel’s send() and deliver() offer one-to-many / broadcast message passing.
pycos delivers messages in the order they have been sent with either one-to-one or broadcast message passing (i.e., with either the send or deliver methods of tasks or channels); that is, pycos guarantees the temporal order of messages.
2.9. AsyncThreadPool
The pycos framework and all tasks run in a single thread. It implements concurrency (running more than one task) by interleaving tasks: suspending a task that is waiting for some event and running a task that is ready to execute. All the blocking operations, such as sending/receiving data (sockets, message passing) or sleeping, are implemented with generator functions that schedule the operation and suspend the task. However, the pycos framework doesn’t implement every blocking operation. Sometimes it is necessary to use functions in other modules that block the thread until the operation is complete. For example, reading standard input will block the thread until the read method is complete. If such functions are used in a task, the entire pycos framework and all tasks are blocked; i.e., the pycos scheduler itself is blocked, so even if there are other tasks eligible to run, they won’t be executed. The AsyncThreadPool class can be used to run such blocking functions in separate threads, so pycos itself is not affected by them.
-
class pycos.AsyncThreadPool(num_threads)
Creates a pool with the given number of threads. When a blocking function is scheduled, an available thread in the pool is used to execute that function. More threads allow more blocking functions to run simultaneously, but take more system resources.
-
async_task(target, *args, **kwargs)
Note: This method must be used with yield, as val = yield pool.async_task(target, *args, **kwargs).
Schedules the given target function with arguments *args and keyword arguments **kwargs for execution with a thread in the pool. If all threads are currently executing other functions, the function will be executed when a thread becomes available (i.e., done with its currently executing function).
The value returned by this method is the value returned by the function.
-
join()
Waits for all scheduled blocking functions to finish. This method should be called from the main thread, not from any task, as it is a blocking operation.
-
terminate()
Waits for all scheduled blocking functions to finish and then terminates the threads; the pool can no longer be used for scheduling tasks. This method should be called from the main thread, not from any task, as it is a blocking operation.
-
See examples/chat_client.py, which uses a thread pool (with 1 thread) to execute sys.stdin.readline (a blocking function).
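The idea behind AsyncThreadPool can be sketched with the standard library’s concurrent.futures.ThreadPoolExecutor standing in for the pool: a task yields a blocking function, the scheduler hands it to a pool thread and keeps running other tasks, and resumes the task with the function’s return value once it is done. This is a toy model, not pycos’s implementation; run, slow_read, reader and ticker are hypothetical.

```python
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def run(tasks, pool):
    """Toy scheduler: a task yields (func, args) to have func run in a pool
    thread; the task is resumed with func's return value when it's done."""
    ready = deque((task, None) for task in tasks)
    pending = {}                        # future -> suspended task
    while ready or pending:
        while ready:
            task, value = ready.popleft()
            try:
                request = task.send(value)
            except StopIteration:
                continue
            if request is None:         # plain yield: just reschedule
                ready.append((task, None))
            else:
                func, args = request
                pending[pool.submit(func, *args)] = task
        if pending:
            # Nothing else is ready: block until some pool thread finishes
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                ready.append((pending.pop(fut), fut.result()))

log = []

def slow_read(delay):
    time.sleep(delay)                   # stands in for sys.stdin.readline
    return "input line"

def reader():
    line = yield (slow_read, (0.2,))
    log.append(f"read {line!r}")

def ticker():
    for i in range(3):
        log.append(f"tick {i}")
        yield

with ThreadPoolExecutor(max_workers=1) as pool:
    run([reader(), ticker()], pool)
print(log)  # ['tick 0', 'tick 1', 'tick 2', "read 'input line'"]
```

The ticker keeps running while slow_read sleeps in the pool thread; had slow_read been called directly inside reader, the ticks would have waited for it.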