処理完了を1時間前倒し!データパイプラインのイベント駆動化

こんにちは!YOUTRUSTでデータエンジニアをしている小林(YOUTRUST)です。

以前のブログで、2025年11月時点でのデータ基盤のアーキテクチャについて紹介しました。

tech.youtrust.co.jp

その中で、

S3からGCSへのデータ転送処理やdbtの処理が時間をトリガーとしているので、前の処理が終わってないのに次の処理が始まってしまって失敗する

といった課題を挙げていました。今回はこの課題を解決した話です。

cronベースのデータパイプライン (下記の図の赤枠で囲んだ部分)をイベント駆動化することで、処理間の待機時間を46分削減し、dbtの処理完了時刻を約1時間前倒しすることができました。

RDSのバックアップからdbt処理までのデータパイプラインをイベント駆動アーキテクチャに

Before: cronベースのデータパイプラインの問題点

まず、改善前のタイムラインを見てみます。

改善前のタイムライン

各処理が固定の時刻に実行されるため、前の処理が完了しても次の処理開始時刻まで待つ必要があります。そのため、以下のような3つの問題点がありました。

1. 無駄な待機時間

処理間で合計約46分の待機時間が発生していました。それぞれの処理でバッファを持たせた時間設定にしているため、実際の処理時間に関わらず固定の待機時間が発生していました。

2. データパイプラインの信頼性へのリスク

逆に前の処理が遅延したり失敗したりする場合、次の処理が不完全なデータで実行されるリスクがありました。それぞれが時刻をトリガーにしているため、依存関係が暗黙的で、問題が起きたときにどこまでうまくいっていたのかを特定するのが難しい状態でした。

3. 保守性の課題

データパイプラインの処理時間を調整する場合、前後の処理との依存関係を把握して手動で処理時間とバッファの時間を計算する必要がありました。「この処理は何分かかるから、次の処理は何時に設定して...」というのを毎回考えるのは、ミスの原因になります。

After: イベント駆動アーキテクチャ

これらの問題を解決するために、cronベースからイベント駆動のアーキテクチャに移行しました。

改善後は、各処理が前の処理の完了を検知してから次の処理を開始するようになりました。

AWS側(Step Functions)

  1. RDSバックアップの完了をポーリングで検知
  2. S3へのエクスポート処理を実行
  3. S3へのエクスポート完了をポーリングで検知
  4. 別のバケットへのコピー処理を実行
  5. Google CloudのData Transfer APIを呼び出し

Google Cloud側(Pub/Sub + Cloud Functions)

  1. Data Transferの完了通知をPub/Subで受信
  2. Cloud Functionsがdbt Cloud APIを呼び出し

イベント駆動化したデータパイプラインのアーキテクチャ

このイベント駆動アーキテクチャのポイントは、以下の3つです。

1. AWS Step Functionsによるポーリング待機

RDSバックアップやエクスポートの完了は、Step Functionsで30秒間隔のポーリングを行い検知しています。完了を検知したら即座に次の処理を開始するので、無駄な待機が発生しません。

2. Workload Identity Federationでクロスクラウド認証

AWS LambdaからGoogle CloudのData Transfer APIを呼び出す必要がありますが、ここではWorkload Identity Federationを使っています。サービスアカウントキー (JSONキー)を発行せずに、AWSのIAMロールをGoogle Cloudのサービスアカウントに紐づけることで、セキュアにクロスクラウド連携を実現しました。

3. Pub/Sub + Cloud FunctionsでELT処理を起動

Data Transfer Jobは完了時にPub/Subへ通知を送る機能があります。この通知をトリガーに、Cloud Functionsを起動しdbt Cloud APIを呼び出すことで、GCSへのファイル転送完了後すぐにdbtの処理を開始することができるようになりました。

改善による効果

改善後のタイムラインは以下のようになりました。改善前と比較すると分かりますが、dbtの処理完了時刻を約1時間前倒しすることができました。

改善前後のタイムライン比較

また、BigQueryのテーブルをSalesforce Marketing Cloudへ連携し、メール配信によるマーケティング施策を行っているのですが、そのためのデータ連携の時刻を早めることができるようになったため、ビジネス的にも大きなインパクトがありました。

技術的な工夫

今回の移行は、本番稼働中のデータパイプラインに対して行いました。一度に全部を変えるとリスクが大きいので、段階的に進めました。

  • Phase 1: AWS側のイベント駆動化(Step Functions導入)
  • Phase 2: Google Cloud側の連携(Cloud Functions + Pub/Sub)

また、移行期間中はcronベースをフォールバックとして残しておき、問題が起きたらすぐに戻せるようにしていました。

まとめ

cronベースのデータパイプラインをイベント駆動化することで、処理間の待機時間を46分削減し、dbtの完了時刻を約1時間前倒しできました。

今回は、AWS Step Functionsによるポーリング待機、Workload Identity Federationによるクロスクラウド認証、Pub/Subによるイベント連携といった技術を組み合わせてイベント駆動を実現しました。

前回のブログで挙げた課題を1つ解決できましたが、まだ「dbtテストの充足」や「モデルのDescription整備」などの課題は残っています。 引き続き改善を進めて、データ基盤をより使いやすく、信頼できるものにしていきたいです!