본문 바로가기

실시간 데이터 백엔드 분석법 모듈형 플랫폼 Moose, ClickHouse, Redpanda 구축

728x90

 

기존 데이터 분석 백엔드 구축의 문제점

  • 복잡한 인프라: Kafka + ClickHouse + dbt + Airflow + API 서버 각각 관리
  • 스키마 불일치: 데이터베이스, API, 메시지 포맷 간 동기화 어려움
  • 개발 속도 저하: 로컬 테스트 환경 구축 복잡, 긴 피드백 사이클
  • SQL 중심 개발: 복잡한 비즈니스 로직을 SQL로만 처리하는 한계
300x250

Moose의 해결책: TypeScript/Python 코드 기반으로 전체 데이터 파이프라인을 통합 관리

Moose 아키텍처

┌─────────────────┐     ┌──────────────┐     ┌──────────────┐
│   Data Sources  │────▶│    Moose     │────▶│  ClickHouse  │
│  (API, Kafka)   │     │  Framework   │     │   (OLAP DB)  │
└─────────────────┘     └──────────────┘     └──────────────┘
                              │
                              ▼
                        ┌──────────────┐
                        │   Redpanda   │
                        │  (Optional)  │
                        └──────────────┘

ClickHouse 고급 활용법

1. Materialized View로 실시간 집계

Materialized View는 새 데이터 입력 시 자동으로 집계를 수행하여 별도 테이블에 저장합니다.

실전 예제: 실시간 사용자 활동 대시보드

-- 1. 원본 이벤트 테이블
CREATE TABLE user_events (
    user_id String,
    event_type String,
    properties Map(String, String),
    timestamp DateTime,
    session_id String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp, user_id);

-- 2. 시간별 집계 테이블
CREATE TABLE hourly_stats (
    hour DateTime,
    event_type String,
    unique_users UInt64,
    total_events UInt64,
    avg_session_duration Float32
) ENGINE = SummingMergeTree()
ORDER BY (hour, event_type);

-- 3. Materialized View 정의
CREATE MATERIALIZED VIEW mv_hourly_stats
TO hourly_stats
AS
SELECT
    toStartOfHour(timestamp) AS hour,
    event_type,
    uniqExact(user_id) AS unique_users,
    count() AS total_events,
    avg(dateDiff('second', 
        groupArrayMin(timestamp), 
        groupArrayMax(timestamp)
    )) AS avg_session_duration
FROM user_events
GROUP BY hour, event_type;

2. 고급 분석 쿼리 패턴

퍼널 분석 (Conversion Funnel)

WITH funnel_steps AS (
    SELECT
        user_id,
        groupArray(event_type) AS events,
        groupArray(timestamp) AS timestamps
    FROM user_events
    WHERE timestamp >= today() - 7
    GROUP BY user_id
)
SELECT
    countIf(has(events, 'page_view')) AS step1_users,
    countIf(has(events, 'page_view') AND has(events, 'add_to_cart')) AS step2_users,
    countIf(has(events, 'page_view') AND has(events, 'add_to_cart') AND has(events, 'purchase')) AS step3_users
FROM funnel_steps;

세션 분석

SELECT
    user_id,
    session_id,
    min(timestamp) AS session_start,
    max(timestamp) AS session_end,
    count() AS events_in_session,
    dateDiff('second', min(timestamp), max(timestamp)) AS session_duration
FROM user_events
WHERE timestamp >= today()
GROUP BY user_id, session_id
HAVING session_duration > 0
ORDER BY session_duration DESC
LIMIT 100;

3. 성능 최적화 기법

파티셔닝 전략

-- 월별 파티션 + 샤딩
CREATE TABLE distributed_events ON CLUSTER cluster_name (
    user_id String,
    event_type String,
    timestamp DateTime
) ENGINE = Distributed('cluster_name', 'default', 'local_events', xxHash64(user_id));

Redpanda 이벤트 스트리밍

1. Schema Registry 통합

스키마 정의 및 등록

# Avro 스키마 생성
cat > user_event.avsc << EOF
{
  "type": "record",
  "name": "UserEvent",
  "namespace": "com.example",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "timestamp", "type": "string"},
    {"name": "properties", "type": {"type": "map", "values": "string"}}
  ]
}
EOF

# Schema Registry에 등록
curl -X POST http://localhost:8081/subjects/user-events-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d "{\"schema\": \"$(cat user_event.avsc | jq -R -s .)\"}"

2. 프로듀서/컨슈머 패턴

Python 프로듀서 예제

from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import json

# Schema Registry 설정
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# Avro 시리얼라이저 설정
avro_serializer = AvroSerializer(schema_registry_client, schema_str)

# 프로듀서 설정
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'client.id': 'python-producer'
})

# 이벤트 전송
event = {
    'user_id': 'u123',
    'event_type': 'page_view',
    'timestamp': '2025-07-23T10:00:00Z',
    'properties': {'page': '/home'}
}

producer.produce(
    topic='user-events',
    value=avro_serializer(event, SerializationContext('user-events', MessageField.VALUE))
)
producer.flush()

3. REST Proxy 활용

HTTP API로 이벤트 전송

# JSON 형식으로 이벤트 전송
curl -X POST http://localhost:8082/topics/user-events \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  -d '{
    "records": [
      {
        "value": {
          "user_id": "u456",
          "event_type": "button_click",
          "timestamp": "2025-07-23T10:05:00Z",
          "properties": {"button_id": "submit-form"}
        }
      }
    ]
  }'

Grafana 시각화 통합

1. ClickHouse 데이터소스 설정

# grafana-datasources.yaml
apiVersion: 1
datasources:
  - name: ClickHouse
    type: vertamedia-clickhouse-datasource
    access: proxy
    url: http://clickhouse:8123
    basicAuth: true
    basicAuthUser: default
    secureJsonData:
      basicAuthPassword: ${CLICKHOUSE_PASSWORD}
    jsonData:
      defaultDatabase: default
      tlsSkipVerify: true

2. 대시보드 자동 생성

Python 스크립트로 대시보드 생성

import requests
import json

GRAFANA_URL = "http://localhost:3000"
API_KEY = "your-api-key"

dashboard = {
    "dashboard": {
        "title": "User Analytics Dashboard",
        "panels": [
            {
                "title": "Events per Hour",
                "type": "graph",
                "datasource": "ClickHouse",
                "targets": [{
                    "rawSql": """
                        SELECT 
                            toStartOfHour(timestamp) AS time,
                            count() AS value,
                            event_type AS metric
                        FROM user_events
                        WHERE timestamp >= now() - INTERVAL 24 HOUR
                        GROUP BY time, event_type
                        ORDER BY time
                    """,
                    "format": "time_series"
                }],
                "gridPos": {"x": 0, "y": 0, "w": 12, "h": 8}
            },
            {
                "title": "Top Users",
                "type": "table",
                "datasource": "ClickHouse",
                "targets": [{
                    "rawSql": """
                        SELECT 
                            user_id,
                            count() AS event_count,
                            uniq(session_id) AS sessions
                        FROM user_events
                        WHERE timestamp >= today()
                        GROUP BY user_id
                        ORDER BY event_count DESC
                        LIMIT 10
                    """,
                    "format": "table"
                }],
                "gridPos": {"x": 12, "y": 0, "w": 12, "h": 8}
            }
        ]
    },
    "overwrite": True
}

response = requests.post(
    f"{GRAFANA_URL}/api/dashboards/db",
    headers={
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    },
    data=json.dumps(dashboard)
)

실전 프로젝트 예제

실시간 이커머스 분석 시스템 구축

1. Moose 프로젝트 구조

ecommerce-analytics/
├── models/
│   ├── user_event.py
│   ├── product.py
│   └── order.py
├── pipelines/
│   ├── event_enrichment.py
│   ├── fraud_detection.py
│   └── recommendation.py
├── flows/
│   └── daily_aggregation.py
├── api/
│   └── analytics_api.py
├── moose.yaml
└── requirements.txt

2. 모델 정의

# models/user_event.py
from moose import model
from datetime import datetime
from typing import Optional, Dict

@model
class UserEvent:
    event_id: str
    user_id: str
    session_id: str
    event_type: str  # page_view, add_to_cart, purchase
    product_id: Optional[str]
    timestamp: datetime
    properties: Dict[str, str]

    class Config:
        # ClickHouse 테이블 설정
        engine = "MergeTree"
        partition_by = "toYYYYMM(timestamp)"
        order_by = ["timestamp", "user_id"]
        ttl = "timestamp + INTERVAL 90 DAY"

3. 실시간 파이프라인

# pipelines/event_enrichment.py
from moose import pipeline, event_source
from models import UserEvent, Product
import redis

r = redis.Redis(host='localhost', port=6379)

@pipeline
@event_source(topic="raw.events")
async def enrich_user_event(event: UserEvent):
    # 제품 정보 추가
    if event.product_id:
        product_info = await get_product_info(event.product_id)
        event.properties.update({
            'product_name': product_info.name,
            'product_category': product_info.category,
            'product_price': str(product_info.price)
        })

    # 사용자 세그먼트 추가
    user_segment = r.get(f"user_segment:{event.user_id}")
    if user_segment:
        event.properties['user_segment'] = user_segment.decode()

    # 실시간 이상 탐지
    if event.event_type == 'purchase' and is_fraudulent(event):
        await send_alert(event)

    return event  # ClickHouse에 자동 저장

def is_fraudulent(event: UserEvent) -> bool:
    # 간단한 규칙 기반 이상 탐지
    price = float(event.properties.get('product_price', 0))
    return price > 10000  # 예시: 고액 거래 탐지

4. 배치 집계 워크플로

# flows/daily_aggregation.py
from moose import flow, schedule
from datetime import datetime, timedelta

@flow
@schedule(cron="0 2 * * *")  # 매일 새벽 2시 실행
async def daily_user_metrics():
    yesterday = datetime.now() - timedelta(days=1)

    # 일별 사용자 메트릭 계산
    query = """
    INSERT INTO daily_user_metrics
    SELECT
        toDate(timestamp) AS date,
        user_id,
        count() AS total_events,
        countIf(event_type = 'page_view') AS page_views,
        countIf(event_type = 'purchase') AS purchases,
        sumIf(toFloat64(properties['product_price']), event_type = 'purchase') AS revenue
    FROM user_events
    WHERE toDate(timestamp) = yesterday()
    GROUP BY date, user_id
    """

    await execute_clickhouse_query(query)

    # 코호트 분석 업데이트
    await update_cohort_analysis(yesterday)

5. API 엔드포인트

# api/analytics_api.py
from moose import api, get_clickhouse_client
from fastapi import Query
from datetime import datetime

@api.get("/analytics/funnel")
async def get_conversion_funnel(
    start_date: datetime = Query(...),
    end_date: datetime = Query(...)
):
    ch = get_clickhouse_client()

    query = """
    WITH funnel_data AS (
        SELECT
            user_id,
            groupArray(event_type) AS events,
            groupArray(timestamp) AS timestamps
        FROM user_events
        WHERE timestamp BETWEEN %(start_date)s AND %(end_date)s
        GROUP BY user_id
    )
    SELECT
        'Viewed Product' AS step,
        countIf(has(events, 'page_view')) AS users
    FROM funnel_data
    UNION ALL
    SELECT
        'Added to Cart' AS step,
        countIf(has(events, 'page_view') AND has(events, 'add_to_cart')) AS users
    FROM funnel_data
    UNION ALL
    SELECT
        'Completed Purchase' AS step,
        countIf(has(events, 'page_view') AND has(events, 'add_to_cart') AND has(events, 'purchase')) AS users
    FROM funnel_data
    """

    results = ch.execute(query, {'start_date': start_date, 'end_date': end_date})

    return {
        "funnel": [
            {"step": row[0], "users": row[1], "conversion_rate": row[1] / results[0][1] if results[0][1] > 0 else 0}
            for row in results
        ]
    }

보안 및 운영 가이드

1. 인프라 보안 설정

ClickHouse 보안 강화

<!-- users.xml -->
<clickhouse>
    <users>
        <moose_app>
            <password_sha256_hex>...</password_sha256_hex>
            <networks>
                <ip>10.0.0.0/8</ip>
            </networks>
            <profile>restricted</profile>
            <quota>default</quota>
            <access_management>1</access_management>
        </moose_app>
    </users>

    <profiles>
        <restricted>
            <max_memory_usage>10000000000</max_memory_usage>
            <max_execution_time>300</max_execution_time>
            <readonly>0</readonly>
        </restricted>
    </profiles>
</clickhouse>

Redpanda SASL 인증 설정

# redpanda.yaml
redpanda:
  kafka_api:
    - address: 0.0.0.0
      port: 9092
      authentication_method: sasl

  kafka_api_tls:
    - enabled: true
      cert_file: /etc/redpanda/certs/broker.crt
      key_file: /etc/redpanda/certs/broker.key

superusers:
  - admin

sasl:
  enabled: true
  mechanism: SCRAM-SHA-256

2. 모니터링 및 알림

Prometheus 메트릭 수집

# prometheus.yml
scrape_configs:
  - job_name: 'clickhouse'
    static_configs:
      - targets: ['clickhouse:9363']

  - job_name: 'redpanda'
    static_configs:
      - targets: ['redpanda:9644']

  - job_name: 'moose'
    static_configs:
      - targets: ['moose:8080']

알림 규칙 설정

# alerting_rules.yml
groups:
  - name: data_pipeline
    rules:
      - alert: HighEventLatency
        expr: histogram_quantile(0.95, event_processing_duration_seconds) > 5
        for: 5m
        annotations:
          summary: "Event processing latency is high"

      - alert: ClickHouseStorageFull
        expr: clickhouse_disk_usage_percent > 85
        for: 10m
        annotations:
          summary: "ClickHouse storage usage above 85%"

3. 데이터 거버넌스

데이터 보존 정책

-- TTL 기반 자동 삭제
ALTER TABLE user_events
MODIFY TTL timestamp + INTERVAL 90 DAY DELETE;

-- 민감 정보 마스킹
CREATE VIEW user_events_masked AS
SELECT
    user_id,
    event_type,
    timestamp,
    mapApply((k, v) -> 
        if(k IN ('email', 'phone'), '***', v), 
        properties
    ) AS properties
FROM user_events;

4. 성능 최적화 체크리스트

ClickHouse 최적화

  • 적절한 파티션 키 설정 (일반적으로 날짜 기반)
  • ORDER BY 절에 자주 사용되는 필터 컬럼 포함
  • Materialized View로 자주 사용되는 집계 사전 계산
  • 압축 코덱 최적화 (CODEC(ZSTD), DoubleDelta 등)

Redpanda 최적화

  • 토픽별 적절한 파티션 수 설정
  • 리텐션 정책 설정 (retention.ms, retention.bytes)
  • 압축 설정 (compression.type: snappy)
  • 프로듀서 배치 설정 최적화

Moose 파이프라인 최적화

  • 비동기 처리 활용 (async/await)
  • 배치 처리 구현 (대량 데이터 처리 시)
  • 캐싱 전략 수립 (Redis, 메모리 캐시)
  • 에러 핸들링 및 재시도 로직

5. 운영 자동화

CI/CD 파이프라인

# .github/workflows/deploy.yml
name: Deploy Moose Application

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2

      - name: Run Tests
        run: |
          moose test

      - name: Build and Deploy
        run: |
          moose build
          moose deploy --env production

      - name: Update Grafana Dashboards
        run: |
          python scripts/update_dashboards.py

Moose 프레임워크의 장점

  1. 통합 개발 경험: TypeScript/Python으로 전체 파이프라인 관리
  2. 자동 스키마 동기화: 모델 → DB → API 자동 연동
  3. 실시간 개발: 핫 리로드로 즉각적인 피드백
  4. 확장성: 필요한 컴포넌트만 선택적 활용

성공적인 구축을 위한 권장사항

  1. 단계적 접근: 간단한 파이프라인부터 시작하여 점진적 확장
  2. 모니터링 우선: 처음부터 메트릭과 로그 수집 체계 구축
  3. 테스트 자동화: 데이터 품질 검증을 위한 자동화된 테스트
  4. 문서화: 데이터 모델과 파이프라인 로직 문서화

Moose 기반의 강력하고 확장 가능한 데이터 분석 백엔드를 구축할 수 있습니다. 각 예제는 실제 프로젝트에 참고할 수 있도록 작성되었으며, 보안과 운영 관점까지 고려한 접근 방법을 결정하시기 바랍니다.

728x90
그리드형(광고전용)

댓글