본문 바로가기
카테고리 없음

[Airflow] Dynamic task mapping

by LO-OmJ 2025. 9. 15.

리스트나 딕셔너리를 task 에 분배해서 병렬로 실행하게 만드는 기능

 

  • decorator방식 task를 구성하면 훨씬 쉽게 구현가능함
  • 리스트, 딕셔너리 가능

예시)

task1 에서 파라미터로 받은 파일 path 리스트를 다음 task에서 한 요소씩 분배해서 task에 할당

  from airflow import DAG
  from airflow.decorators import task
  from airflow.models import Param
  from datetime import datetime

  # DAG 파라미터 정의
  DEFAULT_DAG_PARAMS = {
      "bucket_name": Param(
          "default-bucket",
          type="string",
          title="S3 Bucket Name",
          description="S3 bucket name"
      ),
      "file_path": Param(
          ["aaa.zip", "bbb.zip"],
          type="array",
          title="File Paths",
          description="List of zip file paths"
      )
  }

  with DAG(
      dag_id="zip_processing_dag",
      start_date=datetime(2024, 1, 1),
      schedule_interval=None,
      params=DEFAULT_DAG_PARAMS,
      catchup=False
  ) as dag:

      @task
      def task1(**context):
          """파라미터를 받아서 file_path 리스트만 반환"""
          params = context.get("params", {})
          bucket_name = params.get("bucket_name")
          file_paths = params.get("file_path", [])

          # 로깅
          print(f"Bucket: {bucket_name}")
          print(f"File paths: {file_paths}")

          # file_path 리스트만 반환 (MAP용)
          return file_paths

      @task
      def task2(file_path, **context):
          """각 파일별로 실행되는 task2"""
          # 원본 파라미터도 필요하면 가져올 수 있음
          params = context.get("params", {})
          bucket_name = params.get("bucket_name")

          print(f"Processing file: {file_path} from bucket: {bucket_name}")

          # 파일 처리 로직
          result = f"processed_{file_path}"
          return {"file": file_path, "result": result}

      @task  
      def task3(task2_result, **context):
          """task2 결과를 받아서 후속 처리"""
          print(f"Task3 processing: {task2_result}")

          # 최종 처리 로직
          return f"final_{task2_result['result']}"

      # Task 의존성 설정
      file_paths = task1()

      # MAP으로 task2, task3 연결
      task2_results = task2.expand(file_path=file_paths)
      task3_results = task3.expand(task2_result=task2_results)

  핵심 포인트:

  1. .expand(): MAP 방식으로 task 실행
  2. task1(): file_path 리스트만 반환
  3. task2.expand(file_path=file_paths): 각 파일별로 task2 실행
  4. task3.expand(task2_result=task2_results): task2 결과별로 task3 실행

  실행 흐름:

  task1 → [aaa.zip, bbb.zip] 반환
           ↓
  task2 → aaa.zip 처리 | bbb.zip 처리 (병렬)
           ↓
  task3 → aaa 결과 처리 | bbb 결과 처리 (병렬)

 

댓글