A Lightweight, Flexible Webhook Queuing System Built with FastAPI + DuckDB

by 날으는물고기 2025. 7. 5.

This is a comprehensive implementation covering a DuckDB-based Webhook Gateway together with integrated Prometheus/Grafana monitoring.

System Architecture and Data Flow

┌─────────────────┐     ┌──────────────────┐     ┌────────────────────┐     ┌──────────────────┐
│  External APIs  │────▶│  Webhook Gateway │────▶│ Internal Processor │────▶│ Internal Systems │
└─────────────────┘     │   (FastAPI)      │     └────────────────────┘     └──────────────────┘
                        │                   │
                        │  ┌─────────────┐ │           ▲
                        │  │   DuckDB    │ │           │
                        │  │  (Queue DB) │ │           │
                        │  └─────────────┘ │           │
                        │         │        │           │
                        │         ▼        │           │
                        │  ┌─────────────┐ │           │
                        │  │  Processor  │─┼───────────┘
                        │  │  (Async)    │ │
                        │  └─────────────┘ │
                        └──────────────────┘
                                 │
                                 ▼
                        ┌─────────────────┐
                        │   Prometheus    │
                        │   (Metrics)     │
                        └────────┬────────┘
                                 │
                                 ▼
                        ┌─────────────────┐
                        │    Grafana      │
                        │  (Dashboard)    │
                        └─────────────────┘

Core Component Roles

  1. Webhook Gateway (FastAPI)
    • Receives HTTP requests from external systems
    • Immediately stores each request in DuckDB (queuing)
    • Returns a 202 Accepted response (see the client sketch after this list)
    • Exposes Prometheus metrics
  2. DuckDB
    • Acts as the message queue (raw_webhooks table)
    • Stores processing logs (webhook_processing_log)
    • Manages routing rules (routing_rules)
    • Performs SQL-based data transformation
  3. Async Processor
    • Runs continuously in the background
    • Fetches pending webhooks from DuckDB
    • Transforms and forwards them according to routing rules
    • Manages retries on failure
  4. Prometheus + Grafana
    • Collects and stores metrics in real time
    • Provides visualization dashboards
    • Manages alerting rules
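
From a client's point of view the gateway is just an HTTP endpoint. The sketch below is a minimal, hypothetical client that posts an order.created event and then polls the status endpoint; the host/port and the event fields are illustrative assumptions, not part of the gateway code itself.

import httpx

GATEWAY = "http://localhost:8000"  # assumed local deployment

# Post an event; the gateway queues it in DuckDB and answers 202 immediately
resp = httpx.post(f"{GATEWAY}/webhook/orders", json={
    "event_type": "order.created",
    "order_id": "ORD-1001",
    "customer_id": "CUST-42",
    "total_amount": 59.90,
    "items": [{"sku": "ABC", "qty": 2}],
})
print(resp.status_code)                      # expected: 202
webhook_id = resp.json()["webhook_id"]

# Later: check how the async processor handled it
status = httpx.get(f"{GATEWAY}/status/{webhook_id}").json()
print(status["webhook"]["status"], status["processing_history"])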

Web Server with Integrated Prometheus Metrics

import json
import asyncio
import time
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, List
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, BackgroundTasks, Response
from fastapi.responses import JSONResponse
import duckdb
import httpx
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from prometheus_client import CollectorRegistry, multiprocess, start_http_server
import logging
from dataclasses import dataclass
from enum import Enum

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Prometheus metric definitions
registry = CollectorRegistry()

# Counter metrics
webhook_received_total = Counter(
    'webhook_received_total',
    'Total number of webhooks received',
    ['endpoint', 'source'],
    registry=registry
)

webhook_processed_total = Counter(
    'webhook_processed_total',
    'Total number of webhooks processed',
    ['status', 'rule_name'],
    registry=registry
)

internal_api_calls_total = Counter(
    'internal_api_calls_total',
    'Total number of internal API calls',
    ['target_url', 'status_code'],
    registry=registry
)

# Histogram metrics
webhook_processing_duration = Histogram(
    'webhook_processing_duration_seconds',
    'Time spent processing webhooks',
    ['rule_name'],
    buckets=(0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0),
    registry=registry
)

internal_api_duration = Histogram(
    'internal_api_duration_seconds',
    'Internal API call duration',
    ['target_url'],
    buckets=(0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0),
    registry=registry
)

queue_wait_time = Histogram(
    'webhook_queue_wait_time_seconds',
    'Time webhooks spend in queue before processing',
    buckets=(1, 5, 10, 30, 60, 300, 600, 1800, 3600),
    registry=registry
)

# Gauge metrics
webhook_queue_size = Gauge(
    'webhook_queue_size',
    'Current number of webhooks in queue',
    ['status'],
    registry=registry
)

active_processing_count = Gauge(
    'webhook_active_processing_count',
    'Number of webhooks currently being processed',
    registry=registry
)

database_size_bytes = Gauge(
    'webhook_database_size_bytes',
    'Size of DuckDB database file',
    registry=registry
)

last_successful_processing = Gauge(
    'webhook_last_successful_processing_timestamp',
    'Timestamp of last successful webhook processing',
    ['rule_name'],
    registry=registry
)

# Status enum
class WebhookStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

# DuckDB connection manager (extended)
class EnhancedDuckDBManager:
    def __init__(self, db_path: str = "webhook_gateway.db"):
        self.db_path = db_path
        self.conn = None
        self.init_database()
        self._register_custom_functions()

    def init_database(self):
        """데이터베이스 초기화 및 테이블 생성"""
        conn = duckdb.connect(self.db_path)

        # Basic settings
        conn.execute("""
            SET memory_limit = '2GB';
            SET threads = 4;
        """)

        # Create sequences
        conn.execute("""
            CREATE SEQUENCE IF NOT EXISTS webhook_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS log_seq START 1;
        """)

        # Main tables
        conn.execute("""
            CREATE TABLE IF NOT EXISTS raw_webhooks (
                id BIGINT PRIMARY KEY DEFAULT nextval('webhook_seq'),
                received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                source_ip VARCHAR,
                endpoint VARCHAR,
                headers JSON,
                payload JSON,
                status VARCHAR DEFAULT 'pending',
                retry_count INT DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """)

        # Create indexes
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_webhook_status 
            ON raw_webhooks(status, retry_count, received_at);

            CREATE INDEX IF NOT EXISTS idx_webhook_created 
            ON raw_webhooks(created_at);
        """)

        # Processing log table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS webhook_processing_log (
                id BIGINT PRIMARY KEY DEFAULT nextval('log_seq'),
                webhook_id BIGINT,
                attempt_number INT,
                processing_started_at TIMESTAMP,
                processing_completed_at TIMESTAMP,
                internal_url VARCHAR,
                request_payload JSON,
                response_status INT,
                response_body TEXT,
                error_message TEXT,
                processing_duration_ms INT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """)

        # Routing rules table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS routing_rules (
                id INT PRIMARY KEY,
                rule_name VARCHAR UNIQUE,
                description TEXT,
                filter_query TEXT,
                transform_query TEXT,
                target_url VARCHAR,
                timeout_seconds INT DEFAULT 30,
                retry_policy JSON,
                is_active BOOLEAN DEFAULT true,
                priority INT DEFAULT 100,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """)

        # Metrics aggregation table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS webhook_metrics (
                id BIGINT PRIMARY KEY DEFAULT nextval('webhook_seq'),
                metric_type VARCHAR,
                metric_name VARCHAR,
                metric_value DOUBLE,
                labels JSON,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """)

        # Sample routing rules (parameterized so the SQL snippets inside the
        # rules do not have to be escaped as string literals)
        sample_rules = [
            (1, 'order_events',
             'Process order creation events',
             "json_extract_string(payload, '$.event_type') = 'order.created'",
             "SELECT "
             "json_extract(payload, '$.order_id') as order_id, "
             "json_extract(payload, '$.customer_id') as customer_id, "
             "json_extract(payload, '$.total_amount') as amount, "
             "json_extract(payload, '$.items') as items",
             'http://internal-api:8080/orders/process',
             '{"max_retries": 3, "backoff_multiplier": 2}'),

            (2, 'payment_events',
             'Handle payment related events',
             "json_extract_string(payload, '$.event_type') LIKE 'payment.%'",
             "SELECT payload",
             'http://internal-api:8080/payments/webhook',
             '{"max_retries": 5, "backoff_multiplier": 1.5}'),

            (3, 'user_events',
             'User activity tracking',
             "json_extract_string(payload, '$.category') = 'user'",
             "SELECT "
             "json_extract(payload, '$.user_id') as user_id, "
             "json_extract(payload, '$.action') as action, "
             "json_extract(payload, '$.metadata') as metadata",
             'http://internal-api:8080/users/activity',
             '{"max_retries": 2, "backoff_multiplier": 1}'),
        ]
        conn.executemany("""
            INSERT OR REPLACE INTO routing_rules
            (id, rule_name, description, filter_query, transform_query, target_url, retry_policy)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, sample_rules)

        conn.close()

    def _register_custom_functions(self):
        """커스텀 SQL 함수 등록"""
        conn = self.get_connection()

        # Deep JSON extraction helper
        def json_deep_extract(json_str: str, path: str) -> Any:
            try:
                import json
                data = json.loads(json_str) if isinstance(json_str, str) else json_str

                # JSONPath-style parsing
                parts = path.strip('$.').split('.')
                for part in parts:
                    if '[' in part and ']' in part:
                        # Handle array index
                        key = part[:part.index('[')]
                        index = int(part[part.index('[')+1:part.index(']')])
                        data = data.get(key, [])[index] if isinstance(data, dict) else None
                    else:
                        data = data.get(part) if isinstance(data, dict) else None

                    if data is None:
                        break

                # Always hand back a string (or None) so the VARCHAR return type holds
                return data if data is None or isinstance(data, str) else json.dumps(data)
            except Exception:
                return None

        # Explicit parameter/return types so DuckDB does not have to infer
        # them from the loose (Any) annotation above
        conn.create_function(
            "json_deep_extract", json_deep_extract,
            [duckdb.typing.VARCHAR, duckdb.typing.VARCHAR], duckdb.typing.VARCHAR
        )

    def get_connection(self):
        if not self.conn:
            self.conn = duckdb.connect(self.db_path)
        return self.conn

    def get_db_size(self) -> int:
        """데이터베이스 파일 크기 반환"""
        import os
        try:
            return os.path.getsize(self.db_path)
        except:
            return 0

# Metrics collector
class MetricsCollector:
    def __init__(self, db_manager: EnhancedDuckDBManager):
        self.db_manager = db_manager

    def start_background_collection(self):
        """Start background metrics collection (must be called from a running event loop)"""
        asyncio.create_task(self._collect_metrics_loop())

    async def _collect_metrics_loop(self):
        """주기적으로 메트릭 수집"""
        while True:
            try:
                await self.collect_queue_metrics()
                await self.collect_db_metrics()
                await asyncio.sleep(10)  # collect every 10 seconds
            except Exception as e:
                logger.error(f"Metrics collection error: {e}")
                await asyncio.sleep(30)

    async def collect_queue_metrics(self):
        """큐 관련 메트릭 수집"""
        conn = self.db_manager.get_connection()

        # Webhook counts by status
        status_counts = conn.execute("""
            SELECT status, COUNT(*) as count
            FROM raw_webhooks
            GROUP BY status
        """).fetchall()

        for status, count in status_counts:
            webhook_queue_size.labels(status=status).set(count)

        # Queue wait times
        waiting_times = conn.execute("""
            SELECT 
                EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - received_at)) as wait_seconds
            FROM raw_webhooks
            WHERE status = 'pending'
            LIMIT 100
        """).fetchall()

        for (wait_time,) in waiting_times:
            queue_wait_time.observe(wait_time)

    async def collect_db_metrics(self):
        """데이터베이스 메트릭 수집"""
        database_size_bytes.set(self.db_manager.get_db_size())

# Enhanced webhook processor
class EnhancedWebhookProcessor:
    def __init__(self, db_manager: EnhancedDuckDBManager):
        self.db_manager = db_manager
        self.http_client = httpx.AsyncClient(timeout=30.0)
        self.processing = False
        self.concurrent_limit = 10  # concurrency limit
        self.processing_semaphore = asyncio.Semaphore(self.concurrent_limit)

    async def start_processing(self):
        """백그라운드 처리 시작"""
        self.processing = True

        # Launch multiple workers
        workers = []
        for i in range(3):  # three workers
            workers.append(asyncio.create_task(self._worker_loop(i)))

        # Dedicated retry worker
        workers.append(asyncio.create_task(self._retry_worker_loop()))

        await asyncio.gather(*workers, return_exceptions=True)

    async def stop_processing(self):
        """처리 중지"""
        self.processing = False
        await self.http_client.aclose()

    async def _worker_loop(self, worker_id: int):
        """워커 루프"""
        logger.info(f"Worker {worker_id} started")

        while self.processing:
            try:
                await self._process_batch()
                await asyncio.sleep(0.5)  # short pause
            except Exception as e:
                logger.error(f"Worker {worker_id} error: {e}")
                await asyncio.sleep(5)

    async def _retry_worker_loop(self):
        """재시도 전용 워커"""
        logger.info("Retry worker started")

        while self.processing:
            try:
                await self._process_retries()
                await asyncio.sleep(30)  # check for retries every 30 seconds
            except Exception as e:
                logger.error(f"Retry worker error: {e}")
                await asyncio.sleep(60)

    async def _process_batch(self):
        """배치 처리"""
        conn = self.db_manager.get_connection()

        # Claim and lock webhooks to process
        batch = conn.execute("""
            UPDATE raw_webhooks
            SET status = 'processing',
                updated_at = CURRENT_TIMESTAMP
            WHERE id IN (
                SELECT id FROM raw_webhooks
                WHERE status = 'pending'
                ORDER BY received_at
                LIMIT 10
            )
            RETURNING id, payload, headers, endpoint, retry_count, received_at
        """).fetchall()

        if not batch:
            return

        # Process concurrently
        tasks = []
        for webhook_data in batch:
            task = asyncio.create_task(self._process_single_webhook(*webhook_data))
            tasks.append(task)

        await asyncio.gather(*tasks, return_exceptions=True)

    async def _process_retries(self):
        """재시도 처리"""
        conn = self.db_manager.get_connection()

        # Find retry candidates (exponential backoff window)
        retries = conn.execute("""
            SELECT id, payload, headers, endpoint, retry_count, received_at
            FROM raw_webhooks
            WHERE status = 'failed'
              AND retry_count < 3
              AND updated_at < CURRENT_TIMESTAMP - INTERVAL '1 minute' * POWER(2, retry_count)
            ORDER BY updated_at
            LIMIT 5
        """).fetchall()

        for webhook_data in retries:
            await self._process_single_webhook(*webhook_data)

    async def _process_single_webhook(self, webhook_id: int, payload,
                                     headers, endpoint: str,
                                     retry_count: int, received_at: datetime):
        """Process a single webhook"""
        # DuckDB returns JSON columns as strings; normalize them to dicts
        if isinstance(payload, str):
            payload = json.loads(payload)
        if isinstance(headers, str):
            headers = json.loads(headers)

        async with self.processing_semaphore:
            active_processing_count.inc()

            # Record queue wait time
            wait_time = (datetime.now() - received_at).total_seconds()
            queue_wait_time.observe(wait_time)

            start_time = time.time()
            rule_name = "unknown"

            try:
                # Find a routing rule
                routing_rule = await self._find_routing_rule(payload, endpoint)

                if not routing_rule:
                    # No matching rule - mark as completed
                    await self._mark_completed(webhook_id, "no_matching_rule")
                    return

                rule_name = routing_rule['rule_name']

                # Transform the data
                transformed_data = await self._transform_data(
                    payload, 
                    routing_rule['transform_query']
                )

                # Call the internal API
                api_start = time.time()
                response = await self._call_internal_api(
                    routing_rule['target_url'],
                    transformed_data,
                    headers,
                    routing_rule.get('timeout_seconds', 30)
                )
                api_duration = time.time() - api_start

                # API call metrics
                internal_api_duration.labels(
                    target_url=routing_rule['target_url']
                ).observe(api_duration)

                internal_api_calls_total.labels(
                    target_url=routing_rule['target_url'],
                    status_code=response.status_code
                ).inc()

                # Record the processing log
                await self._log_processing(
                    webhook_id, retry_count + 1, 
                    routing_rule['target_url'],
                    transformed_data, response,
                    int((time.time() - start_time) * 1000)
                )

                # Success / failure handling
                if response.status_code < 400:
                    await self._mark_completed(webhook_id, rule_name)
                    last_successful_processing.labels(
                        rule_name=rule_name
                    ).set(time.time())
                else:
                    raise Exception(f"API returned {response.status_code}")

            except Exception as e:
                logger.error(f"Processing error for webhook {webhook_id}: {e}")
                await self._mark_failed(webhook_id, str(e), retry_count)
                webhook_processed_total.labels(
                    status="failed",
                    rule_name=rule_name
                ).inc()

            finally:
                active_processing_count.dec()

                # Record processing duration
                duration = time.time() - start_time
                webhook_processing_duration.labels(
                    rule_name=rule_name
                ).observe(duration)

    async def _find_routing_rule(self, payload: dict, endpoint: str) -> Optional[Dict]:
        """Find a matching routing rule"""
        conn = self.db_manager.get_connection()

        # Load active rules; select filter_query explicitly so the filter
        # (not the transform) is what gets evaluated against the payload
        rules = conn.execute("""
            SELECT 
                r.id, 
                r.rule_name, 
                r.filter_query,
                r.transform_query, 
                r.target_url,
                r.timeout_seconds,
                r.retry_policy
            FROM routing_rules r
            WHERE r.is_active = true
            ORDER BY r.priority DESC
        """).fetchall()

        for rule in rules:
            # Evaluate the filter condition against the payload
            try:
                matches = conn.execute(f"""
                    SELECT ({rule[2]}) as matches
                    FROM (SELECT ? as payload) t
                """, [json.dumps(payload)]).fetchone()

                if matches and matches[0]:
                    return {
                        'id': rule[0],
                        'rule_name': rule[1],
                        'transform_query': rule[3],
                        'target_url': rule[4],
                        'timeout_seconds': rule[5],
                        'retry_policy': rule[6]
                    }
            except Exception as e:
                logger.debug(f"Filter evaluation failed for rule {rule[1]}: {e}")
                continue

        return None

    async def _transform_data(self, payload: dict, transform_query: str) -> dict:
        """데이터 변환"""
        if transform_query.strip().upper() == "SELECT PAYLOAD":
            return payload

        conn = self.db_manager.get_connection()

        try:
            # Run the transform query
            result = conn.execute(f"""
                WITH source AS (SELECT ? as payload)
                {transform_query}
                FROM source
            """, [json.dumps(payload)]).fetchone()

            if result:
                columns = [desc[0] for desc in conn.description]
                return dict(zip(columns, result))
        except Exception as e:
            logger.error(f"Transform error: {e}")

        return payload

    async def _call_internal_api(self, url: str, data: dict, 
                                headers: dict, timeout: int) -> httpx.Response:
        """내부 API 호출"""
        forward_headers = {
            'Content-Type': 'application/json',
            'X-Webhook-Gateway': 'duckdb-v1',
            'X-Original-IP': headers.get('x-forwarded-for', ''),
        }

        response = await self.http_client.post(
            url,
            json=data,
            headers=forward_headers,
            timeout=timeout
        )

        return response

    async def _log_processing(self, webhook_id: int, attempt: int,
                            target_url: str, request_data: dict,
                            response: httpx.Response, duration_ms: int):
        """처리 로그 기록"""
        conn = self.db_manager.get_connection()

        conn.execute("""
            INSERT INTO webhook_processing_log 
            (webhook_id, attempt_number, processing_started_at, 
             processing_completed_at, internal_url, request_payload, 
             response_status, response_body, processing_duration_ms)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, [
            webhook_id, attempt, 
            datetime.now() - timedelta(milliseconds=duration_ms),
            datetime.now(), target_url, json.dumps(request_data),
            response.status_code, response.text[:1000],  # truncate response body
            duration_ms
        ])

    async def _mark_completed(self, webhook_id: int, rule_name: str):
        """완료 처리"""
        conn = self.db_manager.get_connection()

        conn.execute("""
            UPDATE raw_webhooks 
            SET status = 'completed',
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, [webhook_id])

        webhook_processed_total.labels(
            status="completed",
            rule_name=rule_name
        ).inc()

    async def _mark_failed(self, webhook_id: int, error: str, retry_count: int):
        """실패 처리"""
        conn = self.db_manager.get_connection()

        conn.execute("""
            UPDATE raw_webhooks 
            SET status = 'failed',
                retry_count = retry_count + 1,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, [webhook_id])

        # Error log
        conn.execute("""
            INSERT INTO webhook_processing_log 
            (webhook_id, attempt_number, error_message, 
             processing_started_at, processing_completed_at)
            VALUES (?, ?, ?, ?, ?)
        """, [
            webhook_id, retry_count + 1, error,
            datetime.now(), datetime.now()
        ])

# Global instances
db_manager = EnhancedDuckDBManager()
processor = EnhancedWebhookProcessor(db_manager)
metrics_collector = MetricsCollector(db_manager)

# FastAPI app setup with a lifespan handler for startup/shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("Starting webhook gateway...")
    metrics_collector.start_background_collection()
    asyncio.create_task(processor.start_processing())
    yield
    # Shutdown
    logger.info("Stopping webhook gateway...")
    await processor.stop_processing()

app = FastAPI(title="Webhook Gateway with Metrics", lifespan=lifespan)

# API endpoints
@app.post("/webhook/{endpoint_path:path}")
async def receive_webhook(endpoint_path: str, request: Request):
    """웹훅 수신"""
    start_time = time.time()

    try:
        # Parse the request
        body = await request.body()
        try:
            payload = json.loads(body)
        except:
            payload = {"raw_body": body.decode('utf-8')}

        headers = dict(request.headers)
        source_ip = request.client.host

        # Record metrics
        webhook_received_total.labels(
            endpoint=endpoint_path,
            source=source_ip.split('.')[0] + '.x.x.x'  # anonymize the IP
        ).inc()

        # Persist to DB
        conn = db_manager.get_connection()
        result = conn.execute("""
            INSERT INTO raw_webhooks 
            (source_ip, endpoint, headers, payload)
            VALUES (?, ?, ?, ?)
            RETURNING id
        """, [source_ip, endpoint_path, json.dumps(headers), json.dumps(payload)]).fetchone()

        webhook_id = result[0]

        return JSONResponse(
            status_code=202,
            content={
                "status": "accepted",
                "webhook_id": webhook_id,
                "message": "Webhook queued for processing"
            }
        )

    except Exception as e:
        logger.error(f"Webhook receive error: {e}")
        return JSONResponse(
            status_code=500,
            content={"error": "Internal server error"}
        )

@app.get("/metrics")
async def get_metrics():
    """Prometheus 메트릭 엔드포인트"""
    return Response(
        content=generate_latest(registry),
        media_type=CONTENT_TYPE_LATEST
    )

@app.get("/status/{webhook_id}")
async def get_webhook_status(webhook_id: int):
    """웹훅 상태 조회"""
    conn = db_manager.get_connection()

    # Webhook record
    webhook = conn.execute("""
        SELECT 
            w.id, w.status, w.retry_count, 
            w.received_at, w.created_at, w.updated_at,
            w.endpoint, w.payload
        FROM raw_webhooks w
        WHERE w.id = ?
    """, [webhook_id]).fetchone()

    if not webhook:
        return JSONResponse(status_code=404, content={"error": "Not found"})

    # Processing history
    logs = conn.execute("""
        SELECT 
            attempt_number, processing_started_at, 
            processing_completed_at, internal_url,
            response_status, error_message,
            processing_duration_ms
        FROM webhook_processing_log
        WHERE webhook_id = ?
        ORDER BY attempt_number DESC
    """, [webhook_id]).fetchall()

    return {
        "webhook": {
            "id": webhook[0],
            "status": webhook[1],
            "retry_count": webhook[2],
            "received_at": webhook[3].isoformat() if webhook[3] else None,
            "created_at": webhook[4].isoformat() if webhook[4] else None,
            "updated_at": webhook[5].isoformat() if webhook[5] else None,
            "endpoint": webhook[6]
        },
        "processing_history": [
            {
                "attempt": log[0],
                "started_at": log[1].isoformat() if log[1] else None,
                "completed_at": log[2].isoformat() if log[2] else None,
                "target_url": log[3],
                "response_status": log[4],
                "error": log[5],
                "duration_ms": log[6]
            }
            for log in logs
        ]
    }

@app.get("/health")
async def health_check():
    """헬스 체크"""
    try:
        conn = db_manager.get_connection()
        conn.execute("SELECT 1").fetchone()

        return {
            "status": "healthy",
            "database": "connected",
            "processor": "running" if processor.processing else "stopped"
        }
    except Exception as e:
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "error": str(e)
            }
        )

@app.get("/dashboard/summary")
async def dashboard_summary():
    """대시보드용 요약 데이터"""
    conn = db_manager.get_connection()

    summary = conn.execute("""
        WITH time_ranges AS (
            SELECT 
                CURRENT_TIMESTAMP - INTERVAL '1 hour' as hour_ago,
                CURRENT_TIMESTAMP - INTERVAL '24 hours' as day_ago,
                CURRENT_TIMESTAMP - INTERVAL '7 days' as week_ago
        ),
        stats AS (
            SELECT 
                -- Overall stats
                COUNT(*) as total_webhooks,
                COUNT(*) FILTER (WHERE status = 'completed') as completed,
                COUNT(*) FILTER (WHERE status = 'failed' AND retry_count >= 3) as failed,
                COUNT(*) FILTER (WHERE status = 'pending') as pending,
                COUNT(*) FILTER (WHERE status = 'processing') as processing,

                -- Stats by time window
                COUNT(*) FILTER (WHERE created_at > (SELECT hour_ago FROM time_ranges)) as last_hour,
                COUNT(*) FILTER (WHERE created_at > (SELECT day_ago FROM time_ranges)) as last_day,
                COUNT(*) FILTER (WHERE created_at > (SELECT week_ago FROM time_ranges)) as last_week,

                -- Success rate
                ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'completed') / 
                      NULLIF(COUNT(*) FILTER (WHERE status IN ('completed', 'failed')), 0), 2) as success_rate
            FROM raw_webhooks
        ),
        processing_stats AS (
            SELECT 
                AVG(processing_duration_ms) as avg_duration_ms,
                MIN(processing_duration_ms) as min_duration_ms,
                MAX(processing_duration_ms) as max_duration_ms,
                PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY processing_duration_ms) as p50_duration_ms,
                PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY processing_duration_ms) as p95_duration_ms,
                PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY processing_duration_ms) as p99_duration_ms
            FROM webhook_processing_log
            WHERE processing_completed_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
              AND processing_duration_ms IS NOT NULL
        ),
        rule_stats AS (
            SELECT 
                r.rule_name,
                COUNT(DISTINCT w.id) as webhook_count,
                AVG(l.processing_duration_ms) as avg_duration_ms
            FROM routing_rules r
            LEFT JOIN webhook_processing_log l ON l.internal_url = r.target_url
            LEFT JOIN raw_webhooks w ON w.id = l.webhook_id
            WHERE r.is_active = true
            GROUP BY r.rule_name
        )
        SELECT 
            s.*,
            p.*,
            (SELECT JSON_GROUP_ARRAY(JSON_OBJECT(
                'rule_name', rule_name,
                'webhook_count', webhook_count,
                'avg_duration_ms', avg_duration_ms
            )) FROM rule_stats) as rules_performance
        FROM stats s, processing_stats p
    """).fetchone()

    if not summary:
        return {"error": "No data available"}

    # Format the result
    return {
        "overview": {
            "total_webhooks": summary[0],
            "completed": summary[1],
            "failed": summary[2],
            "pending": summary[3],
            "processing": summary[4],
            "success_rate": summary[8]
        },
        "volume": {
            "last_hour": summary[5],
            "last_day": summary[6],
            "last_week": summary[7]
        },
        "performance": {
            "avg_duration_ms": summary[9],
            "min_duration_ms": summary[10],
            "max_duration_ms": summary[11],
            "p50_duration_ms": summary[12],
            "p95_duration_ms": summary[13],
            "p99_duration_ms": summary[14]
        },
        "rules_performance": json.loads(summary[15]) if summary[15] else []
    }

# Routing rule management API
@app.get("/routing-rules")
async def list_routing_rules():
    """라우팅 규칙 목록"""
    conn = db_manager.get_connection()

    rules = conn.execute("""
        SELECT 
            id, rule_name, description, 
            filter_query, transform_query, 
            target_url, is_active, priority,
            created_at, updated_at
        FROM routing_rules
        ORDER BY priority DESC, created_at DESC
    """).fetchall()

    return {
        "rules": [
            {
                "id": r[0],
                "rule_name": r[1],
                "description": r[2],
                "filter_query": r[3],
                "transform_query": r[4],
                "target_url": r[5],
                "is_active": r[6],
                "priority": r[7],
                "created_at": r[8].isoformat() if r[8] else None,
                "updated_at": r[9].isoformat() if r[9] else None
            }
            for r in rules
        ]
    }

@app.post("/routing-rules")
async def create_routing_rule(rule: Dict[str, Any]):
    """새 라우팅 규칙 생성"""
    conn = db_manager.get_connection()

    try:
        # Validate the rule
        test_payload = rule.get('test_payload', {})

        # Test the filter query
        conn.execute(f"""
            SELECT ({rule['filter_query']}) as result
            FROM (SELECT ? as payload) t
        """, [json.dumps(test_payload)])

        # Insert the rule (id has no sequence default, so derive the next value)
        result = conn.execute("""
            INSERT INTO routing_rules 
            (id, rule_name, description, filter_query, 
             transform_query, target_url, priority, retry_policy)
            VALUES ((SELECT COALESCE(MAX(id), 0) + 1 FROM routing_rules), ?, ?, ?, ?, ?, ?, ?)
            RETURNING id
        """, [
            rule['rule_name'],
            rule.get('description', ''),
            rule['filter_query'],
            rule['transform_query'],
            rule['target_url'],
            rule.get('priority', 100),
            json.dumps(rule.get('retry_policy', {}))
        ]).fetchone()

        return {
            "id": result[0],
            "message": "Routing rule created successfully"
        }

    except Exception as e:
        return JSONResponse(
            status_code=400,
            content={"error": f"Invalid rule: {str(e)}"}
        )

@app.put("/routing-rules/{rule_id}/toggle")
async def toggle_routing_rule(rule_id: int):
    """라우팅 규칙 활성화/비활성화"""
    conn = db_manager.get_connection()

    conn.execute("""
        UPDATE routing_rules 
        SET is_active = NOT is_active,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
    """, [rule_id])

    return {"message": "Rule toggled successfully"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
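
With the gateway running, routing rules can be managed at runtime through the /routing-rules endpoints defined above. The snippet below is a small, hypothetical example that registers an extra rule and then lists all rules ordered by priority; the rule name, filter, and target URL are made up for illustration.

import json
import httpx

GATEWAY = "http://localhost:8000"  # assumed local deployment

new_rule = {
    "rule_name": "refund_events",
    "description": "Route refund events to the billing service",
    "filter_query": "json_extract_string(payload, '$.event_type') = 'order.refunded'",
    "transform_query": "SELECT payload",
    "target_url": "http://internal-api:8080/billing/refunds",
    "priority": 150,
    "retry_policy": {"max_retries": 3, "backoff_multiplier": 2},
    "test_payload": {"event_type": "order.refunded"},
}

created = httpx.post(f"{GATEWAY}/routing-rules", json=new_rule)
print(created.status_code, created.json())

# Verify the new rule is listed
rules = httpx.get(f"{GATEWAY}/routing-rules").json()["rules"]
print(json.dumps(rules, indent=2))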

Prometheus Configuration

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s
  external_labels:
    monitor: 'webhook-gateway'

# Alerting configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093

# Rule files to load
rule_files:
  - "alert_rules.yml"

# Scrape targets
scrape_configs:
  # Webhook Gateway metrics
  - job_name: 'webhook-gateway'
    static_configs:
      - targets: ['webhook-gateway:8000']
    metrics_path: '/metrics'
    scrape_interval: 10s
    scrape_timeout: 5s

  # DuckDB exporter (optional)
  - job_name: 'duckdb-exporter'
    static_configs:
      - targets: ['duckdb-exporter:9091']
    scrape_interval: 30s

  # Node Exporter (system metrics)
  - job_name: 'node'
    static_configs:
      - targets: ['node-exporter:9100']

  # Prometheus self-metrics
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

---
# alert_rules.yml
groups:
  - name: webhook_gateway_alerts
    interval: 30s
    rules:
      # Queue size alert
      - alert: WebhookQueueBacklog
        expr: webhook_queue_size{status="pending"} > 1000
        for: 5m
        labels:
          severity: warning
          component: webhook-gateway
        annotations:
          summary: "High webhook queue backlog"
          description: "Webhook queue has {{ $value }} pending items"

      # Processing delay alert
      - alert: WebhookProcessingDelay
        expr: histogram_quantile(0.95, rate(webhook_queue_wait_time_seconds_bucket[5m])) > 300
        for: 5m
        labels:
          severity: critical
          component: webhook-gateway
        annotations:
          summary: "High webhook processing delay"
          description: "95th percentile queue wait time is {{ $value }}s"

      # Failure rate alert
      - alert: WebhookHighFailureRate
        expr: |
          sum(rate(webhook_processed_total{status="failed"}[5m])) /
          sum(rate(webhook_processed_total[5m])) > 0.1
        for: 5m
        labels:
          severity: critical
          component: webhook-gateway
        annotations:
          summary: "High webhook failure rate"
          description: "Failure rate is {{ $value | humanizePercentage }}"

      # API response time alert
      - alert: InternalAPISlowResponse
        expr: |
          histogram_quantile(0.95, 
            rate(internal_api_duration_seconds_bucket[5m])
          ) > 5
        for: 5m
        labels:
          severity: warning
          component: internal-api
        annotations:
          summary: "Slow internal API responses"
          description: "95th percentile response time is {{ $value }}s for {{ $labels.target_url }}"

      # Database size alert
      - alert: DatabaseSizeLarge
        expr: webhook_database_size_bytes > 10737418240  # 10GB
        for: 30m
        labels:
          severity: warning
          component: database
        annotations:
          summary: "Database size is large"
          description: "DuckDB database size is {{ $value | humanize1024 }}B"

      # Stalled processing alert
      - alert: WebhookProcessingStopped
        expr: |
          sum(rate(webhook_processed_total[5m])) == 0
          and sum(webhook_queue_size{status="pending"}) > 0
        for: 5m
        labels:
          severity: critical
          component: webhook-gateway
        annotations:
          summary: "Webhook processing has stopped"
          description: "No webhooks processed in 5 minutes with {{ $value }} pending"

Grafana Dashboard Configuration

{
  "dashboard": {
    "id": null,
    "uid": "webhook-gateway",
    "title": "Webhook Gateway Dashboard",
    "tags": ["webhook", "gateway", "duckdb"],
    "timezone": "browser",
    "schemaVersion": 30,
    "version": 1,
    "refresh": "10s",
    "time": {
      "from": "now-6h",
      "to": "now"
    },
    "panels": [
      {
        "id": 1,
        "gridPos": { "h": 8, "w": 6, "x": 0, "y": 0 },
        "type": "stat",
        "title": "Total Webhooks Received",
        "targets": [
          {
            "expr": "sum(increase(webhook_received_total[1h]))",
            "refId": "A"
          }
        ],
        "options": {
          "reduceOptions": {
            "values": false,
            "calcs": ["lastNotNull"]
          },
          "orientation": "auto",
          "textMode": "auto",
          "colorMode": "value",
          "graphMode": "area",
          "justifyMode": "auto"
        }
      },
      {
        "id": 2,
        "gridPos": { "h": 8, "w": 6, "x": 6, "y": 0 },
        "type": "gauge",
        "title": "Current Queue Size",
        "targets": [
          {
            "expr": "sum(webhook_queue_size{status=\"pending\"})",
            "refId": "A"
          }
        ],
        "options": {
          "showThresholdLabels": false,
          "showThresholdMarkers": true
        },
        "fieldConfig": {
          "defaults": {
            "color": {
              "mode": "thresholds"
            },
            "mappings": [],
            "thresholds": {
              "mode": "absolute",
              "steps": [
                { "color": "green", "value": null },
                { "color": "yellow", "value": 100 },
                { "color": "red", "value": 500 }
              ]
            },
            "unit": "short",
            "max": 1000,
            "min": 0
          }
        }
      },
      {
        "id": 3,
        "gridPos": { "h": 8, "w": 6, "x": 12, "y": 0 },
        "type": "stat",
        "title": "Success Rate",
        "targets": [
          {
            "expr": "sum(rate(webhook_processed_total{status=\"completed\"}[5m])) / sum(rate(webhook_processed_total[5m])) * 100",
            "refId": "A"
          }
        ],
        "options": {
          "reduceOptions": {
            "values": false,
            "calcs": ["lastNotNull"]
          },
          "orientation": "auto",
          "textMode": "auto",
          "colorMode": "value",
          "graphMode": "none",
          "justifyMode": "auto"
        },
        "fieldConfig": {
          "defaults": {
            "unit": "percent",
            "decimals": 2,
            "thresholds": {
              "mode": "absolute",
              "steps": [
                { "color": "red", "value": null },
                { "color": "yellow", "value": 90 },
                { "color": "green", "value": 95 }
              ]
            }
          }
        }
      },
      {
        "id": 4,
        "gridPos": { "h": 8, "w": 6, "x": 18, "y": 0 },
        "type": "stat",
        "title": "Active Processing",
        "targets": [
          {
            "expr": "webhook_active_processing_count",
            "refId": "A"
          }
        ],
        "options": {
          "reduceOptions": {
            "values": false,
            "calcs": ["lastNotNull"]
          },
          "orientation": "auto",
          "textMode": "auto",
          "colorMode": "background",
          "graphMode": "area",
          "justifyMode": "auto"
        }
      },
      {
        "id": 5,
        "gridPos": { "h": 10, "w": 12, "x": 0, "y": 8 },
        "type": "graph",
        "title": "Webhook Processing Rate",
        "targets": [
          {
            "expr": "sum(rate(webhook_received_total[5m])) by (endpoint)",
            "legendFormat": "Received - {{endpoint}}",
            "refId": "A"
          },
          {
            "expr": "sum(rate(webhook_processed_total{status=\"completed\"}[5m])) by (rule_name)",
            "legendFormat": "Completed - {{rule_name}}",
            "refId": "B"
          },
          {
            "expr": "sum(rate(webhook_processed_total{status=\"failed\"}[5m])) by (rule_name)",
            "legendFormat": "Failed - {{rule_name}}",
            "refId": "C"
          }
        ],
        "yaxes": [
          {
            "format": "short",
            "label": "Requests/sec",
            "show": true
          },
          {
            "format": "short",
            "show": false
          }
        ],
        "xaxis": {
          "show": true
        }
      },
      {
        "id": 6,
        "gridPos": { "h": 10, "w": 12, "x": 12, "y": 8 },
        "type": "heatmap",
        "title": "Queue Wait Time Heatmap",
        "targets": [
          {
            "expr": "sum(increase(webhook_queue_wait_time_seconds_bucket[1m])) by (le)",
            "format": "heatmap",
            "refId": "A"
          }
        ],
        "options": {
          "calculate": false,
          "yAxis": {
            "axisLabel": "Wait Time",
            "axisPlacement": "left",
            "reverse": false,
            "unit": "s"
          },
          "rowsFrame": {
            "layout": "auto"
          },
          "color": {
            "mode": "scheme",
            "scheme": "Spectral",
            "steps": 128
          },
          "cellGap": 1,
          "filterValues": {
            "le": 1e-09
          }
        }
      },
      {
        "id": 7,
        "gridPos": { "h": 10, "w": 12, "x": 0, "y": 18 },
        "type": "graph",
        "title": "Processing Duration by Rule (95th percentile)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, sum(rate(webhook_processing_duration_seconds_bucket[5m])) by (rule_name, le))",
            "legendFormat": "{{rule_name}}",
            "refId": "A"
          }
        ],
        "yaxes": [
          {
            "format": "s",
            "label": "Duration",
            "show": true
          },
          {
            "format": "short",
            "show": false
          }
        ],
        "xaxis": {
          "show": true
        }
      },
      {
        "id": 8,
        "gridPos": { "h": 10, "w": 12, "x": 12, "y": 18 },
        "type": "graph",
        "title": "Internal API Response Times",
        "targets": [
          {
            "expr": "histogram_quantile(0.50, sum(rate(internal_api_duration_seconds_bucket[5m])) by (target_url, le))",
            "legendFormat": "p50 - {{target_url}}",
            "refId": "A"
          },
          {
            "expr": "histogram_quantile(0.95, sum(rate(internal_api_duration_seconds_bucket[5m])) by (target_url, le))",
            "legendFormat": "p95 - {{target_url}}",
            "refId": "B"
          },
          {
            "expr": "histogram_quantile(0.99, sum(rate(internal_api_duration_seconds_bucket[5m])) by (target_url, le))",
            "legendFormat": "p99 - {{target_url}}",
            "refId": "C"
          }
        ],
        "yaxes": [
          {
            "format": "s",
            "label": "Response Time",
            "show": true
          },
          {
            "format": "short",
            "show": false
          }
        ],
        "xaxis": {
          "show": true
        }
      },
      {
        "id": 9,
        "gridPos": { "h": 8, "w": 8, "x": 0, "y": 28 },
        "type": "piechart",
        "title": "Webhook Status Distribution",
        "targets": [
          {
            "expr": "sum(webhook_queue_size) by (status)",
            "legendFormat": "{{status}}",
            "refId": "A"
          }
        ],
        "options": {
          "pieType": "donut",
          "displayLabels": ["name", "percent"],
          "legendDisplayMode": "list",
          "legendPlacement": "right"
        }
      },
      {
        "id": 10,
        "gridPos": { "h": 8, "w": 8, "x": 8, "y": 28 },
        "type": "table",
        "title": "Top Errors (Last Hour)",
        "targets": [
          {
            "expr": "topk(10, count by (error_message) (webhook_processing_log{error_message!=\"\"}))",
            "format": "table",
            "instant": true,
            "refId": "A"
          }
        ],
        "options": {
          "showHeader": true
        },
        "fieldConfig": {
          "defaults": {
            "custom": {
              "align": "auto",
              "displayMode": "auto"
            }
          }
        }
      },
      {
        "id": 11,
        "gridPos": { "h": 8, "w": 8, "x": 16, "y": 28 },
        "type": "bargauge",
        "title": "Database Size",
        "targets": [
          {
            "expr": "webhook_database_size_bytes",
            "refId": "A"
          }
        ],
        "options": {
          "orientation": "horizontal",
          "displayMode": "gradient",
          "showUnfilled": true
        },
        "fieldConfig": {
          "defaults": {
            "unit": "decbytes",
            "max": 10737418240,
            "thresholds": {
              "mode": "absolute",
              "steps": [
                { "color": "green", "value": null },
                { "color": "yellow", "value": 5368709120 },
                { "color": "red", "value": 8589934592 }
              ]
            }
          }
        }
      },
      {
        "id": 12,
        "gridPos": { "h": 10, "w": 24, "x": 0, "y": 36 },
        "type": "logs",
        "title": "Recent Processing Logs",
        "targets": [
          {
            "expr": "{job=\"webhook-gateway\"} |= \"error\"",
            "refId": "A"
          }
        ],
        "options": {
          "showTime": true,
          "showLabels": true,
          "showCommonLabels": false,
          "wrapLogMessage": true,
          "prettifyLogMessage": false,
          "enableLogDetails": true,
          "dedupStrategy": "none",
          "sortOrder": "Descending"
        }
      }
    ],
    "links": [
      {
        "title": "Webhook Gateway API",
        "url": "http://webhook-gateway:8000/docs",
        "targetBlank": true
      },
      {
        "title": "Alert Manager",
        "url": "http://alertmanager:9093",
        "targetBlank": true
      }
    ],
    "annotations": {
      "list": [
        {
          "datasource": "Prometheus",
          "enable": true,
          "expr": "ALERTS{alertstate=\"firing\", component=\"webhook-gateway\"}",
          "tagKeys": "severity,component",
          "titleFormat": "{{ alertname }}",
          "textFormat": "{{ description }}"
        }
      ]
    }
  }
}
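
The JSON above can be imported by hand in the Grafana UI, or pushed through Grafana's HTTP API. The sketch below assumes the dashboard JSON is saved as webhook_dashboard.json, Grafana runs at http://localhost:3000, and GRAFANA_API_KEY holds a token with dashboard write permission; adjust these assumptions to your setup.

import json
import os
import httpx

GRAFANA = "http://localhost:3000"          # assumed Grafana address
API_KEY = os.environ["GRAFANA_API_KEY"]    # assumed token with Editor rights

with open("webhook_dashboard.json") as f:
    dashboard_file = json.load(f)

# The import endpoint expects {"dashboard": {...}, "overwrite": true}
payload = {
    "dashboard": dashboard_file["dashboard"],
    "overwrite": True,
    "message": "Import webhook gateway dashboard",
}

resp = httpx.post(
    f"{GRAFANA}/api/dashboards/db",
    json=payload,
    headers={"Authorization": f"Bearer {API_KEY}"},
)
resp.raise_for_status()
print(resp.json())   # contains the dashboard uid and url on success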

DuckDB Custom Exporter

#!/usr/bin/env python3
"""
DuckDB Prometheus Exporter
Exposes detailed DuckDB metrics in Prometheus format
"""

import time
import duckdb
from prometheus_client import start_http_server, Gauge, Counter, Histogram
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily
from prometheus_client import REGISTRY
import logging
from typing import Dict, List, Tuple
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DuckDBCollector:
    """DuckDB 메트릭 수집기"""

    def __init__(self, db_path: str):
        self.db_path = db_path

    def collect(self):
        """Prometheus 메트릭 수집"""
        try:
            conn = duckdb.connect(self.db_path, read_only=True)

            # Row counts per table
            yield from self._collect_table_metrics(conn)

            # Webhook stats by status
            yield from self._collect_webhook_metrics(conn)

            # Processing performance metrics
            yield from self._collect_performance_metrics(conn)

            # Routing rule metrics
            yield from self._collect_routing_metrics(conn)

            # Database statistics
            yield from self._collect_database_stats(conn)

            conn.close()

        except Exception as e:
            logger.error(f"Error collecting metrics: {e}")

    def _collect_table_metrics(self, conn) -> List:
        """테이블별 메트릭 수집"""
        tables = [
            'raw_webhooks',
            'webhook_processing_log',
            'routing_rules',
            'webhook_metrics'
        ]

        for table in tables:
            try:
                count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]

                gauge = GaugeMetricFamily(
                    'duckdb_table_row_count',
                    'Number of rows in table',
                    labels=['table']
                )
                gauge.add_metric([table], count)
                yield gauge

                # Per-table byte sizes: DuckDB does not expose PostgreSQL's
                # pg_relation_size(), so only row counts are exported here.

            except Exception as e:
                logger.debug(f"Could not get metrics for table {table}: {e}")

    def _collect_webhook_metrics(self, conn) -> List:
        """웹훅 관련 메트릭"""
        # Distribution by status
        status_dist = conn.execute("""
            SELECT status, COUNT(*) as count
            FROM raw_webhooks
            GROUP BY status
        """).fetchall()

        status_gauge = GaugeMetricFamily(
            'duckdb_webhook_status_count',
            'Number of webhooks by status',
            labels=['status']
        )

        for status, count in status_dist:
            status_gauge.add_metric([status], count)

        yield status_gauge

        # Retry count distribution
        retry_dist = conn.execute("""
            SELECT retry_count, COUNT(*) as count
            FROM raw_webhooks
            GROUP BY retry_count
            ORDER BY retry_count
        """).fetchall()

        retry_gauge = GaugeMetricFamily(
            'duckdb_webhook_retry_distribution',
            'Distribution of webhook retry counts',
            labels=['retry_count']
        )

        for retry_count, count in retry_dist:
            retry_gauge.add_metric([str(retry_count)], count)

        yield retry_gauge

        # Stats by endpoint
        endpoint_stats = conn.execute("""
            SELECT 
                endpoint,
                COUNT(*) as total,
                COUNT(*) FILTER (WHERE status = 'completed') as completed,
                COUNT(*) FILTER (WHERE status = 'failed') as failed
            FROM raw_webhooks
            GROUP BY endpoint
            LIMIT 20
        """).fetchall()

        for endpoint, total, completed, failed in endpoint_stats:
            endpoint_gauge = GaugeMetricFamily(
                'duckdb_webhook_endpoint_stats',
                'Webhook statistics by endpoint',
                labels=['endpoint', 'metric']
            )
            endpoint_gauge.add_metric([endpoint, 'total'], total)
            endpoint_gauge.add_metric([endpoint, 'completed'], completed)
            endpoint_gauge.add_metric([endpoint, 'failed'], failed)
            yield endpoint_gauge

    def _collect_performance_metrics(self, conn) -> List:
        """처리 성능 메트릭"""
        # Hourly processing statistics
        hourly_stats = conn.execute("""
            SELECT 
                DATE_TRUNC('hour', processing_completed_at) as hour,
                COUNT(*) as processed_count,
                AVG(processing_duration_ms) as avg_duration,
                MIN(processing_duration_ms) as min_duration,
                MAX(processing_duration_ms) as max_duration,
                PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY processing_duration_ms) as p50,
                PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY processing_duration_ms) as p95,
                PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY processing_duration_ms) as p99
            FROM webhook_processing_log
            WHERE processing_completed_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
              AND processing_duration_ms IS NOT NULL
            GROUP BY hour
            ORDER BY hour DESC
            LIMIT 24
        """).fetchall()

        for hour, count, avg, min_d, max_d, p50, p95, p99 in hourly_stats:
            hour_str = hour.strftime('%Y-%m-%d %H:00') if hour else 'unknown'

            # Throughput
            throughput_gauge = GaugeMetricFamily(
                'duckdb_hourly_throughput',
                'Webhooks processed per hour',
                labels=['hour']
            )
            throughput_gauge.add_metric([hour_str], count)
            yield throughput_gauge

            # Latency percentiles
            for percentile, value in [('p50', p50), ('p95', p95), ('p99', p99)]:
                if value:
                    latency_gauge = GaugeMetricFamily(
                        f'duckdb_processing_latency_{percentile}_ms',
                        f'{percentile} processing latency in milliseconds',
                        labels=['hour']
                    )
                    latency_gauge.add_metric([hour_str], value)
                    yield latency_gauge

        # Performance stats per target URL
        url_stats = conn.execute("""
            SELECT 
                internal_url,
                COUNT(*) as call_count,
                AVG(processing_duration_ms) as avg_duration,
                COUNT(*) FILTER (WHERE response_status >= 400) as error_count
            FROM webhook_processing_log
            WHERE processing_completed_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
            GROUP BY internal_url
        """).fetchall()

        for url, calls, avg_duration, errors in url_stats:
            if url:
                # Call count
                call_gauge = GaugeMetricFamily(
                    'duckdb_internal_api_calls_hourly',
                    'Internal API calls in last hour',
                    labels=['url']
                )
                call_gauge.add_metric([url], calls)
                yield call_gauge

                # Average response time
                if avg_duration:
                    duration_gauge = GaugeMetricFamily(
                        'duckdb_internal_api_avg_duration_ms',
                        'Average duration for internal API calls',
                        labels=['url']
                    )
                    duration_gauge.add_metric([url], avg_duration)
                    yield duration_gauge

                # Error rate
                error_rate = (errors / calls * 100) if calls > 0 else 0
                error_gauge = GaugeMetricFamily(
                    'duckdb_internal_api_error_rate',
                    'Error rate percentage for internal API',
                    labels=['url']
                )
                error_gauge.add_metric([url], error_rate)
                yield error_gauge

    def _collect_routing_metrics(self, conn) -> List:
        """라우팅 규칙 메트릭"""
        # Number of active rules
        active_rules = conn.execute("""
            SELECT COUNT(*) FROM routing_rules WHERE is_active = true
        """).fetchone()[0]

        rules_gauge = GaugeMetricFamily(
            'duckdb_active_routing_rules',
            'Number of active routing rules'
        )
        rules_gauge.add_metric([], active_rules)
        yield rules_gauge

        # Per-rule match statistics
        rule_stats = conn.execute("""
            WITH rule_matches AS (
                SELECT 
                    r.rule_name,
                    COUNT(DISTINCT l.webhook_id) as matched_webhooks,
                    AVG(l.processing_duration_ms) as avg_duration,
                    MAX(l.processing_completed_at) as last_match
                FROM routing_rules r
                -- Keep the time filter in the join condition so rules with no
                -- recent matches still appear (a WHERE filter on the right-hand
                -- table would turn the LEFT JOIN into an inner join)
                LEFT JOIN webhook_processing_log l
                    ON l.internal_url = r.target_url
                   AND l.processing_completed_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
                WHERE r.is_active = true
                GROUP BY r.rule_name
            )
            SELECT * FROM rule_matches
        """).fetchall()

        match_gauge = GaugeMetricFamily(
            'duckdb_routing_rule_matches_hourly',
            'Number of webhooks matched by rule in last hour',
            labels=['rule_name']
        )
        duration_gauge = GaugeMetricFamily(
            'duckdb_routing_rule_avg_duration_ms',
            'Average processing duration for rule',
            labels=['rule_name']
        )

        for rule_name, matches, avg_duration, last_match in rule_stats:
            # Match count
            match_gauge.add_metric([rule_name], matches or 0)

            # Average processing time
            if avg_duration:
                duration_gauge.add_metric([rule_name], avg_duration)

        yield match_gauge
        yield duration_gauge

    def _collect_database_stats(self, conn) -> List:
        """Database-wide statistics"""
        # Database file size
        db_size = os.path.getsize(self.db_path) if os.path.exists(self.db_path) else 0

        size_gauge = GaugeMetricFamily(
            'duckdb_database_size_bytes',
            'Total database file size in bytes'
        )
        size_gauge.add_metric([], db_size)
        yield size_gauge

        # Memory usage (estimated); current_memory() is not available in every
        # DuckDB version, so failures are skipped silently
        try:
            memory_info = conn.execute("SELECT current_memory()").fetchone()
            if memory_info:
                memory_gauge = GaugeMetricFamily(
                    'duckdb_memory_usage_bytes',
                    'Estimated memory usage'
                )
                memory_gauge.add_metric([], memory_info[0])
                yield memory_gauge
        except Exception:
            pass

        # Connection information; duckdb_connections() may not exist in the
        # installed DuckDB version, so failures are skipped silently
        try:
            connection_info = conn.execute("""
                SELECT COUNT(*) FROM duckdb_connections()
            """).fetchone()
            if connection_info:
                conn_gauge = GaugeMetricFamily(
                    'duckdb_active_connections',
                    'Number of active connections'
                )
                conn_gauge.add_metric([], connection_info[0])
                yield conn_gauge
        except Exception:
            pass

        # Oldest unprocessed webhook
        oldest_pending = conn.execute("""
            SELECT 
                MIN(received_at) as oldest,
                EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - MIN(received_at))) as age_seconds
            FROM raw_webhooks
            WHERE status = 'pending'
        """).fetchone()

        if oldest_pending and oldest_pending[1]:
            age_gauge = GaugeMetricFamily(
                'duckdb_oldest_pending_webhook_age_seconds',
                'Age of oldest pending webhook in seconds'
            )
            age_gauge.add_metric([], oldest_pending[1])
            yield age_gauge

        # Daily processing statistics
        daily_stats = conn.execute("""
            SELECT 
                DATE(created_at) as date,
                COUNT(*) as total,
                COUNT(*) FILTER (WHERE status = 'completed') as completed,
                COUNT(*) FILTER (WHERE status = 'failed' AND retry_count >= 3) as failed
            FROM raw_webhooks
            WHERE created_at > CURRENT_TIMESTAMP - INTERVAL '7 days'
            GROUP BY date
            ORDER BY date DESC
        """).fetchall()

        # Build a single metric family and add one sample per day and status
        daily_gauge = GaugeMetricFamily(
            'duckdb_daily_webhook_stats',
            'Daily webhook statistics',
            labels=['date', 'status']
        )

        for date, total, completed, failed in daily_stats:
            date_str = date.strftime('%Y-%m-%d') if date else 'unknown'

            daily_gauge.add_metric([date_str, 'total'], total)
            daily_gauge.add_metric([date_str, 'completed'], completed)
            daily_gauge.add_metric([date_str, 'failed'], failed)

        yield daily_gauge

    def _parse_size(self, size_str: str) -> int:
        """Convert a size string (e.g. '10 MB') to bytes"""
        size_str = size_str.strip()
        # Check longer suffixes first so 'KB'/'MB'/'GB'/'TB' are not
        # accidentally matched by the plain 'B' unit
        units = {
            'TB': 1024 * 1024 * 1024 * 1024,
            'GB': 1024 * 1024 * 1024,
            'MB': 1024 * 1024,
            'KB': 1024,
            'B': 1
        }

        for unit, multiplier in units.items():
            if size_str.endswith(unit):
                try:
                    return int(float(size_str[:-len(unit)].strip()) * multiplier)
                except ValueError:
                    return 0

        return 0

def main():
    """Main entry point"""
    # Read environment variables
    db_path = os.environ.get('DUCKDB_PATH', '/data/webhook_gateway.db')
    port = int(os.environ.get('EXPORTER_PORT', '9091'))

    logger.info(f"Starting DuckDB exporter on port {port}")
    logger.info(f"Database path: {db_path}")

    # Register the custom collector
    REGISTRY.register(DuckDBCollector(db_path))

    # Start the HTTP server
    start_http_server(port)

    logger.info("DuckDB exporter started successfully")

    # Block forever; the HTTP server runs in a background thread
    while True:
        time.sleep(60)

if __name__ == '__main__':
    main()
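
For a quick local check that the exporter is serving what Prometheus expects, the /metrics output can be fetched and parsed with the client library's text parser. This is a minimal sketch and assumes the exporter is already running on port 9091.

# Quick check of the exporter output (sketch; assumes the exporter on localhost:9091)
import httpx
from prometheus_client.parser import text_string_to_metric_families

def dump_duckdb_metrics(url: str = "http://localhost:9091/metrics") -> None:
    """Fetch the exporter's /metrics page and print DuckDB-related samples."""
    text = httpx.get(url, timeout=5.0).text
    for family in text_string_to_metric_families(text):
        if family.name.startswith("duckdb_"):
            for sample in family.samples:
                print(sample.name, sample.labels, sample.value)

if __name__ == "__main__":
    dump_duckdb_metrics()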

Integrated Docker Compose Configuration

version: '3.8'

services:
  # Main webhook gateway
  webhook-gateway:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - DATABASE_PATH=/data/webhook_gateway.db
      - LOG_LEVEL=INFO
      - MAX_WORKERS=4
      - BATCH_SIZE=100
      - RETRY_DELAY=60
      - MAX_RETRIES=3
    volumes:
      - ./data:/data
      - ./logs:/logs
    depends_on:
      - internal-api
    networks:
      - webhook-net
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
    labels:
      - "prometheus.io/scrape=true"
      - "prometheus.io/port=8000"
      - "prometheus.io/path=/metrics"

  # Internal API (example)
  internal-api:
    image: your-internal-api:latest
    ports:
      - "8080:8080"
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/internal_db
    networks:
      - webhook-net

  # Dedicated DuckDB exporter
  duckdb-exporter:
    build:
      context: .
      dockerfile: Dockerfile.exporter
    ports:
      - "9091:9091"
    environment:
      - DUCKDB_PATH=/data/webhook_gateway.db
      - EXPORTER_PORT=9091
    volumes:
      - ./data:/data:ro
    networks:
      - webhook-net
    depends_on:
      - webhook-gateway

  # Prometheus
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./alert_rules.yml:/etc/prometheus/alert_rules.yml
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/usr/share/prometheus/console_libraries'
      - '--web.console.templates=/usr/share/prometheus/consoles'
      - '--web.enable-lifecycle'
    networks:
      - webhook-net

  # Grafana
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_USER=admin
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_USERS_ALLOW_SIGN_UP=false
      - GF_INSTALL_PLUGINS=grafana-piechart-panel
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning
      - ./grafana/dashboards:/var/lib/grafana/dashboards
    networks:
      - webhook-net
    depends_on:
      - prometheus

  # Alert Manager
  alertmanager:
    image: prom/alertmanager:latest
    ports:
      - "9093:9093"
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
      - alertmanager-data:/alertmanager
    command:
      - '--config.file=/etc/alertmanager/alertmanager.yml'
      - '--storage.path=/alertmanager'
    networks:
      - webhook-net

  # Node Exporter (system metrics)
  node-exporter:
    image: prom/node-exporter:latest
    ports:
      - "9100:9100"
    volumes:
      - /proc:/host/proc:ro
      - /sys:/host/sys:ro
      - /:/rootfs:ro
    command:
      - '--path.procfs=/host/proc'
      - '--path.sysfs=/host/sys'
      - '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)'
    networks:
      - webhook-net

  # Loki (log collection)
  loki:
    image: grafana/loki:latest
    ports:
      - "3100:3100"
    volumes:
      - ./loki-config.yml:/etc/loki/local-config.yaml
      - loki-data:/loki
    command: -config.file=/etc/loki/local-config.yaml
    networks:
      - webhook-net

  # Promtail (log shipping)
  promtail:
    image: grafana/promtail:latest
    volumes:
      - ./logs:/var/log/webhook
      - ./promtail-config.yml:/etc/promtail/config.yml
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
    command: -config.file=/etc/promtail/config.yml
    networks:
      - webhook-net

  # DuckDB CLI container (administration; the duckdb CLI or Python package must be installed before use)
  duckdb-cli:
    image: python:3.11-slim
    volumes:
      - ./data:/data
      - ./scripts:/scripts
    command: tail -f /dev/null
    networks:
      - webhook-net

networks:
  webhook-net:
    driver: bridge

volumes:
  prometheus-data:
  grafana-data:
  alertmanager-data:
  loki-data:
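
Once the stack is up (docker compose up -d), a short end-to-end smoke test confirms that the gateway accepts webhooks and that the exporter serves metrics. This is a minimal sketch; the /webhook/test path and payload are placeholders, so substitute the actual ingest endpoint exposed by the gateway.

# Smoke test for the running stack (sketch; /webhook/test is a placeholder endpoint)
import httpx

def smoke_test() -> None:
    # 1. Gateway health check
    health = httpx.get("http://localhost:8000/health", timeout=5.0)
    print("gateway health:", health.status_code)

    # 2. Post a sample webhook (adjust the path/payload to the real ingest endpoint)
    resp = httpx.post(
        "http://localhost:8000/webhook/test",
        json={"event": "smoke_test", "value": 1},
        timeout=5.0,
    )
    print("webhook accepted:", resp.status_code)  # expecting 202 Accepted

    # 3. Confirm the DuckDB exporter is serving metrics
    metrics = httpx.get("http://localhost:9091/metrics", timeout=5.0)
    print("exporter ok:", metrics.status_code, "-", len(metrics.text), "bytes")

if __name__ == "__main__":
    smoke_test()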

Grafana Provisioning Configuration

# grafana/provisioning/datasources/prometheus.yml
apiVersion: 1

datasources:
  - name: Prometheus
    uid: prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090
    isDefault: true
    editable: true
    jsonData:
      timeInterval: "10s"
      queryTimeout: "60s"
      httpMethod: "POST"

  - name: Loki
    type: loki
    access: proxy
    url: http://loki:3100
    editable: true
    jsonData:
      maxLines: 1000

---
# grafana/provisioning/dashboards/dashboards.yml
apiVersion: 1

providers:
  - name: 'Webhook Gateway Dashboards'
    orgId: 1
    folder: 'Webhook Gateway'
    type: file
    disableDeletion: false
    updateIntervalSeconds: 30
    allowUiUpdates: true
    options:
      path: /var/lib/grafana/dashboards

---
# grafana/provisioning/alerting/alerts.yml
apiVersion: 1

groups:
  - name: webhook_alerts
    folder: "Webhook Gateway"
    interval: 1m
    rules:
      - uid: webhook_queue_alert
        title: High Webhook Queue Size
        condition: A
        data:
          - refId: A
            queryType: ""
            relativeTimeRange:
              from: 300
              to: 0
            datasourceUid: prometheus
            model:
              expr: sum(webhook_queue_size{status="pending"}) > 1000
              intervalMs: 1000
              maxDataPoints: 43200
              refId: A
        noDataState: NoData
        execErrState: Alerting
        for: 5m
        annotations:
          description: "Webhook queue has {{ $values.A }} pending items"
          runbook_url: "https://wiki.example.com/webhook-queue-high"
          summary: "High webhook queue backlog detected"
        labels:
          severity: warning
          team: platform

      - uid: webhook_processing_stopped
        title: Webhook Processing Stopped
        condition: A
        data:
          - refId: A
            queryType: ""
            relativeTimeRange:
              from: 600
              to: 0
            datasourceUid: prometheus
            model:
              expr: |
                sum(rate(webhook_processed_total[5m])) == 0
                and on() sum(webhook_queue_size{status="pending"}) > 0
              intervalMs: 1000
              maxDataPoints: 43200
              refId: A
        noDataState: NoData
        execErrState: Alerting
        for: 5m
        annotations:
          description: "No webhooks processed in 5 minutes with pending queue"
          runbook_url: "https://wiki.example.com/webhook-processing-stopped"
          summary: "Webhook processing has stopped"
        labels:
          severity: critical
          team: platform

---
# alertmanager.yml
global:
  resolve_timeout: 5m
  slack_api_url: '${SLACK_WEBHOOK_URL}'

route:
  group_by: ['alertname', 'cluster', 'service']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 12h
  receiver: 'default'
  routes:
    - match:
        severity: critical
      receiver: critical-alerts
      continue: true
    - match:
        severity: warning
      receiver: warning-alerts

receivers:
  - name: 'default'
    slack_configs:
      - channel: '#alerts'
        title: 'Webhook Gateway Alert'
        text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'

  - name: 'critical-alerts'
    slack_configs:
      - channel: '#critical-alerts'
        title: '🚨 CRITICAL: Webhook Gateway'
        text: |
          {{ range .Alerts }}
          *Alert:* {{ .Labels.alertname }}
          *Summary:* {{ .Annotations.summary }}
          *Description:* {{ .Annotations.description }}
          *Severity:* {{ .Labels.severity }}
          {{ end }}
    pagerduty_configs:
      - service_key: '${PAGERDUTY_SERVICE_KEY}'
        description: '{{ .GroupLabels.alertname }}'

  - name: 'warning-alerts'
    slack_configs:
      - channel: '#platform-alerts'
        title: '⚠️ Warning: Webhook Gateway'
        text: |
          {{ range .Alerts }}
          *Alert:* {{ .Labels.alertname }}
          *Summary:* {{ .Annotations.summary }}
          {{ end }}

inhibit_rules:
  - source_match:
      severity: 'critical'
    target_match:
      severity: 'warning'
    equal: ['alertname', 'cluster', 'service']

Dashboard Usage Guide

Key Monitoring Metrics

  1. Real-time processing status
    • Queue Size: number of webhooks currently waiting
    • Processing Rate: webhooks processed per second
    • Success Rate: success rate (target: 95% or higher)
    • Active Processing: number of webhooks currently being processed
  2. Performance metrics
    • Queue Wait Time: heatmap of queue wait times
    • Processing Duration: processing time per routing rule
    • Internal API Response: internal API response times
    • Database Size: database size over time
  3. Error tracking
    • Error Distribution: breakdown by error type
    • Failed Webhooks: tracking of failed webhooks
    • Retry Statistics: retry statistics

These indicators can also be pulled straight from the Prometheus HTTP API; a minimal sketch follows.

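The sketch assumes Prometheus is reachable at http://localhost:9090 and uses the metric names from this post; label values such as status="success" are assumptions and should be matched to the labels actually emitted.

# Query key indicators from the Prometheus HTTP API (sketch; assumes Prometheus on localhost:9090)
import httpx

PROM_URL = "http://localhost:9090/api/v1/query"

QUERIES = {
    # status="success" is an assumed label value; align it with the real one
    "pending_queue_size": 'sum(webhook_queue_size{status="pending"})',
    "processing_rate_per_s": "sum(rate(webhook_processed_total[5m]))",
    "success_rate_pct": (
        'sum(rate(webhook_processed_total{status="success"}[5m])) '
        "/ sum(rate(webhook_processed_total[5m])) * 100"
    ),
}

def fetch_indicator(expr: str) -> float | None:
    """Run an instant query and return the first sample value, if any."""
    data = httpx.get(PROM_URL, params={"query": expr}, timeout=10.0).json()
    result = data.get("data", {}).get("result", [])
    return float(result[0]["value"][1]) if result else None

if __name__ == "__main__":
    for name, expr in QUERIES.items():
        print(f"{name}: {fetch_indicator(expr)}")
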
Alert Response Manual

🚨 Critical Alerts

1. Webhook Processing Stopped

# 1. Check the processor status
curl http://localhost:8000/health

# 2. Check the logs
docker logs webhook-gateway --tail 100

# 3. Restart the processor
docker restart webhook-gateway

# 4. Manually process pending webhooks
docker exec -it duckdb-cli python /scripts/manual_process.py

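The manual_process.py script referenced in step 4 is not shown in this post; the following is a hypothetical sketch of what it could look like. It drains pending rows from raw_webhooks and marks them completed or failed. The column names (id, payload) and the forwarding target are assumptions and must be adjusted to the real schema and routing logic.

# scripts/manual_process.py - hypothetical sketch of a manual drain for pending webhooks
# (column names and the forwarding target are assumptions; adjust to the real schema)
import json

import duckdb
import httpx

DB_PATH = "/data/webhook_gateway.db"
INTERNAL_URL = "http://internal-api:8080/ingest"  # placeholder target

def drain_pending(limit: int = 100) -> None:
    conn = duckdb.connect(DB_PATH)
    rows = conn.execute(
        "SELECT id, payload FROM raw_webhooks "
        "WHERE status = 'pending' ORDER BY received_at LIMIT ?",
        [limit],
    ).fetchall()

    for webhook_id, payload in rows:
        try:
            resp = httpx.post(INTERNAL_URL, json=json.loads(payload), timeout=10.0)
            new_status = "completed" if resp.status_code < 400 else "failed"
        except httpx.HTTPError:
            new_status = "failed"
        conn.execute(
            "UPDATE raw_webhooks SET status = ? WHERE id = ?",
            [new_status, webhook_id],
        )

    conn.close()

if __name__ == "__main__":
    drain_pending()
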
2. High Failure Rate (>10%)

-- Analyze failure causes
SELECT 
    error_message,
    COUNT(*) as count,
    MIN(processing_started_at) as first_occurrence,
    MAX(processing_started_at) as last_occurrence
FROM webhook_processing_log
WHERE error_message IS NOT NULL
  AND processing_started_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY error_message
ORDER BY count DESC;

-- Disable a specific rule
UPDATE routing_rules 
SET is_active = false 
WHERE rule_name = 'problematic_rule';
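
The same analysis can be scripted so that rules targeting a consistently failing URL are disabled automatically. Below is a minimal sketch using the tables from this post; the 50% threshold and the URL-based matching between webhook_processing_log and routing_rules are assumptions.

# Auto-disable routing rules whose recent error rate is too high
# (sketch; the 50% threshold and URL-based matching are assumptions)
import duckdb

DB_PATH = "/data/webhook_gateway.db"
ERROR_RATE_THRESHOLD = 0.5

def disable_failing_rules() -> None:
    conn = duckdb.connect(DB_PATH)
    failing = conn.execute("""
        SELECT internal_url,
               COUNT(*) FILTER (WHERE response_status >= 400) * 1.0 / COUNT(*) AS error_rate
        FROM webhook_processing_log
        WHERE processing_started_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
        GROUP BY internal_url
        HAVING COUNT(*) FILTER (WHERE response_status >= 400) * 1.0 / COUNT(*) > ?
    """, [ERROR_RATE_THRESHOLD]).fetchall()

    for url, rate in failing:
        print(f"disabling rules targeting {url} (error rate {rate:.0%})")
        conn.execute(
            "UPDATE routing_rules SET is_active = false WHERE target_url = ?",
            [url],
        )
    conn.close()

if __name__ == "__main__":
    disable_failing_rules()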

⚠️ Warning Alerts

1. High Queue Backlog (>1000)

# 1. Increase the number of workers (scale out; remove the fixed host port
#    mapping or front it with a load balancer before scaling)
docker compose up -d --scale webhook-gateway=3

# 2. Adjust the batch size: change BATCH_SIZE in docker-compose.yml (or .env),
#    then recreate the service so the new value takes effect
docker compose up -d webhook-gateway

# 3. Clean up old webhooks (assumes the duckdb CLI is installed in the duckdb-cli container)
docker exec duckdb-cli duckdb /data/webhook_gateway.db \
  "DELETE FROM raw_webhooks WHERE status = 'completed' AND created_at < CURRENT_TIMESTAMP - INTERVAL '7 days'"

Performance Tuning Guide

DuckDB Optimization

-- 1. Recompute table statistics (DuckDB's ANALYZE runs database-wide)
ANALYZE;

-- 2. Add an index on the completion timestamp
-- (DuckDB does not support partial indexes, so the whole column is indexed)
CREATE INDEX idx_processing_log_time 
ON webhook_processing_log(processing_completed_at);

-- 3. Monthly archiving
-- (DuckDB has no declarative partitioning; export old rows to
--  Hive-partitioned Parquet and then delete them from the live table)
COPY (
    SELECT strftime(created_at, '%Y-%m') AS month, *
    FROM raw_webhooks
    WHERE created_at < CURRENT_TIMESTAMP - INTERVAL '30 days'
) TO '/data/archive/raw_webhooks' (FORMAT PARQUET, PARTITION_BY (month));

-- 4. Flush the WAL and checkpoint
-- (DuckDB does not support VACUUM FULL; use CHECKPOINT, and copy the database
--  to a new file if disk space needs to be reclaimed)
CHECKPOINT;

Memory Settings

# Environment variable settings (application-level; applied via DuckDB SET statements, see the sketch below)
DUCKDB_MEMORY_LIMIT=4GB
DUCKDB_THREADS=8
DUCKDB_TEMP_DIRECTORY=/tmp/duckdb
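
DuckDB does not read these environment variables on its own; they are application-level settings that the gateway and exporter translate into SET statements when opening a connection. A minimal sketch, assuming the variable names above:

# Apply DuckDB tuning settings from environment variables when opening a connection
# (sketch; the variable names follow the list above)
import os

import duckdb

def open_tuned_connection(db_path: str) -> duckdb.DuckDBPyConnection:
    conn = duckdb.connect(db_path)
    conn.execute(f"SET memory_limit = '{os.environ.get('DUCKDB_MEMORY_LIMIT', '4GB')}'")
    conn.execute(f"SET threads = {int(os.environ.get('DUCKDB_THREADS', '8'))}")
    conn.execute(f"SET temp_directory = '{os.environ.get('DUCKDB_TEMP_DIRECTORY', '/tmp/duckdb')}'")
    return conn

conn = open_tuned_connection("/data/webhook_gateway.db")
print(conn.execute("SELECT current_setting('memory_limit')").fetchone())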

Backup and Recovery

Automated Backup Script

#!/bin/bash
# backup.sh

# Variables
BACKUP_DIR="/backups"
DB_PATH="/data/webhook_gateway.db"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)

# Run the backup
# (copying a live database file can capture an inconsistent state; pause writes
#  first or use DuckDB's EXPORT DATABASE, see the sketch below)
cp $DB_PATH "$BACKUP_DIR/webhook_gateway_$TIMESTAMP.db"

# Upload to S3 (optional)
aws s3 cp "$BACKUP_DIR/webhook_gateway_$TIMESTAMP.db" \
  s3://webhook-backups/daily/

# Delete old backups (older than 30 days)
find $BACKUP_DIR -name "webhook_gateway_*.db" -mtime +30 -delete

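Because cp copies the file while the gateway may still be writing, the snapshot can be inconsistent. A safer variant takes the backup through DuckDB itself with EXPORT DATABASE, which writes a consistent snapshot of the schema and data. A minimal sketch along those lines, reusing the paths from backup.sh:

# Consistent backup via DuckDB EXPORT DATABASE (sketch; paths mirror backup.sh above)
import os
from datetime import datetime

import duckdb

DB_PATH = "/data/webhook_gateway.db"
BACKUP_DIR = "/backups"

def export_backup() -> str:
    target = os.path.join(BACKUP_DIR, f"webhook_gateway_{datetime.now():%Y%m%d_%H%M%S}")
    os.makedirs(target, exist_ok=True)
    conn = duckdb.connect(DB_PATH, read_only=True)
    # Writes the schema plus data (as Parquet files) into the target directory
    conn.execute(f"EXPORT DATABASE '{target}' (FORMAT PARQUET)")
    conn.close()
    return target

if __name__ == "__main__":
    print("backup written to", export_backup())
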
Recommendations

Strengths

  • Full observability: every stage is tracked with metrics
  • Declarative management: business logic is expressed in SQL
  • Fast debugging: detailed logs and metrics make problems easy to trace
  • Scalable: supports horizontal scaling and partitioning

Limitations and Mitigations

  • Concurrency limits: bottleneck caused by the single-writer lock
    • Mitigation: batch processing, read-only replicas
  • Scaling limits: single-node constraint
    • Mitigation: sharding, running multiple instances
  • Complex transformations: limits of SQL expressiveness
    • Mitigation: Python UDFs (see the sketch below)

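For the Python UDF mitigation mentioned above, the DuckDB Python API lets you register a function and call it from transformation queries. A minimal sketch; the payload column holding JSON text and the mask_email behavior are illustrative assumptions:

# Register a Python UDF for transformations that are hard to express in SQL
# (sketch; the payload shape and mask_email behavior are illustrative)
import json

import duckdb

def mask_email(payload: str) -> str:
    """Mask the local part of an 'email' field inside a JSON payload."""
    data = json.loads(payload)
    if "email" in data and "@" in data["email"]:
        local, domain = data["email"].split("@", 1)
        data["email"] = local[:1] + "***@" + domain
    return json.dumps(data)

conn = duckdb.connect("/data/webhook_gateway.db")
conn.create_function("mask_email", mask_email, [duckdb.typing.VARCHAR], duckdb.typing.VARCHAR)

# The UDF can now be used directly inside routing/transformation queries
rows = conn.execute(
    "SELECT mask_email(payload) FROM raw_webhooks WHERE status = 'pending' LIMIT 5"
).fetchall()
print(rows)
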
When to Apply It

  • ✅ Small-to-medium event volumes (hundreds of events per second)
  • ✅ When complex routing rules are needed
  • ✅ When strong observability is required
  • ❌ When very high throughput is required (thousands of events per second or more)
  • ❌ When complex state management is required

With this architecture, a DuckDB-based Webhook Gateway lets you build a flexible, observable system that is data-centric rather than code-centric.
