본문 바로가기
서버구축 (WEB,DB)

Airflow Serverless 환경에서 Kubernetes 클러스터 이전 가이드

by 날으는물고기 2024. 10. 17.

Airflow Serverless 환경에서 Kubernetes 클러스터 이전 가이드

Elastic(autoscaling) Airflow Cluster on Kubernetes by Sarwesh Suman - ITNEXT

워크플로우 관리 플랫폼인 Apache Airflow를 사용하면서 서버리스 환경에서 Kubernetes(K8S)로 이전하려는 경우가 많습니다. 이전 과정을 이해할 수 있도록 설명하고, 사전에 검토해야 할 사항과 발생할 수 있는 문제들의 해결책을 정리합니다.

서버리스 환경의 장점

  • 자동 확장: 트래픽이나 작업량 증가에 따라 리소스가 자동으로 확장되어 성능을 유지합니다.
  • 관리 부담 감소: 인프라 관리에 드는 시간을 줄이고 애플리케이션 개발과 개선에 집중할 수 있습니다.
  • 비용 효율성: 사용한 만큼만 비용을 지불하므로, 리소스 낭비를 최소화하고 비용 관리가 용이합니다.

하지만 특정한 요구사항이나 성능 최적화를 위해서는 Kubernetes로 이전하여 더 세밀한 제어와 확장성을 확보해야 할 때가 있습니다.

사전 검토 사항 및 고려 사항

1. 워크로드 분석

현재 워크로드 파악

  • DAG(Task Group) 분석: 현재 Airflow에서 실행 중인 DAG들의 실행 빈도, 실행 시간, 리소스 사용량 등을 분석합니다.
  • 트래픽 패턴 식별: 특정 시간대에 작업이 집중되는지, 배치 작업이나 스트리밍 작업 등 어떤 유형의 작업이 많은지 파악합니다.
  • 리소스 집약적 작업 확인: 메모리나 CPU를 많이 사용하는 작업이 있는지 식별하여 적절한 리소스 할당 계획을 세웁니다.

워크로드 특성 이해

  • 주기적인 트래픽 피크: 일별, 주별, 월별로 트래픽이 증가하는 시점을 파악합니다.
  • 확장 계획 수립: 파악한 패턴을 기반으로 필요한 리소스의 규모와 확장 전략을 수립합니다.

2. 리소스 계획

초기 리소스 할당

  • CPU 및 메모리 설정: 초기 배포 시 안정적인 운영을 위해 충분한 CPU와 메모리를 할당합니다.
  • 리소스 요청 및 제한: Kubernetes의 리소스 요청(request)과 제한(limit)을 설정하여 파드가 안정적으로 동작하도록 합니다.

자동 확장 설정

  • Horizontal Pod Autoscaler(HPA): 워커와 스케줄러의 파드 수를 자동으로 조절하도록 설정합니다.
  • Cluster Autoscaler: 노드의 수를 자동으로 조절하여 클러스터의 리소스를 효율적으로 사용합니다.

Kubernetes 환경으로의 전환 단계

1. 초기 설정

Helm 설치 및 Airflow 배포

Helm은 Kubernetes용 패키지 매니저로, 복잡한 애플리케이션의 배포를 간소화합니다.

# Helm 저장소 추가
helm repo add apache-airflow https://airflow.apache.org

# 네임스페이스 생성
kubectl create namespace airflow

# Airflow 배포
helm install airflow apache-airflow/airflow --namespace airflow

구성 요소 설정

  • Metadata DB: PostgreSQL이나 MySQL 등을 사용하여 Airflow의 메타데이터를 저장합니다.
  • 웹 서버: Airflow의 UI를 제공하며, 워크플로우를 모니터링하고 관리합니다.
  • 스케줄러: DAG를 스케줄링하고 태스크를 워커에게 전달합니다.
  • 워커: 스케줄러로부터 받은 태스크를 실행합니다.

 

Helm 차트 설정

values.yaml 파일을 수정하여 환경에 맞게 설정합니다.

# 리소스 제한 설정 예시
scheduler:
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
    limits:
      cpu: "1000m"
      memory: "2Gi"

worker:
  replicas: 2  # 워커 파드 수
  resources:
    requests:
      cpu: "500m"
      memory: "2Gi"
    limits:
      cpu: "1000m"
      memory: "4Gi"

Kubernetes 리소스 설정

  • 네임스페이스: airflow 네임스페이스를 생성하여 리소스를 격리합니다.
  • 서비스 계정 및 역할: 필요한 권한을 가진 서비스 계정과 역할(Role), 역할 바인딩(RoleBinding)을 설정합니다.
  • Persistent Volume: 로그나 DAG 파일을 저장할 영구 스토리지를 구성합니다.

2. 운영 중 발생할 수 있는 문제

CPU 과부하 문제

DAG를 추가하거나 변경할 때 스케줄러의 CPU 사용량이 급증하여 성능 저하나 장애가 발생할 수 있습니다. 특히 GitSync를 사용하여 DAG를 동기화하는 경우 많은 파일을 처리하면서 부하가 증가합니다.

  • min_file_process_interval 조정: DAG 파일의 구문 분석 주기를 늘려 CPU 부하를 줄입니다.
    scheduler:
      min_file_process_interval: 60  # 초 단위, 기본값은 30초
  • GitSync 최적화: 불필요한 파일 동기화를 줄이고, 필요한 파일만 동기화하도록 .gitignore를 활용합니다.
  • 스케줄러 리소스 증가: 스케줄러 파드에 더 많은 CPU 리소스를 할당합니다.

 

워커의 OOM(Out of Memory) 문제

워커 파드가 메모리를 많이 사용하여 OOM 오류가 발생하면 태스크 실행 실패나 파드 재시작 등의 문제가 발생합니다.

  • 리소스 요청 및 제한 설정: 워커 파드의 메모리 요청과 제한을 적절히 설정하여 안정적인 메모리 사용을 보장합니다.
    worker:
      resources:
        requests:
          memory: "2Gi"
        limits:
          memory: "4Gi"
  • LimitRange 및 ResourceQuota 적용: 네임스페이스 수준에서 리소스 사용을 관리합니다.
    apiVersion: v1
    kind: LimitRange
    metadata:
      name: memory-limits
      namespace: airflow
    spec:
      limits:
        - max:
            memory: 4Gi
          min:
            memory: 2Gi
          type: Container
  • Taints와 Tolerations 활용: 워커 파드를 리소스가 충분한 노드에 스케줄링하도록 설정합니다.
    tolerations:
      - key: "high-memory"
        operator: "Exists"
        effect: "NoSchedule"
  • 스케일 아웃: 워커 파드의 수를 늘려 부하를 분산합니다.
    worker:
      replicas: 4  # 워커 파드 수 증가

 

실행 로그 누락 문제

태스크 실행 실패 시 로그가 누락되어 문제 원인을 파악하기 어려울 수 있습니다.

  • 로그 스토리지 설정: 로그를 외부 스토리지(S3, GCS 등)에 저장하도록 설정합니다.
    logging:
      remote_logging: True
      remote_log_conn_id: MyS3Conn  # 연결 ID 설정
      remote_base_log_folder: s3://your-bucket/airflow-logs
  • 로그 레벨 및 회전 설정: 로그 레벨을 적절히 설정하고, 로그 파일이 너무 커지지 않도록 회전(rotation) 정책을 적용합니다.
    logging:
      level: INFO
      rotate_logs: True
      max_log_size: 50MB
      backup_count: 5
  • 실행기 설정 확인: CeleryExecutor를 사용하는 경우, Celery 워커와 브로커(RabbitMQ, Redis 등)의 상태를 확인하여 메시지 큐가 정상적으로 동작하는지 확인합니다.

사례를 통한 교훈

1. CPU 사용량 최적화

  • DAG 파일 처리 주기 조정: min_file_process_interval 값을 늘려 스케줄러의 DAG 구문 분석 빈도를 줄입니다.
  • DAG 파일 수 관리: 사용하지 않는 DAG 파일은 제거하거나 다른 디렉터리로 이동하여 스케줄러가 불필요한 파일을 처리하지 않도록 합니다.
  • 스케줄러 확장: 필요에 따라 스케줄러 파드의 수를 늘려 부하를 분산시킵니다.

2. OOM 문제 방지

  • 리소스 사용 모니터링: Kubernetes의 메트릭 서버나 Prometheus를 활용하여 파드의 리소스 사용량을 모니터링합니다.
  • 메모리 릭(leak) 확인: 태스크 코드에서 메모리 릭이 발생하지 않는지 검사하고 최적화합니다.
  • 리소스 제한 재평가: 실제 사용량에 따라 리소스 요청 및 제한 값을 조정합니다.

3. 로그 관리

  • 중앙화된 로그 관리: 모든 로그를 중앙 스토리지에 저장하여 접근성과 관리 편의성을 높입니다.
  • 로그 모니터링 도구 활용: ELK 스택(Elasticsearch, Logstash, Kibana)이나 Grafana 등을 사용하여 로그를 시각화하고 모니터링합니다.
  • 알림 설정: 로그에서 특정 에러나 이벤트 발생 시 알림을 받도록 설정하여 신속한 대응이 가능하도록 합니다.

 

Airflow를 서버리스 환경에서 Kubernetes로 이전하면 다음과 같은 이점이 있습니다.

  • 성능 향상: 세밀한 리소스 관리와 최적화를 통해 성능을 개선할 수 있습니다.
  • 비용 절감: 리소스를 효율적으로 사용하고 자동 확장 기능을 활용하여 비용을 절감할 수 있습니다.
  • 기술 역량 강화: Kubernetes를 활용함으로써 인프라 관리 역량을 향상시키고, DevOps 문화를 촉진할 수 있습니다.

 

하지만 성공적인 이전을 위해서는 다음과 같은 점을 유의해야 합니다.

  • 구성 요소 이해: Airflow와 Kubernetes의 구성 요소와 동작 방식을 잘 이해해야 합니다.
  • 리소스 사용량 모니터링: 지속적인 모니터링을 통해 문제를 사전에 발견하고 대응합니다.
  • 최적의 리소스 관리: 적절한 리소스 할당과 자동 확장 설정으로 안정적인 운영을 보장합니다.

 

이러한 단계를 따르면 안정적이고 확장 가능한 Airflow 배포를 달성할 수 있습니다. Kubernetes 환경에서의 Airflow 운영은 초기 설정과 지속적인 관리가 중요하며, 이를 통해 워크플로우 관리의 효율성과 안정성을 높일 수 있습니다.

예시로 배우는 워크플로우 관리

에어플로우(Apache Airflow)는 복잡한 워크플로우를 프로그래밍 방식으로 작성하고 스케줄링하며 모니터링할 수 있는 오픈 소스 플랫폼입니다. 에어플로우는 데이터 파이프라인을 작성하고 스케줄링하며 모니터링하는 데 도움을 주는 플랫폼입니다. 데이터 엔지니어링, 머신러닝 파이프라인, ETL(Extract, Transform, Load) 프로세스 등 복잡한 작업을 자동화하는 데 사용됩니다.

주요 개념 이해하기

1. 워크플로우(Workflow)

작업들의 순서와 의존성을 정의한 것입니다. 예를 들어: 데이터 수집 → 데이터 전처리 → 데이터 저장 → 데이터 시각화

 

2. DAG(Directed Acyclic Graph)

워크플로우를 표현하는 방향성이 있고 순환이 없는 그래프입니다. 에어플로우에서 DAG는 작업들의 실행 순서와 의존성을 정의합니다.

 

3. 태스크(Task)

DAG 내의 하나의 작업 단위입니다. 예를 들어, "데이터 수집하기"가 하나의 태스크입니다.

 

4. 오퍼레이터(Operator)

태스크를 생성하는 템플릿이나 도구입니다. 태스크에서 수행할 작업의 종류에 따라 다양한 오퍼레이터를 사용합니다.

 

5. 스케줄러(Scheduler)

DAG와 태스크를 지정된 일정에 따라 실행시켜주는 역할을 합니다.

 

6. 워커(Worker)

스케줄러에 의해 할당된 태스크를 실제로 실행하는 프로세스입니다.

 

7. 웹서버(Web Server)

에어플로우의 웹 UI를 제공하여 DAG와 태스크의 상태를 모니터링하고 관리할 수 있습니다.

 

간단한 DAG 작성하기

1. DAG 디렉토리 확인

에어플로우는 기본적으로 ~/airflow/dags/ 디렉토리에서 DAG 파일을 읽어옵니다.

 

2. DAG 파일 생성

~/airflow/dags/ 디렉토리에 simple_dag.py 파일을 생성합니다.

# simple_dag.py

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'start_date': datetime(2023, 1, 1),
}

with DAG(
    dag_id='simple_dag',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
) as dag:

    task_1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    task_2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
    )

    task_3 = BashOperator(
        task_id='print_message',
        bash_command='echo "에어플로우 튜토리얼"',
    )

    task_1 >> task_2 >> task_3

3. DAG 설명

  • default_args: DAG의 기본 설정입니다.
  • dag_id: DAG의 고유 식별자입니다.
  • schedule_interval: DAG의 실행 주기입니다.
  • BashOperator: bash 명령어를 실행하는 오퍼레이터입니다.
  • 태스크 의존성: task_1 >> task_2 >> task_3은 태스크의 실행 순서를 정의합니다.

 

4. DAG 활성화 및 실행

웹 UI에서 simple_dag를 찾아 ON으로 활성화하고, Trigger DAG로 실행합니다.

오퍼레이터와 태스크 자세히 알아보기

1. 주요 오퍼레이터 종류

  • BashOperator: bash 명령어 실행
  • PythonOperator: 파이썬 함수 실행
  • EmailOperator: 이메일 전송
  • HttpOperator: HTTP 요청 보내기
  • SqliteOperator: SQLite 데이터베이스 쿼리 실행

 

2. PythonOperator 예제

def greet():
    print("안녕하세요, 에어플로우!")

greet_task = PythonOperator(
    task_id='greet',
    python_callable=greet,
)

에어플로우 웹 UI 사용법

  • DAGs 페이지: 모든 DAG 목록과 상태를 볼 수 있습니다.
  • Graph 뷰: DAG의 태스크 의존성을 그래프로 확인할 수 있습니다.
  • Tree 뷰: DAG의 실행 기록을 트리 형태로 볼 수 있습니다.
  • 태스크 세부 정보: 각 태스크의 로그, 상태, 실행 시간 등을 확인할 수 있습니다.

날씨 데이터 파이프라인 구축 예제

오픈웨더맵(OpenWeatherMap) API를 사용하여 날씨 데이터를 수집하고 저장하는 파이프라인을 구축합니다.

 

1. 사전 준비

  • 오픈웨더맵에서 API 키 발급
  • SQLite 데이터베이스 사용을 위한 커넥션 설정

 

2. DAG 파일 생성

~/airflow/dags/weather_data_pipeline.py

 

3. 태스크 구성

  • creating_table: SqliteOperator를 사용하여 데이터 저장 테이블 생성
  • is_api_available: HttpSensor를 사용하여 API의 응답 가능 여부 확인
  • fetch_weather_data: SimpleHttpOperator로 데이터 수집
  • process_data: PythonOperator로 데이터 전처리
  • store_data: PythonOperator로 데이터 저장
  • notify_completion: PythonOperator로 완료 메시지 출력

 

4. 코드 예제

# weather_data_pipeline.py

from datetime import datetime
from airflow import DAG
import json
import sqlite3

# 오퍼레이터 임포트
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator

# 기본 설정
default_args = {
    'start_date': datetime(2023, 1, 1),
}

API_KEY = 'YOUR_OPENWEATHERMAP_API_KEY'
CITY = 'Seoul'

def process_weather_data(ti):
    data = ti.xcom_pull(task_ids='fetch_weather_data')
    weather = {
        'city': data['name'],
        'temperature': data['main']['temp'],
        'description': data['weather'][0]['description'],
        'datetime': datetime.fromtimestamp(data['dt'])
    }
    ti.xcom_push(key='processed_data', value=weather)

def store_weather_data(ti):
    weather = ti.xcom_pull(task_ids='process_data', key='processed_data')
    conn = sqlite3.connect('/home/your_username/airflow/airflow.db')
    cursor = conn.cursor()
    insert_query = """
    INSERT INTO weather_data (city, temperature, description, datetime) VALUES (?, ?, ?, ?)
    """
    cursor.execute(insert_query, (weather['city'], weather['temperature'], weather['description'], weather['datetime']))
    conn.commit()
    conn.close()

with DAG(
    dag_id='weather_data_pipeline',
    default_args=default_args,
    schedule_interval='@hourly',
    catchup=False,
) as dag:

    creating_table = SqliteOperator(
        task_id='creating_table',
        sqlite_conn_id='sqlite_default',
        sql='''
            CREATE TABLE IF NOT EXISTS weather_data (
                city TEXT,
                temperature REAL,
                description TEXT,
                datetime TEXT
            );
        ''',
    )

    is_api_available = HttpSensor(
        task_id='is_api_available',
        http_conn_id='openweathermap_api',
        endpoint='data/2.5/weather',
        request_params={'q': CITY, 'appid': API_KEY},
        response_check=lambda response: "main" in response.text,
        poke_interval=5,
        timeout=20,
    )

    fetch_weather_data = SimpleHttpOperator(
        task_id='fetch_weather_data',
        http_conn_id='openweathermap_api',
        endpoint='data/2.5/weather',
        method='GET',
        data={'q': CITY, 'appid': API_KEY},
        response_filter=lambda response: json.loads(response.text),
        log_response=True,
    )

    process_data = PythonOperator(
        task_id='process_data',
        python_callable=process_weather_data,
    )

    store_data = PythonOperator(
        task_id='store_data',
        python_callable=store_weather_data,
    )

    notify_completion = PythonOperator(
        task_id='notify_completion',
        python_callable=lambda: print("날씨 데이터 파이프라인 완료"),
    )

    # 태스크 의존성 설정
    creating_table >> is_api_available >> fetch_weather_data >> process_data >> store_data >> notify_completion

5. 커넥션 설정

웹 UI의 Admin > Connections에서 필요한 커넥션을 설정합니다.

  • sqlite_default: SQLite 데이터베이스 연결
  • openweathermap_api: 오픈웨더맵 API 연결

openweathermap_api 설정

  • Conn ID: openweathermap_api
  • Conn Type: HTTP
  • Host: http://api.openweathermap.org/

 

6. DAG 활성화 및 실행

웹 UI에서 weather_data_pipeline을 활성화하고 실행합니다.


에러 해결 및 디버깅

  • 로그 확인: 태스크 실패 시 로그를 확인하여 원인 파악
  • 에러 메시지 분석: 에러 메시지를 읽고 코드를 수정
  • XCom 사용법 숙지: 태스크 간 데이터 전달 시 XCom을 올바르게 사용

 

예를 들어, store_weather_data 태스크에서 데이터베이스 경로가 잘못되어 에러가 발생할 수 있습니다. 이 경우 로그를 확인하고 올바른 경로로 수정합니다.


에어플로우를 사용하면 복잡한 작업도 효율적으로 자동화할 수 있으며, 확장성과 유연성이 뛰어납니다.

728x90

댓글