はじめに
私は Firebase が好きで、フロントエンドフレームワークとしては、Flutter を採用することが多いです。
Google I/O 2024 以降、Genkit を継続的に触り続けています。
Genkit は、サーバーサイドで AI を活用した一連の処理を フロー (Flow)
として簡単に定義できる、Firebase チームによって開発された Google 製のオープンソースフレームワークです。
しかし、サーバーで定義したフローを、クライアントである Dart/Flutter アプリから呼び出すには、ひと手間必要でした。
このギャップを埋めるのが、今回開発した Dart client for Genkit
パッケージです。(ベータ版)
ライブラリ概要
上記でも記載の通り、本ライブラリは、サーバー上にデプロイされた Genkit フロー(Cloud Run functions, Cloud Run, GKE などのプラットフォームに公開された Web API)を簡単にコールするための Genkit 専用クライアントライブラリ であり、Dart で Genkit フローを実装するものではありません。
従来の課題
これまでは、Dart/Flutter から Genkit フローを呼び出すには、いくつかのお作法に沿って、手作業で HTTP クライアントを実装する必要がありました。
例えば、以下は HTTP クライアントに dio を使って、Genkit で定義した画像生成フローを呼び出す、これまでの一般的なコードです。
このコードには、いくつか課題があります。
-
リクエストを送る際には入力を
{'data': ...}
でラップし、レスポンスを受け取った際にはresponse.data['result']
から結果を取り出す必要があります。これは Genkit サーバー側固有のルールであり、クライアント側で実装者が毎回意識するのは面倒ですし、間違いの元にもなります。 -
Genkit フローが増えるたびに、似たような HTTP リクエストのコードを書くことになります。認証ヘッダーの付与などを共通化するにも、自分で仕組みを作る必要がありました。
-
上記は単純なリクエストの例ですが、リアルタイムでデータを受け取るストリーミング処理を実装しようとすると、Server-Sent Events (SSE) プロトコルのレスポンスを自力でパースする必要があり、コードはさらに複雑になります。
他のクライアントとの統一性
このライブラリは、公式で提供されている Next.js や Angular 向けのクライアントとインターフェースを合わせて設計されています。
コア設計
このライブラリがどのようにして、Genkit フローの呼び出しを抽象化しているのか、その設計思想や実装内容を紹介します。
主要な構成要素
ライブラリは、主に 3 つのクラスで構成されています。
1. RemoteAction クラス
すべての操作の起点となる、中心的なクラスです。
リモート(サーバー上)の Genkit フローを Dart の世界で表現するオブジェクトと考えることができます。
class RemoteActionO, S> {
final String _url;
final MapString, String>? _defaultHeaders;
final http.Client _httpClient;
final O Function(dynamic jsonData) _fromResponseData;
final S Function(dynamic jsonData)? _fromStreamChunkData;
}
ジェネリクスによって、型安全な操作を保証します。
-
O
(Output): フローが完了したときに受け取る、最終的なレスポンスの型 -
S
(Stream): ストリーミング実行時に、途中で送られてくるチャンクデータの型(ストリーミングを使わない場合はvoid
を指定)
2. FlowStreamResponse 型
ストリーミング操作の戻り値は、Record 型 FlowStreamResponse
で定義されています。
typedef FlowStreamResponseO, S> = ({FutureO> response, StreamS> stream});
これにより、response
(最終結果の Future
)と stream
(途中経過の Stream
)の両方に、1 つのオブジェクトから型安全にアクセスでき、直感的なコーディングが可能になります。
3. GenkitException クラス
Genkit フローがエラーを返した場合、ライブラリはそれを GenkitException
としてスローします。
HTTP ステータスコードや詳細なエラー情報が含まれているため、クライアント側での柔軟なエラーハンドリングをサポートします。
class GenkitException implements Exception {
final String message;
final int? statusCode;
final String? details;
final Exception? underlyingException;
final StackTrace? stackTrace;
const GenkitException(
this.message, {
this.statusCode,
this.details,
this.underlyingException,
this.stackTrace,
});
}
ストリーミング実装について
このライブラリの最も強力な機能の 1 つが、ストリーミングのサポートです。
この裏側では Server-Sent Events (SSE) プロトコルが使われています。
Genkit のストリーミングエンドポイントは、下記のような形式でデータを断続的に送り出してきます。
data: {"message": {"text": "Hello"}}
data: {"message": {"text": " World"}}
data: {"result": "Hello World"}
このままでは扱いにくいため、ライブラリ内部では、まず SSE プロトコルで定められたいくつかの定数を使って、生データを解析していきます。
const flowStreamDelimiter = '\n\n';
const sseDataPrefix = 'data: ';
-
flowStreamDelimiter
- 各メッセージブロックは、2 つの改行文字
\n\n
で区切られる - この定数を使って、ストリームから個々のメッセージを正確に切り出す
- 各メッセージブロックは、2 つの改行文字
-
sseDataPrefix
- SSE の仕様では、メッセージ本体の各行は
data:
という接頭辞で始まる必要がある - この定数は、その接頭辞を識別し、取り除くために使用する
- SSE の仕様では、メッセージ本体の各行は
これらの定数を用いて、ライブラリはデータストリームを Dart の Stream
オブジェクトに変換する、以下のような処理を行っています。
_streamInternal メソッド内の listen コールバックから抜粋
var buffer = '';
subscription = streamedResponse.stream.transform(utf8.decoder).listen(
(decodedChunk) {
buffer += decodedChunk;
while (buffer.contains(flowStreamDelimiter)) {
final endOfChunk = buffer.indexOf(flowStreamDelimiter);
final chunkData = buffer.substring(0, endOfChunk);
buffer = buffer.substring(endOfChunk + flowStreamDelimiter.length);
if (!chunkData.startsWith(sseDataPrefix)) {
continue;
}
final jsonData = chunkData.substring(sseDataPrefix.length);
if (jsonData.isEmpty) continue;
try {
final parsedJson = jsonDecode(jsonData);
if (parsedJson is MapString, dynamic>) {
if (parsedJson.containsKey('message')) {
if (!streamController.isClosed) {
streamController.add(parsedJson['message']);
}
} else if (parsedJson.containsKey('result')) {
if (!responseCompleter.isCompleted) {
responseCompleter.complete(parsedJson['result']);
}
} else if (parsedJson.containsKey('error')) {
}
}
} on FormatException catch (e, s) {
}
}
},
onDone: () {
},
onError: (e, s) {
},
);
この実装により、Dart/Flutter アプリの実装者は SSE の複雑な仕様を意識することなく、await for
を使って直感的に Genkit フローのストリーミングデータを扱うことが可能となります。
使い方
ここからが本題です。
Dart client for Genkit のパッケージを自プロジェクトからどうやって使用するかを紹介していきます。
基本的なセットアップ
install
dart pub add genkit
or
flutter pub add genkit
impor
import 'package:genkit/genkit.dart';
1. 最もシンプルな String → String のフロー呼び出し
はじめに、最も基本的な例として、文字列を入力として受け取り、文字列を返すフローを呼び出してみます。
Genkit JS(サーバー)側
まず、サーバー側で以下のような、string
型の入力と出力を持つ Genkit フローが定義されているとします。
export const simpleStringFlow = ai.defineFlow(
{
name: `simple-string`,
inputSchema: z.string(),
outputSchema: z.string(),
},
async (input) => {
const response = await ai.generate({ prompt: input });
return response.text;
}
);
Dart(クライアント)側
このフローを Dart から呼び出すには、defineRemoteAction
を使ってクライアントを定義します。
フローのエンドポイント URL と、レスポンスを期待する型に変換するための fromResponse
関数を渡すだけで、あとは定義したアクションを関数のように実行できます。
import 'package:genkit/genkit.dart';
void main() async {
final simpleStringAction = defineRemoteAction(
url: 'https://your-service.com/simpleString',
fromResponse: (json) => json as String,
);
try {
final response = await simpleStringAction(
input: 'Hello Dart client!',
);
print('Response: $response');
} on GenkitException catch (e) {
print('Genkit Error: ${e.message}');
}
}
2. 型付きオブジェクトを使用した場合
次に、より実践的な例として、型付けされたオブジェクトを送受信する方法を見ていきましょう。json_serializable
のようなライブラリと組み合わせることで、型安全性を維持したまま、複雑なデータを扱うことができます。
スキーマ定義(json_annotation を使用)
まず、クライアントとサーバーで共通のデータ構造を、Dart のクラスとして定義します。
ここでは json_serializable
を使い、toJson
/ fromJson
を自動生成します。
import 'package:json_annotation/json_annotation.dart';
part 'my_schemas.g.dart';
()
class MyInput {
final String message;
final int count;
MyInput({required this.message, required this.count});
factory MyInput.fromJson(MapString, dynamic> json) =>
_$MyInputFromJson(json);
MapString, dynamic> toJson() => _$MyInputToJson(this);
}
()
class MyOutput {
final String reply;
final int newCount;
MyOutput({required this.reply, required this.newCount});
factory MyOutput.fromJson(MapString, dynamic> json) =>
_$MyOutputFromJson(json);
MapString, dynamic> toJson() => _$MyOutputToJson(this);
}
Genkit JS(サーバー)側
サーバー側では Zod を使って Dart 側のクラスに対応するスキーマを定義します。
export const processObjectFlow = ai.defineFlow(
{
name: `process-object`,
inputSchema: z.object({
message: z.string(),
count: z.number(),
}),
outputSchema: z.object({
reply: z.string(),
newCount: z.number(),
}),
},
async (input) => {
const response = await ai.generate({
prompt: input.message,
});
const newCount = input.count * 2;
return {
reply: response.text,
newCount,
};
}
);
Dart(クライアント)側
defineRemoteAction
の fromResponse
に、先ほど定義したクラスの fromJson
ファクトリコンストラクタを渡します。
これにより、クライアントは受け取った JSON を自動的に MyOutput
オブジェクトに変換します。
呼び出し時には MyInput
オブジェクトを渡すだけで、ライブラリが内部で JSON への変換を行います。
import 'package:genkit/genkit.dart';
import 'schemas/my_schemas.dart';
void main() async {
final processObjectAction = defineRemoteActionMyOutput, void>(
url: 'https://your-service.com/processObject',
fromResponse: (json) => MyOutput.fromJson(json),
);
try {
final response = await processObjectAction(
input: MyInput(message: 'Hello Genkit!', count: 20),
);
print('Response: ${response.reply}, New Count: ${response.newCount}');
} on GenkitException catch (e) {
print('Error: ${e.message}');
}
}
3. ストリーミングフロー
ストリーミング処理は、このライブラリの主要な機能です。
生成 AI からのレスポンスのように、サーバーからのデータを断続的に受け取りたい場合に役立ちます。
Genkit JS(サーバー)側
サーバー側では、streamSchema
と streamingCallback
を使って、ストリーミング用の Genkit フローを定義します。
export const streamObjectsFlow = ai.defineFlow(
{
name: `stream-objects`,
inputSchema: z.object({
prompt: z.string(),
}),
outputSchema: z.object({
text: z.string(),
summary: z.string(),
}),
streamSchema: z.object({
text: z.string(),
summary: z.string(),
}),
},
async (input, streamingCallback) => {
if (!streamingCallback) {
throw new Error(`Streaming callback not provided for a streaming flow.`);
}
const { stream, response } = ai.generateStream({
prompt: input.prompt,
});
let accumulatedText = ``;
for await (const chunk of stream) {
if (chunk.text) {
accumulatedText += chunk.text;
const streamChunk = {
text: chunk.text,
summary: `Processing: ${accumulatedText.slice(0, 50)}...`,
};
streamingCallback(streamChunk);
}
}
const finalResponse = await response;
const finalOutput = {
text: finalResponse.text,
summary: `Completed processing of: "${input.prompt}"`,
};
return finalOutput;
}
);
Dart(クライアント)側
クライアント側では stream()
メソッドを呼び出します。
これにより、最終結果の Future
と途中経過の Stream
を持つ FlowStreamResponse
が返されます。await for
構文を使えば、SSE の複雑さを意識することなく、サーバーからのストリームデータを簡潔に扱うことができます。
import 'package:genkit/genkit.dart';
import 'schemas/stream_schemas.dart';
void main() async {
final streamObjectsAction = defineRemoteActionStreamOutput, StreamOutput>(
url: 'https://your-service.com/streamObjects',
fromResponse: (json) => StreamOutput.fromJson(json),
fromStreamChunk: (json) => StreamOutput.fromJson(json),
);
try {
final (:stream, :response) = streamObjectsAction.stream(
input: StreamInput(prompt: 'What is Genkit?'),
);
print('Streaming chunks:');
await for (final chunk in stream) {
print('Chunk: ${chunk.text}');
}
final finalResult = await response;
print('Final Response: ${finalResult.text}');
} on GenkitException catch (e) {
print('Error: ${e.message}');
}
}
4. 認証ヘッダーの送信
実際のアプリケーションでは、認証が不可欠です。
このライブラリでは、HTTP ヘッダーを柔軟に設定することで、認証付きのフロー呼び出しを簡単に行えます。
Genkit JS(サーバー)側
例として、Firebase Authentication の匿名認証済みユーザーのみがフローを実行できる、というポリシーをサーバー側で設定します。
import { onCallGenkit } from "firebase-functions/https";
import { googleAIapiKey } from "./genkit";
import { processObjectFlow } from "./flows/process-object-flow";
import { streamObjectsFlow } from "./flows/stream-objects-flow";
import { simpleStringFlow } from "./flows/simple-string-flow";
const opts = {
secrets: [googleAIapiKey],
region: `asia-northeast1`,
cors: true,
authPolicy: (auth: any) => {
return auth?.token?.firebase?.sign_in_provider === `anonymous`;
},
};
export const simpleString = onCallGenkit(opts, simpleStringFlow);
export const processObject = onCallGenkit(opts, processObjectFlow);
export const streamObjects = onCallGenkit(opts, streamObjectsFlow);
Dart(クライアント)側
クライアント側では、Firebase Authentication SDK などで取得した ID トークンを、Authorization
ヘッダーに含めてリクエストします。defineRemoteAction
の defaultHeaders
に設定すれば、そのインスタンスからのリクエストすべてに、このヘッダーが自動で付与されます。
import 'package:genkit/genkit.dart';
import 'package:firebase_auth/firebase_auth.dart';
void main() async {
final user = FirebaseAuth.instance.currentUser;
final idToken = await user?.getIdToken();
final simpleStringAction = defineRemoteAction(
url: 'https://your-service.com/simpleString',
fromResponse: (json) => json as String,
defaultHeaders: {
'Authorization': 'Bearer $idToken',
},
);
final response = await simpleStringAction(
input: 'Hello Dart client!',
);
print('Response: $response');
}
リクエスト単位でのヘッダー設定
defaultHeaders
に加えて、リクエストごとにヘッダーを動的に設定することも可能です。call()
や stream()
メソッドの headers
引数で指定することで、リクエスト ID を付与するなどの柔軟な対応ができます。
import 'package:genkit/genkit.dart';
void main() async {
final remoteAction = defineRemoteAction(
url: 'https://your-service.com/simpleString',
fromResponse: (json) => json as String,
defaultHeaders: {
'Authorization': 'Bearer your-token',
'X-Client-Type': 'flutter',
},
);
final response1 = await remoteAction(
input: 'First request',
headers: {
'X-Request-ID': 'req-001',
'X-Priority': 'high',
},
);
final response2 = await remoteAction(
input: 'Second request',
headers: {
'X-Request-ID': 'req-002',
'X-Priority': 'normal',
},
);
}
Flutter との組み合わせ
Dart client for Genkit を Flutter で使用する際のサンプルアプリも作ってみました。
サンプルアプリ
サンプルアプリの実装詳細はこちらをご覧ください。
処理の流れをイメージしやすいように、Flutter (クライアント側)だけでなく、Genkit フロー(サーバー側)のコードも含めています。
Tips
今回のサンプルでは、GenkitClient
という自前クラスを用意し、そこに、Dart client for Genkit を経由した Genkit フロー呼び出しを隠蔽しています。
※ クリーンアーキテクチャ、オニオンアーキテクチャ、レイヤードアーキテクチャの文脈でいうところのインフラストラクチャ層を担当させるイメージです。
class GenkitClient {
GenkitClient();
final baseUrl = 'https://region-project-name.cloudfunctions.net';
FutureMyOutput> callProcessObjectAction({
required String idToken,
required String message,
required int count,
}) async {
try {
final remoteAction = defineRemoteActionMyOutput, void>(
url: '$baseUrl/processObject',
fromResponse: (json) => MyOutput.fromJson(json),
defaultHeaders: {'Authorization': 'Bearer $idToken'},
);
return await remoteAction(input: MyInput(message: message, count: count));
} on GenkitException catch (e) {
throw Exception('Failed to remote action: ${e.message}');
}
}
FlowStreamResponseStreamOutput, StreamOutput> subscribeStreamObjectAction({
required String idToken,
required String prompt,
}) {
try {
final remoteAction = defineRemoteActionStreamOutput, StreamOutput>(
url: '$baseUrl/streamObjects',
fromResponse: (json) => StreamOutput.fromJson(json),
fromStreamChunk: (json) => StreamOutput.fromJson(json),
defaultHeaders: {'Authorization': 'Bearer $idToken'},
);
return remoteAction.stream(input: StreamInput(prompt: prompt));
} on GenkitException catch (e) {
throw Exception('Failed to remote action: ${e.message}');
}
}
}
まとめ
本記事では、Dart/Flutter から Genkit フローを快適に呼び出すために開発した Dart client for Genkit を紹介しました。
以前、とある Google 社員の方が Firebase チームの設計思想について話しているのを聞く機会がありました。
その方は「Firebase のインターフェース設計は美しい」と語っており、今回においては、特に Next.js や Angular など、異なるフレームワーク間でも一貫した使い心地を提供するという思想は、私にとって新しい視点でした。
複数プラットフォームに SDK を提供するプロダクトでは、特に意識すべき点だと改めて感じています。
その上で、今回開発に携わったライブラリが、公式クライアントとのインターフェース統一性に関するレビューをいただけたことは、その思想を少しでも体現できたようで、非常に嬉しい経験でした。
昨今、AI がコード生成を助けてくれる場面が増えていますが、最終的な品質を担保するのは、私たち開発者(人間)だと思っています。
だからこそ、人間にとってのコードの可読性やメンテナンス性は、これまで以上に重要になってくると感じています。
このライブラリが、複雑な部分を隠蔽・抽象化し、クリーンなインターフェースを提供することで、その一助となれば幸いです。
参考
Views: 0