Amazon Kinesis から Redshift Serverless へ AWS Glue Streaming ETL でつなぐパイプライン構築

目次
はじめに
クラウドインテグレーション部、
クラウドソリューション2課の川井です。
近年、リアルタイムでデータを収集・加工・蓄積していくニーズが高まっており、
AWSのサービス群を組み合わせることで、柔軟かつ拡張性の高いストリーミング処理基盤を構築できます。
ストリーミングETL処理には、Amazon Managed Service for Apache Flink(旧名:Amazon Kinesis Data Analytics)や、Amazon EMR(Serverless)など複数の選択肢がありますが、今回は AWS Glue(以下、Glue)Streaming ETL を使用して実装していきます。
Glue Streaming ETLは、「手軽に、リアルタイムで、サーバレスかつ運用負荷なく」 データ連携基盤を構築できる、非常に柔軟で使いやすいマネージドサービスですが、一方で いくつか注意すべきポイントもあります。
Glue Streaming ETL ジョブは、バッチジョブと異なり 「常時起動してストリームを監視し続ける」 設計のため、ジョブを動かしっぱなしにしている間は継続的にコストが発生します。
また、処理の実行単位は 「ウィンドウサイズ(デフォルト:100秒)」ごとにまとめて行われるため、リアルタイム性を強く求められるシーン(即時の反応が必要なアラート分析など)にはやや不向きなケースもあります。
ウィンドウサイズを短く設定することでリアルタイム性は向上しますが、その分処理頻度が高くなり、DPU(Data Processing Unit)の使用回数が増加するため、コストの上昇につながる可能性があります。
しかし、Glue Streaming ETLは、Sparkに関する専門知識がなくても、GUIベースで構成・実行できる点が大きな特長です。
AWS Glue Studioを使えば、ジョブの作成からデータ変換、出力の設定までをコードを一切書かずに構築可能であり、
データエンジニア以外のエンジニアでも容易にパイプラインを構築できます。
本記事では、Amazon Kinesis Data Streams(以下、Kinesis Data Streams)から Glue Streaming ETLを用いて、データをリアルタイムに処理し Amazon Redshift Serverless(以下、Redshift Serverless)へ連携するパイプラインの構築手順を紹介します。
本記事の内容は以下の通りです。
✅ Kinesis Data Streams 作成とデータ送信
✅ Redshift Serverlessの作成とデータ連携設定
✅ Glue Streaming ETL ジョブの作成
構成の前提
- VPCやサブネット、VPCエンドポイントなど、ネットワーク関連のリソースは事前に作成されているものとします。
- 必要なVPCエンドポイントは「AWS Secrets Manager」、「AWS STS」、「Amazon S3」、「Kinesis Data Streams」
- Kinesis Data Streamsへのデータレコードの送信には Amazon Kinesis Data Generatorを使用します。
- Amazon Kinesis Data Generatorは事前に作成されているものとします。
Amazon Kinesis Data Generatorは、以下のGitHubリポジトリおよび公式ブログを参考に構築できます。 - IAM ポリシーについては、本記事では検証を目的としているため、簡便性を優先し一時的に広い権限を付与します。
※本番環境では 最小権限の原則(Principle of Least Privilege) に従い、必要最小限の権限のみを設定するようにお願いします。
全体の構成
今回、作成するデータパイプラインの通信フローと全体構成は以下の通りとなります。
〇 Redshift Serverless
本構成では、RedshiftのServerlessタイプを使用しています。
Redshift Serverlessは、従来のプロビジョニング型とは異なり、事前にクラスターサイズやノード数を設定する必要がなく、
使用した分だけの課金が行われるサービスです。
Redshift Serverlessは、VPC内に直接デプロイされるわけではありません。代わりに、Redshift Serverless側で作成されるVPCエンドポイントを介して、VPC内のリソースからアクセスする設計になっています。
VPCエンドポイントは、Redshift Serverlessを作成したタイミングで自動的に作成・関連付けされるため、ユーザーが明示的に構築する必要はありません。
〇 Glue Connection(JDBC接続)
Glueジョブでは、Redshift Serverlessに接続するために JDBC接続を利用します。
これは、Glueジョブが Javaベースで構成されており、Redshiftとの連携に JDBCドライバを用いるためです。
Glue自体はパブリックサービスとして提供されており、Amazon EC2などのようにVPC内に直接デプロイされるわけではありません。しかし、Glueジョブが JDBC接続を行う際には、Glue Connectionを通じて対象の VPC内にインターフェースが自動的に作成されます。
この構成により、Glueジョブが AWSサービスへアクセスする場合、VPC内からのトラフィックとして扱われるため、
対応する以下のVPCエンドポイントが必要になります。
- Amazon Kinesis Data Streams(ストリーミングデータ取得)
- AWS Secrets Manager(接続情報などの取得)
- AWS STS(IAM 認証トークンの取得)
- Amazon S3(チェックポイント等の保存)
以上が、全体構成の説明になります。
それでは、Kinesis Data Streamsから作成していきます。
Kinesis Data Streamsの作成とデータ送信
以下の赤枠部分を作成していきます。
Kinesisコンソール画面で「データストリームの作成」へと進んでいきます。
Kinesis Data Streamsを作成する際は、ストリーム名とスループット容量の設定が必要です。
容量モードには以下の2つがあります。
- オンデマンド:スループットが自動スケーリング。予測が難しい場合に最適。
- プロビジョンド:ユーザーがシャード数を指定してスループットを固定。
本構成では、パーティションキーによる順序性は特に重視せず、また検証目的でミニマムな構成とするため、
プロビジョンドモードを選択し、シャード数は1つに設定しています。
作成が問題なく完了すると、ステータスは以下の画像のように「アクティブ」と表示されます。
続いて、Amazon Kinesis Data Generatorを使用して、Kinesis Data Streamsにデータを送信していきます。
送信するデータは、以下の通りです。
{
"sensorId": {{random.number(100)}},
"currentTemperature": {{random.number(
{
"min":10,
"max":150
}
)}},
"status": "{{random.arrayElement(
["OK","FAIL","WARN"]
)}}"
}
Kinesis Data Streamsに対して約700件ほどのレコードを送信したため、データが正しくストリームに取り込まれているかを確認します。
Kinesisコンソールから「データビューワー」に進み、以下のようにレコードが表示されていれば、
ストリームへのデータ取り込みが正常に行われていることを確認できます。
以上で、Kinesis Data Streamsの作成とデータ送信は完了しましたので、
続いて、Redshift Serverlessの作成とデータ連携設定を実施していきます。
Redshift Serverlessの作成とデータ連携設定
以下の赤枠部分を作成していきます。
〇 Redshift Serverlessの構成
Redshift Serverlessでは、「ワークグループ」と「名前空間」 という2つの概念で構成が分かれています。
この構成は、Redshift Serverless 特有の概念です。
- ワークグループ:クエリを実行するためのサーバレスな実行環境です。 JDBC/ODBC接続時のエンドポイントとしても機能し、実際のクエリ処理はこのワークグループで行われます。
- 名前空間(Namespace):データベースやスキーマ、テーブルなどのデータ定義と保存領域を管理する論理コンテナです。データそのものはこの名前空間に保持され、複数のワークグループから参照可能です。
ワークグループがクエリを処理し、名前空間に格納されたデータへアクセスするという形で構成されます。
〇 ワークグループ・名前空間作成
Redshiftコンソール画面で「ワークグループを作成」へと進んでいきます。
ワークグループを作成する際は、ワークグループ名とパフォーマンス、ネットワークとセキュリティの設定などを行います。
以下の画面ではまず、ワークグループ名を入力します。
次に、パフォーマンスとコストに関する設定として、RPU(Redshift Processing Units)を構成します。RPUは、クエリ実行時に割り当てられるコンピューティングリソースの単位で、サーバレス環境での処理性能に直接影響します。
本構成では、検証用途かつ軽量構成とするため、RPUは最小の「8」を選択しています。
これは Redshift Serverlessがサポートする最小のベースキャパシティとなります。
続いて、ネットワークとセキュリティの設定として、Redshiftを配置するVPCやサブネットを指定します。
これにより、VPC内のリソースからの接続が可能になります。
セキュリティグループについては後ほど設定を行いますが、Glue Connection(JDBC 接続)から Redshiftにアクセスできるよう、インバウンド通信を許可するルールを追加する必要があります。
また、「拡張された VPC のルーティング」については、本構成では 無効(オフ)にしています。
今回、Redshift Serverless 側から VPC内を経由して他の AWSサービスへアクセスさせる必要がないためです。
次の画面では、名前空間の作成となります。
データベース名・パスワード、関連付けられるIAMロールの設定などを行います。
データベース名とパスワードでは、初期データベース名と管理ユーザーの認証情報を設定します。
- デフォルトのデータベース名は devで、変更はできません(予約済み名)
- 管理ユーザーについては「管理ユーザー認証情報をカスタマイズ」のチェックを有効にし、ユーザー名に admin を指定しています。
- パスワードは、Redshiftが自動生成するパスワードオプションを利用し、シンプルにセットアップを進めています。
この構成により、後のJDBC接続時に使う認証情報がここで定義されることになります。
なお、JDBC接続の設定の際に、この認証情報が自動的に AWS Secrets Managerに保存される仕組みになっているため、
本ステップであらかじめ AWS Secrets Managerに認証情報を管理してもいいと思います。
許可の設定では、「デフォルトの IAMロールを作成」 というオプションが表示されており、
AWSが提供する「AmazonRedshiftAllCommandsFullAccess」ポリシーが付与された IAMロールを自動生成できますので、こちらを作成して関連付けます。
ワークグループと名前空間の作成が問題なく完了すると、ステータスが以下の画像のように「Available」と表示されます。
また、作成したワークグループを確認すると「VPCエンドポイント」が作成されていることが確認できます。
こちらのVPCエンドポイントを経由して、Redshift Serverlessにアクセスすることになります。
〇 データベースとテーブルの作成
続いて、Redshift内にKinesis Data Streamsをデータを格納するためのデータベースとテーブルを作成します。
Redshiftへの接続には「クエリエディタv2」を使用します。
Redshiftコンソール画面で「クエリエディタv2」へと進んでいきます。
クエリエディタv2で以下のコマンドを実行して、データベースを作成します。
CREATE DATABASE sensor_db;
作成が完了すると、以下の画像のように「sensor_db」が表示されます。
画面右上のドロップダウンから 接続先のデータベースを選択することができますので、
先程作成した「sensor_db」を選択します。
続いて、以下のコマンドを実行して スキーマおよびテーブルを作成します。
テーブルのカラム構成は、Kinesis Data Streamsから取り込まれるデータ形式に合わせて定義しています。
なお、uuid カラムは一意な値を保持させたい項目のため、GlueのETLジョブ側で動的に生成・追加する設計としています。
CREATE SCHEMA sensor_schema;
CREATE TABLE sensor_schema.sensor_data (
sensorid INTEGER,
currenttemperature INTEGER,
status VARCHAR(50),
uuid VARCHAR(100)
)
DISTKEY(sensor_id)
〇 データ連携設定
次に、Redshift Serverless と Glueジョブを接続するために必要な Glue Connection(JDBC 接続)を作成していきます。
※Glue Connectionを作成すると、Redshift Serverless側のセキュリティグループが自動的にアタッチされます。
Redshiftにアクセスできるようにするためには、セキュリティグループに「自己参照ルール」を追加する必要がありますので、セキュリティグループにこちらの設定を入れてから Connectionの作成を行ってください。
なお、検証目的であれば一時的にインバウンドルールを全開放(0.0.0.0/0)しても接続自体は可能です。
Glueコンソール画面で「Create connection」へと進んでいきます。
「Choose data source」画面では「Amazon Redshift」を選択します。
次の画面では、接続先データベースや認証情報を指定します。以下の設定を行っています。
- Database instances:接続先のワークグループ(作成した test-streaming-workgroup)を選択します。
- Database name:対象となるデータベース(作成した sensor_db)を指定します。
- Credential type: 「Username and password」を選択し、Redshift Serverless作成時に設定した管理ユーザーの認証情報を直接入力しています。
- IAM service role:Glue が Secrets Manager や VPC内のリソースへアクセスするために使用する IAMロールを選択します。
※このIAMロールは、Glueジョブと併用して使用するため、以下のポリシーをアタッチしています。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "glue.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
Glue Connectionの作成が完了したら、接続テストを実行して Redshiftへの接続確認を行います。
テスト結果が画像のように「Successfully connected to the data store with connection Redshift connection.」と表示されれば、正常に接続されていることを確認できます。
以上で、Redshift Serverlessの作成とデータ連携設定は完了しましたので、
続いて、Glue Streaming ETL ジョブの作成を実施していきます。
Glue Streaming ETLジョブの作成
残りの以下の赤枠部分を作成していきます。
まずは、Glueジョブで Kinesis Data Streamsをソースとして利用するために、Glueデータカタログからデータベースを作成します。このデータベースは、取り込むストリーミングデータのメタデータ(テーブル定義など)を管理するための論理コンテナとして利用されます。
「Data Catalog」セクションの「Databases」メニューへと進んでいき、画面のフォームに任意の名前を入力し、
「Create database」をクリックするだけで作成できます。
続いて、取り込むデータ形式を定義するために Glueデータカタログ内にテーブルを作成します。
「Data Catalog」セクションの「Tables」メニューから新規作成に進み、作成するテーブルの名前と、
先ほど作成したデータベース test-database を指定します。
以下の画面では、ストリーミングデータの取り込み元とデータ形式を設定します。
- Data store(ソースの種類): Kinesis を選択します。
- Kinesis stream settings: my account を選択します。
- Region:Asia Pacific (Tokyo) を選択します。
- Kinesis stream name:対象のストリーム(test-streaming-data)を選択します。
- Data format(データ形式): Json を選択します。
そして、以下の画面では「Define or upload schema」を選択し、Kinesis Data Streamsから取り込まれるデータに合わせて、カラム情報を入力しテーブルを作成します。
データベースとテーブルの作成が完了したら、「ETL jobs」セクションの「Visual ETL」メニューへと進んでいき、Glueジョブを作成していきます。
まずは「Job details」タブの構成ですが、以下の3つの設定以外はデフォルトの設定としています。
- Name:ジョブの名前(ここでは kinesis Data-job)を入力します。
- IAM Role:IAMロールを指定します。Glue Connection作成の際と同様のものです。
- Worker type:最小構成の G 0.25X(2vCPU / 4GB RAM)を選択しています。
Typeは、データソースに Kinesisを指定すると、ジョブタイプは自動的に「Spark Streaming」として設定されます。
また、Worker typeの「G 0.25X」は、Data sourceノードには「Kinesis」を選択した後、選択できるようになりますので、ご注意ください。
また、ジョブ作成時には Glueによってスクリプトやストリーミング処理のチェックポイント情報などが保存される Amazon S3 パスが自動で設定されます。
必要に応じて、保存先の Amazon S3 バケットを任意の場所に変更してください。
「Job details」の設定が完了したら、Glue Data Studio(Visual)を使ってデータパイプラインの作成を進めます。
Data sourceノードには「Kinesis」を選択し、先ほど作成した Glueデータカタログのデータベースとテーブルを指定します。
正しくネットワーク設定 および IAM権限が構成されていれば「Data preview」タブで Kinesis Data Streams に取り込まれた
データの内容をリアルタイムで確認することができます。
このプレビューを利用することで、スキーマの整合性やストリーム内容を事前に検証しながら、
後続の変換処理や出力設定を進めることができます。
データ変換ステップでは、「Transform」ノードにて以下の2つの処理を追加します。
- UUID:各レコードに対して Glue 側で自動的に生成される 一意の識別子(UUID) を付与します。
- Drop Fields:Glue が Kinesis からデータを取り込む際に自動で付与されるシステムカラム "$record_timestamp" のメタ情報フィールドを 削除(ドロップ) します。
これらの変換を加えることで、データ分析に適した構造に整えることができます。
最後のステップとして出力先(ターゲット)ノードに Amazon Redshift を指定します。
以下のような設定を行っています。
- Redshift 接続方式:Direct data connectionを選択します。
- Redshift connection:作成した Glue Connection(Redshift Connection)を選択します。
- Database / Schema / Table:Redshift内で作成したDBなどを選択します。
- データベース:sensor_db
- スキーマ:sensor_schema
- テーブル名:sensor_data
- データ挿入方法:APPEND (Insert) を選択します。
画面下部の Output schemaセクションでは、sensorId、currentTemperature、status、uuid の4つのカラム構成が表示されており、データフロー全体の整合性も確認できます。
これで、データパイプラインが完成しましたので、画面右上の「Run」をクリックして実行していきます。
実行中のジョブは「Runs」タブで確認できます。
以上で、Glue Streaming ETL ジョブの作成は完了しましたので、
最後に Redshiftにデータが格納されているかを確認していきます。
Redshift内のデータ確認
データパイプラインの実行によって、Kinesis Data Streamsから取り込まれたデータが Redshiftに正しく格納されたかを確認するため、再び Redshiftコンソールにアクセスし、「クエリエディタv2」へと進みます。
クエリエディタ上では、対象のデータベースとテーブルを選択したうえで、以下のような集計クエリを実行することで、格納されたデータ件数を確認できます。
SELECT COUNT(*) FROM sensor_schema.sensor_data;
結果(Resultタブ)に、「2500」という結果が返ってきました。
続いて、以下のクエリにより、Redshiftに保存された最新の 100件のデータを確認します。
SELECT * FROM sensor_schema.sensor_data
LIMIT 100;
結果(Resultタブ)には、Kinesis Data Streams経由で取り込まれた sensorId、currentTemperature、statusに加え、
Glue側で付与された uuidも表示されており、一意なレコード識別子としての役割を果たしていることがわかります。
Glueジョブによって、Resultに書き込まれたレコードの確認できましたので、パイプラインが正しく動作しているかと思います。
以上で、Kinesis Data Streamsから Redshift Serverlessへ Glue Streaming ETL でつなぐパイプライン構築は完成となります。
まとめ
本記事では、Kinesis Data Streamsから Redshift Serverlessへのリアルタイムデータ連携を、
GlueのStreaming ETL機能を用いて構築する方法を紹介しました。
この構成の大きな魅力は、Apache Sparkのコードを一切書くことなく、GUIベースの Glue Data Studio上で、視覚的にストリーミング処理を設計・実装できる点にあります。
従来のように複雑な ETLロジックや接続処理をコードで記述する必要はなく、クリック操作と簡単な設定だけで、Kinesis Data Streamsの取り込みからデータ変換、Redshiftへの書き込みまでをスムーズに構築できます。
さらに、Redshift Serverlessを採用することで、クラスター管理やスケーリングを意識することなく、
クエリ実行ごとに従量課金での柔軟な分析基盤が実現できるのも大きな利点です。
ただし、前述のとおり、Glueはコードレスかつフルマネージドな利便性がある一方で、使用するユースケースや設定によっては、コストが割高になることもあります。たとえば、Amazon EMRのようなサービスを用いてストリーミング処理を構築した場合、管理負担は増えるものの、Glueよりもコストを抑えられるケースがあるというのも現実です。
そのため、用途や予算に応じて、実装のしやすさ・運用性・コストのバランスを見極めたサービス選定が重要になると思います。
では、また別の記事を執筆しますので、引き続きよろしくお願いいたします。
ありがとうございました。
クラウドインテグレーション部
クラウドソリューション2課
川井 康敬
アジアクエスト株式会社では一緒に働いていただける方を募集しています。
興味のある方は以下のURLを御覧ください。