Apache Airflow: krótkie wprowadzenie i prosty DAG

Apache Airflow: krótkie wprowadzenie i prosty DAG

Apache Airflow to oprogramowanie, które można z łatwością wykorzystać do planowania i monitorowania przepływów pracy. Jest napisany w Pythonie.

Ponieważ każde oprogramowanie Airflow zawiera również koncepcje opisujące funkcje główne i atomowe. W Airflow możesz napotkać:

  • DAG (Directed Acyclic Graph) – zbiór zadań, które w połączeniu tworzą przepływ. W DAG określasz relacje między operatorami (sekwencje lub równoległość zadań), porządek i zależności.
  • Operator – reprezentuje pojedyncze zadanie. Z technicznego punktu widzenia można traktować operatora jak „kontener” do działania.
  • Zadanie – jest instancją określonego uruchomienia operatora w reprezentacji DAG.

Pełna lista dostępnych operatorów w wersji 1.10.1 Apache Airflow:

  • BashOperator
  • BranchPythonOperator
  • CheckOperator
  • DockerOperator
  • DummyOperator
  • DruidCheckOperator
  • EmailOperator
  • GenericTransfer
  • HiveToDruidTransfer
  • HiveToMySqlTransfer
  • Hive2SambaOperator
  • HiveOperator
  • HiveStatsCollectionOperator
  • IntervalCheckOperator
  • JdbcOperator
  • LatestOnlyOperator
  • MsSqlOperator
  • MsSqlToHiveTransfer
  • MySqlOperator
  • MySqlToHiveTransfer
  • OracleOperator
  • PigOperator
  • PostgresOperator
  • PrestoCheckOperator
  • PrestoIntervalCheckOperator
  • PrestoToMySqlTransfer
  • PrestoValueCheckOperator
  • PythonOperator
  • PythonVirtualenvOperator
  • S3FileTransformOperator
  • S3ToHiveTransfer
  • S3ToRedshiftTransfer
  • ShortCircuitOperator
  • SimpleHttpOperator
  • SlackAPIOperator
  • SlackAPIPostOperator
  • SqliteOperator
  • SubDagOperator
  • TriggerDagRunOperator
  • ValueCheckOperator
  • RedshiftToS3Transfer

Utworzenie prostego DAG’a z dwoma operatorami

W naszym ćwiczeniu stworzymy DAG z dwoma zadaniami BashOperatora. Oba wydrukują komunikat w konsoli.

Przede wszystkim musimy stworzyć nowy plik Python w katalogu AIRFLOW_HOME/dags. Airflow monitoruje ten folder w odstępach czasu i po kilku sekundach możesz zobaczyć swój DAG w interfejsie Airflow.

W moim przypadku AIRFLOW_HOME=/home/pawel/airflow => Z tego wynika, że moje dagi muszę umieszczaćw folderze: /home/pawel/airflow/dags. Poniżej znajduje się implementacja DAG. Zapisałem go w pliku simple-dag.py w katalogu dags.

import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'Pawel',
    'start_date': datetime(2018, 12, 02, 16, 40, 00),
    'email': ['bigdataetlcom@gmail.com'],
    'email_on_failure': False,
    'email_on_retry': False
}
dag = DAG(dag_id='Simple_DAG', default_args=args, schedule_interval='@daily', concurrency=1, max_active_runs=1,
          catchup=False)
task_1 = BashOperator(
    task_id='task_1',
    bash_command='echo Hello, This is my first DAG in Airflow!',
    dag=dag
)
task_2 = BashOperator(
    task_id='task_2',
    bash_command='echo With two operators!',
    dag=dag
)
task_1 >> task_2

Airflow UI

W pliku konfiguracyjnym: (AIRFLOW_HOMEairflow.cfg) można ustawić parametr „endpoint_url„. Domyślnie jest ustawiony na: http://localhost:8080. Jeśli korzystasz z usługi Airflow na komputerze lokalnym, powinieneś móc otworzyć ten adres URL w przeglądarce. W przypadku, gdy Airflow jest zainstalowany na zdalnym komputerze-hoście, zalecam zmianę „localhost” na publiczny IP hosta Airflow. Dodatkowo zmień również parametr „base_url”. Dzięki temu, Airflow będzie generował wszystkie linki z uwzględnieniem Twojego zdalnego IP.

endpoint_url = http://localhost:8080
base_url = http://localhost:8080

Uruchom utworzony DAG

Powinieneś być w stanie zobaczyć stworzony DAG w interfejsie Airflow jak na zdjęciu poniżej. W moim przypadku ja już go oznaczyłem jako DAG aktywny i uruchomiłem, dlatego możesz zobaczyć przycisk „Włącz” z ciemnozielonym tłem. Po uruchomieniu DAG powinieneś być w stanie zobaczyć, że w sekcji „Ostatnie zadania” tworzone jest nowe zadanie, które przechodzi przez każdy status zadania (oczywiście w przypadku kolejnego scenariusza):

  1. Scheduled
  2. Queued
  3. Running
  4. Finished


Sekcja „DAG Runs” przedstawia informacje o wszystkich uruchomieniach DAG na poziomie DAG zamiast „Recent task„, który pokazuje tylko stan ostatnich zadań ostatniego DAG, jak sugeruje nazwa :). (DAG jest kontenerem dla zadań).

Otwórzmy pomyślny przebieg DAG i zobaczmy logi każdego zadania. W poniższym zrzucie ekranu przedstawiono „Widok wykresu” naszego DAG. Jest to bardzo przydatne, gdy chcesz sprawdzić, czy zadanie w DAG jest w poprawnej kolejności wykonywania i zobaczyć status wykonania każdego zadania. W przypadku niepowodzenia zobaczysz, że na przykład „task_2” zostanie oznaczony jako żółty (gdy status zadania zostanie ustawiony na „up_for_retry„) lub czerwony w przypadku niepowodzenia. Tutaj możesz łatwo przejść do logów każdego zadania, po prostu kliknij lewym przyciskiem na wybranym zadaniu zobaczysz modalne okno z wieloma opcjami. Jednym z nich jest przycisk „Logi„.


Logi #Task_1


Logi #Task_2


Podsumowanie

Po tym poście powinieneś być w stanie stworzyć, uruchomić i debugować proste DAG w Airflow.

Apache Airflow to potężne narzędzie do organizowania przepływów pracy w projektach i organizacjach. Pomimo tego, że wciąż jest w Apache Inkubator, Airflow jest używany przez wielu „dużych graczy” w świecie IT.

Jeśli spodobał Ci się ten post to napisz proszę komentarz poniżej oraz udostępnij ten post na swoim Facebook’u, Twitter’ze, LinkedIn lub innej stronie z mediami społecznościowymi.
Z góry dzięki!

Please follow and like us:
error

Ten post ma jeden komentarz

  1. Hej Paweł! Dzięki za artykuł. Fajnie, że piszesz o Airflow – na co dzień pracuję przy rozwoju tego oprogramowania. Jak byś chciał kiedyś pogadać to się odezwij na maila albo na Twitterze: https://twitter.com/sprzedwojski
    Pozdrawiam!
    Szymon

Dodaj komentarz

Close Menu
Social media & sharing icons powered by UltimatelySocial
error

Enjoy this blog? Please spread the word :)