The Cluster module is marked as unstable, meaning its APIs may change in minor version releases. Use caution when upgrading Effect versions.
Overview
The effect/unstable/cluster module provides primitives for building distributed applications with Effect. It implements sharding, entity management, message passing, and distributed coordination patterns for scalable, fault-tolerant systems.
Installation
Key Modules
Sharding
The core service for distributed entity management and message routing.
import { Effect } from "effect"
import { Sharding, Entity, Message } from "effect/unstable/cluster"
// Define an entity
const UserEntity = Entity.make("User", {
  // Messages the entity accepts
  GetProfile: Message.make<{ userId: string }, { name: string; email: string }>()
})
// Use sharding to route messages
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  // Create a client for the entity
  const client = yield* sharding.makeClient(UserEntity)
  // Send a message to an entity
  const profile = yield* client.send("user-123", UserEntity.GetProfile)
  return profile
})
Key Functions:
- makeClient(entity) - Create a client for sending messages to entities
- getShardId(entityId, group) - Get the shard ID for an entity
- hasShardId(shardId) - Check if this runner owns a shard
- getSnowflake - Generate unique distributed IDs
- isShutdown - Check if sharding is shutting down
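The idea behind getShardId can be pictured as a deterministic mapping from an entity ID to one of a fixed number of shards. The following is a minimal sketch in plain TypeScript, independent of Effect. The hash function (FNV-1a), the 1-based shard numbering, and the names `fnv1a`, `shardFor`, and `distribution` are all assumptions made for illustration. The library's actual algorithm may differ.

```typescript
// Hypothetical helper: FNV-1a hash of a string, returned as an unsigned 32-bit integer.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i)
    hash = Math.imul(hash, 0x01000193)
  }
  return hash >>> 0
}

// Hypothetical stand-in for getShardId: the same entity ID always maps
// to the same shard number in the range 1..numberOfShards.
function shardFor(entityId: string, numberOfShards: number): number {
  return (fnv1a(entityId) % numberOfShards) + 1
}

// Count how many of `count` sequential entity IDs land on each shard,
// which is one way to check that an ID scheme spreads load evenly.
function distribution(count: number, numberOfShards: number): number[] {
  const buckets = new Array<number>(numberOfShards).fill(0)
  for (let i = 0; i < count; i++) {
    const index = shardFor(`user-${i}`, numberOfShards) - 1
    buckets[index] = (buckets[index] ?? 0) + 1
  }
  return buckets
}
```

Because the mapping depends only on the ID and the shard count, any runner can compute which shard an entity belongs to without coordination.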
Entity
Define stateful entities that can be distributed across a cluster.
import { Effect } from "effect"
import { Entity, Message, Reply } from "effect/unstable/cluster"
// Define entity messages
const Increment = Message.make<{ amount: number }, number>()
const GetValue = Message.make<void, number>()
// Define entity state
interface CounterState {
  value: number
}
// Create entity
const CounterEntity = Entity.make("Counter", {
  Increment,
  GetValue
}).pipe(
  Entity.withState<CounterState>({ value: 0 }),
  Entity.withHandlers({
    Increment: (state, { amount }) =>
      Effect.gen(function*() {
        const newValue = state.value + amount
        yield* Entity.setState({ value: newValue })
        return Reply.success(newValue)
      }),
    GetValue: (state) =>
      Effect.succeed(Reply.success(state.value))
  })
)
Entity Concepts:
- EntityAddress - Unique address for routing messages to entities
- EntityId - Identifier for entity instances
- EntityType - Type definition for entities
- EntityProxy - Client-side proxy for entity communication
- EntityResource - Managed resource for entity lifecycle
Message & Reply
Type-safe message passing between entities.
import { Message, Reply } from "effect/unstable/cluster"
// Define a message with typed payload and reply
const CreateUser = Message.make<
  { name: string; email: string },
  { id: string; createdAt: Date }
>()
// Define replies
const successReply = Reply.success({ id: "123", createdAt: new Date() })
const errorReply = Reply.error(new Error("User already exists"))
// Check reply type
if (Reply.isSuccess(successReply)) {
  console.log(successReply.value)
}
Runners
Manage runner nodes that host entities in the cluster.
import { Effect, Layer } from "effect"
import {
  Sharding,
  Runner,
  RunnerStorage,
  HttpRunner
} from "effect/unstable/cluster"
// Create an HTTP-based runner
const runnerLayer = Layer.mergeAll(
  HttpRunner.layer,
  RunnerStorage.memory
)
// Start the runner
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  const runner = yield* Runner.Runner
  // Register entities with the runner
  // (CounterEntity is the entity defined in the Entity section)
  yield* runner.register(CounterEntity)
  // Runner will now host Counter entities
  yield* Effect.never // Keep running
}).pipe(
  Effect.provide(runnerLayer)
)
Runner Types:
- HttpRunner - HTTP-based runner communication
- SocketRunner - WebSocket-based runner communication
- SingleRunner - Single-node runner for development
- TestRunner - In-memory runner for testing
MessageStorage
Persistent message storage for reliable delivery.
import { Effect } from "effect"
import { SqlMessageStorage } from "effect/unstable/cluster"
// SQL-based message storage
const messageStorageLayer = SqlMessageStorage.layer
// Use with sharding
// (shardingProgram stands for any effect that requires Sharding)
const program = Effect.provide(shardingProgram, messageStorageLayer)
RunnerStorage
Store runner registration and health information.
import { SqlRunnerStorage } from "effect/unstable/cluster"
// SQL-based runner storage
const runnerStorageLayer = SqlRunnerStorage.layer
Singleton
Manage singleton entities that exist exactly once in the cluster.
import { Effect } from "effect"
import { Singleton } from "effect/unstable/cluster"
// Define a singleton service
const CronScheduler = Singleton.make("CronScheduler", {
  schedule: (job: string) => Effect.log(`Scheduling ${job}`),
  cancel: (jobId: string) => Effect.log(`Canceling ${jobId}`)
})
// Access the singleton
const program = Effect.gen(function*() {
  const scheduler = yield* CronScheduler
  yield* scheduler.schedule("daily-backup")
})
ClusterCron
Schedule distributed cron jobs across the cluster.
import { Effect, Schedule } from "effect"
import { ClusterCron } from "effect/unstable/cluster"
// Schedule a recurring task
const cronJob = ClusterCron.make(
  "backup-job",
  Schedule.fixed("1 hour"),
  Effect.log("Running backup...")
)
const program = Effect.gen(function*() {
  yield* ClusterCron.schedule(cronJob)
})
ShardingConfig
Configure sharding behavior.
import { Duration, Layer } from "effect"
import { Sharding, ShardingConfig } from "effect/unstable/cluster"
const config = ShardingConfig.make({
  numberOfShards: 256,
  rebalanceInterval: Duration.minutes(5),
  entityTimeout: Duration.minutes(10),
  entityMaxIdle: Duration.minutes(2)
})
const shardingLayer = Sharding.layer.pipe(
  Layer.provide(Layer.succeed(ShardingConfig.ShardingConfig, config))
)
Snowflake
Generate distributed unique IDs.
import { Effect } from "effect"
import { Sharding } from "effect/unstable/cluster"
// Generate a Snowflake ID
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  const id = yield* sharding.getSnowflake
  console.log(id.value) // Unique distributed ID
  console.log(id.timestamp) // Timestamp component
  console.log(id.machineId) // Machine ID component
  console.log(id.sequence) // Sequence component
  return id
})
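The timestamp, machine ID, and sequence components can be thought of as bit fields packed into a single integer. The sketch below shows that packing in plain TypeScript, independent of Effect. The 10-bit machine ID and 12-bit sequence widths follow the classic Snowflake layout and are an assumption here, as are the names `SnowflakeParts`, `encode`, and `decode`. The bit layout used by the cluster module may differ.

```typescript
// Assumed field widths (classic Snowflake layout, not confirmed for Effect).
const MACHINE_BITS = 10n
const SEQUENCE_BITS = 12n

// Hypothetical shape for the three components of an ID.
interface SnowflakeParts {
  readonly timestamp: number // milliseconds since a chosen epoch
  readonly machineId: number // 0..1023 with 10 bits
  readonly sequence: number // 0..4095 with 12 bits
}

// Pack the components into one integer: timestamp in the high bits,
// then the machine ID, then the per-millisecond sequence.
function encode(parts: SnowflakeParts): bigint {
  return (
    (BigInt(parts.timestamp) << (MACHINE_BITS + SEQUENCE_BITS)) |
    (BigInt(parts.machineId) << SEQUENCE_BITS) |
    BigInt(parts.sequence)
  )
}

// Recover the components by shifting and masking each field.
function decode(id: bigint): SnowflakeParts {
  return {
    timestamp: Number(id >> (MACHINE_BITS + SEQUENCE_BITS)),
    machineId: Number((id >> SEQUENCE_BITS) & ((1n << MACHINE_BITS) - 1n)),
    sequence: Number(id & ((1n << SEQUENCE_BITS) - 1n))
  }
}
```

With the timestamp in the high bits, IDs generated later compare greater than earlier ones, so they sort roughly by creation time.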
ClusterError
Error types for cluster operations.
import { Match } from "effect"
import type { ClusterError } from "effect/unstable/cluster"
const handleClusterError = Match.type<ClusterError>().pipe(
  Match.when(
    { _tag: "EntityNotAssignedToRunner" },
    (err) => console.error(`Entity ${err.entityId} not on this runner`)
  ),
  Match.when(
    { _tag: "MailboxFull" },
    (err) => console.error(`Mailbox full for ${err.entityId}`)
  ),
  Match.when(
    { _tag: "PersistenceError" },
    (err) => console.error(`Persistence failed: ${err.message}`)
  ),
  Match.orElse((err) => console.error(`Cluster error: ${err}`))
)
ClusterMetrics
Monitor cluster performance and health.
import { Effect } from "effect"
import { ClusterMetrics } from "effect/unstable/cluster"
const program = Effect.gen(function*() {
  const metrics = yield* ClusterMetrics.ClusterMetrics
  // Get cluster metrics
  const entityCount = yield* metrics.entityCount
  const messageRate = yield* metrics.messageRate
  const activeShards = yield* metrics.activeShards
  return { entityCount, messageRate, activeShards }
})
ClusterWorkflowEngine
Execute distributed workflows across the cluster.
import { Effect } from "effect"
import { ClusterWorkflowEngine } from "effect/unstable/cluster"
// Define a workflow
const orderWorkflow = ClusterWorkflowEngine.defineWorkflow(
  "order-processing",
  Effect.gen(function*() {
    yield* Effect.log("Step 1: Validate order")
    yield* Effect.sleep("1 second")
    yield* Effect.log("Step 2: Charge payment")
    yield* Effect.sleep("1 second")
    yield* Effect.log("Step 3: Ship order")
    yield* Effect.sleep("1 second")
    return { status: "completed" }
  })
)
// Execute workflow
const program = Effect.gen(function*() {
  const engine = yield* ClusterWorkflowEngine.ClusterWorkflowEngine
  const result = yield* engine.execute(orderWorkflow, { orderId: "123" })
  return result
})
Complete Example
Here’s a complete distributed counter application:
import { Effect, Layer } from "effect"
import {
  Entity,
  Message,
  Reply,
  Sharding,
  HttpRunner,
  RunnerStorage,
  MessageStorage
} from "effect/unstable/cluster"
// Define messages
const Increment = Message.make<{ amount: number }, number>()
const Decrement = Message.make<{ amount: number }, number>()
const GetValue = Message.make<void, number>()
// Define entity
const CounterEntity = Entity.make("Counter", {
  Increment,
  Decrement,
  GetValue
}).pipe(
  Entity.withState<{ value: number }>({ value: 0 }),
  Entity.withHandlers({
    Increment: (state, { amount }) =>
      Effect.gen(function*() {
        const newValue = state.value + amount
        yield* Entity.setState({ value: newValue })
        return Reply.success(newValue)
      }),
    Decrement: (state, { amount }) =>
      Effect.gen(function*() {
        const newValue = state.value - amount
        yield* Entity.setState({ value: newValue })
        return Reply.success(newValue)
      }),
    GetValue: (state) =>
      Effect.succeed(Reply.success(state.value))
  })
)
// Setup cluster
const clusterLayer = Layer.mergeAll(
  HttpRunner.layer,
  RunnerStorage.memory,
  MessageStorage.memory
)
// Application
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  const client = yield* sharding.makeClient(CounterEntity)
  // Increment counter
  const value1 = yield* client.send("counter-1", { amount: 5 })
  console.log(`After increment: ${value1}`)
  // Decrement counter
  const value2 = yield* client.send("counter-1", { amount: 2 })
  console.log(`After decrement: ${value2}`)
  // Get current value
  const value3 = yield* client.send("counter-1", GetValue)
  console.log(`Current value: ${value3}`)
  return value3
}).pipe(
  Effect.provide(clusterLayer)
)
// Run
Effect.runPromise(program)
Best Practices
- Sharding - Design entity IDs for even distribution across shards
- State Management - Keep entity state small and serializable
- Message Design - Use schemas for type-safe message validation
- Error Handling - Handle cluster errors gracefully with retries
- Monitoring - Use ClusterMetrics for observability
- Persistence - Choose appropriate storage for messages and state
- Testing - Use TestRunner for unit testing entity behavior
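As a rough illustration of the error-handling practice above, the sketch below retries an operation with exponential backoff when it fails with a transient tagged error. It is plain TypeScript using promises, independent of Effect. The helper name `retryTransient`, the `TaggedError` shape, and the choice of which tags count as retryable are assumptions made for illustration only.

```typescript
// Hypothetical error shape, modeled on errors that carry a `_tag` field.
interface TaggedError {
  readonly _tag: string
}

// Assumption: these tags describe transient conditions worth retrying.
const retryableTags = new Set(["MailboxFull", "EntityNotAssignedToRunner"])

// Run `attempt`, retrying up to `maxRetries` times on retryable errors.
// The delay doubles after each failure: base, 2 * base, 4 * base, ...
async function retryTransient<A>(
  attempt: () => Promise<A>,
  maxRetries: number,
  baseDelayMs: number
): Promise<A> {
  for (let i = 0; ; i++) {
    try {
      return await attempt()
    } catch (error) {
      const tag = (error as Partial<TaggedError> | null | undefined)?._tag
      const retryable = typeof tag === "string" && retryableTags.has(tag)
      // Rethrow errors that are not transient, or when retries are exhausted.
      if (!retryable || i >= maxRetries) throw error
      await new Promise<void>((resolve) => setTimeout(resolve, baseDelayMs * 2 ** i))
    }
  }
}
```

Errors that are not in the retryable set, such as a persistence failure, propagate immediately so that callers can handle them explicitly.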
Related Modules
- SQL - Database integration for persistence
- AI - AI and LLM integration
- Process - Child process management