Concurrency

Vary provides structured concurrency primitives for running work in parallel. Tasks are spawned with spawn, grouped with task_group, and cancelled cooperatively with CancellationToken.

Spawn

spawn runs a function call concurrently and returns a Task[T] handle:

def expensive(n: Int) -> Int {
    return n * n
}

let task1 = spawn expensive(10)
let task2 = spawn expensive(20)
let r1 = task1.join()
let r2 = task2.join()
print(r1 + r2)              # 500

Task[T] methods: join() (waits and returns the result), isDone() (checks completion).
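As a small illustration of the two methods, the sketch below reuses expensive from the example above. It assumes isDone() returns a Bool and only reports the task's state; whether the if branch runs depends on timing, so only the join() output is annotated.

```vary
def expensive(n: Int) -> Int {
    return n * n
}

let task = spawn expensive(10)

# Assumption: isDone() reports the task's state as a Bool.
# The task may or may not have finished by the time this check runs.
if task.isDone() {
    print("already finished")
}

# join() waits for the task and returns its result.
print(task.join())          # 100
```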

Exceptions propagate through join(). If the spawned function throws, the exception is re-thrown when you join.

spawn requires a function call expression. You cannot spawn an arbitrary block of code.
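Because spawn accepts only a call expression, one way to run a multi-statement block concurrently is to move the block into a named function and spawn a call to it. A minimal sketch using only constructs that appear elsewhere on this page; sum_below is a hypothetical helper, not part of Vary:

```vary
# Hypothetical helper that wraps the block we want to run concurrently.
def sum_below(n: Int) -> Int {
    mut total = 0
    for i in range(n) {
        total = total + i
    }
    return total
}

# spawn takes the call expression sum_below(1000), not the loop itself.
let task = spawn sum_below(1000)
print(task.join())
```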

Task groups

task_group creates a scope that manages concurrent tasks. All tasks spawned inside the block are awaited when the block exits:

let results = task_group {
    let a = spawn compute(1)
    let b = spawn compute(2)
    let c = spawn compute(3)
    [a.join(), b.join(), c.join()]
}

If any task fails, remaining tasks are cancelled and the error propagates.

Timeouts

Pass a timeout in milliseconds to limit how long the group runs:

let result = task_group(5000) {
    let t = spawn slow_operation()
    t.join()
}

If the timeout expires, remaining tasks are cancelled and a TimeoutError is raised. Catch it with try/except:

try {
    task_group(1000) {
        spawn very_slow()
    }
} except TimeoutError as e {
    print("Timed out")
}

Cancellation

CancellationToken enables cooperative cancellation. Task groups automatically create a token that propagates to child tasks:

def interruptible_work() -> Int {
    let token = CancellationToken.current()
    mut total = 0
    for i in range(1000000) {
        if token is not None and token.is_cancelled() {
            return total
        }
        total = total + i
    }
    return total
}

task_group(100) {
    spawn interruptible_work()
}

Cancellation is cooperative: code must check the token periodically. There is no forced thread interruption.
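The Timeouts section states that an expired timeout raises TimeoutError, so a caller of a timed group will usually want to catch it. The sketch below combines the cooperative check with the try/except form shown earlier. It repeats interruptible_work so that it stands alone, and whether the timeout actually fires depends on how long the loop takes.

```vary
def interruptible_work() -> Int {
    let token = CancellationToken.current()
    mut total = 0
    for i in range(1000000) {
        # Stop early once the group's token has been cancelled.
        if token is not None and token.is_cancelled() {
            return total
        }
        total = total + i
    }
    return total
}

try {
    task_group(100) {
        spawn interruptible_work()
    }
} except TimeoutError as e {
    # Reached only if the 100 ms timeout expires before the task finishes.
    print("Timed out")
}
```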

Standalone tokens

Create tokens outside task groups for manual cancellation:

let token = CancellationToken.create()
let task = spawn long_running(token)
# ... later
token.cancel()
let result = task.join()

CancellationToken methods: is_cancelled() (Bool), cancel() (None).
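The standalone example passes the token to long_running without showing its body. Here is a sketch of how such a function might poll the token, modelled on interruptible_work. The CancellationToken parameter annotation, the loop bound, and the assumption that create() returns a non-optional token are illustrative, not confirmed by this page.

```vary
# Hypothetical worker: receives the token explicitly as a parameter.
def long_running(token: CancellationToken) -> Int {
    mut total = 0
    for i in range(1000000) {
        # Cooperative check: stop early once cancel() has been called.
        if token.is_cancelled() {
            return total
        }
        total = total + i
    }
    return total
}

let token = CancellationToken.create()
let task = spawn long_running(token)
token.cancel()
# join() returns the partial total the worker had when it saw the
# cancellation, or the full total if it finished first.
let result = task.join()
```

Under that assumption, the worker does not need the `is not None` check that code using CancellationToken.current() performs.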

CancellationToken.current() returns CancellationToken?. The result is None when it is called outside a task group, so check for None before using the token.

Deterministic testing

To test concurrent code deterministically, call set_test_sequential(True) so that all spawned tasks run on the current thread:

test "concurrent sum" {
    set_test_sequential(True)
    let result = task_group {
        let a = spawn compute(1)
        let b = spawn compute(2)
        a.join() + b.join()
    }
    observe result == 5        # holds if compute(n) returns n * n
}

Clock and RNG

Typed abstractions for injecting deterministic time and randomness:

# Real clock (production)
let clock = Clock.real()
print(clock.now_ms())

# Fixed clock (testing)
let test_clock = Clock.fixed(1000)
print(test_clock.now_ms())       # always 1000

# Seeded RNG (deterministic)
let rng = Rng.seeded(42)
print(rng.next_int(1, 100))     # same result every run

# System RNG (non-deterministic)
let sys_rng = Rng.system()
print(sys_rng.next_int(1, 100))

These types are tracked as effects. The check engine classifies functions that use Clock.real() as having the TIME effect and functions that use Rng.system() as having the RANDOM effect.
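One way to use these abstractions is to pass the clock or generator in as a parameter, so that tests can substitute the deterministic variant. The sketch below rests on several assumptions: is_expired and roll are hypothetical functions, Clock and Rng are assumed to be usable as parameter types, >= is assumed to compare Int values, and two generators created with the same seed are assumed to produce the same sequence.

```vary
# Hypothetical function that reads time only through the injected clock.
def is_expired(clock: Clock, deadline_ms: Int) -> Bool {
    return clock.now_ms() >= deadline_ms
}

# Hypothetical function that draws randomness only from the injected generator.
def roll(rng: Rng) -> Int {
    return rng.next_int(1, 6)
}

test "deadline check with a fixed clock" {
    let clock = Clock.fixed(1000)
    observe is_expired(clock, 1000)
}

test "seeded rolls repeat" {
    let first = Rng.seeded(42)
    let second = Rng.seeded(42)
    observe roll(first) == roll(second)
}
```

Production code would pass Clock.real() or Rng.system() at the call site instead.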

Effect classification

Concurrency operations produce a CONCURRENCY effect. The check engine tracks this alongside other effects (IO, NETWORK, FILESYSTEM, PROCESS, STATE, TIME, RANDOM). Functions with too many distinct effects trigger the VCA001 broad-effects rule. Projects can deny specific effects per module via vary.toml:

[check.effects]
deny = ["CONCURRENCY"]

See Configuration for details.