土曜日, 6月 21, 2025
- Advertisment -
ホームニューステックニュースEffect.tsはストリームを扱う目的でも便利だよ、という話 with BigQuery Storage Write API

Effect.tsはストリームを扱う目的でも便利だよ、という話 with BigQuery Storage Write API



この記事の目標

クオータを越えないようにしつつ、BigQueryにStorage Write APIでインサートしてみよう。ということになります。
この記事を読むにあたって、BigQueryの知識は必要ないです。

詳しくはBigQuery Storage Write API の概要 – Google Cloudを参照。
BigQueryはビッグデータ向けのデータベース、分析ツールです。
ストリームで処理しつつ、ストリーム単位のトランザクションが扱えたり、重複して挿入されない保証をしたりできます。
たとえば、旧来のやつは二重挿入されることもありえたりしました。

こちらを利用して、以下のことを実現したいとします。

  • Stream.Stream を受けとり、すべてを挿入するか、適切に失敗する。
  • append-rows APIで分割して送りたい。 (append-rowsは AppendRowsInputItem[] を取ると思ってもらってよい)
  • append-rowsは一回あたり10MBまでで送らなければならないので、その制約を守りたい。
  • append-rowsは未処理のもの(Promiseが帰ってきてないもの)のサイズ合計が10MBまででなければならないので、その制約を守りたい。
BigQueryユーザ向け詳細

BigQuery Storage Write APIはBigQueryの次世代APIのようなもので、ストリーム処理をトランザクショナルに扱ういくつかの新しいAPIが提供されていたりします。
BigQuery Storage Write APIのベストプラクティスにはappend-rowsは待たずに突っこみまくっていい、というようなことが書かれているんですが、実際にそれをやると10MBの制限が存在し、未処理のものが10MB溜まる瞬間がないようにしなければいけないです。
(リクエスト一個ずつが10MB以内になるだけでは不十分)
ドキュメントに明示している箇所がみあたらなかったのですが、実際小分けにして送りまくってみると、エラーでそのように言われます。(ドキュメントに書いてほしい。)

Effect.tsで必要な道具

Effect.tsで必要な道具を紹介していきます。

StreamとChunk

import { Stream, Chunk } from 'effect';
  • Stream.StreamT のストリームを表します。
  • 以下のように他のものからStreamを作れます。
    • Stream.fromAsyncIterable: AsyncIterable から帰着できます。
    • Stream.fromReadableStream: WebAPIの ReadableStream から帰着できます。
  • 内部的にはチャンク(Chunk.Chunk)を持ちますが、明示的に露出させなければ、それが見られることはありません。
    • Stream.chunks すると Stream.Stream> が得られ、内部のチャンクが露出したストリームになります。
    • チャンクはいい感じにしてくれるものではなく、どのようにするか、ということは何らかの方法でどこかで与えなければなりません。
      • いい感じにする、というのを作ってかませることはできます。
    • Stream.Stream はつまり、(内部チャンクに意味を持たせている文脈においては)ある程度、いい感じの単位にまとまっている状態で、しかしそれを透過的に扱う、という目的に利用できます。
    • 逆に、いい感じになってるのだろうな、ではなく、チャンクそのものに明確に強い意味を持たせるなら、 Stream.Stream> を使いたいということになるでしょう。
    • 大抵の最初に生まれるストリームの内部チャンクは、1要素を1個ずつ括るチャンクになっていると思って良いでしょう。
      • これはつまり、なにもしてないのと同等で、なにかしたいなら自分でしなければならないことを意味します。
  • 配列の .map 等のようなものが、より広く揃っています: 代表的には、 Stream.filter, Stream.map, Stream.take, Stream.flatMap, などなど。
  • Stream vs Chunk
    • StreamはそのままAsyncIterableだと思っておけば良いです。
    • Chunkは、いわゆる普通に配列みたいに思って良いですが、よりarray構造体だと思ってよいしそのように最適化されている対象だと見るとよさそうです。
      • append(要素追加)などはネイティブ配列より速いらしい。
      • effectにはListというのが別であるので、List vs Chunkで考えるのがよりよいかと思います。いわゆる リンクリスト vs 可変長配列 の関係にあたります。
        • C++ でいえば std::liststd::vector の関係
        • Rust でいえば LinkedListVec の関係
        • データ構造をやろう。
      • effectにはArrayというのも生えていますが、これはネイティブ配列を扱うためのもので、特別、新しい構造体ではないです。
  • Effectの一員なので、途中でどのようなエラーが起こり得るか、どのような副作用が必要か、が型に表れます。
    • 例えば、データベースから一覧取得するようなストリームは、コネクションタイムアウトエラーを起こし得て、副作用としてデータベースアクセスを要求する、といった形にできます。(そのように設計すれば)
AsyncIterableの例
import { Chunk, Console, Effect, Stream } from "effect";

function* naturals(): Iterablebigint> {
  let n = 0n;
  while (true) {
    yield n++;
  }
}
const stream = Stream.fromIterable(naturals()).pipe(
  Stream.filter((n) => n % 2n === 0n),
  Stream.take(10),
  Stream.map((n) => n * n),
);

stream.pipe(
  Stream.runCollect,
  Effect.map(Chunk.toArray),
  Effect.andThen(Console.log), 
  Effect.runPromise,
);
ReadableStreamの例
import { Chunk, Console, Effect, Stream } from "effect";

const readableStream = new ReadableStreamstring>({
  start(controller) {
    controller.enqueue("Hello, ");
    controller.enqueue("world!");
    controller.close();
  },
});
const streamFromReadable = Stream.fromReadableStream({
  evaluate: () => readableStream,
  onError: (error) => new Error(`Stream error: ${error}`),
});

streamFromReadable.pipe(
  Stream.runCollect,
  Effect.map(Chunk.toArray),
  Effect.andThen(Console.log), 
  Effect.runPromise,
);

Sink

import { Sink } from 'effect';

Streamは Stream.run... 系のもので、ひとつの値へと集計することもできますが、それを特別に表す構造も持ちます。(もしかしたらそれ以上の意味を持つかもしれません。)
Sinkの基本的な利用方法は Stream.run(stream, sink) になりますから、最も一般化された run... 系のベース、と思っても良いかもしれません。

Sinkのタイプパラメータは以下のようになります。

     ┌─── 最終的に、どのような結果に集計されるか
     |  ┌─── 対象として何のストリームを集計するか
     |  |   ┌─── 余り (leftover)
     ▼  ▼   ▼
Sink

たとえば、

  • numberストリームの合計値を計算するなら、 Sink
  • 文字列ストリームの合計サイズをbigintで集計するなら、 Sink
  • booleanストリームの、最初にtrueになる場所のインデックスを取るなら、 Sink

のように設計できますね。最後の例には、leftoverを含んだものになっており、結果を出すのに、すべてを見なくても良い場合に利用できます。
また、その残りをさらに利用するからこそ型として表現されてるわけですが、そのleftoverの利用方法のひとつが次になります。

ところで、 LInnever になる以外で有用なケースはあるのだろうかというのは、私も気になります。基本は、残りがあるのか、ないのか、といったことを示す用途なのでしょうかね。

Stream.transduce

import { Stream } from 'effect';
Stream.transduce(
  stream: Stream.StreamA>,
  sink: Sink.SinkB, A, A>,
): Stream.StreamB>

transduceは、Sinkを利用して、Streamを別のStreamに変換します。
このとき、Sinkは値をいくらか消費して値を出して、同じタイプの値が残って…を繰り返すようなものである必要があります。

先ほどの、booleanストリームの、最初にtrueになるインデックスを返すものを考えると、true同士(と端)の間隔がいくつ空いているか、のストリームが得られます。

booleanストリームのコード例
import { Sink, Stream, Data, Effect, Console, Chunk } from "effect";


const sink = Sink.fold(
  Data.struct({ last: false, count: 0 }),
  ({ last }) => !last,
  ({ count }, e: boolean) =>
    Data.struct({
      last: e,
      count: count + 1,
    }),
).pipe(Sink.map(({ last, count }) => (last ? count - 1 : count)));

const stream = Stream.fromIterable([
  false,
  false,
  false,
  true,
  false,
  false,
  true,
  false,
  false,
  false,
  false,
  false,
  false,
]);

stream.pipe(
  Stream.transduce(sink),
  Stream.runCollect,
  Effect.andThen(Chunk.toArray),
  Effect.andThen(Console.log), 
  Effect.runPromiseExit,
);

Sink.foldWeighted

Sink.foldWeightedはストリームを特定の重みになるまで集めて、例えばチャンクなどにまとめて出します。
ひとつできたら、残りは余り(leftover)となり終了します。

たとえば、 10 を越えない範囲で先頭から集めてチャンクするSinkが次のように書けます。

Sink.foldWeighted({
  initial: Chunk.emptynumber>(),
  maxCost: 10,
  cost: (_acc: Chunk.Chunknumber>, input: number) => input,
  body: (acc: Chunk.Chunknumber>, input: number) => acc.pipe(Chunk.append(input)),
}); 
  • initial: 初期状態 (ここでは初期の空のチャンク)
  • maxCost: 最大コスト。このコストを越えない範囲で最大のチャンクを作る。ただし、最初の要素であれば越えてても追加する。
  • cost: 新たに追加する要素 input 単体のコストを計算する関数。現時点のチャンクの状態も反映される。
  • body: 現在のチャンクに、新たに追加する要素 input を加えたものを返す関数。

ここではチャンクを作ることを前提に書きましたが、チャンクでなくとも一般にどんなオブジェクトでもよいです。そのオブジェクト自体からはコストを算出するための情報が抜けていてもいいわけですから、「合計値が特定の値を越えない範囲の、平均値」のようなものを直接求めるといったこともできます。

さて、Sink.foldWeightedをSteram.transduceと組み合わせて利用すれば、特定のコストを越えない範囲でチャンクしたストリームに変換する、ということができますね。

Effect.Semaphore

Effect.makeSemaphoreを利用して、特定のキャパシティを持つセマフォを作成できます。

セマフォは並列プログラミングなどに用いられるリソース管理機構で、特定のキャパシティを越えない範囲で並列に処理をすることを実現します。
セマフォを作成すると、リソースが利用できるようになるまで待機する .withPermits か、使えなければそれを教えてくれる .withPermitsIfAvailable (オプションを返す) があります。

ここで、10MBに相当するバイト数のキャパシティを持つセマフォを作成し、ひとつのリクエストを送って処理の完了を待つのに必要なキャパシティを、そのリクエストのバイト数としておけば目的が達成できそうです。

実装の外観

完全版は折り畳みで書いておきます。


const semaphore = yield* Effect.makeSemaphore(semaphorePermits);
const scope = yield* Effect.scope;

yield* stream.pipe(
  
  Stream.map((value) =>
    Data.struct({
      value,
      dataBytes: estimateBigQueryValueBytes(value),
    }),
  ),

  
  
  Stream.transduce(
    Sink.foldWeighted({
      initial: Data.struct({
        cost: bqAppendRowsOverheadBytes,
        chunk: Chunk.emptyStream.Stream.Successtypeof stream>>(),
      }),
      maxCost: bqAppendRowsCapBytes,
      cost: (_acc, { dataBytes }) => dataBytes,
      body: (acc, { value, dataBytes }) =>
        Data.struct({
          cost: acc.cost + dataBytes,
          chunk: Chunk.append(acc.chunk, value),
        }),
    }),
  ),

  
  
  
  

  
  Stream.mapEffect(({ chunk, cost, chunkIndex, offset }) =>
    Effect.gen(function* () {
      yield* Effect.log('appendRows権の取得完了(semaphore)', {
        AppendRowsを実行する: {
          chunkIndex,
          行数: Chunk.size(chunk),
          cost,
          offset,
        },
      });
      return yield* Effect.tryPromise(async () => {
        const pw = writer.appendRows(
          chunk.pipe(Chunk.toArray),
          offset,
        );
        return await pw.getResult();
      });
    }).pipe(
      
      semaphore.withPermits(Math.min(cost, semaphorePermits)),
      Effect.forkIn(scope),
      
    ),
  ),
);

全体の流れとして、以下が伝われば十分です。

  1. セマフォを今回のストリーム挿入用に確保。
  2. ストリームのデータを、適切なサイズ以下になるようにチャンクに分割。
  3. チャンクをその推定サイズのキャパシティをセマフォに要求しながら、並列に処理。
    • 合計サイズ上限は越えないように、最大限リクエストを送る。
完全な実装
import type { Table } from '@google-cloud/bigquery';
import { adapt, managedwriter } from '@google-cloud/bigquery-storage';
import type { google } from '@google-cloud/bigquery-storage/build/protos/protos.js';
import {
  Chunk,
  Config,
  Data,
  Effect,
  Fiber,
  Option,
  pipe,
  Schema,
  Sink,
  Stream,
} from 'effect';
import { Status } from 'google-gax';

export class StreamSetupError extends Data.TaggedError('StreamSetupError'){
  message: string;
}> {
  toString() {
    return `${this.name}: ${this.message}`;
  }
}
export class AppendRowsGoogleInternalError extends Data.TaggedError(
  'AppendRowsGoogleInternalError',
){
  readonly googleInternalRpcStatus: google.rpc.IStatus;
}> {
  toString() {
    return `${this.name}: ${this.googleInternalRpcStatus.message} (code: ${this.googleInternalRpcStatus.code})`;
  }
}
export class AppendRowsError extends Data.TaggedError(
  'AppendRowsGoogleInternalError',
){
  readonly rowErrors: google.cloud.bigquery.storage.v1.IRowError[];
}> {
  toString() {
    return `${this.name}: ${this.rowErrors
      .slice(0, 10)
      .map((e) => `${String(e.index)}: ${e.message}`)
      .join(', ')}`;
  }
}


export const BQ_APPEND_ROWS_MAX_BYTES = 10 * 1024 * 1024;
const bqAppendRowsCapBytesConfig = Schema.Config(
  'BQ_APPEND_ROWS_CAP_BYTES',
  Schema.NumberFromString.pipe(
    Schema.int(),
    Schema.between(0, BQ_APPEND_ROWS_MAX_BYTES),
  ).annotations({
    title: 'BigQuery (StorageWrite API) / Append Rows / Cap Bytes',
    description:
      'BigQuery Storage Write APIのAppend Rowsで1回に送信できる最大バイト数',
  }),
).pipe(Config.withDefault( 3 * 1024 * 1024));
const bqAppendRowsBufferBytesConfig = Schema.Config(
  'BQ_APPEND_ROWS_BUFFER_BYTES',
  Schema.NumberFromString.pipe(
    Schema.int(),
    Schema.between(0, BQ_APPEND_ROWS_MAX_BYTES),
  ).annotations({
    title: 'BigQuery (StorageWrite API) / Append Rows / Cap Bytes',
    description:
      'BigQuery Storage Write APIのAppend Rowsの合算リミットに対しもたせておく余裕サイズ',
  }),
).pipe(Config.withDefault( 1 * 1024 * 1024));
const bqAppendRowsOverheadBytesConfig = Schema.Config(
  'BQ_APPEND_ROWS_OVERHEAD_BYTES',
  Schema.NumberFromString.pipe(
    Schema.int(),
    Schema.between(0, BQ_APPEND_ROWS_MAX_BYTES),
  ).annotations({
    title:
      'BigQuery (StorageWrite API) / Append Rows / Overhead Bytes',
    description:
      'BigQuery Storage Write APIのAppend Rowsで1回の送信にかかると想定するオーバーヘッド',
  }),
).pipe(Config.withDefault( 3 * 1024));

const handleAppendResult = ({
  appendResult,
  error,
  rowErrors,
}: google.cloud.bigquery.storage.v1.IAppendRowsResponse) =>
  Effect.gen(function* () {
    if (error != null) {
      
      
      if (error.code === Status.ALREADY_EXISTS) {
        return Option.fromNullable(appendResult);
      }
      return yield* Effect.fail(
        new AppendRowsGoogleInternalError({ googleInternalRpcStatus: error }),
      );
    }

    if (rowErrors != null && rowErrors.length > 0) {
      return yield* Effect.fail(new AppendRowsError({ rowErrors }));
    }
    return Option.fromNullable(appendResult);
  });

export const estimateBigQueryValueBytes = (
  value: AppendRowsInput | AppendRowsInputItem | AppendRowsInputItemValue,
): number => {
  
  return new Blob([JSON.stringify(value)]).size;
};

export const writeStream = (
  table: Table,
  stream: Stream.StreamAppendRowsInputItem>,
) =>
  pipe(
    Effect.gen(function* () {
      const writeClient = new managedwriter.WriterClient();
      const projectId = table.dataset.bigQuery.projectId;
      const datasetId = table.dataset.id;
      const tableId = table.id;
      const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
      yield* Effect.log(`Destination Table: ${destinationTable}`);
      const writeStream = yield* Effect.tryPromise(() =>
        writeClient.createWriteStreamFullResponse({
          streamType: managedwriter.PendingStream,
          destinationTable,
        }),
      ).pipe(
        Effect.mapError(
          (e) => new StreamSetupError({ message: String(e.error) }),
        ),
      );

      const tableSchema = writeStream.tableSchema;
      if (tableSchema == null)
        return yield* Effect.fail(
          new StreamSetupError({
            message: `テーブルスキーマ(tableSchema)が取得できませんでした。テーブルID: ${tableId}`,
          }),
        );
      const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
        tableSchema,
        'root',
      );

      const writeStreamName = writeStream.name;
      if (writeStreamName == null)
        return yield* Effect.fail(
          new StreamSetupError({
            message: `テーブルID '${tableId}' のwriteStream.nameが取得できませんでした`,
          }),
        );

      yield* Effect.log('writeStreamのセットアップ完了', {
        writeStreamName,
      });

      const connection = yield* Effect.tryPromise(() =>
        writeClient.createStreamConnection({
          streamId: writeStreamName,
        }),
      ).pipe(
        Effect.mapError((e) => new StreamSetupError({ message: e.message })),
      );

      const writer = new managedwriter.JSONWriter({
        connection,
        protoDescriptor,
      });
      yield* Effect.addFinalizer((exit) =>
        Effect.gen(function* () {
          yield* Effect.log('StorageWriteApiのストリームを閉じます', {
            exit: exit.toString(),
            writeStreamName,
          });
          writer.close();
        }),
      );

      yield* Effect.log(
        `セットアップ完了: streamId=${connection.getStreamId()}`,
      );

      
      
      const fiber = yield* Effect.fork(
        Effect.gen(function* () {
          while (true) {
            yield* Effect.async((resume) => {
              connection.once('error', (...args) => {
                resume(
                  Effect.log(`Ignoring connection error`, {
                    args,
                  }),
                );
              });
            });
          }
        }),
      );
      yield* Effect.addFinalizer(() => Fiber.interrupt(fiber));

      return [
        connection,
        writer,
        writeClient,
        destinationTable,
        writeStreamName,
      ] as const;
    }),
    Effect.andThen(
      ([connection, writer, writeClient, destinationTable, streamId]) =>
        Effect.gen(function* () {
          const bqAppendRowsCapBytes = yield* bqAppendRowsCapBytesConfig;
          const bqAppendRowsBufferBytes = yield* bqAppendRowsBufferBytesConfig;
          const bqAppendRowsOverheadBytes =
            yield* bqAppendRowsOverheadBytesConfig;

          const semaphorePermits =
            BQ_APPEND_ROWS_MAX_BYTES - bqAppendRowsBufferBytes;
          const semaphore = yield* Effect.makeSemaphore(semaphorePermits);
          const scope = yield* Effect.scope;

          yield* stream.pipe(
            
            Stream.map((value) =>
              Data.struct({
                value,
                dataBytes: estimateBigQueryValueBytes(value),
              }),
            ),

            
            
            Stream.transduce(
              Sink.foldWeighted({
                initial: Data.struct({
                  cost: bqAppendRowsOverheadBytes,
                  chunk: Chunk.emptyStream.Stream.Successtypeof stream>>(),
                }),
                maxCost: bqAppendRowsCapBytes,
                cost: (_acc, { dataBytes }) => dataBytes,
                body: (acc, { value, dataBytes }) =>
                  Data.struct({
                    cost: acc.cost + dataBytes,
                    chunk: Chunk.append(acc.chunk, value),
                  }),
              }),
            ),

            
            Stream.zipWithIndex,
            Stream.map(([{ chunk, cost }, chunkIndex]) =>
              Data.struct({
                chunk,
                cost,
                chunkIndex,
              }),
            ),

            
            Stream.mapAccum(0, (accum: number, v) => [
              accum + Chunk.size(v.chunk),
              Data.struct({
                ...v,
                offset: accum,
              }),
            ]),

            
            Stream.mapEffect(({ chunk, cost, chunkIndex, offset }) =>
              Effect.gen(function* () {
                yield* Effect.log('appendRows権の取得完了(semaphore)', {
                  AppendRowsを実行する: {
                    chunkIndex,
                    行数: Chunk.size(chunk),
                    cost,
                    offset,
                  },
                });
                return yield* Effect.tryPromise(async () => {
                  const pw = writer.appendRows(
                    chunk.pipe(Chunk.toArray),
                    offset,
                  );
                  return await pw.getResult();
                }).pipe(Effect.andThen(handleAppendResult));
              }).pipe(
                Effect.tap(() =>
                  Effect.log(
                    `AppendRowsを実行しました: chunkIndex=${chunkIndex}`,
                  ),
                ),
                semaphore.withPermits(Math.min(cost, semaphorePermits)),
                Effect.forkIn(scope),
                Effect.withSpan('appendRows'),
                Effect.withSpan(`chunkIndex=${chunkIndex}`),
              ),
            ),
            Stream.runCollect,
            Effect.andThen(Fiber.joinAll),
          );

          yield* Effect.log(
            '全てのAppendRowsが完了しました。connection.finalizeを実行します。',
          );
          const rowCount = Option.fromNullable(
            yield* Effect.tryPromise(() => connection.finalize()),
          ).pipe(Option.map((e) => String(e.rowCount)));
          yield* Effect.log(`connection.finalize完了`, { rowCount });

          yield* Effect.log(
            `writeStreamをコミットします。destinationTable: ${destinationTable}, streamId: ${streamId}`,
          );
          const response = yield* Effect.tryPromise(() =>
            writeClient.batchCommitWriteStream({
              parent: destinationTable,
              writeStreams: [streamId],
            }),
          );
          yield* Effect.log('batchCommitWriteStream完了', {
            response,
          });
        }),
    ),
    Effect.withSpan('BigQuery StorageWrite API / writeStream'),
    Effect.withSpan(`対象テーブル=${table.id}`),
    Effect.scoped,
  );

以下のように並列で処理されていくのが確認できる。

...
timestamp=2025-06-19T09:51:35.453Z level=INFO fiber=#34 message="AppendRowsを実行しました: chunkIndex=25"
timestamp=2025-06-19T09:51:35.454Z level=INFO fiber=#37 message=appendRows権の取得完了(semaphore) message="{
  \"AppendRowsを実行する\": {
    \"chunkIndex\": 28,
    \"行数\": 130,
    \"cost\": 3134288,
    \"offset\": 3640
  }
}"
timestamp=2025-06-19T09:51:35.597Z level=INFO fiber=#35 message="AppendRowsを実行しました: chunkIndex=26"
timestamp=2025-06-19T09:51:35.598Z level=INFO fiber=#38 message=appendRows権の取得完了(semaphore) message="{
  \"AppendRowsを実行する\": {
    \"chunkIndex\": 29,
    \"行数\": 130,
    \"cost\": 3134286,
    \"offset\": 3770
  }
}"
timestamp=2025-06-19T09:51:35.724Z level=INFO fiber=#36 message="AppendRowsを実行しました: chunkIndex=27"
timestamp=2025-06-19T09:51:35.724Z level=INFO fiber=#39 message=appendRows権の取得完了(semaphore) message="{
  \"AppendRowsを実行する\": {
    \"chunkIndex\": 30,
    \"行数\": 130,
    \"cost\": 3134289,
    \"offset\": 3900
  }
}"
timestamp=2025-06-19T09:51:35.846Z level=INFO fiber=#37 message="AppendRowsを実行しました: chunkIndex=28"
timestamp=2025-06-19T09:51:35.847Z level=INFO fiber=#40 message=appendRows権の取得完了(semaphore) message="{
  \"AppendRowsを実行する\": {
    \"chunkIndex\": 31,
    \"行数\": 130,
    \"cost\": 3134283,
    \"offset\": 4030
  }
}"
timestamp=2025-06-19T09:51:35.974Z level=INFO fiber=#38 message="AppendRowsを実行しました: chunkIndex=29"
...

省略したこと: リトライ処理

少し長くなってしまうのと、プロジェクト特有の設定にはなってしまうので、リトライ処理は除きました。リトライもEffect.tsでは非常に簡単に記述ができて、大変すばらしいです。
たとえば、 createWriteStreamFullResponse に以下をパイプすることで、リトライ処理を行うことができます。

import { Effect, Schedule, pipe } from 'effect';
Effect.retry({
  schedule: pipe(
    
    Schedule.union(
      Schedule.exponential('1 seconds'),
      Schedule.spaced('20 seconds'),
    ),
    
    Schedule.intersect(Schedule.recurUpTo('15 minutes')),

    scheduleLogRetryCount('createWriteStreamFullResponse'),
  ),
}),

scheduleLogRetryCount は以下のようなスケジュールとのインターセクトを待つ関数を作成します。

import { Effect, Schedule, pipe } from 'effect';

const scheduleLogRetryCount = (spanName: string) =>
  Schedule.intersect(
    pipe(
      Schedule.count,
      Schedule.tapOutput((count) =>
        Effect.gen(function* () {
          if (count > 0) {
            yield* Effect.log(`retry count=${count}`);
          }
        }).pipe(Effect.withLogSpan(spanName)),
      ),
    ),
  );

Schedule.unionSchedule.intersect の意味論は私もまだ深くは理解できていないですが、公式ドキュメントを見ながら、ユーティリティを組み合わせると、他の類似のライブラリパッケージでは簡単にはできなかったことが驚くほど簡易に、かつ強力に実現できます。
これは、本当にすごく感動しています。Effect.tsは、自分の中では、リトライ処理等を行う他の選択肢の苦しさから救ってくれる最良の選択肢になりました。

単なる便利ライブラリとして、そして新しいパラダイムとしてのEffect.ts

この記事で見てきたのはEffect.tsのまだ断片に過ぎません。しかし、単に良くできたパッケージ群だと思っても、非常に強力で新しく価値のあるものであることがわかりました。

Effect.tsのその理論背景にはEffect Systemというものがあるようですが、私はまだ理解の入口にいるかも分かっていません。しかし、それが関数型に由来し、その先に本質的なパラダイムの変革を起こすものであることは間違いなさそうだと感じます。
Promiseが登場したときにasync/awaitの構文が入ったときのように、Effectが登場し、いつか専用の構文が確立される日が来るのかもしれません。 (それはきっとTypeScriptではないんだろうなとは感じますが…)

課題だと思った点

  • とても難しい。ある程度、プログラミングの知識は広くあるつもりでしたが、Effect.ts、そしてEffectそのものへの理解には時間がかかりそうだと感じています。
  • Effect.forkが適切なランタイムで実行されていないことを検知できない。(浮いているFiberをstaticに検知できない)
  • 今回のBigQueryのクライアントみたいなものを、どのようにContextとして抽象化しようか悩ましい(分からない)
    • 逆に言えば、その抽象化をどのように実現しようかということに悩む必要がなくなり、議論すべきアーキテクチャニグに集中できるような気はします。

また、全体がGeneratorになる、とかは、強力な標準が搭載されることによって得られるメリットが圧倒的だと思っています。自由に色々できてしまう世界のほうがツラいです。

宣伝: 君も一緒にEffect.tsを書かないか?

有効期限: 2025年07月31日 (適宜更新します)

ちょうどこの記事の内容のもとになった実装が、プロダクションにリリースされようとしています。すなわち、Effect.tsが部分的にですがプロダクションに出ます。
Effect.tsを導入するために、トップダウンと、ボトムアップの両面から少しずつ変えようとしています。
この記事で紹介したコードもかなりリファクタリングがさらに必要でしょうから、これからが楽しみなところです。

私のチームを含め、いくつかのチームが採用をしています。採用ページ → https://recruit.optimind.tech/

あと、閲覧注意ですが、本当に僕が個人的に、今の会社についてと、コワーカー募集というnote記事を書きました。



Source link

Views: 0

RELATED ARTICLES

返事を書く

あなたのコメントを入力してください。
ここにあなたの名前を入力してください

- Advertisment -