ilusm.dev

conc

Concurrency primitives: goroutine spawning, buffered channels, channel select, WaitGroup, mutex, RWMutex, atomic integers, run-once, cancellation contexts, worker pools, futures/promises, channel pipelines, fan-out/fan-in, barriers, and semaphores.

Load with: use conc

What this module does

conc exposes the full concurrency model of ilusm. It is inspired by Go's concurrency primitives and maps directly to host-native implementations. Goroutine-like tasks are spawned with consp, communicate via buffered channels (chmk/chsnd/chrcv), and are synchronised with WaitGroups, mutexes, or atomic operations.

Higher-level abstractions - worker pools, futures, channel pipelines, fan-out, fan-in, barriers, and semaphores - are built on top of these primitives entirely within ilusm. All functions are also available through the conc namespace object.

Quick example

use conc

# Spawn a concurrent task
ch = chmk(10)
consp(\() chsnd(ch, "done"))
prn(chrcv(ch, 1000))

# Worker pool
pool = conpo(4)
conpos(pool, \(job) job * 2)
conpoj(pool, 21)
prn(conpor(pool, 1000))  # 42
conpoc(pool)

# Future/promise
fut = confu()
consp(\() confur(fut, 99))
prn(confug(fut, 1000))  # 99

# Pipeline: double → add-1
pipe = conpi([\(x) x * 2, \(x) x + 1])
chsnd(pipe.input, 5)
prn(chrcv(pipe.output, 1000))  # 11

Functions

Spawn

consp(fn)

Spawns fn as a concurrent task (goroutine). Returns immediately.

consp1(fn, max)

Spawns fn with a semaphore limiting concurrency to max simultaneous tasks.

Channels

chmk(size)

Creates a buffered channel with capacity size. Use 0 for unbuffered.

chsnd(ch, value)

Sends a value into the channel.

chrcv(ch, timeout_ms)

Receives a value from the channel. Blocks up to timeout_ms milliseconds. Returns nil on timeout or if the channel is closed.

chcls(ch)

Closes the channel. Subsequent receives return nil.

chlen(ch) / chcap(ch)

chlen(ch) returns the number of values currently buffered in the channel. chcap(ch) returns the channel's capacity.
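
A minimal sketch of a channel's life cycle, using only the functions listed above and the syntax from the Quick example. The capacity of 2, the values, and the 100 ms timeouts are arbitrary choices for illustration. This page does not say in which order buffered values come out, so the sketch makes no claim about it.

```ilusm
use conc

ch = chmk(2)          # buffered channel with capacity 2
chsnd(ch, "a")
chsnd(ch, "b")
prn(chlen(ch))        # number of values currently buffered
prn(chcap(ch))        # capacity passed to chmk
prn(chrcv(ch, 100))   # waits up to 100 ms for a value
prn(chrcv(ch, 100))
chcls(ch)
prn(chrcv(ch, 100))   # nil: the channel is closed and already drained
```

Because chrcv returns nil both on timeout and on a closed channel, a nil result alone does not tell the two cases apart.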

Select

conse(cases, default_fn)

Selects among the channel cases that are ready; when more than one is ready, the choice is non-deterministic. cases is a list of {ch, fn} receive cases. default_fn is called if no case is ready.

WaitGroup

conwg()

Creates a new WaitGroup.

conwga(wg, delta)

Increments the WaitGroup counter by delta.

conwgd(wg)

Decrements the WaitGroup counter by 1 (signals one task is done).

conwgw(wg, timeout_ms)

Blocks until the counter reaches 0 or the timeout expires.
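
The four calls above combine roughly as follows. This is a sketch under the assumption that the counter must be raised before the tasks start, as in Go. The tasks here only signal completion because this page shows single-expression lambdas only; a real task would do its work first.

```ilusm
use conc

wg = conwg()
conwga(wg, 2)             # expect two tasks
consp(\() conwgd(wg))     # each task signals that it is done
consp(\() conwgd(wg))
conwgw(wg, 1000)          # returns when the counter reaches 0, or after 1000 ms
prn("all tasks done, or the wait timed out")
```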

Mutex and RWMutex

conmu() / conlk(mu) / conul(mu) / contr(mu, timeout_ms)

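conmu() creates an exclusive mutex, conlk(mu) locks it, conul(mu) unlocks it, and contr(mu, timeout_ms) tries to lock it, waiting at most timeout_ms milliseconds.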

conrw() / conrl(rw) / conru(rw) / conwl(rw) / conwu(rw)

Create and operate a reader-writer mutex: read-lock, read-unlock, write-lock, write-unlock.
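
A short sketch of the usual lock/unlock pairing for both kinds of mutex. The comments describe conventional reader-writer semantics (many readers or one writer), which is an assumption here rather than something this page spells out.

```ilusm
use conc

mu = conmu()
conlk(mu)
# ... critical section: read or modify shared state here ...
conul(mu)

rw = conrw()
conrl(rw)     # read lock: intended for shared, read-only access
conru(rw)
conwl(rw)     # write lock: intended for exclusive access
conwu(rw)
```

Every lock call is paired with its matching unlock; releasing with the wrong function (for example conru after conwl) is best avoided.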

Atomics

conat()

Creates an atomic integer initialised to 0.

conatg(a)

Reads the atomic value.

conata(a, delta)

Atomically adds delta and returns the new value.

conatc(a, old, new)

Compare-and-swap: sets to new only if current value equals old. Returns tru on success.
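
The atomic operations can be traced with a small sketch. The values in the comments follow from the descriptions above (initial value 0, conata returns the new value, conatc swaps only on a match); how prn renders a failed compare-and-swap is not stated on this page.

```ilusm
use conc

a = conat()               # starts at 0
prn(conata(a, 5))         # 5: the value after adding
prn(conatc(a, 5, 9))      # succeeds, because the current value is 5
prn(conatc(a, 5, 1))      # does not swap, because the current value is now 9
prn(conatg(a))            # 9
```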

Once

conon()

Creates a run-once guard.

conce(once, fn)

Calls fn exactly once, even if called from multiple goroutines concurrently.
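
A sketch of the guard in use, counting invocations with an atomic integer. The name hits is illustrative only.

```ilusm
use conc

once = conon()
hits = conat()
conce(once, \() conata(hits, 1))
conce(once, \() conata(hits, 1))   # same guard, so fn is not run a second time
prn(conatg(hits))                  # 1
```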

Contexts

conctx()

Creates a background (root) context.

conctxc(parent)

Creates a cancellable child context.

conctxt(parent, ms)

Creates a context that auto-cancels after ms milliseconds.

conctxd(parent, ms)

Creates a context with a deadline at ms milliseconds from now.

concc(ctx)

Cancels a context.

concd(ctx)

Returns a channel that is closed when the context is cancelled.
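
One way to wait on a context is to receive from its done channel, since chrcv returns nil once a channel is closed. The sketch below assumes exactly that and uses arbitrary timeouts. Note that chrcv also returns nil on timeout, so the messages are worded to cover both outcomes.

```ilusm
use conc

root = conctx()

# Timeout context: cancelled automatically after 50 ms
ctx = conctxt(root, 50)
chrcv(concd(ctx), 1000)
prn("timeout context finished, or the wait timed out")

# Cancellable context: cancelled explicitly from another task
child = conctxc(root)
consp(\() concc(child))
chrcv(concd(child), 1000)
prn("child context cancelled, or the wait timed out")
```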

Worker pool

conpo(workers)

Creates a worker pool with workers goroutines, a job channel (capacity 1000), and a results channel.

conpos(pool, job_fn)

Starts all worker goroutines. Each worker calls job_fn(job) and sends the result to the results channel.

conpoj(pool, job)

Submits a job to the pool.

conpor(pool, timeout_ms)

Receives the next result from the pool, blocking up to timeout_ms milliseconds.

conpoc(pool)

Signals workers to stop, waits for all to finish, closes channels.
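
The pool calls are meant to be used in the order create, start, submit, receive, close. A sketch with two workers and two jobs follows; which result arrives first is not specified on this page, so the comments do not assume an order.

```ilusm
use conc

pool = conpo(2)
conpos(pool, \(n) n + 1)     # start the workers with the job function
conpoj(pool, 1)
conpoj(pool, 2)
prn(conpor(pool, 1000))      # 2 or 3, depending on which worker finishes first
prn(conpor(pool, 1000))
conpoc(pool)                 # stop the workers and close the channels
```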

Futures / promises

confu()

Creates a future - a one-shot promise backed by an unbuffered channel and an atomic resolved flag.

confur(future, value)

Resolves the future with a value (CAS ensures it can only be resolved once). Returns tru if successful.

confug(future, timeout_ms)

Awaits the future's value. Blocks up to timeout_ms milliseconds.

Pipeline, fan-out, fan-in

conpi(stages)

Creates a channel pipeline from a list of transform functions. Each stage reads from one channel and writes to the next. Returns {input, output}.

confa(input_ch, workers, work_fn)

Fan-out: spawns workers goroutines each pulling from input_ch, applying work_fn, and writing to their own output channel. Returns the list of output channels.

confai(channels)

Fan-in: merges multiple channels into a single output channel.
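
Fan-out and fan-in are typically chained: the list returned by confa is passed straight to confai. The sketch below assumes that pairing works as described above; src, outs, and merged are illustrative names, and the arrival order of results is not assumed.

```ilusm
use conc

src = chmk(10)
outs = confa(src, 2, \(x) x * x)   # two workers, each with its own output channel
merged = confai(outs)              # one channel carrying every worker's output
chsnd(src, 3)
chsnd(src, 4)
prn(chrcv(merged, 1000))           # 9 or 16
prn(chrcv(merged, 1000))
```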

Barrier and semaphore

conba(count) / conbaw(barrier)

conba(count) creates a barrier for count goroutines. conbaw(barrier) blocks the calling goroutine until all count goroutines have arrived.

consem(n) / consema(sem) / consemr(sem)

consem(n) creates a counting semaphore with capacity n, consema(sem) acquires it (decrements), and consemr(sem) releases it (increments).
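
A sketch of both primitives, under the assumption that the main task counts as one of the parties at the barrier. The capacity and party count of 2 are arbitrary.

```ilusm
use conc

# Semaphore: at most two holders at a time
sem = consem(2)
consema(sem)       # take one slot
# ... work that should be limited to two concurrent tasks ...
consemr(sem)       # give the slot back

# Barrier: two parties meet before either continues
bar = conba(2)
consp(\() conbaw(bar))   # first party waits at the barrier
conbaw(bar)              # second party arrives, releasing both
prn("both parties passed the barrier")
```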

Notes

  • All primitives delegate to __conc_* host natives.
  • All functions are accessible via the conc namespace object with shorter names (e.g. conc.spawn, conc.chan, conc.send, etc.).
  • Requires trl and txt.