프로그램 (PHP,Python)

n8n에서 Elasticsearch로 직접 쿼리하여 자동화 처리 운영

날으는물고기 2024. 7. 25. 00:41

n8n에서 Elasticsearch로 직접 쿼리하여 최근 날짜의 값을 추출하려면 HTTP Request 노드를 사용하여 Elasticsearch에 쿼리를 보낼 수 있습니다. 다음은 이를 구현하는 예제입니다.

  1. HTTP Request 노드 설정
    • Method: POST
    • URL: http://your_elasticsearch_host:9200/{target_index}-*/_search
    • Headers: Content-Type: application/json
    • Body: 아래 JSON 형식
  2. JSON Body
    {
      "from": 0,
      "size": 1,
      "sort": [
        {
          "timestamp": {
            "order": "desc"
          }
        }
      ],
      "query": {
        "bool": {
          "must": [
            {
              "match": {
                "location": "{{ $json["location"] }}"
              }
            },
            {
              "match": {
                "category": "{{ $json["category"] }}"
              }
            },
            {
              "match": {
                "subcategory": "{{ $json["subcategory"] }}"
              }
            }
          ]
        }
      }
    }
  3. Elasticsearch 쿼리 결과 처리
    • HTTP Request 노드 다음에 Function 노드를 추가하여 쿼리 결과를 처리합니다.
    • 아래와 같이 Function 노드에 JavaScript 코드를 추가하여 응답에서 timestamp 값을 추출합니다.
    return {
      timestamp: $json["hits"]["hits"][0]?._source?.timestamp || 
                  (getInput().category === "SCAN" && getInput().subcategory === "default" ? 
                    Date.now() - (60 * 10 * 1000) : 
                    Date.now() - (60 * 60 * 24 * 3 * 1000))
    };

이 코드는 n8n에서 HTTP Request 노드를 사용하여 Elasticsearch에 쿼리를 보내고, Function 노드에서 쿼리 응답을 처리하는 방식으로, 최근 날짜의 timestamp 값을 추출하는 방법을 보여줍니다. 필요한 경우, HTTP Request 노드와 Function 노드의 설정을 조정하여 구체적인 요구 사항에 맞출 수 있습니다.

 

n8n에서 Elasticsearch 노드를 사용하여 최근 날짜의 값을 추출하려면 아래 단계를 따르면 됩니다.

Elasticsearch Node 설정

  • Operation: Search
  • Index: {target_index}-*

Query Body 설정

Elasticsearch 노드의 Body 필드에 아래 JSON 쿼리를 입력합니다.

{
  "from": 0,
  "size": 1,
  "sort": [
    {
      "timestamp": {
        "order": "desc"
      }
    }
  ],
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "location": "{{ $json.location }}"
          }
        },
        {
          "match": {
            "category": "{{ $json.category }}"
          }
        },
        {
          "match": {
            "subcategory": "{{ $json.subcategory }}"
          }
        }
      ]
    }
  }
}

Function Node를 사용하여 결과 처리

Elasticsearch 노드의 출력 값을 처리하기 위해 Function 노드를 추가합니다. 이 노드에서 최근 날짜의 timestamp 값을 추출합니다.

const hits = $json["hits"]["hits"];

if (hits.length > 0 && hits[0]._source && hits[0]._source.timestamp) {
  return {
    timestamp: hits[0]._source.timestamp
  };
} else {
  // SCAN default 처리
  if ($json.category === "SCAN" && $json.subcategory === "default") {
    return {
      timestamp: Date.now() - (60 * 10 * 1000) // 10분 전
    };
  }
  // 기본 처리
  return {
    timestamp: Date.now() - (60 * 60 * 24 * 3 * 1000) // 3일 전
  };
}

전체 워크플로우 설정

  1. Set Node를 사용하여 입력 데이터 설정 (예: location, category, subcategory).
  2. Elasticsearch Node를 추가하여 검색 쿼리를 설정합니다.
  3. Function Node를 추가하여 Elasticsearch 노드의 결과를 처리합니다.

예시 워크플로우

  1. Set Node: 위치, 카테고리, 서브카테고리 설정
    {
      "location": "office",
      "category": "NTP",
      "subcategory": "agentpacket"
    }
  2. Elasticsearch Node:
    • Operation: Search
    • Index: {target_index}-*
    • Body: 위에 제공된 JSON 쿼리
  3. Function Node: 결과 처리
    const hits = $json["hits"]["hits"];
    
    if (hits.length > 0 && hits[0]._source && hits[0]._source.timestamp) {
      return {
        timestamp: hits[0]._source.timestamp
      };
    } else {
      // SCAN default 처리
      if ($json.category === "SCAN" && $json.subcategory === "default") {
        return {
          timestamp: Date.now() - (60 * 10 * 1000) // 10분 전
        };
      }
      // 기본 처리
      return {
        timestamp: Date.now() - (60 * 60 * 24 * 3 * 1000) // 3일 전
      };
    }

이렇게 설정하면 n8n에서 Elasticsearch 노드를 사용하여 지정된 쿼리 조건에 맞는 가장 최근의 timestamp 값을 추출할 수 있습니다.

 

"size": 1은 Elasticsearch 쿼리에서 반환할 문서의 최대 수를 지정합니다. 이 경우, "size": 1은 가장 최근의 한 개의 문서만 반환하도록 설정된 것입니다. 이는 timestamp 기준으로 내림차순으로 정렬한 후 첫 번째 문서를 가져오므로, 가장 최신의 문서 하나를 가져오겠다는 의미입니다. 이 부분을 통해 최근의 timestamp 값을 가진 문서만 반환되도록 보장할 수 있습니다.

 

"from": 0은 검색 결과의 시작 지점을 지정하는데 사용됩니다. 이는 반환할 첫 번째 문서의 오프셋을 의미합니다. Elasticsearch에서 기본적으로 검색 결과는 페이지 단위로 반환되며, "from": 0은 첫 페이지의 첫 번째 문서부터 반환하도록 설정하는 것입니다.

 

  • "from": 0: 검색 결과의 첫 번째 문서부터 시작.
  • "size": 1: 최대 1개의 문서를 반환.

따라서, 이 설정은 가장 최신의 문서 한 개를 반환하도록 합니다.

 

Code Node에서 특정 값만 반환하려면, outputItems 배열에 직접 해당 값을 추가해야 합니다. 현재 코드에서 add 메서드가 없는 배열에 사용되었기 때문에 에러가 발생합니다. 배열의 push 메서드를 사용하여 값을 추가하고, 올바른 형식으로 값을 반환해야 합니다.

 

다음은 수정된 코드입니다.

const outputItems = [];
outputItems.push({ event_time: items[0].json.event_time });
return outputItems;

이 코드에서는 items 배열의 첫 번째 요소에서 event_time 값을 추출하여 outputItems 배열에 추가하고 반환합니다. json 속성을 통해 원본 데이터에 접근해야 한다는 점을 주의해야 합니다.

전체 예제

  1. Elasticsearch Node: 검색 쿼리 수행
  2. Code Node: 결과 처리 및 특정 값 반환

예제 워크플로우

  1. Elasticsearch Node 설정
    • Operation: Search
    • Index: {target_index}-*
    • Query Body
      {
        "from": 0,
        "size": 1,
        "sort": [
          {
            "timestamp": {
              "order": "desc"
            }
          }
        ],
        "query": {
          "bool": {
            "must": [
              {
                "match": {
                  "location": "{{ $json.location }}"
                }
              },
              {
                "match": {
                  "category": "{{ $json.category }}"
                }
              },
              {
                "match": {
                  "subcategory": "{{ $json.subcategory }}"
                }
              }
            ]
          }
        }
      }
  2. Code Node: 결과 처리
    const outputItems = [];
    if (items.length > 0 && items[0].json && items[0].json.event_time) {
      outputItems.push({ event_time: items[0].json.event_time });
    }
    return outputItems;

이 설정은 Elasticsearch에서 검색된 결과에서 첫 번째 문서의 event_time 값을 추출하여 반환합니다. Code Node는 검색된 결과를 처리하고, 원하는 값만을 반환하는 역할을 합니다.

 

그리고, 시간을 KST로 변환하는 작업을 포함하여, Code Node에서 event_time을 KST로 변환하고 원하는 형식으로 반환하는 코드를 작성할 수 있습니다.

 

다음은 수정된 코드입니다.

const outputItems = [];

if (items.length > 0 && items[0].json && items[0].json.event_time) {
    const date = new Date(items[0].json.event_time);
    const kstOffset = 9 * 60 * 60 * 1000; // KST는 UTC+9:00 (밀리초 단위)
    const kstDate = new Date(date.getTime() + kstOffset);

    const formattedDate = `${kstDate.getFullYear()}-${String(kstDate.getMonth() + 1).padStart(2, '0')}-${String(kstDate.getDate()).padStart(2, '0')} ${String(kstDate.getHours()).padStart(2, '0')}:${String(kstDate.getMinutes()).padStart(2, '0')}:${String(kstDate.getSeconds()).padStart(2, '0')}`;

    outputItems.push({ event_time: formattedDate });
}

return outputItems;
  1. Date 객체 생성: event_time을 Date 객체로 변환합니다.
  2. KST 오프셋 계산: KST는 UTC+9:00이므로 9시간을 밀리초 단위로 변환하여 오프셋을 계산합니다.
  3. KST 시간 계산: UTC 시간을 KST 시간으로 변환합니다.
  4. 포맷팅: KST 시간으로 변환한 값을 원하는 형식으로 문자열로 포맷팅합니다.
  5. outputItems 배열에 추가: 포맷팅된 날짜를 outputItems 배열에 추가합니다.
  6. 반환: outputItems 배열을 반환합니다.

이 설정을 통해 Elasticsearch에서 검색된 결과의 event_time을 KST로 변환하고, 원하는 형식으로 출력할 수 있습니다.

728x90