Vary provides structured concurrency primitives for running work in parallel. Tasks are spawned with spawn, grouped with task_group, and cancelled cooperatively with CancellationToken.
spawn runs a function call concurrently and returns a Task[T] handle:
def expensive(n: Int) -> Int {
return n * n
}
let task1 = spawn expensive(10)
let task2 = spawn expensive(20)
let r1 = task1.join()
let r2 = task2.join()
print(r1 + r2) # 500
Task[T] methods: join() (waits and returns the result), isDone() (checks completion).
Exceptions propagate through join(). If the spawned function throws, the exception is re-thrown when you join.
spawn requires a function call expression. You cannot spawn an arbitrary block of code.
task_group creates a scope that manages concurrent tasks. All tasks spawned inside the block are awaited when the block exits:
let results = task_group {
let a = spawn compute(1)
let b = spawn compute(2)
let c = spawn compute(3)
[a.join(), b.join(), c.join()]
}
If any task fails, remaining tasks are cancelled and the error propagates.
Pass a timeout in milliseconds to limit how long the group runs:
let result = task_group(5000) {
let t = spawn slow_operation()
t.join()
}
If the timeout expires, remaining tasks are cancelled and a TimeoutError is raised. Catch it with try/except:
try {
task_group(1000) {
spawn very_slow()
}
} except TimeoutError as e {
print("Timed out")
}
CancellationToken enables cooperative cancellation. Task groups automatically create a token that propagates to child tasks:
def interruptible_work() -> Int {
let token = CancellationToken.current()
mut total = 0
for i in range(1000000) {
if token is not None and token.is_cancelled() {
return total
}
total = total + i
}
return total
}
task_group(100) {
spawn interruptible_work()
}
Cancellation is cooperative: code must check the token periodically. There is no forced thread interruption.
Create tokens outside task groups for manual cancellation:
let token = CancellationToken.create()
let task = spawn long_running(token)
# ... later
token.cancel()
let result = task.join()
CancellationToken methods: is_cancelled() (Bool), cancel() (None).
CancellationToken.current() returns CancellationToken? (None outside a task group).
For testing concurrent code deterministically, use set_test_sequential() to run all spawned tasks on the current thread:
test "concurrent sum" {
set_test_sequential(True)
let result = task_group {
let a = spawn compute(1)
let b = spawn compute(2)
a.join() + b.join()
}
observe result == 5
}
Typed abstractions for injecting deterministic time and randomness:
# Real clock (production)
let clock = Clock.real()
print(clock.now_ms())
# Fixed clock (testing)
let test_clock = Clock.fixed(1000)
print(test_clock.now_ms()) # always 1000
# Seeded RNG (deterministic)
let rng = Rng.seeded(42)
print(rng.next_int(1, 100)) # same result every run
# System RNG (non-deterministic)
let sys_rng = Rng.system()
print(sys_rng.next_int(1, 100))
These types are effects: functions using Clock.real() or Rng.system() are classified as having TIME or RANDOM effects by the check engine.
Concurrency operations produce a CONCURRENCY effect. The check engine tracks this alongside other effects (IO, NETWORK, FILESYSTEM, PROCESS, STATE, TIME, RANDOM). Functions with too many distinct effects trigger the VCA001 broad-effects rule. Projects can deny specific effects per module via vary.toml:
[check.effects]
deny = ["CONCURRENCY"]
See Configuration for details.