streamtest
streamtest is a library for testing streams. It provides helper functions that make it easier to test streams with various scenarios and assertions.
Inspired by the test helpers in RxJS.
Define stream scenarios
A simple string series can be used to define the values to be enqueued into a stream and events such as closes and errors.
Series for ReadableStream
This can be used with the readable or assertReadable helpers.
Series format
The following characters are available in the series:
- "\x20": Space is ignored. Used to align columns.
- "-": Advance 1 tick.
- "|": Close the stream.
- "!": Cancel the stream.
- "#": Abort the stream.
- "(...)": Groups characters. Ticks do not advance inside the group. After the closing ")", advance 1 tick.
- Characters with keys in values will have their values enqueued to the stream, and then advance 1 tick.
- Other characters are enqueued into the stream as a single character, and then advance 1 tick.
Example
- Series: " ---A--B(CD)--|"
- Values: { A: "foo" }
- Waits 3 ticks.
- "foo" is enqueued and waits 1 tick.
- Waits 2 ticks.
- "B" is enqueued and waits 1 tick.
- "C" is enqueued, "D" is enqueued and waits 1 tick.
- Waits 2 ticks.
- Close the stream.
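The rules above can be sketched as a small interpreter that turns a series into a list of timed events. This is only an illustration of the format, not streamtest's implementation: the names parseReadableSeries, Action, and Frame are hypothetical and do not come from the library.

```typescript
// Hypothetical types for illustration; they are not part of streamtest's API.
type Action =
  | { kind: "enqueue"; value: unknown }
  | { kind: "close" }
  | { kind: "cancel" }
  | { kind: "abort" };

type Frame = { tick: number; actions: Action[] };

// Maps one series character to an action, or null for "-" (a plain tick).
function toAction(ch: string, values: Record<string, unknown>): Action | null {
  if (ch === "-") return null;
  if (ch === "|") return { kind: "close" };
  if (ch === "!") return { kind: "cancel" };
  if (ch === "#") return { kind: "abort" };
  const hasKey = Object.prototype.hasOwnProperty.call(values, ch);
  return { kind: "enqueue", value: hasKey ? values[ch] : ch };
}

function parseReadableSeries(
  series: string,
  values: Record<string, unknown> = {},
): Frame[] {
  const frames: Frame[] = [];
  let tick = 0;
  let group: Action[] | null = null; // non-null while inside "(...)"

  for (const ch of series) {
    if (ch === " ") continue; // spaces only align columns
    if (ch === "(") {
      group = [];
      continue;
    }
    if (ch === ")") {
      // The whole group shares one tick; the tick advances after ")".
      if (group !== null && group.length > 0) {
        frames.push({ tick, actions: group });
      }
      group = null;
      tick++;
      continue;
    }
    const action = toAction(ch, values);
    if (group !== null) {
      // Inside a group nothing advances the tick, not even "-".
      if (action !== null) group.push(action);
      continue;
    }
    if (action !== null) frames.push({ tick, actions: [action] });
    // "-" and enqueued characters advance one tick.
    if (action === null || action.kind === "enqueue") tick++;
  }
  return frames;
}

const exampleFrames = parseReadableSeries(" ---A--B(CD)--|", { A: "foo" });
console.log(exampleFrames.map((f) => f.tick).join(",")); // prints "3,6,7,10"
```

The printed ticks match the walkthrough: "foo" at tick 3, "B" at tick 6, "C" and "D" together at tick 7, and the close at tick 10.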
Series for WritableStream
This can be used with the writable helper.
Series format
The following characters are available in the series:
- "\x20": Space is ignored. Used to align columns.
- "-": Advance 1 tick.
- "#": Abort the stream.
- "<": Apply backpressure. Then advance 1 tick.
- ">": Release backpressure. Then advance 1 tick.
Example
- Series: " --- <-- >-- # "
- Waits 3 ticks.
- Applies backpressure. Flags that the stream is not ready for writing.
- Waits 3 ticks.
- Releases backpressure. Notifies the data source that the stream is ready for writing.
- Waits 3 ticks.
- Abort the stream.
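As a rough sketch of how this format reads, the following function lists the tick at which each backpressure change or abort occurs. The name parseWritableSeries and the event shape are hypothetical, and rejecting unknown characters is an assumption of this sketch. The library may treat them differently.

```typescript
// Hypothetical event shape for illustration; not part of streamtest's API.
type WritableEvent = { tick: number; kind: "apply" | "release" | "abort" };

function parseWritableSeries(series: string): WritableEvent[] {
  const events: WritableEvent[] = [];
  let tick = 0;
  for (const ch of series) {
    switch (ch) {
      case " ": // alignment only
        break;
      case "-":
        tick++;
        break;
      case "<": // apply backpressure, then advance 1 tick
        events.push({ tick, kind: "apply" });
        tick++;
        break;
      case ">": // release backpressure, then advance 1 tick
        events.push({ tick, kind: "release" });
        tick++;
        break;
      case "#":
        events.push({ tick, kind: "abort" });
        break;
      default:
        // Assumption of this sketch: anything else is a mistake in the series.
        throw new SyntaxError(`Unexpected character "${ch}" in writable series`);
    }
  }
  return events;
}

const exampleEvents = parseWritableSeries(" --- <-- >-- # ");
console.log(exampleEvents.map((e) => `${e.kind}@${e.tick}`).join(" "));
// prints "apply@3 release@6 abort@9"
```

Note that "<" and ">" each consume a tick themselves, which is why "<--" accounts for the 3 ticks between applying and releasing backpressure.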
Series for AbortSignal
This can be used with the abort helper.
Series format
The following characters are available in the series:
- "\x20": Space is ignored. Used to align columns.
- "-": Advance 1 tick.
- "!": Abort the signal.
Example
- Series: " ----- ! "
- Waits 5 ticks.
- Aborts the signal.
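To make the timing concrete, here is a minimal sketch that builds an AbortSignal from such a series using real timers. The function abortSignalFromSeries and its tickMs parameter are hypothetical. How streamtest itself schedules ticks is not described here, so treat the timer-based approach as an assumption.

```typescript
// Hypothetical helper for illustration; not part of streamtest's API.
// Assumption: each tick lasts `tickMs` milliseconds of real time.
function abortSignalFromSeries(
  series: string,
  tickMs: number,
  reason?: unknown,
): AbortSignal {
  const controller = new AbortController();
  let tick = 0;
  for (const ch of series) {
    if (ch === " ") continue; // alignment only
    if (ch === "-") {
      tick++;
      continue;
    }
    if (ch === "!") {
      // Abort once the preceding ticks have elapsed.
      setTimeout(() => controller.abort(reason), tick * tickMs);
      break;
    }
    // Assumption of this sketch: anything else is a mistake in the series.
    throw new SyntaxError(`Unexpected character "${ch}" in abort series`);
  }
  return controller.signal;
}

// Five "-" characters precede "!", so this aborts after 5 ticks (50 ms here).
const exampleSignal = abortSignalFromSeries(" ----- ! ", 10, new Error("abort"));
exampleSignal.addEventListener("abort", () => console.log("aborted"));
```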
API Reference
testStream
Defines a block for testing streams. The function passed to testStream receives a TestStreamHelper, whose helper functions are available only within that function.
import { testStream, type TestStreamHelper } from "jsr:@milly/streamtest";

Deno.test("MyTransformer", async () => {
  await testStream(async (helper: TestStreamHelper) => {
    // ... test logic using helper.assertReadable, helper.readable, and helper.run ...
  });
});
readable helper
Creates a ReadableStream with the specified series.
import { testStream } from "jsr:@milly/streamtest";

Deno.test("readable", async () => {
  await testStream(async ({ readable }) => {
    const abortReason = new Error("abort");
    const values = {
      A: "foo",
      B: "bar",
      C: "baz",
    } as const;

    // "a" ..sleep.. "b" ..sleep.. "c" ..sleep.. close
    const characterStream = readable("a--b--c--|");

    // ..sleep.. "foo" ..sleep.. "bar" ..sleep.. "baz" and close
    const stringStream = readable(" --A--B--(C|)", values);

    // "0" ..sleep.. "1" ..sleep.. "2" ..sleep.. abort
    const errorStream = readable(" 012#", undefined, abortReason);

    // Now you can use the `*Stream` in your test logic.
  });
});
writable helper
Creates a WritableStream with the specified series.
import { testStream } from "jsr:@milly/streamtest";

Deno.test("writable", async () => {
  await testStream(async ({ writable, readable, run, assertReadable }) => {
    const abortReason = new Error("abort");
    const dest = writable("  -----<------------- >  --#", abortReason);
    //     Backpressure range ____^^^^^^^^^^^^^^ ^
    //                    Aborts the dest stream ____/
    const source = readable("---a---b---c---d--- -  -----|");
    const intermediate = source.pipeThrough(new TransformStream());
    const expected = "       ---a---b-----------(cd)--!";
    //     Apply backpressure ____^              ^^
    // Release backpressure and emits "c", "d" _/

    await run([intermediate], async (intermediate) => {
      intermediate.pipeTo(dest).catch(() => {});
    });

    await assertReadable(intermediate, expected, {}, abortReason);
  });
});
assertReadable helper
Asserts that the readable stream matches the specified series.
import { testStream } from "jsr:@milly/streamtest";
import { UpperCase } from "jsr:@milly/streamtest/examples/upper-case";

Deno.test("assertReadable", async () => {
  await testStream(async ({ assertReadable, readable }) => {
    const abortReason = new Error("abort");
    const values = {
      A: "foo",
      B: "bar",
      C: "baz",
    } as const;

    const source = readable("--A--B--C--#", values, abortReason);
    const expectedSeries = " --A--B--C--#";
    const expectedValues = {
      A: "FOO",
      B: "BAR",
      C: "BAZ",
    };

    const actual = source.pipeThrough(new UpperCase());

    await assertReadable(actual, expectedSeries, expectedValues, abortReason);
  });
});
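The UpperCase transformer imported above is not shown in this document. For readers who want something to experiment with, a stand-in could look like the sketch below. It is an assumption based only on how the example uses it (string chunks in, upper-cased strings out), and the real module may be implemented differently. The collectUpper function is a hypothetical helper that relies only on the standard Web Streams API.

```typescript
// Hypothetical stand-in for the UpperCase example transformer.
class UpperCase extends TransformStream<string, string> {
  constructor() {
    super({
      transform(chunk, controller) {
        controller.enqueue(chunk.toUpperCase());
      },
    });
  }
}

// Hypothetical helper: pipes the inputs through UpperCase and collects the output.
async function collectUpper(inputs: string[]): Promise<string[]> {
  const source = new ReadableStream<string>({
    start(controller) {
      for (const input of inputs) controller.enqueue(input);
      controller.close();
    },
  });
  const reader = source.pipeThrough(new UpperCase()).getReader();
  const out: string[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    out.push(result.value);
  }
  return out;
}

collectUpper(["foo", "bar", "baz"]).then((out) => {
  console.log(out.join(",")); // prints "FOO,BAR,BAZ"
});
```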
abort helper
Creates an AbortSignal with the specified series.
import { assertEquals } from "jsr:@std/assert@0.224.0/assert-equals";
import { testStream } from "jsr:@milly/streamtest";
import { delay } from "jsr:@std/async@0.224.0/delay";

Deno.test("abort", async () => {
  await testStream(async ({ abort, run }) => {
    const abortReason = new Error("abort");

    // ..sleep 3 ticks.. abort with `abortReason`
    const signal = abort("---!", abortReason);

    await run([], async () => {
      await delay(300 - 1);
      assertEquals(signal.aborted, false);
      await delay(2);
      assertEquals(signal.aborted, true);
      assertEquals(signal.reason, abortReason);
    });
  });
});
run helper
Processes the test streams inside the run block.
import { assertEquals } from "jsr:@std/assert@0.224.0/assert-equals";
import { testStream } from "jsr:@milly/streamtest";
import { UpperCase } from "jsr:@milly/streamtest/examples/upper-case";

Deno.test("run", async () => {
  await testStream(async ({ run, readable }) => {
    const source = readable("--a--b--c--|");
    const actual = source.pipeThrough(new UpperCase());

    await run([actual], async (actual) => {
      const reader = actual.getReader();

      assertEquals(await reader.read(), { value: "A", done: false });
      assertEquals(await reader.read(), { value: "B", done: false });
      assertEquals(await reader.read(), { value: "C", done: false });
      assertEquals(await reader.read(), { value: undefined, done: true });

      reader.releaseLock();
    });
  });
});
License
This library is licensed under the MIT License. See the LICENSE file for details.