SSE 스트리밍으로 AI 채팅 구현하기 - Provider Adapter 패턴
SSE 스트리밍으로 AI 채팅 구현하기
AI 채팅 서비스를 구축할 때 가장 중요한 사용자 경험 요소 중 하나는 실시간 응답이다. 사용자가 질문을 보낸 후 전체 응답이 완성될 때까지 기다리는 것보다, 글자가 타이핑되듯이 실시간으로 나타나는 것이 훨씬 자연스럽다.
이 글에서는 Server-Sent Events(SSE)를 활용하여 AI 채팅의 스트리밍 응답을 구현하는 방법과, 여러 AI 제공자를 통합할 때 유용한 Provider Adapter 패턴을 소개한다.
SSE vs WebSocket
실시간 통신을 구현할 때 WebSocket과 SSE 중 선택해야 한다.
| 특성 | SSE | WebSocket |
|---|---|---|
| 통신 방향 | 단방향 (서버 → 클라이언트) | 양방향 |
| 프로토콜 | HTTP | ws:// 별도 프로토콜 |
| 재연결 | 자동 지원 | 직접 구현 |
| 브라우저 지원 | EventSource API | WebSocket API |
AI 채팅의 경우 사용자 → 서버는 일반 HTTP 요청, 서버 → 사용자는 스트리밍 응답이므로 SSE가 더 적합하다. WebSocket의 양방향 통신은 이 경우 오버스펙이다.
기본 SSE 구현
서버 사이드 (Next.js API Route)
// app/api/chat/route.ts
import { NextRequest } from 'next/server';
import OpenAI from 'openai';
export async function POST(req: NextRequest) {
const { messages } = await req.json();
const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages,
stream: true,
});
// ReadableStream으로 SSE 응답 생성
const encoder = new TextEncoder();
const readable = new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
const text = chunk.choices[0]?.delta?.content || '';
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ text })}\n\n`));
}
controller.enqueue(encoder.encode('data: [DONE]\n\n'));
controller.close();
},
});
return new Response(readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
});
}
클라이언트 사이드
// hooks/useChat.ts
import { useState, useCallback } from 'react';
export function useChat() {
const [messages, setMessages] = useState<Message[]>([]);
const [isStreaming, setIsStreaming] = useState(false);
const sendMessage = useCallback(async (content: string) => {
setIsStreaming(true);
// 사용자 메시지 추가
const userMessage = { role: 'user', content };
setMessages(prev => [...prev, userMessage]);
// AI 응답 메시지 초기화
const assistantMessage = { role: 'assistant', content: '' };
setMessages(prev => [...prev, assistantMessage]);
const response = await fetch('/api/chat', {
method: 'POST',
body: JSON.stringify({ messages: [...messages, userMessage] }),
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n').filter(line => line.startsWith('data:'));
for (const line of lines) {
const data = line.replace('data: ', '');
if (data === '[DONE]') break;
const { text } = JSON.parse(data);
// 마지막 메시지(AI 응답)에 텍스트 추가
setMessages(prev => {
const updated = [...prev];
updated[updated.length - 1].content += text;
return updated;
});
}
}
setIsStreaming(false);
}, [messages]);
return { messages, sendMessage, isStreaming };
}
Provider Adapter 패턴
여러 AI 제공자(OpenAI, Anthropic, Gemini 등)를 지원하려면 각 제공자의 API 차이를 추상화해야 한다. 여기서 Adapter 패턴이 빛을 발한다.
통일된 인터페이스 정의
// types/stream.ts
export interface StreamAdapter {
provider: string;
connect(params: StreamParams): AsyncGenerator<StreamChunk>;
abort(): void;
}
export interface StreamParams {
model: string;
messages: Message[];
systemPrompt?: string;
}
export interface StreamChunk {
text: string;
done: boolean;
}
Provider별 Adapter 구현
// adapters/openai.ts
export class OpenAIAdapter implements StreamAdapter {
provider = 'openai';
private controller: AbortController | null = null;
async *connect(params: StreamParams): AsyncGenerator<StreamChunk> {
this.controller = new AbortController();
const response = await fetch('/api/providers/openai', {
method: 'POST',
body: JSON.stringify(params),
signal: this.controller.signal,
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const { done, value } = await reader.read();
if (done) {
yield { text: '', done: true };
break;
}
const chunk = decoder.decode(value);
// OpenAI 형식 파싱
const text = this.parseOpenAIChunk(chunk);
yield { text, done: false };
}
}
abort() {
this.controller?.abort();
}
private parseOpenAIChunk(chunk: string): string {
// OpenAI SSE 형식: data: {"choices":[{"delta":{"content":"텍스트"}}]}
const lines = chunk.split('\n').filter(l => l.startsWith('data:'));
return lines.map(line => {
const json = JSON.parse(line.replace('data: ', ''));
return json.choices?.[0]?.delta?.content || '';
}).join('');
}
}
// adapters/anthropic.ts
export class AnthropicAdapter implements StreamAdapter {
provider = 'anthropic';
private controller: AbortController | null = null;
async *connect(params: StreamParams): AsyncGenerator<StreamChunk> {
this.controller = new AbortController();
const response = await fetch('/api/providers/anthropic', {
method: 'POST',
body: JSON.stringify(params),
signal: this.controller.signal,
});
const reader = response.body?.getReader();
const decoder = new TextDecoder();
while (reader) {
const { done, value } = await reader.read();
if (done) {
yield { text: '', done: true };
break;
}
const chunk = decoder.decode(value);
// Anthropic 형식 파싱 (OpenAI와 다름)
const text = this.parseAnthropicChunk(chunk);
yield { text, done: false };
}
}
abort() {
this.controller?.abort();
}
private parseAnthropicChunk(chunk: string): string {
// Anthropic SSE 형식: event: content_block_delta\ndata: {"delta":{"text":"텍스트"}}
const dataLines = chunk.split('\n').filter(l => l.startsWith('data:'));
return dataLines.map(line => {
const json = JSON.parse(line.replace('data: ', ''));
return json.delta?.text || '';
}).join('');
}
}
Adapter Factory
// adapters/factory.ts
import { OpenAIAdapter } from './openai';
import { AnthropicAdapter } from './anthropic';
import { GeminiAdapter } from './gemini';
const adapters = {
openai: OpenAIAdapter,
anthropic: AnthropicAdapter,
gemini: GeminiAdapter,
// 새 Provider 추가 시 여기에 등록
};
export function createAdapter(provider: string): StreamAdapter {
const AdapterClass = adapters[provider];
if (!AdapterClass) {
throw new Error(`Unknown provider: ${provider}`);
}
return new AdapterClass();
}
사용 예시
// hooks/useStreamChat.ts
import { useState, useCallback, useRef } from 'react';
import { createAdapter } from '@/adapters/factory';
export function useStreamChat(provider: string) {
const [content, setContent] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const adapterRef = useRef<StreamAdapter | null>(null);
const stream = useCallback(async (params: StreamParams) => {
setIsStreaming(true);
setContent('');
const adapter = createAdapter(provider);
adapterRef.current = adapter;
for await (const chunk of adapter.connect(params)) {
if (chunk.done) break;
setContent(prev => prev + chunk.text);
}
setIsStreaming(false);
}, [provider]);
const stop = useCallback(() => {
adapterRef.current?.abort();
setIsStreaming(false);
}, []);
return { content, isStreaming, stream, stop };
}
새로운 Provider 추가하기
Adapter 패턴의 장점은 새로운 AI 제공자 추가가 매우 간단하다는 것이다.
- 새 Adapter 클래스 생성 (
adapters/deepseek.ts) StreamAdapter인터페이스 구현- Factory에 등록
// adapters/deepseek.ts
export class DeepSeekAdapter implements StreamAdapter {
provider = 'deepseek';
async *connect(params: StreamParams): AsyncGenerator<StreamChunk> {
// DeepSeek API 형식에 맞게 구현
// ...
}
abort() { /* ... */ }
}
// adapters/factory.ts에 추가
adapters.deepseek = DeepSeekAdapter;
기존 코드 수정 없이 확장이 가능하다. Open-Closed Principle 적용.
에러 처리
SSE 스트리밍에서 에러 처리는 중요하다.
async *connect(params: StreamParams): AsyncGenerator<StreamChunk> {
try {
const response = await fetch('/api/providers/openai', {
method: 'POST',
body: JSON.stringify(params),
signal: this.controller?.signal,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
// 스트리밍 처리...
} catch (error) {
if (error.name === 'AbortError') {
// 사용자가 중단한 경우 - 정상 종료
yield { text: '', done: true };
return;
}
// 실제 에러
throw error;
}
}
마무리
SSE 스트리밍은 AI 채팅 서비스의 사용자 경험을 크게 향상시킨다. Provider Adapter 패턴을 적용하면 여러 AI 제공자를 통합하면서도 코드의 확장성과 유지보수성을 유지할 수 있다.
핵심 포인트:
- SSE는 AI 채팅에 적합: 단방향 스트리밍, 자동 재연결, 간단한 구현
- Adapter 패턴: Provider별 API 차이를 추상화하여 클라이언트 코드 단순화
- 확장성: 새 Provider 추가 시 Adapter만 구현하면 됨
이 패턴은 실제로 Gng 프로젝트에서 6개의 AI 제공자(OpenAI, Anthropic, Gemini, xAI, DeepSeek, Solar)를 통합하는 데 사용했다.