본문 바로가기
프로그램 (PHP,Python)

Google ADK 기반 LLM Multi-Tool Agent 구축, FastAPI + A2A 통합 전략

by 날으는물고기 2025. 7. 17.

Google ADK 기반 LLM Multi-Tool Agent 구축, FastAPI + A2A 통합 전략

728x90

Multi-Tool Agent는 Google의 ADK (Agent Development Kit)를 활용한 LLM Agent 프로젝트로, 날씨와 시간 정보를 제공하는 간단한 예제로 시작하여 A2A, Slack/Webhook 연동, FastAPI 서비스화, 그리고 n8n, LangGraph, Guardrails, Wazuh와의 실시간 연계까지 확장 가능한 구조입니다.

🧰 1. 사전 준비 및 환경 설정

1.1 Python 환경 준비

▶ Python 3.11 이상 설치 확인

python --version

▶ uv 설치 (초고속 패키지 관리자)

pip install --upgrade pipx
pipx ensurepath
pipx install uv

# PowerShell/터미널 재시작 후
uv --version

▶ 가상환경 생성 및 활성화

uv venv .venv

# Windows CMD
.venv\Scripts\activate.bat

# Windows PowerShell
.venv\Scripts\Activate.ps1

# macOS/Linux
source .venv/bin/activate

1.2 Google ADK 및 필수 패키지 설치

# 기본 설치
uv pip install google-adk

# 추가 종속성 (Vertex AI 사용 시)
uv pip install google-auth google-cloud-aiplatform

📁 2. 프로젝트 구조 및 초기화

자동 생성 명령어

adk init multi_tool_agent
cd multi_tool_agent

생성된 프로젝트 구조

multi_tool_agent/
├── __init__.py
├── agent.py         # 핵심 Agent 로직
├── .env            # 환경 변수 파일
├── config.yaml     # 설정 파일 (선택사항)
└── requirements.txt

🔐 3. Google Cloud SDK 및 인증 설정

3.1 Cloud SDK 설치

macOS

brew install --cask google-cloud-sdk

Windows/Linux

공식 문서 참조

3.2 애플리케이션 기본 인증 (ADC) 설정

gcloud auth application-default login

3.3 인증 파일 복사

cp ~/.config/gcloud/application_default_credentials.json ./

📄 4. 환경 변수 설정 (.env)

GOOGLE_GENAI_USE_VERTEXAI=TRUE
GOOGLE_CLOUD_PROJECT=your-gcp-project-id
GOOGLE_CLOUD_LOCATION=us-central1
GOOGLE_APPLICATION_CREDENTIALS=./application_default_credentials.json
300x250

💻 5. 기본 Agent 구현 (agent.py)

날씨 정보 제공 함수

def get_weather(city: str) -> dict:
    """
    도시별 날씨 정보를 반환하는 함수
    실제 구현에서는 외부 API를 호출할 수 있음
    """
    weather_data = {
        "new york": {"weather": "sunny", "temperature": "25°C", "humidity": "60%"},
        "london": {"weather": "cloudy", "temperature": "18°C", "humidity": "75%"},
        "tokyo": {"weather": "rainy", "temperature": "22°C", "humidity": "85%"},
        "seoul": {"weather": "clear", "temperature": "20°C", "humidity": "55%"}
    }

    city_lower = city.lower()
    if city_lower in weather_data:
        return {
            "city": city,
            "data": weather_data[city_lower],
            "timestamp": datetime.now().isoformat()
        }
    else:
        return {
            "error": f"Weather data for {city} is not available.",
            "available_cities": list(weather_data.keys())
        }

시간 정보 제공 함수

from datetime import datetime
import pytz

def get_current_time(city: str) -> dict:
    """
    도시별 현재 시간을 반환하는 함수
    """
    timezone_map = {
        "new york": "America/New_York",
        "london": "Europe/London",
        "tokyo": "Asia/Tokyo",
        "seoul": "Asia/Seoul"
    }

    city_lower = city.lower()
    if city_lower in timezone_map:
        tz = pytz.timezone(timezone_map[city_lower])
        current_time = datetime.now(tz)
        return {
            "city": city,
            "time": current_time.strftime("%Y-%m-%d %H:%M:%S"),
            "timezone": timezone_map[city_lower]
        }
    else:
        return {
            "error": f"Time data for {city} is not available.",
            "available_cities": list(timezone_map.keys())
        }

Agent 정의

from google.adk import Agent

agent = Agent(
    model="gemini-2.0-flash-001",
    tools=[get_weather, get_current_time],
)

🚀 6. 실행 방법

웹 인터페이스 실행

adk web
# 브라우저에서 http://localhost:8080 접속

테스트 질의 예시

  • "What's the weather in Seoul?"
  • "What time is it in New York?"
  • "Tell me the weather and time in Tokyo"

🧠 7. A2A (Agent-to-Agent) 확장

프로젝트 구조

multi_tool_agent/
├── agents/
│   ├── __init__.py
│   ├── weather_agent.py
│   ├── time_agent.py
│   └── analysis_agent.py
└── root_agent.py

weather_agent.py

from google.adk import Agent
import httpx

async def get_detailed_weather(city: str) -> dict:
    """고급 날씨 정보 (API 연동 예시)"""
    # 실제 날씨 API 호출 시뮬레이션
    weather_api_data = {
        "seoul": {
            "temperature": 20,
            "feels_like": 18,
            "humidity": 55,
            "wind_speed": 5.2,
            "description": "맑음",
            "air_quality": {"pm2.5": 35, "pm10": 45, "aqi": "보통"}
        }
    }

    if city.lower() in weather_api_data:
        return {
            "status": "success",
            "city": city,
            "weather": weather_api_data[city.lower()]
        }
    return {"status": "error", "message": f"No data for {city}"}

weather_agent = Agent(
    name="WeatherSpecialist",
    model="gemini-2.0-pro",
    tools=[get_detailed_weather],
    description="날씨 정보 전문 에이전트"
)

analysis_agent.py

from google.adk import Agent

def analyze_weather_impact(weather_data: dict) -> dict:
    """날씨 데이터를 분석하여 활동 추천"""
    temp = weather_data.get("temperature", 20)
    humidity = weather_data.get("humidity", 50)

    recommendations = []

    if temp > 25:
        recommendations.append("더운 날씨입니다. 충분한 수분 섭취를 하세요.")
    elif temp < 10:
        recommendations.append("추운 날씨입니다. 따뜻하게 입으세요.")

    if humidity > 70:
        recommendations.append("습도가 높습니다. 실내 활동을 권장합니다.")

    return {
        "analysis": "날씨 분석 완료",
        "recommendations": recommendations,
        "outdoor_activity_score": calculate_activity_score(temp, humidity)
    }

def calculate_activity_score(temp: int, humidity: int) -> int:
    """야외 활동 적합도 점수 계산 (0-100)"""
    score = 100

    # 온도 기준
    if temp < 0 or temp > 35:
        score -= 50
    elif temp < 10 or temp > 30:
        score -= 20

    # 습도 기준
    if humidity > 80:
        score -= 30
    elif humidity > 70:
        score -= 15

    return max(0, score)

analysis_agent = Agent(
    name="WeatherAnalyzer",
    model="gemini-2.0-flash-001",
    tools=[analyze_weather_impact],
    description="날씨 영향 분석 에이전트"
)

root_agent.py

from google.adk import Agent
from agents.weather_agent import weather_agent
from agents.time_agent import time_agent
from agents.analysis_agent import analysis_agent

root_agent = Agent(
    name="MasterCoordinator",
    model="gemini-2.0-pro",
    agents=[weather_agent, time_agent, analysis_agent],
    system_message="""
    당신은 여러 전문 에이전트를 조율하는 마스터 에이전트입니다.
    사용자의 요청에 따라 적절한 하위 에이전트를 선택하여 작업을 위임하고,
    결과를 종합하여 사용자에게 전달합니다.
    """
)

📩 8. Slack / Webhook 연동

8.1 Slack App 설정

  1. https://api.slack.com/apps에서 앱 생성
  2. Incoming Webhook 활성화
  3. Webhook URL 복사

8.2 FastAPI 서버 구현

from fastapi import FastAPI, Request
import httpx
from multi_tool_agent.root_agent import root_agent

app = FastAPI()

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"

@app.post("/slack")
async def slack_webhook(request: Request):
    """Slack 명령어 처리"""
    data = await request.json()

    # Slack 슬래시 명령어 파싱
    text = data.get("text", "")
    user_name = data.get("user_name", "Unknown")

    # 날씨 명령어 처리
    if text.startswith("weather"):
        city = text.replace("weather", "").strip()
        query = f"What's the weather in {city}?"
    else:
        query = text

    # Agent 실행
    response = await root_agent.run_async(query)

    # Slack 포맷팅
    slack_message = {
        "text": f"@{user_name}님의 요청 결과:",
        "attachments": [{
            "color": "good",
            "fields": [{
                "title": "질의",
                "value": query,
                "short": False
            }, {
                "title": "응답",
                "value": str(response),
                "short": False
            }]
        }]
    }

    # Slack으로 응답 전송
    async with httpx.AsyncClient() as client:
        await client.post(SLACK_WEBHOOK_URL, json=slack_message)

    return {"response": "처리 완료"}

🌐 9. FastAPI 기반 서비스화

API 서버 구현 (api/main.py)

from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import logging

from multi_tool_agent.root_agent import root_agent

# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Multi Tool Agent API", version="1.0.0")

# CORS 설정
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# 요청/응답 모델
class AgentRequest(BaseModel):
    query: str
    context: Optional[dict] = None

class AgentResponse(BaseModel):
    response: str
    metadata: Optional[dict] = None

@app.post("/agent", response_model=AgentResponse)
async def run_agent(request: AgentRequest):
    """Agent 실행 엔드포인트"""
    try:
        logger.info(f"Query received: {request.query}")

        # Agent 실행
        response = await root_agent.run_async(request.query)

        # 응답 형식화
        return AgentResponse(
            response=str(response),
            metadata={
                "query": request.query,
                "timestamp": datetime.now().isoformat()
            }
        )
    except Exception as e:
        logger.error(f"Error processing request: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """헬스 체크 엔드포인트"""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}

실행 스크립트 (run_server.sh)

#!/bin/bash
uvicorn api.main:app --host 0.0.0.0 --port 8080 --reload

🌐 10. 확장 연계 아키텍처

10.1 n8n 연동 (자동화 워크플로우)

# n8n webhook 전송 함수
async def send_to_n8n(payload: dict):
    """n8n으로 결과 전송하여 후처리 자동화"""
    N8N_WEBHOOK_URL = "https://n8n.yourdomain.com/webhook/weather-alert"

    # 날씨 임계값 체크
    if "weather" in payload:
        weather_data = payload["weather"]
        temp = weather_data.get("temperature", 0)

        # 극한 날씨 알림
        if temp > 35 or temp < 0:
            alert_payload = {
                "type": "extreme_weather",
                "city": payload.get("city"),
                "temperature": temp,
                "alert_level": "high",
                "timestamp": datetime.now().isoformat()
            }

            async with httpx.AsyncClient() as client:
                await client.post(N8N_WEBHOOK_URL, json=alert_payload)

n8n 워크플로우 예시

  1. Webhook 노드: 날씨 알림 수신
  2. IF 노드: 온도별 분기 처리
  3. Slack 노드: 극한 날씨 알림
  4. Email 노드: 관리자 이메일 발송
  5. Google Sheets 노드: 날씨 로그 기록

🧠 10.2 LangGraph 통합 (복잡한 에이전트 흐름)

from langgraph.graph import StateGraph, END
from typing import TypedDict, List

class WeatherState(TypedDict):
    city: str
    weather_data: dict
    analysis: dict
    recommendations: List[str]

# 그래프 정의
workflow = StateGraph(WeatherState)

# 노드 정의
async def fetch_weather(state: WeatherState):
    """날씨 데이터 가져오기"""
    result = await weather_agent.run(f"Get weather for {state['city']}")
    state["weather_data"] = result
    return state

async def analyze_weather(state: WeatherState):
    """날씨 분석"""
    result = await analysis_agent.run(state["weather_data"])
    state["analysis"] = result
    return state

async def generate_recommendations(state: WeatherState):
    """추천사항 생성"""
    # 날씨 조건에 따른 맞춤 추천
    recommendations = []
    temp = state["weather_data"].get("temperature", 20)

    if temp > 30:
        recommendations.extend([
            "야외 활동은 이른 아침이나 저녁에 하세요",
            "자외선 차단제를 꼭 바르세요",
            "수분 섭취를 자주 하세요"
        ])

    state["recommendations"] = recommendations
    return state

# 워크플로우 구성
workflow.add_node("fetch", fetch_weather)
workflow.add_node("analyze", analyze_weather)
workflow.add_node("recommend", generate_recommendations)

workflow.set_entry_point("fetch")
workflow.add_edge("fetch", "analyze")
workflow.add_edge("analyze", "recommend")
workflow.add_edge("recommend", END)

# 컴파일 및 실행
app = workflow.compile()
result = await app.ainvoke({"city": "Seoul"})

🛡 10.3 Guardrails 통합 (응답 검증)

from guardrails import Guard
from guardrails.hub import ValidLength, DetectPII, JSONSchema
from pydantic import BaseModel, Field

# 날씨 응답 스키마 정의
class WeatherResponse(BaseModel):
    city: str = Field(..., description="도시명")
    temperature: float = Field(..., ge=-50, le=50, description="온도 (섭씨)")
    humidity: int = Field(..., ge=0, le=100, description="습도 (%)")
    weather_condition: str = Field(..., description="날씨 상태")

# Guard 설정
weather_guard = Guard.from_pydantic(
    output_class=WeatherResponse,
    prompt="날씨 정보를 다음 형식으로 반환하세요:"
)

# 민감정보 필터링 Guard
privacy_guard = Guard()
privacy_guard.use(DetectPII, on_fail="filter")

# 통합 사용 예시
async def get_validated_weather(city: str):
    """검증된 날씨 정보 반환"""
    # Agent 실행
    raw_response = await weather_agent.run(f"Get weather for {city}")

    # 형식 검증
    validated = weather_guard.parse(raw_response)

    # 민감정보 필터링
    filtered = privacy_guard.parse(str(validated))

    return filtered

🚨 10.4 Wazuh 보안 알림 통합

Wazuh 설정 (ossec.conf)

<integration>
  <name>custom-weather-analyzer</name>
  <hook_url>http://localhost:8080/security/weather-anomaly</hook_url>
  <level>10</level>
  <alert_format>json</alert_format>
  <options>
    <log_type>weather_security</log_type>
  </options>
</integration>

FastAPI 보안 엔드포인트

@app.post("/security/weather-anomaly")
async def handle_security_alert(request: Request):
    """Wazuh 보안 알림 처리"""
    alert_data = await request.json()

    # 날씨 관련 보안 이벤트 분석
    if "unusual_weather_pattern" in alert_data.get("full_log", ""):
        # AI 분석 요청
        analysis_query = f"""
        다음 보안 알림을 분석해주세요:
        - 알림 내용: {alert_data.get('full_log')}
        - 심각도: {alert_data.get('level')}
        - 시간: {alert_data.get('timestamp')}

        이상 날씨 패턴이 시스템에 미칠 영향을 평가해주세요.
        """

        result = await root_agent.run_async(analysis_query)

        # 중요도에 따라 다른 채널로 알림
        if "critical" in str(result).lower():
            await send_critical_alert(result)
        else:
            await send_to_n8n({"type": "weather_security", "analysis": result})

    return {"status": "processed"}

📊 11. 전체 아키텍처 통합 예시

통합 시나리오: 날씨 기반 스마트 알림 시스템

# 메인 통합 서비스
class WeatherIntegrationService:
    def __init__(self):
        self.weather_agent = weather_agent
        self.analysis_agent = analysis_agent
        self.langgraph_workflow = self._setup_workflow()
        self.guards = self._setup_guards()

    async def process_weather_request(self, city: str, user_context: dict):
        """통합 날씨 처리 파이프라인"""

        # 1. LangGraph로 워크플로우 실행
        workflow_result = await self.langgraph_workflow.ainvoke({
            "city": city,
            "user_preferences": user_context
        })

        # 2. Guardrails로 검증
        validated_result = self.guards["weather"].parse(workflow_result)

        # 3. 이상 패턴 감지
        if self._detect_anomaly(validated_result):
            # Wazuh 알림 트리거
            await self._send_security_alert(validated_result)

        # 4. n8n으로 후처리 전송
        await self._send_to_automation(validated_result)

        # 5. Slack 알림 (필요시)
        if validated_result.get("alert_required"):
            await self._send_slack_notification(validated_result)

        return validated_result

    def _detect_anomaly(self, weather_data: dict) -> bool:
        """날씨 이상 패턴 감지"""
        temp = weather_data.get("temperature", 20)

        # 극한 온도 체크
        if temp > 40 or temp < -20:
            return True

        # 급격한 변화 체크 (이전 데이터와 비교)
        # ... 추가 로직

        return False

✅ 12. 실전 배포 체크리스트

Docker 구성 (Dockerfile)

FROM python:3.11-slim

WORKDIR /app

# 시스템 패키지 설치
RUN apt-get update && apt-get install -y \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

# Python 패키지 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 애플리케이션 코드 복사
COPY . .

# 환경 변수 설정
ENV PYTHONUNBUFFERED=1

# 실행
CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8080"]

docker-compose.yml

version: '3.8'

services:
  agent-api:
    build: .
    ports:
      - "8080:8080"
    env_file:
      - .env
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped

  redis:
    image: redis:alpine
    ports:
      - "6379:6379"

  monitoring:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

📊 13. 모니터링 및 로깅

로깅 설정

import logging
from logging.handlers import RotatingFileHandler
import json

# 구조화된 로깅 설정
class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            "timestamp": datetime.now().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }

        # 날씨 관련 추가 정보
        if hasattr(record, 'weather_data'):
            log_data['weather_data'] = record.weather_data

        return json.dumps(log_data)

# 로거 설정
logger = logging.getLogger(__name__)
handler = RotatingFileHandler(
    'logs/weather_agent.log',
    maxBytes=10485760,  # 10MB
    backupCount=5
)
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)

🎯 14. 실전 사용 예시

종합 시나리오: 날씨 기반 자동화 시스템

# 사용자 요청
user_query = "서울의 날씨를 알려주고 오늘 야외 활동하기 좋은지 분석해줘"

# 1. FastAPI 엔드포인트로 요청
response = httpx.post(
    "http://localhost:8080/agent",
    json={"query": user_query}
)

# 2. 응답 예시
{
    "response": {
        "city": "서울",
        "weather": {
            "temperature": 22,
            "humidity": 55,
            "condition": "맑음",
            "air_quality": "보통"
        },
        "analysis": {
            "outdoor_activity_score": 85,
            "recommendations": [
                "오늘은 야외 활동하기 좋은 날씨입니다.",
                "자외선 지수가 높으니 선크림을 바르세요.",
                "미세먼지가 '보통' 수준이므로 장시간 운동은 피하세요."
            ]
        }
    },
    "metadata": {
        "timestamp": "2024-01-15T14:30:00",
        "processing_time": 1.2
    }
}

# 3. 자동화 트리거
# - n8n: 날씨가 좋으면 운동 알림 전송
# - Slack: 팀 채널에 오늘의 날씨 브리핑
# - Wazuh: 이상 기후 패턴 모니터링

🚀 15. 성능 최적화 팁

캐싱 전략

from functools import lru_cache
import redis

# Redis 캐싱
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

async def get_weather_with_cache(city: str):
    """캐시를 활용한 날씨 정보 조회"""
    cache_key = f"weather:{city.lower()}"

    # 캐시 확인
    cached_data = redis_client.get(cache_key)
    if cached_data:
        return json.loads(cached_data)

    # 새로운 데이터 조회
    weather_data = await weather_agent.run(f"Get weather for {city}")

    # 캐시 저장 (1시간 TTL)
    redis_client.setex(
        cache_key,
        3600,
        json.dumps(weather_data)
    )

    return weather_data

📋 최종 점검 사항

구성 요소 체크 포인트 확인 명령어
Python 환경 가상환경 활성화 where python
Google ADK 설치 완료 `pip list
인증 ADC 설정 gcloud auth application-default print-access-token
API 서버 실행 상태 curl http://localhost:8080/health
Slack Webhook 연동 Slack 채널 메시지 확인
Docker 컨테이너 실행 docker ps
728x90
그리드형(광고전용)

댓글