機械学習基盤における Cloud Dataflow の活用

AUTHOR :   ギックス

Cloud Dataflow の使い所

Cloud Dataflow は GCP で提供されているサービスの一つで、フルマネージドな環境で ETL 処理を実行することができます。バッチ処理とストリーミング処理を統一的に扱うことのできる Apache Beam というフレームワークを使用しており、処理のパイプラインを記述するだけで、データ量に応じた分散処理を実現できるのが最大のメリットです。

機械学習基盤の構築においても、下記の例のように様々な場面で Cloud Dataflow が活用できる場面があります。

  • 大規模な学習・推論用データセットの前処理
  • TFX (TFDV) を利用したデータセットのスキーマ検証や統計情報の取得
  • ストリーミングデータに対するリアルタイムな推論処理のパイプライン構築

弊社の機械学習基盤では、先日公開した GKE の活用と合わせて、 Cloud Composer を用いて Cloud Dataflow の処理を管理しています。この記事では、Cloud Dataflow の活用事例について紹介します。

Apache Beam によるパイプラインの定義

Apache Beam は複数のプログラミング言語に対応しており、公式 SDK が提供されている Java (Kotlin)、Python、Go (Experimental) の他、Spotify が開発している Scala API の Scio などが利用できます。弊社では Cloud Composer で利用している Airflow や機械学習モデルの実装が全て Python であることから、開発の一貫性を重視して Python を採用しています。

Python SDK は昨年ごろまでは Python3 への対応が完全でなかったことから敬遠されがちな面もありましたが、Python2 の EOL に合わせて対応が進んだこともあり、現在は Python3 でも安定して使える状態になっています。

一例として、弊社で構築している Cloud Dataflow のパイプラインの一つに、学習の実行時に BigQuery のテーブルから学習用の CSV ファイルを生成する処理があります。 Cloud Composer から渡した引数を known_args として受け取ることで、任意の BigQuery のテーブルを CSV 化する処理を以下のように書くことが出来ます。(掲載のために一部抜粋の上改変しています)

FlexRS の活用

上記に挙げたようなアドホックな処理の他に、データセットを定期的に更新するような処理に対しても Cloud Dataflow は非常に有用です。定常的に必要な処理で、特に即時実行が求められないケースに対しては、 Flexible Resource Scheduling (FlexRS) を利用することで低コストでバッチ処理を実現することができます。

FlexRS には実行された後 6 時間以内のどこかで処理が行われる(即時実行されない)という制約がありますが、これを許容できるケースであれば、利用を検討してみてもよいかもしれません。

Cloud Composer との連携

Cloud Composer から Cloud Dataflow (Python) の処理を呼び出す場合、 DataflowTemplateOperatorDataFlowPythonOperator のどちらかを使うことになります。 DataflowTemplateOperator を使う場合は事前にテンプレートを作成しておく必要がありますが、 DataflowPythonOperator の場合は Python スクリプトを Composer の GCS に配置しておけば利用できます。テンプレートを利用すると Composer から動的なパラメータを渡すのが面倒になるため、 DataflowPythonOperator で呼び出す方式がおすすめです。

Cloud Composer から DataflowPythonOperator を使う場合の注意点として、 Composer および Airflow のバージョンが古い場合はデフォルトの python バージョンが Python2 系であることに注意する必要があります。(2020 年 1 月〜2 月のアップデートで Python3 系をデフォルトで使用できるようになりました。参考 1, 2)なお、公式の Airflow では、2020年4月現在でも master ブランチしか Python3 インタプリタでの実行に対応していないため、 DataflowPythonOperator を Python3 で利用する場合は Operator を自作する必要があります。(参考

また、 Cloud Dataflow で実行されるパイプラインの Apache Beam のバージョンは Composer にインストールされている apache-beam のバージョンに依存するため、開発時にはバージョンのずれがないよう注意が必要です。弊社では Terraform で以下のようにバージョンの管理を行っています。(該当部分のみ抜粋)

推論処理

ここまではデータの処理にフォーカスして活用例を挙げましたが、 Cloud Dataflow 自身に学習や推論の処理を行わせることも可能です。本記事では詳細は省きますが、こちらで紹介されているように任意のライブラリをインストールする方法を利用することで、 XGBoost や LightGBM などの軽量なモデルであれば推論処理をパイプライン上でそのまま行わせることも可能になります。また、 Tensorflow のモデルを利用した例については公式のソリューション例で紹介されているので、こちらを参考にしてみてください。

おわりに

Cloud Dataflow を利用することで、メンテナンスフリーなデータ処理基盤を構築できるようになりました。Apache Beam の記法は癖が強いため、慣れるまでは処理が書きづらい面もありますが、学習コストに見合うだけの利用価値があると感じています。

直近では Dataflow SQL のベータ提供も始まり、限定的ながら SQL でパイプラインを記述可能になっています。また、 Jupyter notebook によるインタラクティブなパイプライン構築や、 Flink for Kubernetes を利用した GKE 上での処理を実現するなど、より活用の幅が広がってきている印象があります。今後のリリースにも注目していきたい所です。


Masaaki Hirotsu
MLOps Div. 所属 / Kaggle Master
機械学習・データ分析基盤の構築に関わる事例や、クラウドを活用したアーキテクチャについて発信していきます。

SERVICE