본문 바로가기

AI 프롬프트 기반 서버 관리: 자연어로 시스템을 제어하는 새로운 패러다임

728x90

CLI에서 자연어로의 진화

“터미널 명령어의 미래는 말로 한다”

개발자들이 GitHub Copilot이나 Cursor를 통해 자연어로 코드를 작성하는 시대가 도래했듯이, 이제 서버 관리 영역에서도 복잡한 CLI 명령어 대신 자연어 프롬프트로 시스템을 제어하는 패러다임 전환이 일어나고 있습니다. "현재 CPU 사용률이 높은 프로세스를 보여줘"라고 말하면 ps aux | sort -k 3 -r | head -10 같은 명령이 자동으로 실행되는 미래가 현실이 되고 있는 것입니다. 리눅스 환경에서의 자연어 기반 서버 명령 수행 방식과 DevOps 자동화 적용 방안, 그리고 AI를 활용한 디바이스 관리 도구들의 미래입니다.

자연어 명령 처리 아키텍처의 이해

기본 동작 원리

자연어 명령 처리 시스템의 핵심은 LLM(Large Language Model)을 활용한 명령 변환입니다. 사용자가 입력한 자연어를 분석하여 적절한 쉘 명령어로 변환하는 과정은 다음과 같이 진행됩니다.

사용자 입력: "최근 1시간 동안의 시스템 로그 중 에러만 보여줘"
    ↓
LLM 분석 및 변환
    ↓
생성된 명령: journalctl --since "1 hour ago" --priority=err
    ↓
사용자 확인: [실행✅] [수정📝] [취소❌]
    ↓
명령 실행 및 결과 반환

멀티스텝 작업 처리 에이전트

더 발전된 형태로는 "계획 후 실행(plan-then-execute)" 패턴을 채택한 멀티스텝 에이전트가 있습니다. 예를 들어 Warp 터미널의 Agent Mode는 다음과 같이 작동합니다.

  1. 작업 분석: "웹 서버 성능 문제를 진단해줘"
  2. 계획 수립
    • CPU/메모리 사용률 확인
    • 네트워크 연결 상태 점검
    • 웹 서버 로그 분석
    • 디스크 I/O 확인
  3. 순차 실행: 각 단계별로 사용자 승인 후 실행
  4. 자체 수정: 오류 발생 시 대안 명령 자동 생성

환경 인지 및 최적화

현대적인 자연어 인터페이스는 시스템 환경을 자동으로 감지하여 최적화된 명령을 생성합니다.

  • OS 감지: Ubuntu에서는 apt update, CentOS에서는 yum update
  • 쉘 환경: Bash, Zsh, Fish 등에 맞는 문법 생성
  • 권한 수준: 필요시 자동으로 sudo 추가

오픈소스 도구 심화 분석

도구명 특징 활용 예
ShellGPT 자연어→명령어, 실행 전 확인 “json 파일 찾아줘” → find 명령 생성
LazyShell 멀티 모델 지원, 환경 감지 우분투면 apt, CentOS면 yum 자동 구분
Clio DevOps 자동화 중심, 로컬 실행 AWS/GCP 자원 통제, Helm 배포
Warp AI Agent Agent 기반 멀티스텝 실행 GitHub 리포 복제→패키지 설치→서버 실행
Amazon Q CLI AWS CLI 연동 “S3 버킷 만들어줘” → aws 명령 자동화

ShellGPT - Python 기반 선구자

  • OpenAI GPT-4, Anthropic Claude 등 다양한 LLM 지원
  • 쉘 통합으로 Ctrl+L 단축키로 즉시 호출 가능
  • 대화형 모드와 단일 명령 모드 지원
300x250

사용 예시

# 단일 명령 모드
sgpt -s "find all json files modified in last 24 hours"

# 대화형 모드
sgpt --chat devops "서버 배포 스크립트를 작성해줘"

LazyShell - 다중 AI 모델 지원

  • 로컬 LLM (Ollama, Mistral) 지원으로 프라이버시 보호
  • Refine 기능으로 명령 점진적 개선
  • 시스템 정보 자동 감지 및 최적화

 

설정 예시

// .lazyshellrc
{
  "defaultModel": "claude-3-opus",
  "localModels": {
    "enabled": true,
    "ollama": {
      "model": "llama2:13b"
    }
  },
  "safety": {
    "alwaysConfirm": true,
    "dangerousCommands": ["rm -rf", "dd", "mkfs"]
  }
}

Clio - DevOps 특화 AI 비서

  • AWS/GCP/Azure 클라우드 자원 관리
  • Kubernetes 클러스터 운영
  • Docker 컨테이너 제어
  • GitHub 이슈/PR 자동화

 

실제 활용 사례

# 클라우드 비용 최적화
clio "비용이 높은 미사용 EC2 인스턴스를 찾아서 정지해줘"

# Kubernetes 문제 해결
clio "크래시루프 상태인 파드들을 재시작하고 로그를 보여줘"

DevOps 파이프라인 자동화 심화

CI/CD 파이프라인 통합

자연어 인터페이스를 CI/CD에 통합하면 다음과 같은 워크플로우가 가능합니다.

# .github/workflows/ai-deploy.yml
name: AI-Assisted Deployment
on:
  workflow_dispatch:
    inputs:
      prompt:
        description: 'Deployment instruction in natural language'
        required: true

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: AI Command Generation
        run: |
          clio "${{ github.event.inputs.prompt }}" \
            --context "production deployment" \
            --safety-level "high" \
            --require-approval

ChatOps 통합

Slack이나 Teams와의 통합으로 팀 협업 강화

# Slack 봇 예시
@app.message("deploy")
def handle_deploy(message, say):
    prompt = message['text']

    # AI가 배포 명령 생성
    commands = generate_deployment_commands(prompt)

    # 승인 요청
    say(f"배포 명령을 생성했습니다:\n```{commands}```\n승인하시겠습니까?")

    # 승인 후 실행
    if approved:
        execute_deployment(commands)

인프라 코드(IaC) 자동 생성

자연어로 인프라를 정의하고 Terraform 코드로 변환

# 입력
"AWS에 3개의 t3.medium EC2 인스턴스로 구성된 웹 서버 클러스터를 생성하고, 
Application Load Balancer로 트래픽을 분산해줘"

# AI가 생성한 Terraform 코드
resource "aws_instance" "web_server" {
  count         = 3
  instance_type = "t3.medium"
  ami           = data.aws_ami.ubuntu.id

  tags = {
    Name = "web-server-${count.index + 1}"
  }
}

resource "aws_lb" "web_lb" {
  name               = "web-server-lb"
  load_balancer_type = "application"
  # ... 추가 설정
}

엔터프라이즈급 보안 아키텍처

다층 보안 모델

┌─────────────────────────────────────┐
│         사용자 입력 (자연어)           │
└────────────┬────────────────────────┘
             │
┌────────────▼────────────────────────┐
│      입력 검증 및 정제 레이어          │
│   - 프롬프트 인젝션 방지              │
│   - 민감 정보 필터링                  │
└────────────┬────────────────────────┘
             │
┌────────────▼────────────────────────┐
│        LLM 처리 레이어                │
│   - 격리된 환경에서 실행              │
│   - 권한 제한된 컨텍스트              │
└────────────┬────────────────────────┘
             │
┌────────────▼────────────────────────┐
│      명령 검증 레이어                 │
│   - 화이트리스트/블랙리스트           │
│   - 정적 분석                        │
│   - 위험도 평가                      │
└────────────┬────────────────────────┘
             │
┌────────────▼────────────────────────┐
│      실행 레이어                      │
│   - 샌드박스 환경                    │
│   - 트랜잭션 실행                    │
│   - 실시간 모니터링                  │
└─────────────────────────────────────┘

권한 관리 통합

class AICommandExecutor:
    def __init__(self, user_context):
        self.user = user_context
        self.rbac = RBACManager()

    def execute_command(self, natural_language_prompt):
        # 1. 사용자 권한 확인
        permissions = self.rbac.get_user_permissions(self.user)

        # 2. LLM에 권한 컨텍스트 전달
        command = self.generate_command(
            prompt=natural_language_prompt,
            allowed_operations=permissions
        )

        # 3. 생성된 명령의 권한 요구사항 검증
        required_perms = self.analyze_command_permissions(command)

        if not self.rbac.has_permissions(self.user, required_perms):
            raise PermissionDeniedError(
                f"명령 실행에 필요한 권한이 없습니다: {required_perms}"
            )

        # 4. 감사 로그 기록
        self.audit_log.record(
            user=self.user,
            prompt=natural_language_prompt,
            command=command,
            timestamp=datetime.now()
        )

        return self.sandbox_execute(command)

프롬프트 인젝션 방어

class PromptSanitizer:
    def __init__(self):
        self.dangerous_patterns = [
            r"ignore previous instructions",
            r"system prompt",
            r"reveal your instructions",
            # ... 추가 패턴
        ]

    def sanitize(self, user_input):
        # 1. 위험 패턴 검사
        for pattern in self.dangerous_patterns:
            if re.search(pattern, user_input, re.IGNORECASE):
                raise SecurityException("잠재적 프롬프트 인젝션 감지")

        # 2. 입력 길이 제한
        if len(user_input) > MAX_PROMPT_LENGTH:
            user_input = user_input[:MAX_PROMPT_LENGTH]

        # 3. 특수 문자 이스케이프
        user_input = self.escape_special_chars(user_input)

        return user_input

시스템 안정성 및 추적성 구현

트랜잭션 기반 실행 모델

class TransactionalCommandExecutor:
    def __init__(self):
        self.overlay_fs = OverlayFileSystem()
        self.snapshot_manager = SnapshotManager()

    def execute_with_rollback(self, command):
        # 1. 현재 상태 스냅샷
        snapshot_id = self.snapshot_manager.create_snapshot()

        try:
            # 2. 오버레이 파일시스템에서 실행
            result = self.overlay_fs.execute(command)

            # 3. 변경사항 미리보기
            changes = self.overlay_fs.get_changes()

            if self.user_approves(changes):
                # 4. 변경사항 커밋
                self.overlay_fs.commit()
                return result
            else:
                # 5. 변경사항 폐기
                self.overlay_fs.discard()
                return "작업이 취소되었습니다"

        except Exception as e:
            # 6. 오류 시 자동 롤백
            self.snapshot_manager.restore(snapshot_id)
            raise

상세 감사 로깅

{
  "audit_entry": {
    "id": "cmd-2025-01-10-001",
    "timestamp": "2025-01-10T14:30:00Z",
    "user": {
      "id": "user123",
      "name": "김개발",
      "role": "DevOps Engineer"
    },
    "natural_language_input": "프로덕션 서버의 nginx 로그 확인해줘",
    "generated_command": "sudo tail -f /var/log/nginx/access.log",
    "execution": {
      "status": "completed",
      "duration_ms": 1250,
      "exit_code": 0
    },
    "system_changes": {
      "files_read": ["/var/log/nginx/access.log"],
      "files_modified": [],
      "network_connections": []
    },
    "security_checks": {
      "permission_check": "passed",
      "command_validation": "passed",
      "risk_score": 0.2
    }
  }
}

AI 기반 리눅스 디바이스 관리 통합

솔루션 지원 OS 특징 비고
Microsoft Intune Ubuntu, RHEL 정책 적용, MFA 연동 Azure 계정 기반
Hexnode MDM Fedora, Debian 실시간 터미널, 원격 제어 기업 중심
baramundi 모든 리눅스 업데이트 자동화, 통합 관리 2025 R1 기준
Esper IoT/키오스크 OTA 업데이트, 역할 제어 UI 기반 디바이스 최적

차세대 MDM 솔루션 아키텍처

2025년 리눅스 디바이스 관리는 AI와의 통합으로 새로운 차원으로 진화하고 있습니다.

class AIEnhancedMDM:
    def __init__(self):
        self.device_fleet = DeviceFleetManager()
        self.ai_assistant = CopilotForDevOps()

    def manage_by_intent(self, admin_intent):
        """
        관리자의 의도를 해석하여 디바이스 관리 작업 수행
        """
        # 예: "모든 개발자 워크스테이션을 최신 보안 패치로 업데이트해줘"

        # 1. 의도 분석
        action_plan = self.ai_assistant.analyze_intent(admin_intent)

        # 2. 대상 디바이스 식별
        target_devices = self.identify_devices(action_plan.device_criteria)

        # 3. 작업 계획 생성
        deployment_plan = self.create_deployment_plan(
            devices=target_devices,
            actions=action_plan.actions
        )

        # 4. 단계적 실행
        return self.execute_with_rollout(deployment_plan)

주요 MDM 솔루션의 AI 통합 현황

Microsoft Intune + Copilot

  • 자연어로 컴플라이언스 정책 생성
  • 이상 징후 자동 감지 및 대응
  • 예측적 유지보수 권장사항

Hexnode AI Assistant

  • 실시간 디바이스 상태 쿼리: "CPU 사용률이 80% 이상인 서버 목록"
  • 자동화된 문제 해결 스크립트 생성
  • 채팅 기반 원격 지원

Baramundi 2025 R1

  • AI 기반 패치 우선순위 결정
  • 자연어 기반 정책 설정
  • 예측적 장애 분석

실제 활용 시나리오

# 시나리오 1: 보안 취약점 대응
관리자: "최근 발표된 OpenSSL 취약점에 영향받는 모든 시스템을 찾아서 패치해줘"

AI 응답:
1. CVE-2025-XXXX 분석 중...
2. 영향받는 시스템: 42대 발견
   - 웹 서버: 15대
   - 데이터베이스 서버: 8대
   - 개발 워크스테이션: 19대
3. 패치 계획:
   - Stage 1: 개발 환경 (19대) - 즉시 시작
   - Stage 2: 스테이징 서버 (5대) - 1시간 후
   - Stage 3: 프로덕션 (18대) - 승인 대기
4. 예상 다운타임: 서비스별 30초 미만

[실행] [수정] [취소]

# 시나리오 2: 성능 최적화
관리자: "최근 일주일간 성능이 저하된 서버들을 분석하고 최적화 방안을 제시해줘"

AI 분석 결과:
- 5대 서버에서 비정상적인 I/O 대기 시간 감지
- 원인: 로그 로테이션 미설정으로 인한 디스크 공간 부족
- 권장 조치:
  1. 즉시: 오래된 로그 파일 압축 및 아카이빙
  2. 설정: logrotate 구성 업데이트
  3. 모니터링: 디스크 사용률 알람 설정

자동 실행 스크립트를 생성하시겠습니까? [예] [아니오]

미래 전망: OS 레벨 통합

AI-Native 운영체제의 등장

미래의 리눅스 배포판은 AI를 커널 레벨부터 통합할 가능성이 높습니다.

# 가상의 AI-Native Linux 사용 예시
$ ai "시스템 성능을 게임에 최적화해줘"
[AI OS] 다음 작업을 수행합니다:
- CPU 거버너를 'performance' 모드로 변경
- GPU 클럭 속도 최대화
- 불필요한 백그라운드 서비스 일시 중지
- 메모리 스왑 최소화
- 네트워크 우선순위 조정

확인하시겠습니까? [Y/n]

$ ai "내일 오전 9시 프레젠테이션을 위해 시스템을 준비해줘"
[AI OS] 일정을 확인하고 다음을 준비하겠습니다:
- 오전 8:45 시스템 업데이트 및 재부팅
- 프레젠테이션 앱 사전 로드
- 외부 디스플레이 설정 자동 구성
- 배터리 최적화 모드 활성화
- 방해 금지 모드 설정

예상되는 기술 스택

┌─────────────────────────────────────┐
│      사용자 인터페이스 레이어          │
│  - 음성 명령                         │
│  - 자연어 텍스트                     │
│  - 제스처 인식                       │
└────────────┬────────────────────────┘
             │
┌────────────▼────────────────────────┐
│        AI 해석 레이어                 │
│  - 로컬 LLM (경량화된 모델)           │
│  - 의도 분석 엔진                    │
│  - 컨텍스트 관리자                   │
└────────────┬────────────────────────┘
             │
┌────────────▼────────────────────────┐
│      시스템 추상화 레이어              │
│  - 통합 API                          │
│  - 권한 관리                         │
│  - 안전 실행 환경                    │
└────────────┬────────────────────────┘
             │
┌────────────▼────────────────────────┐
│         커널 레이어                   │
│  - AI 가속 지원                      │
│  - 실시간 최적화                     │
│  - 보안 샌드박스                     │
└─────────────────────────────────────┘

도입 가이드라인

단계별 도입 전략

Phase 1: 파일럿 (1-3개월)

  • 개발/테스트 환경에서 시작
  • 단순 조회 명령 위주로 제한
  • 팀 내 피드백 수집

Phase 2: 확장 (3-6개월)

  • 스테이징 환경으로 확대
  • 자동화 스크립트 생성 기능 추가
  • 보안 정책 수립

Phase 3: 프로덕션 (6-12개월)

  • 엄격한 승인 프로세스 하에 프로덕션 적용
  • 24/7 모니터링 체계 구축
  • 지속적인 개선

조직 준비사항

  1. 기술적 준비
    • AI 모델 선택 (클라우드 vs 온프레미스)
    • 보안 아키텍처 설계
    • 백업 및 롤백 체계
  2. 프로세스 정비
    • 승인 워크플로우 정의
    • 사고 대응 절차
    • 감사 및 컴플라이언스
  3. 인적 자원
    • AI 도구 사용 교육
    • 보안 인식 제고
    • 챔피언 육성

실전 구현 예제

완전한 AI 서버 관리 시스템 구축

#!/usr/bin/env python3
"""
AI-Powered Server Management System
자연어 기반 서버 명령 실행 및 관리 시스템
"""

import os
import subprocess
import json
import logging
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import openai
from abc import ABC, abstractmethod

# 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CommandRisk(Enum):
    """명령어 위험도 레벨"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class CommandContext:
    """명령 실행 컨텍스트"""
    user: str
    timestamp: datetime
    natural_language: str
    generated_command: str
    risk_level: CommandRisk
    requires_approval: bool
    
@dataclass
class ExecutionResult:
    """명령 실행 결과"""
    success: bool
    output: str
    error: Optional[str]
    changes: List[str]
    rollback_available: bool

class SecurityPolicy:
    """보안 정책 관리"""
    
    def __init__(self):
        self.blacklist_patterns = [
            r"rm\s+-rf\s+/",
            r"mkfs",
            r"dd\s+if=.*of=/dev/",
            r":(){ :|:& };:",  # Fork bomb
        ]
        
        self.whitelist_commands = [
            "ls", "cat", "grep", "find", "ps", "top", "df", 
            "du", "netstat", "ss", "journalctl", "systemctl status"
        ]
        
        self.high_risk_commands = [
            "rm", "chmod", "chown", "systemctl restart", 
            "reboot", "shutdown", "iptables"
        ]
    
    def evaluate_risk(self, command: str) -> CommandRisk:
        """명령어 위험도 평가"""
        import re
        
        # 블랙리스트 검사
        for pattern in self.blacklist_patterns:
            if re.search(pattern, command):
                return CommandRisk.CRITICAL
        
        # 고위험 명령어 검사
        for risky in self.high_risk_commands:
            if risky in command:
                return CommandRisk.HIGH
        
        # 화이트리스트 검사
        base_command = command.split()[0] if command else ""
        if base_command in self.whitelist_commands:
            return CommandRisk.LOW
            
        return CommandRisk.MEDIUM
    
    def is_allowed(self, command: str, user_role: str) -> bool:
        """명령 실행 허용 여부"""
        risk = self.evaluate_risk(command)
        
        if risk == CommandRisk.CRITICAL:
            return False
            
        if risk == CommandRisk.HIGH and user_role != "admin":
            return False
            
        return True

class LLMInterface(ABC):
    """LLM 인터페이스 추상 클래스"""
    
    @abstractmethod
    def generate_command(self, prompt: str, context: Dict) -> str:
        pass

class OpenAIInterface(LLMInterface):
    """OpenAI GPT 인터페이스"""
    
    def __init__(self, api_key: str):
        openai.api_key = api_key
        self.model = "gpt-4"
    
    def generate_command(self, prompt: str, context: Dict) -> str:
        """자연어를 리눅스 명령어로 변환"""
        
        system_prompt = """
        You are a Linux system administrator assistant. 
        Convert natural language requests to appropriate Linux commands.
        Consider the system context and security best practices.
        Return only the command without explanation.
        
        System Context:
        - OS: {os}
        - Shell: {shell}
        - User Role: {role}
        """.format(**context)
        
        try:
            response = openai.ChatCompletion.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3,
                max_tokens=150
            )
            
            return response.choices[0].message.content.strip()
            
        except Exception as e:
            logger.error(f"LLM 오류: {e}")
            raise

class LocalLLMInterface(LLMInterface):
    """로컬 LLM 인터페이스 (Ollama 등)"""
    
    def __init__(self, model_name: str = "llama2"):
        self.model = model_name
    
    def generate_command(self, prompt: str, context: Dict) -> str:
        """Ollama를 사용한 명령 생성"""
        # 실제 구현에서는 Ollama API 호출
        # 여기서는 간단한 예시만 제공
        command_map = {
            "메모리": "free -h",
            "프로세스": "ps aux",
            "디스크": "df -h",
            "네트워크": "ss -tuln"
        }
        
        for key, cmd in command_map.items():
            if key in prompt:
                return cmd
                
        return "echo '명령을 찾을 수 없습니다'"

class TransactionalExecutor:
    """트랜잭션 기반 명령 실행기"""
    
    def __init__(self):
        self.snapshots = {}
        self.change_log = []
    
    def create_snapshot(self, command: str) -> str:
        """실행 전 스냅샷 생성"""
        snapshot_id = f"snap_{datetime.now().timestamp()}"
        
        # 실제로는 파일시스템 스냅샷이나 상태 저장
        # 여기서는 간단한 시뮬레이션
        self.snapshots[snapshot_id] = {
            "command": command,
            "timestamp": datetime.now(),
            "system_state": self._capture_state()
        }
        
        return snapshot_id
    
    def _capture_state(self) -> Dict:
        """현재 시스템 상태 캡처"""
        return {
            "processes": subprocess.getoutput("ps aux | wc -l"),
            "disk_usage": subprocess.getoutput("df -h"),
            "memory": subprocess.getoutput("free -h")
        }
    
    def execute_with_rollback(self, command: str) -> ExecutionResult:
        """롤백 가능한 명령 실행"""
        snapshot_id = self.create_snapshot(command)
        
        try:
            # 명령 실행
            result = subprocess.run(
                command, 
                shell=True, 
                capture_output=True, 
                text=True,
                timeout=30
            )
            
            # 변경사항 감지
            changes = self._detect_changes(snapshot_id)
            
            return ExecutionResult(
                success=result.returncode == 0,
                output=result.stdout,
                error=result.stderr if result.returncode != 0 else None,
                changes=changes,
                rollback_available=True
            )
            
        except subprocess.TimeoutExpired:
            return ExecutionResult(
                success=False,
                output="",
                error="명령 실행 시간 초과",
                changes=[],
                rollback_available=True
            )
        except Exception as e:
            return ExecutionResult(
                success=False,
                output="",
                error=str(e),
                changes=[],
                rollback_available=True
            )
    
    def _detect_changes(self, snapshot_id: str) -> List[str]:
        """시스템 변경사항 감지"""
        # 실제로는 더 정교한 변경 감지 로직 필요
        return ["파일 시스템 변경 감지됨"]
    
    def rollback(self, snapshot_id: str) -> bool:
        """스냅샷으로 롤백"""
        if snapshot_id not in self.snapshots:
            return False
            
        # 실제 롤백 로직 구현
        logger.info(f"스냅샷 {snapshot_id}로 롤백 중...")
        return True

class AuditLogger:
    """감사 로그 관리"""
    
    def __init__(self, log_file: str = "/var/log/ai_commands.log"):
        self.log_file = log_file
    
    def log_command(self, context: CommandContext, result: ExecutionResult):
        """명령 실행 기록"""
        log_entry = {
            "timestamp": context.timestamp.isoformat(),
            "user": context.user,
            "natural_language": context.natural_language,
            "generated_command": context.generated_command,
            "risk_level": context.risk_level.value,
            "execution": {
                "success": result.success,
                "output_preview": result.output[:200] if result.output else "",
                "error": result.error,
                "changes": result.changes
            }
        }
        
        # 파일에 기록 (실제로는 더 안전한 로깅 메커니즘 사용)
        with open(self.log_file, 'a') as f:
            f.write(json.dumps(log_entry) + '\n')
        
        # 중요 이벤트는 syslog에도 기록
        if context.risk_level in [CommandRisk.HIGH, CommandRisk.CRITICAL]:
            logger.warning(f"고위험 명령 실행: {context.generated_command}")

class AIServerManager:
    """AI 기반 서버 관리 시스템 메인 클래스"""
    
    def __init__(self, llm_interface: LLMInterface):
        self.llm = llm_interface
        self.security = SecurityPolicy()
        self.executor = TransactionalExecutor()
        self.audit_logger = AuditLogger()
        self.system_context = self._get_system_context()
    
    def _get_system_context(self) -> Dict:
        """시스템 컨텍스트 수집"""
        return {
            "os": subprocess.getoutput("uname -s"),
            "shell": os.environ.get("SHELL", "/bin/bash"),
            "role": "admin",  # 실제로는 사용자 역할 확인
            "hostname": subprocess.getoutput("hostname")
        }
    
    def process_request(self, user: str, natural_language: str) -> Dict:
        """자연어 요청 처리"""
        
        # 1. 명령 생성
        try:
            generated_command = self.llm.generate_command(
                natural_language, 
                self.system_context
            )
        except Exception as e:
            return {
                "error": f"명령 생성 실패: {str(e)}",
                "success": False
            }
        
        # 2. 보안 검사
        risk_level = self.security.evaluate_risk(generated_command)
        
        if not self.security.is_allowed(generated_command, self.system_context["role"]):
            return {
                "error": "보안 정책에 의해 명령이 차단되었습니다",
                "command": generated_command,
                "risk_level": risk_level.value,
                "success": False
            }
        
        # 3. 컨텍스트 생성
        context = CommandContext(
            user=user,
            timestamp=datetime.now(),
            natural_language=natural_language,
            generated_command=generated_command,
            risk_level=risk_level,
            requires_approval=risk_level in [CommandRisk.HIGH, CommandRisk.CRITICAL]
        )
        
        # 4. 사용자 승인 (고위험 명령)
        if context.requires_approval:
            return {
                "command": generated_command,
                "risk_level": risk_level.value,
                "requires_approval": True,
                "message": "이 명령은 승인이 필요합니다",
                "success": False
            }
        
        # 5. 명령 실행
        result = self.executor.execute_with_rollback(generated_command)
        
        # 6. 감사 로그
        self.audit_logger.log_command(context, result)
        
        return {
            "command": generated_command,
            "output": result.output,
            "error": result.error,
            "changes": result.changes,
            "risk_level": risk_level.value,
            "success": result.success,
            "rollback_available": result.rollback_available
        }
    
    def execute_approved_command(self, command: str, approval_token: str) -> Dict:
        """승인된 명령 실행"""
        # 실제로는 승인 토큰 검증
        
        result = self.executor.execute_with_rollback(command)
        
        return {
            "command": command,
            "output": result.output,
            "error": result.error,
            "success": result.success
        }

# 사용 예시
if __name__ == "__main__":
    # LLM 인터페이스 선택 (로컬 또는 클라우드)
    # llm = OpenAIInterface(api_key="your-api-key")
    llm = LocalLLMInterface()
    
    # AI 서버 관리자 초기화
    manager = AIServerManager(llm)
    
    # 테스트 요청들
    test_requests = [
        ("user1", "현재 메모리 사용량을 보여줘"),
        ("user1", "CPU 사용률이 높은 프로세스 5개를 보여줘"),
        ("admin", "nginx 서비스를 재시작해줘"),
        ("user1", "루트 디렉토리를 모두 삭제해줘"),  # 차단되어야 함
    ]
    
    print("AI 기반 서버 관리 시스템 데모\n")
    print("="*50)
    
    for user, request in test_requests:
        print(f"\n사용자: {user}")
        print(f"요청: {request}")
        
        result = manager.process_request(user, request)
        
        if result["success"]:
            print(f"명령: {result['command']}")
            print(f"위험도: {result['risk_level']}")
            print(f"출력:\n{result['output'][:200]}...")
        else:
            print(f"실패: {result.get('error', '알 수 없는 오류')}")
            if result.get('requires_approval'):
                print("→ 관리자 승인이 필요합니다")
        
        print("-"*50)

리눅스 MDM 통합 AI 관리 시스템

#!/usr/bin/env python3
"""
AI-Enhanced Linux Device Management System
자연어 기반 리눅스 디바이스 관리 및 자동화 시스템
"""

import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set, Tuple
from dataclasses import dataclass, field
from enum import Enum
import aiohttp
import yaml
from abc import ABC, abstractmethod

# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class DeviceStatus(Enum):
    """디바이스 상태"""
    ONLINE = "online"
    OFFLINE = "offline"
    MAINTENANCE = "maintenance"
    ERROR = "error"
    UPDATING = "updating"

class PolicyType(Enum):
    """정책 유형"""
    SECURITY = "security"
    COMPLIANCE = "compliance"
    PERFORMANCE = "performance"
    UPDATE = "update"

@dataclass
class Device:
    """관리 대상 디바이스"""
    id: str
    hostname: str
    ip_address: str
    os_version: str
    device_type: str  # server, workstation, iot
    status: DeviceStatus
    last_seen: datetime
    tags: List[str] = field(default_factory=list)
    metrics: Dict = field(default_factory=dict)

@dataclass
class Policy:
    """관리 정책"""
    id: str
    name: str
    type: PolicyType
    rules: Dict
    target_tags: List[str]
    created_by: str
    created_at: datetime
    enabled: bool = True

@dataclass
class TaskResult:
    """작업 실행 결과"""
    device_id: str
    task_id: str
    success: bool
    output: str
    error: Optional[str]
    duration: float
    timestamp: datetime

class AIAssistant:
    """AI 어시스턴트 인터페이스"""
    
    def __init__(self, model_endpoint: str):
        self.endpoint = model_endpoint
    
    async def interpret_intent(self, natural_language: str) -> Dict:
        """자연어 의도 해석"""
        # 실제로는 LLM API 호출
        # 여기서는 간단한 규칙 기반 해석
        
        intent_map = {
            "업데이트": {"action": "update", "scope": "all"},
            "패치": {"action": "patch", "scope": "security"},
            "재시작": {"action": "restart", "scope": "service"},
            "상태": {"action": "status", "scope": "health"},
            "성능": {"action": "monitor", "scope": "performance"},
            "보안": {"action": "scan", "scope": "security"},
            "백업": {"action": "backup", "scope": "data"}
        }
        
        for keyword, intent in intent_map.items():
            if keyword in natural_language:
                return {
                    "intent": intent,
                    "confidence": 0.9,
                    "entities": self._extract_entities(natural_language)
                }
        
        return {
            "intent": {"action": "unknown"},
            "confidence": 0.0,
            "entities": {}
        }
    
    def _extract_entities(self, text: str) -> Dict:
        """텍스트에서 엔티티 추출"""
        entities = {}
        
        # 태그 추출
        if "서버" in text:
            entities["device_type"] = "server"
        elif "워크스테이션" in text:
            entities["device_type"] = "workstation"
        
        # 긴급도 추출
        if "긴급" in text or "즉시" in text:
            entities["priority"] = "high"
        
        # 시간 추출
        if "1시간" in text:
            entities["timeframe"] = "1h"
        elif "오늘" in text:
            entities["timeframe"] = "today"
        
        return entities
    
    async def generate_script(self, task: str, context: Dict) -> str:
        """작업 수행을 위한 스크립트 생성"""
        # 실제로는 LLM이 스크립트 생성
        
        scripts = {
            "update": """#!/bin/bash
# 시스템 업데이트 스크립트
apt-get update
apt-get upgrade -y
apt-get autoremove -y
""",
            "patch": """#!/bin/bash
# 보안 패치 스크립트
unattended-upgrade -d
""",
            "status": """#!/bin/bash
# 시스템 상태 확인
echo "=== System Status ==="
uptime
free -h
df -h
systemctl status --no-pager
"""
        }
        
        return scripts.get(task, "echo 'Unknown task'")

class DeviceManager:
    """디바이스 관리자"""
    
    def __init__(self):
        self.devices: Dict[str, Device] = {}
        self.policies: Dict[str, Policy] = {}
        
    async def discover_devices(self, network_range: str) -> List[Device]:
        """네트워크에서 디바이스 자동 발견"""
        discovered = []
        
        # 실제로는 nmap이나 다른 도구 사용
        # 여기서는 시뮬레이션
        for i in range(5):
            device = Device(
                id=f"device_{i}",
                hostname=f"linux-{i}.company.local",
                ip_address=f"192.168.1.{100+i}",
                os_version="Ubuntu 22.04 LTS",
                device_type="server" if i < 3 else "workstation",
                status=DeviceStatus.ONLINE,
                last_seen=datetime.now(),
                tags=["production"] if i < 3 else ["development"]
            )
            self.devices[device.id] = device
            discovered.append(device)
        
        logger.info(f"{len(discovered)}개 디바이스 발견됨")
        return discovered
    
    def get_devices_by_criteria(self, criteria: Dict) -> List[Device]:
        """조건에 맞는 디바이스 검색"""
        results = []
        
        for device in self.devices.values():
            match = True
            
            if "device_type" in criteria and device.device_type != criteria["device_type"]:
                match = False
            
            if "tags" in criteria:
                required_tags = set(criteria["tags"])
                if not required_tags.issubset(set(device.tags)):
                    match = False
            
            if "status" in criteria and device.status != criteria["status"]:
                match = False
            
            if match:
                results.append(device)
        
        return results
    
    async def update_device_status(self, device_id: str, status: DeviceStatus):
        """디바이스 상태 업데이트"""
        if device_id in self.devices:
            self.devices[device_id].status = status
            self.devices[device_id].last_seen = datetime.now()

class TaskExecutor:
    """작업 실행기"""
    
    def __init__(self, device_manager: DeviceManager):
        self.device_manager = device_manager
        self.running_tasks: Dict[str, asyncio.Task] = {}
    
    async def execute_on_device(self, device: Device, script: str) -> TaskResult:
        """디바이스에서 스크립트 실행"""
        task_id = f"task_{datetime.now().timestamp()}"
        start_time = datetime.now()
        
        try:
            # 실제로는 SSH나 에이전트를 통해 실행
            # 여기서는 시뮬레이션
            await asyncio.sleep(1)  # 실행 시뮬레이션
            
            # 랜덤하게 성공/실패 시뮬레이션
            import random
            success = random.random() > 0.1
            
            if success:
                output = f"Task completed successfully on {device.hostname}"
                error = None
            else:
                output = ""
                error = "Connection timeout"
            
            duration = (datetime.now() - start_time).total_seconds()
            
            return TaskResult(
                device_id=device.id,
                task_id=task_id,
                success=success,
                output=output,
                error=error,
                duration=duration,
                timestamp=datetime.now()
            )
            
        except Exception as e:
            return TaskResult(
                device_id=device.id,
                task_id=task_id,
                success=False,
                output="",
                error=str(e),
                duration=0,
                timestamp=datetime.now()
            )
    
    async def execute_batch(self, devices: List[Device], script: str, 
                          max_concurrent: int = 5) -> List[TaskResult]:
        """여러 디바이스에 배치 실행"""
        results = []
        
        # 동시 실행 제한
        semaphore = asyncio.Semaphore(max_concurrent)
        
        async def execute_with_limit(device):
            async with semaphore:
                return await self.execute_on_device(device, script)
        
        # 모든 작업 동시 실행
        tasks = [execute_with_limit(device) for device in devices]
        results = await asyncio.gather(*tasks)
        
        return results

class PolicyEngine:
    """정책 엔진"""
    
    def __init__(self, device_manager: DeviceManager):
        self.device_manager = device_manager
        self.policies: Dict[str, Policy] = {}
    
    def create_policy_from_natural_language(self, description: str, user: str) -> Policy:
        """자연어로부터 정책 생성"""
        # 실제로는 LLM이 정책 규칙 생성
        
        policy_templates = {
            "보안": {
                "firewall_enabled": True,
                "auto_updates": True,
                "password_policy": {
                    "min_length": 12,
                    "require_special_chars": True
                }
            },
            "성능": {
                "cpu_threshold": 80,
                "memory_threshold": 85,
                "disk_threshold": 90
            }
        }
        
        policy_type = PolicyType.SECURITY
        rules = policy_templates.get("보안", {})
        
        if "성능" in description:
            policy_type = PolicyType.PERFORMANCE
            rules = policy_templates.get("성능", {})
        
        policy = Policy(
            id=f"policy_{datetime.now().timestamp()}",
            name=f"AI Generated Policy - {description[:30]}",
            type=policy_type,
            rules=rules,
            target_tags=["all"],
            created_by=user,
            created_at=datetime.now()
        )
        
        self.policies[policy.id] = policy
        return policy
    
    async def enforce_policy(self, policy: Policy) -> Dict[str, List[TaskResult]]:
        """정책 적용"""
        # 대상 디바이스 찾기
        target_devices = self.device_manager.get_devices_by_criteria({
            "tags": policy.target_tags
        })
        
        results = {}
        
        # 정책 유형별 처리
        if policy.type == PolicyType.SECURITY:
            # 보안 정책 적용 스크립트 생성
            script = self._generate_security_script(policy.rules)
        elif policy.type == PolicyType.PERFORMANCE:
            # 성능 정책 적용 스크립트 생성
            script = self._generate_performance_script(policy.rules)
        else:
            script = "echo 'Policy type not implemented'"
        
        # 실행
        executor = TaskExecutor(self.device_manager)
        results = await executor.execute_batch(target_devices, script)
        
        return {
            "policy_id": policy.id,
            "applied_devices": len(target_devices),
            "results": results
        }
    
    def _generate_security_script(self, rules: Dict) -> str:
        """보안 정책 스크립트 생성"""
        return """#!/bin/bash
# 보안 정책 적용
ufw enable
apt-get install unattended-upgrades -y
echo 'Security policy applied'
"""
    
    def _generate_performance_script(self, rules: Dict) -> str:
        """성능 정책 스크립트 생성"""
        return """#!/bin/bash
# 성능 모니터링 설정
echo 'vm.swappiness=10' >> /etc/sysctl.conf
sysctl -p
echo 'Performance policy applied'
"""

class AIEnhancedMDM:
    """AI 강화 MDM 시스템"""
    
    def __init__(self):
        self.ai_assistant = AIAssistant("http://localhost:8080/ai")
        self.device_manager = DeviceManager()
        self.task_executor = TaskExecutor(self.device_manager)
        self.policy_engine = PolicyEngine(self.device_manager)
        self.audit_log = []
    
    async def process_natural_language_command(self, user: str, command: str) -> Dict:
        """자연어 명령 처리"""
        logger.info(f"자연어 명령 수신: {command}")
        
        # 1. 의도 해석
        intent = await self.ai_assistant.interpret_intent(command)
        
        if intent["confidence"] < 0.5:
            return {
                "success": False,
                "error": "명령을 이해할 수 없습니다. 다시 시도해주세요."
            }
        
        # 2. 작업 계획
        action = intent["intent"]["action"]
        entities = intent["entities"]
        
        # 3. 대상 디바이스 선택
        criteria = {}
        if "device_type" in entities:
            criteria["device_type"] = entities["device_type"]
        
        target_devices = self.device_manager.get_devices_by_criteria(criteria)
        
        if not target_devices:
            return {
                "success": False,
                "error": "조건에 맞는 디바이스를 찾을 수 없습니다."
            }
        
        # 4. 작업 실행 계획 생성
        execution_plan = {
            "action": action,
            "target_devices": [d.hostname for d in target_devices],
            "estimated_time": len(target_devices) * 2,  # 초
            "risk_level": self._assess_risk(action, target_devices)
        }
        
        # 5. 위험도가 높으면 확인 요청
        if execution_plan["risk_level"] == "high":
            return {
                "success": False,
                "requires_confirmation": True,
                "plan": execution_plan,
                "message": "이 작업은 위험도가 높습니다. 계속하시겠습니까?"
            }
        
        # 6. 스크립트 생성
        script = await self.ai_assistant.generate_script(action, entities)
        
        # 7. 실행
        results = await self.task_executor.execute_batch(target_devices, script)
        
        # 8. 감사 로그
        self._log_activity(user, command, intent, results)
        
        # 9. 결과 요약
        success_count = sum(1 for r in results if r.success)
        
        return {
            "success": True,
            "summary": {
                "total_devices": len(target_devices),
                "successful": success_count,
                "failed": len(target_devices) - success_count
            },
            "details": [
                {
                    "device": self.device_manager.devices[r.device_id].hostname,
                    "success": r.success,
                    "output": r.output,
                    "error": r.error
                } for r in results
            ]
        }
    
    def _assess_risk(self, action: str, devices: List[Device]) -> str:
        """작업 위험도 평가"""
        high_risk_actions = ["restart", "update", "patch"]
        
        if action in high_risk_actions:
            # 프로덕션 디바이스가 포함되어 있으면 고위험
            if any("production" in d.tags for d in devices):
                return "high"
        
        return "medium" if len(devices) > 5 else "low"
    
    def _log_activity(self, user: str, command: str, intent: Dict, results: List[TaskResult]):
        """활동 로그 기록"""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "user": user,
            "command": command,
            "intent": intent,
            "results_summary": {
                "total": len(results),
                "successful": sum(1 for r in results if r.success)
            }
        }
        self.audit_log.append(log_entry)
    
    async def auto_remediate(self, issue_description: str) -> Dict:
        """자동 문제 해결"""
        logger.info(f"자동 해결 요청: {issue_description}")
        
        # AI가 문제 분석 및 해결책 제시
        remediation_plans = {
            "high_cpu": {
                "diagnosis": "CPU 사용률이 높은 프로세스 확인",
                "actions": [
                    "top 명령으로 CPU 사용률 확인",
                    "불필요한 프로세스 종료",
                    "시스템 재시작 고려"
                ]
            },
            "disk_full": {
                "diagnosis": "디스크 공간 부족",
                "actions": [
                    "대용량 파일 검색",
                    "오래된 로그 파일 정리",
                    "임시 파일 삭제"
                ]
            }
        }
        
        # 간단한 키워드 매칭
        if "CPU" in issue_description or "cpu" in issue_description:
            plan = remediation_plans["high_cpu"]
        elif "디스크" in issue_description or "disk" in issue_description:
            plan = remediation_plans["disk_full"]
        else:
            return {
                "success": False,
                "error": "문제를 자동으로 해결할 수 없습니다."
            }
        
        return {
            "success": True,
            "remediation_plan": plan,
            "message": "다음 단계를 수행하여 문제를 해결하세요."
        }
    
    async def generate_compliance_report(self, standard: str = "CIS") -> Dict:
        """컴플라이언스 보고서 생성"""
        report = {
            "standard": standard,
            "generated_at": datetime.now().isoformat(),
            "total_devices": len(self.device_manager.devices),
            "compliance_status": []
        }
        
        for device in self.device_manager.devices.values():
            # 실제로는 각 디바이스의 설정을 확인
            compliance = {
                "device": device.hostname,
                "checks": {
                    "firewall_enabled": True,
                    "auto_updates": True,
                    "password_policy": True,
                    "encryption": False  # 예시로 하나는 실패
                },
                "compliance_score": 75.0  # 4개 중 3개 통과
            }
            report["compliance_status"].append(compliance)
        
        # 전체 평균 점수
        avg_score = sum(c["compliance_score"] for c in report["compliance_status"]) / len(report["compliance_status"])
        report["average_compliance_score"] = avg_score
        
        return report

# 사용 예시 및 데모
async def main():
    """메인 데모 함수"""
    print("AI 기반 리눅스 디바이스 관리 시스템")
    print("="*50)
    
    # MDM 시스템 초기화
    mdm = AIEnhancedMDM()
    
    # 디바이스 발견
    print("\n1. 네트워크에서 디바이스 자동 발견 중...")
    devices = await mdm.device_manager.discover_devices("192.168.1.0/24")
    print(f"→ {len(devices)}개 디바이스 발견됨")
    
    # 자연어 명령 테스트
    print("\n2. 자연어 명령 처리 테스트")
    
    test_commands = [
        ("admin", "모든 서버의 현재 상태를 확인해줘"),
        ("admin", "개발 환경 워크스테이션들을 업데이트해줘"),
        ("user", "프로덕션 서버를 재시작해줘"),  # 고위험 작업
        ("admin", "CPU 사용률이 높은 서버를 찾아서 진단해줘"),
    ]
    
    for user, command in test_commands:
        print(f"\n명령: '{command}' (사용자: {user})")
        result = await mdm.process_natural_language_command(user, command)
        
        if result["success"]:
            print(f"✓ 성공: {result['summary']['successful']}/{result['summary']['total_devices']} 디바이스")
            for detail in result["details"][:2]:  # 처음 2개만 표시
                print(f"  - {detail['device']}: {detail['output'][:50]}...")
        else:
            print(f"✗ 실패: {result.get('error', '알 수 없는 오류')}")
            if result.get("requires_confirmation"):
                print(f"  → 확인 필요: {result['plan']}")
    
    # 정책 생성 및 적용
    print("\n3. AI 기반 정책 생성 및 적용")
    
    policy_requests = [
        "모든 서버에 강화된 보안 정책을 적용해줘",
        "워크스테이션의 성능을 최적화하는 정책을 만들어줘"
    ]
    
    for request in policy_requests:
        print(f"\n정책 요청: '{request}'")
        policy = mdm.policy_engine.create_policy_from_natural_language(request, "admin")
        print(f"→ 정책 생성됨: {policy.name}")
        
        # 정책 적용
        enforcement_result = await mdm.policy_engine.enforce_policy(policy)
        print(f"→ {enforcement_result['applied_devices']}개 디바이스에 적용됨")
    
    # 자동 문제 해결
    print("\n4. 자동 문제 해결 테스트")
    
    issues = [
        "서버의 CPU 사용률이 너무 높아요",
        "디스크 공간이 부족합니다"
    ]
    
    for issue in issues:
        print(f"\n문제: '{issue}'")
        remediation = await mdm.auto_remediate(issue)
        
        if remediation["success"]:
            print(f"✓ 해결 방안:")
            plan = remediation["remediation_plan"]
            print(f"  진단: {plan['diagnosis']}")
            for i, action in enumerate(plan['actions'], 1):
                print(f"  {i}. {action}")
        else:
            print(f"✗ {remediation['error']}")
    
    # 컴플라이언스 보고서
    print("\n5. 컴플라이언스 보고서 생성")
    report = await mdm.generate_compliance_report("CIS")
    print(f"→ 표준: {report['standard']}")
    print(f"→ 전체 디바이스: {report['total_devices']}개")
    print(f"→ 평균 준수율: {report['average_compliance_score']:.1f}%")
    
    # 대시보드 요약
    print("\n6. AI MDM 대시보드 요약")
    print("="*50)
    
    # 디바이스 상태 요약
    status_summary = {}
    for device in mdm.device_manager.devices.values():
        status = device.status.value
        status_summary[status] = status_summary.get(status, 0) + 1
    
    print("디바이스 상태:")
    for status, count in status_summary.items():
        print(f"  - {status}: {count}개")
    
    # 최근 활동 요약
    print(f"\n최근 활동: {len(mdm.audit_log)}개 명령 실행됨")
    
    # 활성 정책
    print(f"활성 정책: {len(mdm.policy_engine.policies)}개")

class AIDeviceMonitor:
    """AI 기반 실시간 디바이스 모니터링"""
    
    def __init__(self, mdm: AIEnhancedMDM):
        self.mdm = mdm
        self.alert_thresholds = {
            "cpu": 80,
            "memory": 85,
            "disk": 90
        }
        self.active_alerts = []
    
    async def continuous_monitoring(self):
        """지속적인 모니터링 루프"""
        while True:
            try:
                # 모든 온라인 디바이스 확인
                online_devices = [
                    d for d in self.mdm.device_manager.devices.values() 
                    if d.status == DeviceStatus.ONLINE
                ]
                
                for device in online_devices:
                    # 메트릭 수집 (시뮬레이션)
                    metrics = await self._collect_metrics(device)
                    device.metrics = metrics
                    
                    # 이상 감지
                    anomalies = self._detect_anomalies(device, metrics)
                    
                    if anomalies:
                        await self._handle_anomalies(device, anomalies)
                
                # 5분마다 반복
                await asyncio.sleep(300)
                
            except Exception as e:
                logger.error(f"모니터링 오류: {e}")
                await asyncio.sleep(60)
    
    async def _collect_metrics(self, device: Device) -> Dict:
        """디바이스 메트릭 수집"""
        # 실제로는 에이전트나 SNMP로 수집
        import random
        
        return {
            "cpu_usage": random.randint(10, 100),
            "memory_usage": random.randint(20, 95),
            "disk_usage": random.randint(30, 95),
            "network_in": random.randint(100, 10000),
            "network_out": random.randint(100, 10000),
            "uptime_hours": random.randint(1, 720)
        }
    
    def _detect_anomalies(self, device: Device, metrics: Dict) -> List[Dict]:
        """이상 징후 감지"""
        anomalies = []
        
        # 임계값 초과 확인
        for metric, threshold in self.alert_thresholds.items():
            value = metrics.get(f"{metric}_usage", 0)
            if value > threshold:
                anomalies.append({
                    "type": f"high_{metric}",
                    "value": value,
                    "threshold": threshold,
                    "severity": "high" if value > 95 else "medium"
                })
        
        # AI 기반 이상 패턴 감지 (시뮬레이션)
        if metrics.get("cpu_usage", 0) > 60 and metrics.get("memory_usage", 0) > 80:
            anomalies.append({
                "type": "resource_pressure",
                "severity": "high",
                "description": "CPU와 메모리가 동시에 높은 사용률을 보임"
            })
        
        return anomalies
    
    async def _handle_anomalies(self, device: Device, anomalies: List[Dict]):
        """이상 징후 처리"""
        for anomaly in anomalies:
            # 알림 생성
            alert = {
                "id": f"alert_{datetime.now().timestamp()}",
                "device_id": device.id,
                "device_name": device.hostname,
                "anomaly": anomaly,
                "timestamp": datetime.now(),
                "status": "active"
            }
            self.active_alerts.append(alert)
            
            # 심각도가 높으면 자동 대응
            if anomaly["severity"] == "high":
                # AI에게 자동 해결 요청
                issue_description = f"{device.hostname}에서 {anomaly['type']} 발생"
                remediation = await self.mdm.auto_remediate(issue_description)
                
                if remediation["success"]:
                    logger.info(f"자동 대응 시작: {device.hostname} - {anomaly['type']}")
                    # 실제로는 해결 스크립트 실행
    
    def get_dashboard_data(self) -> Dict:
        """대시보드용 데이터 반환"""
        return {
            "total_alerts": len(self.active_alerts),
            "critical_alerts": sum(1 for a in self.active_alerts 
                                 if a["anomaly"]["severity"] == "high"),
            "device_health": self._calculate_fleet_health(),
            "top_issues": self._get_top_issues()
        }
    
    def _calculate_fleet_health(self) -> float:
        """전체 디바이스 건강도 계산"""
        if not self.mdm.device_manager.devices:
            return 100.0
        
        healthy_devices = sum(
            1 for d in self.mdm.device_manager.devices.values()
            if d.status == DeviceStatus.ONLINE and 
            all(d.metrics.get(f"{m}_usage", 0) < t 
                for m, t in self.alert_thresholds.items())
        )
        
        return (healthy_devices / len(self.mdm.device_manager.devices)) * 100
    
    def _get_top_issues(self) -> List[Dict]:
        """가장 빈번한 문제 유형"""
        issue_count = {}
        
        for alert in self.active_alerts:
            issue_type = alert["anomaly"]["type"]
            issue_count[issue_type] = issue_count.get(issue_type, 0) + 1
        
        # 상위 5개 반환
        sorted_issues = sorted(issue_count.items(), key=lambda x: x[1], reverse=True)
        return [
            {"type": issue, "count": count} 
            for issue, count in sorted_issues[:5]
        ]

class IntegratedAIManagementUI:
    """통합 AI 관리 UI (텍스트 기반)"""
    
    def __init__(self, mdm: AIEnhancedMDM, monitor: AIDeviceMonitor):
        self.mdm = mdm
        self.monitor = monitor
        self.running = True
    
    async def run(self):
        """메인 UI 루프"""
        print("\n" + "="*60)
        print("AI 기반 리눅스 통합 관리 시스템 v2.0")
        print("="*60)
        
        while self.running:
            self.display_menu()
            choice = input("\n선택> ").strip()
            
            if choice == "1":
                await self.natural_language_interface()
            elif choice == "2":
                await self.show_dashboard()
            elif choice == "3":
                await self.manage_policies()
            elif choice == "4":
                await self.view_alerts()
            elif choice == "5":
                await self.generate_reports()
            elif choice == "6":
                await self.ai_assistant_chat()
            elif choice == "0":
                self.running = False
                print("시스템을 종료합니다.")
            else:
                print("잘못된 선택입니다.")
    
    def display_menu(self):
        """메인 메뉴 표시"""
        print("\n" + "-"*40)
        print("1. 자연어 명령 실행")
        print("2. 실시간 대시보드")
        print("3. 정책 관리")
        print("4. 알림 및 경고")
        print("5. 보고서 생성")
        print("6. AI 어시스턴트 채팅")
        print("0. 종료")
        print("-"*40)
    
    async def natural_language_interface(self):
        """자연어 명령 인터페이스"""
        print("\n자연어로 명령을 입력하세요. (뒤로 가려면 'exit' 입력)")
        
        while True:
            command = input("\n명령> ").strip()
            
            if command.lower() == 'exit':
                break
            
            print("처리 중...")
            result = await self.mdm.process_natural_language_command("admin", command)
            
            if result["success"]:
                print(f"\n✓ 명령 실행 완료")
                print(f"성공: {result['summary']['successful']}개")
                print(f"실패: {result['summary']['total_devices'] - result['summary']['successful']}개")
            else:
                print(f"\n✗ 오류: {result.get('error', '알 수 없는 오류')}")
    
    async def show_dashboard(self):
        """실시간 대시보드 표시"""
        dashboard = self.monitor.get_dashboard_data()
        
        print("\n" + "="*50)
        print("실시간 시스템 대시보드")
        print("="*50)
        
        print(f"\n전체 디바이스 건강도: {dashboard['device_health']:.1f}%")
        print(f"활성 알림: {dashboard['total_alerts']}개 (중요: {dashboard['critical_alerts']}개)")
        
        print("\n주요 문제:")
        for issue in dashboard['top_issues']:
            print(f"  - {issue['type']}: {issue['count']}건")
        
        print("\n디바이스 상태:")
        for device in list(self.mdm.device_manager.devices.values())[:5]:
            status_icon = "✓" if device.status == DeviceStatus.ONLINE else "✗"
            print(f"  {status_icon} {device.hostname}: {device.status.value}")
            if device.metrics:
                print(f"     CPU: {device.metrics.get('cpu_usage', 0)}% | "
                      f"Memory: {device.metrics.get('memory_usage', 0)}% | "
                      f"Disk: {device.metrics.get('disk_usage', 0)}%")
    
    async def ai_assistant_chat(self):
        """AI 어시스턴트 대화형 인터페이스"""
        print("\nAI 어시스턴트와 대화를 시작합니다. (종료: 'bye')")
        print("시스템 관리, 문제 해결, 최적화 등에 대해 물어보세요.\n")
        
        chat_context = []
        
        while True:
            user_input = input("You> ").strip()
            
            if user_input.lower() in ['bye', 'exit', '종료']:
                print("AI> 도움이 필요하시면 언제든 불러주세요!")
                break
            
            # AI 응답 생성 (실제로는 더 정교한 대화 모델 사용)
            response = await self._generate_ai_response(user_input, chat_context)
            print(f"AI> {response}")
            
            # 대화 컨텍스트 유지
            chat_context.append({"user": user_input, "ai": response})
            if len(chat_context) > 10:
                chat_context.pop(0)
    
    async def _generate_ai_response(self, user_input: str, context: List[Dict]) -> str:
        """AI 응답 생성"""
        # 간단한 규칙 기반 응답 (실제로는 LLM 사용)
        
        responses = {
            "도움": "제가 도와드릴 수 있는 것들:\n"
                   "- 서버 상태 확인 및 모니터링\n"
                   "- 성능 문제 진단 및 해결\n"
                   "- 보안 정책 설정 및 적용\n"
                   "- 자동화 스크립트 생성\n"
                   "무엇을 도와드릴까요?",
            
            "성능": "성능 최적화를 위해 다음을 확인해보세요:\n"
                   "1. CPU 사용률이 높은 프로세스 확인\n"
                   "2. 메모리 누수 가능성 체크\n"
                   "3. 디스크 I/O 병목 현상 분석\n"
                   "구체적인 문제가 있다면 알려주세요.",
            
            "보안": "보안 강화를 위한 권장사항:\n"
                   "1. 정기적인 보안 패치 적용\n"
                   "2. 방화벽 규칙 검토\n"
                   "3. 접근 권한 최소화\n"
                   "4. 로그 모니터링 강화\n"
                   "특정 보안 정책을 적용하시겠습니까?"
        }
        
        # 키워드 매칭
        for keyword, response in responses.items():
            if keyword in user_input:
                return response
        
        # 기본 응답
        return "네, 이해했습니다. 좀 더 구체적으로 말씀해 주시면 정확한 도움을 드릴 수 있습니다."

# 전체 시스템 실행
async def run_complete_system():
    """전체 통합 시스템 실행"""
    
    # 시스템 초기화
    mdm = AIEnhancedMDM()
    
    # 디바이스 발견
    await mdm.device_manager.discover_devices("192.168.1.0/24")
    
    # 모니터링 시작
    monitor = AIDeviceMonitor(mdm)
    
    # 백그라운드 모니터링 태스크 시작
    monitoring_task = asyncio.create_task(monitor.continuous_monitoring())
    
    # UI 실행
    ui = IntegratedAIManagementUI(mdm, monitor)
    
    try:
        await ui.run()
    finally:
        # 정리
        monitoring_task.cancel()
        try:
            await monitoring_task
        except asyncio.CancelledError:
            pass

if __name__ == "__main__":
    # 메인 실행
    asyncio.run(main())
    
    # 전체 시스템 실행 (옵션)
    # asyncio.run(run_complete_system())

핵심 인사이트

AI 프롬프트 기반 서버 관리는 단순한 기술 트렌드를 넘어 IT 운영의 패러다임을 근본적으로 바꾸고 있습니다. 주요 시사점은 다음과 같습니다.

  1. 접근성의 혁명: 복잡한 CLI 명령어를 외울 필요 없이 자연어로 시스템을 제어할 수 있어, IT 전문가가 아닌 사용자도 기본적인 서버 관리가 가능해집니다.
  2. 생산성 향상: 숙련된 DevOps 엔지니어도 반복적인 작업을 자동화하고, 복잡한 멀티스텝 작업을 간단한 프롬프트로 처리할 수 있습니다.
  3. 안전성과 편의성의 균형: 적절한 보안 아키텍처와 검증 메커니즘을 통해 편의성을 희생하지 않으면서도 안전한 운영이 가능합니다.

도입 시 고려사항

기술적 측면

  • 온프레미스 LLM vs 클라우드 API 선택
  • 기존 인프라와의 통합 방안
  • 성능 및 확장성 계획

조직적 측면

  • 단계적 도입 전략 수립
  • 팀 교육 및 문화 변화 관리
  • ROI 측정 및 개선 지표 설정

보안 측면

  • 제로 트러스트 아키텍처 적용
  • 지속적인 보안 감사 체계
  • 인시던트 대응 프로세스 구축

미래 전망

2025년 이후 우리는 다음과 같은 발전을 기대할 수 있습니다.

  1. OS 레벨 통합: 리눅스 커널 자체에 AI 인터페이스가 내장되어, 부팅 시점부터 자연어 제어가 가능한 시스템
  2. 음성 기반 운영: "Hey Linux, 서버 상태 확인해줘" 같은 음성 명령으로 핸즈프리 서버 관리
  3. 예측적 자동화: AI가 문제를 사전에 감지하고 자동으로 해결하는 자율 운영 시스템
  4. 통합 관리 플랫폼: 온프레미스, 클라우드, 엣지 디바이스를 하나의 자연어 인터페이스로 관리

자연어 기반 서버 관리는 이제 실험적 단계를 넘어 실무에 적용 가능한 수준에 도달했습니다. GitHub Copilot이 개발자의 코딩 방식을 바꾸었듯이, AI 기반 서버 관리 도구들은 시스템 관리자와 DevOps 엔지니어의 업무 방식을 혁신적으로 변화시킬 것입니다.

 

중요한 것은 이러한 도구를 단순히 명령어 번역기로만 보지 않고, 인간과 시스템 간의 새로운 소통 방식으로 이해하는 것입니다. AI는 복잡한 인프라를 더 인간 친화적으로 만들어주는 다리 역할을 하며, 우리가 본질적인 문제 해결에 더 집중할 수 있게 해줍니다.

 

조직들은 이제 "AI를 도입할 것인가?"가 아닌 "어떻게 안전하고 효과적으로 도입할 것인가?"를 고민해야 할 시점입니다. 위 제시한 아키텍처, 보안 고려사항, 그리고 실제 구현 예제들이 그 여정에 도움이 되기를 바랍니다. 미래의 서버 관리는 더 이상 검은 화면에 복잡한 명령어를 입력하는 것이 아닙니다. 그것은 마치 숙련된 동료와 대화하듯, 자연스럽게 시스템과 소통하는 새로운 경험이 될 것입니다.


본 글에서 다룬 도구들과 기술들은 빠르게 발전하고 있습니다. 최신 정보와 업데이트는 각 프로젝트의 공식 문서를 참조하시기 바랍니다. 또한 프로덕션 환경에 적용하기 전에는 충분한 테스트와 보안 검토를 거치시기를 권장합니다.

728x90
그리드형(광고전용)

댓글