본문 바로가기
서버구축 (WEB,DB)

Pandas보다 빠르고, SQL만큼 쉬운 실시간 분석 DB, DuckDB 강력 도구

by 날으는물고기 2025. 6. 20.

Pandas보다 빠르고, SQL만큼 쉬운 실시간 분석 DB, DuckDB 강력 도구

728x90

DuckDB: 데이터 분석의 판도를 바꾼 혁신적인 분석형 데이터베이스

DuckDB는 내장형 분석 데이터베이스(embedded analytical database)로, SQLite의 철학을 분석 워크로드에 적용한 오픈소스 프로젝트입니다. 2019년 네덜란드의 CWI(Centrum Wiskunde & Informatica) 연구소에서 시작되어, 현재는 DuckDB Labs를 통해 개발되고 있습니다.

핵심 특징

1. 컬럼 지향 저장 구조

  • 행 단위가 아닌 컬럼 단위로 데이터를 저장하여 분석 쿼리에 최적화
  • 압축률이 높고 메모리 효율적

2. 벡터화된 쿼리 실행

  • CPU의 SIMD(Single Instruction, Multiple Data) 명령어를 활용
  • 한 번에 여러 데이터를 병렬 처리

3. 제로 카피 통합

  • Pandas DataFrame, Arrow 테이블과 직접 통합
  • 데이터 복사 없이 즉시 쿼리 가능

DuckDB의 부상 역사

2019-2020: 초기 개발 단계

DuckDB는 Mark Raasveldt와 Hannes Mühleisen이 CWI 연구소에서 시작했습니다. 초기 목표는 데이터 과학자들이 노트북에서 대용량 데이터를 분석할 수 있는 도구를 만드는 것이었습니다.

# 초기 DuckDB 사용 예시 (2020년)
import duckdb

conn = duckdb.connect()
conn.execute("CREATE TABLE users (id INTEGER, name VARCHAR)")
conn.execute("INSERT INTO users VALUES (1, '김철수'), (2, '이영희')")
result = conn.execute("SELECT * FROM users").fetchall()

2021-2022: 생태계 확장

Python, R, Java, Node.js 등 다양한 언어 바인딩이 추가되었고, Parquet 파일 직접 쿼리 기능이 도입되었습니다.

# Parquet 파일 직접 쿼리 (2021년 도입)
import duckdb

# 파일을 로드하지 않고 직접 쿼리
result = duckdb.query("""
    SELECT category, SUM(sales) as total_sales
    FROM 'sales_data.parquet'
    WHERE year = 2021
    GROUP BY category
""").to_df()

2023: 지리공간 혁명

2023년 11월, DuckDB Spatial 확장이 출시되면서 지리공간 데이터 분석의 진입 장벽이 획기적으로 낮아졌습니다.

-- 지리공간 확장 설치 및 로드
INSTALL spatial;
LOAD spatial;

-- GeoParquet 파일에서 서울시 내 편의점 찾기
SELECT 
    name,
    ST_AsText(geometry) as location
FROM 'korea_pois.parquet'
WHERE 
    category = '편의점' AND
    ST_Within(geometry, 
        ST_GeomFromText('POLYGON((126.8 37.4, 127.2 37.4, 127.2 37.7, 126.8 37.7, 126.8 37.4))'))
LIMIT 10;

2024-2025: 현재와 미래

DuckDB는 이제 데이터 분석 생태계의 핵심 구성 요소가 되었습니다.

  • 확장 생태계: HTTP, PostgreSQL, SQLite 스캐너 등 다양한 확장 기능
  • 클라우드 통합: S3, Azure Blob Storage 직접 쿼리
  • 성능 최적화: 병렬 처리, 메모리 관리 개선

구체적인 사용 예시

1. 대용량 CSV 분석

import duckdb
import time

# 10GB CSV 파일 분석
start_time = time.time()

# DuckDB로 직접 쿼리
result = duckdb.query("""
    SELECT 
        product_category,
        DATE_TRUNC('month', order_date) as month,
        COUNT(*) as order_count,
        SUM(amount) as total_revenue
    FROM read_csv_auto('large_orders.csv')
    WHERE order_date >= '2024-01-01'
    GROUP BY 1, 2
    ORDER BY 2, 4 DESC
""").to_df()

print(f"처리 시간: {time.time() - start_time:.2f}초")
# 처리 시간: 12.34초 (Pandas로는 메모리 부족)

2. 다중 파일 조인 분석

# 여러 Parquet 파일을 조인하여 분석
query = """
WITH customer_segments AS (
    SELECT 
        customer_id,
        CASE 
            WHEN total_purchases > 1000000 THEN 'VIP'
            WHEN total_purchases > 100000 THEN 'Gold'
            ELSE 'Regular'
        END as segment
    FROM 'customers/*.parquet'
),
recent_orders AS (
    SELECT 
        customer_id,
        COUNT(*) as order_count,
        AVG(order_amount) as avg_order_value
    FROM 'orders/2024/*.parquet'
    GROUP BY customer_id
)
SELECT 
    cs.segment,
    COUNT(DISTINCT cs.customer_id) as customer_count,
    AVG(ro.order_count) as avg_orders_per_customer,
    AVG(ro.avg_order_value) as avg_order_value
FROM customer_segments cs
JOIN recent_orders ro ON cs.customer_id = ro.customer_id
GROUP BY cs.segment
ORDER BY 
    CASE cs.segment 
        WHEN 'VIP' THEN 1 
        WHEN 'Gold' THEN 2 
        ELSE 3 
    END
"""

result = duckdb.query(query).to_df()

3. 실시간 스트리밍 데이터 분석

import duckdb
from datetime import datetime, timedelta

# 메모리 내 테이블 생성
conn = duckdb.connect(':memory:')

# 실시간 로그 데이터 테이블
conn.execute("""
    CREATE TABLE access_logs (
        timestamp TIMESTAMP,
        user_id VARCHAR,
        endpoint VARCHAR,
        response_time INTEGER,
        status_code INTEGER
    )
""")

# 5분 단위 집계 뷰 생성
conn.execute("""
    CREATE VIEW metrics_5min AS
    SELECT 
        DATE_TRUNC('minute', timestamp) as minute,
        endpoint,
        COUNT(*) as request_count,
        AVG(response_time) as avg_response_time,
        PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY response_time) as p95_response_time,
        SUM(CASE WHEN status_code >= 500 THEN 1 ELSE 0 END) as error_count
    FROM access_logs
    WHERE timestamp > NOW() - INTERVAL '5 minutes'
    GROUP BY 1, 2
""")

# 실시간 데이터 삽입 및 조회 (예시)
def process_new_logs(log_batch):
    conn.executemany(
        "INSERT INTO access_logs VALUES (?, ?, ?, ?, ?)",
        log_batch
    )

    # 최신 메트릭 조회
    metrics = conn.execute("SELECT * FROM metrics_5min ORDER BY minute DESC").fetchdf()
    return metrics

4. 지리공간 데이터 분석 실전 예시

-- 서울시 지하철역 주변 500m 내 카페 밀도 분석
WITH subway_buffers AS (
    SELECT 
        station_name,
        ST_Buffer(geometry, 500) as buffer_geom
    FROM 'seoul_subway_stations.parquet'
),
cafe_counts AS (
    SELECT 
        s.station_name,
        COUNT(c.id) as cafe_count
    FROM subway_buffers s
    JOIN 'seoul_cafes.parquet' c
        ON ST_Within(c.geometry, s.buffer_geom)
    GROUP BY s.station_name
)
SELECT 
    station_name,
    cafe_count,
    RANK() OVER (ORDER BY cafe_count DESC) as density_rank
FROM cafe_counts
ORDER BY cafe_count DESC
LIMIT 20;

5. 시계열 데이터 분석

# 주식 데이터 기술적 분석
query = """
WITH price_data AS (
    SELECT 
        symbol,
        date,
        close_price,
        volume,
        -- 이동평균 계산
        AVG(close_price) OVER (
            PARTITION BY symbol 
            ORDER BY date 
            ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
        ) as ma20,
        AVG(close_price) OVER (
            PARTITION BY symbol 
            ORDER BY date 
            ROWS BETWEEN 49 PRECEDING AND CURRENT ROW
        ) as ma50,
        -- RSI 계산을 위한 가격 변화
        close_price - LAG(close_price) OVER (PARTITION BY symbol ORDER BY date) as price_change
    FROM 'stock_prices.parquet'
    WHERE date >= '2023-01-01'
),
rsi_calc AS (
    SELECT 
        *,
        AVG(CASE WHEN price_change > 0 THEN price_change ELSE 0 END) OVER (
            PARTITION BY symbol 
            ORDER BY date 
            ROWS BETWEEN 13 PRECEDING AND CURRENT ROW
        ) as avg_gain,
        AVG(CASE WHEN price_change < 0 THEN ABS(price_change) ELSE 0 END) OVER (
            PARTITION BY symbol 
            ORDER BY date 
            ROWS BETWEEN 13 PRECEDING AND CURRENT ROW
        ) as avg_loss
    FROM price_data
)
SELECT 
    symbol,
    date,
    close_price,
    volume,
    ma20,
    ma50,
    CASE 
        WHEN avg_loss = 0 THEN 100
        ELSE 100 - (100 / (1 + (avg_gain / avg_loss)))
    END as rsi_14,
    -- 골든크로스/데드크로스 시그널
    CASE 
        WHEN ma20 > ma50 AND LAG(ma20) OVER (PARTITION BY symbol ORDER BY date) <= LAG(ma50) OVER (PARTITION BY symbol ORDER BY date) 
        THEN 'Golden Cross'
        WHEN ma20 < ma50 AND LAG(ma20) OVER (PARTITION BY symbol ORDER BY date) >= LAG(ma50) OVER (PARTITION BY symbol ORDER BY date)
        THEN 'Death Cross'
        ELSE NULL
    END as signal
FROM rsi_calc
WHERE signal IS NOT NULL
ORDER BY date DESC, symbol;
"""

signals = duckdb.query(query).to_df()

DuckDB의 현재 위치

채택 현황

  • 데이터 엔지니어링: dbt, Airflow, Dagster 등과 통합
  • 데이터 과학: Jupyter, RStudio에서 표준 도구로 자리잡음
  • 비즈니스 인텔리전스: Metabase, Superset 등 BI 도구 지원

성능 벤치마크

TPC-H 벤치마크 기준 (100GB 데이터셋):

  • DuckDB: 45초
  • PostgreSQL: 8분
  • SQLite: 실행 불가 (메모리 부족)
  • Pandas: 실행 불가 (메모리 부족)

향후 전망

1. 분산 처리 지원

현재 단일 노드에서만 동작하지만, 분산 처리 기능 개발 중입니다.

2. 실시간 스트리밍

Apache Kafka, Pulsar 등과의 네이티브 통합이 계획되어 있습니다.

3. AI/ML 통합

벡터 데이터베이스 기능과 ML 모델 실행 기능이 추가될 예정입니다.

# 미래의 DuckDB (예상)
result = duckdb.query("""
    SELECT 
        product_id,
        product_name,
        cosine_similarity(
            product_embedding, 
            text_to_embedding('스포츠 신발')
        ) as similarity
    FROM products
    WHERE category = '신발'
    ORDER BY similarity DESC
    LIMIT 10
""").to_df()

DuckDB는 단순히 또 하나의 데이터베이스가 아닙니다. 데이터 분석의 민주화를 실현하고, 복잡한 인프라 없이도 대용량 데이터를 다룰 수 있게 만든 혁신적인 도구입니다. 특히 지리공간 확장의 출시는 GIS 전문가가 아닌 일반 데이터 분석가들도 지리공간 분석을 수행할 수 있게 만들었습니다.

300x250

앞으로도 DuckDB는 더 많은 데이터 타입과 분석 기능을 지원하며, 현대 데이터 스택의 핵심 구성 요소로 자리매김할 것으로 예상됩니다. 데이터 분석가, 엔지니어, 과학자라면 반드시 익혀야 할 도구가 되었습니다.

DuckDB 환경 구축 운영체제별 설치 방법

Windows 환경

방법 1: Python pip 설치 (권장)

# 1. Python이 설치되어 있는지 확인
python --version

# 2. pip 업그레이드
python -m pip install --upgrade pip

# 3. DuckDB 설치
pip install duckdb

# 4. 설치 확인
python -c "import duckdb; print(duckdb.__version__)"

방법 2: 바이너리 직접 다운로드

# PowerShell 관리자 권한으로 실행

# 1. DuckDB CLI 다운로드
Invoke-WebRequest -Uri "https://github.com/duckdb/duckdb/releases/download/v1.0.0/duckdb_cli-windows-amd64.zip" -OutFile "duckdb.zip"

# 2. 압축 해제
Expand-Archive -Path "duckdb.zip" -DestinationPath "C:\duckdb"

# 3. 환경 변수 설정
[System.Environment]::SetEnvironmentVariable("Path", $env:Path + ";C:\duckdb", [System.EnvironmentVariableTarget]::User)

# 4. 새 터미널 열고 확인
duckdb --version

macOS 환경

방법 1: Homebrew 설치 (권장)

# 1. Homebrew 설치 (이미 있다면 생략)
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

# 2. DuckDB 설치
brew install duckdb

# 3. Python 바인딩 설치
pip3 install duckdb

# 4. 설치 확인
duckdb --version
python3 -c "import duckdb; print(duckdb.__version__)"

방법 2: conda 설치

# 1. conda-forge 채널 추가
conda config --add channels conda-forge

# 2. DuckDB 설치
conda install python-duckdb

# 3. 설치 확인
conda list | grep duckdb

Linux (Ubuntu/Debian) 환경

방법 1: apt 패키지 매니저

# 1. 시스템 업데이트
sudo apt update && sudo apt upgrade -y

# 2. 필수 패키지 설치
sudo apt install -y wget unzip python3-pip

# 3. DuckDB CLI 다운로드 및 설치
wget https://github.com/duckdb/duckdb/releases/download/v1.0.0/duckdb_cli-linux-amd64.zip
unzip duckdb_cli-linux-amd64.zip
sudo mv duckdb /usr/local/bin/
sudo chmod +x /usr/local/bin/duckdb

# 4. Python 바인딩 설치
pip3 install duckdb

# 5. 설치 확인
duckdb --version
python3 -c "import duckdb; print(duckdb.__version__)"

개발 환경별 설정

VSCode 설정

확장 프로그램 설치

# VSCode 터미널에서 실행
code --install-extension ms-python.python
code --install-extension ms-toolsai.jupyter
code --install-extension mtxr.sqltools
code --install-extension mtxr.sqltools-driver-duckdb

settings.json 설정

{
    "sqltools.connections": [
        {
            "name": "Local DuckDB",
            "driver": "DuckDB",
            "database": "${workspaceFolder}/my_database.db"
        }
    ],
    "python.defaultInterpreterPath": "${workspaceFolder}/venv/bin/python",
    "python.terminal.activateEnvironment": true
}

Jupyter Notebook/Lab 설정

Jupyter 설치 및 설정

# 1. Jupyter 설치
pip install jupyter jupyterlab

# 2. 확장 프로그램 설치
pip install ipywidgets
jupyter nbextension enable --py widgetsnbextension

# 3. SQL 매직 명령어 설치
pip install ipython-sql duckdb-engine

Jupyter 환경에서 DuckDB 사용 설정

# ~/.jupyter/jupyter_notebook_config.py 파일 생성
c = get_config()
c.NotebookApp.notebook_dir = '~/duckdb_projects'
c.NotebookApp.open_browser = True

PyCharm 설정

DuckDB 데이터베이스 연결

1. View → Tool Windows → Database 클릭
2. + 버튼 → Data Source → DuckDB 선택
3. Driver 다운로드 (자동)
4. Database 경로 설정
5. Test Connection 클릭

프로젝트 구조 설정

기본 프로젝트 구조

my_duckdb_project/
├── data/
│   ├── raw/              # 원본 데이터
│   ├── processed/        # 처리된 데이터
│   └── output/          # 결과 데이터
├── notebooks/           # Jupyter 노트북
├── scripts/            # Python 스크립트
│   ├── __init__.py
│   ├── etl.py
│   └── analysis.py
├── sql/               # SQL 쿼리 파일
│   ├── create_tables.sql
│   └── queries/
├── tests/             # 테스트 코드
├── .env              # 환경 변수
├── requirements.txt   # 의존성 목록
└── README.md         # 프로젝트 설명

프로젝트 초기 설정 스크립트

#!/bin/bash
# setup_project.sh

# 1. 프로젝트 디렉토리 생성
PROJECT_NAME="my_duckdb_project"
mkdir -p $PROJECT_NAME/{data/{raw,processed,output},notebooks,scripts,sql/queries,tests}
cd $PROJECT_NAME

# 2. 가상환경 생성 및 활성화
python3 -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# 3. 필수 패키지 설치
cat > requirements.txt << EOF
duckdb==1.0.0
pandas==2.1.3
numpy==1.25.2
jupyter==1.0.0
jupyterlab==4.0.9
plotly==5.18.0
matplotlib==3.8.2
seaborn==0.13.0
python-dotenv==1.0.0
pytest==7.4.3
EOF

pip install -r requirements.txt

# 4. .env 파일 생성
cat > .env << EOF
DATABASE_PATH=./data/main.duckdb
LOG_LEVEL=INFO
THREADS=4
MEMORY_LIMIT=8GB
EOF

# 5. 초기 Python 스크립트 생성
cat > scripts/__init__.py << EOF
import os
from dotenv import load_dotenv

load_dotenv()

DATABASE_PATH = os.getenv('DATABASE_PATH', ':memory:')
EOF

# 6. 데이터베이스 초기화 스크립트
cat > scripts/init_db.py << EOF
import duckdb
import os
from dotenv import load_dotenv

load_dotenv()

def init_database():
    db_path = os.getenv('DATABASE_PATH', './data/main.duckdb')
    conn = duckdb.connect(db_path)
    
    # 기본 설정
    conn.execute("SET threads TO 4")
    conn.execute("SET memory_limit = '8GB'")
    
    # 확장 프로그램 설치
    conn.execute("INSTALL httpfs")
    conn.execute("INSTALL parquet")
    
    print(f"Database initialized at: {db_path}")
    return conn

if __name__ == "__main__":
    init_database()
EOF

python scripts/init_db.py

DuckDB 확장 프로그램 설치

필수 확장 프로그램 설치 스크립트

# install_extensions.py
import duckdb

def install_all_extensions():
    conn = duckdb.connect()
    
    extensions = [
        'httpfs',      # HTTP/S3 파일 시스템 지원
        'parquet',     # Parquet 파일 지원
        'json',        # JSON 파일 지원
        'excel',       # Excel 파일 지원
        'spatial',     # 지리공간 데이터 지원
        'sqlite',      # SQLite 파일 읽기
        'postgres',    # PostgreSQL 연결
        'substrait',   # Substrait 지원
        'arrow',       # Apache Arrow 통합
        'aws',         # AWS 서비스 통합
        'azure',       # Azure 서비스 통합
        'inet',        # 네트워크 주소 타입
        'tpch',        # TPC-H 벤치마크 데이터
        'tpcds',       # TPC-DS 벤치마크 데이터
    ]
    
    for ext in extensions:
        try:
            conn.execute(f"INSTALL {ext}")
            conn.execute(f"LOAD {ext}")
            print(f"✓ {ext} 확장 설치 완료")
        except Exception as e:
            print(f"✗ {ext} 확장 설치 실패: {e}")
    
    # 설치된 확장 확인
    installed = conn.execute("SELECT * FROM duckdb_extensions()").fetchall()
    print("\n설치된 확장 목록:")
    for ext in installed:
        print(f"  - {ext[0]}: {ext[1]}")

if __name__ == "__main__":
    install_all_extensions()

성능 최적화 설정

DuckDB 설정 파일

# config.py
import duckdb
import os

def create_optimized_connection():
    """최적화된 DuckDB 연결 생성"""
    
    # 시스템 리소스 확인
    import psutil
    total_memory = psutil.virtual_memory().total
    cpu_count = psutil.cpu_count()
    
    # 메모리의 60% 할당
    memory_limit = int(total_memory * 0.6 / (1024**3))  # GB 단위
    
    # CPU 코어의 75% 사용
    threads = max(1, int(cpu_count * 0.75))
    
    # 연결 생성
    conn = duckdb.connect(
        database=os.getenv('DATABASE_PATH', ':memory:'),
        config={
            'threads': threads,
            'memory_limit': f'{memory_limit}GB',
            'max_memory': f'{memory_limit}GB',
            'temp_directory': './data/temp',
            'preserve_insertion_order': False,
            'checkpoint_threshold': '1GB',
            'wal_autocheckpoint': '1GB',
            'default_null_order': 'nulls_last',
            'enable_progress_bar': True,
            'enable_object_cache': True,
        }
    )
    
    print(f"DuckDB 연결 생성 완료:")
    print(f"  - 스레드: {threads}")
    print(f"  - 메모리 제한: {memory_limit}GB")
    
    return conn

# 사용 예시
if __name__ == "__main__":
    conn = create_optimized_connection()
    
    # 현재 설정 확인
    settings = conn.execute("SELECT * FROM duckdb_settings()").fetchdf()
    print("\n주요 설정:")
    important_settings = ['threads', 'memory_limit', 'temp_directory']
    for setting in important_settings:
        value = settings[settings['name'] == setting]['value'].values[0]
        print(f"  - {setting}: {value}")

도커 환경 구축

Dockerfile

FROM python:3.11-slim

# 시스템 패키지 설치
RUN apt-get update && apt-get install -y \
    wget \
    curl \
    git \
    && rm -rf /var/lib/apt/lists/*

# 작업 디렉토리 설정
WORKDIR /app

# Python 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# DuckDB CLI 설치
RUN wget https://github.com/duckdb/duckdb/releases/download/v1.0.0/duckdb_cli-linux-amd64.zip \
    && unzip duckdb_cli-linux-amd64.zip \
    && mv duckdb /usr/local/bin/ \
    && rm duckdb_cli-linux-amd64.zip

# 애플리케이션 코드 복사
COPY . .

# 볼륨 마운트 포인트
VOLUME ["/app/data"]

# Jupyter Lab 포트
EXPOSE 8888

# 시작 명령
CMD ["jupyter", "lab", "--ip=0.0.0.0", "--port=8888", "--no-browser", "--allow-root"]

docker-compose.yml

version: '3.8'

services:
  duckdb-dev:
    build: .
    container_name: duckdb-dev
    ports:
      - "8888:8888"
    volumes:
      - ./data:/app/data
      - ./notebooks:/app/notebooks
      - ./scripts:/app/scripts
    environment:
      - DATABASE_PATH=/app/data/main.duckdb
      - JUPYTER_ENABLE_LAB=yes
    command: >
      bash -c "
        python scripts/init_db.py &&
        jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --allow-root
      "

도커 실행 명령

# 이미지 빌드 및 실행
docker-compose up -d

# 로그 확인
docker-compose logs -f

# Jupyter 토큰 확인
docker exec duckdb-dev jupyter notebook list

# 컨테이너 접속
docker exec -it duckdb-dev bash

클라우드 환경 설정

AWS S3 연동

# s3_setup.py
import duckdb
import os

def setup_s3_access():
    conn = duckdb.connect()
    
    # httpfs 확장 로드
    conn.execute("INSTALL httpfs")
    conn.execute("LOAD httpfs")
    
    # AWS 자격 증명 설정
    conn.execute(f"""
        SET s3_region='{os.getenv('AWS_DEFAULT_REGION', 'ap-northeast-2')}';
        SET s3_access_key_id='{os.getenv('AWS_ACCESS_KEY_ID')}';
        SET s3_secret_access_key='{os.getenv('AWS_SECRET_ACCESS_KEY')}';
    """)
    
    # S3 데이터 직접 쿼리 테스트
    result = conn.execute("""
        SELECT COUNT(*) 
        FROM read_parquet('s3://my-bucket/data/*.parquet')
    """).fetchone()
    
    print(f"S3 연결 성공! 총 레코드 수: {result[0]}")
    return conn

Google Cloud Storage 연동

# gcs_setup.py
import duckdb

def setup_gcs_access():
    conn = duckdb.connect()
    
    conn.execute("INSTALL httpfs")
    conn.execute("LOAD httpfs")
    
    # GCS 인증 설정
    conn.execute(f"""
        SET gcs_credentials_path='{os.getenv('GOOGLE_APPLICATION_CREDENTIALS')}';
    """)
    
    # GCS 데이터 쿼리
    result = conn.execute("""
        SELECT * 
        FROM read_csv('gs://my-bucket/data.csv')
        LIMIT 10
    """).fetchdf()
    
    return result

개발 도구 통합

Git pre-commit 훅 설정

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
      
  - repo: https://github.com/psf/black
    rev: 23.11.0
    hooks:
      - id: black
        language_version: python3.11
        
  - repo: https://github.com/sqlfluff/sqlfluff
    rev: 2.3.5
    hooks:
      - id: sqlfluff-lint
        files: '\.sql$'

설치 명령

pip install pre-commit
pre-commit install

테스트 환경 설정

# tests/conftest.py
import pytest
import duckdb
import tempfile
import os

@pytest.fixture(scope="session")
def test_db():
    """테스트용 DuckDB 연결 생성"""
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
        db_path = f.name
    
    conn = duckdb.connect(db_path)
    
    # 테스트 데이터 설정
    conn.execute("""
        CREATE TABLE test_users (
            id INTEGER PRIMARY KEY,
            name VARCHAR,
            created_at TIMESTAMP
        )
    """)
    
    yield conn
    
    conn.close()
    os.unlink(db_path)

# tests/test_queries.py
def test_basic_query(test_db):
    result = test_db.execute("SELECT 1").fetchone()
    assert result[0] == 1
    
def test_insert_query(test_db):
    test_db.execute("INSERT INTO test_users VALUES (1, 'Test User', CURRENT_TIMESTAMP)")
    result = test_db.execute("SELECT COUNT(*) FROM test_users").fetchone()
    assert result[0] == 1

모니터링 및 로깅 설정

로깅 설정

# logging_config.py
import logging
import sys
from datetime import datetime

def setup_logging():
    """DuckDB 쿼리 로깅 설정"""
    
    # 로거 생성
    logger = logging.getLogger('duckdb_logger')
    logger.setLevel(logging.INFO)
    
    # 파일 핸들러
    file_handler = logging.FileHandler(
        f'logs/duckdb_{datetime.now().strftime("%Y%m%d")}.log'
    )
    file_handler.setLevel(logging.INFO)
    
    # 콘솔 핸들러
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.WARNING)
    
    # 포맷터
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
    # 핸들러 추가
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

# 쿼리 실행 래퍼
def execute_with_logging(conn, query, logger):
    """로깅과 함께 쿼리 실행"""
    start_time = datetime.now()
    logger.info(f"쿼리 시작: {query[:100]}...")
    
    try:
        result = conn.execute(query)
        duration = (datetime.now() - start_time).total_seconds()
        logger.info(f"쿼리 완료 (소요시간: {duration:.2f}초)")
        return result
    except Exception as e:
        logger.error(f"쿼리 실패: {str(e)}")
        raise

일반적인 문제와 해결 방법

1. 메모리 부족 오류

# 메모리 사용량 모니터링
import duckdb
import psutil

def check_memory_usage(conn):
    # 시스템 메모리
    mem = psutil.virtual_memory()
    print(f"시스템 메모리: {mem.percent}% 사용중")
    
    # DuckDB 메모리 설정
    memory_limit = conn.execute("SELECT current_setting('memory_limit')").fetchone()[0]
    print(f"DuckDB 메모리 제한: {memory_limit}")
    
    # 임시 파일 사용 설정
    conn.execute("SET temp_directory='./data/temp'")
    conn.execute("SET preserve_insertion_order=false")

2. 확장 프로그램 로드 실패

# 수동 설치 스크립트
#!/bin/bash

# DuckDB 버전 확인
DUCKDB_VERSION=$(duckdb --version | grep -oE '[0-9]+\.[0-9]+\.[0-9]+')

# 확장 프로그램 수동 다운로드
mkdir -p ~/.duckdb/extensions/$DUCKDB_VERSION/
cd ~/.duckdb/extensions/$DUCKDB_VERSION/

# 필요한 확장 다운로드
for ext in spatial parquet httpfs; do
    wget https://extensions.duckdb.org/v$DUCKDB_VERSION/linux_amd64/$ext.duckdb_extension.gz
    gunzip $ext.duckdb_extension.gz
done

이제 DuckDB 환경이 완전히 구축되었습니다! 각 단계를 따라하면서 본인의 환경에 맞게 조정하시면 됩니다.

728x90
그리드형(광고전용)

댓글