Amazon DynamoDBとAmazon OpenSearch ServiceのゼロETL統合を既存の項目に対して「初回ロード後に」実行してみた

    Amazon DynamoDBとAmazon OpenSearch ServiceのゼロETL統合を既存の項目に対して「初回ロード後に」実行してみた

    目次

      はじめに

      クラウドインテグレーション部の渡邊です。
      今回は、DynamoDBとOpenSearchでゼロETLを実現するにあたり既存のデータを反映するために気にする点を説明します。
      DynamoDBとOpenSearchでゼロETLの一般設定における注意点は記事「Amazon DynamoDBとAmazon OpenSearch ServiceのゼロETL統合における設定の注意点」で執筆しております。
      初回ロードで既存のデータをゼロETL統合する記事は見かけますが、初回にその設定を行わなかった場合はどうすればよいでしょうか。
      今回はその点に焦点を当てて執筆します。
      ※本記事は上記の記事で構築したものを前提として執筆している部分がございますのでご留意ください。

      まとめ

      • 初回ロードかどうかにかかわらず、既存の項目のゼロETLのために、少なくとも以下を実施する必要がある
        • ポイントインタイムリカバリ(PITR)の有効化
        • DynamoDBストリームの有効化(有効化しなくても実現できるかもしれないが、実践を想定するとするべき)
        • S3のエクスポートの権限を含むゼロETL用のIAMロールの作成
        • エクスポート用バケットの作成
      • 初回ロードではないかつ、既存のDynamoDBの項目を反映させるために、追加で以下の手順を実施する必要がある
        • テンプレートのexportフィールドに情報を追加する
        • 手動でS3へのエクスポートをする
        • インテグレーションのパイプラインを再起動する

      環境情報

      • OpenSearch 2.11

      やってみた

      DynamoDB

      前回の記事「Amazon DynamoDBとAmazon OpenSearch ServiceのゼロETL統合における設定の注意点」の設定では、最終的にDynamoDBのPITRを無効にしていたので再度有効化しておきます。
      既存の項目がDynamoDBに存在しているかつ、OpenSearchに反映されていない環境を作ります。

      テーブルに項目を追加

      以下の項目を作成します。
      DynamoDBストリームへの保存期間の影響でOpenSearchにこれらの項目が反映される恐れがあるので、項目追加時にはストリームを無効にしておきましょう。
      適用後は再度有効化しておきましょう。

      aws dynamodb put-item \
      --table-name watanabe-dy \
      --item \
      '{"id" : {"N": "1"}, "Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'

      aws dynamodb put-item \
      --table-name watanabe-dy \
      --item \
      '{"id" : {"N": "2"}, "Artist": {"S": "d"}, "SongTitle": {"S": "Howdy"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "2"}}'

      aws dynamodb put-item \
      --table-name watanabe-dy \
      --item \
      '{"id" : {"N": "3"}, "Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"}}'

      aws dynamodb put-item \
      --table-name watanabe-dy \
      --item \
      '{"id" : {"N": "4"}, "Artist": {"S": "Acme Band"}, "SongTitle": {"S": "PartiQL Rocks"}, "AlbumTitle": {"S": "Another Album Title"}, "Awards": {"N": "8"}}'

      aws dynamodb put-item \
      --table-name watanabe-dy \
      --item \
      '{"id" : {"N": "5"}, "Artist": {"S": "Acme Band"}, "SongTitle": {"S": "PartiQL Rocks2"}, "AlbumTitle": {"S": "Another Album Title2"}, "Awards": {"N": "8"}}'

      参考:ステップ 2: コンソールまたは AWS CLI を使用して、テーブルにデータを書き込みます


      S3バケット

      ゼロETLがクロスリージョンに対応していないことを考慮し、他の環境と同じリージョンにS3バケットを作成します。
      同リージョン、同アカウントにバケットを作成するため、バケットポリシーを付与しなくても機能します。

      インテグレーション

      今回は初回ロードによってDynamoDBの既存の項目をロードしない想定のため、あえてS3エクスポート用のフィールドexportを設定せずにインテグレーションを作成します。
      その後、テンプレートを修正し、再起動します。

      インテグレーション作成時に指定するロールの作成

      ゼロETLをするために必要なIAMロールを作成します。
      IAMロールの信頼ポリシーにOpenSearch Ingestion Pipelines用のサービスが選択できます。
      スナップショットをS3へエクスポートする点に注意して権限設定をします。

      {
      "Version": "2012-10-17",
      "Statement": [
      {
      "Sid": "allowRunExportJob",
      "Effect": "Allow",
      "Action": [
      "dynamodb:DescribeTable",
      "dynamodb:DescribeContinuousBackups",
      "dynamodb:ExportTableToPointInTime"
      ],
      "Resource": [
      "arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy"
      ]
      },
      {
      "Sid": "allowCheckExportjob",
      "Effect": "Allow",
      "Action": [
      "dynamodb:DescribeExport"
      ],
      "Resource": [
      "arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy/export/*"
      ]
      },
      {
      "Sid": "allowReadFromStream",
      "Effect": "Allow",
      "Action": [
      "dynamodb:DescribeStream",
      "dynamodb:GetRecords",
      "dynamodb:GetShardIterator"
      ],
      "Resource": [
      "arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy/stream/*"
      ]
      },
      {
      "Sid": "allowReadAndWriteToS3ForExport",
      "Effect": "Allow",
      "Action": [
      "s3:GetObject",
      "s3:AbortMultipartUpload",
      "s3:PutObject",
      "s3:PutObjectAcl"
      ],
      "Resource": [
      "arn:aws:s3:::watanabe-sample-temp/export/*"
      ]
      },
      {
      "Effect": "Allow",
      "Action": "es:DescribeDomain",
      "Resource": "arn:aws:es:us-east-1:123456789012:domain/watanabe-opensearch"
      },
      {
      "Effect": "Allow",
      "Action": "es:ESHttp*",
      "Resource": "arn:aws:es:us-east-1:123456789012:domain/watanabe-opensearch/*"
      }
      ]
      }
      参考:

      ステップ 1: パイプラインロールを作成する
      Amazon DynamoDB OpenSearch での取り込みパイプラインの使用


      信頼ポリシーは以下の通りです。

      {
      "Version": "2012-10-17",
      "Statement": [
      {
      "Effect": "Allow",
      "Principal": {
      "Service": "osis-pipelines.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
      }
      ]
      }


      インテグレーションの作成

      新規作成する場合、設定は前回の記事「Amazon DynamoDBとAmazon OpenSearch ServiceのゼロETL統合における設定の注意点」を参考にしてください。
      テンプレートは以下の通りです。
      エクスポートの設定はあえてしておりません。

      version: "2"
      dynamodb-pipeline:
      source:
      dynamodb:
      acknowledgments: true
      tables:
      - table_arn: "arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy"
      stream:
      start_position: "LATEST"
      aws:
      sts_role_arn: "arn:aws:iam::123456789012:role/watanabe-es-etl-role"
      region: "us-east-1"
      sink:
      - opensearch:
      hosts:
      [
      "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com"
      ]
      index: "table-index"
      index_type: custom
      document_id: "${getMetadata(\"primary_key\")}"
      action: "${getMetadata(\"opensearch_action\")}"
      document_version: "${getMetadata(\"document_version\")}"
      document_version_type: "external"
      aws:
      sts_role_arn: "arn:aws:iam::123456789012:role/watanabe-es-etl-role"
      region: "us-east-1"

      参考:opensearch

      この後、既存の項目もインポートできるようにインテグレーションのテンプレートの修正を行いますが、そちらは次章で設定します。

      動作確認

      OpenSearchと同じVPC内のEC2から動作確認をします。

      ※事前に以下のような環境変数を設定してください

      USERNAME=watanabe
      PASSWORD=PaSSW0rd!
      ENDPOINT=search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
      PIPELINE_ROLE=arn:aws:iam::123456789012:role/watanabe-es-etl-role

      その後、DynamoDB側にサンプルデータを追加し、ストリームにより新規の項目が追加されていることを確認しました。

      $ curl -XGET -u "${USERNAME}:${PASSWORD}" -H "Content-Type: application/json" "https://${ENDPOINT}:443/table-index/_search?pretty"
      {
      "took" : 82,
      "timed_out" : false,
      "_shards" : {
      "total" : 5,
      "successful" : 5,
      "skipped" : 0,
      "failed" : 0
      },
      "hits" : {
      "total" : {
      "value" : 2,
      "relation" : "eq"
      },
      "max_score" : 1.0,
      "hits" : [
      {
      "_index" : "table-index",
      "_id" : "7",
      "_score" : 1.0,
      "_source" : {
      "Artist" : "sampleartist",
      "Awards" : "1",
      "SongTitle" : "samplesong",
      "AlbumTitle" : "sampletitle",
      "id" : 7
      }
      },
      {
      "_index" : "table-index",
      "_id" : "0",
      "_score" : 1.0,
      "_source" : {
      "id" : 0
      }
      }
      ]
      }
      }

      現状は画像のようになっているため、既存のデータを反映させる必要があります。

      202405_zeroetl_01

      インテグレーションのテンプレートの修正

      反映させるためには、追加で以下の手順を実施する必要があります。

      • テンプレートのexportフィールドに情報を追加する
      • 手動でS3へのエクスポートをする
      • インテグレーションのパイプラインを再起動する(しばらく待てばできるかもしれないが、再起動が望ましい)

      テンプレートのexportフィールドに情報を追加するために、以下の通り修正します。

      version: "2"
      dynamodb-pipeline:
      source:
      dynamodb:
      acknowledgments: true
      tables:
      - table_arn: "arn:aws:dynamodb:us-east-1:123456789012:table/watanabe-dy"
      stream:
      start_position: "LATEST"
      export:
      s3_bucket: "watanabe-sample-temp"
      s3_region: "us-east-1"
      s3_prefix: "ddb-to-opensearch-export/"
      aws:
      sts_role_arn: "arn:aws:iam::123456789012:role/watanabe-es-etl-role"
      region: "us-east-1"
      sink:
      - opensearch:
      hosts:
      [
      "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com"
      ]
      index: "table-index"
      index_type: custom
      document_id: "${getMetadata(\"primary_key\")}"
      action: "${getMetadata(\"opensearch_action\")}"
      document_version: "${getMetadata(\"document_version\")}"
      document_version_type: "external"
      aws:
      sts_role_arn: "arn:aws:iam::123456789012:role/watanabe-es-etl-role"
      region: "us-east-1"

      この状態で再起動したり、しばらく待ってもOpenSearch側には既存の項目が反映されていませんでした。
      また、S3にもエクスポートがされていませんでした。
      現時点(2024/2/29)では、初回で既存のデータを同期しない場合、手動でエクスポートする必要があると思われます。
      テンプレートの設定と同じフォルダ名にS3へのエクスポートをしてください。
      フォルダおよびオブジェクトがバケットにあることを確認してください。

       

      202405_zeroetl_02

       

      その後、しばらく待っても反映される様子がなかったため、インテグレーションを再起動(停止して開始)しました。
      すると、OpenSearch側で既存の項目が反映されておりました。

      $ curl -XGET -u "${USERNAME}:${PASSWORD}" -H "Content-Type: application/json" "https://${ENDPOINT}:443/table-index/_search?pretty"
      <略>
      "hits" : [
      {
      "_index" : "table-index",
      "_id" : "3",
      "_score" : 1.0,
      "_source" : {
      "id" : 3,
      "AlbumTitle" : "Songs About Life",
      "Awards" : 10.0,
      "Artist" : "Acme Band",
      "SongTitle" : "Happy Day"
      }
      },
      {
      "_index" : "table-index",
      "_id" : "5",
      "_score" : 1.0,
      "_source" : {
      "id" : 5,
      "AlbumTitle" : "Another Album Title2",
      "Awards" : 8,
      "Artist" : "Acme Band",
      "SongTitle" : "PartiQL Rocks2"
      }
      },
      {
      "_index" : "table-index",
      "_id" : "4",
      "_score" : 1.0,
      "_source" : {
      "id" : 4,

      "_id" : "7",
      "_score" : 1.0,
      "_source" : {
      "id" : 7,
      "AlbumTitle" : "sampletitle",
      "Awards" : "1",
      "Artist" : "sampleartist",
      "SongTitle" : "samplesong"
      }
      },
      {
      "_index" : "table-index",
      "_id" : "2",
      "_score" : 1.0,
      "_source" : {
      "id" : 2,
      "AlbumTitle" : "Somewhat Famous",
      "Awards" : 2,
      "Artist" : "d",
      "SongTitle" : "Howdy"
      }
      },
      {
      "_index" : "table-index",
      "_id" : "0",
      "_score" : 1.0,
      "_source" : {
      "id" : 0
      }
      },
      {
      "_index" : "table-index",
      "_id" : "1",
      "_score" : 1.0,
      "_source" : {
      "id" : 1,
      "AlbumTitle" : "Somewhat Famous",
      "Awards" : 1,
      "Artist" : "No One You Know",
      "SongTitle" : "Call Me Today"
      }
      }
      ]
      }
      }

       

      【参考】

      Amazon OpenSearch Ingestion パイプラインでサポートされているプラグインとオプション

      opensearch

      OpenSearch Ingestion パイプラインを Amazon DynamoDB で使用する

      その ETL パイプラインもういらないかも︖zero-ETL 総まとめ︕

      アジアクエスト株式会社では一緒に働いていただける方を募集しています。
      興味のある方は以下のURLを御覧ください。