水曜日, 5月 14, 2025
ホームニューステックニュースPydantic AIで作る!実践Text-to-SQLシステム構築ガイド 〜自然言語によるデータ抽出の自動化で分析業務を効率化〜

Pydantic AIで作る!実践Text-to-SQLシステム構築ガイド 〜自然言語によるデータ抽出の自動化で分析業務を効率化〜



こんにちは、Ubieでアナリティクスエンジニア/データアナリストをしているmatsu-ryuです。

普段は、Ubieが提供するサービスから得られる様々なデータを活用し、「テクノロジーで人々を適切な医療に案内する」というミッションの実現に向けて取り組んでいます。

皆さんの職場では、こんなやり取りはありませんか?

「先月のカテゴリ別売上トップ3、都道府県別で出せますか?」
「レビュー評価が星1つの商品のリストと、その商品を買ったユーザーのリストをお願いします。」

データドリブンな意思決定が重視される昨今、こうしたデータ抽出・分析の依頼は日常的に発生します。しかし、その裏側では多くの組織が共通の課題を抱えています。

  • SQLの壁: 分析したい人が必ずしもSQLを書けるわけではありません。エンジニアに依頼が集中しがちです。
  • コミュニケーションの難しさ: 依頼内容の微妙なニュアンスを正確に伝え、認識の齟齬なくSQLに落とし込むのは手間がかかります。
  • 見えないコスト: 一つ一つは「簡単な依頼」に見えても、積み重なると無視できない工数となり、エンジニアがより付加価値の高い業務に集中する時間を奪ってしまいます。

もちろん、高機能なBIツールも存在しますが、例えば「購入後に低評価レビューを付けたユーザーが、その前にどんなページを見ていたか」といった、複数テーブルを複雑に(時には時系列で)組み合わせるような分析は、GUI操作だけでは難しい場面も少なくありません。

そこで、私たちはある事実に気づきました。「普段の分析依頼でアクセスするデータベースのテーブルは、実はかなり限定的なのではないか?」と。

例えばECサイトであれば、注文 (orders)、顧客 (users)、商品 (products)、レビュー (reviews) など、コアとなるテーブルは数十程度かもしれません。

この気づきから、私たちは新たな取り組みに着手しました 「AIエージェントに、限定されたテーブルの中から最適なものを選ばせ、SQLを自動生成してもらおう!」

私たちが目指すのは、まるで優秀なアシスタントに依頼するように、自然な言葉で「〇〇のデータが見たい」と話しかけるだけで、AIがその意図を正確に汲み取り、必要なテーブルを特定し、検証済みのSQLを生成・実行してくれる世界です。SQLが書けないビジネスユーザーでも、これまでエンジニアに依頼しなければ難しかったような分析を、手軽に行えるようにすること。それが私たちのゴールです。

本記事では、pydantic-ai(Pydanticモデルを利用して大規模言語モデル(LLM)からの出力を構造化し、AIエージェントの構築を容易にするライブラリ)と pydantic-graph を用いて、Google BigQueryをデータソースとしたText-to-SQLシステムを構築した際の、具体的なアプローチと実装例をご紹介します。

アプローチの核心:AIに”考えさせる”ためのワークフローとツール

大規模言語モデル(LLM)は驚くほど賢くなりましたが、それでも「ECサイトの売上分析用SQLを作って」のように指示しただけで、完璧な答えが返ってくることは稀です。特に、BigQueryに数千ものテーブルが存在する ような環境では、AIが最適なテーブルを見つけ出すのは困難です。

そこで重要になるのが、「役割分担」「AI自身による情報収集能力」 です。

私たちは、人間がデータ分析を行う際の思考プロセス(①仕様を決める → ②SQLを書く → ③レビューする → ④実行する)を模倣し、それぞれのステップを専門のAIエージェント(ノード)が担当する**「ワークフロー」** を構築しました。このワークフローの設計には pydantic-graph を利用しています。

さらに、各AIエージェントが学習済みの一般的な知識だけに頼るのではなく、実際のデータベースの状況を確認しながら根拠に基づいて判断できるように、「ツール」 という形で特定の機能を与えています。これには pydantic-aiTool 機能を利用します。

このシステムにおける最重要ツールBigQueryTool です。これは、AIエージェントに以下のような能力を与えます。

  • テーブル定義の確認 (get_schema):orders テーブルにはどんなカラムがあるんだっけ?」をAI自身が確認できます。
  • 実データの確認 (sample_data):status カラムには具体的にどんな値が入っているの?」をAI自身が確認できます。
  • SQLのテスト実行 (query): 「このSQL、構文エラーにならないかな?」をAI自身が(LIMIT 付きで)試せます。

これにより、AIはまるでデータベースに直接アクセスできるかのように振る舞い、より正確で信頼性の高い判断を下せるようになります。

ワークフローの全体像と構成要素

このText-to-SQLワークフローを実現するため、役割を分担した複数のPythonファイルが連携することで実現されます。

  1. analysis_context.py (データの入れ物)

    • ワークフロー全体を通じて、各ステップ間で受け渡される情報を一元管理する「状態保持」クラス (AnalysisContext) を定義します。ユーザーの最初の質問、AIが特定したテーブル情報、生成されたSQL、チェック結果、実行結果、最終的な回答、さらには処理の試行回数などが、すべてこのオブジェクトに格納され、ノード間で引き継がれます。
  2. node_defs.py (ワークフローの設計図)

    • pydantic-graph を使い、SQL生成から回答までの処理の流れ、つまり「どのノードがどの順番で実行され、どのような条件で分岐・ループするか」というワークフローの構造自体を定義します。各ノード(ステップ)はクラスとして定義され、その run メソッドが次に実行するノードを返します。
  3. logic/*.py (各ステップの実行部隊)

    • node_defs.py で定義された各ノードクラスが、具体的にどのような処理を行うかを実装したファイル群です。AIへの指示(プロンプト)、pydantic_ai.Agent の設定、ツールの呼び出しなど、各ステップの頭脳となる部分がここに記述されます。
  4. tools/bigquery_tool.py (AIのツール提供)

    • AIエージェントにBigQueryの情報を調べたり、クエリを実行したりする能力を与えるための「ツール」クラス (BigQueryTool) を実装します。logic ファイル内のAgentから呼び出されます。

AnalysisContext: ワークフローの状態を管理する

ワークフロー全体で情報を共有するための「入れ物」となる AnalysisContext の定義は以下のようになります。


from dataclasses import dataclass
from typing import Any, Dict, Union
import pandas as pd 

@dataclass
class AnalysisContext:
    """分析コンテキスト: ワークフロー全体で共有される情報を保持するクラス"""
    query: str                  
    model_type: str             
    tables_info: str = None     
    sql_query: str = None       
    sql_check_result: Dict[str, Any] = None 
    execution_result: Union[pd.DataFrame, str] = None 
    final_answer: str = None    
    workflow_execution_count: int = 0 

各ステップの役割:専門家AIノードの紹介

次に、ワークフローを構成する主要なノード(ステップ)が、それぞれどのような役割を担っているのかを見ていきましょう。

  1. SQL生成ノード (SQLGenerationNode)

    • 担当: ユーザーの自然言語による要求を解釈し、分析に必要なテーブルやカラムを特定(仕様決め)、それに基づいてBigQuery SQLを生成します。SQLチェックで問題が指摘された場合は、フィードバックを元にSQLを修正する役割も担います。
    • 内部動作: logic/sql_generation_logic.py が呼び出され、BigQueryTool を使ってテーブル情報を確認しながら、プロンプトに従ってテーブル情報とSQLクエリを生成します。
  2. SQLチェックノード (SQLCheckNode)

    • 担当: 生成されたSQLが正しいか、意図通りか、安全か、効率的かをレビューします。人間によるコードレビューのプロセスを模倣します。
    • 内部動作: logic/sql_check_node_logic.py が呼び出され、BigQueryToolquery 機能を使ってSQLをテスト実行(LIMIT 10 など)し、プロンプトに記述された観点に基づいてSQLを評価します。
  3. SQL実行ノード (SQLExecutionNode)

    • 担当: レビューを通過したSQLをBigQueryデータベースに対して実行し、結果を取得します。
    • 内部動作: このノードは通常、LLMを使いません。node_defs.py 内で直接 google-cloud-bigquery ライブラリを呼び出し、AnalysisContext 内のSQLクエリを実行します。結果はDataFrameとして AnalysisContext に保存されます。
  4. 回答フォーマットノード (AnswerFormattingNode)

    • 担当: SQLの実行結果(表データ)を、専門家でなくても理解できるように、自然言語で分かりやすく要約・解説します。
    • 内部動作: logic/answer_formatting_logic.py が呼び出され、実行結果のデータと元の質問を元に、プロンプトに従ってユーザー向けの解説文を生成します。

これらのノードが連携し、必要に応じてループ(SQL生成⇔SQLチェック)することで、ユーザーの要求から最終的な回答までの一連の流れを自動化しています。

ワークフローの設計図:node_defs.py の実装

各ノードの役割が分かったところで、それらがどのように連携し、ワークフロー全体の流れを制御しているのかを node_defs.py のコードで見てみましょう。このファイルは、ワークフローの「設計図」や「交通整理役」と言えます。AIワークフローは、AIの予期せぬ応答や外部システムのエラーなど、様々な理由で失敗する可能性があります。そのため、ワークフローを安定して稼働させるためのエラー対策とループ制御の工夫がここに盛り込まれています。


import streamlit as st
from google.cloud import bigquery
from google.oauth2.credentials import Credentials
from dataclasses import dataclass, field
from typing import Union, List, TypedDict
import pandas as pd

from pydantic_ai.messages import ModelMessage
from pydantic_graph import BaseNode, End, GraphRunContext


from utils.logger import get_logger
from domain.ai.workflows.sql_generator.nodes.analysis_context import AnalysisContext
from utils.constants import DEFAULT_BILLING_PROJECT_ID

import domain.ai.workflows.sql_generator.nodes.logic.sql_generation_logic as sql_generation_logic
import domain.ai.workflows.sql_generator.nodes.logic.sql_check_node_logic as sql_check_node_logic
import domain.ai.workflows.sql_generator.nodes.logic.answer_formatting_logic as answer_formatting_logic

logger = get_logger()

MAX_EXECUTION_COUNT = 4 



@dataclass
class SQLGenerationNode(BaseNode[AnalysisContext]):
    """SQL生成ノード: ユーザーの質問からSQLを生成・修正する"""
    feedback: str = None 
    agent_messages: List[ModelMessage] = field(default_factory=list) 

    async def run(self, ctx: GraphRunContext) -> Union[End, "SQLCheckNode"]:
        self.context = ctx.state
        logger.info("SQLクエリ生成を開始します")

        
        if self.context.workflow_execution_count >= MAX_EXECUTION_COUNT:
            logger.warning(f"最大試行回数({MAX_EXECUTION_COUNT})に達しました。")
            return End(self.context) 

        try:
            
            result = await sql_generation_logic.run_logic(
                self.context, self.feedback, self.agent_messages
            )
            
            self.context.tables_info = result.data.get("tables_info")
            self.context.sql_query = result.data.get("sql_query")
            self.agent_messages += result.all_messages() 
            logger.debug(f"生成されたSQL: {self.context.sql_query}")
            
            return SQLCheckNode(agent_messages=self.agent_messages)
        except Exception as e:
            logger.error(f"SQLクエリの生成中にエラー: {e}")
            self.context.workflow_execution_count += 1
            
            return SQLGenerationNode(feedback=str(e), agent_messages=self.agent_messages)

@dataclass
class SQLCheckNode(BaseNode[AnalysisContext]):
    """SQLチェックノード: 生成されたSQLをレビューする"""
    agent_messages: List[ModelMessage] = field(default_factory=list) 

    async def run(self, ctx: GraphRunContext) -> Union[End, "SQLGenerationNode", "SQLExecutionNode"]:
        self.context = ctx.state
        logger.info("SQLクエリのチェックを開始します")
        try:
            
            result = await sql_check_node_logic.run_logic(self.context)
            check_result_data = result.data 
            self.context.sql_check_result = check_result_data 

            if check_result_data.get("is_valid"):
                logger.info("SQLクエリは有効です。実行ノードへ進みます。")
                
                return SQLExecutionNode()
            else:
                logger.warning("SQLクエリに問題あり。SQL生成ノードへ戻ります。")
                self.context.workflow_execution_count += 1
                feedback = check_result_data.get("overall_feedback", "SQLに問題があります。修正してください。")
                
                return SQLGenerationNode(feedback=feedback, agent_messages=self.agent_messages)
        except Exception as e:
            logger.error(f"SQLチェック中にエラー: {e}")
            self.context.workflow_execution_count += 1
            
            return SQLGenerationNode(feedback=f"SQLチェック中にエラーが発生しました: {e}", agent_messages=self.agent_messages)

@dataclass
class SQLExecutionNode(BaseNode[AnalysisContext]):
    """SQL実行ノード: 検証済みSQLをBigQueryで実行する"""
    async def run(self, ctx: GraphRunContext) -> Union[End, "SQLGenerationNode", "AnswerFormattingNode"]:
        self.context = ctx.state
        logger.info("SQLクエリの実行を開始します")
        try:
            token = st.session_state.get("token")
            credentials = Credentials(token) if token else None
            client = bigquery.Client(project=DEFAULT_BILLING_PROJECT_ID, credentials=credentials)

            logger.debug(f"Executing SQL: {self.context.sql_query}")
            query_job = client.query(self.context.sql_query)
            df = query_job.to_dataframe()

            self.context.execution_result = df
            logger.info(f"SQLクエリを実行しました: 行数={len(df)}")
            
            return AnswerFormattingNode(agent_messages=[]) 
        except Exception as e:
            logger.error(f"SQLクエリの実行中にエラー: {e}")
            self.context.workflow_execution_count += 1
            
            return SQLGenerationNode(feedback=f"SQLクエリの実行中にエラーが発生しました: {str(e)}", agent_messages=[]) 

@dataclass
class AnswerFormattingNode(BaseNode[AnalysisContext]):
    """回答フォーマットノード: SQL実行結果を自然言語で解説する"""
    agent_messages: List[ModelMessage] = field(default_factory=list) 

    async def run(self, ctx: GraphRunContext) -> End[AnalysisContext]:
        self.context = ctx.state
        logger.info("回答のフォーマットを開始します")
        try:
            
            result = await answer_formatting_logic.run_logic(self.context, self.agent_messages)
            self.context.final_answer = result.data 
            
            
            logger.info("回答のフォーマットが完了しました")
        except Exception as e:
            logger.error(f"回答のフォーマット中にエラー: {e}")
            self.context.final_answer = f"回答生成中にエラーが発生しました: {e}"
        
        return End(self.context)

この node_defs.py では、各ノードの run メソッドが次に実行すべきノードのインスタンス(または処理の終了を示す End)を返すことで、ワークフローが進行します。成功時、失敗時、特定の条件(SQLチェックの結果など)に応じて処理の流れが制御される仕組みです。

ツールを利用したAgent実行の詳細

ワークフローの流れが定義されたところで、各 logic ファイルと tools ファイルで具体的にどのような実装が行われているかを見ていきます。logic については、SQL生成とSQLチェックにフォーカスして解説します。

ツール実装:AIの能力拡張 (tools/bigquery_tool.py)

まず、AIがBigQueryと連携するための主要なコンポーネントとなる BigQueryTool を実装します。このツールは、データベースのスキーマ確認、サンプルデータ取得、クエリ実行といった具体的な操作を提供します。


import re
import json

from google.cloud import bigquery
from google.oauth2.credentials import Credentials
from .utils import to_json 

class BigQueryTool:
    def __init__(self, billing_project_id: str, access_token: str | None = None):
        """BigQueryクライアントを初期化"""
        if access_token:
            credentials = Credentials(access_token)
        else:
            credentials = None 
        self.client = bigquery.Client(
            project=billing_project_id, credentials=credentials
        )

    def get_schema(self, table_id: str) -> str:
        """指定されたテーブルのスキーマ情報(カラム名, 型, モード)をJSON文字列で返す"""
        if not self._validate_table_id(table_id):
             raise ValueError(f"無効なテーブルID形式です: {table_id}")
        try:
            table = self.client.get_table(table_id)
            schema_info = [
                {"name": field.name, "type": field.field_type, "mode": field.mode}
                for field in table.schema
            ]
            return to_json(schema_info)
        except Exception as e:
            raise ValueError(f"テーブル '{table_id}' のスキーマ取得中にエラーが発生しました: {e}") from e

    def sample_data(self, table_id: str, num_rows: int = 5) -> str:
        """指定されたテーブルから指定行数のサンプルデータをJSON文字列で返す"""
        if not self._validate_table_id(table_id):
            raise ValueError(f"無効なテーブルID形式です: {table_id}")
        query = f"SELECT * FROM `{table_id}` LIMIT {num_rows}" 
        try:
            df = self.client.query(query).to_dataframe()
            if df.empty:
                return to_json([]) 
            return to_json(df) 
        except Exception as e:
            raise ValueError(f"テーブル '{table_id}' のサンプルデータ取得中にエラーが発生しました: {e}") from e

    def query(self, query: str, limit: int = -1) -> str:
        """与えられたSQLクエリを実行し、結果をJSON文字列で返す (limit指定可能)"""
        try:
            df = self.client.query(query).to_dataframe()
            if df.empty:
                return to_json([])
            if limit  0: 
                return to_json(df)
            else:
                return to_json(df.head(limit))
        except Exception as e:
            error_query_snippet = query[:100] + '...' if len(query) > 100 else query
            raise ValueError(f"クエリ実行中にエラーが発生しました (Query: '{error_query_snippet}'): {e}") from e

    def _validate_table_id(self, table_id: str) -> bool:
        """テーブルIDが 'project.dataset.table' 形式か簡易チェック"""
        
        pass

Agent設定とプロンプト:AIへの的確な指示書

次に、logic ファイル内で pydantic_ai.Agent を設定し、上で実装した BigQueryTool を利用可能なツールとして登録します。同時に、Agentの役割や思考プロセス、ツールの使い方を指示するプロンプトを与えます。

SQL生成

  • プロンプトのポイント
  1. 明確な役割と段階的思考

    • AIのペルソナを「ECサイト分析特化のSQLエキスパート」と定義。
    • 処理を「テーブル情報取得(Step1)」と「SQL生成(Step2)」の2段階に分け、複雑なタスクを分解しています。
  2. テーブル情報取得 (Step1) の重視

    • 最重要:利用可能テーブルの明示と詳細説明:

      • ECサイトの架空テーブル群(orders, users, productsなど)を具体的にリストアップ。
      • 各テーブルの概要、主要カラム、カラム値の意味(例: orders.status)、テーブル間結合キー、時系列カラムなどを詳細に記述。これにより、AIがどのテーブルを何のために使うべきか、根拠を持って判断できるようになります。 これがSQL生成の精度を左右する最も重要な情報となります。
    • ツールの利用強制: get_schema ツールによるカラム存在確認を必須とし、「存在しないカラム指定は厳禁」と警告。
    • 思考プロセスの構造化: table_info の出力形式(利用テーブル、結合条件、フィルター条件、利用カラムと各理由)を厳密に指定し、AIの判断根拠を明確化させています。
  3. SQL生成 (Step2) の厳格なルール

    • table_info を絶対的な入力とすることを強調。
    • CTEの利用、命名規則、日本語コメント、効率性、UNNEST、LIMIT検討など、具体的なコーディング規約を指示。
    • SQLベストプラクティスと出力SQLテンプレートを提示し、高品質で統一されたコード生成を促しています。
  4. 最終出力形式の指定

    • table_info と sql_query をキーとするJSON形式での出力を要求し、プログラムでの扱いを容易にしています。

これらの指示により、AIは提供されたテーブル情報を深く理解し、ツールを効果的に使いこなし、要求仕様に合致したSQLを生成するようになります。特に、利用可能なテーブルとその詳細を具体的に伝えることが、Text-to-SQLの成功率を大きく左右します。

sql_generation_logic.py の実装例

async def run_logic(context: AnalysisContext, feedback: Optional[str] = None, agent_messages: Optional[List[ModelMessage]] = None):
    
    bq_tool = BigQueryTool(...) 

    
    system_prompt = """
あなたはECサイト分析に特化したBigQueryのSQLエキスパートです。与えられた情報をもとに、ユーザーの質問に答えるための正確で効率的なSQLクエリを生成してください。

あなたは以下の2ステップで処理を行います。
step1: テーブル情報の取得(table_info)
step2: SQLの生成(sql_query)

# step1: テーブル情報の取得 (table_info)
このステップでは、ユーザーの質問を分析し、SQLクエリを生成するために必要な情報を整理します。

1.  **ユーザーの質問を分析し、必要なデータを特定する。**
    *   例:「過去3ヶ月間のアクティブユーザー数を知りたい」「特定の商品カテゴリで最もレビュー評価の高い商品は何か?」「キャンセル率が高い支払い方法は?」など。
    *   注意:ユーザーの質問が「売上分析」なのか「顧客行動分析」なのか「商品分析」なのか、大まかな分析の目的を特定してください。
    
2.  **適切なBigQueryテーブルを選択する。**
    *   下記の「利用可能なBigQueryテーブル」セクションを参照し、質問に答えるために最も適切なテーブルを1つ以上選択してください。
    *   選択した各テーブルについて、なぜそのテーブルが必要なのか理由を簡潔に説明してください。
    
3.  **テーブル同士の結合条件を特定する(必要な場合)。**
    *   複数のテーブルを使用する場合、それらをどのように結合するか(例: `orders.user_id = users.user_id`)を具体的に特定してください。
    *   **重要:不必要にテーブルの結合を提案しないでください。** 単一テーブルで回答できる場合は、その旨を明確にしてください。
    
4.  **テーブルのカラムを特定する。**
    *   選択したテーブルから、質問に答えるために必要なカラムを特定してください。
    *   **最重要:必ず `get_schema` ツールを使ってテーブルに存在するカラム名を確認し、指定してください。** テーブルに存在しないカラム名を指定することは厳禁です。
    *   特定した各カラムについて、なぜそのカラムが必要なのか、集計においてどのような役割を果たすのか(例: `total_amount` は売上集計のため、`prefecture` は地域別集計のグルーピングキーとして利用)を説明してください。
    
5.  **フィルター条件を特定する。**
    *   ユーザーの質問に含まれる条件(例: 「過去3ヶ月間」「特定の商品カテゴリ」「評価が4以上」など)を抽出し、どのテーブルのどのカラムに適用するかを具体的に特定してください。
    *   日付範囲のフィルターについては、`order_timestamp` や `created_at` などのタイムスタンプ型カラムを利用することを検討してください。

## 利用可能なBigQueryテーブル 

### 売上・注文関連
-   `your-project-id.ec_data.orders`
    -   ECサイトの注文ヘッダー情報が格納されたテーブル。注文ごとの基本的なデータ(注文ID、顧客ID、注文日時、合計金額、注文ステータスなど)が含まれます。
    -   `status` カラムの値:`pending` (処理中), `shipped` (発送済), `delivered` (配送完了), `cancelled` (キャンセル済)。
    -   注文日時は `order_timestamp` (TIMESTAMP型) を使用してください。
-   `your-project-id.ec_data.order_items`
    -   注文明細情報テーブル。どの注文でどの商品がいくつ、いくらで売れたかの詳細情報。
    -   `orders` テーブルとは `order_id` で結合可能です。

### ユーザー関連
-   `your-project-id.ec_data.users`
    -   顧客マスタテーブル。顧客ID、氏名、メールアドレス、登録日時、居住都道府県などの情報が含まれます。
    -   登録日時は `created_at` (TIMESTAMP型) を使用してください。
    -   `orders` テーブルとは `user_id` で結合可能です。

### 商品関連
-   `your-project-id.ec_data.products`
    -   商品マスタテーブル。商品ID、商品名、カテゴリ、現在の単価などの情報が含まれます。
    -   `order_items` テーブルとは `product_id` で結合可能です。
-   `your-project-id.ec_data.reviews`
    -   商品に対する顧客からのレビュー情報が格納されたテーブル。商品ID、顧客ID、評価点(1~5の整数)、コメント本文、レビュー投稿日時などが含まれます。
    -   `products` テーブルとは `product_id` で、`users` テーブルとは `user_id` で結合可能です。
    -   レビュー評価は `rating` カラムを参照してください。

### 行動ログ関連 (高度な分析用)
-   `your-project-id.ec_data.page_views`
    -   ウェブサイト内のページ閲覧ログ。セッションID、顧客ID(ログイン時)、閲覧ページパス、閲覧日時、参照元URLなどの情報が含まれます。
    -   コンバージョン分析やユーザー行動追跡に使用します。

### table_infoの出力形式の説明
以下の形式で、マークダウンで記述してください。
1.  **利用するテーブル**
    *   テーブル名1: 選定理由
    *   テーブル名2: 選定理由
2.  **テーブルの結合条件** (必要な場合のみ)
    *   `テーブルA.カラムX = テーブルB.カラムY` : 結合理由
3.  **テーブルのフィルター条件**
    *   テーブル名1.カラムA: 条件 (例: `> '2023-01-01'`) : 理由
4.  **利用するテーブル項目**
    *   テーブル名1.カラムA: 項目の役割、選定理由
    *   テーブル名1.カラムB: 項目の役割、選定理由
    *   集計関数(カラムC) AS 集計後カラム名: 項目の役割、選定理由

	**※ステップ2以降のSQL生成のために、この`table_info`は非常に重要です。正確かつ網羅的に記述してください。**
	**※上記の手順2(テーブル選択)から手順5(フィルター条件特定)までは、必要に応じて `get_schema` や `sample_data` ツールを積極的に利用して、テーブルやカラムの存在、データ型、具体的な値を確認しながら進めてください。**
	

# step2: SQLの生成 (sql_query)
このステップでは、step1で作成した`table_info`に基づいて、BigQueryで実行可能な標準SQLクエリを生成します。

以下の条件に厳密に従ってください:
※重要事項:step1で定義した`table_info`の内容と完全に一致するSQLを生成してください。`table_info`に記載のないテーブルやカラムを使用しないでください。

## 作業手順
1.  **BigQuery の標準SQL**を使用する。
2.  **CTE (Common Table Expression) を使用して、クエリを構造化する。**
    *   読みやすく、メンテナンスしやすいSQLを目指してください。
3.  **`import_テーブル名` プレフィックスのCTEでテーブルをインポートする。**
    *   例: `import_orders AS (SELECT ... FROM \`your-project-id.ec_data.orders\`)`
    *   この段階で、必要なカラム選択と基本的なフィルター(日付範囲など)を行ってください。
4.  **`logic_処理内容` プレフィックスのCTEでデータ加工や集計を行う。**
    *   例: `logic_user_aggregation AS (SELECT user_id, COUNT(DISTINCT order_id) AS order_count FROM import_orders GROUP BY user_id)`
    *   JOIN処理もこの段階で行うことが多いです。
5.  **各CTEの目的、主要な処理内容について、SQL内に詳細な日本語コメントを追加する。**
    *   なぜこの処理をしているのか、第三者にも理解できるように記述してください。
6.  **可能な限り効率的なクエリを作成する。**
    *   不要なJOINや複雑なサブクエリ、過度なウィンドウ関数の使用は避けてください。
    *   `SELECT *` は最終的な出力以外では極力避け、必要なカラムのみを選択してください。
7.  **ARRAY型やSTRUCT型のカラムの扱いには注意してください。**
    *   適切に`UNNEST`して処理してください(今回のECサイト例では該当するものは明示していませんが、一般的な注意点として)。
8.  **集計クエリで結果行数が少ないと明確に判断できる場合を除き、最終的なSELECT文に `LIMIT 100` などを適切に設定することを検討してください。**
    *   ただし、ユーザーが全件データを期待している場合は不要です。

## SQLのベストプラクティス
SQLを記述する際は、以下の書き方を強く推奨します。

### 書き方①: CTE(Common Table Expression)を強く推奨
(前回の回答で示した詳細なCTEの書き方説明)
...
### 書き方⑤: なぜこういう処理をしてるかという理由をクエリ内のコメントとして日本語で書こう

### sql_queryの出力形式の説明
生成するSQLクエリは、以下のテンプレートに従ってください。
```sql
-- ユーザーの質問: {ユーザーの質問}
-- 生成日時: {現在の日時}

-- Step 1: 必要なテーブルからのデータ抽出と前処理
WITH
import_orders AS (
    -- 注文テーブルから必要な情報を抽出
    SELECT
        order_id,
        user_id,
        order_timestamp,
        total_amount,
        status
    FROM
        `your-project-id.ec_data.orders`
    -- 必要に応じてWHERE句で初期フィルター
    -- WHERE order_timestamp >= 'YYYY-MM-DD'
),

import_users AS (
    -- 顧客テーブルから必要な情報を抽出
    SELECT
        user_id,
        prefecture
    FROM
        `your-project-id.ec_data.users`
),

-- Step 2: データ加工や集計ロジック
logic_calculate_sales_by_prefecture AS (
    -- 注文と顧客情報を結合し、都道府県別の売上を集計
    SELECT
        u.prefecture,
        SUM(o.total_amount) AS total_sales_amount,
        COUNT(DISTINCT o.order_id) AS total_order_count
    FROM
        import_orders AS o
    INNER JOIN
        import_users AS u ON o.user_id = u.user_id
    WHERE
        o.status="delivered" -- 例: 配送完了したもののみを集計
    GROUP BY
        u.prefecture
),

-- Step 3: 最終結果の整形 (この例では不要なので省略されることも多い)
final_result AS (
    SELECT
        prefecture,
        total_sales_amount,
        total_order_count
    FROM
        logic_calculate_sales_by_prefecture
    ORDER BY
        total_sales_amount DESC
)

-- 最終的なクエリ実行
SELECT
    *
FROM
    final_result
-- LIMIT 100 -- 必要に応じて
;

# step3:
table_infoとsql_queryをキーとしたJSON形式で、ステップ1とステップ2の結果をそれぞれ出力してください。
{
    "table_info": "...",
    "sql_query": "..."
}

"""


sql_agent = Agent(
    model=context.model_type,
    result_type=SQLGenerationResult, 
    tools=[ 
        Tool(wrap_error(bq_tool.get_schema)),
        Tool(wrap_error(bq_tool.sample_data)),
        Tool(wrap_error(bq_tool.query)),
    ],
    system_prompt=system_prompt 
)


user_input = f"""ユーザーの質問: {context.query}"""


result = await sql_agent.run(user_input, message_history=agent_messages)

return result

SQLチェック
sql_check_node_logic.py のプロンプトとAgent設定例:
SQLチェックAgentには、レビューの観点と、構文チェックのために query ツールを最初に使うよう指示します。



from pydantic_ai import Agent, Tool
from ..tools.bigquery_tool import BigQueryTool
from .utils import wrap_error


async def run_logic(context: AnalysisContext):
    
    bq_tool = BigQueryTool(...) 

    
    system_prompt = """あなたはECサイト分析に詳しいBigQueryのSQLレビュアーです。...

与えられたSQLクエリを分析し、以下の観点でレビューしてください:

1. 最初にSQLをLIMIT10件で実行して構文エラーがないかをチェックする。 (注意!:必ず最初にチェックしてください。ここでエラーになったら後続のチェックは不要です。)エラーなったらSQLを修正してください。
2. 抽出仕様とSQLが整合しているかをチェックする。
  - 抽出条件が満たされているか? 
  - 抽出項目が適切か? 
  - 仕様に記載されていないロジックが含まれていないか?
3. パフォーマンス上の問題がないか
4. セキュリティ上の問題がないか
5. ベストプラクティスからの逸脱がないか

レビューにおいては、ThinkToolを頻繁に利用してください。

以下の形式でJSONを出力してください(必ず日本語で):
json
{
  "is_valid": true or false,
  "issues": [
    {
      "severity": "error|warning|info",
      "description": "問題の詳細な説明",
      "line": "問題のある行番号または行の内容",
      "suggestion": "修正案",
      
    }
  ],
  "overall_feedback": "全体的なフィードバック"
}
"""



sql_check_agent = Agent(
    model=context.model_type,
    result_type=SQLCheckResult, 
    tools=[ 
        Tool(wrap_error(bq_tool.query)),
    ],
    system_prompt=system_prompt 
)


result = await sql_check_agent.run(user_input, model_settings={'temperature': 0.0})

return result

上記の sql_check_node_logic.py では、AIエージェントが BigQueryTool を使って構文チェックを行い、プロンプトに記述された観点に基づいてレビューを行います。これにより、基本的なエラーや明らかな問題点は自動的に検出できます。

しかし、より高度で微妙な問題点、例えば特定のドメイン知識に基づくロジックの妥当性や、チーム内で暗黙的に共有されているコーディング規約への準拠などは、AIだけでは判断が難しい場合があります。

ここで考えられるレビュー品質向上のための一つのアイデアとして、過去の人間によるレビュー結果をツール化し、AIエージェントが参照できるようにするというアプローチがあります。

例えば、以下のような仕組みを想像してみてください。

  1. レビュー知識ベースの構築:

    • GitHubのプルリクエストなどで行われた、過去のSQLに対する人間(経験豊富なエンジニアやアナリスト)のレビューコメントを収集・構造化します。
    • 「特定の関数はパフォーマンスに影響があるため避けるべき」「このケースではJOINの順番を逆にすると効率が良い」「このビジネスロジックではNULLの扱いをこうすべき」といった具体的な指摘や改善案を、問題のあるSQLパターンと共にデータベース化します。
  2. レビュー知識参照ツールの開発:

    • AIエージェントが、現在レビュー中のSQLに類似した過去のレビュー事例や指摘事項を、この知識ベースから検索できるようなツール(HumanReviewKnowledgeTool)を開発します。
    • このツールは、SQLの構造や使われている関数、テーブル名などをキーとして、関連性の高い過去のレビューコメントを抽出してAIに提供します。
  3. AIエージェントによる参照と判断:

    • SQLチェックエージェントは、自身のルールベースのチェックに加えて、この HumanReviewKnowledgeTool を利用します。
    • 「このSQLパターンは過去に〇〇という指摘があったな。今回のケースにも当てはまるだろうか?」といった形で、人間の知見を参考にしながら、より深いレベルでのレビューを行うことが期待できます。

このような仕組みを導入することで、単にAIがルールに従ってチェックするだけでなく、組織内に蓄積された**「生きたレビューのノウハウ」**をAIが活用できるようになり、レビューの網羅性や品質が向上する可能性があります。もちろん、知識ベースの構築とメンテナンス、適切な類似度検索ロジックの開発など、実現には相応の工数を要しますが、長期的には非常に強力な資産となるでしょう。このように、各ステップで必要なツールと、その使い方を指示するプロンプトを組み合わせることで、AIエージェントに特定のタスクを遂行させます。

成果とこれから〜広がる可能性

このAIエージェントワークフローによるText-to-SQLシステムは、まだ開発途上ではあるものの、以下のような成果と可能性を示しています。

  • 工数削減: 定型的なデータ抽出依頼に対応する時間を削減。
  • 分析の民主化: SQLスキルがないメンバーでも、より複雑なデータ要求が可能に。
  • 属人化の解消: 特定のエンジニアに依頼が集中する状況を緩和。

もちろん課題もあります。非常に曖昧な質問や、これまで分析されたことのない複雑な要求に対しては、まだ完璧に対応できるわけではありません。プロンプトのチューニングは継続的な試行錯誤が必要です。

今後は、以下のような改善を進めていきたいと考えています。

  • UI/UXの改善: よく使われる分析パターン(例: 「期間別売上推移」「商品カテゴリ別顧客リスト」)をメニューから選択できるようにし、ユーザーの利便性とAIへの指示の明確性を両立させる。
  • ワークフローの拡張: より高度な分析要求に応えるため、複数の分析ステップを組み合わせたワークフロー(例: 「特定セグメントの顧客の行動履歴を分析し、離反予測モデルのインプットデータを作成する」)を構築する。
  • BigQuery機能の活用: BigQueryのInformation SchemaやData Catalog (Google Cloud Data Catalog) のメタデータを BigQueryTool から活用し、AIによるテーブル・カラム特定能力をさらに強化する。

まとめ

AIエージェントとワークフローを活用したText-to-SQLは、データ抽出・分析業務の効率化と民主化を進めるための、非常に有望なアプローチです。

その成功の鍵は、単に強力なLLMを使うだけでなく、以下の3点にあると私たちは考えています。

  1. 人間の思考プロセスを模倣した、堅牢なワークフローの設計 (pydantic-graph と node_defs.py の活用)
  2. 各ステップに特化した役割のAIエージェントへの付与 (各 logic/*.py でのプロンプトとAgent設定)
  3. AI自身による外部情報(特にデータベーススキーマやデータ)の確認と、それに基づく判断を可能にする「ツール」の提供と適切な認識 (BigQueryTool の実装とAgentへの登録)

この取り組みを通じて、データに関わる誰もが専門知識のレベルを問わず、より迅速かつ容易に必要な情報へアクセスし、データに基づいたより良い意思決定を行えるようになることを目指しています。今後の進化にご期待ください。

最後に:共に未来のデータ活用を切り拓く仲間を募集しています!

本記事でご紹介したような、AIエージェントとワークフローを活用したText-to-SQLシステムの開発は、私たちが目指す「データ活用の民主化」に向けた取り組みの一例です。Ubieでは、このような先進的な技術を積極的に取り入れ、データドリブンな意思決定をさらに加速させようとしています。

2025年、UbieのBIチームは生成AI分野に大きく舵を切ります。

生成AIがデータ利活用の各業務プロセスに革命的な変化をもたらす大きな可能性を確信し、この分野に最大限リソースを投入していくことを決定しました。データエンジニアと密に連携し、本記事で触れたようなシステムのさらなる高度化はもちろん、データ抽出、分析、レポーティングといったあらゆる業務プロセスの自動化・効率化を推進していきます。

また、各種生成AIツールを積極的に導入・検証し、その活用ノウハウを組織の力に変えるとともに、本当に人が集中すべきクリエイティブな業務は何かを再定義していきたいと考えています。

私たちと一緒に、データとAIの力で医療の未来をより良くしていく挑戦に加わりませんか?

Ubieでは、本記事のような取り組みをさらに発展させ、業界をリードするようなデータ活用基盤を構築していく仲間を積極的に募集しています。データアナリスト、アナリティクスエンジニア、、そしてAI技術に関心のある全てのデータプロフェッショナルの皆さん、ぜひ一度カジュアルにお話ししましょう!

ご興味をお持ちいただけた方は、ぜひ弊社の採用ページをご覧ください。

https://herp.careers/v1/ubiehr/jOXYtKIKkonz



Source link

Views: 0

RELATED ARTICLES

返事を書く

あなたのコメントを入力してください。
ここにあなたの名前を入力してください

- Advertisment -

インモビ転職