Parallel Computing¶
Most modern computers possess more than one CPU, and several computers can be combined together in a cluster. Harnessing the power of these multiple CPUs allows many computations to be completed more quickly. There are two major factors that influence performance: the speed of the CPUs themselves, and the speed of their access to memory. In a cluster, it’s fairly obvious that a given CPU will have fastest access to the RAM within the same computer (node). Perhaps more surprisingly, similar issues are relevant on a typical multicore laptop, due to differences in the speed of main memory and the cache. Consequently, a good multiprocessing environment should allow control over the “ownership” of a chunk of memory by a particular CPU. Julia provides a multiprocessing environment based on message passing to allow programs to run on multiple processes in separate memory domains at once.
Julia’s implementation of message passing is different from other environments such as MPI [1]. Communication in Julia is generally “one-sided”, meaning that the programmer needs to explicitly manage only one process in a two-process operation. Furthermore, these operations typically do not look like “message send” and “message receive” but rather resemble higher-level operations like calls to user functions.
Parallel programming in Julia is built on two primitives: remote references and remote calls. A remote reference is an object that can be used from any process to refer to an object stored on a particular process. A remote call is a request by one process to call a certain function on certain arguments on another (possibly the same) process.
Remote references come in two flavors: Future and RemoteChannel.
A remote call returns a Future to its result. Remote calls return immediately; the process that made the call proceeds to its next operation while the remote call happens somewhere else. You can wait for a remote call to finish by calling wait() on the returned Future, and you can obtain the full value of the result using fetch().
On the other hand, RemoteChannels are rewritable. For example, multiple processes can coordinate their processing by referencing the same remote Channel.
Let’s try this out. Starting with julia -p n provides n worker processes on the local machine. Generally it makes sense for n to equal the number of CPU cores on the machine.
    $ ./julia -p 2

    julia> r = remotecall(rand, 2, 2, 2)
    Future(2,1,3,Nullable{Any}())

    julia> s = @spawnat 2 1 .+ fetch(r)
    Future(2,1,6,Nullable{Any}())

    julia> fetch(s)
    2×2 Array{Float64,2}:
     1.60401  1.50111
     1.17457  1.15741
The first argument to remotecall() is the function to call. Most parallel programming in Julia does not reference specific processes or the number of processes available, but remotecall() is considered a low-level interface providing finer control. The second argument to remotecall() is the index of the process that will do the work, and the remaining arguments will be passed to the function being called.
As you can see, in the first line we asked process 2 to construct a 2-by-2 random matrix, and in the second line we asked it to add 1 to it. The result of both calculations is available in the two futures, r and s. The @spawnat macro evaluates the expression in the second argument on the process specified by the first argument.
Occasionally you might want a remotely-computed value immediately. This typically happens when you read from a remote object to obtain data needed by the next local operation. The function remotecall_fetch() exists for this purpose. It is equivalent to fetch(remotecall(...)) but is more efficient.
    julia> remotecall_fetch(getindex, 2, r, 1, 1)
    0.10824216411304866
Remember that getindex(r,1,1) is equivalent to r[1,1], so this call fetches the first element of the future r.
The syntax of remotecall() is not especially convenient. The macro @spawn makes things easier. It operates on an expression rather than a function, and picks where to do the operation for you:
    julia> r = @spawn rand(2,2)
    Future(2,1,4,Nullable{Any}())

    julia> s = @spawn 1 .+ fetch(r)
    Future(3,1,5,Nullable{Any}())

    julia> fetch(s)
    1.10824216411304866 1.13798233877923116
    1.12376292706355074 1.18750497916607167
Note that we used 1 .+ fetch(r) instead of 1 .+ r. This is because we do not know where the code will run, so in general a fetch() might be required to move r to the process doing the addition. In this case, @spawn is smart enough to perform the computation on the process that owns r, so the fetch() will be a no-op (no work is done).
(It is worth noting that @spawn is not built-in but defined in Julia as a macro. It is possible to define your own such constructs.)
An important thing to remember is that, once fetched, a Future will cache its value locally. Further fetch() calls do not entail a network hop. Once all referencing Futures have fetched, the remote stored value is deleted.
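As a minimal sketch of the Future life cycle described above, the snippet below issues a remote call, waits for it, and fetches the result twice; the second fetch() is answered from the locally cached value. It assumes at least one worker process and starts one if none exists. The addition is only a stand-in for real work, and the first line is needed only on Julia 0.7 and later, where these functions live in the Distributed standard library rather than in Base.

```julia
# Only needed on Julia 0.7 and later (Distributed standard library).
VERSION >= v"0.7" && @eval using Distributed

nprocs() == 1 && addprocs(1)     # make sure there is a worker to talk to
w = first(workers())

r = remotecall(+, w, 1, 2)       # returns a Future immediately
wait(r)                          # block until the remote call has finished
total = fetch(r)                 # move the result to this process
again = fetch(r)                 # answered from the Future's local cache

# Same result as fetch(remotecall(...)), obtained with a single call.
direct = remotecall_fetch(+, w, 1, 2)
```

Using first(workers()) instead of a literal process id keeps the sketch independent of how many workers were started.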
Code Availability and Loading Packages¶
Your code must be available on any process that runs it. For example, type the following into the Julia prompt:
    julia> function rand2(dims...)
               return 2*rand(dims...)
           end

    julia> rand2(2,2)
    2×2 Array{Float64,2}:
     0.153756  0.368514
     1.15119   0.918912

    julia> fetch(@spawn rand2(2,2))
    ERROR: On worker 2:
    function rand2 not defined on process 2
Process 1 knew about the function rand2, but process 2 did not.
Most commonly you’ll be loading code from files or packages, and you have a considerable amount of flexibility in controlling which processes load code. Consider a file, "DummyModule.jl", containing the following code:
    module DummyModule

    export MyType, f

    type MyType
        a::Int
    end

    f(x) = x^2+1

    println("loaded")

    end
Starting Julia with julia -p 2, you can use this to verify the following:
- include("DummyModule.jl") loads the file on just a single process (whichever one executes the statement).
- using DummyModule causes the module to be loaded on all processes; however, the module is brought into scope only on the one executing the statement.
- As long as DummyModule is loaded on process 2, commands like

      rr = RemoteChannel(2)
      put!(rr, MyType(7))

  allow you to store an object of type MyType on process 2 even if DummyModule is not in scope on process 2.
You can force a command to run on all processes using the @everywhere macro. For example, @everywhere can be used to directly define a global variable (or, in the same way, a function) on all processes:
    julia> @everywhere id = myid()

    julia> remotecall_fetch(()->id, 2)
    2
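The rand2 failure shown earlier goes away once the definition is evaluated on every process. The following sketch reuses rand2 from that example, wraps its definition in @everywhere, and then calls it on a worker. It assumes at least one worker is available and adds two if not; the first line is only required on Julia 0.7 and later.

```julia
# Only needed on Julia 0.7 and later (Distributed standard library).
VERSION >= v"0.7" && @eval using Distributed

nprocs() == 1 && addprocs(2)

# Define rand2 on every process, not just on process 1.
@everywhere function rand2(dims...)
    return 2 * rand(dims...)
end

w = first(workers())
A = remotecall_fetch(rand2, w, 2, 2)   # runs on worker w, result comes back here
```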
A file can also be preloaded on multiple processes at startup, and a driver script can be used to drive the computation:
    julia -p <n> -L file1.jl -L file2.jl driver.jl
Each process has an associated identifier. The process providing the interactive Julia prompt always has an id equal to 1, as would the Julia process running the driver script in the example above.
The processes used by default for parallel operations are referred to as “workers”.
When there is only one process, process 1 is considered a worker. Otherwise, workers are
considered to be all processes other than process 1.
The base Julia installation has in-built support for two types of clusters:
- A local cluster specified with the -p option as shown above.
- A cluster spanning machines using the --machinefile option. This uses a passwordless ssh login to start Julia worker processes (from the same path as the current host) on the specified machines.
Functions addprocs(), rmprocs(), workers(), and others are available as a programmatic means of adding, removing and querying the processes in a cluster.
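A short sketch of this programmatic interface, assuming a machine on which two extra local workers can be started: addprocs() returns the ids of the new workers, workers() lists the current worker ids, and rmprocs() asks the given workers to shut down. The variable names are illustrative only, and the first line is only needed on Julia 0.7 and later.

```julia
# Only needed on Julia 0.7 and later (Distributed standard library).
VERSION >= v"0.7" && @eval using Distributed

before  = nprocs()
new_ids = addprocs(2)                 # start two local workers, get their ids
after   = nprocs()

# Every newly added id shows up in the list of workers.
listed = all(id -> id in workers(), new_ids)

rmprocs(new_ids)                      # ask the two workers to shut down again
```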
Note that workers do not run a .juliarc.jl startup script, nor do they synchronize their global state (such as global variables, new method definitions, and loaded modules) with any of the other running processes.
Other types of clusters can be supported by writing your own custom ClusterManager, as described below in the ClusterManagers section.
Data Movement¶
Sending messages and moving data constitute most of the overhead in a parallel program. Reducing the number of messages and the amount of data sent is critical to achieving performance and scalability. To this end, it is important to understand the data movement performed by Julia’s various parallel programming constructs.
fetch() can be considered an explicit data movement operation, since it directly asks that an object be moved to the local machine. @spawn (and a few related constructs) also moves data, but this is not as obvious, hence it can be called an implicit data movement operation. Consider these two approaches to constructing and squaring a random matrix:
    # method 1
    A = rand(1000,1000)
    Bref = @spawn A^2
    ...
    fetch(Bref)

    # method 2
    Bref = @spawn rand(1000,1000)^2
    ...
    fetch(Bref)
The difference seems trivial, but in fact is quite significant due to the behavior of @spawn. In the first method, a random matrix is constructed locally, then sent to another process where it is squared. In the second method, a random matrix is both constructed and squared on another process. Therefore the second method sends much less data than the first.
In this toy example, the two methods are easy to distinguish and choose from. However, in a real program designing data movement might require more thought and likely some measurement. For example, if the first process needs matrix A then the first method might be better. Or, if computing A is expensive and only the current process has it, then moving it to another process might be unavoidable. Or, if the current process has very little to do between the @spawn and fetch(Bref), it might be better to eliminate the parallelism altogether. Or imagine rand(1000,1000) is replaced with a more expensive operation. Then it might make sense to add another @spawn statement just for this step.
Parallel Map and Loops¶
Fortunately, many useful parallel computations do not require data movement. A common example is a Monte Carlo simulation, where multiple processes can handle independent simulation trials simultaneously. We can use @spawn to flip coins on two processes. First, write the following function in count_heads.jl:
    function count_heads(n)
        c::Int = 0
        for i=1:n
            c += rand(Bool)
        end
        c
    end
The function count_heads simply adds together n random bits. Here is how we can perform some trials on two processes, and add together the results:
    @everywhere include("count_heads.jl")

    a = @spawn count_heads(100000000)
    b = @spawn count_heads(100000000)
    fetch(a)+fetch(b)
This example demonstrates a powerful and often-used parallel programming pattern. Many iterations run independently over several processes, and then their results are combined using some function. The combination process is called a reduction, since it is generally tensor-rank-reducing: a vector of numbers is reduced to a single number, or a matrix is reduced to a single row or column, etc. In code, this typically looks like the pattern x = f(x,v[i]), where x is the accumulator, f is the reduction function, and the v[i] are the elements being reduced. It is desirable for f to be associative, so that it does not matter what order the operations are performed in.
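The accumulator pattern and the role of associativity can be sketched without any worker processes. In the snippet below, reduce_serial is a hypothetical helper that spells out x = f(x, v[i]); reducing the two halves of v separately and then combining them gives the same answer as one pass, which is what allows the halves to be handled by different processes. The starting value is assumed to be an identity for f (0 for +).

```julia
# Hypothetical helper: the x = f(x, v[i]) pattern written out as a loop.
function reduce_serial(f, v, x0)
    x = x0
    for i = 1:length(v)
        x = f(x, v[i])
    end
    x
end

v = collect(1:10)
whole = reduce_serial(+, v, 0)

# Reduce each half on its own, then combine the two partial results.
mid   = div(length(v), 2)
left  = reduce_serial(+, v[1:mid], 0)
right = reduce_serial(+, v[mid+1:end], 0)
combined = left + right
```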
Notice that our use of this pattern with count_heads can be generalized. We used two explicit @spawn statements, which limits the parallelism to two processes. To run on any number of processes, we can use a parallel for loop, which can be written in Julia like this:
    nheads = @parallel (+) for i=1:200000000
        Int(rand(Bool))
    end
This construct implements the pattern of assigning iterations to multiple processes, and combining them with a specified reduction (in this case (+)). The result of each iteration is taken as the value of the last expression inside the loop. The whole parallel loop expression itself evaluates to the final answer.
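To make the split-and-reduce idea concrete, the same computation can be spelled out by hand with remote calls. This is only a sketch of the pattern, not a description of how the parallel for loop is implemented: count_heads_everywhere is a hypothetical helper that starts one remote call per worker and reduces the fetched results with +. It assumes at least one worker (two are added if none exist); the first line is only needed on Julia 0.7 and later.

```julia
# Only needed on Julia 0.7 and later (Distributed standard library).
VERSION >= v"0.7" && @eval using Distributed

nprocs() == 1 && addprocs(2)

@everywhere function count_heads(n)
    c = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end

# Hypothetical helper: one remote call per worker, then a (+) reduction
# performed on the calling process.
function count_heads_everywhere(n_per_worker)
    futures = [remotecall(count_heads, w, n_per_worker) for w in workers()]
    mapreduce(fetch, +, futures)
end

nheads = count_heads_everywhere(1000)
```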
Note that although parallel for loops look like serial for loops, their behavior is dramatically different. In particular, the iterations do not happen in a specified order, and writes to variables or arrays will not be globally visible since iterations run on different processes. Any variables used inside the parallel loop will be copied and broadcast to each process.
For example, the following code will not work as intended:
    a = zeros(100000)
    @parallel for i=1:100000
        a[i] = i
    end
This code will not initialize all of a, since each process will have a separate copy of it. Parallel for loops like these must be avoided. Fortunately, Shared Arrays can be used to get around this limitation:
    a = SharedArray(Float64,10)
    @parallel for i=1:10
        a[i] = i
    end
Using “outside” variables in parallel loops is perfectly reasonable if the variables are read-only:
    a = randn(1000)
    @parallel (+) for i=1:100000
        f(a[rand(1:end)])
    end
Here each iteration applies f to a randomly-chosen sample from a vector a shared by all processes.
As you can see, the reduction operator can be omitted if it is not needed. In that case, the loop executes asynchronously, i.e. it spawns independent tasks on all available workers and returns an array of Future immediately without waiting for completion. The caller can wait for the Future completions at a later point by calling fetch() on them, or wait for completion at the end of the loop by prefixing it with @sync, like @sync @parallel for.
In some cases no reduction operator is needed, and we merely wish to apply a function to all integers in some range (or, more generally, to all elements in some collection). This is another useful operation called parallel map, implemented in Julia as the pmap() function. For example, we could compute the singular values of several large random matrices in parallel as follows:
    M = Matrix{Float64}[rand(1000,1000) for i=1:10]
    pmap(svd, M)
Julia’s pmap() is designed for the case where each function call does a large amount of work. In contrast, @parallel for can handle situations where each iteration is tiny, perhaps merely summing two numbers. Only worker processes are used by both pmap() and @parallel for for the parallel computation. In case of @parallel for, the final reduction is done on the calling process.
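A small pmap() sketch with a deliberately cheap stand-in for expensive work: slow_square is a hypothetical function that sleeps briefly before squaring its argument. Because it is called on the workers, it is defined with @everywhere. The results come back in the order of the input collection. The sketch assumes at least one worker (two are added if none exist); the first line is only needed on Julia 0.7 and later.

```julia
# Only needed on Julia 0.7 and later (Distributed standard library).
VERSION >= v"0.7" && @eval using Distributed

nprocs() == 1 && addprocs(2)

# Hypothetical stand-in for an expensive computation such as svd().
@everywhere function slow_square(x)
    sleep(0.01)
    x^2
end

squares = pmap(slow_square, collect(1:10))
```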
Synchronization With Remote References¶
Scheduling¶
Julia’s parallel programming platform uses Tasks (aka Coroutines) to switch among multiple computations. Whenever code performs a communication operation like fetch() or wait(), the current task is suspended and a scheduler picks another task to run. A task is restarted when the event it is waiting for completes.
For many problems, it is not necessary to think about tasks directly. However, they can be used to wait for multiple events at the same time, which provides for dynamic scheduling. In dynamic scheduling, a program decides what to compute or where to compute it based on when other jobs finish. This is needed for unpredictable or unbalanced workloads, where we want to assign more work to processes only when they finish their current tasks.
As an example, consider computing the singular values of matrices of different sizes:
    M = Matrix{Float64}[rand(800,800), rand(600,600), rand(800,800), rand(600,600)]
    pmap(svd, M)
If one process handles both 800×800 matrices and another handles both 600×600 matrices, we will not get as much scalability as we could. The solution is to make a local task to “feed” work to each process when it completes its current task. For example, consider a simple pmap() implementation:
    function pmap(f, lst)
        np = nprocs()  # determine the number of processes available
        n = length(lst)
        results = Vector{Any}(n)
        i = 1
        # function to produce the next work item from the queue.
        # in this case it's just an index.
        nextidx() = (idx=i; i+=1; idx)
        @sync begin
            for p=1:np
                if p != myid() || np == 1
                    @async begin
                        while true
                            idx = nextidx()
                            if idx > n
                                break
                            end
                            results[idx] = remotecall_fetch(f, p, lst[idx])
                        end
                    end
                end
            end
        end
        results
    end
@async is similar to @spawn, but only runs tasks on the local process. We use it to create a “feeder” task for each process. Each task picks the next index that needs to be computed, then waits for its process to finish, then repeats until we run out of indexes. Note that the feeder tasks do not begin to execute until the main task reaches the end of the @sync block, at which point it surrenders control and waits for all the local tasks to complete before returning from the function. The feeder tasks are able to share state via nextidx() because they all run on the same process. No locking is required, since the tasks are scheduled cooperatively and not preemptively. This means context switches only occur at well-defined points: in this case, when remotecall_fetch() is called.
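The feeder mechanism can be tried out on a single process. In the sketch below, run_feeders is a hypothetical local variant of the function above: yield() stands in for remotecall_fetch() as the point where a task gives up control, and squaring stands in for the remote work. The owner array records which feeder handled each item; that assignment depends on scheduling, while the results do not.

```julia
# Hypothetical local variant of the feeder pattern; no workers are needed.
function run_feeders(items, nfeeders)
    n = length(items)
    results = Any[nothing for k in 1:n]
    owner = zeros(Int, n)
    i = 1
    # Shared work queue: safe without locks because tasks only switch
    # at yield(), never in the middle of nextidx().
    nextidx() = (idx = i; i += 1; idx)
    @sync begin
        for p = 1:nfeeders
            @async begin
                while true
                    idx = nextidx()
                    idx > n && break
                    yield()                      # stand-in for remotecall_fetch()
                    results[idx] = items[idx]^2  # stand-in for the remote work
                    owner[idx] = p
                end
            end
        end
    end
    results, owner
end

results, owner = run_feeders(collect(1:5), 2)
```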
Channels¶
Channels provide for a fast means of inter-task communication. A Channel{T}(n::Int) is a shared queue of maximum length n holding objects of type T. Multiple readers can read off the Channel via fetch() and take!(). Multiple writers can add to the Channel via put!(). isready() tests for the presence of any object in the channel, while wait() waits for an object to become available. close() closes a Channel. On a closed Channel, put!() will fail, while take!() and fetch() successfully return any existing values until it is emptied.
A Channel can be used as an iterable object in a for loop, in which case the loop runs as long as the Channel has data or is open. The loop variable takes on all values added to the Channel. An empty, closed Channel causes the for loop to terminate.
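A minimal producer/consumer sketch of these Channel operations, using only local tasks: squares_via_channel is a hypothetical helper in which one task writes values with put!() and closes the channel, while the caller drains it with a for loop. The capacity of 4 is arbitrary; the producer simply blocks whenever the channel is full.

```julia
# Hypothetical helper: a producer task feeds a consumer loop through a Channel.
function squares_via_channel(n)
    c = Channel{Int}(4)
    @async begin
        for k = 1:n
            put!(c, k^2)    # blocks while the channel is full
        end
        close(c)            # lets the consumer's for loop terminate
    end
    out = Int[]
    for v in c              # runs until the channel is closed and empty
        push!(out, v)
    end
    out
end

squares = squares_via_channel(5)
```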
Remote references and AbstractChannels¶
Remote references always refer to an implementation of an AbstractChannel.
A concrete implementation of an AbstractChannel (like Channel) is required to implement put!(), take!(), fetch(), isready() and wait(). The remote object referred to by a Future is stored in a Channel{Any}(1), i.e., a Channel of size 1 capable of holding objects of Any type.
RemoteChannel, which is rewritable, can point to any type and size of channels, or any other implementation of an AbstractChannel.
The constructor RemoteChannel(f::Function, pid) allows us to construct references to channels holding more than one value of a specific type. f() is a function executed on pid and it must return an AbstractChannel.
For example, RemoteChannel(()->Channel{Int}(10), pid) will return a reference to a channel of type Int and size 10. The channel exists on worker pid.
Methods put!(), take!(), fetch(), isready() and wait() on a RemoteChannel are proxied onto the backing store on the remote process.
RemoteChannel can thus be used to refer to user implemented AbstractChannel objects. A simple example of this is provided in examples/dictchannel.jl which uses a dictionary as its remote store.
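The sketch below exercises the RemoteChannel(f, pid) constructor from the example above: the channel is created on a worker, the worker fills it, and the calling process reads the values back through the same reference. put_squares! is a hypothetical helper, and the sketch assumes at least one worker (one is added if none exist); the first line is only needed on Julia 0.7 and later.

```julia
# Only needed on Julia 0.7 and later (Distributed standard library).
VERSION >= v"0.7" && @eval using Distributed

nprocs() == 1 && addprocs(1)
w = first(workers())

# Hypothetical helper: writes the first n squares into the given channel.
@everywhere function put_squares!(ch, n)
    for k = 1:n
        put!(ch, k^2)
    end
    nothing
end

# A channel holding up to 10 Ints that lives on worker w.
rc = RemoteChannel(() -> Channel{Int}(10), w)

remotecall_wait(put_squares!, w, rc, 3)   # the worker fills the channel
has_data = isready(rc)                    # proxied to the channel on w
received = [take!(rc) for k in 1:3]       # values travel to this process
```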
Remote References and Distributed Garbage Collection¶
Objects referred to by remote references can be freed only when all held references in the cluster are deleted.
The node where the value is stored keeps track of which of the workers have a reference to it.
Every time a RemoteChannel or an (unfetched) Future is serialized to a worker, the node pointed to by the reference is notified. And every time a RemoteChannel or an (unfetched) Future is garbage collected locally, the node owning the value is again notified.
The notifications are done via sending of “tracking” messages - an “add reference” message when a reference is serialized to a different process and a “delete reference” message when a reference is locally garbage collected.
Since Futures are write-once and cached locally, the act of fetch()ing a Future also updates reference tracking information on the node owning the value.
The node which owns the value frees it once all references to it are cleared.
With Futures, serializing an already fetched Future to a different node also sends the value since the original remote store may have collected the value by this time.
It is important to note that when an object is locally garbage collected depends on the size of the object and the current memory pressure in the system.
In case of remote references, the size of the local reference object is quite small, while the value stored on the remote node may be quite large. Since the local object may not be collected immediately, it is a good practice to explicitly call finalize() on local instances of a RemoteChannel, or on unfetched Futures. Since calling fetch() on a Future also removes its reference from the remote store, this is not required on fetched Futures. Explicitly calling finalize() results in an immediate message sent to the remote node to go ahead and remove its reference to the value.
Once finalized, a reference becomes invalid and cannot be used in any further calls.
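As a sketch of this practice, the snippet below creates a RemoteChannel on a worker, uses it once, and then calls finalize() on the local reference instead of waiting for the garbage collector. It assumes at least one worker (one is added if none exist), and the first line is only needed on Julia 0.7 and later.

```julia
# Only needed on Julia 0.7 and later (Distributed standard library).
VERSION >= v"0.7" && @eval using Distributed

nprocs() == 1 && addprocs(1)
w = first(workers())

rc = RemoteChannel(() -> Channel{Int}(1), w)
put!(rc, 42)
value = take!(rc)

# Done with the channel: drop this process's reference now rather than
# waiting for the local garbage collector. rc must not be used afterwards.
finalize(rc)
```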
Shared Arrays¶
Shared Arrays use system shared memory to map the same array across many processes. While there are some similarities to a DArray, the behavior of a SharedArray is quite different. In a DArray, each process has local access to just a chunk of the data, and no two processes share the same chunk; in contrast, in a SharedArray each “participating” process has access to the entire array. A SharedArray is a good choice when you want to have a large amount of data jointly accessible to two or more processes on the same machine.
SharedArray indexing (assignment and accessing values) works just as with regular arrays, and is efficient because the underlying memory is available to the local process. Therefore, most algorithms work naturally on SharedArrays, albeit in single-process mode. In cases where an algorithm insists on an Array input, the underlying array can be retrieved from a SharedArray by calling sdata(). For other AbstractArray types, sdata() just returns the object itself, so it’s safe to use sdata() on any Array-type object.
The constructor for a shared array is of the form:
    SharedArray(T::Type, dims::NTuple; init=false, pids=Int[])
which creates a shared array of a bits type T and size dims across the processes specified by pids. Unlike distributed arrays, a shared array is accessible only from those participating workers specified by the pids named argument (and the creating process too, if it is on the same host).
If an init function, of signature initfn(S::SharedArray), is specified, it is called on all the participating workers. You can specify that each worker runs the init function on a distinct portion of the array, thereby parallelizing initialization.
Here’s a brief example:
    julia> addprocs(3)
    3-element Array{Int64,1}:
     2
     3
     4

    julia> S = SharedArray(Int, (3,4), init = S -> S[Base.localindexes(S)] = myid())
    3×4 SharedArray{Int64,2}:
     2  2  3  4
     2  3  3  4
     2  3  4  4

    julia> S[3,2] = 7
    7

    julia> S
    3×4 SharedArray{Int64,2}:
     2  2  3  4
     2  3  3  4
     2  7  4  4
Base.localindexes() provides disjoint one-dimensional ranges of indexes, and is sometimes convenient for splitting up tasks among processes. You can, of course, divide the work any way you wish:
    julia> S = SharedArray(Int, (3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = myid())
    3×4 SharedArray{Int64,2}:
     2  2  2  2
     3  3  3  3
     4  4  4  4
Since all processes have access to the underlying data, you do have to be careful not to set up conflicts. For example:
    @sync begin
        for p in procs(S)
            @async begin
                remotecall_wait(fill!, p, S, p)
            end
        end
    end
would result in undefined behavior. Because each process fills the entire array with its own pid, whichever process is the last to execute (for any particular element of S) will have its pid retained.
As a more extended and complex example, consider running the following “kernel” in parallel:
    q[i,j,t+1] = q[i,j,t] + u[i,j,t]
In this case, if we try to split up the work using a one-dimensional index, we are likely to run into trouble: if q[i,j,t] is near the end of the block assigned to one worker and q[i,j,t+1] is near the beginning of the block assigned to another, it’s very likely that q[i,j,t] will not be ready at the time it’s needed for computing q[i,j,t+1]. In such cases, one is better off chunking the array manually. Let’s split along the second dimension:
    # This function returns the (irange,jrange) indexes assigned to this worker
    @everywhere function myrange(q::SharedArray)
        idx = indexpids(q)
        if idx == 0
            # This worker is not assigned a piece
            return 1:0, 1:0
        end
        nchunks = length(procs(q))
        splits = [round(Int, s) for s in linspace(0,size(q,2),nchunks+1)]
        1:size(q,1), splits[idx]+1:splits[idx+1]
    end

    # Here's the kernel
    @everywhere function advection_chunk!(q, u, irange, jrange, trange)
        @show (irange, jrange, trange)  # display so we can see what's happening
        for t in trange, j in jrange, i in irange
            q[i,j,t+1] = q[i,j,t] + u[i,j,t]
        end
        q
    end

    # Here's a convenience wrapper for a SharedArray implementation
    @everywhere advection_shared_chunk!(q, u) = advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
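The split arithmetic inside myrange can be checked in isolation, without a SharedArray or any workers. The sketch below uses a hypothetical helper, chunk_ranges, which computes the same kind of evenly spaced split points with plain arithmetic instead of linspace and returns one index range per chunk. The ranges are disjoint and together cover 1:n.

```julia
# Hypothetical helper: split 1:n into nchunks contiguous, disjoint ranges.
function chunk_ranges(n, nchunks)
    # Evenly spaced split points from 0 to n, rounded to integers.
    splits = [round(Int, k * n / nchunks) for k in 0:nchunks]
    [splits[k]+1:splits[k+1] for k in 1:nchunks]
end

ranges = chunk_ranges(500, 4)   # the second-dimension split for four workers
starts = map(first, ranges)
stops  = map(last, ranges)
```

For a dimension of length 500 and four chunks this gives the boundaries 1:125, 126:250, 251:375 and 376:500.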
Now let’s compare three different versions, one that runs in a single process:
    advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1)
one that uses @parallel:
    function advection_parallel!(q, u)
        for t = 1:size(q,3)-1
            @sync @parallel for j = 1:size(q,2)
                for i = 1:size(q,1)
                    q[i,j,t+1] = q[i,j,t] + u[i,j,t]
                end
            end
        end
        q
    end
and one that delegates in chunks:
    function advection_shared!(q, u)
        @sync begin
            for p in procs(q)
                @async remotecall_wait(advection_shared_chunk!, p, q, u)
            end
        end
        q
    end
If we create SharedArrays and time these functions, we get the following results (with julia -p 4):
    q = SharedArray(Float64, (500,500,500))
    u = SharedArray(Float64, (500,500,500))

    # Run once to JIT-compile
    advection_serial!(q, u)
    advection_parallel!(q, u)
    advection_shared!(q,u)

    # Now the real results:
    julia> @time advection_serial!(q, u);
    (irange,jrange,trange) = (1:500,1:500,1:499)
     830.220 milliseconds (216 allocations: 13820 bytes)

    julia> @time advection_parallel!(q, u);
       2.495 seconds      (3999 k allocations: 289 MB, 2.09% gc time)

    julia> @time advection_shared!(q,u);
            From worker 2:  (irange,jrange,trange) = (1:500,1:125,1:499)
            From worker 4:  (irange,jrange,trange) = (1:500,251:375,1:499)
            From worker 3:  (irange,jrange,trange) = (1:500,126:250,1:499)
            From worker 5:  (irange,jrange,trange) = (1:500,376:500,1:499)
     238.119 milliseconds (2264 allocations: 169 KB)
The biggest advantage of advection_shared! is that it minimizes traffic among the workers, allowing each to compute for an extended time on the assigned piece.
ClusterManagers¶
The launching, management and networking of Julia processes into a logical cluster is done via cluster managers. A ClusterManager is responsible for
- launching worker processes in a cluster environment
- managing events during the lifetime of each worker
- optionally, providing data transport
A Julia cluster has the following characteristics:
- The initial Julia process, also called the master, is special and has an id of 1.
- Only the master process can add or remove worker processes.
- All processes can directly communicate with each other.
Connections between workers (using the in-built TCP/IP transport) are established in the following manner:
- addprocs() is called on the master process with a ClusterManager object
- addprocs() calls the appropriate launch() method which spawns the required number of worker processes on appropriate machines
- Each worker starts listening on a free port and writes out its host and port information to STDOUT
- The cluster manager captures the STDOUT of each worker and makes it available to the master process
- The master process parses this information and sets up TCP/IP connections to each worker
- Every worker is also notified of other workers in the cluster
- Each worker connects to all workers whose id is less than the worker’s own id
- In this way a mesh network is established, wherein every worker is directly connected with every other worker
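The connection rule in the list above can be checked with a few lines of plain Julia. The sketch below does not open any sockets; mesh_peers is a hypothetical helper that only records who would connect to whom when the master connects to every worker and each worker connects to all workers with a smaller id, and then confirms that every process ends up linked to every other one.

```julia
# Hypothetical helper: returns a Dict mapping each process id to its peers.
function mesh_peers(total)
    peers = Dict{Int,Vector{Int}}()
    for pid = 1:total
        peers[pid] = Int[]
    end
    # The master (id 1) sets up a connection to every worker.
    for w = 2:total
        push!(peers[1], w)
        push!(peers[w], 1)
    end
    # Each worker connects to all workers whose id is less than its own.
    for w = 2:total, other = 2:(w - 1)
        push!(peers[w], other)
        push!(peers[other], w)
    end
    peers
end

p = mesh_peers(32)
links = div(sum(map(length, collect(values(p)))), 2)   # each link is counted twice
```

With 32 processes every process has 31 peers, for 32 * 31 / 2 = 496 links in total.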
While the default transport layer uses plain TCPSocket, it is possible for a Julia cluster to provide its own transport.
Julia provides two in-built cluster managers:
- LocalManager, used when addprocs() or addprocs(np::Integer) are called
- SSHManager, used when addprocs(hostnames::Array) is called with a list of hostnames
LocalManager is used to launch additional workers on the same host, thereby leveraging multi-core and multi-processor hardware.
Thus, a minimal cluster manager would need to:
- be a subtype of the abstract ClusterManager
- implement launch(), a method responsible for launching new workers
- implement manage(), which is called at various events during a worker’s lifetime (for example, sending an interrupt signal)
addprocs(manager::FooManager) requires FooManager to implement:
    function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
        ...
    end

    function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
        ...
    end
As an example let us see how the LocalManager, the manager responsible for starting workers on the same host, is implemented:
    immutable LocalManager <: ClusterManager
        np::Integer
    end

    function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
        ...
    end

    function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
        ...
    end
The launch() method takes the following arguments:
- manager::ClusterManager - the cluster manager addprocs() is called with
- params::Dict - all the keyword arguments passed to addprocs()
- launched::Array - the array to append one or more WorkerConfig objects to
- c::Condition - the condition variable to be notified as and when workers are launched
The launch() method is called asynchronously in a separate task. The termination of this task signals that all requested workers have been launched. Hence the launch() function MUST exit as soon as all the requested workers have been launched.
Newly launched workers are connected to each other, and the master process, in an all-to-all manner. Specifying the command argument --worker <cookie> results in the launched processes initializing themselves as workers and connections being set up via TCP/IP sockets. Optionally, --bind-to bind_addr[:port] may also be specified to enable other workers to connect to it at the specified bind_addr and port. This is useful for multi-homed hosts.
As an example of a non-TCP/IP transport, an implementation may choose to use MPI, in which case --worker must NOT be specified. Instead, newly launched workers should call init_worker(cookie) before using any of the parallel constructs.
For every worker launched, the launch() method must add a WorkerConfig object (with appropriate fields initialized) to launched:
    type WorkerConfig
        # Common fields relevant to all cluster managers
        io::Nullable{IO}
        host::Nullable{AbstractString}
        port::Nullable{Integer}

        # Used when launching additional workers at a host
        count::Nullable{Union{Int, Symbol}}
        exename::Nullable{AbstractString}
        exeflags::Nullable{Cmd}

        # External cluster managers can use this to store information at a per-worker level
        # Can be a dict if multiple fields need to be stored.
        userdata::Nullable{Any}

        # SSHManager / SSH tunnel connections to workers
        tunnel::Nullable{Bool}
        bind_addr::Nullable{AbstractString}
        sshflags::Nullable{Cmd}
        max_parallel::Nullable{Integer}

        connect_at::Nullable{Any}

        .....
    end
Most of the fields in WorkerConfig are used by the inbuilt managers. Custom cluster managers would typically specify only io or host / port:
- If io is specified, it is used to read host/port information. A Julia worker prints out its bind address and port at startup. This allows Julia workers to listen on any free port available instead of requiring worker ports to be configured manually.
- If io is not specified, host and port are used to connect.
- count, exename and exeflags are relevant for launching additional workers from a worker. For example, a cluster manager may launch a single worker per node, and use that to launch additional workers.
  - count with an integer value n will launch a total of n workers.
  - count with a value of :auto will launch as many workers as the number of cores on that machine.
  - exename is the name of the julia executable including the full path.
  - exeflags should be set to the required command line arguments for new workers.
- tunnel, bind_addr, sshflags and max_parallel are used when an ssh tunnel is required to connect to the workers from the master process.
- userdata is provided for custom cluster managers to store their own worker specific information.
manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) is called at different times during the worker’s lifetime with appropriate op values:
- with :register / :deregister when a worker is added / removed from the Julia worker pool.
- with :interrupt when interrupt(workers) is called. The ClusterManager should signal the appropriate worker with an interrupt signal.
- with :finalize for cleanup purposes.
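The dispatch on op can be sketched without a running cluster. The function below is a hypothetical stand-in for the body of a manage method: the name handle_manage_op and the event log are assumptions made for illustration, and the order in which the ops are fed to it is arbitrary rather than the order Julia would use.

```julia
# Hypothetical stand-in for the body of a manage() method. It only records
# which lifecycle event was seen for which worker id.
function handle_manage_op(log::Vector, id::Integer, op::Symbol)
    if op == :register
        push!(log, (id, "added to the worker pool"))
    elseif op == :deregister
        push!(log, (id, "removed from the worker pool"))
    elseif op == :interrupt
        # A real manager would deliver an interrupt signal to the worker here.
        push!(log, (id, "interrupt requested"))
    elseif op == :finalize
        push!(log, (id, "cleanup"))
    else
        error("unexpected op: $op")
    end
    return log
end

# Feed the four op values through the handler for a hypothetical worker id 2.
events = Any[]
for op in (:register, :interrupt, :deregister, :finalize)
    handle_manage_op(events, 2, op)
end
```

A real cluster manager would put the same branching inside its own manage(manager::FooManager, id, config, op) method and replace the log entries with manager-specific actions.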
Cluster Managers with custom transports¶
Replacing the default TCP/IP all-to-all socket connections with a custom transport layer is a little more involved. Each Julia process has as many communication tasks as the workers it is connected to. For example, consider a Julia cluster of 32 processes in an all-to-all mesh network:
- Each Julia process thus has 31 communication tasks
- Each task handles all incoming messages from a single remote worker in a message processing loop
- The message processing loop waits on an IO object (for example, a TCPSocket in the default implementation), reads an entire message, processes it and waits for the next one
- Sending messages to a process is done directly from any Julia task - not just communication tasks - again, via the appropriate IO object
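The arithmetic behind the 32-process example generalizes to any all-to-all mesh. The following is a minimal sketch that assumes one bidirectional connection per pair of processes; the helper names are hypothetical and are not part of Julia.

```julia
# Hypothetical helpers (not part of Julia) for an all-to-all mesh of n processes.

# Each process runs one communication task per remote process.
comm_tasks_per_process(n::Integer) = n - 1

# Assumption: every pair of processes shares exactly one connection.
mesh_connections(n::Integer) = div(n * (n - 1), 2)

n = 32
println("tasks per process: ", comm_tasks_per_process(n))
println("connections in the mesh: ", mesh_connections(n))
```

For 32 processes this gives 31 communication tasks per process and 496 pairwise connections.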
Replacing the default transport requires the new implementation to set up connections to remote workers and to provide appropriate IO objects that the message processing loops can wait on. The manager-specific callbacks to be implemented are:
    connect(manager::FooManager, pid::Integer, config::WorkerConfig)
    kill(manager::FooManager, pid::Int, config::WorkerConfig)
The default implementation (which uses TCP/IP sockets) is implemented as connect(manager::ClusterManager, pid::Integer, config::WorkerConfig).
connect should return a pair of IO objects, one for reading data sent from worker pid, and the other to write data that needs to be sent to worker pid. Custom cluster managers can use an in-memory BufferStream as the plumbing to proxy data between the custom, possibly non-IO transport and Julia’s in-built parallel infrastructure.
A BufferStream is an in-memory IOBuffer which behaves like an IO - it is a stream which can be handled asynchronously.
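As a rough illustration of that plumbing role, the sketch below pushes bytes into a BufferStream from one task while another task blocks reading them. The payload stands in for data arriving from a hypothetical custom transport, and the type is referenced as Base.BufferStream. This shows only the stream behaviour, not a complete connect implementation.

```julia
# `payload` stands in for bytes delivered by a hypothetical custom transport.
payload = UInt8[0x4a, 0x75, 0x6c, 0x69, 0x61]

rd = Base.BufferStream()

# Producer task: forwards the transport's bytes into the stream.
producer = @async write(rd, payload)

# Consumer: blocks (yielding to other tasks) until all requested bytes arrive.
received = zeros(UInt8, length(payload))
read!(rd, received)

wait(producer)
close(rd)
```

A custom connect could hand streams like rd to Julia's message processing loop while its own tasks move bytes between the streams and the underlying transport.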
Folder examples/clustermanager/0mq contains an example of using ZeroMQ to connect Julia workers in a star topology with a 0MQ broker in the middle. Note: The Julia processes are still all logically connected to each other - any worker can message any other worker directly without any awareness of 0MQ being used as the transport layer.
When using custom transports:
- Julia workers must NOT be started with --worker. Starting with --worker will result in the newly launched workers defaulting to the TCP/IP socket transport implementation
- For every incoming logical connection with a worker, Base.process_messages(rd::IO, wr::IO) must be called. This launches a new task that handles reading and writing of messages from/to the worker represented by the IO objects
- init_worker(cookie, manager::FooManager) MUST be called as part of worker process initialization
- Field connect_at::Any in WorkerConfig can be set by the cluster manager when launch() is called. The value of this field is passed in all connect() callbacks. Typically, it carries information on how to connect to a worker. For example, the TCP/IP socket transport uses this field to specify the (host, port) tuple at which to connect to a worker
kill(manager, pid, config) is called to remove a worker from the cluster. On the master process, the corresponding IO objects must be closed by the implementation to ensure proper cleanup. The default implementation simply executes an exit() call on the specified remote worker.
examples/clustermanager/simple is an example that shows a simple implementation using UNIX domain sockets for cluster setup.
Network requirements for LocalManager and SSHManager¶
Julia clusters are designed to be executed on already secured environments on infrastructure such as local laptops, departmental clusters, or even on the cloud. This section covers network security requirements for the inbuilt LocalManager and SSHManager:
- The master process does not listen on any port. It only connects out to the workers.
- Each worker binds to only one of the local interfaces and listens on the first free port starting from 9009.
- LocalManager, i.e. addprocs(N), by default binds only to the loopback interface. This means that workers subsequently started on remote hosts (or anyone with malicious intentions) are unable to connect to the cluster. An addprocs(4) followed by an addprocs(["remote_host"]) will fail. Some users may need to create a cluster comprised of their local system and a few remote systems. This can be done by explicitly requesting LocalManager to bind to an external network interface via the restrict keyword argument. For example, addprocs(4; restrict=false).
- SSHManager, i.e. addprocs(list_of_remote_hosts), launches workers on remote hosts via SSH. By default, SSH is only used to launch Julia workers. Subsequent master-worker and worker-worker connections use plain, unencrypted TCP/IP sockets. The remote hosts must have passwordless login enabled. Additional SSH flags or credentials may be specified via keyword argument sshflags.
- addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>) is useful when we wish to use SSH connections for master-worker too. A typical scenario for this is a local laptop running the Julia REPL (i.e., the master) with the rest of the cluster on the cloud, say on Amazon EC2. In this case only port 22 needs to be opened at the remote cluster coupled with SSH client authenticated via public key infrastructure (PKI). Authentication credentials can be supplied via sshflags, for example sshflags=`-i <keyfile>`. Note that worker-worker connections are still plain TCP and the local security policy on the remote cluster must allow for free connections between worker nodes, at least for ports 9009 and above.
Securing and encrypting all worker-worker traffic (via SSH), or encrypting individual messages can be done via a custom ClusterManager.
Cluster cookie¶
All processes in a cluster share the same cookie which, by default, is a randomly generated string on the master process:
- Base.cluster_cookie() returns the cookie, while Base.cluster_cookie(cookie) sets it and returns the new cookie.
- All connections are authenticated on both sides to ensure that only workers started by the master are allowed to connect to each other.
- The cookie must be passed to the workers at startup via argument --worker <cookie>. Custom ClusterManagers can retrieve the cookie on the master by calling Base.cluster_cookie(). Cluster managers not using the default TCP/IP transport (and hence not specifying --worker) must call init_worker(cookie, manager) with the same cookie as on the master.
Note that environments requiring higher levels of security can implement this via a custom ClusterManager. For example, cookies can be pre-shared and hence not specified as a startup argument.
Specifying network topology (Experimental)¶
Keyword argument topology to addprocs is used to specify how the workers must be connected to each other:
- :all_to_all : is the default, where all workers are connected to each other.
- :master_slave : only the driver process, i.e. pid 1, has connections to the workers.
- :custom : the launch method of the cluster manager specifies the connection topology. Fields ident and connect_idents in WorkerConfig are used to specify the same. connect_idents is a list of ClusterManager-provided identifiers of the workers that the worker identified by ident must connect to.
Currently, sending a message between unconnected workers results in an error. This behaviour, as with the functionality and interface, should be considered experimental in nature and may change in future releases.
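To make the role of ident and connect_idents more concrete, the sketch below computes, for a chain topology, which identifiers each worker would list in connect_idents. It is plain Julia with no cluster involved. The function name and the choice of a chain are illustrative assumptions, and a real launch method would store these values in each worker's WorkerConfig.

```julia
# Hypothetical helper: in a chain topology, worker k connects only to worker k-1.
# Returns a Dict mapping each ident to the list it would use as connect_idents.
function chain_connect_idents(idents::Vector)
    plan = Dict{Any,Vector{Any}}()
    for k = 1:length(idents)
        plan[idents[k]] = k == 1 ? Any[] : Any[idents[k - 1]]
    end
    return plan
end

# Example identifiers are made up; a cluster manager chooses its own.
plan = chain_connect_idents(["w1", "w2", "w3"])
```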
Multi-threading (Experimental)¶
In addition to tasks, remote calls, and remote references, Julia from v0.5 onwards natively supports multi-threading. Note that this section is experimental and the interfaces may change in the future.
Setup¶
By default, Julia starts up with a single thread of execution. This can be verified by using the command Threads.nthreads():
    julia> Threads.nthreads()
    1
The number of threads Julia starts up with is controlled by an environment variable called JULIA_NUM_THREADS. Now, let’s start up Julia with 4 threads:
    export JULIA_NUM_THREADS=4
(The above command works on Bourne shells on Linux and OSX. Note that if you’re using a C shell on these platforms, you should use setenv JULIA_NUM_THREADS 4 instead. If you’re on Windows, start up the command line in the location of julia.exe and use set instead of export.)
Let’s verify there are 4 threads at our disposal.
    julia> Threads.nthreads()
    4
But we are currently on the master thread. To check, we use the command Threads.threadid():
    julia> Threads.threadid()
    1
The @threads Macro¶
Let’s work a simple example using our native threads. Let us create an array of zeros:
    julia> a = zeros(10)
    10-element Array{Float64,1}:
     0.0
     0.0
     0.0
     0.0
     0.0
     0.0
     0.0
     0.0
     0.0
     0.0
Let us operate on this array simultaneously using 4 threads. We’ll have each thread write its thread ID into each location.
Julia supports parallel loops using the Threads.@threads macro. This macro is affixed in front of a for loop to indicate to Julia that the loop is a multi-threaded region.
    Threads.@threads for i = 1:10
        a[i] = Threads.threadid()
    end
The iteration space is split amongst the threads, after which each thread writes its thread ID to its assigned locations:
    julia> a
    10-element Array{Float64,1}:
     1.0
     1.0
     1.0
     2.0
     2.0
     2.0
     3.0
     3.0
     4.0
     4.0
Note that Threads.@threads does not have an optional reduction parameter like @parallel.
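Because there is no reduction clause, a reduction has to be assembled by hand. One way to do this, sketched below, is to give each loop iteration its own chunk of the input and its own slot for a partial result, then combine the partial results serially. The function name threaded_sum and the chunking scheme are illustrative choices, not an API provided by Julia.

```julia
# Hypothetical helper: sum a vector with a hand-written reduction.
function threaded_sum(xs::Vector{Float64})
    nchunks = Threads.nthreads()
    partials = zeros(Float64, nchunks)   # one slot per chunk, so no two iterations share a slot
    n = length(xs)
    Threads.@threads for c = 1:nchunks
        # Chunk c covers indices lo:hi; the chunks are contiguous and cover 1:n.
        lo = div((c - 1) * n, nchunks) + 1
        hi = div(c * n, nchunks)
        s = 0.0
        for i = lo:hi
            s += xs[i]
        end
        partials[c] = s
    end
    return sum(partials)                 # serial combine step
end

total = threaded_sum(collect(1.0:100.0))
```

Each iteration writes only to its own element of partials, so the loop body needs no locking.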
@threadcall (Experimental)¶
All I/O tasks, timers, REPL commands, etc are multiplexed onto a single OS thread via an event loop. A patched version of libuv (http://docs.libuv.org/en/v1.x/) provides this functionality. Yield points provide for co-operatively scheduling multiple tasks onto the same OS thread. I/O tasks and timers yield implicitly while waiting for the event to occur. Calling yield() explicitly allows for other tasks to be scheduled.
Thus, a task executing a ccall effectively prevents the Julia scheduler from executing any other tasks till the call returns. This is true for all calls into external libraries. Exceptions are calls into custom C code that call back into Julia (which may then yield) or C code that calls jl_yield() (C equivalent of yield()).
Note that while Julia code runs on a single thread (by default), libraries used by Julia may launch their own internal threads. For example, the BLAS library may start as many threads as there are cores on a machine.
The @threadcall macro addresses scenarios where we do not want a ccall to block the main Julia event loop. It schedules a C function for execution in a separate thread. A threadpool with a default size of 4 is used for this. The size of the threadpool is controlled via environment variable UV_THREADPOOL_SIZE. While waiting for a free thread, and during function execution once a thread is available, the requesting task (on the main Julia event loop) yields to other tasks. Note that @threadcall does not return till the execution is complete. From a user point of view, it is therefore a blocking call like other Julia APIs.
It is very important that the called function does not call back into Julia.
@threadcall may be removed/changed in future versions of Julia.
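The macro takes the same arguments as ccall. The sketch below assumes a system where the C library function toupper can be looked up by name in the Julia process. The function was chosen only because it is small and does not call back into Julia.

```julia
# Assumption: the C library's `toupper` is visible to this Julia process.
code = Cint('a')

# Same argument layout as ccall: function, return type, argument types, values.
# The call runs on a threadpool thread while the calling task yields.
result = @threadcall(:toupper, Cint, (Cint,), code)

println(Char(result))
```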
Footnotes
[1] In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding RMA to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see http://www.mpi-forum.org/docs.