본문 바로가기

AI 기반 선언적 로그 분석 플랫폼: 보안 인텔리전스 자동화의 새로운 접근

728x90

“정규식·파서 없이 결과 스키마만 선언하면, LLM이 보안 이벤트/이상징후를 구조화 JSON으로 변환 → SIEM 대시보드/알림/IR까지 자동화”

  • “Declarative Log Analysis with LLM — 보안 이벤트 자동 탐지와 SIEM 통합”
  • “LLM-Powered Log Analysis: 구조화된 보안 이벤트 추출과 운영 자동화”
  • “로그는 말한다, AI가 해석한다 — Declarative Security Log Analyzer”
  • “Regex 없는 로그 분석, 선언만 하면 AI가 처리한다”
  • “Log Analysis Reinvented: Declarative Extraction + LLM”
  • “보안 로그 분석의 패러다임 전환 — LLM 선언적 추출과 SIEM 연동”
  • “LLM 기반 SOC 자동화: Wazuh, TheHive, Elastic Defend과 함께하는 로그 인텔리전스”
  • “RAG + Declarative Extraction: 지식 기반 보안 로그 분석의 미래”

목표와 핵심 아이디어

  • Declarative Extraction(선언적 추출): “무엇을 뽑을지”만 Pydantic 스키마로 선언 → LLM이 각 로그를 해석해 스키마 적합 JSON 출력
  • 표준화된 출력: ECS(Elastic Common Schema) 친화 필드명, 신뢰도(confidence), 근거(evidence) 포함
  • 확장성: 로그 타입(Nginx, Apache, Postfix, MySQL, VPN 등) 추가는 스키마+프롬프트만 작성
  • 통합: Elasticsearch/Kibana 시각화, 텔레그램/Slack 알림, Wazuh/Elastic Defend 룰 제안, TheHive 케이스 자동 생성
  • 보안: 온프레미스 LLM(Ollama/vLLM) 우선, 프롬프트 가드레일, 출력 검증, 감사 가능(버전 고정/GitOps)

아키텍처(논리)

[Log Sources]
  ├─ Files (/var/log/*), Journald, Syslog, S3, HTTP, DB, SSH Remote
  └─ Taps (Kafka, Filebeat/Logstash, Vector/Fluent Bit)

[Collector & Router]
  ├─ Batch Reader / Live Tailer (sampling, windowing)
  └─ Enricher(GeoIP, DNS, WHOIS, Asset CMDB)

[LLM Analysis Engine]
  ├─ Schema (Pydantic models)  ← Declarative Extraction
  ├─ Prompt Templates (few-shot, guardrails)
  ├─ Provider (OpenAI / Ollama / vLLM, retry, rate-limit)
  └─ Validator (JSON schema, Pydantic strict, fallback rules)

[Post-Processor]
  ├─ Scorer (severity, confidence)
  ├─ Normalizer (ECS mapping)
  └─ Deduper/Correlator (IP/user/host/time)

[Outputs]
  ├─ Elasticsearch (ILM/Index Template, Kibana dashboards)
  ├─ Alerts (Telegram/Slack, email)
  ├─ IR (TheHive Webhook → Case/Task/Artifact)
  └─ Controls (Wazuh/Elastic Defend rule/tag proposal)

[Governance/Ops]
  ├─ Config & Secrets (YAML + dotenv, Vault 권장)
  ├─ Metrics/Tracing (OpenTelemetry, Prometheus)
  └─ GitOps (prompt+schema+rules 버전관리)

구성요소 설계

1. 스키마 기반 선언적 추출 (핵심)

# schemas/http_access.py
from pydantic import BaseModel, Field, AnyHttpUrl
from typing import Literal

class AccessEvent(BaseModel):
    @timestamp: str                     # ISO8601
    src_ip: str                         # ip
    method: Literal["GET","POST","PUT","DELETE","HEAD","OPTIONS"]
    url: str | AnyHttpUrl
    status: int                         # 100~599
    user_agent: str | None = None
    attack_type: Literal["SQLI","XSS","PATH_TRAVERSAL","BRUTE_FORCE","NONE"] = "NONE"
    is_attack: bool = False
    confidence: float = Field(ge=0, le=1)
    evidence: list[str] = []            # 원문 스니펫(민감정보 마스킹)
    tags: list[str] = []                # ecs.event.category 등
    ecs: dict = {}                      # 확장 필드(ecs.*)
# engine/extract.py (요지)
def llm_extract(log_line: str, schema: type[BaseModel], provider) -> schema:
    prompt = build_prompt(schema, log_line)           # 스키마 필드/타입을 포함한 지시문
    raw = provider.complete(prompt, response_format="json")  # 함수콜/JSON 모드 권장
    obj = schema.model_validate_json(raw)             # 엄격 검증
    return obj
  • 핵심 포인트
    • 모델이 “추측”하기 쉬운 필드는 enum/regex/range강제
    • evidence에 근거 스니펫을 담아 감사 가능성 확보
    • 실패 시: 격리 인덱스(logai-deadletter)로 원문+에러 저장

2. LLM 프로바이더(로컬 우선)

# providers/base.py
class LLMProvider(Protocol):
    def complete(self, prompt: str, response_format: str="json") -> str: ...

# providers/ollama.py (로컬)
class OllamaProvider:
    def __init__(self, model="qwen2.5:7b-instruct", host="http://127.0.0.1:11434"):
        ...
    def complete(self, prompt, response_format="json"):
        # JSON mode 지원 모델 권장, 미지원 시 JSON repair 로직 포함
        ...

# providers/openai.py (외부 — 필요시)
  • 보안: 기본은 로컬 모델. 외부 호출 시 PII 마스킹, 프록시/이그레스 제어

3. 수집/라이브 모니터링

# 배치
logai run --analyzer http_access --log-path /var/log/nginx/access.log

# 실시간(샘플링 1/10)
logai tail --analyzer linux_system --log-path /var/log/messages --sample 0.1

# 원격 SSH
logai tail --analyzer linux_system --ssh user@host:22 --log-path /var/log/messages
  • 옵션: --window-size, --max-qps, --retry, --timeout, --jsonl-out, --es-url

4. 데이터 풍부화(Enrichment)

  • GeoIP: MaxMind GeoLite → src_geo.location(geo_point), country, asn
  • 자산/계정정보: 내부 CMDB/API → host.owner, dept, criticality
  • RAG(아래 7장): 플레이북/위협 지식으로 판단 근거 강화

5. 저장/시각화(Elasticsearch/Kibana)

PUT _index_template/logai_template
{
  "index_patterns": ["logai-*"],
  "template": {
    "settings": { "index.lifecycle.name": "logai-ilm" },
    "mappings": {
      "properties": {
        "@timestamp": {"type":"date"},
        "src_ip": {"type":"ip"},
        "src_geo.location": {"type":"geo_point"},
        "attack_type": {"type":"keyword"},
        "confidence": {"type":"float"},
        "tags": {"type":"keyword"}
      }
    }
  }
}
  • Kibana Lens/Maps로 탐지 추이, 지리 분포, 최다 공격 IP/URL, 자주 발생하는 이벤트 시각화

로그 타입별 분석기 예시

1. Nginx Access

# analyzers/nginx_access.py
class NginxAccess(AccessEvent):  # 앞서 정의한 AccessEvent 상속
    referer: str | None = None
    bytes: int | None = None
    upstream_status: int | None = None
  • 탐지 힌트: "/wp-admin", "/phpmyadmin", union select, "<script>", ..%2F 등 패턴은 근거(evidence)에 반드시 포함

2. Postfix(메일)

class PostfixEvent(BaseModel):
    @timestamp: str
    host: str
    queue_id: str | None
    event: Literal["CONNECT","DISCONNECT","REJECT","DELAY","BOUNCE","DELIVER"]
    src_ip: str | None
    rcpt: str | None
    reason: str | None
    is_abuse: bool = False
    confidence: float
  • 탐지: 대량 실패(REJECT 급증), 해외 IP 스팸 패턴, SMTP AUTH 실패 브루트포스

3. MySQL

class MySQLEvent(BaseModel):
    @timestamp: str
    host: str
    user: str | None
    db: str | None
    level: Literal["ERROR","WARNING","NOTE"]
    msg: str
    is_security: bool
    category: Literal["AUTH","PRIV","CONFIG","QUERY","RUNTIME"]
    confidence: float
  • 탐지: 인증 실패 연속, 권한 오류, 비정상 대용량/장시간 쿼리

4. VPN(OpenVPN 예)

class VPNEvent(BaseModel):
    @timestamp: str
    user: str | None
    src_ip: str
    action: Literal["CONNECT","AUTH_FAIL","DISCONNECT"]
    session_id: str | None
    is_suspicious: bool
    reason: str | None
    confidence: float
  • 탐지: 야간/해외 접속 급증, 실패 후 성공(크리덴셜 스터핑 가능성)

5. Linux System / SSH

class LinuxAuthEvent(BaseModel):
    @timestamp: str
    host: str
    process: str
    user: str | None
    src_ip: str | None
    action: Literal["AUTH_FAIL","USER_UNKNOWN","SU","SUDO","LOGIN","LOCKED"]
    is_attack: bool
    burst: int | None
    confidence: float
  • 탐지: 연속 AUTH_FAIL(버스트), user unknown, 루트 타깃 시 severity↑

알림 & IR 연동

1. 텔레그램(예)

export TELEGRAM_ENABLED=true
export TELEGRAM_TOKEN="xxxx:yyyy"
export TELEGRAM_CHAT_ID="123456"
export TELEGRAM_ALERT_LEVEL="CRITICAL"   # or HIGH
  • 메시지 구성: severity, type, summary, evidence, Kibana 링크, @document_id

2. TheHive 케이스 자동 생성(웹훅/REST)

# 예: 임계치 초과/고위험 이벤트 묶어 케이스 생성
curl -X POST https://thehive.local/api/case \
  -H "Authorization: Bearer $THEHIVE_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "title": "[SSH] Brute Force suspected - host linux.foo.com",
    "description": "연속 인증 실패 탐지. 로그 근거 및 통계 포함.",
    "severity": 3,         // 0-3
    "tlp": 2,
    "pap": 2,
    "tags": ["logai","ssh","bruteforce","auto"],
    "customFields": { "host": {"string":"linux.foo.com"} }
}'
# 관련 로그를 Artifact로 첨부(예: IP, URL, 파일해시 등)
curl -X POST https://thehive.local/api/case/<CASE_ID>/artifact \
  -H "Authorization: Bearer $THEHIVE_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "dataType": "ip",
    "data": "218.188.2.4",
    "message": "연속 SSH 실패 발생 IP",
    "tags": ["ioc","ssh"]
}'
  • 팁: 한 건 단건 알림보다 버스트/코릴레이션 묶음을 한 케이스로 생성 → 소음 감소

Wazuh / Elastic Defend 룰 연동(제안·태깅·자동화)

1. 전략

  • LLM 결과(attack_type, evidence, confidence)를 룰 후보 메타로 축적
  • 태깅: 기존 이벤트에 logai.attack_type, logai.confidence 태그 부여
  • 샌드박스 룰: 제안 룰은 비차단/모니터링 룰로 배포 → 오탐 검증 후 승격

2. Wazuh (개념 예시)

  • 방법 A: 커스텀 룰 파일 생성/배포
    • local_rules.xml 템플릿을 LLM 제안으로 생성 → GitOps(리뷰 후) → Wazuh 매니저에 배포
  • 방법 B: 규칙 파라미터(CDB lists) 업데이트
    • 빈번한 악성 URL/IP 패턴을 CDB 리스트로 관리 → 룰에서 참조
<!-- 예: SSH 다중 실패 탐지 강화(샘플) -->
<group name="authentication_success,pci_dss_10.2.4">
  <rule id="100201" level="7">
    <if_group>sshd,authentication_failed</if_group>
    <field name="srcip">\S+</field>
    <description>SSH multiple auth failures from same IP (LogAI)</description>
    <frequency>5</frequency>
    <timeframe>120</timeframe>
    <mitre id="T1110"/>
    <options>no_full_log</options>
    <actions>warn</actions>
    <tag>logai.ssh.bruteforce</tag>
  </rule>
</group>
  • 배포: Ansible/CI로 Wazuh 매니저에 업로드 후 재시작/릴로드

3. Elastic Defend(Elastic Security) Detection Rule API(예시)

# Kibana Detection Engine Rule 생성/업데이트 (예시 페이로드)
curl -X POST https://kibana.local/api/detection_engine/rules \
  -H "kbn-xsrf: true" -H "Authorization: Bearer $KIBANA_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "rule_id": "logai-ssh-burst",
    "name": "[LogAI] SSH Burst Auth Fail",
    "description": "LogAI 제안 기반: SSH 인증 실패 버스트",
    "severity": "high",
    "risk_score": 73,
    "type": "query",
    "index": ["logai-*"],
    "query": "attack_type:BRUTE_FORCE AND tags:linux_auth AND confidence:[0.7 TO *]",
    "interval": "5m",
    "from": "now-10m",
    "tags": ["logai","proposed"],
    "enabled": true
  }'
  • 운영 팁: proposed 태그로 후보기반 룰 세트를 구분, 성능/오탐 지표 모니터링 후 정식 승격

RAG 기반 맥락 보강 (내부 플레이북·지식 반영)

1. 소스

  • 내부 IR 플레이북, 차단 정책, 취약지표(IOC) 목록, 자산 중요도, 근무시간/국가 정책

2. 구현

# rag/index.py — 마크다운/HTML/문서 → 임베딩 → FAISS 등
docs = load_docs("./knowledge")                # *.md, *.pdf 텍스트
chunks = chunk(docs, size=1000, overlap=200)
vectors = embed(chunks)                         # 로컬 임베딩 모델 권장
faiss = build_faiss(vectors)

def retrieve(query: str) -> list[str]:
    return faiss.topk(embed([query]), k=5).texts
# engine/prompt.py — 추출 프롬프트에 RAG 컨텍스트 주입
def build_prompt(schema, log_line):
    kb = retrieve(f"보안분석 가이드 + {log_line[:200]}")
    return f"""
You are a SOC analyst. Follow the JSON schema strictly.

[KNOWLEDGE]
{kb[0]}\n{kb[1]}\n...

[LOG]
{log_line}

[SCHEMA]
{schema_json(schema)}
"""
  • 주의: RAG 문서에 운영 비밀/민감정보가 포함될 경우 권한·마스킹·감사 로그 필수

품질/성능/비용 관리

  1. 평가셋: 로그→정답 JSON 골든셋 구축, 필드별 정확도/재현율 측정
  2. 오탐 관리: 알림 전 임계치(confidence ≥ 0.7) + 버스트·코릴레이션 강화
  3. 성능: 샘플링·윈도우링, 소형 모델 우선, 실패 시 상위모델 폴백
  4. 캐시: 동일 패턴/유사 UA/경로는 결과 캐시
  5. 토큰 관리: truncation/요약, evidence 최소화(그러나 감사 가능성 유지)

보안 가드레일 점검표

  • 프롬프트 샌드박스: 로그 텍스트에 “규칙 무시” 포함돼도 시스템 프롬프트가 우선
  • PII/비밀: 외부 모델 전송 전 마스킹/해싱, 온프레 우선
  • 검증: Pydantic 엄격 모드, 실패는 격리 인덱스 + 재처리 큐
  • 감사: 모델/프롬프트/스키마/룰 버전 고정 + Git 이력
  • 권한: ES/Kibana/TheHive/Wazuh API 키, 네트워크 이그레스 제어
  • 소음 억제: Rate limit, Alert dedup, 조합 규칙(“고위험+근거 다수”만 알림)

체크리스트(요약)

  • 로컬 LLM 기본, 외부 전송 시 마스킹/프록시
  • 스키마 enum/regex/range로 엄격화
  • evidence 포함(민감정보 마스킹)
  • 실패 격리 인덱스/재시도 큐
  • 제안 룰은 샌드박스 태그 운영 → 검증 후 승격
  • GitOps(코드/프롬프트/룰/대시보드)
  • 운영 모니터링: 처리율, 실패율, 알림건수, 오탐률
300x250

설치/운영(샘플)

1. Docker Compose (단일 노드 실험)

version: "3.8"
services:
  es:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.14.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports: ["9200:9200"]

  kibana:
    image: docker.elastic.co/kibana/kibana:8.14.0
    environment:
      - ELASTICSEARCH_HOSTS=http://es:9200
    ports: ["5601:5601"]

  logai:
    image: yourorg/logai:latest
    environment:
      - ES_URL=http://es:9200
      - TELEGRAM_ENABLED=false
    volumes:
      - /var/log:/logs:ro
    command: >
      logai tail --analyzer nginx_access --log-path /logs/nginx/access.log --es-url $ES_URL

2. Index/ILM 템플릿, Kibana 대시보드 설치 스크립트

logai setup --es-url http://localhost:9200 --install-templates --install-dashboards

3. Systemd

[Unit]
Description=LogAI Tail - Linux System
After=network.target

[Service]
ExecStart=/usr/local/bin/logai tail --analyzer linux_system --log-path /var/log/messages --es-url http://127.0.0.1:9200
Restart=always
User=logai

[Install]
WantedBy=multi-user.target

운영 시나리오(예)

  1. 야간 SSH 실패 급증 감지 → LogAI가 attack_type=BRUTE_FORCE, confidence=0.86로 표준 JSON 생성
  2. ES 인덱싱 & 텔레그램 알림(증거 스니펫/대시보드 링크 포함)
  3. 코릴레이터가 동일 IP/호스트 묶어 TheHive 케이스 자동 생성
  4. 룰 엔진으로 Wazuh/Elastic Defend 후보 룰 갱신(샌드박스 태그)
  5. 다음 날 리뷰에서 오탐 낮음 확인 → 정식 룰로 승격, 임계치 미세 조정

PoC 체크리스트

  1. Day 1–2: ES/Kibana 단일 노드, 템플릿/ILM/대시보드 설치
  2. Day 3–4: Nginx Access, Linux System 분석기 구현(스키마+프롬프트)
  3. Day 5–7: 텔레그램 알림, TheHive 연동, RAG(플레이북 10문서)
  4. Day 8–10: Postfix/MySQL/VPN 추가, 성능 튜닝(샘플링/캐시)
  5. Day 11–12: 룰 제안 파이프라인(Wazuh/Elastic Defend), 샌드박스 룰 운영
  6. Day 13–14: 품질 평가셋/알림 지표/오탐 분석, 운영 가드레일 확정

최소 참조 구현(요약 코드)

# cli.py
import click
from engine.extract import llm_extract
from outputs.elasticsearch import es_index

@click.group()
def cli(): ...

@cli.command("run")
@click.option("--analyzer", required=True)
@click.option("--log-path", required=True)
@click.option("--es-url", required=True)
def run(analyzer, log_path, es_url):
    schema = load_schema(analyzer)
    provider = load_provider()
    for line in read_lines(log_path):
        try:
            obj = llm_extract(line, schema, provider)
            doc = postprocess(obj)       # score/map/dedupe
            es_index(es_url, "logai-analysis", doc)
        except Exception as e:
            es_index(es_url, "logai-deadletter", {"error": str(e), "raw": line})

if __name__ == "__main__":
    cli()
  • 스키마+프롬프트만으로 로그 해석을 자동화→개발·운영 비용 대폭 절감
  • Wazuh/Elastic DefendTheHive“제안→샌드박스→승격” 플로우로 연결해 탐지 품질을 지속 개선
  • RAG로 내부 지식/플레이북을 의사결정 근거로 주입 → 설명가능성과 일관성↑
  • 가드레일/감사/거버넌스엔터프라이즈 수준의 신뢰성 확보
728x90
그리드형(광고전용)

댓글