본문 바로가기
서버구축 (WEB,DB)

실시간 파이프라인 데이터 수집을 위한 Logstash의 grok 필터 설정

by 날으는물고기 2024. 5. 2.

실시간 파이프라인 데이터 수집을 위한 Logstash의 grok 필터 설정

Logstash에 대한 실용적인 소개 - Elastic Blog

실시간 파이프라인(Real-time Data Pipeline) 기능을 통한 데이터 수집을 위해 로그 메시지를 구문 분석하고 필드를 추출하기 위한 Logstash의 grok 필터 설정 부분에 대해서 아래 예제 코드를 통해서 각 부분별로 정리해보고자 합니다.

  #Initial log extraction
  mutate {
    gsub => ["message", "([\\w]+)Usuario:", "$1 Usuario:"]
  }
  grok {
    match => {
      "message" => [
        "(<%{INT}>)?%{GREEDYDATA:ts}  %{HOSTNAME:custom_inter_host} \\*\\*%{DATA:place}\\*\\* \\(%{WORD}\\)(\\s*)%{GREEDYDATA:custom_description} %{WORD} (?:Usuario:%{GREEDYDATA:custom_user})(?:Política (infringida|Infringida):%{GREEDYDATA:custom_politica_infringida})(?:Destinatarios:%{GREEDYDATA:custom_destinarios})(?:ID del).*?(?:(evento|Evento):.*?%{INT:custom_evento}(\\s*))(?:Nombre (del|de) adjunto: %{GREEDYDATA:custom_attachment})(?:Fecha:%{GREEDYDATA})(?:Severidad:.*?%{INT}:%{DATA:custom_severity})(?:Asunto:%{GREEDYDATA:custom_asunto})(?:Estado:%{GREEDYDATA:custom_estado})(?:Incidente:%{GREEDYDATA:custom_incidente})"
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{HOSTNAME:intermediary_host} icdx: %{GREEDYDATA:json_data}"
        #Logs with Category and LogType
        "%{TIMESTAMP_ISO8601:ts},%{WORD:severity},%{HOST:computer},Category: %{NONNEGINT:category},(?P<logType>.*?),%{GREEDYDATA:msg1}",
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{IPORHOST:syslogServer} SymantecServer: %{HOSTNAME:symantec_server},Category: %{NONNEGINT:category},(?P<logType>Telemetry Engine),.*",
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{IPORHOST:syslogServer}? SymantecServer: %{HOST:computer},Category: %{NONNEGINT:category},(?P<logType>NTR),%{GREEDYDATA:msg1}",
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{IPORHOST:syslogServer}? SymantecServer: %{HOST:computer},Category: %{NONNEGINT:category},(?P<logType>.*?),%{GREEDYDATA:msg1}",
        #Security Risk Found
        "%{TIMESTAMP_ISO8601:ts},(?P<logType>Security risk found),%{GREEDYDATA:msg1}",
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{HOSTNAME:intermediary_host} SymantecServer: (?P<logType>Virus found),%{GREEDYDATA:msg1}",
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{HOSTNAME:intermediary_host} SymantecServer: (?P<logType>Security risk found),%{GREEDYDATA:msg1}"
        "(<%{INT}>)?%{TIMESTAMP_ISO8601:ts},(?P<logType>Virus found|Potential risk found|SONAR detection now allowed),%{GREEDYDATA:msg1}",
        #IDS Alert
        "%{TIMESTAMP_ISO8601:ts},%{WORD:severity},%{HOST:computer},(?P<msg1>Event Description: .*)",
        "%{TIMESTAMP_ISO8601:ts},%{WORD:severity},%{HOST:computer},\"(?P<msg1>Event Description: .*)",
        #Firewall Messages
        "%{TIMESTAMP_ISO8601:ts},%{WORD:severity},%{DATA:computer},(?P<msg1>Local Host IP: .*)",
        "%{TIMESTAMP_ISO8601:ts},%{WORD:severity},%{HOST:computer},(?P<msg1>Local Host: .*)",
        #No Product/Service Label
        "%{TIMESTAMP_ISO8601:ts},%{WORD:severity},Site: (?P<site_name>.*?),Server: (?P<server_name_1>.*?),(?P<msg1>.*)",
        "%{TIMESTAMP_ISO8601:ts},Site: (?P<site_name>.*?),(Server Name|Server): (?P<server_name_1>.*?),(Domain Name|Domain): (?P<domain>.*?),(?P<msg2>.*)",
        "%{TIMESTAMP_ISO8601:ts},(?P<logType>Scan ID): %{GREEDYDATA:msg1}",
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{IPORHOST:syslogServer} SymantecServer: %{HOSTNAME:symantec_server},Category: %{DATA},(?P<logType>IPS),.*",
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{HOST:syslogServer} SymantecServer: Site: (?P<site_name>.*?),(Server Name|Server): (?P<server_name_1>.*?),(Domain Name|Domain): (?P<domain>.*?),(?P<msg2>.*)",
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{IPORHOST:syslogServer} SymantecServer: Site: %{DATA},(Server Name|Server): (?P<server_name_1>.*?),%{GREEDYDATA:msg1}"
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{IP:srcIp} SymantecServer: %{HOST:computer},%{GREEDYDATA:msg1}",
        # Scan
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{IPORHOST:syslogServer} SymantecServer: (?P<logType>Scan ID): %{GREEDYDATA:msg1}",
        "%{TIMESTAMP_ISO8601:ts},%{WORD:severity},%{GREEDYDATA:messageTmp}",
        # DNS
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{IP:tgtIp} %{DATESTAMP} %{WORD} %{DATA} %{WORD:logType}  %{GREEDYDATA:msg1}",
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{HOST:syslogServer} .*?SymantecServer: %{DATA:computer},%{IP:src_ip},%{WORD:sec_action},\\\"(?P<event_description>.*?)\\\",(?P<logType>[^,]+),%{GREEDYDATA:msg1}",
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{HOST:syslogServer} .*?SymantecServer: %{DATA:computer},%{IP:src_ip},%{WORD:sec_action},(?P<event_description>[^,]*),(?P<logType>[^,]+),%{GREEDYDATA:msg1}",
        "(<%{INT}>)?%{SYSLOGTIMESTAMP:ts} %{HOST:syslogServer} .*?SymantecServer: %{DATA:computer},%{GREEDYDATA:msg1}",
        "%{SYSLOGTIMESTAMP:ts} %{HOST:syslogServer} %{GREEDYDATA:msg_lri}",
        "^(?P<json_data>{.+})$"
      ]
    }
    overwrite => ["custom_inter_host","place","custom_description","custom_user","custom_politica_infringida","custom_destinarios","custom_evento","custom_attachment",
    "custom_severity","custom_asunto","custom_estado","custom_incidente","logType", "msg1","severity", "computer", "server_name_1", "srcIp", "tgtIp", "syslogServer",
    "json_data","intermediary_host", "msg2", "event_description","msg_lri", "messageTmp", "category", "site_name","src_ip", "sec_action"]
    on_error => "grokfail1"
  }

1. 초기 로그 조정

mutate {
  gsub => ["message", "([\\w]+)Usuario:", "$1 Usuario:"]
}
  • mutate 플러그인의 gsub 옵션을 사용해 메시지 내의 특정 패턴을 다른 문자열로 치환합니다. 여기서는 "Usuario:" 앞에 오는 단어와 붙어있는 경우에 공백을 추가하여 분리합니다.

2. 로그 필드 추출

grok 플러그인을 사용해 로그 메시지에서 다양한 정보를 추출합니다. 각 패턴은 로그 형식에 따라 정의된 필드에 데이터를 매핑합니다. 예를 들어, %{INT}는 정수를, %{HOSTNAME}는 호스트 이름을, %{IP}는 IP 주소를 나타냅니다. match 옵션 안에서 다양한 로그 형식을 처리할 수 있는 패턴들이 정의되어 있습니다.

  • (<%{INT}>)?%{GREEDYDATA:ts} %{HOSTNAME:custom_inter_host} ...와 같은 각 줄은 로그의 특정 형태를 정의하며, 각 패턴에 맞는 로그에서 해당 필드(예: ts, custom_inter_host 등)를 추출합니다.
  • overwrite 옵션을 사용해 이미 존재하는 필드에 새로운 값을 덮어씁니다.
  • on_error 옵션으로 에러 발생 시 grokfail1 태그를 추가하여 에러 핸들링을 할 수 있습니다.

3. 특정 조건에 따른 추가 처리

위의 grok 처리 이후, 로그의 특정 조건을 만족하는 경우 추가적인 데이터 처리를 수행할 수 있습니다. 예를 들어, 특정 키워드가 포함되어 있거나, 특정 형식의 데이터가 필요한 경우 mutate 및 기타 필터 플러그인을 사용하여 필드 값을 조정하거나, 필요한 데이터 파싱을 수행할 수 있습니다.

 

위의 코드는 다양한 형식의 로그 메시지에서 필요한 정보를 추출하고, 필드를 정의하여 Logstash를 통해 구조화된 데이터로 파싱하는 과정을 정의합니다. 이를 통해 로그 데이터를 더 쉽게 분석하고, 모니터링 시스템이나 데이터베이스 등에 저장하여 사용할 수 있게 됩니다.

 

그러나, 로그 형식이 매우 다양하고 복잡할 수 있기 때문에, 정확한 필드 추출을 위해서는 로그 메시지의 구조를 잘 이해하고, 필요에 따라 grok 패턴을 세밀하게 조정해야 합니다. Logstash의 grok 필터를 사용하여 로그 메시지에서 다양한 정보를 추출하는 구성 각 부분의 역할입니다.

grok 필터

grok 필터는 로그 데이터에서 원하는 정보를 추출하는 데 사용됩니다. 정규 표현식과 함께 사용되며, %{PATTERN:fieldname} 형태로 사용됩니다. 여기서 PATTERN은 grok 패턴(예: %{IP}, %{WORD} 등)이고, fieldname은 추출된 데이터를 저장할 필드의 이름입니다.

match 옵션

match 옵션은 로그 메시지("message")와 매칭될 패턴을 정의합니다. 이 코드에서는 다양한 로그 형식에 대한 여러 패턴이 정의되어 있으며, 각 패턴은 특정 로그 형식에 대응됩니다.

패턴 예시

  • (<%{INT}>)?%{GREEDYDATA:ts} %{HOSTNAME:custom_inter_host} ...는 로그 메시지의 시작부에서 정수(%{INT}), 타임스탬프(%{GREEDYDATA:ts}), 호스트 이름(%{HOSTNAME:custom_inter_host}) 등을 추출하는 패턴입니다. 여기서 ?는 정수가 있을 수도 있고 없을 수도 있음을 나타냅니다.
  • ... Usuario:%{GREEDYDATA:custom_user} ...는 "Usuario:" 다음에 오는 모든 텍스트를 custom_user 필드에 저장합니다.
  • ... Política (infringida|Infringida):%{GREEDYDATA:custom_politica_infringida} ...는 "Política infringida" 또는 "Política Infringida" 다음에 오는 텍스트를 custom_politica_infringida 필드에 저장합니다.

overwrite 옵션

overwrite 옵션은 이미 존재하는 필드에 새로운 값을 할당할 때 사용됩니다. 위의 코드에서는 추출된 데이터를 특정 필드에 저장하고, 필요한 경우 기존 값에 덮어쓰도록 설정되어 있습니다.

on_error 옵션

on_error 옵션은 grok 패턴 매칭이 실패했을 때 처리 방법을 정의합니다. grokfail1이라는 태그를 추가하여, 어떤 로그 메시지가 매칭에 실패했는지 추적할 수 있습니다.

 

이 구성은 다양한 로그 메시지 형식에서 필요한 정보를 추출하고, 해당 정보를 구조화된 형태로 저장하기 위해 사용됩니다. 로그 데이터의 구조와 필요한 정보에 따라 grok 패턴을 세밀하게 조정해야 하며, 복잡한 로그 형식이나 다양한 정보를 추출해야 하는 경우 여러 패턴을 정의하여 사용할 수 있습니다.

 

Logstash의 grok 필터에서 match 옵션을 사용할 때 각 패턴 사이에 쉼표(,)를 사용하여 구분해야 합니다. 배열 내의 각 요소는 따옴표(")로 둘러싸인 문자열이며, 여러 패턴을 정의할 때는 쉼표로 구분해야 정상적으로 인식됩니다. 코드 내에서 쉼표 없이 정의된 부분이 있다면, 그것은 오타일 수 있으며, 정상적으로 동작하지 않을 가능성이 높습니다.

 

예를 들어,

match => {
  "message" => [
    "pattern1",
    "pattern2",
    "pattern3"
  ]
}

위와 같이 배열 내의 각 패턴 문자열 사이에 쉼표를 넣어 구분해야 합니다.

 

만약 코드에 쉼표가 누락되어 있다면, 이는 실수로 발생한 문제일 수 있으며, 각 패턴을 정확히 구분하기 위해 쉼표를 추가해야 합니다. 쉼표 없이 라인이 나뉘는 것만으로는 배열 요소로 정상적으로 처리되지 않으므로 쉼표가 누락된 부분은 수정이 필요할 것입니다. 각 패턴 사이에 쉼표를 추가하여 올바르게 배열 요소로 구분되도록 해야 합니다.

여러 줄로 나누어진 하나의 긴 패턴

Logstash의 grok 구성에서 여러 줄에 걸쳐 표현된 패턴은 사실 하나의 긴 패턴으로 처리됩니다. 이 경우, 각 줄의 끝에 쉼표(,)가 없어도 됩니다. 그러나 이것은 코드의 가독성을 해치고, 실수로 패턴을 잘못 정의할 위험이 있으므로, 일반적으로 권장되지 않는 방식입니다.

 

올바른 방식은 하나의 패턴을 정의할 때는 한 줄에 모두 작성하고, 실제로 여러 패턴을 배열 형식으로 나열할 때는 쉼표로 구분하는 것입니다.

match => {
  "message" => [
    "첫 번째 패턴",
    "두 번째 패턴",
    ...
  ]
}

실제로는 한 줄에 있어야 할 긴 패턴이 여러 줄로 나뉘어져 있는 경우, 각 줄을 올바르게 연결하여 하나의 패턴으로 만들거나, 의도된대로 여러 개의 독립적인 패턴으로 처리해야 할 필요가 있습니다.

 

다만, 쉼표가 없는 경우 Logstash의 grok 필터가 각 패턴을 올바르게 분리해서 처리하지 못할 수 있습니다. Logstash 구성에서 match 옵션 내의 패턴 배열을 정의할 때, 각 패턴 사이에는 반드시 쉼표가 있어야 합니다. 이는 배열의 각 요소를 명확하게 구분하기 위한 필수 구분자입니다.

 

쉼표가 빠진 것은 실수일 가능성이 높으며, 이로 인해 예상치 못한 동작이나 에러가 발생할 수 있습니다. Logstash가 문법적 오류나 예상치 못한 구성으로 인해 패턴을 정확히 분리하지 못하고 처리한다면, 이는 운 좋게 동작하는 것처럼 보일 수도 있지만, 안정적이거나 신뢰할 수 있는 동작 방식이라고 볼 수 없습니다.

 

정상적인 구성을 위해서는 다음과 같이 각 패턴을 명확하게 쉼표로 구분하여 배열 요소로 정의해야 합니다.

match => {
  "message" => [
    "첫 번째 패턴",  # 첫 번째 패턴 끝에 쉼표
    "두 번째 패턴",  # 두 번째 패턴 끝에 쉼표
    "세 번째 패턴"   # 마지막 패턴 끝에는 쉼표 없음
  ]
}

만약 쉼표 없이 패턴이 정의되어 있고, 실제 로그 처리에 문제가 없는 것처럼 보인다면, 이는 Logstash의 내부 동작 방식 또는 특정 버전에서의 유연한 처리로 인한 것일 수 있습니다. 그러나 이는 공식 문법과는 다르며, Logstash 구성을 정의할 때는 명확한 구분과 올바른 문법을 사용하는 것이 중요합니다. 따라서, 패턴 사이에 쉼표를 추가하는 것이 바람직합니다.

 

mutate 필터의 replace 옵션에서, Logstash의 구성에서는 특정 컨텍스트 내에서 쉼표가 필수적이지 않을 수도 있습니다. 이 경우, replace 옵션 안에서는 여러 필드에 대한 값 할당을 정의하고 있으며, Ruby 해시 문법을 따르고 있습니다.

mutate {
  replace => {
    "action" => "",
    "application" => "",
    ...
  }
}

이 문법에서는 각 키-값 쌍을 새 줄로 구분하여 나열하고 있으며, Ruby 해시에서는 이러한 형태가 유효합니다. 따라서, 이 경우 쉼표가 명시적으로 필요하지 않습니다. 이는 mutate 필터의 replace 옵션 같은 특정 구성에서만 해당되는 사항입니다.

 

그러나 grok 필터의 match 옵션에서는 쉼표를 사용하여 패턴을 구분하는 것이 필요합니다. grok 필터 예제에서 각 패턴 사이에 쉼표가 빠진 것은, 제가 이전에 언급한 것처럼, 실제로는 오타나 실수일 가능성이 높습니다. 혹은 다른 이유로 인해 예상치 못한 방식으로 동작하는 것일 수 있습니다.

요약

  • mutate 필터의 replace 같은 경우는 Ruby 해시 문법을 따르므로, 새 줄로 구분된 키-값 쌍 사이에 쉼표가 없어도 됩니다.
  • grok 필터의 match 옵션에서는 각 패턴을 배열 요소로 나열할 때 쉼표로 구분해야 합니다. 패턴 사이에 쉼표가 빠진 것은 일반적으로 권장되지 않으며, 정확한 문법을 사용하는 것이 중요합니다.

 

"%{TIMESTAMP_ISO8601:ts},%{WORD:severity},%{HOST:computer},Category: %{NONNEGINT:category},(?P<logType>.*?),%{GREEDYDATA:msg1}",
  • %{TIMESTAMP_ISO8601:ts}: TIMESTAMP_ISO8601 패턴은 ISO8601 형식의 타임스탬프를 매칭합니다. 이 패턴으로 매칭된 부분은 ts 필드에 할당됩니다. 예를 들어, "2024-01-01T00:00:00Z"와 같은 타임스탬프가 이에 해당됩니다.
  • %{WORD:severity}: WORD 패턴은 공백이나 특수 문자가 아닌 연속된 문자들을 매칭합니다. 이 패턴으로 매칭된 부분은 severity 필드에 할당됩니다. 로그의 심각도(severity) 레벨을 나타내는 단어(예: "ERROR", "INFO")가 여기에 해당됩니다.
  • %{HOST:computer}: HOST 패턴은 호스트 이름이나 IP 주소를 매칭합니다. 이 패턴으로 매칭된 부분은 computer 필드에 할당됩니다. 로그를 생성한 컴퓨터의 호스트 이름이나 IP 주소가 여기에 해당됩니다.
  • Category: %{NONNEGINT:category}: NONNEGINT 패턴은 음수가 아닌 정수를 매칭합니다. 이 패턴으로 매칭된 부분은 category 필드에 할당됩니다. "Category: " 뒤에 오는 숫자(예: 로그 카테고리를 나타내는 식별자)가 여기에 해당됩니다.
  • (?P<logType>.*?): 이 부분은 명명된 캡처 그룹을 사용하는 정규 표현식입니다. 여기서 logType은 그룹의 이름이며, .*?는 최소 매칭(non-greedy)을 의미하는 정규 표현식입니다. 즉, 가능한 가장 짧은 매칭을 찾습니다. 이 패턴으로 매칭된 부분은 logType 필드에 할당됩니다. 로그의 유형(예: "ERROR", "WARNING")이 여기에 해당될 수 있습니다.
  • %{GREEDYDATA:msg1}: GREEDYDATA 패턴은 가능한 모든 문자(탐욕적 매칭)를 매칭합니다. 이 패턴으로 매칭된 부분은 msg1 필드에 할당됩니다. 로그 메시지의 나머지 부분이 여기에 해당됩니다.

각 패턴 사이에 위치한 쉼표(,)는 패턴 구분자로 작용하여, 로그 메시지의 각 부분을 올바르게 분리하고 분석합니다.

 

grok 필터에서 정의된 패턴은 로그 메시지의 형식과 정확히 일치해야 합니다. 각 패턴(TIMESTAMP_ISO8601, WORD, HOST, NONNEGINT 등)은 로그 메시지의 특정 부분과 매치되어야 하며, 전체 패턴이 로그 메시지와 일치할 때만 해당 필드(ts, severity, computer, category, logType, msg1)로 데이터가 추출됩니다.

 

예를 들어, 제공된 패턴은 다음과 같은 로그 메시지와 일치해야 합니다.

2024-01-01T00:00:00Z,INFO,server1,Category: 100,ERROR_TYPE,Some message detail

이 경우,

  • 2024-01-01T00:00:00Z%{TIMESTAMP_ISO8601:ts}와 일치하여 ts 필드에 할당됩니다.
  • INFO%{WORD:severity}와 일치하여 severity 필드에 할당됩니다.
  • server1%{HOST:computer}와 일치하여 computer 필드에 할당됩니다.
  • 100%{NONNEGINT:category}와 일치하여 category 필드에 할당됩니다.
  • ERROR_TYPE(?P<logType>.*?)와 일치하여 logType 필드에 할당됩니다.
  • Some message detail%{GREEDYDATA:msg1}와 일치하여 msg1 필드에 할당됩니다.

모든 부분이 로그 메시지와 정확히 일치해야 하며, 패턴의 한 부분이라도 일치하지 않는 경우, 해당 grok 매치는 실패(_grokparsefailure 태그가 추가됨)로 처리됩니다. 따라서, 로그 데이터의 형식을 정확히 이해하고, grok 패턴을 그에 맞게 정의해야 합니다.

 

(?P<logType>.*?) 구문에서 <logType>는 사용자가 정의한 명명된 캡처 그룹으로, 로그 메시지 구조에 따라 필요한 정보를 추출하기 위해 사용자가 지정한 이름입니다. 이 구문은 정규 표현식에서 사용되며, 매칭된 텍스트를 특정 필드명으로 추출하고자 할 때 사용됩니다.

  • (?P<name>pattern) 형식은 Python 스타일의 명명된 캡처 그룹을 나타냅니다. 여기서 name은 캡처 그룹의 이름을 지정하며, pattern은 해당 그룹이 매칭해야 할 패턴입니다.
  • <logType>는 이 경우 캡처 그룹의 이름으로, 매칭된 데이터를 저장할 필드 이름을 의미합니다. 따라서, 이 패턴에 의해 로그 메시지에서 추출된 부분은 logType이라는 필드명으로 저장됩니다.
  • .*?는 비탐욕적(non-greedy) 매칭을 의미하는 정규 표현식으로, 가능한 가장 짧은 문자열을 매칭하려고 시도합니다.

예를 들어, 로그 메시지에서 "ERROR_TYPE" 부분을 추출하고 싶다면, 해당 패턴은 "ERROR_TYPE"을 logType 필드로 추출하게 됩니다. <logType>의 이름은 사용자가 자유롭게 지정할 수 있으며, 이를 통해 로그 데이터에서 원하는 정보를 유연하게 추출하고 필드명을 지정할 수 있습니다.

 

정리하면, (?P<logType>.*?) 구문에서 P 다음에 오는 <logType>는 캡처 그룹의 이름을 지정하는 부분입니다. 여기서 logType은 사용자가 지정한 캡처 그룹의 이름을 나타내며, 이 이름은 매칭된 데이터를 저장할 필드의 이름을 의미합니다. P는 명명된 캡처 그룹(named capture group)을 나타내는 정규 표현식 구문의 일부입니다.

 

(?P<name>pattern)에서 ?P는 Python 스타일의 정규 표현식에서 명명된 캡처 그룹을 정의할 때 사용되는 구문이며, <name>에는 캡처 그룹에 할당할 이름을 지정하고, pattern에는 해당 캡처 그룹이 매칭할 패턴을 지정합니다.

 

따라서, (?P<logType>.*?)에서는 다음과 같은 의미를 가집니다.

  • ?P<logType>: 매칭된 텍스트를 logType이라는 이름의 캡처 그룹으로 추출하겠다는 의미입니다.
  • .*?: 최소 매칭을 의미하는 정규 표현식으로, 가능한 한 가장 짧은 문자열을 매칭하려고 시도합니다.

예시에서 (?P<logType>.*?) 구문은 로그 메시지에서 매칭된 부분을 logType 필드에 저장하게 됩니다. 여기서 P는 구문의 일부이고, logType이 실제로 캡처된 데이터를 저장할 필드 이름입니다. 따라서 , Pxxx,로 되어 있다면 xxxlogType 변수에 들어가는 것이 아니라, logType이라는 이름으로 매칭된 데이터가 저장됩니다.

 

(?P<logType>.*?)(.*) , \1로 치환하는 것은 비슷해 보일 수 있지만, 다른 목적과 사용법을 가지고 있습니다.

(?P<logType>.*?)의 사용

  • (?P<logType>.*?)는 Python 스타일의 정규 표현식에서 명명된 캡처 그룹(named capture group)을 사용하는 예시입니다.
  • 여기서 logType은 매칭된 데이터를 저장할 캡처 그룹의 이름입니다.
  • .*?는 비탐욕적(non-greedy) 매칭을 의미하며, 가능한 한 가장 짧은 문자열을 매칭합니다.
  • 이 구문은 로그 메시지에서 특정 패턴을 찾아 그 부분을 logType이라는 이름으로 추출하고 식별하는 데 사용됩니다.

(.*) , \1의 사용

  • (.*)는 탐욕적(greedy) 매칭을 사용하여 가능한 한 가장 긴 문자열을 매칭합니다.
  • \1은 첫 번째 캡처 그룹에 매칭된 데이터를 참조합니다. 즉, (.*)에 의해 캡처된 텍스트를 나타냅니다.
  • 이 구문은 주로 문자열이나 데이터를 찾고, 그것을 다른 형태로 치환하는 데 사용됩니다. 예를 들어, 정규 표현식을 사용한 문자열 치환 작업에서 사용할 수 있습니다.

요약

  • (?P<logType>.*?)는 로그 데이터에서 특정 부분을 명명된 캡처 그룹으로 추출하는 데 사용되며, 추출된 데이터에 이름을 붙여 다른 곳에서 참조할 수 있게 합니다.
  • (.*) , \1는 데이터를 찾고 그것을 참조하여 다른 형태로 치환하는 데 사용됩니다. \1은 이전에 매칭된 첫 번째 캡처 그룹의 내용을 사용하는 방식입니다.

따라서, 이 두 구문은 사용 목적과 컨텍스트에 따라 다르게 사용되며, 직접적으로 비교하기 어려운 다른 기능을 수행합니다.

  grok {
    match => {
      "message" => [
      ]
    }
    overwrite => ["custom_inter_host","place","custom_description","custom_user","custom_politica_infringida","custom_destinarios","custom_evento","custom_attachment",
      "custom_severity","custom_asunto","custom_estado","custom_incidente","logType", "msg1","severity", "computer", "server_name_1", "srcIp", "tgtIp", "syslogServer",
      "json_data","intermediary_host", "msg2", "event_description","msg_lri", "messageTmp", "category", "site_name","src_ip", "sec_action"]
    on_error => "grokfail1"
  }

  if [grokfail1] == true {
    grok {
      match => {
        "message" => [
          "^<%{INT:_someIntValue}>%{SPACE}%{IP:_ip}%{SPACE}%{SYSLOGTIMESTAMP:ts} %{DATA:_server},(?P<messageTemp>.*)"
          ]
      }
      overwrite => ["messageTemp"]
      on_error => "grok_failed"
    }

위의 Logstash 구성 코드는 grok 플러그인을 사용하여 로그 메시지를 분석하고 데이터를 추출하는 과정을 정의합니다. 이 코드는 두 부분으로 나뉘며, 각 부분의 역할과 작동 방식은 아래와 같습니다.

첫 번째 grok 필터

첫 번째 grok 필터 섹션에서는 match 옵션을 사용해 로그 메시지("message")에서 다양한 정보를 추출하려고 시도합니다. 하지만 실제 패턴이 배열 안에 제공되지 않았으므로, 이 부분은 예제로서의 완성도가 떨어집니다. 실제 사용 시에는 로그 형식에 맞는 패턴을 [] 안에 추가해야 합니다.

overwrite 옵션은 특정 필드에 대해 grok 필터로 추출된 새로운 값을 덮어쓰도록 지정합니다.

on_error 옵션을 사용하여, 이 grok 필터가 실패할 경우(즉, 패턴 매칭이 실패할 경우) 로그 이벤트에 grokfail1 태그를 추가합니다.

조건부 처리

if [grokfail1] == true 조건은 첫 번째 grok 필터가 실패한 경우(즉, grokfail1 태그가 추가된 경우) 다음 단계를 수행하도록 합니다.

이 조건 하에서 두 번째 grok 필터가 실행됩니다.

두 번째 grok 필터

이 필터는 첫 번째 필터가 실패한 로그 메시지를 대상으로 하며, 다른 패턴("^<%{INT:_someIntValue}>%{SPACE}%{IP:_ip}%{SPACE}%{SYSLOGTIMESTAMP:ts} %{DATA:_server},(?P<messageTemp>.*)" )을 사용해 로그 메시지를 다시 분석하려고 시도합니다.

이 패턴은 로그 메시지의 시작 부분에서 일련의 필드(예: 인트로덕터, IP 주소, 타임스탬프, 서버 이름)를 추출하고, 나머지 부분을 messageTemp 필드에 저장하도록 설계되었습니다.

overwrite 옵션은 messageTemp 필드에 새로운 값을 덮어쓰도록 지정합니다.

on_error 옵션을 사용하여, 이 grok 필터가 실패할 경우 로그 이벤트에 grok_failed 태그를 추가하도록 설정합니다.

요약

이 Logstash 구성 코드는 로그 메시지를 분석하여 정보를 추출하려는 시도의 일부로, 패턴 매칭에 실패할 경우 대체적인 패턴을 사용하여 다시 시도하는 과정을 설명합니다. 실제로 사용하려면, 첫 번째 grok 필터의 match 옵션에 적절한 패턴을 포함시켜야 하며, 로그 데이터와 패턴이 정확히 일치해야 합니다.

 

grok 필터는 Logstash에서 로그 데이터의 구조화되지 않은 텍스트를 분석하고 구조화된 데이터로 변환하는 데 사용되는 강력한 도구입니다. grok 필터는 정규 표현식을 기반으로 하며, 다양한 옵션을 통해 로그 분석을 유연하게 제어할 수 있습니다. 여기 주요 grok 필터 옵션들에 대해 정리했습니다.

match

  • match 옵션은 로그 메시지를 분석할 때 사용할 패턴을 정의합니다. 이 옵션은 { "필드명" => "패턴" }의 형식으로 사용되며, 하나 이상의 패턴을 정의할 수 있습니다. 로그 메시지에서 여러 형식을 처리해야 하는 경우, 여러 패턴을 배열로 제공할 수 있습니다.

patterns_dir

  • patterns_dir 옵션을 사용하여 사용자 정의 패턴을 포함한 디렉터리의 경로를 지정할 수 있습니다. 이를 통해 기본 제공되는 패턴 외에도 사용자가 정의한 패턴을 grok 필터에서 사용할 수 있습니다.

break_on_match

  • break_on_matchtrue (기본값) 또는 false로 설정할 수 있으며, true일 경우 첫 번째로 매칭된 패턴에 대해서만 처리를 중단하고, false일 경우 모든 패턴에 대해 매칭을 시도합니다.

named_captures_only

  • named_captures_only 옵션은 true (기본값) 또는 false로 설정할 수 있으며, true일 경우 이름이 지정된 캡처 그룹만 필드로 생성하고, false일 경우 이름이 없는 캡처 그룹(\1, \2 등)도 필드로 생성합니다.

keep_empty_captures

  • keep_empty_captures 옵션은 true 또는 false (기본값)로 설정할 수 있으며, true로 설정할 경우 매칭된 결과가 없는 캡처 그룹도 필드로 유지합니다.

tag_on_failure

  • tag_on_failure 옵션은 패턴 매칭이 실패했을 때 이벤트에 추가할 태그를 지정합니다. 기본값은 ["_grokparsefailure"]입니다. 이를 통해 어떤 이벤트가 패턴 매칭에 실패했는지 식별할 수 있습니다.

overwrite

  • overwrite 옵션은 특정 필드에 대해 grok 필터로 추출된 새로운 값을 덮어쓰도록 지정합니다.

on_error

  • on_error 옵션은 grok 필터 실행 중 에러가 발생했을 때의 동작을 정의합니다. 예를 들어, 에러 발생 시 특정 태그를 추가하거나 필터 실행을 중단할 수 있습니다.

 

grok 필터를 효과적으로 사용하기 위해서는 이러한 옵션들을 이해하고 적절히 활용하는 것이 중요합니다. 로그 데이터의 형식과 분석 요구사항에 따라 다양한 옵션을 조합하여 사용할 수 있습니다.

    date {
      match => ["ts", "yyyy-MM-dd HH:mm:ss", "MMM dd HH:mm:ss", "MMM  d HH:mm:ss"]
      timezone => "Asia/Seoul"
      rebase => true
      on_error => "date_format_failure"
    }

이 코드 조각은 특정 날짜와 시간 관련 작업을 처리하기 위한 설정이 포함된 것으로 각 부분의 의미입니다.

  • match: 이 배열은 입력된 날짜와 시간 데이터를 구문 분석(parse)하기 위한 다양한 형식 패턴을 정의합니다. 여기에는 ts (타임스탬프를 나타낼 수 있음), yyyy-MM-dd HH:mm:ss (예: 2024-03-21 15:00:00), MMM dd HH:mm:ss (예: Mar 21 15:00:00), 그리고 MMM d HH:mm:ss (예: Mar 5 15:00:00, 일자 앞에 공백이 두 개 있는 경우를 위한 형식) 등이 포함됩니다.
  • timezone: 이 설정은 처리할 날짜와 시간 데이터의 시간대를 지정합니다. 여기서는 "Asia/Seoul"로, 서울 시간대를 나타냅니다.
  • rebase: 이 옵션이 true로 설정된 경우, 입력된 날짜와 시간 데이터를 지정된 timezone에 기반하여 재조정(rebase)합니다. 즉, 입력된 데이터를 서울 시간대에 맞게 조정합니다.
  • on_error: 이 설정은 데이터를 구문 분석하거나 재조정하는 과정에서 오류가 발생했을 때 어떤 행동을 취할지 결정합니다. 여기서는 date_format_failure라는 값을 사용하는데, 이는 특정 오류 처리 메커니즘을 호출하거나 오류 상태를 나타내는데 사용될 수 있습니다.

간단히 말해, 이 코드는 다양한 형식으로 입력된 날짜와 시간 데이터를 서울 시간대에 맞게 조정하고, 과정에서 발생할 수 있는 오류를 특정 방식으로 처리하기 위한 설정을 정의하는 것입니다.

 

match에 정의된 형식 중 하나라도 입력된 날짜와 시간 데이터와 일치하면, 해당 데이터는 성공적으로 구문 분석되어 처리됩니다. 여러 형식을 지정함으로써, 다양한 형태로 입력될 수 있는 날짜와 시간 데이터를 유연하게 처리할 수 있게 됩니다. 이는 입력 데이터의 형식이 미리 예측하기 어려운 경우나 여러 출처에서 오는 데이터를 처리해야 할 때 매우 유용합니다. 각 형식은 입력 데이터가 해당 형식과 일치하는지 순차적으로 검사되며, 일치하는 형식이 발견되면 해당 형식에 따라 데이터가 구문 분석됩니다.

 

date 처리와 관련하여 사용할 수 있는 다양한 옵션이 있습니다. 이러한 옵션들은 날짜와 시간 데이터를 구문 분석하고, 변환하며, 조정하는 과정을 유연하고 정교하게 제어할 수 있게 해줍니다. 다음은 일반적으로 사용될 수 있는 몇 가지 추가 옵션에 대해 정리했습니다.

  1. format
    • 이 옵션을 사용하여 구문 분석된 날짜와 시간 데이터를 특정 형식의 문자열로 출력할 수 있습니다. 예를 들어, "yyyy-MM-dd" 또는 "HH:mm:ss" 등을 지정할 수 있습니다.
  2. locale
    • 날짜와 시간을 구문 분석하거나 출력할 때 지역 설정에 따른 형식을 적용할 수 있습니다. 예를 들어, 다양한 언어나 지역에서 사용되는 월 이름이나 요일 이름을 올바르게 표시하기 위해 사용됩니다.
  3. start_of_week
    • 한 주의 시작을 정의합니다. 예를 들어, 일요일을 한 주의 시작으로 설정하거나 월요일을 시작으로 설정할 수 있습니다. 이는 주간 단위의 계산이나 변환에 영향을 줄 수 있습니다.
  4. adjust_to_timezone
    • 특정 시간대로 날짜와 시간을 조정합니다. 이는 timezone 옵션과 함께 사용되어 입력된 날짜와 시간 데이터를 다른 시간대로 변환할 때 사용될 수 있습니다.
  5. range
    • 날짜와 시간 데이터를 특정 범위 내로 제한하거나, 특정 기간 동안의 데이터만 선택하도록 할 수 있는 옵션입니다. 예를 들어, 특정 날짜 이전이나 이후의 데이터만 처리하고자 할 때 유용할 수 있습니다.
  6. default
    • 구문 분석에 실패했을 때 사용할 기본 날짜와 시간 값을 지정할 수 있습니다. 이는 on_error 옵션과 함께 사용하여, 오류 상황에서의 처리 방법을 보다 세밀하게 제어할 수 있습니다.
  7. transform
    • 날짜와 시간 데이터에 특정 변환을 적용할 수 있습니다. 예를 들어, 특정 기간을 더하거나 빼는 연산을 정의하여 결과 날짜와 시간을 조정할 수 있습니다.

이러한 옵션들은 사용하는 시스템이나 라이브러리에 따라 이름이나 사용 방법이 다를 수 있으며, 모든 옵션이 항상 사용 가능한 것은 아닙니다. 따라서 구체적인 사용 예나 문법은 해당 시스템이나 라이브러리의 문서를 참조하는 것이 좋습니다.

728x90

댓글