Tasks
Core.Task — Type
Task(func[, reserved_stack::Int])
Create a Task (i.e. coroutine) to execute the given function func (which must be callable with no arguments). The task exits when this function returns. When scheduled, the task runs in the "world age" of the parent at the time of construction.
The optional reserved_stack argument specifies the size of the stack available for this task, in bytes. The default, 0, uses the system-dependent stack size default.
By default, tasks have the sticky bit (t.sticky) set to true. This models the historic default for @async. Sticky tasks can only run on the worker thread they are first scheduled on and, when scheduled, make the task that scheduled them sticky as well. To obtain the behavior of Threads.@spawn, set the sticky bit to false manually.
Examples
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
In this example, b is a runnable Task that hasn't started yet.
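As a minimal sketch of the sticky-bit advice above, a Task can be made migratable by clearing t.sticky before it is scheduled. The function name work is hypothetical and only serves as an illustration.

```julia
# Hypothetical work function, for illustration only.
work() = sum(i for i in 1:1000)

t = Task(work)
t.sticky = false   # allow the scheduler to move the task between threads
schedule(t)
result = fetch(t)  # waits for the task and returns its value
```

Clearing the bit before schedule matters, because a sticky task is pinned to the thread it is first scheduled on.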
Base.@task — Macro
@task
Wrap an expression in a Task without executing it, and return the Task. This only creates a task, and does not run it.
By default, tasks have the sticky bit (t.sticky) set to true. This models the historic default for @async. Sticky tasks can only run on the worker thread they are first scheduled on and, when scheduled, make the task that scheduled them sticky as well. To obtain the behavior of Threads.@spawn, set the sticky bit to false manually.
Examples
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.@async — Macro
@async
Wrap an expression in a Task and add it to the local machine's scheduler queue.
Values can be interpolated into @async via $, which copies the value directly into the constructed underlying closure. This allows you to insert the value of a variable, isolating the asynchronous code from changes to the variable's value in the current task.
It is strongly encouraged to always favor Threads.@spawn over @async, even when no parallelism is required, especially in publicly distributed libraries. This is because a use of @async disables the migration of the parent task across worker threads in the current implementation of Julia. Thus, seemingly innocent use of @async in a library function can have a large impact on the performance of very different parts of user applications.
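The effect of $ interpolation can be seen in a small sketch. The function name interpolation_demo and its local variables are hypothetical. The first task captures the variable v itself, while the second receives a copy of the value v had when the task was created.

```julia
function interpolation_demo()
    v = 1
    by_reference = @async identity(v)    # reads v when the task actually runs
    by_value     = @async identity($v)   # copies the current value of v
    v = 2                                # reassigned before either task has run
    return fetch(by_reference), fetch(by_value)
end

pair = interpolation_demo()
```

Neither task starts before the current task yields (here, inside fetch), so the first task observes the reassigned value and the second keeps the value that was interpolated.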
Base.asyncmap — Function
asyncmap(f, c...; ntasks=0, batch_size=nothing)
Uses multiple concurrent tasks to map f over a collection (or multiple equal length collections). For multiple collection arguments, f is applied elementwise.
The output is guaranteed to be the same order as the elements of the collection(s) c.
ntasks specifies the number of tasks to run concurrently. Depending on the length of the collections, if ntasks is unspecified, up to 100 tasks will be used for concurrent mapping.
ntasks can also be specified as a zero-arg function. In this case, the number of tasks to run in parallel is checked before processing every element, and a new task is started if the value returned by the function is greater than the current number of tasks.
If batch_size is specified, the collection is processed in batch mode. f must then accept a Vector of argument tuples and return a vector of results. The input vector will have a length of batch_size or less.
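A compact sketch of both calling modes follows. The helper names slow_square and batch_square are hypothetical, and the sleep call merely stands in for I/O-bound work.

```julia
# Each call sleeps briefly to stand in for I/O-bound work.
slow_square(x) = (sleep(0.01); x^2)

# At most two tasks run the mapping function concurrently; order is preserved.
squares = asyncmap(slow_square, 1:5; ntasks=2)

# In batch mode the function receives a vector of argument tuples
# and must return one result per tuple.
batch_square(batch) = [slow_square(first(args)) for args in batch]
batched = asyncmap(batch_square, 1:5; ntasks=2, batch_size=2)
```

Both calls produce the squares of 1 through 5 in input order; only the way work is handed to the tasks differs.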
The following examples highlight execution in different tasks by returning the objectid of the tasks in which the mapping function is executed.
First, with ntasks undefined, each element is processed in a different task.
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Vector{UInt64}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
With ntasks=2 all elements are processed in 2 tasks.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Vector{UInt64}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
With batch_size defined, the mapping function needs to be changed to accept an array of argument tuples and return an array of results. map is used in the modified mapping function to achieve this.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Vector{String}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
Base.asyncmap! — Function
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
Like asyncmap, but stores output in results rather than returning a collection.
Base.current_task — Function
current_task()
Get the currently running Task.
Base.istaskdone — Function
istaskdone(t::Task)::Bool
Determine whether a task has exited.
Examples
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted — Function
istaskstarted(t::Task)::Bool
Determine whether a task has started executing.
Examples
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed — Function
istaskfailed(t::Task)::Bool
Determine whether a task has exited because an exception was thrown.
Examples
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
Base.task_local_storage — Method
task_local_storage(key)
Look up the value of a key in the current task's task-local storage.
Base.task_local_storage — Method
task_local_storage(key, value)
Assign a value to a key in the current task's task-local storage.
Base.task_local_storage — Method
task_local_storage(body, key, value)
Call the function body with a modified task-local storage, in which value is assigned to key; the previous value of key, or lack thereof, is restored afterwards. Useful for emulating dynamic scoping.
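The three methods can be combined as in the following sketch. The key :request_id is a hypothetical example. The last line additionally assumes the zero-argument method task_local_storage(), which is not described above and returns the storage dictionary of the current task.

```julia
task_local_storage(:request_id, 42)        # assign in the current task
outer = task_local_storage(:request_id)    # look the value up again

# Temporarily override the value while the do-block runs.
inner = task_local_storage(:request_id, 7) do
    task_local_storage(:request_id)
end
restored = task_local_storage(:request_id) # previous value is back

# A different task has its own, separate storage.
seen_elsewhere = fetch(@async haskey(task_local_storage(), :request_id))
```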
Core.ConcurrencyViolationError — Type
ConcurrencyViolationError(msg) <: Exception
An error thrown when a detectable violation of concurrent semantics has occurred.
A non-exhaustive list of situations in which this is used:
- Throwing when a deadlock has been detected (e.g. wait(current_task()))
- Known-unsafe behavior is attempted (e.g. yield(current_task()))
- A known non-threadsafe datastructure is attempted to be modified from multiple concurrent tasks
- A lock is being unlocked that wasn't locked by this task
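One case that is easy to reproduce, and that is not in the list above, is calling notify on a Threads.Condition without holding its lock. The sketch below is illustrative only and catches the resulting error so that it can be inspected.

```julia
c = Threads.Condition()

# Calling notify without first locking c is a detectable misuse.
err = try
    notify(c)
    nothing
catch e
    e
end

violated = err isa ConcurrencyViolationError
```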
Scheduling
Base.yield — Function
yield(t::Task, arg = nothing)
A fast, unfair-scheduling version of schedule(t, arg); yield() which immediately yields to t before calling the scheduler.
Throws a ConcurrencyViolationError if t is the currently running task.
yield()
Switch to the scheduler to allow another scheduled task to run. A task that calls this function is still runnable, and will be restarted immediately if there are no other runnable tasks.
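The cooperative hand-off can be sketched as follows. The events vector is a hypothetical way to record what ran. The sketch deliberately does not rely on the exact interleaving after the first yield, only on the fact that a scheduled task does not run before the scheduling task gives up control.

```julia
events = String[]

t = @task begin
    push!(events, "task: started")
    yield()                       # hand control back to the scheduler
    push!(events, "task: finished")
end

schedule(t)                       # queued, but not running yet
push!(events, "main: scheduled t")
yield()                           # let queued tasks run
wait(t)                           # block until t has finished
```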
Base.yieldto — Function
yieldto(t::Task, arg = nothing)
Switch to the given task. The first time a task is switched to, the task's function is called with no arguments. On subsequent switches, arg is returned from the task's last call to yieldto. This is a low-level call that only switches tasks, not considering states or scheduling in any way. Its use is discouraged.
Base.sleep — Function
sleep(seconds)
Block the current task for a specified number of seconds. The minimum sleep time is 1 millisecond, i.e. an input of 0.001.
Base.schedule — Function
schedule(t::Task, [val]; error=false)
Add a Task to the scheduler's queue. This causes the task to run constantly when the system is otherwise idle, unless the task performs a blocking operation such as wait.
If a second argument val is provided, it will be passed to the task (via the return value of yieldto) when it runs again. If error is true, the value is raised as an exception in the woken task.
It is incorrect to use schedule on an arbitrary Task that has already been started. See the API reference for more information.
By default, tasks have the sticky bit (t.sticky) set to true. This models the historic default for @async. Sticky tasks can only run on the worker thread they are first scheduled on and, when scheduled, make the task that scheduled them sticky as well. To obtain the behavior of Threads.@spawn, set the sticky bit to false manually.
Examples
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Synchronization
Base.errormonitor — Function
errormonitor(t::Task)
Print an error log to stderr if task t fails.
Examples
julia> wait(errormonitor(Threads.@spawn error("task failed")); throw = false)
Unhandled Task ERROR: task failed
Stacktrace:
[...]
Base.@sync — Macro
@sync
Wait until all lexically-enclosed uses of @async, @spawn, Distributed.@spawnat and Distributed.@distributed are complete. All exceptions thrown by enclosed async operations are collected and thrown as a CompositeException.
Examples
julia> Threads.nthreads()
4
julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
Base.wait — Function
wait([x])
Block the current task until some event occurs.
- Channel: Wait for a value to be appended to the channel.
- Condition: Wait for notify on a condition and return the val parameter passed to notify. See the Condition-specific docstring of wait for the exact behavior.
- Process: Wait for a process or process chain to exit. The exitcode field of a process can be used to determine success or failure.
- Task: Wait for a Task to finish. See the Task-specific docstring of wait for the exact behavior.
- RawFD: Wait for changes on a file descriptor (see the FileWatching package).
If no argument is passed, the task blocks for an undefined period. A task can only be restarted by an explicit call to schedule or yieldto.
Often wait is called within a while loop to ensure a waited-for condition is met before proceeding.
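The while-loop idiom might look like the sketch below. The names cond, ready and waiter are hypothetical. Both tasks are created with @async, so they share one thread and the non-thread-safe Base.Condition is adequate here.

```julia
cond = Condition()
ready = Ref(false)

waiter = @async begin
    while !ready[]     # re-check the condition after every wake-up
        wait(cond)
    end
    "proceeded"
end

yield()                # give the waiter a chance to start and block
ready[] = true
notify(cond)
outcome = fetch(waiter)
```

The loop makes the code correct whether or not the waiter has started waiting by the time notify is called, since the flag is checked before each call to wait.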
Base.waitany — Function
waitany(tasks; throw=true) -> (done_tasks, remaining_tasks)
Wait until at least one of the given tasks has completed.
If throw is true, throw CompositeException when one of the completed tasks completes with an exception.
The return value consists of two task vectors. The first one consists of completed tasks, and the other consists of uncompleted tasks.
This may scale poorly compared to writing code that uses multiple individual tasks that each runs serially, since this needs to scan the list of tasks each time and synchronize with each one every time this is called. Or consider using waitall(tasks; failfast=true) instead.
Base.waitall — Function
waitall(tasks; failfast=true, throw=true) -> (done_tasks, remaining_tasks)
Wait until all the given tasks have completed.
If failfast is true, the function returns as soon as at least one of the given tasks fails with an exception. If throw is true, throw CompositeException when one of the completed tasks has failed.
failfast and throw keyword arguments work independently; when only throw=true is specified, this function waits for all the tasks to complete.
The return value consists of two task vectors. The first one consists of completed tasks, and the other consists of uncompleted tasks.
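A brief sketch of how the two functions relate, assuming a Julia release recent enough to provide waitany and waitall. The task bodies are hypothetical placeholders that sleep for different durations.

```julia
tasks = map(1:3) do i
    Threads.@spawn begin
        sleep(0.05 * i)   # stand-in for work of varying duration
        i
    end
end

# Returns as soon as at least one task has completed.
done, remaining = waitany(tasks)

# Blocks until every task has completed.
finished, unfinished = waitall(tasks)
results = sort(map(fetch, finished))
```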
Base.fetch — Method
fetch(t::Task)
Wait for a Task to finish, then return its result value. If the task fails with an exception, a TaskFailedException (which wraps the failed task) is thrown.
Base.fetch — Method
fetch(x::Any)
Return x.
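The three behaviors described above can be sketched together. The task bodies are hypothetical.

```julia
ok  = Threads.@spawn 1 + 1
bad = Threads.@spawn error("boom")

value = fetch(ok)    # result of the finished task
plain = fetch(42)    # non-task values are returned unchanged

# A failed task surfaces as a TaskFailedException wrapping that task.
err = try
    fetch(bad)
catch e
    e
end
wrapped = err isa TaskFailedException && err.task === bad
```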
Base.timedwait — Function
timedwait(testcb, timeout::Real; pollint::Real=0.1)
Wait until testcb() returns true or timeout seconds have passed, whichever is earlier. The test function is polled every pollint seconds. The minimum value for pollint is 0.001 seconds, that is, 1 millisecond.
Return :ok or :timed_out.
Examples
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
Base.Condition — Type
Condition()
Create an edge-triggered event source that tasks can wait for. Tasks that call wait on a Condition are suspended and queued. Tasks are woken up when notify is later called on the Condition. Waiting on a condition can return a value or raise an error if the optional arguments of notify are used. Edge triggering means that only tasks waiting at the time notify is called can be woken up. For level-triggered notifications, you must keep extra state to keep track of whether a notification has happened. The Channel and Threads.Event types do this, and can be used for level-triggered events.
This object is NOT thread-safe. See Threads.Condition for a thread-safe version.
Base.Threads.Condition — Type
Threads.Condition([lock])
A thread-safe version of Base.Condition.
To call wait or notify on a Threads.Condition, you must first call lock on it. When wait is called, the lock is atomically released during blocking, and will be reacquired before wait returns. Therefore idiomatic use of a Threads.Condition c looks like the following:
lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Base.Event — Type
Event([autoreset=false])
Create a level-triggered event source. Tasks that call wait on an Event are suspended and queued until notify is called on the Event. After notify is called, the Event remains in a signaled state and tasks will no longer block when waiting for it, until reset is called.
If autoreset is true, at most one task will be released from wait for each call to notify.
This provides an acquire & release memory ordering on notify/wait.
Base.notify — Function
notify(condition, val=nothing; all=true, error=false)
Wake up tasks waiting for a condition, passing them val. If all is true (the default), all waiting tasks are woken, otherwise only one is. If error is true, the passed value is raised as an exception in the woken tasks.
Return the count of tasks woken up. Return 0 if no tasks are waiting on condition.
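A minimal producer/consumer sketch ties notify to the Threads.Condition locking pattern. The shared vector items and the task names are hypothetical. Both wait and notify are called with the lock held.

```julia
c = Threads.Condition()
items = Int[]

consumer = Threads.@spawn begin
    lock(c)
    try
        while isempty(items)   # guard against waking up with nothing to take
            wait(c)            # releases the lock while blocked
        end
        popfirst!(items)
    finally
        unlock(c)
    end
end

lock(c)
try
    push!(items, 42)
    notify(c)                  # wake the tasks waiting on c
finally
    unlock(c)
end

received = fetch(consumer)
```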
Base.reset — Method
reset(::Event)
Reset an Event back into an un-set state. Then any future calls to wait will block until notify is called again.
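The level-triggered behavior and the effect of reset can be sketched as follows. The task names are hypothetical, and the short sleep only gives the second waiter time to start blocking.

```julia
ev = Base.Event()

waiter = Threads.@spawn begin
    wait(ev)               # blocks until notify(ev)
    :released
end
notify(ev)
status = fetch(waiter)

wait(ev)                   # returns immediately: the event stays signaled
reset(ev)                  # back to the un-set state

late = Threads.@spawn (wait(ev); :released_again)
sleep(0.1)
blocked = !istaskdone(late)   # still waiting, because the event was reset
notify(ev)
late_status = fetch(late)
```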
Base.Semaphore — Type
Semaphore(sem_size)
Create a counting semaphore that allows at most sem_size acquires to be in use at any time. Each acquire must be matched with a release.
This provides an acquire & release memory ordering on acquire/release calls.
Base.acquire — Function
acquire(f, s::Semaphore)
Execute f after acquiring from Semaphore s, and release on completion or error.
For example, a do-block form that ensures only 2 calls of foo will be active at the same time:
s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
acquire(s::Semaphore)
Wait for one of the sem_size permits to be available, blocking until one can be acquired.
Base.release — Function
release(s::Semaphore)
Return one permit to the pool, possibly allowing another task to acquire it and resume execution.
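The explicit acquire/release form can be sketched with a try/finally block so that the permit is returned even if the work throws. The atomic counters active and peak are hypothetical instrumentation used to observe how many tasks hold a permit at once.

```julia
s = Base.Semaphore(2)
active = Threads.Atomic{Int}(0)   # tasks currently holding a permit
peak = Threads.Atomic{Int}(0)     # highest value of `active` observed

@sync for _ in 1:8
    Threads.@spawn begin
        Base.acquire(s)
        try
            n = Threads.atomic_add!(active, 1) + 1   # atomic_add! returns the old value
            Threads.atomic_max!(peak, n)
            sleep(0.01)                              # stand-in for real work
            Threads.atomic_sub!(active, 1)
        finally
            Base.release(s)
        end
    end
end
```

With a semaphore of size 2, peak[] never exceeds 2.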
Base.AbstractLock — Type
AbstractLock
Abstract supertype describing types that implement the synchronization primitives: lock, trylock, unlock, and islocked.
Base.lock — Function
lock(f::Function, l::Lockable)
Acquire the lock associated with l, execute f with the lock held, and release the lock when f returns. f will receive one positional argument: the value wrapped by l. If the lock is already locked by a different task/thread, wait for it to become available. When this function returns, the lock has been released, so the caller should not attempt to unlock it.
lock(f::Function, lock)
Acquire the lock, execute f with the lock held, and release the lock when f returns. If the lock is already locked by a different task/thread, wait for it to become available.
When this function returns, the lock has been released, so the caller should not attempt to unlock it.
See also: @lock.
Using a Channel as the second argument requires Julia 1.7 or later.
lock(lock)
Acquire the lock when it becomes available. If the lock is already locked by a different task/thread, wait for it to become available.
Each lock must be matched by an unlock.
Base.unlock — Function
unlock(lock)
Releases ownership of the lock.
If this is a recursive lock which has been acquired before, decrement an internal counter and return immediately.
Base.trylock — Function
trylock(lock) -> Success (Boolean)
Acquire the lock if it is available, and return true if successful. If the lock is already locked by a different task/thread, return false.
Each successful trylock must be matched by an unlock.
Function trylock combined with islocked can be used for writing the test-and-test-and-set or exponential backoff algorithms if it is supported by the typeof(lock) (read its documentation).
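A small sketch of the non-blocking behavior, using a ReentrantLock; the variable names are hypothetical. A second task fails to take the lock while the current task holds it, and trylock succeeds once the lock is free.

```julia
l = ReentrantLock()

lock(l)
seen_by_other = fetch(Threads.@spawn trylock(l))   # another task cannot take it
unlock(l)

acquired = trylock(l)    # the lock is free now, so this succeeds
held = islocked(l)
unlock(l)                # every successful trylock needs a matching unlock
released = !islocked(l)
```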
Base.islocked — Function
islocked(lock) -> Status (Boolean)
Check whether the lock is held by any task/thread. This function alone should not be used for synchronization. However, islocked combined with trylock can be used for writing the test-and-test-and-set or exponential backoff algorithms if it is supported by the typeof(lock) (read its documentation).
Extended help
For example, an exponential backoff can be implemented as follows if the lock implementation satisfies the properties documented below.
nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end
Implementation
A lock implementation is advised to define islocked with the following properties and note it in its docstring.
- islocked(lock) is data-race-free.
- If islocked(lock) returns false, an immediate invocation of trylock(lock) must succeed (returns true) if there is no interference from other tasks.
Base.ReentrantLock — Type
ReentrantLock()
Creates a re-entrant lock for synchronizing Tasks. The same task can acquire the lock as many times as required (this is what the "Reentrant" part of the name means). Each lock must be matched with an unlock.
This provides an acquire/release memory ordering on lock/unlock calls.
Calling lock will also inhibit running of finalizers on that thread until the corresponding unlock. Use of the standard lock pattern illustrated below should naturally be supported, but beware of inverting the try/lock order or missing the try block entirely (e.g. attempting to return with the lock still held):
lock(l)
try
    <atomic work>
finally
    unlock(l)
end
If !islocked(lck::ReentrantLock) holds, trylock(lck) succeeds unless there are other tasks attempting to hold the lock "at the same time."
Base.@lock — Macro
@lock l expr
Macro version of lock(f, l::AbstractLock) but with expr instead of f function. Expands to:
lock(l)
try
    expr
finally
    unlock(l)
end
This is similar to using lock with a do block, but avoids creating a closure and thus can improve the performance.
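As an illustrative sketch, @lock can protect a shared counter that many tasks update. The names lk and counter are hypothetical.

```julia
lk = ReentrantLock()
counter = Ref(0)

@sync for _ in 1:100
    # Each increment runs with lk held, so no update is lost.
    Threads.@spawn @lock lk counter[] += 1
end

total = counter[]
```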
Base.Lockable — Type
Lockable(value, lock = ReentrantLock())
Creates a Lockable object that wraps value and associates it with the provided lock. This object supports @lock, lock, trylock, unlock. To access the value, index the lockable object while holding the lock.
Example
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # must hold the lock to access the value
1-element Vector{Int64}:
 1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Channels
Base.AbstractChannel — Type
AbstractChannel{T}
Representation of a channel passing objects of type T.
Base.Channel — Type
Channel{T=Any}(size::Int=0)
Constructs a Channel with an internal buffer that can hold a maximum of size objects of type T. put! calls on a full channel block until an object is removed with take!.
Channel(0) constructs an unbuffered channel. put! blocks until a matching take! is called, and vice versa.
Other constructors:
- Channel(): default constructor, equivalent to Channel{Any}(0)
- Channel(Inf): equivalent to Channel{Any}(typemax(Int))
- Channel(sz): equivalent to Channel{Any}(sz)
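The difference between the constructors can be sketched as follows; the variable names are hypothetical. A buffered channel accepts items up to its capacity without a consumer, an unbuffered one hands each item directly from a put! to a take!, and Channel(Inf) effectively never fills up.

```julia
buffered = Channel{Int}(2)
put!(buffered, 1)
put!(buffered, 2)            # fills the buffer; a third put! would block
first_item = take!(buffered)

unbuffered = Channel{Int}(0)
producer = @async put!(unbuffered, 3)   # blocks until the take! below
handoff = take!(unbuffered)
wait(producer)

unbounded = Channel(Inf)     # a Channel{Any} with capacity typemax(Int)
foreach(i -> put!(unbounded, i), 1:1000)
```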
Base.Channel — Method
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
Create a new task from func, bind it to a new channel of type T and size size, and schedule the task, all in a single call. The channel is automatically closed when the task terminates.
func must accept the bound channel as its only argument.
If you need a reference to the created task, pass a Ref{Task} object via the keyword argument taskref.
If spawn=true, the Task created for func may be scheduled on another thread in parallel, equivalent to creating a task via Threads.@spawn.
If spawn=true and the threadpool argument is not set, it defaults to :default.
If the threadpool argument is set (to :default or :interactive), this implies that spawn=true and the new Task is spawned to the specified threadpool.
Return a Channel.
Examples
julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4
Referencing the created task:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
This constructor and its spawn= parameter were added in Julia 1.3. In earlier versions of Julia, Channel used keyword arguments to set size and T, but those constructors are deprecated.
julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end;
julia> String(collect(chnl))
"hello world"
Base.put! — Method
put!(c::Channel, v)
Append an item v to the channel c. Blocks if the channel is full.
For unbuffered channels, blocks until a take! is performed by a different task.
v now gets converted to the channel's type with convert as put! is called.
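The conversion can be observed with a typed channel, as in this short sketch; the variable names are hypothetical.

```julia
c = Channel{Float64}(1)
put!(c, 1)       # the Int 1 is converted to Float64 on the way in
v = take!(c)
```

Here v is the Float64 value 1.0 rather than the Int that was passed to put!.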
Base.take! — Method
take!(c::Channel)
Removes and returns a value from a Channel in order. Blocks until data is available. For unbuffered channels, blocks until a put! is performed by a different task.
Examples
Buffered channel
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
Unbuffered channel
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
Base.isfull — Method
isfull(c::Channel)
Determines if a Channel is full, in the sense that calling put!(c, some_value) would have blocked. Returns immediately, does not block.
Note that it may frequently be the case that put! will not block after this returns true. Users must take precautions not to accidentally create live-lock bugs in their code by calling this method, as these are generally harder to debug than deadlocks. It is also possible that put! will block after this call returns false, if there are multiple producer tasks calling put! in parallel.
Examples
Buffered channel
julia> c = Channel(1); # capacity = 1
julia> isfull(c)
false
julia> put!(c, 1);
julia> isfull(c)
true
Unbuffered channel
julia> c = Channel(); # capacity = 0
julia> isfull(c) # unbuffered channel is always full
true
Base.isready — Method
isready(c::Channel)
Determines whether a Channel has a value stored in it. Returns immediately, does not block.
For unbuffered channels, return true if there are tasks waiting on a put!.
Examples
Buffered channel
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
Unbuffered channel
julia> c = Channel();
julia> isready(c)  # no tasks waiting to put!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);  # schedule a put! task
julia> isready(c)
true
Base.isopen — Method
isopen(c::Channel)
Determines whether a Channel is open for new put! operations. Notice that a Channel can be closed and still have buffered elements which can be consumed with take!.
Examples
Buffered channel with task
julia> c = Channel(ch -> put!(ch, 1), 1);
julia> isopen(c) # The channel is closed to new `put!`s
false
julia> isready(c) # The channel is closed but still contains elements
true
julia> take!(c)
1
julia> isready(c)
false
Unbuffered channel
julia> c = Channel{Int}();
julia> isopen(c)
true
julia> close(c)
julia> isopen(c)
false
Base.fetch — Method
fetch(c::Channel)
Waits for and returns (without removing) the first available item from the Channel. Note: fetch is unsupported on an unbuffered (0-size) Channel.
Examples
Buffered channel
julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;
julia> fetch(c)
1
julia> collect(c)  # item is not removed
3-element Vector{Any}:
 1
 2
 3
Base.close — Method
close(c::Channel[, excp::Exception])
Close a channel. An exception (optionally given by excp) is thrown by:
- put! on a closed channel.
- take! and fetch on an empty, closed channel.
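A short sketch of how a closed channel behaves; the variable names are hypothetical. The default exception type, InvalidStateException, is stated from knowledge of Base rather than from the text above.

```julia
c = Channel{Int}(1)
put!(c, 1)
close(c)
leftover = take!(c)      # buffered items can still be taken after close

closed_take = try
    take!(c)             # empty and closed: throws
catch e
    e
end

closed_put = try
    put!(c, 2)           # closed: throws
catch e
    e
end

# A custom exception passed to close is the one that gets thrown.
d = Channel{Int}(1)
close(d, ErrorException("producer gave up"))
custom = try
    take!(d)
catch e
    e
end
```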
Base.bind — Method
bind(chnl::Channel, task::Task)
Associate the lifetime of chnl with a task. Channel chnl is automatically closed when the task terminates. Any uncaught exception in the task is propagated to all waiters on chnl.
The chnl object can be explicitly closed independent of task termination. Terminating tasks have no effect on already closed Channel objects.
When a channel is bound to multiple tasks, the first task to terminate will close the channel. When multiple channels are bound to the same task, termination of the task will close all of the bound channels.
Examples
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
Low-level synchronization using schedule and wait
The easiest correct use of schedule is on a Task that is not started (scheduled) yet. However, it is possible to use schedule and wait as a very low-level building block for constructing synchronization interfaces. A crucial pre-condition of calling schedule(task) is that the caller must "own" the task; i.e., it must know that the call to wait in the given task is happening at the locations known to the code calling schedule(task). One strategy for ensuring such pre-condition is to use atomics, as demonstrated in the following example:
@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end
mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end
function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
        # `wait()` immediately. In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end
ev = OneWayEvent()
@sync begin
    Threads.@spawn begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end
# output
notifying...
done
OneWayEvent lets one task wait for another task's notify. It is a limited communication interface, since wait can only be used once from a single task (note the non-atomic assignment of ev.task).
In this example, notify(ev::OneWayEvent) is allowed to call schedule(ev.task) if and only if it modifies the state from OWE_WAITING to OWE_NOTIFYING. This lets us know that the task executing wait(ev::OneWayEvent) is now in the ok branch and that there cannot be other tasks that try to schedule(ev.task) since their @atomicreplace(ev.state, state => OWE_NOTIFYING) will fail.