みーのぺーじ

みーが趣味でやっているPCやソフトウェアについて.Python, Javascript, Processing, Unityなど.

JavaScript で stream を扱う

JavaScript で大きなサイズのデータを扱いたかったので, stream の使い方をまとめます.

以下のソースコードは全て TypeScript の Vitest 用単体テストです.

ReadableStream

まずは ReadableStream を扱ってみます.

ReadableStream - Web API | MDN

import { assert, test } from "vitest";

test("ReadableStream", async () => {
    const src = new ReadableStream<string>({
        start(controller) {
            controller.enqueue("A");
            setTimeout(() => {
                controller.enqueue("BC");
                controller.close();
            }, 100);
        }
    });
    let chunks: string[] = [];
    for await (const chunk of src) {
        chunks.push(chunk);
    }
    assert.deepEqual(chunks, ["A", "BC"]);
});

controllerenqueue() 関数で挿入した文字列が非同期で順番に chunk 変数として取り出せました.

WritableStream

stream からデータを取り出します.

WritableStream - Web API | MDN

test("WritableStream", async () => {
    const src = new ReadableStream<string>({
        start(controller) {
            controller.enqueue("A");
            controller.enqueue("BC");
            controller.close();
        }
    });
    let chunks: string[] = [];
    const target = new WritableStream<string>({
        write(chunk) {
            return new Promise((resolve) => {
                chunks.push(chunk);
                return resolve();
            });
        }
    });
    await src.pipeTo(target);
    assert.deepEqual(chunks, ["A", "BC"]);
});

ReadableStreamWritableStreampipeTo() 関数で繋げます.write() 関数で chunk 変数として文字列を非同期で取り出せました.

TransformStream

途中でデータを変換します.例として CompressionStream を使って gzip 圧縮してから展開してみます.

test("CompressionStream", async () => {
    const src = new ReadableStream<string>({
        start(controller) {
            controller.enqueue("A");
            setTimeout(() => {
                controller.enqueue("BC");
                controller.close();
            }, 100);
        }
    });
    const target = src
        .pipeThrough(new CompressionStream("gzip"))
        .pipeThrough(new DecompressionStream("gzip"))
        .pipeThrough(new TextDecoderStream("utf-8"));
    let chunks: string[] = [];
    for await (const chunk of target) {
        chunks.push(chunk);
    }
    assert.deepEqual(chunks, ["ABC"]);
});

pipeThrough 関数を使用して繋げます.stream はデフォルトで 16 kB 毎に処理するので,結果は一つの文字列に結合されました.

chunk のサイズと分割

test("CompressionStream chunk size", async () => {
    const src = new ReadableStream<string>({
        start(controller) {
            controller.enqueue("1234".repeat(16384));
            controller.close();
        }
    });
    let chunks: string[] = [];
    const target = new WritableStream<string>({
        write(chunk) {
            return new Promise((resolve) => {
                chunks.push(chunk);
                return resolve();
            });
        }
    });
    await src
        .pipeThrough(new CompressionStream("gzip"))
        .pipeThrough(new DecompressionStream("gzip"))
        .pipeThrough(new TextDecoderStream("utf-8"))
        .pipeTo(target);
    assert.deepEqual(chunks.map(chunk => chunk.length), [16384, 16384, 16384, 16384]);
});

4文字を16384個結合すると 64 kB のデータになりますので,4分割されました.

結果

上記の単体テストは全て合格しました.

 ✓ tests/stream.spec.ts (4) 316ms
   ✓ ReadableStream
   ✓ WritableStream
   ✓ CompressionStream
   ✓ CompressionStream chunk size

 Test Files  1 passed (1)
      Tests  4 passed (4)
   Start at  21:15:40
   Duration  324ms

まとめ

JavaScript で stream を利用することで,最低限のメモリーを使いながら大きなサイズのデータを自由に変換して取り出せることが分かりました.