import type { Table } from '@google-cloud/bigquery';
import { adapt, managedwriter } from '@google-cloud/bigquery-storage';
import type { google } from '@google-cloud/bigquery-storage/build/protos/protos.js';
import {
Chunk,
Config,
Data,
Effect,
Fiber,
Option,
pipe,
Schema,
Sink,
Stream,
} from 'effect';
import { Status } from 'google-gax';
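// Raised when the Storage Write API stream cannot be set up
// (failed RPC, missing table schema, or missing stream name).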
export class StreamSetupError extends Data.TaggedError('StreamSetupError')<{
message: string;
}> {
toString() {
return `${this.name}: ${this.message}`;
}
}
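// Wraps the google.rpc.Status returned when AppendRows fails inside
// Google's backend with anything other than ALREADY_EXISTS.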
export class AppendRowsGoogleInternalError extends Data.TaggedError(
'AppendRowsGoogleInternalError',
)<{
readonly googleInternalRpcStatus: google.rpc.IStatus;
}> {
toString() {
return `${this.name}: ${this.googleInternalRpcStatus.message} (code: ${this.googleInternalRpcStatus.code})`;
}
}
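// Carries the per-row errors reported by an AppendRows response;
// toString renders at most the first 10 of them.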
export class AppendRowsError extends Data.TaggedError(
'AppendRowsError',
)<{
readonly rowErrors: google.cloud.bigquery.storage.v1.IRowError[];
}> {
toString() {
return `${this.name}: ${this.rowErrors
.slice(0, 10)
.map((e) => `${String(e.index)}: ${e.message}`)
.join(', ')}`;
}
}
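// Hard limit on a single AppendRows request imposed by the BigQuery
// Storage Write API (10 MiB here).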
export const BQ_APPEND_ROWS_MAX_BYTES = 10 * 1024 * 1024;
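// Batching knobs: each request is packed up to CAP bytes of JSON (seeded
// with an assumed per-request OVERHEAD), while BUFFER bytes of headroom is
// subtracted from the 10 MiB limit when sizing the in-flight semaphore below.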
const bqAppendRowsCapBytesConfig = Schema.Config(
'BQ_APPEND_ROWS_CAP_BYTES',
Schema.NumberFromString.pipe(
Schema.int(),
Schema.between(0, BQ_APPEND_ROWS_MAX_BYTES),
).annotations({
title: 'BigQuery (StorageWrite API) / Append Rows / Cap Bytes',
description:
'Maximum number of bytes sent in a single AppendRows request to the BigQuery Storage Write API',
}),
).pipe(Config.withDefault(3 * 1024 * 1024));
const bqAppendRowsBufferBytesConfig = Schema.Config(
'BQ_APPEND_ROWS_BUFFER_BYTES',
Schema.NumberFromString.pipe(
Schema.int(),
Schema.between(0, BQ_APPEND_ROWS_MAX_BYTES),
).annotations({
title: 'BigQuery (StorageWrite API) / Append Rows / Buffer Bytes',
description:
'Headroom in bytes kept against the total AppendRows limit of the BigQuery Storage Write API',
}),
).pipe(Config.withDefault(1 * 1024 * 1024));
const bqAppendRowsOverheadBytesConfig = Schema.Config(
'BQ_APPEND_ROWS_OVERHEAD_BYTES',
Schema.NumberFromString.pipe(
Schema.int(),
Schema.between(0, BQ_APPEND_ROWS_MAX_BYTES),
).annotations({
title:
'BigQuery (StorageWrite API) / Append Rows / Overhead Bytes',
description:
'Assumed per-request overhead in bytes for a single AppendRows call to the BigQuery Storage Write API',
}),
).pipe(Config.withDefault(3 * 1024));
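// Interprets a single AppendRows response: ALREADY_EXISTS means the offset
// was already written (treated as success), any other error or per-row
// error becomes a typed failure.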
const handleAppendResult = ({
appendResult,
error,
rowErrors,
}: google.cloud.bigquery.storage.v1.IAppendRowsResponse) =>
Effect.gen(function* () {
if (error != null) {
if (error.code === Status.ALREADY_EXISTS) {
return Option.fromNullable(appendResult);
}
return yield* Effect.fail(
new AppendRowsGoogleInternalError({ googleInternalRpcStatus: error }),
);
}
if (rowErrors != null && rowErrors.length > 0) {
return yield* Effect.fail(new AppendRowsError({ rowErrors }));
}
return Option.fromNullable(appendResult);
});
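// Rough byte-size estimate of a value serialized as JSON; used only for
// packing rows under the byte cap, not as an exact wire size.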
export const estimateBigQueryValueBytes = (
value: AppendRowsInput | AppendRowsInputItem | AppendRowsInputItemValue,
): number => {
return new Blob([JSON.stringify(value)]).size;
};
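// Writes a Stream of rows to a BigQuery table through the Storage Write API
// using a PENDING write stream: set up the stream and JSON writer, append
// rows in size-capped chunks, then finalize and batch-commit.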
export const writeStream = (
table: Table,
stream: Stream.Stream<AppendRowsInputItem>,
) =>
pipe(
Effect.gen(function* () {
const writeClient = new managedwriter.WriterClient();
const projectId = table.dataset.bigQuery.projectId;
const datasetId = table.dataset.id;
const tableId = table.id;
const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
yield* Effect.log(`Destination Table: ${destinationTable}`);
const writeStream = yield* Effect.tryPromise(() =>
writeClient.createWriteStreamFullResponse({
streamType: managedwriter.PendingStream,
destinationTable,
}),
).pipe(
Effect.mapError(
(e) => new StreamSetupError({ message: String(e.error) }),
),
);
const tableSchema = writeStream.tableSchema;
if (tableSchema == null)
return yield* Effect.fail(
new StreamSetupError({
message: `Could not retrieve the table schema (tableSchema). Table ID: ${tableId}`,
}),
);
const protoDescriptor = adapt.convertStorageSchemaToProto2Descriptor(
tableSchema,
'root',
);
const writeStreamName = writeStream.name;
if (writeStreamName == null)
return yield* Effect.fail(
new StreamSetupError({
message: `Could not retrieve writeStream.name for table ID '${tableId}'`,
}),
);
yield* Effect.log('writeStream setup complete', {
writeStreamName,
});
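// Open a gRPC connection bound to the newly created write stream.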
const connection = yield* Effect.tryPromise(() =>
writeClient.createStreamConnection({
streamId: writeStreamName,
}),
).pipe(
Effect.mapError((e) => new StreamSetupError({ message: e.message })),
);
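// JSONWriter converts plain JSON rows to protobuf using the descriptor
// derived from the table schema.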
const writer = new managedwriter.JSONWriter({
connection,
protoDescriptor,
});
yield* Effect.addFinalizer((exit) =>
Effect.gen(function* () {
yield* Effect.log('Closing the Storage Write API stream', {
exit: exit.toString(),
writeStreamName,
});
writer.close();
}),
);
yield* Effect.log(
`Setup complete: streamId=${connection.getStreamId()}`,
);
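// Drain 'error' events from the connection in a background fiber so stray
// connection errors are logged and ignored; failures still surface through
// the appendRows results.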
const fiber = yield* Effect.fork(
Effect.gen(function* () {
while (true) {
yield* Effect.async((resume) => {
connection.once('error', (...args) => {
resume(
Effect.log(`Ignoring connection error`, {
args,
}),
);
});
});
}
}),
);
yield* Effect.addFinalizer(() => Fiber.interrupt(fiber));
return [
connection,
writer,
writeClient,
destinationTable,
writeStreamName,
] as const;
}),
Effect.andThen(
([connection, writer, writeClient, destinationTable, streamId]) =>
Effect.gen(function* () {
const bqAppendRowsCapBytes = yield* bqAppendRowsCapBytesConfig;
const bqAppendRowsBufferBytes = yield* bqAppendRowsBufferBytesConfig;
const bqAppendRowsOverheadBytes =
yield* bqAppendRowsOverheadBytesConfig;
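// Byte-weighted semaphore: permits represent bytes, bounding how many
// bytes of appendRows requests can be in flight at once.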
const semaphorePermits =
BQ_APPEND_ROWS_MAX_BYTES - bqAppendRowsBufferBytes;
const semaphore = yield* Effect.makeSemaphore(semaphorePermits);
const scope = yield* Effect.scope;
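// Pipeline: estimate each row's size, pack rows into size-capped chunks,
// assign each chunk its starting offset, then append chunks in parallel.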
yield* stream.pipe(
Stream.map((value) =>
Data.struct({
value,
dataBytes: estimateBigQueryValueBytes(value),
}),
),
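// Fold rows into chunks whose accumulated JSON size (plus the assumed
// per-request overhead) stays under the configured cap.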
Stream.transduce(
Sink.foldWeighted({
initial: Data.struct({
cost: bqAppendRowsOverheadBytes,
chunk: Chunk.empty<Stream.Stream.Success<typeof stream>>(),
}),
maxCost: bqAppendRowsCapBytes,
cost: (_acc, { dataBytes }) => dataBytes,
body: (acc, { value, dataBytes }) =>
Data.struct({
cost: acc.cost + dataBytes,
chunk: Chunk.append(acc.chunk, value),
}),
}),
),
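// Number each chunk and compute its starting row offset (cumulative row
// count of all preceding chunks) for the appendRows call.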
Stream.zipWithIndex,
Stream.map(([{ chunk, cost }, chunkIndex]) =>
Data.struct({
chunk,
cost,
chunkIndex,
}),
),
Stream.mapAccum(0, (accum: number, v) => [
accum + Chunk.size(v.chunk),
Data.struct({
...v,
offset: accum,
}),
]),
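// Append each chunk, holding semaphore permits proportional to its byte
// cost and forking into the scope so chunks are sent concurrently.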
Stream.mapEffect(({ chunk, cost, chunkIndex, offset }) =>
Effect.gen(function* () {
yield* Effect.log('Acquired appendRows permits (semaphore)', {
executingAppendRows: {
chunkIndex,
rowCount: Chunk.size(chunk),
cost,
offset,
},
});
return yield* Effect.tryPromise(async () => {
const pw = writer.appendRows(
chunk.pipe(Chunk.toArray),
offset,
);
return await pw.getResult();
}).pipe(Effect.andThen(handleAppendResult));
}).pipe(
Effect.tap(() =>
Effect.log(
`AppendRows executed: chunkIndex=${chunkIndex}`,
),
),
semaphore.withPermits(Math.min(cost, semaphorePermits)),
Effect.forkIn(scope),
Effect.withSpan('appendRows'),
Effect.withSpan(`chunkIndex=${chunkIndex}`),
),
),
Stream.runCollect,
Effect.andThen(Fiber.joinAll),
);
yield* Effect.log(
'All AppendRows calls completed. Running connection.finalize.',
);
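// No new rows can be appended after finalize; batchCommitWriteStream then
// makes the pending stream's rows visible in the table atomically.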
const rowCount = Option.fromNullable(
yield* Effect.tryPromise(() => connection.finalize()),
).pipe(Option.map((e) => String(e.rowCount)));
yield* Effect.log('connection.finalize complete', { rowCount });
yield* Effect.log(
`Committing the writeStream. destinationTable: ${destinationTable}, streamId: ${streamId}`,
);
const response = yield* Effect.tryPromise(() =>
writeClient.batchCommitWriteStream({
parent: destinationTable,
writeStreams: [streamId],
}),
);
yield* Effect.log('batchCommitWriteStream complete', {
response,
});
}),
),
Effect.withSpan('BigQuery StorageWrite API / writeStream'),
Effect.withSpan(`targetTable=${table.id}`),
Effect.scoped,
);
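// Usage sketch (hypothetical caller, not part of this module): assumes an
// existing dataset/table and rows matching AppendRowsInputItem; names like
// 'my_dataset' and 'my_table' are placeholders.
//
//   import { BigQuery } from '@google-cloud/bigquery';
//   import { Effect, Stream } from 'effect';
//
//   const table = new BigQuery().dataset('my_dataset').table('my_table');
//   const rows = Stream.fromIterable([{ id: 1, name: 'alice' }]);
//   await Effect.runPromise(writeStream(table, rows));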