The Worker module provides abstractions for spawning and communicating with worker threads in a type-safe, platform-agnostic way. It supports bidirectional communication with automatic message handling and error propagation.
Overview
Worker features:
- Type-safe message passing between main and worker threads
- Bidirectional communication (send and receive)
- Automatic error handling and propagation
- Scope-based lifecycle management
- Platform-agnostic abstractions
- Support for transferable objects
Basic Concepts
The Worker system consists of:
- Spawner: Factory for creating worker instances
- Worker: Handle for communicating with a worker thread
- WorkerPlatform: Platform-specific worker implementation
- Message Protocol: Type-safe message passing
Creating Workers
Define a Spawner
First, define how workers should be spawned:
import { Worker, Layer } from "effect"
// For Node.js
import { Worker as NodeWorker } from "worker_threads"
const SpawnerLive = Worker.layerSpawner(
(id: number) => new NodeWorker("./worker.js", { workerData: { id } })
)
Spawn a Worker
import { Effect, Worker } from "effect"
const program = Effect.gen(function*() {
const platform = yield* Worker.WorkerPlatform
// Spawn worker with ID 0
const worker = yield* platform.spawn(0)
// Use worker...
})
Message Passing
Sending Messages
Send messages from the main thread to the worker:
import { Effect, Worker } from "effect"
interface WorkerInput {
readonly task: string
readonly data: unknown
}
const program = Effect.gen(function*() {
const platform = yield* Worker.WorkerPlatform
const worker = yield* platform.spawn<unknown, WorkerInput>(0)
// Send a message
yield* worker.send({
task: "process",
data: { value: 42 }
})
// Send with transferables (for ArrayBuffer, MessagePort, etc.)
const buffer = new ArrayBuffer(1024)
yield* worker.send(
{ task: "process-buffer", data: buffer }, // matches WorkerInput: payload goes in data
[buffer] // Transferred, not copied
)
})
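To make the "transferred, not copied" comment concrete, here is a minimal sketch that uses the standard `structuredClone` function (Node.js 17+ and modern browsers) instead of the Worker module. It only shows what happens to an `ArrayBuffer` once it is listed as a transferable. That `worker.send` follows the same platform semantics is an assumption, not something this page confirms.

```typescript
// Standalone illustration of transfer semantics using the standard
// structuredClone API. It does not use the Worker module.
const buffer = new ArrayBuffer(1024)
const sizeBefore = buffer.byteLength

// Listing the buffer in `transfer` moves its memory to the clone
// instead of copying it.
const received = structuredClone(
  { task: "process-buffer", data: buffer },
  { transfer: [buffer] }
)

// The sender's buffer is now detached and reports a length of 0.
const sizeAfter = buffer.byteLength
const receivedSize = received.data.byteLength

console.log({ sizeBefore, sizeAfter, receivedSize })
// → { sizeBefore: 1024, sizeAfter: 0, receivedSize: 1024 }
```

In practice this means the sender should not read or write a buffer after passing it as a transferable.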
Receiving Messages
Handle messages sent by the worker by calling run on the worker handle in the main thread:
import { Console, Effect, Worker } from "effect"
interface WorkerOutput {
readonly type: "result" | "progress"
readonly data: unknown
}
const program = Effect.gen(function*() {
const platform = yield* Worker.WorkerPlatform
const worker = yield* platform.spawn<WorkerOutput, unknown>(0)
// Run message handler
yield* worker.run((message) =>
Effect.gen(function*() {
if (message.type === "result") {
yield* Console.log("Got result:", message.data)
} else if (message.type === "progress") {
yield* Console.log("Progress:", message.data)
}
})
)
})
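The if/else chain above can also be written as an exhaustive switch, so the compiler flags any message type that is left unhandled. The sketch below is plain TypeScript and does not depend on the Worker module. The `WorkerOutput` shape here is a hypothetical variant with a separate payload per message type, and `describeMessage` is an illustrative name.

```typescript
// Hypothetical variant of WorkerOutput, split into a discriminated union
// so each message type carries its own payload shape.
type WorkerOutput =
  | { readonly type: "progress"; readonly percent: number }
  | { readonly type: "result"; readonly data: unknown }

// Turn a message into a log line. The `never` assignment makes the
// compiler reject this function if a new message type is left unhandled.
const describeMessage = (message: WorkerOutput): string => {
  switch (message.type) {
    case "progress":
      return `Progress: ${message.percent}%`
    case "result":
      return `Got result: ${JSON.stringify(message.data)}`
    default: {
      const unhandled: never = message
      throw new Error(`Unhandled message: ${JSON.stringify(unhandled)}`)
    }
  }
}

console.log(describeMessage({ type: "progress", percent: 50 }))
// → Progress: 50%
console.log(describeMessage({ type: "result", data: { value: 42 } }))
// → Got result: {"value":42}
```

A function like this could be called from inside the handler passed to `worker.run`.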
Worker Implementation
Inside the worker file, use WorkerRunner to handle messages:
// worker.ts
import { Console, Effect } from "effect"
import { WorkerRunner } from "effect/unstable/workers"
interface WorkerInput {
readonly task: string
readonly data: unknown
}
interface WorkerOutput {
readonly type: "result" | "progress"
readonly data: unknown
}
// Define message handler
const handler = (message: WorkerInput) =>
Effect.gen(function*() {
yield* Console.log(`Processing task: ${message.task}`)
// Send progress update
yield* WorkerRunner.send<WorkerOutput>({
type: "progress",
data: { percent: 50 }
})
// Process the task (processTask is a placeholder for your own task logic, not shown)
const result = yield* processTask(message)
// Send result
yield* WorkerRunner.send<WorkerOutput>({
type: "result",
data: result
})
})
// Run the worker
WorkerRunner.run(handler)
Bidirectional Communication
Workers support full bidirectional message passing:
// Main thread
// MainToWorker and WorkerToMain are application-defined message types (not shown)
import { Console, Effect, Worker } from "effect"
const mainProgram = Effect.gen(function*() {
const platform = yield* Worker.WorkerPlatform
const worker = yield* platform.spawn<WorkerToMain, MainToWorker>(0)
// Start message handler in background
yield* Effect.forkDaemon(
worker.run((message) =>
Effect.gen(function*() {
yield* Console.log("Worker says:", message)
// Respond to worker
yield* worker.send({ response: "acknowledged" })
})
)
)
// Send initial message
yield* worker.send({ command: "start" })
})
// Worker thread
import { Effect } from "effect"
import { WorkerRunner } from "effect/unstable/workers"
const handler = (message: MainToWorker) =>
Effect.gen(function*() {
if (message.command === "start") {
// Send message to main
yield* WorkerRunner.send({ status: "started" })
// Main's reply (sent from its worker.run handler) arrives as a later message to this handler
}
})
WorkerRunner.run(handler)
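The snippets above use `MainToWorker` and `WorkerToMain` without defining them. The sketch below proposes one possible shape for both, inferred from the messages that appear in the example, and simulates the exchange in-process without threads or the Worker module. The helper names `workerReply` and `mainReply` are hypothetical.

```typescript
// Hypothetical message types inferred from the example's messages.
type MainToWorker =
  | { readonly command: "start" }
  | { readonly response: "acknowledged" }

type WorkerToMain = { readonly status: "started" }

// Worker side: reply to a command, stay silent on an acknowledgement.
const workerReply = (message: MainToWorker): WorkerToMain | undefined =>
  "command" in message ? { status: "started" } : undefined

// Main side: acknowledge every message from the worker.
const mainReply = (_message: WorkerToMain): MainToWorker => ({
  response: "acknowledged"
})

// Simulate the exchange in-process and record each hop.
const transcript: Array<string> = []
let toWorker: MainToWorker | undefined = { command: "start" }
while (toWorker !== undefined) {
  transcript.push(`main -> worker: ${JSON.stringify(toWorker)}`)
  const fromWorker = workerReply(toWorker)
  if (fromWorker === undefined) break
  transcript.push(`worker -> main: ${JSON.stringify(fromWorker)}`)
  toWorker = mainReply(fromWorker)
}

console.log(transcript.join("\n"))
// → main -> worker: {"command":"start"}
// → worker -> main: {"status":"started"}
// → main -> worker: {"response":"acknowledged"}
```

Having the worker stay silent on an acknowledgement is what ends the exchange; if both sides replied to every message, the two handlers would loop indefinitely.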
Spawn Callbacks
Execute code when the worker spawns:
import { Console, Effect, Worker } from "effect"
const program = Effect.gen(function*() {
const platform = yield* Worker.WorkerPlatform
const worker = yield* platform.spawn(0)
yield* worker.run(
(message) => processMessage(message), // processMessage is your own handler (not shown)
{
onSpawn: Console.log("Worker spawned and ready!")
}
)
})
Error Handling
Worker failures surface as a WorkerError, whose reason field identifies what went wrong:
import { Effect, Worker } from "effect"
const program = Effect.gen(function*() {
const platform = yield* Worker.WorkerPlatform
const worker = yield* platform.spawn(0)
yield* worker.send({ task: "dangerous" }).pipe(
Effect.catchTag("WorkerError", (error) =>
Effect.gen(function*() {
if (error.reason._tag === "WorkerSendError") {
yield* Effect.log("Failed to send message:", error.reason.message)
}
})
)
)
})
WorkerError Types
- WorkerSendError: Failed to send message to worker
- Other platform-specific errors
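Narrowing on the reason's tag can be sketched without the library. The type below is a local stand-in that mirrors the `error.reason._tag` and `error.reason.message` accesses used earlier. Only `WorkerSendError` comes from this page; `OtherPlatformError` is a hypothetical placeholder for the platform-specific reasons, and `describeError` is an illustrative name.

```typescript
// Local stand-in for the error shape. Only "WorkerSendError" comes from
// this page; "OtherPlatformError" is a hypothetical placeholder.
interface WorkerErrorLike {
  readonly _tag: "WorkerError"
  readonly reason:
    | { readonly _tag: "WorkerSendError"; readonly message: string }
    | { readonly _tag: "OtherPlatformError"; readonly message: string }
}

// Map an error to a log line by switching on the reason's tag.
const describeError = (error: WorkerErrorLike): string => {
  switch (error.reason._tag) {
    case "WorkerSendError":
      return `Failed to send message: ${error.reason.message}`
    case "OtherPlatformError":
      return `Worker failed: ${error.reason.message}`
  }
}

console.log(
  describeError({
    _tag: "WorkerError",
    reason: { _tag: "WorkerSendError", message: "port closed" }
  })
)
// → Failed to send message: port closed
```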
Complete Example
Main Thread
// main.ts
import { Console, Effect, Layer } from "effect"
import { Worker as NodeWorker } from "worker_threads"
import { Worker, WorkerPlatform } from "effect/unstable/workers"
interface TaskInput {
readonly task: "compute" | "process"
readonly value: number
}
interface TaskOutput {
readonly type: "progress" | "result"
readonly value: number
}
// Create spawner layer
const SpawnerLive = Worker.layerSpawner(
(id: number) => new NodeWorker("./worker.js", { workerData: { id } })
)
// Create platform layer
const PlatformLive = Layer.effect(
WorkerPlatform,
Effect.map(
Worker.Spawner,
(spawner) => Worker.makePlatform(/* ... platform-specific setup ... */)
)
).pipe(Layer.provide(SpawnerLive))
// Main program
const program = Effect.gen(function*() {
const platform = yield* WorkerPlatform
const worker = yield* platform.spawn<TaskOutput, TaskInput>(0)
// Handle worker messages
yield* Effect.forkDaemon(
worker.run((message) =>
Effect.gen(function*() {
if (message.type === "progress") {
yield* Console.log(`Progress: ${message.value}%`)
} else if (message.type === "result") {
yield* Console.log(`Result: ${message.value}`)
}
})
)
)
// Send tasks to worker
yield* worker.send({ task: "compute", value: 42 })
yield* worker.send({ task: "process", value: 100 })
// Keep the program alive long enough for the worker to reply (a fixed delay, for brevity)
yield* Effect.sleep("5 seconds")
}).pipe(Effect.provide(PlatformLive))
Effect.runPromise(program)
Worker Thread
// worker.ts
import { Console, Effect } from "effect"
import { WorkerRunner } from "effect/unstable/workers"
interface TaskInput {
readonly task: "compute" | "process"
readonly value: number
}
interface TaskOutput {
readonly type: "progress" | "result"
readonly value: number
}
const handler = (message: TaskInput) =>
Effect.gen(function*() {
yield* Console.log(`Worker received task: ${message.task}`)
// Report progress
yield* WorkerRunner.send<TaskOutput>({
type: "progress",
value: 50
})
// Simulate work
yield* Effect.sleep("1 second")
// Calculate result
const result = message.task === "compute"
? message.value * 2
: message.value + 10
// Send result
yield* WorkerRunner.send<TaskOutput>({
type: "result",
value: result
})
})
// Start the worker
WorkerRunner.run(handler)
Node.js Worker Threads
import { Worker as NodeWorker } from "worker_threads"
import { Effect, Layer, Worker } from "effect"
import { NodeWorkerPlatform } from "@effect/platform-node"
const SpawnerLive = Worker.layerSpawner(
(id: number) => new NodeWorker("./worker.js", {
workerData: { id },
env: process.env
})
)
const PlatformLive = NodeWorkerPlatform.layer.pipe(
Layer.provide(SpawnerLive)
)
Web Workers (Browser)
import { Effect, Layer, Worker } from "effect"
import { BrowserWorkerPlatform } from "@effect/platform-browser"
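// The imported Worker module shadows the global Web Worker constructor,
// so reference the constructor through globalThis
const SpawnerLive = Worker.layerSpawner(
(id: number) => new globalThis.Worker("./worker.js", { name: `worker-${id}` })
)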
const PlatformLive = BrowserWorkerPlatform.layer.pipe(
Layer.provide(SpawnerLive)
)
Advanced Patterns
Worker Pool
Create a pool of workers for parallel processing:
import { Array, Effect, Worker } from "effect"
const createWorkerPool = (size: number) =>
Effect.gen(function*() {
const platform = yield* Worker.WorkerPlatform
// Spawn multiple workers
const workers = yield* Effect.all(
Array.range(0, size - 1).map((id) => platform.spawn(id))
)
return workers
})
const program = Effect.gen(function*() {
const pool = yield* createWorkerPool(4)
// Distribute work across pool
const tasks = Array.range(0, 100)
yield* Effect.all(
tasks.map((task, index) => {
const worker = pool[index % pool.length]
return worker.send({ task })
}),
{ concurrency: "unbounded" }
)
})
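The `index % pool.length` expression above is a round-robin assignment. The following standalone sketch shows which task indices each worker would receive. `assignRoundRobin` is a hypothetical helper written for illustration and is not part of the Worker module.

```typescript
// Assign task indices to workers in round-robin order, mirroring the
// `index % pool.length` expression used in the pool example.
const assignRoundRobin = (
  taskCount: number,
  poolSize: number
): Array<Array<number>> => {
  if (!Number.isInteger(poolSize) || poolSize < 1) {
    throw new RangeError("poolSize must be a positive integer")
  }
  // One empty task list per worker.
  const assignments: Array<Array<number>> = Array.from(
    { length: poolSize },
    (): Array<number> => []
  )
  for (let task = 0; task < taskCount; task++) {
    assignments[task % poolSize]!.push(task)
  }
  return assignments
}

console.log(assignRoundRobin(10, 4))
// → [ [ 0, 4, 8 ], [ 1, 5, 9 ], [ 2, 6 ], [ 3, 7 ] ]
```

Round-robin ignores how long each task takes, so it suits workloads where tasks cost roughly the same.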
Worker with Scope
Manage worker lifecycle with scopes:
import { Effect, Worker } from "effect"
const program = Effect.scoped(
Effect.gen(function*() {
const platform = yield* Worker.WorkerPlatform
const worker = yield* platform.spawn(0)
// The worker is cleaned up automatically when the scope closes
yield* worker.send({ task: "process" })
// Process results...
})
)
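As a rough analogy for what scope-based cleanup guarantees, the sketch below uses a plain `try`/`finally` and no Effect code. It is not how `Scope` is implemented. It only illustrates that cleanup runs whether the body succeeds or fails. `withWorker` and `events` are hypothetical names.

```typescript
// Plain-TypeScript analogy for scope-based cleanup. It does not use
// Effect's Scope; `withWorker` and `events` are hypothetical names.
const events: Array<string> = []

// Run `use` with a pretend worker id and always record termination.
const withWorker = <A>(id: number, use: (id: number) => A): A => {
  events.push(`spawn ${id}`)
  try {
    return use(id)
  } finally {
    // Runs on both normal return and thrown errors.
    events.push(`terminate ${id}`)
  }
}

// Normal completion: cleanup runs after the body.
withWorker(0, (id) => events.push(`use ${id}`))

// Failure: cleanup still runs before the error reaches the caller.
try {
  withWorker(1, () => {
    throw new Error("boom")
  })
} catch {
  events.push("caught")
}

console.log(events)
// → [ 'spawn 0', 'use 0', 'terminate 0', 'spawn 1', 'terminate 1', 'caught' ]
```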
Type Reference
Worker
interface Worker<O = unknown, I = unknown> {
readonly send: (
message: I,
transfers?: ReadonlyArray<unknown>
) => Effect.Effect<void, WorkerError>
readonly run: <A, E, R>(
handler: (message: O) => Effect.Effect<A, E, R>,
options?: {
readonly onSpawn?: Effect.Effect<void> | undefined
}
) => Effect.Effect<never, E | WorkerError, R>
}
WorkerPlatform
class WorkerPlatform extends Service<WorkerPlatform, {
readonly spawn: <O = unknown, I = unknown>(
id: number
) => Effect.Effect<Worker<O, I>, WorkerError, Spawner>
}> {}
Spawner
interface Spawner {
readonly _: unique symbol
}
interface SpawnerFn<W = unknown> {
(id: number): W
}
Message Types
type PlatformMessage =
| readonly [ready: 0]
| readonly [data: 1, unknown]
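Going by the tuple labels, a leading `0` signals that the worker is ready and a leading `1` carries a data payload. A minimal sketch of interpreting such tuples follows. The type is restated locally with a `payload` label added to the second element, and `describePlatformMessage` is a hypothetical helper, not a library function.

```typescript
// Local restatement of the protocol type; the `payload` label is added
// here for readability.
type PlatformMessage =
  | readonly [ready: 0]
  | readonly [data: 1, payload: unknown]

// Interpret a raw tuple: tag 0 signals readiness, tag 1 carries a payload.
const describePlatformMessage = (message: PlatformMessage): string =>
  message[0] === 0
    ? "worker is ready"
    : `data: ${JSON.stringify(message[1])}`

console.log(describePlatformMessage([0]))
// → worker is ready
console.log(describePlatformMessage([1, { value: 42 }]))
// → data: {"value":42}
```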
See Also
- Effect - Core Effect operations
- Fiber - Lightweight concurrency
- Scope - Resource management
- Layer - Dependency injection