1. Apache Airflow 소개
1.1 Batch Process란?
예약된 시간에 실행되는 프로세스
일회성(1회)도 가능하고, 주기적인 실행도 가능
ex. 이번 주 일요일 07:00에 1번 실행되는 프로세스
ex. 매주 일요일 07:00에 실행되는 프로세스
1.2 Batch Process - Airflow 등장 전
대표적인 Batch Process 구축 방법 : Linux Crontab
크론표현식
자주 사용되는 크론표현식
크론 메이커
크론 해석기
크론탭의 문제
- 재실행 및 알람
- 파일을 실행하다 오류가 발생한 경우, 크론탭이 별도의 처리를 하지 않음
- 예) 매주 일요일 07:00에 predict.py를 실행하다가 에러가 발생한 경우, 알람을 별도로 받지 못함
- 실패할 경우, 자동으로 몇 번 더 재실행(Retry)하고, 그래도 실패하면 실패했다는 알람을 받으면 좋음
- 과거 실행 이력 및 실행 로그를 보기 어려움
- 여러 파일을 실행하거나, 복잡한 파이프라인을 만들기 힘듬 Crontab은 간단히 사용할 수는 있지만, 실패 시 재실행, 실행 로그 확인, 알람 등의 기능은 제공하지 않음
1.3 Airflow 소개
파이썬을 사용해 스케줄링 및 파이프라인 작성
- 실패 시 알람
- 실패 시 재실행 시도
- 동시 실행 워커 수
- 설정 및 변수 값 분리
2. Apache Airflow 실습하며 배워보기
2.1 설치하고 실행하기
1) pip install 'apache-airflow==2.2.0'
2) 환경변수로 에어플로우 홈 지정
- 현재 폴더(airflow를 사용 할)를 airflow home으로지정
- export AIRFLOW_HOME=.
3) airflow로 사용 할 db 초기화 (초기1회)
- airflow db init
4) 저장 된 파일 확인(폴더에 파일생성됨)
- airflow.db
- airflow.cfg
5) airflow에서 사용 할 어드민계정 생성
- airflow users create --username admin --password 1234 --firstname jihoon --lastname heo --role Admin --email asdf@naver.com
6) 웹서버 오픈
- airflow webserver --port 8080
7) 스케줄러 오픈
- airflow scheduler
2.2 DAG 작성하기
- Airflow는 DAG이라는 단위로 스케줄링 관리
- 각 DAG은 Task로 구성
- DAG 내 Task는 순차적으로 실행되거나, 동시에(병렬로) 실행할 수 있음
예제
# hello_world.py
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
def print_world() -> None:
print("world")
# with 구문으로 DAG 정의를 시작합니다.
with DAG(
dag_id="hello_world", # DAG의 식별자용 아이디입니다.
description="My First DAG", # DAG에 대해 설명합니다.
start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작합니다.
schedule_interval="0 6 * * *", # 매일 06:00에 실행합니다.
tags=["my_dags"],
) as dag:
# 테스크를 정의합니다.
# bash 커맨드로 echo hello 를 실행합니다.
t1 = BashOperator(
task_id="print_hello",
bash_command="echo Hello",
owner="heumsi", # 이 작업의 오너입니다. 보통 작업을 담당하는 사람 이름을 넣습니다.
retries=3, # 이 테스크가 실패한 경우, 3번 재시도 합니다.
retry_delay=timedelta(minutes=5), # 재시도하는 시간 간격은 5분입니다.
)
# 테스크를 정의합니다.
# python 함수인 print_world를 실행합니다.
t2 = PythonOperator(
task_id="print_world",
python_callable=print_world,
depends_on_past=True,
owner="heumsi",
retries=3,
retry_delay=timedelta(minutes=5),
)
# 테스크 순서를 정합니다.
# t1 실행 후 t2를 실행합니다.
t1 >> t2
- AIRFLOW_HOME 으로 지정된 디렉토리에 dags 디렉토리를 생성하고 이 안에 DAG 파일을 작성
- DAG은 파이썬 파일로 작성. 보통 하나의 .py 파일에 하나의 DAG을 저장
- DAG 파일은 크게 다음으로 구성
- DAG 정의 부분
- Task 정의 부분
- Task 간 순서 정의 부분
- DAG 파일을 저장하면, Airflow 웹 UI에서 확인할 수 있음
- Airflow 웹 UI에서 해당 DAG을 ON으로 변경하면 DAG이 스케줄링되어 실행
- DAG 세부 페이지에서 실행된 DAG Run의 결과를 볼 수 있음
2.3 유용한 Operator 간단 소개
- DummyOperator
- SimpleHttpOperator
- BranchOperator
- DockerOperator
- KuberntesOperator
- CustomOperator (직접 Operator 구현)
Tip
외부 Third Party와 연동해 사용하는 Operator의 경우 (docker, aws, gcp 등)
Airflow 설치 시에 다음처럼 extra package를 설치해야 함
pip install “apache-airflow[aws]”
더 잘 사용하기 위한 방법
- Variable : Airflow Console에서 변수(Variable)를 저장해 Airflow DAG에서 활용
- Connection & Hooks : 연결하기 위한 설정(MySQL, GCP 등)
- Sensor : 외부 이벤트를 기다리며 특정 조건이 만족하면 실행
- Marker - XComs : Task 끼리 결과를 주고받은 싶은 경우 사용
3. Apache Airflow 아키텍처와 활용방안
3.1 기본 아키텍처
1) DAG Directory
DAG 파일들을 저장
- 기본 경로는 $AIRFLOW_HOME/dags
- DAG_FOLDER 라고도 부르며, 이 폴더 내부에서 폴더 구조를 어떻게 두어도 상관없음
- Scheduler에 의해 .py 파일은 모두 탐색되고 DAG이 파싱
2) Scheduler
Scheduler는 각종 메타 정보의 기록을 담당
- DAG Directory 내 .py 파일에서 DAG을 파싱하여 DB에 저장
- DAG들의 스케줄링 관리 및 담당
- 실행 진행 상황과 결과를 DB에 저장
- Executor를 통해 실제로 스케줄링된 DAG을 실행
- Airflow에서 가장 중요한 컴포넌트
3) Scheduler - Executor
Executor는 스케줄링된 DAG을 실행하는 객체로, 크게 2종류로 나뉨
- Local Executor
- Remote Executor
3-1) Scheduler - Local Executor
Local Executor는 DAG Run을 프로세스 단위로 실행하며, 다음처럼 나뉨
Local Executor
- 하나의 DAG Run을 하나의 프로세스로 띄워서 실행
- 최대로 생성할 프로세스 수를 정해야 함
- Airflow를 간단하게 운영할 때 적합
Sequential Executor
- 하나의 프로세스에서 모든 DAG Run들을 처리
- Airflow 기본 Executor로, 별도 설정이 없으면 이 Executor를 사용
- Airflow를 테스트로 잠시 운영할 때 적합
3-2) Scheduler - Remote Executor
DAG Run을 외부 프로세스로 실행
Celery Executor
- DAG Run을 Celery Worker Process로 실행
- 보통 Redis를 중간에 두고 같이 사용
- Local Executor를 사용하다, Airflow 운영 규모가 좀 더 커지면 Celery Executor로 전환
Kubernetes Executor
- 쿠버네티스 상에서 Airflow를 운영할 때 사용
- DAG Run 하나가 하나의 Pod(쿠버네티스의 컨테이너 같은 개념)
- Airflow 운영 규모가 큰 팀에서 사용
https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/
4) Workers
DAG을 실제로 실행
- Scheduler에 의해 생기고 실행
- Executor에 따라 워커의 형태가 다름
- Celery 혹은 Local Executor인 경우, Worker는 프로세스
- Kubernetes Executor인 경우, Worker는 pod.
- DAG Run을 실행하는 과정에서 생긴 로그를 저장
5) Metadata Database
메타 정보를 저장
- Scheduler에 의해 Metadata가 쌓임
- 보통 MySQL이나 Postgres를 사용
- 파싱한 DAG 정보, DAG Run 상태와 실행 내용, Task 정보 등을 저장
- User와 Role (RBAC)에 대한 정보 저장
- Scheduler와 더불어 핵심 컴포넌트
- 트러블 슈팅 시, 디버깅을 위해 직접 DB에 연결해 데이터를 확인하기도 함
- 실제 운영 환경에서는 GCP Cloud SQL이나, AWS Aurora DB 등 외부 DB 인스턴스를 사용
6) Webserver
WEB UI를 담당
- Metadata DB와 통신하며 유저에게 필요한 메타 데이터를 웹 브라우저에 보여주고 시각화
- 보통 Airflow 사용자들은 이 웹서버를 이용하여 DAG을 ON/OFF 하며, 현 상황을 파악
- REST API도 제공하므로, 꼭 WEB UI를 통해서 통신하지 않아도 괜찮음
- 웹서버가 당장 작동하지 않아도, Airflow에 큰 장애가 발생하지 않음(반면 Scheduler의 작동 여부는 매우 중요)
3.2 Airflow 실제 활용 사례
1) Managed Airflow (GCP Composer, AWS MWAA)
장점
- 설치와 구축을 클릭 몇번으로 클라우드 서비스가 다 진행
- 유저는 DAG 파일을 스토리지(파일 업로드) 형태로 관리
단점
- 비용
- 자유도가 적음. 클라우드에서 기능을 제공하지 않으면 불가능한 제약이 많음
2) VM + Docker compose
https://github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml
https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html
장점
- Managed Service 보다는 살짝 복잡하지만, 어려운 난이도는 아님
- (Docker와 Docker compose에 익숙한 사람이라면 금방 익힐 수 있음)
- 하나의 VM만을 사용하기 때문에 단순
단점
- 각 도커 컨테이너 별로 환경이 다르므로, 관리 포인트가 늘어남
- 예를 들어, 특정 컨테이너가 갑자기 죽을 수도 있고, 특정 컨테이너에 라이브러리를 설치했다면, 나머지 컨테이너에도 하나씩 설치해야 함
3) Kubernetes + Helm
- Kubernetes는 여러 개의 VM을 동적으로 운영하는 일종의 분산환경으로, 리소스 사용이 매우 유연한게 대표적인 특징(필요에 따라 VM 수를 알아서 늘려주고 줄여줌)
- 이런 특징 덕분에, 특정 시간에 배치 프로세스를 실행시키는 Airflow와 궁합이 매우 잘 맞음
- Airflow DAG 수가 몇 백개로 늘어나도 노드 오토 스케일링으로 모든 프로세스를 잘 처리할 수 있음
- 하지만 쿠버네티스 자체가 난이도가 있는만큼 구축과 운영이 어려움
- 보통 데이터 팀에 엔지니어링 팀이 존재하고, 쿠버네티스 환경인 경우에 적극 사용
3.3 MLOps 관점의 Airflow
“주기적인 실행”이 필요한 경우
- Batch Training : 1주일 단위로 모델 학습
- Batch Serving(Batch Inference) : 30분 단위로 인퍼런스
- 인퍼런스 결과를 기반으로 일자별, 주차별 모델 퍼포먼스 Report 생성
- MySQL에 저장된 메타데이터를 데이터 웨어하우스로 1시간 단위로 옮기기
- S3, GCS 등 Objest Storage
- Feature Store를 만들기 위해 Batch ETL 실행
Airflow 추천글
1. 버킷플레이스 - Airflow 도입기 - https://www.bucketplace.co.kr/post/2021-04-13-버킷플레이스-airflow-도입기/
2. 라인 엔지니어링 - Airflow on Kubernetes - https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/
3. 쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기 - https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html
4. Airflow Executors Explained - https://www.astronomer.io/guides/airflow-executors-explained
'부스트캠프 AI Tech 2기 > 2기 Product Serving' 카테고리의 다른 글
머신러닝 디자인 패턴 (0) | 2021.12.16 |
---|---|
BentoML (0) | 2021.12.13 |
Cloud (0) | 2021.12.10 |
Linux & Shell Command (0) | 2021.12.10 |
웹 서비스 형태 - Streamlit (0) | 2021.12.09 |