본문 바로가기
정보보호 (Security)

방화벽(iptables) 요약 로그 장기 보관 체계, 보안과 규제를 모두 만족

by 날으는물고기 2025. 7. 13.

방화벽(iptables) 요약 로그 장기 보관 체계, 보안과 규제를 모두 만족

728x90

  • 🔥 “iptables + ulogd2 RATE로 트래픽 로그 폭주 잡기!”
  • 🚦 “NFLOG threshold & RATE 필터로 ‘1줄 요약’ 방화벽 로그 완성하기”
  • 📉 “10배 줄이는 iptables 로그: nflog-threshold + RATE 실전 가이드”
  • 🛠️ “ulogd2 RATE 집계로 SSH·웹 트래픽 로그 다이어트 성공기”
  • “방화벽 로그, 이젠 요약하자! iptables × ulogd2 RATE 세팅 비법”

🔥 방화벽 로그 ‘6 개월 이상’ 보관, 왜 & 무엇을 어떻게?

1️⃣ 법·규정별 “최소 보관 기간” 한눈에 보기

구분 적용 범위 요구/권고 보관 기간
개인정보 보호법 +
「개인정보의 안전성 확보조치 기준」제 8조
개인정보처리시스템 접속기록 ① 일반: 1 년↑
② 5만 명↑·민감/고유식별정보·기간통신사업자: 2 년↑
ISMS-P 인증기준 2.9.4 서버·네트워크·보안장비 로그 “법령 이상” + 관행상 6 개월↑ 확보 권고
PCI-DSS 4.0 Req 10.7 카드결제 관련 시스템 1 년↑(90 일 즉시 조회 가능)
ISO 27001 (Annex A 8.15) 전 산업권 일반 통상 12 개월 확보 권고
NIST SP 800-92 연방기관·모범지침 “사고 분석·조사에 충분한 기간(≥ 6 개월 권장)”

💡 정리

  • 공공·민간 대부분 ‘6 개월 이상’ 최소 기준으로 삼고, 개인정보·결제·금융 등 특수 분야는 1 ~ 2 년 이상을 요구합니다.
  • 방화벽 로그라 하더라도 개인정보 처리·결제 트래픽을 다룬다면 상위 규범(PCI-DSS·PIPA 등)에 맞춰 기간을 늘려야 합니다.

2️⃣ “어떤” 방화벽 로그를 남길까? (필수 필드 체크리스트)

카테고리 필드 예시 설명·활용 포인트
트래픽 정보 ▶ 발신 / 수신 IP, Port, Protocol
▶ Interface, Zone
위협 IP 추적, 포렌식에 핵심
행위(ACTION) allow / deny / drop / reset 등 정책 효과성·위협 차단 근거
정책 메타 ▶ Rule ID·이름
▶ 정책 그룹
‘어떤 룰이 언제 트리거?’를 역추적
세션·NAT ▶ Session ID·Duration
▶ 변환 前/後 IP·Port
내부-외부 매핑, 응답 지연 분석
패킷 통계 Bytes, Packets, TCP Flags DDoS·대용량 전송 탐지
위협·시그니처 IDS/IPS Hit, Malware Name 상관 분석·알림 트리거
관리자 활동 ▶ 로그인·로그아웃
▶ 정책 추가/변경/삭제
▶ Device OS Upgrade
변경 이력과 사고 연관 여부 확인

3️⃣ “어떻게” 안전하게 보존할까? 실무 가이드

  1. 🛡️ 중앙 집중 로그 서버
    • Syslog-TLS, Kafka-Beat, 또는 Cloud-based SIEM(ELK + S3 Object-Lock) 등으로 단방향 전송.
  2. 🔒 무결성 보장
    • 블록체인 해시, WORM 스토리지, 혹은 파일 단위 SHA-256 + RFC 3161 Time-stamp.
  3. 🗄️ 이중 저장 & 암호화
    • 온프레미스(RAID NAS) + 오프사이트(S3 Glacier, 타 DC) 병행.
  4. 👀 정책적 접근통제
    • RBAC로 ‘읽기 전용’ 계정 분리, 관리자 접근 시 MFA 요구.
  5. 🔄 수명주기 관리
    • ① 90 일: 실시간 SIEM 인덱스
      ② 6 개월: 온프레미스 저비용 스토리지
      ③ 2 년+: 압축 + Cold archive / 삭제 정책 자동 실행.
  6. 📝 정책·절차 문서화 (ISMS-P 2.9.4 요구)
    • 로그 유형·보존 기간·보존 위치·점검 주기·삭제 방법 명문화 → 내부 감사·외부 인증 시 제출.
  7. 🛠️ 정기 점검 & 알림
    • Cron + CLI(logrotate --state) 또는 Elastic ILM 정책으로 저장 용량·무결성 주기 점검 → Slack·Teams 알림.

4️⃣ 내부 직원에게 제시할 “점검 포인트” 체크리스트

# 확인 항목 주기
1 방화벽 장비의 시간 동기화(NTP) 상태 매일
2 정책 변경 로그 자동 수집 여부 실시간
3 허용 트래픽 중 비정상 Port/IP 탐지 룰 존재 매주
4 로그 전송 실패 알림(UDP Drop, Queue Full) 24 h
5 보존 스토리지 무결성(HASH 재검증) 분기
6 만료 로그 자동 파기 로테이션 정책 월간
7 DR 센터/Cloud 백업 복구 테스트 반기

5️⃣ “6 개월”은 출발선일 뿐! 🚀

  • 법적으로: 개인정보·결제 등은 1 ~ 2 년이 기본, 금융권·감사 목적은 5 년까지 요구될 수 있습니다.
  • 실무적으로: 사고 탐지·포렌식을 생각하면 6 ~ 12 개월 온-디스크 + 장기 보관을 권장합니다.
  • 보안 관점: 로그는 “있기만 하면” 끝이 아닙니다. 무결성·가용성·추적성 확보와 주기적 분석까지 포함되어야 진짜 컴플라이언스를 만족합니다.

📝 이 가이드를 기반으로 사내 정보보호 관리계획(로그 관리 절차, DR, 백업 정책)을 업데이트하시면 ISMS-P·PCI-DSS 심사 대응과 침해사고 포렌식 준비에 큰 도움이 됩니다!

✨ 한마디 정리

  • “방화벽 로그 = 네트워크 관점의 ‘교통기록’”
  • “서비스(SSH/Web/DB) 애플리케이션 로그 = 세션 내부의 ‘행위기록’”
    → 두 레이어는 역할이 다르므로 중복이라기보다 상호보완이 핵심입니다. 다만 모든 허용(allow) 트래픽을 방화벽에서 100% 남기느냐는 “규제‧위험도‧저장·분석 자원”에 따라 Tuning 가능해요.

1️⃣ 왜 둘 다 필요할까?

  1. 규제 시각
    • PCI-DSS 4.0 10번 항목은 “모든 시스템 컴포넌트 접근을 로그로 남겨라”고 명시해 네트워크·시스템·애플리케이션 전 계층을 요구합니다.
    • NIST SP 800-92도 “중요한 데이터만 선별해야 하지만, 레이어마다 남기는 값이 달라 서로 연결 분석이 가능해야 한다”고 권고합니다.
  2. 포렌식 시각
    • 방화벽: ‘누가·언제·어디서’ 접근했는지 (IP/포트/액션)
    • 앱 로그: ‘무엇을 했는지’ (쿼리·URL·계정·오류)
      → 둘을 조합해야 완전한 타임라인이 만들어집니다.

2️⃣ 방화벽 로그, 어디까지 남길까?

구분 필수 선택(튜닝 가능)
🚫 Deny/Drop/Reset 항상 기록 ― 규제·위협 분석
⚠️ Threat/IPS/Anti-Virus 항상 기록 ― 탐지·대응 근거
🛠 Config & Admin Event 항상 기록 ― 변경 추적
✅ Allow Traffic (인터넷↔DMZ, 외부 API) 대부분 기록 (요약 필드라도) 상세 패킷/바이트는 샘플링 가능
✅ Allow Traffic (내부 세그먼트) 요약 세션 로그만 or Sampling 1 % 완전 제외 가능 (아래 조건 충족 시)

제외 조건 가이드

  1. 동일 정보가 애플리케이션/시스템 로그에 이미 포함.
  2. SIEM에서 세션·계정 매핑 규칙이 있어 교차 조회 가능.
  3. 제외해도 규제(PCI-DSS·ISMS-P 등) 항목 누락 없이 증빙 가능.
  4. 제외 정책·사유를 문서화(로그 관리 절차서)하고 주기적 재검토.

3️⃣ 실무 튜닝 예시

플랫폼 최소 설정 예 ‘허용 트래픽 축소’ 옵션
Palo Alto log-setting에서 Deny, Threat, Config 프로파일 Always Enable traffic-log-type = start end로 세션 단위 기록, discard-non-syn으로 불필요 TCP ACK 드롭
FortiGate set logtraffic all + set local-in-logging enable set utm enable, 내부 VLAN 간 set logtraffic utmdisable
iptables + ulogd2 -j NFLOG --nflog-group 32 (DROP/REJECT 체인) nflog-threshold 10 : 10개 패킷당 1회 요약
AWS WAF v2 sampledRequestsEnabled = true (위협 IP만) requestSamplingRate = 0.01 (1 %)

4️⃣ 저장소·분석 설계 팁

  1. 계층 태그 맞춤: source_category=firewall, source_category=app 등 로그 라우팅 시 구분.
  2. 공통 키 매핑
    • 방화벽 src_ip ↔ 앱로그 client_ip
    • 방화벽 session_id ↔ 웹로그 x-forwarded-for + request_id
  3. 저장수명 차등
    • 방화벽 요약 세션: 90일 인덱스 → 1년 콜드 스토리지
    • Threat·Deny: 1년 인덱스 + 3년 장기 보관
  4. Dashboards: “Deny Top 10”, “알 수 없는 프로토콜 허용 세션” 등 네트워크 가시성 보완.

5️⃣ 내부 점검 체크리스트 ✅

  1. 로그 수집 누락율 < 1 %? (전송 실패 알람)
  2. 행위 추적성: IP → 계정 → 요청 URL까지 단일 쿼리로 연결되는가?
  3. 튜닝 예외 목록: 변경 시 CAB 승인·문서화 완료?
  4. 규제 대비: 매년 ISMS-P·PCI 내부 감사에서 증적 샘플 추출 테스트.
300x250
  • 방화벽 로그는 “외부 관문” 관점, 애플리케이션 로그는 “내부 행위” 관점.
  • 중복을 100% 제거보다는 안전·규제 범위를 우선 고려해 허용 트래픽 세부정보를 단계적으로 줄이는 접근이 현실적.
  • 튜닝 시 반드시 문서화·교차분석 가능성·규제 증빙을 확보하세요. 이렇게 하면 스토리지·분석 비용을 아끼면서도 포렌식·컴플라이언스 두 마리 토끼를 잡을 수 있습니다!

🚦 iptables + ulogd2 RATE로 트래픽 로그 “진짜” 요약하기

“nflog-threshold=10”만으로는 패킷을 묶어 보내 줄 뿐, 줄여 주지는 않습니다!
→ ulogd2의 RATE 필터와 함께 쓰면 N초마다/동일 키마다 집계된 한 줄 로그를 얻을 수 있어요.

1️⃣ 개념 먼저 이해하기

  1. 🐧 NFLOG 커널 큐
    • --nflog-threshold 10 → _10개 패킷_을 한꺼번에 Netlink로 전달(배치 전송).
    • 모든 패킷이 여전히 userspace(ulogd2)로 들어옵니다.
  2. 📊 ulogd2 RATE 필터
    • RATE 플러그인이 동일 필드 집합(예: 5 tuple)당 N 초 동안 패킷·바이트를 누적 후 1줄 요약만 출력.
    • 결과적으로 로그 양 급감 + 가독성 향상.

2️⃣ 필요 패키지 설치

# Debian/Ubuntu
sudo apt-get install -y ulogd2 ulogd2-nflog ulogd2-print
# RHEL/CentOS(8↑) – EPEL 필요
sudo dnf install -y ulogd ulogd-nflog ulogd-print

🔐 보안 팁: ulogd2는 root로 띄우되, 출력 파일 권한을 root:adm 640처럼 제한하세요.

3️⃣ iptables 규칙 예시 (NFLOG)

# SSH 트래픽만 요약하고 싶을 때
iptables -A INPUT -p tcp --dport 22 \
         -j NFLOG --nflog-group 10 \
         --nflog-prefix "SSH_IN" \
         --nflog-threshold 10
  • --nflog-group 10 → ulogd2가 구독할 그룹 번호
  • --nflog-prefix → 요약 로그 앞에 표시될 식별자

4️⃣ /etc/ulogd.conf 핵심 설정

[global]
logfile="/var/log/ulogd/ulogd.log"
loglevel=3

########################################
# ① 플러그인 로드
plugin="/usr/lib/ulogd/ulogd_inppkt_NFLOG.so"
plugin="/usr/lib/ulogd/ulogd_filter_BASE.so"
plugin="/usr/lib/ulogd/ulogd_filter_RATE.so"
plugin="/usr/lib/ulogd/ulogd_output_PRINTPKT.so"

########################################
# ② 스택 정의 (순서 = 흐름)
stack=log1:NFLOG,base1:BASE,rate1:RATE,print1:PRINTPKT

########################################
# ③ 각 플러그인 인스턴스
[log1]
group=10               # ← iptables 그룹 번호와 일치
resolve_conntrack=0    # conntrack 조회 OFF로 성능↑

[rate1]
rate_interval=60       # 60초마다 집계해 1줄 출력
aggregate=orig.ip.src,dst.ip,dst.port,proto   # 집계 키
print_total=1          # 합계 패킷·바이트 표시
upper_threshold=0      # 임계값 없이 계속 출력

[print1]
file="/var/log/ulogd/ssh_rate.log"
sync=1                 # 즉시 flush

💡 집계 키 예시

L3 / L4 흐름: src.ip,dst.ip,dst.port,proto

  • 인터페이스 기준: in.ifname,out.ifname,proto
  • 큰 그림만 필요하면 proto 정도만도 가능!

5️⃣ 서비스 재시작 & 확인

sudo systemctl restart ulogd.service
# 실시간 확인
tail -f /var/log/ulogd/ssh_rate.log

출력 예시

2025-07-11 14:32:00 SSH_IN 192.0.2.50 -> 10.0.0.2:22 proto=6 packets=125 bytes=12480

6️⃣ 운영·보안 튜닝 팁 🌟

  1. 🕒 interval 조정
    • 폭주 구간만 세밀히 보려면 rate_interval=5 (초) 등으로 축소.
  2. 📉 저장 공간 절약
    • logrotate + compress로 주기 압축, 6 개월 이상 S3/Glacier WORM 보관.
  3. 🛑 DDoS 탐지 룰 연계
    • RATE 출력에 packets > X 조건 걸어 fail2ban 또는 Slack 알림으로 연결.
  4. 무결성 체크
    • 주단위 sha256sum 스크립트로 로그 파일 해시 → Wazuh/OSSEC에 전송.

7️⃣ Troubleshooting FAQ

증상 원인 & 해결
로그가 전혀 안 나와요 🔍 iptables 그룹 번호 ↔ ulogd group 일치 확인, conntrack 커널 모듈 로드 여부 체크
“Invalid stack” 에러 stack= 줄에 콤마·대소문자 오타 여부 확인
RATE가 안 먹고 모든 패킷 출력 aggregate= 필드 키 누락 → 집계 불가. 최소 proto라도 지정
ulogd CPU 사용률 급증 rate_interval 너무 짧음 ② 집계 키 과다 → 필드 수 줄이기
  • nflog-threshold배치 전송 용도, 필터링·요약ulogd2 RATE로!
  • iptables → NFLOG(group=10)ulogd2(NFLOG→BASE→RATE→PRINT) 흐름을 기억하세요.
  • 집계 키·interval을 상황에 맞게 튜닝하며, 보안 로그 무결성·보존 정책도 잊지 않으셔야 합니다.

🛠️ 이대로 설정하면 로그 폭주 걱정 없이 필요한 요약 정보만 깔끔하게 확보하실 수 있습니다.

🛰️ Syslog 중앙 수집, 탄탄하게 설계하는 10-Step 가이드

🎯 목표: iptables ➜ ulogd2(RATE)로 “요약된” 방화벽 로그를 유실 없이 중앙 SIEM/스토리지로 전달
🗂️ 전제: Linux 서버 다수, rsyslog 사용(표준), 필요 시 syslog-ng로 대체 가능

1️⃣ 아키텍처 한눈에 보기

iptables ─→ NFLOG ─→ ulogd2(RATE) ─→ /var/log/ssh_rate.log
                               │
                               ▼ imfile
                      rsyslog(Agent) ──► TLS/RELP ──► rsyslog(Hub/Relay) ──►
                                          │
                    ┌──────────── ELK / Splunk / Graylog ────────────┐
                    │         S3 Object-Lock / Glacier (Archive)     │
                    └────────────────────────────────────────────────┘
  • Agent: 각 서버 → 파일 입력(imfile), RELP/TLS 전송
  • Hub/Relay: 2 노드 Active/Standby, disk-queue 로컬 버퍼
  • Backend: SIEM + 장기 보관 스토리지(WORM)

2️⃣ 전송 프로토콜 결정

프로토콜 특성 추천 상황
RELP + TLS TCP, ACK, 재전송·순서 보장, 암호화 ✅ 기본값 (most secure & reliable)
TCP + TLS 전송 보장 O, 세션 기반 소규모·저부하
UDP 514 경량, 손실 가능 내부 망 + 저위험 로그 전용

🔒 TLS 필수: PCI-DSS·ISMS-P는 암호화 전송 요구.

3️⃣ Agent(rsyslog) 설정 예시

/etc/rsyslog.d/30-firewall-rate.conf

# ① 파일 입력
module(load="imfile")
input(type="imfile"
      File="/var/log/ulogd/ssh_rate.log"
      Tag="FW_RATE"
      Severity="info"
      Facility="local4")

# ② JSON 파싱(선택)
module(load="mmjsonparse")
template(name="JSONonly" type="string" string="%$!all-json%\n")

# ③ RELP + TLS 전송
module(load="omrelp")
action(type="omrelp"
       Target="log-hub1.corp"
       Port="2514"
       tls="on"
       tls.caCert="/etc/pki/rsyslog/ca.pem"
       tls.myCert="/etc/pki/rsyslog/client.pem"
       tls.myPrivKey="/etc/pki/rsyslog/client.key"
       Template="JSONonly"
       queue.type="LinkedList"
       queue.size="50000"
       queue.filename="fw_rate_dq"
       action.resumeRetryCount="-1")

포인트

  1. imfile: 텍스트 파일을 실시간 tail → syslog 메시지화
  2. mmjsonparse: 키/값 필드를 ! 트리로 분리 → SIEM 인제스트 편리
  3. disk-queue: 네트워크 장애 시 로컬에 버퍼링 후 재전송

4️⃣ Hub/Relay(rsyslog) 설정 예시

/etc/rsyslog.d/30-in-relp.conf

module(load="imrelp")
input(type="imrelp"
      port="2514"
      tls="on"
      tls.caCert="/etc/pki/rsyslog/ca.pem"
      tls.myCert="/etc/pki/rsyslog/hub.pem"
      tls.myPrivKey="/etc/pki/rsyslog/hub.key"
      ruleset="firewall_rate")

ruleset(name="firewall_rate") {
  # GeoIP·Kafka·Elasticsearch 등 추가 필터 가능
  action(type="omkafka"
         topic="fw_rate"
         broker="kafka01:9092,kafka02:9092")
}
  • HA: Pacemaker 혹은 Keepalived VIP로 Active/Standby
  • Disk-queue: omkafka에도 queue.type="LinkedList" 권장

5️⃣ SIEM 처리·인덱스 전략

  1. Index 별도 분리: firewall-rate-*, firewall-threat-*
  2. 필드 매핑
    • src_ip, dst_ip ➜ IP-type
    • packets, bytes ➜ long
    • tag(“SSH_IN”) ➜ keyword
  3. ILM 정책 (예시)
    • 0~90 일: hot (SSD)
    • 90 일~1 년: warm (HDD)
    • 1 년+: cold (S3-GLACIER, read-only)

6️⃣ 인증서·암호화 운영

항목 방법
CA 사내 PKI or Hashicorp Vault PKI backend
배포 Ansible copy module + notify: restart rsyslog
Rotation 1 년 주기 · 만료 30 일 전 자동 갱신 스크립트

7️⃣ 모니터링 & 알림

  • rsyslog stats: module(load="impstats" interval="60" severity="7") → Prometheus Node Exporter textfile
  • Queue Depth Alert: if ($queuelen > 10000) then alert("Queue overflow!")
  • Log-loss Check: Agent/Hub 양쪽에서 sha256sum 비교(시간당 샘플)

8️⃣ 로깅 표준화 체크리스트 ✅

# 항목 설명
1 RFC 5424 포맷 template="RSYSLOG_SyslogProtocol23" 적용
2 Timezone UTC 고정 or Asia/Seoul 통일, 로그서버 NTP 필수
3 Tag 표준 FW_RATE, FW_DENY, APP_WEB 등 10자 이내
4 Severity 매핑 info=allow-rate, warning=deny, error=threat
5 Data Masking 개인정보(IP ⭢ 내부 NAT 후 공개) 필요 시 mmdb-lookup

9️⃣ 보존·파기 정책

  1. 방화벽 요약: 6 개월 online + 2 년 archive
  2. 위협/차단: 1 년 online + 5 년 archive(금융·결제 기준)
  3. 자동 파기: S3 Lifecycle, DeleteMarkerReplication Off ⇢ ‘정책 기반 삭제’ 로그 남김

🔟 DR & 테스트 시나리오

  • Agent → Hub 끊김: disk-queue → 재접속 후 catch-up 확인
  • Hub 장애: VIP 인계, imrelp 재연결 < 5 초
  • SIEM 불가(대규모 인덱스 장애): Hub → secondary S3 sink fallback
  • 연 1회 복구 훈련: Glacier → Restore → ELK re-index, 무결성 해시 비교

🎁 마무리 한 줄

“요약된 로그를 안전하게, 잃지 않고, 규정에 맞춰 오래 보관”
위 10-Step을 적용하면 ✔️전송 안전성 ✔️저장 무결성 ✔️확장성까지 모두 확보하실 수 있습니다!

📚 1. 방화벽 통신 로그 기록 관련 법규/규정

1.1 국내 법규 체계

🏛️ 개인정보보호법 체계

┌─────────────────────────────────────────────────────────┐
│                    개인정보보호법                          │
│                      제29조                              │
│                        ↓                                 │
│            개인정보의 안전성 확보조치 기준                  │
│                    고시 제8조                            │
│                        ↓                                 │
│              ┌─────────────────┬─────────────────┐      │
│              │   일반 기업      │   특수 대상     │      │
│              │   1년 이상       │   2년 이상      │      │
│              └─────────────────┴─────────────────┘      │
└─────────────────────────────────────────────────────────┘

상세 요구사항

  • 일반 기업: 개인정보처리시스템 접속기록 1년 이상 보관
  • 특수 대상 (2년 이상 보관):
    • 개인정보 5만명 이상 보유 기업
    • 민감정보(사상, 신념, 건강 등) 처리
    • 고유식별정보(주민번호, 여권번호 등) 처리
    • 정보통신서비스 제공자(ISP, 포털 등)

필수 기록 항목

접속기록:
  - 계정: 접속자 ID/계정명
  - 시간: 접속일시 (년월일시분초)
  - 위치: 접속지 IP 주소
  - 행위: 수행업무 또는 열람 내용
  - 대상: 접속한 정보주체 정보

🔐 ISMS-P 인증기준 상세

2.9.4 로그 및 접속기록 관리 세부 요구사항

1. 로그 수집 대상
   - 서버 (OS 레벨)
   - 네트워크 장비 (라우터, 스위치)
   - 보안장비 (방화벽, IPS, WAF)
   - 응용프로그램
   - 데이터베이스

2. 최소 기록 항목
   - 사용자 계정
   - 이벤트 유형
   - 날짜 및 시간
   - 성공/실패 여부
   - 이벤트 발생 출처
   - 영향받은 데이터/시스템/자원

3. 보관 기간
   - 법적 요구사항 준수 (최소 6개월)
   - 침해사고 분석 가능 기간
   - 업무 필요성 고려

4. 보호 조치
   - 위변조 방지
   - 무단 접근 차단
   - 정기 백업

1.2 국제 표준 및 산업별 규정

💳 PCI-DSS 4.0 상세 요구사항

Requirement 10: Log and Monitor All Access

10.1: 로깅 메커니즘 구현
  대상:
    - CHD(카드소지자 데이터) 접근
    - 모든 시스템 구성요소
    - 네트워크 자원

10.2: 감사 추적 구현
  필수 이벤트:
    - 모든 개인별 사용자 접근
    - 루트/관리자 권한 행위
    - 감사 로그 접근
    - 무효한 논리적 접근 시도
    - 인증 메커니즘 사용
    - 보안 이벤트 로깅 중지

10.3: 최소 기록 항목
  - 사용자 ID
  - 이벤트 유형
  - 날짜 및 시간
  - 성공/실패 표시
  - 이벤트 발생지
  - 영향받은 데이터/구성요소

10.7: 보관 기간
  - 최소 1년 보관
  - 최근 3개월은 즉시 분석 가능
  - 나머지는 백업에서 복원 가능

🌐 ISO/IEC 27001:2022 Annex A 통제항목

A.8.15 Logging (로깅)

목적: 이벤트 기록 및 증거 생성

구현 지침:
  - 로그 유형 결정
  - 보호 수준 설정
  - 보관 기간 정의
  - 시간 동기화
  - 정기적 검토

A.8.16 Monitoring activities (모니터링 활동)
  - 비정상 행위 탐지
  - 정보보호 사건 식별
  - 규정 준수 확인

1.3 산업별 특수 요구사항 매트릭스

산업 근거법규 로그 유형 보관기간 특이사항
금융 전자금융거래법 전자금융거래 5년 거래내역 포함
  전자금융감독규정 접속/권한변경 1년 실시간 모니터링
의료 의료법 전자의무기록 접근 3년 환자정보 보호
  HIPAA(미국) 시스템 활동 6년 PHI 접근 추적
통신 정보통신망법 통신사실 3개월 영장 없이 제공 금지
  통신비밀보호법 가입자 접속기록 3개월 법원 허가 필요
공공 전자정부법 행정정보시스템 2년 감사 대비
  공공기록물관리법 주요 시스템 준영구 이관 대상

🛠️ 2. iptables + ulogd2 고급 구현 가이드

2.1 아키텍처 설계

┌─────────────────────────────────────────────────────────────┐
│                     방화벽 서버 (iptables)                    │
│                                                             │
│  ┌─────────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │   INPUT     │    │   FORWARD    │    │   OUTPUT     │  │
│  └──────┬──────┘    └──────┬───────┘    └──────┬───────┘  │
│         │                   │                    │          │
│         └───────────────────┴────────────────────┘          │
│                             │                               │
│                      ┌──────▼──────┐                       │
│                      │    NFLOG    │                       │
│                      └──────┬──────┘                       │
│                             │                               │
│  ┌──────────────────────────▼────────────────────────────┐ │
│  │                      ulogd2                            │ │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐ │ │
│  │  │  NFLOG  │→ │  BASE   │→ │  RATE   │→ │ OUTPUT  │ │ │
│  │  └─────────┘  └─────────┘  └─────────┘  └─────────┘ │ │
│  └───────────────────────────┬───────────────────────────┘ │
└─────────────────────────────┼─────────────────────────────┘
                              │
                              ▼
                    ┌─────────────────────┐
                    │   로그 파일/SIEM    │
                    └─────────────────────┘

2.2 상세 구현 단계

Step 1: 시스템 준비 및 최적화

#!/bin/bash
# system-optimization.sh

# 1. 커널 파라미터 최적화
cat >> /etc/sysctl.d/99-firewall-logging.conf << EOF
# Netfilter 연결 추적 최적화
net.netfilter.nf_conntrack_max = 1048576
net.netfilter.nf_conntrack_buckets = 262144
net.netfilter.nf_conntrack_tcp_timeout_established = 432000
net.netfilter.nf_conntrack_tcp_timeout_time_wait = 120
net.netfilter.nf_conntrack_tcp_timeout_close_wait = 60
net.netfilter.nf_conntrack_tcp_timeout_fin_wait = 120

# 로그 버퍼 크기 증가
net.core.rmem_default = 8388608
net.core.rmem_max = 134217728
net.core.wmem_default = 8388608
net.core.wmem_max = 134217728

# NFLOG 큐 설정
net.netfilter.nf_log_queue_threshold = 65536
EOF

sysctl -p /etc/sysctl.d/99-firewall-logging.conf

# 2. 필요 패키지 설치
apt-get update
apt-get install -y \
    ulogd2 \
    ulogd2-json \
    ulogd2-pcap \
    ulogd2-sqlite3 \
    python3-pip \
    jq \
    rsyslog-relp \
    rsyslog-gnutls

# 3. Python 분석 도구 설치
pip3 install pandas numpy matplotlib seaborn

Step 2: 고급 iptables 규칙 설정

#!/bin/bash
# iptables-advanced-rules.sh

# 체인 초기화 및 커스텀 체인 생성
iptables -N LOG_ACCEPT 2>/dev/null || iptables -F LOG_ACCEPT
iptables -N LOG_DROP 2>/dev/null || iptables -F LOG_DROP
iptables -N RATE_LIMIT 2>/dev/null || iptables -F RATE_LIMIT

# 1. 속도 제한 체인
iptables -A RATE_LIMIT -m recent --name portscan --update --seconds 60 --hitcount 10 -j LOG_DROP
iptables -A RATE_LIMIT -m recent --name portscan --set -j RETURN

# 2. 허용 트래픽 로깅 (요약)
iptables -A LOG_ACCEPT \
    -j NFLOG --nflog-group 100 \
    --nflog-prefix "ACCEPT" \
    --nflog-threshold 10 \
    --nflog-range 128
iptables -A LOG_ACCEPT -j ACCEPT

# 3. 거부 트래픽 로깅 (전체)
iptables -A LOG_DROP \
    -j NFLOG --nflog-group 200 \
    --nflog-prefix "DROP" \
    --nflog-range 256
iptables -A LOG_DROP -j DROP

# 4. 서비스별 세분화 로깅
# HTTP/HTTPS (웹 서비스)
iptables -A INPUT -p tcp -m multiport --dports 80,443 \
    -m conntrack --ctstate NEW \
    -j NFLOG --nflog-group 110 \
    --nflog-prefix "WEB_NEW" \
    --nflog-threshold 20

# SSH (보안 중요)
iptables -A INPUT -p tcp --dport 22 \
    -m conntrack --ctstate NEW \
    -m recent --name ssh_attempt --set \
    -j NFLOG --nflog-group 120 \
    --nflog-prefix "SSH_NEW"

# Database (민감 데이터)
iptables -A INPUT -p tcp -m multiport --dports 3306,5432,1521 \
    -j NFLOG --nflog-group 130 \
    --nflog-prefix "DB_ACCESS"

# 5. 위협 탐지 특화 로깅
# 포트 스캔 탐지
iptables -A INPUT -p tcp --tcp-flags ALL NONE \
    -j NFLOG --nflog-group 300 \
    --nflog-prefix "SCAN_NULL"

iptables -A INPUT -p tcp --tcp-flags SYN,FIN SYN,FIN \
    -j NFLOG --nflog-group 300 \
    --nflog-prefix "SCAN_SYNFIN"

# 6. 메인 규칙 적용
iptables -A INPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT
iptables -A INPUT -i lo -j ACCEPT
iptables -A INPUT -p tcp --dport 22 -s 10.0.0.0/8 -j LOG_ACCEPT
iptables -A INPUT -p tcp -m multiport --dports 80,443 -j LOG_ACCEPT
iptables -A INPUT -j LOG_DROP

# 규칙 저장
iptables-save > /etc/iptables/rules.v4

Step 3: ulogd2 고급 설정

# /etc/ulogd.conf
[global]
logfile="/var/log/ulogd/ulogd.log"
loglevel=3
plugin_dir="/usr/lib/x86_64-linux-gnu/ulogd"

#######################################
# 플러그인 로드
#######################################
plugin="ulogd_inppkt_NFLOG.so"
plugin="ulogd_filter_BASE.so"
plugin="ulogd_filter_IFINDEX.so"
plugin="ulogd_filter_IP2STR.so"
plugin="ulogd_filter_IP2BIN.so"
plugin="ulogd_filter_PRINTPKT.so"
plugin="ulogd_filter_HWHDR.so"
plugin="ulogd_filter_RATE.so"
plugin="ulogd_filter_MARK.so"
plugin="ulogd_output_JSON.so"
plugin="ulogd_output_SQLITE3.so"
plugin="ulogd_output_SYSLOG.so"

#######################################
# 스택 정의 (다중 처리 파이프라인)
#######################################
# 1. 허용 트래픽 요약 (그룹 100-130)
stack=accept_log:NFLOG,base1:BASE,ifindex1:IFINDEX,ip2str1:IP2STR,rate1:RATE,json1:JSON

# 2. 거부 트래픽 전체 (그룹 200)
stack=drop_log:NFLOG,base2:BASE,ifindex2:IFINDEX,ip2str2:IP2STR,print2:PRINTPKT,sqlite1:SQLITE3

# 3. 위협 탐지 (그룹 300)
stack=threat_log:NFLOG,base3:BASE,ip2str3:IP2STR,mark1:MARK,syslog1:SYSLOG

#######################################
# NFLOG 입력 설정
#######################################
[accept_log]
group=100,110,120,130  # 여러 그룹 동시 처리
seq_local=1
seq_global=1
recv_buffer_size=8388608  # 8MB 버퍼

[drop_log]
group=200
seq_local=1
seq_global=1

[threat_log]
group=300
seq_local=1
seq_global=1

#######################################
# RATE 필터 설정 (핵심!)
#######################################
[rate1]
# 집계 간격
rate_interval=300  # 5분

# 집계 키 (5-tuple + 인터페이스 + prefix)
aggregate=orig.ip.saddr,orig.ip.daddr,orig.l4.sport,orig.l4.dport,orig.ip.protocol,oob.in,oob.prefix

# 통계 옵션
print_total=1      # 총 패킷/바이트
print_rate=1       # 초당 패킷/바이트
print_duration=1   # 지속 시간

# 임계값
upper_threshold=100000  # 10만 패킷 초과시 별도 알림
lower_threshold=10      # 10 패킷 미만은 무시

#######################################
# JSON 출력 (SIEM 연동용)
#######################################
[json1]
file="/var/log/ulogd/firewall_summary.json"
sync=1
timestamp=1

# 커스텀 필드 맵핑
field.event_type="firewall_summary"
field.src_ip=orig.ip.saddr_str
field.dst_ip=orig.ip.daddr_str
field.src_port=orig.l4.sport
field.dst_port=orig.l4.dport
field.protocol=orig.ip.protocol
field.interface_in=oob.in_str
field.interface_out=oob.out_str
field.service=oob.prefix
field.packets_total=rate.packets
field.bytes_total=rate.bytes
field.packets_per_sec=rate.pps
field.bytes_per_sec=rate.bps
field.duration_sec=rate.duration
field.start_time=rate.start_time
field.end_time=rate.end_time

#######################################
# SQLite3 출력 (분석용)
#######################################
[sqlite1]
db="/var/log/ulogd/firewall_drops.db"
table="drops"
create_table=1
buffer_size=100

# SQL 스키마
schema="CREATE TABLE IF NOT EXISTS drops (
    timestamp INTEGER,
    src_ip TEXT,
    dst_ip TEXT,
    src_port INTEGER,
    dst_port INTEGER,
    protocol INTEGER,
    tcp_flags TEXT,
    interface_in TEXT,
    reason TEXT
)"

#######################################
# Syslog 출력 (위협 알림)
#######################################
[syslog1]
facility=LOG_LOCAL5
level=LOG_WARNING
prefix="THREAT: "

Step 4: 로그 분석 및 시각화 스크립트

#!/usr/bin/env python3
# firewall_log_analyzer.py

import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import sqlite3
import sys

class FirewallLogAnalyzer:
    def __init__(self, json_file, db_file):
        self.json_file = json_file
        self.db_file = db_file

    def analyze_summary_logs(self):
        """JSON 요약 로그 분석"""
        data = []
        with open(self.json_file, 'r') as f:
            for line in f:
                try:
                    data.append(json.loads(line))
                except:
                    continue

        df = pd.DataFrame(data)
        df['timestamp'] = pd.to_datetime(df['end_time'])

        # 상위 통신 분석
        top_talkers = df.groupby(['src_ip', 'dst_ip', 'dst_port'])['packets_total'].sum().sort_values(ascending=False).head(20)

        # 시간대별 트래픽 분석
        df['hour'] = df['timestamp'].dt.hour
        hourly_traffic = df.groupby('hour')['bytes_total'].sum()

        # 프로토콜별 분포
        protocol_dist = df.groupby('protocol')['packets_total'].sum()

        return {
            'top_talkers': top_talkers,
            'hourly_traffic': hourly_traffic,
            'protocol_distribution': protocol_dist
        }

    def analyze_drop_logs(self):
        """SQLite 거부 로그 분석"""
        conn = sqlite3.connect(self.db_file)

        # 거부 원인별 통계
        drop_reasons = pd.read_sql_query("""
            SELECT reason, COUNT(*) as count
            FROM drops
            WHERE timestamp > strftime('%s', 'now', '-1 day')
            GROUP BY reason
            ORDER BY count DESC
        """, conn)

        # 공격자 IP 순위
        attacker_ips = pd.read_sql_query("""
            SELECT src_ip, COUNT(*) as attempts
            FROM drops
            WHERE timestamp > strftime('%s', 'now', '-1 day')
            GROUP BY src_ip
            ORDER BY attempts DESC
            LIMIT 50
        """, conn)

        conn.close()
        return {
            'drop_reasons': drop_reasons,
            'attacker_ips': attacker_ips
        }

    def generate_report(self):
        """종합 보고서 생성"""
        summary_analysis = self.analyze_summary_logs()
        drop_analysis = self.analyze_drop_logs()

        # 시각화
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # 1. 시간대별 트래픽
        summary_analysis['hourly_traffic'].plot(kind='bar', ax=axes[0,0])
        axes[0,0].set_title('Hourly Traffic Volume')
        axes[0,0].set_xlabel('Hour')
        axes[0,0].set_ylabel('Bytes')

        # 2. 프로토콜 분포
        protocol_names = {6: 'TCP', 17: 'UDP', 1: 'ICMP', 47: 'GRE'}
        protocol_labels = [protocol_names.get(p, f'Proto_{p}') for p in summary_analysis['protocol_distribution'].index]
        axes[0,1].pie(summary_analysis['protocol_distribution'].values, labels=protocol_labels, autopct='%1.1f%%')
        axes[0,1].set_title('Protocol Distribution')

        # 3. 거부 원인
        drop_analysis['drop_reasons'].head(10).plot(kind='barh', x='reason', y='count', ax=axes[1,0])
        axes[1,0].set_title('Top 10 Drop Reasons')
        axes[1,0].set_xlabel('Count')

        # 4. 공격자 IP (상위 10개)
        drop_analysis['attacker_ips'].head(10).plot(kind='barh', x='src_ip', y='attempts', ax=axes[1,1])
        axes[1,1].set_title('Top 10 Attacker IPs')
        axes[1,1].set_xlabel('Drop Attempts')

        plt.tight_layout()
        plt.savefig('/var/log/ulogd/daily_report.png', dpi=300)

        # 텍스트 보고서
        with open('/var/log/ulogd/daily_report.txt', 'w') as f:
            f.write(f"Firewall Log Analysis Report - {datetime.now().strftime('%Y-%m-%d')}\n")
            f.write("="*60 + "\n\n")

            f.write("Top 20 Network Flows:\n")
            f.write("-"*40 + "\n")
            for (src, dst, port), packets in summary_analysis['top_talkers'].items():
                f.write(f"{src} -> {dst}:{port} : {packets:,} packets\n")

            f.write("\n\nTop Attack Sources:\n")
            f.write("-"*40 + "\n")
            for _, row in drop_analysis['attacker_ips'].head(20).iterrows():
                f.write(f"{row['src_ip']} : {row['attempts']:,} attempts\n")

if __name__ == "__main__":
    analyzer = FirewallLogAnalyzer(
        '/var/log/ulogd/firewall_summary.json',
        '/var/log/ulogd/firewall_drops.db'
    )
    analyzer.generate_report()

2.3 로그 로테이션 및 보관 정책

# /etc/logrotate.d/ulogd2-advanced
/var/log/ulogd/*.json {
    hourly
    rotate 168  # 7일 * 24시간
    compress
    delaycompress
    missingok
    notifempty
    create 640 root adm
    postrotate
        # 시간별 아카이브 생성
        HOUR=$(date +%Y%m%d%H)
        for file in /var/log/ulogd/*.json.1; do
            [ -f "$file" ] && mv "$file" "${file%.1}_${HOUR}.json"
        done

        # S3 업로드 (장기 보관)
        aws s3 sync /var/log/ulogd/ s3://company-firewall-logs/$(hostname)/ \
            --exclude "*" \
            --include "*_${HOUR}.json" \
            --storage-class GLACIER_IR

        systemctl reload ulogd2
    endscript
}

/var/log/ulogd/*.db {
    daily
    rotate 30
    compress
    missingok
    notifempty
    create 640 root adm
    prerotate
        # SQLite 백업
        sqlite3 /var/log/ulogd/firewall_drops.db ".backup /var/log/ulogd/firewall_drops_$(date +%Y%m%d).db"
    endscript
}

🚀 3. Syslog 중앙 수집 시스템 구축

3.1 엔터프라이즈급 아키텍처

┌─────────────────────────────────────────────────────────────────┐
│                        전체 로그 수집 아키텍처                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐    │
│  │  Firewall-1  │    │  Firewall-2  │    │  Firewall-N  │    │
│  │   (Agent)    │    │   (Agent)    │    │   (Agent)    │    │
│  └──────┬───────┘    └──────┬───────┘    └──────┬───────┘    │
│         │                    │                    │             │
│         │         RELP/TLS   │                    │             │
│         └────────────────────┴────────────────────┘             │
│                              │                                  │
│                    ┌─────────▼─────────┐                       │
│                    │   Load Balancer   │                       │
│                    │   (Keepalived)    │                       │
│                    └─────────┬─────────┘                       │
│                              │                                  │
│         ┌────────────────────┴────────────────────┐            │
│         │                                         │            │
│  ┌──────▼───────┐                     ┌──────────▼─────┐      │
│  │ Log Hub #1   │                     │  Log Hub #2    │      │
│  │  (Active)    │◄────── SYNC ────────►  (Standby)    │      │
│  └──────┬───────┘                     └────────────────┘      │
│         │                                                      │
│         ├─────────────┬──────────────┬──────────────┐        │
│         │             │              │              │        │
│  ┌──────▼──────┐ ┌───▼────┐ ┌──────▼──────┐ ┌────▼────┐   │
│  │    Kafka    │ │  HDFS  │ │Elasticsearch│ │   S3    │   │
│  │  Cluster    │ │Cluster │ │   Cluster   │ │ Glacier │   │
│  └─────────────┘ └────────┘ └─────────────┘ └─────────┘   │
│                                                              │
└──────────────────────────────────────────────────────────────┘

3.2 Agent 설정 (rsyslog)

# /etc/rsyslog.d/50-firewall-advanced.conf

#######################################
# 모듈 로드
#######################################
module(load="imfile")
module(load="mmjsonparse")
module(load="mmnormalize")
module(load="omrelp")
module(load="imuxsock")

#######################################
# 전역 설정
#######################################
global(
    defaultNetstreamDriver="gtls"
    defaultNetstreamDriverCAFile="/etc/pki/rsyslog/ca.pem"
    defaultNetstreamDriverCertFile="/etc/pki/rsyslog/client-cert.pem"
    defaultNetstreamDriverKeyFile="/etc/pki/rsyslog/client-key.pem"
    workDirectory="/var/spool/rsyslog"
    maxMessageSize="256k"
)

#######################################
# 템플릿 정의
#######################################
# JSON 형식 (구조화된 데이터)
template(name="JsonFormat" type="list") {
    constant(value="{")
    constant(value="\"@timestamp\":\"")
    property(name="timegenerated" dateFormat="rfc3339")
    constant(value="\",\"hostname\":\"")
    property(name="hostname")
    constant(value="\",\"severity\":\"")
    property(name="syslogseverity-text")
    constant(value="\",\"facility\":\"")
    property(name="syslogfacility-text")
    constant(value="\",\"program\":\"")
    property(name="programname")
    constant(value="\",\"message\":\"")
    property(name="msg" format="json")
    constant(value="\",\"raw\":\"")
    property(name="rawmsg" format="json")
    constant(value="\"")
    property(name="$!all-json" position.from="2")
    constant(value="}\n")
}

# CEF 형식 (보안 이벤트)
template(name="CEFFormat" type="list") {
    constant(value="CEF:0|CompanyName|Firewall|1.0|")
    property(name="$!event_id")
    constant(value="|")
    property(name="$!event_name")
    constant(value="|")
    property(name="$!severity")
    constant(value="|src=")
    property(name="$!src_ip")
    constant(value=" dst=")
    property(name="$!dst_ip")
    constant(value=" spt=")
    property(name="$!src_port")
    constant(value=" dpt=")
    property(name="$!dst_port")
    constant(value=" proto=")
    property(name="$!protocol")
    constant(value=" act=")
    property(name="$!action")
    constant(value=" cn1=")
    property(name="$!packets")
    constant(value=" cn1Label=Packets cn2=")
    property(name="$!bytes")
    constant(value=" cn2Label=Bytes\n")
}

#######################################
# 입력 설정
#######################################
# ulogd2 JSON 로그
input(type="imfile"
    File="/var/log/ulogd/firewall_summary.json"
    Tag="firewall_summary"
    Severity="info"
    Facility="local4"
    addMetadata="on"
    freshStartTail="on"
    PersistStateInterval="100"
    ruleset="processFirewallSummary"
)

# 거부 로그
input(type="imfile"
    File="/var/log/ulogd/firewall_drops.log"
    Tag="firewall_drops"
    Severity="warning"
    Facility="local4"
    addMetadata="on"
    ruleset="processFirewallDrops"
)

#######################################
# 규칙셋 정의
#######################################
ruleset(name="processFirewallSummary") {
    # JSON 파싱
    action(type="mmjsonparse" 
           cookie=""
           container="!fw")

    # 필드 정규화
    set $!event_type = "firewall_flow";
    set $!src_ip = $!fw!src_ip;
    set $!dst_ip = $!fw!dst_ip;
    set $!packets = $!fw!packets_total;
    set $!bytes = $!fw!bytes_total;

    # 임계값 체크
    if ($!fw!packets_total > 100000) then {
        set $!alert = "high_volume_flow";
        set $!severity = "6";  # Warning
    }

    # RELP 전송 (고가용성)
    action(type="omrelp"
        target="log-hub-vip.company.com"
        port="2514"
        tls="on"
        tls.authMode="x509/name"
        tls.permittedPeer=["log-hub1.company.com", "log-hub2.company.com"]
        template="JsonFormat"
        queue.type="linkedList"
        queue.size="100000"
        queue.filename="fwsummary_q"
        queue.maxDiskSpace="1g"
        queue.saveOnShutdown="on"
        queue.highWatermark="90000"
        queue.lowWatermark="50000"
        queue.discardMark="95000"
        queue.discardSeverity="5"  # Notice 이하 버림
        queue.timeoutEnqueue="0"
        queue.timeoutShutdown="10000"
        queue.timeoutActionCompletion="1000"
        action.resumeRetryCount="-1"
        action.resumeInterval="30"
        action.reportSuspension="on"
        action.reportSuspensionContinuation="on"
    )
}

ruleset(name="processFirewallDrops") {
    # 정규화
    action(type="mmnormalize"
        rulebase="/etc/rsyslog.d/firewall_drops.rulebase"
        useRawMsg="off")

    # GeoIP 추가 (옵션)
    # action(type="mmgeoip"
    #     key="$!src_ip"
    #     fields=["country", "city", "latitude", "longitude"])

    # 위협 인텔리전스 체크
    if ($!src_ip == "10.0.0.0/8" or $!src_ip == "172.16.0.0/12" or $!src_ip == "192.168.0.0/16") then {
        set $!threat_level = "internal";
    } else {
        set $!threat_level = "external";
    }

    # CEF 형식으로 전송
    action(type="omrelp"
        target="siem.company.com"
        port="1514"
        template="CEFFormat"
        queue.type="linkedList"
        queue.filename="fwdrops_q"
        # ... 동일한 큐 설정 ...
    )
}

#######################################
# 통계 및 모니터링
#######################################
module(load="impstats"
    interval="60"
    severity="7"
    format="cee"
    ruleset="rsyslog_stats"
)

ruleset(name="rsyslog_stats") {
    action(type="omfile"
        file="/var/log/rsyslog/stats.log"
        template="RSYSLOG_FileFormat"
    )
}

3.3 Hub/Relay 서버 설정

# /etc/rsyslog.d/10-hub-server.conf

#######################################
# Hub 서버 전용 설정
#######################################
module(load="imrelp")
module(load="omkafka")
module(load="omelasticsearch")
module(load="omhdfs")
module(load="omprog")

#######################################
# RELP 리스너
#######################################
input(type="imrelp"
    port="2514"
    tls="on"
    tls.authMode="x509/name"
    tls.permittedPeer=["*.company.com"]
    tls.cacert="/etc/pki/rsyslog/ca.pem"
    tls.mycert="/etc/pki/rsyslog/hub-cert.pem"
    tls.myprivkey="/etc/pki/rsyslog/hub-key.pem"
    maxDataSize="256k"
    keepAlive="on"
    keepAlive.probes="5"
    keepAlive.interval="60"
    keepAlive.time="300"
    ruleset="hub_processing"
)

#######################################
# Hub 처리 규칙셋
#######################################
ruleset(name="hub_processing") {
    # 중복 제거
    action(type="mmrm1stspace")

    # 라우팅 결정
    if ($!event_type == "firewall_flow") then {
        call kafka_output
        call elasticsearch_output
    } else if ($!event_type == "firewall_drop") then {
        call siem_output
        call hdfs_archive
    }

    # 메트릭 수집
    action(type="omprog"
        binary="/usr/local/bin/log_metrics_collector.py"
        template="JsonFormat"
    )
}

#######################################
# Kafka 출력
#######################################
ruleset(name="kafka_output") {
    action(type="omkafka"
        broker=["kafka1:9092", "kafka2:9092", "kafka3:9092"]
        topic="firewall-logs"
        key="$!src_ip"
        partitions.auto="on"
        confParam=[
            "compression.type=snappy",
            "batch.size=65536",
            "linger.ms=100",
            "buffer.memory=134217728"
        ]
        queue.type="linkedList"
        queue.size="50000"
        queue.filename="kafka_q"
    )
}

#######################################
# Elasticsearch 출력
#######################################
ruleset(name="elasticsearch_output") {
    action(type="omelasticsearch"
        server=["es1.company.com", "es2.company.com", "es3.company.com"]
        serverport="9200"
        template="JsonFormat"
        searchIndex="firewall-logs"
        dynSearchIndex="on"
        searchType="_doc"
        bulkmode="on"
        bulkid="$uuid"
        queue.type="linkedList"
        queue.size="20000"
        queue.filename="es_q"
        errorfile="/var/log/rsyslog/es_errors.log"
        action.resumeretrycount="-1"
    )
}

#######################################
# HDFS 아카이브
#######################################
ruleset(name="hdfs_archive") {
    action(type="omhdfs"
        host="hdfs-namenode.company.com"
        port="50070"
        file="/logs/firewall/%$year%/%$month%/%$day%/drops_%$hour%.log"
        template="JsonFormat"
    )
}

#######################################
# 고가용성 동기화
#######################################
module(load="omrelp")

# Standby 서버로 복제
action(type="omrelp"
    target="log-hub2.company.com"
    port="2515"
    tls="on"
    template="rawmsg"
    queue.type="fixedArray"
    queue.size="10000"
    action.resumeRetryCount="3"
)

3.4 모니터링 및 알림 시스템

#!/usr/bin/env python3
# log_metrics_collector.py

import sys
import json
import time
import redis
from prometheus_client import Counter, Histogram, Gauge, push_to_gateway
from prometheus_client.registry import CollectorRegistry

class LogMetricsCollector:
    def __init__(self):
        self.registry = CollectorRegistry()
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)

        # Prometheus 메트릭 정의
        self.log_count = Counter('firewall_logs_total', 
                                'Total firewall logs processed',
                                ['event_type', 'severity'],
                                registry=self.registry)

        self.log_bytes = Counter('firewall_traffic_bytes_total',
                                'Total bytes logged',
                                ['direction', 'protocol'],
                                registry=self.registry)

        self.processing_time = Histogram('log_processing_seconds',
                                        'Log processing time',
                                        registry=self.registry)

        self.queue_depth = Gauge('rsyslog_queue_depth',
                                'Current queue depth',
                                ['queue_name'],
                                registry=self.registry)

    def process_log(self, log_data):
        """로그 처리 및 메트릭 수집"""
        start_time = time.time()

        try:
            # 로그 카운트
            self.log_count.labels(
                event_type=log_data.get('event_type', 'unknown'),
                severity=log_data.get('severity', 'info')
            ).inc()

            # 트래픽 통계
            if 'bytes' in log_data:
                self.log_bytes.labels(
                    direction='inbound' if log_data.get('dst_port') else 'outbound',
                    protocol=log_data.get('protocol', 'unknown')
                ).inc(log_data['bytes'])

            # 이상 탐지
            self.detect_anomalies(log_data)

        except Exception as e:
            print(f"Error processing log: {e}", file=sys.stderr)

        finally:
            self.processing_time.observe(time.time() - start_time)

    def detect_anomalies(self, log_data):
        """이상 행위 탐지"""
        # DDoS 탐지
        if log_data.get('packets_per_sec', 0) > 10000:
            self.alert_ddos(log_data)

        # 포트 스캔 탐지
        src_ip = log_data.get('src_ip')
        if src_ip:
            key = f"portscan:{src_ip}"
            self.redis_client.hincrby(key, log_data.get('dst_port', 0), 1)
            self.redis_client.expire(key, 300)  # 5분 TTL

            if self.redis_client.hlen(key) > 20:  # 20개 이상 포트 접근
                self.alert_portscan(src_ip)

    def alert_ddos(self, log_data):
        """DDoS 알림"""
        alert = {
            'alert_type': 'ddos_suspected',
            'timestamp': time.time(),
            'source_ip': log_data.get('src_ip'),
            'packets_per_sec': log_data.get('packets_per_sec'),
            'severity': 'critical'
        }
        self.redis_client.lpush('alerts:ddos', json.dumps(alert))
        self.redis_client.expire('alerts:ddos', 3600)

    def alert_portscan(self, src_ip):
        """포트스캔 알림"""
        alert = {
            'alert_type': 'portscan_detected',
            'timestamp': time.time(),
            'source_ip': src_ip,
            'severity': 'high'
        }
        self.redis_client.lpush('alerts:portscan', json.dumps(alert))

    def push_metrics(self):
        """Prometheus Gateway로 메트릭 전송"""
        push_to_gateway('prometheus-gw:9091', 
                       job='firewall_logs', 
                       registry=self.registry)

    def run(self):
        """메인 실행 루프"""
        for line in sys.stdin:
            try:
                log_data = json.loads(line.strip())
                self.process_log(log_data)

                # 100개마다 메트릭 푸시
                if self.log_count._value.sum() % 100 == 0:
                    self.push_metrics()

            except json.JSONDecodeError:
                continue
            except KeyboardInterrupt:
                break

        # 최종 메트릭 푸시
        self.push_metrics()

if __name__ == "__main__":
    collector = LogMetricsCollector()
    collector.run()

3.5 백업 및 복구 전략

#!/bin/bash
# firewall_log_backup.sh

#######################################
# 설정
#######################################
BACKUP_DIR="/backup/firewall_logs"
S3_BUCKET="s3://company-firewall-archive"
GLACIER_BUCKET="s3://company-firewall-glacier"
RETENTION_DAYS=180
COMPRESSION="zstd"  # 최고 압축률

#######################################
# 함수 정의
#######################################
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a /var/log/firewall_backup.log
}

create_backup_manifest() {
    local backup_id=$1
    cat > "${BACKUP_DIR}/${backup_id}/manifest.json" << EOF
{
    "backup_id": "${backup_id}",
    "timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
    "hostname": "$(hostname)",
    "files": [
$(find ${BACKUP_DIR}/${backup_id} -type f -name "*.${COMPRESSION}" -exec sha256sum {} \; | \
  awk '{print "        {\"file\": \"" $2 "\", \"sha256\": \"" $1 "\"}"}' | \
  paste -sd "," -)
    ],
    "retention": {
        "online_days": ${RETENTION_DAYS},
        "archive_class": "GLACIER",
        "legal_hold": false
    }
}
EOF
}

#######################################
# 메인 백업 프로세스
#######################################
main() {
    local backup_id="firewall_logs_$(date +%Y%m%d_%H%M%S)"
    local backup_path="${BACKUP_DIR}/${backup_id}"

    log "Starting backup: ${backup_id}"

    # 백업 디렉토리 생성
    mkdir -p "${backup_path}"

    # 1. JSON 로그 백업 (요약 데이터)
    log "Backing up JSON summary logs..."
    find /var/log/ulogd -name "*.json" -mtime -1 | while read file; do
        ${COMPRESSION} -19 -T0 < "$file" > "${backup_path}/$(basename $file).${COMPRESSION}"
    done

    # 2. SQLite DB 백업 (거부 로그)
    log "Backing up SQLite database..."
    sqlite3 /var/log/ulogd/firewall_drops.db ".backup ${backup_path}/firewall_drops.db"
    ${COMPRESSION} -19 "${backup_path}/firewall_drops.db"

    # 3. 설정 파일 백업
    log "Backing up configuration..."
    tar cf - /etc/ulogd.conf /etc/rsyslog.d/ /etc/iptables/ | \
        ${COMPRESSION} -19 > "${backup_path}/configs.tar.${COMPRESSION}"

    # 4. 매니페스트 생성
    create_backup_manifest "${backup_id}"

    # 5. S3 업로드 (Standard-IA)
    log "Uploading to S3..."
    aws s3 sync "${backup_path}" "${S3_BUCKET}/${backup_id}/" \
        --storage-class STANDARD_IA \
        --metadata "backup_id=${backup_id},retention_days=${RETENTION_DAYS}"

    # 6. 오래된 백업 Glacier로 이동
    log "Moving old backups to Glacier..."
    aws s3 ls "${S3_BUCKET}/" | awk '{print $2}' | while read old_backup; do
        backup_date=$(echo $old_backup | grep -oP '\d{8}')
        if [[ $(date -d "${backup_date} + ${RETENTION_DAYS} days" +%s) -lt $(date +%s) ]]; then
            aws s3 mv "${S3_BUCKET}/${old_backup}" "${GLACIER_BUCKET}/${old_backup}" \
                --recursive \
                --storage-class GLACIER
            log "Moved ${old_backup} to Glacier"
        fi
    done

    # 7. 로컬 정리
    find "${BACKUP_DIR}" -type d -name "firewall_logs_*" -mtime +7 -exec rm -rf {} \;

    log "Backup completed: ${backup_id}"
}

# 실행
main

📊 4. 성능 최적화 및 트러블슈팅

4.1 성능 튜닝 체크리스트

영역 최적화 항목 권장값 확인 명령
커널 conntrack 테이블 크기 1M+ sysctl net.netfilter.nf_conntrack_max
  네트워크 버퍼 8MB+ sysctl net.core.rmem_default
  NFLOG 큐 크기 64K+ cat /proc/net/netfilter/nfnetlink_queue
ulogd2 스택당 스레드 CPU 코어수 ps -eLf | grep ulogd
  RATE interval 60-300초 설정 파일 확인
  버퍼 크기 8MB+ recv_buffer_size 설정
rsyslog 큐 크기 50K-100K rsyslog-stats
  큐 타입 LinkedList Disk 지원 큐 사용
  배치 크기 100-1000 action 별 설정
스토리지 파일시스템 XFS/ext4 df -T
  I/O 스케줄러 deadline/noop /sys/block/*/queue/scheduler
  압축 zstd -19 CPU vs 용량 트레이드오프

4.2 트러블슈팅 가이드

문제: 로그 유실

# 진단 스크립트
#!/bin/bash
# diagnose_log_loss.sh

echo "=== NFLOG 통계 ==="
cat /proc/net/netfilter/nfnetlink_log

echo -e "\n=== ulogd2 상태 ==="
systemctl status ulogd2
tail -20 /var/log/ulogd/ulogd.log

echo -e "\n=== rsyslog 큐 상태 ==="
rsyslog-stats | grep -E "queue|discarded"

echo -e "\n=== 디스크 사용량 ==="
df -h /var/log

echo -e "\n=== 최근 에러 ==="
journalctl -u ulogd2 -u rsyslog -p err -since "1 hour ago"

문제: 높은 CPU 사용률

# CPU 프로파일링
perf record -g -p $(pgrep ulogd2) sleep 30
perf report

# 병목 지점 확인
strace -c -p $(pgrep ulogd2) -f

문제: 메모리 누수

# 메모리 모니터링
#!/bin/bash
while true; do
    ps aux | grep -E "ulogd|rsyslog" | grep -v grep
    echo "---"
    sleep 60
done | tee memory_monitor.log

# Valgrind 분석 (개발 환경)
valgrind --leak-check=full --show-leak-kinds=all ulogd -v

🔒 5. 보안 및 컴플라이언스

5.1 로그 무결성 보장

#!/bin/bash
# log_integrity_check.sh

#######################################
# 해시 체인 구현
#######################################
create_hash_chain() {
    local log_file=$1
    local hash_file="${log_file}.hash"
    local prev_hash="0000000000000000000000000000000000000000000000000000000000000000"

    > "$hash_file"

    while IFS= read -r line; do
        # 현재 줄 + 이전 해시
        current_data="${line}${prev_hash}"
        current_hash=$(echo -n "$current_data" | sha256sum | cut -d' ' -f1)

        # 해시 저장
        echo "${current_hash}" >> "$hash_file"
        prev_hash=$current_hash
    done < "$log_file"

    # 최종 해시를 블록체인 또는 TSA에 제출
    submit_to_timestamp_authority "$hash_file"
}

submit_to_timestamp_authority() {
    local hash_file=$1
    local final_hash=$(tail -1 "$hash_file")

    # RFC 3161 타임스탬프
    openssl ts -query -data "$hash_file" -no_nonce \
        -sha256 -out "${hash_file}.tsq"

    curl -H "Content-Type: application/timestamp-query" \
        --data-binary @"${hash_file}.tsq" \
        https://freetsa.org/tsr > "${hash_file}.tsr"
}

5.2 접근 통제 및 감사

# /etc/rsyslog.d/99-audit.conf

# 로그 접근 감사
$FileCreateMode 0640
$FileOwner root
$FileGroup adm

# 감사 로그 (별도 파일)
if $programname == 'rsyslog-audit' then {
    action(type="omfile"
        file="/var/log/audit/rsyslog_audit.log"
        template="RSYSLOG_FileFormat"
        fileCreateMode="0600"
        fileOwner="root"
        fileGroup="root"
    )
    stop
}

# 설정 변경 알림
if $msg contains 'rsyslog' and $msg contains 'reload' then {
    action(type="ommail"
        server="smtp.company.com"
        port="25"
        mailfrom="rsyslog@company.com"
        mailto="security@company.com"
        subject="Rsyslog Configuration Changed"
        body="Rsyslog configuration was reloaded on $hostname"
    )
}

📋 6. 종합 체크리스트

6.1 구축 단계별 체크리스트

  • 계획 단계
    • 법규 요구사항 파악 완료
    • 로그 보관 기간 정책 수립
    • 용량 산정 및 인프라 준비
    • 보안 요구사항 정의
  • 구현 단계
    • iptables NFLOG 규칙 설정
    • ulogd2 설치 및 설정
    • RATE 필터 최적화
    • rsyslog 에이전트 설정
    • 중앙 수집 서버 구축
    • TLS 인증서 배포
  • 검증 단계
    • 로그 수집 테스트
    • 성능 부하 테스트
    • 장애 복구 테스트
    • 보안 취약점 점검
  • 운영 단계
    • 모니터링 대시보드 구축
    • 알림 규칙 설정
    • 백업 자동화 구현
    • 정기 점검 일정 수립

6.2 일일 운영 체크리스트

#!/bin/bash
# daily_check.sh

echo "=== 방화벽 로그 일일 점검 ==="
echo "점검 시작: $(date)"

# 1. 서비스 상태
echo -e "\n[1] 서비스 상태 확인"
for service in ulogd2 rsyslog; do
    status=$(systemctl is-active $service)
    echo "$service: $status"
done

# 2. 로그 수집률
echo -e "\n[2] 로그 수집 통계"
yesterday=$(date -d "yesterday" +%Y%m%d)
log_count=$(grep -c "^{" /var/log/ulogd/firewall_summary.json)
echo "수집된 로그: $log_count 건"

# 3. 디스크 사용량
echo -e "\n[3] 디스크 사용량"
df -h /var/log | tail -1

# 4. 에러 확인
echo -e "\n[4] 최근 24시간 에러"
journalctl --since "24 hours ago" -p err | wc -l

# 5. 성능 지표
echo -e "\n[5] 성능 지표"
sar -u 1 1 | tail -1

echo -e "\n점검 완료: $(date)"

이 가이드는 방화벽 로그 관리의 모든 측면을 다루었습니다.

  1. 법규 준수: 국내외 주요 규정 요구사항 충족
  2. 기술 구현: iptables + ulogd2 + rsyslog 완벽 통합
  3. 확장성: 엔터프라이즈급 중앙 수집 아키텍처
  4. 보안성: 암호화, 무결성, 접근통제 구현
  5. 운영성: 모니터링, 백업, 복구 자동화

이 설정을 통해 "6개월 이상"의 단순한 보관 요구사항을 넘어, 안전하고 효율적이며 분석 가능한 방화벽 로그 관리 시스템을 구축할 수 있습니다.

728x90
그리드형(광고전용)

댓글