728x90
목표와 배경
- 가시성: 프롬프트→의사결정→툴 호출→결과→사용자 알림까지 전 단계 추적.
- 감사·재현성: 동일 입력으로 동일 결과 재현, 무결성(해시체인) 보장.
- 안전성: 위험 액션은 사전 정책 검증 + 승인(SoD) 후 실행.
- 운영효율: 일/주간 리포트 자동 생성, 이상 징후 탐지 → 케이스화.
아키텍처 요약
- 수집(Ingest): HTTP/Webhook, gRPC, Kafka(권장), Filebeat/Elastic Agent
- 브로커: Kafka(토픽:
ai.audit
,ai.action
,ai.report
) - 저장
- 빠른 탐색: Elasticsearch/OpenSearch (ECS 매핑)
- 원본 보존: Object Storage(버전·서명·WORM 옵션)
- 분석
- 규칙(EQL/Sigma/ElastAlert) + 통계/ML(비정상 스코어)
- 요약/리포트: LLM(사내 프록시) + 서술형 리포팅
- 오케스트레이션: LangGraph/MCP/서드파티 프레임워크 + 정책 PDP(OPA/Cedar)
- 대응: TheHive/Slack/Jira 티켓화, Wazuh와 연계된 자동조치(저위험)
표준 로그 스키마(AI 관제용)
ECS를 최대한 따르되, 의사결정/프롬프트/툴호출을 확장 필드로 정의합니다.
{
"@timestamp": "2025-09-03T12:34:56.789Z",
"event": { "category": "ai", "type": "info", "action": "tool_call" },
"agent": { "id": "agent-hr-01", "name": "hr-assistant", "type": "langgraph" },
"user": { "id": "u-123", "email": "user@example.com" },
"trace": { "id": "2f6c...e1", "parent_id": "..." },
"labels": { "tenant": "pageskr", "env": "prod", "pii": "maybe" },
"ai": {
"session_id": "sess-abc",
"prompt_redacted": true,
"input_hash": "sha256:...",
"output_hash": "sha256:...",
"decision": {
"reason_redacted": true,
"risk_score": 72,
"policy_checks": [
{"id": "P-DLP-01", "result": "deny", "evidence": "email detected"}
],
"approval": { "required": true, "status": "pending" }
},
"tool": { "name": "gsuite.directory.deleteUser", "args_schema": "v1", "args": {"userId":"..."} },
"result": { "status": "blocked_by_policy", "error": null }
},
"integrity": {
"hmac": "base64:...",
"prev_hash": "sha256:prevrec...",
"chain_index": 102391
}
}
필수 필드 체크(요점)
- 무결성:
integrity.prev_hash
(체인) +integrity.hmac
(서명) - 정책/승인:
ai.decision.policy_checks
,ai.decision.approval
- 민감정보:
prompt_redacted
,labels.pii
- 재현성:
ai.input_hash
,ai.output_hash
,trace.id
에이전트 측 로깅 라이브러리(파이썬 예시)
HMAC 서명 + 해시체인까지 포함한 경량 로거 예시입니다.
import os, hmac, hashlib, json, time, requests
SECRET = os.getenv("AI_AUDIT_HMAC_KEY","change-me")
PREV = {"hash": None, "idx": -1}
INGEST_URL = "https://siem.example.com/ingest"
def sha256_hex(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
def emit(event: dict):
global PREV
body = json.dumps(event, ensure_ascii=False, separators=(",",":")).encode()
prev_hash = PREV["hash"] or "0"*64
chain_index = PREV["idx"] + 1
chain_seed = (prev_hash + "|" + sha256_hex(body)).encode()
h = hmac.new(SECRET.encode(), chain_seed, hashlib.sha256).digest()
record = event | {
"integrity": {
"prev_hash": prev_hash,
"chain_index": chain_index,
"hmac": "base64:"+h.hex()
}
}
r = requests.post(INGEST_URL, json=record, timeout=3)
r.raise_for_status()
PREV = {"hash": sha256_hex(json.dumps(record, ensure_ascii=False).encode()), "idx": chain_index}
# 사용 예: 툴 호출 전
emit({
"@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S.000Z", time.gmtime()),
"event": {"category":"ai","type":"info","action":"tool_call"},
"agent": {"id":"agent-hr-01","name":"hr-assistant","type":"langgraph"},
"ai": {
"session_id":"sess-abc",
"prompt_redacted": True,
"input_hash":"sha256:...",
"decision":{"reason_redacted":True,"risk_score":72,"policy_checks":[],"approval":{"required":True,"status":"pending"}},
"tool":{"name":"gsuite.directory.deleteUser","args":{"userId":"test@example.com"}}
}
})
수집·적재 파이프라인(Elastic 기준)
1) Filebeat / Elastic Agent 입력
# filebeat.yml
filebeat.inputs:
- type: http_endpoint
enabled: true
listen_address: 0.0.0.0
listen_port: 9080
response_code: 204
response_body: ""
processors:
- decode_json_fields:
fields: ["message"]
target: ""
overwrite_keys: true
output.elasticsearch:
hosts: ["https://es.example.com:9200"]
index: "ai-audit-%{+yyyy.MM.dd}"
2) 인덱스 템플릿 & 수명주기(ILM)
PUT _ilm/policy/ai-audit-90d
{"policy":{"phases":{"hot":{"actions":{}},"warm":{"min_age":"30d","actions":{"forcemerge":{"max_num_segments":1}}},"delete":{"min_age":"90d","actions":{"delete":{}}}}}}
PUT _index_template/ai-audit
{"index_patterns":["ai-audit-*"],"template":{"settings":{"index.lifecycle.name":"ai-audit-90d"},"mappings":{"dynamic":"strict","properties":{
"@timestamp":{"type":"date"},
"event":{"properties":{"category":{"type":"keyword"},"action":{"type":"keyword"},"type":{"type":"keyword"}}},
"agent":{"properties":{"id":{"type":"keyword"},"name":{"type":"keyword"},"type":{"type":"keyword"}}},
"user":{"properties":{"id":{"type":"keyword"},"email":{"type":"keyword"}}},
"trace":{"properties":{"id":{"type":"keyword"},"parent_id":{"type":"keyword"}}},
"labels":{"type":"object","dynamic":true},
"ai":{"properties":{
"session_id":{"type":"keyword"},
"prompt_redacted":{"type":"boolean"},
"input_hash":{"type":"keyword"},
"output_hash":{"type":"keyword"},
"decision":{"properties":{
"reason_redacted":{"type":"boolean"},
"risk_score":{"type":"short"},
"policy_checks":{"type":"nested","properties":{"id":{"type":"keyword"},"result":{"type":"keyword"},"evidence":{"type":"keyword"}}},
"approval":{"properties":{"required":{"type":"boolean"},"status":{"type":"keyword"},"approver":{"type":"keyword"}}}
}},
"tool":{"properties":{"name":{"type":"keyword"}}},
"result":{"properties":{"status":{"type":"keyword"},"error":{"type":"keyword"}}}
}},
"integrity":{"properties":{"prev_hash":{"type":"keyword"},"chain_index":{"type":"long"},"hmac":{"type":"keyword"}}}
}}}}
이상행위 탐지(규칙 예시)
1) EQL(Elasticsearch Query Language)
sequence by ai.session_id
[ ai where event.action == "tool_call" and ai.tool.name == "gsuite.directory.deleteUser" ]
[ ai where event.action == "tool_result" and ai.result.status == "success" ]
until [ ai where ai.decision.approval.status == "approved" ]
→ 승인 전 삭제 성공 시 경보.
2) KQL(간단)
event.category:"ai" and ai.decision.risk_score >= 70 and ai.result.status:"success"
3) ElastAlert(요약)
name: ai-high-risk-success
type: any
index: ai-audit-*
filter:
- query:
query_string:
query: 'event.category:"ai" AND ai.decision.risk_score:[70 TO *] AND ai.result.status:"success"'
alert:
- slack
alert_text: "고위험 성공 액션: {0}"
alert_text_args: ["ai.tool.name"]
요약 리포트 자동화(일/주간)
- 지표: 고위험 액션 수, 미승인 성공 수, 실패율, 평균 응답시간, 최다 호출 툴 Top10, 정책차단 Top5
- 워크플로우: ES 집계 → LLM로 설명문 생성 → Slack/메일/Google Docs 게시
300x250
파이썬 집계 스니펫(개념)
# ES date_histogram + terms 집계 → Markdown 템플릿 렌더 → Slack Webhook 전송
보안 정책·승인(OPA 예시)
package ai.policy
default allow = false
high_risk_tools = {"gsuite.directory.deleteUser","cloud.vm.delete","secrets.readAll"}
allow {
input.ai.tool.name not in high_risk_tools
}
require_approval {
input.ai.tool.name in high_risk_tools
}
- 엔진 앞단에서
require_approval
이면 실행 중단 + 승인 요청 이벤트 생성.
무결성·프라이버시
- 해시체인 + HMAC: 로그 조작 탐지.
- 프롬프트 비식별화:
prompt_redacted=true
, 전문은 암호화 저장(별도 KMS). - 권한분리(SoD): 에이전트 운영자 ≠ 승인자 ≠ SIEM 관리자.
- 키관리: KMS/HashiCorp Vault, 키 순환 주기화.
- 보관정책: 운영지표(90일), 감사원본(WORM, 1~3년).
단계별 도입 플랜
- Shadow Mode: 읽기 전용, 전 이벤트 로깅 검증
- Read-Only Action: 저위험 조회형 툴만 허용
- Low-Risk Write: 승인 후 실행, 롤백 자동화
- 확장: 크리티컬 툴(삭제/권한변경) 별도 라인 승인
유관 시스템 연계 팁
- Wazuh: 에이전트 프로세스/포트 상시 점검(SCA), 로그 수집 데코더 추가
- TheHive: 고위험 탐지 시 자동 케이스 생성(cURL/Webhook)
- n8n: Slack 승인 플로우 + Google Sheets 주간 요약 적재
- Osquery/FleetDM: 에이전트 런타임, Credential 파일, 네트워크 호출 모니터링
TheHive 케이스 생성 예시
curl -XPOST https://thehive.example.com/api/case \
-H 'Authorization: Bearer $TOKEN' -H 'Content-Type: application/json' \
-d '{"title":"AI High-Risk Action","severity":2,"tlp":2,"tags":["ai","high-risk"],"description":"..."}'
내부 사용자 보안 가이드(체크리스트)
- 항상 로깅: 툴 호출 전후, 정책판정, 승인흐름, 오류 포함
- 민감정보: 프롬프트/결과 전문은 비식별화·암호화·부분 마스킹
- 정책 우선: 툴 화이트리스트, 고위험은 기본 차단 + 승인필수
- 테스트: 재현성(시드 고정), 프롬프트 인젝션/데이터 유출 테스트 케이스
- 키·비밀: 코드 포함 금지, Vault 참조, 단기 토큰
- 대시보드: 위험지표 위젯 상시 모니터, 주간 리포트 확인 및 리턴 액션
- 장애대응: 브로커 지연·SIEM 장애 시 로컬 버퍼링 및 재전송
Wazuh 디코더/룰(개념 예시)
<decoder name="ai-audit">
<prematch>\"event\"\:\{\"category\"\:\"ai\"</prematch>
</decoder>
<rule id="120001" level="10">
<if_decoder_name>ai-audit</if_decoder_name>
<match>\"risk_score\"\:\s*(7[0-9]|[89][0-9]|100)</match>
<description>AI high risk action</description>
<group>ai_audit,policy,</group>
</rule>
유사/활용사례(아이디어)
- 고객지원/권한관리 자동화 에이전트의 권한 변경/삭제/대량 작업 관제
- DevOps 에이전트의 클라우드 리소스 삭제/보안그룹 수정 사전 승인
- DLP 보조 에이전트의 대량 데이터 조회/내보내기 차단·승인 트랙
728x90
그리드형(광고전용)
댓글