fix(aia,ssa,asl,infra): harden SSE transport and stabilize attachment context

Harden the SSE protocol handling for SAE/HTTP2 paths, add graceful-shutdown behavior to the health checks, and improve the SSA retry UX for transient stream failures. For AIA, persist attachment extraction results in the database with a read-through cache fallback, and add a production cache safety guard that prevents memory-cache drift in multi-instance deployments. Also restore the ASL SR page scrolling behavior.

Made-with: Cursor
This commit is contained in:
2026-03-09 18:45:12 +08:00
parent 50657dd81f
commit 5c5fec52c1
27 changed files with 807 additions and 100 deletions

View File

@@ -2,11 +2,21 @@ import { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify'
import { prisma, getDatabaseConnectionCount } from '../../config/database.js'
import os from 'os'
/**
 * Shutdown flag. Per the original note it is set to true after SIGTERM; once
 * set, the liveness/readiness checks immediately answer 503 so that CLB/SAE
 * stops dispatching new requests to this Pod.
 */
let isShuttingDown = false;
/**
 * Marks the process as shutting down by setting the module-level flag above.
 * Takes no arguments and returns nothing; calling it repeatedly has the same
 * effect as calling it once.
 */
export function markShuttingDown(): void {
isShuttingDown = true;
}
/**
* 健康检查响应
*/
export interface HealthCheckResponse {
status: 'ok' | 'error' | 'degraded'
status: 'ok' | 'error' | 'degraded' | 'shutting_down'
timestamp: number
uptime: number
checks?: Record<string, {
@@ -46,6 +56,14 @@ export async function registerHealthRoutes(app: FastifyInstance): Promise<void>
_request: FastifyRequest,
reply: FastifyReply
) => {
if (isShuttingDown) {
return reply.status(503).send({
status: 'shutting_down',
timestamp: Date.now(),
uptime: process.uptime(),
});
}
const response: HealthCheckResponse = {
status: 'ok',
timestamp: Date.now(),
@@ -68,6 +86,14 @@ export async function registerHealthRoutes(app: FastifyInstance): Promise<void>
_request: FastifyRequest,
reply: FastifyReply
) => {
if (isShuttingDown) {
return reply.status(503).send({
status: 'shutting_down',
timestamp: Date.now(),
uptime: process.uptime(),
});
}
const checks: Record<string, any> = {}
let overallStatus: 'ok' | 'error' | 'degraded' = 'ok'

View File

@@ -21,7 +21,7 @@
* ```
*/
export { registerHealthRoutes } from './healthCheck.js'
export { registerHealthRoutes, markShuttingDown } from './healthCheck.js'
export type { HealthCheckResponse } from './healthCheck.js'

View File

@@ -36,7 +36,6 @@ export class OpenAIStreamAdapter {
this.reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
'Access-Control-Allow-Origin': '*',
});

View File

@@ -220,6 +220,11 @@ export function validateEnv(): void {
}
}
// 生产环境禁止内存缓存(多实例不共享,易导致状态不一致)
if (config.nodeEnv === 'production' && config.cacheType === 'memory') {
errors.push('CACHE_TYPE=memory is forbidden in production; use postgres or redis')
}
// 如果使用PgBoss队列,验证数据库配置
if (config.queueType === 'pgboss') {
if (!config.databaseUrl) {

View File

@@ -15,7 +15,7 @@ import { aslRoutes } from './modules/asl/routes/index.js';
import { registerDCRoutes, initDCModule } from './modules/dc/index.js';
import pkbRoutes from './modules/pkb/routes/index.js';
import { aiaRoutes } from './modules/aia/index.js';
import { registerHealthRoutes } from './common/health/index.js';
import { registerHealthRoutes, markShuttingDown } from './common/health/index.js';
import { logger } from './common/logging/index.js';
import { authRoutes, registerAuthPlugin } from './common/auth/index.js';
import { promptRoutes } from './common/prompt/index.js';
@@ -339,11 +339,24 @@ start();
// ============================================
// 🛡️ 优雅关闭处理Graceful Shutdown
// ============================================
const SHUTDOWN_TIMEOUT_MS = 30_000;
const gracefulShutdown = async (signal: string) => {
console.log(`\n⚠ 收到 ${signal} 信号,开始优雅关闭...`);
// 立即标记停机,健康检查返回 503,CLB 不再派发新请求
markShuttingDown();
console.log('🚫 健康检查已切换为 503CLB 将停止路由新流量');
// 强制超时兜底:防止 SSE 长连接或死循环任务阻塞退出
const forceTimer = setTimeout(() => {
console.error(`❌ 优雅关闭超时 (${SHUTDOWN_TIMEOUT_MS / 1000}s),强制退出`);
process.exit(1);
}, SHUTDOWN_TIMEOUT_MS);
forceTimer.unref();
try {
// 1. 停止接收新请求
// 1. 停止接收新请求(已有 SSE 连接继续跑完)
await fastify.close();
console.log('✅ HTTP 服务已停止');

View File

@@ -399,8 +399,8 @@ export class ChatController {
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'Access-Control-Allow-Origin': '*',
'X-Accel-Buffering': 'no',
});
// 保存用户消息

View File

@@ -195,8 +195,8 @@ export class ConversationController {
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'Access-Control-Allow-Origin': '*',
'X-Accel-Buffering': 'no',
});
// 流式输出

View File

@@ -12,6 +12,7 @@ import { logger } from '../../../common/logging/index.js';
import { storage } from '../../../common/storage/index.js';
import { cache } from '../../../common/cache/index.js';
import { ExtractionClient } from '../../../common/document/ExtractionClient.js';
import { prisma } from '../../../config/database.js';
import type { Attachment } from '../types/index.js';
// 附件缓存前缀和过期时间(2小时)
@@ -25,6 +26,24 @@ const MAX_ATTACHMENTS = 5;
const MAX_TOKENS_PER_ATTACHMENT = 30000; // 单个附件最大 30k Token
const ALLOWED_FILE_TYPES = ['pdf', 'docx', 'txt', 'xlsx', 'doc'];
/**
 * Optional ownership filter for attachment database lookups.
 * Each field that is set is added as an equality condition to the
 * `aiaAttachment` query; a field that is omitted does not restrict the query.
 */
interface AttachmentQueryScope {
// Restrict results to attachments stored for this user.
userId?: string;
// Restrict results to attachments stored for this conversation.
conversationId?: string;
}
/**
 * Shape of the columns selected from `aiaAttachment` when attachment text is
 * read back from the database after a cache miss.
 */
interface AiaAttachmentTextRecord {
// Attachment id (the upload path generates ids of the form `att-<timestamp>-<random>`).
id: string;
// Extracted text; the upload path stores null unless extraction succeeded.
textContent: string | null;
// Extraction outcome. The trailing `| string` widens the union to any string,
// so the three literals document the expected values rather than enforce them.
extractStatus: 'success' | 'failed' | 'empty' | string;
// Error message recorded for a failed or empty extraction, otherwise null.
extractError: string | null;
}
/**
 * Shape of the columns selected from `aiaAttachment` when attachment metadata
 * is read back from the database after a cache miss.
 */
interface AiaAttachmentInfoRecord {
// Attachment id.
id: string;
// Original file name as supplied at upload time.
filename: string;
// File size; the upload path stores the uploaded buffer's length here.
size: number;
}
// ==================== 附件上传 ====================
/**
@@ -56,10 +75,17 @@ export async function uploadAttachment(
// 3. 提取文本内容
let extractedText = '';
let extractStatus: 'success' | 'failed' | 'empty' = 'success';
let extractError: string | undefined;
let wasTruncated = false;
try {
// 对于 txt 文件,直接读取内容(不依赖 Python 服务)
if (ext === 'txt') {
extractedText = file.buffer.toString('utf-8');
if (!extractedText.trim()) {
extractStatus = 'empty';
extractedText = '[文档内容为空或无法提取]';
}
logger.info('[AIA:AttachmentService] TXT文件直接读取成功', {
filename: file.filename,
charCount: extractedText.length,
@@ -89,6 +115,8 @@ export async function uploadAttachment(
filename: file.filename,
error: result.error,
});
extractStatus = 'empty';
extractError = result.error || '文档内容为空或无法提取';
extractedText = '[文档内容为空或无法提取]';
}
}
@@ -99,6 +127,7 @@ export async function uploadAttachment(
const ratio = MAX_TOKENS_PER_ATTACHMENT / tokens;
const truncatedLength = Math.floor(extractedText.length * ratio);
extractedText = extractedText.slice(0, truncatedLength) + '\n\n[内容已截断超过30k Token限制]';
wasTruncated = true;
logger.info('[AIA:AttachmentService] 附件内容截断', {
originalTokens: tokens,
@@ -107,17 +136,19 @@ export async function uploadAttachment(
});
}
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
logger.error('[AIA:AttachmentService] 文本提取失败', {
error,
filename: file.filename,
});
extractStatus = 'failed';
extractError = message;
extractedText = '[文档内容提取失败]';
}
// 5. 构建附件对象
const attachmentId = `att-${Date.now()}-${Math.random().toString(36).slice(2, 8)}`;
const tokenCount = estimateTokens(extractedText);
const truncated = tokenCount > MAX_TOKENS_PER_ATTACHMENT;
const tokenCount = extractStatus === 'success' ? estimateTokens(extractedText) : 0;
const attachment: Attachment = {
id: attachmentId,
@@ -127,12 +158,12 @@ export async function uploadAttachment(
ossUrl: url,
textContent: extractedText,
tokenCount,
truncated,
truncated: wasTruncated,
createdAt: new Date().toISOString(),
};
// 6. 将提取的文本存储到缓存(供后续发送消息时使用)
if (extractedText && extractedText !== '[文档内容提取失败]' && extractedText !== '[文档内容为空或无法提取]') {
if (extractStatus === 'success' && extractedText) {
await cache.set(
`${ATTACHMENT_CACHE_PREFIX}${attachmentId}`,
extractedText,
@@ -150,6 +181,9 @@ export async function uploadAttachment(
id: attachmentId,
filename: file.filename,
size: file.buffer.length,
mimeType: file.mimetype,
ossUrl: url,
extractStatus,
};
await cache.set(
`${ATTACHMENT_INFO_CACHE_PREFIX}${attachmentId}`,
@@ -157,13 +191,48 @@ export async function uploadAttachment(
ATTACHMENT_CACHE_TTL
);
// 8. 附件信息持久化到数据库(真相源)
await (prisma as any).aiaAttachment.upsert({
where: { id: attachmentId },
update: {
userId,
conversationId,
filename: file.filename,
mimeType: file.mimetype,
size: file.buffer.length,
ossUrl: url,
textContent: extractStatus === 'success' ? extractedText : null,
extractStatus,
extractError: extractError || null,
tokenCount,
truncated: wasTruncated,
},
create: {
id: attachmentId,
userId,
conversationId,
filename: file.filename,
mimeType: file.mimetype,
size: file.buffer.length,
ossUrl: url,
textContent: extractStatus === 'success' ? extractedText : null,
extractStatus,
extractError: extractError || null,
tokenCount,
truncated: wasTruncated,
},
});
return attachment;
}
/**
* 批量获取附件文本内容
*/
export async function getAttachmentsText(attachmentIds: string[]): Promise<string> {
export async function getAttachmentsText(
attachmentIds: string[],
scope: AttachmentQueryScope = {},
): Promise<string> {
if (!attachmentIds || attachmentIds.length === 0) {
return '';
}
@@ -174,11 +243,28 @@ export async function getAttachmentsText(attachmentIds: string[]): Promise<strin
});
const texts: string[] = [];
const where: any = {
id: { in: attachmentIds },
};
if (scope.userId) where.userId = scope.userId;
if (scope.conversationId) where.conversationId = scope.conversationId;
const records = await (prisma as any).aiaAttachment.findMany({
where,
select: {
id: true,
textContent: true,
extractStatus: true,
extractError: true,
},
});
const typedRecords = records as AiaAttachmentTextRecord[];
const recordMap = new Map(typedRecords.map((r: AiaAttachmentTextRecord) => [r.id, r]));
for (const attachmentId of attachmentIds) {
try {
const cacheKey = `${ATTACHMENT_CACHE_PREFIX}${attachmentId}`;
const text = await cache.get(cacheKey);
const text = await cache.get<string>(cacheKey);
if (text) {
texts.push(`【附件: ${attachmentId}\n${text}`);
@@ -187,8 +273,22 @@ export async function getAttachmentsText(attachmentIds: string[]): Promise<strin
textLength: text.length,
});
} else {
logger.warn('[AIA:AttachmentService] 附件文本不在缓存中', { attachmentId });
texts.push(`【附件: ${attachmentId}\n[附件内容已过期或不存在]`);
const record = recordMap.get(attachmentId);
logger.warn('[AIA:AttachmentService] 附件文本缓存未命中,尝试数据库回源', {
attachmentId,
hasDbRecord: !!record,
});
if (record?.extractStatus === 'success' && record.textContent) {
texts.push(`【附件: ${attachmentId}\n${record.textContent}`);
await cache.set(cacheKey, record.textContent, ATTACHMENT_CACHE_TTL);
} else if (record?.extractStatus === 'failed') {
texts.push(`【附件: ${attachmentId}\n[附件内容提取失败:${record.extractError || '请重新上传附件'}]`);
} else if (record?.extractStatus === 'empty') {
texts.push(`【附件: ${attachmentId}\n[附件内容为空或无法提取有效文本]`);
} else {
texts.push(`【附件: ${attachmentId}\n[附件内容不存在或未就绪]`);
}
}
} catch (error) {
logger.error('[AIA:AttachmentService] 获取附件文本失败', {
@@ -206,33 +306,73 @@ export async function getAttachmentsText(attachmentIds: string[]): Promise<strin
* 用于发送消息时保存附件信息到数据库
*/
export async function getAttachmentDetails(
attachmentIds: string[]
attachmentIds: string[],
scope: AttachmentQueryScope = {},
): Promise<Array<{ id: string; filename: string; size: number }>> {
if (!attachmentIds || attachmentIds.length === 0) {
return [];
}
const details: Array<{ id: string; filename: string; size: number }> = [];
const missingIds: string[] = [];
for (const attachmentId of attachmentIds) {
try {
const cacheKey = `${ATTACHMENT_INFO_CACHE_PREFIX}${attachmentId}`;
const infoJson = await cache.get(cacheKey);
const infoJson = await cache.get<string>(cacheKey);
if (infoJson) {
const info = JSON.parse(infoJson);
details.push(info);
} else {
logger.warn('[AIA:AttachmentService] 附件信息不在缓存中', { attachmentId });
// 如果缓存中没有,添加一个占位信息
missingIds.push(attachmentId);
}
} catch (error) {
logger.error('[AIA:AttachmentService] 获取附件信息失败', { attachmentId, error });
missingIds.push(attachmentId);
}
}
if (missingIds.length > 0) {
const where: any = {
id: { in: missingIds },
};
if (scope.userId) where.userId = scope.userId;
if (scope.conversationId) where.conversationId = scope.conversationId;
const dbRecords = await (prisma as any).aiaAttachment.findMany({
where,
select: {
id: true,
filename: true,
size: true,
},
});
const typedDbRecords = dbRecords as AiaAttachmentInfoRecord[];
const dbMap = new Map(typedDbRecords.map((r: AiaAttachmentInfoRecord) => [r.id, r]));
for (const attachmentId of missingIds) {
const record = dbMap.get(attachmentId);
if (record) {
const info = {
id: record.id,
filename: record.filename,
size: record.size,
};
details.push(info);
await cache.set(
`${ATTACHMENT_INFO_CACHE_PREFIX}${attachmentId}`,
JSON.stringify(info),
ATTACHMENT_CACHE_TTL,
);
} else {
logger.warn('[AIA:AttachmentService] 附件信息缓存/数据库均未命中', { attachmentId });
details.push({
id: attachmentId,
filename: '未知文件',
size: 0,
});
}
} catch (error) {
logger.error('[AIA:AttachmentService] 获取附件信息失败', { attachmentId, error });
}
}

View File

@@ -227,7 +227,6 @@ export async function getMessages(
return {
messages: messages.map(m => {
const attachmentsJson = m.attachments as any;
const attachmentIds = attachmentsJson?.ids as string[] | undefined;
// 直接从 JSON 字段读取附件详情(不查询数据库)
const attachmentDetails = attachmentsJson?.details as Array<{ id: string; filename: string; size: number }> | undefined;
@@ -237,10 +236,10 @@ export async function getMessages(
role: m.role as 'user' | 'assistant',
content: m.content,
thinkingContent: m.thinkingContent || undefined,
attachments: attachmentIds,
attachmentDetails: attachmentDetails && attachmentDetails.length > 0 ? attachmentDetails : undefined,
model: m.model || undefined,
tokens: m.tokens || undefined,
isPinned: m.isPinned,
createdAt: m.createdAt.toISOString(),
};
}),
@@ -287,7 +286,10 @@ export async function sendMessageStream(
let attachmentsData = undefined;
if (attachmentIds && attachmentIds.length > 0) {
// 从缓存获取附件详情
const attachmentDetails = await attachmentService.getAttachmentDetails(attachmentIds);
const attachmentDetails = await attachmentService.getAttachmentDetails(attachmentIds, {
userId,
conversationId,
});
attachmentsData = {
ids: attachmentIds,
details: attachmentDetails,
@@ -309,7 +311,7 @@ export async function sendMessageStream(
// 5. 处理附件文本(如果有)
let userContent = content;
if (attachmentIds && attachmentIds.length > 0) {
const attachmentText = await getAttachmentText(attachmentIds);
const attachmentText = await getAttachmentText(attachmentIds, userId, conversationId);
if (attachmentText) {
userContent = `${content}\n\n---\n附件内容\n${attachmentText}`;
}
@@ -434,9 +436,13 @@ async function buildContextMessages(
* 获取附件文本内容
* 从缓存中获取上传时提取的文本
*/
async function getAttachmentText(attachmentIds: string[]): Promise<string> {
async function getAttachmentText(
attachmentIds: string[],
userId: string,
conversationId: string,
): Promise<string> {
logger.info('[AIA:ConversationService] 获取附件文本', { attachmentIds });
return attachmentService.getAttachmentsText(attachmentIds);
return attachmentService.getAttachmentsText(attachmentIds, { userId, conversationId });
}
/**

View File

@@ -38,8 +38,8 @@ export async function streamSearch(
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'X-Accel-Buffering': 'no',
});
try {

View File

@@ -202,7 +202,6 @@ export async function streamTaskLogs(
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
});

View File

@@ -61,8 +61,7 @@ export class StreamAIController {
// 设置SSE响应头
reply.raw.setHeader('Content-Type', 'text/event-stream');
reply.raw.setHeader('Cache-Control', 'no-cache');
reply.raw.setHeader('Connection', 'keep-alive');
reply.raw.setHeader('X-Accel-Buffering', 'no'); // 禁用Nginx缓冲
reply.raw.setHeader('X-Accel-Buffering', 'no');
// 发送步骤消息的辅助函数
const sendStep = (step: number, stepName: string, status: StreamMessage['status'], message: string, data?: any, error?: string, retryCount?: number) => {

View File

@@ -182,7 +182,7 @@ export async function sendMessageStream(
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
});
reply.raw.write(`data: ${JSON.stringify({
content: `\n\n⚠ **Token数量超限**\n\n${errorMsg}`,
@@ -223,8 +223,8 @@ export async function sendMessageStream(
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'X-Accel-Buffering': 'no',
});
// 流式输出

View File

@@ -43,11 +43,10 @@ export default async function chatRoutes(app: FastifyInstance) {
return reply.status(400).send({ error: '消息内容不能为空' });
}
// SSE 响应头
// SSE 响应头(不设置 Connection:HTTP/2 中为禁止头部)
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'X-Accel-Buffering': 'no',
});

View File

@@ -266,8 +266,8 @@ export default async function sessionRoutes(app: FastifyInstance) {
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'X-Accel-Buffering': 'no',
});
const send = (type: string, data: any) => {

View File

@@ -276,8 +276,8 @@ export default async function workflowRoutes(app: FastifyInstance) {
reply.raw.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*'
'Access-Control-Allow-Origin': '*',
'X-Accel-Buffering': 'no',
});
// 发送初始连接确认