- 🔥 “iptables + ulogd2 RATE로 트래픽 로그 폭주 잡기!”
- 🚦 “NFLOG threshold & RATE 필터로 ‘1줄 요약’ 방화벽 로그 완성하기”
- 📉 “10배 줄이는 iptables 로그: nflog-threshold + RATE 실전 가이드”
- 🛠️ “ulogd2 RATE 집계로 SSH·웹 트래픽 로그 다이어트 성공기”
- ✨ “방화벽 로그, 이젠 요약하자! iptables × ulogd2 RATE 세팅 비법”
🔥 방화벽 로그 ‘6 개월 이상’ 보관, 왜 & 무엇을 어떻게?
1️⃣ 법·규정별 “최소 보관 기간” 한눈에 보기
구분 | 적용 범위 | 요구/권고 보관 기간 |
---|---|---|
개인정보 보호법 + 「개인정보의 안전성 확보조치 기준」제 8조 |
개인정보처리시스템 접속기록 | ① 일반: 1 년↑ ② 5만 명↑·민감/고유식별정보·기간통신사업자: 2 년↑ |
ISMS-P 인증기준 2.9.4 | 서버·네트워크·보안장비 로그 | “법령 이상” + 관행상 6 개월↑ 확보 권고 |
PCI-DSS 4.0 Req 10.7 | 카드결제 관련 시스템 | 1 년↑(90 일 즉시 조회 가능) |
ISO 27001 (Annex A 8.15) | 전 산업권 일반 | 통상 12 개월 확보 권고 |
NIST SP 800-92 | 연방기관·모범지침 | “사고 분석·조사에 충분한 기간(≥ 6 개월 권장)” |
💡 정리
- 공공·민간 대부분 ‘6 개월 이상’ 최소 기준으로 삼고, 개인정보·결제·금융 등 특수 분야는 1 ~ 2 년 이상을 요구합니다.
- 방화벽 로그라 하더라도 개인정보 처리·결제 트래픽을 다룬다면 상위 규범(PCI-DSS·PIPA 등)에 맞춰 기간을 늘려야 합니다.
2️⃣ “어떤” 방화벽 로그를 남길까? (필수 필드 체크리스트)
카테고리 | 필드 예시 | 설명·활용 포인트 |
---|---|---|
트래픽 정보 | ▶ 발신 / 수신 IP, Port, Protocol ▶ Interface, Zone |
위협 IP 추적, 포렌식에 핵심 |
행위(ACTION) | allow / deny / drop / reset 등 | 정책 효과성·위협 차단 근거 |
정책 메타 | ▶ Rule ID·이름 ▶ 정책 그룹 |
‘어떤 룰이 언제 트리거?’를 역추적 |
세션·NAT | ▶ Session ID·Duration ▶ 변환 前/後 IP·Port |
내부-외부 매핑, 응답 지연 분석 |
패킷 통계 | Bytes, Packets, TCP Flags | DDoS·대용량 전송 탐지 |
위협·시그니처 | IDS/IPS Hit, Malware Name | 상관 분석·알림 트리거 |
관리자 활동 | ▶ 로그인·로그아웃 ▶ 정책 추가/변경/삭제 ▶ Device OS Upgrade |
변경 이력과 사고 연관 여부 확인 |
3️⃣ “어떻게” 안전하게 보존할까? 실무 가이드
- 🛡️ 중앙 집중 로그 서버
- Syslog-TLS, Kafka-Beat, 또는 Cloud-based SIEM(ELK + S3 Object-Lock) 등으로 단방향 전송.
- 🔒 무결성 보장
- 블록체인 해시, WORM 스토리지, 혹은 파일 단위 SHA-256 + RFC 3161 Time-stamp.
- 🗄️ 이중 저장 & 암호화
- 온프레미스(RAID NAS) + 오프사이트(S3 Glacier, 타 DC) 병행.
- 👀 정책적 접근통제
- RBAC로 ‘읽기 전용’ 계정 분리, 관리자 접근 시 MFA 요구.
- 🔄 수명주기 관리
- ① 90 일: 실시간 SIEM 인덱스
② 6 개월: 온프레미스 저비용 스토리지
③ 2 년+: 압축 + Cold archive / 삭제 정책 자동 실행.
- ① 90 일: 실시간 SIEM 인덱스
- 📝 정책·절차 문서화 (ISMS-P 2.9.4 요구)
- 로그 유형·보존 기간·보존 위치·점검 주기·삭제 방법 명문화 → 내부 감사·외부 인증 시 제출.
- 🛠️ 정기 점검 & 알림
- Cron + CLI(
logrotate --state
) 또는 Elastic ILM 정책으로 저장 용량·무결성 주기 점검 → Slack·Teams 알림.
- Cron + CLI(
4️⃣ 내부 직원에게 제시할 “점검 포인트” 체크리스트
# | 확인 항목 | 주기 |
---|---|---|
1 | 방화벽 장비의 시간 동기화(NTP) 상태 | 매일 |
2 | 정책 변경 로그 자동 수집 여부 | 실시간 |
3 | 허용 트래픽 중 비정상 Port/IP 탐지 룰 존재 | 매주 |
4 | 로그 전송 실패 알림(UDP Drop, Queue Full) | 24 h |
5 | 보존 스토리지 무결성(HASH 재검증) | 분기 |
6 | 만료 로그 자동 파기 로테이션 정책 | 월간 |
7 | DR 센터/Cloud 백업 복구 테스트 | 반기 |
5️⃣ “6 개월”은 출발선일 뿐! 🚀
- 법적으로: 개인정보·결제 등은 1 ~ 2 년이 기본, 금융권·감사 목적은 5 년까지 요구될 수 있습니다.
- 실무적으로: 사고 탐지·포렌식을 생각하면 6 ~ 12 개월 온-디스크 + 장기 보관을 권장합니다.
- 보안 관점: 로그는 “있기만 하면” 끝이 아닙니다. 무결성·가용성·추적성 확보와 주기적 분석까지 포함되어야 진짜 컴플라이언스를 만족합니다.
📝 이 가이드를 기반으로 사내 정보보호 관리계획(로그 관리 절차, DR, 백업 정책)을 업데이트하시면 ISMS-P·PCI-DSS 심사 대응과 침해사고 포렌식 준비에 큰 도움이 됩니다!
✨ 한마디 정리
- “방화벽 로그 = 네트워크 관점의 ‘교통기록’”
- “서비스(SSH/Web/DB) 애플리케이션 로그 = 세션 내부의 ‘행위기록’”
→ 두 레이어는 역할이 다르므로 중복이라기보다 상호보완이 핵심입니다. 다만 모든 허용(allow) 트래픽을 방화벽에서 100% 남기느냐는 “규제‧위험도‧저장·분석 자원”에 따라 Tuning 가능해요.
1️⃣ 왜 둘 다 필요할까?
- 규제 시각
- PCI-DSS 4.0 10번 항목은 “모든 시스템 컴포넌트 접근을 로그로 남겨라”고 명시해 네트워크·시스템·애플리케이션 전 계층을 요구합니다.
- NIST SP 800-92도 “중요한 데이터만 선별해야 하지만, 레이어마다 남기는 값이 달라 서로 연결 분석이 가능해야 한다”고 권고합니다.
- 포렌식 시각
- 방화벽: ‘누가·언제·어디서’ 접근했는지 (IP/포트/액션)
- 앱 로그: ‘무엇을 했는지’ (쿼리·URL·계정·오류)
→ 둘을 조합해야 완전한 타임라인이 만들어집니다.
2️⃣ 방화벽 로그, 어디까지 남길까?
구분 | 필수 | 선택(튜닝 가능) |
---|---|---|
🚫 Deny/Drop/Reset | 항상 기록 ― 규제·위협 분석 | — |
⚠️ Threat/IPS/Anti-Virus | 항상 기록 ― 탐지·대응 근거 | — |
🛠 Config & Admin Event | 항상 기록 ― 변경 추적 | — |
✅ Allow Traffic (인터넷↔DMZ, 외부 API) | 대부분 기록 (요약 필드라도) | 상세 패킷/바이트는 샘플링 가능 |
✅ Allow Traffic (내부 세그먼트) | 요약 세션 로그만 or Sampling 1 % | 완전 제외 가능 (아래 조건 충족 시) |
제외 조건 가이드
- 동일 정보가 애플리케이션/시스템 로그에 이미 포함.
- SIEM에서 세션·계정 매핑 규칙이 있어 교차 조회 가능.
- 제외해도 규제(PCI-DSS·ISMS-P 등) 항목 누락 없이 증빙 가능.
- 제외 정책·사유를 문서화(로그 관리 절차서)하고 주기적 재검토.
3️⃣ 실무 튜닝 예시
플랫폼 | 최소 설정 예 | ‘허용 트래픽 축소’ 옵션 |
---|---|---|
Palo Alto | log-setting 에서 Deny, Threat, Config 프로파일 Always Enable |
traffic-log-type = start end 로 세션 단위 기록, discard-non-syn 으로 불필요 TCP ACK 드롭 |
FortiGate | set logtraffic all + set local-in-logging enable |
set utm enable , 내부 VLAN 간 set logtraffic utm → disable |
iptables + ulogd2 | -j NFLOG --nflog-group 32 (DROP/REJECT 체인) |
nflog-threshold 10 : 10개 패킷당 1회 요약 |
AWS WAF v2 | sampledRequestsEnabled = true (위협 IP만) |
requestSamplingRate = 0.01 (1 %) |
4️⃣ 저장소·분석 설계 팁
- 계층 태그 맞춤:
source_category=firewall
,source_category=app
등 로그 라우팅 시 구분. - 공통 키 매핑
- 방화벽
src_ip
↔ 앱로그client_ip
- 방화벽
session_id
↔ 웹로그x-forwarded-for
+request_id
- 방화벽
- 저장수명 차등
- 방화벽 요약 세션: 90일 인덱스 → 1년 콜드 스토리지
- Threat·Deny: 1년 인덱스 + 3년 장기 보관
- Dashboards: “Deny Top 10”, “알 수 없는 프로토콜 허용 세션” 등 네트워크 가시성 보완.
5️⃣ 내부 점검 체크리스트 ✅
- 로그 수집 누락율 < 1 %? (전송 실패 알람)
- 행위 추적성: IP → 계정 → 요청 URL까지 단일 쿼리로 연결되는가?
- 튜닝 예외 목록: 변경 시 CAB 승인·문서화 완료?
- 규제 대비: 매년 ISMS-P·PCI 내부 감사에서 증적 샘플 추출 테스트.
- 방화벽 로그는 “외부 관문” 관점, 애플리케이션 로그는 “내부 행위” 관점.
- 중복을 100% 제거보다는 안전·규제 범위를 우선 고려해 허용 트래픽 세부정보를 단계적으로 줄이는 접근이 현실적.
- 튜닝 시 반드시 문서화·교차분석 가능성·규제 증빙을 확보하세요. 이렇게 하면 스토리지·분석 비용을 아끼면서도 포렌식·컴플라이언스 두 마리 토끼를 잡을 수 있습니다!
🚦 iptables + ulogd2 RATE로 트래픽 로그 “진짜” 요약하기
“nflog-threshold=10”만으로는 패킷을 묶어 보내 줄 뿐, 줄여 주지는 않습니다!
→ ulogd2의 RATE 필터와 함께 쓰면 N초마다/동일 키마다 집계된 한 줄 로그를 얻을 수 있어요.
1️⃣ 개념 먼저 이해하기
- 🐧 NFLOG 커널 큐
--nflog-threshold 10
→ _10개 패킷_을 한꺼번에 Netlink로 전달(배치 전송).- 모든 패킷이 여전히 userspace(ulogd2)로 들어옵니다.
- 📊 ulogd2 RATE 필터
RATE
플러그인이 동일 필드 집합(예: 5 tuple)당 N 초 동안 패킷·바이트를 누적 후 1줄 요약만 출력.- 결과적으로 로그 양 급감 + 가독성 향상.
2️⃣ 필요 패키지 설치
# Debian/Ubuntu
sudo apt-get install -y ulogd2 ulogd2-nflog ulogd2-print
# RHEL/CentOS(8↑) – EPEL 필요
sudo dnf install -y ulogd ulogd-nflog ulogd-print
🔐 보안 팁: ulogd2는 root로 띄우되, 출력 파일 권한을 root:adm 640처럼 제한하세요.
3️⃣ iptables 규칙 예시 (NFLOG)
# SSH 트래픽만 요약하고 싶을 때
iptables -A INPUT -p tcp --dport 22 \
-j NFLOG --nflog-group 10 \
--nflog-prefix "SSH_IN" \
--nflog-threshold 10
--nflog-group 10
→ ulogd2가 구독할 그룹 번호--nflog-prefix
→ 요약 로그 앞에 표시될 식별자
4️⃣ /etc/ulogd.conf 핵심 설정
[global]
logfile="/var/log/ulogd/ulogd.log"
loglevel=3
########################################
# ① 플러그인 로드
plugin="/usr/lib/ulogd/ulogd_inppkt_NFLOG.so"
plugin="/usr/lib/ulogd/ulogd_filter_BASE.so"
plugin="/usr/lib/ulogd/ulogd_filter_RATE.so"
plugin="/usr/lib/ulogd/ulogd_output_PRINTPKT.so"
########################################
# ② 스택 정의 (순서 = 흐름)
stack=log1:NFLOG,base1:BASE,rate1:RATE,print1:PRINTPKT
########################################
# ③ 각 플러그인 인스턴스
[log1]
group=10 # ← iptables 그룹 번호와 일치
resolve_conntrack=0 # conntrack 조회 OFF로 성능↑
[rate1]
rate_interval=60 # 60초마다 집계해 1줄 출력
aggregate=orig.ip.src,dst.ip,dst.port,proto # 집계 키
print_total=1 # 합계 패킷·바이트 표시
upper_threshold=0 # 임계값 없이 계속 출력
[print1]
file="/var/log/ulogd/ssh_rate.log"
sync=1 # 즉시 flush
💡 집계 키 예시
L3 / L4 흐름: src.ip,dst.ip,dst.port,proto
- 인터페이스 기준:
in.ifname,out.ifname,proto
- 큰 그림만 필요하면
proto
정도만도 가능!
5️⃣ 서비스 재시작 & 확인
sudo systemctl restart ulogd.service
# 실시간 확인
tail -f /var/log/ulogd/ssh_rate.log
출력 예시
2025-07-11 14:32:00 SSH_IN 192.0.2.50 -> 10.0.0.2:22 proto=6 packets=125 bytes=12480
6️⃣ 운영·보안 튜닝 팁 🌟
- 🕒 interval 조정
- 폭주 구간만 세밀히 보려면
rate_interval=5
(초) 등으로 축소.
- 폭주 구간만 세밀히 보려면
- 📉 저장 공간 절약
logrotate
+compress
로 주기 압축, 6 개월 이상 S3/Glacier WORM 보관.
- 🛑 DDoS 탐지 룰 연계
- RATE 출력에
packets > X
조건 걸어 fail2ban 또는 Slack 알림으로 연결.
- RATE 출력에
- ✅ 무결성 체크
- 주단위
sha256sum
스크립트로 로그 파일 해시 → Wazuh/OSSEC에 전송.
- 주단위
7️⃣ Troubleshooting FAQ
증상 | 원인 & 해결 |
---|---|
로그가 전혀 안 나와요 | 🔍 iptables 그룹 번호 ↔ ulogd group 일치 확인, conntrack 커널 모듈 로드 여부 체크 |
“Invalid stack” 에러 | stack= 줄에 콤마·대소문자 오타 여부 확인 |
RATE가 안 먹고 모든 패킷 출력 | aggregate= 필드 키 누락 → 집계 불가. 최소 proto 라도 지정 |
ulogd CPU 사용률 급증 | ① rate_interval 너무 짧음 ② 집계 키 과다 → 필드 수 줄이기 |
- nflog-threshold는 배치 전송 용도, 필터링·요약은 ulogd2 RATE로!
iptables → NFLOG(group=10)
→ulogd2(NFLOG→BASE→RATE→PRINT)
흐름을 기억하세요.- 집계 키·interval을 상황에 맞게 튜닝하며, 보안 로그 무결성·보존 정책도 잊지 않으셔야 합니다.
🛠️ 이대로 설정하면 로그 폭주 걱정 없이 필요한 요약 정보만 깔끔하게 확보하실 수 있습니다.
🛰️ Syslog 중앙 수집, 탄탄하게 설계하는 10-Step 가이드
🎯 목표: iptables ➜ ulogd2(RATE)로 “요약된” 방화벽 로그를 유실 없이 중앙 SIEM/스토리지로 전달
🗂️ 전제: Linux 서버 다수, rsyslog 사용(표준), 필요 시 syslog-ng로 대체 가능
1️⃣ 아키텍처 한눈에 보기
iptables ─→ NFLOG ─→ ulogd2(RATE) ─→ /var/log/ssh_rate.log
│
▼ imfile
rsyslog(Agent) ──► TLS/RELP ──► rsyslog(Hub/Relay) ──►
│
┌──────────── ELK / Splunk / Graylog ────────────┐
│ S3 Object-Lock / Glacier (Archive) │
└────────────────────────────────────────────────┘
- Agent: 각 서버 → 파일 입력(
imfile
), RELP/TLS 전송 - Hub/Relay: 2 노드 Active/Standby, disk-queue 로컬 버퍼
- Backend: SIEM + 장기 보관 스토리지(WORM)
2️⃣ 전송 프로토콜 결정
프로토콜 | 특성 | 추천 상황 |
---|---|---|
RELP + TLS | TCP, ACK, 재전송·순서 보장, 암호화 | ✅ 기본값 (most secure & reliable) |
TCP + TLS | 전송 보장 O, 세션 기반 | 소규모·저부하 |
UDP 514 | 경량, 손실 가능 | 내부 망 + 저위험 로그 전용 |
🔒 TLS 필수: PCI-DSS·ISMS-P는 암호화 전송 요구.
3️⃣ Agent(rsyslog) 설정 예시
/etc/rsyslog.d/30-firewall-rate.conf
# ① 파일 입력
module(load="imfile")
input(type="imfile"
File="/var/log/ulogd/ssh_rate.log"
Tag="FW_RATE"
Severity="info"
Facility="local4")
# ② JSON 파싱(선택)
module(load="mmjsonparse")
template(name="JSONonly" type="string" string="%$!all-json%\n")
# ③ RELP + TLS 전송
module(load="omrelp")
action(type="omrelp"
Target="log-hub1.corp"
Port="2514"
tls="on"
tls.caCert="/etc/pki/rsyslog/ca.pem"
tls.myCert="/etc/pki/rsyslog/client.pem"
tls.myPrivKey="/etc/pki/rsyslog/client.key"
Template="JSONonly"
queue.type="LinkedList"
queue.size="50000"
queue.filename="fw_rate_dq"
action.resumeRetryCount="-1")
포인트
- imfile: 텍스트 파일을 실시간 tail → syslog 메시지화
- mmjsonparse: 키/값 필드를
!
트리로 분리 → SIEM 인제스트 편리 - disk-queue: 네트워크 장애 시 로컬에 버퍼링 후 재전송
4️⃣ Hub/Relay(rsyslog) 설정 예시
/etc/rsyslog.d/30-in-relp.conf
module(load="imrelp")
input(type="imrelp"
port="2514"
tls="on"
tls.caCert="/etc/pki/rsyslog/ca.pem"
tls.myCert="/etc/pki/rsyslog/hub.pem"
tls.myPrivKey="/etc/pki/rsyslog/hub.key"
ruleset="firewall_rate")
ruleset(name="firewall_rate") {
# GeoIP·Kafka·Elasticsearch 등 추가 필터 가능
action(type="omkafka"
topic="fw_rate"
broker="kafka01:9092,kafka02:9092")
}
- HA: Pacemaker 혹은 Keepalived VIP로 Active/Standby
- Disk-queue:
omkafka
에도queue.type="LinkedList"
권장
5️⃣ SIEM 처리·인덱스 전략
- Index 별도 분리:
firewall-rate-*
,firewall-threat-*
- 필드 매핑
src_ip
,dst_ip
➜ IP-typepackets
,bytes
➜ longtag
(“SSH_IN”) ➜ keyword
- ILM 정책 (예시)
- 0~90 일: hot (SSD)
- 90 일~1 년: warm (HDD)
- 1 년+: cold (S3-GLACIER, read-only)
6️⃣ 인증서·암호화 운영
항목 | 방법 |
---|---|
CA | 사내 PKI or Hashicorp Vault PKI backend |
배포 | Ansible copy module + notify: restart rsyslog |
Rotation | 1 년 주기 · 만료 30 일 전 자동 갱신 스크립트 |
7️⃣ 모니터링 & 알림
- rsyslog stats:
module(load="impstats" interval="60" severity="7")
→ Prometheus Node Exporter textfile - Queue Depth Alert:
if ($queuelen > 10000) then alert("Queue overflow!")
- Log-loss Check: Agent/Hub 양쪽에서
sha256sum
비교(시간당 샘플)
8️⃣ 로깅 표준화 체크리스트 ✅
# | 항목 | 설명 |
---|---|---|
1 | RFC 5424 포맷 | template="RSYSLOG_SyslogProtocol23" 적용 |
2 | Timezone | UTC 고정 or Asia/Seoul 통일, 로그서버 NTP 필수 |
3 | Tag 표준 | FW_RATE , FW_DENY , APP_WEB 등 10자 이내 |
4 | Severity 매핑 | info=allow-rate, warning=deny, error=threat |
5 | Data Masking | 개인정보(IP ⭢ 내부 NAT 후 공개) 필요 시 mmdb-lookup |
9️⃣ 보존·파기 정책
- 방화벽 요약: 6 개월 online + 2 년 archive
- 위협/차단: 1 년 online + 5 년 archive(금융·결제 기준)
- 자동 파기: S3 Lifecycle,
DeleteMarkerReplication
Off ⇢ ‘정책 기반 삭제’ 로그 남김
🔟 DR & 테스트 시나리오
- Agent → Hub 끊김: disk-queue → 재접속 후 catch-up 확인
- Hub 장애: VIP 인계,
imrelp
재연결 < 5 초 - SIEM 불가(대규모 인덱스 장애): Hub → secondary S3 sink fallback
- 연 1회 복구 훈련: Glacier → Restore → ELK re-index, 무결성 해시 비교
🎁 마무리 한 줄
“요약된 로그를 안전하게, 잃지 않고, 규정에 맞춰 오래 보관”
위 10-Step을 적용하면 ✔️전송 안전성 ✔️저장 무결성 ✔️확장성까지 모두 확보하실 수 있습니다!
📚 1. 방화벽 통신 로그 기록 관련 법규/규정
1.1 국내 법규 체계
🏛️ 개인정보보호법 체계
┌─────────────────────────────────────────────────────────┐
│ 개인정보보호법 │
│ 제29조 │
│ ↓ │
│ 개인정보의 안전성 확보조치 기준 │
│ 고시 제8조 │
│ ↓ │
│ ┌─────────────────┬─────────────────┐ │
│ │ 일반 기업 │ 특수 대상 │ │
│ │ 1년 이상 │ 2년 이상 │ │
│ └─────────────────┴─────────────────┘ │
└─────────────────────────────────────────────────────────┘
상세 요구사항
- 일반 기업: 개인정보처리시스템 접속기록 1년 이상 보관
- 특수 대상 (2년 이상 보관):
- 개인정보 5만명 이상 보유 기업
- 민감정보(사상, 신념, 건강 등) 처리
- 고유식별정보(주민번호, 여권번호 등) 처리
- 정보통신서비스 제공자(ISP, 포털 등)
필수 기록 항목
접속기록:
- 계정: 접속자 ID/계정명
- 시간: 접속일시 (년월일시분초)
- 위치: 접속지 IP 주소
- 행위: 수행업무 또는 열람 내용
- 대상: 접속한 정보주체 정보
🔐 ISMS-P 인증기준 상세
2.9.4 로그 및 접속기록 관리 세부 요구사항
1. 로그 수집 대상
- 서버 (OS 레벨)
- 네트워크 장비 (라우터, 스위치)
- 보안장비 (방화벽, IPS, WAF)
- 응용프로그램
- 데이터베이스
2. 최소 기록 항목
- 사용자 계정
- 이벤트 유형
- 날짜 및 시간
- 성공/실패 여부
- 이벤트 발생 출처
- 영향받은 데이터/시스템/자원
3. 보관 기간
- 법적 요구사항 준수 (최소 6개월)
- 침해사고 분석 가능 기간
- 업무 필요성 고려
4. 보호 조치
- 위변조 방지
- 무단 접근 차단
- 정기 백업
1.2 국제 표준 및 산업별 규정
💳 PCI-DSS 4.0 상세 요구사항
Requirement 10: Log and Monitor All Access
10.1: 로깅 메커니즘 구현
대상:
- CHD(카드소지자 데이터) 접근
- 모든 시스템 구성요소
- 네트워크 자원
10.2: 감사 추적 구현
필수 이벤트:
- 모든 개인별 사용자 접근
- 루트/관리자 권한 행위
- 감사 로그 접근
- 무효한 논리적 접근 시도
- 인증 메커니즘 사용
- 보안 이벤트 로깅 중지
10.3: 최소 기록 항목
- 사용자 ID
- 이벤트 유형
- 날짜 및 시간
- 성공/실패 표시
- 이벤트 발생지
- 영향받은 데이터/구성요소
10.7: 보관 기간
- 최소 1년 보관
- 최근 3개월은 즉시 분석 가능
- 나머지는 백업에서 복원 가능
🌐 ISO/IEC 27001:2022 Annex A 통제항목
A.8.15 Logging (로깅)
목적: 이벤트 기록 및 증거 생성
구현 지침:
- 로그 유형 결정
- 보호 수준 설정
- 보관 기간 정의
- 시간 동기화
- 정기적 검토
A.8.16 Monitoring activities (모니터링 활동)
- 비정상 행위 탐지
- 정보보호 사건 식별
- 규정 준수 확인
1.3 산업별 특수 요구사항 매트릭스
산업 | 근거법규 | 로그 유형 | 보관기간 | 특이사항 |
---|---|---|---|---|
금융 | 전자금융거래법 | 전자금융거래 | 5년 | 거래내역 포함 |
전자금융감독규정 | 접속/권한변경 | 1년 | 실시간 모니터링 | |
의료 | 의료법 | 전자의무기록 접근 | 3년 | 환자정보 보호 |
HIPAA(미국) | 시스템 활동 | 6년 | PHI 접근 추적 | |
통신 | 정보통신망법 | 통신사실 | 3개월 | 영장 없이 제공 금지 |
통신비밀보호법 | 가입자 접속기록 | 3개월 | 법원 허가 필요 | |
공공 | 전자정부법 | 행정정보시스템 | 2년 | 감사 대비 |
공공기록물관리법 | 주요 시스템 | 준영구 | 이관 대상 |
🛠️ 2. iptables + ulogd2 고급 구현 가이드
2.1 아키텍처 설계
┌─────────────────────────────────────────────────────────────┐
│ 방화벽 서버 (iptables) │
│ │
│ ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ INPUT │ │ FORWARD │ │ OUTPUT │ │
│ └──────┬──────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └───────────────────┴────────────────────┘ │
│ │ │
│ ┌──────▼──────┐ │
│ │ NFLOG │ │
│ └──────┬──────┘ │
│ │ │
│ ┌──────────────────────────▼────────────────────────────┐ │
│ │ ulogd2 │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ NFLOG │→ │ BASE │→ │ RATE │→ │ OUTPUT │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ │
│ └───────────────────────────┬───────────────────────────┘ │
└─────────────────────────────┼─────────────────────────────┘
│
▼
┌─────────────────────┐
│ 로그 파일/SIEM │
└─────────────────────┘
2.2 상세 구현 단계
Step 1: 시스템 준비 및 최적화
#!/bin/bash
# system-optimization.sh
# 1. 커널 파라미터 최적화
cat >> /etc/sysctl.d/99-firewall-logging.conf << EOF
# Netfilter 연결 추적 최적화
net.netfilter.nf_conntrack_max = 1048576
net.netfilter.nf_conntrack_buckets = 262144
net.netfilter.nf_conntrack_tcp_timeout_established = 432000
net.netfilter.nf_conntrack_tcp_timeout_time_wait = 120
net.netfilter.nf_conntrack_tcp_timeout_close_wait = 60
net.netfilter.nf_conntrack_tcp_timeout_fin_wait = 120
# 로그 버퍼 크기 증가
net.core.rmem_default = 8388608
net.core.rmem_max = 134217728
net.core.wmem_default = 8388608
net.core.wmem_max = 134217728
# NFLOG 큐 설정
net.netfilter.nf_log_queue_threshold = 65536
EOF
sysctl -p /etc/sysctl.d/99-firewall-logging.conf
# 2. 필요 패키지 설치
apt-get update
apt-get install -y \
ulogd2 \
ulogd2-json \
ulogd2-pcap \
ulogd2-sqlite3 \
python3-pip \
jq \
rsyslog-relp \
rsyslog-gnutls
# 3. Python 분석 도구 설치
pip3 install pandas numpy matplotlib seaborn
Step 2: 고급 iptables 규칙 설정
#!/bin/bash
# iptables-advanced-rules.sh
# 체인 초기화 및 커스텀 체인 생성
iptables -N LOG_ACCEPT 2>/dev/null || iptables -F LOG_ACCEPT
iptables -N LOG_DROP 2>/dev/null || iptables -F LOG_DROP
iptables -N RATE_LIMIT 2>/dev/null || iptables -F RATE_LIMIT
# 1. 속도 제한 체인
iptables -A RATE_LIMIT -m recent --name portscan --update --seconds 60 --hitcount 10 -j LOG_DROP
iptables -A RATE_LIMIT -m recent --name portscan --set -j RETURN
# 2. 허용 트래픽 로깅 (요약)
iptables -A LOG_ACCEPT \
-j NFLOG --nflog-group 100 \
--nflog-prefix "ACCEPT" \
--nflog-threshold 10 \
--nflog-range 128
iptables -A LOG_ACCEPT -j ACCEPT
# 3. 거부 트래픽 로깅 (전체)
iptables -A LOG_DROP \
-j NFLOG --nflog-group 200 \
--nflog-prefix "DROP" \
--nflog-range 256
iptables -A LOG_DROP -j DROP
# 4. 서비스별 세분화 로깅
# HTTP/HTTPS (웹 서비스)
iptables -A INPUT -p tcp -m multiport --dports 80,443 \
-m conntrack --ctstate NEW \
-j NFLOG --nflog-group 110 \
--nflog-prefix "WEB_NEW" \
--nflog-threshold 20
# SSH (보안 중요)
iptables -A INPUT -p tcp --dport 22 \
-m conntrack --ctstate NEW \
-m recent --name ssh_attempt --set \
-j NFLOG --nflog-group 120 \
--nflog-prefix "SSH_NEW"
# Database (민감 데이터)
iptables -A INPUT -p tcp -m multiport --dports 3306,5432,1521 \
-j NFLOG --nflog-group 130 \
--nflog-prefix "DB_ACCESS"
# 5. 위협 탐지 특화 로깅
# 포트 스캔 탐지
iptables -A INPUT -p tcp --tcp-flags ALL NONE \
-j NFLOG --nflog-group 300 \
--nflog-prefix "SCAN_NULL"
iptables -A INPUT -p tcp --tcp-flags SYN,FIN SYN,FIN \
-j NFLOG --nflog-group 300 \
--nflog-prefix "SCAN_SYNFIN"
# 6. 메인 규칙 적용
iptables -A INPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT
iptables -A INPUT -i lo -j ACCEPT
iptables -A INPUT -p tcp --dport 22 -s 10.0.0.0/8 -j LOG_ACCEPT
iptables -A INPUT -p tcp -m multiport --dports 80,443 -j LOG_ACCEPT
iptables -A INPUT -j LOG_DROP
# 규칙 저장
iptables-save > /etc/iptables/rules.v4
Step 3: ulogd2 고급 설정
# /etc/ulogd.conf
[global]
logfile="/var/log/ulogd/ulogd.log"
loglevel=3
plugin_dir="/usr/lib/x86_64-linux-gnu/ulogd"
#######################################
# 플러그인 로드
#######################################
plugin="ulogd_inppkt_NFLOG.so"
plugin="ulogd_filter_BASE.so"
plugin="ulogd_filter_IFINDEX.so"
plugin="ulogd_filter_IP2STR.so"
plugin="ulogd_filter_IP2BIN.so"
plugin="ulogd_filter_PRINTPKT.so"
plugin="ulogd_filter_HWHDR.so"
plugin="ulogd_filter_RATE.so"
plugin="ulogd_filter_MARK.so"
plugin="ulogd_output_JSON.so"
plugin="ulogd_output_SQLITE3.so"
plugin="ulogd_output_SYSLOG.so"
#######################################
# 스택 정의 (다중 처리 파이프라인)
#######################################
# 1. 허용 트래픽 요약 (그룹 100-130)
stack=accept_log:NFLOG,base1:BASE,ifindex1:IFINDEX,ip2str1:IP2STR,rate1:RATE,json1:JSON
# 2. 거부 트래픽 전체 (그룹 200)
stack=drop_log:NFLOG,base2:BASE,ifindex2:IFINDEX,ip2str2:IP2STR,print2:PRINTPKT,sqlite1:SQLITE3
# 3. 위협 탐지 (그룹 300)
stack=threat_log:NFLOG,base3:BASE,ip2str3:IP2STR,mark1:MARK,syslog1:SYSLOG
#######################################
# NFLOG 입력 설정
#######################################
[accept_log]
group=100,110,120,130 # 여러 그룹 동시 처리
seq_local=1
seq_global=1
recv_buffer_size=8388608 # 8MB 버퍼
[drop_log]
group=200
seq_local=1
seq_global=1
[threat_log]
group=300
seq_local=1
seq_global=1
#######################################
# RATE 필터 설정 (핵심!)
#######################################
[rate1]
# 집계 간격
rate_interval=300 # 5분
# 집계 키 (5-tuple + 인터페이스 + prefix)
aggregate=orig.ip.saddr,orig.ip.daddr,orig.l4.sport,orig.l4.dport,orig.ip.protocol,oob.in,oob.prefix
# 통계 옵션
print_total=1 # 총 패킷/바이트
print_rate=1 # 초당 패킷/바이트
print_duration=1 # 지속 시간
# 임계값
upper_threshold=100000 # 10만 패킷 초과시 별도 알림
lower_threshold=10 # 10 패킷 미만은 무시
#######################################
# JSON 출력 (SIEM 연동용)
#######################################
[json1]
file="/var/log/ulogd/firewall_summary.json"
sync=1
timestamp=1
# 커스텀 필드 맵핑
field.event_type="firewall_summary"
field.src_ip=orig.ip.saddr_str
field.dst_ip=orig.ip.daddr_str
field.src_port=orig.l4.sport
field.dst_port=orig.l4.dport
field.protocol=orig.ip.protocol
field.interface_in=oob.in_str
field.interface_out=oob.out_str
field.service=oob.prefix
field.packets_total=rate.packets
field.bytes_total=rate.bytes
field.packets_per_sec=rate.pps
field.bytes_per_sec=rate.bps
field.duration_sec=rate.duration
field.start_time=rate.start_time
field.end_time=rate.end_time
#######################################
# SQLite3 출력 (분석용)
#######################################
[sqlite1]
db="/var/log/ulogd/firewall_drops.db"
table="drops"
create_table=1
buffer_size=100
# SQL 스키마
schema="CREATE TABLE IF NOT EXISTS drops (
timestamp INTEGER,
src_ip TEXT,
dst_ip TEXT,
src_port INTEGER,
dst_port INTEGER,
protocol INTEGER,
tcp_flags TEXT,
interface_in TEXT,
reason TEXT
)"
#######################################
# Syslog 출력 (위협 알림)
#######################################
[syslog1]
facility=LOG_LOCAL5
level=LOG_WARNING
prefix="THREAT: "
Step 4: 로그 분석 및 시각화 스크립트
#!/usr/bin/env python3
# firewall_log_analyzer.py
import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import sqlite3
import sys
class FirewallLogAnalyzer:
def __init__(self, json_file, db_file):
self.json_file = json_file
self.db_file = db_file
def analyze_summary_logs(self):
"""JSON 요약 로그 분석"""
data = []
with open(self.json_file, 'r') as f:
for line in f:
try:
data.append(json.loads(line))
except:
continue
df = pd.DataFrame(data)
df['timestamp'] = pd.to_datetime(df['end_time'])
# 상위 통신 분석
top_talkers = df.groupby(['src_ip', 'dst_ip', 'dst_port'])['packets_total'].sum().sort_values(ascending=False).head(20)
# 시간대별 트래픽 분석
df['hour'] = df['timestamp'].dt.hour
hourly_traffic = df.groupby('hour')['bytes_total'].sum()
# 프로토콜별 분포
protocol_dist = df.groupby('protocol')['packets_total'].sum()
return {
'top_talkers': top_talkers,
'hourly_traffic': hourly_traffic,
'protocol_distribution': protocol_dist
}
def analyze_drop_logs(self):
"""SQLite 거부 로그 분석"""
conn = sqlite3.connect(self.db_file)
# 거부 원인별 통계
drop_reasons = pd.read_sql_query("""
SELECT reason, COUNT(*) as count
FROM drops
WHERE timestamp > strftime('%s', 'now', '-1 day')
GROUP BY reason
ORDER BY count DESC
""", conn)
# 공격자 IP 순위
attacker_ips = pd.read_sql_query("""
SELECT src_ip, COUNT(*) as attempts
FROM drops
WHERE timestamp > strftime('%s', 'now', '-1 day')
GROUP BY src_ip
ORDER BY attempts DESC
LIMIT 50
""", conn)
conn.close()
return {
'drop_reasons': drop_reasons,
'attacker_ips': attacker_ips
}
def generate_report(self):
"""종합 보고서 생성"""
summary_analysis = self.analyze_summary_logs()
drop_analysis = self.analyze_drop_logs()
# 시각화
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
# 1. 시간대별 트래픽
summary_analysis['hourly_traffic'].plot(kind='bar', ax=axes[0,0])
axes[0,0].set_title('Hourly Traffic Volume')
axes[0,0].set_xlabel('Hour')
axes[0,0].set_ylabel('Bytes')
# 2. 프로토콜 분포
protocol_names = {6: 'TCP', 17: 'UDP', 1: 'ICMP', 47: 'GRE'}
protocol_labels = [protocol_names.get(p, f'Proto_{p}') for p in summary_analysis['protocol_distribution'].index]
axes[0,1].pie(summary_analysis['protocol_distribution'].values, labels=protocol_labels, autopct='%1.1f%%')
axes[0,1].set_title('Protocol Distribution')
# 3. 거부 원인
drop_analysis['drop_reasons'].head(10).plot(kind='barh', x='reason', y='count', ax=axes[1,0])
axes[1,0].set_title('Top 10 Drop Reasons')
axes[1,0].set_xlabel('Count')
# 4. 공격자 IP (상위 10개)
drop_analysis['attacker_ips'].head(10).plot(kind='barh', x='src_ip', y='attempts', ax=axes[1,1])
axes[1,1].set_title('Top 10 Attacker IPs')
axes[1,1].set_xlabel('Drop Attempts')
plt.tight_layout()
plt.savefig('/var/log/ulogd/daily_report.png', dpi=300)
# 텍스트 보고서
with open('/var/log/ulogd/daily_report.txt', 'w') as f:
f.write(f"Firewall Log Analysis Report - {datetime.now().strftime('%Y-%m-%d')}\n")
f.write("="*60 + "\n\n")
f.write("Top 20 Network Flows:\n")
f.write("-"*40 + "\n")
for (src, dst, port), packets in summary_analysis['top_talkers'].items():
f.write(f"{src} -> {dst}:{port} : {packets:,} packets\n")
f.write("\n\nTop Attack Sources:\n")
f.write("-"*40 + "\n")
for _, row in drop_analysis['attacker_ips'].head(20).iterrows():
f.write(f"{row['src_ip']} : {row['attempts']:,} attempts\n")
if __name__ == "__main__":
analyzer = FirewallLogAnalyzer(
'/var/log/ulogd/firewall_summary.json',
'/var/log/ulogd/firewall_drops.db'
)
analyzer.generate_report()
2.3 로그 로테이션 및 보관 정책
# /etc/logrotate.d/ulogd2-advanced
/var/log/ulogd/*.json {
hourly
rotate 168 # 7일 * 24시간
compress
delaycompress
missingok
notifempty
create 640 root adm
postrotate
# 시간별 아카이브 생성
HOUR=$(date +%Y%m%d%H)
for file in /var/log/ulogd/*.json.1; do
[ -f "$file" ] && mv "$file" "${file%.1}_${HOUR}.json"
done
# S3 업로드 (장기 보관)
aws s3 sync /var/log/ulogd/ s3://company-firewall-logs/$(hostname)/ \
--exclude "*" \
--include "*_${HOUR}.json" \
--storage-class GLACIER_IR
systemctl reload ulogd2
endscript
}
/var/log/ulogd/*.db {
daily
rotate 30
compress
missingok
notifempty
create 640 root adm
prerotate
# SQLite 백업
sqlite3 /var/log/ulogd/firewall_drops.db ".backup /var/log/ulogd/firewall_drops_$(date +%Y%m%d).db"
endscript
}
🚀 3. Syslog 중앙 수집 시스템 구축
3.1 엔터프라이즈급 아키텍처
┌─────────────────────────────────────────────────────────────────┐
│ 전체 로그 수집 아키텍처 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Firewall-1 │ │ Firewall-2 │ │ Firewall-N │ │
│ │ (Agent) │ │ (Agent) │ │ (Agent) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ │ RELP/TLS │ │ │
│ └────────────────────┴────────────────────┘ │
│ │ │
│ ┌─────────▼─────────┐ │
│ │ Load Balancer │ │
│ │ (Keepalived) │ │
│ └─────────┬─────────┘ │
│ │ │
│ ┌────────────────────┴────────────────────┐ │
│ │ │ │
│ ┌──────▼───────┐ ┌──────────▼─────┐ │
│ │ Log Hub #1 │ │ Log Hub #2 │ │
│ │ (Active) │◄────── SYNC ────────► (Standby) │ │
│ └──────┬───────┘ └────────────────┘ │
│ │ │
│ ├─────────────┬──────────────┬──────────────┐ │
│ │ │ │ │ │
│ ┌──────▼──────┐ ┌───▼────┐ ┌──────▼──────┐ ┌────▼────┐ │
│ │ Kafka │ │ HDFS │ │Elasticsearch│ │ S3 │ │
│ │ Cluster │ │Cluster │ │ Cluster │ │ Glacier │ │
│ └─────────────┘ └────────┘ └─────────────┘ └─────────┘ │
│ │
└──────────────────────────────────────────────────────────────┘
3.2 Agent 설정 (rsyslog)
# /etc/rsyslog.d/50-firewall-advanced.conf
#######################################
# 모듈 로드
#######################################
module(load="imfile")
module(load="mmjsonparse")
module(load="mmnormalize")
module(load="omrelp")
module(load="imuxsock")
#######################################
# 전역 설정
#######################################
global(
defaultNetstreamDriver="gtls"
defaultNetstreamDriverCAFile="/etc/pki/rsyslog/ca.pem"
defaultNetstreamDriverCertFile="/etc/pki/rsyslog/client-cert.pem"
defaultNetstreamDriverKeyFile="/etc/pki/rsyslog/client-key.pem"
workDirectory="/var/spool/rsyslog"
maxMessageSize="256k"
)
#######################################
# 템플릿 정의
#######################################
# JSON 형식 (구조화된 데이터)
template(name="JsonFormat" type="list") {
constant(value="{")
constant(value="\"@timestamp\":\"")
property(name="timegenerated" dateFormat="rfc3339")
constant(value="\",\"hostname\":\"")
property(name="hostname")
constant(value="\",\"severity\":\"")
property(name="syslogseverity-text")
constant(value="\",\"facility\":\"")
property(name="syslogfacility-text")
constant(value="\",\"program\":\"")
property(name="programname")
constant(value="\",\"message\":\"")
property(name="msg" format="json")
constant(value="\",\"raw\":\"")
property(name="rawmsg" format="json")
constant(value="\"")
property(name="$!all-json" position.from="2")
constant(value="}\n")
}
# CEF 형식 (보안 이벤트)
template(name="CEFFormat" type="list") {
constant(value="CEF:0|CompanyName|Firewall|1.0|")
property(name="$!event_id")
constant(value="|")
property(name="$!event_name")
constant(value="|")
property(name="$!severity")
constant(value="|src=")
property(name="$!src_ip")
constant(value=" dst=")
property(name="$!dst_ip")
constant(value=" spt=")
property(name="$!src_port")
constant(value=" dpt=")
property(name="$!dst_port")
constant(value=" proto=")
property(name="$!protocol")
constant(value=" act=")
property(name="$!action")
constant(value=" cn1=")
property(name="$!packets")
constant(value=" cn1Label=Packets cn2=")
property(name="$!bytes")
constant(value=" cn2Label=Bytes\n")
}
#######################################
# 입력 설정
#######################################
# ulogd2 JSON 로그
input(type="imfile"
File="/var/log/ulogd/firewall_summary.json"
Tag="firewall_summary"
Severity="info"
Facility="local4"
addMetadata="on"
freshStartTail="on"
PersistStateInterval="100"
ruleset="processFirewallSummary"
)
# 거부 로그
input(type="imfile"
File="/var/log/ulogd/firewall_drops.log"
Tag="firewall_drops"
Severity="warning"
Facility="local4"
addMetadata="on"
ruleset="processFirewallDrops"
)
#######################################
# 규칙셋 정의
#######################################
ruleset(name="processFirewallSummary") {
# JSON 파싱
action(type="mmjsonparse"
cookie=""
container="!fw")
# 필드 정규화
set $!event_type = "firewall_flow";
set $!src_ip = $!fw!src_ip;
set $!dst_ip = $!fw!dst_ip;
set $!packets = $!fw!packets_total;
set $!bytes = $!fw!bytes_total;
# 임계값 체크
if ($!fw!packets_total > 100000) then {
set $!alert = "high_volume_flow";
set $!severity = "6"; # Warning
}
# RELP 전송 (고가용성)
action(type="omrelp"
target="log-hub-vip.company.com"
port="2514"
tls="on"
tls.authMode="x509/name"
tls.permittedPeer=["log-hub1.company.com", "log-hub2.company.com"]
template="JsonFormat"
queue.type="linkedList"
queue.size="100000"
queue.filename="fwsummary_q"
queue.maxDiskSpace="1g"
queue.saveOnShutdown="on"
queue.highWatermark="90000"
queue.lowWatermark="50000"
queue.discardMark="95000"
queue.discardSeverity="5" # Notice 이하 버림
queue.timeoutEnqueue="0"
queue.timeoutShutdown="10000"
queue.timeoutActionCompletion="1000"
action.resumeRetryCount="-1"
action.resumeInterval="30"
action.reportSuspension="on"
action.reportSuspensionContinuation="on"
)
}
ruleset(name="processFirewallDrops") {
# 정규화
action(type="mmnormalize"
rulebase="/etc/rsyslog.d/firewall_drops.rulebase"
useRawMsg="off")
# GeoIP 추가 (옵션)
# action(type="mmgeoip"
# key="$!src_ip"
# fields=["country", "city", "latitude", "longitude"])
# 위협 인텔리전스 체크
if ($!src_ip == "10.0.0.0/8" or $!src_ip == "172.16.0.0/12" or $!src_ip == "192.168.0.0/16") then {
set $!threat_level = "internal";
} else {
set $!threat_level = "external";
}
# CEF 형식으로 전송
action(type="omrelp"
target="siem.company.com"
port="1514"
template="CEFFormat"
queue.type="linkedList"
queue.filename="fwdrops_q"
# ... 동일한 큐 설정 ...
)
}
#######################################
# 통계 및 모니터링
#######################################
module(load="impstats"
interval="60"
severity="7"
format="cee"
ruleset="rsyslog_stats"
)
ruleset(name="rsyslog_stats") {
action(type="omfile"
file="/var/log/rsyslog/stats.log"
template="RSYSLOG_FileFormat"
)
}
3.3 Hub/Relay 서버 설정
# /etc/rsyslog.d/10-hub-server.conf
#######################################
# Hub 서버 전용 설정
#######################################
module(load="imrelp")
module(load="omkafka")
module(load="omelasticsearch")
module(load="omhdfs")
module(load="omprog")
#######################################
# RELP 리스너
#######################################
input(type="imrelp"
port="2514"
tls="on"
tls.authMode="x509/name"
tls.permittedPeer=["*.company.com"]
tls.cacert="/etc/pki/rsyslog/ca.pem"
tls.mycert="/etc/pki/rsyslog/hub-cert.pem"
tls.myprivkey="/etc/pki/rsyslog/hub-key.pem"
maxDataSize="256k"
keepAlive="on"
keepAlive.probes="5"
keepAlive.interval="60"
keepAlive.time="300"
ruleset="hub_processing"
)
#######################################
# Hub 처리 규칙셋
#######################################
ruleset(name="hub_processing") {
# 중복 제거
action(type="mmrm1stspace")
# 라우팅 결정
if ($!event_type == "firewall_flow") then {
call kafka_output
call elasticsearch_output
} else if ($!event_type == "firewall_drop") then {
call siem_output
call hdfs_archive
}
# 메트릭 수집
action(type="omprog"
binary="/usr/local/bin/log_metrics_collector.py"
template="JsonFormat"
)
}
#######################################
# Kafka 출력
#######################################
ruleset(name="kafka_output") {
action(type="omkafka"
broker=["kafka1:9092", "kafka2:9092", "kafka3:9092"]
topic="firewall-logs"
key="$!src_ip"
partitions.auto="on"
confParam=[
"compression.type=snappy",
"batch.size=65536",
"linger.ms=100",
"buffer.memory=134217728"
]
queue.type="linkedList"
queue.size="50000"
queue.filename="kafka_q"
)
}
#######################################
# Elasticsearch 출력
#######################################
ruleset(name="elasticsearch_output") {
action(type="omelasticsearch"
server=["es1.company.com", "es2.company.com", "es3.company.com"]
serverport="9200"
template="JsonFormat"
searchIndex="firewall-logs"
dynSearchIndex="on"
searchType="_doc"
bulkmode="on"
bulkid="$uuid"
queue.type="linkedList"
queue.size="20000"
queue.filename="es_q"
errorfile="/var/log/rsyslog/es_errors.log"
action.resumeretrycount="-1"
)
}
#######################################
# HDFS 아카이브
#######################################
ruleset(name="hdfs_archive") {
action(type="omhdfs"
host="hdfs-namenode.company.com"
port="50070"
file="/logs/firewall/%$year%/%$month%/%$day%/drops_%$hour%.log"
template="JsonFormat"
)
}
#######################################
# 고가용성 동기화
#######################################
module(load="omrelp")
# Standby 서버로 복제
action(type="omrelp"
target="log-hub2.company.com"
port="2515"
tls="on"
template="rawmsg"
queue.type="fixedArray"
queue.size="10000"
action.resumeRetryCount="3"
)
3.4 모니터링 및 알림 시스템
#!/usr/bin/env python3
# log_metrics_collector.py
import sys
import json
import time
import redis
from prometheus_client import Counter, Histogram, Gauge, push_to_gateway
from prometheus_client.registry import CollectorRegistry
class LogMetricsCollector:
def __init__(self):
self.registry = CollectorRegistry()
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
# Prometheus 메트릭 정의
self.log_count = Counter('firewall_logs_total',
'Total firewall logs processed',
['event_type', 'severity'],
registry=self.registry)
self.log_bytes = Counter('firewall_traffic_bytes_total',
'Total bytes logged',
['direction', 'protocol'],
registry=self.registry)
self.processing_time = Histogram('log_processing_seconds',
'Log processing time',
registry=self.registry)
self.queue_depth = Gauge('rsyslog_queue_depth',
'Current queue depth',
['queue_name'],
registry=self.registry)
def process_log(self, log_data):
"""로그 처리 및 메트릭 수집"""
start_time = time.time()
try:
# 로그 카운트
self.log_count.labels(
event_type=log_data.get('event_type', 'unknown'),
severity=log_data.get('severity', 'info')
).inc()
# 트래픽 통계
if 'bytes' in log_data:
self.log_bytes.labels(
direction='inbound' if log_data.get('dst_port') else 'outbound',
protocol=log_data.get('protocol', 'unknown')
).inc(log_data['bytes'])
# 이상 탐지
self.detect_anomalies(log_data)
except Exception as e:
print(f"Error processing log: {e}", file=sys.stderr)
finally:
self.processing_time.observe(time.time() - start_time)
def detect_anomalies(self, log_data):
"""이상 행위 탐지"""
# DDoS 탐지
if log_data.get('packets_per_sec', 0) > 10000:
self.alert_ddos(log_data)
# 포트 스캔 탐지
src_ip = log_data.get('src_ip')
if src_ip:
key = f"portscan:{src_ip}"
self.redis_client.hincrby(key, log_data.get('dst_port', 0), 1)
self.redis_client.expire(key, 300) # 5분 TTL
if self.redis_client.hlen(key) > 20: # 20개 이상 포트 접근
self.alert_portscan(src_ip)
def alert_ddos(self, log_data):
"""DDoS 알림"""
alert = {
'alert_type': 'ddos_suspected',
'timestamp': time.time(),
'source_ip': log_data.get('src_ip'),
'packets_per_sec': log_data.get('packets_per_sec'),
'severity': 'critical'
}
self.redis_client.lpush('alerts:ddos', json.dumps(alert))
self.redis_client.expire('alerts:ddos', 3600)
def alert_portscan(self, src_ip):
"""포트스캔 알림"""
alert = {
'alert_type': 'portscan_detected',
'timestamp': time.time(),
'source_ip': src_ip,
'severity': 'high'
}
self.redis_client.lpush('alerts:portscan', json.dumps(alert))
def push_metrics(self):
"""Prometheus Gateway로 메트릭 전송"""
push_to_gateway('prometheus-gw:9091',
job='firewall_logs',
registry=self.registry)
def run(self):
"""메인 실행 루프"""
for line in sys.stdin:
try:
log_data = json.loads(line.strip())
self.process_log(log_data)
# 100개마다 메트릭 푸시
if self.log_count._value.sum() % 100 == 0:
self.push_metrics()
except json.JSONDecodeError:
continue
except KeyboardInterrupt:
break
# 최종 메트릭 푸시
self.push_metrics()
if __name__ == "__main__":
collector = LogMetricsCollector()
collector.run()
3.5 백업 및 복구 전략
#!/bin/bash
# firewall_log_backup.sh
#######################################
# 설정
#######################################
BACKUP_DIR="/backup/firewall_logs"
S3_BUCKET="s3://company-firewall-archive"
GLACIER_BUCKET="s3://company-firewall-glacier"
RETENTION_DAYS=180
COMPRESSION="zstd" # 최고 압축률
#######################################
# 함수 정의
#######################################
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a /var/log/firewall_backup.log
}
create_backup_manifest() {
local backup_id=$1
cat > "${BACKUP_DIR}/${backup_id}/manifest.json" << EOF
{
"backup_id": "${backup_id}",
"timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
"hostname": "$(hostname)",
"files": [
$(find ${BACKUP_DIR}/${backup_id} -type f -name "*.${COMPRESSION}" -exec sha256sum {} \; | \
awk '{print " {\"file\": \"" $2 "\", \"sha256\": \"" $1 "\"}"}' | \
paste -sd "," -)
],
"retention": {
"online_days": ${RETENTION_DAYS},
"archive_class": "GLACIER",
"legal_hold": false
}
}
EOF
}
#######################################
# 메인 백업 프로세스
#######################################
main() {
local backup_id="firewall_logs_$(date +%Y%m%d_%H%M%S)"
local backup_path="${BACKUP_DIR}/${backup_id}"
log "Starting backup: ${backup_id}"
# 백업 디렉토리 생성
mkdir -p "${backup_path}"
# 1. JSON 로그 백업 (요약 데이터)
log "Backing up JSON summary logs..."
find /var/log/ulogd -name "*.json" -mtime -1 | while read file; do
${COMPRESSION} -19 -T0 < "$file" > "${backup_path}/$(basename $file).${COMPRESSION}"
done
# 2. SQLite DB 백업 (거부 로그)
log "Backing up SQLite database..."
sqlite3 /var/log/ulogd/firewall_drops.db ".backup ${backup_path}/firewall_drops.db"
${COMPRESSION} -19 "${backup_path}/firewall_drops.db"
# 3. 설정 파일 백업
log "Backing up configuration..."
tar cf - /etc/ulogd.conf /etc/rsyslog.d/ /etc/iptables/ | \
${COMPRESSION} -19 > "${backup_path}/configs.tar.${COMPRESSION}"
# 4. 매니페스트 생성
create_backup_manifest "${backup_id}"
# 5. S3 업로드 (Standard-IA)
log "Uploading to S3..."
aws s3 sync "${backup_path}" "${S3_BUCKET}/${backup_id}/" \
--storage-class STANDARD_IA \
--metadata "backup_id=${backup_id},retention_days=${RETENTION_DAYS}"
# 6. 오래된 백업 Glacier로 이동
log "Moving old backups to Glacier..."
aws s3 ls "${S3_BUCKET}/" | awk '{print $2}' | while read old_backup; do
backup_date=$(echo $old_backup | grep -oP '\d{8}')
if [[ $(date -d "${backup_date} + ${RETENTION_DAYS} days" +%s) -lt $(date +%s) ]]; then
aws s3 mv "${S3_BUCKET}/${old_backup}" "${GLACIER_BUCKET}/${old_backup}" \
--recursive \
--storage-class GLACIER
log "Moved ${old_backup} to Glacier"
fi
done
# 7. 로컬 정리
find "${BACKUP_DIR}" -type d -name "firewall_logs_*" -mtime +7 -exec rm -rf {} \;
log "Backup completed: ${backup_id}"
}
# 실행
main
📊 4. 성능 최적화 및 트러블슈팅
4.1 성능 튜닝 체크리스트
영역 | 최적화 항목 | 권장값 | 확인 명령 |
---|---|---|---|
커널 | conntrack 테이블 크기 | 1M+ | sysctl net.netfilter.nf_conntrack_max |
네트워크 버퍼 | 8MB+ | sysctl net.core.rmem_default |
|
NFLOG 큐 크기 | 64K+ | cat /proc/net/netfilter/nfnetlink_queue |
|
ulogd2 | 스택당 스레드 | CPU 코어수 | ps -eLf | grep ulogd |
RATE interval | 60-300초 | 설정 파일 확인 | |
버퍼 크기 | 8MB+ | recv_buffer_size 설정 |
|
rsyslog | 큐 크기 | 50K-100K | rsyslog-stats |
큐 타입 | LinkedList | Disk 지원 큐 사용 | |
배치 크기 | 100-1000 | action 별 설정 | |
스토리지 | 파일시스템 | XFS/ext4 | df -T |
I/O 스케줄러 | deadline/noop | /sys/block/*/queue/scheduler |
|
압축 | zstd -19 | CPU vs 용량 트레이드오프 |
4.2 트러블슈팅 가이드
문제: 로그 유실
# 진단 스크립트
#!/bin/bash
# diagnose_log_loss.sh
echo "=== NFLOG 통계 ==="
cat /proc/net/netfilter/nfnetlink_log
echo -e "\n=== ulogd2 상태 ==="
systemctl status ulogd2
tail -20 /var/log/ulogd/ulogd.log
echo -e "\n=== rsyslog 큐 상태 ==="
rsyslog-stats | grep -E "queue|discarded"
echo -e "\n=== 디스크 사용량 ==="
df -h /var/log
echo -e "\n=== 최근 에러 ==="
journalctl -u ulogd2 -u rsyslog -p err -since "1 hour ago"
문제: 높은 CPU 사용률
# CPU 프로파일링
perf record -g -p $(pgrep ulogd2) sleep 30
perf report
# 병목 지점 확인
strace -c -p $(pgrep ulogd2) -f
문제: 메모리 누수
# 메모리 모니터링
#!/bin/bash
while true; do
ps aux | grep -E "ulogd|rsyslog" | grep -v grep
echo "---"
sleep 60
done | tee memory_monitor.log
# Valgrind 분석 (개발 환경)
valgrind --leak-check=full --show-leak-kinds=all ulogd -v
🔒 5. 보안 및 컴플라이언스
5.1 로그 무결성 보장
#!/bin/bash
# log_integrity_check.sh
#######################################
# 해시 체인 구현
#######################################
create_hash_chain() {
local log_file=$1
local hash_file="${log_file}.hash"
local prev_hash="0000000000000000000000000000000000000000000000000000000000000000"
> "$hash_file"
while IFS= read -r line; do
# 현재 줄 + 이전 해시
current_data="${line}${prev_hash}"
current_hash=$(echo -n "$current_data" | sha256sum | cut -d' ' -f1)
# 해시 저장
echo "${current_hash}" >> "$hash_file"
prev_hash=$current_hash
done < "$log_file"
# 최종 해시를 블록체인 또는 TSA에 제출
submit_to_timestamp_authority "$hash_file"
}
submit_to_timestamp_authority() {
local hash_file=$1
local final_hash=$(tail -1 "$hash_file")
# RFC 3161 타임스탬프
openssl ts -query -data "$hash_file" -no_nonce \
-sha256 -out "${hash_file}.tsq"
curl -H "Content-Type: application/timestamp-query" \
--data-binary @"${hash_file}.tsq" \
https://freetsa.org/tsr > "${hash_file}.tsr"
}
5.2 접근 통제 및 감사
# /etc/rsyslog.d/99-audit.conf
# 로그 접근 감사
$FileCreateMode 0640
$FileOwner root
$FileGroup adm
# 감사 로그 (별도 파일)
if $programname == 'rsyslog-audit' then {
action(type="omfile"
file="/var/log/audit/rsyslog_audit.log"
template="RSYSLOG_FileFormat"
fileCreateMode="0600"
fileOwner="root"
fileGroup="root"
)
stop
}
# 설정 변경 알림
if $msg contains 'rsyslog' and $msg contains 'reload' then {
action(type="ommail"
server="smtp.company.com"
port="25"
mailfrom="rsyslog@company.com"
mailto="security@company.com"
subject="Rsyslog Configuration Changed"
body="Rsyslog configuration was reloaded on $hostname"
)
}
📋 6. 종합 체크리스트
6.1 구축 단계별 체크리스트
- 계획 단계
- 법규 요구사항 파악 완료
- 로그 보관 기간 정책 수립
- 용량 산정 및 인프라 준비
- 보안 요구사항 정의
- 구현 단계
- iptables NFLOG 규칙 설정
- ulogd2 설치 및 설정
- RATE 필터 최적화
- rsyslog 에이전트 설정
- 중앙 수집 서버 구축
- TLS 인증서 배포
- 검증 단계
- 로그 수집 테스트
- 성능 부하 테스트
- 장애 복구 테스트
- 보안 취약점 점검
- 운영 단계
- 모니터링 대시보드 구축
- 알림 규칙 설정
- 백업 자동화 구현
- 정기 점검 일정 수립
6.2 일일 운영 체크리스트
#!/bin/bash
# daily_check.sh
echo "=== 방화벽 로그 일일 점검 ==="
echo "점검 시작: $(date)"
# 1. 서비스 상태
echo -e "\n[1] 서비스 상태 확인"
for service in ulogd2 rsyslog; do
status=$(systemctl is-active $service)
echo "$service: $status"
done
# 2. 로그 수집률
echo -e "\n[2] 로그 수집 통계"
yesterday=$(date -d "yesterday" +%Y%m%d)
log_count=$(grep -c "^{" /var/log/ulogd/firewall_summary.json)
echo "수집된 로그: $log_count 건"
# 3. 디스크 사용량
echo -e "\n[3] 디스크 사용량"
df -h /var/log | tail -1
# 4. 에러 확인
echo -e "\n[4] 최근 24시간 에러"
journalctl --since "24 hours ago" -p err | wc -l
# 5. 성능 지표
echo -e "\n[5] 성능 지표"
sar -u 1 1 | tail -1
echo -e "\n점검 완료: $(date)"
이 가이드는 방화벽 로그 관리의 모든 측면을 다루었습니다.
- 법규 준수: 국내외 주요 규정 요구사항 충족
- 기술 구현: iptables + ulogd2 + rsyslog 완벽 통합
- 확장성: 엔터프라이즈급 중앙 수집 아키텍처
- 보안성: 암호화, 무결성, 접근통제 구현
- 운영성: 모니터링, 백업, 복구 자동화
이 설정을 통해 "6개월 이상"의 단순한 보관 요구사항을 넘어, 안전하고 효율적이며 분석 가능한 방화벽 로그 관리 시스템을 구축할 수 있습니다.
댓글