fix(rvw): speed up review flow and complete forensics export

Run RVW skills in controlled parallel mode and persist per-skill progress so users can view completed tabs during execution.
Also include data-forensics tables in Word export and refresh the RVW module status documentation.

Made-with: Cursor
This commit is contained in:
2026-03-10 21:10:43 +08:00
parent d96cdf3fe8
commit 4a4771fbbe
7 changed files with 377 additions and 78 deletions

View File

@@ -315,6 +315,11 @@ export async function getTaskDetail(
logger.info('[RVW:Controller] 获取任务详情', { taskId });
const task = await reviewService.getTaskDetail(userId, taskId);
const contextData = task.contextData as {
forensicsResult?: unknown;
clinicalReview?: unknown;
skillProgress?: Record<string, unknown>;
} | null;
// 🆕 直接使用新字段
return reply.send({
@@ -336,6 +341,11 @@ export async function getTaskDetail(
durationSeconds: task.durationSeconds,
errorMessage: task.errorMessage,
errorDetails: task.errorDetails ?? undefined,
editorialReview: task.editorialReview ?? undefined,
methodologyReview: task.methodologyReview ?? undefined,
forensicsResult: contextData?.forensicsResult ?? undefined,
clinicalReview: contextData?.clinicalReview ?? undefined,
reviewProgress: contextData?.skillProgress ?? undefined,
},
});
} catch (error) {

View File

@@ -125,45 +125,49 @@ export class SkillExecutor<TContext extends BaseSkillContext = SkillContext> {
taskId: context.taskId,
skillIds: stage.map(s => s.skillId),
});
const stageConcurrency = Math.max(1, profile.globalConfig?.maxConcurrency ?? stage.length);
for (let offset = 0; offset < stage.length; offset += stageConcurrency) {
const chunk = stage.slice(offset, offset + stageConcurrency);
const promises = chunk.map(item => this.executePipelineItem(item, context, profile));
const settled = await Promise.allSettled(promises);
const promises = stage.map(item => this.executePipelineItem(item, context, profile));
const settled = await Promise.allSettled(promises);
for (let i = 0; i < stage.length; i++) {
const outcome = settled[i];
if (outcome.status === 'fulfilled') {
const result = outcome.value;
if (result) {
results.push(result);
context.previousResults.push(result);
const skill = SkillRegistry.get(stage[i].skillId);
if (skill) this.updateContextWithResult(context, skill, result);
for (let i = 0; i < chunk.length; i++) {
const outcome = settled[i];
const currentItem = chunk[i];
if (outcome.status === 'fulfilled') {
const result = outcome.value;
if (result) {
results.push(result);
context.previousResults.push(result);
const skill = SkillRegistry.get(currentItem.skillId);
if (skill) this.updateContextWithResult(context, skill, result);
}
} else {
// Promise 本身 rejected — 极端情况下的兜底
const errorMessage = outcome.reason instanceof Error
? outcome.reason.message
: String(outcome.reason);
logger.error('[SkillExecutor] Parallel skill promise rejected (uncaught)', {
skillId: currentItem.skillId,
taskId: context.taskId,
error: errorMessage,
});
const now = new Date();
results.push({
skillId: currentItem.skillId,
skillName: currentItem.skillId,
status: 'error',
issues: [{
severity: 'ERROR',
type: SkillErrorCodes.SKILL_EXECUTION_ERROR,
message: `${currentItem.skillId} 执行异常: ${errorMessage}`,
}],
error: errorMessage,
executionTime: 0,
startedAt: now,
completedAt: now,
});
}
} else {
// Promise 本身 rejected — 极端情况下的兜底
const errorMessage = outcome.reason instanceof Error
? outcome.reason.message
: String(outcome.reason);
logger.error('[SkillExecutor] Parallel skill promise rejected (uncaught)', {
skillId: stage[i].skillId,
taskId: context.taskId,
error: errorMessage,
});
const now = new Date();
results.push({
skillId: stage[i].skillId,
skillName: stage[i].skillId,
status: 'error',
issues: [{
severity: 'ERROR',
type: SkillErrorCodes.SKILL_EXECUTION_ERROR,
message: `${stage[i].skillId} 执行异常: ${errorMessage}`,
}],
error: errorMessage,
executionTime: 0,
startedAt: now,
completedAt: now,
});
}
}
}

View File

@@ -29,6 +29,7 @@ export const DEFAULT_PROFILE: JournalProfile = {
tolerancePercent: 0.1,
},
timeout: 300000, // 5min: Python + LLM核查(内部180s超时降级) + 长文档余量
parallelGroup: 'llm-review', // 与其余模块并行,缩短总时长
},
{
skillId: 'EditorialSkill',
@@ -57,6 +58,7 @@ export const DEFAULT_PROFILE: JournalProfile = {
strictness: 'STANDARD',
continueOnError: true,
timeoutMultiplier: 1.0,
maxConcurrency: 4, // 受控并行默认同时跑4个模块
},
};

View File

@@ -34,6 +34,7 @@ import {
createPartialContextFromTask,
registerBuiltinSkills,
ExecutionSummary,
SkillResult,
} from '../skills/index.js';
/**
@@ -307,6 +308,7 @@ export async function registerReviewWorker() {
},
forensicsResult: skillsSummary.results.find(r => r.skillId === 'DataForensicsSkill')?.data,
clinicalReview: clinicalResult,
skillProgress: buildSkillProgressMap(skillsSummary.results),
}
: null;
@@ -439,6 +441,19 @@ function buildErrorDetails(summary: ExecutionSummary): Record<string, unknown> {
};
}
function buildSkillProgressMap(results: SkillResult[]): Record<string, unknown> {
const progress: Record<string, unknown> = {};
for (const item of results) {
progress[item.skillId] = {
status: item.status,
executionTime: item.executionTime,
completedAt: item.completedAt.toISOString(),
error: item.error || null,
};
}
return progress;
}
/**
* 使用 V2.0 Skills 架构执行审查
*/
@@ -480,9 +495,73 @@ async function executeWithSkills(
fileSize,
});
const currentTask = await prisma.reviewTask.findUnique({
where: { id: taskId },
select: { contextData: true },
});
const incrementalContext =
((currentTask?.contextData as Record<string, unknown> | null) || {});
const runningContext: Record<string, unknown> = { ...incrementalContext };
const skillProgress =
((incrementalContext.skillProgress as Record<string, unknown> | undefined) || {});
let persistQueue: Promise<void> = Promise.resolve();
const persistSkillResult = (result: SkillResult) => {
persistQueue = persistQueue.then(async () => {
skillProgress[result.skillId] = {
status: result.status,
executionTime: result.executionTime,
completedAt: result.completedAt.toISOString(),
error: result.error || null,
};
const nextContext: Record<string, unknown> = {
...runningContext,
skillProgress: { ...skillProgress },
};
const updateData: Record<string, unknown> = {
contextData: nextContext as Prisma.InputJsonValue,
};
if (result.skillId === 'DataForensicsSkill' && result.data) {
nextContext.forensicsResult = result.data;
}
if (result.skillId === 'ClinicalAssessmentSkill' && result.data) {
nextContext.clinicalReview = result.data;
}
if (result.skillId === 'EditorialSkill' && result.data) {
const editorialData = result.data as EditorialReview;
updateData.editorialReview = editorialData as unknown as Prisma.InputJsonValue;
updateData.editorialScore = editorialData.overall_score ?? null;
}
if (result.skillId === 'MethodologySkill' && result.data) {
const methodologyData = result.data as MethodologyReview;
updateData.methodologyReview = methodologyData as unknown as Prisma.InputJsonValue;
updateData.methodologyScore = methodologyData.overall_score ?? null;
updateData.methodologyStatus = getMethodologyStatus(methodologyData);
}
updateData.contextData = nextContext as Prisma.InputJsonValue;
Object.assign(runningContext, nextContext);
await prisma.reviewTask.update({
where: { id: taskId },
data: updateData as Prisma.ReviewTaskUpdateInput,
});
});
return persistQueue;
};
// 执行 Pipeline
const executor = new SkillExecutor();
const executor = new SkillExecutor({
onSkillComplete: async (_skillId, result) => {
await persistSkillResult(result);
},
});
const summary = await executor.execute(profile, partialContext);
await persistQueue;
// 输出执行结果
console.log(`\n 📊 Skills 执行结果:`);