AQ Tech Blog

自然言語からAmazon Athenaへ――このサンプルが実装するText-to-SQLの経路

作成者: sora.matsumoto|2026年04月08日

想定読者:AWS(Lambda・API Gateway・Athena)に触れたことのあるバックエンド/クラウドエンジニア
スコープ:チャットUIからAmazon Bedrock上のStrandsエージェント経由でSQLを組み立て・実行し、結果をWebSocketで返すまで。 エンティティ解決やセグメンテーション本体は本稿では扱いません(末尾に一言だけ)。

本稿の内容は、2026年3月時点当該リポジトリの実装に基づきます。デプロイ構成や環境変数の詳細は変更する可能性があります。

はじめに

Text-to-SQLとは、ユーザーの自然言語の質問を、データベースに対して実行可能なSQLに変換する仕組みの総称です。 多くの説明では、単一のHTTP APIが自然言語を受け取り、SQL文字列を一度返す構成が典型として語られがちです。

本リポジトリは、Customer 360(C360)関連のサンプルです。 ここでのText-to-SQLは、前段のような単一APIがSQL文字列だけを返す構成とは少し違う形で実装されています。 コードを追うと、次の三つが設計の芯になります。

  1. ツール呼び出し型エージェント(Strands[エージェント構築用フレームワーク]のAgentとAmazon Bedrock)が、会話とスキーマ情報を踏まえてSQLを組み立てる。
  2. 実際のクエリ実行は大規模言語モデル(LLM)が直接行わず、登録されたexecute_sql_queryツールAmazon AthenaのAPIを呼ぶ(責務の境界を明確にする)。
  3. フロントとはWebSocketでやり取りし、重い処理は別Lambdaを非同期起動してタイムアウトと責務を分離する。

読了後には、メッセージがフロントからAthenaの結果までどう流れるかを、コンポーネント単位で説明できるようになることを目指しています。 以下、コード上の経路に沿って追います。 なお、コード引用は長さの都合で...による省略を含みます。

全体像:誰が誰を呼ぶか

利用している主なAWSコンポーネントは次のとおりです。

役割 コンポーネント
常時接続のチャット転送 API Gateway(WebSocket)
接続・メッセージ受付 Lambda websocket_handler
エージェント実行・Athena呼び出し Lambda agent_processor
推論 Amazon Bedrock(StrandsのBedrockModel
SQL実行 Amazon Athena
会話・接続IDの保存 Amazon DynamoDB(セッションテーブル)

1. ユーザーの発話がどう送られるか(フロント)

ブラウザ側では、useChatがチャット用ペイロードをWebSocketで送ります。 type: "chat"message(本文)、session_idがセットされます。 セッションがまだない場合はクライアントでsession-プレフィックス付きのIDを生成します。

      const chatPayload = {
        type: 'chat',
        message: content,
        session_id: currentSessionId
      };
      await sendData(chatPayload);

接続自体はuseWebSocketで行い、CognitoのIDトークンをクエリに付与し、セッションに応じてsession_idをURLに載せます(接続時にバックエンドへセッションと紐づけられるようにするため)。

設計上の要点:同期RESTの「1リクエスト1レスポンス」ではなく、チャット向けにWebSocketとセッションIDで状態を区切っていることです。

2. API Gatewayと受付:ackと非同期Lambda

WebSocketのルートでmessage_type == "chat"になるとhandle_chatが動きます。 ここで重要なのは、すぐに別Lambdaを非同期(InvocationType: "Event")で起動することです。

        # Send acknowledgment to the client
        api_gateway_endpoint = get_api_endpoint(event)
        send_to_connection(
            connection_id,
            {"type": "ack", "message": "Message received, processing...", "user_id": user_id, "session_id": session_id},
            api_gateway_endpoint,
        )

        # Invoke the agent processor Lambda asynchronously
        lambda_client.invoke(
            FunctionName=AGENT_PROCESSOR_FUNCTION_NAME,
            InvocationType="Event",
            Payload=json.dumps(
                {
                    "connection_id": connection_id,
                    "user_id": user_id,
                    "session_id": session_id,
                    "message": user_input,
                    "api_gateway_endpoint": api_gateway_endpoint,
                }
            ),
        )

なぜ分離するか:Bedrockの推論やAthenaの完了待ちは数秒から数十秒かかり得ます。 それをWebSocketハンドラLambdaの一実行に閉じ込めると、タイムアウト同時実行の設計が厳しくなります。 受付だけ早く返し、重処理はagent_processorに逃がす、という典型的な分割です。

3. Bedrockは「SQLを返すAPI」ではなくエージェントの一部

agent_processorでは、StrandsのBedrockModelAgentを使います。 モデルIDは環境変数BEDROCK_MODEL_IDにセットします。

bedrock_model = BedrockModel(
    model_id=BEDROCK_MODEL_ID, temperature=0.0, boto_session=boto3.Session(region_name="us-west-2")
)

システムプロンプトは、固定のAGENT_INSTRUCTION(役割・手順・Athenaの注意事項)に加え、当日の日付AWS Glueから取得したテーブル情報を連結したenhanced_system_promptです。

        enhanced_system_prompt = (
            f"{AGENT_INSTRUCTION}\n\nCURRENT DATE:\nToday's date: {current_date}\n\nAVAILABLE DATABASE INFORMATION:\n{table_information}"
        )

        # Build tools list based on available services
        tools = [execute_sql_query, create_downloadable_url]
        if USE_PERSONALIZE:
            tools.extend([create_personalize_item_based_segment, check_personalize_segment_status])

        # Create the agent with conditional tools and conversation history
        agent = Agent(
            model=bedrock_model,
            tools=tools,
            system_prompt=enhanced_system_prompt,
            messages=agent_messages,
        )

AGENT_INSTRUCTIONでは、自然言語理解からSQL化、execute_sql_query実行、結果の提示までの手順が明示されています。 また、Athenaではunix_timestamp()が使えないため、from_unixtime()current_timestampを使うようモデル向けの制約も書かれています(プロンプトと実行環境の両方でのガード)。

まとめ:Text-to-SQLは「一発のcompletion」ではなく、カタログを埋め込んだシステムプロンプトとツールで実行検証するエージェントとして実装されています。

4. SQLへの到達:execute_sql_queryツールとAthena

実際のクエリは@toolデコレータ付きのexecute_sql_queryが実行します。 athena.start_query_executionから完了待ち、結果取得までを担います。

@tool
def execute_sql_query(sql_query: str) -> str:
    """
    Execute a SQL query on Amazon Athena.
    ...
    """
    try:
        logger.info(sql_query)
        response = athena.start_query_execution(
            QueryString=sql_query,
            QueryExecutionContext={"Database": ATHENA_DATABASE},
            ResultConfiguration={"OutputLocation": ATHENA_OUTPUT_LOCATION},
            WorkGroup=ATHENA_WORKGROUP,
        )

        query_execution_id = response["QueryExecutionId"]
        # Wait for the query to complete
        query_status = wait_for_query_completion(query_execution_id)
        ...
        if query_status == "SUCCEEDED":
            # Get the query results
            results = get_query_results(query_execution_id)
            formatted_results = format_query_results(sql_query, results)

            # Add query_execution_id to the response
            return f"{formatted_results}\n\nQuery Execution ID: {query_execution_id}"

境界の話:LLMは任意のSQLをその場で実行するのではなく、アプリが提供したツールを通じてだけAthenaに触れます。 権限・監査・結果整形をここに集約しやすい、というのが実務的なメリットです。

5. 結果が大きいとき:しきい値とダウンロード

SQL_RESULT_THRESHOLD(既定値300)を超える行数の場合、プレビューのみを返し、全文はcreate_downloadable_url(別ツール)で取得する流れになります。 チャットに巨大な結果表をそのまま載せないための制約です。

取得時点ではget_query_resultsMaxResults: SQL_RESULT_THRESHOLD + 1で打ち切っているため、しきい値ちょうどのときに「本当はもっとある」可能性を検知できます。 format_query_resultsでは行数がしきい値以上なら、プレビュー用に先頭数十行だけを整形し、エージェントにはCSVダウンロードを促す文言を返すよう分岐します。

    return athena.get_query_results(QueryExecutionId=query_execution_id, MaxResults=SQL_RESULT_THRESHOLD + 1)


def format_query_results(sql_query: str, results: Dict[str, Any]) -> str:
    """
    Format Athena query results into a readable string.
    If the number of rows exceeds SQL_RESULT_THRESHOLD, suggest downloading the results as CSV.
    ...
    """
    try:
        rows = results["ResultSet"]["Rows"]
        ...
        row_count = len(rows) - 1  # Subtract 1 for the header row
        ...
        if row_count >= SQL_RESULT_THRESHOLD:
            output.append(f"Results: more than {row_count} rows returned (exceeds the threshold)")
            output.append(
                "\nThe result set is too large as a response to an Agent. Please explain using the preview and urge client to download the full results as CSV"
            )
            output.append("\nShowing first few rows as preview:")
            ...
            preview_rows = min(20, row_count)  # Show at most 20 rows as preview

6. ユーザーへの返却:WebSocketと会話履歴

処理開始時にtype: "processing"を送り、完了時にtype: "response"response(エージェントの最終文字列)とconversation_history(フロント表示用にフィルタ済み)を送ります。

        # Send processing message to client
        send_to_connection(connection_id, {"type": "processing", "message": "Processing your request..."}, api_gateway_endpoint)
        ...
        agent_response = agent(user_input)

        # Save the updated messages directly from the agent
        save_conversation_history(user_id, session_id, agent.messages)

        # Filter messages for response
        conversation_history = filter_messages_for_response(agent.messages)

        # 最新のconnection_idを取得(接続が切れて再接続した場合に備えて)
        current_connection_id = get_active_connection_id(user_id, session_id) or connection_id

        # Send the response to the client
        try:
            send_to_connection(
                current_connection_id,
                {
                    "type": "response",
                    "user_id": user_id,
                    "session_id": session_id,
                    "response": str(agent_response),
                    "conversation_history": conversation_history,
                },
                api_gateway_endpoint,
            )

会話本文はDynamoDBのSESSION_TABLEmessagesとして保存されます。 接続が切れても、同じuser_idsession_idで再接続すればconnection_idを更新し、最新のWebSocket接続へpushする設計です(set_session_connectionget_active_connection_id)。

フロントではtype === "response"のときconversation_historyをそのままsetMessagesし、UIを一貫した履歴表示に更新します。

    if (chatMessage.type === 'response') {
      if (chatMessage.conversation_history) {
        setMessages(chatMessage.conversation_history);
      }

ローディング相当の表示は、useChat内で最後のメッセージのroleuserかどうかで推定しています。 ackやprocessingで状態フラグを増やさず、履歴の形だけでUIを揃える簡素なパターンです。

  // 最後に発言したのがユーザーかどうかでローディング状態を判断
  const isLoading = messages.length > 0 && messages[messages.length - 1].role === 'user';

7. エンジニア向けの振り返り(この実装に即したトレードオフ)

  • スキーマの鮮度とサイズ:実行ごとにGlue由来のテーブル情報をプロンプトに載せるため、カタログが大きいとトークン数とレイテンシが増える。 逆に、常に最新スキーマを意識しやすい。
  • 安全性:実行経路がツールに限定される一方、生成SQLの妥当性はプロンプトとモデル任せです。 本番ではVPC・ワークグループ・IAM・結果のマスキングなどの検討が別途必要です。
  • 体感速度:WebSocketのackとprocessingで「受け付けた」ことはすぐ伝えられる一方、実際の完了は非同期Lambda側にあります。 上記のとおりフロントは履歴ベースでローディングを推定するため、エラー時のメッセージ追加などとの整合に注意が必要です。

付録:同じリポジトリの「別のBedrock」と本題の関係

  • Amazon Personalize連携が有効な場合、agent_processorのツール一覧にセグメント関連のツールが追加されます(USE_PERSONALIZEが真のとき)。 本稿の主線であるAthena向けText-to-SQLとは独立したオプションです。
  • csvtoolなどでは、CSVからGlueカタログを作る別フローでBedrockを使うコードがあります。 チャットのText-to-SQLとは目的が異なるので、混同しないのがよいでしょう。
  • エンティティ解決は別スタック/別Lambdaの世界であり、チャット経路の説明からは切り離してよい範囲です。

まとめ

このサンプルにおけるText-to-SQLは、自然言語から(Bedrock上のStrandsエージェントへ)、ツールexecute_sql_query経由でAmazon Athenaへという一本のパイプラインです。 その前後をWebSocketと非同期Lambdaで支え、DynamoDBで会話と接続を管理しています。 「SQLを一発生成するだけ」ではなく、実行と結果制御まで含めた対話型アシスタントとして読み解くのが、コードを追ううえでの近道になります。

参考(実装の所在)

  • サンプル全体:aws-samples/sample-c360-text2sql-segmentation-entityresolution
  • バックエンド構成と環境変数の配線:リポジトリ内の[lib/webbackend.ts](../lib/webbackend.ts)
  • WebSocket受付:lambda/webbackend/websocket_handler.py
  • エージェントとAthena実行:lambda/webbackend/agent_processor.py
  • フロントの送受信:frontend/src/hooks/useChat.tsおよびuseWebSocket

公式ドキュメントを併読する場合は、Amazon BedrockAmazon AthenaAPI GatewayのWebSocket APIなどを参照してください(仕様・画面は更新されるため、必要に応じて最新版を確認してください)。

本稿はリポジトリ内の実装に基づく技術説明です。