This is a comprehensive implementation covering a DuckDB-based webhook gateway together with integrated Prometheus/Grafana monitoring.
System Architecture and Data Flow
┌─────────────────┐ ┌──────────────────┐ ┌────────────────────┐ ┌──────────────┐
│ External APIs │────▶│ Webhook Gateway │────▶│ Internal Processor │────▶│ Internal Sys │
└─────────────────┘ │ (FastAPI) │ └────────────────────┘ └──────────────┘
│ │
│ ┌─────────────┐ │ ▲
│ │ DuckDB │ │ │
│ │ (Queue DB) │ │ │
│ └─────────────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌─────────────┐ │ │
│ │ Processor │─┼───────────┘
│ │ (Async) │ │
│ └─────────────┘ │
└──────────────────┘
│
▼
┌─────────────────┐
│ Prometheus │
│ (Metrics) │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Grafana │
│ (Dashboard) │
└─────────────────┘
Core Component Roles
- Webhook Gateway (FastAPI)
- Receives HTTP requests from external systems
- Immediately stores each request in DuckDB (queuing)
- Returns a 202 Accepted response
- Exposes Prometheus metrics
- DuckDB
- Acts as the message queue (raw_webhooks table)
- Stores processing logs (webhook_processing_log)
- Manages routing rules (routing_rules)
- Performs SQL-based data transformation
- Async Processor
- Runs continuously in the background
- Fetches pending webhooks from DuckDB
- Transforms and forwards them according to routing rules
- Manages retries on failure
- Prometheus + Grafana
- Collects and stores metrics in real time
- Provides visualization dashboards
- Manages alerting rules
Web Server with Integrated Prometheus Metrics
import json
import asyncio
import time
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, List
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, BackgroundTasks, Response
from fastapi.responses import JSONResponse
import duckdb
import httpx
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from prometheus_client import CollectorRegistry, multiprocess, start_http_server
import logging
from dataclasses import dataclass
from enum import Enum
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Prometheus metric definitions
registry = CollectorRegistry()
# Counter metrics
webhook_received_total = Counter(
'webhook_received_total',
'Total number of webhooks received',
['endpoint', 'source'],
registry=registry
)
webhook_processed_total = Counter(
'webhook_processed_total',
'Total number of webhooks processed',
['status', 'rule_name'],
registry=registry
)
internal_api_calls_total = Counter(
'internal_api_calls_total',
'Total number of internal API calls',
['target_url', 'status_code'],
registry=registry
)
# Histogram metrics
webhook_processing_duration = Histogram(
'webhook_processing_duration_seconds',
'Time spent processing webhooks',
['rule_name'],
buckets=(0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0),
registry=registry
)
internal_api_duration = Histogram(
'internal_api_duration_seconds',
'Internal API call duration',
['target_url'],
buckets=(0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0),
registry=registry
)
queue_wait_time = Histogram(
'webhook_queue_wait_time_seconds',
'Time webhooks spend in queue before processing',
buckets=(1, 5, 10, 30, 60, 300, 600, 1800, 3600),
registry=registry
)
# Gauge metrics
webhook_queue_size = Gauge(
'webhook_queue_size',
'Current number of webhooks in queue',
['status'],
registry=registry
)
active_processing_count = Gauge(
'webhook_active_processing_count',
'Number of webhooks currently being processed',
registry=registry
)
database_size_bytes = Gauge(
'webhook_database_size_bytes',
'Size of DuckDB database file',
registry=registry
)
last_successful_processing = Gauge(
'webhook_last_successful_processing_timestamp',
'Timestamp of last successful webhook processing',
['rule_name'],
registry=registry
)
# Status enum
class WebhookStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
# DuckDB connection manager (extended)
class EnhancedDuckDBManager:
    def __init__(self, db_path: str = "webhook_gateway.db"):
        self.db_path = db_path
        self.conn = None
        self.init_database()
        self._register_custom_functions()

    def init_database(self):
        """Initialize the database and create tables"""
        conn = duckdb.connect(self.db_path)
        # Basic settings
        conn.execute("""
            SET memory_limit = '2GB';
            SET threads = 4;
        """)
        # Sequences for surrogate keys
        conn.execute("""
            CREATE SEQUENCE IF NOT EXISTS webhook_seq START 1;
            CREATE SEQUENCE IF NOT EXISTS log_seq START 1;
        """)
        # Main tables
conn.execute("""
CREATE TABLE IF NOT EXISTS raw_webhooks (
id BIGINT PRIMARY KEY DEFAULT nextval('webhook_seq'),
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
source_ip VARCHAR,
endpoint VARCHAR,
headers JSON,
payload JSON,
status VARCHAR DEFAULT 'pending',
retry_count INT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
        # Indexes
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_webhook_status
ON raw_webhooks(status, retry_count, received_at);
CREATE INDEX IF NOT EXISTS idx_webhook_created
ON raw_webhooks(created_at);
""")
        # Processing log table
conn.execute("""
CREATE TABLE IF NOT EXISTS webhook_processing_log (
id BIGINT PRIMARY KEY DEFAULT nextval('log_seq'),
webhook_id BIGINT,
attempt_number INT,
processing_started_at TIMESTAMP,
processing_completed_at TIMESTAMP,
internal_url VARCHAR,
request_payload JSON,
response_status INT,
response_body TEXT,
error_message TEXT,
processing_duration_ms INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
        # Routing rules table
conn.execute("""
CREATE TABLE IF NOT EXISTS routing_rules (
id INT PRIMARY KEY,
rule_name VARCHAR UNIQUE,
description TEXT,
filter_query TEXT,
transform_query TEXT,
target_url VARCHAR,
timeout_seconds INT DEFAULT 30,
retry_policy JSON,
is_active BOOLEAN DEFAULT true,
priority INT DEFAULT 100,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
        # Metrics aggregation table
conn.execute("""
CREATE TABLE IF NOT EXISTS webhook_metrics (
id BIGINT PRIMARY KEY DEFAULT nextval('webhook_seq'),
metric_type VARCHAR,
metric_name VARCHAR,
metric_value DOUBLE,
labels JSON,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
        # Seed sample routing rules (parameterized to avoid SQL string-quoting issues)
        sample_rules = [
            (1, 'order_events', 'Process order creation events',
             "json_extract_string(payload, '$.event_type') = 'order.created'",
             "SELECT json_extract(payload, '$.order_id') as order_id, "
             "json_extract(payload, '$.customer_id') as customer_id, "
             "json_extract(payload, '$.total_amount') as amount, "
             "json_extract(payload, '$.items') as items",
             'http://internal-api:8080/orders/process',
             '{"max_retries": 3, "backoff_multiplier": 2}'),
            (2, 'payment_events', 'Handle payment related events',
             "json_extract_string(payload, '$.event_type') LIKE 'payment.%'",
             "SELECT payload",
             'http://internal-api:8080/payments/webhook',
             '{"max_retries": 5, "backoff_multiplier": 1.5}'),
            (3, 'user_events', 'User activity tracking',
             "json_extract_string(payload, '$.category') = 'user'",
             "SELECT json_extract(payload, '$.user_id') as user_id, "
             "json_extract(payload, '$.action') as action, "
             "json_extract(payload, '$.metadata') as metadata",
             'http://internal-api:8080/users/activity',
             '{"max_retries": 2, "backoff_multiplier": 1}'),
        ]
        conn.executemany("""
            INSERT OR REPLACE INTO routing_rules
            (id, rule_name, description, filter_query, transform_query, target_url, retry_policy)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, sample_rules)
conn.close()
    def _register_custom_functions(self):
        """Register custom SQL functions"""
        conn = self.get_connection()

        # Deep JSON extraction with a simple JSONPath-style path expression
        def json_deep_extract(json_str: str, path: str) -> Any:
            try:
                data = json.loads(json_str) if isinstance(json_str, str) else json_str
                # Walk a path such as $.items[0].name
                parts = path.strip('$.').split('.')
                for part in parts:
                    if '[' in part and ']' in part:
                        # Handle array indexing
                        key = part[:part.index('[')]
                        index = int(part[part.index('[') + 1:part.index(']')])
                        data = data.get(key, [])[index] if isinstance(data, dict) else None
                    else:
                        data = data.get(part) if isinstance(data, dict) else None
                    if data is None:
                        break
                return json.dumps(data) if isinstance(data, (dict, list)) else data
            except Exception:
                return None
        conn.create_function("json_deep_extract", json_deep_extract)
    def get_connection(self):
        if not self.conn:
            self.conn = duckdb.connect(self.db_path)
        return self.conn

    def get_db_size(self) -> int:
        """Return the database file size in bytes"""
        import os
        try:
            return os.path.getsize(self.db_path)
        except OSError:
            return 0
# Metrics collector
class MetricsCollector:
    def __init__(self, db_manager: EnhancedDuckDBManager):
        self.db_manager = db_manager

    def start(self):
        """Start background metric collection (must be called from a running event loop)"""
        asyncio.create_task(self._collect_metrics_loop())
    async def _collect_metrics_loop(self):
        """Collect metrics periodically"""
        while True:
            try:
                await self.collect_queue_metrics()
                await self.collect_db_metrics()
                await asyncio.sleep(10)  # collect every 10 seconds
            except Exception as e:
                logger.error(f"Metrics collection error: {e}")
                await asyncio.sleep(30)
    async def collect_queue_metrics(self):
        """Collect queue-related metrics"""
        conn = self.db_manager.get_connection()
        # Webhook counts per status
status_counts = conn.execute("""
SELECT status, COUNT(*) as count
FROM raw_webhooks
GROUP BY status
""").fetchall()
for status, count in status_counts:
webhook_queue_size.labels(status=status).set(count)
        # Compute queue wait times for pending webhooks
waiting_times = conn.execute("""
SELECT
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - received_at)) as wait_seconds
FROM raw_webhooks
WHERE status = 'pending'
LIMIT 100
""").fetchall()
for (wait_time,) in waiting_times:
queue_wait_time.observe(wait_time)
    async def collect_db_metrics(self):
        """Collect database metrics"""
database_size_bytes.set(self.db_manager.get_db_size())
# Enhanced webhook processor
class EnhancedWebhookProcessor:
    def __init__(self, db_manager: EnhancedDuckDBManager):
        self.db_manager = db_manager
        self.http_client = httpx.AsyncClient(timeout=30.0)
        self.processing = False
        self.concurrent_limit = 10  # limit on concurrent processing
        self.processing_semaphore = asyncio.Semaphore(self.concurrent_limit)

    async def start_processing(self):
        """Start background processing"""
        self.processing = True
        # Launch several workers
        workers = []
        for i in range(3):  # three workers
            workers.append(asyncio.create_task(self._worker_loop(i)))
        # Dedicated retry worker
        workers.append(asyncio.create_task(self._retry_worker_loop()))
        await asyncio.gather(*workers, return_exceptions=True)

    async def stop_processing(self):
        """Stop processing"""
        self.processing = False
        await self.http_client.aclose()
    async def _worker_loop(self, worker_id: int):
        """Worker loop"""
        logger.info(f"Worker {worker_id} started")
        while self.processing:
            try:
                await self._process_batch()
                await asyncio.sleep(0.5)  # short pause between batches
            except Exception as e:
                logger.error(f"Worker {worker_id} error: {e}")
                await asyncio.sleep(5)

    async def _retry_worker_loop(self):
        """Dedicated retry worker"""
        logger.info("Retry worker started")
        while self.processing:
            try:
                await self._process_retries()
                await asyncio.sleep(30)  # check for retries every 30 seconds
            except Exception as e:
                logger.error(f"Retry worker error: {e}")
                await asyncio.sleep(60)
    async def _process_batch(self):
        """Process a batch of pending webhooks"""
        conn = self.db_manager.get_connection()
        # Claim pending webhooks by flipping them to 'processing'
batch = conn.execute("""
UPDATE raw_webhooks
SET status = 'processing',
updated_at = CURRENT_TIMESTAMP
WHERE id IN (
SELECT id FROM raw_webhooks
WHERE status = 'pending'
ORDER BY received_at
LIMIT 10
)
RETURNING id, payload, headers, endpoint, retry_count, received_at
""").fetchall()
if not batch:
return
        # Process the claimed webhooks concurrently
tasks = []
for webhook_data in batch:
task = asyncio.create_task(self._process_single_webhook(*webhook_data))
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
    async def _process_retries(self):
        """Retry failed webhooks that are due"""
        conn = self.db_manager.get_connection()
        # Select retry candidates (exponential backoff based on updated_at)
retries = conn.execute("""
SELECT id, payload, headers, endpoint, retry_count, received_at
FROM raw_webhooks
WHERE status = 'failed'
AND retry_count < 3
AND updated_at < CURRENT_TIMESTAMP - INTERVAL '1 minute' * POWER(2, retry_count)
ORDER BY updated_at
LIMIT 5
""").fetchall()
for webhook_data in retries:
await self._process_single_webhook(*webhook_data)
    async def _process_single_webhook(self, webhook_id: int, payload: dict,
                                      headers: dict, endpoint: str,
                                      retry_count: int, received_at: datetime):
        """Process a single webhook"""
        # DuckDB returns JSON columns as strings, so parse them back into Python objects
        if isinstance(payload, str):
            payload = json.loads(payload)
        if isinstance(headers, str):
            headers = json.loads(headers)
        async with self.processing_semaphore:
            active_processing_count.inc()
            # Record how long the webhook sat in the queue
            wait_time = (datetime.now() - received_at).total_seconds()
            queue_wait_time.observe(wait_time)
            start_time = time.time()
            rule_name = "unknown"
            try:
                # Find a matching routing rule
                routing_rule = await self._find_routing_rule(payload, endpoint)
                if not routing_rule:
                    # No matching rule - mark as completed
                    await self._mark_completed(webhook_id, "no_matching_rule")
                    return
                rule_name = routing_rule['rule_name']
                # Transform the payload
transformed_data = await self._transform_data(
payload,
routing_rule['transform_query']
)
                # Call the internal API
api_start = time.time()
response = await self._call_internal_api(
routing_rule['target_url'],
transformed_data,
headers,
routing_rule.get('timeout_seconds', 30)
)
api_duration = time.time() - api_start
                # Record API call metrics
internal_api_duration.labels(
target_url=routing_rule['target_url']
).observe(api_duration)
internal_api_calls_total.labels(
target_url=routing_rule['target_url'],
status_code=response.status_code
).inc()
                # Write the processing log
await self._log_processing(
webhook_id, retry_count + 1,
routing_rule['target_url'],
transformed_data, response,
int((time.time() - start_time) * 1000)
)
                # Mark success, or raise to trigger failure handling
if response.status_code < 400:
await self._mark_completed(webhook_id, rule_name)
last_successful_processing.labels(
rule_name=rule_name
).set(time.time())
else:
raise Exception(f"API returned {response.status_code}")
except Exception as e:
logger.error(f"Processing error for webhook {webhook_id}: {e}")
await self._mark_failed(webhook_id, str(e), retry_count)
webhook_processed_total.labels(
status="failed",
rule_name=rule_name
).inc()
finally:
active_processing_count.dec()
                # Record total processing time
duration = time.time() - start_time
webhook_processing_duration.labels(
rule_name=rule_name
).observe(duration)
    async def _find_routing_rule(self, payload: dict, endpoint: str) -> Optional[Dict]:
        """Find the first routing rule whose filter matches the payload"""
        conn = self.db_manager.get_connection()
        # Load active rules ordered by priority
        rules = conn.execute("""
            SELECT
                r.id,
                r.rule_name,
                r.filter_query,
                r.transform_query,
                r.target_url,
                r.timeout_seconds,
                r.retry_policy
            FROM routing_rules r
            WHERE r.is_active = true
            ORDER BY r.priority DESC
        """).fetchall()
        for rule in rules:
            # Evaluate the rule's filter condition against the payload
            try:
                matches = conn.execute(f"""
                    SELECT ({rule[2]}) as matches
                    FROM (SELECT ? as payload) t
                """, [json.dumps(payload)]).fetchone()
                if matches and matches[0]:
                    return {
                        'id': rule[0],
                        'rule_name': rule[1],
                        'transform_query': rule[3],
                        'target_url': rule[4],
                        'timeout_seconds': rule[5],
                        'retry_policy': rule[6]
                    }
            except Exception:
                continue
        return None
    async def _transform_data(self, payload: dict, transform_query: str) -> dict:
        """Transform the payload with the rule's SQL"""
        if transform_query.strip().upper() == "SELECT PAYLOAD":
            return payload
        conn = self.db_manager.get_connection()
        try:
            # Run the transformation query
result = conn.execute(f"""
WITH source AS (SELECT ? as payload)
{transform_query}
FROM source
""", [json.dumps(payload)]).fetchone()
if result:
columns = [desc[0] for desc in conn.description]
return dict(zip(columns, result))
except Exception as e:
logger.error(f"Transform error: {e}")
return payload
    async def _call_internal_api(self, url: str, data: dict,
                                 headers: dict, timeout: int) -> httpx.Response:
        """Call the internal API"""
forward_headers = {
'Content-Type': 'application/json',
'X-Webhook-Gateway': 'duckdb-v1',
'X-Original-IP': headers.get('x-forwarded-for', ''),
}
response = await self.http_client.post(
url,
json=data,
headers=forward_headers,
timeout=timeout
)
return response
    async def _log_processing(self, webhook_id: int, attempt: int,
                              target_url: str, request_data: dict,
                              response: httpx.Response, duration_ms: int):
        """Record a processing log entry"""
        conn = self.db_manager.get_connection()
conn.execute("""
INSERT INTO webhook_processing_log
(webhook_id, attempt_number, processing_started_at,
processing_completed_at, internal_url, request_payload,
response_status, response_body, processing_duration_ms)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [
webhook_id, attempt,
datetime.now() - timedelta(milliseconds=duration_ms),
datetime.now(), target_url, json.dumps(request_data),
            response.status_code, response.text[:1000],  # truncate the response body
duration_ms
])
    async def _mark_completed(self, webhook_id: int, rule_name: str):
        """Mark a webhook as completed"""
        conn = self.db_manager.get_connection()
conn.execute("""
UPDATE raw_webhooks
SET status = 'completed',
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", [webhook_id])
webhook_processed_total.labels(
status="completed",
rule_name=rule_name
).inc()
    async def _mark_failed(self, webhook_id: int, error: str, retry_count: int):
        """Mark a webhook as failed and log the error"""
        conn = self.db_manager.get_connection()
conn.execute("""
UPDATE raw_webhooks
SET status = 'failed',
retry_count = retry_count + 1,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", [webhook_id])
        # Error log entry
conn.execute("""
INSERT INTO webhook_processing_log
(webhook_id, attempt_number, error_message,
processing_started_at, processing_completed_at)
VALUES (?, ?, ?, ?, ?)
""", [
webhook_id, retry_count + 1, error,
datetime.now(), datetime.now()
])
# FastAPI app setup
# Global instances
db_manager = EnhancedDuckDBManager()
processor = EnhancedWebhookProcessor(db_manager)
metrics_collector = MetricsCollector(db_manager)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: launch background tasks inside the running event loop
    logger.info("Starting webhook gateway...")
    metrics_collector.start()
    asyncio.create_task(processor.start_processing())
    yield
    # Shutdown
    logger.info("Stopping webhook gateway...")
    await processor.stop_processing()

# Register the lifespan handler when creating the app
app = FastAPI(title="Webhook Gateway with Metrics", lifespan=lifespan)
# API endpoints
@app.post("/webhook/{endpoint_path:path}")
async def receive_webhook(endpoint_path: str, request: Request):
"""웹훅 수신"""
start_time = time.time()
try:
# 요청 파싱
body = await request.body()
try:
payload = json.loads(body)
except:
payload = {"raw_body": body.decode('utf-8')}
headers = dict(request.headers)
source_ip = request.client.host
# 메트릭 기록
webhook_received_total.labels(
endpoint=endpoint_path,
source=source_ip.split('.')[0] + '.x.x.x' # IP 익명화
).inc()
# DB 저장
conn = db_manager.get_connection()
result = conn.execute("""
INSERT INTO raw_webhooks
(source_ip, endpoint, headers, payload)
VALUES (?, ?, ?, ?)
RETURNING id
""", [source_ip, endpoint_path, json.dumps(headers), json.dumps(payload)]).fetchone()
webhook_id = result[0]
return JSONResponse(
status_code=202,
content={
"status": "accepted",
"webhook_id": webhook_id,
"message": "Webhook queued for processing"
}
)
except Exception as e:
logger.error(f"Webhook receive error: {e}")
return JSONResponse(
status_code=500,
content={"error": "Internal server error"}
)
@app.get("/metrics")
async def get_metrics():
"""Prometheus 메트릭 엔드포인트"""
return Response(
content=generate_latest(registry),
media_type=CONTENT_TYPE_LATEST
)
@app.get("/status/{webhook_id}")
async def get_webhook_status(webhook_id: int):
"""웹훅 상태 조회"""
conn = db_manager.get_connection()
# 웹훅 정보
webhook = conn.execute("""
SELECT
w.id, w.status, w.retry_count,
w.received_at, w.created_at, w.updated_at,
w.endpoint, w.payload
FROM raw_webhooks w
WHERE w.id = ?
""", [webhook_id]).fetchone()
if not webhook:
return JSONResponse(status_code=404, content={"error": "Not found"})
    # Processing history
logs = conn.execute("""
SELECT
attempt_number, processing_started_at,
processing_completed_at, internal_url,
response_status, error_message,
processing_duration_ms
FROM webhook_processing_log
WHERE webhook_id = ?
ORDER BY attempt_number DESC
""", [webhook_id]).fetchall()
return {
"webhook": {
"id": webhook[0],
"status": webhook[1],
"retry_count": webhook[2],
"received_at": webhook[3].isoformat() if webhook[3] else None,
"created_at": webhook[4].isoformat() if webhook[4] else None,
"updated_at": webhook[5].isoformat() if webhook[5] else None,
"endpoint": webhook[6]
},
"processing_history": [
{
"attempt": log[0],
"started_at": log[1].isoformat() if log[1] else None,
"completed_at": log[2].isoformat() if log[2] else None,
"target_url": log[3],
"response_status": log[4],
"error": log[5],
"duration_ms": log[6]
}
for log in logs
]
}
@app.get("/health")
async def health_check():
"""헬스 체크"""
try:
conn = db_manager.get_connection()
conn.execute("SELECT 1").fetchone()
return {
"status": "healthy",
"database": "connected",
"processor": "running" if processor.processing else "stopped"
}
except Exception as e:
return JSONResponse(
status_code=503,
content={
"status": "unhealthy",
"error": str(e)
}
)
@app.get("/dashboard/summary")
async def dashboard_summary():
"""대시보드용 요약 데이터"""
conn = db_manager.get_connection()
summary = conn.execute("""
WITH time_ranges AS (
SELECT
CURRENT_TIMESTAMP - INTERVAL '1 hour' as hour_ago,
CURRENT_TIMESTAMP - INTERVAL '24 hours' as day_ago,
CURRENT_TIMESTAMP - INTERVAL '7 days' as week_ago
),
stats AS (
SELECT
                -- Overall statistics
COUNT(*) as total_webhooks,
COUNT(*) FILTER (WHERE status = 'completed') as completed,
COUNT(*) FILTER (WHERE status = 'failed' AND retry_count >= 3) as failed,
COUNT(*) FILTER (WHERE status = 'pending') as pending,
COUNT(*) FILTER (WHERE status = 'processing') as processing,
                -- Counts per time window
COUNT(*) FILTER (WHERE created_at > (SELECT hour_ago FROM time_ranges)) as last_hour,
COUNT(*) FILTER (WHERE created_at > (SELECT day_ago FROM time_ranges)) as last_day,
COUNT(*) FILTER (WHERE created_at > (SELECT week_ago FROM time_ranges)) as last_week,
                -- Success rate
ROUND(100.0 * COUNT(*) FILTER (WHERE status = 'completed') /
NULLIF(COUNT(*) FILTER (WHERE status IN ('completed', 'failed')), 0), 2) as success_rate
FROM raw_webhooks
),
processing_stats AS (
SELECT
AVG(processing_duration_ms) as avg_duration_ms,
MIN(processing_duration_ms) as min_duration_ms,
MAX(processing_duration_ms) as max_duration_ms,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY processing_duration_ms) as p50_duration_ms,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY processing_duration_ms) as p95_duration_ms,
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY processing_duration_ms) as p99_duration_ms
FROM webhook_processing_log
WHERE processing_completed_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
AND processing_duration_ms IS NOT NULL
),
rule_stats AS (
SELECT
r.rule_name,
COUNT(DISTINCT w.id) as webhook_count,
AVG(l.processing_duration_ms) as avg_duration_ms
FROM routing_rules r
LEFT JOIN webhook_processing_log l ON l.internal_url = r.target_url
LEFT JOIN raw_webhooks w ON w.id = l.webhook_id
WHERE r.is_active = true
GROUP BY r.rule_name
)
SELECT
s.*,
p.*,
(SELECT JSON_GROUP_ARRAY(JSON_OBJECT(
'rule_name', rule_name,
'webhook_count', webhook_count,
'avg_duration_ms', avg_duration_ms
)) FROM rule_stats) as rules_performance
FROM stats s, processing_stats p
""").fetchone()
if not summary:
return {"error": "No data available"}
    # Format the result
return {
"overview": {
"total_webhooks": summary[0],
"completed": summary[1],
"failed": summary[2],
"pending": summary[3],
"processing": summary[4],
"success_rate": summary[8]
},
"volume": {
"last_hour": summary[5],
"last_day": summary[6],
"last_week": summary[7]
},
"performance": {
"avg_duration_ms": summary[9],
"min_duration_ms": summary[10],
"max_duration_ms": summary[11],
"p50_duration_ms": summary[12],
"p95_duration_ms": summary[13],
"p99_duration_ms": summary[14]
},
"rules_performance": json.loads(summary[15]) if summary[15] else []
}
# Routing rule management API
@app.get("/routing-rules")
async def list_routing_rules():
"""라우팅 규칙 목록"""
conn = db_manager.get_connection()
rules = conn.execute("""
SELECT
id, rule_name, description,
filter_query, transform_query,
target_url, is_active, priority,
created_at, updated_at
FROM routing_rules
ORDER BY priority DESC, created_at DESC
""").fetchall()
return {
"rules": [
{
"id": r[0],
"rule_name": r[1],
"description": r[2],
"filter_query": r[3],
"transform_query": r[4],
"target_url": r[5],
"is_active": r[6],
"priority": r[7],
"created_at": r[8].isoformat() if r[8] else None,
"updated_at": r[9].isoformat() if r[9] else None
}
for r in rules
]
}
@app.post("/routing-rules")
async def create_routing_rule(rule: Dict[str, Any]):
"""새 라우팅 규칙 생성"""
conn = db_manager.get_connection()
try:
# 규칙 검증
test_payload = rule.get('test_payload', {})
# 필터 쿼리 테스트
conn.execute(f"""
SELECT ({rule['filter_query']}) as result
FROM (SELECT ? as payload) t
""", [json.dumps(test_payload)])
# 규칙 삽입
result = conn.execute("""
INSERT INTO routing_rules
(rule_name, description, filter_query,
transform_query, target_url, priority, retry_policy)
VALUES (?, ?, ?, ?, ?, ?, ?)
RETURNING id
""", [
rule['rule_name'],
rule.get('description', ''),
rule['filter_query'],
rule['transform_query'],
rule['target_url'],
rule.get('priority', 100),
json.dumps(rule.get('retry_policy', {}))
]).fetchone()
return {
"id": result[0],
"message": "Routing rule created successfully"
}
except Exception as e:
return JSONResponse(
status_code=400,
content={"error": f"Invalid rule: {str(e)}"}
)
@app.put("/routing-rules/{rule_id}/toggle")
async def toggle_routing_rule(rule_id: int):
"""라우팅 규칙 활성화/비활성화"""
conn = db_manager.get_connection()
conn.execute("""
UPDATE routing_rules
SET is_active = NOT is_active,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", [rule_id])
return {"message": "Rule toggled successfully"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
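To see the flow end to end, the short sketch below posts a sample order event to a locally running gateway and then polls the status and metrics endpoints. It is a minimal smoke test under the defaults above: uvicorn listening on localhost:8000 and the seeded 'order_events' routing rule; adjust the host and payload to your setup.
# Minimal smoke test against a locally running gateway (assumed: localhost:8000)
import httpx

resp = httpx.post(
    "http://localhost:8000/webhook/orders",
    json={"event_type": "order.created", "order_id": "A-1001",
          "customer_id": "C-42", "total_amount": 99.0, "items": []},
)
print(resp.status_code, resp.json())  # 202 {'status': 'accepted', 'webhook_id': ...}

webhook_id = resp.json()["webhook_id"]
print(httpx.get(f"http://localhost:8000/status/{webhook_id}").json())  # queue / processing state
print(httpx.get("http://localhost:8000/metrics").text[:300])           # Prometheus exposition text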
Prometheus Configuration
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
external_labels:
monitor: 'webhook-gateway'
# Alerting configuration
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093
# Rule files to load
rule_files:
- "alert_rules.yml"
# Scrape targets
scrape_configs:
  # Webhook Gateway metrics
- job_name: 'webhook-gateway'
static_configs:
- targets: ['webhook-gateway:8000']
metrics_path: '/metrics'
scrape_interval: 10s
scrape_timeout: 5s
  # DuckDB exporter (optional)
- job_name: 'duckdb-exporter'
static_configs:
- targets: ['duckdb-exporter:9091']
scrape_interval: 30s
  # Node Exporter (system metrics)
- job_name: 'node'
static_configs:
- targets: ['node-exporter:9100']
  # Prometheus self-metrics
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
---
# alert_rules.yml
groups:
- name: webhook_gateway_alerts
interval: 30s
rules:
      # Queue size alert
- alert: WebhookQueueBacklog
expr: webhook_queue_size{status="pending"} > 1000
for: 5m
labels:
severity: warning
component: webhook-gateway
annotations:
summary: "High webhook queue backlog"
description: "Webhook queue has {{ $value }} pending items"
      # Processing delay alert
- alert: WebhookProcessingDelay
expr: histogram_quantile(0.95, rate(webhook_queue_wait_time_seconds_bucket[5m])) > 300
for: 5m
labels:
severity: critical
component: webhook-gateway
annotations:
summary: "High webhook processing delay"
description: "95th percentile queue wait time is {{ $value }}s"
      # Failure rate alert
- alert: WebhookHighFailureRate
expr: |
rate(webhook_processed_total{status="failed"}[5m]) /
rate(webhook_processed_total[5m]) > 0.1
for: 5m
labels:
severity: critical
component: webhook-gateway
annotations:
summary: "High webhook failure rate"
description: "Failure rate is {{ $value | humanizePercentage }}"
      # API response time alert
- alert: InternalAPISlowResponse
expr: |
histogram_quantile(0.95,
rate(internal_api_duration_seconds_bucket[5m])
) > 5
for: 5m
labels:
severity: warning
component: internal-api
annotations:
summary: "Slow internal API responses"
description: "95th percentile response time is {{ $value }}s for {{ $labels.target_url }}"
      # Database size alert
- alert: DatabaseSizeLarge
expr: webhook_database_size_bytes > 10737418240 # 10GB
for: 30m
labels:
severity: warning
component: database
annotations:
summary: "Database size is large"
description: "DuckDB database size is {{ $value | humanize1024 }}B"
      # Processing stopped alert
- alert: WebhookProcessingStopped
expr: |
rate(webhook_processed_total[5m]) == 0
and webhook_queue_size{status="pending"} > 0
for: 5m
labels:
severity: critical
component: webhook-gateway
annotations:
summary: "Webhook processing has stopped"
description: "No webhooks processed in 5 minutes with {{ $value }} pending"
Grafana Dashboard Configuration
{
"dashboard": {
"id": null,
"uid": "webhook-gateway",
"title": "Webhook Gateway Dashboard",
"tags": ["webhook", "gateway", "duckdb"],
"timezone": "browser",
"schemaVersion": 30,
"version": 1,
"refresh": "10s",
"time": {
"from": "now-6h",
"to": "now"
},
"panels": [
{
"id": 1,
"gridPos": { "h": 8, "w": 6, "x": 0, "y": 0 },
"type": "stat",
"title": "Total Webhooks Received",
"targets": [
{
"expr": "sum(increase(webhook_received_total[1h]))",
"refId": "A"
}
],
"options": {
"reduceOptions": {
"values": false,
"calcs": ["lastNotNull"]
},
"orientation": "auto",
"textMode": "auto",
"colorMode": "value",
"graphMode": "area",
"justifyMode": "auto"
}
},
{
"id": 2,
"gridPos": { "h": 8, "w": 6, "x": 6, "y": 0 },
"type": "gauge",
"title": "Current Queue Size",
"targets": [
{
"expr": "sum(webhook_queue_size{status=\"pending\"})",
"refId": "A"
}
],
"options": {
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "thresholds"
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null },
{ "color": "yellow", "value": 100 },
{ "color": "red", "value": 500 }
]
},
"unit": "short",
"max": 1000,
"min": 0
}
}
},
{
"id": 3,
"gridPos": { "h": 8, "w": 6, "x": 12, "y": 0 },
"type": "stat",
"title": "Success Rate",
"targets": [
{
"expr": "sum(rate(webhook_processed_total{status=\"completed\"}[5m])) / sum(rate(webhook_processed_total[5m])) * 100",
"refId": "A"
}
],
"options": {
"reduceOptions": {
"values": false,
"calcs": ["lastNotNull"]
},
"orientation": "auto",
"textMode": "auto",
"colorMode": "value",
"graphMode": "none",
"justifyMode": "auto"
},
"fieldConfig": {
"defaults": {
"unit": "percent",
"decimals": 2,
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "red", "value": null },
{ "color": "yellow", "value": 90 },
{ "color": "green", "value": 95 }
]
}
}
}
},
{
"id": 4,
"gridPos": { "h": 8, "w": 6, "x": 18, "y": 0 },
"type": "stat",
"title": "Active Processing",
"targets": [
{
"expr": "webhook_active_processing_count",
"refId": "A"
}
],
"options": {
"reduceOptions": {
"values": false,
"calcs": ["lastNotNull"]
},
"orientation": "auto",
"textMode": "auto",
"colorMode": "background",
"graphMode": "area",
"justifyMode": "auto"
}
},
{
"id": 5,
"gridPos": { "h": 10, "w": 12, "x": 0, "y": 8 },
"type": "graph",
"title": "Webhook Processing Rate",
"targets": [
{
"expr": "sum(rate(webhook_received_total[5m])) by (endpoint)",
"legendFormat": "Received - {{endpoint}}",
"refId": "A"
},
{
"expr": "sum(rate(webhook_processed_total{status=\"completed\"}[5m])) by (rule_name)",
"legendFormat": "Completed - {{rule_name}}",
"refId": "B"
},
{
"expr": "sum(rate(webhook_processed_total{status=\"failed\"}[5m])) by (rule_name)",
"legendFormat": "Failed - {{rule_name}}",
"refId": "C"
}
],
"yaxes": [
{
"format": "short",
"label": "Requests/sec",
"show": true
},
{
"format": "short",
"show": false
}
],
"xaxis": {
"show": true
}
},
{
"id": 6,
"gridPos": { "h": 10, "w": 12, "x": 12, "y": 8 },
"type": "heatmap",
"title": "Queue Wait Time Heatmap",
"targets": [
{
"expr": "sum(increase(webhook_queue_wait_time_seconds_bucket[1m])) by (le)",
"format": "heatmap",
"refId": "A"
}
],
"options": {
"calculate": false,
"yAxis": {
"axisLabel": "Wait Time",
"axisPlacement": "left",
"reverse": false,
"unit": "s"
},
"rowsFrame": {
"layout": "auto"
},
"color": {
"mode": "scheme",
"scheme": "Spectral",
"steps": 128
},
"cellGap": 1,
"filterValues": {
"le": 1e-09
}
}
},
{
"id": 7,
"gridPos": { "h": 10, "w": 12, "x": 0, "y": 18 },
"type": "graph",
"title": "Processing Duration by Rule (95th percentile)",
"targets": [
{
"expr": "histogram_quantile(0.95, sum(rate(webhook_processing_duration_seconds_bucket[5m])) by (rule_name, le))",
"legendFormat": "{{rule_name}}",
"refId": "A"
}
],
"yaxes": [
{
"format": "s",
"label": "Duration",
"show": true
},
{
"format": "short",
"show": false
}
],
"xaxis": {
"show": true
}
},
{
"id": 8,
"gridPos": { "h": 10, "w": 12, "x": 12, "y": 18 },
"type": "graph",
"title": "Internal API Response Times",
"targets": [
{
"expr": "histogram_quantile(0.50, sum(rate(internal_api_duration_seconds_bucket[5m])) by (target_url, le))",
"legendFormat": "p50 - {{target_url}}",
"refId": "A"
},
{
"expr": "histogram_quantile(0.95, sum(rate(internal_api_duration_seconds_bucket[5m])) by (target_url, le))",
"legendFormat": "p95 - {{target_url}}",
"refId": "B"
},
{
"expr": "histogram_quantile(0.99, sum(rate(internal_api_duration_seconds_bucket[5m])) by (target_url, le))",
"legendFormat": "p99 - {{target_url}}",
"refId": "C"
}
],
"yaxes": [
{
"format": "s",
"label": "Response Time",
"show": true
},
{
"format": "short",
"show": false
}
],
"xaxis": {
"show": true
}
},
{
"id": 9,
"gridPos": { "h": 8, "w": 8, "x": 0, "y": 28 },
"type": "piechart",
"title": "Webhook Status Distribution",
"targets": [
{
"expr": "sum(webhook_queue_size) by (status)",
"legendFormat": "{{status}}",
"refId": "A"
}
],
"options": {
"pieType": "donut",
"displayLabels": ["name", "percent"],
"legendDisplayMode": "list",
"legendPlacement": "right"
}
},
{
"id": 10,
"gridPos": { "h": 8, "w": 8, "x": 8, "y": 28 },
"type": "table",
"title": "Top Errors (Last Hour)",
"targets": [
{
"expr": "topk(10, count by (error_message) (webhook_processing_log{error_message!=\"\"}))",
"format": "table",
"instant": true,
"refId": "A"
}
],
"options": {
"showHeader": true
},
"fieldConfig": {
"defaults": {
"custom": {
"align": "auto",
"displayMode": "auto"
}
}
}
},
{
"id": 11,
"gridPos": { "h": 8, "w": 8, "x": 16, "y": 28 },
"type": "bargauge",
"title": "Database Size",
"targets": [
{
"expr": "webhook_database_size_bytes",
"refId": "A"
}
],
"options": {
"orientation": "horizontal",
"displayMode": "gradient",
"showUnfilled": true
},
"fieldConfig": {
"defaults": {
"unit": "decbytes",
"max": 10737418240,
"thresholds": {
"mode": "absolute",
"steps": [
{ "color": "green", "value": null },
{ "color": "yellow", "value": 5368709120 },
{ "color": "red", "value": 8589934592 }
]
}
}
}
},
{
"id": 12,
"gridPos": { "h": 10, "w": 24, "x": 0, "y": 36 },
"type": "logs",
"title": "Recent Processing Logs",
"targets": [
{
"expr": "{job=\"webhook-gateway\"} |= \"error\"",
"refId": "A"
}
],
"options": {
"showTime": true,
"showLabels": true,
"showCommonLabels": false,
"wrapLogMessage": true,
"prettifyLogMessage": false,
"enableLogDetails": true,
"dedupStrategy": "none",
"sortOrder": "Descending"
}
}
],
"links": [
{
"title": "Webhook Gateway API",
"url": "http://webhook-gateway:8000/docs",
"targetBlank": true
},
{
"title": "Alert Manager",
"url": "http://alertmanager:9093",
"targetBlank": true
}
],
"annotations": {
"list": [
{
"datasource": "Prometheus",
"enable": true,
"expr": "ALERTS{alertstate=\"firing\", component=\"webhook-gateway\"}",
"tagKeys": "severity,component",
"titleFormat": "{{ alertname }}",
"textFormat": "{{ description }}"
}
]
}
}
}
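Rather than importing the JSON by hand, the dashboard can also be pushed through Grafana's HTTP API. The sketch below is one way to do it, assuming Grafana runs at localhost:3000 with the admin/admin credentials from the Docker Compose file below and the JSON above saved locally as webhook_dashboard.json.
# Push the dashboard JSON to Grafana via its HTTP API (assumed host and credentials)
import json
import httpx

with open("webhook_dashboard.json") as f:
    payload = json.load(f)          # the {"dashboard": {...}} document shown above

payload["overwrite"] = True         # replace any existing dashboard with the same uid
resp = httpx.post(
    "http://localhost:3000/api/dashboards/db",
    json=payload,
    auth=("admin", "admin"),
)
print(resp.status_code, resp.json())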
Custom DuckDB Exporter
#!/usr/bin/env python3
"""
DuckDB Prometheus Exporter
Exposes detailed DuckDB metrics in the Prometheus format
"""
import time
import duckdb
from prometheus_client import start_http_server, Gauge, Counter, Histogram
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily
from prometheus_client import REGISTRY
import logging
from typing import Dict, List, Tuple
import os
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DuckDBCollector:
"""DuckDB 메트릭 수집기"""
def __init__(self, db_path: str):
self.db_path = db_path
    def collect(self):
        """Collect Prometheus metrics"""
        try:
            conn = duckdb.connect(self.db_path, read_only=True)
            # Row counts per table
            yield from self._collect_table_metrics(conn)
            # Webhook statistics by status
            yield from self._collect_webhook_metrics(conn)
            # Processing performance metrics
            yield from self._collect_performance_metrics(conn)
            # Routing rule metrics
            yield from self._collect_routing_metrics(conn)
            # Database-wide statistics
            yield from self._collect_database_stats(conn)
            conn.close()
        except Exception as e:
            logger.error(f"Error collecting metrics: {e}")
def _collect_table_metrics(self, conn) -> List:
"""테이블별 메트릭 수집"""
tables = [
'raw_webhooks',
'webhook_processing_log',
'routing_rules',
'webhook_metrics'
]
for table in tables:
try:
count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
gauge = GaugeMetricFamily(
f'duckdb_table_row_count',
f'Number of rows in table',
labels=['table']
)
gauge.add_metric([table], count)
yield gauge
                # Per-table on-disk size is not directly exposed by DuckDB
                # (pg_relation_size is PostgreSQL-only), so only row counts are
                # reported here; total database size comes from _collect_database_stats.
except Exception as e:
logger.debug(f"Could not get metrics for table {table}: {e}")
def _collect_webhook_metrics(self, conn) -> List:
"""웹훅 관련 메트릭"""
# 상태별 분포
status_dist = conn.execute("""
SELECT status, COUNT(*) as count
FROM raw_webhooks
GROUP BY status
""").fetchall()
status_gauge = GaugeMetricFamily(
'duckdb_webhook_status_count',
'Number of webhooks by status',
labels=['status']
)
for status, count in status_dist:
status_gauge.add_metric([status], count)
yield status_gauge
        # Retry count distribution
retry_dist = conn.execute("""
SELECT retry_count, COUNT(*) as count
FROM raw_webhooks
GROUP BY retry_count
ORDER BY retry_count
""").fetchall()
retry_gauge = GaugeMetricFamily(
'duckdb_webhook_retry_distribution',
'Distribution of webhook retry counts',
labels=['retry_count']
)
for retry_count, count in retry_dist:
retry_gauge.add_metric([str(retry_count)], count)
yield retry_gauge
        # Per-endpoint statistics
endpoint_stats = conn.execute("""
SELECT
endpoint,
COUNT(*) as total,
COUNT(*) FILTER (WHERE status = 'completed') as completed,
COUNT(*) FILTER (WHERE status = 'failed') as failed
FROM raw_webhooks
GROUP BY endpoint
LIMIT 20
""").fetchall()
for endpoint, total, completed, failed in endpoint_stats:
endpoint_gauge = GaugeMetricFamily(
'duckdb_webhook_endpoint_stats',
'Webhook statistics by endpoint',
labels=['endpoint', 'metric']
)
endpoint_gauge.add_metric([endpoint, 'total'], total)
endpoint_gauge.add_metric([endpoint, 'completed'], completed)
endpoint_gauge.add_metric([endpoint, 'failed'], failed)
yield endpoint_gauge
def _collect_performance_metrics(self, conn) -> List:
"""처리 성능 메트릭"""
# 시간대별 처리 통계
hourly_stats = conn.execute("""
SELECT
DATE_TRUNC('hour', processing_completed_at) as hour,
COUNT(*) as processed_count,
AVG(processing_duration_ms) as avg_duration,
MIN(processing_duration_ms) as min_duration,
MAX(processing_duration_ms) as max_duration,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY processing_duration_ms) as p50,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY processing_duration_ms) as p95,
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY processing_duration_ms) as p99
FROM webhook_processing_log
WHERE processing_completed_at > CURRENT_TIMESTAMP - INTERVAL '24 hours'
AND processing_duration_ms IS NOT NULL
GROUP BY hour
ORDER BY hour DESC
LIMIT 24
""").fetchall()
for hour, count, avg, min_d, max_d, p50, p95, p99 in hourly_stats:
hour_str = hour.strftime('%Y-%m-%d %H:00') if hour else 'unknown'
            # Throughput
throughput_gauge = GaugeMetricFamily(
'duckdb_hourly_throughput',
'Webhooks processed per hour',
labels=['hour']
)
throughput_gauge.add_metric([hour_str], count)
yield throughput_gauge
            # Latency percentiles
for percentile, value in [('p50', p50), ('p95', p95), ('p99', p99)]:
if value:
latency_gauge = GaugeMetricFamily(
f'duckdb_processing_latency_{percentile}_ms',
f'{percentile} processing latency in milliseconds',
labels=['hour']
)
latency_gauge.add_metric([hour_str], value)
yield latency_gauge
        # Per-URL performance statistics
url_stats = conn.execute("""
SELECT
internal_url,
COUNT(*) as call_count,
AVG(processing_duration_ms) as avg_duration,
COUNT(*) FILTER (WHERE response_status >= 400) as error_count
FROM webhook_processing_log
WHERE processing_completed_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY internal_url
""").fetchall()
for url, calls, avg_duration, errors in url_stats:
if url:
                # Call count
call_gauge = GaugeMetricFamily(
'duckdb_internal_api_calls_hourly',
'Internal API calls in last hour',
labels=['url']
)
call_gauge.add_metric([url], calls)
yield call_gauge
                # Average response time
if avg_duration:
duration_gauge = GaugeMetricFamily(
'duckdb_internal_api_avg_duration_ms',
'Average duration for internal API calls',
labels=['url']
)
duration_gauge.add_metric([url], avg_duration)
yield duration_gauge
                # Error rate
error_rate = (errors / calls * 100) if calls > 0 else 0
error_gauge = GaugeMetricFamily(
'duckdb_internal_api_error_rate',
'Error rate percentage for internal API',
labels=['url']
)
error_gauge.add_metric([url], error_rate)
yield error_gauge
def _collect_routing_metrics(self, conn) -> List:
"""라우팅 규칙 메트릭"""
# 활성 규칙 수
active_rules = conn.execute("""
SELECT COUNT(*) FROM routing_rules WHERE is_active = true
""").fetchone()[0]
rules_gauge = GaugeMetricFamily(
'duckdb_active_routing_rules',
'Number of active routing rules'
)
rules_gauge.add_metric([], active_rules)
yield rules_gauge
        # Match statistics per rule
rule_stats = conn.execute("""
WITH rule_matches AS (
SELECT
r.rule_name,
COUNT(DISTINCT l.webhook_id) as matched_webhooks,
AVG(l.processing_duration_ms) as avg_duration,
MAX(l.processing_completed_at) as last_match
FROM routing_rules r
LEFT JOIN webhook_processing_log l ON l.internal_url = r.target_url
WHERE r.is_active = true
AND l.processing_completed_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY r.rule_name
)
SELECT * FROM rule_matches
""").fetchall()
for rule_name, matches, avg_duration, last_match in rule_stats:
            # Match count
match_gauge = GaugeMetricFamily(
'duckdb_routing_rule_matches_hourly',
'Number of webhooks matched by rule in last hour',
labels=['rule_name']
)
match_gauge.add_metric([rule_name], matches or 0)
yield match_gauge
            # Average processing time
if avg_duration:
duration_gauge = GaugeMetricFamily(
'duckdb_routing_rule_avg_duration_ms',
'Average processing duration for rule',
labels=['rule_name']
)
duration_gauge.add_metric([rule_name], avg_duration)
yield duration_gauge
def _collect_database_stats(self, conn) -> List:
"""데이터베이스 전체 통계"""
# 데이터베이스 크기
db_size = os.path.getsize(self.db_path) if os.path.exists(self.db_path) else 0
size_gauge = GaugeMetricFamily(
'duckdb_database_size_bytes',
'Total database file size in bytes'
)
size_gauge.add_metric([], db_size)
yield size_gauge
        # Estimated memory usage. current_memory() and duckdb_connections() are not
        # available in every DuckDB build, which is why these probes are wrapped in
        # defensive try/except blocks.
        try:
            memory_info = conn.execute("SELECT current_memory()").fetchone()
            if memory_info:
                memory_gauge = GaugeMetricFamily(
                    'duckdb_memory_usage_bytes',
                    'Estimated memory usage'
                )
                memory_gauge.add_metric([], memory_info[0])
                yield memory_gauge
        except Exception:
            pass
        # Connection info
        try:
            connection_info = conn.execute("""
                SELECT COUNT(*) FROM duckdb_connections()
            """).fetchone()
            if connection_info:
                conn_gauge = GaugeMetricFamily(
                    'duckdb_active_connections',
                    'Number of active connections'
                )
                conn_gauge.add_metric([], connection_info[0])
                yield conn_gauge
        except Exception:
            pass
        # Oldest pending webhook
oldest_pending = conn.execute("""
SELECT
MIN(received_at) as oldest,
EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - MIN(received_at))) as age_seconds
FROM raw_webhooks
WHERE status = 'pending'
""").fetchone()
if oldest_pending and oldest_pending[1]:
age_gauge = GaugeMetricFamily(
'duckdb_oldest_pending_webhook_age_seconds',
'Age of oldest pending webhook in seconds'
)
age_gauge.add_metric([], oldest_pending[1])
yield age_gauge
        # Daily processing statistics
daily_stats = conn.execute("""
SELECT
DATE(created_at) as date,
COUNT(*) as total,
COUNT(*) FILTER (WHERE status = 'completed') as completed,
COUNT(*) FILTER (WHERE status = 'failed' AND retry_count >= 3) as failed
FROM raw_webhooks
WHERE created_at > CURRENT_TIMESTAMP - INTERVAL '7 days'
GROUP BY date
ORDER BY date DESC
""").fetchall()
for date, total, completed, failed in daily_stats:
date_str = date.strftime('%Y-%m-%d') if date else 'unknown'
daily_gauge = GaugeMetricFamily(
'duckdb_daily_webhook_stats',
'Daily webhook statistics',
labels=['date', 'status']
)
daily_gauge.add_metric([date_str, 'total'], total)
daily_gauge.add_metric([date_str, 'completed'], completed)
daily_gauge.add_metric([date_str, 'failed'], failed)
yield daily_gauge
def _parse_size(self, size_str: str) -> int:
"""크기 문자열을 바이트로 변환"""
size_str = size_str.strip()
units = {
'B': 1,
'KB': 1024,
'MB': 1024 * 1024,
'GB': 1024 * 1024 * 1024,
'TB': 1024 * 1024 * 1024 * 1024
}
for unit, multiplier in units.items():
if size_str.endswith(unit):
try:
return int(float(size_str[:-len(unit)].strip()) * multiplier)
except:
return 0
return 0
def main():
    """Entry point"""
    # Read configuration from environment variables
    db_path = os.environ.get('DUCKDB_PATH', '/data/webhook_gateway.db')
    port = int(os.environ.get('EXPORTER_PORT', '9091'))
    logger.info(f"Starting DuckDB exporter on port {port}")
    logger.info(f"Database path: {db_path}")
    # Register the custom collector
    REGISTRY.register(DuckDBCollector(db_path))
    # Start the HTTP server
    start_http_server(port)
    logger.info("DuckDB exporter started successfully")
    # Block forever
    while True:
        time.sleep(60)
if __name__ == '__main__':
main()
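Before wiring the exporter into Prometheus, the collector can be sanity-checked locally. The snippet below registers it on a throwaway registry and prints the exposition text; it assumes a webhook_gateway.db file exists in the current directory.
# Local sanity check for DuckDBCollector (assumes ./webhook_gateway.db exists)
from prometheus_client import CollectorRegistry, generate_latest

registry = CollectorRegistry()
registry.register(DuckDBCollector("webhook_gateway.db"))
print(generate_latest(registry).decode())   # duckdb_table_row_count{table="raw_webhooks"} ...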
Integrated Docker Compose Setup
version: '3.8'
services:
  # Main webhook gateway
webhook-gateway:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- DATABASE_PATH=/data/webhook_gateway.db
- LOG_LEVEL=INFO
- MAX_WORKERS=4
- BATCH_SIZE=100
- RETRY_DELAY=60
- MAX_RETRIES=3
volumes:
- ./data:/data
- ./logs:/logs
depends_on:
- internal-api
networks:
- webhook-net
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
labels:
- "prometheus.io/scrape=true"
- "prometheus.io/port=8000"
- "prometheus.io/path=/metrics"
  # Internal API (example)
internal-api:
image: your-internal-api:latest
ports:
- "8080:8080"
environment:
- DATABASE_URL=postgresql://user:pass@postgres:5432/internal_db
networks:
- webhook-net
  # Dedicated DuckDB exporter
duckdb-exporter:
build:
context: .
dockerfile: Dockerfile.exporter
ports:
- "9091:9091"
environment:
- DUCKDB_PATH=/data/webhook_gateway.db
- EXPORTER_PORT=9091
volumes:
- ./data:/data:ro
networks:
- webhook-net
depends_on:
- webhook-gateway
# Prometheus
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- ./alert_rules.yml:/etc/prometheus/alert_rules.yml
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/usr/share/prometheus/console_libraries'
- '--web.console.templates=/usr/share/prometheus/consoles'
- '--web.enable-lifecycle'
networks:
- webhook-net
# Grafana
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_USERS_ALLOW_SIGN_UP=false
- GF_INSTALL_PLUGINS=grafana-piechart-panel
volumes:
- grafana-data:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning
- ./grafana/dashboards:/var/lib/grafana/dashboards
networks:
- webhook-net
depends_on:
- prometheus
# Alert Manager
alertmanager:
image: prom/alertmanager:latest
ports:
- "9093:9093"
volumes:
- ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
- alertmanager-data:/alertmanager
command:
- '--config.file=/etc/alertmanager/alertmanager.yml'
- '--storage.path=/alertmanager'
networks:
- webhook-net
  # Node Exporter (system metrics)
node-exporter:
image: prom/node-exporter:latest
ports:
- "9100:9100"
volumes:
- /proc:/host/proc:ro
- /sys:/host/sys:ro
- /:/rootfs:ro
command:
- '--path.procfs=/host/proc'
- '--path.sysfs=/host/sys'
- '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)'
networks:
- webhook-net
  # Loki (log aggregation)
loki:
image: grafana/loki:latest
ports:
- "3100:3100"
volumes:
- ./loki-config.yml:/etc/loki/local-config.yaml
- loki-data:/loki
command: -config.file=/etc/loki/local-config.yaml
networks:
- webhook-net
  # Promtail (log shipping)
promtail:
image: grafana/promtail:latest
volumes:
- ./logs:/var/log/webhook
- ./promtail-config.yml:/etc/promtail/config.yml
- /var/lib/docker/containers:/var/lib/docker/containers:ro
command: -config.file=/etc/promtail/config.yml
networks:
- webhook-net
  # DuckDB CLI (for administration)
duckdb-cli:
image: python:3.11-slim
volumes:
- ./data:/data
- ./scripts:/scripts
command: tail -f /dev/null
networks:
- webhook-net
networks:
webhook-net:
driver: bridge
volumes:
prometheus-data:
grafana-data:
alertmanager-data:
loki-data:
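After `docker compose up -d`, a quick way to confirm that every service came up is to hit each health endpoint. A small helper along these lines, assuming the default ports from the Compose file above:
# Poll the health endpoints of the stack (assumed default ports from docker-compose.yml)
import httpx

checks = {
    "webhook-gateway": "http://localhost:8000/health",
    "prometheus": "http://localhost:9090/-/healthy",
    "grafana": "http://localhost:3000/api/health",
    "alertmanager": "http://localhost:9093/-/healthy",
}
for name, url in checks.items():
    try:
        status = httpx.get(url, timeout=5).status_code
    except httpx.HTTPError as e:
        status = f"error: {e}"
    print(f"{name:16} {status}")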
Grafana Provisioning Configuration
# grafana/provisioning/datasources/prometheus.yml
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true
editable: true
jsonData:
timeInterval: "10s"
queryTimeout: "60s"
httpMethod: "POST"
- name: Loki
type: loki
access: proxy
url: http://loki:3100
editable: true
jsonData:
maxLines: 1000
---
# grafana/provisioning/dashboards/dashboards.yml
apiVersion: 1
providers:
- name: 'Webhook Gateway Dashboards'
orgId: 1
folder: 'Webhook Gateway'
type: file
disableDeletion: false
updateIntervalSeconds: 30
allowUiUpdates: true
options:
path: /var/lib/grafana/dashboards
---
# grafana/provisioning/alerting/alerts.yml
apiVersion: 1
groups:
- name: webhook_alerts
folder: "Webhook Gateway"
interval: 1m
rules:
- uid: webhook_queue_alert
title: High Webhook Queue Size
condition: webhook_queue_threshold
data:
- refId: A
queryType: ""
relativeTimeRange:
from: 300
to: 0
datasourceUid: prometheus
model:
expr: webhook_queue_size{status="pending"}
intervalMs: 1000
maxDataPoints: 43200
refId: A
noDataState: NoData
execErrState: Alerting
for: 5m
annotations:
description: "Webhook queue has {{ $values.A }} pending items"
runbook_url: "https://wiki.example.com/webhook-queue-high"
summary: "High webhook queue backlog detected"
labels:
severity: warning
team: platform
- uid: webhook_processing_stopped
title: Webhook Processing Stopped
condition: no_processing
data:
- refId: A
queryType: ""
relativeTimeRange:
from: 600
to: 0
datasourceUid: prometheus
model:
expr: |
rate(webhook_processed_total[5m]) == 0
AND webhook_queue_size{status="pending"} > 0
intervalMs: 1000
maxDataPoints: 43200
refId: A
noDataState: NoData
execErrState: Alerting
for: 5m
annotations:
description: "No webhooks processed in 5 minutes with pending queue"
runbook_url: "https://wiki.example.com/webhook-processing-stopped"
summary: "Webhook processing has stopped"
labels:
severity: critical
team: platform
---
# alertmanager.yml
global:
resolve_timeout: 5m
slack_api_url: '${SLACK_WEBHOOK_URL}'
route:
group_by: ['alertname', 'cluster', 'service']
group_wait: 10s
group_interval: 10s
repeat_interval: 12h
receiver: 'default'
routes:
- match:
severity: critical
receiver: critical-alerts
continue: true
- match:
severity: warning
receiver: warning-alerts
receivers:
- name: 'default'
slack_configs:
- channel: '#alerts'
title: 'Webhook Gateway Alert'
text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'
- name: 'critical-alerts'
slack_configs:
- channel: '#critical-alerts'
title: '🚨 CRITICAL: Webhook Gateway'
text: |
{{ range .Alerts }}
*Alert:* {{ .Labels.alertname }}
*Summary:* {{ .Annotations.summary }}
*Description:* {{ .Annotations.description }}
*Severity:* {{ .Labels.severity }}
{{ end }}
pagerduty_configs:
- service_key: '${PAGERDUTY_SERVICE_KEY}'
description: '{{ .GroupLabels.alertname }}'
- name: 'warning-alerts'
slack_configs:
- channel: '#platform-alerts'
title: '⚠️ Warning: Webhook Gateway'
text: |
{{ range .Alerts }}
*Alert:* {{ .Labels.alertname }}
*Summary:* {{ .Annotations.summary }}
{{ end }}
inhibit_rules:
- source_match:
severity: 'critical'
target_match:
severity: 'warning'
equal: ['alertname', 'cluster', 'service']
Dashboard Usage Guide
Key Monitoring Indicators
- Real-time processing status
- Queue Size: number of webhooks currently waiting
- Processing Rate: throughput per second
- Success Rate: success ratio (target: 95% or higher)
- Active Processing: number of webhooks currently being processed
- Performance indicators
- Queue Wait Time: heatmap of queue wait times
- Processing Duration: processing time per rule
- Internal API Response: internal API response times
- Database Size: database size trend
- Error tracking
- Error Distribution: breakdown by error type
- Failed Webhooks: tracking of failed webhooks
- Retry Statistics: retry statistics
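These indicators can also be pulled programmatically from Prometheus instead of reading them off the dashboard, which is handy for ad-hoc checks or custom reports. A minimal sketch, assuming Prometheus is reachable on localhost:9090:
# Query key gateway indicators from the Prometheus HTTP API (assumed localhost:9090)
import httpx

QUERIES = {
    "pending queue size": 'sum(webhook_queue_size{status="pending"})',
    "success rate (5m)": 'sum(rate(webhook_processed_total{status="completed"}[5m]))'
                         ' / sum(rate(webhook_processed_total[5m])) * 100',
    "active processing": 'webhook_active_processing_count',
}
for label, promql in QUERIES.items():
    data = httpx.get("http://localhost:9090/api/v1/query", params={"query": promql}).json()
    result = data["data"]["result"]
    value = result[0]["value"][1] if result else "no data"
    print(f"{label:20} {value}")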
Alert Response Runbook
🚨 Critical Alerts
1. Webhook Processing Stopped
# 1. Check the processor status
curl http://localhost:8000/health
# 2. Check the logs
docker logs webhook-gateway --tail 100
# 3. Restart the processor
docker restart webhook-gateway
# 4. Manually process pending webhooks (a sketch of this helper follows below)
docker exec -it duckdb-cli python /scripts/manual_process.py
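The runbook refers to a /scripts/manual_process.py helper whose contents are not shown in this article; a hypothetical minimal version could simply release webhooks stuck in the 'processing' state back to 'pending' so that the restarted workers pick them up again:
# manual_process.py - hypothetical recovery helper (not part of the code above):
# release webhooks stuck in 'processing' back to 'pending' so workers reprocess them
import duckdb

conn = duckdb.connect("/data/webhook_gateway.db")
released = conn.execute("""
    UPDATE raw_webhooks
    SET status = 'pending', updated_at = CURRENT_TIMESTAMP
    WHERE status = 'processing'
      AND updated_at < CURRENT_TIMESTAMP - INTERVAL '10 minutes'
    RETURNING id
""").fetchall()
print(f"released {len(released)} stuck webhooks")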
2. High Failure Rate (>10%)
-- Analyze failure causes
SELECT
error_message,
COUNT(*) as count,
MIN(processing_started_at) as first_occurrence,
MAX(processing_started_at) as last_occurrence
FROM webhook_processing_log
WHERE error_message IS NOT NULL
AND processing_started_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
GROUP BY error_message
ORDER BY count DESC;
-- Disable a problematic rule
UPDATE routing_rules
SET is_active = false
WHERE rule_name = 'problematic_rule';
⚠️ Warning Alerts
1. High Queue Backlog (>1000)
# 1. Scale out the gateway workers
docker compose up -d --scale webhook-gateway=3
# 2. Adjust the batch size (edit BATCH_SIZE in docker-compose.yml, then recreate the service)
docker compose up -d webhook-gateway
# 3. Clean up old webhooks (assumes the DuckDB CLI is available in the duckdb-cli container)
docker exec duckdb-cli duckdb /data/webhook_gateway.db \
"DELETE FROM raw_webhooks WHERE status = 'completed' AND created_at < CURRENT_TIMESTAMP - INTERVAL '7 days'"
Performance Tuning Guide
DuckDB Optimization
-- 1. Refresh table statistics
ANALYZE raw_webhooks;
ANALYZE webhook_processing_log;
-- 2. Add an index (DuckDB does not support partial indexes, so index the whole column)
CREATE INDEX idx_processing_log_time
ON webhook_processing_log(processing_completed_at);
-- 3. Archive old data monthly (DuckDB has no PostgreSQL-style PARTITION OF,
--    so move old rows into an archive table instead)
CREATE TABLE IF NOT EXISTS raw_webhooks_archive AS
SELECT * FROM raw_webhooks LIMIT 0;
INSERT INTO raw_webhooks_archive
SELECT * FROM raw_webhooks WHERE created_at < '2024-01-01';
DELETE FROM raw_webhooks WHERE created_at < '2024-01-01';
-- 4. Run VACUUM to refresh internal statistics
VACUUM;
Memory Settings
# Environment variables
DUCKDB_MEMORY_LIMIT=4GB
DUCKDB_THREADS=8
DUCKDB_TEMP_DIRECTORY=/tmp/duckdb
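DuckDB does not read these variables on its own; they have to be applied when the connection is opened. A small sketch of how the gateway's connection setup could honor them (the DUCKDB_* names above are this article's own convention, not built-in settings):
# Apply the DUCKDB_* environment variables when opening the connection
import os
import duckdb

conn = duckdb.connect(os.environ.get("DATABASE_PATH", "webhook_gateway.db"))
conn.execute(f"SET memory_limit = '{os.environ.get('DUCKDB_MEMORY_LIMIT', '2GB')}'")
conn.execute(f"SET threads = {int(os.environ.get('DUCKDB_THREADS', '4'))}")
conn.execute(f"SET temp_directory = '{os.environ.get('DUCKDB_TEMP_DIRECTORY', '/tmp/duckdb')}'")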
Backup and Recovery
Automated Backup Script
#!/bin/bash
# backup.sh
# Variables
BACKUP_DIR="/backups"
DB_PATH="/data/webhook_gateway.db"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
# Run the backup (note: copying the file while writers are active can yield an
# inconsistent snapshot; pause the gateway or use DuckDB's EXPORT DATABASE for a clean copy)
cp "$DB_PATH" "$BACKUP_DIR/webhook_gateway_$TIMESTAMP.db"
# Upload to S3 (optional)
aws s3 cp "$BACKUP_DIR/webhook_gateway_$TIMESTAMP.db" \
s3://webhook-backups/daily/
# Delete backups older than 30 days
find "$BACKUP_DIR" -name "webhook_gateway_*.db" -mtime +30 -delete
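A backup is only useful if it can actually be opened, so it is worth verifying each copy. A minimal check, taking the path of a backup produced by the script above as its argument:
# Verify that a backup file opens and contains the expected tables
import sys
import duckdb

backup_path = sys.argv[1]   # e.g. a file written by backup.sh above
conn = duckdb.connect(backup_path, read_only=True)
for table in ("raw_webhooks", "webhook_processing_log", "routing_rules"):
    count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    print(f"{table}: {count} rows")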
Recommendations
Strengths
- Full observability: every stage is tracked with metrics
- Declarative management: business logic is expressed in SQL
- Fast debugging: detailed logs and metrics make problems easy to trace
- Scalable: supports horizontal scale-out and partitioning/archiving of data
Limitations and Mitigations
- Concurrency limits: write locks can become a bottleneck
- Mitigation: batch processing, read-only replicas
- Scale ceiling: constrained to a single node
- Mitigation: sharding, running multiple instances
- Complex transformations: limited SQL expressiveness
- Mitigation: use Python UDFs (see the sketch below)
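For transformations that are awkward to express in SQL, DuckDB (0.8 and later) can register plain Python functions as UDFs and call them from the transform queries stored in routing_rules. A minimal sketch; mask_email is an illustrative function, not part of the gateway code above:
# Registering a Python UDF for use inside transform queries (illustrative example)
import duckdb

def mask_email(address: str) -> str:
    """Keep the first character of the local part, mask the rest."""
    local, _, domain = address.partition("@")
    return f"{local[:1]}***@{domain}" if domain else "***"

conn = duckdb.connect("webhook_gateway.db")
conn.create_function("mask_email", mask_email)
print(conn.execute(
    "SELECT mask_email(json_extract_string(payload, '$.email')) FROM raw_webhooks LIMIT 5"
).fetchall())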
Application Scenarios
- ✅ Small-to-medium event volumes (hundreds of events per second)
- ✅ Cases that need complex routing rules
- ✅ Cases that need strong observability
- ❌ Very high throughput requirements (thousands of events per second or more)
- ❌ Cases that need complex state management
Built this way, a DuckDB-based webhook gateway gives you a flexible, observable system that is data-centric rather than code-centric.