木曜日, 9月 25, 2025
木曜日, 9月 25, 2025
- Advertisment -
ホームニューステックニュース【Agent Engine の A2A 対応記念】A2A リモートエージェントを Agent Engine にデプロイする

【Agent Engine の A2A 対応記念】A2A リモートエージェントを Agent Engine にデプロイする


はじめに

下記の記事では、「ネット記事の作成業務」のワークフローを実行するマルチエージェントシステムを Agent Engine にデプロイして、A2A で連携させる方法を説明しました。

この際、Agent Engine の前段に Cloud Run を用いて A2A サーバーをデプロイしましたが、Agent Engine が A2A 対応したため、A2A サーバーを別途デプロイする必要がなくなりました。 また、ADK が標準提供する RemoteA2aAgent クラスを利用すると、ローカルエージェントとほぼ同じ方法で Agent Engine にデプロイした A2A リモートエージェントをサブエージェントに組み込むことができます。つまり、「ネット記事の作成業務」のワークフローを実行するマルチエージェントシステムは、次の図のように実現されます。


ADK + Agent Engine による A2A マルチエージェントシステムの構成

この記事では、上図のシステムをデプロイする手順を具体的に説明します。

環境準備

Vertex AI workbench のノートブック上で実装しながら説明するために、まずは、ノートブックの実行環境を用意します。新しいプロジェクトを作成したら、Cloud Shell のコマンド端末を開いて、必要な API を有効化します。

gcloud services enable \
  aiplatform.googleapis.com \
  notebooks.googleapis.com \
  cloudresourcemanager.googleapis.com

続いて、デプロイの際に使用するステージングバケットを用意して、Workbench のインスタンスを作成します。

PROJECT_ID=$(gcloud config list --format 'value(core.project)')
BUCKET="gs://${PROJECT_ID}"
gsutil mb -b on -l us-central1 $BUCKET
gcloud workbench instances create agent-development \
  --project=$PROJECT_ID \
  --location=us-central1-a \
  --machine-type=e2-standard-2

クラウドコンソールのナビゲーションメニューから「Vertex AI」→「Workbench」を選択すると、作成したインスタンス agent-development があります。インスタンスの起動が完了するのを待って、「JUPYTERLAB を開く」をクリックしたら、「Python 3(ipykernel)」の新規ノートブックを作成します。

この後は、ノートブックのセルでコードを実行していきます。

まず、Agent Development Kit (ADK) と A2A SDK のパッケージをインストールします。

%pip install --user \
    google-adk==1.14.1 \
    google-genai==1.36.0 \
    google-cloud-aiplatform==1.113.0 \
    a2a-sdk==0.3.5

インストール時に表示される ERROR: pip's dependency resolver does not currently take into... というエラーは無視してください。

インストールしたパッケージを利用可能にするために、次のコマンドでカーネルを再起動します。

import IPython
app = IPython.Application.instance()
_ = app.kernel.do_shutdown(True)

再起動を確認するポップアップが表示されるので [Ok] をクリックします。

続いて、必要なモジュールをインポートして、実行環境の初期設定を行います。

import httpx, json, os, uuid
from google.auth import default
from google.auth.transport.requests import Request
import vertexai


from google.genai.types import Part, Content, HttpOptions
from google.adk.agents.callback_context import CallbackContext
from google.adk.agents.llm_agent import LlmAgent
from google.adk.agents.remote_a2a_agent import RemoteA2aAgent
from google.adk.agents.sequential_agent import SequentialAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.models import LlmResponse, LlmRequest
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService, VertexAiSessionService


from a2a.client import ClientConfig, ClientFactory
from a2a.types import AgentCard, TransportProtocol
from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor
from google.adk.a2a.utils.agent_card_builder import AgentCardBuilder


from vertexai import agent_engines
from vertexai.preview.reasoning_engines import A2aAgent
from vertexai.preview.reasoning_engines.templates.a2a import create_agent_card


[PROJECT_ID] = !gcloud config list --format 'value(core.project)'
[PROJECT_NUMBER] = !gcloud projects describe {PROJECT_ID} --format='value(projectNumber)'
LOCATION = 'us-central1'

vertexai.init(project=PROJECT_ID, location=LOCATION,
              staging_bucket=f'gs://{PROJECT_ID}')

os.environ['GOOGLE_CLOUD_PROJECT'] = PROJECT_ID
os.environ['GOOGLE_CLOUD_LOCATION'] = LOCATION
os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = 'True'

また、ノートブック上でエージェントを利用するための簡易アプリのクラスを定義します。

class LocalApp:
    def __init__(self, agent, app_name='default_app', user_id='default_user'):
        self._agent = agent
        self._app_name = app_name
        self._user_id = user_id
        self._runner = Runner(
            app_name=self._app_name,
            agent=self._agent,
            artifact_service=InMemoryArtifactService(),
            session_service=InMemorySessionService(),
            memory_service=InMemoryMemoryService(),
        )
        self._session = None
        
    async def stream(self, query):
        if not self._session:
            self._session = await self._runner.session_service.create_session(
                app_name=self._app_name,
                user_id=self._user_id,
                session_id=uuid.uuid4().hex,
            )
        content = Content(role='user', parts=[Part(text=query)])
        async_events = self._runner.run_async(
            user_id=self._user_id,
            session_id=self._session.id,
            new_message=content,
        )
        result = []
        async for event in async_events:
            if (event.content and event.content.parts):
                response = '\n'.join([p.text for p in event.content.parts if p.text])
                if response:
                    print(response)
                    result.append(response)
        return result

リモートエージェントの定義と Agent Engine へのデプロイ

今回の構成では、次の 4 つのエージェントをリモートエージェントとして使用します。

  • research_agent1:調査レポートの調査項目を選定する
  • research_agent2:選定した項目に基づいて調査レポートを作成する
  • writer_agent:記事を作成する
  • review_agent:記事をレビューする

これらのエージェントを定義した後に、Agent Engine にデプロイして、A2A リモートエージェントとして呼び出せるようにします。まずは、それぞれのエージェントを定義します。

research_agent1

instruction = '''
あなたの役割は、記事の執筆に必要な情報を収集して調査レポートにまとめる事です。
指定されたテーマの記事を執筆する際に参考となるトピックを5項目程度のリストにまとめます。
後段のエージェントがこのリストに基づいて、調査レポートを作成します。

* 出力形式
日本語で出力。
'''

research_agent1 = LlmAgent(
    name='research_agent1',
    model='gemini-2.5-flash',
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント(テーマ選定)',
    instruction=instruction,
)

research_agent2

instruction = '''
あなたの役割は、記事の執筆に必要な情報を収集して調査レポートにまとめる事です。
前段のエージェントは、5項目程度の調査対象トピックを指定します。

* 出力形式
日本語で出力。
調査レポートは、トピックごとに客観的情報をまとめます。各トピックについて、5文以上の長さで記述すること。
'''

research_agent2 = LlmAgent(
    name='research_agent2',
    model='gemini-2.5-flash',
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント(レポート作成)',
    instruction=instruction,
)

writer_agent

instruction = '''
あなたの役割は、特定のテーマに関する気軽な読み物記事を書くことです。
記事の「テーマ」と、その内容に関連する「調査レポート」が与えられるので、
調査レポートに記載の客観的事実に基づいて、信頼性のある読み物記事を書いてください。

**出力条件**
- トピックに関してある程度の基礎知識がある読者を前提として、数分で気軽に読める内容にしてください。
- 比較的カジュアルで語りかける口調の文章にします。
- 思考過程は出力せずに、最終結果だけを出力します。
- 記事タイトルは付けないで、次の構成で出力します。各セクションタイトルは、内容に合わせてアレンジしてください。
0. 導入:セクションタイトルを付けないで、この記事を読みたくなる導入を1〜2文にまとめます。
1. 概要:トピックの全体像をまとめて簡単に説明します。
2. 最新情報:特に注目したい新しい情報を取り上げます。
3. 実践:トピックに関して、読者自身がやってみるとよさそうな事を1つ紹介します。
4. まとめ

- 各セクションのタイトルはマークダウンヘッダ ## 付けます。必要に応じて箇条書きのマークダウンを使用します。
- それ以外のマークダウンによる装飾は行いません。

**レビュアーの指示に応じた修正**
- レビュアーが修正ポイントを提示した際は、出力条件にこだわらずに、直前の記事を指示に従って修正してください。
- 修正ポイント以外の部分は、修正する必要ありません。
'''

writer_agent = LlmAgent(
    name='writer_agent',
    model='gemini-2.5-flash',
    description='特定のテーマに関する読み物記事を書くエージェント',
    instruction=instruction,
)

review_agent

instruction = f'''
あなたの役割は、読み物記事をレビューして、記事の条件にあった内容にするための改善コメントを与える事です。

* 記事の条件
- 記事は、はじめに40文字程度のタイトルがあること。
 今日から役立つ生活情報があって「すぐに読まなきゃ」と読者が感じるタイトルにすること。
 タイトルはマークダウンヘッダ # をつけること。
- タイトルの直後に「なぜいまこのテーマを取り上げるのか」をまとめた導入を加えて、読者にこの記事を読む動機づけを与えます。
- 各セクションのサブタイトルには、絵文字を用いて親しみやすさを出すこと。
- 読者が今日から実践できる具体例が3つ以上紹介されていること。

* 出力形式
- 日本語で出力。
- はじめに、記事の良い点を説明します。
- 次に、修正ポイントを箇条書きで出力します。
'''

review_agent = LlmAgent(
    name='review_agent',
    model='gemini-2.5-flash',
    description='読み物記事をレビューするエージェント',
    instruction=instruction,
)

これらのエージェントを A2A リモートエージェントとして、Agent Engine にデプロイします。なお、Agent Engine にデプロイしたエージェントにはユニークなリソース ID が付与されますが、この後で説明するように、このリソース ID から A2A のエージェントカードを取得する URL が決まります。

まず、エージェント名から対応するリソース ID を取得する関数 get_agent_resource() を用意します。

def get_agent_resource(agent_name):
    for agent in agent_engines.list():
        if agent.display_name == agent_name:
            return agent.resource_name
    return None

続いて、A2A サーバー機能に関連する関数を定義します。


class MyVertexAiSessionService(VertexAiSessionService):

    async def create_session(self, app_name, user_id, state={}, session_id=None):
        
        session = await super().create_session(
            app_name=app_name,
            user_id=user_id,
            state=state,
        )
        return session

    async def get_session(self, app_name, user_id, session_id):
        
        try:
            session = await super().get_session(
                app_name=app_name,
                user_id=user_id,
                session_id=session_id,
            )
            return session
        except:
            return None


def get_create_runner_class(agent, resource_name):
    async def create_runner():
        return Runner(
            
            app_name=resource_name or agent.name,
            agent=agent,
            artifact_service=InMemoryArtifactService(),
            session_service=MyVertexAiSessionService(),
            memory_service=InMemoryMemoryService(),
        )
    return create_runner


def get_agent_executor_class(agent, resource_name):
    def agent_executor_builder():
        return A2aAgentExecutor(
            runner=get_create_runner_class(agent, resource_name),
        )       
    return agent_executor_builder


async def get_agent_card(agent):
    builder = AgentCardBuilder(agent=agent)
    adk_agent_card = await builder.build()
    return create_agent_card(
        agent_name=adk_agent_card.name,
        description=adk_agent_card.description,
        skills=adk_agent_card.skills
    )

これで必要な準備ができました。次のコードで、先ほど定義した 4 つのエージェントを A2A リモートエージェントとして、Agent Engine にデプロイします。

agents = {
    'research_agent1_a2a': research_agent1,
    'research_agent2_a2a': research_agent2,
    'writer_agent_a2a': writer_agent,
    'review_agent_a2a': review_agent,
}

client = vertexai.Client(
    project=PROJECT_ID,
    location=LOCATION,
    http_options=HttpOptions(
        api_version='v1beta1', base_url=f'https://{LOCATION}-aiplatform.googleapis.com/'
    ),
)

for agent_name, agent in agents.items():    
    a2a_agent = A2aAgent(
        agent_card=await get_agent_card(agent),
        agent_executor_builder=get_agent_executor_class(agent, None)
    )
    config={
        'display_name': agent_name,
        'description': a2a_agent.agent_card.description,
        'requirements': [
            'google-adk==1.14.1',
            'google-genai==1.36.0',
            'google-cloud-aiplatform==1.113.0',
            'a2a-sdk==0.3.5'
        ],
        'http_options': {
            'base_url': f'https://{LOCATION}-aiplatform.googleapis.com',
            'api_version': 'v1beta1',
        },
        'staging_bucket': f'gs://{PROJECT_ID}',
    }

    resource_name = get_agent_resource(agent_name)
    if not resource_name:
        
        remote_a2a_agent = client.agent_engines.create(
            agent=a2a_agent,
            config=config,
        )
    
    
    resource_name = get_agent_resource(agent_name)
    a2a_agent = A2aAgent(
        agent_card=await get_agent_card(agent),
        agent_executor_builder=get_agent_executor_class(agent, resource_name)
    )
    remote_a2a_agent = client.agent_engines.update(
        name=resource_name,
        agent=a2a_agent,
        config=config,
    )

クラウドコンソールの上部にある検索バーから「Logs explorer」を検索して開くと、デプロイ中のログが確認できます。「すべてのフィールドを検索」と表示された部分に reasoning engine と入力すると、Agent Engine に関連したログだけが表示されます。

RemoteA2aAgent を用いた root agent の定義

これで 4 つの A2A リモートエージェントが Agent Engine にデプロイできたので、これらをサブエージェントとして組み込んだ root agent(ワークフローを実行するエージェント)を定義します。

冒頭で説明したように、ADK の RemoteA2aAgent クラスを利用して、サブエージェントとして組み込みますが、この際に、A2A サーバーにアクセスする A2A クライアントのファクトリークラスが必要になります。次のコードで、ファクトリークラスを用意します。

class GoogleAuthRefresh(httpx.Auth):
    def __init__(self, scopes):
        self.credentials, _ = default(scopes=scopes)
        self.transport_request = Request()
        self.credentials.refresh(self.transport_request)

    def auth_flow(self, request):
        if not self.credentials.valid:
            self.credentials.refresh(self.transport_request)
        request.headers['Authorization'] = f'Bearer {self.credentials.token}'
        yield request


factory = ClientFactory(
    ClientConfig(
        supported_transports=[TransportProtocol.http_json],
        use_client_preference=True,
        httpx_client=httpx.AsyncClient(
            timeout=60,
            headers={'Content-Type': 'application/json'},
            auth=GoogleAuthRefresh(scopes=['https://www.googleapis.com/auth/cloud-platform']) 
        ),
    ),
)

そして、次のコードで、4 つの A2A リモートエージェントに対応した RemoteA2aAgent クラスのインスタンスを定義します。

resource_name = get_agent_resource('research_agent1_a2a')
a2a_url = f'https://{LOCATION}-aiplatform.googleapis.com/v1beta1/{resource_name}/a2a'
research_agent1_remoteA2a = RemoteA2aAgent(
    name='research_agent1',
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント(テーマ選定)',
    agent_card=f'{a2a_url}/v1/card',
    a2a_client_factory=factory,
)

resource_name = get_agent_resource('research_agent2_a2a')
a2a_url = f'https://{LOCATION}-aiplatform.googleapis.com/v1beta1/{resource_name}/a2a'
research_agent2_remoteA2a = RemoteA2aAgent(
    name='research_agent2',
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント(レポート作成)',
    agent_card=f'{a2a_url}/v1/card',
    a2a_client_factory=factory,
)

resource_name = get_agent_resource('writer_agent_a2a')
a2a_url = f'https://{LOCATION}-aiplatform.googleapis.com/v1beta1/{resource_name}/a2a'
writer_agent_remoteA2a = RemoteA2aAgent(
    name='writer_agent',
    description='特定のテーマに関する読み物記事を書くエージェント',
    agent_card=f'{a2a_url}/v1/card',
    a2a_client_factory=factory,
)

resource_name = get_agent_resource('review_agent_a2a')
a2a_url = f'https://{LOCATION}-aiplatform.googleapis.com/v1beta1/{resource_name}/a2a'
review_agent_remoteA2a = RemoteA2aAgent(
    name='review_agent',
    description='読み物記事をレビューするエージェント',
    agent_card=f'{a2a_url}/v1/card',
    a2a_client_factory=factory,
)

RemoteA2aAgent クラスのインスタンスは、agent_card オプションで指定した URL からエージェントカードを取得した後、エージェントカードから、エージェントがデプロイされた A2A サーバーのエンドポイントの情報を取得して、リモートエージェントにリクエストを送信します。上記のコードからわかるように、エージェントカードの URL は次で与えられます。

https://{LOCATION}-aiplatform.googleapis.com/v1beta1/{resource_name}/a2a/v1/card/

{LOCATION} は Agent Engine のリージョン(今回の例であれば us-central1)で、{resource_name} は該当エージェントのリソース ID になります。

この後は、RemoteA2aAgent クラスのインスタンスをサブエージェントに指定することで、ローカルのサブエージェントを使用する場合と同様に root agent が定義できます。

def get_print_agent(text):
    def before_model_callback(
        callback_context: CallbackContext, llm_request: LlmRequest
    ) -> LlmResponse:
        return LlmResponse(
            content=Content(
                role='model', parts=[Part(text=text)],
            )
        )
    return LlmAgent(
        name='print_agent',
        model='gemini-2.0-flash', 
        description='',
        instruction = '',
        before_model_callback=before_model_callback,
    )

research_agent = SequentialAgent(
    name='research_agent',
    sub_agents=[
        get_print_agent('\n---\n## リサーチエージェントが調査レポートを作成します。\n---\n'),
        get_print_agent('\n## 調査対象のトピックを選定します。\n'),
        research_agent1_remoteA2a,
        get_print_agent('\n## 選定したトピックに基づいて、調査レポートを作成します。\n'),
        research_agent2_remoteA2a,
        get_print_agent('\n#### 調査レポートが準備できました。記事の作成に取り掛かってもよいでしょうか?\n'),
    ],
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント',
)

write_and_review_agent = SequentialAgent(
    name='write_and_review_agent',
    sub_agents=[
        get_print_agent('\n---\n## ライターエージェントが記事を執筆します。\n---\n'),
        writer_agent_remoteA2a,
        get_print_agent('\n---\n## レビューエージェントが記事をレビューします。\n---\n'),
        review_agent_remoteA2a,
       get_print_agent('\n#### レビューに基づいて記事の修正を依頼しますか?\n'),
    ],
    description='記事を作成、レビューする。',
)

root_agent = LlmAgent(
    name='article_generation_flow',
    model='gemini-2.0-flash',
    instruction = '''
何ができるか聞かれた場合は、以下の処理をすることをわかりやすくフレンドリーな文章にまとめて返答してください。

- ユーザーが指定したテーマの記事を作成する業務フローを実行する。
- はじめに、テーマに関する調査レポートを作成する。
- その後、ライターエージェントとレビューエージェントが協力して、編集方針に則した記事を作成する。

ユーザーが記事のテーマを指定した場合は、次のフローを実行します。

1. そのテーマの記事の作成に取り掛かる旨を伝えて、research_agent に転送して、調査レポートを依頼します。
2. ユーザー記事の作成を支持したら、write_and_review_agent に転送して、記事の作成とレビューを依頼します。
3. ユーザーが記事の修正を希望する場合は、write_and_review_agent に転送します。

**条件**
research_agent のニックネームは、リサーチエージェント
write_and_review_agent のネックネームは、ライターエージェントとレビューエージェント

''',
    sub_agents=[
        research_agent,
        write_and_review_agent,
    ],
    description='記事を作成する業務フローを実行するエージェント'
)

実行例

それでは、この root agent を実際に使ってみましょう。ここでは、先に用意した簡易アプリで実行します。

client = LocalApp(root_agent)
query = '''
こんにちは。何ができますか?
'''
result = await client.stream(query)

[出力結果]

こんにちは!私は記事を作成する業務フローを実行するエージェントです。
具体的には、ユーザーさんが指定したテーマについて、まずリサーチエージェントが調査レポートを作成します。
その後、ライターエージェントとレビューエージェントが協力して、編集方針に沿った記事を作成します。
ご希望のテーマがあれば教えてください!
query = '''
「近場で秋を感じる工夫」をテーマに記事を作成してください。
'''
result = await client.stream(query)
承知いたしました。「近場で秋を感じる工夫」というテーマの記事作成に取り掛かります。まずはリサーチエージェントに調査レポートを依頼します。

---
## リサーチエージェントが調査レポートを作成します。
---
...(以下省略)...

この後は、下記の記事の実行例と同じ流れで、記事の作成を進められます。

今回の実装内容をまとめたノートブックが下記にあるので、こちらも参考にしてください。

root agent を Agent Engine にデプロイ

先ほどは root agent 自体はローカルで実行しましたが、root agent も Agent Engine にデプロイして使用することができます。ただし、本記事執筆時点では、先に説明した、A2A クライアントのファクトリークラスと RemoteA2aAgent クラスに少し修正が必要になります。

ここでは、詳細な説明は割愛して、修正後のコードを紹介します。

はじめに、準備として、Agent Engine 上の root agent が A2A リモートエージェントを呼び出せるように、reasoning engine サービスエージェントに AI Platform ユーザーの IAM ロールを割り当てます。Cloud Shell の端末を開いて、次のコマンドを実行してください。

PROJECT_ID=$(gcloud config list --format 'value(core.project)')
PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format='value(projectNumber)')
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:service-${PROJECT_NUMBER}@gcp-sa-aiplatform-re.iam.gserviceaccount.com" \
    --role='roles/aiplatform.user'

続いて、ノートブックに戻って、次のコードで root agent を再定義します。

class MyClientFactory(ClientFactory):
    def create(self, card, consumers=None, interceptors=None):
        if not self._config.httpx_client:
            self._config.httpx_client=httpx.AsyncClient(
                timeout=60,
                headers={'Content-Type': 'application/json'},
                auth=GoogleAuthRefresh(scopes=['https://www.googleapis.com/auth/cloud-platform']) 
            )
            self._register_defaults(self._config.supported_transports)
        return super().create(card, consumers, interceptors)


class MyRemoteA2aAgent(RemoteA2aAgent):
    async def _ensure_httpx_client(self):
        if not self._httpx_client:
            self._httpx_client=httpx.AsyncClient(
                timeout=60,
                headers={'Content-Type': 'application/json'},
                auth=GoogleAuthRefresh(scopes=['https://www.googleapis.com/auth/cloud-platform']) 
            )
        return self._httpx_client


factory = MyClientFactory(
    ClientConfig(
        supported_transports=[TransportProtocol.http_json],
        use_client_preference=True,
    )
)

resource_name = get_agent_resource('research_agent1_a2a')
a2a_url = f'https://{LOCATION}-aiplatform.googleapis.com/v1beta1/{resource_name}/a2a'
research_agent1_remoteA2a = MyRemoteA2aAgent(
    name='research_agent1',
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント(テーマ選定)',
    agent_card=f'{a2a_url}/v1/card',
    a2a_client_factory=factory,
)

resource_name = get_agent_resource('research_agent2_a2a')
a2a_url = f'https://{LOCATION}-aiplatform.googleapis.com/v1beta1/{resource_name}/a2a'
research_agent2_remoteA2a = MyRemoteA2aAgent(
    name='research_agent2',
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント(レポート作成)',
    agent_card=f'{a2a_url}/v1/card',
    a2a_client_factory=factory,
)

resource_name = get_agent_resource('writer_agent_a2a')
a2a_url = f'https://{LOCATION}-aiplatform.googleapis.com/v1beta1/{resource_name}/a2a'
writer_agent_remoteA2a = MyRemoteA2aAgent(
    name='writer_agent',
    description='特定のテーマに関する読み物記事を書くエージェント',
    agent_card=f'{a2a_url}/v1/card',
    a2a_client_factory=factory,
)

resource_name = get_agent_resource('review_agent_a2a')
a2a_url = f'https://{LOCATION}-aiplatform.googleapis.com/v1beta1/{resource_name}/a2a'
review_agent_remoteA2a = MyRemoteA2aAgent(
    name='review_agent',
    description='読み物記事をレビューするエージェント',
    agent_card=f'{a2a_url}/v1/card',
    a2a_client_factory=factory,
)

def get_print_agent(text):
    def before_model_callback(
        callback_context: CallbackContext, llm_request: LlmRequest
    ) -> LlmResponse:
        return LlmResponse(
            content=Content(
                role='model', parts=[Part(text=text)],
            )
        )
    return LlmAgent(
        name='print_agent',
        model='gemini-2.0-flash', 
        description='',
        instruction = '',
        before_model_callback=before_model_callback,
    )

research_agent = SequentialAgent(
    name='research_agent',
    sub_agents=[
        get_print_agent('\n---\n## リサーチエージェントが調査レポートを作成します。\n---\n'),
        get_print_agent('\n## 調査対象のトピックを選定します。\n'),
        research_agent1_remoteA2a,
        get_print_agent('\n## 選定したトピックに基づいて、調査レポートを作成します。\n'),
        research_agent2_remoteA2a,
        get_print_agent('\n#### 調査レポートが準備できました。記事の作成に取り掛かってもよいでしょうか?\n'),
    ],
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント',
)

write_and_review_agent = SequentialAgent(
    name='write_and_review_agent',
    sub_agents=[
        get_print_agent('\n---\n## ライターエージェントが記事を執筆します。\n---\n'),
        writer_agent_remoteA2a,
        get_print_agent('\n---\n## レビューエージェントが記事をレビューします。\n---\n'),
        review_agent_remoteA2a,
       get_print_agent('\n#### レビューに基づいて記事の修正を依頼しますか?\n'),
    ],
    description='記事を作成、レビューする。',
)

root_agent = LlmAgent(
    name='article_generation_flow',
    model='gemini-2.0-flash',
    instruction = '''
何ができるか聞かれた場合は、以下の処理をすることをわかりやすくフレンドリーな文章にまとめて返答してください。

- ユーザーが指定したテーマの記事を作成する業務フローを実行する。
- はじめに、テーマに関する調査レポートを作成する。
- その後、ライターエージェントとレビューエージェントが協力して、編集方針に則した記事を作成する。

ユーザーが記事のテーマを指定した場合は、次のフローを実行します。

1. そのテーマの記事の作成に取り掛かる旨を伝えて、research_agent に転送して、調査レポートを依頼します。
2. ユーザー記事の作成を支持したら、write_and_review_agent に転送して、記事の作成とレビューを依頼します。
3. ユーザーが記事の修正を希望する場合は、write_and_review_agent に転送します。

**条件**
research_agent のニックネームは、リサーチエージェント
write_and_review_agent のネックネームは、ライターエージェントとレビューエージェント

''',
    sub_agents=[
        research_agent,
        write_and_review_agent,
    ],
    description='記事を作成する業務フローを実行するエージェント'
)

そして、次のコードで、root agent を Agent Engine にデプロイします。

remote_agent = agent_engines.create(
    agent_engine=root_agent,
    display_name='article_generation_flow_agent',
    requirements=[
        'google-adk==1.14.1',
        'google-genai==1.36.0',
        'google-cloud-aiplatform==1.113.0',
        'a2a-sdk==0.3.5',
    ],
)

また、Agent Engine 上のリモートエージェントを実行する簡易アプリのクラスを定義します。

class RemoteApp:
    def __init__(self, remote_agent, user_id='default_user'):
        self._remote_agent = remote_agent
        self._user_id = user_id
        self._session = None

    async def _stream(self, query):
        if not self._session:
            self._session = await remote_agent.async_create_session(user_id=self._user_id)
        events = self._remote_agent.async_stream_query(
            user_id=self._user_id,
            session_id=self._session['id'],
            message=query,
        )
        result = []
        async for event in events:
            if ('content' in event and 'parts' in event['content']):
                response = '\n'.join(
                    [p['text'] for p in event['content']['parts'] if 'text' in p]
                )
                print(response)
                result.append(response)
        return result

    async def stream(self, query):
        for _ in range(5):
            result = await self._stream(query)
            if result:
                return result
            time.sleep(3)
        return 'Error: No response from Agent Engine.'

この後は、次のコードで root agent を実行します。

client = RemoteApp(remote_agent)
query = '''
こんにちは。何ができますか?
'''
result = await client.stream(query)

[出力結果]

こんにちは!私は、記事を作成する業務フローを実行するエージェントです。
ユーザーさんが指定したテーマの記事を作成することができます。

具体的には、まずテーマについてのリサーチエージェントが調査レポートを作成します。
その後、ライターエージェントとレビューエージェントが協力して、編集方針に沿った記事を作成します。
どんなテーマの記事をご希望ですか?

この後の実行の流れは、ローカルの場合と同様です。

まとめ

この記事では、Agent Engine に複数の A2A リモートエージェントをデプロイした上で、これらを連携したワークフローを実行する例を紹介しました。ここでは特に、A2A サーバーと A2A クライアントの双方で、ADK が標準提供するクラスを利用した点に注目してください。

  • サーバー側:A2A サーバーの機能を提供する A2aAgentExecutor クラス
  • クライアント側:A2A リモートエージェントをサブエージェントとして組み込む RemoteA2aAgent クラス

下記の記事では独自実装の A2A サーバーを Cloud Run にデプロイした上で、独自実装の A2A クライアント機能をコールバック関数に組み込むという工夫をしました。

これに比べると、今回の実装例では比較的シンプルなコードになっていることがわかります。

このように、ADK の A2A 対応機能と Agent Engine の A2A 対応が連携することで、より簡単に A2A が利用できます。 詳細な説明は割愛しますが、上記の 2 つのクラスが連携する事で、root agent のセッション情報と A2A リモートエージェントのセッション情報が適切に同期されて、それぞれの A2A リモートエージェントは、ワークフロー全体の処理の流れを理解した上で、自身が担当する処理を実行していきます。

本文冒頭の注意書きにあるように、本記事執筆時点では、ADK の A2A 対応機能は「Experimental」フェーズのため、Wrapper クラスを利用するなど、多少のワークアラウンドが必要でしたが、このあたりは今後のバージョンアップで解消されていくでしょう。ADK と Agent Engine の進化には今後も要注目ですね。



Source link

Views: 0

RELATED ARTICLES

返事を書く

あなたのコメントを入力してください。
ここにあなたの名前を入力してください

- Advertisment -