Airfowの紹介
Airfowは2014年にAirbnbで開発され、現在はApacheソフトウェア財団のインキュベーションプログラムに参加しているジョブスケジューラープラットフォームです。
以前はAirbnb Airflowの名称でしたがApache Airflowに変更されています。ライセンスはApacheライセンス2.0です。
Airflowの特徴
ワークフローをPythonでプログラミングする事で、コードベースのメンテナンス、共同編集、バージョン管理等を行います。
主要な機能を列挙します。
- スケールアウト
- リモートログ指定
- ウェブUI・コマンドラインインターフェイス
- ジョブ実行状況のツリービュー
- ガントチャート(ボトルネックのジョブを特定するための)
- タスクの期間指定
- ウェブUIからの変数管理
- SLAの測定と通知
- API連携
DAG(Directed Acyclic Graph)について
Airflowは一般的にジョブに該当する概念をDAG(Directed Acyclic Graph)と呼ぶアイデアで管理します。DAGは実行可能なタスクの集合で、依存関係やリレーションを整理したものです。
多くのジョブスケジューラーは複数タスクをワークフロー上で順序立てて成功・失敗を管理します。AirflowはDAGのワークフロー実行方法に合わせてそれぞれのタスクを独立させて実行できます。
ジョブが管理する複数のタスクを実行する方法には順次実行や並列処理がありますが、Airflowではこれらを組み合わせた実行方法や、例としてリトライに合わせて実行したり他タスクを監視したりと、DAGで管理しているタスクを柔軟に実行できるのが特徴です。
インストール手順
Ubuntu16.04 LTSにairflowをインストールする手順を説明します。
まずpipをインストールします。
$ sudo apt install python-pip
pipを最新版に更新します。
$ pip install --upgrade pip
AirflowのHOMEを指定します。ここにデータベースや設定ファイルが格納されます。
$ export AIRFLOW_HOME=~/airflow
pipを利用してairflowをインストールします。ここでは拡張機能を有効にせずインストールを続けます。
$ sudo pip install airflow
拡張機能を最初から利用するには以下のように[]で囲んで指定します。
$ sudo pip install "airflow[s3, postgres]"
airflowコマンドが実行できバージョンが表示されればインストールに成功しています。この記事中では1.8.0がインストールされました。
$ airflow version
もしairflowコマンドが見つからないとエラーが表示される場合は、一度pip uninstallで削除してから再度sudoを付けなおしてインストールしなおして下さい。
$ sudo pip uninstall airflow
データベースを初期化します。
$ airflow initdb
実行するとSQLiteのデータベースがAirflowのHOME以下に作られました。
DB: sqlite:////home/user/airflow/airflow.db
次にウェブサーバーを8080ポートで立ち上げます。
$ airflow webserver -p 8080
以下エラーが表示されてしまいました。
================================================================= Traceback (most recent call last): File "/usr/local/bin/airflow", line 28, inargs.func(args) File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 791, in webserver gunicorn_master_proc = subprocess.Popen(run_args) File "/usr/lib/python2.7/subprocess.py", line 711, in __init__ errread, errwrite) File "/usr/lib/python2.7/subprocess.py", line 1343, in _execute_child raise child_exception OSError: [Errno 2] No such file or directory
gunicornコマンドを利用できない事が原因のように見えるのでapt経由でgunicornをインストールします。
$ sudo apt install gunicorn $ airflow webserver -p 8080
今度はError: ‘airflow.www.gunicorn_config’ doesn’t exist とエラーが表示されウェブサーバーが起動しませんでした。
gunicornプロジェクトからgunicorn_configのサンプルを取得し、airflow.www.gunicorn_configに保存します。
$ curl https://raw.githubusercontent.com/benoitc/gunicorn/master/examples/example_config.py > airflow.www.gunicorn_config
再度ウェブサーバーを起動すると無事にアクセスできました。
$ airflow webserver -p 8080
使い方の説明
ウェブUIにアクセスすると初期状態でサンプルが登録されています。その中にあるtutorialのリンクをクリックします。
リンクをクリックするとツリービューが表示されます。Codeボタンを押して、どのようなワークフローになっているのか確認します。
Pythonのコードが表示されます。
コード中にDAGオブジェクトの定義があります。以下の行が該当します。
dag = DAG( 'tutorial', default_args=default_args, description='A simple tutorial DAG', schedule_interval=timedelta(days=1))
Detailsボタンを押すとDAGの詳細が開きます。
テストのため、開いているDAGパイプラインを解析できるか実行してみます。エラーが起こらなければ正しく記述されています。ここではパッケージ内のexample_dags以下を直接実行します。
$ export AIRFLOW_HOME=~/airflow $ python /usr/local/lib/python2.7/dist-packages/airflow/example_dags/tutorial.py
次にairflowコマンドでtutorialのsleepタスクを実行してみます。無事に5秒間スリープします。
$ airflow test tutorial sleep 2015-06-01
次にDagsのページからtutorialの実行ボタンを押します。テーブル右にある再生ボタンが即時実行ボタンです。
実行すると最終実行時間の欄に値が入ります。このリンクをクリックして実行状況の詳細を開きます。
実行状況が表示されました。
workerを有効にする
ワーカーを起動するには以下のようにworkerと指定します。
$ airflow worker
ですが、そのまま起動するとairflow.cfg で指定しているbroker_urlに含まれるsqlaが利用できないとエラーが出ました。エラーを解決するためにceleryをダウングレードします。
$ sudo pip install celery==3.1.15
次にairflow.cfgを見ると初期設定で以下のようにMySQL が利用されていましたので、MySQLをインストールします。
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow $ sudo apt install mysql-server python-mysqldb $ sudo mysql_secure_installation
rootパスワードへのrootにしておきます。
$ sudo mysql -u root -p
airflowのDBを作りユーザーとパスワードを指定します。
mysql> CREATE DATABASE airflow; mysql> CREATE USER 'airflow'@'localhost' identified by 'airflow'; mysql> GRANT ALL PRIVILEGES ON airflow.* TO 'airflow'@'localhost'; mysql> FLUSH PRIVILEGES;
これでworkerが起動するようになります。
$ airflow worker
まとめ
Airflowは高機能なジョブスケジューラーです。スケーラビリティが考慮された設計になっており規模を問いません。豊富な拡張機能を持ちApache Mesosを利用したスケールアウトを実現出来る等、稼働後に発生しがちな問題への解決方法が提供されています。
もしかすると現バージョンだけかも知れませんが、何をするにしてもエラーが発生し原因調査しなければならないため運用に手間がかかる印象を持ちました。
独特なコンセプトで設計されているジョブスケジューラーのため、柔軟なプラットフォームとして利用するのには向いているのかも知れませんが、例えばcronの代替えとして使うには取っ付きが悪いかも知れません。
参考情報
https://airflow.incubator.apache.org/
https://github.com/apache/incubator-airflow
http://www.celeryproject.org/
本ブログは、Git / Subversionのクラウド型ホスティングサービス「tracpath(トラックパス)」を提供している株式会社オープングルーヴが運営しています。
エンタープライズ向け Git / Subversion 導入や DevOps による開発の効率化を検討している法人様必見!
「tracpath(トラックパス)」は、企業内の情報システム部門や、ソフトウェア開発・アプリケーション開発チームに対して、開発の効率化を支援し、品質向上を実現します。
さらに、システム運用の効率化・自動化支援サービスも提供しています。
”つくる情熱を支えるサービス”を提供し、まるで専属のインフラエンジニアのように、あなたのチームを支えていきます。
No Comments