본문 바로가기
서버구축 (WEB,DB)

Apache Airflow로 API·DB 작업 자동화 Connection, Hook, Operator

by 날으는물고기 2026. 2. 2.

Apache Airflow로 API·DB 작업 자동화 Connection, Hook, Operator

728x90

Airflow를 처음 배우면 거의 항상 나오는 질문이 있습니다.

“Airflow로 API 호출이나 데이터베이스 작업도 자동화할 수 있나요?”

정답은 물론 가능합니다.

300x250

그리고 Airflow는 “그걸 잘하기 위해 만들어진 도구”라고 보는 편이 더 정확합니다.

다만, 외부 시스템(API/DB 등)과 연동할 때는 반드시 알아야 할 핵심 개념이 있습니다.

  • Connection
  • Hook
  • Operator

Airflow에서 외부 시스템 연동은 어떻게 이루어질까?

Airflow는 DAG(워크플로)를 실행하면서, 외부 시스템과 통신하는 작업을 “Task”로 처리합니다.
이 Task는 보통 Operator로 구현되고, 실제 접속은 Hook이 담당하며, 접속 정보는 Connection에 저장됩니다.

전체 흐름
  1. DAG에서 Task(Operator) 실행
  2. Operator가 Hook을 호출
  3. Hook이 Connection을 읽고 접속
  4. 외부 시스템(API/DB)과 통신
  5. 결과를 다음 Task로 넘김(XCom 등)

즉, 구조는 이렇게 정리됩니다.

DAG → Operator → Hook → Connection → 외부 시스템

Connection이란 무엇인가?

Connection은 “외부 시스템 접속정보”를 Airflow에 등록하는 방식입니다.

Airflow는 보안/운영 측면에서 DAG 코드에 접속 정보를 쓰지 않도록 설계되어 있습니다.

예를 들어, 아래처럼 DAG 코드에 API Key나 DB 비밀번호를 넣는 방식은 피해야 합니다.

(안 좋은 예)
requests.get("https://api.example.com", headers={"Authorization":"Bearer xxx"})

대신 Airflow는 Connection이라는 표준 저장소에 정보를 넣고, DAG에서는 “연결 이름(conn_id)”만 참조합니다.

(좋은 예)
http_conn_id="my_api_conn"

Connection은 어디에 저장되고 어떻게 관리될까?

Airflow의 Connection은 기본적으로 Airflow 메타데이터 DB에 저장됩니다.

  • UI에서 생성 가능 (Admin → Connections)
  • 환경변수 / CLI로도 설정 가능
  • 운영 환경에서는 Secret Backend(Vault/Secrets Manager 등)로 관리 권장

Connection 항목 구성(필드) 제대로 이해하기

Connection에는 보통 다음 필드가 있습니다.

  • Conn Type (HTTP, MySQL, Postgres 등)
  • Host
  • Schema (DB명 같은 것)
  • Login
  • Password
  • Port
  • Extra (JSON 형태 옵션)

예시

HTTP Connection 예시
  • Conn Id: my_api_conn
  • Conn Type: HTTP
  • Host: https://api.example.com
  • Extra: {"headers": {"Authorization":"Bearer ..."}} (권장 방식은 Secret Backend)
MySQL Connection 예시
  • Conn Id: mysql_conn
  • Conn Type: MySQL
  • Host: mysql (Docker Compose 서비스명)
  • Schema: appdb
  • Login/Password: 계정 정보
  • Port: 3306

Hook이란? 왜 DAG에서 직접 연결하지 않을까?

Hook은 “연결 + 공통 처리를 표준화한 객체”입니다.

Hook이 해주는 일
  • Connection 읽기(conn_id 기반)
  • 인증/세션 생성
  • 재시도/에러 처리 패턴 제공
  • 공통 메서드 제공(예: run, get_records, get_conn)

즉, DAG에 연결/인증/예외처리 로직을 반복해서 쓰지 않게 해줍니다.

Operator란? Hook과의 차이

  • Operator: “무엇을 할지” (Task 정의)
  • Hook: “어떻게 연결해서 수행할지” (접속/세션/실행)

Operator는 Hook을 이용해서 작업을 수행합니다.

실습 1: HttpOperator로 API 호출하기

Airflow에서 API 호출은 보통 다음을 사용합니다.

  • HttpOperator
  • 또는 Python 기반 PythonOperator + HttpHook

여기서는 HttpOperator 중심으로 실습합니다.

Connection 생성 (HTTP)

Airflow UI → Admin → Connections

  • Conn Id: my_api_conn
  • Conn Type: HTTP
  • Host: https://jsonplaceholder.typicode.com

6-2) DAG 예시 (GET 호출)

from airflow import DAG
from airflow.providers.http.operators.http import HttpOperator
from datetime import datetime

with DAG(
    dag_id="http_api_example",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    call_api = HttpOperator(
        task_id="call_api",
        http_conn_id="my_api_conn",
        endpoint="/todos/1",
        method="GET",
        log_response=True,
    )
포인트
  • Host는 Connection에 저장
  • DAG에서는 endpoint만 지정
  • log_response=True로 응답 확인

응답(JSON) 파싱까지 하고 싶다면?

call_api = HttpOperator(
    task_id="call_api",
    http_conn_id="my_api_conn",
    endpoint="/todos/1",
    method="GET",
    response_filter=lambda r: r.json(),
)

이렇게 하면 결과가 XCom으로 넘어갈 수 있어 다음 Task에서 활용이 가능합니다.

실습 2: Docker 환경에서 MySQL 연동하기

Docker에서 Airflow + MySQL을 붙일 때 가장 많이 실수하는 부분이 있습니다.

localhost를 쓰면 안 됩니다. 컨테이너 안에서 localhost는 “그 컨테이너 자신”입니다.

대신 Docker Compose의 서비스명을 Host로 써야 합니다.

docker-compose 예시 (MySQL 포함)

services:
  mysql:
    image: mysql:8
    environment:
      MYSQL_ROOT_PASSWORD: rootpw
      MYSQL_DATABASE: appdb
      MYSQL_USER: appuser
      MYSQL_PASSWORD: apppw
    ports:
      - "3306:3306"

Airflow 컨테이너가 같은 네트워크라면 Host는 mysql이 됩니다.

MySQL Connection 생성

  • Conn Id: mysql_conn
  • Conn Type: MySQL
  • Host: mysql
  • Schema: appdb
  • Login: appuser
  • Password: apppw
  • Port: 3306

실습 3: SQLExecuteQueryOperator로 DB 조작하기

Airflow에서 SQL 실행은 다음을 많이 씁니다.

  • SQLExecuteQueryOperator (Provider 공통)
  • 또는 MySqlOperator (구버전/특정 provider)

테이블 생성 + 데이터 입력 DAG

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime

with DAG(
    dag_id="mysql_sql_example",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:

    create_table = SQLExecuteQueryOperator(
        task_id="create_table",
        conn_id="mysql_conn",
        sql="""
        CREATE TABLE IF NOT EXISTS test_table (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(50),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """,
    )

    insert_row = SQLExecuteQueryOperator(
        task_id="insert_row",
        conn_id="mysql_conn",
        sql="INSERT INTO test_table (name) VALUES ('airflow');",
    )

    create_table >> insert_row

SELECT 결과를 다음 Task로 넘기기

select_rows = SQLExecuteQueryOperator(
    task_id="select_rows",
    conn_id="mysql_conn",
    sql="SELECT * FROM test_table ORDER BY id DESC LIMIT 5;",
    do_xcom_push=True,
)

do_xcom_push=True를 켜면 결과를 XCom으로 넘길 수 있습니다.

꼭 점검해야 할 체크포인트

Airflow는 “자동화”가 강력한 만큼, 보안 관점에서는 반드시 기준을 잡아야 합니다.

DAG 코드에 비밀정보 금지

  • API Key / Token
  • DB Password
  • Private endpoint URL(민감)
  • 인증서/키 파일
해결책
  • Connection 사용
  • 가능하면 Secret Backend(Vault, AWS SM 등)

Connection 권한 통제

Airflow UI에서 Connection 수정이 가능하므로,

  • Admin만 수정 가능
  • 사용자/팀별 Role 분리
  • 접근 가능한 DAG/Connection 범위 정의

네트워크 통제 (특히 API 호출)

Airflow worker가 외부 API를 호출한다는 것은

  • 내부망에서 외부로 나가는 Egress 통로가 생긴다는 뜻입니다.
점검 포인트
  • 호출 대상 도메인 allowlist
  • 프록시/게이트웨이 경유
  • 요청 로그 기록(감사)
  • 민감 데이터 포함 여부 점검

DB 계정 권한 최소화

Airflow에서 DB 작업할 때 “root 계정” 쓰는 경우가 흔하지만 매우 위험합니다.

권장
  • 읽기/쓰기 분리 계정
  • 최소 권한(필요 테이블만)
  • 운영/개발 환경 분리

로그에 민감정보 남지 않게

Airflow는 실행 로그를 아주 많이 남깁니다.

점검 포인트
  • log_response=True 사용 시 응답에 토큰/개인정보 포함 여부
  • SQL 로그에 개인정보가 그대로 찍히는지
  • Task 실패 시 traceback에 비밀정보가 포함되는지

실무 활용 시나리오 예시

패턴 A: API → DB 적재 ETL

  1. API 호출로 데이터 수집
  2. 응답 파싱
  3. DB Insert
  4. 완료 알림(Slack/Webhook)

패턴 B: DB 작업 자동화(정기 정리/아카이빙)

  1. 오래된 데이터 삭제
  2. 통계 테이블 갱신
  3. 결과 리포트 생성

패턴 C: 보안팀 자동화에도 딱 맞음

  • 위협 인텔리전스 API 수집 → SIEM 적재
  • 자산/계정 상태 수집 → 일일 점검
  • 취약점 스캐너 API 호출 → 결과 저장/요약

“Connection을 이해하면 Airflow가 쉬워진다”

Airflow 초반이 어려운 이유는 DAG 문법 때문이 아니라, 외부 시스템 연동 구조(Connection/Hook/Operator) 를 모르는 상태에서 “코드만 따라하기” 때문인 경우가 많습니다.

728x90
그리드형(광고전용)

댓글