はじめに
Snowflake Summit 2025、すごい盛り上がりですね!
私は日本から参加者のブログやオンライン配信を追いかけて、一人でワクワクしています。
数ある発表の中でも、Day2のKeynoteで紹介された「Snowflake Openflow」が特に印象的でした。
本記事では、そのOpenflowを実際に触ってみた様子をお届けします。
1. Snowflake Openflowとは
Snowflake Openflowは、構造化テキスト、非構造化テキスト、画像、音声、動画、センサーデータなど、数百のプロセッサを搭載したあらゆるデータソースとあらゆる接続先を接続する統合サービスです。Apache NiFiを基盤とするOpenflowは、お客様独自のクラウド内でフルマネージドサービスを実行し、完全な制御を実現します
👉 引用:Openflowについて
🔍従来のデータ連携の課題
組織で「データを活用しよう」とすると、まず直面するのが、社内外に点在する多種多様なデータの収集・連携という課題です。これにより、これまでは外部ETL/ELTツールを組み合わせていくことが一般的でした。
これらはいずれも優れたツールですが、導入・運用コスト、社内教育、セキュリティ統合の構築など、多くのハードルが存在しました。
🚀 Openflowがもたらす変化
Snowflake Openflowは、こうした課題をSnowflakeのエコシステム内で解決できる公式サービスです。
視覚的なフローデザインを通じて、データの取得・変換・転送といった処理を簡潔に構築できます。特別な外部ツールを導入せずに、Snowflake内で完結するアーキテクチャが実現できます。
項目 | 説明 |
---|---|
GUIベースの開発 | Apache NiFiをベースとしたGUIでのフロー設計が可能 |
幅広いソース対応 | 幅広いデータソースに対応(例:PostgreSQL、SQL Server、SharePoint、Slackなど) |
完全マネージド | Snowflake環境内でプロビジョニングされ、セキュリティやスケーラビリティも安心 |
2. OpenflowでRDS PostgreSQL連携を試す
ここからは、対応データソースのひとつであるPostgreSQLを使用して、Openflowを用いたデータ連携フローの検証を進めていきます。実施する検証内容は以下のとおりです。
- 初回同期(Snapshot Load)の確認
データソースに検証用テーブルを作成し、Openflowを起動して初回ロードがSnowflake側のテーブルに正常に行われることを確認する - 継続ロード(Incremental Load)の確認
Openflowを起動した状態でデータソースにレコードを追加し、増分データがSnowflake側に正しく反映されることを確認する
本検証では、Amazon Web Services (AWS) 上で構成された BYOC(Bring Your Own Cloud) を採用しています。なお、SPCS(Snowpark Container Services)版の対応は、今後のリリースで提供される予定です。
公式ドキュメントから構成図を引用すると、以降の手順では 左側の「BYOC Deployment」 および 右上の「Openflow」 をそれぞれ構築していく流れになります。
2.1 環境構築(デプロイメント・ランタイムを作成する)
それではドキュメントに沿ってセットアップしていきます。
👉 参考:Openflow環境のセットアップ-SnowflakeDocs
Snowflake Account
- image repositoriesの作成
USE ROLE ACCOUNTADMIN;
CREATE DATABASE IF NOT EXISTS OPENFLOW;
USE OPENFLOW;
CREATE SCHEMA IF NOT EXISTS OPENFLOW;
USE SCHEMA OPENFLOW;
CREATE IMAGE REPOSITORY IF NOT EXISTS OPENFLOW;
grant usage on database OPENFLOW to role public;
grant usage on schema OPENFLOW to role public;
grant read on image repository OPENFLOW.OPENFLOW.OPENFLOW to role public;
- Openflow用の権限を設定(デプロイメント統合権限・ランタイム権限)
ドキュメントでは用途別にロールを細かく分けて設定するシナリオが紹介されていますが、今回の検証では、構成をシンプルに保つため、以下のような単一の管理ロール(OPENFLOW_ADMIN_ROLE)に権限を集約した構成としています。
USE ROLE ACCOUNTADMIN;
CREATE ROLE OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW DATA PLANE INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;
GRANT CREATE OPENFLOW RUNTIME INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN_ROLE;
GRANT ROLE OPENFLOW_ADMIN_ROLE TO USER 任意のユーザ>;
ALTER USER 任意のユーザ> SET DEFAULT_SECONDARY_ROLES = ('ALL');
AWS Account
後ほど CloudFormation を使って Openflow のデプロイメントを構成していきますが、その前に、AWS 側に以下のネットワーク関連リソースを事前に作成しておく必要があります。
リソース | 説明 |
---|---|
VPC | Openflow 専用のネットワーク空間を構築 |
Public Subnet ×2 | 異なるアベイラビリティーゾーンにある 2 つのパブリックサブネット ※EKS の LB 用に以下のタグを付与する必要があります: ・ kubernetes.io/role/elb = 1
|
Private Subnet ×2 | 異なるアベイラビリティーゾーンにある 2 つのプライベートサブネット ※少なくとも /24 の CIDR 範囲(256 IP)を確保する必要があります。ランタイム数や規模の制約を避けるため、より広い CIDR を推奨します。 |
IGW(Internet Gateway) | パブリックサブネット用のインターネット出入口 |
NAT Gateway(NATGW) | プライベートサブネットから外部インターネットへのアクセス用 |
ルートテーブル | サブネット間の通信制御 ・Public Subnet → IGW ・Private Subnet → NATGW |
作成後の状態
Snowflake Account
次に、Openflowの画面に遷移し、デプロイメントを作成していきます
-
SnowsightからOpenflowを起動を選択
-
権限を付与したユーザでログイン
「Openflow用の権限を設定」を参照。 -
デプロイメントの作成
今回はBYOCを採用するので、独自のVPCを使用するを選択し、デプロイメントを作成 -
CloudFormationの定義ファイルをダウンロード
後ほどAWS Accountで利用する -
(オプション)NWポリシーを設定する
NWポリシーを使用してSnowflakeへのアクセスを制御している場合は、以下を実施する。NAT_GATEWAY_PUBLIC_IPは、先ほど作成したNAT GATEWAYのものを設定USE ROLE ACCOUNTADMIN; USE DATABASE {REPLACE_WITH_YOUR_DB_NAME}; CREATE NETWORK RULE allow_openflow_deployment MODE = INGRESS TYPE = IPV4 VALUE_LIST = ('{$NAT_GATEWAY_PUBLIC_IP}/32'); SHOW PARAMETERS LIKE 'NETWORK_POLICY' IN ACCOUNT; ALTER NETWORK POLICY {ENTER_YOUR_ACTIVE_NETWORK_POLICY_NAME} ADD ALLOWED_NETWORK_RULE_LIST = (allow_openflow_deployment);
-
イベントテーブルを作成する
USE ROLE ACCOUNTADMIN; GRANT create event table on schema OPENFLOW.OPENFLOW to role $ROLE_OF_DEPLOYMENT_OWNER; USE ROLE $ROLE_OF_DEPLOYMENT_OWNER; CREATE event table if not exists openflow.openflow.openflow_events; SHOW openflow data plane integrations; ALTER openflow data plane integration $OPENFLOW_deployment_UUID SET event_table = 'openflow.openflow.openflow_events';
AWS Account
- ダウンロードしたCloudFormation定義を使ってデプロイメントを作成する
事前に用意したVPCとprivate subnet2つを入力する(それ以外はデフォルトでOK)公式ドキュメントのとおり、45分ほど作成に要する
Snowflake Account
- デプロイメントの作成が完了したら、以下のようにステータスがActiveとなる
- ランタイム作成
ランタイムの名前を入力し、ノードタイプ、サイズ等を選択する。
ランタイムの作成は数分で完了する
⚠️ 補足:EC2課金に注意!
ランタイムの作成が完了すると、AWSには以下のように3つのEC2インスタンスが稼働している状態になります。
検証用途で常時稼働が不要な場合は、不要な時間帯はインスタンスを停止することを推奨します。
2.2 環境構築(ソースデータベースを用意する)
ランタイム環境の作成まで完了しました。次にレプリケーション元となるPostgreSQLの作成、設定をしていきましょう。
👉 参考:PostgreSQL用のコネクタを設定する-SnowflakeDocs
AWS Account RDS PostgreSQLの作成と設定
-
AWSコンソールからRDS PostgreSQLを作成します。
-
パラメータグループの変更
以下パラメータを変更し、RDSを再起動します。key value rds.logical_replication 1 -
PostgreSQLにpsqlで接続し、レプリケーションがアクティブになっていることを確認します
SELECT name,setting FROM pg_settings WHERE name IN ('wal_level','rds.logical_replication'); name | setting rds.logical_replication | on wal_level | logical (2 rows)
-
コネクタ用のユーザー作成と権限付与
CREATE USER openflow_user WITH PASSWORD ''; GRANT rds_replication TO openflow_user; \du OPENFLOW_USER; List of roles Role name | Attributes | Member of openflow_user | | {rds_replication}
-
レプリケーション用のテーブルの作成、データの挿入
CREATE TABLE sample_data ( id SERIAL PRIMARY KEY, name TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); INSERT INTO sample_data (name) VALUES ('Openflow連携テスト1'), ('Openflow連携テスト2'); id | name | created_at 1 | Openflow連携テスト1 | 2025-06-02 09:44:08.2517 2 | Openflow連携テスト2 | 2025-06-02 09:44:08.2517
-
publicationの作成、テーブルの追加
publicationを作成し、先ほど設定したテーブルを登録します。CREATE PUBLICATION openflow_pub; ALTER PUBLICATION openflow_pub ADD TABLE sample_data;
-
先ほど作成したユーザにテーブルのSELECT、スキーマのUSAGE権限を付与
GRANT USAGE ON SCHEMA public TO openflow_user; GRANT SELECT ON TABLE sample_data TO openflow_user;
Snowflake Account レプリケーション用ユーザ/データベース作成
- ワークシートから以下クエリを実行します。
CREATE DATABASE openflow_replicated_data; CREATE USER openflow_svc_user TYPE = SERVICE COMMENT = 'Service user for automated access of Openflow'; CREATE ROLE openflow_service_role; GRANT ROLE openflow_service_role TO USER openflow_svc_user; GRANT USAGE ON DATABASE openflow_replicated_data TO ROLE openflow_service_role; GRANT CREATE SCHEMA ON DATABASE openflow_replicated_data TO ROLE openflow_service_role; CREATE WAREHOUSE openflow_compute_wh WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE openflow_compute_wh TO ROLE openflow_service_role; ALTER USER openflow_svc_user SET RSA_PUBLIC_KEY = '';
2.3 コネクタ定義を設定する
それでは、コネクタ定義の設定に入っていきましょう
👉 参考コネクタの設定
-
OpenflowのOverviewからPostgreSQLコネクタを追加します
-
先ほど作成したruntimeを指定してAdd
-
数分待つと、Openflowのキャンバス画面に遷移
-
コネクタの設定
プロセスグループを右クリックし、Parametersをクリックして、各パラメータを設定していきます。例(入力値)はこの記事で設定してきたもの。
PostgreSQL Source Parameters Context
Parameter | 説明 | 例(入力値) |
---|---|---|
Postgres Connection URL | ソースデータベースへの完全なJDBC URL。 例: jdbc:postgresql://example.com:5432/postgres
|
jdbc:postgresql://:5432/postgres |
Postgres JDBC Driver | PostgreSQL JDBC ドライバー jarへのパス。Webサイトからjarをダウンロードし、「参照アセット」チェックボックスをオンにしてアップロードして添付します。 | |
Postgres SSL Mode | SSL接続を有効または無効にします。 | Enable or disable |
Postgres Root SSL Certificate | データベースのルート証明書の完全な内容。SSLが無効なら空欄でも可。 | |
Postgres Username | コネクタのユーザー名。 | openflow_user |
Postgres Password | コネクタのパスワード。 | |
Publication Name | 先ほど作成したパブリケーションの名前。 | openflow_pub |
PostgreSQL Destination Parameters Context
Parameter | 説明 | 例(入力値) |
---|---|---|
Destination Database | データが保存されるデータベース。Snowflakeに既に存在している必要があります。 | OPENFLOW_REPLICATED_DATA |
Snowflake Account Identifier | データが保存される [組織名]-[アカウント名] 形式の Snowflake アカウント名 | |
Snowflake Authentication Strategy | Snowflakeへの認証戦略。可能な値:SNOWFLAKE_SESSION_TOKEN (SPCSで実行時)、KEY_PAIR (秘密鍵を使用) |
KEY_PAIR |
Snowflake Private Key | 認証に使用するRSA秘密鍵。PKCS8標準に準拠し、PEM形式のヘッダーとフッターを持つ必要があります。 | -----BEGIN PRIVATE KEY----- ... |
Snowflake Private Key File | RSA秘密鍵を含むファイル(-----BEGIN PRIVATE KEY----- で始まる)。「参照アセット」チェックボックスをオンにしてアップロードします。 |
snowflake_key.p8 openflow_svc_user用に生成した秘密鍵 |
Snowflake Private Key Password | Snowflake秘密鍵ファイルに関連付けられたパスワード | パスワードをかけている場合 |
Snowflake Role | クエリ実行中に使用されるSnowflakeロール | OPENFLOW_SERVICE_ROLE |
Snowflake Username | Snowflakeインスタンスへの接続に使用するユーザー名 | OPENFLOW_SVC_USER |
Snowflake Warehouse | クエリを実行するために使用するSnowflakeウェアハウス | OPENFLOW_COMPUTE_WH |
PostgreSQL Ingestion Parameters Context
Parameter | 説明 | 例(入力値) |
---|---|---|
Included Table Names | スキーマを含むテーブルパスのコンマ区切りリスト。 例: public.my_table, other_schema.other_table
|
public.sample_data |
Included Table Regex | テーブルパスにマッチする正規表現。マッチしたテーブルはすべて複製対象となり、後から追加されたテーブルも対象になります。 例: public\.auto_.*
|
public.* |
Filter JSON | レプリケーション対象のテーブル名と、列名を指定する正規表現パターンのJSON。 例: [{"schema":"public", "table":"table1", "includedPattern":".*name"}] は table1 内の name で終わる列を含めます。 |
|
Merge Task Schedule CRON | ジャーナルから宛先テーブルへのマージ処理の実行タイミングを定義するCRON式。 例: • * 0 * * * ? → 毎時ちょうどに1分間のマージ• * 20 14 ? * MON-FRI → 平日14:20にマージ実行※詳細は Quartz のCRONトリガーチュートリアル参照 |
* 0 * * * ? |
- Controller Serviceを有効化
Enable All Controller Serviceで全てを有効化します。
2.4 初回同期(Snapshot Load)の確認
それでは、RDS PostgreSQLで作成したテーブルsample_dataをSnowflake側に同期させましょう
postgres=> select * from sample_data;
id | name | created_at
1 | Openflow連携テスト1 | 2025-06-04 09:04:03.086614
2 | Openflow連携テスト2 | 2025-06-04 09:04:03.086614
Startを押下
少し待って確認すると、Snowflake側にロードされていることが確認できました!
2.5 継続ロード(Incremental Load)の確認
それでは、RDS側に新たなレコードを追加してみましょう
postgres=> INSERT INTO sample_data (name)
VALUES
('Openflow連携テスト3'),
('Openflow連携テスト4');
postgres=> select * from sample_data;
id | name | created_at
1 | Openflow連携テスト1 | 2025-06-04 09:04:03.086614
2 | Openflow連携テスト2 | 2025-06-04 09:04:03.086614
3 | Openflow連携テスト3 | 2025-06-05 04:10:10.497407
4 | Openflow連携テスト4 | 2025-06-05 04:10:10.497407
(4 rows)
追加したレコードがSnowflake側に挿入されていますね!
おわりに
今回は、構造化データであるRDS PostgreSQLを対象に、OpenflowによるSnowflakeへのデータ連携を実践してみました。
GUIベースでデータ連携フローをここまでスムーズに行えるとは、正直驚きでした。今後さらに活用が広がる予感がします。
検証にはBYOC構成を使用しましたが、今後登場するSPCS構成により、Openflowのユーザー体験がさらに向上することにも期待しています。
次回は、画像やPDFなど非構造化データを対象に、Openflowの実力を引き続き検証していきたいと思います。お楽しみに!
Views: 0