리스트나 딕셔너리를 task 에 분배해서 병렬로 실행하게 만드는 기능
- decorator방식 task를 구성하면 훨씬 쉽게 구현가능함
- 리스트, 딕셔너리 가능
예시)
task1 에서 파라미터로 받은 파일 path 리스트를 다음 task에서 한 요소씩 분배해서 task에 할당
from airflow import DAG
from airflow.decorators import task
from airflow.models import Param
from datetime import datetime
# DAG 파라미터 정의
DEFAULT_DAG_PARAMS = {
"bucket_name": Param(
"default-bucket",
type="string",
title="S3 Bucket Name",
description="S3 bucket name"
),
"file_path": Param(
["aaa.zip", "bbb.zip"],
type="array",
title="File Paths",
description="List of zip file paths"
)
}
with DAG(
dag_id="zip_processing_dag",
start_date=datetime(2024, 1, 1),
schedule_interval=None,
params=DEFAULT_DAG_PARAMS,
catchup=False
) as dag:
@task
def task1(**context):
"""파라미터를 받아서 file_path 리스트만 반환"""
params = context.get("params", {})
bucket_name = params.get("bucket_name")
file_paths = params.get("file_path", [])
# 로깅
print(f"Bucket: {bucket_name}")
print(f"File paths: {file_paths}")
# file_path 리스트만 반환 (MAP용)
return file_paths
@task
def task2(file_path, **context):
"""각 파일별로 실행되는 task2"""
# 원본 파라미터도 필요하면 가져올 수 있음
params = context.get("params", {})
bucket_name = params.get("bucket_name")
print(f"Processing file: {file_path} from bucket: {bucket_name}")
# 파일 처리 로직
result = f"processed_{file_path}"
return {"file": file_path, "result": result}
@task
def task3(task2_result, **context):
"""task2 결과를 받아서 후속 처리"""
print(f"Task3 processing: {task2_result}")
# 최종 처리 로직
return f"final_{task2_result['result']}"
# Task 의존성 설정
file_paths = task1()
# MAP으로 task2, task3 연결
task2_results = task2.expand(file_path=file_paths)
task3_results = task3.expand(task2_result=task2_results)
핵심 포인트:
1. .expand(): MAP 방식으로 task 실행
2. task1(): file_path 리스트만 반환
3. task2.expand(file_path=file_paths): 각 파일별로 task2 실행
4. task3.expand(task2_result=task2_results): task2 결과별로 task3 실행
실행 흐름:
task1 → [aaa.zip, bbb.zip] 반환
↓
task2 → aaa.zip 처리 | bbb.zip 처리 (병렬)
↓
task3 → aaa 결과 처리 | bbb 결과 처리 (병렬)
댓글