%%writefile browser_use_agent.py
from typing import Literal, Generator, List, Optional, Any, Dict, Mapping, Union
import uuid
import mlflow
from databricks_langchain import (
ChatDatabricks,
)
from langgraph.func import entrypoint, task
from databricks_langchain import ChatDatabricks
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
ChatAgentMessage,
ChatAgentResponse,
ChatContext,
ChatAgentChunk,
)
import subprocess
import asyncio
from browser_use import Agent, Browser, BrowserConfig
from playwright._impl._driver import compute_driver_executable, get_driver_env
# mlflow tracing
mlflow.langchain.autolog()
def install_playwright():
"""Playwrightの依存関係をインストールします。"""
driver_executable, driver_cli = compute_driver_executable()
args = [driver_executable, driver_cli, "install", "--with-deps"]
proc = subprocess.run(
args, env=get_driver_env(), capture_output=True, text=True, check=True
)
return proc == 0
@task
async def call_browser_use_agent(llm, task: str):
"""ブラウザを使用してタスクを実行します。"""
config = BrowserConfig(headless=True, disable_security=True)
browser = Browser(config=config)
agent = Agent(
task=task,
llm=llm,
browser=browser,
)
results = await agent.run()
await browser.close()
return results
@entrypoint()
async def workflow(inputs: dict) -> dict:
"""ブラウザ操作を行うLangGraphのグラフを構築"""
message = inputs.get("messages", [{}])[-1]
llm = inputs.get("llm")
return call_browser_use_agent(llm, message.get("content", None))
class BrowserUseChatAgent(ChatAgent):
def __init__(self, llm):
"""LangGraphのグラフを指定して初期化"""
self.llm = llm
def predict(
self,
messages: list[ChatAgentMessage],
context: Optional[ChatContext] = None,
custom_inputs: Optional[dict[str, Any]] = None,
) -> ChatAgentResponse:
"""
指定されたチャットメッセージリストを使用して回答を生成する
Args:
messages (list[ChatAgentMessage]): チャットエージェントメッセージのリスト。
context (Optional[ChatContext]): オプションのチャットコンテキスト。
custom_inputs (Optional[dict[str, Any]]): カスタム入力のオプション辞書。
Returns:
ChatAgentResponse: 予測結果を含むChatAgentResponseオブジェクト。
"""
request = {
"messages": self._convert_messages_to_dict(messages),
"llm": self.llm,
}
results = asyncio.run(workflow.ainvoke(request)).result()
messages = [
ChatAgentMessage(
id=str(uuid.uuid4()),
role="assistant",
content=h.result[-1].extracted_content,
attachments={
"url": h.state.url,
"title": h.state.title,
"screenshot": h.state.screenshot,
},
)
for h in results.history
]
usage = {
"prompt_tokens": results.total_input_tokens(),
"completion_tokens": None,
"total_tokens": results.total_input_tokens(),
}
return ChatAgentResponse(
messages=messages,
usage=usage,
)
def predict_stream(
self,
messages: list[ChatAgentMessage],
context: Optional[ChatContext] = None,
custom_inputs: Optional[dict[str, Any]] = None,
) -> Generator[ChatAgentChunk, None, None]:
"""
指定されたチャットメッセージリストを使用して、非同期的にエージェントを呼び出し、結果を取得します。
Args:
messages (list[ChatAgentMessage]): チャットエージェントメッセージのリスト。
context (Optional[ChatContext]): オプションのチャットコンテキスト。
custom_inputs (Optional[dict[str, Any]]): カスタム入力のオプション辞書。
Returns:
ChatAgentResponse: 予測結果を含むChatAgentResponseオブジェクト。
"""
request = {
"messages": self._convert_messages_to_dict(messages),
"llm": self.llm,
}
results = asyncio.run(workflow.ainvoke(request)).result()
for h in results.history:
delta = ChatAgentMessage(
id=str(uuid.uuid4()),
role="assistant",
content=h.result[-1].extracted_content,
attachments={
"url": h.state.url,
"title": h.state.title,
"screenshot": h.state.screenshot,
},
)
yield ChatAgentChunk(
delta=delta,
usage={
"prompt_tokens": h.metadata.input_tokens,
"completion_tokens": None,
"total_tokens": h.metadata.input_tokens,
},
)
# PlaywrightをInstall
install_playwright()
# DatabricksネイティブのClaude 3.7 SonnetをLLMとして利用
LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"
llm = ChatDatabricks(model=LLM_ENDPOINT_NAME)
AGENT = BrowserUseChatAgent(llm)
mlflow.models.set_model(AGENT)