GCP
PR

GCP Composer(Airflow)使ってみた

ycsg113XYZ
記事内に商品プロモーションを含む場合があります
スポンサーリンク

はじめに

わが社にGCP Composerを導入してはや3年。いいところも悪いところもありました。
使ってみた上でのノウハウを共有させて頂きたいと思います。

当記事ではGCP ComposerのAirflowを扱いますが、それ以外でのAirflowでもそんなに違いはないはずです

GCP Composer (Airflow)とは

Airflowとは

GCP Composerの説明をする前にまずはAirflowの説明ですね。

Airflowとは、もともとApacheが開発したオープンソースAirflowで、ワークフロー管理ツールです。ワークフローをプログラムで作成し、その実行をスケジューリングしたり、モニタリングを出来るものになってます。

Airflowでは「DAG」「ワークフロー」という言葉を良く使います。どちらも似たような意味で使い、依存関係のある処理同士をつなぎこんだものを指します。

Airflowとは これらのワークフロー を作成、実行、監視するためのプラットフォームとなります。Airflowを使うと、ワークフローが実装しやすく、わかりやすく、管理しやすくなります。この後に説明する「Operator」を使用すると、BigQueryのテーブル入出力、cloud Storageへの入出力、S3へのSync、など、様々なタスクを少ないコードで実装できます。また、ワークフローの途中でタスクがエラーで落ちた場合、管理画面からどこでエラーになったのかが視覚的にわかりますし、リトライの設定なども可能となります。

AirflowのワークフロープログラムはPython で実装されています。実装っていうよりもAirflowの設定ファイルがPython形式だって言うほうが正確かもしれません。Pythonに詳しくなくてもなんとかなります。サンプルいろいろ落ちてますし、そんな複雑なことはあまり書かなくて済みます。

GCP Composerとは

GCP Composerの特徴としては以下になるかと思います。

  • セットアップが簡単
  • Airflowの最新版を使用できる
  • マネージドサービスであり、サーバーの管理が不要

GCP内でAirflowを使おうとした場合にいろいろな方法があるかと思いますが、環境そのままそっくり作ってくれるやつが欲しいと思ったらCloud Composerになりますよね。(フルマネージドのワークフロー オーケストレーション サービスというらしいです。オーケストラなので作曲=Composerなんですかね)

ワークフローの作成、スケジューリング、モニタリング、管理がひととおりできます。

難点としましては、ちょっと運用費用がお高いところでしょうか、、

どんな機能がある?

Airflow管理コンソール

Airflow管理コンソールとは、Apache AirflowというワークフローオーケストレーションツールのWeb UIです。

Airflow管理コンソールを使用することで、ワークフローの可視化、タスクの実行状況のモニタリング、タスクのスケジュールの設定、タスク間の依存関係の定義などを行うことができます。

Airflowは、大規模なデータ処理プロジェクトにおけるワークフローの自動化やスケジュール管理に利用されます。Airflow管理コンソールは、Airflowの機能を設定したり、編集したりする場所でもあります。

Operatorご紹介

Airflowにはさまざまな種類のOperatorが用意されているため、タスクに応じたOperatorを選択することができます。

Operatorには、それぞれのタスクに必要な引数や設定がありますので、適切なものを選んで利用する必要があります。

以下に示すのは当社にて良く使用しているOperatorになります。

BashOperatorBashコマンドを実行することが出来ます。composerのAirflowの場合gcloud系のライブラリはあらかじめ入っているのでgsutilコマンドも使うことが出来ます
PythonOperatorPythonの関数を実行することが出来ます。あまり大きな重い処理は書かないようが良いです
SlackAPIPostOperatorSlackにメッセージを送ることが出来ます
BigqueryOperatorBigQueryでSelect文を発行しその結果をBigQueryのテーブルに出力することが出来ます
EmailOperatorメールを送信するOperatorです

当社では「BigqueryOperator」を良く使います

ひろし
ひろし

Sensorご紹介

Sensor(センサー)は Operator の一種で「とある条件が満たされるまで待つ」役割を担います。その「条件」は「ファイルが現れるまで」や「1時間が経ったら」など、様々な指定方法があります。

以下に当社で良く使用するSensorを示します。

TimeSensor指定の時間になるまで待ちます
ExternalTaskSensor他のDAGのタスクが完了するまで待ちます
BashSensorBashコマンドが成功するまで待ちます
PythonSensorPython関数が True を返すまで待ちます
GCSObjectExistenceSensorGCSにオブジェクトが現れるまで待つ

当社では「ExternalTaskSensor」を良く使います。なくてはならないSensorです。

ひろし
ひろし

Variables

Airflowには、任意の値を保存できるKey-Valueストアが用意されています。

このKey-Valueストアに保存される値は「Variable」と呼ばれ、ソース内に持たせたくない秘密情報の管理や外部パラメータとして変更する可能性のある設定値として使います。

当社ではComposerを2つ立て、1つを開発用としています。同じソースコードで動かすため環境の違いをVariablesにて設定しています。

ひろし
ひろし

日付マクロ

Airflowでは実行日を示す日付の変数があります。これを日付マクロと呼びます。

例えば、毎日動作するバッチのおとといの分を再度動かしたいとなった場合でも、そのおととい動作した時の日付とみなして動かすことが可能です。プログラム内ではシステム日付を使用せずにAirflowが準備してくれている日付マクロdsを使用すれば良いのです。

ただ、この日付マクロ、ちょっとくせがあります。以下の記事にて紹介しています。

あわせて読みたい
GCP Composer(Airflow)のstart_dateと日付マクロdsの考え方
GCP Composer(Airflow)のstart_dateと日付マクロdsの考え方

サンプルプログラム

BigQueryOperatorを使用したサンプルDAG

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 6, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='bigQuerySample', 
    default_args=default_args, 
    schedule_interval=timedelta(1)
)

t1 = BigQueryOperator(
    task_id='bigquery_test',
    sql='/sql/sample.sql',
    destination_dataset_table='testprj:testdataset.testtable',
    bigquery_conn_id='bigquery_default',
    dag=dag,
)

ソースの説明

  • depends_on_past・・・trueの場合、過去のタスクが成功してからじゃないと実行しません。
  • start_date・・・DAGの開始日を示します。(ただ、ちょっと単純ではないっす。基本1回前のスケジュール日になります。何を言っているかわからない場合、とりあえずやってみてください)
  • retries、retry_delay・・・エラーとなった時のリトライ回数と間隔です。
  • schedule_interval・・・実行間隔です。cron形式の記載方法もあります。
  • BigQueryOperatorのdestination_dataset_table・・・SQLを実行後、その実行結果を保存するテーブルです。
  • BigQueryOperatorのsql・・・GoogleStorageのdagディレクトリを起点とし、そこからの相対パスでsqlのファイル名を指定します。サブディレクトリも可能です。尚、SQLを直接記載する場合は「bql」というパラメータも準備されています。

ひろし
ひろし

おわりに

当社では約3年Composer(Airflow)を使用してきました。ではその生の感想を以下に示します。最初に書いてよという感じかとは思いますが、ここにに書きます。あと、普通のAirflowは使ったことないのでわからないです。あくまでもComposerのAirflowの感想です。

良い点

  • 管理がしやすい。バッチ処理が終わったかどうかが見た目でわかるので、赤くなっていたら「あ、エラーだな」とわかるし、とても良い。
  • BigQueryで多分混雑?しててSQLエラーになって、リトライすると成功することって良くある。リトライ処理はDAGに回数を設定しておくだけでAirflowがやってくれるので自前でロジック組まなくて良いので助かる
  • 再実行したりするのが簡単。管理コンソールから「Clear」ってするだけ。

良くない点

  • ソース変更した時にそれが反映されるまで(Airflow管理コンソール上に反映されるまで)に5分くらいかかる。急いでいる時ほど遅く感じる
  • Composerの一時停止が出来ない。本番は24時間稼働で良いが開発環境は昼間だけ動かしたいとかが出来ない。
スポンサーリンク
ABOUT ME
ひろし
ひろし
都内在中のなんちゃってSE。ギリギリPG。の私が管理者のひろしです。 日々夜遅くまで仕事に追われています。 今日は早く帰りたい・・・
スポンサーリンク
記事URLをコピーしました