Effect Streams represent effectful, pull-based sequences of values over time. They let you model finite or infinite data sources, transform them with composable operators, and consume them efficiently.
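To make "pull-based" concrete, here is a minimal sketch in plain TypeScript that uses generators rather than the effect library. Nothing is produced until the consumer asks for it, so an infinite source is safe as long as the consumer stops pulling. The helpers `naturals` and `takeN` are hypothetical names that only model the idea; they are not part of the Stream API.

```typescript
// Plain TypeScript model of a pull-based sequence (not the effect API).
// `naturals` and `takeN` are hypothetical helpers for illustration.

// Counts how many values the source has actually computed.
let produced = 0

// An infinite source: each value is computed only when the consumer pulls it.
function* naturals(): Generator<number> {
  let n = 0
  while (true) {
    produced++
    yield n++
  }
}

// Pull at most `count` values, then stop pulling from the source.
function* takeN<A>(source: Iterable<A>, count: number): Generator<A> {
  if (count <= 0) return
  let taken = 0
  for (const value of source) {
    yield value
    if (++taken >= count) return
  }
}

const firstThree = [...takeN(naturals(), 3)]
console.log(firstThree, produced) // [ 0, 1, 2 ] 3
```

The source computed exactly three values even though it could run forever, which is the property that lets streams model infinite data sources.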
Creating Streams
Streams can be created from various data sources.
From Iterables
The simplest way to create a stream is from an array or any iterable:
import { Stream } from "effect"
// Stream.fromIterable turns any iterable into a stream
export const numbers = Stream.fromIterable<number>([1, 2, 3, 4, 5])
From Effects with Schedules
Create polling streams that emit values on a schedule:
import { Effect, Schedule, Stream } from "effect"
// Stream.fromEffectSchedule turns a single effect into a polling stream.
// This is useful for metrics, health checks, and cache refresh loops.
export const samples = Stream.fromEffectSchedule(
  Effect.succeed(3),
  Schedule.spaced("30 seconds")
).pipe(
  // Stream.take limits the number of elements emitted by the stream
  Stream.take(3)
)
From Paginated APIs
Use Stream.paginate for APIs that return data one page at a time:
import { Array, Effect, Stream } from "effect"
import * as Option from "effect/Option"
// Stream.paginate is perfect for APIs that return one page at a time
export const fetchJobsPage = Stream.paginate(
  0, // start with page 0 (the cursor)
  Effect.fn(function*(page) {
    // Simulate network latency
    yield* Effect.sleep("50 millis")
    // Each page holds 100 jobs (Array.range includes both endpoints)
    const results = Array.range(0, 99).map((i) =>
      `Job ${i + 1 + page * 100}`
    )
    // Only return 10 pages of results (pages 0 through 9)
    const nextPage = page < 9
      ? Option.some(page + 1)
      : Option.none()
    return [results, nextPage] as const
  })
)
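The cursor loop behind pagination can be modelled in a few lines of plain TypeScript. This is a sketch of the idea only, not the Stream.paginate implementation: `paginateSync`, `fetchPage`, `pageSize`, and `pageCount` are hypothetical names, and `undefined` stands in for `Option.none()`.

```typescript
// Plain TypeScript model of cursor-based pagination (not the effect API).
// `paginateSync` and `fetchPage` are hypothetical names for illustration.

type Page<A, C> = readonly [items: ReadonlyArray<A>, next: C | undefined]

// Start from `cursor`, emit each page's items, and keep going for as long as
// the page function returns a next cursor.
function* paginateSync<A, C>(
  cursor: C,
  fetchPage: (cursor: C) => Page<A, C>
): Generator<A> {
  let current: C | undefined = cursor
  while (current !== undefined) {
    const [items, next] = fetchPage(current)
    yield* items
    current = next
  }
}

const pageSize = 100
const pageCount = 10

// Pages are numbered 0 through 9; the last page returns no next cursor.
const fetchPage = (page: number): Page<string, number> => {
  const items = Array.from({ length: pageSize }, (_, i) => `Job ${i + 1 + page * pageSize}`)
  const next = page < pageCount - 1 ? page + 1 : undefined
  return [items, next]
}

const jobs = [...paginateSync(0, fetchPage)]
console.log(jobs.length, jobs[0], jobs[jobs.length - 1]) // 1000 Job 1 Job 1000
```

Because the stop condition is evaluated on the page that was just fetched, the boundary matters: in this model a condition of `page <= 10` would fetch 12 pages (0 through 11) rather than 10.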
From Async Iterables
Convert async iterables into streams:
import { Schema, Stream } from "effect"
class LetterError extends Schema.TaggedErrorClass<LetterError>()("LetterError", {
  cause: Schema.Defect
}) {}
async function* asyncIterable() {
  yield "a"
  yield "b"
  yield "c"
}
// Create a stream from an async iterable
export const letters = Stream.fromAsyncIterable(
  asyncIterable(),
  (cause) => new LetterError({ cause })
)
From Event Listeners
Create streams from DOM events:
import { Stream } from "effect"
const button = document.getElementById("my-button")!
// Stream.fromEventListener creates a stream from an event listener
export const events = Stream.fromEventListener<PointerEvent>(button, "click")
From Callbacks
For any callback-based API, use Stream.callback:
import { Effect, Queue, Stream } from "effect"
const button = document.getElementById("my-button")!
export const callbackStream = Stream.callback<PointerEvent>(Effect.fn(function*(queue) {
  // Use the Queue APIs to emit values into the stream
  function onEvent(event: PointerEvent) {
    Queue.offerUnsafe(queue, event)
  }
  // Register the event listener and add a finalizer to unregister it
  yield* Effect.acquireRelease(
    Effect.sync(() => button.addEventListener("click", onEvent)),
    () => Effect.sync(() => button.removeEventListener("click", onEvent))
  )
}))
From Node.js Streams
Convert Node.js readable streams:
import { NodeStream } from "@effect/platform-node"
import { Schema } from "effect"
import { Readable } from "node:stream"
export class NodeStreamError extends Schema.TaggedErrorClass<NodeStreamError>()("NodeStreamError", {
  cause: Schema.Defect
}) {}
// Create a stream from a Node.js readable stream
export const nodeStream = NodeStream.fromReadable({
  evaluate: () => Readable.from(["Hello", " ", "world", "!"]),
  onError: (cause) => new NodeStreamError({ cause }),
  closeOnDone: true // true by default
})
Transforming Streams
Streams provide rich operators for transformation.
Mapping
Use Stream.map for pure per-element transforms:
import { Stream } from "effect"
interface Order {
  readonly id: string
  readonly customerId: string
  readonly status: "paid" | "refunded"
  readonly subtotalCents: number
  readonly shippingCents: number
  readonly country: string
}
interface NormalizedOrder extends Order {
  readonly totalCents: number
}
const orderEvents = Stream.succeed<Order>({
  id: "ord_1001",
  customerId: "cus_1",
  status: "paid",
  subtotalCents: 4_500,
  shippingCents: 500,
  country: "US"
})
// Stream.map for pure per-element transforms
export const normalizedOrders = orderEvents.pipe(
  Stream.map((order): NormalizedOrder => ({
    ...order,
    totalCents: order.subtotalCents + order.shippingCents
  }))
)
Filtering
Exclude elements that don’t match a predicate:
// Stream.filter lets you exclude elements
export const paidOrders = normalizedOrders.pipe(
  Stream.filter((order) => order.status === "paid")
)
FlatMap
Transform each element into a stream and flatten the results:
import { Stream } from "effect"
// Stream.flatMap to transform each element into a stream
export const allOrders = Stream.make("US", "CA", "NZ").pipe(
  Stream.flatMap(
    (country) =>
      Stream.range(1, 50).pipe(
        Stream.map((i): Order => ({
          id: `ord_${country}_${i}`,
          customerId: `cus_${i}`,
          status: i % 10 === 0 ? "refunded" : "paid",
          subtotalCents: Math.round(Math.random() * 100_000),
          shippingCents: Math.round(Math.random() * 10_000),
          country
        }))
      ),
    // Control the concurrency of the flatMap
    { concurrency: 2 }
  )
)
Effectful Mapping
Use Stream.mapEffect for transformations that require effects:
import { Effect, Stream } from "effect"
// Shape of the value returned by enrichOrder
interface EnrichedOrder extends NormalizedOrder {
  readonly taxCents: number
  readonly grandTotalCents: number
  readonly priority: "high" | "normal"
}
const enrichOrder = Effect.fn(function*(order: NormalizedOrder): Effect.fn.Return<EnrichedOrder> {
  // Simulate effectful enrichment (tax/risk lookup)
  yield* Effect.sleep("5 millis")
  const taxRate = order.country === "US" ? 0.08 : 0.13
  const taxCents = Math.round(order.totalCents * taxRate)
  return {
    ...order,
    taxCents,
    grandTotalCents: order.totalCents + taxCents,
    priority: order.totalCents >= 20_000 ? "high" : "normal"
  }
})
// Stream.mapEffect performs effectful transforms with concurrency control
export const enrichedPaidOrders = paidOrders.pipe(
  Stream.mapEffect(enrichOrder, { concurrency: 4 })
)
Consuming Streams
Streams are consumed with the run* functions. Each one returns an Effect, so nothing executes until that effect is run.
Collecting All Elements
Gather all stream outputs into an array:
import { Stream } from "effect"
// runCollect gathers all stream outputs into an immutable array
export const collectedOrders = Stream.runCollect(enrichedPaidOrders)
Running for Effects
Run the stream for its effects, ignoring outputs:
// runDrain runs the stream for its effects, ignoring all outputs
export const drained = Stream.runDrain(enrichedPaidOrders)
Processing Each Element
Execute an effectful consumer for every element:
import { Effect, Stream } from "effect"
// runForEach executes an effectful consumer for every element
export const logOrders = enrichedPaidOrders.pipe(
  Stream.runForEach((order) =>
    Effect.logInfo(`Order ${order.id} total=$${(order.grandTotalCents / 100).toFixed(2)}`)
  )
)
Folding/Reducing
Reduce the stream to one accumulated value:
// runFold reduces the stream to one accumulated value
export const totalRevenueCents = enrichedPaidOrders.pipe(
  Stream.runFold(() => 0, (acc: number, order) => acc + order.grandTotalCents)
)
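As a rough model of what a fold does, the plain TypeScript sketch below accumulates a total while pulling values one at a time, without first collecting them into an array. It does not use the effect library; `foldSync` and `orderTotals` are hypothetical names, and the sample amounts are made up for illustration. The lazily evaluated initial state mirrors the `() => 0` argument in the snippet above.

```typescript
// Plain TypeScript model of folding a lazily produced sequence (not the effect API).
// `foldSync` and `orderTotals` are hypothetical names for illustration.

function foldSync<A, S>(
  source: Iterable<A>,
  initial: () => S,
  step: (acc: S, value: A) => S
): S {
  // The initial state is created once, when the fold starts.
  let acc = initial()
  for (const value of source) {
    acc = step(acc, value)
  }
  return acc
}

// Made-up grand totals in cents, produced one at a time.
function* orderTotals(): Generator<number> {
  yield 5_400
  yield 12_960
  yield 21_600
}

const totalCents = foldSync(orderTotals(), () => 0, (acc, cents) => acc + cents)
console.log(totalCents) // 39960
```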
Using Sinks
Consume streams through sinks:
import { Sink, Stream } from "effect"
// run lets you consume a stream through any Sink
export const totalRevenueViaSink = enrichedPaidOrders.pipe(
  Stream.map((order) => order.grandTotalCents),
  Stream.run(Sink.sum)
)
Getting First/Last Elements
Capture edge elements as Option values:
// runHead and runLast capture edge elements as Option values
export const firstLargeOrder = enrichedPaidOrders.pipe(
  Stream.filter((order) => order.priority === "high"),
  Stream.runHead
)
export const lastLargeOrder = enrichedPaidOrders.pipe(
  Stream.filter((order) => order.priority === "high"),
  Stream.runLast
)
Windowing and Limiting
Control how much of a stream is processed.
Taking Elements
import { Stream } from "effect"
// Take only the first N elements
export const firstTwoOrders = enrichedPaidOrders.pipe(
  Stream.take(2),
  Stream.runCollect
)
Dropping Elements
// Skip the first N elements
export const afterWarmupOrder = enrichedPaidOrders.pipe(
  Stream.drop(1),
  Stream.runCollect
)
Conditional Taking
// Take elements while a condition is true
export const untilLargeOrder = enrichedPaidOrders.pipe(
  Stream.takeWhile((order) => order.priority === "normal"),
  Stream.runCollect
)
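The difference between these limiting operators, and between takeWhile and filter in particular, is easy to miss. The sketch below models the three operators with plain TypeScript generators rather than the effect library; `take`, `drop`, and `takeWhile` here are local stand-ins written for illustration, and the `priorities` data is made up.

```typescript
// Plain TypeScript model of take / drop / takeWhile (not the effect API).

// Emit at most `n` elements, then stop pulling.
function* take<A>(source: Iterable<A>, n: number): Generator<A> {
  if (n <= 0) return
  let taken = 0
  for (const value of source) {
    yield value
    if (++taken >= n) return
  }
}

// Skip the first `n` elements, then emit everything else.
function* drop<A>(source: Iterable<A>, n: number): Generator<A> {
  let skipped = 0
  for (const value of source) {
    if (skipped < n) {
      skipped++
      continue
    }
    yield value
  }
}

// Emit elements until the predicate fails for the first time, then stop.
function* takeWhile<A>(source: Iterable<A>, predicate: (value: A) => boolean): Generator<A> {
  for (const value of source) {
    if (!predicate(value)) return
    yield value
  }
}

const priorities = ["normal", "normal", "high", "normal"]

const firstTwo = [...take(priorities, 2)]
const afterFirst = [...drop(priorities, 1)]
const untilHigh = [...takeWhile(priorities, (p) => p === "normal")]
const onlyNormal = priorities.filter((p) => p === "normal")

console.log(firstTwo)   // [ 'normal', 'normal' ]
console.log(afterFirst) // [ 'normal', 'high', 'normal' ]
console.log(untilHigh)  // [ 'normal', 'normal' ]
console.log(onlyNormal) // [ 'normal', 'normal', 'normal' ]
```

Note that `untilHigh` stops at the first "high" entry and never sees the trailing "normal" one, whereas filtering keeps every matching element.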
Encoding and Decoding Streams
Use channels to decode and encode streams of structured data.
import { Stream } from "effect"
import { Ndjson, Msgpack } from "effect/unstable"
// someByteStream and structuredDataStream stand for streams defined elsewhere
// Decode NDJSON stream
const ndjsonStream = someByteStream.pipe(
  Stream.pipeThroughChannel(Ndjson.decode)
)
// Encode to MessagePack
const msgpackStream = structuredDataStream.pipe(
  Stream.pipeThroughChannel(Msgpack.encode)
)
Stream Patterns
Merging Streams
import { Stream } from "effect"
const stream1 = Stream.range(1, 5)
const stream2 = Stream.range(10, 15)
// Merge two streams into one
const merged = Stream.merge(stream1, stream2)
Zipping Streams
import { Stream } from "effect"
const ids = Stream.range(1, 5)
const names = Stream.make("Alice", "Bob", "Charlie")
// Combine elements from two streams pairwise; the result ends when the shorter side ends
const users = Stream.zip(ids, names)
Buffering
import { Stream } from "effect"
// Buffer up to 100 elements so the upstream can run ahead of a slower consumer
// (someStream stands for any stream defined elsewhere)
const buffered = someStream.pipe(
  Stream.buffer({ capacity: 100 })
)
Grouping
import { Stream } from "effect"
// Collect elements into batches of 10
// (someStream stands for any stream defined elsewhere)
const grouped = someStream.pipe(
  Stream.grouped(10)
)
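Zipping and grouping both change the shape of the output, which the plain TypeScript sketch below makes visible. It models the behaviour with generators instead of the effect library; `zip` and `grouped` are local stand-ins, and emitting a final partial group is an assumption of this sketch rather than a statement about Stream.grouped.

```typescript
// Plain TypeScript model of zip and grouped (not the effect API).

// Pair elements from both sides; stop as soon as either side is exhausted.
function* zip<A, B>(left: Iterable<A>, right: Iterable<B>): Generator<readonly [A, B]> {
  const l = left[Symbol.iterator]()
  const r = right[Symbol.iterator]()
  while (true) {
    const a = l.next()
    if (a.done) return
    const b = r.next()
    if (b.done) return
    yield [a.value, b.value] as const
  }
}

// Collect elements into arrays of `size`.
function* grouped<A>(source: Iterable<A>, size: number): Generator<ReadonlyArray<A>> {
  if (size < 1) throw new Error("size must be at least 1")
  let group: Array<A> = []
  for (const value of source) {
    group.push(value)
    if (group.length === size) {
      yield group
      group = []
    }
  }
  // Assumption of this sketch: a trailing partial group is still emitted.
  if (group.length > 0) yield group
}

const ids = [1, 2, 3, 4, 5]
const names = ["Alice", "Bob", "Charlie"]

const users = [...zip(ids, names)]
const batches = [...grouped([1, 2, 3, 4, 5, 6, 7], 3)]

console.log(users)   // [ [ 1, 'Alice' ], [ 2, 'Bob' ], [ 3, 'Charlie' ] ]
console.log(batches) // [ [ 1, 2, 3 ], [ 4, 5, 6 ], [ 7 ] ]
```

With five ids and three names, zipping yields only three pairs; ids 4 and 5 are never paired.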
Error Handling in Streams
Streams propagate errors through the error channel:
import { Effect, Stream } from "effect"
const streamWithErrors = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.mapEffect((n) =>
    n === 3
      ? Effect.fail(new Error("Invalid value"))
      : Effect.succeed(n * 2)
  ),
  // Catch and recover from errors
  Stream.catchAll((error) => Stream.succeed(-1))
)
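It helps to be explicit about what recovery does to the remaining elements: the failing stream stops at the error and the fallback takes over, so later inputs are never processed. The plain TypeScript sketch below models that with generators and exceptions instead of the effect library and its typed error channel; `doubledOrFail` and `recoverWith` are hypothetical names.

```typescript
// Plain TypeScript model of failing mid-sequence and recovering (not the effect API).
// `doubledOrFail` and `recoverWith` are hypothetical names for illustration.

// Doubles each value, but fails when it reaches 3.
function* doubledOrFail(values: Iterable<number>): Generator<number> {
  for (const n of values) {
    if (n === 3) throw new Error("Invalid value")
    yield n * 2
  }
}

// Emit the source until it fails, then switch to the fallback sequence.
function* recoverWith<A>(
  source: Iterable<A>,
  fallback: (error: unknown) => Iterable<A>
): Generator<A> {
  try {
    yield* source
  } catch (error) {
    yield* fallback(error)
  }
}

const recovered = [...recoverWith(doubledOrFail([1, 2, 3, 4, 5]), () => [-1])]
console.log(recovered) // [ 2, 4, -1 ]
```

The inputs 4 and 5 never appear in the output: once the failure occurs, only the fallback's elements follow.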
Best Practices
Use Streams for Sequences: Prefer streams over arrays when working with large datasets, infinite sequences, or data that arrives over time.
Control Concurrency: Always specify concurrency limits with mapEffect and flatMap to prevent resource exhaustion.
Compose Operators: Build complex stream pipelines by composing simple operators. Keep individual transformations focused.
Handle Errors: Use Stream.catchAll, Stream.orElse, or Stream.retry to handle errors gracefully in stream pipelines.
Next Steps
Concurrency: Learn about concurrent operations and fiber management.
Error Handling: Handle errors in streams and effects.