데이터 과학 워크플로우: Airflow를 이용한 파이프라인 자동화 🚀
안녕하세요, 데이터 과학 열정가 여러분! 오늘은 정말 흥미진진한 주제로 여러분과 함께 데이터의 세계를 탐험해보려고 해요. 바로 Airflow를 이용한 데이터 파이프라인 자동화에 대해 알아볼 거예요. 🎉
여러분, 혹시 데이터 처리 작업을 하다가 "아, 이 과정을 자동화할 수 있다면 얼마나 좋을까?"라고 생각해본 적 있나요? 그렇다면 여러분은 이미 Airflow의 필요성을 느끼고 계신 거예요! Airflow는 마치 우리의 데이터 작업을 위한 슈퍼 영웅과 같아요. 복잡한 데이터 처리 과정을 자동화하고, 효율적으로 관리할 수 있게 해주는 강력한 도구랍니다. 😎
이 글을 통해 우리는 Airflow의 세계로 깊숙이 들어가 볼 거예요. 마치 재능넷에서 다양한 재능을 탐험하듯이, 우리도 Airflow의 다양한 기능과 활용법을 탐험해볼 거예요. 여러분의 데이터 과학 실력을 한 단계 업그레이드할 준비 되셨나요? 그럼 시작해볼까요! 🚀
1. Airflow란 무엇인가? 🤔
자, 여러분! Airflow에 대해 들어보셨나요? 아직 모르신다고요? 걱정 마세요. 지금부터 차근차근 설명해드릴게요. 🙂
Airflow는 워크플로우 관리 플랫폼이에요. 복잡하게 들리시나요? 쉽게 말해서, Airflow는 우리가 해야 할 일들을 순서대로 정리하고 자동으로 실행해주는 똑똑한 비서 같은 존재예요. 마치 재능넷에서 여러분이 원하는 재능을 찾아 순서대로 배우는 것처럼, Airflow는 데이터 처리 작업을 순서대로 실행해줍니다.
🌟 Airflow의 핵심 특징:
- 파이썬으로 작성된 오픈소스 프로젝트
- 복잡한 계산 로직을 표현할 수 있는 강력한 기능
- 확장성이 뛰어나며 다양한 시스템과 통합 가능
- 동적인 파이프라인 생성 지원
- 웹 인터페이스를 통한 편리한 모니터링
Airflow는 2014년 Airbnb에서 시작된 프로젝트로, 현재는 Apache Software Foundation의 최상위 프로젝트로 성장했어요. 마치 재능넷이 다양한 재능을 공유하는 플랫폼으로 성장한 것처럼, Airflow도 데이터 엔지니어링 분야에서 없어서는 안 될 중요한 도구로 자리잡았답니다. 👏
그럼 Airflow가 어떻게 작동하는지 좀 더 자세히 알아볼까요?
위의 그림에서 볼 수 있듯이, Airflow는 데이터 수집부터 결과 저장까지의 전체 과정을 관리해요. 마치 재능넷에서 여러분이 원하는 재능을 찾아 학습하고, 그 결과를 포트폴리오로 만드는 과정과 비슷하답니다. 😊
Airflow를 사용하면 다음과 같은 이점을 얻을 수 있어요:
- 자동화: 반복적인 작업을 자동으로 처리할 수 있어요.
- 스케줄링: 작업을 원하는 시간에 실행할 수 있어요.
- 의존성 관리: 작업 간의 순서와 의존 관계를 쉽게 정의할 수 있어요.
- 모니터링: 작업의 진행 상황을 실시간으로 확인할 수 있어요.
이제 Airflow가 무엇인지 대략적으로 이해하셨나요? 그렇다면 이제 Airflow의 핵심 개념들을 하나씩 살펴보도록 해요. 마치 재능넷에서 새로운 재능을 배우는 것처럼, 우리도 Airflow의 새로운 개념들을 하나씩 배워볼까요? 😃
2. Airflow의 핵심 개념 🔑
Airflow를 제대로 이해하기 위해서는 몇 가지 핵심 개념을 알아야 해요. 마치 재능넷에서 새로운 재능을 배우기 위해 기본 개념부터 시작하는 것처럼요. 자, 그럼 하나씩 살펴볼까요? 🧐
2.1 DAG (Directed Acyclic Graph) 📊
DAG는 Airflow의 가장 중요한 개념이에요. DAG는 '방향성 비순환 그래프'라는 뜻인데, 쉽게 말해 작업들의 실행 순서를 정의하는 청사진이라고 생각하면 돼요.
🌟 DAG의 특징:
- 작업(Task)들의 집합
- 작업 간의 의존성을 정의
- 순환(cycle)이 없음
- 파이썬 코드로 정의
DAG를 이해하기 위해 간단한 예를 들어볼게요. 여러분이 재능넷에서 새로운 재능을 배우는 과정을 생각해보세요:
- 재능 검색
- 재능 선택
- 수업 신청
- 수업 수강
- 과제 제출
- 피드백 받기
이 과정을 DAG로 표현하면 다음과 같아요:
이 DAG에서 각 원은 하나의 Task(작업)를 나타내고, 화살표는 작업 간의 의존성을 나타내요. 이렇게 DAG를 사용하면 복잡한 워크플로우를 쉽게 표현하고 관리할 수 있답니다. 😊
2.2 Operator 🎭
Operator는 DAG 안에서 실제로 실행되는 작업의 단위예요. 각 Operator는 특정 작업을 수행하도록 설계되어 있죠. Airflow에는 다양한 종류의 Operator가 있어요:
- BashOperator: Bash 명령어를 실행
- PythonOperator: Python 함수를 실행
- SQLOperator: SQL 쿼리를 실행
- EmailOperator: 이메일을 보냄
- SimpleHttpOperator: HTTP 요청을 보냄
- 그 외 다양한 커스텀 Operator
예를 들어, 재능넷에서 새로운 재능을 검색하는 작업을 PythonOperator로 구현한다면 다음과 같이 할 수 있어요:
from airflow.operators.python_operator import PythonOperator
def search_talent():
# 재능 검색 로직
print("재능을 검색합니다.")
search_talent_task = PythonOperator(
task_id='search_talent',
python_callable=search_talent,
dag=dag
)
이렇게 Operator를 사용하면 다양한 종류의 작업을 쉽게 정의하고 실행할 수 있어요. 마치 재능넷에서 다양한 재능을 쉽게 찾고 배울 수 있는 것처럼 말이죠! 😉
2.3 Task 📌
Task는 DAG 안에서 실행되는 개별 작업의 인스턴스를 말해요. 쉽게 말해, Operator를 사용해 정의한 작업이 실제로 DAG 안에서 실행될 때, 그것을 Task라고 부른답니다.
Task들은 서로 의존성을 가질 수 있어요. 예를 들어, '재능 검색' Task가 완료된 후에 '재능 선택' Task가 실행되어야 한다면, 이런 의존성을 다음과 같이 표현할 수 있어요:
search_talent_task >> select_talent_task
이 표현은 "search_talent_task가 select_talent_task보다 먼저 실행되어야 한다"는 의미예요. 이렇게 Task 간의 의존성을 정의하면, Airflow가 자동으로 올바른 순서로 Task를 실행해줍니다. 👍
2.4 Executor 🏃♂️
Executor는 Task를 어떻게 실행할지 결정하는 구성 요소예요. Airflow에는 여러 종류의 Executor가 있어요:
- SequentialExecutor: 기본 Executor, Task를 순차적으로 실행
- LocalExecutor: 로컬 머신에서 병렬로 Task 실행
- CeleryExecutor: 여러 워커 노드에서 분산 처리
- KubernetesExecutor: Kubernetes 클러스터에서 Task 실행
Executor를 선택할 때는 여러분의 워크로드와 인프라 환경을 고려해야 해요. 마치 재능넷에서 여러분의 학습 스타일에 맞는 강사를 선택하는 것처럼 말이죠! 🤓
2.5 XCom 💌
XCom(Cross-Communication)은 Task 간에 작은 양의 데이터를 주고받을 수 있게 해주는 기능이에요. 예를 들어, '재능 검색' Task에서 찾은 재능의 ID를 '재능 선택' Task로 전달하고 싶다면 XCom을 사용할 수 있어요.
def search_talent(**kwargs):
talent_id = 12345 # 예시 ID
kwargs['ti'].xcom_push(key='talent_id', value=talent_id)
def select_talent(**kwargs):
talent_id = kwargs['ti'].xcom_pull(key='talent_id', task_ids='search_talent')
print(f"선택된 재능 ID: {talent_id}")
search_task = PythonOperator(
task_id='search_talent',
python_callable=search_talent,
provide_context=True,
dag=dag
)
select_task = PythonOperator(
task_id='select_talent',
python_callable=select_talent,
provide_context=True,
dag=dag
)
search_task >> select_task
이렇게 XCom을 사용하면 Task 간에 필요한 정보를 쉽게 공유할 수 있어요. 마치 재능넷에서 강사와 학생이 정보를 주고받는 것처럼요! 😊
지금까지 Airflow의 핵심 개념들을 살펴봤어요. 이 개념들을 잘 이해하면 Airflow를 사용해 복잡한 데이터 파이프라인도 쉽게 구축할 수 있답니다. 다음 섹션에서는 이런 개념들을 바탕으로 실제 Airflow를 설치하고 사용하는 방법에 대해 알아볼 거예요. 준비되셨나요? 그럼 계속해서 Airflow의 세계로 더 깊이 들어가 볼까요? 🚀
3. Airflow 설치 및 설정 🛠️
자, 이제 Airflow의 기본 개념을 이해했으니 실제로 Airflow를 설치하고 설정하는 방법을 알아볼까요? 마치 재능넷에서 새로운 재능을 배우기 위해 필요한 도구를 준비하는 것처럼, 우리도 Airflow를 사용하기 위한 준비를 해볼 거예요. 😊
3.1 Airflow 설치하기 📥
Airflow를 설치하는 방법은 여러 가지가 있지만, 가장 간단한 방법은 pip를 사용하는 거예요. 먼저 가상 환경을 만들고 그 안에 Airflow를 설치하는 것이 좋아요.
# 가상 환경 생성
python -m venv airflow_env
# 가상 환경 활성화 (Windows)
airflow_env\Scripts\activate
# 가상 환경 활성화 (Mac/Linux)
source airflow_env/bin/activate
# Airflow 설치
pip install apache-airflow
주의: Airflow 설치 과정에서 많은 의존성 패키지들이 함께 설치되므로 시간이 좀 걸릴 수 있어요. 마치 재능넷에서 새로운 재능을 배우기 위해 여러 가지 준비물을 모으는 것처럼 말이죠! 😉
3.2 Airflow 초기화하기 🚀
Airflow를 설치했다면 이제 초기화를 해야 해요. 이 과정에서 Airflow에 필요한 기본 디렉토리와 설정 파일들이 생성됩니다.
# Airflow 홈 디렉토리 설정 (Windows)
set AIRFLOW_HOME=C:\Users\YourUsername\airflow
# Airflow 홈 디렉토리 설정 (Mac/Linux)
export AIRFLOW_HOME=~/airflow
# Airflow 데이터베이스 초기화
airflow db init
이 명령어를 실행하면 AIRFLOW_HOME 디렉토리에 Airflow 관련 파일들이 생성돼요. 주요 파일과 디렉토리는 다음과 같아요:
- airflow.cfg: Airflow 설정 파일
- airflow.db: 기본 SQLite 데이터베이스
- logs: 로그 파일 디렉토리
- dags: DAG 파일을 저장할 디렉토리
3.3 Airflow 웹 서버와 스케줄러 실행하기 🌐
Airflow를 사용하려면 웹 서버와 스케줄러를 실행해야 해요. 웹 서버는 Airflow의 UI를 제공하고, 스케줄러는 정의된 DAG를 실행하는 역할을 해요.
# 웹 서버 실행 (백그라운드에서 실행)
airflow webserver --port 8080 -D
# 스케줄러 실행 (백그라운드에서 실행)
airflow scheduler -D
이제 브라우저에서 http://localhost:8080으로 접속하면 Airflow UI를 볼 수 있어요. 마치 재능넷 웹사이트에 접속하는 것처럼 쉽죠? 😊
3.4 Airflow 설정 커스터마이징 🛠️
Airflow의 동작을 커스터마이징하고 싶다면 airflow.cfg 파일을 수정하면 돼요. 이 파일에서 다양한 설정을 변경할 수 있어요:
- executor: Task 실행 방식 설정
- sql_alchemy_conn: 데이터베이스 연결 설정
- dag_folder: DAG 파일 위치 설정
- load_examples: 예제 DAG 로드 여부 설정
예를 들어, 예제 DAG를 로드하지 않으려면 다음과 같이 설정을 변경할 수 있어요:
load_examples = False
주의: airflow.cfg 파일을 수정한 후에는 Airflow 서비스를 재시작해야 변경사항이 적용돼요.
3.5 Airflow CLI 사용하기 💻
Airflow는 강력한 명령줄 인터페이스(CLI)를 제공해요. CLI를 통해 다양한 Airflow 관련 작업을 수행할 수 있죠. 주요 명령어들을 살펴볼까요?
# DAG 목록 보기
airflow dags list
# 특정 DAG의 Task 목록 보기
airflow tasks list [dag_id]
# DAG 실행하기
airflow dags trigger [dag_id]
# DAG 일시 중지/재개하기
airflow dags pause [dag_id]
airflow dags unpause [dag_id]
# 특정 Task 상태 확인하기
airflow tasks state [dag_id] [task_id] [execution_date]
이런 CLI 명령어들을 사용하면 Airflow를 더욱 효과적으로 관리할 수 있어요. 마치 재능넷에서 다양한 기능을 사용해 학습을 관리하는 것처럼 말이죠! 😉
위 그림은 Airflow의 주요 구성 요소들을 보여줘요. 각 구성 요소들이 어떻게 상호작용하는지 이해하면 Airflow를 더 효과적으로 사용할 수 있답니다. 😊
이제 Airflow를 설치하고 기본적인 설정을 마쳤어요. 다음 섹션에서는 실제로 DAG를 작성하고 실행하는 방법에 대해 알아볼 거예요. 재능넷에서 새로운 재능을 배우기 시작하는 것처럼, 우리도 이제 Airflow로 데이터 파이프라인을 만들어볼 준비가 된 거예요! 🚀
4. 첫 번째 DAG 만들기 🎨
자, 이제 Airflow의 기본 설정을 마쳤으니 실제로 DAG를 만들어볼 차례예요. 마치 재능넷에서 첫 수업을 듣는 것처럼 설렘 가득한 마음으로 시작해볼까요? 😊
4.1 DAG 파일 생성하기 📝
먼저, AIRFLOW_HOME/dags 디렉토리에 새로운 Python 파일을 만들어요. 이 파일 이름을 my_first_dag.py라고 지어볼까요?
# AIRFLOW_HOME/dags/my_first_dag.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# DAG의 기본 인자 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email': ['your_email@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
dag = DAG(
'my_first_dag',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# 태스크로 사용할 Python 함수들 정의
def task1_function():
print("This is Task 1")
def task2_function():
print("This is Task 2")
def task3_function():
print("This is Task 3")
# 태스크 생성
task1 = PythonOperator(
task_id='task1',
python_callable=task1_function,
dag=dag,
)
task2 = PythonOperator(
task_id='task2',
python_callable=task2_function,
dag=dag,
)
task3 = PythonOperator(
task_id='task3',
python_callable=task3_function,
dag=dag,
)
# 태스크 의존성 설정
task1 >> [task2, task3]
이 코드는 간단한 DAG를 정의해요. 세 개의 태스크가 있고, task1이 완료된 후 task2와 task3이 병렬로 실행되는 구조예요. 마치 재능넷에서 기초 과정을 배운 후 두 가지 심화 과정을 동시에 학습하는 것과 비슷하죠? 😉
4.2 DAG 구조 시각화하기 🖼️
방금 만든 DAG의 구조를 시각화하면 이렇게 보일 거예요:
4.3 DAG 실행하기 ▶️
이제 DAG를 만들었으니 실행해볼 차례예요. Airflow CLI를 사용해 DAG를 트리거할 수 있어요:
airflow dags trigger my_first_dag
이 명령어를 실행하면 Airflow가 my_first_dag를 실행하기 시작할 거예요. Airflow UI에서 DAG의 실행 상태를 확인할 수 있답니다.
4.4 DAG 모니터링하기 👀
Airflow UI (http://localhost:8080)에 접속하면 DAG의 실행 상태를 모니터링할 수 있어요. UI에서는 다음과 같은 정보를 확인할 수 있죠:
- DAG의 전체적인 실행 상태
- 각 태스크의 상태 (성공, 실패, 실행 중 등)
- 태스크 로그
- DAG의 그래프 뷰와 트리 뷰
이렇게 모니터링을 하면 마치 재능넷에서 자신의 학습 진도를 체크하는 것처럼, DAG의 진행 상황을 한눈에 파악할 수 있어요. 😊
4.5 DAG 개선하기 🔧
첫 번째 DAG를 만들고 실행해봤어요. 하지만 이게 끝이 아니에요! DAG를 계속해서 개선하고 발전시킬 수 있답니다. 예를 들어:
- 더 복잡한 태스크 의존성 추가하기
- 다양한 Operator 사용해보기 (예: BashOperator, SQLOperator 등)
- 조건부 실행 로직 추가하기
- 에러 처리와 재시도 로직 개선하기
이렇게 계속해서 DAG를 개선하다 보면, 복잡한 데이터 파이프라인도 쉽게 만들 수 있게 될 거예요. 마치 재능넷에서 꾸준히 학습하며 실력을 키우는 것처럼 말이죠! 🚀
지금까지 첫 번째 DAG를 만들고 실행해보는 과정을 살펴봤어요. 이제 Airflow의 기본적인 사용법을 익혔으니, 더 복잡하고 실용적인 데이터 파이프라인을 만들 준비가 된 거예요. 다음 섹션에서는 실제 데이터 처리 작업을 포함한 더 복잡한 DAG를 만들어볼 거예요. 준비되셨나요? 그럼 계속해서 Airflow의 세계를 탐험해볼까요? 😃
5. 실용적인 DAG 만들기: 데이터 처리 파이프라인 🔄
자, 이제 기본적인 DAG 작성법을 익혔으니 조금 더 실용적인 DAG를 만들어볼까요? 이번에는 실제 데이터를 처리하는 파이프라인을 만들어볼 거예요. 마치 재능넷에서 배운 기술을 실제 프로젝트에 적용하는 것처럼 말이죠! 😊
5.1 시나리오 설정 📊
다음과 같은 시나리오를 가정해볼게요:
🌟 시나리오: 매일 CSV 파일로 새로운 판매 데이터를 받아, 이를 처리하고 데이터베이스에 저장한 후, 간단한 분석 결과를 이메일로 보내는 파이프라인을 만들어야 합니다.
이 시나리오를 Airflow DAG로 구현해볼까요?
5.2 필요한 라이브러리 설치 📚
먼저, 필요한 라이브러리들을 설치해야 해요:
pip install pandas sqlalchemy psycopg2-binary
5.3 DAG 코드 작성하기 💻
이제 DAG 코드를 작성해볼게요. 이 코드를 sales_data_pipeline.py라는 이름으로 AIRFLOW_HOME/dags 디렉토리에 저장해주세요.
import pandas as pd
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.email_operator import EmailOperator
from sqlalchemy import create_engine
# DAG 기본 인자 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email': ['your_email@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG 정의
dag = DAG(
'sales_data_pipeline',
default_args=default_args,
description='A pipeline to process daily sales data',
schedule_interval=timedelta(days=1),
)
# 데이터 처리 함수
def process_sales_data():
# CSV 파일 읽기
df = pd.read_csv('/path/to/sales_data.csv')
# 데이터 처리 (예: 날짜 형식 변경, 결측치 처리 등)
df['date'] = pd.to_datetime(df['date'])
df['total_sales'] = df['quantity'] * df['price']
# 처리된 데이터를 데이터베이스에 저장
engine = create_engine('postgresql://username:password@localhost:5432/sales_db')
df.to_sql('daily_sales', engine, if_exists='replace', index=False)
# 분석 함수
def analyze_sales_data():
engine = create_engine('postgresql://username:password@localhost:5432/sales_db')
df = pd.read_sql('SELECT * FROM daily_sales', engine)
total_sales = df['total_sales'].sum()
avg_sales = df['total_sales'].mean()
with open('/tmp/sales_report.txt', 'w') as f:
f.write(f"Total Sales: {total_sales}\n")
f.write(f"Average Sales: {avg_sales}\n")
# 태스크 정의
t1 = PythonOperator(
task_id='process_sales_data',
python_callable=process_sales_data,
dag=dag,
)
t2 = PostgresOperator(
task_id='create_summary_table',
postgres_conn_id='postgres_default',
sql="""
CREATE TABLE IF NOT EXISTS sales_summary AS (
SELECT date, SUM(total_sales) as daily_total
FROM daily_sales
GROUP BY date
)
""",
dag=dag,
)
t3 = PythonOperator(
task_id='analyze_sales_data',
python_callable=analyze_sales_data,
dag=dag,
)
t4 = EmailOperator(
task_id='send_email',
to='manager@example.com',
subject='Daily Sales Report',
html_content='Please find attached the daily sales report.',
files=['/tmp/sales_report.txt'],
dag=dag,
)
# 태스크 의존성 설정
t1 >> t2 >> t3 >> t4
이 DAG는 다음과 같은 단계로 구성되어 있어요:
- CSV 파일에서 판매 데이터를 읽고 처리한 후 데이터베이스에 저장
- 데이터베이스에 요약 테이블 생성
- 판매 데이터 분석 수행
- 분석 결과를 이메일로 전송
5.4 DAG 구조 시각화 🖼️
이 DAG의 구조를 시각화하면 다음과 같아요:
5.5 주의사항 및 개선 포인트 🚧
이 DAG를 실제로 사용할 때는 몇 가지 주의해야 할 점이 있어요:
- 보안: 데이터베이스 연결 정보는 Airflow의 Connection 기능을 사용해 안전하게 관리해야 해요.
- 에러 처리: 각 태스크에 적절한 에러 처리 로직을 추가하면 좋아요.
- 확장성: 데이터 양이 많아지면 Pandas 대신 Spark 같은 분산 처리 도구를 고려해볼 수 있어요.
- 모니터링: 중요한 지표들을 로깅하고 모니터링하는 것이 좋아요.
이렇게 만든 DAG는 실제 비즈니스 환경에서 매우 유용하게 사용될 수 있어요. 마치 재능넷에서 배운 기술을 실제 프로젝트에 적용하는 것처럼, 여러분도 이런 DAG를 만들어 실제 데이터 처리 작업을 자동화할 수 있답니다! 🚀
지금까지 실용적인 데이터 처리 파이프라인 DAG를 만들어봤어요. 이런 식으로 Airflow를 활용하면 복잡한 데이터 워크플로우도 효율적으로 관리할 수 있어요. 다음 섹션에서는 Airflow의 고급 기능들과 베스트 프랙티스에 대해 알아볼 거예요. 계속해서 Airflow 마스터의 길로 나아가볼까요? 😊
6. Airflow 고급 기능 및 베스트 프랙티스 🏆
자, 이제 Airflow의 기본적인 사용법과 실용적인 DAG 작성법을 배웠어요. 하지만 Airflow의 진정한 힘은 그 고급 기능들에 있답니다. 마치 재능넷에서 고급 과정을 수강하는 것처럼, 우리도 Airflow의 고급 기능들을 살펴보고 베스트 프랙티스를 알아볼까요? 🚀
6.1 동적 DAG 생성 🔄
때로는 비슷한 구조의 DAG를 여러 개 만들어야 할 때가 있어요. 이럴 때 동적 DAG 생성 기능을 사용하면 편리해요.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
def create_dag(dag_id, schedule, default_args):
def hello_world_py(*args):
print('Hello World')
print('This is DAG: {}'.format(dag_id))
dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
with dag:
t1 = PythonOperator(
task_id='hello_world',
python_callable=hello_world_py,
dag=dag)
return dag
# 여러 DAG 생성
for i in range(1, 4):
dag_id = 'hello_world_{}'.format(i)
default_args = {
'owner': 'airflow',
'start_date': datetime(2021, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
schedule = '@daily'
globals()[dag_id] = create_dag(dag_id, schedule, default_args)
이 코드는 비슷한 구조의 DAG를 여러 개 생성해요. 마치 재능넷에서 비슷한 주제의 강의를 여러 개 만드는 것과 같죠? 😉
6.2 Branching 🌿
때로는 조건에 따라 다른 태스크를 실행해야 할 때가 있어요. 이럴 때 BranchPythonOperator를 사용할 수 있어요.
from airflow.operators.python_operator import BranchPythonOperator
def branch_func(**kwargs):
ti = kwargs['ti']
xcom_value = ti.xcom_pull(task_ids='get_data')
if xcom_value > 5:
return 'task_a'
else:
return 'task_b'
branch_op = BranchPythonOperator(
task_id='branch_task',
python_callable=branch_func,
provide_context=True,
dag=dag)
task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag)
branch_op >> [task_a, task_b]
이렇게 하면 조건에 따라 다른 태스크를 실행할 수 있어요. 마치 재능넷에서 학습자의 수준에 따라 다른 강의를 추천하는 것과 비슷하죠? 🌟
6.3 SubDAGs 🧩
복잡한 워크플로우를 관리하기 쉽게 만들기 위해 SubDAG를 사용할 수 있어요.
from airflow.operators.subdag_operator import SubDagOperator
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
def subdag(parent_dag_name, child_dag_name, args):
dag_subdag = DAG(
dag_id=f'{parent_dag_name}.{child_dag_name}',
default_args=args,
schedule_interval="@daily",
)
for i in range(5):
DummyOperator(
task_id=f'{child_dag_name}-task-{i + 1}',
default_args=args,
dag=dag_subdag,
)
return dag_subdag
subdag_1 = SubDagOperator(
task_id='subdag-1',
subdag=subdag('parent_dag', 'subdag-1', default_args),
default_args=default_args,
dag=dag,
)
SubDAG를 사용하면 복잡한 워크플로우를 모듈화할 수 있어요. 마치 재능넷에서 큰 주제를 작은 단원으로 나누는 것과 같죠! 📚
6.4 SLAs와 Callbacks ⏰
SLA(Service Level Agreement)를 설정하고 콜백 함수를 사용하면 DAG의 실행을 더 잘 제어할 수 있어요.
def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
print("SLA was missed on DAG {0}s by task id {1}s".format(dag.dag_id, task_list))
def failure_callback(context):
print("Task failed. Dag Id: {0}. Task Id: {1}".format(context['dag'].dag_id, context['task'].task_id))
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'sla': timedelta(hours=2),
'on_failure_callback': failure_callback
}
dag = DAG(
'example_sla_dag',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
sla_miss_callback=sla_callback,
)
이렇게 SLA와 콜백을 설정하면 DAG의 실행을 더 잘 모니터링하고 관리할 수 있어요. 마치 재능넷에서 학습 진도와 성과를 체크하는 것과 비슷하죠? 📊
6.5 베스트 프랙티스 🌟
마지막으로, Airflow를 사용할 때 알아두면 좋은 베스트 프랙티스들을 정리해볼게요:
- 멱등성 유지: DAG를 여러 번 실행해도 같은 결과가 나오도록 설계하세요.
- 태스크 세분화: 큰 태스크를 작은 단위로 나누면 재실행과 디버깅이 쉬워져요.
- 변수 사용: 하드코딩된 값 대신 Airflow Variables를 사용하세요.
- 코드 재사용: 공 통 로직은 Python 모듈로 분리하여 재사용성을 높이세요.
- 로깅 활용: 중요한 정보는 로그로 남겨 디버깅과 모니터링을 용이하게 하세요.
- 테스트 작성: DAG와 태스크에 대한 단위 테스트를 작성하여 안정성을 높이세요.
- 버전 관리: DAG 코드를 버전 관리 시스템(예: Git)으로 관리하세요.
이러한 베스트 프랙티스를 따르면 Airflow를 더욱 효과적으로 사용할 수 있어요. 마치 재능넷에서 학습 팁을 따라 더 효율적으로 공부하는 것과 같죠? 😊
이렇게 Airflow의 고급 기능들과 베스트 프랙티스를 알아봤어요. 이 기능들을 잘 활용하면 더욱 강력하고 유연한 데이터 파이프라인을 구축할 수 있답니다. 마치 재능넷에서 고급 과정을 수료하고 나면 더 복잡한 프로젝트를 수행할 수 있는 것처럼 말이에요! 🎓
Airflow는 정말 강력한 도구지만, 동시에 계속해서 발전하고 있는 기술이에요. 새로운 기능과 업데이트를 계속 학습하고 적용하는 것이 중요해요. 마치 재능넷에서 새로운 강의가 올라올 때마다 학습하는 것처럼 말이죠! 🚀
여러분, 이제 Airflow의 기본부터 고급 기능까지 전반적인 내용을 살펴봤어요. 이 지식을 바탕으로 여러분만의 강력한 데이터 파이프라인을 구축해보세요. 데이터 과학의 세계에서 여러분의 재능이 빛을 발할 거예요. 마치 재능넷에서 여러분의 재능이 빛나는 것처럼 말이죠! 😊
Airflow와 함께하는 여정이 즐거우셨기를 바랍니다. 앞으로도 계속해서 학습하고 성장하세요. 여러분의 데이터 과학 여정을 응원합니다! 🎉