728x90
기존 데이터 분석 백엔드 구축의 문제점
- 복잡한 인프라: Kafka + ClickHouse + dbt + Airflow + API 서버 각각 관리
- 스키마 불일치: 데이터베이스, API, 메시지 포맷 간 동기화 어려움
- 개발 속도 저하: 로컬 테스트 환경 구축 복잡, 긴 피드백 사이클
- SQL 중심 개발: 복잡한 비즈니스 로직을 SQL로만 처리하는 한계
300x250
Moose의 해결책: TypeScript/Python 코드 기반으로 전체 데이터 파이프라인을 통합 관리
Moose 아키텍처
┌─────────────────┐ ┌──────────────┐ ┌──────────────┐
│ Data Sources │────▶│ Moose │────▶│ ClickHouse │
│ (API, Kafka) │ │ Framework │ │ (OLAP DB) │
└─────────────────┘ └──────────────┘ └──────────────┘
│
▼
┌──────────────┐
│ Redpanda │
│ (Optional) │
└──────────────┘
ClickHouse 고급 활용법
1. Materialized View로 실시간 집계
Materialized View는 새 데이터 입력 시 자동으로 집계를 수행하여 별도 테이블에 저장합니다.
실전 예제: 실시간 사용자 활동 대시보드
-- 1. 원본 이벤트 테이블
CREATE TABLE user_events (
user_id String,
event_type String,
properties Map(String, String),
timestamp DateTime,
session_id String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, user_id);
-- 2. 시간별 집계 테이블
CREATE TABLE hourly_stats (
hour DateTime,
event_type String,
unique_users UInt64,
total_events UInt64,
avg_session_duration Float32
) ENGINE = SummingMergeTree()
ORDER BY (hour, event_type);
-- 3. Materialized View 정의
CREATE MATERIALIZED VIEW mv_hourly_stats
TO hourly_stats
AS
SELECT
toStartOfHour(timestamp) AS hour,
event_type,
uniqExact(user_id) AS unique_users,
count() AS total_events,
avg(dateDiff('second',
groupArrayMin(timestamp),
groupArrayMax(timestamp)
)) AS avg_session_duration
FROM user_events
GROUP BY hour, event_type;
2. 고급 분석 쿼리 패턴
퍼널 분석 (Conversion Funnel)
WITH funnel_steps AS (
SELECT
user_id,
groupArray(event_type) AS events,
groupArray(timestamp) AS timestamps
FROM user_events
WHERE timestamp >= today() - 7
GROUP BY user_id
)
SELECT
countIf(has(events, 'page_view')) AS step1_users,
countIf(has(events, 'page_view') AND has(events, 'add_to_cart')) AS step2_users,
countIf(has(events, 'page_view') AND has(events, 'add_to_cart') AND has(events, 'purchase')) AS step3_users
FROM funnel_steps;
세션 분석
SELECT
user_id,
session_id,
min(timestamp) AS session_start,
max(timestamp) AS session_end,
count() AS events_in_session,
dateDiff('second', min(timestamp), max(timestamp)) AS session_duration
FROM user_events
WHERE timestamp >= today()
GROUP BY user_id, session_id
HAVING session_duration > 0
ORDER BY session_duration DESC
LIMIT 100;
3. 성능 최적화 기법
파티셔닝 전략
-- 월별 파티션 + 샤딩
CREATE TABLE distributed_events ON CLUSTER cluster_name (
user_id String,
event_type String,
timestamp DateTime
) ENGINE = Distributed('cluster_name', 'default', 'local_events', xxHash64(user_id));
Redpanda 이벤트 스트리밍
1. Schema Registry 통합
스키마 정의 및 등록
# Avro 스키마 생성
cat > user_event.avsc << EOF
{
"type": "record",
"name": "UserEvent",
"namespace": "com.example",
"fields": [
{"name": "user_id", "type": "string"},
{"name": "event_type", "type": "string"},
{"name": "timestamp", "type": "string"},
{"name": "properties", "type": {"type": "map", "values": "string"}}
]
}
EOF
# Schema Registry에 등록
curl -X POST http://localhost:8081/subjects/user-events-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "{\"schema\": \"$(cat user_event.avsc | jq -R -s .)\"}"
2. 프로듀서/컨슈머 패턴
Python 프로듀서 예제
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json
# Schema Registry 설정
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
# Avro 시리얼라이저 설정
avro_serializer = AvroSerializer(schema_registry_client, schema_str)
# 프로듀서 설정
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'client.id': 'python-producer'
})
# 이벤트 전송
event = {
'user_id': 'u123',
'event_type': 'page_view',
'timestamp': '2025-07-23T10:00:00Z',
'properties': {'page': '/home'}
}
producer.produce(
topic='user-events',
value=avro_serializer(event, SerializationContext('user-events', MessageField.VALUE))
)
producer.flush()
3. REST Proxy 활용
HTTP API로 이벤트 전송
# JSON 형식으로 이벤트 전송
curl -X POST http://localhost:8082/topics/user-events \
-H "Content-Type: application/vnd.kafka.json.v2+json" \
-d '{
"records": [
{
"value": {
"user_id": "u456",
"event_type": "button_click",
"timestamp": "2025-07-23T10:05:00Z",
"properties": {"button_id": "submit-form"}
}
}
]
}'
Grafana 시각화 통합
1. ClickHouse 데이터소스 설정
# grafana-datasources.yaml
apiVersion: 1
datasources:
- name: ClickHouse
type: vertamedia-clickhouse-datasource
access: proxy
url: http://clickhouse:8123
basicAuth: true
basicAuthUser: default
secureJsonData:
basicAuthPassword: ${CLICKHOUSE_PASSWORD}
jsonData:
defaultDatabase: default
tlsSkipVerify: true
2. 대시보드 자동 생성
Python 스크립트로 대시보드 생성
import requests
import json
GRAFANA_URL = "http://localhost:3000"
API_KEY = "your-api-key"
dashboard = {
"dashboard": {
"title": "User Analytics Dashboard",
"panels": [
{
"title": "Events per Hour",
"type": "graph",
"datasource": "ClickHouse",
"targets": [{
"rawSql": """
SELECT
toStartOfHour(timestamp) AS time,
count() AS value,
event_type AS metric
FROM user_events
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY time, event_type
ORDER BY time
""",
"format": "time_series"
}],
"gridPos": {"x": 0, "y": 0, "w": 12, "h": 8}
},
{
"title": "Top Users",
"type": "table",
"datasource": "ClickHouse",
"targets": [{
"rawSql": """
SELECT
user_id,
count() AS event_count,
uniq(session_id) AS sessions
FROM user_events
WHERE timestamp >= today()
GROUP BY user_id
ORDER BY event_count DESC
LIMIT 10
""",
"format": "table"
}],
"gridPos": {"x": 12, "y": 0, "w": 12, "h": 8}
}
]
},
"overwrite": True
}
response = requests.post(
f"{GRAFANA_URL}/api/dashboards/db",
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
},
data=json.dumps(dashboard)
)
실전 프로젝트 예제
실시간 이커머스 분석 시스템 구축
1. Moose 프로젝트 구조
ecommerce-analytics/
├── models/
│ ├── user_event.py
│ ├── product.py
│ └── order.py
├── pipelines/
│ ├── event_enrichment.py
│ ├── fraud_detection.py
│ └── recommendation.py
├── flows/
│ └── daily_aggregation.py
├── api/
│ └── analytics_api.py
├── moose.yaml
└── requirements.txt
2. 모델 정의
# models/user_event.py
from moose import model
from datetime import datetime
from typing import Optional, Dict
@model
class UserEvent:
event_id: str
user_id: str
session_id: str
event_type: str # page_view, add_to_cart, purchase
product_id: Optional[str]
timestamp: datetime
properties: Dict[str, str]
class Config:
# ClickHouse 테이블 설정
engine = "MergeTree"
partition_by = "toYYYYMM(timestamp)"
order_by = ["timestamp", "user_id"]
ttl = "timestamp + INTERVAL 90 DAY"
3. 실시간 파이프라인
# pipelines/event_enrichment.py
from moose import pipeline, event_source
from models import UserEvent, Product
import redis
r = redis.Redis(host='localhost', port=6379)
@pipeline
@event_source(topic="raw.events")
async def enrich_user_event(event: UserEvent):
# 제품 정보 추가
if event.product_id:
product_info = await get_product_info(event.product_id)
event.properties.update({
'product_name': product_info.name,
'product_category': product_info.category,
'product_price': str(product_info.price)
})
# 사용자 세그먼트 추가
user_segment = r.get(f"user_segment:{event.user_id}")
if user_segment:
event.properties['user_segment'] = user_segment.decode()
# 실시간 이상 탐지
if event.event_type == 'purchase' and is_fraudulent(event):
await send_alert(event)
return event # ClickHouse에 자동 저장
def is_fraudulent(event: UserEvent) -> bool:
# 간단한 규칙 기반 이상 탐지
price = float(event.properties.get('product_price', 0))
return price > 10000 # 예시: 고액 거래 탐지
4. 배치 집계 워크플로
# flows/daily_aggregation.py
from moose import flow, schedule
from datetime import datetime, timedelta
@flow
@schedule(cron="0 2 * * *") # 매일 새벽 2시 실행
async def daily_user_metrics():
yesterday = datetime.now() - timedelta(days=1)
# 일별 사용자 메트릭 계산
query = """
INSERT INTO daily_user_metrics
SELECT
toDate(timestamp) AS date,
user_id,
count() AS total_events,
countIf(event_type = 'page_view') AS page_views,
countIf(event_type = 'purchase') AS purchases,
sumIf(toFloat64(properties['product_price']), event_type = 'purchase') AS revenue
FROM user_events
WHERE toDate(timestamp) = yesterday()
GROUP BY date, user_id
"""
await execute_clickhouse_query(query)
# 코호트 분석 업데이트
await update_cohort_analysis(yesterday)
5. API 엔드포인트
# api/analytics_api.py
from moose import api, get_clickhouse_client
from fastapi import Query
from datetime import datetime
@api.get("/analytics/funnel")
async def get_conversion_funnel(
start_date: datetime = Query(...),
end_date: datetime = Query(...)
):
ch = get_clickhouse_client()
query = """
WITH funnel_data AS (
SELECT
user_id,
groupArray(event_type) AS events,
groupArray(timestamp) AS timestamps
FROM user_events
WHERE timestamp BETWEEN %(start_date)s AND %(end_date)s
GROUP BY user_id
)
SELECT
'Viewed Product' AS step,
countIf(has(events, 'page_view')) AS users
FROM funnel_data
UNION ALL
SELECT
'Added to Cart' AS step,
countIf(has(events, 'page_view') AND has(events, 'add_to_cart')) AS users
FROM funnel_data
UNION ALL
SELECT
'Completed Purchase' AS step,
countIf(has(events, 'page_view') AND has(events, 'add_to_cart') AND has(events, 'purchase')) AS users
FROM funnel_data
"""
results = ch.execute(query, {'start_date': start_date, 'end_date': end_date})
return {
"funnel": [
{"step": row[0], "users": row[1], "conversion_rate": row[1] / results[0][1] if results[0][1] > 0 else 0}
for row in results
]
}
보안 및 운영 가이드
1. 인프라 보안 설정
ClickHouse 보안 강화
<!-- users.xml -->
<clickhouse>
<users>
<moose_app>
<password_sha256_hex>...</password_sha256_hex>
<networks>
<ip>10.0.0.0/8</ip>
</networks>
<profile>restricted</profile>
<quota>default</quota>
<access_management>1</access_management>
</moose_app>
</users>
<profiles>
<restricted>
<max_memory_usage>10000000000</max_memory_usage>
<max_execution_time>300</max_execution_time>
<readonly>0</readonly>
</restricted>
</profiles>
</clickhouse>
Redpanda SASL 인증 설정
# redpanda.yaml
redpanda:
kafka_api:
- address: 0.0.0.0
port: 9092
authentication_method: sasl
kafka_api_tls:
- enabled: true
cert_file: /etc/redpanda/certs/broker.crt
key_file: /etc/redpanda/certs/broker.key
superusers:
- admin
sasl:
enabled: true
mechanism: SCRAM-SHA-256
2. 모니터링 및 알림
Prometheus 메트릭 수집
# prometheus.yml
scrape_configs:
- job_name: 'clickhouse'
static_configs:
- targets: ['clickhouse:9363']
- job_name: 'redpanda'
static_configs:
- targets: ['redpanda:9644']
- job_name: 'moose'
static_configs:
- targets: ['moose:8080']
알림 규칙 설정
# alerting_rules.yml
groups:
- name: data_pipeline
rules:
- alert: HighEventLatency
expr: histogram_quantile(0.95, event_processing_duration_seconds) > 5
for: 5m
annotations:
summary: "Event processing latency is high"
- alert: ClickHouseStorageFull
expr: clickhouse_disk_usage_percent > 85
for: 10m
annotations:
summary: "ClickHouse storage usage above 85%"
3. 데이터 거버넌스
데이터 보존 정책
-- TTL 기반 자동 삭제
ALTER TABLE user_events
MODIFY TTL timestamp + INTERVAL 90 DAY DELETE;
-- 민감 정보 마스킹
CREATE VIEW user_events_masked AS
SELECT
user_id,
event_type,
timestamp,
mapApply((k, v) ->
if(k IN ('email', 'phone'), '***', v),
properties
) AS properties
FROM user_events;
4. 성능 최적화 체크리스트
✅ ClickHouse 최적화
- 적절한 파티션 키 설정 (일반적으로 날짜 기반)
- ORDER BY 절에 자주 사용되는 필터 컬럼 포함
- Materialized View로 자주 사용되는 집계 사전 계산
- 압축 코덱 최적화 (
CODEC(ZSTD)
,DoubleDelta
등)
✅ Redpanda 최적화
- 토픽별 적절한 파티션 수 설정
- 리텐션 정책 설정 (
retention.ms
,retention.bytes
) - 압축 설정 (
compression.type: snappy
) - 프로듀서 배치 설정 최적화
✅ Moose 파이프라인 최적화
- 비동기 처리 활용 (
async/await
) - 배치 처리 구현 (대량 데이터 처리 시)
- 캐싱 전략 수립 (Redis, 메모리 캐시)
- 에러 핸들링 및 재시도 로직
5. 운영 자동화
CI/CD 파이프라인
# .github/workflows/deploy.yml
name: Deploy Moose Application
on:
push:
branches: [main]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Run Tests
run: |
moose test
- name: Build and Deploy
run: |
moose build
moose deploy --env production
- name: Update Grafana Dashboards
run: |
python scripts/update_dashboards.py
Moose 프레임워크의 장점
- 통합 개발 경험: TypeScript/Python으로 전체 파이프라인 관리
- 자동 스키마 동기화: 모델 → DB → API 자동 연동
- 실시간 개발: 핫 리로드로 즉각적인 피드백
- 확장성: 필요한 컴포넌트만 선택적 활용
성공적인 구축을 위한 권장사항
- 단계적 접근: 간단한 파이프라인부터 시작하여 점진적 확장
- 모니터링 우선: 처음부터 메트릭과 로그 수집 체계 구축
- 테스트 자동화: 데이터 품질 검증을 위한 자동화된 테스트
- 문서화: 데이터 모델과 파이프라인 로직 문서화
Moose 기반의 강력하고 확장 가능한 데이터 분석 백엔드를 구축할 수 있습니다. 각 예제는 실제 프로젝트에 참고할 수 있도록 작성되었으며, 보안과 운영 관점까지 고려한 접근 방법을 결정하시기 바랍니다.
728x90
그리드형(광고전용)
댓글