1. Apache Airflow 소개
1.1 Batch Process란?
예약된 시간에 실행되는 프로세스
일회성(1회)도 가능하고, 주기적인 실행도 가능
ex. 이번 주 일요일 07:00에 1번 실행되는 프로세스
ex. 매주 일요일 07:00에 실행되는 프로세스
1.2 Batch Process - Airflow 등장 전
대표적인 Batch Process 구축 방법 : Linux Crontab
크론표현식
자주 사용되는 크론표현식
크론 메이커
CronMaker
www.cronmaker.com
크론 해석기
Crontab.guru - The cron schedule expression editor
crontab.guru
크론탭의 문제
- 재실행 및 알람
- 파일을 실행하다 오류가 발생한 경우, 크론탭이 별도의 처리를 하지 않음
- 예) 매주 일요일 07:00에 predict.py를 실행하다가 에러가 발생한 경우, 알람을 별도로 받지 못함
- 실패할 경우, 자동으로 몇 번 더 재실행(Retry)하고, 그래도 실패하면 실패했다는 알람을 받으면 좋음
- 과거 실행 이력 및 실행 로그를 보기 어려움
- 여러 파일을 실행하거나, 복잡한 파이프라인을 만들기 힘듬 Crontab은 간단히 사용할 수는 있지만, 실패 시 재실행, 실행 로그 확인, 알람 등의 기능은 제공하지 않음
1.3 Airflow 소개
파이썬을 사용해 스케줄링 및 파이프라인 작성
- 실패 시 알람
- 실패 시 재실행 시도
- 동시 실행 워커 수
- 설정 및 변수 값 분리
2. Apache Airflow 실습하며 배워보기
2.1 설치하고 실행하기
1) pip install 'apache-airflow==2.2.0'
2) 환경변수로 에어플로우 홈 지정
- 현재 폴더(airflow를 사용 할)를 airflow home으로지정
- export AIRFLOW_HOME=.
3) airflow로 사용 할 db 초기화 (초기1회)
- airflow db init
4) 저장 된 파일 확인(폴더에 파일생성됨)
- airflow.db
- airflow.cfg
5) airflow에서 사용 할 어드민계정 생성
- airflow users create --username admin --password 1234 --firstname jihoon --lastname heo --role Admin --email asdf@naver.com
6) 웹서버 오픈
- airflow webserver --port 8080
7) 스케줄러 오픈
- airflow scheduler
2.2 DAG 작성하기
- Airflow는 DAG이라는 단위로 스케줄링 관리
- 각 DAG은 Task로 구성
- DAG 내 Task는 순차적으로 실행되거나, 동시에(병렬로) 실행할 수 있음
예제
# hello_world.py
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
def print_world() -> None:
print("world")
# with 구문으로 DAG 정의를 시작합니다.
with DAG(
dag_id="hello_world", # DAG의 식별자용 아이디입니다.
description="My First DAG", # DAG에 대해 설명합니다.
start_date=days_ago(2), # DAG 정의 기준 2일 전부터 시작합니다.
schedule_interval="0 6 * * *", # 매일 06:00에 실행합니다.
tags=["my_dags"],
) as dag:
# 테스크를 정의합니다.
# bash 커맨드로 echo hello 를 실행합니다.
t1 = BashOperator(
task_id="print_hello",
bash_command="echo Hello",
owner="heumsi", # 이 작업의 오너입니다. 보통 작업을 담당하는 사람 이름을 넣습니다.
retries=3, # 이 테스크가 실패한 경우, 3번 재시도 합니다.
retry_delay=timedelta(minutes=5), # 재시도하는 시간 간격은 5분입니다.
)
# 테스크를 정의합니다.
# python 함수인 print_world를 실행합니다.
t2 = PythonOperator(
task_id="print_world",
python_callable=print_world,
depends_on_past=True,
owner="heumsi",
retries=3,
retry_delay=timedelta(minutes=5),
)
# 테스크 순서를 정합니다.
# t1 실행 후 t2를 실행합니다.
t1 >> t2
- AIRFLOW_HOME 으로 지정된 디렉토리에 dags 디렉토리를 생성하고 이 안에 DAG 파일을 작성
- DAG은 파이썬 파일로 작성. 보통 하나의 .py 파일에 하나의 DAG을 저장
- DAG 파일은 크게 다음으로 구성
- DAG 정의 부분
- Task 정의 부분
- Task 간 순서 정의 부분
- DAG 파일을 저장하면, Airflow 웹 UI에서 확인할 수 있음
- Airflow 웹 UI에서 해당 DAG을 ON으로 변경하면 DAG이 스케줄링되어 실행
- DAG 세부 페이지에서 실행된 DAG Run의 결과를 볼 수 있음
2.3 유용한 Operator 간단 소개
- DummyOperator
- SimpleHttpOperator
- BranchOperator
- DockerOperator
- KuberntesOperator
- CustomOperator (직접 Operator 구현)
Tip
외부 Third Party와 연동해 사용하는 Operator의 경우 (docker, aws, gcp 등)
Airflow 설치 시에 다음처럼 extra package를 설치해야 함
pip install “apache-airflow[aws]”
더 잘 사용하기 위한 방법
- Variable : Airflow Console에서 변수(Variable)를 저장해 Airflow DAG에서 활용
- Connection & Hooks : 연결하기 위한 설정(MySQL, GCP 등)
- Sensor : 외부 이벤트를 기다리며 특정 조건이 만족하면 실행
- Marker - XComs : Task 끼리 결과를 주고받은 싶은 경우 사용
3. Apache Airflow 아키텍처와 활용방안
3.1 기본 아키텍처
1) DAG Directory
DAG 파일들을 저장
- 기본 경로는 $AIRFLOW_HOME/dags
- DAG_FOLDER 라고도 부르며, 이 폴더 내부에서 폴더 구조를 어떻게 두어도 상관없음
- Scheduler에 의해 .py 파일은 모두 탐색되고 DAG이 파싱
2) Scheduler
Scheduler는 각종 메타 정보의 기록을 담당
- DAG Directory 내 .py 파일에서 DAG을 파싱하여 DB에 저장
- DAG들의 스케줄링 관리 및 담당
- 실행 진행 상황과 결과를 DB에 저장
- Executor를 통해 실제로 스케줄링된 DAG을 실행
- Airflow에서 가장 중요한 컴포넌트
3) Scheduler - Executor
Executor는 스케줄링된 DAG을 실행하는 객체로, 크게 2종류로 나뉨
- Local Executor
- Remote Executor
3-1) Scheduler - Local Executor
Local Executor는 DAG Run을 프로세스 단위로 실행하며, 다음처럼 나뉨
Local Executor
- 하나의 DAG Run을 하나의 프로세스로 띄워서 실행
- 최대로 생성할 프로세스 수를 정해야 함
- Airflow를 간단하게 운영할 때 적합
Sequential Executor
- 하나의 프로세스에서 모든 DAG Run들을 처리
- Airflow 기본 Executor로, 별도 설정이 없으면 이 Executor를 사용
- Airflow를 테스트로 잠시 운영할 때 적합
3-2) Scheduler - Remote Executor
DAG Run을 외부 프로세스로 실행
Celery Executor
- DAG Run을 Celery Worker Process로 실행
- 보통 Redis를 중간에 두고 같이 사용
- Local Executor를 사용하다, Airflow 운영 규모가 좀 더 커지면 Celery Executor로 전환
Kubernetes Executor
- 쿠버네티스 상에서 Airflow를 운영할 때 사용
- DAG Run 하나가 하나의 Pod(쿠버네티스의 컨테이너 같은 개념)
- Airflow 운영 규모가 큰 팀에서 사용
https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/
Kubernetes를 이용한 효율적인 데이터 엔지니어링(Airflow on Kubernetes VS Airflow Kubernetes Executor) - 1 - LINE
안녕하세요. LINE Financial Data Platform을 운영하고 개발하고 있는 이웅규입니다. 저는 지난 NAVER DEVIEW 2020에서 발표했던 Kubernetes를 이용한 효율적인 데이터 엔지니어링 (Airflow on Kubernetes VS Airflow Kubern
engineering.linecorp.com
4) Workers
DAG을 실제로 실행
- Scheduler에 의해 생기고 실행
- Executor에 따라 워커의 형태가 다름
- Celery 혹은 Local Executor인 경우, Worker는 프로세스
- Kubernetes Executor인 경우, Worker는 pod.
- DAG Run을 실행하는 과정에서 생긴 로그를 저장
5) Metadata Database
메타 정보를 저장
- Scheduler에 의해 Metadata가 쌓임
- 보통 MySQL이나 Postgres를 사용
- 파싱한 DAG 정보, DAG Run 상태와 실행 내용, Task 정보 등을 저장
- User와 Role (RBAC)에 대한 정보 저장
- Scheduler와 더불어 핵심 컴포넌트
- 트러블 슈팅 시, 디버깅을 위해 직접 DB에 연결해 데이터를 확인하기도 함
- 실제 운영 환경에서는 GCP Cloud SQL이나, AWS Aurora DB 등 외부 DB 인스턴스를 사용
6) Webserver
WEB UI를 담당
- Metadata DB와 통신하며 유저에게 필요한 메타 데이터를 웹 브라우저에 보여주고 시각화
- 보통 Airflow 사용자들은 이 웹서버를 이용하여 DAG을 ON/OFF 하며, 현 상황을 파악
- REST API도 제공하므로, 꼭 WEB UI를 통해서 통신하지 않아도 괜찮음
- 웹서버가 당장 작동하지 않아도, Airflow에 큰 장애가 발생하지 않음(반면 Scheduler의 작동 여부는 매우 중요)
3.2 Airflow 실제 활용 사례
1) Managed Airflow (GCP Composer, AWS MWAA)
장점
- 설치와 구축을 클릭 몇번으로 클라우드 서비스가 다 진행
- 유저는 DAG 파일을 스토리지(파일 업로드) 형태로 관리
단점
- 비용
- 자유도가 적음. 클라우드에서 기능을 제공하지 않으면 불가능한 제약이 많음
2) VM + Docker compose
https://github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml
GitHub - apache/airflow: Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
Apache Airflow - A platform to programmatically author, schedule, and monitor workflows - GitHub - apache/airflow: Apache Airflow - A platform to programmatically author, schedule, and monitor work...
github.com
https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html
쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기(feat. Airflow on Kubernetes)
안녕하세요. 데이터 엔지니어링팀의 하디입니다. 이번 글에서는 쏘카 데이터 그룹의 태동기(2018년)부터 현재(2021년)까지 어떻게 Airflow를 구축하고 운영했는지를 소개합니다. 특히 최근에 쏘카
tech.socarcorp.kr
장점
- Managed Service 보다는 살짝 복잡하지만, 어려운 난이도는 아님
- (Docker와 Docker compose에 익숙한 사람이라면 금방 익힐 수 있음)
- 하나의 VM만을 사용하기 때문에 단순
단점
- 각 도커 컨테이너 별로 환경이 다르므로, 관리 포인트가 늘어남
- 예를 들어, 특정 컨테이너가 갑자기 죽을 수도 있고, 특정 컨테이너에 라이브러리를 설치했다면, 나머지 컨테이너에도 하나씩 설치해야 함
3) Kubernetes + Helm
- Kubernetes는 여러 개의 VM을 동적으로 운영하는 일종의 분산환경으로, 리소스 사용이 매우 유연한게 대표적인 특징(필요에 따라 VM 수를 알아서 늘려주고 줄여줌)
- 이런 특징 덕분에, 특정 시간에 배치 프로세스를 실행시키는 Airflow와 궁합이 매우 잘 맞음
- Airflow DAG 수가 몇 백개로 늘어나도 노드 오토 스케일링으로 모든 프로세스를 잘 처리할 수 있음
- 하지만 쿠버네티스 자체가 난이도가 있는만큼 구축과 운영이 어려움
- 보통 데이터 팀에 엔지니어링 팀이 존재하고, 쿠버네티스 환경인 경우에 적극 사용
3.3 MLOps 관점의 Airflow
“주기적인 실행”이 필요한 경우
- Batch Training : 1주일 단위로 모델 학습
- Batch Serving(Batch Inference) : 30분 단위로 인퍼런스
- 인퍼런스 결과를 기반으로 일자별, 주차별 모델 퍼포먼스 Report 생성
- MySQL에 저장된 메타데이터를 데이터 웨어하우스로 1시간 단위로 옮기기
- S3, GCS 등 Objest Storage
- Feature Store를 만들기 위해 Batch ETL 실행
Airflow 추천글
1. 버킷플레이스 - Airflow 도입기 - https://www.bucketplace.co.kr/post/2021-04-13-버킷플레이스-airflow-도입기/
2. 라인 엔지니어링 - Airflow on Kubernetes - https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/
3. 쏘카 데이터 그룹 - Airflow와 함께한 데이터 환경 구축기 - https://tech.socarcorp.kr/data/2021/06/01/data-engineering-with-airflow.html
4. Airflow Executors Explained - https://www.astronomer.io/guides/airflow-executors-explained
'부스트캠프 AI Tech 2기 > 2기 Product Serving' 카테고리의 다른 글
머신러닝 디자인 패턴 (0) | 2021.12.16 |
---|---|
BentoML (0) | 2021.12.13 |
Cloud (0) | 2021.12.10 |
Linux & Shell Command (0) | 2021.12.10 |
웹 서비스 형태 - Streamlit (0) | 2021.12.09 |