본문 바로가기
서버구축 (WEB,DB)

Debezium으로 DB(데이터베이스) 동기화 구축

by 날으는물고기 2023. 11. 11.

Debezium으로 DB(데이터베이스) 동기화 구축

Debezium은 데이터베이스의 변경 사항을 캡처하고 이를 이벤트 스트림으로 변환하는 오픈 소스 데이터 변경 스트리밍 플랫폼입니다. Debezium을 사용하여 데이터베이스 간 또는 데이터베이스와 다른 애플리케이션 간의 실시간 데이터 동기화를 구축할 수 있습니다.

아래는 Debezium을 사용하여 DB 동기화를 구축하는 과정에 대한 기본적인 정보입니다.

  1. Debezium 설치 및 설정:
    먼저 Debezium을 설치하고 설정해야 합니다. Debezium은 Kafka Connect 플러그인으로 제공되며, Apache Kafka 클러스터를 사용하여 변경 이벤트를 게시합니다. Kafka와 Kafka Connect를 먼저 설치하고 Debezium 플러그인을 추가해야 합니다.
  2. 데이터베이스 연결 구성:
    Debezium은 다양한 데이터베이스 시스템을 지원하며, 사용 중인 데이터베이스에 맞는 커넥터를 선택하고 설정해야 합니다. 예를 들어, MySQL, PostgreSQL, MongoDB, SQL Server, Oracle 등의 데이터베이스를 지원합니다.
  3. Debezium 커넥터 설정:
    데이터베이스에 대한 Debezium 커넥터를 구성합니다. 이 커넥터는 데이터베이스 변경 사항을 감지하고 Kafka 토픽으로 이동시킵니다. 커넥터 설정에는 데이터베이스 연결 정보, 토픽 이름, 변환 설정 등이 포함됩니다.
  4. Kafka로 변경 이벤트 전송:
    Debezium 커넥터가 데이터베이스 변경을 감지하면 해당 변경 이벤트를 Kafka 토픽으로 전송합니다. Kafka의 프로듀서를 사용하여 변경 이벤트를 다른 애플리케이션으로 전송할 수 있습니다.
  5. 변경 이벤트 구독 및 처리:
    다른 애플리케이션은 Kafka 컨슈머를 사용하여 변경 이벤트를 구독하고 처리합니다. 이를 통해 데이터베이스의 변경 사항을 실시간으로 동기화하고 다른 시스템과 연동할 수 있습니다.
  6. 변경 이벤트 처리 및 적용:
    변경 이벤트를 받은 애플리케이션에서는 이벤트를 처리하고 필요한 작업을 수행합니다. 예를 들어, 데이터베이스에 대한 업데이트를 수행하거나 다른 서비스에 데이터를 전송하는 등의 작업을 수행할 수 있습니다.
  7. 오류 처리와 모니터링:
    실시간 동기화 환경에서는 오류 처리와 모니터링이 중요합니다. Debezium은 오류를 처리하고 모니터링할 수 있는 도구와 기능을 제공합니다.

Debezium을 사용하여 데이터베이스 동기화를 구축하면 데이터베이스의 변경 사항을 실시간으로 추적하고 다른 시스템과 데이터를 공유할 수 있습니다. 이것은 마이그레이션, 데이터 분석, 이벤트 소싱 및 다른 실시간 데이터 통합 시나리오에 유용합니다. Debezium의 공식 문서와 각 데이터베이스 시스템에 대한 튜토리얼을 참조하여 자세한 정보를 얻을 수 있습니다.

 

SQL Server를 Source DB로 사용하고 Kafka Connect를 실행하는 과정을 단계별로 설명하겠습니다.

 

Kafka Connect 설정 파일 생성:

Kafka Connect를 설정하기 위한 설정 파일 connect-distributed.properties을 생성합니다. 설정 파일은 JSON 형식으로 작성되어야 합니다.

bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/opt/kafka_2.13-2.8.1/connectors

 

Kafka Connect 실행:

Kafka Connect를 실행합니다. 이를 위해 설정 파일과 이미 설치된 Kafka를 사용합니다.

# Kafka Connect 실행
connect-distributed.sh /opt/kafka/config/connect-distributed.properties

# Kafka Connect 확인
curl http://localhost:8083

{
  "version": "2.8.1",
  "commit": "839b886f9b732b15",
  "kafka_cluster_id": "UNddERzlQEGcC32OlruIkw"
}

# Connector 플러그인 확인
curl --location --request GET http://localhost:8083/connector-plugins

[
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.5.4.Final"
  },
  {
    "class": "io.debezium.connector.postgresql.PostgresConnector",
    "type": "source",
    "version": "1.5.4.Final"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "1.5.4.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.8.1"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.8.1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

 

SQL Server Connector 생성:

SQL Server Connector를 REST API를 사용하여 생성합니다. 설정은 SQL Server 연결 정보와 Debezium 커넥터 설정으로 구성됩니다. 아래 명령은 SQL Server Connector를 생성하는 예시입니다. 여기서 "database.user""database.password"를 SQL Server 인증 정보로 설정해야 합니다.

curl --location --request POST http://localhost:8083/connectors \
--header 'Content-Type: application/json' \
--data-raw '{
  "name": "source-sqlserver-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.hostname": "your-sqlserver-host",
    "database.port": "1433",
    "database.user": "your-sqlserver-username",
    "database.password": "your-sqlserver-password",
    "database.dbname": "your-database-name",
    "database.server.name": "your-server-name",
    "table.whitelist": "your-table-name",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "dbhistory.sourcedb",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
  }
}'

 

Connector 상태 확인:

생성된 Connector의 상태를 확인합니다.

curl --location --request GET http://localhost:8083/connectors

[
  "source-sqlserver-connector"
]

 

Kafka Topic 확인:

생성된 Kafka Topic을 확인합니다.

kafka-topics.sh --list --bootstrap-server localhost:9092

__consumer_offsets
connect-configs
connect-offsets
connect-status
dbhistory.sourcedb

 

위의 단계를 따라하면 SQL Server에서 Kafka로 데이터를 스트리밍하는 Kafka Connect를 설정할 수 있습니다. 실제 SQL Server 인증 정보 및 데이터베이스, 테이블 이름에 맞게 설정하세요.

 

여러 개의 테이블을 추적하려면 하나의 커넥터를 사용하여 여러 테이블을 관리할 수 있습니다. Debezium과 같은 CDC(Change Data Capture) 도구를 사용할 때, 하나의 커넥터가 여러 테이블의 변경 사항을 추적하도록 구성할 수 있습니다. table.whitelist 또는 table.include.list 설정을 사용하여 여러 테이블을 동시에 추적할 수 있습니다. 설정 파일에서 이러한 설정을 사용하여 변경 사항을 추적할 테이블을 지정할 수 있습니다.

 

예를 들어, 여러 테이블을 하나의 커넥터로 추적하려면 설정 파일에서 다음과 같이 table.whitelist 설정을 구성할 수 있습니다:

"table.whitelist": "database_name.table1,database_name.table2,database_name.table3"

위에서 "database_name"은 데이터베이스의 이름을 나타내며, "table1," "table2," 및 "table3"은 추적할 테이블의 이름을 나타냅니다. 이렇게 설정하면 하나의 커넥터가 여러 테이블의 변경 사항을 추적하고 Kafka 토픽으로 스트리밍합니다. 따라서 하나의 커넥터로 여러 테이블을 처리할 수 있으며, 각 테이블에 대한 개별 커넥터를 만들 필요가 없습니다.

 

table.* 형태로 와일드카드를 사용하여 모든 테이블을 추적할 수 있습니다. table.whitelist 설정을 사용하여 여러 테이블을 지정할 때 와일드카드를 사용할 수 있습니다.

 

예를 들어, 다음과 같이 설정하면 해당 데이터베이스의 모든 테이블을 추적합니다:

"table.whitelist": "database_name.*"

위에서 "database_name"은 실제 데이터베이스의 이름을 나타내며, .*는 해당 데이터베이스의 모든 테이블을 의미합니다. 이렇게 설정하면 Debezium 커넥터가 지정한 데이터베이스 내의 모든 테이블의 변경 사항을 추적하고 Kafka 토픽으로 스트리밍합니다. 이것은 여러 테이블을 개별적으로 지정하는 대신 모든 테이블을 한 번에 추적하는 편리한 방법입니다.

 

따라서 table.*와 같은 와일드카드를 사용하면 특정 데이터베이스 내의 모든 테이블을 커넥터로 추적할 수 있습니다.

 

특정 필드 레벨의 변경 사항을 추적하기 위한 내장 기능을 제공하지 않습니다. 일반적으로 Debezium은 데이터베이스의 로우 레벨 변경 사항을 통째로 캡처하고 전송합니다. 그러므로 변경된 행 전체가 아니라 특정 필드만 추적하는 것은 Debezium의 주요 목적은 아닙니다.

 

그러나 Debezium에서 변경 사항을 커스터마이징하고 특정 필드를 선택적으로 처리하기 위해 다른 Kafka Connect 변환기(Transforms)를 사용할 수 있습니다. Kafka Connect 변환기를 사용하여 메시지를 변환하고 특정 필드를 추출할 수 있습니다. 예를 들어, JSON 변환기(JsonConverter)를 사용하여 Kafka Connect 설정에 대한 변환을 구성하고, 필요한 필드만 선택하여 새로운 토픽에 전송할 수 있습니다. 이를 통해 Debezium에서 전체 행을 스트리밍하고 Kafka Connect 변환을 사용하여 특정 필드를 선택할 수 있습니다.

 

이렇게 하려면 Kafka Connect의 변환(Transforms) 설정을 수정하여 데이터 변환이 필요한 경우에만 해당 변환을 적용하도록 할 수 있습니다. 변환 방법은 Kafka Connect의 설정에 따라 다를 수 있으며, 필요에 따라 변환기를 작성하여 데이터를 원하는 형식으로 변환할 수도 있습니다.

 

그리고, Debezium은 기본적으로 테이블 단위의 변경 사항을 캡처하고 Kafka에 스트리밍합니다. 따라서 테이블 간 조인을 자동으로 수행하지 않습니다. 그러나 여러 테이블을 조인하여 수집하려면 다음과 같은 옵션이 있습니다.

  1. Kafka Stream 또는 Apache Flink와 같은 스트림 처리 프레임워크 사용
    Debezium에서 변경 사항을 Kafka로 스트리밍한 후, Kafka Stream 또는 Apache Flink와 같은 스트림 처리 프레임워크를 사용하여 여러 테이블 간의 조인 및 데이터 처리를 수행할 수 있습니다. 이러한 스트림 처리 프레임워크는 Debezium에서 수집한 데이터를 조인하고 필요한 데이터를 추출할 수 있도록 도와줍니다.
  2. Database Connector를 수정하여 테이블 조인 추가
    Debezium 커넥터를 직접 수정하여 테이블 간의 조인을 수행할 수도 있습니다. 그러나 이는 Debezium의 기본 동작을 변경하고 복잡성을 증가시킬 수 있으므로 주의해야 합니다. 변경 사항을 추가하려면 Debezium 커넥터의 소스 코드를 수정하고 필요한 커스텀 로직을 추가해야 합니다.
  3. 데이터 웨어하우스 또는 데이터베이스에서 조인 수행
    변경된 데이터를 수집하여 데이터 웨어하우스나 데이터베이스에 저장한 다음, 해당 데이터베이스에서 SQL 질의를 사용하여 여러 테이블을 조인하는 것이 다른 옵션일 수 있습니다.
  4. Debezium과 함께 Kafka Streams Data Integration 사용
    Confluent에서 개발한 Kafka Streams Data Integration을 사용하여 Debezium으로부터 변경 데이터를 처리하고, 여러 테이블 간의 조인을 수행하는 것이 가능합니다. 이 방법은 비교적 쉽게 사용할 수 있으며, Debezium과 Kafka를 통합하여 데이터 처리 및 조인을 수행할 수 있습니다.

어떤 방법을 선택할지는 프로젝트의 요구 사항, 복잡성, 성능 및 확장 가능성을 고려하여 결정해야 합니다. 각 방법은 장단점이 있으며, 프로젝트의 특정 상황에 맞는 최적의 접근 방식을 선택해야 합니다.

 

SQL Server에 데이터 추가:

SQL Server 컨테이너로 접속하여 데이터를 추가합니다.

docker exec -it your-sqlserver-container-name /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P your-sa-password
USE your-database-name;
INSERT INTO your-table-name VALUES ("value1", "value2", ...);

 

변경 사항 확인:

변경 사항이 Kafka Topic으로 푸시되는지 확인합니다.

kafka-console-consumer.sh --topic your-server-name.your-database-name.your-table-name --bootstrap-server localhost:9092 --from-beginning

 

Kafka Topic의 특정 파티션에 대한 리더(leader)가 사용할 수 없는 상태에 있거나 리더를 찾을 수 없을 때, Kafka 브로커 자체에 문제가 있는 경우 "LEADER_NOT_AVAILABLE" 오류가 발생할 수 있습니다.

 

아래는 "LEADER_NOT_AVAILABLE" 오류를 해결하기 위한 기본적인 단계입니다.

  1. Kafka 브로커 상태 확인
    Kafka 브로커가 올바르게 실행 중인지 확인하려면 다음 명령어를 사용할 수 있습니다.
    # Kafka 브로커 목록 확인
    kafka-topics.sh --list --bootstrap-server localhost:9092
    이 명령어는 현재 실행 중인 Kafka 브로커의 토픽 목록을 반환합니다.

  2. Kafka Topic 파티션 리더 확인:
    Kafka Topic의 파티션 리더가 올바르게 설정되어 있는지 확인합니다.
    # Kafka Topic 파티션 정보 확인
    kafka-topics.sh --describe --topic your-topic-name --bootstrap-server localhost:9092
    "your-topic-name"에는 문제가 있는 Kafka Topic의 이름을 입력하세요. 이 명령어는 파티션 리더 정보를 표시합니다. "Leader" 항목이 "LEADER_NOT_AVAILABLE"이라면 해당 파티션에 문제가 있을 수 있습니다.

  3. Kafka Topic 상태 확인:
    다음 명령어를 사용하여 Kafka Topic의 파티션 상태를 확인합니다.
    # Kafka Topic 파티션 상태 확인
    kafka-log-dirs.sh --describe --bootstrap-server localhost:9092
    이 명령어는 Kafka Topic의 파티션 상태를 표시합니다.

  4. Kafka 브로커 및 토픽 재시작:
    문제가 지속되면 Kafka 브로커 및 해당 토픽을 재시작해 보세요. 먼저 Kafka 브로커를 중지하고 다시 시작합니다.
    # Kafka 브로커 중지
    kafka-server-stop.sh
    
    # Kafka 브로커 시작
    kafka-server-start.sh /path/to/server.properties
    그리고 나서 Kafka Topic을 재시작합니다.
    # Kafka Topic 삭제 (주의: 모든 데이터가 삭제됩니다)
    kafka-topics.sh --delete --topic your-topic-name --bootstrap-server localhost:9092
    
    # Kafka Topic 생성
    kafka-topics.sh --create --topic your-topic-name --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
    "your-topic-name"에는 문제가 있는 Kafka Topic의 이름을 입력하세요. 다시 시작된 Kafka Topic에는 이전 데이터가 포함되지 않을 수 있으므로 주의가 필요합니다.

  5. Kafka 클러스터 로그 확인:
    Kafka 클러스터 로그를 검토하여 추가 정보나 오류 메시지를 확인합니다. Kafka 클러스터 로그 위치와 로그 파일 이름은 Kafka 설정에 따라 다를 수 있습니다.

  6. Kafka 클러스터 상태 모니터링:
    Kafka 클러스터 상태를 모니터링하는 도구를 사용하여 Kafka 브로커 및 토픽의 상태를 지속적으로 모니터링합니다. 이를 통해 문제가 발생하기 전에 문제를 사전에 감지할 수 있습니다.

위의 단계를 따르고 문제가 지속되면 더 자세한 진단을 위해 Kafka 클러스터 및 애플리케이션 로그를 확인하고 Kafka 컨슈머 애플리케이션에 대한 구성을 검토해야 합니다.

 

아래와 같은 오류가 발생하는 경우 SQL Server의 cdc.lsn_time_mapping 객체를 찾을 수 없다는 것을 나타냅니다. 이 오류는 SQL Server Change Data Capture (CDC) 설정과 관련이 있습니다.

Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: Invalid object name 'cdc.lsn_time_mapping'.

이러한 에러 메시지는 SQL Server의 변경 데이터 캡처(CDC)를 사용하는 Debezium과 관련된 문제를 나타냅니다. 이 에러 메시지는 LSN(로그 시퀀스 번호)가 최대 값으로 기록되지 않거나 SQL Server Agent가 실행 중이지 않거나 CDC가 올바르게 구성되지 않았을 때 발생할 수 있습니다.

 

원인:

  • SQL Server Agent가 실행 중이지 않거나 제대로 구성되지 않은 경우.
  • CDC가 활성화되지 않거나 잘못 구성된 경우.

 

CDC를 사용하여 SQL Server를 모니터링하려면 CDC를 적절하게 구성해야 합니다. CDC 설정을 변경하고 커넥터를 다시 실행하여 문제를 해결할 수 있습니다.

 

SQL Server 컨테이너에 접속

SQL Server 컨테이너를 실행 중인 환경에서 명령줄 터미널을 열어 SQL Server에 접속합니다.

 

CDC 테이블 활성화 확인
다음 쿼리를 사용하여 CDC가 활성화된 테이블 목록을 확인할 수 있습니다.

SELECT s.name AS Schema_Name, tb.name AS Table_Name
, tb.object_id, tb.type, tb.type_desc, tb.is_tracked_by_cdc
FROM sys.tables tb
INNER JOIN sys.schemas s on s.schema_id = tb.schema_id
WHERE tb.is_tracked_by_cdc = 1

CDC가 활성화되지 않은 테이블을 활성화해야 합니다.

 

CDC 데이터베이스 활성화 및 실행 확인
다음 쿼리를 사용하여 CDC 데이터베이스가 활성화되어 있는지 확인할 수 있습니다.

SELECT * 
FROM sys.change_tracking_databases 
WHERE database_id = DB_ID('YourDatabaseName')

또한 CDC 데이터베이스가 실행 중인지 확인해야 합니다.

EXECUTE sys.sp_cdc_enable_db;
GO

 

SQL Server Agent 실행 확인:
SQL Server Agent가 실행 중인지 확인해야 합니다. 다음 명령을 사용하여 SQL Server Agent를 실행할 수 있습니다.

EXECUTE sys.sp_cdc_start_job;
GO

 

CDC 테이블 활성화 시 롤 이름 설정 확인
CDC 테이블을 활성화할 때 롤 이름 설정에 문제가 발생할 수 있습니다. 롤 이름을 null로 설정하여 문제를 해결할 수 있습니다.

EXECUTE sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'YourTableName',
    @capture_instance = N'YourCaptureInstanceName',
    @role_name = NULL,
    @filegroup_name = N'CDC_DATA',
    @supports_net_changes = 1

이러한 단계를 따라서 SQL Server의 CDC를 올바르게 구성하고 SQL Server Agent를 실행하면 Debezium에서 발생하는 LSN 관련 에러를 해결할 수 있을 것입니다.

 

CDC 테이블 활성화

SQL Server 인스턴스에 로그인한 후 다음 T-SQL 쿼리를 실행하여 CDC를 활성화합니다.

USE [YourDatabaseName]; -- CDC를 활성화할 데이터베이스 선택

-- CDC를 활성화할 테이블 설정
EXEC sys.sp_cdc_enable_db; -- 데이터베이스 수준의 CDC 활성화
EXEC sys.sp_cdc_enable_table
   @source_schema = N'dbo', -- 스키마 이름
   @source_name = N'YourTableName', -- 테이블 이름
   @role_name = N'cdc_Admin', -- 역할 이름 (CDC 역할을 생성하고 사용하려면 먼저 역할을 만들어야 합니다.)
   @supports_net_changes = 1; -- 넷 변경 정보를 캡처

-- SQL Server에 적용된 변경을 컨슈머에게 전달할 CDC 스키마 변경
EXEC sys.sp_cdc_add_job @job_type = 'capture'; -- CDC 데이터 캡처 작업을 추가하는 경우
-- 또는
EXEC sys.sp_cdc_add_job @job_type = 'cleanup'; -- CDC 데이터 정리 작업을 추가하는 경우

이 쿼리에서 YourDatabaseName, YourTableName 등을 실제 데이터베이스 이름과 테이블 이름으로 변경하세요.

 

CDC 역할 생성 (선택 사항)

위의 쿼리에서 @role_name 매개변수를 사용하여 CDC 역할을 지정하고 있습니다. 역할이 이미 존재하지 않는 경우 역할을 생성해야 합니다.

USE [YourDatabaseName]; CREATE ROLE cdc_Admin;

 

SQL Server 재시작

CDC 설정을 활성화한 후 SQL Server를 재시작하여 변경 데이터 캡처가 시작되도록 합니다.

 

CDC가 활성화되면 Debezium을 사용하여 SQL Server의 변경 데이터를 모니터링할 수 있게 됩니다. Debezium 커넥터를 설정하고 실행하여 SQL Server에서 변경 데이터를 Kafka로 스트리밍할 수 있습니다.

 

SQL Server의 Change Data Capture (CDC)는 데이터베이스와 테이블 수준에서 활성화해야 합니다. CDC를 활성화하면 각 테이블에 대한 변경 데이터를 캡처할 수 있습니다. 테이블별로 활성화하는 것이 일반적인 방법이며, SQL Server에서는 테이블 수준의 CDC 설정을 제공합니다.

 

SQL Server는 데이터베이스를 만들거나 스키마를 변경할 때 CDC를 자동으로 활성화하는 옵션을 제공하지 않습니다. 따라서 각 테이블에 대해 CDC를 개별적으로 활성화해야 합니다. 새로운 테이블을 추가하면 해당 테이블에 대한 CDC를 개별적으로 활성화해야 합니다. CDC를 자동으로 적용하려면 특정 테이블을 생성할 때마다 해당 테이블에 대한 CDC 설정을 자동으로 적용하는 스크립트나 프로세스를 만들어야 합니다.

 

CDC를 테이블 수준에서 자동으로 활성화하려면 데이터베이스 변경 이벤트를 감지하고 CDC 설정을 자동으로 적용하는 스크립트나 프로세스를 개발해야 합니다. 이러한 작업은 일반적으로 사용자 정의 스크립트나 관리 도구를 사용하여 수행됩니다. 더 많은 자동화를 위해 SQL Server Extended Events와 같은 기능을 활용하여 데이터베이스 및 테이블 변경을 감지하고 이벤트가 발생할 때 자동으로 CDC를 활성화하는 프로세스를 구축할 수 있습니다.

 

MySQL을 SinkDB로 사용하는 방법을 단계별로 설명합니다. SinkDB로 데이터를 동기화하는 것을 목표로 합니다.

 

MySQL SinkDB 컨테이너 생성:

MySQL SinkDB를 실행하는 Docker 컨테이너를 생성합니다. Docker Compose를 사용하여 아래와 같이 docker-compose.yml 파일을 작성합니다.

 version: '3'
 services:
   mysql-sink:
     container_name: mysql-sink
     image: mysql:latest
     restart: always
     networks:
       - testnet
     ports:
       - 3306:3306
     environment:
       - MYSQL_ROOT_PASSWORD=your_root_password
     volumes:
       - ./mysql-sink:/var/lib/mysql

위 설정에서 your_root_password를 실제로 사용할 루트 패스워드로 변경합니다.

그리고 Docker Compose를 사용하여 컨테이너를 실행합니다.

 docker-compose up -d

 

MySQL SinkDB에 접속 및 DB 및 테이블 생성:

MySQL SinkDB에 접속하고 SinkDB 및 테이블을 생성합니다. 컨테이너 이름과 패스워드를 사용하여 MySQL에 접속합니다.

 docker exec -it mysql-sink mysql -u root -p

그러면 패스워드를 입력하라는 프롬프트가 표시됩니다. 패스워드를 입력한 후 MySQL 쉘에 접속하게 됩니다.

 

다음으로 SinkDB를 생성하고, 이 DB에 테이블을 만듭니다. 아래 명령어를 사용하여 데이터베이스와 테이블을 생성할 수 있습니다. 이 예제에서는 "sinkdb"라는 데이터베이스와 "sinktable"라는 테이블을 생성합니다. 필요한 경우 테이블 구조를 소스 데이터베이스와 동일하게 맞춥니다.

 CREATE DATABASE sinkdb;
 USE sinkdb;

 CREATE TABLE sinktable (
   identification_id VARCHAR(255),
   identification_pw VARCHAR(255),
   name VARCHAR(255),
   department VARCHAR(255),
   PRIMARY KEY (identification_id)
 );

 

MySQL Sink Connector 설정:

SinkDB를 위한 MySQL Connector를 설정합니다. 앞서 설치한 Kafka Connect 인스턴스를 사용하여 MySQL Sink Connector를 생성합니다. Connector 설정 파일은 다음과 같은 내용을 포함해야 합니다.

 {
   "name": "mysql-sink-connector",
   "config": {
     "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
     "tasks.max": "1",
     "topics": "your_source_topic_name",
     "connection.url": "jdbc:mysql://mysql-sink:3306/sinkdb",
     "connection.user": "root",
     "connection.password": "your_root_password",
     "auto.create": "true",
     "auto.evolve": "true"
   }
 }

이 설정에서 your_source_topic_name은 데이터를 가져올 Kafka 토픽의 이름을 나타냅니다. 그리고 connection.url과 connection.password를 MySQL SinkDB의 정보에 맞게 업데이트합니다.

 

Kafka Connect 실행:

Kafka Connect를 실행하고 MySQL Sink Connector를 시작합니다.

 connect-distributed.sh /opt/kafka/config/connect-distributed.properties

 

SinkDB로 데이터 동기화 확인:

SourceDB에 데이터를 입력하면 SinkDB로 데이터가 자동으로 동기화됩니다. SinkDB에서 데이터를 확인하여 정상 동작하는지 확인합니다.

 docker exec -it mysql-sink bash
 mysql -u root -p
 use sinkdb;
 SELECT * FROM sinktable;

 

위의 단계를 따라 SQL Server 데이터베이스 데이타를 MySQL로 동기화하는 프로세스를 완료할 수 있습니다.

728x90

댓글