본문 바로가기

RabbitMQ 이용한 서버 상태 수집 및 이벤트 기반 워크플로우(n8n) 설계

728x90

리눅스 서버의 상태 정보를 수집하여 RabbitMQ로 전송하는 방법에는 크게 두 가지가 있다.

  1. Python을 사용한 스크립트 방식
  2. RabbitMQ 클라이언트 도구를 활용한 방식 (e.g., pika 라이브러리 또는 rabbitmqadmin)

아래에서 각각의 방법을 설명하겠다.

1. Python을 이용한 상태 정보 수집 및 RabbitMQ 전송

Python을 활용하면 커스텀하게 원하는 정보를 수집하고 RabbitMQ로 전송할 수 있다.

(1) 필요한 패키지 설치

먼저 pika 라이브러리를 설치한다.

pip install pika psutil

(2) 서버 상태 정보 수집 및 전송 코드

아래는 psutil을 활용하여 CPU, 메모리, 디스크, 네트워크 등의 상태 정보를 수집한 후 RabbitMQ에 전송하는 Python 코드이다.

import pika
import json
import psutil
import socket

# RabbitMQ 연결 정보
RABBITMQ_HOST = "rabbitmq_server_ip"  # RabbitMQ 서버 IP 또는 호스트명
RABBITMQ_QUEUE = "server_status_queue"

# 시스템 상태 정보 수집 함수
def collect_system_metrics():
    hostname = socket.gethostname()
    metrics = {
        "hostname": hostname,
        "cpu_usage": psutil.cpu_percent(interval=1),
        "memory_usage": psutil.virtual_memory().percent,
        "disk_usage": psutil.disk_usage('/').percent,
        "network_io": psutil.net_io_counters()._asdict(),
    }
    return metrics

# RabbitMQ에 메시지 전송 함수
def send_to_rabbitmq(data):
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
        channel = connection.channel()
        channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)

        message = json.dumps(data)
        channel.basic_publish(
            exchange='',
            routing_key=RABBITMQ_QUEUE,
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # 메시지를 지속적으로 유지
            ),
        )
        print(f"Sent: {message}")
        connection.close()
    except Exception as e:
        print(f"Error: {e}")

# 상태 정보 수집 및 전송 실행
if __name__ == "__main__":
    data = collect_system_metrics()
    send_to_rabbitmq(data)

2. RabbitMQ CLI를 활용한 상태 정보 전송

RabbitMQ CLI를 활용하면 Python을 사용하지 않고도 메시지를 보낼 수 있다. rabbitmqadmin을 이용하면 메시지를 쉽게 전송할 수 있다.

(1) rabbitmqadmin 설치

RabbitMQ Management Plugin을 활성화한 후 rabbitmqadmin을 다운로드해야 한다.

wget http://rabbitmq_server_ip:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin

(2) 서버 상태 정보 수집 후 메시지 전송

#!/bin/bash

# RabbitMQ 정보
RABBITMQ_HOST="rabbitmq_server_ip"
QUEUE_NAME="server_status_queue"
USER="guest"
PASSWORD="guest"

# 상태 정보 수집
HOSTNAME=$(hostname)
CPU_USAGE=$(top -bn1 | grep "Cpu(s)" | awk '{print $2 + $4}')
MEMORY_USAGE=$(free | awk '/Mem/ {printf "%.2f", $3/$2 * 100.0}')
DISK_USAGE=$(df -h / | awk 'NR==2 {print $5}')
NETWORK_IO=$(cat /proc/net/dev | awk '/eth0/ {print "{\"rx_bytes\":" $2 ", \"tx_bytes\":" $10 "}"}')

# JSON 데이터 생성
JSON_DATA=$(echo "{\"hostname\":\"$HOSTNAME\", \"cpu_usage\":$CPU_USAGE, \"memory_usage\":$MEMORY_USAGE, \"disk_usage\":\"$DISK_USAGE\", \"network_io\":$NETWORK_IO}")

# RabbitMQ로 메시지 전송
./rabbitmqadmin -H $RABBITMQ_HOST -u $USER -p $PASSWORD declare queue name=$QUEUE_NAME durable=true
./rabbitmqadmin -H $RABBITMQ_HOST -u $USER -p $PASSWORD publish exchange=amq.default routing_key=$QUEUE_NAME payload="$JSON_DATA"

echo "Sent: $JSON_DATA"

(3) 실행 권한 부여 및 실행

chmod +x send_status.sh
./send_status.sh

3. 실행 자동화 (Cron 활용)

RabbitMQ로 지속적으로 서버 상태를 보내려면 cron을 활용하면 된다.

(1) 크론 잡 등록

crontab -e

(2) 예제 크론 설정 (매 5분마다 실행)

*/5 * * * * /path/to/send_status.sh

또는 Python 스크립트로 실행할 경우:

*/5 * * * * /usr/bin/python3 /path/to/send_status.py

4. RabbitMQ에서 메시지 수신 테스트

RabbitMQ 서버에서 메시지를 정상적으로 받는지 확인하려면 다음을 실행한다.

(1) RabbitMQ CLI를 이용한 메시지 확인

rabbitmqctl list_queues
rabbitmqctl get_messages -q server_status_queue

(2) Python을 이용한 메시지 수신

RabbitMQ에서 메시지를 지속적으로 수신하려면 아래의 Python 소비자(Consumer) 코드를 실행하면 된다.

import pika

RABBITMQ_HOST = "rabbitmq_server_ip"
RABBITMQ_QUEUE = "server_status_queue"

def callback(ch, method, properties, body):
    print(f"Received: {body}")

connection = pika.BlockingConnection(pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)

channel.basic_consume(queue=RABBITMQ_QUEUE, on_message_callback=callback, auto_ack=True)
print("Waiting for messages...")
channel.start_consuming()
  • Python 방식: psutil을 사용하여 시스템 상태 정보를 수집하고 pika를 통해 RabbitMQ로 전송한다.
  • CLI 방식: rabbitmqadmin을 사용하여 메시지를 전송하며, Bash 스크립트와 결합하여 자동화할 수 있다.
  • 자동화: cron을 이용하여 주기적으로 실행할 수 있다.

 

이 방법을 활용하면 대량의 리눅스 서버에서 중앙 RabbitMQ로 상태 정보를 쉽게 수집하고 처리할 수 있다. rabbitmqadmin은 기본적으로 RabbitMQ 서버에 포함되어 있지는 않지만, RabbitMQ Management Plugin이 활성화되어 있으면 웹 인터페이스에서 다운로드할 수 있다. 따라서 RabbitMQ 서버 전체를 설치하지 않고도 rabbitmqadmin만 사용할 수 있다.

1. rabbitmqadmin 다운로드 방법

(1) RabbitMQ 서버에서 rabbitmqadmin 다운로드

RabbitMQ 서버에서 Management Plugin이 활성화된 경우, HTTP API를 통해 rabbitmqadmin을 직접 다운로드할 수 있다.

wget http://<RABBITMQ_SERVER>:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin

🔹 <RABBITMQ_SERVER>에는 RabbitMQ 서버의 IP 또는 도메인을 입력해야 한다.

(2) Management Plugin이 활성화되지 않은 경우

rabbitmqadminRabbitMQ Management Plugin이 활성화된 상태에서 제공된다. 만약 15672 포트가 열려 있지 않거나 다운로드가 되지 않는다면, 서버에서 플러그인을 활성화해야 한다.

 

Management Plugin 활성화 명령어

rabbitmq-plugins enable rabbitmq_management

플러그인을 활성화한 후 RabbitMQ 서비스를 재시작한다.

systemctl restart rabbitmq-server

이제 다시 wget을 실행하면 rabbitmqadmin을 다운로드할 수 있다.

2. RabbitMQ 서버가 없는 시스템에서 rabbitmqadmin 사용

RabbitMQ 서버를 설치하지 않고 클라이언트 시스템에서만 rabbitmqadmin을 실행하고 싶다면, 아래 방법을 사용할 수 있다.

(1) RabbitMQ 서버에서 다운로드 후 전송

RabbitMQ 서버에서 rabbitmqadmin을 다운로드한 후, 필요한 시스템으로 scp 또는 rsync를 통해 전송하면 된다.

scp rabbitmqadmin user@client-server:/usr/local/bin/

(2) 클라이언트에서 직접 다운로드

RabbitMQ 서버가 접근 가능한 경우, 클라이언트에서도 직접 wget으로 다운로드할 수 있다.

wget http://<RABBITMQ_SERVER>:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin
sudo mv rabbitmqadmin /usr/local/bin/

3. rabbitmqadmin을 패키지 설치 없이 사용하는 방법

RabbitMQ 서버 전체를 설치하지 않고도 rabbitmqadmin만 다운로드하여 사용 가능하다. 단, RabbitMQ 서버에는 Management Plugin이 활성화되어 있어야 한다. 이 방식은 특히 RabbitMQ 서버가 없는 별도의 모니터링 서버 또는 에이전트에서 메시지를 전송할 때 유용하다.

🛑 RabbitMQ 서버가 설치되지 않은 시스템에서 rabbitmqadmin을 사용하려면, RabbitMQ 서버가 원격에서 접근 가능해야 하며, 사용자 계정과 비밀번호가 필요하다.

4. rabbitmqadmin 사용 예제

(1) 큐 조회

rabbitmqadmin -H <RABBITMQ_SERVER> -u guest -p guest list queues

(2) 메시지 전송

rabbitmqadmin -H <RABBITMQ_SERVER> -u guest -p guest publish exchange=amq.default routing_key=my_queue payload='{"message":"Hello, RabbitMQ!"}'

(3) 메시지 가져오기

rabbitmqadmin -H <RABBITMQ_SERVER> -u guest -p guest get queue=my_queue
  • rabbitmqadmin은 기본적으로 RabbitMQ 서버 패키지에 포함되지 않는다.
  • RabbitMQ Management Plugin이 활성화된 경우, 웹 인터페이스(:15672/cli/rabbitmqadmin)에서 다운로드 가능하다.
  • RabbitMQ 서버가 없어도 rabbitmqadmin을 클라이언트 시스템에 배포하여 사용할 수 있다.
  • RabbitMQ 서버 전체를 설치할 필요 없이 rabbitmqadmin만 다운로드하여 메시지를 송수신할 수 있다.

 

RabbitMQ 서버 없이도 쉽게 관리 가능하므로, 클라이언트 시스템에서만 rabbitmqadmin을 실행하고 싶을 때 유용한 방법이다. RabbitMQ에서 큐와 메시지의 기본 동작 방식과 일반적인 활용 방법이다.

1. RabbitMQ 큐의 기본 동작

RabbitMQ의 큐는 기본적으로 비영구적(Non-Durable)이며, 메시지도 비영속적(Transient)으로 저장된다. 따라서 기본 설정으로 큐를 만들면 RabbitMQ 서버가 재시작될 경우 큐와 메시지가 사라진다.

(1) 큐의 영속성 (Durable Queue)

  • 큐를 영구적으로 유지하려면 큐를 Durable(지속적)하게 생성해야 한다.
  • Durable 큐는 RabbitMQ가 재시작되어도 유지되지만, 메시지는 기본적으로 유지되지 않는다.
channel.queue_declare(queue='task_queue', durable=True)

위와 같이 durable=True 옵션을 주면 큐 자체는 유지된다.

(2) 메시지의 영속성 (Persistent Message)

  • 메시지도 기본적으로 비영속적(Transient)이므로, 메시지가 유실되지 않게 하려면 Persistent Message(지속성 메시지)로 설정해야 한다.
  • 메시지를 지속적으로 저장하려면 delivery_mode=2를 설정해야 한다.
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=2  # 메시지를 영속적으로 저장
    )
)

큐가 Durable이어도 메시지가 Persistent하지 않으면 메시지가 손실될 수 있다.
즉, 메시지를 유지하려면 큐와 메시지 모두 영속적으로 설정해야 한다.

2. 큐에 쌓인 메시지는 어떻게 처리될까?

RabbitMQ에서 메시지가 큐에 쌓이면 Consumer(소비자)가 해당 메시지를 받아서 처리한 후 삭제할 수 있다.

(1) 메시지를 자동으로 삭제할 것인가? (Auto-Ack)

  • 메시지를 소비한 후 자동으로 삭제하려면 auto_ack=True로 설정한다.
  • 하지만, 이 경우 Consumer가 처리 도중 오류가 발생해도 메시지가 삭제되므로 데이터 유실 가능성이 있다.
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=True)
  • 백엔드에서 직접 처리 후 삭제하려면 auto_ack=False로 설정하고, 수동으로 삭제해야 한다.
  • ch.basic_ack(delivery_tag=method.delivery_tag)를 호출해야 RabbitMQ가 해당 메시지를 삭제한다.
def callback(ch, method, properties, body):
    print(f"Received {body}")
    # 메시지 정상 처리 후 RabbitMQ에서 삭제
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)

(2) 메시지를 처리하지 못한 경우 재전송 (Requeue)

  • 만약 특정 조건에서 메시지 처리가 실패하면, RabbitMQ에 다시 넣어서 재처리할 수 있다.
  • 이를 위해 ch.basic_nack()를 호출하면 된다.
def callback(ch, method, properties, body):
    try:
        print(f"Processing {body}")
        # 정상 처리 완료
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error: {e}")
        # 메시지 다시 큐로 반환
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

3. 일반적인 RabbitMQ 활용 패턴

RabbitMQ는 다양한 방식으로 활용되며, 주로 다음과 같은 시나리오에서 많이 사용된다.

(1) 작업 큐 (Task Queue)

📌 주요 특징

  • 메시지를 큐에 넣고 여러 개의 Worker가 병렬로 처리하는 구조.
  • Worker가 메시지를 처리하면 RabbitMQ에서 해당 메시지를 삭제.
  • 작업을 분산 처리하여 병목을 줄임.

📌 예제

  • 대량의 이미지 처리, 동영상 변환, 로그 분석 등.

📌 사용 방법

  • Durable 큐 + Persistent 메시지 + 수동 Ack (auto_ack=False)
channel.queue_declare(queue='task_queue', durable=True)
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)

(2) 이벤트 브로커 (Event-driven Architecture)

📌 주요 특징

  • 한 시스템에서 발생한 이벤트를 다른 시스템에서 실시간으로 수신.
  • 여러 Consumer가 동일한 메시지를 구독 가능 (Pub/Sub 패턴).
  • 메시지는 보통 1회 소비 후 삭제.

📌 예제

  • 주문 완료 후, 결제 시스템과 배송 시스템에 동시에 알림 전송.
  • 애플리케이션 로깅 시스템.

📌 사용 방법

  • exchange 사용하여 여러 큐로 메시지 전송.
  • Durable 큐는 선택적으로 사용 가능.
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='New event!')

(3) 지연 메시지 (Delayed Message)

📌 주요 특징

  • 특정 시간 후에 메시지를 소비하도록 설정 가능.
  • RabbitMQ 자체적으로 지원하지 않지만, TTL (Time-To-Live) + Dead Letter Queue (DLQ) 방식으로 구현 가능.

📌 예제

  • 일정 시간 후에 알림 메시지 전송.
  • 재시도 전략 (일정 시간이 지난 후 다시 처리).

📌 사용 방법

  • 메시지의 TTL(Time-To-Live) 설정 후, 만료된 메시지를 DLX(Dead Letter Exchange)로 이동.
args = {"x-message-ttl": 60000, "x-dead-letter-exchange": "retry_exchange"}
channel.queue_declare(queue='delayed_queue', arguments=args)

(4) 로그 저장 (Logging)

📌 주요 특징

  • 로그 데이터를 중앙 시스템에 전송.
  • 여러 Consumer가 로그 데이터를 실시간으로 처리 가능.

📌 예제

  • 애플리케이션 로그를 Kibana, Elasticsearch에 저장.
  • 서버 상태 로그를 중앙 로그 시스템에 전송.

📌 사용 방법

  • Fanout Exchange를 활용하여 여러 Consumer에 메시지를 전송.
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body='[INFO] New log entry')

정리하면,

  1. RabbitMQ의 큐는 기본적으로 비영구적이며, 유지하려면 durable=True 설정 필요.
  2. 메시지도 기본적으로 비영속적이며, 유지하려면 delivery_mode=2 설정 필요.
  3. 큐에 쌓인 메시지는 소비자가 처리해야 삭제됨.
    • 자동 삭제(auto_ack=True) vs. 수동 삭제(auto_ack=False)
  4. 일반적인 활용 방식
    • 작업 큐 (Task Queue): Worker가 메시지를 받아서 작업 후 삭제.
    • 이벤트 브로커 (Pub/Sub): 여러 Consumer가 같은 메시지를 받아서 이벤트 처리.
    • 지연 메시지: 특정 시간이 지난 후 메시지를 소비.
    • 로그 저장: 로그 데이터를 중앙 시스템으로 전송.

RabbitMQ를 어떻게 활용할지에 따라 큐의 설정 방식과 메시지 처리 방식이 달라진다. 대량의 서버 상태 정보를 RabbitMQ로 전송한 후, 백엔드에서 Worker가 메시지를 받아서 처리하는 Task Queue 방식이 가장 적합할 것이다. RabbitMQ의 Management Plugin을 활성화한 후, 초기 관리 콘솔(15672 포트) 접속 정보는 기본적으로 다음과 같다.

  • URL: http://<RABBITMQ_SERVER_IP>:15672/
  • 기본 관리자 계정 (Default Admin Account)
    • 사용자명 (Username): guest
    • 비밀번호 (Password): guest
  • 기본적으로 guest 계정은 localhost에서만 접속 가능하며, 원격 접속을 허용하려면 추가 설정이 필요하다.

1. RabbitMQ 관리 콘솔(15672) 활성화 방법

RabbitMQ의 Management Plugin이 비활성화된 경우, 다음 명령어를 실행하여 활성화할 수 있다.

rabbitmq-plugins enable rabbitmq_management
systemctl restart rabbitmq-server  # 또는 service rabbitmq-server restart

이제 http://<RABBITMQ_SERVER_IP>:15672/ 에 접속하면 RabbitMQ 관리 UI를 사용할 수 있다.

2. 원격 접속 허용 설정

기본적으로 guest 계정은 localhost에서만 로그인할 수 있다. 원격에서 접근하려면 rabbitmq.config 또는 rabbitmq.conf에서 설정을 변경해야 한다.

(1) rabbitmq.conf 수정 (RabbitMQ 3.7 이상)

파일 경로: /etc/rabbitmq/rabbitmq.conf

sudo nano /etc/rabbitmq/rabbitmq.conf

파일에 다음 설정을 추가

loopback_users = none

저장 후 RabbitMQ를 재시작

systemctl restart rabbitmq-server

(2) rabbitmq.config 수정 (구버전, RabbitMQ 3.6 이하)

파일 경로: /etc/rabbitmq/rabbitmq.config

sudo nano /etc/rabbitmq/rabbitmq.config

아래 설정을 추가하거나 수정

[{rabbit, [{loopback_users, []}]}].

설정 반영을 위해 RabbitMQ 재시작

systemctl restart rabbitmq-server

🔹 보안 주의: guest 계정을 원격 접속 가능하게 하면 보안상 취약할 수 있으므로, 별도의 사용자 계정을 생성하는 것이 좋다.

3. 새로운 관리자 계정 생성

보안 강화를 위해 guest 계정 대신 새로운 관리자 계정을 생성하는 것이 권장된다.

(1) RabbitMQ에 새로운 사용자 추가

rabbitmqctl add_user myadmin mysecurepassword

(2) 관리자 권한 부여

rabbitmqctl set_user_tags myadmin administrator

(3) 모든 권한 부여

rabbitmqctl set_permissions -p / myadmin ".*" ".*" ".*"

이제 http://<RABBITMQ_SERVER_IP>:15672/ 에 접속하여 새로운 계정(myadmin)으로 로그인할 수 있다.

4. rabbitmqadmin을 이용한 원격 사용자 계정 추가

만약 CLI를 사용할 수 없거나 원격에서 계정을 추가해야 한다면, rabbitmqadmin을 사용하면 된다.

(1) 새로운 사용자 생성

rabbitmqadmin -H <RABBITMQ_SERVER_IP> -u guest -p guest declare user name="myadmin" password="mysecurepassword" tags="administrator"

(2) 권한 설정

rabbitmqadmin -H <RABBITMQ_SERVER_IP> -u myadmin -p mysecurepassword declare permission vhost="/" user="myadmin" configure=".*" write=".*" read=".*"

이제 새로운 계정을 사용하여 RabbitMQ에 접근할 수 있다.

5. 기본 계정 정보 변경

초기 설정 이후, 기본 guest 계정의 비밀번호를 변경하는 것이 보안상 좋다.

rabbitmqctl change_password guest newsecurepassword

또는, guest 계정을 비활성화하려면 다음 명령을 실행하면 된다.

rabbitmqctl delete_user guest
  • 기본 관리자 계정: guest / guest (localhost 전용)
  • 15672 포트 활성화: rabbitmq-plugins enable rabbitmq_management
  • 원격 로그인 허용: loopback_users = none 설정 변경
  • 새로운 관리자 계정 생성: rabbitmqctl add_user myadmin mysecurepassword
  • 보안 강화: 기본 guest 계정 비활성화 또는 비밀번호 변경

 

이제 RabbitMQ 관리 콘솔(15672)에 접속하여 관리할 수 있다.

n8n에서 RabbitMQ를 활용한 크롤링 고도화 방안

n8n에서 다양한 사이트에서 크롤링을 수행하고 Google Sheets에 기록하며 변경사항을 Slack으로 알림하는 프로세스를 RabbitMQ를 활용하여 더욱 안정적이고 확장 가능하게 개선하는 방안을 제안한다.

RabbitMQ를 도입하는 이유

  1. 비동기 처리 및 확장성 확보
    • 크롤링할 사이트가 많아질 경우 n8n 내에서 직접 순차적으로 처리하면 속도가 느려질 수 있음.
    • RabbitMQ를 활용하여 크롤링 작업을 비동기 분산 처리하면 병렬적으로 작업을 수행할 수 있음.
  2. 작업 분배 및 부하 제어
    • 여러 크롤링 작업을 큐에 넣고 Worker 노드(n8n 워크플로우)를 분산 배치하여 실행 가능.
    • 특정 사이트에서 크롤링 속도 제한이 있을 경우, Worker 노드 개수를 조절하여 부하를 관리할 수 있음.
  3. 에러 내구성 및 재시도 처리
    • 크롤링 중 장애 발생 시 RabbitMQ의 Dead Letter Queue(DLQ)를 이용하여 실패한 작업을 재처리할 수 있음.
    • n8n에서 실패한 작업을 감지하여 다시 큐에 넣을 수도 있음.
  4. 변경 사항 감지 및 이벤트 기반 처리
    • 크롤링 데이터의 변경 사항을 감지하는 역할을 이벤트 방식으로 분리하여 Slack 알림을 빠르게 보낼 수 있음.

아키텍처 구성

[크롤링 작업 트리거]
    ⬇ (사이트 리스트 전달)
[RabbitMQ Queue]
    ⬇ (Worker 노드에서 작업 분배)
[n8n Worker Nodes (크롤링 실행)]
    ⬇ (데이터 저장)
[Google Sheets 기록]
    ⬇ (이전 데이터와 비교)
[변경 사항 감지 및 Slack 알림]

세부 구현 방안

1. RabbitMQ 큐를 활용한 크롤링 작업 분배

  • n8n에서 크롤링할 사이트 리스트를 큐에 추가
  • RabbitMQ를 사용하여 Worker n8n 인스턴스에서 개별 크롤링 작업 실행

📌 RabbitMQ Producer (n8n)

  • n8n에서 HTTP Request Node 또는 AMQP Sender Node를 활용하여 RabbitMQ에 크롤링 작업을 큐에 등록
  • JSON 형식으로 크롤링할 URL과 파라미터를 포함하여 메시지를 발행
    { "url": "https://example.com/data", "params": { "category": "tech" }, "timestamp": "2025-04-03T12:00:00Z" }

📌 RabbitMQ Worker (n8n)

  • AMQP Trigger Node를 사용하여 큐에서 메시지를 수신
  • 메시지를 기반으로 크롤링 실행 후 Google Sheets에 데이터 업데이트

 

2. 크롤링 결과 저장 및 비교

  • n8n에서 크롤링된 데이터를 Google Sheets에 저장
  • Google Sheets Read Node를 사용하여 기존 데이터를 가져와 비교
  • 변경된 데이터가 있을 경우, Slack으로 알림

🔹 변경사항 비교 예시

  1. 크롤링 결과를 Google Sheets에서 읽기
  2. 기존 데이터와 새로운 데이터를 비교
  3. 변경 사항이 있으면 Slack 알림

3. 에러 및 재시도 처리

  • 크롤링 실패 시 RabbitMQ의 Dead Letter Queue (DLQ)를 활용하여 실패한 요청을 저장
  • n8n에서 일정 주기로 DLQ를 확인하고 실패한 요청을 재처리

RabbitMQ 기반 고도화 워크플로우 예시

1. 크롤링 요청 등록 (n8n)

  • 주기적으로 실행되는 트리거 (Cron Node)
  • 사이트 리스트를 가져와 RabbitMQ에 메시지 전송 (AMQP Sender Node)

2. 크롤링 작업 실행 (n8n Worker)

  • AMQP Trigger Node로 RabbitMQ에서 크롤링 요청 수신
  • 크롤링 수행 후 결과를 Google Sheets에 저장
  • 기존 데이터와 비교 후 변경사항이 있을 경우 Slack 알림

3. 장애 및 재시도 처리

  • 크롤링 실패 시 DLQ로 이동
  • 일정 주기로 DLQ를 확인하여 실패한 작업을 재시도

기대 효과

  1. 크롤링 병렬 처리 → 여러 사이트에서 동시에 크롤링 가능
  2. 부하 조절 가능 → RabbitMQ의 Worker 수를 조절하여 크롤링 속도 제어
  3. 실패 내구성 강화 → DLQ를 통해 크롤링 실패 시 자동 재시도
  4. 변경 사항을 실시간으로 감지 → Slack 알림을 빠르게 전송하여 업무 흐름 개선

RabbitMQ 설치 및 활용

RabbitMQ는 Docker를 활용하여 쉽게 설치할 수 있다.

docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin \
  rabbitmq:3-management
  • 5672 포트: AMQP 통신
  • 15672 포트: 관리 UI (http://localhost:15672)

n8n에서는 AMQP Sender NodeAMQP Trigger Node를 사용하여 RabbitMQ와 연동하면 된다.

 

RabbitMQ를 활용하여 n8n의 크롤링 워크플로우를 비동기 및 분산 처리 방식으로 개선하면 확장성, 안정성, 효율성이 증가한다.

  • 크롤링 사이트 증가 시에도 성능 저하 없이 확장 가능
  • 장애 발생 시 자동 재시도하여 데이터 손실 방지
  • 변경사항 감지를 이벤트 방식으로 처리하여 Slack 알림을 빠르게 전송

이 방식으로 크롤링 프로세스를 더욱 견고하고 효율적으로 운영할 수 있다.

RabbitMQ vs Mosquitto 비교 분석

RabbitMQ와 Mosquitto는 모두 메시지 브로커(Message Broker) 역할을 하지만, 사용 목적과 내부 동작 방식에서 차이가 있다. Mosquitto는 MQTT 프로토콜을 기반으로 하는 반면, RabbitMQ는 AMQP 프로토콜을 기본으로 사용하며 다양한 프로토콜을 지원할 수 있다.

1. 유사점 (공통점)

항목 RabbitMQ Mosquitto
메시지 브로커 역할 ✅ 가능 ✅ 가능
비동기 메시징 지원 ✅ 지원 ✅ 지원
Pub/Sub (Publish-Subscribe) 지원 ✅ 지원 ✅ 지원
QoS(Quality of Service) 보장 ✅ 가능 (ACK/NACK 사용) ✅ 가능 (MQTT QoS 사용)
클러스터링 및 확장성 ✅ 클러스터 및 Federation 지원 ✅ 클러스터 지원 (제한적)
경량 메시지 전송 가능 ✅ 가능 ✅ 가능
오픈소스 ✅ 오픈소스 ✅ 오픈소스
  • 두 시스템 모두 비동기 메시징을 처리하는 메시지 브로커 역할을 수행하며, Publish-Subscribe 모델을 지원합니다.
  • QoS(서비스 품질 보장) 기능을 제공하여 메시지 유실 방지를 위한 다양한 전달 전략을 가질 수 있습니다.

2. 차이점 (RabbitMQ vs Mosquitto 차이점 비교)

비교 항목 RabbitMQ Mosquitto
프로토콜 AMQP (Advanced Message Queuing Protocol) 기본 지원, 추가적으로 MQTT, STOMP, HTTP, WebSocket 등 지원 가능 MQTT (Message Queue Telemetry Transport) 전용
메시지 영속성 (Persistence) 디스크 기반 메시지 영속성 지원 기본적으로 메시지 영속성 없음 (하지만 설정 가능)
트랜잭션 지원 지원 (Publisher Confirms, Transactions) 제한적 지원 (MQTT 5.0에서 개선)
메시지 보존 방식 큐 기반 저장, Exchange와 Queue를 연결하여 라우팅 메시지는 클라이언트가 구독 중일 때만 전송되며 기본적으로 메시지 저장 없음
QoS 수준 1) 메시지 확인(Acknowledgment), 2) Requeue 기능 제공 1) QoS 0(최소 보장), 2) QoS 1(최소 1회 전달), 3) QoS 2(한 번만 전달)
스케일링(Scaling) 클러스터링, Federation, Shovel 등 다양한 확장 지원 클러스터 지원은 있지만 제한적
사용 용도 엔터프라이즈급 메시징 (마이크로서비스 통신, 분산 시스템, 대규모 메시징 처리) IoT 및 저전력 장치 메시징 (센서 데이터, IoT 디바이스 간 통신)
보안 TLS, 인증/권한 관리 제공 TLS 지원, 기본적인 인증 가능
사용자 수 수백~수천 개의 클라이언트 처리 가능 수백만 개의 IoT 장치 연결 가능
성능 메시지 큐 관리로 인해 상대적으로 무겁지만 강력한 기능 제공 경량 메시지 처리로 매우 빠르고 저전력 환경에 적합

3. 활용 측면에서의 비교

✅ RabbitMQ 활용 사례

RabbitMQ는 AMQP 프로토콜을 사용하며, 대규모 분산 시스템, 마이크로서비스, 금융거래, 클라우드 서비스 등에 최적화되어 있다.

🔹 사용 사례

  • 마이크로서비스 간 통신 (API Gateway ↔ 백엔드 서비스)
  • 분산 시스템 이벤트 처리 (이벤트 기반 아키텍처)
  • 금융/거래 시스템 (주문 및 결제 처리)
  • 로그 및 알림 시스템 (분산 환경에서 로그 수집 및 분석)
  • 백그라운드 작업 처리 (이메일 전송, 대량 데이터 처리)

🔹 예제

  • 전자상거래 시스템에서 주문이 들어오면 RabbitMQ를 통해 백엔드 시스템과 결제 시스템이 메시지를 주고받으며 실시간으로 처리.

 

✅ Mosquitto 활용 사례

Mosquitto는 MQTT 프로토콜을 기반으로 하는 경량 메시지 브로커로, IoT(Internet of Things) 및 센서 데이터 전송에 최적화되어 있다.

🔹 사용 사례

  • IoT 기기 간 메시징 (스마트 홈, 센서 네트워크)
  • 실시간 데이터 전송 (온도 센서, 기계 상태 모니터링)
  • 스마트 농업 (환경 센서 데이터 송신)
  • 스마트 자동차 시스템 (차량 상태 데이터 공유)
  • 의료 기기 네트워크 (환자 모니터링 시스템)

🔹 예제

  • 스마트 홈 시스템에서 온도 센서Mosquitto를 통해 MQTT 메시지를 전송하고, 클라이언트가 이를 구독하여 실시간으로 온도를 표시.

4. RabbitMQ vs Mosquitto 선택 기준

사용 목적 추천 메시지 브로커 이유
마이크로서비스 통신 RabbitMQ AMQP 기반으로 메시지 큐 관리 기능이 강력함
대규모 메시징 시스템 RabbitMQ 엔터프라이즈급 메시징 시스템에 적합
IoT 디바이스 간 통신 Mosquitto MQTT 프로토콜을 기반으로 하여 저전력 & 경량 메시징에 적합
센서 데이터 전송 Mosquitto 수많은 IoT 장치가 연결 가능
비동기 이벤트 기반 아키텍처 RabbitMQ 다양한 라우팅 및 확장 기능 지원
대량의 작은 메시지 처리 Mosquitto 가벼운 메시지 브로커로 성능이 뛰어남
보안 및 트랜잭션이 중요한 경우 RabbitMQ 보안 인증, 트랜잭션, 메시지 보존 기능 지원

RabbitMQ와 Mosquitto는 모두 메시징 시스템을 지원하지만 사용 목적과 환경이 다릅니다.

  • RabbitMQ대규모 마이크로서비스 및 엔터프라이즈 메시징에 적합하며, 강력한 메시지 라우팅, 영속성, 보안, 확장성을 제공합니다.
  • Mosquitto경량 메시징 솔루션으로 IoT 및 센서 데이터 전송에 최적화되어 있으며, 저전력 장치 및 실시간 메시징에 적합합니다.

👉 선택 가이드:

IoT 센서 네트워크, 스마트 홈, 경량 메시징 → Mosquitto

  • 마이크로서비스, 금융, 백엔드 분산 처리, 안정적인 메시징 큐RabbitMQ

 

즉, Mosquitto는 저전력 IoT 환경에 최적화된 메시지 브로커, RabbitMQ는 엔터프라이즈급 메시지 처리 및 복잡한 워크플로우에 강한 브로커라고 이해하면 됩니다.

728x90

댓글