/**
 * Generic streaming response service.
 *
 * Wraps an LLM streaming call and forwards the output through an
 * OpenAI-compatible stream adapter. Supports deep-thinking (reasoning)
 * segments, rough token statistics and error reporting.
 */

import { FastifyReply } from 'fastify';
import { OpenAIStreamAdapter, createOpenAIStreamAdapter } from './OpenAIStreamAdapter';
import { StreamOptions, StreamCallbacks, THINKING_TAGS, OpenAIMessage } from './types';
import { LLMFactory } from '../llm/adapters/LLMFactory';
import type { Message as LLMMessage } from '../llm/adapters/types';
import { logger } from '../logging/logger';

/**
 * Result of splitting one piece of streamed text into answer text and
 * thinking text.
 */
interface ThinkingParseResult {
  /** Text found outside the thinking tags. */
  content: string;
  /** Text found between the thinking tags. */
  thinking: string;
  /** Whether the parser is still inside a thinking section after this piece. */
  inThinking: boolean;
}

/**
 * Streaming response service.
 *
 * One instance is bound to one reply: it accumulates the full answer and
 * thinking text in instance fields, so it is meant to serve a single
 * `streamGenerate` call.
 */
export class StreamingService {
  private adapter: OpenAIStreamAdapter;
  private options: StreamOptions;
  private fullContent: string = '';
  private thinkingContent: string = '';
  private isInThinking: boolean = false;
  /**
   * Trailing text of the previous chunk that might be the beginning of a
   * thinking tag. It is held back until the next chunk (or the end of the
   * stream) decides whether it really is a tag.
   */
  private pendingTagFragment: string = '';

  /**
   * @param reply - Fastify reply the stream adapter is created for.
   * @param options - Stream options; `options.model` is passed to the adapter factory.
   */
  constructor(reply: FastifyReply, options: StreamOptions = {}) {
    this.adapter = createOpenAIStreamAdapter(reply, options.model);
    this.options = options;
  }

  /**
   * Runs a streaming generation and forwards every chunk to the adapter.
   *
   * @param messages - Conversation messages handed to the LLM adapter.
   * @param callbacks - Optional hooks invoked for thinking text, answer text,
   *   completion and errors.
   * @returns The accumulated answer text, the accumulated thinking text and
   *   the adapter's message id.
   * @throws Re-throws whatever error interrupted the generation, after
   *   reporting it to the adapter (if the stream is still open), to
   *   `callbacks.onError` and to the logger.
   */
  async streamGenerate(
    messages: OpenAIMessage[],
    callbacks?: StreamCallbacks
  ): Promise<{ content: string; thinking: string; messageId: string }> {
    const { model = 'deepseek-v3', temperature = 0.7, maxTokens = 4096 } = this.options;
    const enableDeepThinking = this.options.enableDeepThinking ?? false;

    // Set once the adapter has been ended normally, so the error path does
    // not write to an already-ended stream.
    let streamClosed = false;

    try {
      // Resolve the LLM adapter for the requested model.
      // NOTE(review): the cast is kept from the original code because the
      // parameter type of getAdapter is not visible here.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      const llm = LLMFactory.getAdapter(model as any);

      // Announce the start of the assistant message.
      this.adapter.sendRoleStart();

      const stream = llm.chatStream(messages as LLMMessage[], { temperature, maxTokens });

      for await (const chunk of stream) {
        if (chunk.content) {
          this.emitParsed(this.processThinkingTags(chunk.content, enableDeepThinking), callbacks);
        }
      }

      // Text held back as a possible tag prefix turned out not to be a tag:
      // deliver it now so nothing is lost.
      this.emitParsed(this.flushPendingTagFragment(), callbacks);

      // Rough usage estimate; completion tokens are computed from the answer
      // text only (thinking text is not included, as before).
      const usage = {
        promptTokens: this.estimateTokens(messages.map(m => m.content).join('')),
        completionTokens: this.estimateTokens(this.fullContent),
        totalTokens: 0,
      };
      usage.totalTokens = usage.promptTokens + usage.completionTokens;

      this.adapter.sendComplete(usage);
      this.adapter.end();
      streamClosed = true;

      callbacks?.onComplete?.(this.fullContent, this.thinkingContent);

      logger.info('[StreamingService] 流式生成完成', {
        conversationId: this.options.conversationId,
        contentLength: this.fullContent.length,
        thinkingLength: this.thinkingContent.length,
        tokens: usage.totalTokens,
      });

      return {
        content: this.fullContent,
        thinking: this.thinkingContent,
        messageId: this.adapter.getMessageId(),
      };
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : '流式生成失败';

      // If the failure happened after the stream was ended (e.g. inside
      // onComplete), there is nothing left to write to.
      if (!streamClosed) {
        this.adapter.sendError(errorMessage);
        this.adapter.end();
      }

      callbacks?.onError?.(error instanceof Error ? error : new Error(errorMessage));

      logger.error('[StreamingService] 流式生成失败', {
        error,
        conversationId: this.options.conversationId,
      });

      throw error;
    }
  }

  /**
   * Accumulates one parse result and forwards it to the adapter and the
   * callbacks: thinking text first, then answer text. Empty parts are skipped.
   */
  private emitParsed(parsed: ThinkingParseResult, callbacks?: StreamCallbacks): void {
    const { content, thinking } = parsed;

    if (thinking) {
      this.thinkingContent += thinking;
      this.adapter.sendReasoningDelta(thinking);
      callbacks?.onThinking?.(thinking);
    }

    if (content) {
      this.fullContent += content;
      this.adapter.sendContentDelta(content);
      callbacks?.onContent?.(content);
    }
  }

  /**
   * Splits a streamed chunk into answer text and thinking text, using
   * `THINKING_TAGS.START` / `THINKING_TAGS.END` as delimiters. The tags
   * themselves are dropped from the output.
   *
   * The parser is stateful across calls: `isInThinking` remembers whether a
   * thinking section is open, and `pendingTagFragment` holds back a chunk's
   * trailing characters when they could be the first part of a tag that is
   * completed by the next chunk.
   *
   * @param text - Raw chunk text.
   * @param enableDeepThinking - When false the text is returned unchanged as
   *   content and no tag parsing happens.
   */
  private processThinkingTags(text: string, enableDeepThinking: boolean): ThinkingParseResult {
    if (!enableDeepThinking) {
      return { content: text, thinking: '', inThinking: this.isInThinking };
    }

    let content = '';
    let thinking = '';
    // Re-attach whatever was held back from the previous chunk.
    let remaining = this.pendingTagFragment + text;
    this.pendingTagFragment = '';

    while (remaining.length > 0) {
      // Inside a thinking section we look for the closing tag, otherwise for
      // the opening tag.
      const tag = this.isInThinking ? THINKING_TAGS.END : THINKING_TAGS.START;
      const tagIndex = remaining.indexOf(tag);

      if (tagIndex !== -1) {
        const before = remaining.substring(0, tagIndex);
        if (this.isInThinking) {
          thinking += before;
        } else {
          content += before;
        }
        remaining = remaining.substring(tagIndex + tag.length);
        this.isInThinking = !this.isInThinking;
        continue;
      }

      // No complete tag: emit everything except a trailing partial tag,
      // which is kept for the next chunk.
      const heldBack = this.partialTagSuffixLength(remaining, tag);
      const emitted = remaining.substring(0, remaining.length - heldBack);
      if (this.isInThinking) {
        thinking += emitted;
      } else {
        content += emitted;
      }
      this.pendingTagFragment = remaining.substring(remaining.length - heldBack);
      remaining = '';
    }

    return { content, thinking, inThinking: this.isInThinking };
  }

  /**
   * Returns the held-back tag fragment (if any) as thinking or answer text,
   * depending on the current parser state, and clears it. Called when the
   * stream ends and no further chunk can complete the tag.
   */
  private flushPendingTagFragment(): ThinkingParseResult {
    const leftover = this.pendingTagFragment;
    this.pendingTagFragment = '';

    return this.isInThinking
      ? { content: '', thinking: leftover, inThinking: true }
      : { content: leftover, thinking: '', inThinking: false };
  }

  /**
   * Length of the longest suffix of `text` that is a proper, non-empty
   * prefix of `tag`; 0 when the text does not end with the start of the tag.
   */
  private partialTagSuffixLength(text: string, tag: string): number {
    const longest = Math.min(text.length, tag.length - 1);
    for (let length = longest; length > 0; length--) {
      if (text.endsWith(tag.substring(0, length))) {
        return length;
      }
    }
    return 0;
  }

  /**
   * Rough token estimate: characters in U+4E00–U+9FA5 are counted at about
   * 1.5 characters per token, everything else at about 4 characters per token.
   */
  private estimateTokens(text: string): number {
    const chineseChars = (text.match(/[\u4e00-\u9fa5]/g) || []).length;
    const otherChars = text.length - chineseChars;
    return Math.ceil(chineseChars / 1.5 + otherChars / 4);
  }
}

/**
 * Creates a streaming response service bound to the given reply.
 */
export function createStreamingService(
  reply: FastifyReply,
  options?: StreamOptions
): StreamingService {
  return new StreamingService(reply, options);
}

/**
 * Convenience helper: creates a service for the reply and runs one
 * streaming generation on it.
 *
 * @returns The same result as {@link StreamingService.streamGenerate}.
 */
export async function streamChat(
  reply: FastifyReply,
  messages: OpenAIMessage[],
  options?: StreamOptions,
  callbacks?: StreamCallbacks
): Promise<{ content: string; thinking: string; messageId: string }> {
  const service = createStreamingService(reply, options);
  return service.streamGenerate(messages, callbacks);
}