金曜日, 6月 20, 2025
- Advertisment -
ホームニューステックニュースSnowflake Summit 2025で話題のSnowflake Openflowを試してみた

Snowflake Summit 2025で話題のSnowflake Openflowを試してみた


はじめに

Snowflake Summit 2025、すごい盛り上がりですね!
私は日本から参加者のブログやオンライン配信を追いかけて、一人でワクワクしています。
数ある発表の中でも、Day2のKeynoteで紹介された「Snowflake Openflow」が特に印象的でした。
本記事では、そのOpenflowを実際に触ってみた様子をお届けします。

1. Snowflake Openflowとは

Snowflake Openflowは、構造化テキスト、非構造化テキスト、画像、音声、動画、センサーデータなど、数百のプロセッサを搭載したあらゆるデータソースとあらゆる接続先を接続する統合サービスです。Apache NiFiを基盤とするOpenflowは、お客様独自のクラウド内でフルマネージドサービスを実行し、完全な制御を実現します

👉 引用:Openflowについて

🔍従来のデータ連携の課題

組織で「データを活用しよう」とすると、まず直面するのが、社内外に点在する多種多様なデータの収集・連携という課題です。これにより、これまでは外部ETL/ELTツールを組み合わせていくことが一般的でした。

これらはいずれも優れたツールですが、導入・運用コスト、社内教育、セキュリティ統合の構築など、多くのハードルが存在しました。

🚀 Openflowがもたらす変化

Snowflake Openflowは、こうした課題をSnowflakeのエコシステム内で解決できる公式サービスです。

視覚的なフローデザインを通じて、データの取得・変換・転送といった処理を簡潔に構築できます。特別な外部ツールを導入せずに、Snowflake内で完結するアーキテクチャが実現できます。

項目 説明
GUIベースの開発 Apache NiFiをベースとしたGUIでのフロー設計が可能
幅広いソース対応 幅広いデータソースに対応(例:PostgreSQL、SQL Server、SharePoint、Slackなど)
完全マネージド Snowflake環境内でプロビジョニングされ、セキュリティやスケーラビリティも安心

2. OpenflowでRDS PostgreSQL連携を試す

ここからは、対応データソースのひとつであるPostgreSQLを使用して、Openflowを用いたデータ連携フローの検証を進めていきます。実施する検証内容は以下のとおりです。

  • 初回同期(Snapshot Load)の確認
    データソースに検証用テーブルを作成し、Openflowを起動して初回ロードがSnowflake側のテーブルに正常に行われることを確認する
  • 継続ロード(Incremental Load)の確認
    Openflowを起動した状態でデータソースにレコードを追加し、増分データがSnowflake側に正しく反映されることを確認する

本検証では、Amazon Web Services (AWS) 上で構成された BYOC(Bring Your Own Cloud) を採用しています。なお、SPCS(Snowpark Container Services)版の対応は、今後のリリースで提供される予定です。

公式ドキュメントから構成図を引用すると、以降の手順では 左側の「BYOC Deployment」 および 右上の「Openflow」 をそれぞれ構築していく流れになります。

2.1 環境構築(デプロイメント・ランタイムを作成する)

それではドキュメントに沿ってセットアップしていきます。
👉 参考:Openflow環境のセットアップ-SnowflakeDocs

Snowflake Account

  • image repositoriesの作成
USE ROLE ACCOUNTADMIN;

CREATE DATABASE IF NOT EXISTS OPENFLOW;
USE OPENFLOW;
CREATE SCHEMA IF NOT EXISTS OPENFLOW;
USE SCHEMA OPENFLOW;
CREATE IMAGE REPOSITORY IF NOT EXISTS OPENFLOW;
grant usage on database OPENFLOW to role public;
grant usage on schema OPENFLOW to role public;
grant read on image repository OPENFLOW.OPENFLOW.OPENFLOW to role public;

  • Openflow用の権限を設定(デプロイメント統合権限・ランタイム権限)
    ドキュメントでは用途別にロールを細かく分けて設定するシナリオが紹介されていますが、今回の検証では、構成をシンプルに保つため、以下のような単一の管理ロール(OPENFLOW_ADMIN_ROLE)に権限を集約した構成としています。
USE ROLE ACCOUNTADMIN;
CREATE ROLE OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW DATA PLANE INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW RUNTIME INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;
GRANT ROLE OPENFLOW_ADMIN_ROLE TO USER 任意のユーザ>;
ALTER USER 任意のユーザ> SET DEFAULT_SECONDARY_ROLES = ('ALL');

AWS Account

後ほど CloudFormation を使って Openflow のデプロイメントを構成していきますが、その前に、AWS 側に以下のネットワーク関連リソースを事前に作成しておく必要があります。

リソース 説明
VPC Openflow 専用のネットワーク空間を構築
Public Subnet ×2 異なるアベイラビリティーゾーンにある 2 つのパブリックサブネット
※EKS の LB 用に以下のタグを付与する必要があります:
 ・kubernetes.io/role/elb = 1
Private Subnet ×2 異なるアベイラビリティーゾーンにある 2 つのプライベートサブネット
※少なくとも /24 の CIDR 範囲(256 IP)を確保する必要があります。ランタイム数や規模の制約を避けるため、より広い CIDR を推奨します。
IGW(Internet Gateway) パブリックサブネット用のインターネット出入口
NAT Gateway(NATGW) プライベートサブネットから外部インターネットへのアクセス用
ルートテーブル サブネット間の通信制御
・Public Subnet → IGW
・Private Subnet → NATGW

作成後の状態


Snowflake Account

次に、Openflowの画面に遷移し、デプロイメントを作成していきます

  • SnowsightからOpenflowを起動を選択

  • 権限を付与したユーザでログイン
    「Openflow用の権限を設定」を参照。

  • デプロイメントの作成
    今回はBYOCを採用するので、独自のVPCを使用するを選択し、デプロイメントを作成

  • CloudFormationの定義ファイルをダウンロード
    後ほどAWS Accountで利用する

  • (オプション)NWポリシーを設定する
    NWポリシーを使用してSnowflakeへのアクセスを制御している場合は、以下を実施する。NAT_GATEWAY_PUBLIC_IPは、先ほど作成したNAT GATEWAYのものを設定

    
    USE ROLE ACCOUNTADMIN;
    USE DATABASE {REPLACE_WITH_YOUR_DB_NAME};
    
    CREATE NETWORK RULE allow_openflow_deployment
    MODE = INGRESS
    TYPE = IPV4
    VALUE_LIST = ('{$NAT_GATEWAY_PUBLIC_IP}/32');
    
    
    SHOW PARAMETERS LIKE 'NETWORK_POLICY' IN ACCOUNT;
    
    ALTER NETWORK POLICY {ENTER_YOUR_ACTIVE_NETWORK_POLICY_NAME} ADD ALLOWED_NETWORK_RULE_LIST = (allow_openflow_deployment);
    
    
    
  • イベントテーブルを作成する

    
    USE ROLE ACCOUNTADMIN;
    GRANT create event table on schema OPENFLOW.OPENFLOW to role $ROLE_OF_DEPLOYMENT_OWNER;
    
    USE ROLE $ROLE_OF_DEPLOYMENT_OWNER;
    CREATE event table if not exists openflow.openflow.openflow_events;
    
    
    SHOW openflow data plane integrations;
    
    ALTER openflow data plane integration
    $OPENFLOW_deployment_UUID
    SET event_table = 'openflow.openflow.openflow_events';
    
    
    

AWS Account

  • ダウンロードしたCloudFormation定義を使ってデプロイメントを作成する

    事前に用意したVPCとprivate subnet2つを入力する(それ以外はデフォルトでOK)公式ドキュメントのとおり、45分ほど作成に要する

Snowflake Account

  • デプロイメントの作成が完了したら、以下のようにステータスがActiveとなる
  • ランタイム作成
    ランタイムの名前を入力し、ノードタイプ、サイズ等を選択する。
    ランタイムの作成は数分で完了する

⚠️ 補足:EC2課金に注意!
ランタイムの作成が完了すると、AWSには以下のように3つのEC2インスタンスが稼働している状態になります。

検証用途で常時稼働が不要な場合は、不要な時間帯はインスタンスを停止することを推奨します。

2.2 環境構築(ソースデータベースを用意する)

ランタイム環境の作成まで完了しました。次にレプリケーション元となるPostgreSQLの作成、設定をしていきましょう。
👉 参考:PostgreSQL用のコネクタを設定する-SnowflakeDocs

AWS Account RDS PostgreSQLの作成と設定

  • AWSコンソールからRDS PostgreSQLを作成します。

  • パラメータグループの変更
    以下パラメータを変更し、RDSを再起動します。

    key value
    rds.logical_replication 1
  • PostgreSQLにpsqlで接続し、レプリケーションがアクティブになっていることを確認します

    SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication');
    
              name           | setting 
    
     rds.logical_replication | on
     wal_level               | logical
    (2 rows)
    
    
  • コネクタ用のユーザー作成と権限付与

    CREATE USER openflow_user WITH PASSWORD '';
    GRANT rds_replication TO openflow_user;
    
    \du OPENFLOW_USER;
                     List of roles
       Role name   | Attributes |     Member of     
    
     openflow_user |            | {rds_replication}
    
    
  • レプリケーション用のテーブルの作成、データの挿入

    CREATE TABLE sample_data (
        id SERIAL PRIMARY KEY,
        name TEXT NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    
    INSERT INTO sample_data (name)
    VALUES
        ('Openflow連携テスト1'),
        ('Openflow連携テスト2');
    
    
     id |        name         |        created_at        
    
      1 | Openflow連携テスト1 | 2025-06-02 09:44:08.2517
      2 | Openflow連携テスト2 | 2025-06-02 09:44:08.2517
    
    
    
  • publicationの作成、テーブルの追加
    publicationを作成し、先ほど設定したテーブルを登録します。

    CREATE PUBLICATION openflow_pub;
    ALTER PUBLICATION openflow_pub ADD TABLE sample_data;  
    
  • 先ほど作成したユーザにテーブルのSELECT、スキーマのUSAGE権限を付与

    GRANT USAGE ON SCHEMA public TO openflow_user;
    GRANT SELECT ON TABLE sample_data TO openflow_user;
    

Snowflake Account レプリケーション用ユーザ/データベース作成

  • ワークシートから以下クエリを実行します。
    CREATE DATABASE openflow_replicated_data;
    
    CREATE USER openflow_svc_user 
        TYPE = SERVICE 
        COMMENT = 'Service user for automated access of Openflow';
    
    CREATE ROLE openflow_service_role;
    
    GRANT ROLE openflow_service_role TO USER openflow_svc_user;
    
    GRANT USAGE ON DATABASE openflow_replicated_data TO ROLE openflow_service_role;
    GRANT CREATE SCHEMA ON DATABASE openflow_replicated_data TO ROLE openflow_service_role;
    
    CREATE WAREHOUSE openflow_compute_wh
        WITH 
            WAREHOUSE_SIZE = 'MEDIUM'
            AUTO_SUSPEND = 300
            AUTO_RESUME = TRUE;
    
    GRANT USAGE, OPERATE ON WAREHOUSE openflow_compute_wh TO ROLE openflow_service_role;
    
    ALTER USER openflow_svc_user SET RSA_PUBLIC_KEY =  '';
    
    

2.3 コネクタ定義を設定する

それでは、コネクタ定義の設定に入っていきましょう
👉 参考コネクタの設定

  • OpenflowのOverviewからPostgreSQLコネクタを追加します

  • 先ほど作成したruntimeを指定してAdd

  • 数分待つと、Openflowのキャンバス画面に遷移

  • コネクタの設定
    プロセスグループを右クリックし、Parametersをクリックして、各パラメータを設定していきます。例(入力値)はこの記事で設定してきたもの。

PostgreSQL Source Parameters Context

Parameter 説明 例(入力値)
Postgres Connection URL ソースデータベースへの完全なJDBC URL。
例: jdbc:postgresql://example.com:5432/postgres
jdbc:postgresql://:5432/postgres
Postgres JDBC Driver PostgreSQL JDBC ドライバー jarへのパス。Webサイトからjarをダウンロードし、「参照アセット」チェックボックスをオンにしてアップロードして添付します。
Postgres SSL Mode SSL接続を有効または無効にします。 Enable or disable
Postgres Root SSL Certificate データベースのルート証明書の完全な内容。SSLが無効なら空欄でも可。
Postgres Username コネクタのユーザー名。 openflow_user
Postgres Password コネクタのパスワード。
Publication Name 先ほど作成したパブリケーションの名前。 openflow_pub

PostgreSQL Destination Parameters Context

Parameter 説明 例(入力値)
Destination Database データが保存されるデータベース。Snowflakeに既に存在している必要があります。 OPENFLOW_REPLICATED_DATA
Snowflake Account Identifier データが保存される [組織名]-[アカウント名] 形式の Snowflake アカウント名
Snowflake Authentication Strategy Snowflakeへの認証戦略。可能な値:SNOWFLAKE_SESSION_TOKEN(SPCSで実行時)、KEY_PAIR(秘密鍵を使用) KEY_PAIR
Snowflake Private Key 認証に使用するRSA秘密鍵。PKCS8標準に準拠し、PEM形式のヘッダーとフッターを持つ必要があります。 -----BEGIN PRIVATE KEY----- ...
Snowflake Private Key File RSA秘密鍵を含むファイル(-----BEGIN PRIVATE KEY----- で始まる)。「参照アセット」チェックボックスをオンにしてアップロードします。 snowflake_key.p8
openflow_svc_user用に生成した秘密鍵
Snowflake Private Key Password Snowflake秘密鍵ファイルに関連付けられたパスワード パスワードをかけている場合
Snowflake Role クエリ実行中に使用されるSnowflakeロール OPENFLOW_SERVICE_ROLE
Snowflake Username Snowflakeインスタンスへの接続に使用するユーザー名 OPENFLOW_SVC_USER
Snowflake Warehouse クエリを実行するために使用するSnowflakeウェアハウス OPENFLOW_COMPUTE_WH

PostgreSQL Ingestion Parameters Context

Parameter 説明 例(入力値)
Included Table Names スキーマを含むテーブルパスのコンマ区切りリスト。
例: public.my_table, other_schema.other_table
public.sample_data
Included Table Regex テーブルパスにマッチする正規表現。マッチしたテーブルはすべて複製対象となり、後から追加されたテーブルも対象になります。
例: public\.auto_.*
public.*
Filter JSON レプリケーション対象のテーブル名と、列名を指定する正規表現パターンのJSON。
例:[{"schema":"public", "table":"table1", "includedPattern":".*name"}]table1 内の name で終わる列を含めます。
Merge Task Schedule CRON ジャーナルから宛先テーブルへのマージ処理の実行タイミングを定義するCRON式。
例:
* 0 * * * ? → 毎時ちょうどに1分間のマージ
* 20 14 ? * MON-FRI → 平日14:20にマージ実行
※詳細は Quartz のCRONトリガーチュートリアル参照
* 0 * * * ?
  • Controller Serviceを有効化
    Enable All Controller Serviceで全てを有効化します。

2.4 初回同期(Snapshot Load)の確認

それでは、RDS PostgreSQLで作成したテーブルsample_dataをSnowflake側に同期させましょう

postgres=> select * from sample_data;
 id |        name         |         created_at         

  1 | Openflow連携テスト1 | 2025-06-04 09:04:03.086614
  2 | Openflow連携テスト2 | 2025-06-04 09:04:03.086614


Startを押下

少し待って確認すると、Snowflake側にロードされていることが確認できました!

2.5 継続ロード(Incremental Load)の確認

それでは、RDS側に新たなレコードを追加してみましょう


postgres=> INSERT INTO sample_data (name)
VALUES                               
    ('Openflow連携テスト3'),
    ('Openflow連携テスト4');

postgres=> select * from sample_data;
 id |        name         |         created_at         

  1 | Openflow連携テスト1 | 2025-06-04 09:04:03.086614
  2 | Openflow連携テスト2 | 2025-06-04 09:04:03.086614
  3 | Openflow連携テスト3 | 2025-06-05 04:10:10.497407
  4 | Openflow連携テスト4 | 2025-06-05 04:10:10.497407
(4 rows)


追加したレコードがSnowflake側に挿入されていますね!

おわりに

今回は、構造化データであるRDS PostgreSQLを対象に、OpenflowによるSnowflakeへのデータ連携を実践してみました。

GUIベースでデータ連携フローをここまでスムーズに行えるとは、正直驚きでした。今後さらに活用が広がる予感がします。

検証にはBYOC構成を使用しましたが、今後登場するSPCS構成により、Openflowのユーザー体験がさらに向上することにも期待しています。

次回は、画像やPDFなど非構造化データを対象に、Openflowの実力を引き続き検証していきたいと思います。お楽しみに!



Source link

Views: 0

RELATED ARTICLES

返事を書く

あなたのコメントを入力してください。
ここにあなたの名前を入力してください

- Advertisment -