クラウドインテグレーション部の渡邊です。
今回は、DynamoDBとOpenSearchでゼロETLを実現するにあたり既存のデータを反映するために気にする点を説明します。
DynamoDBとOpenSearchでゼロETLの一般設定における注意点は記事「Amazon DynamoDBとAmazon OpenSearch ServiceのゼロETL統合における設定の注意点」で執筆しております。
初回ロードで既存のデータをゼロETL統合する記事は見かけますが、初回にその設定を行わなかった場合はどうすればよいでしょうか。
今回はその点に焦点を当てて執筆します。
※本記事は上記の記事で構築したものを前提として執筆している部分がございますのでご留意ください。
前回の記事「Amazon DynamoDBとAmazon OpenSearch ServiceのゼロETL統合における設定の注意点」の設定では、最終的にDynamoDBのPITRを無効にしていたので再度有効化しておきます。
既存の項目がDynamoDBに存在しているかつ、OpenSearchに反映されていない環境を作ります。
以下の項目を作成します。
DynamoDBストリームへの保存期間の影響でOpenSearchにこれらの項目が反映される恐れがあるので、項目追加時にはストリームを無効にしておきましょう。
適用後は再度有効化しておきましょう。
aws dynamodb put-item \
--table-name watanabe-dy \
--item \
'{"id" : {"N": "1"}, "Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
aws dynamodb put-item \
--table-name watanabe-dy \
--item \
'{"id" : {"N": "2"}, "Artist": {"S": "d"}, "SongTitle": {"S": "Howdy"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "2"}}'
aws dynamodb put-item \
--table-name watanabe-dy \
--item \
'{"id" : {"N": "3"}, "Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"}}'
aws dynamodb put-item \
--table-name watanabe-dy \
--item \
'{"id" : {"N": "4"}, "Artist": {"S": "Acme Band"}, "SongTitle": {"S": "PartiQL Rocks"}, "AlbumTitle": {"S": "Another Album Title"}, "Awards": {"N": "8"}}'
aws dynamodb put-item \
--table-name watanabe-dy \
--item \
'{"id" : {"N": "5"}, "Artist": {"S": "Acme Band"}, "SongTitle": {"S": "PartiQL Rocks2"}, "AlbumTitle": {"S": "Another Album Title2"}, "Awards": {"N": "8"}}'
参考:ステップ 2: コンソールまたは AWS CLI を使用して、テーブルにデータを書き込みます
ゼロETLがクロスリージョンに対応していないことを考慮し、他の環境と同じリージョンにS3バケットを作成します。
同リージョン、同アカウントにバケットを作成するため、バケットポリシーを付与しなくても機能します。
今回は初回ロードによってDynamoDBの既存の項目をロードしない想定のため、あえてS3エクスポート用のフィールドexport
を設定せずにインテグレーションを作成します。
その後、テンプレートを修正し、再起動します。
ゼロETLをするために必要なIAMロールを作成します。
IAMロールの信頼ポリシーにOpenSearch Ingestion Pipelines用のサービスが選択できます。
スナップショットをS3へエクスポートする点に注意して権限設定をします。
{参考:
"Version": "2012-10-17",
"Statement": [
{
"Sid": "allowRunExportJob",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeTable",
"dynamodb:DescribeContinuousBackups",
"dynamodb:ExportTableToPointInTime"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy"
]
},
{
"Sid": "allowCheckExportjob",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeExport"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy/export/*"
]
},
{
"Sid": "allowReadFromStream",
"Effect": "Allow",
"Action": [
"dynamodb:DescribeStream",
"dynamodb:GetRecords",
"dynamodb:GetShardIterator"
],
"Resource": [
"arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy/stream/*"
]
},
{
"Sid": "allowReadAndWriteToS3ForExport",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:AbortMultipartUpload",
"s3:PutObject",
"s3:PutObjectAcl"
],
"Resource": [
"arn:aws:s3:::watanabe-sample-temp/export/*"
]
},
{
"Effect": "Allow",
"Action": "es:DescribeDomain",
"Resource": "arn:aws:es:us-east-1:123456789012:domain/watanabe-opensearch"
},
{
"Effect": "Allow",
"Action": "es:ESHttp*",
"Resource": "arn:aws:es:us-east-1:123456789012:domain/watanabe-opensearch/*"
}
]
}
ステップ 1: パイプラインロールを作成する
Amazon DynamoDB OpenSearch での取り込みパイプラインの使用
信頼ポリシーは以下の通りです。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "osis-pipelines.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
新規作成する場合、設定は前回の記事「Amazon DynamoDBとAmazon OpenSearch ServiceのゼロETL統合における設定の注意点」を参考にしてください。
テンプレートは以下の通りです。
エクスポートの設定はあえてしておりません。
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy"
stream:
start_position: "LATEST"
aws:
sts_role_arn: "arn:aws:iam::123456789012:role/watanabe-es-etl-role"
region: "us-east-1"
sink:
- opensearch:
hosts:
[
"https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com"
]
index: "table-index"
index_type: custom
document_id: "${getMetadata(\"primary_key\")}"
action: "${getMetadata(\"opensearch_action\")}"
document_version: "${getMetadata(\"document_version\")}"
document_version_type: "external"
aws:
sts_role_arn: "arn:aws:iam::123456789012:role/watanabe-es-etl-role"
region: "us-east-1"
参考:opensearch
この後、既存の項目もインポートできるようにインテグレーションのテンプレートの修正を行いますが、そちらは次章で設定します。
OpenSearchと同じVPC内のEC2から動作確認をします。
※事前に以下のような環境変数を設定してください
USERNAME=watanabe
PASSWORD=PaSSW0rd!
ENDPOINT=search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
PIPELINE_ROLE=arn:aws:iam::123456789012:role/watanabe-es-etl-role
その後、DynamoDB側にサンプルデータを追加し、ストリームにより新規の項目が追加されていることを確認しました。
$ curl -XGET -u "${USERNAME}:${PASSWORD}" -H "Content-Type: application/json" "https://${ENDPOINT}:443/table-index/_search?pretty"
{
"took" : 82,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "table-index",
"_id" : "7",
"_score" : 1.0,
"_source" : {
"Artist" : "sampleartist",
"Awards" : "1",
"SongTitle" : "samplesong",
"AlbumTitle" : "sampletitle",
"id" : 7
}
},
{
"_index" : "table-index",
"_id" : "0",
"_score" : 1.0,
"_source" : {
"id" : 0
}
}
]
}
}
現状は画像のようになっているため、既存のデータを反映させる必要があります。
反映させるためには、追加で以下の手順を実施する必要があります。
テンプレートのexport
フィールドに情報を追加するために、以下の通り修正します。
version: "2"
dynamodb-pipeline:
source:
dynamodb:
acknowledgments: true
tables:
- table_arn: "arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy"
stream:
start_position: "LATEST"
export:
s3_bucket: "watanabe-sample-temp"
s3_region: "us-east-1"
s3_prefix: "ddb-to-opensearch-export/"
aws:
sts_role_arn: "arn:aws:iam::123456789012:role/watanabe-es-etl-role"
region: "us-east-1"
sink:
- opensearch:
hosts:
[
"https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com"
]
index: "table-index"
index_type: custom
document_id: "${getMetadata(\"primary_key\")}"
action: "${getMetadata(\"opensearch_action\")}"
document_version: "${getMetadata(\"document_version\")}"
document_version_type: "external"
aws:
sts_role_arn: "arn:aws:iam::123456789012:role/watanabe-es-etl-role"
region: "us-east-1"
この状態で再起動したり、しばらく待ってもOpenSearch側には既存の項目が反映されていませんでした。
また、S3にもエクスポートがされていませんでした。
現時点(2024/2/29)では、初回で既存のデータを同期しない場合、手動でエクスポートする必要があると思われます。
テンプレートの設定と同じフォルダ名にS3へのエクスポートをしてください。
フォルダおよびオブジェクトがバケットにあることを確認してください。
その後、しばらく待っても反映される様子がなかったため、インテグレーションを再起動(停止して開始)しました。
すると、OpenSearch側で既存の項目が反映されておりました。
$ curl -XGET -u "${USERNAME}:${PASSWORD}" -H "Content-Type: application/json" "https://${ENDPOINT}:443/table-index/_search?pretty"
<略>
"hits" : [
{
"_index" : "table-index",
"_id" : "3",
"_score" : 1.0,
"_source" : {
"id" : 3,
"AlbumTitle" : "Songs About Life",
"Awards" : 10.0,
"Artist" : "Acme Band",
"SongTitle" : "Happy Day"
}
},
{
"_index" : "table-index",
"_id" : "5",
"_score" : 1.0,
"_source" : {
"id" : 5,
"AlbumTitle" : "Another Album Title2",
"Awards" : 8,
"Artist" : "Acme Band",
"SongTitle" : "PartiQL Rocks2"
}
},
{
"_index" : "table-index",
"_id" : "4",
"_score" : 1.0,
"_source" : {
"id" : 4,
"_id" : "7",
"_score" : 1.0,
"_source" : {
"id" : 7,
"AlbumTitle" : "sampletitle",
"Awards" : "1",
"Artist" : "sampleartist",
"SongTitle" : "samplesong"
}
},
{
"_index" : "table-index",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"id" : 2,
"AlbumTitle" : "Somewhat Famous",
"Awards" : 2,
"Artist" : "d",
"SongTitle" : "Howdy"
}
},
{
"_index" : "table-index",
"_id" : "0",
"_score" : 1.0,
"_source" : {
"id" : 0
}
},
{
"_index" : "table-index",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"id" : 1,
"AlbumTitle" : "Somewhat Famous",
"Awards" : 1,
"Artist" : "No One You Know",
"SongTitle" : "Call Me Today"
}
}
]
}
}
【参考】
Amazon OpenSearch Ingestion パイプラインでサポートされているプラグインとオプション