こんにちは、サントリーこと大橋です。
今日(2025/07/24)Agent Development Kit(以下 ADK) 1.9.0がリリースされました。
主な更新内容は
- adk webで起動するFastAPIサーバーをモジュール化し、再利用・カスタマイズしやすい形式に変更
- 新たなPluginのCallback(on_tool_error_callback, on_model_error_callback)追加
- GeminiのRetryOptions
今回はこの中で「新たなPluginのCallback」について解説していきます。
ADKのPluginとCallback
ADKには各種処理に対して、処理を差し込むことができる「Plugin」及び「Callback」呼ばれる機能があります。
Callbackは、任意のAgentに対して「ログの挿入」「リクエストの変更」「レスポンスの改変」「処理の中断」といった処理を差し込むための仕組みです。ADKの機能を拡張する上で、非常に重要な役割を担っています。
PluginはこのCallbackの機能を全Agentに対して適用できる機能でADK 1.7.0で追加されました。
ADKでは1.8.0までは以下の6つのCallbackが利用できました。
- Agentの実行前後
- before_agent_callback
- after_agent_callback
- LLMの実行前後
- before_model_callback
- after_model_callback
- ツールの実行前後
- before_tool_callback
- after_tool_callback
基本的には各種処理の前後に処理を差し込めるようになっており、ADKの状態(state
)の変更や、リクエスト(ユーザからのメッセージや、プロンプト、ツール引数など)の変更、レスポンスの変更(LLMからの変更、Agent自体のレスポンスの変更、ツールレスポンスの変更)が行えました。
課題:従来のエラーハンドリング
ここで、これまでのADKにおけるエラー処理の課題について見てみましょう。
従来のADKでは、「LLMやツール内で例外が発生した際、その例外がRunnerの内部で処理されず、呼び出し元までスローされてしまう」という課題がありました。
RunnerはADKのAgentを実行するための実行クラスです。
通常ADKでAgentを作成した場合、開発者は以下の様なコードでRunner経由でAgentを実行します。
main.py
from google.adk.runners import Runner
def get_current_time():
pass
my_agent = Agent(...)
runner = Runner(agent=my_agent, tools=[get_current_time], ......)
user_id = ...
session_id = ...
content = "ユーザーのメッセージ"
async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content):
print(event)
これまでADKでは特殊な方法(カスタムAgentや、カスタムFlow)を除いて、LLMや、ツールが例外を発生させた場合(クォータエラーや通信エラー、ツールの作りの問題で発生する例外等)、その結果をLLMに返してリトライするなどはせず、上記のrunner.run_async
まで例外がスローされ、runnerの外側で例外処理をする必要がありました。
上記であれば get_current_time
の内部で例外が発生した場合、例外を発生した旨をLLMにわたすことなく、runnerの外までその例外が伝わってしまっていました。この場合、エラーが発生した情報をLLM (Agent) に伝えて処理を継続させるには、呼び出し側のコードで再度リクエストを組み立てる必要があり、手間がかかっていました。
これらの課題を解決しうる新機能が今回追加されたPluginのCallback(というより拡張ポイント)on_tool_error_callback
, on_model_error_callback
です。
これらのCallbackは名前の通り、それぞれの箇所で例外が発生した場合に処理を差し込むことができる機能です。
実際に使ってみよう
以前のコード
今まで例えばツール内で例外が発生していて、その処理をリトライしたい場合は以下の様にRunner外で例外を補足して、再処理する必要がありました。
import datetime
from typing import Any, Optional
from google.adk.agents.llm_agent import Agent
from google.adk.runners import InMemoryRunner
from google.adk.tools import ToolContext, BaseTool
from google.genai import types
import dotenv
dotenv.load_dotenv()
def now_tool(tool_context: ToolContext):
"""
現在時刻を返却します。
"""
retry = tool_context.state.get("retry", 0)
if retry > 2:
tool_context.state["retry"] = 0
return datetime.datetime.now().isoformat()
raise RuntimeError("なにかに失敗しました。")
root_agent = Agent(
model='gemini-2.5-flash',
name='root_agent',
description='時計エージェント',
instruction="""
あなたは現在時刻を教える時計エージェントです。 `now_tool` を 利用して、現在時刻を取得してユーザーの問いに答えてください。
`now_tool` はよく `失敗しました` の様な現在時刻ではない失敗した旨を返却します。その場合は成功するまで `now_tool`を呼び出してください。
""",
tools=[now_tool]
)
async def call_agent_async(query: str, runner: InMemoryRunner, user_id, session_id):
"""Sends a query to the agent and prints the final response."""
print(f"\n>>> User Query: {query}")
content = types.Content(role='user', parts=[types.Part(text=query)])
final_response_text = "Agent did not produce a final response."
state_delta = {"retry": 0}
while True:
try:
async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content, state_delta=state_delta):
if event.is_final_response():
if event.content and event.content.parts:
final_response_text = event.content.parts[0].text
elif event.actions and event.actions.escalate:
final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
break
if event.content and event.content.parts:
for part in event.content.parts:
if part.text:
print(f"{part.text}")
if part.function_call and not part.function_response:
print(f"{part.function_call.name} args: {part.function_call.args}")
if part.function_response:
print(f"{part.function_response.name} response: {part.function_response.response}")
break
except Exception as e:
session = await runner.session_service.get_session(app_name="test", session_id=session_id, user_id=user_id)
session.state["retry"] = session.state.get("retry", 0) + 1
state_delta = session.state
print(e)
print(f"{final_response_text}")
async def main():
runner = InMemoryRunner(
app_name="test",
agent=root_agent,
)
await runner.session_service.create_session(app_name="test", user_id="test", session_id="test")
await call_agent_async("今何時", runner, "test", "test")
import asyncio
if __name__ == "__main__":
try:
asyncio.run(main())
except Exception as e:
print(f"An error occurred: {e}")
以下のように確かに再実行はできますが、コードとしてはあまりきれいには見えません。
ご覧の通り再実行は可能ですが、例外処理のために while True ループや try-except ブロックが必要となり、コードが複雑になりがちです。
❯ uv run new_callback/agent.py
>>> User Query: 今何時
Warning: there are non-text parts in the response: ['function_call'], returning concatenated text result from text parts. Check the full candidates.content.parts accessor to get the full model response.
error callbackを利用したコード
では今回追加されたPluginのCallbackを利用したコードを書いてみます。
import datetime
from typing import Any, Optional
from google.adk.agents.llm_agent import Agent
from google.adk.runners import InMemoryRunner
from google.adk.tools import ToolContext, BaseTool
from google.adk.plugins import BasePlugin
from google.genai import types
import dotenv
dotenv.load_dotenv()
def now_tool(tool_context: ToolContext):
"""
現在時刻を返却します。
"""
retry = tool_context.state.get("retry", 0)
if retry > 2:
tool_context.state["retry"] = 0
return datetime.datetime.now().isoformat()
raise RuntimeError("なにかに失敗しました。")
class HandleErrorPlugin(BasePlugin):
def __init__(self):
super().__init__(name="LoggerPlugin")
async def on_tool_error_callback(self, tool: BaseTool,
tool_args: dict[str, Any],
tool_context: ToolContext,
error: Exception) -> Optional[str]:
if tool.name == "now_tool":
tool_context.state["retry"] = tool_context.state.get("retry", 0) + 1
return "現在時刻の取得に失敗しました。"
raise error
root_agent = Agent(
model='gemini-2.5-flash',
name='root_agent',
description='時計エージェント',
instruction="""
あなたは現在時刻を教える時計エージェントです。 `now_tool` を 利用して、現在時刻を取得してユーザーの問いに答えてください。
`now_tool` はよく `失敗しました` の様な現在時刻ではない失敗した旨を返却します。その場合は成功するまで `now_tool`を呼び出してください。
""",
tools=[now_tool]
)
async def call_agent_async(query: str, runner: InMemoryRunner, user_id, session_id):
"""Sends a query to the agent and prints the final response."""
print(f"\n>>> User Query: {query}")
content = types.Content(role='user', parts=[types.Part(text=query)])
final_response_text = "Agent did not produce a final response."
async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content):
if event.is_final_response():
if event.content and event.content.parts:
final_response_text = event.content.parts[0].text
elif event.actions and event.actions.escalate:
final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
break
if event.content and event.content.parts:
for part in event.content.parts:
if part.text:
print(f"{part.text}")
if part.function_call and not part.function_response:
print(f"{part.function_call.name} args: {part.function_call.args}")
if part.function_response:
print(f"{part.function_call.name} args: {part.function_call.args} response: {part.function_response.response}")
print(f"{final_response_text}")
async def main():
runner = InMemoryRunner(
app_name="test",
agent=root_agent,
plugins=[
HandleErrorPlugin(),
]
)
await runner.session_service.create_session(app_name="test", user_id="test", session_id="test")
await call_agent_async("今何時", runner, "test", "test")
import asyncio
if __name__ == "__main__":
try:
asyncio.run(main())
except Exception as e:
print(f"An error occurred: {e}")
try-except
やwhile True
が消えてかなりスッキリしました。またrunner内部で処理されるため、stateの管理も簡単に行えています。
実際の実行結果が以下です。
>>> User Query: 今何時
Warning: there are non-text parts in the response: ['function_call'], returning concatenated text result from text parts. Check the full candidates.content.parts accessor to get the full model response.
まとめ
いかがでしたでしょうか?
今回は新たに追加された on_tool_error_callback
, on_model_error_callback
について解説しました。エラー処理がだいぶスッキリ書けるようになりましたね。
今回のサンプルコードは、わかりやすい on_tool_error_callback
を使いましたが、
モデル自体の例外を処理する場合は on_model_error_callback
を利用します。
今回の1.9.0で追加された 「GeminiのRetryOptions」も含めて、LLMに対するエラー処理とリトライ処理をかなり簡単にかけるようになったと思います。
エラー処理はサンプルコードを書くぐらいではそれほど使うものではありませんが、実運用を考えると必須になってくる部分ですので、利用するシーンが多く、とても利便性が上がった更新でしたね。
先日、ADKユーザー向けのイベント「ADK UG #0」が開催され、ADK開発者が集う日本語のDiscordコミュニティが誕生しました。ADKに関する情報交換や議論に興味がある方は、ぜひご参加ください!
また、ADKの最新のコミットログやリリースノートを分かりやすく解説するPodcastを、月・水・金に配信しています。ADKの動向を追いかけたい方は、ぜひ聴いてみてください。
Views: 0