기술 스택 개요
1. Apache Airflow
개요: Apache Airflow는 워크플로우 자동화 및 스케줄링 도구로, 복잡한 데이터 파이프라인을 정의, 스케줄링 및 모니터링할 수 있습니다.
활용 예시
- 데이터 파이프라인을 정의하는 DAG(DAG: Directed Acyclic Graph) 생성
- DAG를 사용한 데이터 처리 작업 스케줄링
- Airflow UI를 통해 작업 상태 모니터링
2. Apache Zookeeper
개요: Apache Zookeeper는 분산 시스템을 위한 중앙 집중형 서비스로, 구성 정보를 관리하고, 분산 시스템의 동기화 및 그룹 서비스를 제공합니다.
활용 예시
- Kafka 클러스터 설정 및 관리
- 분산 시스템의 노드 상태 모니터링 및 구성 정보 저장
3. Apache Kafka
개요: Apache Kafka는 분산 스트리밍 플랫폼으로, 실시간 데이터 피드를 처리하고 저장하며, 높은 처리량과 내구성을 제공합니다.
활용 예시
- Kafka 프로듀서를 통해 데이터를 특정 토픽에 전송
- Kafka 컨슈머를 통해 토픽에서 데이터 읽기
- 스트리밍 데이터 처리 및 분석
4. Apache Hadoop HDFS
개요: HDFS(Hadoop Distributed File System)는 대규모 데이터 세트를 저장하고 액세스하기 위한 분산 파일 시스템입니다.
활용 예시
- 대용량 데이터 저장
- HDFS 명령어를 사용한 데이터 업로드 및 다운로드
5. Apache Spark (PySpark)
개요: Apache Spark는 대규모 데이터 처리를 위한 분산 컴퓨팅 시스템으로, 메모리 기반의 빠른 데이터 처리 성능을 제공합니다. PySpark는 Spark의 Python API입니다.
활용 예시
- 데이터 프레임 생성 및 변환
- 스트리밍 데이터 처리
- 데이터 분석 및 머신 러닝 작업 수행
6. Apache Hadoop YARN
개요: YARN(Yet Another Resource Negotiator)는 Hadoop의 클러스터 리소스 관리 및 작업 스케줄링 프레임워크입니다.
활용 예시
- 클러스터 리소스 할당 및 관리
- YARN UI를 통한 작업 모니터링
7. Elasticsearch
개요: Elasticsearch는 분산 검색 및 분석 엔진으로, 대규모 데이터의 빠른 검색 및 분석을 제공합니다.
활용 예시
- 데이터 색인 생성
- Elasticsearch API를 통한 데이터 검색 및 분석
8. Kibana
개요: Kibana는 Elasticsearch 데이터를 시각화하는 도구로, 대시보드 및 차트를 통해 데이터를 분석하고 시각화할 수 있습니다.
활용 예시
- 대시보드 생성
- 데이터 시각화 및 분석
9. MinIO
개요: MinIO는 고성능 오브젝트 스토리지 시스템으로, S3 호환 API를 제공합니다.
활용 예시
- 데이터 파일 저장 및 관리
- MinIO 클라이언트를 사용한 데이터 업로드 및 다운로드
10. Docker
개요: Docker는 응용 프로그램을 컨테이너로 패키징하여 배포 및 실행할 수 있는 플랫폼입니다.
활용 예시
- Docker 컨테이너 생성 및 실행
- docker-compose를 사용한 멀티 컨테이너 애플리케이션 관리
11. Python 및 SQL
개요: Python은 다양한 데이터 처리 및 분석 작업에 사용되는 프로그래밍 언어이며, SQL은 데이터베이스와의 상호작용을 위한 쿼리 언어입니다.
활용 예시
- 데이터 전처리 및 분석
- 데이터베이스 쿼리 및 데이터 조작
단계별 활용 예시
1. Apache Airflow 설정 및 DAG 작성
설치 및 시작
pip install apache-airflow
airflow db init
airflow webserver --port 8080
airflow scheduler
DAG 예시
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 7, 16),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'example_dag',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
dag=dag,
)
t1 >> t2
2. Apache Kafka 설치 및 사용
설치 및 시작
# Zookeeper 시작
bin/zookeeper-server-start.sh config/zookeeper.properties
# Kafka 서버 시작
bin/kafka-server-start.sh config/server.properties
Kafka 프로듀서 예시
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('example_topic', b'This is a test message')
producer.flush()
Kafka 컨슈머 예시
from kafka import KafkaConsumer
consumer = KafkaConsumer('example_topic', bootstrap_servers='localhost:9092')
for message in consumer:
print(message.value.decode())
3. HDFS 명령어 사용
HDFS에 파일 업로드
hdfs dfs -put localfile.txt /user/hadoop/hdfsfile.txt
HDFS에서 파일 다운로드
hdfs dfs -get /user/hadoop/hdfsfile.txt localfile.txt
4. PySpark 사용
PySpark 스크립트 예시
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('example').getOrCreate()
df = spark.read.csv('file:///path/to/data.csv', header=True)
df.show()
5. Elasticsearch 및 Kibana 사용
Docker로 Elasticsearch 및 Kibana 실행
# docker-compose.yaml
version: '3'
services:
elasticsearch:
image: elasticsearch:7.10.0
environment:
- discovery.type=single-node
ports:
- "9200:9200"
kibana:
image: kibana:7.10.0
ports:
- "5601:5601"
docker-compose up
Elasticsearch에 데이터 색인 생성
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
es.index(index='example_index', id=1, body={'name': 'John', 'age': 30})
Kibana에서 데이터 시각화
- 브라우저에서
http://localhost:5601
로 Kibana에 접속 - Index pattern 생성 후 대시보드 생성
6. MinIO 사용
Docker로 MinIO 실행
docker run -p 9000:9000 -p 9001:9001 --name minio \
-e "MINIO_ROOT_USER=minioadmin" \
-e "MINIO_ROOT_PASSWORD=minioadmin" \
minio/minio server /data --console-address ":9001"
MinIO 클라이언트 사용
mc alias set myminio http://localhost:9000 minioadmin minioadmin
mc mb myminio/mybucket
mc cp localfile.txt myminio/mybucket/localfile.txt
이 예시들을 통해 각 기술 스택을 처음 접하는 사람도 쉽게 이해하고 활용할 수 있습니다. 필요시 각 단계별로 더 깊이 있는 설명과 추가 예제를 제공할 수 있습니다.
위와 같은 프로젝트는 주요 부분으로 나뉩니다.
- URL에서 압축된 데이터 소스를 가져오기
- 원시 데이터를 PySpark 또는 Pandas로 처리하고 HDFS에 파일 저장, Apache Hadoop YARN으로 리소스 확인
- 데이터 생성기를 사용하여 스트리밍 데이터를 시뮬레이션하고 Apache Kafka로 데이터 전송(프로듀서)
- PySpark(Spark Structured Streaming)를 사용하여 Kafka 주제(컨슈머)에서 스트리밍 데이터 읽기
- 스트리밍 데이터를 Elasticsearch에 쓰고 Kibana를 사용하여 시각화
- 스트리밍 데이터를 MinIO(AWS Object Storage)에 쓰기
- Apache Airflow를 사용하여 전체 데이터 파이프라인 오케스트레이션
- Elasticsearch, Kibana 및 MinIO를 Docker로 컨테이너화
프로젝트 단계
- Apache Kafka, Apache Spark 및 Apache Hadoop을 로컬에 설치
- docker-compose.yaml을 사용하여 Elasticsearch, Kibana, MinIO 실행
- Airflow DAG을 통해 전체 데이터 파이프라인 확인
- 각 단계별 스크립트 및 설정 파일 제공
로컬 호스트 포트
- Elasticsearch -> localhost:5601
- Airflow -> localhost:1502
- MinIO -> localhost:9001
- Spark Jobs -> localhost:4040
- pgAdmin -> localhost:5050
- Kafka -> localhost:9092
- Hadoop Namenode -> localhost:9870
- Hadoop YARN -> localhost:8088/cluster
- Hadoop HDFS -> localhost:9000
모든 서비스 시작 명령어
sudo systemctl start docker
sudo systemctl start airflow
sudo systemctl start airflow-scheduler
sudo systemctl start zookeeper
sudo systemctl start kafka
start-all.sh
cd /<location_of_docker_compose.yaml>/ && docker-compose up -d
데이터 다운로드 및 전처리
데이터 다운로드
wget -O <your_local_directory>/sensors.zip https://github.com/dogukannulu/datasets/raw/master/sensors_instrumented_in_an_office_building_dataset.zip
데이터 압축 해제 및 README.txt 제거
unzip <location_of_zip_file>/sensors.zip -d /<desired_location_of_unzipped_folder/>
rm /<location_of_KETI>/KETI/README.txt
데이터를 HDFS에 저장
hdfs dfs -put /<location_of_KETI>/KETI/ /<desired_location_to_put_KETI>/
데이터 처리 및 저장
Pandas를 사용한 초기 데이터 읽기 및 쓰기
import os
import pandas as pd
from functools import reduce
directory = '/home/train/datasets/KETI'
dataframes = {}
dataframes_room = {}
columns = ['co2', 'humidity', 'light', 'pir', 'temperature']
def create_separate_dataframes() -> dict:
for filename in os.listdir(directory):
new_directory = directory + '/' + filename
count = 0
for new_files in os.listdir(new_directory):
f = os.path.join(new_directory, new_files)
my_path = new_directory.split('/')[-1] + '_' + new_files.split('.')[0]
dataframes[my_path] = pd.read_csv(f, names=['ts_min_bignt', columns[count]])
count += 1
dataframes_room[filename] = reduce(lambda left, right:
pd.merge(left, right, on='ts_min_bignt', how='inner'),
[dataframes[f'{filename}_co2'], dataframes[f'{filename}_humidity'],
dataframes[f'{filename}_light'], dataframes[f'{filename}_pir'],
dataframes[f'{filename}_temperature']])
dataframes_room[filename]['room'] = filename
return dataframes_room
def create_main_dataframe(separate_dataframes: dict):
dataframes_to_concat = [i for i in separate_dataframes.values()]
df = pd.concat(dataframes_to_concat, ignore_index=True)
df = df.sort_values('ts_min_bignt')
df.dropna(inplace=True)
df["event_ts_min"] = pd.to_datetime(df["ts_min_bignt"], unit='s')
return df
def write_main_dataframe(df):
df.to_csv('/home/train/data-generator/input/sensors.csv', index=False)
dataframes_room = create_separate_dataframes()
main_df = create_main_dataframe(dataframes_room)
write_main_dataframe(main_df)
PySpark를 사용한 초기 데이터 읽기 및 쓰기
import os
import findspark
from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
findspark.init("/opt/manual/spark")
spark = SparkSession.builder \
.appName("Spark Read Write") \
.master("local[2]") \
.getOrCreate()
def create_separate_dataframes() -> dict:
dataframes = {}
directory = '/home/train/datasets/KETI'
dataframes_room = {}
columns = ['co2', 'humidity', 'light', 'pir', 'temperature']
for filename in os.listdir(directory):
new_directory = directory + '/' + filename
for new_files in os.listdir(new_directory):
f = os.path.join(new_directory, new_files)
f_hdfs = f.replace('home', 'user')
my_path = filename + '_' + new_files.split('.')[0]
dataframes[my_path] = spark.read.csv(f'{f_hdfs}')
dataframes[my_path] = dataframes[my_path].toDF('ts_min_bignt', columns[count])
dataframes[f'{filename}_co2'].createOrReplaceTempView('df_co2')
dataframes[f'{filename}_humidity'].createOrReplaceTempView('df_humidity')
dataframes[f'{filename}_light'].createOrReplaceTempView('df_light')
dataframes[f'{filename}_pir'].createOrReplaceTempView('df_pir')
dataframes[f'{filename}_temperature'].createOrReplaceTempView('df_temperature')
dataframes_room[filename] = spark.sql('''
select
df_co2.*,
df_humidity.humidity,
df_light.light,
df_pir.pir,
df_temperature.temperature
from df_co2
inner join df_humidity on df_co2.ts_min_bignt = df_humidity.ts_min_bignt
inner join df_light on df_humidity.ts_min_bignt = df_light.ts_min_bignt
inner join df_pir on df_light.ts_min_bignt = df_pir.ts_min_bignt
inner join df_temperature on df_pir.ts_min_bignt = df_temperature.ts_min_bignt
''')
dataframes_room[filename] = dataframes_room[filename].withColumn("room", F.lit(filename))
return dataframes_room
def create_main_dataframe(separate_dataframes: dict):
dataframes_to_concat = [i for i in separate_dataframes.values()]
df = reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dataframes_to_concat)
df = df.sort(F.col("ts_min_bignt"))
df = df.dropna()
df_main = df.withColumn("event_ts_min", F.from_unixtime(F.col("ts_min_bignt")).cast(DateType()))
df_main = df_main.withColumn("event_ts_min", F.date_format(F.col("event_ts_min"), "yyyy-MM-dd HH:mm:ss"))
return df_main
def write_main_dataframe(df):
df = df.toPandas()
df.to_csv("/home/train/data-generator/input/sensors.csv")
dataframes_room = create_separate_dataframes()
main_df = create_main_dataframe(dataframes_room)
write_main_dataframe(main_df)
Kafka 토픽 생성
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
bootstrap_servers=['localhost:9092'],
client_id='kafka_admin_client'
)
topic_list = admin_client.list_topics()
def create_new_topic():
try:
admin_client.create_topics(new_topics=[NewTopic('office_input', 1, 1)])
return "Topic office_input successfully created"
except:
return "Topic office_input already exists"
kafka-topics.sh --bootstrap-server localhost:9092 --list
데이터 생성기 실행
#!/usr/bin/env bash
set -e
source /home/train/data-generator/datagen/bin/activate
python /home/train/data-generator/dataframe_to_kafka.py -i /home/train/data-generator/input/sensors.csv -t office_input -rst 2
이 단계까지 우리는 원격 데이터 소스를 가져와 로컬에 저장하고, Pandas 또는 PySpark를 사용하여 데이터를 처리했으며, Kafka 토픽을 생성하고, 데이터를 Kafka 토픽에 스트리밍으로 전송하는 작업을 완료했습니다. 다음 단계는 Elasticsearch와 MinIO에 데이터를 저장하고, Kibana로 시각화하는 작업입니다. 각 단계에 대한 코드와 설정을 통해 쉽게 이해할 수 있도록 설명했습니다.
댓글