- データ供給先が増える
- アイデア:Consumerの追加と役割の追加
- Consumerを増やす?
- 1つのConsumerの役割を増やす?
- 必ずしも全件必要ということではない
- Consumerがフィルタリング、SNS-Lambdaで後続処理
- Consumerを追加&&拡張ファンアウトを割り当て
- フィルタリング用Query DSLをGUIで設定できるように
- まとめ
登壇者はこの方

佐野 玄 氏
株式会社日本経済新聞社
プラットフォーム推進室 データインテリジェンスグループ
部長
データ基盤「Atlas」の設計・開発や、ダッシュボード、リアルタイムMAの開発などを通じてデータドリブンな文化の醸成を推進。現在はエンジニア、アナリスト、サイエンティストからなるデータインテリジェンスグループ(DIG)のマネジメントを担当しながら、社内データサイエンティストのEMを兼任。
これまでSaaSのポストセールス、広告代理店のコンサルタント、事業会社のデベロッパーとしてデータとマーケティングの重なる領域でキャリアを重ねる。
佐野:本日は「リアルタイム行動ログを付加価値に繋げる、ファンアウトの試行錯誤」というタイトルで発表したいと思います。データ分析基盤という意味では、複数のデータベースを使い分け、ビジネス要件に応じて複数の可視化ツールを使い分けています。
同時に、分析から得られた示唆や、分析を踏まえ取り組んだデータサイエンスの成果物をマーケティング施策に使うような場合に、同じデータを様々なシステムで共有しています。私たちのデータ基盤は分析と施策のループを形作っています。
まず、チャレンジした背景として2016年にリアルタイムアナリティクスの基盤、今でいうカスタマーデータプラットフォーム(以下:CDP)を構築したことが挙げられます。
上記はログの処理部分だけを切り取った図ですが、左からSDKでお客様との接触履歴やサービス上での行動を計測し、それをデータの受け口であるエンドポイントに送り、そのあとストリームデータ処理を行い、最終的にはBigQueryやElasticsearch、SingleStoreへ格納しています。
BigQueryは大量のデータ処理に、Elasticsearchはログの柔軟性に、SingleStoreは高速処理にそれぞれ活用されています。日経ID連携している90以上のWebサービスとネイティブアプリのアクセス状況を計測しており、これらのツールを組み合わせることで、1,000人以上のデータ利用者が分析・可視化を行えています。
行動ログからはさまざまなことがわかります。例えば、以下のようなデータを取得しています。
- ページビュー
- クリック
- フォームの操作
- 動画・音声の再生、視聴時間
- スクロールの深さ
- 記事の読了率
商品を販売しているECサイトとは違い、メディアサイトではお客様からのフィードバックを得る機会はなかなかありません。そのような中でサービスの使われ方や読まれ方などパッシブな情報が「反応・反響」として得られています。
私たちはこうしたデータを活用して、さらにお客様に【価値】を届けたいと考えています。具体的には、記事やビジュアルジャーナリズムなどのコンテンツをよりよくすることに加え、データを活用した機能をプロダクトに組み込むことです。
例えば「閲読履歴機能」「レコメンデーション」「通知」などが挙げられています。これにより、顧客体験を向上させ、プロダクトの魅力を高めることができます。
また、パーソナライズされたプロモーションを通じて、自動化しつつも顧客一人ひとりに合わせた的確なコミュニケーションを実現することも目指しています。マーケティングの4R(Right Person、 Right Place、 Right Time、 Right Message)を追求し、次の施策を実施する上での示唆につなげることができるように日々試行錯誤しています。
データ供給先が増える
そうなると、データ供給先が増えます。
例えば他のレコメンドシステムやマーケティングオートメーションにデータを供給することになります。もちろんETLを使って定期バッチ処理はできますが、いい記事を読んで「別の関連する記事が読みたい!」と思っても、翌日に昨日読んだ記事のレコメンドが届いても既に興味は薄れているでしょう。
そういった時にリアルタイムデータをレコメンドやマーケティングオートメーションに使えたらいいなと考え、下図の流れになっています。
弊社の場合、ストリームデータ処理の流れではAWSのKinesisを使い、Consumerでデータを分岐させています。データの供給先を増やすときはConsumerを並列化してそれぞれ連携先を増やすというアプローチを考えました。
アイデア:Consumerの追加と役割の追加
では、どのような方法をとっていったか?
まず前提としてログと環境については下図の通りです。
ログとしてはボリュームが大きい気がしますが、設計当初のアプローチとして、行動ログに対して集計時に各種のメタデータを紐づけようとすると、毎回計算コストが膨大で時間もかかるため、コンピューティングコストよりストレージコストが安いと判断してPre‐JOINしてログを投入前に1件ずつ拡張しています。よってログ1件あたりのデータが大きくなっています。
Consumerを増やす?
最初に検討したのは、前述のとおりConsumerの追加でした。Kinesisのシャードは、Write:Readの比率が1:2なので、Producerが1つとするとConsumerは2つ使え、同数のシャードを追加するごとに単純にConsumerは2つずつ増やせるのではないか、と考えいました。
実際にConsumerを増やしてみると、メリット・デメリットがありました。
メリット
- Consumerの量産は簡単。Lambdaを作って、Kinesisとポチッと繋ぐだけ。
- シャード追加で水平スケールできるとすれば、全件処理する場合のLambdaの性能・負荷・コストは予測しやすい。
- Kinesis直結だから速い。
- それぞれIteratorがあり再処理しやすい。
デメリット
- ログの一部しか必要としない処理でも、全件読み出してとりあえずLambdaが発火することになり、無駄なコストもかかる?
- キャパがWrite 1:Read 2 だからといってConsumerを倍にできるわけではなかった(再処理などでスパイクする)
Consumerの量産は簡単で、Lambdaを作ってKinesisと繋ぐだけ。シャード追加で水平スケールできれば、Lambdaの性能や負荷、コストも予測しやすい。Kinesis直結なので処理も速く、Iteratorを使って再処理もしやすいです。
1つのConsumerの役割を増やす?
次に検討したのは、1つのConsumerから処理を分岐する方法でした。
新設するConsumerは一つにしてKinesisの読み出しを最小限にしつつ、Consumer(Lambda)に一人二役以上を担ってもらい連携先を広げることも考えました。
実際に試した結果、メリット・デメリットは以下の通りです。
メリット
- Consumer (Lambda) の発火は最小限、1つのレコードが1回処理されるだけなのでLambdaの費用は抑えられる。
- 必要なシャード追加も最小限なのでここもお得。
デメリット
- 連携先が増えるとLambdaの実行時間が延び、コストになる。
- 連携先が増える度にLambdaの改修とデプロイが必要になる。
- 連携先の一つに障害が発生した場合に、どこからどこまで再処理するか混乱する。(Iteratorが共通)
上記から、デメリットが大きいと感じたため、この方法は見送ることになりました。確かに、一見すると効率的に思えるかもしれませんが、長期的に見ると運用が複雑になり、コストも増大する可能性があります。システムは、常に変化していくものなので、柔軟性や拡張性を考慮した設計が重要だと改めて感じました。
必ずしも全件必要ということではない
ここで、改めて考えたのが、「ConsumerはKinesisから全ログレコードを受け取る必要があるのか? 」ということでした。確かに、分析用途でデータを格納するBigQueryやElasticsearchには全件連携したいです。しかし、レコメンデーションであれば記事の閲読履歴だけで十分ですし、マーケティングオートメーションであればランディングやコンバージョンだけ連携できれば良いです。つまり、連携先が必要としているログレコードは、必ずしも100%ではない、ということに気づいたのです。
例えば、レコメンデーションで使うのは、ログ全体の50%くらいです。マーケティングオートメーションに至っては、0.1%以下ほどです。
そこで考えたのが「Consumerにフィルター機能を持たせること」です。Consumerが必要なログレコードだけを選んで連携先に渡せば、無駄な処理を減らすことができるのではないかと思いました。これなら、必要なデータだけを効率的に処理できるので、コスト削減にも繋がると考えました。
Consumerがフィルタリング、SNS-Lambdaで後続処理
先ほどConsumerにフィルター機能を持たせるというアイデアについてお話しましたが、実際にそれを実装したのが、この「EventTrigger」というConsumerです。
このEventTriggerは、発火するたびにDynamoDBを参照し、QueryDSLとTopic ARNのセットを取得します。
そして、取得したQueryに一致するレコードをSNSトピックに連携します。つまり、EventTriggerがフィルターと分岐の役割を担い、必要なデータだけを必要な場所に送る、という流れになります。SNSトピックごとにSubscriberのLambdaがあり、そのLambdaが後続処理に繋いでいます。DynamoDBには、フィルター条件(QueryDSL)と連携先のSNSトピックなどを保存しています。これによって、柔軟に連携先を制御できるようになりました。
Consumerを追加&&拡張ファンアウトを割り当て
具体的な実装の流れは下図の通りです。
まず、連携先の拡張に対応するために、Consumerを1つ追加しました。Read TP、つまり読み込み処理のスループットについてはモニタリングしながらシャード数を調整し、最終的には既存の重要な処理を拡張ファンアウトコンシューマーに切り替えるなどして落ち着きました。具体的には、Elasticsearch連携用にConsumerを追加しました。それに加え編集業務に必要なリアルタイムダッシュボードのDBであるSingleStore連携用にもConsumerを追加し、こちらは拡張ファンアウトでキャパシティを確保しました。
そして、分岐処理用のConsumerも追加しました。ここの連携は、相対的に優先度が低いので、既存のキャパシティで動かすことにしました。
フィルタリング用Query DSLをGUIで設定できるように
マーケティングオートメーション(MA)の施策をリアルタイムデータに基づいて発火する機能も内製しましたが、施策の度にQueryDSLを手書きしてDynamoDBを更新していると手間がかかる・事故の元になりそうです。
これで、QueryDSLを手書きする必要がなくなり、GUI上で簡単に条件を設定できるようになりました。このGUI化によって、QueryDSLの設定ミスを防ぎ、より安全かつ効率的にマーケティング施策を実行できるようになりました。
まとめ
いくつか採用しているサービスがありますが、まとめると下図の通りです。
- ストリーム処理の根幹はSQSとKinesisで確実にデータを保持し、再処理耐性を高めています。
- ファンアウトは Consumerの追加か、単一Consumerから分岐してSNS経由の連携としました。
- メッセージサイズやレイテンシーは今回の用途では気にするような制約はありませんでした。
全体のまとめとしては以下の4つです。
1. Consumer側の要件に応じたサービス選択
- 全件に近いボリュームを連携する場合はKinesis Consumerを増やす
- 一部を連携する先が複数あるならば、分岐用Consumerを1つ作る
・ その後の連携はSNSまたはSQSを挟んで個別のSubscriberを用意しておく
2. KinesisのConsumer追加におけるシャード数に注意
- Write1:Read2だが再処理やスパイクではバランスが崩れる
- 重要なConsumerは拡張ファンアウトでキャパを確保する
3. 再処理の要否に応じた設計が必要
- KinesisはIteratorの打ち直しで過去に遡って再処理できる。保持期間も365日まで延長可能
- SNSやSQSは再試行の設定はできるが、連携先の障害などに対してはDLQを設けて保持する必要があり、データ保持期間の短さにも注意が必要
4. サービスの違いを理解する
- どれも似たことができるが微妙に異なるし、先々仕様が変わることもある
- 設計マージンになるのか無駄なコストになるのか…?
以上で、発表は終わりです。
▼▼ぜひ他登壇者の発表レポートもご覧ください
データ分析基盤 エンジニア勉強会
イベント中にあがったQ&Aはこちらをチェック!
パフォーマンスとコスト改善のために法人データ分析基盤をBigQueryで構築した話