본문 바로가기
서버구축 (WEB,DB)

대량 데이타 실시간 처리 분산 메시징 시스템 Kafka 구조 및 구성

by 날으는물고기 2023. 12. 8.

대량 데이타 실시간 처리 분산 메시징 시스템 Kafka 구조 및 구성

Install Kafka Cluster and Zookeeper with High Availability by Hakim - Medium

Apache Kafka는 대용량의 데이터를 안정적으로 수집, 저장 및 처리하기 위한 분산 스트리밍 플랫폼입니다. 주로 대규모 데이터의 실시간 스트리밍 및 이벤트 처리에 사용되며, 여러 소비자에게 데이터를 안전하게 전달할 수 있는 메시징 시스템의 역할을 합니다.

 

주요 특징

  1. 분산 아키텍처: Kafka는 여러 브로커로 구성된 분산 아키텍처를 가지며, 데이터를 여러 노드에 분산하여 안정성과 확장성을 제공합니다.
  2. 토픽 기반 메시지 큐: 데이터는 토픽(topic)이라는 카테고리로 구분되며, 생산자(producer)가 메시지를 생성하고, 소비자(consumer)가 해당 토픽에서 메시지를 구독하여 처리합니다.
  3. 내결함성: 브로커 중 하나가 실패해도 데이터의 유실 없이 안전하게 처리할 수 있도록 내결함성을 제공합니다.
  4. 확장성: Kafka 클러스터는 노드를 동적으로 추가하여 처리 능력을 확장할 수 있습니다.
  5. 지속성: 데이터는 디스크에 지속적으로 저장되므로, 소비자가 메시지를 처리하지 않아도 데이터는 보존됩니다.

Kafka 구성 방법

아래는 간단한 Apache Kafka 구성 단계입니다. 이는 기본적인 설정으로 시작하는 것이며, 실제 환경에서는 더 많은 설정 및 조정이 필요할 수 있습니다.

  1. Java 설치 확인
    • Apache Kafka는 Java 기반으로 동작하므로, Java가 설치되어 있는지 확인합니다.
  2. Kafka 다운로드
    • Apache Kafka 공식 웹사이트에서 최신 버전의 Kafka를 다운로드합니다.
  3. 압축 해제
    tar -xzf kafka_2.x.x.tgz
    cd kafka_2.x.x
  4. Zookeeper 구동 (옵션)
    • Kafka는 Zookeeper를 사용하여 브로커 간의 상태 및 메타데이터를 관리합니다. Zookeeper를 실행하려면 다음을 사용합니다.
      bin/zookeeper-server-start.sh config/zookeeper.properties
  5. Kafka 브로커 구동
    • Kafka 브로커를 시작하려면 다음을 사용합니다.
      bin/kafka-server-start.sh config/server.properties
  6. 토픽 생성
    • 새로운 토픽을 생성하려면 다음을 사용합니다.
      bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic myTopic
  7. 생산자 및 소비자 테스트
    • 메시지를 생성하려면:
      bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic myTopic
    • 메시지를 소비하려면:
      bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --from-beginning

이러한 단계로 기본적인 Kafka 환경을 설정하고 메시지를 생성하고 소비할 수 있습니다. 필요에 따라 더 복잡한 설정과 운영 환경에 대한 고려가 필요할 수 있습니다.

 

Apache Kafka는 내부적으로 Zookeeper를 사용하여 분산 시스템에서의 동기화, 조정, 리더 선출 및 다른 중요한 작업을 수행합니다. Zookeeper는 Kafka 브로커 간의 협력 및 클러스터 상태 관리를 담당하며 안정적인 운영을 위해 필수적인 구성 요소입니다.

 

Kafka가 Zookeeper를 사용하는 주요 이유는 다음과 같습니다.

  1. 브로커 상태 관리: Zookeeper는 Kafka 브로커의 메타데이터와 상태를 저장하고 관리하여 브로커 간의 협력을 가능하게 합니다.
  2. 리더 선출: 토픽의 각 파티션에는 리더(Leader)와 팔로워(Follower)가 있습니다. Zookeeper는 리더 선출을 조정하고 이에 따른 파티션의 상태를 추적합니다.
  3. 동기화: 여러 브로커 간의 동기화를 통해 데이터의 일관성과 안정성을 유지합니다.
  4. Failover 및 내결함성: Zookeeper는 브로커의 실패를 감지하고, 새로운 리더를 선출하여 시스템의 내결함성을 제공합니다.

따라서 Kafka를 사용하려면 Zookeeper가 필수적입니다.

일반적으로 Kafka 클러스터를 운영하려면 Zookeeper 클러스터도 함께 구성되어야 합니다.

 

Docker에서 Kafka 실행

  1. Docker Compose 파일 작성 (docker-compose.yml)
    version: '2'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"
      kafka:
        image: wurstmeister/kafka
        ports:
          - "9092:9092"
        expose:
          - "9093"
        environment:
          KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
          KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
          KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
          KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
          KAFKA_CREATE_TOPICS: "myTopic:1:1"
  2. Docker Compose 실행
    docker-compose up -d
    이제 Zookeeper와 Kafka가 도커 컨테이너에서 실행됩니다.

Kubernetes에서 Kafka 실행

  1. Kafka를 위한 Kubernetes YAML 파일 작성 (kafka.yaml)
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: kafka
    spec:
      replicas: 1
      serviceName: kafka
      selector:
        matchLabels:
          app: kafka
      template:
        metadata:
          labels:
            app: kafka
        spec:
          containers:
          - name: kafka
            image: wurstmeister/kafka
            ports:
            - containerPort: 9092
            env:
            - name: KAFKA_ADVERTISED_LISTENERS
              value: INSIDE://kafka:9093,OUTSIDE://localhost:9092
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
            - name: KAFKA_LISTENERS
              value: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: INSIDE
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: "zookeeper:2181"
            - name: KAFKA_CREATE_TOPICS
              value: "myTopic:1:1"
  2. Kafka를 위한 Kubernetes Service YAML 파일 작성 (kafka-service.yaml)
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka
    spec:
      selector:
        app: kafka
      ports:
        - protocol: TCP
          port: 9092
          targetPort: 9092
      type: LoadBalancer
  3. Kubernetes 클러스터에 배포
    kubectl apply -f kafka.yaml
    kubectl apply -f kafka-service.yaml
  4. Kafka 브로커 IP 및 포트 확인
    kubectl get services
    이제 Kafka가 Kubernetes 클러스터에서 실행되며, 브로커의 외부 IP 및 포트를 확인할 수 있습니다.

이러한 단계로 Docker와 Kubernetes에서 Kafka를 구성하고 실행할 수 있습니다. 필요에 따라 YAML 파일의 설정을 조절하여 클러스터의 크기를 확장하거나 다양한 설정을 변경할 수 있습니다.

 

Apache Kafka는 다양한 도구 및 클라이언트를 사용하여 데이터를 소비(consume)하고 생산(produce)할 수 있는 풍부한 생태계를 제공합니다. Kafka의 클라이언트 라이브러리들은 다양한 프로그래밍 언어에서 사용 가능하며, 데이터를 처리하고 다른 시스템으로 전송할 수 있는 풍부한 기능을 제공합니다.

 

일부 흔히 사용되는 Kafka 클라이언트 및 도구는 다음과 같습니다.

  1. Kafka Connect
    • Kafka Connect는 Kafka와 다른 시스템 간의 데이터 이동을 위한 확장 가능하고 확장 가능한 플랫폼입니다. 다양한 Connectors를 사용하여 데이터 소스와 대상 간의 통합을 쉽게 설정할 수 있습니다. 예를 들어, JDBC, Elasticsearch, HDFS 등과의 연결이 가능합니다.
  2. Kafka Streams
    • Kafka Streams는 Kafka 클러스터에서 데이터 처리 및 분석을 수행하는 라이브러리입니다. Kafka Streams를 사용하여 데이터를 처리하고 결과를 Kafka 토픽에 전송할 수 있습니다.
  3. Kafka Producer 및 Consumer API
    • Kafka에는 여러 프로그래밍 언어를 지원하는 Producer 및 Consumer API가 있습니다. 이러한 API를 사용하여 데이터를 Kafka 토픽으로 보내고, 토픽에서 데이터를 소비하고 처리할 수 있습니다.
  4. Logstash 및 Beats
    • Elasticsearch의 Logstash 및 Beats와 같은 다른 데이터 처리 도구들은 Kafka와 통합되어 데이터를 Kafka 토픽으로 전송하거나 Kafka에서 데이터를 가져올 수 있습니다.

따라서 Kafka는 다양한 통합 옵션을 제공하며, 데이터를 전달하고 처리하기 위해 별도의 도구나 라이브러리를 사용할 필요는 없습니다. 그러나 사용 사례에 따라서는 Logstash, Fluentd, Apache Flink, Spark 등의 다른 도구를 사용하여 데이터를 처리하고 이동할 수 있습니다. 선택한 도구는 주어진 요구 사항과 시스템 아키텍처에 따라 다를 수 있습니다.

 

Kafka는 데이터를 Elasticsearch나 다른 데이터베이스로 직접 전달할 수 있습니다. 이를 위해서는 Kafka Connect를 사용하거나 Kafka Producer API를 활용하여 데이터를 전송하는 방법을 선택할 수 있습니다.

Kafka Connect를 사용한 Elasticsearch로의 전송

Kafka Connect를 통해 Kafka와 Elasticsearch를 연결하면, 데이터의 이동이 간단하게 설정됩니다. 이를 위해 confluent-hub에서 Elasticsearch Connector를 설치하고, Connect 구성 파일에 필요한 설정을 추가합니다.

  1. Elasticsearch Connector 설치
    confluent-hub install confluentinc/kafka-connect-elasticsearch:latest
  2. Kafka Connect 설정 파일 (connect-standalone.properties) 수정
    bootstrap.servers=localhost:9092
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    offset.storage.file.filename=/tmp/connect.offsets
    plugin.path=/path/to/confluentinc-kafka-connect-elasticsearch-<version>
  3. Elasticsearch Connector 설정 파일 (elasticsearch-connector.properties) 작성
    name=elasticsearch-connector
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=myTopic
    key.ignore=true
    connection.url=http://elasticsearch:9200
    type.name=kafka-connect
  4. Kafka Connect 실행
    connect-standalone /path/to/connect-standalone.properties /path/to/elasticsearch-connector.properties

Kafka Producer API를 사용한 데이터베이스로의 전송

Kafka Producer API를 사용하여 데이터를 생성하고, 해당 데이터를 데이터베이스에 저장할 수 있습니다. 예를 들어, Kafka Producer API를 사용하여 데이터를 생성한 후 JDBC를 이용하여 데이터베이스에 저장하는 방법이 있습니다.

예시 (Java 코드)

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>("myTopic", "key", "value");

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("Record sent successfully to Kafka.");
                    // Code to store data in the database can be added here.
                } else {
                    exception.printStackTrace();
                }
            }
        });

        producer.close();
    }
}

이후 데이터베이스에 데이터를 저장하는 코드를 Callback 내에서 추가하면 됩니다.

 

두 가지 방법 모두를 사용하여 Kafka에서 Elasticsearch나 데이터베이스로 데이터를 전송할 수 있습니다. 선택은 사용 사례 및 시스템 아키텍처에 따라 달라질 수 있습니다.

 

아래는 Python을 사용하여 Kafka Producer API를 통해 메시지를 생성하고, Elasticsearch로 전송하는 예제 코드입니다. 이 코드를 실행하려면 confluent-kafka 패키지를 설치해야 합니다.

pip install confluent-kafka

이후에는 다음과 같은 Python 코드를 사용할 수 있습니다.

from confluent_kafka import Producer
import json

# Kafka Producer 설정
conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(conf)

# 전송할 데이터
data = {'name': 'John Doe', 'age': 30, 'city': 'New York'}

# Kafka 토픽
topic = 'myTopic'

# 메시지를 JSON 형식으로 직렬화하여 전송
producer.produce(topic, key='myKey', value=json.dumps(data))

# 메시지 전송 확인
producer.flush()

print(f"Message sent to Kafka topic '{topic}': {data}")

이 코드는 myTopic 토픽으로 메시지를 보내고, JSON 형식으로 직렬화하여 Kafka에 전송합니다. 메시지를 Elasticsearch로 전송하려면, 앞서 언급한 Kafka Connect Elasticsearch Connector를 사용하거나, 다른 Elasticsearch Python 라이브러리를 사용하여 메시지를 인덱싱할 수 있습니다.

 

또한, Elasticsearch에 데이터를 전송하기 위해 elasticsearch 패키지를 설치해야 합니다.

pip install elasticsearch

그리고 다음은 Elasticsearch로 데이터를 전송하는 Python 예제 코드입니다.

from elasticsearch import Elasticsearch

# Elasticsearch 연결 설정
es = Elasticsearch(['http://localhost:9200'])

# Elasticsearch 인덱스와 타입 설정
index_name = 'my_index'
doc_type = 'my_doc'

# 전송할 데이터
data = {'name': 'John Doe', 'age': 30, 'city': 'New York'}

# Elasticsearch에 데이터 전송
es.index(index=index_name, doc_type=doc_type, body=data)

print(f"Data sent to Elasticsearch index '{index_name}': {data}")

이 코드는 Elasticsearch에 데이터를 전송하는 기본적인 예제입니다. 데이터를 저장할 Elasticsearch 인덱스와 타입을 설정하고, es.index를 사용하여 데이터를 전송합니다. 이 코드를 실행하면 Elasticsearch에 데이터가 인덱싱됩니다.

728x90

댓글