프로그램 (PHP,Python)

Django와 Celery 사용한 비동기 및 주기적인 작업 스케줄링

날으는물고기 2024. 10. 28. 00:50

Django와 Celery를 함께 사용하면 비동기 작업 및 주기적인 작업 스케줄링이 가능합니다. Celery는 작업 큐(Task Queue) 시스템으로, 백그라운드에서 비동기적으로 실행될 수 있는 작업을 처리하는 데 매우 유용합니다. Django와 함께 Celery를 설정하고 사용하는 방법입니다.

1. Celery 설치

Celery와 필요한 추가 패키지를 설치합니다.

pip install celery
pip install django-celery-beat  # 주기적인 작업을 위해

2. Django 프로젝트에 Celery 설정

프로젝트 루트 디렉토리에 celery.py 파일을 생성하고 다음과 같이 설정합니다.

# project/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Django의 설정 파일을 Celery가 사용하도록 등록합니다.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

app = Celery('your_project')

# Django의 설정 파일에서 정의된 모든 Celery 관련 설정을 가져옵니다.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Django의 모든 등록된 task 모듈을 로드합니다.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

3. Django 설정에 Celery 설정 추가

Django 설정 파일 (settings.py)에 Celery 설정을 추가합니다.

# settings.py

CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis 브로커 URL
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Seoul'

# django-celery-beat 설정 (주기적인 작업을 위한)
INSTALLED_APPS += ['django_celery_beat']

4. Celery 작업(Task) 생성

Celery 작업을 정의하려면 Django 앱 폴더에 tasks.py 파일을 생성하고 작업을 정의합니다.

# your_app/tasks.py

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

5. Celery Worker와 Beat 시작

Celery Worker와 Beat를 시작하여 작업을 처리하고 주기적인 작업을 실행합니다.

# Celery Worker 시작
celery -A your_project worker --loglevel=info

# Celery Beat 시작 (주기적인 작업 스케줄링)
celery -A your_project beat --loglevel=info

6. 주기적인 작업 등록

주기적인 작업을 등록하려면 Django Admin에서 django-celery-beat의 Periodic Task를 사용하거나, 코드를 통해 등록할 수 있습니다. 예를 들어, 매 분마다 add 작업을 실행하도록 등록하는 코드는 다음과 같습니다.

from django_celery_beat.models import PeriodicTask, CrontabSchedule

# 매 분마다 실행되는 Crontab 스케줄 생성
schedule, created = CrontabSchedule.objects.get_or_create(minute='*', hour='*')

# Periodic Task 생성
PeriodicTask.objects.create(
    crontab=schedule,
    name='Add numbers every minute',
    task='your_app.tasks.add',
    args=json.dumps([1, 2]),
)

추가 고려 사항

  • 모니터링 및 관리: Flower와 같은 Celery 모니터링 도구를 사용하여 작업 상태를 모니터링할 수 있습니다.
    pip install flower
    celery -A your_project flower
  • 재시도 로직: 작업 실패 시 재시도 로직을 추가하여 안정성을 높일 수 있습니다.
    @shared_task(bind=True, default_retry_delay=300, max_retries=5)
    def add(self, x, y):
        try:
            return x + y
        except Exception as exc:
            raise self.retry(exc=exc)
  • 장기 실행 작업: 장기 실행 작업을 처리할 때는 작업이 워커를 과도하게 점유하지 않도록 주의해야 합니다. 적절한 타임아웃 설정과 자원 관리를 고려합니다.

 

이와 같이 Django와 Celery를 통합하여 비동기 작업과 주기적인 작업을 처리할 수 있습니다. 필요한 경우 추가적인 설정과 최적화를 통해 성능과 안정성을 높일 수 있습니다.

Elasticsearch(ES)에서 특정 조건의 이벤트를 체크하고, 탐지되는 경우 관련된 추가적인 정보를 다양한 API 등을 통해 수집하여 종합적인 내용을 Slack으로 전달하는 자동화 시스템을 구축하는 예제입니다. 이 예제는 Python을 사용하여 ElasticSearch와 Slack API를 통합하고 Celery를 사용하여 비동기 작업을 처리하는 방식으로 구현됩니다.

1. 필요 패키지 설치

필요한 Python 패키지를 설치합니다.

pip install elasticsearch
pip install requests
pip install celery
pip install slack_sdk
pip install schedule

2. ElasticSearch에서 조건을 체크하는 Python 스크립트

ElasticSearch에서 특정 조건의 이벤트를 체크하고 탐지하는 스크립트를 작성합니다.

# es_check.py
from elasticsearch import Elasticsearch
import requests
import json

es = Elasticsearch(['http://localhost:9200'])  # ElasticSearch 서버 주소

def check_events():
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"field_name": "specific_condition"}}  # 특정 조건
                ]
            }
        }
    }

    response = es.search(index="your_index", body=query)
    events = response['hits']['hits']

    return events

def get_additional_info(event):
    # 여기서 event에 대한 추가 정보를 수집하는 API 호출 예시
    response = requests.get("http://api.example.com/info", params={"id": event['_id']})
    return response.json()

def send_to_slack(message):
    webhook_url = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
    slack_data = {'text': message}

    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'}
    )

    if response.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (response.status_code, response.text)
        )

def process_events():
    events = check_events()
    for event in events:
        additional_info = get_additional_info(event)
        message = f"Event detected: {event['_source']} \nAdditional Info: {additional_info}"
        send_to_slack(message)

if __name__ == "__main__":
    process_events()

3. Celery 설정 및 작업 정의

Django 프로젝트와 Celery를 사용하여 작업을 비동기적으로 처리하도록 설정합니다.

# your_project/celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
app = Celery('your_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Celery 작업을 정의합니다.

# your_app/tasks.py
from celery import shared_task
from es_check import process_events

@shared_task
def check_and_notify_events():
    process_events()

4. 주기적인 작업 스케줄링

Celery Beat를 사용하여 주기적인 작업을 스케줄링합니다.

# settings.py
INSTALLED_APPS += ['django_celery_beat']

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Seoul'

Django Admin에서 주기적인 작업을 설정하거나 초기화 스크립트를 작성합니다.

# your_app/initial_setup.py
from django_celery_beat.models import PeriodicTask, CrontabSchedule

schedule, created = CrontabSchedule.objects.get_or_create(minute='*/5')  # 매 5분마다 실행
PeriodicTask.objects.create(
    crontab=schedule,
    name='Check and Notify Events',
    task='your_app.tasks.check_and_notify_events',
)

5. Celery Worker와 Beat 시작

Celery Worker와 Beat를 시작하여 작업을 처리하고 주기적인 작업을 실행합니다.

# Celery Worker 시작
celery -A your_project worker --loglevel=info

# Celery Beat 시작 (주기적인 작업 스케줄링)
celery -A your_project beat --loglevel=info

종합적인 설명

  1. ElasticSearch 쿼리: check_events 함수에서 특정 조건을 만족하는 이벤트를 ElasticSearch에서 쿼리합니다.
  2. 추가 정보 수집: get_additional_info 함수에서 각 이벤트에 대해 추가적인 정보를 외부 API를 통해 수집합니다.
  3. Slack 알림: send_to_slack 함수에서 수집된 정보를 Slack 웹훅을 통해 알림으로 보냅니다.
  4. Celery 작업: process_events 함수에서 이벤트를 처리하고 알림을 보내는 작업을 Celery에서 주기적으로 실행하도록 설정합니다.
  5. 주기적 실행: Celery Beat를 사용하여 주기적으로 check_and_notify_events 작업을 실행하여 자동으로 운영되도록 합니다.

이와 같이 구성하면 ElasticSearch에서 특정 조건의 이벤트를 탐지하고, 관련된 추가 정보를 수집하여 Slack으로 알림을 보내는 자동화 시스템을 구축할 수 있습니다.

여러 유형의 작업을 등록하고 수행할 수 있도록 Celery와 Django를 구성하는 방법입니다. 이를 통해 다양한 조건에 맞는 여러 작업을 정의하고, 각 작업이 주기적으로 실행되도록 설정할 수 있습니다.

1. 다양한 작업 정의

여러 유형의 작업을 Celery 작업으로 정의합니다. 각 작업은 ElasticSearch에서 조건을 체크하고, 필요한 추가 정보를 수집한 후, Slack으로 알림을 보낼 수 있습니다.

# your_app/tasks.py
from celery import shared_task
from elasticsearch import Elasticsearch
import requests
import json

es = Elasticsearch(['http://localhost:9200'])  # ElasticSearch 서버 주소

def send_to_slack(message):
    webhook_url = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
    slack_data = {'text': message}

    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'}
    )

    if response.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (response.status_code, response.text)
        )

def get_additional_info(event, api_url):
    response = requests.get(api_url, params={"id": event['_id']})
    return response.json()

def check_events(query):
    response = es.search(index="your_index", body=query)
    events = response['hits']['hits']
    return events

@shared_task
def check_and_notify_type1():
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"field_name": "condition_type1"}}
                ]
            }
        }
    }
    events = check_events(query)
    for event in events:
        additional_info = get_additional_info(event, "http://api.example.com/type1")
        message = f"Type1 Event detected: {event['_source']} \nAdditional Info: {additional_info}"
        send_to_slack(message)

@shared_task
def check_and_notify_type2():
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"field_name": "condition_type2"}}
                ]
            }
        }
    }
    events = check_events(query)
    for event in events:
        additional_info = get_additional_info(event, "http://api.example.com/type2")
        message = f"Type2 Event detected: {event['_source']} \nAdditional Info: {additional_info}"
        send_to_slack(message)

2. 주기적인 작업 스케줄링

여러 작업을 Celery Beat를 사용하여 주기적으로 실행하도록 설정합니다. Django Admin을 사용하여 주기적인 작업을 등록할 수도 있지만, 초기화 스크립트를 통해 등록하는 방법을 예시로 들겠습니다.

# your_app/initial_setup.py
from django_celery_beat.models import PeriodicTask, CrontabSchedule
from django.utils.timezone import now

# 매 5분마다 실행되는 스케줄
schedule_type1, created = CrontabSchedule.objects.get_or_create(minute='*/5')
PeriodicTask.objects.create(
    crontab=schedule_type1,
    name='Check and Notify Type1 Events',
    task='your_app.tasks.check_and_notify_type1',
    start_time=now(),
)

# 매 10분마다 실행되는 스케줄
schedule_type2, created = CrontabSchedule.objects.get_or_create(minute='*/10')
PeriodicTask.objects.create(
    crontab=schedule_type2,
    name='Check and Notify Type2 Events',
    task='your_app.tasks.check_and_notify_type2',
    start_time=now(),
)

3. Celery Worker와 Beat 시작

Celery Worker와 Beat를 시작하여 작업을 처리하고 주기적인 작업을 실행합니다.

# Celery Worker 시작
celery -A your_project worker --loglevel=info

# Celery Beat 시작 (주기적인 작업 스케줄링)
celery -A your_project beat --loglevel=info

이와 같이 구성하면 ElasticSearch에서 다양한 조건의 이벤트를 탐지하고, 각각에 대해 추가 정보를 수집한 후, Slack으로 알림을 보내는 여러 작업을 주기적으로 수행할 수 있습니다. 각 작업은 독립적으로 실행되므로 필요에 따라 다양한 조건을 처리할 수 있습니다.

 

Django의 settings.py 파일에 Celery 및 주기적인 작업 관련 설정을 추가하는 방법입니다. 이 설정은 Celery 작업과 주기적인 작업을 관리하는 데 도움이 됩니다.

1. Celery 기본 설정

settings.py 파일에 Celery와 관련된 설정을 추가합니다. 이를 통해 Celery가 Django 프로젝트와 올바르게 통합되도록 합니다.

# settings.py

INSTALLED_APPS += ['django_celery_beat']  # django_celery_beat 추가

# Celery 설정
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis 브로커 URL
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # Redis 결과 백엔드
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Seoul'

2. Celery 작업 스케줄링 설정

django-celery-beat를 사용하여 주기적인 작업을 관리할 수 있습니다. settings.py 파일에서 주기적인 작업과 관련된 설정을 추가하거나, 초기화 스크립트를 작성하여 주기적인 작업을 설정할 수 있습니다.

3. 초기화 스크립트 작성

주기적인 작업을 설정하기 위해 초기화 스크립트를 작성할 수 있습니다. 이 스크립트는 Django 관리 명령어를 사용하여 실행할 수 있습니다. 주기적인 작업을 settings.py가 아닌 별도의 초기화 스크립트에 정의하는 것이 일반적입니다.

 

예를 들어, initial_setup.py 파일을 작성하여 주기적인 작업을 설정합니다.

# your_app/initial_setup.py

from django_celery_beat.models import PeriodicTask, CrontabSchedule
from django.utils.timezone import now

# 매 5분마다 실행되는 스케줄
schedule_type1, created = CrontabSchedule.objects.get_or_create(minute='*/5')
PeriodicTask.objects.create(
    crontab=schedule_type1,
    name='Check and Notify Type1 Events',
    task='your_app.tasks.check_and_notify_type1',
    start_time=now(),
)

# 매 10분마다 실행되는 스케줄
schedule_type2, created = CrontabSchedule.objects.get_or_create(minute='*/10')
PeriodicTask.objects.create(
    crontab=schedule_type2,
    name='Check and Notify Type2 Events',
    task='your_app.tasks.check_and_notify_type2',
    start_time=now(),
)

이 스크립트를 Django의 manage.py를 통해 실행하여 주기적인 작업을 설정합니다.

python manage.py shell < your_app/initial_setup.py

4. Celery 애플리케이션 설정

Celery 애플리케이션을 정의하는 파일(celery.py)을 프로젝트 루트 디렉토리에 생성합니다.

# your_project/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Django의 설정 파일을 Celery가 사용하도록 등록합니다.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

app = Celery('your_project')

# Django의 설정 파일에서 정의된 모든 Celery 관련 설정을 가져옵니다.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Django의 모든 등록된 task 모듈을 로드합니다.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

5. Celery 작업 정의

여러 유형의 작업을 Celery 작업으로 정의합니다.

# your_app/tasks.py

from celery import shared_task
from elasticsearch import Elasticsearch
import requests
import json

es = Elasticsearch(['http://localhost:9200'])  # ElasticSearch 서버 주소

def send_to_slack(message):
    webhook_url = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
    slack_data = {'text': message}

    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'}
    )

    if response.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (response.status_code, response.text)
        )

def get_additional_info(event, api_url):
    response = requests.get(api_url, params={"id": event['_id']})
    return response.json()

def check_events(query):
    response = es.search(index="your_index", body=query)
    events = response['hits']['hits']
    return events

@shared_task
def check_and_notify_type1():
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"field_name": "condition_type1"}}
                ]
            }
        }
    }
    events = check_events(query)
    for event in events:
        additional_info = get_additional_info(event, "http://api.example.com/type1")
        message = f"Type1 Event detected: {event['_source']} \nAdditional Info: {additional_info}"
        send_to_slack(message)

@shared_task
def check_and_notify_type2():
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"field_name": "condition_type2"}}
                ]
            }
        }
    }
    events = check_events(query)
    for event in events:
        additional_info = get_additional_info(event, "http://api.example.com/type2")
        message = f"Type2 Event detected: {event['_source']} \nAdditional Info: {additional_info}"
        send_to_slack(message)

6. Celery Worker와 Beat 시작

Celery Worker와 Beat를 시작하여 작업을 처리하고 주기적인 작업을 실행합니다. Celery의 CELERY_BEAT_SCHEDULE 변수를 사용하여 주기적인 작업을 settings.py 파일에 직접 등록할 수도 있습니다. 이렇게 하면 별도의 초기화 스크립트를 작성하지 않아도 됩니다. CELERY_BEAT_SCHEDULE에 여러 작업을 등록하여 다양한 조건에 맞는 작업을 주기적으로 실행할 수 있습니다.

 

아래는 settings.py 파일에서 CELERY_BEAT_SCHEDULE 변수를 사용하여 여러 작업을 등록하는 예제입니다.

1. Celery 설정 및 CELERY_BEAT_SCHEDULE 정의

# settings.py

INSTALLED_APPS += ['django_celery_beat']  # django_celery_beat 추가

# Celery 설정
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis 브로커 URL
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # Redis 결과 백엔드
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Seoul'

# 주기적인 작업 스케줄 설정
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'check-and-notify-type1': {
        'task': 'your_app.tasks.check_and_notify_type1',
        'schedule': crontab(minute='*/5'),  # 매 5분마다 실행
        'args': (),
    },
    'check-and-notify-type2': {
        'task': 'your_app.tasks.check_and_notify_type2',
        'schedule': crontab(minute='*/10'),  # 매 10분마다 실행
        'args': (),
    },
}

2. Celery 애플리케이션 설정

Celery 애플리케이션을 정의하는 파일(celery.py)을 프로젝트 루트 디렉토리에 생성합니다.

# your_project/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Django의 설정 파일을 Celery가 사용하도록 등록합니다.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

app = Celery('your_project')

# Django의 설정 파일에서 정의된 모든 Celery 관련 설정을 가져옵니다.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Django의 모든 등록된 task 모듈을 로드합니다.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

3. Celery 작업 정의

여러 유형의 작업을 Celery 작업으로 정의합니다.

# your_app/tasks.py

from celery import shared_task
from elasticsearch import Elasticsearch
import requests
import json

es = Elasticsearch(['http://localhost:9200'])  # ElasticSearch 서버 주소

def send_to_slack(message):
    webhook_url = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
    slack_data = {'text': message}

    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'}
    )

    if response.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (response.status_code, response.text)
        )

def get_additional_info(event, api_url):
    response = requests.get(api_url, params={"id": event['_id']})
    return response.json()

def check_events(query):
    response = es.search(index="your_index", body=query)
    events = response['hits']['hits']
    return events

@shared_task
def check_and_notify_type1():
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"field_name": "condition_type1"}}
                ]
            }
        }
    }
    events = check_events(query)
    for event in events:
        additional_info = get_additional_info(event, "http://api.example.com/type1")
        message = f"Type1 Event detected: {event['_source']} \nAdditional Info: {additional_info}"
        send_to_slack(message)

@shared_task
def check_and_notify_type2():
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"field_name": "condition_type2"}}
                ]
            }
        }
    }
    events = check_events(query)
    for event in events:
        additional_info = get_additional_info(event, "http://api.example.com/type2")
        message = f"Type2 Event detected: {event['_source']} \nAdditional Info: {additional_info}"
        send_to_slack(message)

4. Celery Worker와 Beat 시작

Celery Worker와 Beat를 시작하여 작업을 처리하고 주기적인 작업을 실행합니다. 이와 같이 구성하면 settings.py 파일에서 직접 주기적인 작업을 정의하고 관리할 수 있습니다. 이를 통해 ElasticSearch에서 다양한 조건의 이벤트를 탐지하고, 각각에 대해 추가 정보를 수집한 후, Slack으로 알림을 보내는 여러 작업을 주기적으로 수행할 수 있습니다.

Celery는 분산 작업 큐를 위한 강력한 비동기 작업 시스템으로, 작업을 백그라운드에서 실행할 수 있습니다. Celery를 사용하면 작업을 여러 워커(worker) 프로세스에 분산하여 실행할 수 있으며, 주기적인 작업을 스케줄링할 수 있습니다. 이 과정에서 Celery Beat와 Celery Worker는 각각 중요한 역할을 합니다.

Celery Worker

Celery Worker는 Celery가 실행할 작업을 실제로 처리하는 프로세스입니다. 워커는 큐에서 작업을 가져와 이를 실행하고 결과를 처리합니다. 워커는 다수의 인스턴스로 구성할 수 있으며, 각각이 독립적으로 실행되어 작업을 병렬로 처리합니다.

  • 작업 큐에서 작업을 가져와 실행합니다.
  • 작업의 성공, 실패, 재시도 등의 상태를 관리합니다.

Celery Beat

Celery Beat는 주기적인 작업을 스케줄링하는 스케줄러 프로세스입니다. Beat는 주기적으로 실행해야 하는 작업을 정의하고, 이 작업들을 적절한 시간에 Celery Worker에게 전달합니다.

  • 주기적인 작업을 정의하고 스케줄링합니다.
  • 설정된 주기에 따라 작업을 Celery Worker에게 전달합니다.

Celery Worker와 Beat의 연관성

Celery Beat는 주기적인 작업의 스케줄러 역할을 하며, 이 작업들을 적절한 시간에 Celery Worker에게 전달합니다. Celery Worker는 큐에서 작업을 받아 이를 실행합니다. 둘은 서로 독립적으로 실행되지만, 주기적인 작업을 처리하는 과정에서 협력합니다.

Celery의 다른 구성 요소

  1. Broker: 작업 메시지를 전송하고 큐를 관리하는 역할을 합니다. 일반적으로 Redis나 RabbitMQ를 브로커로 사용합니다. Celery Worker는 브로커에서 작업 메시지를 가져옵니다.
  2. Result Backend: 작업 결과를 저장하고 관리하는 역할을 합니다. Redis, RabbitMQ, 데이터베이스 등을 사용할 수 있습니다.
  3. Flower: Celery 클러스터를 모니터링하고 관리하기 위한 웹 기반 툴입니다. 작업 상태, 워커 상태 등을 실시간으로 모니터링할 수 있습니다.

Celery의 주요 구성 요소 간의 관계

  1. 브로커: 작업을 전달하는 중간 매체.
  2. Celery Beat: 주기적인 작업을 브로커에게 전달.
  3. Celery Worker: 브로커에서 작업을 가져와 실행.
  4. Result Backend: 작업 결과를 저장.

다음은 Celery Worker와 Beat를 설정하고 실행하는 예시입니다.

# your_project/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Django의 설정 파일을 Celery가 사용하도록 등록합니다.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

app = Celery('your_project')

# Django의 설정 파일에서 정의된 모든 Celery 관련 설정을 가져옵니다.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Django의 모든 등록된 task 모듈을 로드합니다.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

Celery 설정 추가

# settings.py

INSTALLED_APPS += ['django_celery_beat']

# Celery 설정
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis 브로커 URL
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # Redis 결과 백엔드
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Seoul'

# 주기적인 작업 스케줄 설정
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'check-and-notify-type1': {
        'task': 'your_app.tasks.check_and_notify_type1',
        'schedule': crontab(minute='*/5'),  # 매 5분마다 실행
        'args': (),
    },
    'check-and-notify-type2': {
        'task': 'your_app.tasks.check_and_notify_type2',
        'schedule': crontab(minute='*/10'),  # 매 10분마다 실행
        'args': (),
    },
}

작업 정의

# your_app/tasks.py

from celery import shared_task
from elasticsearch import Elasticsearch
import requests
import json

es = Elasticsearch(['http://localhost:9200'])

def send_to_slack(message):
    webhook_url = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
    slack_data = {'text': message}

    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'}
    )

    if response.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (response.status_code, response.text)
        )

def get_additional_info(event, api_url):
    response = requests.get(api_url, params={"id": event['_id']})
    return response.json()

def check_events(query):
    response = es.search(index="your_index", body=query)
    events = response['hits']['hits']
    return events

@shared_task
def check_and_notify_type1():
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"field_name": "condition_type1"}}
                ]
            }
        }
    }
    events = check_events(query)
    for event in events:
        additional_info = get_additional_info(event, "http://api.example.com/type1")
        message = f"Type1 Event detected: {event['_source']} \nAdditional Info: {additional_info}"
        send_to_slack(message)

@shared_task
def check_and_notify_type2():
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"field_name": "condition_type2"}}
                ]
            }
        }
    }
    events = check_events(query)
    for event in events:
        additional_info = get_additional_info(event, "http://api.example.com/type2")
        message = f"Type2 Event detected: {event['_source']} \nAdditional Info: {additional_info}"
        send_to_slack(message)

이와 같이 Celery의 다양한 구성 요소를 활용하여 분산 작업 큐 시스템을 구축하고 관리할 수 있습니다. Celery는 Python으로 작성된 범용 작업 큐이며, Django와 잘 통합되어 사용할 수 있습니다. Django는 Celery와 직접 통합하여 비동기 작업을 처리하고 주기적인 작업을 스케줄링할 수 있는 기능을 제공합니다.

Celery와 Django의 관계

  • 별도 라이브러리: Celery는 Python 애플리케이션에서 비동기 작업 및 주기적인 작업을 처리하기 위한 독립적인 라이브러리입니다.
  • Django와의 통합: Django 프로젝트와 쉽게 통합할 수 있는 다양한 도구와 패턴을 제공합니다. Django 프로젝트 내에서 Celery를 설정하고, Django의 ORM, 설정, 시그널 등을 활용하여 작업을 정의하고 관리할 수 있습니다.
  • 유연성: Celery는 Django뿐만 아니라 Flask, Pyramid, Tornado 등 다양한 Python 웹 프레임워크와 통합하여 사용할 수 있습니다.

주요 구성 요소

  1. Celery: 비동기 작업 및 주기적인 작업을 처리하는 핵심 라이브러리.
  2. Celery Worker: Celery가 관리하는 작업을 실제로 실행하는 프로세스.
  3. Celery Beat: 주기적인 작업을 스케줄링하는 스케줄러.
  4. Broker: 작업 메시지를 전달하는 중간 매체(예: Redis, RabbitMQ).
  5. Result Backend: 작업 결과를 저장하는 매체(예: Redis, 데이터베이스).

Django와 Celery의 통합 예제

다시 한번 Django 프로젝트에 Celery를 설정하고 사용하는 예제를 정리해 보겠습니다.

 

1. Celery 설치

pip install celery django-celery-beat redis

2. Django 설정 파일에 Celery 설정 추가

settings.py 파일에 Celery 관련 설정을 추가합니다.

# settings.py

INSTALLED_APPS += ['django_celery_beat']

# Celery 설정
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # Redis 브로커 URL
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'  # Redis 결과 백엔드
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Seoul'

# 주기적인 작업 스케줄 설정
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'check-and-notify-type1': {
        'task': 'your_app.tasks.check_and_notify_type1',
        'schedule': crontab(minute='*/5'),  # 매 5분마다 실행
        'args': (),
    },
    'check-and-notify-type2': {
        'task': 'your_app.tasks.check_and_notify_type2',
        'schedule': crontab(minute='*/10'),  # 매 10분마다 실행
        'args': (),
    },
}

3. Celery 애플리케이션 설정

프로젝트 루트 디렉토리에 celery.py 파일을 생성합니다.

# your_project/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Django 설정 파일을 Celery가 사용하도록 등록
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

app = Celery('your_project')

# Django 설정 파일에서 Celery 관련 설정을 가져옵니다.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Django의 모든 등록된 task 모듈을 로드합니다.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

4. Celery 작업 정의

tasks.py 파일을 생성하고 작업을 정의합니다.

# your_app/tasks.py

from celery import shared_task
from elasticsearch import Elasticsearch
import requests
import json

es = Elasticsearch(['http://localhost:9200'])  # ElasticSearch 서버 주소

def send_to_slack(message):
    webhook_url = 'https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX'
    slack_data = {'text': message}

    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'}
    )

    if response.status_code != 200:
        raise ValueError(
            'Request to slack returned an error %s, the response is:\n%s'
            % (response.status_code, response.text)
        )

def get_additional_info(event, api_url):
    response = requests.get(api_url, params={"id": event['_id']})
    return response.json()

def check_events(query):
    response = es.search(index="your_index", body=query)
    events = response['hits']['hits']
    return events

@shared_task
def check_and_notify_type1():
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"field_name": "condition_type1"}}
                ]
            }
        }
    }
    events = check_events(query)
    for event in events:
        additional_info = get_additional_info(event, "http://api.example.com/type1")
        message = f"Type1 Event detected: {event['_source']} \nAdditional Info: {additional_info}"
        send_to_slack(message)

@shared_task
def check_and_notify_type2():
    query = {
        "query": {
            "bool": {
                "must": [
                    {"match": {"field_name": "condition_type2"}}
                ]
            }
        }
    }
    events = check_events(query)
    for event in events:
        additional_info = get_additional_info(event, "http://api.example.com/type2")
        message = f"Type2 Event detected: {event['_source']} \nAdditional Info: {additional_info}"
        send_to_slack(message)

5. Celery Worker와 Beat 시작

# Celery Worker 시작
celery -A your_project worker --loglevel=info

# Celery Beat 시작 (주기적인 작업 스케줄링)
celery -A your_project beat --loglevel=info

Celery는 Django 외에도 다양한 Python 애플리케이션에서 비동기 작업 및 주기적인 작업을 처리하는 데 사용할 수 있습니다. Django와의 통합은 매우 유연하고 간편하며, 이를 통해 복잡한 작업 흐름을 비동기적으로 처리할 수 있습니다.

728x90