【CDK】Athena + LambdaでGlueのDBを構築する #AWS - Qiita

本記事では、DynamoDBのTTLで削除されるデータをS3にアーカイブし、そのデータをAthena + Lambdaで分析する仕組みをAWS CDKで実装した過程を紹介します。

マネジメントコンソールからAthenaを使うのは簡単ですが、CDKで再現しようとすると「抽象化された便利さ」を手動で構築する必要があり、想像以上に学びが多い体験となりました。

なぜAthenaを使おうと思ったのか?

もともとの目的は、 「アーカイブされたS3データから、先月の利用回数を取得する」 ことでした。

image.png

DynamoDBはストレージのコストが地味に高く、古いデータを残し続けるのは非現実的です。
そこで、

  • 利用履歴はDynamoDBで保持
  • TTL(3ヶ月)で削除
  • 削除前にS3にアーカイブ(JSONL形式)
  • そのデータをAthenaで分析

という流れに落ち着きました。

Athenaの概要とメリット

Athenaは、S3に保存された構造化データ(JSON, CSV, Parquetなど)に対してSQLでクエリができるサービスです。

  • スキーマ(テーブル)を定義
  • クエリ結果はS3に保存
  • パーティション分割で高速化・コスト削減も可能
  • 軽い分析であれば、ほとんどお金がかからない

サーバレスで、インフラ構築不要、使った分だけ課金されます(1クエリあたり、スキャンしたデータ1TBあたり約5ドル)。

たとえばこんな場面で便利

  • 分析のためだけにRDSやRedshiftを用意したくない
  • バッチで吐き出したログを軽く集計したい
  • LambdaやDynamoDBと連携してイベント駆動でレポートを出したい

分析頻度が低く、専用のDBを用意するほどでもない場合に最適です。

Athenaの内部構造と関連リソース

Athenaを使うには以下の3つが絡んできます:
image.png

  1. Glue Data Catalog(テーブル定義)
  2. 参照元S3(クエリ対象のデータ置き場)
  3. 出力先S3(クエリ結果を保存)

Athena自体にはテーブルの中身を保存する機能がなく、GlueとS3を橋渡しするイメージです。

具体例

保存されているデータ形式(JSON Lines)

{"userId": "abc123", "action": "click", "timestamp": "2025-03-15T12:34:56Z"}
{"userId": "xyz789", "action": "purchase", "timestamp": "2025-03-16T14:12:03Z"}

S3バケット(データ本体)

s3://my-archive-bucket/logs/year=2025/month=03/data.jsonl

Glue Data Catalog(テーブル定義)

CREATE EXTERNAL TABLE logs (
  userId string,
  action string,
  timestamp string
)
PARTITIONED BY (
  year string,
  month string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://my-archive-bucket/logs/';

AthenaでS3からSQLでデータを抽出するには、マネコンでもCDKでもCreateTableする必要があります。

内部的にはSQLクエリがGlueのテーブル作成APIに変換されて、
Glueというサービスにテーブル定義(データ参照先S3のデータ構造を定義)をしている状態になります。

クエリ実行の流れ(非同期)

Athenaのクエリ処理は以下のような順番で進みます:

① Glueテーブル定義(Create Table)

事前にテーブル構造とS3のロケーションを定義します。

② パーティションの追加(MSCK REPAIR TABLE)

AthenaではS3のディレクトリ階層(year=…, month=…)をWhere条件のように使うため、パーティション情報の登録が必要です。
Glue側にパーティテーションを定義するだけではダメで、一度コマンドを実行させて、実際の参照先S3のディレクトリ構造とテーブル定義を紐づけるコマンドが必要となります。

Athenaでは以下のような非同期な流れでクエリが実行されます:

これで、以下のS3パスがAthenaのパーティションとして認識されるようになります

s3://my-archive-bucket/logs/year=2025/month=03/

これで下記でいうwhere区の部分をGlueが実際のS3の構造と結びつけて認識できるようになる、ということです。

SELECT action, COUNT(*) FROM logs
WHERE year='2025' AND month='03'
GROUP BY action;

③ クエリの非同期実行

Athenaは非同期APIなので、クエリ発行から結果取得まで以下の3ステップが必要です。

Lambdaなどで実行する場合の流れ:

  1. StartQueryExecution — クエリ実行開始
  2. GetQueryExecution — ステータス確認(SUCCEEDED or FAILED)
  3. GetQueryResults — 結果取得

SQLに慣れている人だと直ぐに結果が帰っていこないので、あせるかもしれません。
Athenaの場合、上記のように流れがあり、同期的ではなく、上記3ステップを自身で制御しないといけません。

SELECT action, COUNT(*) FROM logs
WHERE year='2025' AND month='03'
GROUP BY action;

このクエリで、先月のログからアクション別件数を集計できます。

実装

サンプルコードです。

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { AthenaGlueConstruct } from './athena-glue-construct';
import { ArchiveDataToS3Construct } from './archive-data-to-s3-construct';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';

interface MainStackProps extends cdk.StackProps {
  projectName: string;
  envName: 'dev' | 'stg' | 'prd';
}

export class MainStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: MainStackProps) {
    super(scope, id, props);

    // Glue定義(Database, Table, S3)
    const athenaGlue = new AthenaGlueConstruct(this, 'AthenaGlue', {
      projectName: props.projectName,
      envName: props.envName,
      env: {
        account: this.account,
        region: this.region,
      },
    });

    // DynamoDBテーブル(ここはサンプル用に適当なTableを作ってる)
    const UserChatHistories = new dynamodb.Table(this, 'NintAiUserChatHistories', {
      partitionKey: { name: 'userId', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
    });


    // バッチ処理(Lambda, EventBridge)
    new ArchiveDataToS3Construct(this, 'ArchiveDataToS3', {
      projectName: props.projectName,
      envName: props.envName,
      env: {
        account: this.account,
        region: this.region,
      },
      userChatHistories: UserChatHistories,
      archiveDataToS3Bucket: athenaGlue.archiveDataToS3Bucket,
      archiveDatabase: athenaGlue.archiveDatabase,
    });
  }
}

athena-glue-construct.ts

export class SimpleAthenaGlueSample extends Construct {
  constructor(scope: Construct, id: string) {
    super(scope, id);

    //出力先S3(Athenaのクエリ結果を保存するバケット)
    const archiveAnalyticsExportBucket = new s3.Bucket(this, 'ArchiveAnalyticsExportBucket', {
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      encryption: s3.BucketEncryption.S3_MANAGED,
      autoDeleteObjects: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      objectOwnership: s3.ObjectOwnership.OBJECT_WRITER,
      enforceSSL: true,
    });

    // Glue Database
    const database = new glue.CfnDatabase(this, 'SampleDatabase', {
      catalogId: cdk.Stack.of(this).account,
      databaseInput: { name: 'sample_database' },
    });

    // Glue Table
    new glue.CfnTable(this, 'SampleTable', {
      catalogId: cdk.Stack.of(this).account,
      databaseName: database.databaseInput.name!,
      tableInput: {
        name: 'sample_table',
        tableType: 'EXTERNAL_TABLE',
        parameters: { classification: 'json' },
        storageDescriptor: {
          columns: [
            { name: 'userId', type: 'string' },
            { name: 'action', type: 'string' },
            { name: 'timestamp', type: 'string' },
          ],
          location: `s3://${bucket.bucketName}/sample-data/`,
          inputFormat: 'org.apache.hadoop.mapred.TextInputFormat',
          outputFormat: 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat',
          serdeInfo: {
            serializationLibrary: 'org.openx.data.jsonserde.JsonSerDe',
          },
        },
      },
    });
  }
}

archive-data-to-s3-construct.ts

import * as cdk from 'aws-cdk-lib';
import { Duration } from 'aws-cdk-lib';
import { aws_iam as iam, aws_lambda as lambda, aws_s3 as s3, aws_events as events, aws_events_targets as targets, aws_logs as logs } from 'aws-cdk-lib';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import { Construct } from 'constructs';

interface SimpleArchiveDataToS3Props extends cdk.StackProps {
  projectName: string;
  envName: 'dev' | 'stg' | 'prd';
  env: cdk.Environment;
  userChatHistories: cdk.aws_dynamodb.ITable;
  archiveDataToS3Bucket: cdk.aws_s3.Bucket;
  archiveDatabase: cdk.aws_glue.CfnDatabase;
}


export class SimpleArchiveDataToS3 extends Construct {
  constructor(scope: Construct, id: string, props: SimpleArchiveDataToS3Props) {
    super(scope, id);

    // --- 参照先S3(データが置かれるバケット)
    const archiveDataToS3Bucket = new s3.Bucket(this, 'ArchiveDataToS3Bucket', {
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      encryption: s3.BucketEncryption.S3_MANAGED,
      autoDeleteObjects: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
      objectOwnership: s3.ObjectOwnership.OBJECT_WRITER,
      enforceSSL: true,
    });



    // --- Lambda
    const archiveDataToS3Function = new NodejsFunction(this, 'ArchiveDataToS3Function', {
      runtime: lambda.Runtime.NODEJS_22_X,
      entry: './lambda/archive-data-to-s3.ts',
      timeout: Duration.minutes(5),
      memorySize: 256,
      environment: {
        ARCHIVE_DATA_TO_S3_BUCKET_NAME: archiveDataToS3Bucket.bucketName,
        ATHENA_OUTPUT_BUCKET: archiveAnalyticsExportBucket.bucketName,
        ARCHIVE_DATABASE_NAME: props.glueDatabaseName,
      },
      logGroup: new logs.LogGroup(this, 'ArchiveDataToS3LogGroup', {
        retention: logs.RetentionDays.THREE_DAYS,
        removalPolicy: cdk.RemovalPolicy.DESTROY,
      }),
    });

    // --- IAM設定

    // DynamoDBアクセス許可
    archiveDataToS3Function.addToRolePolicy(new iam.PolicyStatement({
      actions: ['dynamodb:Scan', 'dynamodb:Query'],
      resources: props.userChatHistories,
    }));

    // LambdaにS3参照・書き込みを許可(参照先)
    archiveDataToS3Function.addToRolePolicy(new iam.PolicyStatement({
      actions: ['s3:GetObject', 's3:PutObject', 's3:ListBucket', 's3:GetBucketLocation'],
      resources: [
        archiveDataToS3Bucket.bucketArn,
        archiveDataToS3Bucket.arnForObjects('*'),
      ],
    }));

    // LambdaにS3参照・書き込みを許可(出力先)
    archiveDataToS3Function.addToRolePolicy(new iam.PolicyStatement({
      actions: ['s3:PutObject', 's3:GetBucketLocation'],
      resources: [
        archiveAnalyticsExportBucket.bucketArn,
        archiveAnalyticsExportBucket.arnForObjects('*'),
      ],
    }));

    // LambdaにAthenaアクセス許可
    archiveDataToS3Function.addToRolePolicy(new iam.PolicyStatement({
      actions: [
        'athena:StartQueryExecution',
        'athena:GetQueryExecution',
        'athena:GetQueryResults',
      ],
      resources: ['*'],
    }));

    // LambdaにGlueアクセス許可
    archiveDataToS3Function.addToRolePolicy(new iam.PolicyStatement({
      actions: [
          'glue:GetDatabase',
          'glue:GetDatabases',
          'glue:GetTable',
          'glue:GetTables',
          'glue:UpdateTable',
          'glue:CreateTable',
          'glue:BatchCreatePartition',
          'glue:BatchDeletePartition',
          'glue:GetPartition',
          'glue:GetPartitions',
          'glue:UpdatePartition',
          'glue:BatchGetPartition',
      ],
      resources: [
        `arn:aws:glue:${cdk.Stack.of(this).region}:${cdk.Stack.of(this).account}:catalog`,
        `arn:aws:glue:${cdk.Stack.of(this).region}:${cdk.Stack.of(this).account}:database/${props.glueDatabaseName}`,
        `arn:aws:glue:${cdk.Stack.of(this).region}:${cdk.Stack.of(this).account}:table/${props.glueDatabaseName}/*`,
      ],
    }));

    // AthenaにS3アクセス権限を付与(出力先)
    prop.archiveAnalyticsExportBucket.addToResourcePolicy(new iam.PolicyStatement({
      principals: [new iam.ServicePrincipal('athena.amazonaws.com')],
      actions: ['s3:GetObject', 's3:PutObject', 's3:GetBucketLocation'],
      resources: [
        archiveAnalyticsExportBucket.bucketArn,
        archiveAnalyticsExportBucket.arnForObjects('*'),
      ],
    }));

    // AthenaにS3アクセス権限を付与(参照先)
    archiveDataToS3Bucket.addToResourcePolicy(new iam.PolicyStatement({
      principals: [new iam.ServicePrincipal('athena.amazonaws.com')],
      actions: ['s3:GetObject', 's3:GetBucketLocation', 's3:ListBucket'],
      resources: [
        archiveDataToS3Bucket.bucketArn,
        archiveDataToS3Bucket.arnForObjects('*'),
      ],
    }));

    // --- EventBridgeスケジュール
    new events.Rule(this, 'ArchiveDataToS3ScheduleRule', {
      schedule: events.Schedule.cron({ minute: '0', hour: '18', day: '1', month: '*', year: '*' }),
      targets: [new targets.LambdaFunction(archiveDataToS3Function)],
    });
  }
}

よかったこと

ほとんどお金がかからない

Athenaは軽量なデータや少ないリクエスト回数であればかなりコストやすい
Glueは100万オブジェクトの定義、100万リクエストまで無料なので、コストが殆どかからないことです。

気になったところ・注意すべきところ

IAMがややこしい

Athena・Glue・S3すべてに対してLambdaからアクセスできるようにするIAM設計が必要です。
CDKでも記述量が多く、最初は少し混乱しました。

例えば、作成したラムダとS3にはこのような権限を与えています。

  • Lambda
    ・参照先のS3
    ・出力先のS3
    ・Athena(クエリ)
    ・Glue(DB/テーブル)
  • S3
    ・参照先:Lambdaリソースからのアクセス
    ・出力先:Lambdaリソースからのアクセス

これだけ必要で結構煩雑になってしまいます。
一番厄介なのがGlueで、LambdaからAthenaクエリを実行するとき、Athenaがクエリを内部的にGlueのコマンドに変えているので、どのようなコマンドを投げているのかわかりません。
ただ、LambdaからAthenaを利用している以上、実行権限が必要なので、IAMが煩雑になってしまいます。

必要なポリシーだけつけたいのですが、また厄介なのが権限エラーもクエリエラーも、Athenaがクエリエラーと判定してしまうところにあります。

Athena returns "FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. null"

これが気をつけるポイントです。

非同期処理の制御が大変

Athenaは同期的なSQL実行ではないため、Lambda内でポーリング処理や完了確認ロジックが必要になります。

AthenaでSQLでデータを取得するにあたって、順番が非常に大事になるからです。

  • Glueにテーブル定義すること
  • S3にデータが存在すること
  • パーティション追加すること
  • クエリ実行(StartQuery→GetQurely→GetResult)

と順番がとても大事。

マネコンだと裏でよしなにやってくれますが、CDKでやるとなると、順序や依存関係の意識が必須になります。

感想

CDKでAthenaを使うのは、「抽象化されたマネコンの便利さを一つ一つ具現化していく作業」に近いです。

その分、サービス理解は格段に深まりました。
仕組みを自動化・明文化できたことで、チーム内でも共有しやすくなったのは大きなメリットです。

余談・改善点

Step Functionを導入すればよかった

AthenaをLambdaで使うにあたって手順が思ったより多かったのでStep Functionをリファクタリング時には検討しようと思います。

AWS Step Functionsは、複数の処理(Lambdaとかバッチとか)を順番に実行したり、条件分岐したり、エラー処理を組み込んだりできるサービス。
簡単に言うと、「AWS上でフロー(ワークフロー)を作るツール」です。

しかも、全部マネージドサービスだから

・並列実行
・タイムアウト
・リトライ

成功・失敗の分岐
とかが簡単に組めるので楽になると思います。

aws-glue-alpha

2025年4月現在、@aws-cdk/aws-glue-alpha というL2コンストラクトが出ています。

これを使うと、Glue周りのコードがより簡潔に書けるようになるかもしれません。

まとめ

今回、Athena + Lambda + CDK という組み合わせで、
低コストでデータ分析を自動化する構成を作ってみました。

一見シンプルに見える構成ですが、実際に手を動かしてみると

  • IAMの細かな権限設計
  • 非同期処理(Athenaクエリ実行)との向き合い方
  • Glue、S3とのリソース連携
    など、想像以上に深い理解が求められる場面が多かったです。

実際、インフラでログ管理する人や、データエンジニア界隈ではこの構成はかなりよく使われているようで、記事もよく見かけます。
シンプルながら実戦的な設計力を試される構成だと改めて感じました。

それでも、こういう実用的な構成を自分の手で作れる経験は、
AWS力アップにも直結するので、「チャレンジしたい人」にはぜひおすすめしたいです!☺️👍



フラッグシティパートナーズ海外不動産投資セミナー 【DMM FX】入金

Source link