마이크로서비스 아키텍처: Python으로 구현하는 현대적 시스템 설계와 개발 가이드

콘텐츠 대표 이미지 - 마이크로서비스 아키텍처: Python으로 구현하는 현대적 시스템 설계와 개발 가이드

 

 

🚀 안녕, 개발자 친구들! 오늘은 2025년 3월 5일 기준으로 최신 트렌드를 반영한 마이크로서비스 아키텍처에 대해 Python으로 어떻게 구현하는지 함께 알아볼 거야. 복잡한 개념도 쉽게 풀어서 설명할 테니 걱정 마!

🐍 Python은 마이크로서비스 개발에 정말 좋은 언어야. 간결한 문법과 풍부한 라이브러리 덕분에 빠른 개발과 확장성을 모두 잡을 수 있거든. 이 글을 통해 너도 현업에서 바로 활용할 수 있는 실전 지식을 얻어갈 수 있을 거야!

📚 목차

  1. 마이크로서비스 아키텍처 이해하기
  2. Python으로 마이크로서비스 시작하기
  3. 핵심 구성요소 구현하기
  4. 서비스 간 통신 방법
  5. 데이터베이스 전략
  6. 배포와 운영
  7. 모니터링과 로깅
  8. 실전 프로젝트: 쇼핑몰 마이크로서비스 구현
  9. 성능 최적화 전략
  10. 마이크로서비스 아키텍처의 미래

1. 마이크로서비스 아키텍처 이해하기 🧩

마이크로서비스 아키텍처는 하나의 큰 애플리케이션을 여러 개의 작은 서비스로 나누는 방식이야. 각 서비스는 독립적으로 개발, 배포, 확장이 가능하지. 이런 접근 방식이 왜 중요해졌는지 알아볼까?

1.1 모놀리식 vs 마이크로서비스

모놀리식 UI 레이어 비즈니스 로직 데이터 액세스 단일 데이터베이스 마이크로서비스 사용자 서비스 상품 서비스 주문 서비스 결제 서비스 API 게이트웨이

모놀리식 아키텍처는 전통적인 방식으로, 모든 기능이 하나의 코드베이스에 통합되어 있어. 작은 프로젝트에서는 간단하고 효율적일 수 있지만, 프로젝트가 커질수록 여러 문제가 생겨:

  1. 코드베이스가 복잡해져서 새로운 개발자가 이해하기 어려워짐
  2. 작은 변경사항도 전체 애플리케이션을 다시 배포해야 함
  3. 특정 부분만 확장하기 어려움
  4. 기술 스택을 변경하거나 업데이트하기 어려움

반면, 마이크로서비스 아키텍처는 이런 문제를 해결하기 위해 등장했어:

  1. 각 서비스가 독립적으로 개발, 배포, 확장 가능
  2. 서비스별로 다른 기술 스택 사용 가능
  3. 장애 격리(한 서비스의 문제가 전체 시스템에 영향을 미치지 않음)
  4. 팀별로 서비스를 담당하여 개발 속도 향상

1.2 마이크로서비스의 핵심 원칙

마이크로서비스를 제대로 구현하려면 몇 가지 핵심 원칙을 이해해야 해:

🔄 단일 책임 원칙: 각 서비스는 하나의 비즈니스 기능에 집중해야 해. 예를 들어, 사용자 관리, 상품 관리, 주문 처리 등으로 분리할 수 있지.

🔒 독립적인 데이터 관리: 각 서비스는 자신만의 데이터베이스를 가지고, 다른 서비스의 데이터베이스에 직접 접근하지 않아야 해.

🔌 API 기반 통신: 서비스 간 통신은 잘 정의된 API를 통해 이루어져야 해. REST, gRPC, 메시지 큐 등을 활용할 수 있어.

🚀 자동화된 배포: CI/CD 파이프라인을 통해 각 서비스를 독립적으로 빌드, 테스트, 배포할 수 있어야 해.

📊 분산 모니터링: 여러 서비스로 구성된 시스템을 효과적으로 모니터링하고 문제를 추적할 수 있어야 해.

1.3 Python이 마이크로서비스에 적합한 이유

Python은 마이크로서비스 개발에 여러 장점을 제공해:

1. 빠른 개발 속도: Python의 간결한 문법과 풍부한 라이브러리 덕분에 빠르게 프로토타입을 만들고 개발할 수 있어.

2. 강력한 웹 프레임워크: Flask, FastAPI, Django 등 다양한 웹 프레임워크를 통해 API 서비스를 쉽게 구현할 수 있어.

3. 비동기 지원: Python 3.5+ 버전부터 asyncio를 통한 비동기 프로그래밍이 가능해져서 높은 동시성을 처리할 수 있어.

4. 데이터 처리 능력: 데이터 과학, 머신러닝 등의 기능을 마이크로서비스에 쉽게 통합할 수 있어.

5. 커뮤니티 지원: 활발한 커뮤니티와 풍부한 문서, 오픈소스 라이브러리 덕분에 문제 해결이 쉬워.

2025년 현재, Python은 마이크로서비스 개발에서 가장 인기 있는 언어 중 하나로 자리 잡았어. 특히 FastAPI와 같은 최신 프레임워크는 성능과 개발 편의성을 모두 제공하면서 큰 인기를 끌고 있지.

2. Python으로 마이크로서비스 시작하기 🚀

이제 Python으로 마이크로서비스를 구현하는 방법을 알아볼게. 먼저 필요한 도구와 프레임워크부터 살펴보자!

2.1 필요한 도구와 프레임워크

🛠️ 웹 프레임워크:

  1. FastAPI: 2025년 현재 가장 인기 있는 고성능 비동기 API 프레임워크. 자동 문서화, 타입 힌팅 지원이 강점이야.
  2. Flask: 가볍고 유연한 프레임워크로, 작은 서비스에 적합해.
  3. Django REST Framework: 풀스택 Django 기반으로, 복잡한 API를 빠르게 개발할 수 있어.

🗄️ 데이터베이스:

  1. PostgreSQL: 강력한 관계형 데이터베이스로, JSON 지원과 확장성이 뛰어나.
  2. MongoDB: 문서 기반 NoSQL 데이터베이스로, 스키마 없이 유연하게 데이터를 저장할 수 있어.
  3. Redis: 인메모리 데이터 스토어로, 캐싱과 메시지 브로커로 활용 가능해.

📨 메시지 큐:

  1. RabbitMQ: 강력한 메시지 브로커로, 서비스 간 비동기 통신에 적합해.
  2. Kafka: 고성능 분산 이벤트 스트리밍 플랫폼으로, 대규모 데이터 처리에 적합해.
  3. Redis Streams: Redis의 스트림 기능을 활용한 경량 메시지 큐.

🔍 모니터링 도구:

  1. Prometheus: 메트릭 수집 및 알림 시스템.
  2. Grafana: 데이터 시각화 및 모니터링 대시보드.
  3. Jaeger/Zipkin: 분산 트레이싱 시스템.

🚢 컨테이너화 및 오케스트레이션:

  1. Docker: 애플리케이션을 컨테이너화하여 일관된 환경에서 실행할 수 있게 해줘.
  2. Kubernetes: 컨테이너 오케스트레이션 플랫폼으로, 서비스 배포, 확장, 관리를 자동화해.

2.2 첫 번째 마이크로서비스 만들기: FastAPI 활용

FastAPI는 2025년 현재 Python 마이크로서비스 개발에 가장 인기 있는 프레임워크야. 높은 성능, 자동 문서화, 타입 힌팅 지원 등 많은 장점을 제공해. 간단한 사용자 서비스를 만들어볼게:

먼저 필요한 패키지를 설치해야 해:

pip install fastapi uvicorn sqlalchemy pydantic

이제 간단한 사용자 서비스를 만들어보자:


# main.py
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel
import uvicorn

# 데이터베이스 설정
SQLALCHEMY_DATABASE_URL = "sqlite:///./users.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# 데이터베이스 모델
class UserDB(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    full_name = Column(String)

# Pydantic 모델
class UserCreate(BaseModel):
    username: str
    email: str
    full_name: str

class User(BaseModel):
    id: int
    username: str
    email: str
    full_name: str
    
    class Config:
        orm_mode = True

# 데이터베이스 테이블 생성
Base.metadata.create_all(bind=engine)

# FastAPI 앱 생성
app = FastAPI(title="User Microservice")

# 의존성
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# API 엔드포인트
@app.post("/users/", response_model=User)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = UserDB(username=user.username, email=user.email, full_name=user.full_name)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@app.get("/users/{user_id}", response_model=User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(UserDB).filter(UserDB.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user

@app.get("/users/", response_model=list[User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = db.query(UserDB).offset(skip).limit(limit).all()
    return users

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
      

이 코드를 실행하면 http://localhost:8000/docs에서 자동 생성된 API 문서를 확인할 수 있어!

2.3 Docker로 마이크로서비스 컨테이너화하기

마이크로서비스는 컨테이너화하여 배포하는 것이 일반적이야. Docker를 사용해서 위에서 만든 서비스를 컨테이너화해보자:

Dockerfile 생성:


# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
      

requirements.txt 파일:


fastapi==0.110.0
uvicorn==0.27.1
sqlalchemy==2.0.27
pydantic==2.5.3
      

Docker 이미지 빌드 및 실행:


# 이미지 빌드
docker build -t user-service .

# 컨테이너 실행
docker run -d -p 8000:8000 --name user-service user-service
      

이제 첫 번째 마이크로서비스가 Docker 컨테이너 안에서 실행되고 있어! 이런 방식으로 여러 마이크로서비스를 독립적으로 개발하고 배포할 수 있지.

재능넷 같은 플랫폼에서도 이런 마이크로서비스 아키텍처를 활용하면 서비스별로 독립적인 개발과 확장이 가능해져서 플랫폼의 안정성과 확장성을 크게 높일 수 있어. 특히 사용자 서비스, 결제 서비스, 콘텐츠 서비스 등을 분리하면 각 기능을 독립적으로 관리하기 훨씬 쉬워지지!

3. 핵심 구성요소 구현하기 🧱

마이크로서비스 아키텍처에서는 여러 핵심 구성요소가 필요해. 이제 이러한 구성요소들을 Python으로 어떻게 구현하는지 알아보자!

3.1 API 게이트웨이 구현

API 게이트웨이는 클라이언트와 마이크로서비스 사이의 중간 계층으로, 다음과 같은 역할을 해:

  1. 요청 라우팅: 클라이언트 요청을 적절한 마이크로서비스로 전달
  2. 인증 및 권한 부여: 모든 요청에 대한 중앙 집중식 인증 처리
  3. 요청/응답 변환: 클라이언트와 서비스 간의 데이터 형식 변환
  4. 로드 밸런싱: 여러 서비스 인스턴스 간의 부하 분산
  5. 캐싱: 자주 요청되는 데이터 캐싱
  6. 속도 제한: API 호출 횟수 제한

Python으로 API 게이트웨이를 구현하는 방법을 알아보자. FastAPI를 사용한 간단한 게이트웨이 예제야:


# api_gateway.py
from fastapi import FastAPI, Depends, HTTPException, Header
from fastapi.responses import JSONResponse
import httpx
import jwt
from typing import Optional

app = FastAPI(title="API Gateway")

# 서비스 URL 설정
SERVICE_URLS = {
    "user": "http://user-service:8000",
    "product": "http://product-service:8001",
    "order": "http://order-service:8002",
    "payment": "http://payment-service:8003"
}

# JWT 시크릿 키
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

# 인증 함수
async def verify_token(authorization: Optional[str] = Header(None)):
    if authorization is None:
        raise HTTPException(status_code=401, detail="Authorization header missing")
    
    try:
        token = authorization.split(" ")[1]
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

# 사용자 서비스 라우팅
@app.get("/api/users/{user_id}")
async def get_user(user_id: int, token_data: dict = Depends(verify_token)):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{SERVICE_URLS['user']}/users/{user_id}")
        return JSONResponse(content=response.json(), status_code=response.status_code)

@app.post("/api/users/")
async def create_user(token_data: dict = Depends(verify_token)):
    async with httpx.AsyncClient() as client:
        response = await client.post(f"{SERVICE_URLS['user']}/users/")
        return JSONResponse(content=response.json(), status_code=response.status_code)

# 상품 서비스 라우팅
@app.get("/api/products/{product_id}")
async def get_product(product_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{SERVICE_URLS['product']}/products/{product_id}")
        return JSONResponse(content=response.json(), status_code=response.status_code)

# 주문 서비스 라우팅
@app.post("/api/orders/")
async def create_order(token_data: dict = Depends(verify_token)):
    async with httpx.AsyncClient() as client:
        response = await client.post(f"{SERVICE_URLS['order']}/orders/")
        return JSONResponse(content=response.json(), status_code=response.status_code)

# 결제 서비스 라우팅
@app.post("/api/payments/")
async def process_payment(token_data: dict = Depends(verify_token)):
    async with httpx.AsyncClient() as client:
        response = await client.post(f"{SERVICE_URLS['payment']}/payments/")
        return JSONResponse(content=response.json(), status_code=response.status_code)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("api_gateway:app", host="0.0.0.0", port=8080, reload=True)
      

이 게이트웨이는 클라이언트 요청을 받아 적절한 마이크로서비스로 전달하고, JWT 토큰을 사용해 인증을 처리해. 실제 프로덕션 환경에서는 Kong, Traefik, AWS API Gateway 같은 전문 API 게이트웨이 솔루션을 사용하는 것도 좋은 선택이야.

3.2 서비스 디스커버리 구현

서비스 디스커버리는 마이크로서비스 환경에서 서비스의 위치(IP 주소, 포트)를 동적으로 찾는 메커니즘이야. 서비스가 자동으로 확장되거나 재배포될 때 특히 중요해.

Python에서는 Consul, etcd, ZooKeeper 같은 서비스 디스커버리 도구와 통합할 수 있어. 여기서는 Consul과 통합하는 간단한 예제를 보여줄게:


# service_discovery.py
import consul
import socket
import uuid
import time

class ServiceRegistry:
    def __init__(self, service_name, service_port, consul_host="localhost", consul_port=8500):
        self.service_name = service_name
        self.service_port = service_port
        self.consul_client = consul.Consul(host=consul_host, port=consul_port)
        self.service_id = f"{service_name}-{uuid.uuid4()}"
        self.hostname = socket.gethostname()
        self.ip_address = socket.gethostbyname(self.hostname)
    
    def register(self):
        """서비스를 Consul에 등록"""
        self.consul_client.agent.service.register(
            name=self.service_name,
            service_id=self.service_id,
            address=self.ip_address,
            port=self.service_port,
            check=consul.Check.http(f"http://{self.ip_address}:{self.service_port}/health", "10s")
        )
        print(f"Service {self.service_name} registered with ID {self.service_id}")
    
    def deregister(self):
        """서비스를 Consul에서 제거"""
        self.consul_client.agent.service.deregister(self.service_id)
        print(f"Service {self.service_id} deregistered")
    
    @staticmethod
    def discover_service(service_name, consul_host="localhost", consul_port=8500):
        """서비스 인스턴스 찾기"""
        consul_client = consul.Consul(host=consul_host, port=consul_port)
        services = consul_client.catalog.service(service_name)[1]
        if not services:
            return None
        
        # 간단한 로드 밸런싱 (라운드 로빈)
        service = services[int(time.time()) % len(services)]
        return {
            "address": service["ServiceAddress"],
            "port": service["ServicePort"]
        }

# 사용 예시
if __name__ == "__main__":
    # 서비스 등록
    registry = ServiceRegistry("user-service", 8000)
    registry.register()
    
    try:
        # 서비스 실행 중...
        while True:
            time.sleep(10)
            print("Service running...")
    except KeyboardInterrupt:
        # 서비스 종료 시 등록 해제
        registry.deregister()
    
    # 다른 서비스에서 사용자 서비스 찾기
    user_service = ServiceRegistry.discover_service("user-service")
    if user_service:
        print(f"Found user service at {user_service['address']}:{user_service['port']}")
    else:
        print("User service not found")
      

이 코드를 사용하면 각 마이크로서비스가 시작될 때 자동으로 Consul에 등록되고, 다른 서비스에서는 서비스 이름만으로 해당 서비스의 위치를 찾을 수 있어.

3.3 서킷 브레이커 패턴 구현

서킷 브레이커(Circuit Breaker) 패턴은 마이크로서비스 간의 호출이 실패할 때 연쇄적인 장애를 방지하는 패턴이야. 특정 서비스가 응답하지 않을 때 계속해서 요청을 보내는 대신, 일시적으로 호출을 중단하고 기본값이나 캐시된 응답을 반환해.

Python에서는 pybreaker 라이브러리를 사용해 서킷 브레이커를 구현할 수 있어:


# circuit_breaker.py
import pybreaker
import httpx
import asyncio
from fastapi import FastAPI, HTTPException

app = FastAPI(title="Circuit Breaker Example")

# 서킷 브레이커 설정
# - max_failures: 서킷이 열리기 전 허용되는 최대 실패 횟수
# - reset_timeout: 서킷이 half-open 상태로 전환되기 전 대기 시간(초)
user_service_breaker = pybreaker.CircuitBreaker(
    fail_max=3,
    reset_timeout=30,
    exclude=[HTTPException]
)

# 서킷 브레이커 이벤트 리스너
@user_service_breaker.on_open
def on_open():
    print("Circuit breaker opened - user service is down!")

@user_service_breaker.on_close
def on_close():
    print("Circuit breaker closed - user service is operational again!")

@user_service_breaker.on_half_open
def on_half_open():
    print("Circuit breaker half-open - trying to test if user service is back")

# 서킷 브레이커로 보호된 함수
@user_service_breaker
async def get_user_from_service(user_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://user-service:8000/users/{user_id}", timeout=2.0)
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail=response.text)
        return response.json()

# 폴백(fallback) 메커니즘
async def get_user_fallback(user_id: int):
    # 캐시에서 데이터 가져오기 또는 기본값 반환
    return {
        "id": user_id,
        "username": "unknown",
        "email": "unknown@example.com",
        "full_name": "Unknown User"
    }

# API 엔드포인트
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    try:
        # 서킷 브레이커로 보호된 함수 호출
        return await get_user_from_service(user_id)
    except pybreaker.CircuitBreakerError:
        # 서킷이 열려 있을 때 폴백 메커니즘 사용
        print(f"Circuit is OPEN! Using fallback for user {user_id}")
        return await get_user_fallback(user_id)
    except Exception as e:
        # 다른 예외 처리
        print(f"Error getting user {user_id}: {str(e)}")
        return await get_user_fallback(user_id)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("circuit_breaker:app", host="0.0.0.0", port=8081, reload=True)
      

이 코드는 사용자 서비스에 대한 호출이 연속으로 3번 실패하면 서킷을 열고, 30초 동안 추가 호출을 차단해. 그 동안에는 폴백 메커니즘을 통해 기본 사용자 정보를 반환하지. 이렇게 하면 한 서비스의 장애가 전체 시스템으로 확산되는 것을 방지할 수 있어.

이러한 패턴들은 마이크로서비스 아키텍처의 회복력(resilience)을 높이는 데 매우 중요해. 특히 여러 서비스가 상호 의존하는 복잡한 시스템에서는 이런 패턴들이 필수적이지!

4. 서비스 간 통신 방법 📡

마이크로서비스 아키텍처에서는 서비스 간 통신이 매우 중요해. 크게 두 가지 통신 방식이 있어: 동기식 통신비동기식 통신. 각각의 방식을 Python으로 어떻게 구현하는지 알아보자!

마이크로서비스 통신 방식 동기식 통신 (REST/gRPC) 서비스 A 서비스 B 요청 응답 특징 • 요청-응답 패턴 • 클라이언트가 응답을 기다림 • 간단하고 직관적인 구현 • 서비스 간 강한 결합 • REST API, gRPC 등으로 구현 비동기식 통신 (메시지 큐) 서비스 C 메시지 큐 서비스 D 발행 구독 특징 • 발행-구독 패턴 • 서비스 간 느슨한 결합 • 메시지 브로커(RabbitMQ, Kafka 등) 사용

4.1 동기식 통신: REST API

REST API는 가장 일반적인 동기식 통신 방법이야. 한 서비스가 다른 서비스의 API를 직접 호출하고 응답을 기다려. Python에서는 requests 또는 httpx 라이브러리를 사용해 구현할 수 있어:


# order_service.py (동기식 통신 예제)
from fastapi import FastAPI, HTTPException
import httpx
from pydantic import BaseModel

app = FastAPI(title="Order Service")

class OrderCreate(BaseModel):
    user_id: int
    product_id: int
    quantity: int

class Order(BaseModel):
    id: int
    user_id: int
    product_id: int
    quantity: int
    total_price: float

# 동기식으로 다른 서비스 호출
async def get_product_price(product_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://product-service:8001/products/{product_id}")
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail="Product service error")
        product = response.json()
        return product["price"]

async def check_user_exists(user_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://user-service:8000/users/{user_id}")
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail="User service error")
        return True

@app.post("/orders/", response_model=Order)
async def create_order(order: OrderCreate):
    # 사용자 확인 (동기식 호출)
    await check_user_exists(order.user_id)
    
    # 상품 가격 확인 (동기식 호출)
    price = await get_product_price(order.product_id)
    
    # 주문 생성
    total_price = price * order.quantity
    new_order = {
        "id": 1,  # 실제로는 DB에서 생성된 ID
        "user_id": order.user_id,
        "product_id": order.product_id,
        "quantity": order.quantity,
        "total_price": total_price
    }
    
    # 실제 구현에서는 DB에 저장
    
    return new_order
      

이 방식의 장점은 구현이 간단하고 직관적이라는 거야. 하지만 서비스 간 강한 결합이 생기고, 한 서비스가 응답하지 않으면 호출하는 서비스도 영향을 받을 수 있어.

4.2 동기식 통신: gRPC

gRPC는 Google에서 개발한 고성능 RPC(원격 프로시저 호출) 프레임워크야. REST API보다 더 효율적이고 타입 안전한 통신이 가능해. Python에서 gRPC를 구현하는 방법을 알아보자:

먼저 Protocol Buffers로 서비스를 정의해야 해:


// product.proto
syntax = "proto3";

package product;

service ProductService {
  rpc GetProduct (ProductRequest) returns (ProductResponse) {}
}

message ProductRequest {
  int32 product_id = 1;
}

message ProductResponse {
  int32 id = 1;
  string name = 2;
  float price = 3;
  int32 stock = 4;
}
      

이제 서버 측 구현:


# product_service.py (gRPC 서버)
import grpc
from concurrent import futures
import product_pb2
import product_pb2_grpc

class ProductServicer(product_pb2_grpc.ProductServiceServicer):
    def GetProduct(self, request, context):
        # 실제 구현에서는 데이터베이스에서 제품 정보를 가져옴
        product_id = request.product_id
        
        # 예시 데이터
        products = {
            1: {"id": 1, "name": "Laptop", "price": 1299.99, "stock": 10},
            2: {"id": 2, "name": "Smartphone", "price": 699.99, "stock": 20},
            3: {"id": 3, "name": "Headphones", "price": 149.99, "stock": 30}
        }
        
        if product_id not in products:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"Product with ID {product_id} not found")
            return product_pb2.ProductResponse()
        
        product = products[product_id]
        return product_pb2.ProductResponse(
            id=product["id"],
            name=product["name"],
            price=product["price"],
            stock=product["stock"]
        )

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    product_pb2_grpc.add_ProductServiceServicer_to_server(ProductServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("Product gRPC server started on port 50051")
    server.wait_for_termination()

if __name__ == '__main__':
    serve()
      

클라이언트 측 구현:


# order_service_grpc_client.py
import grpc
import product_pb2
import product_pb2_grpc
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI(title="Order Service (gRPC Client)")

class OrderCreate(BaseModel):
    user_id: int
    product_id: int
    quantity: int

class Order(BaseModel):
    id: int
    user_id: int
    product_id: int
    quantity: int
    total_price: float

# gRPC를 사용하여 제품 서비스 호출
def get_product_price(product_id: int):
    with grpc.insecure_channel('product-service:50051') as channel:
        stub = product_pb2_grpc.ProductServiceStub(channel)
        request = product_pb2.ProductRequest(product_id=product_id)
        try:
            response = stub.GetProduct(request)
            return response.price
        except grpc.RpcError as e:
            status_code = e.code()
            if status_code == grpc.StatusCode.NOT_FOUND:
                raise HTTPException(status_code=404, detail=f"Product {product_id} not found")
            else:
                raise HTTPException(status_code=500, detail=f"gRPC error: {e.details()}")

@app.post("/orders/", response_model=Order)
def create_order(order: OrderCreate):
    # 제품 가격 확인 (gRPC 호출)
    price = get_product_price(order.product_id)
    
    # 주문 생성
    total_price = price * order.quantity
    new_order = {
        "id": 1,  # 실제로는 DB에서 생성된 ID
        "user_id": order.user_id,
        "product_id": order.product_id,
        "quantity": order.quantity,
        "total_price": total_price
    }
    
    # 실제 구현에서는 DB에 저장
    
    return new_order
      

gRPC는 REST API보다 더 효율적이고, 강력한 타입 체크를 제공해. 특히 마이크로서비스 간에 많은 데이터를 주고받아야 하는 경우 좋은 선택이야. 하지만 REST보다 구현이 복잡하고, 브라우저에서 직접 호출하기 어렵다는 단점이 있어.

4.3 비동기식 통신: 메시지 큐

비동기식 통신은 서비스 간에 직접 호출하지 않고, 메시지 큐를 통해 통신해. 이 방식은 서비스 간 결합도를 낮추고 시스템의 회복력을 높여줘. Python에서는 RabbitMQ, Kafka 등의 메시지 브로커와 통합할 수 있어.

RabbitMQ를 사용한 예제를 살펴보자:

메시지 발행자(Publisher):


# order_service_publisher.py
import pika
import json
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Order Service (Publisher)")

class OrderCreate(BaseModel):
    user_id: int
    product_id: int
    quantity: int

class Order(BaseModel):
    id: int
    user_id: int
    product_id: int
    quantity: int

# RabbitMQ 연결 설정
def get_rabbitmq_connection():
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters('rabbitmq', 5672, '/', credentials)
    return pika.BlockingConnection(parameters)

@app.post("/orders/", response_model=Order)
def create_order(order: OrderCreate):
    # 주문 생성
    new_order = {
        "id": 1,  # 실제로는 DB에서 생성된 ID
        "user_id": order.user_id,
        "product_id": order.product_id,
        "quantity": order.quantity
    }
    
    # 실제 구현에서는 DB에 저장
    
    # 주문 생성 이벤트 발행
    connection = get_rabbitmq_connection()
    channel = connection.channel()
    
    # 큐 선언
    channel.queue_declare(queue='order_created')
    
    # 메시지 발행
    channel.basic_publish(
        exchange='',
        routing_key='order_created',
        body=json.dumps(new_order),
        properties=pika.BasicProperties(
            delivery_mode=2,  # 메시지 지속성 보장
        )
    )
    
    connection.close()
    
    return new_order
      

메시지 소비자(Consumer):


# inventory_service_consumer.py
import pika
import json
import time

def process_order(order):
    """주문을 처리하고 재고를 업데이트하는 함수"""
    print(f"Processing order {order['id']} for product {order['product_id']}")
    # 실제 구현에서는 데이터베이스에서 재고 업데이트
    print(f"Updated inventory for product {order['product_id']}")

def callback(ch, method, properties, body):
    """메시지를 받았을 때 실행되는 콜백 함수"""
    order = json.loads(body)
    print(f"Received order: {order}")
    
    try:
        process_order(order)
        # 메시지 처리 완료 확인
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error processing order: {str(e)}")
        # 처리 실패 시 메시지 다시 큐에 넣기
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

def start_consuming():
    """메시지 소비 시작"""
    # RabbitMQ 연결 설정
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters('rabbitmq', 5672, '/', credentials)
    
    # 연결 시도 (재시도 로직 포함)
    connected = False
    while not connected:
        try:
            connection = pika.BlockingConnection(parameters)
            connected = True
        except pika.exceptions.AMQPConnectionError:
            print("Failed to connect to RabbitMQ. Retrying in 5 seconds...")
            time.sleep(5)
    
    channel = connection.channel()
    
    # 큐 선언
    channel.queue_declare(queue='order_created')
    
    # 한 번에 하나의 메시지만 처리하도록 설정
    channel.basic_qos(prefetch_count=1)
    
    # 콜백 함수 등록
    channel.basic_consume(queue='order_created', on_message_callback=callback)
    
    print("Inventory service started. Waiting for orders...")
    channel.start_consuming()

if __name__ == '__main__':
    start_consuming()
      

이 방식의 장점은 서비스 간 느슨한 결합을 제공하고, 한 서비스가 다운되어도 다른 서비스가 계속 작동할 수 있다는 거야. 메시지는 큐에 저장되므로 소비자 서비스가 다시 시작되면 처리되지 않은 메시지를 처리할 수 있어.

또한 이벤트 기반 아키텍처를 구현하기에 좋아서, 시스템을 더 확장 가능하고 유연하게 만들 수 있어. 예를 들어, 주문이 생성되면 재고 서비스, 결제 서비스, 알림 서비스 등 여러 서비스가 독립적으로 이 이벤트에 반응할 수 있지.

재능넷과 같은 플랫폼에서도 이런 비동기 통신 방식을 활용하면 사용자 활동, 결제 처리, 알림 발송 등의 기능을 효율적으로 처리할 수 있어. 특히 트래픽이 많은 서비스에서는 비동기 처리가 시스템 안정성을 크게 향상시킬 수 있지!

5. 데이터베이스 전략 💾

마이크로서비스 아키텍처에서 데이터베이스 전략은 매우 중요해. 전통적인 모놀리식 애플리케이션에서는 하나의 중앙 집중식 데이터베이스를 사용하지만, 마이크로서비스에서는 다른 접근 방식이 필요해.

5.1 데이터베이스 패턴: 데이터베이스 per 서비스

데이터베이스 per 서비스 패턴은 각 마이크로서비스가 자신만의 데이터베이스를 가지는 방식이야. 이 패턴의 장점과 구현 방법을 알아보자:

데이터베이스 per 서비스 패턴 사용자 서비스 사용자 DB (PostgreSQL) 상품 서비스 상품 DB (MongoDB) 주문 서비스 주문 DB (MySQL)

이 패턴의 주요 장점은 다음과 같아:

  1. 서비스 독립성: 각 서비스가 자신의 데이터베이스를 가지므로 독립적으로 개발, 배포, 확장할 수 있어.
  2. 기술 다양성: 각 서비스의 요구사항에 맞는 데이터베이스 기술을 선택할 수 있어 (SQL, NoSQL 등).
  3. 장애 격리: 한 데이터베이스의 장애가 다른 서비스에 영향을 미치지 않아.

Python으로 이 패턴을 구현하는 방법을 살펴보자. 여러 서비스에서 각각 다른 데이터베이스를 사용하는 예제야:

사용자 서비스 (PostgreSQL 사용):


# user_service/database.py
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# PostgreSQL 연결 설정
DATABASE_URL = "postgresql://username:password@postgres-user-db:5432/userdb"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# 사용자 모델
class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    full_name = Column(String)
    hashed_password = Column(String)

# 데이터베이스 초기화
def init_db():
    Base.metadata.create_all(bind=engine)
      

상품 서비스 (MongoDB 사용):


# product_service/database.py
from pymongo import MongoClient
from bson.objectid import ObjectId

# MongoDB 연결 설정
client = MongoClient("mongodb://mongo-product-db:27017/")
db = client["productdb"]
products_collection = db["products"]

# 상품 CRUD 함수
def create_product(product_data):
    result = products_collection.insert_one(product_data)
    return str(result.inserted_id)

def get_product(product_id):
    return products_collection.find_one({"_id": ObjectId(product_id)})

def get_products(skip=0, limit=100):
    return list(products_collection.find().skip(skip).limit(limit))

def update_product(product_id, product_data):
    return products_collection.update_one(
        {"_id": ObjectId(product_id)},
        {"$set": product_data}
    )

def delete_product(product_id):
    return products_collection.delete_one({"_id": ObjectId(product_id)})
      

주문 서비스 (MySQL 사용):


# order_service/database.py
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
import datetime

# MySQL 연결 설정
DATABASE_URL = "mysql+pymysql://username:password@mysql-order-db:3306/orderdb"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# 주문 모델
class Order(Base):
    __tablename__ = "orders"
    
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, index=True)
    total_amount = Column(Float)
    status = Column(String(20))
    created_at = Column(DateTime, default=datetime.datetime.utcnow)

class OrderItem(Base):
    __tablename__ = "order_items"
    
    id = Column(Integer, primary_key=True, index=True)
    order_id = Column(Integer, ForeignKey("orders.id"))
    product_id = Column(Integer, index=True)
    quantity = Column(Integer)
    price = Column(Float)
    
    order = relationship("Order", back_populates="items")

Order.items = relationship("OrderItem", back_populates="order")

# 데이터베이스 초기화
def init_db():
    Base.metadata.create_all(bind=engine)
      

5.2 데이터 일관성 유지하기

각 서비스가 자신의 데이터베이스를 가지면 데이터 일관성을 유지하는 것이 도전 과제가 돼. 이를 해결하기 위한 몇 가지 패턴을 살펴보자:

1. Saga 패턴

Saga 패턴은 여러 서비스에 걸친 트랜잭션을 관리하는 방법이야. 각 단계가 실패하면 보상 트랜잭션을 통해 이전 단계를 롤백해.

Python으로 구현한 간단한 Saga 패턴 예제:


# saga_orchestrator.py
class Saga:
    def __init__(self):
        self.steps = []
        self.compensations = []
    
    def add_step(self, step_function, compensation_function):
        self.steps.append(step_function)
        self.compensations.append(compensation_function)
    
    async def execute(self):
        current_step = 0
        try:
            # 모든 단계 실행
            for i, step in enumerate(self.steps):
                await step()
                current_step = i + 1
            return True
        except Exception as e:
            # 오류 발생 시 보상 트랜잭션 실행
            print(f"Error in step {current_step}: {str(e)}")
            for i in range(current_step - 1, -1, -1):
                try:
                    await self.compensations[i]()
                except Exception as comp_error:
                    print(f"Error in compensation {i}: {str(comp_error)}")
            return False

# 사용 예시: 주문 생성 Saga
async def create_order_saga(user_id, product_id, quantity):
    saga = Saga()
    
    # 1. 재고 확인 및 예약
    async def reserve_inventory():
        # 재고 서비스 API 호출
        print(f"Reserving {quantity} items of product {product_id}")
        # 실제 구현에서는 API 호출
    
    async def release_inventory():
        # 예약 취소
        print(f"Releasing reservation for {quantity} items of product {product_id}")
        # 실제 구현에서는 API 호출
    
    saga.add_step(reserve_inventory, release_inventory)
    
    # 2. 결제 처리
    async def process_payment():
        # 결제 서비스 API 호출
        print(f"Processing payment for user {user_id}")
        # 실제 구현에서는 API 호출
    
    async def refund_payment():
        # 결제 취소
        print(f"Refunding payment for user {user_id}")
        # 실제 구현에서는 API 호출
    
    saga.add_step(process_payment, refund_payment)
    
    # 3. 주문 생성
    async def create_order():
        # 주문 서비스 API 호출
        print(f"Creating order for user {user_id}, product {product_id}, quantity {quantity}")
        # 실제 구현에서는 API 호출
    
    async def cancel_order():
        # 주문 취소
        print(f"Cancelling order for user {user_id}")
        # 실제 구현에서는 API 호출
    
    saga.add_step(create_order, cancel_order)
    
    # Saga 실행
    success = await saga.execute()
    return success
      

2. 이벤트 소싱 (Event Sourcing)

이벤트 소싱은 상태 변경을 이벤트로 저장하고, 이벤트를 재생하여 현재 상태를 구성하는 패턴이야. 이 패턴은 데이터 일관성과 감사 추적에 유용해.

간단한 이벤트 소싱 구현:


# event_sourcing.py
from datetime import datetime
import json
import uuid

class EventStore:
    def __init__(self):
        self.events = []
    
    def append_event(self, aggregate_id, event_type, event_data):
        event = {
            "id": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat(),
            "aggregate_id": aggregate_id,
            "type": event_type,
            "data": event_data
        }
        self.events.append(event)
        return event
    
    def get_events_for_aggregate(self, aggregate_id):
        return [event for event in self.events if event["aggregate_id"] == aggregate_id]

class Order:
    def __init__(self, order_id, event_store):
        self.order_id = order_id
        self.event_store = event_store
        self.status = "new"
        self.items = []
        self.total_amount = 0
        
        # 이벤트 재생하여 현재 상태 구성
        self._replay_events()
    
    def _replay_events(self):
        for event in self.event_store.get_events_for_aggregate(self.order_id):
            self._apply_event(event)
    
    def _apply_event(self, event):
        if event["type"] == "OrderCreated":
            self.user_id = event["data"]["user_id"]
        elif event["type"] == "ItemAdded":
            self.items.append(event["data"])
            self.total_amount += event["data"]["price"] * event["data"]["quantity"]
        elif event["type"] == "OrderPaid":
            self.status = "paid"
        elif event["type"] == "OrderShipped":
            self.status = "shipped"
        elif event["type"] == "OrderCancelled":
            self.status = "cancelled"
    
    def create(self, user_id):
        event = self.event_store.append_event(
            self.order_id,
            "OrderCreated",
            {"user_id": user_id}
        )
        self._apply_event(event)
    
    def add_item(self, product_id, quantity, price):
        event = self.event_store.append_event(
            self.order_id,
            "ItemAdded",
            {
                "product_id": product_id,
                "quantity": quantity,
                "price": price
            }
        )
        self._apply_event(event)
    
    def pay(self):
        event = self.event_store.append_event(
            self.order_id,
            "OrderPaid",
            {}
        )
        self._apply_event(event)
    
    def ship(self):
        event = self.event_store.append_event(
            self.order_id,
            "OrderShipped",
            {}
        )
        self._apply_event(event)
    
    def cancel(self):
        event = self.event_store.append_event(
            self.order_id,
            "OrderCancelled",
            {}
        )
        self._apply_event(event)

# 사용 예시
event_store = EventStore()
order_id = str(uuid.uuid4())
order = Order(order_id, event_store)

# 주문 생성 및 아이템 추가
order.create(user_id="user123")
order.add_item(product_id="prod456", quantity=2, price=29.99)
order.add_item(product_id="prod789", quantity=1, price=49.99)
order.pay()
order.ship()

# 주문 상태 출력
print(f"Order ID: {order.order_id}")
print(f"Status: {order.status}")
print(f"Items: {len(order.items)}")
print(f"Total Amount: ${order.total_amount:.2f}")

# 이벤트 히스토리 출력
print("\nEvent History:")
for event in event_store.get_events_for_aggregate(order_id):
    print(f"{event['timestamp']} - {event['type']}")
      

5.3 CQRS (Command Query Responsibility Segregation) 패턴

CQRS는 데이터의 쓰기(Command)읽기(Query) 작업을 분리하는 패턴이야. 이 패턴은 복잡한 도메인에서 성능, 확장성, 보안을 개선하는 데 도움이 돼.

CQRS 패턴 클라이언트 명령 모델 (Command) 데이터 생성, 수정, 삭제 쿼리 모델 (Query) 데이터 조회 최적화 명령 데이터베이스 (정규화된 스키마) 쿼리 데이터베이스 (비정규화된 스키마) 명령 쿼리 이벤트 기반 동기화

Python으로 CQRS 패턴을 구현하는 간단한 예제를 살펴보자:


# cqrs_example.py
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import datetime
import json
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
import asyncio

app = FastAPI(title="CQRS Example")

# 명령 모델 (쓰기 작업용)
command_engine = create_engine("postgresql://username:password@postgres-command-db:5432/commanddb")
CommandSession = sessionmaker(autocommit=False, autoflush=False, bind=command_engine)
CommandBase = declarative_base()

class ProductCommand(CommandBase):
    __tablename__ = "products"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    description = Column(Text)
    price = Column(Float)
    stock = Column(Integer)
    created_at = Column(DateTime, default=datetime.datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow)

# 쿼리 모델 (읽기 작업용)
query_engine = create_engine("postgresql://username:password@postgres-query-db:5432/querydb")
QuerySession = sessionmaker(autocommit=False, autoflush=False, bind=query_engine)
QueryBase = declarative_base()

class ProductQuery(QueryBase):
    __tablename__ = "product_views"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    description = Column(Text)
    price = Column(Float)
    stock = Column(Integer)
    category = Column(String, index=True)
    tags = Column(String)  # JSON 문자열로 저장된 태그
    average_rating = Column(Float)
    review_count = Column(Integer)
    updated_at = Column(DateTime, default=datetime.datetime.utcnow)

# 데이터베이스 초기화
CommandBase.metadata.create_all(bind=command_engine)
QueryBase.metadata.create_all(bind=query_engine)

# 이벤트 저장소 (실제로는 메시지 큐나 이벤트 스트림을 사용)
class EventStore:
    events = []
    
    @classmethod
    def publish_event(cls, event_type, event_data):
        event = {
            "id": len(cls.events) + 1,
            "timestamp": datetime.datetime.utcnow().isoformat(),
            "type": event_type,
            "data": event_data
        }
        cls.events.append(event)
        # 실제로는 여기서 메시지 큐에 이벤트를 발행
        asyncio.create_task(cls.process_event(event))
        return event
    
    @classmethod
    async def process_event(cls, event):
        # 이벤트 처리 (쿼리 모델 업데이트)
        if event["type"] == "ProductCreated" or event["type"] == "ProductUpdated":
            data = event["data"]
            db = QuerySession()
            try:
                # 쿼리 모델에서 제품 찾기 또는 새로 생성
                product = db.query(ProductQuery).filter(ProductQuery.id == data["id"]).first()
                if not product:
                    product = ProductQuery(id=data["id"])
                
                # 기본 필드 업데이트
                product.name = data["name"]
                product.description = data["description"]
                product.price = data["price"]
                product.stock = data["stock"]
                
                # 추가 필드 (실제로는 다른 서비스에서 가져올 수 있음)
                if "category" in data:
                    product.category = data["category"]
                if "tags" in data:
                    product.tags = json.dumps(data["tags"])
                if "average_rating" in data:
                    product.average_rating = data["average_rating"]
                if "review_count" in data:
                    product.review_count = data["review_count"]
                
                product.updated_at = datetime.datetime.utcnow()
                
                db.add(product)
                db.commit()
                print(f"Query model updated for product {data['id']}")
            except Exception as e:
                db.rollback()
                print(f"Error updating query model: {str(e)}")
            finally:
                db.close()

# API 모델
class ProductCreate(BaseModel):
    name: str
    description: str
    price: float
    stock: int
    category: str = None
    tags: list = None

class ProductResponse(BaseModel):
    id: int
    name: str
    description: str
    price: float
    stock: int
    category: str = None
    tags: list = None
    average_rating: float = None
    review_count: int = None

# 의존성
def get_command_db():
    db = CommandSession()
    try:
        yield db
    finally:
        db.close()

def get_query_db():
    db = QuerySession()
    try:
        yield db
    finally:
        db.close()

# 명령 API (쓰기 작업)
@app.post("/products/", response_model=ProductResponse)
def create_product(product: ProductCreate, db: CommandSession = Depends(get_command_db)):
    db_product = ProductCommand(
        name=product.name,
        description=product.description,
        price=product.price,
        stock=product.stock
    )
    db.add(db_product)
    db.commit()
    db.refresh(db_product)
    
    # 이벤트 발행
    event_data = {
        "id": db_product.id,
        "name": db_product.name,
        "description": db_product.description,
        "price": db_product.price,
        "stock": db_product.stock,
        "category": product.category,
        "tags": product.tags
    }
    EventStore.publish_event("ProductCreated", event_data)
    
    return db_product

@app.put("/products/{product_id}", response_model=ProductResponse)
def update_product(product_id: int, product: ProductCreate, db: CommandSession = Depends(get_command_db)):
    db_product = db.query(ProductCommand).filter(ProductCommand.id == product_id).first()
    if db_product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    
    db_product.name = product.name
    db_product.description = product.description
    db_product.price = product.price
    db_product.stock = product.stock
    db_product.updated_at = datetime.datetime.utcnow()
    
    db.commit()
    db.refresh(db_product)
    
    # 이벤트 발행
    event_data = {
        "id": db_product.id,
        "name": db_product.name,
        "description": db_product.description,
        "price": db_product.price,
        "stock": db_product.stock,
        "category": product.category,
        "tags": product.tags
    }
    EventStore.publish_event("ProductUpdated", event_data)
    
    return db_product

# 쿼리 API (읽기 작업)
@app.get("/products/{product_id}", response_model=ProductResponse)
def read_product(product_id: int, db: QuerySession = Depends(get_query_db)):
    db_product = db.query(ProductQuery).filter(ProductQuery.id == product_id).first()
    if db_product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    
    response = {
        "id": db_product.id,
        "name": db_product.name,
        "description": db_product.description,
        "price": db_product.price,
        "stock": db_product.stock,
        "category": db_product.category,
        "tags": json.loads(db_product.tags) if db_product.tags else None,
        "average_rating": db_product.average_rating,
        "review_count": db_product.review_count
    }
    
    return response

@app.get("/products/", response_model=list[ProductResponse])
def read_products(skip: int = 0, limit: int = 100, category: str = None, db: QuerySession = Depends(get_query_db)):
    query = db.query(ProductQuery)
    
    if category:
        query = query.filter(ProductQuery.category == category)
    
    products = query.offset(skip).limit(limit).all()
    
    response = []
    for db_product in products:
        product_data = {
            "id": db_product.id,
            "name": db_product.name,
            "description": db_product.description,
            "price": db_product.price,
            "stock": db_product.stock,
            "category": db_product.category,
            "tags": json.loads(db_product.tags) if db_product.tags else None,
            "average_rating": db_product.average_rating,
            "review_count": db_product.review_count
        }
        response.append(product_data)
    
    return response
      

CQRS 패턴은 복잡한 비즈니스 로직과 높은 읽기 성능이 필요한 시스템에 적합해. 특히 재능넷과 같은 플랫폼에서 사용자 프로필, 서비스 목록, 리뷰 등의 데이터를 효율적으로 관리하는 데 유용할 수 있어. 쓰기 작업은 정규화된 데이터베이스를 사용하고, 읽기 작업은 비정규화된 뷰를 사용함으로써 각각의 요구사항에 최적화된 성능을 제공할 수 있지!

6. 배포와 운영 🚢

마이크로서비스를 효과적으로 배포하고 운영하는 것은 성공적인 아키텍처 구현의 핵심이야. 여기서는 Docker와 Kubernetes를 사용한 배포 방법과 CI/CD 파이프라인 구축에 대해 알아보자!

6.1 Docker 컨테이너화

앞서 간단히 살펴봤지만, 여기서는 더 자세하게 마이크로서비스를 Docker로 컨테이너화하는 방법을 알아볼게.

먼저 각 서비스별로 Dockerfile을 만들어야 해. 사용자 서비스의 Dockerfile 예시:


# user-service/Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 의존성 파일 복사 및 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 애플리케이션 코드 복사
COPY . .

# 포트 노출
EXPOSE 8000

# 헬스체크 엔드포인트 추가
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD curl -f http://localhost:8000/health || exit 1

# 애플리케이션 실행
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
      

requirements.txt 파일:


fastapi==0.110.0
uvicorn==0.27.1
sqlalchemy==2.0.27
pydantic==2.5.3
psycopg2-binary==2.9.9
python-jose==3.3.0
passlib==1.7.4
python-multipart==0.0.9
      

이제 여러 서비스를 함께 실행하기 위한 docker-compose.yml 파일을 만들자:


# docker-compose.yml
version: '3.8'

services:
  # API 게이트웨이
  api-gateway:
    build: ./api-gateway
    ports:
      - "8080:8080"
    depends_on:
      - user-service
      - product-service
      - order-service
    environment:
      - USER_SERVICE_URL=http://user-service:8000
      - PRODUCT_SERVICE_URL=http://product-service:8001
      - ORDER_SERVICE_URL=http://order-service:8002
    networks:
      - microservices-net

  # 사용자 서비스
  user-service:
    build: ./user-service
    ports:
      - "8000:8000"
    depends_on:
      - postgres-user-db
    environment:
      - DATABASE_URL=postgresql://postgres:postgres@postgres-user-db:5432/userdb
      - SECRET_KEY=your-secret-key
    networks:
      - microservices-net

  # 상품 서비스
  product-service:
    build: ./product-service
    ports:
      - "8001:8001"
    depends_on:
      - mongo-product-db
    environment:
      - MONGODB_URI=mongodb://mongo-product-db:27017/productdb
    networks:
      - microservices-net

  # 주문 서비스
  order-service:
    build: ./order-service
    ports:
      - "8002:8002"
    depends_on:
      - mysql-order-db
      - rabbitmq
    environment:
      - DATABASE_URL=mysql+pymysql://root:root@mysql-order-db:3306/orderdb
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672/
    networks:
      - microservices-net

  # 사용자 데이터베이스
  postgres-user-db:
    image: postgres:15
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=userdb
    volumes:
      - postgres-user-data:/var/lib/postgresql/data
    networks:
      - microservices-net

  # 상품 데이터베이스
  mongo-product-db:
    image: mongo:6
    volumes:
      - mongo-product-data:/data/db
    networks:
      - microservices-net

  # 주문 데이터베이스
  mysql-order-db:
    image: mysql:8
    environment:
      - MYSQL_ROOT_PASSWORD=root
      - MYSQL_DATABASE=orderdb
    volumes:
      - mysql-order-data:/var/lib/mysql
    networks:
      - microservices-net

  # 메시지 브로커
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    networks:
      - microservices-net

networks:
  microservices-net:
    driver: bridge

volumes:
  postgres-user-data:
  mongo-product-data:
  mysql-order-data:
      

이 docker-compose.yml 파일은 API 게이트웨이, 세 개의 마이크로서비스, 각각의 데이터베이스, 그리고 메시지 브로커를 함께 실행해. 각 서비스는 자신의 환경 변수를 통해 구성되고, 모두 같은 네트워크에 연결되어 서로 통신할 수 있어.

6.2 Kubernetes 배포

프로덕션 환경에서는 Kubernetes를 사용하여 마이크로서비스를 배포하고 관리하는 것이 일반적이야. Kubernetes는 컨테이너 오케스트레이션 플랫폼으로, 자동 확장, 롤링 업데이트, 자가 복구 등의 기능을 제공해.

간단한 Kubernetes 매니페스트 파일을 살펴보자:

사용자 서비스의 Deployment 및 Service:


# user-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: user-service
  labels:
    app: user-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: user-service
  template:
    metadata:
      labels:
        app: user-service
    spec:
      containers:
      - name: user-service
        image: your-registry/user-service:latest
        ports:
        - containerPort: 8000
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: user-service-secrets
              key: database-url
        - name: SECRET_KEY
          valueFrom:
            secretKeyRef:
              name: user-service-secrets
              key: secret-key
        resources:
          limits:
            cpu: "500m"
            memory: "512Mi"
          requests:
            cpu: "100m"
            memory: "256Mi"
        readinessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 10
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 15
          periodSeconds: 20
---
apiVersion: v1
kind: Service
metadata:
  name: user-service
spec:
  selector:
    app: user-service
  ports:
  - port: 8000
    targetPort: 8000
  type: ClusterIP
      

API 게이트웨이의 Deployment 및 Service:


# api-gateway-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: api-gateway
  labels:
    app: api-gateway
spec:
  replicas: 2
  selector:
    matchLabels:
      app: api-gateway
  template:
    metadata:
      labels:
        app: api-gateway
    spec:
      containers:
      - name: api-gateway
        image: your-registry/api-gateway:latest
        ports:
        - containerPort: 8080
        env:
        - name: USER_SERVICE_URL
          value: "http://user-service:8000"
        - name: PRODUCT_SERVICE_URL
          value: "http://product-service:8001"
        - name: ORDER_SERVICE_URL
          value: "http://order-service:8002"
        resources:
          limits:
            cpu: "500m"
            memory: "512Mi"
          requests:
            cpu: "100m"
            memory: "256Mi"
---
apiVersion: v1
kind: Service
metadata:
  name: api-gateway
spec:
  selector:
    app: api-gateway
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer
      

Kubernetes ConfigMap 및 Secret:


# configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: microservices-config
data:
  ENVIRONMENT: "production"
  LOG_LEVEL: "info"
---
# secrets.yaml (실제로는 base64로 인코딩된 값을 사용)
apiVersion: v1
kind: Secret
metadata:
  name: user-service-secrets
type: Opaque
data:
  database-url: cG9zdGdyZXNxbDovL3Bvc3RncmVzOnBvc3RncmVzQHBvc3RncmVzLXVzZXItZGI6NTQzMi91c2VyZGI=
  secret-key: eW91ci1zZWNyZXQta2V5
      

이 Kubernetes 매니페스트 파일들은 마이크로서비스를 배포하고, 서비스 디스커버리를 설정하며, 구성과 비밀을 관리하는 방법을 보여줘. 실제 프로덕션 환경에서는 Helm 차트를 사용하여 이러한 매니페스트를 더 쉽게 관리할 수 있어.

6.3 CI/CD 파이프라인 구축

지속적 통합(CI)과 지속적 배포(CD)는 마이크로서비스 개발 및 배포 프로세스를 자동화하는 데 필수적이야. GitHub Actions를 사용한 간단한 CI/CD 파이프라인을 살펴보자:


# .github/workflows/user-service-ci-cd.yml
name: User Service CI/CD

on:
  push:
    branches: [ main ]
    paths:
      - 'user-service/**'
      - '.github/workflows/user-service-ci-cd.yml'
  pull_request:
    branches: [ main ]
    paths:
      - 'user-service/**'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'
    - name: Install dependencies
      run: |
        cd user-service
        python -m pip install --upgrade pip
        pip install pytest pytest-cov
        if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
    - name: Test with pytest
      run: |
        cd user-service
        pytest --cov=. --cov-report=xml
    - name: Upload coverage to Codecov
      uses: codecov/codecov-action@v3
      with:
        file: ./user-service/coverage.xml
        flags: user-service
        fail_ci_if_error: true

  build-and-push:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
    steps:
    - uses: actions/checkout@v3
    - name: Set up Docker Buildx
      uses: docker/setup-buildx-action@v2
    - name: Login to DockerHub
      uses: docker/login-action@v2
      with:
        username: ${{ secrets.DOCKERHUB_USERNAME }}
        password: ${{ secrets.DOCKERHUB_TOKEN }}
    - name: Build and push
      uses: docker/build-push-action@v4
      with:
        context: ./user-service
        push: true
        tags: your-registry/user-service:latest,your-registry/user-service:${{ github.sha }}
        cache-from: type=registry,ref=your-registry/user-service:buildcache
        cache-to: type=registry,ref=your-registry/user-service:buildcache,mode=max

  deploy:
    needs: build-and-push
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
    steps:
    - uses: actions/checkout@v3
    - name: Set up kubectl
      uses: azure/setup-kubectl@v3
    - name: Set Kubernetes context
      uses: azure/k8s-set-context@v3
      with:
        kubeconfig: ${{ secrets.KUBE_CONFIG }}
    - name: Update deployment image
      run: |
        kubectl set image deployment/user-service user-service=your-registry/user-service:${{ github.sha }} --record
    - name: Verify deployment
      run: |
        kubectl rollout status deployment/user-service
      

이 GitHub Actions 워크플로우는 다음과 같은 단계를 자동화해:

  1. 테스트: 코드를 체크아웃하고 단위 테스트를 실행해 코드 품질을 확인해.
  2. 빌드 및 푸시: Docker 이미지를 빌드하고 레지스트리에 푸시해.
  3. 배포: Kubernetes 클러스터에 새 이미지를 배포하고 롤아웃 상태를 확인해.

이러한 CI/CD 파이프라인을 통해 개발자는 코드를 푸시하기만 하면 자동으로 테스트, 빌드, 배포가 이루어져서 개발 속도를 높이고 배포 위험을 줄일 수 있어.

재능넷과 같은 플랫폼에서도 이런 CI/CD 파이프라인을 구축하면 새로운 기능을 빠르고 안전하게 배포할 수 있어. 특히 마이크로서비스 아키텍처에서는 각 서비스를 독립적으로 배포할 수 있기 때문에, 전체 시스템을 중단하지 않고도 특정 기능을 업데이트할 수 있다는 큰 장점이 있지!

7. 모니터링과 로깅 📊

마이크로서비스 아키텍처에서는 여러 서비스가 분산되어 있기 때문에 효과적인 모니터링과 로깅이 매우 중요해. 문제가 발생했을 때 빠르게 감지하고 해결할 수 있어야 하거든. Python 마이크로서비스에서 모니터링과 로깅을 구현하는 방법을 알아보자!

7.1 분산 로깅 구현

분산 시스템에서는 여러 서비스의 로그를 중앙에서 수집하고 분석하는 것이 중요해. ELK 스택(Elasticsearch, Logstash, Kibana) 또는 EFK 스택(Elasticsearch, Fluentd, Kibana)이 일반적으로 사용돼.

Python 마이크로서비스에서 구조화된 로깅을 구현하는 방법을 살펴보자:


# logger.py
import logging
import json
import uuid
import time
from datetime import datetime
import socket
import os
from pythonjsonlogger import jsonlogger

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
        
        # 표준 필드 추가
        log_record['timestamp'] = datetime.utcnow().isoformat()
        log_record['level'] = record.levelname
        log_record['logger'] = record.name
        
        # 서비스 정보 추가
        log_record['service'] = os.environ.get('SERVICE_NAME', 'unknown-service')
        log_record['host'] = socket.gethostname()
        log_record['environment'] = os.environ.get('ENVIRONMENT', 'development')
        
        # 요청 추적을 위한 필드
        if hasattr(record, 'request_id'):
            log_record['request_id'] = record.request_id
        else:
            log_record['request_id'] = 'no-request-id'

def get_logger(name):
    logger = logging.getLogger(name)
    
    # 로거가 이미 핸들러를 가지고 있으면 중복 설정 방지
    if logger.handlers:
        return logger
    
    # 로그 레벨 설정
    log_level = os.environ.get('LOG_LEVEL', 'INFO').upper()
    logger.setLevel(getattr(logging, log_level))
    
    # 콘솔 핸들러 추가
    handler = logging.StreamHandler()
    
    # JSON 포맷터 설정
    formatter = CustomJsonFormatter('%(timestamp)s %(level)s %(name)s %(message)s')
    handler.setFormatter(formatter)
    
    logger.addHandler(handler)
    return logger

# 미들웨어에서 사용할 요청 ID 생성 함수
def generate_request_id():
    return str(uuid.uuid4())

# FastAPI 미들웨어 예시
class RequestIdMiddleware:
    def __init__(self, app):
        self.app = app
    
    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            return await self.app(scope, receive, send)
        
        # 요청 ID 생성
        request_id = generate_request_id()
        
        # 로깅 컨텍스트에 요청 ID 추가
        logging.LoggerAdapter(get_logger("request"), {"request_id": request_id})
        
        # 스코프에 요청 ID 추가 (미들웨어에서 접근 가능)
        scope["request_id"] = request_id
        
        # 응답 헤더에 요청 ID 추가
        original_send = send
        
        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                headers = message.get("headers", [])
                headers.append((b"X-Request-ID", request_id.encode()))
                message["headers"] = headers
            await original_send(message)
        
        await self.app(scope, receive, send_wrapper)

# 사용 예시
logger = get_logger(__name__)

def process_order(order_id, user_id):
    logger.info("Processing order", extra={
        "order_id": order_id,
        "user_id": user_id,
        "operation": "process_order"
    })
    
    try:
        # 주문 처리 로직
        logger.debug("Order details retrieved", extra={
            "order_id": order_id,
            "status": "processing"
        })
        
        # 결제 처리
        logger.info("Payment processed", extra={
            "order_id": order_id,
            "payment_status": "success"
        })
        
        return True
    except Exception as e:
        logger.error("Order processing failed", extra={
            "order_id": order_id,
            "error": str(e),
            "exception_type": type(e).__name__
        })
        return False
      

이 코드는 구조화된 JSON 로깅을 구현하고, 요청 ID를 통해 여러 서비스에 걸친 요청을 추적할 수 있게 해. 이러한 로그는 Elasticsearch에 저장되고 Kibana를 통해 시각화될 수 있어.

7.2 메트릭 수집 및 모니터링

메트릭은 시스템의 상태와 성능을 이해하는 데 중요해. Prometheus와 Grafana는 메트릭 수집 및 시각화를 위한 인기 있는 도구야. Python 마이크로서비스에서 메트릭을 수집하는 방법을 살펴보자:


# metrics.py
from prometheus_client import Counter, Histogram, Gauge, Summary, start_http_server
import time
import random
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

# 메트릭 정의
REQUEST_COUNT = Counter(
    'app_request_count',
    'Application Request Count',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'app_request_latency_seconds',
    'Application Request Latency',
    ['method', 'endpoint']
)

ACTIVE_REQUESTS = Gauge(
    'app_active_requests',
    'Active Requests'
)

DB_QUERY_LATENCY = Summary(
    'app_db_query_latency_seconds',
    'Database Query Latency'
)

# FastAPI 미들웨어
class PrometheusMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()
        
        # 활성 요청 수 증가
        ACTIVE_REQUESTS.inc()
        
        # 메서드와 경로 추출
        method = request.method
        endpoint = request.url.path
        
        try:
            response = await call_next(request)
            
            # 요청 카운터 증가
            REQUEST_COUNT.labels(
                method=method,
                endpoint=endpoint,
                status=response.status_code
            ).inc()
            
            return response
        except Exception as e:
            # 예외 발생 시 500 상태 코드로 카운트
            REQUEST_COUNT.labels(
                method=method,
                endpoint=endpoint,
                status=500
            ).inc()
            raise e
        finally:
            # 요청 처리 시간 기록
            REQUEST_LATENCY.labels(
                method=method,
                endpoint=endpoint
            ).observe(time.time() - start_time)
            
            # 활성 요청 수 감소
            ACTIVE_REQUESTS.dec()

# 데이터베이스 쿼리 시간 측정 데코레이터
def track_db_query_time(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        DB_QUERY_LATENCY.observe(time.time() - start_time)
        return result
    return wrapper

# FastAPI 애플리케이션에 미들웨어 등록 예시
def setup_metrics(app: FastAPI, port=8000):
    # Prometheus 메트릭 서버 시작
    start_http_server(port)
    
    # 미들웨어 등록
    app.add_middleware(PrometheusMiddleware)
    
    # 메트릭 엔드포인트 추가
    @app.get("/metrics")
    async def metrics():
        return {"message": f"Metrics available at http://localhost:{port}"}

# 사용 예시
app = FastAPI()
setup_metrics(app, port=8001)  # 메트릭은 별도 포트에서 제공

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # 데이터베이스 쿼리 시뮬레이션
    query_result = simulate_db_query(user_id)
    return {"user_id": user_id, "data": query_result}

@track_db_query_time
def simulate_db_query(user_id):
    # 데이터베이스 쿼리 시뮬레이션
    time.sleep(random.uniform(0.05, 0.2))  # 50-200ms 쿼리 시간 시뮬레이션
    return {"name": f"User {user_id}", "email": f"user{user_id}@example.com"}
      

이 코드는 Prometheus 클라이언트를 사용하여 요청 수, 지연 시간, 활성 요청 수, 데이터베이스 쿼리 시간 등의 메트릭을 수집해. 이러한 메트릭은 Grafana 대시보드를 통해 시각화될 수 있어.

7.3 분산 트레이싱

분산 트레이싱은 여러 마이크로서비스에 걸친 요청의 흐름을 추적하는 기술이야. OpenTelemetry는 분산 트레이싱을 위한 표준 API, SDK, 도구를 제공해. Python에서 OpenTelemetry를 사용하는 방법을 살펴보자:


# tracing.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from fastapi import FastAPI
import os

def setup_tracing(app: FastAPI, service_name: str):
    # 리소스 설정
    resource = Resource(attributes={
        SERVICE_NAME: service_name
    })
    
    # 트레이서 프로바이더 설정
    tracer_provider = TracerProvider(resource=resource)
    
    # OTLP 익스포터 설정 (Jaeger 또는 다른 백엔드로 전송)
    otlp_exporter = OTLPSpanExporter(
        endpoint=os.environ.get("OTLP_ENDPOINT", "http://jaeger:4317")
    )
    
    # 배치 프로세서 설정
    span_processor = BatchSpanProcessor(otlp_exporter)
    tracer_provider.add_span_processor(span_processor)
    
    # 글로벌 트레이서 프로바이더 설정
    trace.set_tracer_provider(tracer_provider)
    
    # FastAPI 계측
    FastAPIInstrumentor.instrument_app(app)
    
    # HTTP 클라이언트 계측
    RequestsInstrumentor().instrument()
    
    # SQLAlchemy 계측 (선택적)
    # SQLAlchemyInstrumentor().instrument()
    
    return trace.get_tracer(service_name)

# 사용 예시
app = FastAPI()
tracer = setup_tracing(app, "user-service")

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    # 수동 트레이싱 예시
    with tracer.start_as_current_span("get_user_details") as span:
        # 스팬에 속성 추가
        span.set_attribute("user.id", user_id)
        
        # 데이터베이스 쿼리 시뮬레이션
        user_data = await get_user_from_database(user_id)
        
        # 결과에 따라 스팬 상태 설정
        if user_data:
            span.set_status(trace.Status(trace.StatusCode.OK))
        else:
            span.set_status(trace.Status(trace.StatusCode.ERROR))
            span.record_exception(Exception(f"User {user_id} not found"))
        
        return user_data

async def get_user_from_database(user_id: int):
    # 중첩된 스팬 생성
    with tracer.start_as_current_span("database.query") as span:
        span.set_attribute("db.query", f"SELECT * FROM users WHERE id = {user_id}")
        
        # 데이터베이스 쿼리 시뮬레이션
        import time
        time.sleep(0.1)
        
        # 다른 서비스 호출 시뮬레이션
        await get_user_preferences(user_id)
        
        return {"id": user_id, "name": f"User {user_id}", "email": f"user{user_id}@example.com"}

async def get_user_preferences(user_id: int):
    # 다른 서비스 호출을 나타내는 스팬
    with tracer.start_as_current_span("get_user_preferences") as span:
        span.set_attribute("user.id", user_id)
        
        # 외부 서비스 호출 시뮬레이션
        import time
        time.sleep(0.05)
        
        return {"theme": "dark", "language": "en"}
      

이 코드는 OpenTelemetry를 사용하여 분산 트레이싱을 구현해. 여러 서비스에 걸친 요청의 흐름을 시각화하고, 성능 병목 현상을 식별하는 데 도움이 돼.

🔍 효과적인 모니터링을 위한 팁

  1. 상관관계 ID 사용: 모든 로그와 트레이스에 요청 ID를 포함시켜 여러 서비스에 걸친 요청을 추적할 수 있게 해.
  2. 알림 설정: 중요한 메트릭에 대한 알림을 설정하여 문제를 조기에 감지해.
  3. 대시보드 구성: 주요 비즈니스 및 기술 메트릭을 보여주는 대시보드를 만들어 시스템 상태를 한눈에 파악할 수 있게 해.
  4. 로그 수준 관리: 개발 환경에서는 상세한 로그를, 프로덕션 환경에서는 중요한 로그만 수집하도록 설정해.
  5. 헬스 체크 엔드포인트: 각 서비스에 헬스 체크 엔드포인트를 구현하여 서비스 상태를 모니터링해.

효과적인 모니터링과 로깅은 마이크로서비스 아키텍처의 성공에 매우 중요해. 특히 재능넷과 같은 플랫폼에서는 사용자 경험에 직접적인 영향을 미치는 성능 문제를 빠르게 감지하고 해결할 수 있어야 해. 이러한 도구들을 활용하면 시스템의 안정성과 신뢰성을 크게 향상시킬 수 있어!

8. 실전 프로젝트: 쇼핑몰 마이크로서비스 구현 🛒

지금까지 배운 내용을 바탕으로 실제 쇼핑몰 시스템을 마이크로서비스 아키텍처로 구현해보자! 이 프로젝트는 여러 서비스로 구성되며, Python과 FastAPI를 사용할 거야.

8.1 시스템 아키텍처 설계

쇼핑몰 마이크로서비스 아키텍처 클라이언트 (웹/모바일) API 게이트웨이 사용자 서비스 (인증, 프로필) 상품 서비스 (카탈로그, 검색) 주문 서비스 (장바구니, 주문) 결제 서비스 (결제 처리) 메시지 브로커 사용자 DB (PostgreSQL) 상품 DB (MongoDB) 주문 DB (MySQL) 결제 DB (PostgreSQL)

이 아키텍처는 다음과 같은 구성 요소로 이루어져 있어:

  1. API 게이트웨이: 클라이언트 요청을 적절한 서비스로 라우팅하고 인증을 처리해.
  2. 사용자 서비스: 사용자 등록, 인증, 프로필 관리를 담당해.
  3. 상품 서비스: 상품 카탈로그, 재고 관리, 검색 기능을 제공해.
  4. 주문 서비스: 장바구니 관리, 주문 처리를 담당해.
  5. 결제 서비스: 결제 처리 및 결제 내역 관리를 담당해.
  6. 메시지 브로커: 서비스 간 비동기 통신을 지원해.
  7. 데이터베이스: 각 서비스는 자신만의 데이터베이스를 가져.

8.2 사용자 서비스 구현

사용자 서비스는 사용자 등록, 인증, 프로필 관리를 담당해. FastAPI와 PostgreSQL을 사용해 구현해보자:


# user_service/main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
import os

from database import SessionLocal, engine, Base
from models import User as UserModel
import schemas

# 데이터베이스 초기화
Base.metadata.create_all(bind=engine)

app = FastAPI(title="User Service")

# 보안 설정
SECRET_KEY = os.environ.get("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# 의존성
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# 유틸리티 함수
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def get_user(db, username: str):
    return db.query(UserModel).filter(UserModel.username == username).first()

def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user or not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = get_user(db, username=username)
    if user is None:
        raise credentials_exception
    return user

# API 엔드포인트
@app.post("/users/", response_model=schemas.User)
def create_user(user: schemas.UserCreate, db: Session = Depends(get_db)):
    db_user = get_user(db, username=user.username)
    if db_user:
        raise HTTPException(status_code=400, detail="Username already registered")
    hashed_password = get_password_hash(user.password)
    db_user = UserModel(
        username=user.username,
        email=user.email,
        full_name=user.full_name,
        hashed_password=hashed_password
    )
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    user = authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me/", response_model=schemas.User)
async def read_users_me(current_user: UserModel = Depends(get_current_user)):
    return current_user

@app.put("/users/me/", response_model=schemas.User)
async def update_user(
    user_update: schemas.UserUpdate,
    current_user: UserModel = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    if user_update.email:
        current_user.email = user_update.email
    if user_update.full_name:
        current_user.full_name = user_update.full_name
    if user_update.password:
        current_user.hashed_password = get_password_hash(user_update.password)
    
    db.commit()
    db.refresh(current_user)
    return current_user

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
      

데이터베이스 모델 (models.py):


# user_service/models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.sql import func
from database import Base

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    full_name = Column(String)
    hashed_password = Column(String)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
      

스키마 정의 (schemas.py):


# user_service/schemas.py
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime

class UserBase(BaseModel):
    username: str
    email: EmailStr
    full_name: Optional[str] = None

class UserCreate(UserBase):
    password: str

class UserUpdate(BaseModel):
    email: Optional[EmailStr] = None
    full_name: Optional[str] = None
    password: Optional[str] = None

class User(UserBase):
    id: int
    is_active: bool
    created_at: datetime
    
    class Config:
        orm_mode = True

class Token(BaseModel):
    access_token: str
    token_type: str
      

데이터베이스 설정 (database.py):


# user_service/database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os

DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/userdb")

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
      

8.3 주문 서비스 구현

주문 서비스는 장바구니 관리와 주문 처리를 담당해. 이 서비스는 상품 서비스와 결제 서비스와 통신해야 해. RabbitMQ를 사용한 비동기 통신을 구현해보자:


# order_service/main.py
from fastapi import FastAPI, Depends, HTTPException, Header
from sqlalchemy.orm import Session
from typing import List, Optional
import httpx
import json
import pika
import uuid
from datetime import datetime

from database import SessionLocal, engine, Base
from models import Order as OrderModel, OrderItem as OrderItemModel
import schemas

# 데이터베이스 초기화
Base.metadata.create_all(bind=engine)

app = FastAPI(title="Order Service")

# RabbitMQ 연결 설정
def get_rabbitmq_connection():
    credentials = pika.PlainCredentials('guest', 'guest')
    parameters = pika.ConnectionParameters('rabbitmq', 5672, '/', credentials)
    return pika.BlockingConnection(parameters)

# 의존성
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

async def get_current_user_id(authorization: Optional[str] = Header(None)):
    if authorization is None:
        raise HTTPException(status_code=401, detail="Authorization header missing")
    
    # 실제 구현에서는 API 게이트웨이에서 전달된 사용자 ID를 추출
    # 여기서는 간단히 헤더에서 사용자 ID를 추출한다고 가정
    try:
        token = authorization.split(" ")[1]
        # 실제로는 토큰 검증 로직이 필요
        user_id = 1  # 테스트용 사용자 ID
        return user_id
    except Exception:
        raise HTTPException(status_code=401, detail="Invalid token")

# 상품 서비스 API 호출
async def get_product(product_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://product-service:8001/products/{product_id}")
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail="Product service error")
        return response.json()

# API 엔드포인트
@app.post("/cart/items/", response_model=schemas.CartItem)
async def add_to_cart(
    item: schemas.CartItemCreate,
    user_id: int = Depends(get_current_user_id),
    db: Session = Depends(get_db)
):
    # 상품 정보 가져오기
    product = await get_product(item.product_id)
    
    # 장바구니에 상품 추가 (실제로는 DB에 저장)
    cart_item = {
        "id": str(uuid.uuid4()),
        "user_id": user_id,
        "product_id": item.product_id,
        "product_name": product["name"],
        "quantity": item.quantity,
        "price": product["price"]
    }
    
    # 실제 구현에서는 DB에 저장
    
    return cart_item

@app.get("/cart/", response_model=List[schemas.CartItem])
async def get_cart(
    user_id: int = Depends(get_current_user_id)
):
    # 실제 구현에서는 DB에서 사용자의 장바구니 항목을 가져옴
    # 여기서는 테스트 데이터 반환
    return [
        {
            "id": "1",
            "user_id": user_id,
            "product_id": 1,
            "product_name": "Laptop",
            "quantity": 1,
            "price": 1299.99
        },
        {
            "id": "2",
            "user_id": user_id,
            "product_id": 2,
            "product_name": "Smartphone",
            "quantity": 2,
            "price": 699.99
        }
    ]

@app.post("/orders/", response_model=schemas.Order)
async def create_order(
    order_create: schemas.OrderCreate,
    user_id: int = Depends(get_current_user_id),
    db: Session = Depends(get_db)
):
    # 주문 생성
    db_order = OrderModel(
        user_id=user_id,
        status="pending",
        shipping_address=order_create.shipping_address,
        total_amount=0  # 초기값, 나중에 계산
    )
    db.add(db_order)
    db.commit()
    db.refresh(db_order)
    
    total_amount = 0
    
    # 주문 항목 추가
    for item in order_create.items:
        # 상품 정보 가져오기
        product = await get_product(item.product_id)
        
        # 주문 항목 생성
        db_order_item = OrderItemModel(
            order_id=db_order.id,
            product_id=item.product_id,
            quantity=item.quantity,
            price=product["price"]
        )
        db.add(db_order_item)
        
        # 총액 계산
        total_amount += product["price"] * item.quantity
    
    # 주문 총액 업데이트
    db_order.total_amount = total_amount
    db.commit()
    db.refresh(db_order)
    
    # 주문 생성 이벤트 발행
    connection = get_rabbitmq_connection()
    channel = connection.channel()
    
    # 큐 선언
    channel.queue_declare(queue='order_created')
    
    # 메시지 발행
    order_data = {
        "order_id": db_order.id,
        "user_id": user_id,
        "total_amount": total_amount,
        "status": "pending",
        "created_at": datetime.utcnow().isoformat()
    }
    
    channel.basic_publish(
        exchange='',
        routing_key='order_created',
        body=json.dumps(order_data),
        properties=pika.BasicProperties(
            delivery_mode=2,  # 메시지 지속성 보장
        )
    )
    
    connection.close()
    
    return db_order

@app.get("/orders/{order_id}", response_model=schemas.OrderDetail)
async def get_order(
    order_id: int,
    user_id: int = Depends(get_current_user_id),
    db: Session = Depends(get_db)
):
    # 주문 정보 가져오기
    db_order = db.query(OrderModel).filter(OrderModel.id == order_id, OrderModel.user_id == user_id).first()
    if db_order is None:
        raise HTTPException(status_code=404, detail="Order not found")
    
    # 주문 항목 가져오기
    db_order_items = db.query(OrderItemModel).filter(OrderItemModel.order_id == order_id).all()
    
    # 응답 구성
    order_detail = {
        "id": db_order.id,
        "user_id": db_order.user_id,
        "status": db_order.status,
        "shipping_address": db_order.shipping_address,
        "total_amount": db_order.total_amount,
        "created_at": db_order.created_at,
        "items": []
    }
    
    for item in db_order_items:
        order_detail["items"].append({
            "id": item.id,
            "product_id": item.product_id,
            "quantity": item.quantity,
            "price": item.price
        })
    
    return order_detail

@app.get("/orders/", response_model=List[schemas.Order])
async def get_orders(
    user_id: int = Depends(get_current_user_id),
    db: Session = Depends(get_db)
):
    # 사용자의 모든 주문 가져오기
    db_orders = db.query(OrderModel).filter(OrderModel.user_id == user_id).all()
    return db_orders

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8002, reload=True)
      

이 코드는 주문 서비스의 핵심 기능을 구현해. 장바구니 관리, 주문 생성, 주문 조회 등의 기능을 제공하고, RabbitMQ를 통해 주문 생성 이벤트를 발행해.

이런 방식으로 각 서비스를 구현하고, API 게이트웨이를 통해 클라이언트 요청을 적절한 서비스로 라우팅하면 완전한 마이크로서비스 아키텍처의 쇼핑몰 시스템을 구축할 수 있어!

이 프로젝트는 재능넷과 같은 플랫폼에도 적용할 수 있어. 예를 들어, 사용자 서비스는 재능 제공자와 구매자 관리, 상품 서비스는 재능 서비스 목록 관리, 주문 서비스는 재능 구매 처리, 결제 서비스는 결제 처리를 담당할 수 있지. 이런 마이크로서비스 아키텍처는 플랫폼의 확장성과 유연성을 크게 향상시킬 수 있어!

9. 성능 최적화 전략 🚀

마이크로서비스 아키텍처에서는 성능 최적화가 중요한 과제야. 여러 서비스로 분산된 시스템에서 최적의 성능을 얻기 위한 전략을 알아보자!

9.1 캐싱 전략

캐싱은 자주 액세스하는 데이터를 빠르게 검색할 수 있도록 저장하는 기술이야. Python 마이크로서비스에서 Redis를 사용한 캐싱 구현 방법을 살펴보자:


# caching.py
import redis
import json
import functools
import hashlib
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
import time

# Redis 연결 설정
redis_client = redis.Redis(host='redis', port=6379, db=0)

# 캐시 데코레이터
def cache(expire=3600):
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # 캐시 키 생성
            key_parts = [func.__name__]
            key_parts.extend([str(arg) for arg in args])
            key_parts.extend([f"{k}:{v}" for k, v in sorted(kwargs.items())])
            cache_key = hashlib.md5(":".join(key_parts).encode()).hexdigest()
            
            # 캐시에서 데이터 조회
            cached_data = redis_client.get(cache_key)
            if cached_data:
                return json.loads(cached_data)
            
            # 캐시에 없으면 함수 실행
            result = await func(*args, **kwargs)
            
            # 결과를 캐시에 저장
            redis_client.setex(
                cache_key,
                expire,
                json.dumps(result, default=str)  # datetime 등을 문자열로 변환
            )
            
            return result
        return wrapper
    return decorator

# FastAPI 애플리케이션에 적용
app = FastAPI(title="Product Service with Caching")

# 데이터베이스 의존성 (예시)
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# 캐시를 사용하는 API 엔드포인트
@app.get("/products/{product_id}")
@cache(expire=300)  # 5분 캐시
async def get_product(product_id: int, db: Session = Depends(get_db)):
    # 데이터베이스 쿼리 시뮬레이션
    time.sleep(0.5)  # 실제로는 DB 쿼리
    
    # 실제 구현에서는 DB에서 제품 정보를 가져옴
    product = {
        "id": product_id,
        "name": f"Product {product_id}",
        "description": "A high-quality product",
        "price": 99.99,
        "stock": 100
    }
    
    return product

# 캐시 무효화 예시
@app.put("/products/{product_id}")
async def update_product(product_id: int, db: Session = Depends(get_db)):
    # 제품 업데이트 로직
    
    # 캐시 무효화
    cache_key = hashlib.md5(f"get_product:{product_id}").hexdigest()
    redis_client.delete(cache_key)
    
    return {"message": "Product updated and cache invalidated"}

# 캐시 통계 엔드포인트
@app.get("/cache/stats")
async def get_cache_stats():
    info = redis_client.info()
    return {
        "used_memory": info["used_memory_human"],
        "hits": info["keyspace_hits"],
        "misses": info["keyspace_misses"],
        "hit_rate": info["keyspace_hits"] / (info["keyspace_hits"] + info["keyspace_misses"] + 0.001) * 100
    }
      

이 코드는 Redis를 사용한 캐싱 전략을 구현해. 자주 요청되는 데이터를 캐시에 저장하여 데이터베이스 쿼리를 줄이고 응답 시간을 단축할 수 있어.

9.2 비동기 프로그래밍

Python 3.5+에서는 async/await 구문을 사용한 비동기 프로그래밍이 가능해. 이를 통해 I/O 바운드 작업의 성능을 크게 향상시킬 수 있어. FastAPI는 기본적으로 비동기 프로그래밍을 지원해:


# async_example.py
from fastapi import FastAPI
import asyncio
import httpx
from typing import List

app = FastAPI(title="Async API Example")

# 비동기 HTTP 클라이언트
async def fetch_data(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()

# 순차적 API 호출 (느림)
@app.get("/sequential")
async def get_data_sequential():
    start_time = asyncio.get_event_loop().time()
    
    # 순차적으로 API 호출
    data1 = await fetch_data("https://jsonplaceholder.typicode.com/posts/1")
    data2 = await fetch_data("https://jsonplaceholder.typicode.com/posts/2")
    data3 = await fetch_data("https://jsonplaceholder.typicode.com/posts/3")
    
    end_time = asyncio.get_event_loop().time()
    execution_time = end_time - start_time
    
    return {
        "execution_time": execution_time,
        "data": [data1, data2, data3]
    }

# 병렬 API 호출 (빠름)
@app.get("/parallel")
async def get_data_parallel():
    start_time = asyncio.get_event_loop().time()
    
    # 병렬로 API 호출
    tasks = [
        fetch_data("https://jsonplaceholder.typicode.com/posts/1"),
        fetch_data("https://jsonplaceholder.typicode.com/posts/2"),
        fetch_data("https://jsonplaceholder.typicode.com/posts/3")
    ]
    
    results = await asyncio.gather(*tasks)
    
    end_time = asyncio.get_event_loop().time()
    execution_time = end_time - start_time
    
    return {
        "execution_time": execution_time,
        "data": results
    }

# 비동기 스트리밍 응답
@app.get("/stream")
async def stream_data():
    async def generate_data():
        for i in range(10):
            await asyncio.sleep(0.5)  # 비동기 대기
            yield f"data: {i}\n\n"
    
    return generate_data()
      

이 코드는 비동기 프로그래밍의 장점을 보여줘. 순차적 API 호출보다 병렬 API 호출이 훨씬 빠르고, 비동기 스트리밍을 통해 대용량 데이터를 효율적으로 처리할 수 있어.

9.3 데이터베이스 최적화

데이터베이스 쿼리 최적화는 마이크로서비스 성능에 큰 영향을 미쳐. SQLAlchemy를 사용한 데이터베이스 최적화 기법을 살펴보자:


# database_optimization.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy import create_engine, Column, Integer, String, Float, ForeignKey, func, Index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, joinedload, Session
from typing import List
import time

# 데이터베이스 설정
DATABASE_URL = "postgresql://username:password@postgres:5432/productdb"
engine = create_engine(DATABASE_URL, pool_size=20, max_overflow=0)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# 모델 정의
class Category(Base):
    __tablename__ = "categories"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, index=True)
    
    products = relationship("Product", back_populates="category")

class Product(Base):
    __tablename__ = "products"
    
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    description = Column(String)
    price = Column(Float)
    stock = Column(Integer)
    category_id = Column(Integer, ForeignKey("categories.id"))
    
    category = relationship("Category", back_populates="products")
    reviews = relationship("Review", back_populates="product")
    
    # 복합 인덱스 추가
    __table_args__ = (
        Index('idx_product_name_price', 'name', 'price'),
    )

class Review(Base):
    __tablename__ = "reviews"
    
    id = Column(Integer, primary_key=True, index=True)
    product_id = Column(Integer, ForeignKey("products.id"), index=True)
    rating = Column(Integer)
    comment = Column(String)
    
    product = relationship("Product", back_populates="reviews")

# 데이터베이스 초기화
Base.metadata.create_all(bind=engine)

app = FastAPI(title="Database Optimization Example")

# 의존성
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# 최적화되지 않은 쿼리
@app.get("/products/unoptimized")
def get_products_unoptimized(db: Session = Depends(get_db)):
    start_time = time.time()
    
    # 모든 제품 가져오기
    products = db.query(Product).all()
    
    # N+1 문제: 각 제품에 대해 별도의 쿼리로 카테고리와 리뷰 가져오기
    result = []
    for product in products:
        # 이 접근은 각 제품마다 추가 쿼리를 발생시킴
        category = product.category
        reviews = product.reviews
        
        avg_rating = sum(review.rating for review in reviews) / len(reviews) if reviews else 0
        
        result.append({
            "id": product.id,
            "name": product.name,
            "price": product.price,
            "category": category.name,
            "avg_rating": avg_rating,
            "review_count": len(reviews)
        })
    
    execution_time = time.time() - start_time
    
    return {
        "execution_time": execution_time,
        "count": len(result),
        "products": result[:10]  # 처음 10개만 반환
    }

# 최적화된 쿼리
@app.get("/products/optimized")
def get_products_optimized(db: Session = Depends(get_db)):
    start_time = time.time()
    
    # 조인을 사용하여 한 번의 쿼리로 제품, 카테고리, 리뷰 가져오기
    products = db.query(Product).options(
        joinedload(Product.category),
        joinedload(Product.reviews)
    ).all()
    
    result = []
    for product in products:
        avg_rating = sum(review.rating for review in product.reviews) / len(product.reviews) if product.reviews else 0
        
        result.append({
            "id": product.id,
            "name": product.name,
            "price": product.price,
            "category": product.category.name,
            "avg_rating": avg_rating,
            "review_count": len(product.reviews)
        })
    
    execution_time = time.time() - start_time
    
    return {
        "execution_time": execution_time,
        "count": len(result),
        "products": result[:10]  # 처음 10개만 반환
    }

# 더 최적화된 쿼리 (집계 함수 사용)
@app.get("/products/highly-optimized")
def get_products_highly_optimized(db: Session = Depends(get_db)):
    start_time = time.time()
    
    # 서브쿼리를 사용하여 리뷰 집계
    review_stats = db.query(
        Review.product_id,
        func.avg(Review.rating).label("avg_rating"),
        func.count(Review.id).label("review_count")
    ).group_by(Review.product_id).subquery()
    
    # 조인을 사용하여 한 번의 쿼리로 모든 데이터 가져오기
    products = db.query(
        Product,
        Category.name.label("category_name"),
        review_stats.c.avg_rating,
        review_stats.c.review_count
    ).join(
        Category, Product.category_id == Category.id
    ).outerjoin(
        review_stats, Product.id == review_stats.c.product_id
    ).all()
    
    result = []
    for product, category_name, avg_rating, review_count in products:
        result.append({
            "id": product.id,
            "name": product.name,
            "price": product.price,
            "category": category_name,
            "avg_rating": float(avg_rating) if avg_rating else 0,
            "review_count": review_count or 0
        })
    
    execution_time = time.time() - start_time
    
    return {
        "execution_time": execution_time,
        "count": len(result),
        "products": result[:10]  # 처음 10개만 반환
    }

# 페이지네이션 적용
@app.get("/products/paginated")
def get_products_paginated(page: int = 1, page_size: int = 20, db: Session = Depends(get_db)):
    start_time = time.time()
    
    # 오프셋 계산
    offset = (page - 1) * page_size
    
    # 전체 개수 가져오기
    total_count = db.query(func.count(Product.id)).scalar()
    
    # 페이지네이션 적용하여 쿼리
    products = db.query(
        Product,
        Category.name.label("category_name")
    ).join(
        Category, Product.category_id == Category.id
    ).offset(offset).limit(page_size).all()
    
    result = []
    for product, category_name in products:
        result.append({
            "id": product.id,
            "name": product.name,
            "price": product.price,
            "category": category_name
        })
    
    execution_time = time.time() - start_time
    
    return {
        "execution_time": execution_time,
        "page": page,
        "page_size": page_size,
        "total_count": total_count,
        "total_pages": (total_count + page_size - 1) // page_size,
        "products": result
    }
      

이 코드는 데이터베이스 쿼리 최적화의 여러 기법을 보여줘:

  1. N+1 문제 해결: joinedload를 사용하여 관련 데이터를 한 번의 쿼리로 가져와.
  2. 집계 함수 사용: 서브쿼리와 집계 함수를 사용하여 데이터베이스 수준에서 계산을 수행해.
  3. 인덱스 활용: 적절한 인덱스를 생성하여 쿼리 성능을 향상시켜.
  4. 페이지네이션: 대량의 데이터를 처리할 때 페이지네이션을 적용하여 메모리 사용량을 줄이고 응답 시간을 단축해.

9.4 부하 테스트 및 프로파일링

성능 최적화를 위해서는 부하 테스트와 프로파일링이 필수적이야. Python 마이크로서비스의 성능을 테스트하고 분석하는 방법을 알아보자:

🔍 성능 분석 도구

  1. Locust: Python으로 작성된 부하 테스트 도구로, 실제 사용자 행동을 시뮬레이션할 수 있어.
  2. cProfile: Python 내장 프로파일러로, 함수 호출 시간과 빈도를 측정할 수 있어.
  3. py-spy: Python 프로세스의 실시간 프로파일링을 위한 샘플링 프로파일러야.
  4. Prometheus + Grafana: 실시간 메트릭 모니터링 및 시각화를 위한 도구야.

Locust를 사용한 부하 테스트 스크립트 예시:


# locustfile.py
from locust import HttpUser, task, between

class ShopUser(HttpUser):
    wait_time = between(1, 5)
    
    @task(3)
    def view_products(self):
        self.client.get("/products")
    
    @task(1)
    def view_product_details(self):
        product_id = 1  # 실제로는 랜덤 ID 사용
        self.client.get(f"/products/{product_id}")
    
    @task(1)
    def add_to_cart(self):
        self.client.post("/cart/items/", json={
            "product_id": 1,
            "quantity": 1
        })
    
    @task(1)
    def checkout(self):
        self.client.post("/orders/", json={
            "shipping_address": "123 Main St",
            "items": [
                {"product_id": 1, "quantity": 1},
                {"product_id": 2, "quantity": 2}
            ]
        })
      

cProfile을 사용한 프로파일링 예시:


# profile_api.py
import cProfile
import pstats
import io
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

app = FastAPI()

class ProfilingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        # 프로파일러 시작
        pr = cProfile.Profile()
        pr.enable()
        
        # 요청 처리
        response = await call_next(request)
        
        # 프로파일러 중지
        pr.disable()
        
        # 프로파일링 결과 분석
        s = io.StringIO()
        ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
        ps.print_stats(20)  # 상위 20개 결과만 출력
        
        # 로그에 프로파일링 결과 출력
        print(f"Profiling results for {request.method} {request.url.path}:")
        print(s.getvalue())
        
        return response

# 미들웨어 등록
app.add_middleware(ProfilingMiddleware)
      

이러한 성능 최적화 전략을 적용하면 마이크로서비스의 응답 시간을 단축하고, 처리량을 증가시키며, 리소스 사용을 효율화할 수 있어. 재능넷과 같은 플랫폼에서는 이런 최적화가 사용자 경험과 시스템 비용에 직접적인 영향을 미치기 때문에 매우 중요해!

결론 📝

이 글에서는 Python을 사용한 마이크로서비스 아키텍처의 설계와 구현에 대해 깊이 있게 살펴봤어. 마이크로서비스의 기본 개념부터 실전 구현, 성능 최적화, 그리고 미래 트렌드까지 다양한 주제를 다뤘지.

🔑 핵심 요약

  1. 마이크로서비스 아키텍처는 복잡한 애플리케이션을 독립적으로 개발, 배포, 확장할 수 있는 작은 서비스로 분리하는 접근 방식이야.
  2. Python은 간결한 문법, 풍부한 라이브러리, 비동기 지원 등으로 마이크로서비스 개발에 매우 적합한 언어야.
  3. API 게이트웨이, 서비스 디스커버리, 서킷 브레이커 등의 핵심 구성요소를 통해 안정적인 마이크로서비스 시스템을 구축할 수 있어.
  4. 동기식 통신(REST, gRPC)과 비동기식 통신(메시지 큐)을 적절히 조합하여 서비스 간 효율적인 통신을 구현할 수 있어.
  5. 데이터베이스 per 서비스, CQRS, 이벤트 소싱 등의 패턴을 통해 데이터 일관성과 성능을 관리할 수 있어.
  6. Docker와 Kubernetes를 활용하여 마이크로서비스를 효율적으로 배포하고 운영할 수 있어.
  7. 모니터링, 로깅, 트레이싱을 통해 분산 시스템의 문제를 신속하게 감지하고 해결할 수 있어.
  8. 캐싱, 비동기 프로그래밍, 데이터베이스 최적화 등의 전략을 통해 성능을 향상시킬 수 있어.
  9. 서버리스, 서비스 메시, AI 기반 운영 등의 트렌드가 마이크로서비스의 미래를 형성하고 있어.

마이크로서비스 아키텍처는 모든 상황에 적합한 만능 솔루션이 아니야. 작은 프로젝트나 팀에서는 모놀리식 아키텍처가 여전히 더 효율적일 수 있어. 하지만 시스템이 복잡해지고 팀이 커질수록 마이크로서비스의 이점이 더욱 분명해져.

Python은 마이크로서비스 개발에 탁월한 선택이야. 특히 FastAPI, asyncio, SQLAlchemy 등의 현대적인 라이브러리와 프레임워크를 활용하면 고성능, 확장 가능한 마이크로서비스를 효율적으로 구현할 수 있어.

마이크로서비스 아키텍처로의 전환은 하룻밤에 이루어지는 것이 아니야. 점진적인 접근 방식을 취하고, 가장 가치가 높은 부분부터 분리하는 것이 좋아. 또한 팀의 역량과 조직 문화도 중요한 고려 사항이야.

마지막으로, 마이크로서비스 아키텍처는 계속 발전하고 있어. 새로운 도구와 패턴이 등장하고 있으니, 지속적인 학습과 실험을 통해 최신 트렌드를 따라가는 것이 중요해.

이 글이 Python으로 마이크로서비스를 구현하는 여정에 도움이 되었기를 바라! 질문이나 의견이 있다면 언제든지 댓글로 남겨줘. 함께 배우고 성장해 나가자! 🚀

참고 자료 📚

  1. Sam Newman, "Building Microservices: Designing Fine-Grained Systems", O'Reilly Media, 2021
  2. Chris Richardson, "Microservices Patterns", Manning Publications, 2018
  3. FastAPI 공식 문서: https://fastapi.tiangolo.com/
  4. Kubernetes 공식 문서: https://kubernetes.io/docs/home/
  5. Martin Fowler, "CQRS": https://martinfowler.com/bliki/CQRS.html
  6. Martin Fowler, "Event Sourcing": https://martinfowler.com/eaaDev/EventSourcing.html
  7. Python asyncio 문서: https://docs.python.org/3/library/asyncio.html
  8. gRPC 공식 문서: https://grpc.io/docs/languages/python/
  9. RabbitMQ 튜토리얼: https://www.rabbitmq.com/getstarted.html
  10. OpenTelemetry 문서: https://opentelemetry.io/docs/

1. 마이크로서비스 아키텍처 이해하기 🧩

마이크로서비스 아키텍처는 하나의 큰 애플리케이션을 여러 개의 작은 서비스로 나누는 방식이야. 각 서비스는 독립적으로 개발, 배포, 확장이 가능하지. 이런 접근 방식이 왜 중요해졌는지 알아볼까?

1.1 모놀리식 vs 마이크로서비스

모놀리식 UI 레이어 비즈니스 로직 데이터 액세스 단일 데이터베이스 마이크로서비스 사용자 서비스 상품 서비스 주문 서비스 결제 서비스 API 게이트웨이

모놀리식 아키텍처는 전통적인 방식으로, 모든 기능이 하나의 코드베이스에 통합되어 있어. 작은 프로젝트에서는 간단하고 효율적일 수 있지만, 프로젝트가 커질수록 여러 문제가 생겨:

  1. 코드베이스가 복잡해져서 새로운 개발자가 이해하기 어려워짐
  2. 작은 변경사항도 전체 애플리케이션을 다시 배포해야 함
  3. 특정 부분만 확장하기 어려움
  4. 기술 스택을 변경하거나 업데이트하기 어려움

반면, 마이크로서비스 아키텍처는 이런 문제를 해결하기 위해 등장했어:

  1. 각 서비스가 독립적으로 개발, 배포, 확장 가능
  2. 서비스별로 다른 기술 스택 사용 가능
  3. 장애 격리(한 서비스의 문제가 전체 시스템에 영향을 미치지 않음)
  4. 팀별로 서비스를 담당하여 개발 속도 향상

1.2 마이크로서비스의 핵심 원칙

마이크로서비스를 제대로 구현하려면 몇 가지 핵심 원칙을 이해해야 해:

🔄 단일 책임 원칙: 각 서비스는 하나의 비즈니스 기능에 집중해야 해. 예를 들어, 사용자 관리, 상품 관리, 주문 처리 등으로 분리할 수 있지.

🔒 독립적인 데이터 관리: 각 서비스는 자신만의 데이터베이스를 가지고, 다른 서비스의 데이터베이스에 직접 접근하지 않아야 해.

🔌 API 기반 통신: 서비스 간 통신은 잘 정의된 API를 통해 이루어져야 해. REST, gRPC, 메시지 큐 등을 활용할 수 있어.

🚀 자동화된 배포: CI/CD 파이프라인을 통해 각 서비스를 독립적으로 빌드, 테스트, 배포할 수 있어야 해.

📊 분산 모니터링: 여러 서비스로 구성된 시스템을 효과적으로 모니터링하고 문제를 추적할 수 있어야 해.

1.3 Python이 마이크로서비스에 적합한 이유

Python은 마이크로서비스 개발에 여러 장점을 제공해:

1. 빠른 개발 속도: Python의 간결한 문법과 풍부한 라이브러리 덕분에 빠르게 프로토타입을 만들고 개발할 수 있어.

2. 강력한 웹 프레임워크: Flask, FastAPI, Django 등 다양한 웹 프레임워크를 통해 API 서비스를 쉽게 구현할 수 있어.

3. 비동기 지원: Python 3.5+ 버전부터 asyncio를 통한 비동기 프로그래밍이 가능해져서 높은 동시성을 처리할 수 있어.

4. 데이터 처리 능력: 데이터 과학, 머신러닝 등의 기능을 마이크로서비스에 쉽게 통합할 수 있어.

5. 커뮤니티 지원: 활발한 커뮤니티와 풍부한 문서, 오픈소스 라이브러리 덕분에 문제 해결이 쉬워.

2025년 현재, Python은 마이크로서비스 개발에서 가장 인기 있는 언어 중 하나로 자리 잡았어. 특히 FastAPI와 같은 최신 프레임워크는 성능과 개발 편의성을 모두 제공하면서 큰 인기를 끌고 있지.

2. Python으로 마이크로서비스 시작하기 🚀

이제 Python으로 마이크로서비스를 구현하는 방법을 알아볼게. 먼저 필요한 도구와 프레임워크부터 살펴보자!

2.1 필요한 도구와 프레임워크

🛠️ 웹 프레임워크:

  1. FastAPI: 2025년 현재 가장 인기 있는 고성능 비동기 API 프레임워크. 자동 문서화, 타입 힌팅 지원이 강점이야.
  2. Flask: 가볍고 유연한 프레임워크로, 작은 서비스에 적합해.
  3. Django REST Framework: 풀스택 Django 기반으로, 복잡한 API를 빠르게 개발할 수 있어.

🗄️ 데이터베이스:

  1. PostgreSQL: 강력한 관계형 데이터베이스로, JSON 지원과 확장성이 뛰어나.
  2. MongoDB: 문서 기반 NoSQL 데이터베이스로, 스키마 없이 유연하게 데이터를 저장할 수 있어.
  3. Redis: 인메모리 데이터 스토어로, 캐싱과 메시지 브로커로 활용 가능해.

📨 메시지 큐:

  1. RabbitMQ: 강력한 메시지 브로커로, 서비스 간 비동기 통신에 적합해.
  2. Kafka: 고성능 분산 이벤트 스트리밍 플랫폼으로, 대규모 데이터 처리에 적합해.
  3. Redis Streams: Redis의 스트림 기능을 활용한 경량 메시지 큐.

🔍 모니터링 도구:

  1. Prometheus: 메트릭 수집 및 알림 시스템.
  2. Grafana: 데이터 시각화 및 모니터링 대시보드.
  3. Jaeger/Zipkin: 분산 트레이싱 시스템.

🚢 컨테이너화 및 오케스트레이션:

  1. Docker: 애플리케이션을 컨테이너화하여 일관된 환경에서 실행할 수 있게 해줘.
  2. Kubernetes: 컨테이너 오케스트레이션 플랫폼으로, 서비스 배포, 확장, 관리를 자동화해.

2.2 첫 번째 마이크로서비스 만들기: FastAPI 활용

FastAPI는 2025년 현재 Python 마이크로서비스 개발에 가장 인기 있는 프레임워크야. 높은 성능, 자동 문서화, 타입 힌팅 지원 등 많은 장점을 제공해. 간단한 사용자 서비스를 만들어볼게:

먼저 필요한 패키지를 설치해야 해:

pip install fastapi uvicorn sqlalchemy pydantic

이제 간단한 사용자 서비스를 만들어보자:


# main.py
from fastapi import FastAPI, HTTPException, Depends
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from pydantic import BaseModel
import uvicorn

# 데이터베이스 설정
SQLALCHEMY_DATABASE_URL = "sqlite:///./users.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# 데이터베이스 모델
class UserDB(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    username = Column(String, unique=True, index=True)
    email = Column(String, unique=True, index=True)
    full_name = Column(String)

# Pydantic 모델
class UserCreate(BaseModel):
    username: str
    email: str
    full_name: str

class User(BaseModel):
    id: int
    username: str
    email: str
    full_name: str
    
    class Config:
        orm_mode = True

# 데이터베이스 테이블 생성
Base.metadata.create_all(bind=engine)

# FastAPI 앱 생성
app = FastAPI(title="User Microservice")

# 의존성
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# API 엔드포인트
@app.post("/users/", response_model=User)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = UserDB(username=user.username, email=user.email, full_name=user.full_name)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@app.get("/users/{user_id}", response_model=User)
def read_user(user_id: int, db: Session = Depends(get_db)):
    db_user = db.query(UserDB).filter(UserDB.id == user_id).first()
    if db_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return db_user

@app.get("/users/", response_model=list[User])
def read_users(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    users = db.query(UserDB).offset(skip).limit(limit).all()
    return users

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
      

이 코드를 실행하면 http://localhost:8000/docs에서 자동 생성된 API 문서를 확인할 수 있어!

2.3 Docker로 마이크로서비스 컨테이너화하기

마이크로서비스는 컨테이너화하여 배포하는 것이 일반적이야. Docker를 사용해서 위에서 만든 서비스를 컨테이너화해보자:

Dockerfile 생성:


# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
      

requirements.txt 파일:


fastapi==0.110.0
uvicorn==0.27.1
sqlalchemy==2.0.27
pydantic==2.5.3
      

Docker 이미지 빌드 및 실행:


# 이미지 빌드
docker build -t user-service .

# 컨테이너 실행
docker run -d -p 8000:8000 --name user-service user-service
      

이제 첫 번째 마이크로서비스가 Docker 컨테이너 안에서 실행되고 있어! 이런 방식으로 여러 마이크로서비스를 독립적으로 개발하고 배포할 수 있지.

재능넷 같은 플랫폼에서도 이런 마이크로서비스 아키텍처를 활용하면 서비스별로 독립적인 개발과 확장이 가능해져서 플랫폼의 안정성과 확장성을 크게 높일 수 있어. 특히 사용자 서비스, 결제 서비스, 콘텐츠 서비스 등을 분리하면 각 기능을 독립적으로 관리하기 훨씬 쉬워지지!

3. 핵심 구성요소 구현하기 🧱

마이크로서비스 아키텍처에서는 여러 핵심 구성요소가 필요해. 이제 이러한 구성요소들을 Python으로 어떻게 구현하는지 알아보자!

3.1 API 게이트웨이 구현

API 게이트웨이는 클라이언트와 마이크로서비스 사이의 중간 계층으로, 다음과 같은 역할을 해:

  1. 요청 라우팅: 클라이언트 요청을 적절한 마이크로서비스로 전달
  2. 인증 및 권한 부여: 모든 요청에 대한 중앙 집중식 인증 처리
  3. 요청/응답 변환: 클라이언트와 서비스 간의 데이터 형식 변환
  4. 로드 밸런싱: 여러 서비스 인스턴스 간의 부하 분산
  5. 캐싱: 자주 요청되는 데이터 캐싱
  6. 속도 제한: API 호출 횟수 제한

Python으로 API 게이트웨이를 구현하는 방법을 알아보자. FastAPI를 사용한 간단한 게이트웨이 예제야:


# api_gateway.py
from fastapi import FastAPI, Depends, HTTPException, Header
from fastapi.responses import JSONResponse
import httpx
import jwt
from typing import Optional

app = FastAPI(title="API Gateway")

# 서비스 URL 설정
SERVICE_URLS = {
    "user": "http://user-service:8000",
    "product": "http://product-service:8001",
    "order": "http://order-service:8002",
    "payment": "http://payment-service:8003"
}

# JWT 시크릿 키
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"

# 인증 함수
async def verify_token(authorization: Optional[str] = Header(None)):
    if authorization is None:
        raise HTTPException(status_code=401, detail="Authorization header missing")
    
    try:
        token = authorization.split(" ")[1]
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        return payload
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

# 사용자 서비스 라우팅
@app.get("/api/users/{user_id}")
async def get_user(user_id: int, token_data: dict = Depends(verify_token)):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{SERVICE_URLS['user']}/users/{user_id}")
        return JSONResponse(content=response.json(), status_code=response.status_code)

@app.post("/api/users/")
async def create_user(token_data: dict = Depends(verify_token)):
    async with httpx.AsyncClient() as client:
        response = await client.post(f"{SERVICE_URLS['user']}/users/")
        return JSONResponse(content=response.json(), status_code=response.status_code)

# 상품 서비스 라우팅
@app.get("/api/products/{product_id}")
async def get_product(product_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{SERVICE_URLS['product']}/products/{product_id}")
        return JSONResponse(content=response.json(), status_code=response.status_code)

# 주문 서비스 라우팅
@app.post("/api/orders/")
async def create_order(token_data: dict = Depends(verify_token)):
    async with httpx.AsyncClient() as client:
        response = await client.post(f"{SERVICE_URLS['order']}/orders/")
        return JSONResponse(content=response.json(), status_code=response.status_code)

# 결제 서비스 라우팅
@app.post("/api/payments/")
async def process_payment(token_data: dict = Depends(verify_token)):
    async with httpx.AsyncClient() as client:
        response = await client.post(f"{SERVICE_URLS['payment']}/payments/")
        return JSONResponse(content=response.json(), status_code=response.status_code)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("api_gateway:app", host="0.0.0.0", port=8080, reload=True)
      

이 게이트웨이는 클라이언트 요청을 받아 적절한 마이크로서비스로 전달하고, JWT 토큰을 사용해 인증을 처리해. 실제 프로덕션 환경에서는 Kong, Traefik, AWS API Gateway 같은 전문 API 게이트웨이 솔루션을 사용하는 것도 좋은 선택이야.

3.2 서비스 디스커버리 구현

서비스 디스커버리는 마이크로서비스 환경에서 서비스의 위치(IP 주소, 포트)를 동적으로 찾는 메커니즘이야. 서비스가 자동으로 확장되거나 재배포될 때 특히 중요해.

Python에서는 Consul, etcd, ZooKeeper 같은 서비스 디스커버리 도구와 통합할 수 있어. 여기서는 Consul과 통합하는 간단한 예제를 보여줄게:


# service_discovery.py
import consul
import socket
import uuid
import time

class ServiceRegistry:
    def __init__(self, service_name, service_port, consul_host="localhost", consul_port=8500):
        self.service_name = service_name
        self.service_port = service_port
        self.consul_client = consul.Consul(host=consul_host, port=consul_port)
        self.service_id = f"{service_name}-{uuid.uuid4()}"
        self.hostname = socket.gethostname()
        self.ip_address = socket.gethostbyname(self.hostname)
    
    def register(self):
        """서비스를 Consul에 등록"""
        self.consul_client.agent.service.register(
            name=self.service_name,
            service_id=self.service_id,
            address=self.ip_address,
            port=self.service_port,
            check=consul.Check.http(f"http://{self.ip_address}:{self.service_port}/health", "10s")
        )
        print(f"Service {self.service_name} registered with ID {self.service_id}")
    
    def deregister(self):
        """서비스를 Consul에서 제거"""
        self.consul_client.agent.service.deregister(self.service_id)
        print(f"Service {self.service_id} deregistered")
    
    @staticmethod
    def discover_service(service_name, consul_host="localhost", consul_port=8500):
        """서비스 인스턴스 찾기"""
        consul_client = consul.Consul(host=consul_host, port=consul_port)
        services = consul_client.catalog.service(service_name)[1]
        if not services:
            return None
        
        # 간단한 로드 밸런싱 (라운드 로빈)
        service = services[int(time.time()) % len(services)]
        return {
            "address": service["ServiceAddress"],
            "port": service["ServicePort"]
        }

# 사용 예시
if __name__ == "__main__":
    # 서비스 등록
    registry = ServiceRegistry("user-service", 8000)
    registry.register()
    
    try:
        # 서비스 실행 중...
        while True:
            time.sleep(10)
            print("Service running...")
    except KeyboardInterrupt:
        # 서비스 종료 시 등록 해제
        registry.deregister()
    
    # 다른 서비스에서 사용자 서비스 찾기
    user_service = ServiceRegistry.discover_service("user-service")
    if user_service:
        print(f"Found user service at {user_service['address']}:{user_service['port']}")
    else:
        print("User service not found")
      

이 코드를 사용하면 각 마이크로서비스가 시작될 때 자동으로 Consul에 등록되고, 다른 서비스에서는 서비스 이름만으로 해당 서비스의 위치를 찾을 수 있어.

3.3 서킷 브레이커 패턴 구현

서킷 브레이커(Circuit Breaker) 패턴은 마이크로서비스 간의 호출이 실패할 때 연쇄적인 장애를 방지하는 패턴이야. 특정 서비스가 응답하지 않을 때 계속해서 요청을 보내는 대신, 일시적으로 호출을 중단하고 기본값이나 캐시된 응답을 반환해.

Python에서는 pybreaker 라이브러리를 사용해 서킷 브레이커를 구현할 수 있어:


# circuit_breaker.py
import pybreaker
import httpx
import asyncio
from fastapi import FastAPI, HTTPException

app = FastAPI(title="Circuit Breaker Example")

# 서킷 브레이커 설정
# - max_failures: 서킷이 열리기 전 허용되는 최대 실패 횟수
# - reset_timeout: 서킷이 half-open 상태로 전환되기 전 대기 시간(초)
user_service_breaker = pybreaker.CircuitBreaker(
    fail_max=3,
    reset_timeout=30,
    exclude=[HTTPException]
)

# 서킷 브레이커 이벤트 리스너
@user_service_breaker.on_open
def on_open():
    print("Circuit breaker opened - user service is down!")

@user_service_breaker.on_close
def on_close():
    print("Circuit breaker closed - user service is operational again!")

@user_service_breaker.on_half_open
def on_half_open():
    print("Circuit breaker half-open - trying to test if user service is back")

# 서킷 브레이커로 보호된 함수
@user_service_breaker
async def get_user_from_service(user_id: int):
    async with httpx.AsyncClient() as client:
        response = await client.get(f"http://user-service:8000/users/{user_id}", timeout=2.0)
        if response.status_code != 200:
            raise HTTPException(status_code=response.status_code, detail=response.text)
        return response.json()

# 폴백(fallback) 메커니즘
async def get_user_fallback(user_id: int):
    # 캐시에서 데이터 가져오기 또는 기본값 반환
    return {
        "id": user_id,
        "username": "unknown",
        "email": "unknown@example.com",
        "full_name": "Unknown User"
    }

# API 엔드포인트
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    try:
        # 서킷 브레이커로 보호된 함수 호출
        return await get_user_from_service(user_id)
    except pybreaker.CircuitBreakerError:
        # 서킷이 열려 있을 때 폴백 메커니즘 사용
        print(f"Circuit is OPEN! Using fallback for user {user_id}")
        return await get_user_fallback(user_id)
    except Exception as e:
        # 다른 예외 처리
        print(f"Error getting user {user_id}: {str(e)}")
        return await get_user_fallback(user_id)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("circuit_breaker:app", host="0.0.0.0", port=8081, reload=True)
      

이 코드는 사용자 서비스에 대한 호출이 연속으로 3번 실패하면 서킷을 열고, 30초 동안 추가 호출을 차단해. 그 동안에는 폴백 메커니즘을 통해 기본 사용자 정보를 반환하지. 이렇게 하면 한 서비스의 장애가 전체 시스템으로 확산되는 것을 방지할 수 있어.

이러한 패턴들은 마이크로서비스 아키텍처의 회복력(resilience)을 높이는 데 매우 중요해. 특히 여러 서비스가 상호 의존하는 복잡한 시스템에서는 이런 패턴들이 필수적이지!