土曜日, 8月 2, 2025
土曜日, 8月 2, 2025
- Advertisment -
ホームニューステックニュースAgent Development Kit 1.9.0 で追加された 新たなPluginのCallback

Agent Development Kit 1.9.0 で追加された 新たなPluginのCallback


こんにちは、サントリーこと大橋です。

今日(2025/07/24)Agent Development Kit(以下 ADK) 1.9.0がリリースされました。

https://github.com/google/adk-python/releases/tag/v.1.9.0

主な更新内容は

  • adk webで起動するFastAPIサーバーをモジュール化し、再利用・カスタマイズしやすい形式に変更
  • 新たなPluginのCallback(on_tool_error_callback, on_model_error_callback)追加
  • GeminiのRetryOptions

今回はこの中で「新たなPluginのCallback」について解説していきます。

ADKのPluginとCallback

ADKには各種処理に対して、処理を差し込むことができる「Plugin」及び「Callback」呼ばれる機能があります。
Callbackは、任意のAgentに対して「ログの挿入」「リクエストの変更」「レスポンスの改変」「処理の中断」といった処理を差し込むための仕組みです。ADKの機能を拡張する上で、非常に重要な役割を担っています。
PluginはこのCallbackの機能を全Agentに対して適用できる機能でADK 1.7.0で追加されました。

ADKでは1.8.0までは以下の6つのCallbackが利用できました。

  • Agentの実行前後
    • before_agent_callback
    • after_agent_callback
  • LLMの実行前後
    • before_model_callback
    • after_model_callback
  • ツールの実行前後
    • before_tool_callback
    • after_tool_callback

基本的には各種処理の前後に処理を差し込めるようになっており、ADKの状態(state)の変更や、リクエスト(ユーザからのメッセージや、プロンプト、ツール引数など)の変更、レスポンスの変更(LLMからの変更、Agent自体のレスポンスの変更、ツールレスポンスの変更)が行えました。

課題:従来のエラーハンドリング

ここで、これまでのADKにおけるエラー処理の課題について見てみましょう。
従来のADKでは、「LLMやツール内で例外が発生した際、その例外がRunnerの内部で処理されず、呼び出し元までスローされてしまう」という課題がありました。

RunnerはADKのAgentを実行するための実行クラスです。
通常ADKでAgentを作成した場合、開発者は以下の様なコードでRunner経由でAgentを実行します。

main.py

from google.adk.runners import Runner



def get_current_time():
   
   pass


my_agent = Agent(...) 


runner = Runner(agent=my_agent, tools=[get_current_time], ......)

user_id = ...
session_id = ...
content = "ユーザーのメッセージ"


async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content):
    print(event)

これまでADKでは特殊な方法(カスタムAgentや、カスタムFlow)を除いて、LLMや、ツールが例外を発生させた場合(クォータエラーや通信エラー、ツールの作りの問題で発生する例外等)、その結果をLLMに返してリトライするなどはせず、上記のrunner.run_asyncまで例外がスローされ、runnerの外側で例外処理をする必要がありました。

上記であれば get_current_timeの内部で例外が発生した場合、例外を発生した旨をLLMにわたすことなく、runnerの外までその例外が伝わってしまっていました。この場合、エラーが発生した情報をLLM (Agent) に伝えて処理を継続させるには、呼び出し側のコードで再度リクエストを組み立てる必要があり、手間がかかっていました。

これらの課題を解決しうる新機能が今回追加されたPluginのCallback(というより拡張ポイント)on_tool_error_callback, on_model_error_callback です。

これらのCallbackは名前の通り、それぞれの箇所で例外が発生した場合に処理を差し込むことができる機能です。

実際に使ってみよう

以前のコード

今まで例えばツール内で例外が発生していて、その処理をリトライしたい場合は以下の様にRunner外で例外を補足して、再処理する必要がありました。

import datetime
from typing import Any, Optional

from google.adk.agents.llm_agent import Agent
from google.adk.runners import InMemoryRunner
from google.adk.tools import ToolContext, BaseTool
from google.genai import types

import dotenv

dotenv.load_dotenv()


def now_tool(tool_context: ToolContext):
    """
    現在時刻を返却します。
    """
    retry = tool_context.state.get("retry", 0)

    if retry > 2:
        tool_context.state["retry"] = 0
        return datetime.datetime.now().isoformat()

    raise RuntimeError("なにかに失敗しました。")


root_agent = Agent(
    model='gemini-2.5-flash',
    name='root_agent',
    description='時計エージェント',
    instruction="""
    あなたは現在時刻を教える時計エージェントです。 `now_tool` を 利用して、現在時刻を取得してユーザーの問いに答えてください。
    `now_tool` はよく `失敗しました` の様な現在時刻ではない失敗した旨を返却します。その場合は成功するまで `now_tool`を呼び出してください。
    """,
    tools=[now_tool]
)


async def call_agent_async(query: str, runner: InMemoryRunner, user_id, session_id):
    """Sends a query to the agent and prints the final response."""
    print(f"\n>>> User Query: {query}")

    content = types.Content(role='user', parts=[types.Part(text=query)])
    final_response_text = "Agent did not produce a final response."

    state_delta = {"retry": 0}

    while True:
        try:
            async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content, state_delta=state_delta):

                if event.is_final_response():
                    if event.content and event.content.parts:
                        
                        final_response_text = event.content.parts[0].text
                    elif event.actions and event.actions.escalate:  
                        final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
                    
                    break  

                if event.content and event.content.parts:

                    for part in event.content.parts:
                        if part.text:
                            print(f"{part.text}")

                        if part.function_call and not part.function_response:
                            print(f"{part.function_call.name} args: {part.function_call.args}")
                        if part.function_response:
                            print(f"{part.function_response.name} response: {part.function_response.response}")
            break
        except Exception as e:
            session = await runner.session_service.get_session(app_name="test", session_id=session_id, user_id=user_id)
            session.state["retry"] = session.state.get("retry", 0) + 1
            state_delta = session.state
            print(e)

    print(f"{final_response_text}")


async def main():
    runner = InMemoryRunner(
        app_name="test",
        agent=root_agent,
    )

    await runner.session_service.create_session(app_name="test", user_id="test", session_id="test")
    await call_agent_async("今何時", runner, "test", "test")


import asyncio

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"An error occurred: {e}")

以下のように確かに再実行はできますが、コードとしてはあまりきれいには見えません。
ご覧の通り再実行は可能ですが、例外処理のために while True ループや try-except ブロックが必要となり、コードが複雑になりがちです。

❯ uv run new_callback/agent.py

>>> User Query: 今何時
Warning: there are non-text parts in the response: ['function_call'], returning concatenated text result from text parts. Check the full candidates.content.parts accessor to get the full model response.

error callbackを利用したコード

では今回追加されたPluginのCallbackを利用したコードを書いてみます。

import datetime
from typing import Any, Optional

from google.adk.agents.llm_agent import Agent
from google.adk.runners import InMemoryRunner
from google.adk.tools import ToolContext, BaseTool
from google.adk.plugins import BasePlugin
from google.genai import types

import dotenv

dotenv.load_dotenv()


def now_tool(tool_context: ToolContext):
    """
    現在時刻を返却します。
    """


    retry = tool_context.state.get("retry", 0)

    if retry > 2:
        tool_context.state["retry"] = 0
        return datetime.datetime.now().isoformat()

    raise RuntimeError("なにかに失敗しました。")


class HandleErrorPlugin(BasePlugin):

    def __init__(self):
        super().__init__(name="LoggerPlugin")

    async def on_tool_error_callback(self, tool: BaseTool,
      tool_args: dict[str, Any],
      tool_context: ToolContext,
      error: Exception) -> Optional[str]:
        if tool.name == "now_tool":
            tool_context.state["retry"] = tool_context.state.get("retry", 0) + 1
            return "現在時刻の取得に失敗しました。"

        raise error

root_agent = Agent(
    model='gemini-2.5-flash',
    name='root_agent',
    description='時計エージェント',
    instruction="""
    あなたは現在時刻を教える時計エージェントです。 `now_tool` を 利用して、現在時刻を取得してユーザーの問いに答えてください。
    `now_tool` はよく `失敗しました` の様な現在時刻ではない失敗した旨を返却します。その場合は成功するまで `now_tool`を呼び出してください。
    """,
    tools=[now_tool]
)


async def call_agent_async(query: str, runner: InMemoryRunner, user_id, session_id):
    """Sends a query to the agent and prints the final response."""
    print(f"\n>>> User Query: {query}")

    content = types.Content(role='user', parts=[types.Part(text=query)])
    final_response_text = "Agent did not produce a final response."
    async for event in runner.run_async(user_id=user_id, session_id=session_id, new_message=content):

        if event.is_final_response():
            if event.content and event.content.parts:
                
                final_response_text = event.content.parts[0].text
            elif event.actions and event.actions.escalate:  
                final_response_text = f"Agent escalated: {event.error_message or 'No specific message.'}"
            
            break  
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.text:
                    print(f"{part.text}")

                if part.function_call and not part.function_response:
                    print(f"{part.function_call.name} args: {part.function_call.args}")
                if part.function_response:
                    print(f"{part.function_call.name} args: {part.function_call.args} response: {part.function_response.response}")
    print(f"{final_response_text}")


async def main():
    runner = InMemoryRunner(
        app_name="test",
        agent=root_agent,
        plugins=[
            HandleErrorPlugin(),
        ]
    )

    await runner.session_service.create_session(app_name="test", user_id="test", session_id="test")
    await call_agent_async("今何時", runner, "test", "test")


import asyncio

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except Exception as e:
        print(f"An error occurred: {e}")

try-exceptwhile Trueが消えてかなりスッキリしました。またrunner内部で処理されるため、stateの管理も簡単に行えています。

実際の実行結果が以下です。

>>> User Query: 今何時
Warning: there are non-text parts in the response: ['function_call'], returning concatenated text result from text parts. Check the full candidates.content.parts accessor to get the full model response.

まとめ

いかがでしたでしょうか?
今回は新たに追加された on_tool_error_callback, on_model_error_callbackについて解説しました。エラー処理がだいぶスッキリ書けるようになりましたね。

今回のサンプルコードは、わかりやすい on_tool_error_callbackを使いましたが、
モデル自体の例外を処理する場合は on_model_error_callback を利用します。
今回の1.9.0で追加された 「GeminiのRetryOptions」も含めて、LLMに対するエラー処理とリトライ処理をかなり簡単にかけるようになったと思います。

エラー処理はサンプルコードを書くぐらいではそれほど使うものではありませんが、実運用を考えると必須になってくる部分ですので、利用するシーンが多く、とても利便性が上がった更新でしたね。

先日、ADKユーザー向けのイベント「ADK UG #0」が開催され、ADK開発者が集う日本語のDiscordコミュニティが誕生しました。ADKに関する情報交換や議論に興味がある方は、ぜひご参加ください!

https://discord.gg/BKpGRzjtqZ

また、ADKの最新のコミットログやリリースノートを分かりやすく解説するPodcastを、月・水・金に配信しています。ADKの動向を追いかけたい方は、ぜひ聴いてみてください。

https://www.youtube.com/playlist?list=PL0Zc2RFDZsM_MkHOzWNJpaT4EH5fQxA8n



Source link

Views: 0

RELATED ARTICLES

返事を書く

あなたのコメントを入力してください。
ここにあなたの名前を入力してください

- Advertisment -