Alpha. Vary is under active development and not ready for production use. Syntax, APIs, performance, and behaviour may change between releases.
Concurrency
Vary provides structured concurrency primitives for running work in parallel. Tasks are spawned with spawn, grouped with task_group, and cancelled cooperatively with CancellationToken.
Spawn
spawn runs a function call concurrently and returns a Task[T] handle:
def expensive(n: Int) -> Int {
    return n * n
}
let task1 = spawn expensive(10)
let task2 = spawn expensive(20)
let r1 = task1.join()
let r2 = task2.join()
print(r1 + r2) # 500
Task[T] methods: join() (waits and returns the result), isDone() (checks completion).
Exceptions propagate through join(). If the spawned function throws, the exception is re-thrown when you join.
spawn requires a function call expression. You cannot spawn an arbitrary block of code.
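For readers coming from Python, the join semantics described above resemble those of `concurrent.futures`. The sketch below is an analogy only and says nothing about how Vary implements spawn: `executor.submit` stands in for spawn, `Future.result()` stands in for join(), and `failing` and `run_demo` are hypothetical helpers introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Tuple


def expensive(n: int) -> int:
    return n * n


def failing() -> int:
    # Hypothetical helper that always fails, to show error propagation.
    raise ValueError("boom")


def run_demo() -> Tuple[int, str]:
    with ThreadPoolExecutor() as executor:
        task1 = executor.submit(expensive, 10)  # roughly: spawn expensive(10)
        task2 = executor.submit(expensive, 20)
        bad = executor.submit(failing)

        total = task1.result() + task2.result()  # roughly: join()
        try:
            bad.result()  # the worker's exception is re-raised here
            message = "no error"
        except ValueError as exc:
            message = str(exc)
    return total, message
```

The failure surfaces only at the point where the result is requested, which mirrors the statement that the exception is re-thrown when you join.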
Task groups
task_group creates a scope that manages concurrent tasks. All tasks spawned inside the block are awaited when the block exits:
let results = task_group {
    let a = spawn compute(1)
    let b = spawn compute(2)
    let c = spawn compute(3)
    [a.join(), b.join(), c.join()]
}
If any task fails, remaining tasks are cancelled and the error propagates.
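The fail-fast behaviour described above can be modelled in Python as a rough sketch. This is not Vary's implementation: `run_group`, `ok_job`, and `bad_job` are hypothetical names, and a `threading.Event` stands in for whatever signal the runtime uses to cancel sibling tasks.

```python
import threading
from concurrent.futures import FIRST_EXCEPTION, ThreadPoolExecutor, wait


def run_group(jobs):
    """Run each job(cancel) concurrently. On the first failure, ask the
    other jobs to stop, wait for them, then re-raise that failure."""
    cancel = threading.Event()
    with ThreadPoolExecutor(max_workers=max(1, len(jobs))) as pool:
        futures = [pool.submit(job, cancel) for job in jobs]
        # Returns as soon as one job raises, or when all jobs finish.
        done, _ = wait(futures, return_when=FIRST_EXCEPTION)
        failed = [f for f in done if f.exception() is not None]
        if failed:
            cancel.set()  # request that the remaining jobs stop
        # Leaving the with-block waits for every job to finish.
    if failed:
        raise failed[0].exception()
    return [f.result() for f in futures]


def ok_job(cancel):
    # Hypothetical job that keeps working until it is asked to stop.
    while not cancel.is_set():
        cancel.wait(0.01)
    return "stopped early"


def bad_job(cancel):
    # Hypothetical job that fails immediately.
    raise RuntimeError("task failed")
```

Calling `run_group([ok_job, bad_job])` raises `RuntimeError` only after `ok_job` has observed the cancellation request and returned, so no work outlives the group.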
Timeouts
Pass a timeout in milliseconds to limit how long the group runs:
let result = task_group(5000) {
    let t = spawn slow_operation()
    t.join()
}
If the timeout expires, remaining tasks are cancelled and a TimeoutError is raised. Catch it with try/except:
try {
    task_group(1000) {
        spawn very_slow()
    }
} except TimeoutError as e {
    print("Timed out")
}
Cancellation
CancellationToken enables cooperative cancellation. Task groups automatically create a token that propagates to child tasks:
def interruptible_work() -> Int {
    let token = CancellationToken.current()
    mut total = 0
    for i in range(1000000) {
        if token is not None and token.is_cancelled() {
            return total
        }
        total = total + i
    }
    return total
}
task_group(100) {
    spawn interruptible_work()
}
Cancellation is cooperative: code must check the token periodically. There is no forced thread interruption.
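To make the cooperative model concrete, here is a minimal Python sketch of a deadline that requests cancellation rather than forcing it. It is an illustration under stated assumptions, not Vary's runtime: `Token`, `interruptible_work`, and `run_with_timeout` are hypothetical, and the timeout is given in seconds because that is what Python's `Thread.join` expects.

```python
import threading
import time


class Token:
    """Hypothetical stand-in for a cancellation token."""

    def __init__(self):
        self._event = threading.Event()

    def cancel(self):
        self._event.set()

    def is_cancelled(self):
        return self._event.is_set()


def interruptible_work(token, results):
    steps = 0
    # The worker itself decides when to stop by polling the token.
    while not token.is_cancelled():
        time.sleep(0.001)  # one small unit of work
        steps += 1
    results.append(steps)


def run_with_timeout(timeout_s):
    token = Token()
    results = []
    worker = threading.Thread(target=interruptible_work, args=(token, results))
    worker.start()
    worker.join(timeout_s)  # wait up to the deadline
    timed_out = worker.is_alive()
    token.cancel()  # request cancellation; nothing is interrupted by force
    worker.join()  # the worker exits at its next token check
    return timed_out, results[0]
```

A worker that never checks the token would keep running after `cancel()`, which is why the periodic check matters.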
Standalone tokens
Create tokens outside task groups for manual cancellation:
let token = CancellationToken.create()
let task = spawn long_running(token)
# ... later
token.cancel()
let result = task.join()
CancellationToken methods: is_cancelled() returns a Bool; cancel() requests cancellation and returns None.
CancellationToken.current() returns an optional CancellationToken?, which is None when called outside a task group.
Deterministic testing
To test concurrent code deterministically, call set_test_sequential(True) so that all spawned tasks run on the current thread:
test "concurrent sum" {
    set_test_sequential(True)
    let result = task_group {
        let a = spawn compute(1)
        let b = spawn compute(2)
        a.join() + b.join()
    }
    observe result == 5
}
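The idea of running spawned work on the current thread can be sketched in Python with an executor that executes each call inline. This is only an analogy for what a sequential test mode might do: `SequentialExecutor` and `concurrent_sum` are hypothetical, and the body of `compute` (returning `n + 1`) is an assumption made so the example is self-contained.

```python
from concurrent.futures import Executor, Future


class SequentialExecutor(Executor):
    """Runs every submitted call immediately on the calling thread."""

    def submit(self, fn, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future


def concurrent_sum(executor, log):
    def compute(n):
        # Assumed body: record the call order and return n + 1.
        log.append(n)
        return n + 1

    a = executor.submit(compute, 1)
    b = executor.submit(compute, 2)
    return a.result() + b.result()
```

Because each call finishes before the next is submitted, the order recorded in `log` is the same on every run, which is the property a deterministic test relies on.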
Clock and RNG
Typed abstractions for injecting deterministic time and randomness:
# Real clock (production)
let clock = Clock.real()
print(clock.now_ms())
# Fixed clock (testing)
let test_clock = Clock.fixed(1000)
print(test_clock.now_ms()) # always 1000
# Seeded RNG (deterministic)
let rng = Rng.seeded(42)
print(rng.next_int(1, 100)) # same result every run
# System RNG (non-deterministic)
let sys_rng = Rng.system()
print(sys_rng.next_int(1, 100))
These abstractions carry effects: the check engine classifies functions that use Clock.real() as having the TIME effect and functions that use Rng.system() as having the RANDOM effect.
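The benefit of injecting time and randomness can be illustrated with a short Python sketch. The names `RealClock`, `FixedClock`, and `make_token` are hypothetical and only mirror the shape of the Clock and Rng examples above; Python's `random.Random(seed)` plays the role of a seeded RNG.

```python
import random
import time


class RealClock:
    def now_ms(self) -> int:
        return int(time.time() * 1000)


class FixedClock:
    def __init__(self, ms: int):
        self._ms = ms

    def now_ms(self) -> int:
        return self._ms  # always the same value


def make_token(clock, rng) -> str:
    # Hypothetical function under test: it reads time and randomness
    # only through the injected objects, never from globals.
    return f"{clock.now_ms()}-{rng.randint(1, 100)}"
```

With `FixedClock(1000)` and `random.Random(42)`, repeated calls that start from fresh instances produce identical output, whereas `RealClock()` and an unseeded generator would not.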
Effect classification
Concurrency operations produce a CONCURRENCY effect. The check engine tracks this alongside other effects (IO, NETWORK, FILESYSTEM, PROCESS, STATE, TIME, RANDOM). Functions with too many distinct effects trigger the VCA001 broad-effects rule. Projects can deny specific effects per module via vary.toml:
[check.effects]
deny = ["CONCURRENCY"]
See Configuration for details.