GCP
PR

GCP Composer(Airflow)のstart_dateと日付マクロdsの考え方

ycsg113XYZ
記事内に商品プロモーションを含む場合があります
スポンサーリンク

GCP Composer(Airflow)とは

GCPガイドによりますと以下の記載があります。

Cloud Composer は、フルマネージドのワークフロー オーケストレーション サービスです。クラウドとオンプレミス データセンターにまたがるワークフローの作成、スケジューリング、モニタリング、管理ができます。

Cloud Composer は一般的な Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。

Cloud Composerガイド

要はApache Airflowの環境とかよくわからなくても大丈夫!サービスとして提供してますよ、ということになります。通常のApache Airflowとの違いはほとんどありません。

タイトルにGCP Composer(Airflow)とは書きましたが、以下の記事は通常のオープンソースでのApache Airflowと同じとお考え頂ければと思います。

Airflowの日付問題

Airflowで良くある混乱の原因となっているのが、start_dateと日付マクロのdsです。さらに追い打ちをかけるのがComposerのAirflowでは日時の取り扱いがUTC基準になっていることです。(オープンソースのApache Airflowでは日本時間にするやり方があるらしい、、、Composer Airflowでも是非いつかやれるようにしたい、、)

詳細を追っていきましょう。

バッチ処理の開始日 start_date

AirflowのDAGのstart_dateとは以下のように指定し、バッチ処理の開始日となります。

# このDAGを設置した時の3日前を指定する方法
'start_date': airflow.utils.dates.days_ago(3),

# 直接指定する方法
'start_date': datetime(2022, 10, 2),

start_dateに未来日を設定した場合は、その日から(正確にはその日ではなく)、過去日を設定した場合には、過去に遡って実行されます。

この時、このstart_dateの設定について、バッチ処理によってはそんなに気を使わなくて良い場合が多いです。

気を使わないといけないパターンとしましては、

  • お客様先へデータ連携する日が決まっている。(未来日)
  • 過去データを作らないといけないが、この日の分から作るというのが決まっている。

というような時にstart_dateをきっちり設定しなければなりません。

例えば、2022/10/2に初回バッチを起動したい場合、単純に

・・・・
'start_date': datetime(2022, 10, 2),
・・・・

というように記載しても10/2には動作しないです。これが罠なのです。

実は、start_dateにバッチ実行間隔(schedule_interval)を加えたを日時が初回実行日時なのです。

Airflowの初回実行日時は、start_date+バッチ実行間隔(schedule_interval)である

つまり、裏を返せば

start_dateとは

起動させたい日からバッチ実行間隔(schedule_interval)を引いたもの

になります。

例えば、「毎日10時に動作させたいバッチ処理を10/2から起動したいなぁ」となった場合、start_dateとschedule_intervalは次のようになります。

# JST10時(UTC1時)起動の日次バッチ処理で初回10/2に実行させる
・・・・
'start_date': datetime(2022, 10, 1),
・・・・
schedule_interval="0 1 * * *",
・・・・

↑日本時間10時に実行したいのでUTC時間1時を設定する

バッチ実行日時の日付部分が格納される日付マクロ ds

airflowの日付マクロ ds の説明には実行日(YYYY-MM-DD)という記載があります。

dsなどの日付マクロがairflowには用意されており、たとえ過去日をリトライしてもその時のdsの値でバッチ処理が動作するため、とても重要な要素となります。

ですので以下のような記載をして、実行日を入力や出力先の条件として使うことは良くあります。(以下は dsのハイフンを除外したds_nodashを使用してます)

task1= BigQueryOperator(
    task_id="task_01",
    sql="./sql/mysample.sql",
    use_legacy_sql=False,
    destination_dataset_table="myprj.mydataset.mytable_{{ds_nodash}}", 
    write_disposition="WRITE_TRUNCATE",
    dag=dag
)

ここでも落とし穴がありまして、{{ ds }}や{{ ds_nodash }}が示す日は、先ほどのstart_dateと同様にschedule_intervalが関係してきます。

例えば、毎日11時(JST)にSQL(mysample.sql)を実行し、その結果をmytable_YYYYMMDD(実行日付)に格納したい!となった場合、

・・・・
schedule_interval="0 2 * * *",   #11時JST
・・・・

task1= BigQueryOperator(
    task_id="task_01",
    sql="./sql/mysample.sql",
    use_legacy_sql=False,
    destination_dataset_table="myprj.mydataset.mytable_{{ds_nodash}}", 
    write_disposition="WRITE_TRUNCATE",
    dag=dag
)

のようにしてしまいがちなのですが、{{ds_nodash}}には実行日の前日の日付が入ってしまいます。

  • 10/2 AM11時の実行時には 20221001
  • 10/3 AM11時の実行時には 20221002

ds には、前回のバッチ動作日(今回のバッチ動作日からスケジュール間隔を引いた日付)が入る

バッチ実行時の日付を取得するにはどうすれば?

前述のとおり、{{ds}}は、「前回のバッチ実行日」が格納されます。

前回じゃなくて今回のバッチ実行日が取りたいんだよ!ってことが普通ですよね。なんでこの仕様なんだろう、、、不思議です。

今回のバッチ実行日は、前回のバッチ実行日を示す{{ds}}に、スケジュール間隔(schedule_interval)を追加してあげればよいのですが、もっと良い方法があります。

Airflowには{{next_ds}}という「前回のバッチ実行の次の実行日」というのが取得できます。

つまり {{next_ds}}が今回の実行日なのです!

next・・・なのに今回、、、うーむ違和感。

まとめ

Airflowの日付

1.start_date

  • Airflowの初回実行日時は、start_date+バッチ実行間隔(schedule_interval)である
  • start_dateは、バッチ処理を起動させたい日からバッチ実行間隔(schedule_interval)を引いたもの

2.日付マクロ ds

  • ds には前回のバッチ実行日(今回のバッチ実行日からバッチ実行間隔(schedule_interval)を引いたものが入る。
  • 今回のバッチ実行日を使用したい場合、next_dsを使用する。
スポンサーリンク
ABOUT ME
ひろし
ひろし
都内在中のなんちゃってSE。ギリギリPG。の私が管理者のひろしです。 日々夜遅くまで仕事に追われています。 今日は早く帰りたい・・・
スポンサーリンク
記事URLをコピーしました