ilusm.dev

syn

Concurrency primitives - create buffered or unbuffered channels with send/receive/close methods, spawn concurrent tasks, wait for completion, run a list of functions in parallel, pipe a channel to another, tee one channel to two, merge-receive from multiple channels with timeout, fan-in multiple sources to one destination, fan-out one source to multiple destinations, and create a fixed-size worker pool.

Load with: use syn

What this module does

syn wraps the host's concurrency syscalls (__sys_spawn, __sys_wait, __sys_chan_*) in a high-level, Go-inspired API. Channels are first-class objects with .snd, .rcv, .trcv (timed receive), and .cls. Tasks are lightweight concurrent units: synsp returns a raw task handle (wait on it with synwa), and synru returns a task object with a .wai() method.

The higher-level combinators (synpu, synte, synfi, synfo, synpl) make it easy to compose pipeline and worker-pool patterns without manual loop management.

Quick example

use syn

# Channels
ch = synch(nil)     # unbuffered channel
ch2 = synch(10)     # buffered channel (capacity 10)

# Spawn a producer
t = synsp(\()
    ch.snd("hello")
    ch.snd("world")
    ch.cls()
)

# Receive
prn(ch.rcv())  # "hello"
prn(ch.rcv())  # "world"
synwa(t)       # t is a raw handle, so wait with synwa

# Run + wait
task = synru(\() compute())
task.wai()

# Parallel execution
tasks = synpa([\() work1(), \() work2(), \() work3()])
synpw(tasks)  # wait for all

# Worker pool
pool = synpl(4, \(item) item * 2, nil, nil)
pool.jobs.snd(10)
pool.jobs.snd(20)
prn(pool.out.rcv())  # 20 or 40: with 4 workers, result order is not guaranteed
prn(pool.out.rcv())  # the other result

Functions

Channels

synch(cap) / syn.chan(cap)

Creates a channel. Pass nil or 0 for an unbuffered channel, or a positive integer for a buffered channel of that capacity. Returns an object with .snd(v), .rcv(), .trcv(ms) (timed receive, timeout in milliseconds), and .cls().
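
A minimal sketch of a timed receive, using the method names listed above. It assumes a buffered channel accepts a value without a waiting receiver, as in Go. What .trcv returns when the timeout expires is not specified here, so the sketch only exercises the case where a value is already queued.

```ilusm
use syn

ch = synch(1)        # buffered, capacity 1
ch.snd("ready")      # assumed not to block: the buffer has room

v = ch.trcv(100)     # wait up to 100 ms for a value
prn(v)

ch.cls()
```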

Tasks

synsp(fn) / syn.spawn(fn)

Spawns fn as a concurrent task. Returns a raw task handle.

synwa(task) / syn.wait(task)

Waits for a raw task handle to complete.
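
A short sketch of the raw-handle workflow: the handle from synsp is passed to synwa rather than calling a method on it. The buffered channel is an assumption made so the task can finish its send before anyone receives. With an unbuffered channel, waiting before receiving would presumably deadlock.

```ilusm
use syn

ch = synch(1)                # capacity 1 so the task's send can complete

h = synsp(\() ch.snd(42))    # h is a raw task handle
synwa(h)                     # block until the task has finished

prn(ch.rcv())                # the value the task left in the buffer
```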

synru(fn) / syn.run(fn)

Spawns fn and returns {wai} - a task object whose .wai() method waits for fn to complete.

Parallel execution

synpa(fns) / syn.par(fns)

Spawns all functions in the list concurrently. Returns a list of task objects.

synpw(tasks) / syn.pwait(tasks)

Waits for all task objects in the list to complete.

Channel combinators

synpu(src, dst) / syn.pump(src, dst)

Continuously forwards every value received from src to dst in a background task.
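
As a sketch, a two-stage pipeline built with synpu might look like the following. Buffer sizes are arbitrary, and whether closing src also closes dst is not stated here, so the sketch does not rely on it.

```ilusm
use syn

src = synch(4)
dst = synch(4)

synpu(src, dst)      # forwarding happens in a background task

src.snd("a")
src.snd("b")

prn(dst.rcv())       # values arrive via the forwarder
prn(dst.rcv())
```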

synte(src, a, b) / syn.tee(src, a, b)

Forwards each value from src to both a and b in a background task.
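
A minimal sketch of synte, assuming buffered channels so neither branch has to be drained before the other:

```ilusm
use syn

src = synch(1)
a = synch(1)
b = synch(1)

synte(src, a, b)     # each value from src is copied to a and to b

src.snd(1)

prn(a.rcv())         # one copy from each branch
prn(b.rcv())
```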

synmr(chs, timeout_ms) / syn.mrecv(chs, timeout_ms)

Polls all channels with a timed receive and returns the first available value as {i, v} where i is the channel index.
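
A sketch of a merge-receive over two channels, where only the second has a value queued. Field access on the result (r.i, r.v) is assumed to work like pool.jobs in the quick example. Whether i is zero- or one-based, and what is returned when nothing arrives within timeout_ms, are not stated above, so the sketch prints the index instead of assuming it.

```ilusm
use syn

a = synch(1)
b = synch(1)
b.snd("from b")          # only b has a value ready

r = synmr([a, b], 50)    # poll both channels with a 50 ms timed receive
prn(r.i)                 # index of the channel that produced the value
prn(r.v)                 # the received value
```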

synfi(srcs, dst) / syn.fanin(srcs, dst)

Fans multiple source channels into one destination channel, spawning one forwarder per source.
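
As an illustration, two producers can be merged into one channel as sketched below. Because each source has its own forwarder, the arrival order across sources should not be relied on.

```ilusm
use syn

a = synch(1)
b = synch(1)
dst = synch(2)

synfi([a, b], dst)   # one forwarder task per source

a.snd("x")
b.snd("y")

prn(dst.rcv())       # "x" and "y" arrive in either order
prn(dst.rcv())
```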

synfo(src, dsts) / syn.fanout(src, dsts)

Broadcasts each value from src to all channels in dsts.
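
A minimal broadcast sketch. The description above does not say whether synfo returns immediately or runs its forwarding loop in the caller, so this sketch wraps the call in synsp, which should be safe in either case.

```ilusm
use syn

src = synch(1)
x = synch(1)
y = synch(1)

synsp(\() synfo(src, [x, y]))   # run the broadcast off the main flow

src.snd("tick")

prn(x.rcv())                    # every destination receives the value
prn(y.rcv())
```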

synpl(n, fn, buf_in, buf_out) / syn.pool(n, fn, buf_in, buf_out)

Creates a worker pool with n workers, each running fn(item) on items taken from jobs and sending the result to out. Returns {jobs, out, workers} - send to jobs, receive results from out.
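
A sketch of a pool with explicit buffer arguments. It assumes buf_in and buf_out are the capacities of the jobs and out channels, which the parameter names suggest but the text does not confirm. The results are summed so the outcome does not depend on which worker finishes first.

```ilusm
use syn

pool = synpl(2, \(n) n * n, 8, 8)   # 2 workers; 8 is the assumed channel capacity

pool.jobs.snd(3)
pool.jobs.snd(4)

r1 = pool.out.rcv()
r2 = pool.out.rcv()
prn(r1 + r2)                        # order-independent: 9 + 16
```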

Notes

  • Requires the host to inject __sys_spawn, __sys_wait, __sys_chan_new, __sys_chan_send, __sys_chan_recv, __sys_chan_close, and __sys_sleep_ms.
  • Requires trl.