← 포스트 목록으로

SSE 스트리밍으로 AI 채팅 구현하기 - Provider Adapter 패턴

📖 4분 소요
SSEAITypeScriptReactDesign Pattern

SSE 스트리밍으로 AI 채팅 구현하기

AI 채팅 서비스를 구축할 때 가장 중요한 사용자 경험 요소 중 하나는 실시간 응답이다. 사용자가 질문을 보낸 후 전체 응답이 완성될 때까지 기다리는 것보다, 글자가 타이핑되듯이 실시간으로 나타나는 것이 훨씬 자연스럽다.

이 글에서는 Server-Sent Events(SSE)를 활용하여 AI 채팅의 스트리밍 응답을 구현하는 방법과, 여러 AI 제공자를 통합할 때 유용한 Provider Adapter 패턴을 소개한다.

SSE vs WebSocket

실시간 통신을 구현할 때 WebSocket과 SSE 중 선택해야 한다.

특성SSEWebSocket
통신 방향단방향 (서버 → 클라이언트)양방향
프로토콜HTTPws:// 별도 프로토콜
재연결자동 지원직접 구현
브라우저 지원EventSource APIWebSocket API

AI 채팅의 경우 사용자 → 서버는 일반 HTTP 요청, 서버 → 사용자는 스트리밍 응답이므로 SSE가 더 적합하다. WebSocket의 양방향 통신은 이 경우 오버스펙이다.

기본 SSE 구현

서버 사이드 (Next.js API Route)

// app/api/chat/route.ts
import { NextRequest } from 'next/server';
import OpenAI from 'openai';

export async function POST(req: NextRequest) {
  const { messages } = await req.json();

  const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

  const stream = await openai.chat.completions.create({
    model: 'gpt-4',
    messages,
    stream: true,
  });

  // ReadableStream으로 SSE 응답 생성
  const encoder = new TextEncoder();
  const readable = new ReadableStream({
    async start(controller) {
      for await (const chunk of stream) {
        const text = chunk.choices[0]?.delta?.content || '';
        controller.enqueue(encoder.encode(`data: ${JSON.stringify({ text })}\n\n`));
      }
      controller.enqueue(encoder.encode('data: [DONE]\n\n'));
      controller.close();
    },
  });

  return new Response(readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    },
  });
}

클라이언트 사이드

// hooks/useChat.ts
import { useState, useCallback } from 'react';

export function useChat() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = useCallback(async (content: string) => {
    setIsStreaming(true);

    // 사용자 메시지 추가
    const userMessage = { role: 'user', content };
    setMessages(prev => [...prev, userMessage]);

    // AI 응답 메시지 초기화
    const assistantMessage = { role: 'assistant', content: '' };
    setMessages(prev => [...prev, assistantMessage]);

    const response = await fetch('/api/chat', {
      method: 'POST',
      body: JSON.stringify({ messages: [...messages, userMessage] }),
    });

    const reader = response.body?.getReader();
    const decoder = new TextDecoder();

    while (reader) {
      const { done, value } = await reader.read();
      if (done) break;

      const chunk = decoder.decode(value);
      const lines = chunk.split('\n').filter(line => line.startsWith('data:'));

      for (const line of lines) {
        const data = line.replace('data: ', '');
        if (data === '[DONE]') break;

        const { text } = JSON.parse(data);
        // 마지막 메시지(AI 응답)에 텍스트 추가
        setMessages(prev => {
          const updated = [...prev];
          updated[updated.length - 1].content += text;
          return updated;
        });
      }
    }

    setIsStreaming(false);
  }, [messages]);

  return { messages, sendMessage, isStreaming };
}

Provider Adapter 패턴

여러 AI 제공자(OpenAI, Anthropic, Gemini 등)를 지원하려면 각 제공자의 API 차이를 추상화해야 한다. 여기서 Adapter 패턴이 빛을 발한다.

통일된 인터페이스 정의

// types/stream.ts
export interface StreamAdapter {
  provider: string;
  connect(params: StreamParams): AsyncGenerator<StreamChunk>;
  abort(): void;
}

export interface StreamParams {
  model: string;
  messages: Message[];
  systemPrompt?: string;
}

export interface StreamChunk {
  text: string;
  done: boolean;
}

Provider별 Adapter 구현

// adapters/openai.ts
export class OpenAIAdapter implements StreamAdapter {
  provider = 'openai';
  private controller: AbortController | null = null;

  async *connect(params: StreamParams): AsyncGenerator<StreamChunk> {
    this.controller = new AbortController();

    const response = await fetch('/api/providers/openai', {
      method: 'POST',
      body: JSON.stringify(params),
      signal: this.controller.signal,
    });

    const reader = response.body?.getReader();
    const decoder = new TextDecoder();

    while (reader) {
      const { done, value } = await reader.read();
      if (done) {
        yield { text: '', done: true };
        break;
      }

      const chunk = decoder.decode(value);
      // OpenAI 형식 파싱
      const text = this.parseOpenAIChunk(chunk);
      yield { text, done: false };
    }
  }

  abort() {
    this.controller?.abort();
  }

  private parseOpenAIChunk(chunk: string): string {
    // OpenAI SSE 형식: data: {"choices":[{"delta":{"content":"텍스트"}}]}
    const lines = chunk.split('\n').filter(l => l.startsWith('data:'));
    return lines.map(line => {
      const json = JSON.parse(line.replace('data: ', ''));
      return json.choices?.[0]?.delta?.content || '';
    }).join('');
  }
}
// adapters/anthropic.ts
export class AnthropicAdapter implements StreamAdapter {
  provider = 'anthropic';
  private controller: AbortController | null = null;

  async *connect(params: StreamParams): AsyncGenerator<StreamChunk> {
    this.controller = new AbortController();

    const response = await fetch('/api/providers/anthropic', {
      method: 'POST',
      body: JSON.stringify(params),
      signal: this.controller.signal,
    });

    const reader = response.body?.getReader();
    const decoder = new TextDecoder();

    while (reader) {
      const { done, value } = await reader.read();
      if (done) {
        yield { text: '', done: true };
        break;
      }

      const chunk = decoder.decode(value);
      // Anthropic 형식 파싱 (OpenAI와 다름)
      const text = this.parseAnthropicChunk(chunk);
      yield { text, done: false };
    }
  }

  abort() {
    this.controller?.abort();
  }

  private parseAnthropicChunk(chunk: string): string {
    // Anthropic SSE 형식: event: content_block_delta\ndata: {"delta":{"text":"텍스트"}}
    const dataLines = chunk.split('\n').filter(l => l.startsWith('data:'));
    return dataLines.map(line => {
      const json = JSON.parse(line.replace('data: ', ''));
      return json.delta?.text || '';
    }).join('');
  }
}

Adapter Factory

// adapters/factory.ts
import { OpenAIAdapter } from './openai';
import { AnthropicAdapter } from './anthropic';
import { GeminiAdapter } from './gemini';

const adapters = {
  openai: OpenAIAdapter,
  anthropic: AnthropicAdapter,
  gemini: GeminiAdapter,
  // 새 Provider 추가 시 여기에 등록
};

export function createAdapter(provider: string): StreamAdapter {
  const AdapterClass = adapters[provider];
  if (!AdapterClass) {
    throw new Error(`Unknown provider: ${provider}`);
  }
  return new AdapterClass();
}

사용 예시

// hooks/useStreamChat.ts
import { useState, useCallback, useRef } from 'react';
import { createAdapter } from '@/adapters/factory';

export function useStreamChat(provider: string) {
  const [content, setContent] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  const adapterRef = useRef<StreamAdapter | null>(null);

  const stream = useCallback(async (params: StreamParams) => {
    setIsStreaming(true);
    setContent('');

    const adapter = createAdapter(provider);
    adapterRef.current = adapter;

    for await (const chunk of adapter.connect(params)) {
      if (chunk.done) break;
      setContent(prev => prev + chunk.text);
    }

    setIsStreaming(false);
  }, [provider]);

  const stop = useCallback(() => {
    adapterRef.current?.abort();
    setIsStreaming(false);
  }, []);

  return { content, isStreaming, stream, stop };
}

새로운 Provider 추가하기

Adapter 패턴의 장점은 새로운 AI 제공자 추가가 매우 간단하다는 것이다.

  1. 새 Adapter 클래스 생성 (adapters/deepseek.ts)
  2. StreamAdapter 인터페이스 구현
  3. Factory에 등록
// adapters/deepseek.ts
export class DeepSeekAdapter implements StreamAdapter {
  provider = 'deepseek';

  async *connect(params: StreamParams): AsyncGenerator<StreamChunk> {
    // DeepSeek API 형식에 맞게 구현
    // ...
  }

  abort() { /* ... */ }
}

// adapters/factory.ts에 추가
adapters.deepseek = DeepSeekAdapter;

기존 코드 수정 없이 확장이 가능하다. Open-Closed Principle 적용.

에러 처리

SSE 스트리밍에서 에러 처리는 중요하다.

async *connect(params: StreamParams): AsyncGenerator<StreamChunk> {
  try {
    const response = await fetch('/api/providers/openai', {
      method: 'POST',
      body: JSON.stringify(params),
      signal: this.controller?.signal,
    });

    if (!response.ok) {
      throw new Error(`HTTP ${response.status}: ${response.statusText}`);
    }

    // 스트리밍 처리...
  } catch (error) {
    if (error.name === 'AbortError') {
      // 사용자가 중단한 경우 - 정상 종료
      yield { text: '', done: true };
      return;
    }
    // 실제 에러
    throw error;
  }
}

마무리

SSE 스트리밍은 AI 채팅 서비스의 사용자 경험을 크게 향상시킨다. Provider Adapter 패턴을 적용하면 여러 AI 제공자를 통합하면서도 코드의 확장성과 유지보수성을 유지할 수 있다.

핵심 포인트:

  • SSE는 AI 채팅에 적합: 단방향 스트리밍, 자동 재연결, 간단한 구현
  • Adapter 패턴: Provider별 API 차이를 추상화하여 클라이언트 코드 단순화
  • 확장성: 새 Provider 추가 시 Adapter만 구현하면 됨

이 패턴은 실제로 Gng 프로젝트에서 6개의 AI 제공자(OpenAI, Anthropic, Gemini, xAI, DeepSeek, Solar)를 통합하는 데 사용했다.