Documentation Index
Fetch the complete documentation index at: https://mintlify.com/effect-TS/effect-smol/llms.txt
Use this file to discover all available pages before exploring further.
A PubSub is an asynchronous message hub where publishers can publish messages and subscribers can subscribe to receive those messages. PubSub supports various backpressure strategies, message replay, and concurrent access from multiple producers and consumers.
Overview
PubSub provides a powerful publish-subscribe pattern:
- Multiple subscribers: Messages are broadcast to all active subscribers
- Backpressure strategies: Control behavior when capacity is reached
- Replay buffer: Late subscribers can receive past messages
- Scoped subscriptions: Automatic cleanup with Effect scopes
Basic Usage
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
// Publisher
yield* PubSub.publish(pubsub, "Hello")
yield* PubSub.publish(pubsub, "World")
// Subscriber
yield* Effect.scoped(Effect.gen(function*() {
const subscription = yield* PubSub.subscribe(pubsub)
const message1 = yield* PubSub.take(subscription)
const message2 = yield* PubSub.take(subscription)
console.log(message1, message2) // "Hello", "World"
}))
})
Types
PubSub
interface PubSub<in out A>
An asynchronous message hub supporting multiple publishers and subscribers.
Subscription
interface Subscription<out A>
A consumer’s connection to a PubSub, allowing them to take messages.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
yield* Effect.scoped(Effect.gen(function*() {
const subscription: PubSub.Subscription<string> =
yield* PubSub.subscribe(pubsub)
const message = yield* PubSub.take(subscription)
console.log(message)
}))
})
Creating PubSubs
bounded
const bounded: <A>(capacity: number | {
readonly capacity: number
readonly replay?: number | undefined
}) => Effect<PubSub<A>>
Creates a bounded PubSub with backpressure strategy. When full, publishers are suspended until space becomes available.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
// Simple bounded PubSub
const pubsub = yield* PubSub.bounded<string>(100)
// With replay buffer for late subscribers
const pubsubWithReplay = yield* PubSub.bounded<string>({
capacity: 100,
replay: 10 // Last 10 messages replayed to new subscribers
})
})
dropping
const dropping: <A>(capacity: number | {
readonly capacity: number
readonly replay?: number | undefined
}) => Effect<PubSub<A>>
Creates a bounded PubSub that drops new messages when full.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.dropping<string>(3)
yield* PubSub.publish(pubsub, "msg1") // succeeds
yield* PubSub.publish(pubsub, "msg2") // succeeds
yield* PubSub.publish(pubsub, "msg3") // succeeds
const dropped = yield* PubSub.publish(pubsub, "msg4")
console.log("Message dropped:", !dropped) // true
})
sliding
const sliding: <A>(capacity: number | {
readonly capacity: number
readonly replay?: number | undefined
}) => Effect<PubSub<A>>
Creates a bounded PubSub that evicts old messages when full.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.sliding<string>(3)
yield* PubSub.publish(pubsub, "msg1")
yield* PubSub.publish(pubsub, "msg2")
yield* PubSub.publish(pubsub, "msg3")
yield* PubSub.publish(pubsub, "msg4") // "msg1" is evicted
yield* Effect.scoped(Effect.gen(function*() {
const subscription = yield* PubSub.subscribe(pubsub)
const messages = yield* PubSub.takeAll(subscription)
console.log(messages) // ["msg2", "msg3", "msg4"]
}))
})
unbounded
const unbounded: <A>(options?: {
readonly replay?: number | undefined
}) => Effect<PubSub<A>>
Creates an unbounded PubSub.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.unbounded<string>()
// Can publish unlimited messages
for (let i = 0; i < 1000; i++) {
yield* PubSub.publish(pubsub, `message-${i}`)
}
})
Publishing
publish
const publish: {
<A>(value: A): (self: PubSub<A>) => Effect<boolean>
<A>(self: PubSub<A>, value: A): Effect<boolean>
}
Publishes a message to all subscribers.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
const published = yield* PubSub.publish(pubsub, "Hello World")
console.log("Message published:", published) // true
})
publishAll
const publishAll: {
<A>(elements: Iterable<A>): (self: PubSub<A>) => Effect<boolean>
<A>(self: PubSub<A>, elements: Iterable<A>): Effect<boolean>
}
Publishes multiple messages at once.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
const messages = ["Hello", "World", "from", "Effect"]
const allPublished = yield* PubSub.publishAll(pubsub, messages)
console.log("All messages published:", allPublished)
})
publishUnsafe
const publishUnsafe: {
<A>(value: A): (self: PubSub<A>) => boolean
<A>(self: PubSub<A>, value: A): boolean
}
Synchronously publishes a message without Effect wrapping.
import { PubSub } from "effect"
declare const pubsub: PubSub.PubSub<string>
const published = PubSub.publishUnsafe(pubsub, "Hello")
if (published) {
console.log("Message published successfully")
}
Subscribing
subscribe
const subscribe: <A>(
self: PubSub<A>
) => Effect<Subscription<A>, never, Scope>
Creates a subscription to receive messages. Subscriptions are automatically cleaned up when the scope exits.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
yield* PubSub.publish(pubsub, "Hello")
yield* PubSub.publish(pubsub, "World")
yield* Effect.scoped(Effect.gen(function*() {
const subscription = yield* PubSub.subscribe(pubsub)
const msg1 = yield* PubSub.take(subscription)
const msg2 = yield* PubSub.take(subscription)
console.log(msg1, msg2) // "Hello", "World"
}))
})
Multiple Subscribers
All subscribers receive the same messages:
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
yield* PubSub.publish(pubsub, "Broadcast")
yield* Effect.scoped(Effect.gen(function*() {
const sub1 = yield* PubSub.subscribe(pubsub)
const sub2 = yield* PubSub.subscribe(pubsub)
const [msg1, msg2] = yield* Effect.all([
PubSub.take(sub1),
PubSub.take(sub2)
])
console.log("Both received:", msg1, msg2) // "Broadcast", "Broadcast"
}))
})
Taking from Subscriptions
take
const take: <A>(self: Subscription<A>) => Effect<A>
Takes a single message from the subscription.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
yield* Effect.scoped(Effect.gen(function*() {
const subscription = yield* PubSub.subscribe(pubsub)
// Start taking (will suspend until message arrives)
const takeFiber = yield* Effect.forkChild(
PubSub.take(subscription)
)
yield* PubSub.publish(pubsub, "Hello")
const message = yield* Fiber.join(takeFiber)
console.log("Received:", message) // "Hello"
}))
})
takeAll
const takeAll: <A>(self: Subscription<A>) => Effect<NonEmptyArray<A>>
Takes all available messages.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
yield* PubSub.publishAll(pubsub, ["msg1", "msg2", "msg3"])
yield* Effect.scoped(Effect.gen(function*() {
const subscription = yield* PubSub.subscribe(pubsub)
const allMessages = yield* PubSub.takeAll(subscription)
console.log("All messages:", allMessages) // ["msg1", "msg2", "msg3"]
}))
})
takeUpTo
const takeUpTo: {
(max: number): <A>(self: Subscription<A>) => Effect<Array<A>>
<A>(self: Subscription<A>, max: number): Effect<Array<A>>
}
Takes up to the specified number of messages without suspending.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
yield* PubSub.publishAll(pubsub, ["msg1", "msg2", "msg3", "msg4", "msg5"])
yield* Effect.scoped(Effect.gen(function*() {
const subscription = yield* PubSub.subscribe(pubsub)
const upTo3 = yield* PubSub.takeUpTo(subscription, 3)
console.log("Up to 3:", upTo3) // ["msg1", "msg2", "msg3"]
}))
})
takeBetween
const takeBetween: {
(min: number, max: number): <A>(self: Subscription<A>) => Effect<Array<A>>
<A>(self: Subscription<A>, min: number, max: number): Effect<Array<A>>
}
Takes between min and max messages.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
yield* Effect.scoped(Effect.gen(function*() {
const subscription = yield* PubSub.subscribe(pubsub)
const takeFiber = yield* Effect.forkChild(
PubSub.takeBetween(subscription, 2, 5)
)
yield* PubSub.publishAll(pubsub, ["msg1", "msg2", "msg3"])
const messages = yield* Fiber.join(takeFiber)
console.log("Between 2-5:", messages) // ["msg1", "msg2", "msg3"]
}))
})
Lifecycle
shutdown
const shutdown: <A>(self: PubSub<A>) => Effect<void>
Interrupts fibers waiting on offers or takes.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(1)
yield* PubSub.publish(pubsub, "msg1")
yield* PubSub.shutdown(pubsub)
const result = yield* Effect.either(PubSub.publish(pubsub, "msg2"))
console.log("Publisher interrupted:", result._tag === "Left")
})
isShutdown
const isShutdown: <A>(self: PubSub<A>) => Effect<boolean>
Checks if the PubSub has been shutdown.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
console.log(yield* PubSub.isShutdown(pubsub)) // false
yield* PubSub.shutdown(pubsub)
console.log(yield* PubSub.isShutdown(pubsub)) // true
})
awaitShutdown
const awaitShutdown: <A>(self: PubSub<A>) => Effect<void>
Waits until the PubSub is shutdown.
import { Effect, Fiber, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
const waiterFiber = yield* Effect.forkChild(
Effect.gen(function*() {
yield* PubSub.awaitShutdown(pubsub)
console.log("PubSub has been shutdown!")
})
)
yield* Effect.sleep("100 millis")
yield* PubSub.shutdown(pubsub)
yield* Fiber.join(waiterFiber)
})
Utilities
size
const size: <A>(self: PubSub<A>) => Effect<number>
Returns the number of elements in the PubSub.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
yield* PubSub.publish(pubsub, "msg1")
yield* PubSub.publish(pubsub, "msg2")
const currentSize = yield* PubSub.size(pubsub)
console.log("After publishing:", currentSize) // 2
})
capacity
const capacity: <A>(self: PubSub<A>) => number
Returns the maximum capacity of the PubSub.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(100)
const cap = PubSub.capacity(pubsub)
console.log("PubSub capacity:", cap) // 100
})
isEmpty
const isEmpty: <A>(self: PubSub<A>) => Effect<boolean>
Checks if the PubSub contains zero elements.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
console.log(yield* PubSub.isEmpty(pubsub)) // true
yield* PubSub.publish(pubsub, "Hello")
console.log(yield* PubSub.isEmpty(pubsub)) // false
})
isFull
const isFull: <A>(self: PubSub<A>) => Effect<boolean>
Checks if the PubSub is at capacity.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(2)
console.log(yield* PubSub.isFull(pubsub)) // false
yield* PubSub.publish(pubsub, "msg1")
yield* PubSub.publish(pubsub, "msg2")
console.log(yield* PubSub.isFull(pubsub)) // true
})
remaining
const remaining: <A>(self: Subscription<A>) => Effect<number>
Returns the number of messages available in a subscription.
import { Effect, PubSub } from "effect"
const program = Effect.gen(function*() {
const pubsub = yield* PubSub.bounded<string>(10)
yield* PubSub.publishAll(pubsub, ["msg1", "msg2", "msg3"])
yield* Effect.scoped(Effect.gen(function*() {
const subscription = yield* PubSub.subscribe(pubsub)
const count = yield* PubSub.remaining(subscription)
console.log("Messages available:", count) // 3
}))
})