The Cluster module is marked as unstable, meaning its APIs may change in minor version releases. Use caution when upgrading Effect versions.

Overview

The effect/unstable/cluster module provides primitives for building distributed applications with Effect. It implements sharding, entity management, message passing, and distributed coordination patterns for scalable, fault-tolerant systems.

Installation

npm install effect

Key Modules

Sharding

The core service for distributed entity management and message routing.
import { Effect } from "effect"
import { Sharding, Entity, Message } from "effect/unstable/cluster"

// Define an entity
const UserEntity = Entity.make("User", {
  // Entity state and behavior
  GetProfile: Message.make<{ userId: string }, { name: string; email: string }>()
})

// Use sharding to route messages
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  
  // Create a client for the entity
  const client = yield* sharding.makeClient(UserEntity)
  
  // Send a message to an entity
  const profile = yield* client.send("user-123", UserEntity.GetProfile)
  
  return profile
})
Key Functions:
  • makeClient(entity) - Create a client for sending messages to entities
  • getShardId(entityId, group) - Get the shard ID for an entity
  • hasShardId(shardId) - Check if this runner owns a shard
  • getSnowflake - Generate unique distributed IDs
  • isShutdown - Check if sharding is shutting down
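
To make the idea behind getShardId more concrete, here is a minimal sketch of mapping an entity ID onto a fixed number of shards. It is plain TypeScript and does not call the cluster API. The FNV-1a hash and the helper names `fnv1a` and `shardFor` are assumptions for illustration only; the library's actual hash function and shard numbering may differ.

```typescript
// Hypothetical helper: 32-bit FNV-1a hash of a string.
// The hash choice is an assumption, not the library's documented algorithm.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i)
    // Multiply by the FNV prime and keep the result as an unsigned 32-bit value
    hash = Math.imul(hash, 0x01000193) >>> 0
  }
  return hash >>> 0
}

// Hypothetical helper: pick a shard in the range [0, numberOfShards).
// The same entity ID always lands on the same shard.
function shardFor(entityId: string, numberOfShards: number): number {
  return fnv1a(entityId) % numberOfShards
}

const shard = shardFor("user-123", 256)
console.log(`user-123 is routed to shard ${shard}`)
```

Because the mapping is deterministic, any runner can compute where a message should go without consulting a central registry, which is why entity IDs that hash evenly matter.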

Entity

Define stateful entities that can be distributed across a cluster.
import { Effect } from "effect"
import { Entity, Message, Reply } from "effect/unstable/cluster"

// Define entity messages
const Increment = Message.make<{ amount: number }, number>()
const GetValue = Message.make<void, number>()

// Define entity state
interface CounterState {
  value: number
}

// Create entity
const CounterEntity = Entity.make("Counter", {
  Increment,
  GetValue
}).pipe(
  Entity.withState<CounterState>({ value: 0 }),
  Entity.withHandlers({
    Increment: (state, { amount }) => 
      Effect.gen(function*() {
        const newValue = state.value + amount
        yield* Entity.setState({ value: newValue })
        return Reply.success(newValue)
      }),
    GetValue: (state) => 
      Effect.succeed(Reply.success(state.value))
  })
)
Entity Concepts:
  • EntityAddress - Unique address for routing messages to entities
  • EntityId - Identifier for entity instances
  • EntityType - Type definition for entities
  • EntityProxy - Client-side proxy for entity communication
  • EntityResource - Managed resource for entity lifecycle

Message & Reply

Type-safe message passing between entities.
import { Message, Reply } from "effect/unstable/cluster"

// Define a message with a typed payload and a typed reply
const CreateUser = Message.make<
  { name: string; email: string },
  { id: string; createdAt: Date }
>()

// Define replies
const successReply = Reply.success({ id: "123", createdAt: new Date() })
const errorReply = Reply.error(new Error("User already exists"))

// Check reply type
if (Reply.isSuccess(successReply)) {
  console.log(successReply.value)
}
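
The success/error split above can be pictured as a tagged union. The following is a minimal sketch in plain TypeScript, not the library's implementation: the `Success` and `Failure` shapes, the `_tag` values, and the helper `renderReply` are all assumptions made for illustration.

```typescript
// Hypothetical reply shapes; the real Reply type may be structured differently.
interface Success<A> {
  readonly _tag: "Success"
  readonly value: A
}
interface Failure<E> {
  readonly _tag: "Failure"
  readonly error: E
}
type Reply<A, E> = Success<A> | Failure<E>

const success = <A>(value: A): Reply<A, never> => ({ _tag: "Success", value })
const failure = <E>(error: E): Reply<never, E> => ({ _tag: "Failure", error })

// Type guard: inside the true branch, TypeScript knows `value` exists.
const isSuccess = <A, E>(reply: Reply<A, E>): reply is Success<A> =>
  reply._tag === "Success"

// Hypothetical helper that handles both cases exhaustively.
function renderReply<A, E>(reply: Reply<A, E>): string {
  return isSuccess(reply)
    ? `ok: ${JSON.stringify(reply.value)}`
    : `failed: ${String(reply.error)}`
}

console.log(renderReply(success({ id: "123" })))
console.log(renderReply(failure("User already exists")))
```

Modeling replies this way lets the compiler reject code that reads a value without first checking which case it received.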

Runners

Manage runner nodes that host entities in the cluster.
import { Effect, Layer } from "effect"
import { 
  Sharding, 
  Runner, 
  RunnerStorage,
  HttpRunner 
} from "effect/unstable/cluster"

// Create an HTTP-based runner
const runnerLayer = Layer.mergeAll(
  HttpRunner.layer,
  RunnerStorage.memory
)

// Start the runner
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  const runner = yield* Runner.Runner
  
  // Register entities with the runner (CounterEntity is the entity defined in the Entity section)
  yield* runner.register(CounterEntity)
  
  // Runner will now host Counter entities
  yield* Effect.never // Keep running
}).pipe(
  Effect.provide(runnerLayer)
)
Runner Types:
  • HttpRunner - HTTP-based runner communication
  • SocketRunner - WebSocket-based runner communication
  • SingleRunner - Single-node runner for development
  • TestRunner - In-memory runner for testing

MessageStorage

Persistent message storage for reliable delivery.
import { Effect } from "effect"
import { SqlMessageStorage } from "effect/unstable/cluster"

// SQL-based message storage
const messageStorageLayer = SqlMessageStorage.layer

// Use with sharding (shardingProgram stands for any effect that uses Sharding, defined elsewhere)
const program = Effect.provide(shardingProgram, messageStorageLayer)

RunnerStorage

Store runner registration and health information.
import { RunnerStorage, SqlRunnerStorage } from "effect/unstable/cluster"

// SQL-based runner storage
const runnerStorageLayer = SqlRunnerStorage.layer

Singleton

Manage singleton entities that exist exactly once in the cluster.
import { Effect } from "effect"
import { Singleton } from "effect/unstable/cluster"

// Define a singleton service
const CronScheduler = Singleton.make("CronScheduler", {
  schedule: (job: string) => Effect.log(`Scheduling ${job}`),
  cancel: (jobId: string) => Effect.log(`Canceling ${jobId}`)
})

// Access the singleton
const program = Effect.gen(function*() {
  const scheduler = yield* CronScheduler
  yield* scheduler.schedule("daily-backup")
})

ClusterCron

Schedule distributed cron jobs across the cluster.
import { Effect, Schedule } from "effect"
import { ClusterCron } from "effect/unstable/cluster"

// Schedule a recurring task
const cronJob = ClusterCron.make(
  "backup-job",
  Schedule.fixed("1 hour"),
  Effect.log("Running backup...")
)

const program = Effect.gen(function*() {
  yield* ClusterCron.schedule(cronJob)
})

ShardingConfig

Configure sharding behavior.
import { Duration, Layer } from "effect"
import { Sharding, ShardingConfig } from "effect/unstable/cluster"

const config = ShardingConfig.make({
  numberOfShards: 256,
  rebalanceInterval: Duration.minutes(5),
  entityTimeout: Duration.minutes(10),
  entityMaxIdle: Duration.minutes(2)
})

const shardingLayer = Sharding.layer.pipe(
  Layer.provide(Layer.succeed(ShardingConfig.ShardingConfig, config))
)
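
As a rough illustration of what numberOfShards and rebalancing mean in practice, the sketch below spreads a fixed set of shards across the runners that are currently alive. It is plain TypeScript with a simple round-robin strategy; the function `assignShards` and the runner addresses are hypothetical, and the library's real assignment and rebalancing logic is not described in this page and may work differently.

```typescript
// Hypothetical helper: distribute shard numbers 0..numberOfShards-1 across runners round-robin.
function assignShards(
  numberOfShards: number,
  runners: ReadonlyArray<string>
): Map<string, number[]> {
  const assignment = new Map<string, number[]>()
  for (const runner of runners) {
    assignment.set(runner, [])
  }
  if (runners.length === 0) {
    return assignment // no runners means no shard can be hosted
  }
  for (let shard = 0; shard < numberOfShards; shard++) {
    const runner = runners[shard % runners.length]!
    assignment.get(runner)!.push(shard)
  }
  return assignment
}

// Two runners share 256 shards evenly; a third runner joining changes the split.
const before = assignShards(256, ["runner-a:8080", "runner-b:8080"])
const after = assignShards(256, ["runner-a:8080", "runner-b:8080", "runner-c:8080"])
console.log(before.get("runner-a:8080")!.length)
console.log(after.get("runner-a:8080")!.length)
```

A larger shard count gives finer-grained units to move when membership changes, at the cost of more bookkeeping per runner.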

Snowflake

Generate distributed unique IDs.
import { Effect } from "effect"
import { Sharding } from "effect/unstable/cluster"

// Generate a Snowflake ID
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  const id = yield* sharding.getSnowflake
  
  console.log(id.value) // Unique distributed ID
  console.log(id.timestamp) // Timestamp component
  console.log(id.machineId) // Machine ID component
  console.log(id.sequence) // Sequence component
  
  return id
})
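
The timestamp, machine ID, and sequence components can be understood as bit fields packed into one integer. The sketch below shows one common layout in plain TypeScript using BigInt. The field widths (10 bits for the machine ID, 12 bits for the sequence) and the helper names `encodeSnowflake` and `decodeSnowflake` are assumptions for illustration; the library's actual layout and epoch may differ.

```typescript
// Assumed field widths; not taken from the library.
const MACHINE_BITS = BigInt(10)
const SEQUENCE_BITS = BigInt(12)
const MACHINE_MASK = (BigInt(1) << MACHINE_BITS) - BigInt(1)
const SEQUENCE_MASK = (BigInt(1) << SEQUENCE_BITS) - BigInt(1)

interface SnowflakeParts {
  readonly timestamp: number // milliseconds since some epoch
  readonly machineId: number
  readonly sequence: number
}

// Pack the three components: timestamp in the high bits, sequence in the low bits.
function encodeSnowflake(parts: SnowflakeParts): bigint {
  return (
    (BigInt(parts.timestamp) << (MACHINE_BITS + SEQUENCE_BITS)) |
    ((BigInt(parts.machineId) & MACHINE_MASK) << SEQUENCE_BITS) |
    (BigInt(parts.sequence) & SEQUENCE_MASK)
  )
}

// Recover the components by shifting and masking.
function decodeSnowflake(id: bigint): SnowflakeParts {
  return {
    timestamp: Number(id >> (MACHINE_BITS + SEQUENCE_BITS)),
    machineId: Number((id >> SEQUENCE_BITS) & MACHINE_MASK),
    sequence: Number(id & SEQUENCE_MASK)
  }
}

const id = encodeSnowflake({ timestamp: 1700000000000, machineId: 7, sequence: 42 })
console.log(decodeSnowflake(id))
```

Placing the timestamp in the high bits means IDs generated later compare as larger, so they sort roughly by creation time without any coordination between machines.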

ClusterError

Error types for cluster operations.
import { Match } from "effect"
import type { ClusterError } from "effect/unstable/cluster"

const handleClusterError = Match.type<ClusterError>().pipe(
  Match.when(
    { _tag: "EntityNotAssignedToRunner" },
    (err) => console.error(`Entity ${err.entityId} not on this runner`)
  ),
  Match.when(
    { _tag: "MailboxFull" },
    (err) => console.error(`Mailbox full for ${err.entityId}`)
  ),
  Match.when(
    { _tag: "PersistenceError" },
    (err) => console.error(`Persistence failed: ${err.message}`)
  ),
  Match.orElse((err) => console.error(`Cluster error: ${err}`))
)

ClusterMetrics

Monitor cluster performance and health.
import { Effect } from "effect"
import { ClusterMetrics } from "effect/unstable/cluster"

const program = Effect.gen(function*() {
  const metrics = yield* ClusterMetrics.ClusterMetrics
  
  // Get cluster metrics
  const entityCount = yield* metrics.entityCount
  const messageRate = yield* metrics.messageRate
  const activeShards = yield* metrics.activeShards
  
  return { entityCount, messageRate, activeShards }
})

ClusterWorkflowEngine

Execute distributed workflows across the cluster.
import { Effect } from "effect"
import { ClusterWorkflowEngine } from "effect/unstable/cluster"

// Define a workflow
const orderWorkflow = ClusterWorkflowEngine.defineWorkflow(
  "order-processing",
  Effect.gen(function*() {
    yield* Effect.log("Step 1: Validate order")
    yield* Effect.sleep("1 second")
    
    yield* Effect.log("Step 2: Charge payment")
    yield* Effect.sleep("1 second")
    
    yield* Effect.log("Step 3: Ship order")
    yield* Effect.sleep("1 second")
    
    return { status: "completed" }
  })
)

// Execute workflow
const program = Effect.gen(function*() {
  const engine = yield* ClusterWorkflowEngine.ClusterWorkflowEngine
  const result = yield* engine.execute(orderWorkflow, { orderId: "123" })
  return result
})

Complete Example

Here’s a complete distributed counter application:
import { Effect, Layer } from "effect"
import {
  Entity,
  Message,
  Reply,
  Sharding,
  HttpRunner,
  RunnerStorage,
  MessageStorage
} from "effect/unstable/cluster"

// Define messages
const Increment = Message.make<{ amount: number }, number>()
const Decrement = Message.make<{ amount: number }, number>()
const GetValue = Message.make<void, number>()

// Define entity
const CounterEntity = Entity.make("Counter", {
  Increment,
  Decrement,
  GetValue
}).pipe(
  Entity.withState<{ value: number }>({ value: 0 }),
  Entity.withHandlers({
    Increment: (state, { amount }) =>
      Effect.gen(function*() {
        const newValue = state.value + amount
        yield* Entity.setState({ value: newValue })
        return Reply.success(newValue)
      }),
    Decrement: (state, { amount }) =>
      Effect.gen(function*() {
        const newValue = state.value - amount
        yield* Entity.setState({ value: newValue })
        return Reply.success(newValue)
      }),
    GetValue: (state) =>
      Effect.succeed(Reply.success(state.value))
  })
)

// Setup cluster
const clusterLayer = Layer.mergeAll(
  HttpRunner.layer,
  RunnerStorage.memory,
  MessageStorage.memory
)

// Application
const program = Effect.gen(function*() {
  const sharding = yield* Sharding.Sharding
  const client = yield* sharding.makeClient(CounterEntity)
  
  // Increment counter
  const value1 = yield* client.send("counter-1", { amount: 5 })
  console.log(`After increment: ${value1}`)
  
  // Decrement counter
  const value2 = yield* client.send("counter-1", { amount: 2 })
  console.log(`After decrement: ${value2}`)
  
  // Get current value
  const value3 = yield* client.send("counter-1", GetValue)
  console.log(`Current value: ${value3}`)
  
  return value3
}).pipe(
  Effect.provide(clusterLayer)
)

// Run
Effect.runPromise(program)

Best Practices

  1. Sharding - Design entity IDs for even distribution across shards
  2. State Management - Keep entity state small and serializable
  3. Message Design - Use schemas for type-safe message validation
  4. Error Handling - Handle cluster errors gracefully with retries
  5. Monitoring - Use ClusterMetrics for observability
  6. Persistence - Choose appropriate storage for messages and state
  7. Testing - Use TestRunner for unit testing entity behavior
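
The advice on retries can be sketched as exponential backoff around an operation that may fail with a transient cluster error. This is plain TypeScript using promises rather than the Effect API, and the helpers `backoffDelays`, `isMailboxFull`, and `retry` are hypothetical names introduced for illustration. Which errors are worth retrying is an assumption here; only the MailboxFull tag is taken from the ClusterError section.

```typescript
// Hypothetical helper: delays double on each retry, up to a cap.
function backoffDelays(maxRetries: number, baseDelayMs: number, capMs: number): number[] {
  return Array.from({ length: maxRetries }, (_, i) => Math.min(capMs, baseDelayMs * 2 ** i))
}

// Hypothetical predicate: treat a MailboxFull error as transient (an assumption).
const isMailboxFull = (error: unknown): boolean =>
  typeof error === "object" &&
  error !== null &&
  (error as { _tag?: unknown })._tag === "MailboxFull"

// Hypothetical helper: run the operation, waiting between attempts while the error is retryable.
async function retry<A>(
  operation: () => Promise<A>,
  isRetryable: (error: unknown) => boolean,
  delays: ReadonlyArray<number>
): Promise<A> {
  for (const delay of delays) {
    try {
      return await operation()
    } catch (error) {
      if (!isRetryable(error)) throw error // permanent failure: give up immediately
      await new Promise<void>((resolve) => setTimeout(resolve, delay))
    }
  }
  return operation() // final attempt; a failure here propagates to the caller
}

console.log(backoffDelays(5, 100, 1000))
```

Capping the delay keeps worst-case latency bounded, and checking the error tag before retrying avoids repeating requests that can never succeed.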