Apache Airflow to oprogramowanie, które można z łatwością wykorzystać do planowania i monitorowania [ Apache Airflow krótkie wprowadzenie i prosty DAG ] przepływów pracy. Jest napisany w Pythonie.
Ponieważ każde oprogramowanie Airflow zawiera również koncepcje opisujące funkcje główne i atomowe.
Table of Contents
Wprowadzenie
Apache Airflow to otwarte źródło platformy do definiowania, planowania i monitorowania przepływów pracy. Jest ona zaprojektowana w celu ułatwienia programistom i inżynierom danych tworzenie i utrzymywanie łańcuchów dostaw danych i przepływów pracy w prosty i elastyczny sposób.
W Airflow możesz napotkać:
- DAG (Directed Acyclic Graph) – zbiór zadań, które w połączeniu tworzą przepływ. W DAG określasz relacje między operatorami (sekwencje lub równoległość zadań), porządek i zależności.
- Operator – reprezentuje pojedyncze zadanie. Z technicznego punktu widzenia można traktować operatora jak “kontener” do działania.
- Zadanie – jest instancją określonego uruchomienia operatora w reprezentacji DAG. (Apache Airflow krótkie wprowadzenie i prosty DAG)
Pełna lista dostępnych operatorów w wersji 1.10.1 Apache Airflow:
- BashOperator
- BranchPythonOperator
- CheckOperator
- DockerOperator
- DummyOperator
- DruidCheckOperator
- EmailOperator
- GenericTransfer
- HiveToDruidTransfer
- HiveToMySqlTransfer
- Hive2SambaOperator
- HiveOperator
- HiveStatsCollectionOperator
- IntervalCheckOperator
- JdbcOperator
- LatestOnlyOperator
- MsSqlOperator
- MsSqlToHiveTransfer
- MySqlOperator
- MySqlToHiveTransfer
- OracleOperator
- PigOperator
- PostgresOperator
- PrestoCheckOperator
- PrestoIntervalCheckOperator
- PrestoToMySqlTransfer
- PrestoValueCheckOperator
- PythonOperator
- PythonVirtualenvOperator
- S3FileTransformOperator
- S3ToHiveTransfer
- S3ToRedshiftTransfer
- ShortCircuitOperator
- SimpleHttpOperator
- SlackAPIOperator
- SlackAPIPostOperator
- SqliteOperator
- SubDagOperator
- TriggerDagRunOperator
- ValueCheckOperator
- RedshiftToS3Transfer
Utworzenie prostego DAG’a z dwoma operatorami
W naszym ćwiczeniu stworzymy DAG z dwoma zadaniami BashOperatora. Oba wydrukują komunikat w konsoli.
Przede wszystkim musimy stworzyć nowy plik Python w katalogu AIRFLOW_HOME/dags. Airflow monitoruje ten folder w odstępach czasu i po kilku sekundach możesz zobaczyć swój DAG w interfejsie Airflow.
W moim przypadku AIRFLOW_HOME=/home/pawel/airflow => Z tego wynika, że moje dagi muszę umieszczaćw folderze: /home/pawel/airflow/dags. Poniżej znajduje się implementacja DAG. Zapisałem go w pliku simple-dag.py w katalogu dags. . (Apache Airflow krótkie wprowadzenie i prosty DAG)
Apache Airflow Co to Jest?
import airflow from airflow.operators.bash_operator import BashOperator from airflow.models import DAG from datetime import datetime, timedelta args = { 'owner': 'Pawel', 'start_date': datetime(2018, 12, 02, 16, 40, 00), 'email': ['bigdataetlcom@gmail.com'], 'email_on_failure': False, 'email_on_retry': False } dag = DAG(dag_id='Simple_DAG', default_args=args, schedule_interval='@daily', concurrency=1, max_active_runs=1, catchup=False) task_1 = BashOperator( task_id='task_1', bash_command='echo Hello, This is my first DAG in Airflow!', dag=dag ) task_2 = BashOperator( task_id='task_2', bash_command='echo With two operators!', dag=dag ) task_1 >> task_2
Airflow UI
W pliku konfiguracyjnym: (AIRFLOW_HOMEairflow.cfg) można ustawić parametr “endpoint_url“. Domyślnie jest ustawiony na: http://localhost:8080. Jeśli korzystasz z usługi Airflow na komputerze lokalnym, powinieneś móc otworzyć ten adres URL w przeglądarce. W przypadku, gdy Airflow jest zainstalowany na zdalnym komputerze-hoście, zalecam zmianę “localhost” na publiczny IP hosta Airflow. Dodatkowo zmień również parametr “base_url”. Dzięki temu, Airflow będzie generował wszystkie linki z uwzględnieniem Twojego zdalnego IP. . (Apache Airflow krótkie wprowadzenie i prosty DAG)
endpoint_url = http://localhost:8080 base_url = http://localhost:8080
Uruchom Utworzony DAG
Powinieneś być w stanie zobaczyć stworzony DAG w interfejsie Airflow jak na zdjęciu poniżej. W moim przypadku ja już go oznaczyłem jako DAG aktywny i uruchomiłem, dlatego możesz zobaczyć przycisk “Włącz” z ciemnozielonym tłem. Po uruchomieniu DAG powinieneś być w stanie zobaczyć, że w sekcji “Ostatnie zadania” tworzone jest nowe zadanie, które przechodzi przez każdy status zadania (oczywiście w przypadku kolejnego scenariusza) (Apache Airflow krótkie wprowadzenie i prosty DAG)
- Scheduled
- Queued
- Running
- Finished

Sekcja “DAG Runs” przedstawia informacje o wszystkich uruchomieniach DAG na poziomie DAG zamiast “Recent task“, który pokazuje tylko stan ostatnich zadań ostatniego DAG, jak sugeruje nazwa :). (DAG jest kontenerem dla zadań).
Otwórzmy pomyślny przebieg DAG i zobaczmy logi każdego zadania. W poniższym zrzucie ekranu przedstawiono “Widok wykresu” naszego DAG. Jest to bardzo przydatne, gdy chcesz sprawdzić, czy zadanie w DAG jest w poprawnej kolejności wykonywania i zobaczyć status wykonania każdego zadania. W przypadku niepowodzenia zobaczysz, że na przykład “task_2” zostanie oznaczony jako żółty (gdy status zadania zostanie ustawiony na “up_for_retry“) lub czerwony w przypadku niepowodzenia.
Tutaj możesz łatwo przejść do logów każdego zadania, po prostu kliknij lewym przyciskiem na wybranym zadaniu zobaczysz modalne okno z wieloma opcjami. Jednym z nich jest przycisk “Logi“. (Apache Airflow krótkie wprowadzenie i prosty DAG)

Logi #Task_1

Logi #Task_2

Podsumowanie
Po tym poście powinieneś być w stanie stworzyć, uruchomić i debugować proste DAG w Airflow.
Apache Airflow to potężne narzędzie do organizowania przepływów pracy w projektach i organizacjach. Pomimo tego, że wciąż jest w Apache Inkubator, Airflow jest używany przez wielu “dużych graczy” w świecie IT. (Apache Airflow krótkie wprowadzenie i prosty DAG)
Could You Please Share This Post?
I appreciate It And Thank YOU! :)
Have A Nice Day!