GCP Composerでbotoファイルを使用してGCSからS3にファイルをコピーする方法
はじめに
GCP Composerは、Apache Airflowをベースとしたワークフロー管理ツールです。
GCP Composerでは、DAGと呼ばれるPythonスクリプトでタスクの実行順序や依存関係を定義できます。GCP Composerは、GCS(Google Cloud Storage)やS3などのクラウドストレージと連携するためのオペレータやセンサーやフックを提供しています。
↓詳しくはこちらをご覧ください。
今回は、GCP Composerでbotoファイルを使用してGCSからS3にファイルをコピーする方法を紹介します。
GCSからS3へのファイルコピー
GCSからS3へファイルコピーする方法いろいろ
GCP Composerを使用してGCSからS3にファイルをコピーするためには、いくつかのやり方があります。
GCSToS3Operatorを使う方法
GCStoS3Operatorは、Google Cloud StorageからAmazon S3にデータを転送するためのオペレーターです。
ただしこのオペレーターにはちょっとした特徴があり、コピーというよりも同期(Sync)という動きに近いです。
GCSとS3の階層がどちらも同じ階層の場合、同期する形でコピーすることが出来ます。
GCS
gs://my-gcs-bucket
/subdir
/20230101
aaa.txt
bbb.txt
/20230102
ccc.txt
ddd.txt
S3
gs://my-s3-bucket
/subdir
/20230101
aaa.txt
bbb.txt
↑
上記のような階層の場合、20230102配下をそのままS3にコピー(同期)したいという要件の場合はこのオペレータを使用することが可能となります。
BashOperatorを使用してgsutilコマンドを使う方法
BashOperatorは、Apache Airflowのオペレーターの1つであり、Bashコマンドを実行するために使用されます。
gsutilコマンドはLinuxローカル、GCS、S3、などいろいろなストレージに対応しています。ストレージの内容確認(gsutil ls)やコピー(gsutil cp)や移動(gsutil mv)などが柔軟に対応できます。
このBashOperatorの中からgsutilコマンドを使用することで自由にファイルをコピーすることができます。当ページではこのBashOperatorの中からbotoファイルとgsutilコマンドを使用してコピーする方法ついての説明となります。
botoファイルを使ってコピー
botoファイルとは、AWSのクレデンシャルや設定を記述したファイルです。botoファイルを使用することで、gsutilコマンドにAWSの認証情報を渡すことができます。
AWSのIAMでユーザーを作成し、アクセスキーIDとシークレットアクセスキーを取得します。このユーザーは、S3にアクセスできる権限が必要です。
次に、botoファイルを作成します。以下のように、AWSのクレデンシャルとS3の設定を記述します。
[Credentials]
aws_access_key_id = S3のアクセスキー
aws_secret_access_key = S3のシークレットキー
[s3]
use-signv4=True
host=s3.ap-northeast-1.amazonaws.com
このbotoファイルを、GCSにアップロードします。composerをセットアップした際、GCS上にDAGを置くためのバケットが自動的に作られたかと思います。その場所が
gs://dag-abcde-bucket/dag である場合、botoファイルを置くための最適な場所は
gs://dag-abcde-bucket/data です。
この場所はAirflow上では /home/airflow/gcs/data/として見えることが出来るので、先ほどのbotoファイルをsample.botoとしてgs://dag-abcde-bucket/data/sample.boto
へ置くとAirflow上からは/home/airflow/gcs/data/sample.boto として使用可能となります。
次に、GCP ComposerでDAGを作成します。以下のようなPythonスクリプトで、GCSからS3へのファイルコピーのタスクを定義します。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 3, 26),
}
dag = DAG(
'gcs_to_s3',
default_args=default_args,
schedule_interval='@daily',
)
copy_task = BashOperator(
task_id='copy_task',
bash_command='gsutil -m -o "Credentials:gs_service_key_file=/home/airflow/gcs/data/sample.boto" cp gs://my-gcs-bucket/aa.txt s3://my-s3-bucket/',
dag=dag,
)
このDAGでは、BashOperatorを使用してgsutilコマンドを実行しています。gsutilコマンドでは、-oオプションでbotoファイルのパスを指定しています。
このDAGをGCP Composerにデプロイすると、毎日1回、指定した時間にGCSからS3へのファイルコピーが実行されます。
以上が、GCP Composerでbotoファイルを使用してGCSからS3にファイルをコピーする方法となります。
まとめ
最近は当然のようにクラウド環境のストレージが使われてきています。社外に公開しアクセス制御も難しくないことから業務で使用されることも多く、会社間のファイルのやり取りにGCSやS3を使用する機会が増えてきました。
GCSToS3Operatorというのがもう少し柔軟に使えるようになれば良いのになぁと願っております。