
Airflow를 처음 배우면 거의 항상 나오는 질문이 있습니다.
“Airflow로 API 호출이나 데이터베이스 작업도 자동화할 수 있나요?”
정답은 물론 가능합니다.
그리고 Airflow는 “그걸 잘하기 위해 만들어진 도구”라고 보는 편이 더 정확합니다.
다만, 외부 시스템(API/DB 등)과 연동할 때는 반드시 알아야 할 핵심 개념이 있습니다.
- Connection
- Hook
- Operator
Airflow에서 외부 시스템 연동은 어떻게 이루어질까?
Airflow는 DAG(워크플로)를 실행하면서, 외부 시스템과 통신하는 작업을 “Task”로 처리합니다.
이 Task는 보통 Operator로 구현되고, 실제 접속은 Hook이 담당하며, 접속 정보는 Connection에 저장됩니다.
전체 흐름
- DAG에서 Task(Operator) 실행
- Operator가 Hook을 호출
- Hook이 Connection을 읽고 접속
- 외부 시스템(API/DB)과 통신
- 결과를 다음 Task로 넘김(XCom 등)
즉, 구조는 이렇게 정리됩니다.
DAG → Operator → Hook → Connection → 외부 시스템
Connection이란 무엇인가?
Connection은 “외부 시스템 접속정보”를 Airflow에 등록하는 방식입니다.
Airflow는 보안/운영 측면에서 DAG 코드에 접속 정보를 쓰지 않도록 설계되어 있습니다.
예를 들어, 아래처럼 DAG 코드에 API Key나 DB 비밀번호를 넣는 방식은 피해야 합니다.
(안 좋은 예)
requests.get("https://api.example.com", headers={"Authorization":"Bearer xxx"})
대신 Airflow는 Connection이라는 표준 저장소에 정보를 넣고, DAG에서는 “연결 이름(conn_id)”만 참조합니다.
(좋은 예)
http_conn_id="my_api_conn"
Connection은 어디에 저장되고 어떻게 관리될까?
Airflow의 Connection은 기본적으로 Airflow 메타데이터 DB에 저장됩니다.
- UI에서 생성 가능 (Admin → Connections)
- 환경변수 / CLI로도 설정 가능
- 운영 환경에서는 Secret Backend(Vault/Secrets Manager 등)로 관리 권장
Connection 항목 구성(필드) 제대로 이해하기
Connection에는 보통 다음 필드가 있습니다.
- Conn Type (HTTP, MySQL, Postgres 등)
- Host
- Schema (DB명 같은 것)
- Login
- Password
- Port
- Extra (JSON 형태 옵션)
예시
HTTP Connection 예시
- Conn Id:
my_api_conn - Conn Type:
HTTP - Host:
https://api.example.com - Extra:
{"headers": {"Authorization":"Bearer ..."}}(권장 방식은 Secret Backend)
MySQL Connection 예시
- Conn Id:
mysql_conn - Conn Type:
MySQL - Host:
mysql(Docker Compose 서비스명) - Schema:
appdb - Login/Password: 계정 정보
- Port:
3306
Hook이란? 왜 DAG에서 직접 연결하지 않을까?
Hook은 “연결 + 공통 처리를 표준화한 객체”입니다.
Hook이 해주는 일
- Connection 읽기(conn_id 기반)
- 인증/세션 생성
- 재시도/에러 처리 패턴 제공
- 공통 메서드 제공(예: run, get_records, get_conn)
즉, DAG에 연결/인증/예외처리 로직을 반복해서 쓰지 않게 해줍니다.
Operator란? Hook과의 차이
- Operator: “무엇을 할지” (Task 정의)
- Hook: “어떻게 연결해서 수행할지” (접속/세션/실행)
Operator는 Hook을 이용해서 작업을 수행합니다.
실습 1: HttpOperator로 API 호출하기
Airflow에서 API 호출은 보통 다음을 사용합니다.
HttpOperator- 또는 Python 기반
PythonOperator+HttpHook
여기서는 HttpOperator 중심으로 실습합니다.
Connection 생성 (HTTP)
Airflow UI → Admin → Connections
- Conn Id:
my_api_conn - Conn Type:
HTTP - Host:
https://jsonplaceholder.typicode.com
6-2) DAG 예시 (GET 호출)
from airflow import DAG
from airflow.providers.http.operators.http import HttpOperator
from datetime import datetime
with DAG(
dag_id="http_api_example",
start_date=datetime(2025, 1, 1),
schedule="@daily",
catchup=False,
) as dag:
call_api = HttpOperator(
task_id="call_api",
http_conn_id="my_api_conn",
endpoint="/todos/1",
method="GET",
log_response=True,
)
포인트
Host는 Connection에 저장- DAG에서는
endpoint만 지정 log_response=True로 응답 확인
응답(JSON) 파싱까지 하고 싶다면?
call_api = HttpOperator(
task_id="call_api",
http_conn_id="my_api_conn",
endpoint="/todos/1",
method="GET",
response_filter=lambda r: r.json(),
)
이렇게 하면 결과가 XCom으로 넘어갈 수 있어 다음 Task에서 활용이 가능합니다.
실습 2: Docker 환경에서 MySQL 연동하기
Docker에서 Airflow + MySQL을 붙일 때 가장 많이 실수하는 부분이 있습니다.
localhost를 쓰면 안 됩니다. 컨테이너 안에서 localhost는 “그 컨테이너 자신”입니다.
대신 Docker Compose의 서비스명을 Host로 써야 합니다.
docker-compose 예시 (MySQL 포함)
services:
mysql:
image: mysql:8
environment:
MYSQL_ROOT_PASSWORD: rootpw
MYSQL_DATABASE: appdb
MYSQL_USER: appuser
MYSQL_PASSWORD: apppw
ports:
- "3306:3306"
Airflow 컨테이너가 같은 네트워크라면 Host는 mysql이 됩니다.
MySQL Connection 생성
- Conn Id:
mysql_conn - Conn Type:
MySQL - Host:
mysql - Schema:
appdb - Login:
appuser - Password:
apppw - Port:
3306
실습 3: SQLExecuteQueryOperator로 DB 조작하기
Airflow에서 SQL 실행은 다음을 많이 씁니다.
SQLExecuteQueryOperator(Provider 공통)- 또는
MySqlOperator(구버전/특정 provider)
테이블 생성 + 데이터 입력 DAG
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime
with DAG(
dag_id="mysql_sql_example",
start_date=datetime(2025, 1, 1),
schedule="@daily",
catchup=False,
) as dag:
create_table = SQLExecuteQueryOperator(
task_id="create_table",
conn_id="mysql_conn",
sql="""
CREATE TABLE IF NOT EXISTS test_table (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""",
)
insert_row = SQLExecuteQueryOperator(
task_id="insert_row",
conn_id="mysql_conn",
sql="INSERT INTO test_table (name) VALUES ('airflow');",
)
create_table >> insert_row
SELECT 결과를 다음 Task로 넘기기
select_rows = SQLExecuteQueryOperator(
task_id="select_rows",
conn_id="mysql_conn",
sql="SELECT * FROM test_table ORDER BY id DESC LIMIT 5;",
do_xcom_push=True,
)
do_xcom_push=True를 켜면 결과를 XCom으로 넘길 수 있습니다.
꼭 점검해야 할 체크포인트
Airflow는 “자동화”가 강력한 만큼, 보안 관점에서는 반드시 기준을 잡아야 합니다.
DAG 코드에 비밀정보 금지
- API Key / Token
- DB Password
- Private endpoint URL(민감)
- 인증서/키 파일
해결책
- Connection 사용
- 가능하면 Secret Backend(Vault, AWS SM 등)
Connection 권한 통제
Airflow UI에서 Connection 수정이 가능하므로,
- Admin만 수정 가능
- 사용자/팀별 Role 분리
- 접근 가능한 DAG/Connection 범위 정의
네트워크 통제 (특히 API 호출)
Airflow worker가 외부 API를 호출한다는 것은
- 내부망에서 외부로 나가는 Egress 통로가 생긴다는 뜻입니다.
점검 포인트
- 호출 대상 도메인 allowlist
- 프록시/게이트웨이 경유
- 요청 로그 기록(감사)
- 민감 데이터 포함 여부 점검
DB 계정 권한 최소화
Airflow에서 DB 작업할 때 “root 계정” 쓰는 경우가 흔하지만 매우 위험합니다.
권장
- 읽기/쓰기 분리 계정
- 최소 권한(필요 테이블만)
- 운영/개발 환경 분리
로그에 민감정보 남지 않게
Airflow는 실행 로그를 아주 많이 남깁니다.
점검 포인트
log_response=True사용 시 응답에 토큰/개인정보 포함 여부- SQL 로그에 개인정보가 그대로 찍히는지
- Task 실패 시 traceback에 비밀정보가 포함되는지
실무 활용 시나리오 예시
패턴 A: API → DB 적재 ETL
- API 호출로 데이터 수집
- 응답 파싱
- DB Insert
- 완료 알림(Slack/Webhook)
패턴 B: DB 작업 자동화(정기 정리/아카이빙)
- 오래된 데이터 삭제
- 통계 테이블 갱신
- 결과 리포트 생성
패턴 C: 보안팀 자동화에도 딱 맞음
- 위협 인텔리전스 API 수집 → SIEM 적재
- 자산/계정 상태 수집 → 일일 점검
- 취약점 스캐너 API 호출 → 결과 저장/요약
“Connection을 이해하면 Airflow가 쉬워진다”
Airflow 초반이 어려운 이유는 DAG 문법 때문이 아니라, 외부 시스템 연동 구조(Connection/Hook/Operator) 를 모르는 상태에서 “코드만 따라하기” 때문인 경우가 많습니다.
댓글