Tasks and Parallel Computing
Tasks
Task(func)
Create a Task (i.e. a thread, or coroutine) to execute the given function, which must be callable with no arguments. The task exits when this function returns.
yieldto(task, arg = nothing)
Switch to the given task. The first time a task is switched to, the task’s function is called with no arguments. On subsequent switches, arg is returned from the task’s last call to yieldto. This is a low-level call that only switches tasks, without considering states or scheduling in any way. Its use is discouraged.
current_task()
Get the currently running Task.
istaskdone(task) → Bool
Tell whether a task has exited.
istaskstarted(task) → Bool
Tell whether a task has started executing.
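A minimal sketch of a task's life cycle, using Task, current_task, istaskstarted and istaskdone together with schedule and wait (both described further below). The vector seen is just a hypothetical place for the task to record what current_task() returned while it ran.

```julia
# Hypothetical record of what current_task() returned inside the task.
seen = Any[]

# Task(func): func must be callable with no arguments; the task exits when it returns.
t = Task(() -> push!(seen, current_task()))

println(istaskstarted(t))   # false: constructing a Task does not run it

schedule(t)                 # add t to the scheduler's queue
wait(t)                     # block the current task until t has exited

println(istaskdone(t))      # true: t's function has returned
println(seen[1] === t)      # true: inside t, current_task() is t itself
```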
consume(task, values...)
Receive the next value passed to produce by the specified task. Additional arguments may be passed, to be returned from the last produce call in the producer.
produce(value)
Send the given value to the last consume call, switching to the consumer task. If the next consume call passes any values, they are returned by produce.
yield()
Switch to the scheduler to allow another scheduled task to run. A task that calls this function is still runnable, and will be restarted immediately if there are no other runnable tasks.
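To illustrate that scheduling is cooperative, here is a minimal sketch with two tasks that each call yield() after every step, so neither loop runs all of its iterations before the other gets a turn. trace is a hypothetical log; @sync and @async are described later in this section.

```julia
# Hypothetical log of which task ran each step.
trace = Any[]

@sync begin
    @async for i in 1:3
        push!(trace, (:a, i))
        yield()   # give the other runnable task a chance to go next
    end
    @async for i in 1:3
        push!(trace, (:b, i))
        yield()
    end
end

println(trace)   # entries from :a and :b take turns instead of appearing in blocks
```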
task_local_storage(symbol)
Look up the value of a symbol in the current task’s task-local storage.
task_local_storage(symbol, value)
Assign a value to a symbol in the current task’s task-local storage.
task_local_storage(body, symbol, value)
Call the function body with a modified task-local storage, in which value is assigned to symbol; the previous value of symbol, or lack thereof, is restored afterwards. Useful for emulating dynamic scoping.
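The dynamic-scoping use of the three-argument form can be sketched as follows (passing body with do-block syntax). The helper prefixed and the key :prefix are hypothetical; the point is that the inner binding is visible only while its body runs, after which the outer value is restored.

```julia
# Hypothetical helper that reads a "dynamically scoped" setting.
prefixed(msg) = string(task_local_storage(:prefix), msg)

results = task_local_storage(:prefix, "[outer] ") do
    a = prefixed("a")
    # Rebind :prefix only for the duration of the inner body.
    b = task_local_storage(:prefix, "[inner] ") do
        prefixed("b")
    end
    c = prefixed("c")   # the outer value has been restored
    (a, b, c)           # the body's value is returned by task_local_storage
end

println(results)
```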
Condition()
Create an edge-triggered event source that tasks can wait for. Tasks that call wait on a Condition are suspended and queued. Tasks are woken up when notify is later called on the Condition. Edge triggering means that only tasks waiting at the time notify is called can be woken up. For level-triggered notifications, you must keep extra state to keep track of whether a notification has happened. The Channel type does this, and so can be used for level-triggered events.
notify(condition, val=nothing; all=true, error=false)
Wake up tasks waiting for a condition, passing them val. If all is true (the default), all waiting tasks are woken; otherwise only one is. If error is true, the passed value is raised as an exception in the woken tasks.
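A minimal sketch of notify with and without all: three tasks block on one Condition, the first notify (all = false) releases exactly one of them, and the second (default all = true) releases the rest. The yield() before notifying matters because Condition is edge-triggered; a notification sent before any task is waiting is simply lost. woken is a hypothetical result vector.

```julia
cond = Condition()
woken = Any[]   # hypothetical record of the values each task received

@sync begin
    for i in 1:3
        # Each task blocks; wait returns the value passed to notify.
        @async push!(woken, wait(cond))
    end
    # Let the three tasks run until they block in wait(cond).
    yield()
    notify(cond, :one; all = false)   # wakes exactly one waiter
    notify(cond, :rest)               # all = true (default): wakes the remaining two
end

println(woken)
```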
schedule(t::Task, [val]; error=false)
Add a task to the scheduler’s queue. This causes the task to run constantly when the system is otherwise idle, unless the task performs a blocking operation such as wait.
If a second argument is provided, it will be passed to the task (via the return value of yieldto) when it runs again. If error is true, the value is raised as an exception in the woken task.
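Passing a value through schedule can be sketched like this: the task suspends itself with a bare wait(), and the value later given to schedule(t, val) becomes the return value of that wait() call. This relies on the low-level behavior described for wait() with no argument; received is a hypothetical result vector.

```julia
received = Any[]   # hypothetical record of what the task was resumed with

# The task suspends itself until something reschedules it.
t = @task push!(received, wait())

schedule(t)          # queue t without a value
yield()              # t starts and runs until it suspends in wait()
schedule(t, :hello)  # queue t again, this time passing :hello
wait(t)              # block until t has finished

println(received)
```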
@schedule
Wrap an expression in a Task and add it to the local machine’s scheduler queue.
@task
Wrap an expression in a Task without executing it, and return the Task. This only creates a task, and does not run it.
sleep(seconds)
Block the current task for a specified number of seconds. The minimum sleep time is 1 millisecond, i.e. an input of 0.001.
ReentrantLock()
Creates a reentrant lock. The same task can acquire the lock as many times as required. Each lock must be matched with an unlock.
lock(l::ReentrantLock)
Associates l with the current task. If l is already locked by a different task, waits for it to become available. The same task can acquire the lock multiple times. Each “lock” must be matched by an “unlock”.
unlock(l::ReentrantLock)
Releases ownership of the lock by the current task. If the lock had been acquired more than once by this task, it just decrements an internal counter and returns immediately.
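A minimal sketch of both properties described above: re-entrancy within one task, and mutual exclusion between two tasks. The events log is hypothetical; the yield() inside the critical section gives the other task a chance to run, but it blocks in lock(lk) until the first task unlocks.

```julia
lk = ReentrantLock()

# Re-entrancy: the task that holds lk may lock it again without deadlocking.
lock(lk)
lock(lk)
unlock(lk)   # only decrements the internal count; lk is still held
unlock(lk)   # count reaches zero: lk is released

# Mutual exclusion between two tasks.
events = Any[]   # hypothetical log of (task id, step)
@sync for id in 1:2
    @async begin
        lock(lk)
        try
            push!(events, (id, :enter))
            yield()                  # the other task may run, but blocks in lock(lk)
            push!(events, (id, :exit))
        finally
            unlock(lk)               # every lock is matched by an unlock
        end
    end
end

println(events)   # each task's :enter is immediately followed by its own :exit
```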
Channel{T}(sz::Int)
Constructs a Channel that can hold a maximum of sz objects of type T. put! calls on a full channel block until an object is removed with take!.
Other constructors:
Channel() - equivalent to Channel{Any}(32)
Channel(sz::Int) - equivalent to Channel{Any}(sz)
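A bounded producer/consumer pair can be sketched with a Channel{Int} of capacity 2: the producer blocks in put! whenever two items are already buffered, and the consumer blocks in take! until an item arrives. consumed is a hypothetical result vector.

```julia
ch = Channel{Int}(2)   # holds at most 2 Ints at a time
consumed = Int[]       # hypothetical record of what the consumer received

@sync begin
    @async for i in 1:5
        put!(ch, i)                  # blocks while the channel already holds 2 items
    end
    @async for k in 1:5
        push!(consumed, take!(ch))   # blocks until an item is available
    end
end

println(consumed)   # items come out in the order they were put in
```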
General Parallel Computing Support
addprocs(n::Integer; exeflags=``) → List of process identifiers
Launches workers using the in-built LocalManager, which only launches workers on the local host. This can be used to take advantage of multiple cores. addprocs(4) will add 4 processes on the local machine.
addprocs() → List of process identifiers
Equivalent to addprocs(CPU_CORES).
Note that workers do not run a .juliarc.jl startup script, nor do they synchronize their global state (such as global variables, new method definitions, and loaded modules) with any of the other running processes.
addprocs(machines; keyword_args...) → List of process identifiers
Add processes on remote machines via SSH. Requires julia to be installed in the same location on each node, or to be available via a shared file system.
machines is a vector of machine specifications. Workers are started for each specification.
A machine specification is either a string machine_spec or a tuple (machine_spec, count).
machine_spec is a string of the form [user@]host[:port] [bind_addr[:port]]. user defaults to the current user, port to the standard ssh port. If [bind_addr[:port]] is specified, other workers will connect to this worker at the specified bind_addr and port.
count is the number of workers to be launched on the specified host. If specified as :auto, it will launch as many workers as the number of cores on the specific host.
Keyword arguments:
- tunnel: if true, SSH tunneling will be used to connect to the worker from the master process. Default is false.
- sshflags: specifies additional ssh options, e.g. sshflags=`-i /home/foo/bar.pem`.
- max_parallel: specifies the maximum number of workers connected to in parallel at a host. Defaults to 10.
- dir: specifies the working directory on the workers. Defaults to the host’s current directory (as found by pwd()).
- exename: name of the julia executable. Defaults to "$JULIA_HOME/julia" or "$JULIA_HOME/julia-debug" as the case may be.
- exeflags: additional flags passed to the worker processes.
- topology: specifies how the workers connect to each other. Sending a message between unconnected workers results in an error.
  - topology=:all_to_all: all processes are connected to each other. This is the default.
  - topology=:master_slave: only the driver process, i.e. pid 1, connects to the workers. The workers do not connect to each other.
  - topology=:custom: the launch method of the cluster manager specifies the connection topology via the fields ident and connect_idents in WorkerConfig. A worker with a cluster manager identity ident will connect to all workers specified in connect_idents.
Environment variables:
If the master process fails to establish a connection with a newly launched worker within 60.0 seconds, the worker treats it as a fatal situation and terminates. This timeout can be controlled via the environment variable JULIA_WORKER_TIMEOUT. The value of JULIA_WORKER_TIMEOUT on the master process specifies the number of seconds a newly launched worker waits for connection establishment.
addprocs(manager::ClusterManager; kwargs...) → List of process identifiers
Launches worker processes via the specified cluster manager.
For example, Beowulf clusters are supported via a custom cluster manager implemented in the package ClusterManagers.
The number of seconds a newly launched worker waits for connection establishment from the master can be specified via the variable JULIA_WORKER_TIMEOUT in the worker process’s environment. This is relevant only when using TCP/IP as transport.
nprocs()
Get the number of available processes.
nworkers()
Get the number of available worker processes. This is one less than nprocs(). Equal to nprocs() if nprocs()==1.
procs()
Returns a list of all process identifiers.
workers()
Returns a list of all worker process identifiers.
rmprocs(pids...)
Removes the specified workers.
interrupt([pids...])
Interrupt the currently executing task on the specified workers. This is equivalent to pressing Ctrl-C on the local machine. If no arguments are given, all workers are interrupted.
myid()
Get the id of the current process.
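The relationship between these process-inventory functions can be sketched as below. This is written against the API as documented in this section (where these functions need no import) and assumes the script starts as a single process, i.e. without a -p flag; pids is just a local name.

```julia
# Assumes the script starts with only the master process.
pids = addprocs(2)     # launch 2 workers on the local host via LocalManager

println(myid())        # 1: the master (driver) process
println(nprocs())      # the master plus the workers
println(nworkers())    # nprocs() - 1, since the master is not counted as a worker
println(workers())     # the worker ids, which include pids
println(procs())       # all process ids: 1 plus the worker ids
```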
pmap(f, lsts...; err_retry=true, err_stop=false, pids=workers())
Transform collections lsts by applying f to each element in parallel. (Note that f must be made available to all worker processes; see Code Availability and Loading Packages for details.) If nprocs()>1, the calling process will be dedicated to assigning tasks. All other available processes, or the processes specified by pids, will be used as parallel workers.
If err_retry is true, it retries a failed application of f on a different worker. If err_stop is true, it takes precedence over the value of err_retry and pmap stops execution on the first error.
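Because workers do not pick up new method definitions automatically, f has to be defined everywhere before pmap can use it. A minimal sketch against the API documented here, using @everywhere (described below); slow_square is a hypothetical function whose sleep only stands in for real work.

```julia
addprocs(2)

# Hypothetical helper; @everywhere defines it on the master and on every worker.
@everywhere slow_square(x) = (sleep(0.1); x^2)

squares = pmap(slow_square, 1:8)   # elements are distributed across the workers
println(squares)                   # results are returned in input order
```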
remotecall(id, func, args...)
Call a function asynchronously on the given arguments on the specified process. Returns a RemoteRef.
wait([x])
Block the current task until some event occurs, depending on the type of the argument:
- RemoteRef: Wait for a value to become available for the specified remote reference.
- Channel: Wait for a value to be appended to the channel.
- Condition: Wait for notify on a condition.
- Process: Wait for a process or process chain to exit. The exitcode field of a process can be used to determine success or failure.
- Task: Wait for a Task to finish, returning its result value. If the task fails with an exception, the exception is propagated (re-thrown in the task that called wait).
- RawFD: Wait for changes on a file descriptor (see poll_fd for keyword arguments and return code).
If no argument is passed, the task blocks for an undefined period. If the task’s state is set to :waiting, it can only be restarted by an explicit call to schedule or yieldto. If the task’s state is :runnable, it might be restarted unpredictably.
Often wait is called within a while loop to ensure a waited-for condition is met before proceeding.
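The while-loop idiom can be sketched as follows. The one-element array ready is hypothetical extra state; checking it around wait turns the edge-triggered Condition into a level-triggered signal, so the waiter proceeds correctly whichever of the two tasks happens to run first.

```julia
cond = Condition()
ready = [false]   # hypothetical extra state recording that the event happened
seen = Any[]

@sync begin
    @async begin
        # Re-check the state after every wake-up; if the flag was already set,
        # the loop never blocks at all.
        while !ready[1]
            wait(cond)
        end
        push!(seen, :proceeded)
    end
    @async begin
        ready[1] = true   # record the event first...
        notify(cond)      # ...then wake any task currently waiting
    end
end

println(seen)
```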
fetch(x)
Waits and fetches a value from x depending on the type of x. Does not remove the item fetched:
- RemoteRef: Wait for and get the value of a remote reference. If the remote value is an exception, throws a RemoteException which captures the remote exception and backtrace.
- Channel: Wait for and get the first available item from the channel.
remotecall_wait(id, func, args...)
Perform wait(remotecall(...)) in one message.
remotecall_fetch(id, func, args...)
Perform fetch(remotecall(...)) in one message. Any remote exceptions are captured in a RemoteException and thrown.
put!(RemoteRef, value)
Store a value to a remote reference. Implements “shared queue of length 1” semantics: if a value is already present, blocks until the value is removed with take!. Returns its first argument.
put!(Channel, value)
Appends an item to the channel. Blocks if the channel is full.
take!(RemoteRef)
Fetch the value of a remote reference, removing it so that the reference is empty again.
take!(Channel)
Removes and returns a value from a Channel. Blocks until data is available.
isready(r::RemoteRef)
Determine whether a RemoteRef has a value stored to it. Note that this function can cause race conditions, since by the time you receive its result it may no longer be true. It is recommended that this function only be used on a RemoteRef that is assigned once.
If the argument RemoteRef is owned by a different node, this call will block to wait for the answer. It is recommended to wait for r in a separate task instead, or to use a local RemoteRef as a proxy:
    rr = RemoteRef()
    @async put!(rr, remotecall_fetch(p, long_computation))
    isready(rr)  # will not block
close(Channel)
Closes a channel. An exception is thrown by:
- put! on a closed channel.
- take! and fetch on an empty, closed channel.
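The close semantics listed above can be checked with a small sketch: items buffered before close can still be taken, while put! on the closed channel and take! or fetch on the now-empty closed channel throw. throws is a hypothetical helper that only reports whether its argument threw; it does not inspect the exception type.

```julia
ch = Channel{Int}(4)
put!(ch, 1)
put!(ch, 2)
close(ch)

# Items already buffered can still be taken after close.
a = take!(ch)
b = take!(ch)

# Hypothetical helper: returns true if calling f() throws any exception.
function throws(f)
    try
        f()
        return false
    catch
        return true
    end
end

put_failed   = throws(() -> put!(ch, 3))   # put! on a closed channel
take_failed  = throws(() -> take!(ch))     # take! on an empty, closed channel
fetch_failed = throws(() -> fetch(ch))     # fetch on an empty, closed channel
```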
RemoteRef()
Make an uninitialized remote reference on the local machine.
RemoteRef(n)
Make an uninitialized remote reference on process n.
timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1)
Waits until testcb returns true or for secs seconds, whichever is earlier. testcb is polled every pollint seconds.
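A minimal sketch of timedwait polling a flag that another task sets after a short delay. timedwait returns a status symbol: :ok when testcb became true in time, :timed_out otherwise. The flag array and the chosen timings are hypothetical; Float64 literals are used to match the documented signature.

```julia
flag = [false]   # hypothetical state set by another task

# A task that sets the flag after a short delay.
@async begin
    sleep(0.2)
    flag[1] = true
end

# Poll every 0.05 s for up to 5 s; returns as soon as the test function is true.
status = timedwait(() -> flag[1], 5.0; pollint = 0.05)

# A test function that never becomes true runs into the time limit instead.
timeout_status = timedwait(() -> false, 0.3; pollint = 0.05)

println((status, timeout_status))
```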
@spawn
Creates a closure around an expression and runs it on an automatically-chosen process, returning a RemoteRef to the result.
@spawnat
Accepts two arguments, p and an expression. A closure is created around the expression and run asynchronously on process p. Returns a RemoteRef to the result.
@fetch
Equivalent to fetch(@spawn expr).
@fetchfrom
Equivalent to fetch(@spawnat p expr).
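A minimal sketch of these macros, written against the API documented in this section and run on a single freshly added local worker. Evaluating myid() remotely is a convenient way to see where the closure actually ran.

```julia
p = addprocs(1)[1]             # id of a newly launched local worker

r = @spawnat p myid()          # runs myid() on worker p; returns a RemoteRef at once
println(fetch(r))              # waits for the result: the id of the worker, i.e. p

println(@fetchfrom p myid())   # same, in one step: fetch(@spawnat p myid())

s = @spawn 1 + 2               # the process is chosen automatically
println(fetch(s))              # 3
```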
@async
Like @schedule, @async wraps an expression in a Task and adds it to the local machine’s scheduler queue. Additionally, it adds the task to the set of items that the nearest enclosing @sync waits for. @async also wraps the expression in a let x=x, y=y, ... block to create a new scope with copies of all variables referenced in the expression.
@sync
Wait until all dynamically-enclosed uses of @async, @spawn, @spawnat and @parallel are complete. All exceptions thrown by enclosed async operations are collected and thrown as a CompositeException.
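A minimal sketch of @sync around local @async tasks: the block returns only after every enclosed task has finished, even when the tasks complete in a different order than they were started, and a failure in any of them surfaces as a CompositeException. The sleep durations are arbitrary.

```julia
results = zeros(Int, 3)

@sync for i in 1:3
    @async begin
        sleep(0.05 * (4 - i))   # later tasks finish first
        results[i] = i^2
    end
end
# @sync only returns after all three @async tasks have completed.
println(results)

# Exceptions from enclosed tasks are collected into a CompositeException.
err = try
    @sync begin
        @async error("boom")
        @async 1 + 1
    end
    nothing
catch e
    e
end
println(typeof(err))
```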
@parallel
A parallel for loop of the form:
    @parallel [reducer] for var = range
        body
    end
The specified range is partitioned and locally executed across all workers. In case an optional reducer function is specified, @parallel performs local reductions on each worker with a final reduction on the calling process.
Note that without a reducer function, @parallel executes asynchronously, i.e. it spawns independent tasks on all available workers and returns immediately without waiting for completion. To wait for completion, prefix the call with @sync, like:
    @sync @parallel for var = range
        body
    end
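Both forms can be sketched against the API documented here. With (+) as the reducer, each worker sums its share of the range and the partial sums are combined on the calling process; without a reducer, @sync is what makes the caller wait. The printed messages are only illustrative.

```julia
addprocs(2)

# With a reducer, @parallel waits and returns the reduced value.
total = @parallel (+) for i in 1:100
    i
end
println(total)

# Without a reducer, prefix @sync to wait for all iterations to finish.
@sync @parallel for i in 1:4
    println("iteration $i on process $(myid())")
end
```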
@everywhere
Execute an expression on all processes. Errors on any of the processes are collected into a CompositeException and thrown.
Cluster Manager Interface
This interface provides a mechanism to launch and manage Julia workers on different cluster environments. Two cluster managers are present in Base: LocalManager, for launching additional workers on the same host, and SSHManager, for launching on remote hosts via ssh. TCP/IP sockets are used to connect and transport messages between processes. It is possible for cluster managers to provide a different transport.
launch(manager::FooManager, params::Dict, launched::Vector{WorkerConfig}, launch_ntfy::Condition)
Implemented by cluster managers. For every Julia worker launched by this function, it should append a WorkerConfig entry to launched and notify launch_ntfy. The function MUST exit once all workers requested by manager have been launched. params is a dictionary of all keyword arguments addprocs was called with.
manage(manager::FooManager, pid::Int, config::WorkerConfig, op::Symbol)
Implemented by cluster managers. It is called on the master process, during a worker’s lifetime, with appropriate op values:
- with :register/:deregister when a worker is added to / removed from the Julia worker pool.
- with :interrupt when interrupt(workers) is called. The ClusterManager should signal the appropriate worker with an interrupt signal.
- with :finalize for cleanup purposes.
kill(manager::FooManager, pid::Int, config::WorkerConfig)
Implemented by cluster managers. It is called on the master process by rmprocs. It should cause the remote worker specified by pid to exit. Base.kill(manager::ClusterManager.....) executes a remote exit() on pid.
init_worker(manager::FooManager)
Called by cluster managers implementing custom transports. It initializes a newly launched process as a worker. The command line argument --worker has the effect of initializing a process as a worker using TCP/IP sockets for transport.
connect(manager::FooManager, pid::Int, config::WorkerConfig) -> (instrm::AsyncStream, outstrm::AsyncStream)
Implemented by cluster managers using custom transports. It should establish a logical connection to the worker with id pid, specified by config, and return a pair of AsyncStream objects. Messages from pid to the current process will be read off instrm, while messages to be sent to pid will be written to outstrm. The custom transport implementation must ensure that messages are delivered and received completely and in order. Base.connect(manager::ClusterManager.....) sets up TCP/IP socket connections in-between workers.
Base.process_messages(instrm::AsyncStream, outstrm::AsyncStream)
Called by cluster managers using custom transports. It should be called when the custom transport implementation receives the first message from a remote worker. The custom transport must manage a logical connection to the remote worker and provide two AsyncStream objects, one for incoming messages and the other for messages addressed to the remote worker.