728x90
“정규식·파서 없이 결과 스키마만 선언하면, LLM이 보안 이벤트/이상징후를 구조화 JSON으로 변환 → SIEM 대시보드/알림/IR까지 자동화”
- “Declarative Log Analysis with LLM — 보안 이벤트 자동 탐지와 SIEM 통합”
- “LLM-Powered Log Analysis: 구조화된 보안 이벤트 추출과 운영 자동화”
- “로그는 말한다, AI가 해석한다 — Declarative Security Log Analyzer”
- “Regex 없는 로그 분석, 선언만 하면 AI가 처리한다”
- “Log Analysis Reinvented: Declarative Extraction + LLM”
- “보안 로그 분석의 패러다임 전환 — LLM 선언적 추출과 SIEM 연동”
- “LLM 기반 SOC 자동화: Wazuh, TheHive, Elastic Defend과 함께하는 로그 인텔리전스”
- “RAG + Declarative Extraction: 지식 기반 보안 로그 분석의 미래”
목표와 핵심 아이디어
- Declarative Extraction(선언적 추출): “무엇을 뽑을지”만 Pydantic 스키마로 선언 → LLM이 각 로그를 해석해 스키마 적합 JSON 출력
- 표준화된 출력: ECS(Elastic Common Schema) 친화 필드명, 신뢰도(confidence), 근거(evidence) 포함
- 확장성: 로그 타입(Nginx, Apache, Postfix, MySQL, VPN 등) 추가는 스키마+프롬프트만 작성
- 통합: Elasticsearch/Kibana 시각화, 텔레그램/Slack 알림, Wazuh/Elastic Defend 룰 제안, TheHive 케이스 자동 생성
- 보안: 온프레미스 LLM(Ollama/vLLM) 우선, 프롬프트 가드레일, 출력 검증, 감사 가능(버전 고정/GitOps)
아키텍처(논리)
[Log Sources]
├─ Files (/var/log/*), Journald, Syslog, S3, HTTP, DB, SSH Remote
└─ Taps (Kafka, Filebeat/Logstash, Vector/Fluent Bit)
[Collector & Router]
├─ Batch Reader / Live Tailer (sampling, windowing)
└─ Enricher(GeoIP, DNS, WHOIS, Asset CMDB)
[LLM Analysis Engine]
├─ Schema (Pydantic models) ← Declarative Extraction
├─ Prompt Templates (few-shot, guardrails)
├─ Provider (OpenAI / Ollama / vLLM, retry, rate-limit)
└─ Validator (JSON schema, Pydantic strict, fallback rules)
[Post-Processor]
├─ Scorer (severity, confidence)
├─ Normalizer (ECS mapping)
└─ Deduper/Correlator (IP/user/host/time)
[Outputs]
├─ Elasticsearch (ILM/Index Template, Kibana dashboards)
├─ Alerts (Telegram/Slack, email)
├─ IR (TheHive Webhook → Case/Task/Artifact)
└─ Controls (Wazuh/Elastic Defend rule/tag proposal)
[Governance/Ops]
├─ Config & Secrets (YAML + dotenv, Vault 권장)
├─ Metrics/Tracing (OpenTelemetry, Prometheus)
└─ GitOps (prompt+schema+rules 버전관리)
구성요소 설계
1. 스키마 기반 선언적 추출 (핵심)
# schemas/http_access.py
from pydantic import BaseModel, Field, AnyHttpUrl
from typing import Literal
class AccessEvent(BaseModel):
@timestamp: str # ISO8601
src_ip: str # ip
method: Literal["GET","POST","PUT","DELETE","HEAD","OPTIONS"]
url: str | AnyHttpUrl
status: int # 100~599
user_agent: str | None = None
attack_type: Literal["SQLI","XSS","PATH_TRAVERSAL","BRUTE_FORCE","NONE"] = "NONE"
is_attack: bool = False
confidence: float = Field(ge=0, le=1)
evidence: list[str] = [] # 원문 스니펫(민감정보 마스킹)
tags: list[str] = [] # ecs.event.category 등
ecs: dict = {} # 확장 필드(ecs.*)
# engine/extract.py (요지)
def llm_extract(log_line: str, schema: type[BaseModel], provider) -> schema:
prompt = build_prompt(schema, log_line) # 스키마 필드/타입을 포함한 지시문
raw = provider.complete(prompt, response_format="json") # 함수콜/JSON 모드 권장
obj = schema.model_validate_json(raw) # 엄격 검증
return obj
- 핵심 포인트
- 모델이 “추측”하기 쉬운 필드는 enum/regex/range로 강제
- evidence에 근거 스니펫을 담아 감사 가능성 확보
- 실패 시: 격리 인덱스(
logai-deadletter
)로 원문+에러 저장
2. LLM 프로바이더(로컬 우선)
# providers/base.py
class LLMProvider(Protocol):
def complete(self, prompt: str, response_format: str="json") -> str: ...
# providers/ollama.py (로컬)
class OllamaProvider:
def __init__(self, model="qwen2.5:7b-instruct", host="http://127.0.0.1:11434"):
...
def complete(self, prompt, response_format="json"):
# JSON mode 지원 모델 권장, 미지원 시 JSON repair 로직 포함
...
# providers/openai.py (외부 — 필요시)
- 보안: 기본은 로컬 모델. 외부 호출 시 PII 마스킹, 프록시/이그레스 제어
3. 수집/라이브 모니터링
# 배치
logai run --analyzer http_access --log-path /var/log/nginx/access.log
# 실시간(샘플링 1/10)
logai tail --analyzer linux_system --log-path /var/log/messages --sample 0.1
# 원격 SSH
logai tail --analyzer linux_system --ssh user@host:22 --log-path /var/log/messages
- 옵션:
--window-size
,--max-qps
,--retry
,--timeout
,--jsonl-out
,--es-url
4. 데이터 풍부화(Enrichment)
- GeoIP: MaxMind GeoLite →
src_geo.location(geo_point)
,country
,asn
- 자산/계정정보: 내부 CMDB/API →
host.owner
,dept
,criticality
- RAG(아래 7장): 플레이북/위협 지식으로 판단 근거 강화
5. 저장/시각화(Elasticsearch/Kibana)
PUT _index_template/logai_template
{
"index_patterns": ["logai-*"],
"template": {
"settings": { "index.lifecycle.name": "logai-ilm" },
"mappings": {
"properties": {
"@timestamp": {"type":"date"},
"src_ip": {"type":"ip"},
"src_geo.location": {"type":"geo_point"},
"attack_type": {"type":"keyword"},
"confidence": {"type":"float"},
"tags": {"type":"keyword"}
}
}
}
}
- Kibana Lens/Maps로 탐지 추이, 지리 분포, 최다 공격 IP/URL, 자주 발생하는 이벤트 시각화
로그 타입별 분석기 예시
1. Nginx Access
# analyzers/nginx_access.py
class NginxAccess(AccessEvent): # 앞서 정의한 AccessEvent 상속
referer: str | None = None
bytes: int | None = None
upstream_status: int | None = None
- 탐지 힌트:
"/wp-admin"
,"/phpmyadmin"
,union select
,"<script>"
,..%2F
등 패턴은 근거(evidence)에 반드시 포함
2. Postfix(메일)
class PostfixEvent(BaseModel):
@timestamp: str
host: str
queue_id: str | None
event: Literal["CONNECT","DISCONNECT","REJECT","DELAY","BOUNCE","DELIVER"]
src_ip: str | None
rcpt: str | None
reason: str | None
is_abuse: bool = False
confidence: float
- 탐지: 대량 실패(REJECT 급증), 해외 IP 스팸 패턴, SMTP AUTH 실패 브루트포스
3. MySQL
class MySQLEvent(BaseModel):
@timestamp: str
host: str
user: str | None
db: str | None
level: Literal["ERROR","WARNING","NOTE"]
msg: str
is_security: bool
category: Literal["AUTH","PRIV","CONFIG","QUERY","RUNTIME"]
confidence: float
- 탐지: 인증 실패 연속, 권한 오류, 비정상 대용량/장시간 쿼리
4. VPN(OpenVPN 예)
class VPNEvent(BaseModel):
@timestamp: str
user: str | None
src_ip: str
action: Literal["CONNECT","AUTH_FAIL","DISCONNECT"]
session_id: str | None
is_suspicious: bool
reason: str | None
confidence: float
- 탐지: 야간/해외 접속 급증, 실패 후 성공(크리덴셜 스터핑 가능성)
5. Linux System / SSH
class LinuxAuthEvent(BaseModel):
@timestamp: str
host: str
process: str
user: str | None
src_ip: str | None
action: Literal["AUTH_FAIL","USER_UNKNOWN","SU","SUDO","LOGIN","LOCKED"]
is_attack: bool
burst: int | None
confidence: float
- 탐지: 연속 AUTH_FAIL(버스트),
user unknown
, 루트 타깃 시 severity↑
알림 & IR 연동
1. 텔레그램(예)
export TELEGRAM_ENABLED=true
export TELEGRAM_TOKEN="xxxx:yyyy"
export TELEGRAM_CHAT_ID="123456"
export TELEGRAM_ALERT_LEVEL="CRITICAL" # or HIGH
- 메시지 구성:
severity, type, summary, evidence, Kibana 링크, @document_id
2. TheHive 케이스 자동 생성(웹훅/REST)
# 예: 임계치 초과/고위험 이벤트 묶어 케이스 생성
curl -X POST https://thehive.local/api/case \
-H "Authorization: Bearer $THEHIVE_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"title": "[SSH] Brute Force suspected - host linux.foo.com",
"description": "연속 인증 실패 탐지. 로그 근거 및 통계 포함.",
"severity": 3, // 0-3
"tlp": 2,
"pap": 2,
"tags": ["logai","ssh","bruteforce","auto"],
"customFields": { "host": {"string":"linux.foo.com"} }
}'
# 관련 로그를 Artifact로 첨부(예: IP, URL, 파일해시 등)
curl -X POST https://thehive.local/api/case/<CASE_ID>/artifact \
-H "Authorization: Bearer $THEHIVE_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"dataType": "ip",
"data": "218.188.2.4",
"message": "연속 SSH 실패 발생 IP",
"tags": ["ioc","ssh"]
}'
- 팁: 한 건 단건 알림보다 버스트/코릴레이션 묶음을 한 케이스로 생성 → 소음 감소
Wazuh / Elastic Defend 룰 연동(제안·태깅·자동화)
1. 전략
- LLM 결과(attack_type, evidence, confidence)를 룰 후보 메타로 축적
- 태깅: 기존 이벤트에
logai.attack_type
,logai.confidence
태그 부여 - 샌드박스 룰: 제안 룰은 비차단/모니터링 룰로 배포 → 오탐 검증 후 승격
2. Wazuh (개념 예시)
- 방법 A: 커스텀 룰 파일 생성/배포
local_rules.xml
템플릿을 LLM 제안으로 생성 → GitOps(리뷰 후) → Wazuh 매니저에 배포
- 방법 B: 규칙 파라미터(CDB lists) 업데이트
- 빈번한 악성 URL/IP 패턴을 CDB 리스트로 관리 → 룰에서 참조
<!-- 예: SSH 다중 실패 탐지 강화(샘플) -->
<group name="authentication_success,pci_dss_10.2.4">
<rule id="100201" level="7">
<if_group>sshd,authentication_failed</if_group>
<field name="srcip">\S+</field>
<description>SSH multiple auth failures from same IP (LogAI)</description>
<frequency>5</frequency>
<timeframe>120</timeframe>
<mitre id="T1110"/>
<options>no_full_log</options>
<actions>warn</actions>
<tag>logai.ssh.bruteforce</tag>
</rule>
</group>
- 배포: Ansible/CI로 Wazuh 매니저에 업로드 후 재시작/릴로드
3. Elastic Defend(Elastic Security) Detection Rule API(예시)
# Kibana Detection Engine Rule 생성/업데이트 (예시 페이로드)
curl -X POST https://kibana.local/api/detection_engine/rules \
-H "kbn-xsrf: true" -H "Authorization: Bearer $KIBANA_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"rule_id": "logai-ssh-burst",
"name": "[LogAI] SSH Burst Auth Fail",
"description": "LogAI 제안 기반: SSH 인증 실패 버스트",
"severity": "high",
"risk_score": 73,
"type": "query",
"index": ["logai-*"],
"query": "attack_type:BRUTE_FORCE AND tags:linux_auth AND confidence:[0.7 TO *]",
"interval": "5m",
"from": "now-10m",
"tags": ["logai","proposed"],
"enabled": true
}'
- 운영 팁:
proposed
태그로 후보기반 룰 세트를 구분, 성능/오탐 지표 모니터링 후 정식 승격
RAG 기반 맥락 보강 (내부 플레이북·지식 반영)
1. 소스
- 내부 IR 플레이북, 차단 정책, 취약지표(IOC) 목록, 자산 중요도, 근무시간/국가 정책 등
2. 구현
# rag/index.py — 마크다운/HTML/문서 → 임베딩 → FAISS 등
docs = load_docs("./knowledge") # *.md, *.pdf 텍스트
chunks = chunk(docs, size=1000, overlap=200)
vectors = embed(chunks) # 로컬 임베딩 모델 권장
faiss = build_faiss(vectors)
def retrieve(query: str) -> list[str]:
return faiss.topk(embed([query]), k=5).texts
# engine/prompt.py — 추출 프롬프트에 RAG 컨텍스트 주입
def build_prompt(schema, log_line):
kb = retrieve(f"보안분석 가이드 + {log_line[:200]}")
return f"""
You are a SOC analyst. Follow the JSON schema strictly.
[KNOWLEDGE]
{kb[0]}\n{kb[1]}\n...
[LOG]
{log_line}
[SCHEMA]
{schema_json(schema)}
"""
- 주의: RAG 문서에 운영 비밀/민감정보가 포함될 경우 권한·마스킹·감사 로그 필수
품질/성능/비용 관리
- 평가셋: 로그→정답 JSON 골든셋 구축, 필드별 정확도/재현율 측정
- 오탐 관리: 알림 전 임계치(
confidence ≥ 0.7
) + 버스트·코릴레이션 강화 - 성능: 샘플링·윈도우링, 소형 모델 우선, 실패 시 상위모델 폴백
- 캐시: 동일 패턴/유사 UA/경로는 결과 캐시
- 토큰 관리: truncation/요약, evidence 최소화(그러나 감사 가능성 유지)
보안 가드레일 점검표
- 프롬프트 샌드박스: 로그 텍스트에 “규칙 무시” 포함돼도 시스템 프롬프트가 우선
- PII/비밀: 외부 모델 전송 전 마스킹/해싱, 온프레 우선
- 검증: Pydantic 엄격 모드, 실패는 격리 인덱스 + 재처리 큐
- 감사: 모델/프롬프트/스키마/룰 버전 고정 + Git 이력
- 권한: ES/Kibana/TheHive/Wazuh API 키, 네트워크 이그레스 제어
- 소음 억제: Rate limit, Alert dedup, 조합 규칙(“고위험+근거 다수”만 알림)
체크리스트(요약)
- 로컬 LLM 기본, 외부 전송 시 마스킹/프록시
- 스키마 enum/regex/range로 엄격화
- evidence 포함(민감정보 마스킹)
- 실패 격리 인덱스/재시도 큐
- 제안 룰은 샌드박스 태그 운영 → 검증 후 승격
- GitOps(코드/프롬프트/룰/대시보드)
- 운영 모니터링: 처리율, 실패율, 알림건수, 오탐률
300x250
설치/운영(샘플)
1. Docker Compose (단일 노드 실험)
version: "3.8"
services:
es:
image: docker.elastic.co/elasticsearch/elasticsearch:8.14.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports: ["9200:9200"]
kibana:
image: docker.elastic.co/kibana/kibana:8.14.0
environment:
- ELASTICSEARCH_HOSTS=http://es:9200
ports: ["5601:5601"]
logai:
image: yourorg/logai:latest
environment:
- ES_URL=http://es:9200
- TELEGRAM_ENABLED=false
volumes:
- /var/log:/logs:ro
command: >
logai tail --analyzer nginx_access --log-path /logs/nginx/access.log --es-url $ES_URL
2. Index/ILM 템플릿, Kibana 대시보드 설치 스크립트
logai setup --es-url http://localhost:9200 --install-templates --install-dashboards
3. Systemd
[Unit]
Description=LogAI Tail - Linux System
After=network.target
[Service]
ExecStart=/usr/local/bin/logai tail --analyzer linux_system --log-path /var/log/messages --es-url http://127.0.0.1:9200
Restart=always
User=logai
[Install]
WantedBy=multi-user.target
운영 시나리오(예)
- 야간 SSH 실패 급증 감지 → LogAI가
attack_type=BRUTE_FORCE
,confidence=0.86
로 표준 JSON 생성 - ES 인덱싱 & 텔레그램 알림(증거 스니펫/대시보드 링크 포함)
- 코릴레이터가 동일 IP/호스트 묶어 TheHive 케이스 자동 생성
- 룰 엔진으로 Wazuh/Elastic Defend 후보 룰 갱신(샌드박스 태그)
- 다음 날 리뷰에서 오탐 낮음 확인 → 정식 룰로 승격, 임계치 미세 조정
PoC 체크리스트
- Day 1–2: ES/Kibana 단일 노드, 템플릿/ILM/대시보드 설치
- Day 3–4: Nginx Access, Linux System 분석기 구현(스키마+프롬프트)
- Day 5–7: 텔레그램 알림, TheHive 연동, RAG(플레이북 10문서)
- Day 8–10: Postfix/MySQL/VPN 추가, 성능 튜닝(샘플링/캐시)
- Day 11–12: 룰 제안 파이프라인(Wazuh/Elastic Defend), 샌드박스 룰 운영
- Day 13–14: 품질 평가셋/알림 지표/오탐 분석, 운영 가드레일 확정
최소 참조 구현(요약 코드)
# cli.py
import click
from engine.extract import llm_extract
from outputs.elasticsearch import es_index
@click.group()
def cli(): ...
@cli.command("run")
@click.option("--analyzer", required=True)
@click.option("--log-path", required=True)
@click.option("--es-url", required=True)
def run(analyzer, log_path, es_url):
schema = load_schema(analyzer)
provider = load_provider()
for line in read_lines(log_path):
try:
obj = llm_extract(line, schema, provider)
doc = postprocess(obj) # score/map/dedupe
es_index(es_url, "logai-analysis", doc)
except Exception as e:
es_index(es_url, "logai-deadletter", {"error": str(e), "raw": line})
if __name__ == "__main__":
cli()
- 스키마+프롬프트만으로 로그 해석을 자동화→개발·운영 비용 대폭 절감
- Wazuh/Elastic Defend와 TheHive를 “제안→샌드박스→승격” 플로우로 연결해 탐지 품질을 지속 개선
- RAG로 내부 지식/플레이북을 의사결정 근거로 주입 → 설명가능성과 일관성↑
- 가드레일/감사/거버넌스로 엔터프라이즈 수준의 신뢰성 확보
728x90
그리드형(광고전용)
댓글