AQ Tech Blog

Amazon DynamoDBとAmazon OpenSearch ServiceのゼロETL統合における設定の注意点

作成者: tsuyoshi.watanabe|2024年03月25日

はじめに

クラウドインテグレーション部の渡邊です。
今回は、DynamoDBとOpenSearchでゼロETLを実現する際の設定における注意点を構築手順とともに説明します。
本記事では、DynamoDBの追加項目をOpenSearchのインデックスとしてゼロETL統合することを目的としています。

まとめ

  • OpenSearch
    • きめ細かなアクセスコントロールでOpenSearchを制御している場合は、インテグレーション用のIAMロールをマッピングする
  • DynamoDB
    • PITRは必須な設定とは限らない
  • インテグレーション
    • サービスロール未作成による初回作成エラーに注意する
    • OpenSearchとAWSのドキュメントでインテグレーションのテンプレートの設定可能な値に差異がある
    • sts_role_arnsourcesinkで同じものを指定する必要がある
    • 必須フィールドは以下
      • source
        • dynamodb
          • table_arn
          • streamかexportいずれか
        • aws
          • sts_role_arn
          • region
      • sink
        • opensearch
          • hosts
        • aws
          • sts_role_arn
          • region

環境情報

  • OpenSearch 2.11

やってみた

OpenSearchの設定

OpenSearchの作成では以下を設定します。

  • きめ細かなアクセスコントロール有効化およびマスターユーザの作成
  • ドメインアクセスポリシーにおいて「きめ細かなアクセスコントロールのみを使用」を選択

DynamoDBの設定

DynamoDBの作成では以下を設定します。

  • 正しい、パーティションキーの指定
  • PITRの有効化
  • DynamoDBストリーム有効化

後に作成する「インテグレーション」のためにPITRおよびDynamoDBストリームを有効化します。

  • PITRの有効化

作成時に、以下のようにインフォが出てたので、有効化しました。
※[バックアップ]タブからも可能

なお、有効化しないとインテグレーション作成時に以下のような警告が出ます。

※「インテグレーション」と「統合」は同義

  • DynamoDBストリーム有効化

[エクスポートおよびストリーム]タブの[DynamoDB ストリームの詳細]から有効化しました。


「新旧イメージ」を表示タイプとして選択しました。

表示タイプ

  • キー属性のみ
    • 変更された項目のキー属性のみ
  • 新しいイメージ
    • 変更後に表示される項目全体
  • 古いイメージ
    • 変更される前に表示される項目全体
  • 新旧イメージ
    • 変更された項目の新しいイメージと古いイメージの両方


インテグレーション

インテグレーションの作成画面を見るとわかりますが、以下のように手順の案内があり、助かります。

DynamoDBのテーブルの作成は完了しているので、画像にあるステップ2のIAMロールの作成に着手します。

インテグレーション作成時に指定するロールの作成

ゼロETLをするために必要なIAMロールを作成します。

IAMロールの信頼ポリシーのサービスとして「OpenSearch Ingestion Pipelines」が選択できます。

権限は以下のように設定しました。
参考にしたドキュメントにはスナップショットをS3へエクスポートする想定でS3系の権限を入れていると思いますが、そのポリシーを含めずに構築します。

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "allowRunExportJob",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeTable",
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy"
]
},
{
"Sid": "allowCheckExportjob",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeExport"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy/export/*"
]
},
{
"Sid": "allowReadFromStream",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy/stream/*"
]
},
{
"Effect": "Allow",
"Action": "es:DescribeDomain",
"Resource": "arn:aws:es:us-east-1:123456789012:domain/watanabe-opensearch"
},
{
"Effect": "Allow",
"Action": "es:ESHttp*",
"Resource": "arn:aws:es:us-east-1:123456789012:domain/watanabe-opensearch/*"
}
]
}

参考:
ステップ 1: パイプラインロールを作成する
Amazon DynamoDB OpenSearch での取り込みパイプラインの使用

信頼ポリシーは以下の通りです。

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "osis-pipelines.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}


インテグレーションの作成

DynamoDBの左ペインから「インテグレーション」を選択して作成します。
以下のようなブループリントがすでに用意されています。

version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
# REQUIRED: Supply the DynamoDB table ARN and whether export or stream processing is needed, or both
- table_arn: "arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy"
# Remove the stream block if only export is needed
stream:
start_position: "LATEST"
# Remove the export block if only stream is needed
export:
# REQUIRED for export: Specify the name of an existing S3 bucket for DynamoDB to write export data files to
s3_bucket: "<<my-bucket>>"
# Specify the region of the S3 bucket
s3_region: "<<us-east-1>>"
# Optionally set the name of a prefix that DynamoDB export data files are written to in the bucket.
s3_prefix: "ddb-to-opensearch-export/"
aws:
# REQUIRED: Provide the role to assume that has the necessary permissions to DynamoDB, OpenSearch, and S3.
sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
# Provide the region to use for aws credentials
region: "us-east-1"
sink:
- opensearch:
# REQUIRED: Provide an AWS OpenSearch endpoint
hosts:
[
"<<https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com>>"
]
index: "table-index"
index_type: custom
document_id: "${getMetadata(\"primary_key\")}"
action: "${getMetadata(\"opensearch_action\")}"
document_version: "${getMetadata(\"document_version\")}"
document_version_type: "external"
aws:
# REQUIRED: Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
sts_role_arn: "<<arn:aws:iam::123456789012:role/Example-Role>>"
# Provide the region of the domain.
region: "<<us-east-1>>"
# Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
# serverless: true
# serverless_options:
# Specify a name here to create or update network policy for the serverless collection
# network_policy_name: "network-policy-name"
# Enable the S3 DLQ to capture any failed requests in an S3 bucket. This is recommended as a best practice for all pipelines.
# dlq:
# s3:
# Provide an S3 bucket
# bucket: "your-dlq-bucket-name"
# Provide a key path prefix for the failed requests
# key_path_prefix: "dynamodb-pipeline/dlq"
# Provide the region of the bucket.
# region: "us-east-1"
# Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
# sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"

これを参考に以下の設定を作成しました。
少なくとも必須のフィールドは以下である点に留意します。

  • source
    • dynamodb
      • table_arn
      • streamかexportいずれか
    • aws
      • sts_role_arn
      • region
  • sink
    • opensearch
      • hosts
    • aws
      • sts_role_arn
      • region
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy"
stream:
start_position: "LATEST"
aws:
sts_role_arn: "arn:aws:iam::123456789012:role/watanabe-es-etl-role"
region: "us-east-1"
sink:
- opensearch:
hosts:
[
"https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com"
]
index: "table-index"
index_type: custom
document_id: "${getMetadata(\"primary_key\")}"
action: "${getMetadata(\"opensearch_action\")}"
document_version: "${getMetadata(\"document_version\")}"
document_version_type: "external"
aws:
sts_role_arn: "arn:aws:iam::123456789012:role/watanabe-es-etl-role"
region: "us-east-1"

参考:opensearch

設定中に、パイプラインの設定が問題ないかを確認できます。
成功すると以下のように表示されます。

マスターユーザのユーザ名(username)とパスワード(password)を設定に含めたところ、無効なパラメータである旨のエラーが出ました。

"opensearch.username" is an invalid parameter, and should be removed for Amazon OpenSearch Ingestion pipelines using an opensearch sink: "$['dynamodb-pipeline']['sink'][0]['opensearch']['username']".
"opensearch.password" is an invalid parameter, and should be removed for Amazon OpenSearch Ingestion pipelines using an opensearch sink: "$['dynamodb-pipeline']['sink'][0]['opensearch']['password']".


設定の要件と制限にも記載がある通り、パイプラインの設定としてユーザ名とパスワードをサポートしていません。
設定のドキュメントはOpenSearchドキュメントで記載のものと差異があるようです。

次のオプションはサポートされていません。

  • username
  • password

各ロールが同じではないとエラーが発生しますので、各フィールドで設定するsts_role_arnを同じにしましょう。

パイプラインの構成の検証に失敗しました
You must specify the same STS role ARN in all pipeline components. You have passed 2 different roles: [arn:aws:iam::123456789012:role/watanabe-es-etl-role, arn:aws:iam::123456789012:role/watanabe-wes-etl-role]

インテグレーションは簡単にするためにパブリックとし、ログを確認できるよう[ログ発行オプション]から、CloudWatch Logsの設定をしました。

初回作成を試みると、サービスロールのエラーが発生しました。

ApiError: Unable to perform "sts:assumeRole" for Service Linked Role "arn:aws:iam::123456789012:role/aws-service-role/osis.amazonaws.com/AWSS1erviceRoleForAmazonOpenSearchIngestionService". Ensure that the IAM role exists and has a trust relationship with "osis.amazonaws.com".

もう一度インテグレーションの作成を試みると、成功しました。
作成をクリックしたタイミングでサービスロールが作成されていたため、そちらが関係しているかもしれません。

動作確認

基本的な設定が完了したので動作確認をします。
OpenSearchと同じVPC内にEC2を作成しました。

※事前に以下環境変数を設定してください

USERNAME=watanabe
PASSWORD=PassW0rd!
ENDPOINT=vpc-watanabe-opensearch-test-rtlyozeemfsveugyewbiypbmqi.us-east-1.es.amazonaws.com
PIPELINE_ROLE=arn:aws:iam::123456789012:role/watanabe-es-etl-role

初期状態を見ると、作成されているインデックスがありません。

$ curl -XGET -u "${USERNAME}:${PASSWORD}" -H "Content-Type: application/json" "https://${ENDPOINT}:443/_cat/indices?pretty"

green open .plugins-ml-config _g-2jquHTRqbl37rVmGbpQ 5 0 1 0 4.7kb 4.7kb
green open .opensearch-observability jo45Iz9rTFydjXQrhYVbXA 1 0 0 0 208b 208b
green open .opendistro_security I0w50p3IRx-yTGrFwlM4ew 1 0 10 1 53.7kb 53.7kb
green open .kibana_1 CSf2fqrBSUCKzA9wobM98Q 1 0 0 0 208b 208b

CloudWatch Logsを見るとOpenSearchへの権限がないことが原因だとわかります。

2024-02-05T10:21:21.334 [Thread-11] WARN  org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Failed to initialize OpenSearch sink with a retryable exception. 
org.opensearch.client.opensearch._types.OpenSearchException: Request failed: [security_exception] no permissions for [cluster:monitor/state] and User [name=arn:aws:iam::123456789012:role/watanabe-es-etl-role, backend_roles=[arn:aws:iam::123456789012:role/watanabe-es-etl-role], requestedTenant=null]

権限エラーに対処するため、ロールマッピングを行います。

curl -XPUT -u "${USERNAME}:${PASSWORD}" -H "Content-Type: application/json" "https://${ENDPOINT}:443/_plugins/_security/api/rolesmapping/all_access" -d "{\"backend_roles\" : [ \"${PIPELINE_ROLE}\" ],\"users\" : [ \"${USERNAME}\"]}"

完了後すぐにインデックスを確認すると、想定のインデックスtable-indexが作成されておりました。

curl -XGET -u "${USERNAME}:${PASSWORD}" -H "Content-Type: application/json" "https://${ENDPOINT}:443/_cat/indices?pretty"

green open .plugins-ml-config _g-2jquHTRqbl37rVmGbpQ 5 0 1 0 4.7kb 4.7kb
green open .opensearch-observability jo45Iz9rTFydjXQrhYVbXA 1 0 0 0 208b 208b
yellow open table-index RiiJJCUUTk--wZnW5yhyeQ 5 1 0 0 416b 416b
green open .opendistro_security I0w50p3IRx-yTGrFwlM4ew 1 0 10 2 61.8kb 61.8kb
green open .kibana_1 CSf2fqrBSUCKzA9wobM98Q 1 0 0 0 208b 208b

その後、DynamoDB側にサンプルデータを追加しました。
idが7の項目を1つ作成したら7秒(項目ごとに秒数の揺れあり)くらいでOpenSearch側に反映されました。

$ curl -XGET -u "${USERNAME}:${PASSWORD}" -H "Content-Type: application/json" "https://${ENDPOINT}:443/table-index/_search?pretty"
{
"took" : 82,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "table-index",
"_id" : "7",
"_score" : 1.0,
"_source" : {
"Artist" : "sampleartist",
"Awards" : "1",
"SongTitle" : "samplesong",
"AlbumTitle" : "sampletitle",
"id" : 7
}
},
{
"_index" : "table-index",
"_id" : "0",
"_score" : 1.0,
"_source" : {
"id" : 0
}
}
]
}
}

実は、DynamoDBのテーブルを作成した段階で項目をいくつか作成しておりましたが、それらは反映されていませんでした。
これらを反映させるためには、以下を満たすべきであると思われます。

  • PITRの有効化
  • インテグレーション用IAMロールにS3用の権限を追加
  • エクスポート用のS3バケットの作成

これらは別記事「Amazon DynamoDBとAmazon OpenSearch ServiceのゼロETL統合を既存の項目に対して実行してみた」で検証しようと思います。

データはDynamoDBストリームによってOpenSearchに反映されると思われるため、PITRを無効にしても、DynamoDBへの追加の項目はOpenSearchに反映されると思われます。

Ingestion OpenSearch と DynamoDB の統合を使用して、スナップショットなしでイベントをストリーミングすることもできます。 他のメカニズムからの完全なスナップショットが既にある場合、または DynamoDB Streams を使用して DynamoDB テーブルから現在のイベントをストリーミングするだけの場合は、このオプションを選択します。 このオプションを選択した場合、必要なのは、テーブルで DynamoDB ストリームを有効にすることだけです。

Amazon DynamoDB OpenSearch での取り込みパイプラインの使用

PITRを無効化後にしばらく待ち、idが8の項目を追加すると、実際にOpenSearchのインデックスに追加されていることが確認できました。

$ curl -XGET -u "${USERNAME}:${PASSWORD}" -H "Content-Type: application/json" "https://${ENDPOINT}:443/table-index/_search?pretty"
{
"took" : 588,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "table-index",
"_id" : "7",
"_score" : 1.0,
"_source" : {
"Artist" : "sampleartist",
"Awards" : "1",
"SongTitle" : "samplesong",
"AlbumTitle" : "sampletitle",
"id" : 7
}
},
{
"_index" : "table-index",
"_id" : "8",
"_score" : 1.0,
"_source" : {
"Artist" : "PITRtest",
"id" : 8
}
},
{
"_index" : "table-index",
"_id" : "0",
"_score" : 1.0,
"_source" : {
"id" : 0
}
}
]
}
}


【参考】

Amazon OpenSearch Ingestion パイプラインでサポートされているプラグインとオプション

opensearch

Amazon DynamoDB OpenSearch での取り込みパイプラインの使用