본문 바로가기
프로그램 (PHP,Python)

FastAPI 대량 데이터 처리와 BigQuery 사용한 효과적인 작업 수행

by 날으는물고기 2024. 3. 28.

FastAPI 대량 데이터 처리와 BigQuery 사용한 효과적인 작업 수행

FastAPI Tutorial: An Introduction to Using FastAPI - DataCamp

FastAPI에서 대량의 데이터를 다루는 경우, 효과적인 처리를 위해 여러 방법을 사용할 수 있습니다. 주로 고려해야 할 부분은 데이터의 저장, 검색, 전송이며, 이를 위해 비동기 처리, 데이터 스트리밍, 배치 처리, 데이터베이스 최적화 등의 기술을 활용할 수 있습니다. 다음은 대량의 데이터를 효과적으로 다루기 위한 몇 가지 방법과 예제 코드를 제공합니다.

1. 비동기 처리

대량의 데이터를 처리할 때, 요청을 비동기적으로 처리하면 서버의 응답성을 향상시킬 수 있습니다. FastAPI는 Python의 asyncawait 구문을 사용하여 비동기 처리를 쉽게 구현할 수 있도록 지원합니다.

 

데이터베이스 비동기 조회 예시

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.future import select
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "sqlite+aiosqlite:///./test.db"
engine = create_async_engine(DATABASE_URL, echo=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine, class_=AsyncSession)
Base = declarative_base()

# 데이터 모델 예시
class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)

app = FastAPI()

# 비동기 데이터베이스 세션 생성 및 조회
async def get_items(db: Session):
    async with db() as session:
        async with session.begin():
            result = await session.execute(select(Item))
            return result.scalars().all()

@app.get("/items/")
async def read_items():
    items = await get_items(SessionLocal)
    return items

2. 데이터 스트리밍

큰 파일이나 대용량의 데이터를 전송할 때는 스트리밍을 사용하여 메모리 사용량을 최소화하고, 사용자에게 더 빠른 응답을 제공할 수 있습니다.

 

파일 스트리밍 예시

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

def iterfile():  
    with open("large_file.txt", mode="rb") as file_like:  # 대용량 파일
        yield from file_like

@app.get("/large-file/")
async def get_large_file():
    return StreamingResponse(iterfile(), media_type="text/plain")

3. 배치 처리

데이터를 일괄적으로 처리하는 배치 처리는 대량의 데이터를 효율적으로 처리할 때 유용합니다. FastAPI에서는 백그라운드 작업을 이용하여 배치 처리를 구현할 수 있습니다.

 

백그라운드 작업 예시

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def process_data(data):
    # 여기에 데이터 처리 로직 구현
    pass

@app.post("/process/")
async def process(background_tasks: BackgroundTasks, data: list):
    background_tasks.add_task(process_data, data)
    return {"message": "Processing data in the background"}

4. 데이터베이스 최적화

대량의 데이터를 다룰 때는 데이터베이스의 성능도 중요합니다. 적절한 인덱싱, 쿼리 최적화, 적합한 데이터베이스 엔진 선택 등을 통해 데이터베이스 작업의 효율을 높일 수 있습니다.

 

SQL 쿼리 최적화 예시

  • 필요한 데이터만 조회하기 위해 SELECT 구문을 최적화합니다.
  • 대량의 데이터를 다룰 때는 적절한 인덱스를 사용하여 검색 성능을 향상시킵니다.

 

FastAPI에서 대량의 데이터를 효과적으로 다루기 위해서는 비동기 처리, 데이터 스트리밍, 배치 처리, 데이터베이스 최적화 등 다양한 전략을 적절히 조합하여 사용해야 합니다. 이를 통해 서버의 성능을 최대화하고 사용자에게 더 나은 서비스를 제공할 수 있습니다.

 

FastAPI와 BigQuery를 사용하여 데이터 처리 작업을 수행할 때, 병렬 처리를 통해 성능을 개선할 수 있습니다. 이러한 개선은 주로 비동기 처리, 배치 요청, 멀티 스레딩 또는 멀티 프로세싱 방법을 통해 달성할 수 있습니다. 아래에서는 이러한 기법들을 활용하여 FastAPI 애플리케이션에서 BigQuery 작업의 수행 시간을 개선하는 방법을 설명합니다.

1. 비동기 처리

FastAPI는 비동기 프레임워크이며, Python의 asyncio 라이브러리와 await 구문을 사용하여 비동기 요청을 처리할 수 있습니다. BigQuery 클라이언트 라이브러리는 google-cloud-bigquery 패키지를 사용하는데, 이 패키지는 비동기 API를 제공하지 않으므로, aiogoogle와 같은 비동기 HTTP 클라이언트를 사용하여 BigQuery REST API에 비동기 요청을 보내는 방법이 있습니다.

 

예시 코드: 비동기 BigQuery 요청

from fastapi import FastAPI
from aiogoogle import Aiogoogle
from aiogoogle.auth.creds import ServiceAccountCreds

app = FastAPI()

async def query_bigquery(sql: str):
    creds = ServiceAccountCreds(scopes=["https://www.googleapis.com/auth/bigquery"])
    async with Aiogoogle(service_account_creds=creds) as aiogoogle:
        bigquery = await aiogoogle.discover("bigquery", "v2")
        project_id = "your-project-id"
        query_data = {
            "query": sql,
            "useLegacySql": False,
        }
        res = await aiogoogle.as_service_account(
            bigquery.jobs.query(projectId=project_id, json=query_data)
        )
        return res

@app.get("/query/")
async def run_query():
    sql = "SELECT * FROM `your-dataset.your-table` LIMIT 100"
    result = await query_bigquery(sql)
    return result

2. 배치 요청

BigQuery에서는 여러 SQL 쿼리를 하나의 요청으로 묶어 처리할 수 있는 배치 요청 기능을 제공합니다. 이를 활용하면 네트워크 지연 시간을 줄이고, 리소스 사용을 최적화하여 전체적인 성능을 향상시킬 수 있습니다.

 

예시 코드: 배치 요청

BigQuery의 클라이언트 라이브러리는 직접적인 배치 요청 기능을 제공하지 않으므로, 여러 쿼리를 실행하는 함수를 병렬로 실행하는 방법으로 대체할 수 있습니다.

3. 멀티 스레딩과 멀티 프로세싱

FastAPI 애플리케이션에서 BigQuery 요청을 병렬로 처리하기 위해 Python의 concurrent.futures 모듈의 ThreadPoolExecutor 또는 ProcessPoolExecutor를 사용할 수 있습니다. 이를 통해 CPU 또는 I/O 바운드 작업에 대해 동시에 여러 요청을 처리할 수 있습니다.

 

예시 코드: 멀티 스레딩을 사용한 BigQuery 요청 처리

from concurrent.futures import ThreadPoolExecutor
from google.cloud import bigquery
from fastapi import FastAPI

app = FastAPI()
client = bigquery.Client()

def query_bigquery(sql: str):
    query_job = client.query(sql)
    return query_job.result()

@app.get("/query/")
def run_query():
    sql_queries = [
        "SELECT * FROM `your-dataset.your-table` LIMIT 100",
        "SELECT * FROM `your-dataset.your-other-table` LIMIT 100",
        # 추가 쿼리들...
    ]
    with ThreadPoolExecutor() as executor:
        results = list(executor.map(query_bigquery, sql_queries))
    return {"results": results}

위 코드 예시들은 FastAPI와 BigQuery를 사용하여 병렬 처리를 구현하는 방법에 대한 기초를 제공합니다. 이러한 방법을 통해 성능을 개선하고 애플리케이션의 처리 능력을 향상시킬 수 있습니다. 각 방법은 특정 상황에 따라 장단점이 있으므로, 사용 사례에 가장 적합한 방법을 선택하는 것이 중요합니다.

추가 고려 사항

A. 비동기 처리와 병렬 처리의 조화

  • 비동기 처리는 주로 I/O 바운드 작업에 적합하며, 병렬 처리는 CPU 바운드 작업 또는 대량의 I/O 작업에 유리합니다.
  • FastAPI에서는 asyncawait를 사용하여 비동기 처리를 쉽게 구현할 수 있으며, concurrent.futures와 같은 모듈을 사용하여 병렬 처리를 구현할 수 있습니다.

B. 성능 테스트 및 모니터링

  • 성능 개선을 위한 변경 사항을 적용한 후에는, 애플리케이션의 성능 테스트를 수행하여 실제 성능 개선 효과를 확인해야 합니다.
  • Google Cloud Platform은 BigQuery 작업에 대한 상세한 모니터링과 로깅 기능을 제공하므로, 이를 활용하여 쿼리 성능을 분석하고 필요에 따라 최적화할 수 있습니다.

C. 에러 핸들링

  • 병렬 처리나 비동기 작업을 수행할 때는 에러 핸들링이 중요합니다. 각 요청이나 작업은 독립적으로 실패할 수 있으므로, 적절한 예외 처리 로직을 구현해야 합니다.
  • FastAPI는 자체적인 예외 처리 시스템을 제공하므로, 이를 활용하여 사용자에게 의미 있는 에러 메시지를 반환하고, 필요한 경우 재시도 로직을 구현할 수 있습니다.

D. 리소스 관리

  • 병렬 처리를 증가시킬 때는 사용되는 리소스(메모리, CPU 등)에 대한 관리가 중요합니다. 특히, 멀티 프로세싱을 사용할 때는 프로세스 간 리소스 공유가 제한적이므로, 각 프로세스의 리소스 사용량을 모니터링하고 적절히 조정해야 합니다.

결론

FastAPI와 BigQuery를 함께 사용할 때 비동기 처리, 배치 요청, 멀티 스레딩 또는 멀티 프로세싱을 통해 성능을 개선할 수 있습니다. 애플리케이션의 특성과 요구 사항을 고려하여 가장 적합한 방법을 선택하고, 적절한 리소스 관리 및 에러 핸들링을 통해 안정성과 성능을 동시에 확보해야 합니다. 성능 테스트와 모니터링을 통해 개선 사항의 효과를 지속적으로 검증하며, 필요에 따라 접근 방식을 조정해야 합니다.

728x90

댓글