본문 바로가기
서버구축 (WEB,DB)

데이터 엔지니어링 엔드투엔드(End-to-End) 프로젝트

by 날으는물고기 2024. 7. 29.

데이터 엔지니어링 엔드투엔드(End-to-End) 프로젝트

Data Engineering End-to-End Project — Spark, Kafka, Elasticsearch, Kibana, MinIO, Docker, Airflow, Hadoop YARN, HDFS, Zookeeper, Pandas

기술 스택 개요

1. Apache Airflow

개요: Apache Airflow는 워크플로우 자동화 및 스케줄링 도구로, 복잡한 데이터 파이프라인을 정의, 스케줄링 및 모니터링할 수 있습니다.

활용 예시

  • 데이터 파이프라인을 정의하는 DAG(DAG: Directed Acyclic Graph) 생성
  • DAG를 사용한 데이터 처리 작업 스케줄링
  • Airflow UI를 통해 작업 상태 모니터링

2. Apache Zookeeper

개요: Apache Zookeeper는 분산 시스템을 위한 중앙 집중형 서비스로, 구성 정보를 관리하고, 분산 시스템의 동기화 및 그룹 서비스를 제공합니다.

활용 예시

  • Kafka 클러스터 설정 및 관리
  • 분산 시스템의 노드 상태 모니터링 및 구성 정보 저장

3. Apache Kafka

개요: Apache Kafka는 분산 스트리밍 플랫폼으로, 실시간 데이터 피드를 처리하고 저장하며, 높은 처리량과 내구성을 제공합니다.

활용 예시

  • Kafka 프로듀서를 통해 데이터를 특정 토픽에 전송
  • Kafka 컨슈머를 통해 토픽에서 데이터 읽기
  • 스트리밍 데이터 처리 및 분석

4. Apache Hadoop HDFS

개요: HDFS(Hadoop Distributed File System)는 대규모 데이터 세트를 저장하고 액세스하기 위한 분산 파일 시스템입니다.

활용 예시

  • 대용량 데이터 저장
  • HDFS 명령어를 사용한 데이터 업로드 및 다운로드

5. Apache Spark (PySpark)

개요: Apache Spark는 대규모 데이터 처리를 위한 분산 컴퓨팅 시스템으로, 메모리 기반의 빠른 데이터 처리 성능을 제공합니다. PySpark는 Spark의 Python API입니다.

활용 예시

  • 데이터 프레임 생성 및 변환
  • 스트리밍 데이터 처리
  • 데이터 분석 및 머신 러닝 작업 수행

6. Apache Hadoop YARN

개요: YARN(Yet Another Resource Negotiator)는 Hadoop의 클러스터 리소스 관리 및 작업 스케줄링 프레임워크입니다.

활용 예시

  • 클러스터 리소스 할당 및 관리
  • YARN UI를 통한 작업 모니터링

7. Elasticsearch

개요: Elasticsearch는 분산 검색 및 분석 엔진으로, 대규모 데이터의 빠른 검색 및 분석을 제공합니다.

활용 예시

  • 데이터 색인 생성
  • Elasticsearch API를 통한 데이터 검색 및 분석

8. Kibana

개요: Kibana는 Elasticsearch 데이터를 시각화하는 도구로, 대시보드 및 차트를 통해 데이터를 분석하고 시각화할 수 있습니다.

활용 예시

  • 대시보드 생성
  • 데이터 시각화 및 분석

9. MinIO

개요: MinIO는 고성능 오브젝트 스토리지 시스템으로, S3 호환 API를 제공합니다.

활용 예시

  • 데이터 파일 저장 및 관리
  • MinIO 클라이언트를 사용한 데이터 업로드 및 다운로드

10. Docker

개요: Docker는 응용 프로그램을 컨테이너로 패키징하여 배포 및 실행할 수 있는 플랫폼입니다.

활용 예시

  • Docker 컨테이너 생성 및 실행
  • docker-compose를 사용한 멀티 컨테이너 애플리케이션 관리

11. Python 및 SQL

개요: Python은 다양한 데이터 처리 및 분석 작업에 사용되는 프로그래밍 언어이며, SQL은 데이터베이스와의 상호작용을 위한 쿼리 언어입니다.

활용 예시

  • 데이터 전처리 및 분석
  • 데이터베이스 쿼리 및 데이터 조작

단계별 활용 예시

1. Apache Airflow 설정 및 DAG 작성

설치 및 시작

pip install apache-airflow
airflow db init
airflow webserver --port 8080
airflow scheduler

DAG 예시

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 7, 16),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'example_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    dag=dag,
)

t1 >> t2

2. Apache Kafka 설치 및 사용

설치 및 시작

# Zookeeper 시작
bin/zookeeper-server-start.sh config/zookeeper.properties

# Kafka 서버 시작
bin/kafka-server-start.sh config/server.properties

Kafka 프로듀서 예시

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('example_topic', b'This is a test message')
producer.flush()

Kafka 컨슈머 예시

from kafka import KafkaConsumer

consumer = KafkaConsumer('example_topic', bootstrap_servers='localhost:9092')
for message in consumer:
    print(message.value.decode())

3. HDFS 명령어 사용

HDFS에 파일 업로드

hdfs dfs -put localfile.txt /user/hadoop/hdfsfile.txt

HDFS에서 파일 다운로드

hdfs dfs -get /user/hadoop/hdfsfile.txt localfile.txt

4. PySpark 사용

PySpark 스크립트 예시

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('example').getOrCreate()
df = spark.read.csv('file:///path/to/data.csv', header=True)
df.show()

5. Elasticsearch 및 Kibana 사용

Docker로 Elasticsearch 및 Kibana 실행

# docker-compose.yaml
version: '3'
services:
  elasticsearch:
    image: elasticsearch:7.10.0
    environment:
      - discovery.type=single-node
    ports:
      - "9200:9200"
  kibana:
    image: kibana:7.10.0
    ports:
      - "5601:5601"
docker-compose up

Elasticsearch에 데이터 색인 생성

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
es.index(index='example_index', id=1, body={'name': 'John', 'age': 30})

Kibana에서 데이터 시각화

  • 브라우저에서 http://localhost:5601로 Kibana에 접속
  • Index pattern 생성 후 대시보드 생성

6. MinIO 사용

Docker로 MinIO 실행

docker run -p 9000:9000 -p 9001:9001 --name minio \
  -e "MINIO_ROOT_USER=minioadmin" \
  -e "MINIO_ROOT_PASSWORD=minioadmin" \
  minio/minio server /data --console-address ":9001"

MinIO 클라이언트 사용

mc alias set myminio http://localhost:9000 minioadmin minioadmin
mc mb myminio/mybucket
mc cp localfile.txt myminio/mybucket/localfile.txt

이 예시들을 통해 각 기술 스택을 처음 접하는 사람도 쉽게 이해하고 활용할 수 있습니다. 필요시 각 단계별로 더 깊이 있는 설명과 추가 예제를 제공할 수 있습니다.

 

위와 같은 프로젝트는 주요 부분으로 나뉩니다.

  1. URL에서 압축된 데이터 소스를 가져오기
  2. 원시 데이터를 PySpark 또는 Pandas로 처리하고 HDFS에 파일 저장, Apache Hadoop YARN으로 리소스 확인
  3. 데이터 생성기를 사용하여 스트리밍 데이터를 시뮬레이션하고 Apache Kafka로 데이터 전송(프로듀서)
  4. PySpark(Spark Structured Streaming)를 사용하여 Kafka 주제(컨슈머)에서 스트리밍 데이터 읽기
  5. 스트리밍 데이터를 Elasticsearch에 쓰고 Kibana를 사용하여 시각화
  6. 스트리밍 데이터를 MinIO(AWS Object Storage)에 쓰기
  7. Apache Airflow를 사용하여 전체 데이터 파이프라인 오케스트레이션
  8. Elasticsearch, Kibana 및 MinIO를 Docker로 컨테이너화

프로젝트 단계

  1. Apache Kafka, Apache Spark 및 Apache Hadoop을 로컬에 설치
  2. docker-compose.yaml을 사용하여 Elasticsearch, Kibana, MinIO 실행
  3. Airflow DAG을 통해 전체 데이터 파이프라인 확인
  4. 각 단계별 스크립트 및 설정 파일 제공

로컬 호스트 포트

  • Elasticsearch -> localhost:5601
  • Airflow -> localhost:1502
  • MinIO -> localhost:9001
  • Spark Jobs -> localhost:4040
  • pgAdmin -> localhost:5050
  • Kafka -> localhost:9092
  • Hadoop Namenode -> localhost:9870
  • Hadoop YARN -> localhost:8088/cluster
  • Hadoop HDFS -> localhost:9000

모든 서비스 시작 명령어

sudo systemctl start docker
sudo systemctl start airflow
sudo systemctl start airflow-scheduler
sudo systemctl start zookeeper
sudo systemctl start kafka
start-all.sh
cd /<location_of_docker_compose.yaml>/ && docker-compose up -d

데이터 다운로드 및 전처리

데이터 다운로드

wget -O <your_local_directory>/sensors.zip https://github.com/dogukannulu/datasets/raw/master/sensors_instrumented_in_an_office_building_dataset.zip

데이터 압축 해제 및 README.txt 제거

unzip <location_of_zip_file>/sensors.zip -d /<desired_location_of_unzipped_folder/>
rm /<location_of_KETI>/KETI/README.txt

데이터를 HDFS에 저장

hdfs dfs -put /<location_of_KETI>/KETI/ /<desired_location_to_put_KETI>/

데이터 처리 및 저장

Pandas를 사용한 초기 데이터 읽기 및 쓰기

import os
import pandas as pd
from functools import reduce

directory = '/home/train/datasets/KETI'
dataframes = {}
dataframes_room = {}
columns = ['co2', 'humidity', 'light', 'pir', 'temperature']

def create_separate_dataframes() -> dict:
    for filename in os.listdir(directory):
        new_directory = directory + '/' + filename
        count = 0
        for new_files in os.listdir(new_directory):
            f = os.path.join(new_directory, new_files)
            my_path = new_directory.split('/')[-1] + '_' + new_files.split('.')[0]
            dataframes[my_path] = pd.read_csv(f, names=['ts_min_bignt', columns[count]])
            count += 1

        dataframes_room[filename] = reduce(lambda left, right:
                                           pd.merge(left, right, on='ts_min_bignt', how='inner'),
                                           [dataframes[f'{filename}_co2'], dataframes[f'{filename}_humidity'],
                                            dataframes[f'{filename}_light'], dataframes[f'{filename}_pir'],
                                            dataframes[f'{filename}_temperature']])
        dataframes_room[filename]['room'] = filename
    return dataframes_room

def create_main_dataframe(separate_dataframes: dict):
    dataframes_to_concat = [i for i in separate_dataframes.values()]
    df = pd.concat(dataframes_to_concat, ignore_index=True)
    df = df.sort_values('ts_min_bignt')
    df.dropna(inplace=True)
    df["event_ts_min"] = pd.to_datetime(df["ts_min_bignt"], unit='s')
    return df

def write_main_dataframe(df):
    df.to_csv('/home/train/data-generator/input/sensors.csv', index=False)

dataframes_room = create_separate_dataframes()
main_df = create_main_dataframe(dataframes_room)
write_main_dataframe(main_df)

PySpark를 사용한 초기 데이터 읽기 및 쓰기

import os
import findspark
from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

findspark.init("/opt/manual/spark")
spark = SparkSession.builder \
    .appName("Spark Read Write") \
    .master("local[2]") \
    .getOrCreate()

def create_separate_dataframes() -> dict:
    dataframes = {}
    directory = '/home/train/datasets/KETI'
    dataframes_room = {}
    columns = ['co2', 'humidity', 'light', 'pir', 'temperature']
    for filename in os.listdir(directory):
        new_directory = directory + '/' + filename
        for new_files in os.listdir(new_directory):
            f = os.path.join(new_directory, new_files)
            f_hdfs = f.replace('home', 'user')
            my_path = filename + '_' + new_files.split('.')[0]
            dataframes[my_path] = spark.read.csv(f'{f_hdfs}')
            dataframes[my_path] = dataframes[my_path].toDF('ts_min_bignt', columns[count])

        dataframes[f'{filename}_co2'].createOrReplaceTempView('df_co2')
        dataframes[f'{filename}_humidity'].createOrReplaceTempView('df_humidity')
        dataframes[f'{filename}_light'].createOrReplaceTempView('df_light')
        dataframes[f'{filename}_pir'].createOrReplaceTempView('df_pir')
        dataframes[f'{filename}_temperature'].createOrReplaceTempView('df_temperature')

        dataframes_room[filename] = spark.sql('''
        select
          df_co2.*,
          df_humidity.humidity,
          df_light.light,
          df_pir.pir,
          df_temperature.temperature        
        from df_co2
        inner join df_humidity on df_co2.ts_min_bignt = df_humidity.ts_min_bignt
        inner join df_light on df_humidity.ts_min_bignt = df_light.ts_min_bignt
        inner join df_pir on df_light.ts_min_bignt = df_pir.ts_min_bignt
        inner join df_temperature on df_pir.ts_min_bignt = df_temperature.ts_min_bignt
        ''')
        dataframes_room[filename] = dataframes_room[filename].withColumn("room", F.lit(filename))
    return dataframes_room

def create_main_dataframe(separate_dataframes: dict):
    dataframes_to_concat = [i for i in separate_dataframes.values()]
    df = reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dataframes_to_concat)
    df = df.sort(F.col("ts_min_bignt"))
    df = df.dropna()
    df_main = df.withColumn("event_ts_min", F.from_unixtime(F.col("ts_min_bignt")).cast(DateType()))
    df_main = df_main.withColumn("event_ts_min", F.date_format(F.col("event_ts_min"), "yyyy-MM-dd HH:mm:ss"))
    return df_main

def write_main_dataframe(df):
    df = df.toPandas()
    df.to_csv("/home/train/data-generator/input/sensors.csv")

dataframes_room = create_separate_dataframes()
main_df = create_main_dataframe(dataframes_room)
write_main_dataframe(main_df)

Kafka 토픽 생성

from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(
    bootstrap_servers=['localhost:9092'],
    client_id='kafka_admin_client'
)
topic_list = admin_client.list_topics()

def create_new_topic():
    try:
        admin_client.create_topics(new_topics=[NewTopic('office_input', 1, 1)])
        return "Topic office_input successfully created"
    except:
        return "Topic office_input already exists"
kafka-topics.sh --bootstrap-server localhost:9092 --list

데이터 생성기 실행

#!/usr/bin/env bash

set -e
source /home/train/data-generator/datagen/bin/activate
python /home/train/data-generator/dataframe_to_kafka.py -i /home/train/data-generator/input/sensors.csv -t office_input -rst 2

이 단계까지 우리는 원격 데이터 소스를 가져와 로컬에 저장하고, Pandas 또는 PySpark를 사용하여 데이터를 처리했으며, Kafka 토픽을 생성하고, 데이터를 Kafka 토픽에 스트리밍으로 전송하는 작업을 완료했습니다. 다음 단계는 Elasticsearch와 MinIO에 데이터를 저장하고, Kibana로 시각화하는 작업입니다. 각 단계에 대한 코드와 설정을 통해 쉽게 이해할 수 있도록 설명했습니다.

728x90

댓글