/** * 任务拆分工具函数 * * 用于将长时间任务拆分成多个小任务,避免: * - SAE 30秒超时 * - pg-boss 24小时任务过期 * - 任务失败时重做所有工作 * * 核心策略: * - 文献筛选:每批20-50篇 * - 数据提取:每批10-20条 * - 统计分析:按数据集大小动态调整 */ /** * 任务类型的拆分策略 */ export interface ChunkStrategy { /** 任务类型标识 */ type: string /** 每批处理的数据量 */ chunkSize: number /** 最大批次数(防止过度拆分) */ maxChunks?: number /** 描述 */ description: string } /** * 预定义的拆分策略 * * 根据实际业务场景和性能测试数据配置 */ export const CHUNK_STRATEGIES: Record = { // ASL模块:文献筛选 'asl:screening:title-abstract': { type: 'asl:screening:title-abstract', chunkSize: 50, // 每批50篇(LLM API较快) maxChunks: 100, // 最多100批(5000篇) description: '标题/摘要筛选 - 每批50篇' }, 'asl:screening:full-text': { type: 'asl:screening:full-text', chunkSize: 20, // 每批20篇(全文较慢) maxChunks: 50, // 最多50批(1000篇) description: '全文筛选 - 每批20篇' }, 'asl:extraction': { type: 'asl:extraction', chunkSize: 30, // 每批30篇 maxChunks: 50, description: '数据提取 - 每批30篇' }, // DC模块:数据清洗 'dc:clean:batch': { type: 'dc:clean:batch', chunkSize: 100, // 每批100行 maxChunks: 100, description: '数据清洗 - 每批100行' }, 'dc:extract:medical-record': { type: 'dc:extract:medical-record', chunkSize: 10, // 每批10份病历(AI提取较慢) maxChunks: 100, description: '病历提取 - 每批10份' }, // SSA模块:统计分析 'ssa:analysis:batch': { type: 'ssa:analysis:batch', chunkSize: 1000, // 每批1000条数据 maxChunks: 50, description: '统计分析 - 每批1000条' }, // 默认策略 'default': { type: 'default', chunkSize: 50, maxChunks: 100, description: '默认策略 - 每批50条' } } /** * 将数据数组拆分成多个批次 * * @param items 要拆分的数据数组 * @param chunkSize 每批的大小 * @returns 拆分后的批次数组 * * @example * ```typescript * const ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] * const batches = splitIntoChunks(ids, 3) * // 结果: [[1,2,3], [4,5,6], [7,8,9], [10]] * ``` */ export function splitIntoChunks(items: T[], chunkSize: number): T[][] { if (chunkSize <= 0) { throw new Error('chunkSize must be positive') } if (items.length === 0) { return [] } const chunks: T[][] = [] for (let i = 0; i < items.length; i += chunkSize) { chunks.push(items.slice(i, i + chunkSize)) } return chunks } /** * 根据任务类型推荐批次大小 * * @param taskType 任务类型(如:'asl:screening:title-abstract') * @param totalItems 总数据量 * @returns 推荐的批次大小 * * @example * ```typescript * const chunkSize = recommendChunkSize('asl:screening:title-abstract', 1000) * // 返回: 50 (根据CHUNK_STRATEGIES配置) * ``` */ export function recommendChunkSize(taskType: string, totalItems: number): number { // 查找对应的策略 const strategy = CHUNK_STRATEGIES[taskType] || CHUNK_STRATEGIES['default'] let chunkSize = strategy.chunkSize // 如果总量很小,不拆分 if (totalItems <= chunkSize) { return totalItems } // 如果拆分后批次数超过maxChunks,增大chunkSize if (strategy.maxChunks) { const predictedChunks = Math.ceil(totalItems / chunkSize) if (predictedChunks > strategy.maxChunks) { chunkSize = Math.ceil(totalItems / strategy.maxChunks) console.log( `[TaskSplit] Adjusted chunkSize to ${chunkSize} to limit chunks to ${strategy.maxChunks}` ) } } return chunkSize } /** * 计算任务拆分信息 * * @param taskType 任务类型 * @param totalItems 总数据量 * @returns 拆分信息 * * @example * ```typescript * const info = calculateSplitInfo('asl:screening:title-abstract', 1000) * // 返回: { chunkSize: 50, totalChunks: 20, strategy: {...} } * ``` */ export function calculateSplitInfo(taskType: string, totalItems: number) { const strategy = CHUNK_STRATEGIES[taskType] || CHUNK_STRATEGIES['default'] const chunkSize = recommendChunkSize(taskType, totalItems) const totalChunks = Math.ceil(totalItems / chunkSize) return { taskType, totalItems, chunkSize, totalChunks, strategy, avgItemsPerChunk: totalChunks > 0 ? Math.round(totalItems / totalChunks) : 0, lastChunkSize: totalItems % chunkSize || chunkSize } } /** * 获取批次索引的人类可读描述 * * @param batchIndex 批次索引(从0开始) * @param totalBatches 总批次数 * @returns 描述字符串 * * @example * ```typescript * getBatchDescription(0, 20) // "批次 1/20" * getBatchDescription(19, 20) // "批次 20/20(最后一批)" * ``` */ export function getBatchDescription(batchIndex: number, totalBatches: number): string { const humanIndex = batchIndex + 1 if (humanIndex === totalBatches) { return `批次 ${humanIndex}/${totalBatches}(最后一批)` } return `批次 ${humanIndex}/${totalBatches}` } /** * 估算批次执行时间(秒) * * 基于经验值估算,用于前端显示预计完成时间 * * @param taskType 任务类型 * @param batchSize 批次大小 * @returns 估算的执行时间(秒) */ export function estimateBatchDuration(taskType: string, batchSize: number): number { // 每项平均处理时间(秒) const TIME_PER_ITEM: Record = { 'asl:screening:title-abstract': 0.5, // 0.5秒/篇(含LLM调用) 'asl:screening:full-text': 2, // 2秒/篇 'asl:extraction': 3, // 3秒/篇 'dc:clean:batch': 0.1, // 0.1秒/行 'dc:extract:medical-record': 5, // 5秒/份 'ssa:analysis:batch': 0.01, // 0.01秒/条 'default': 1 // 1秒/条 } const timePerItem = TIME_PER_ITEM[taskType] || TIME_PER_ITEM['default'] return Math.ceil(batchSize * timePerItem) } /** * 验证批次索引是否有效 * * @param batchIndex 批次索引 * @param totalBatches 总批次数 * @throws Error 如果索引无效 */ export function validateBatchIndex(batchIndex: number, totalBatches: number): void { if (batchIndex < 0 || batchIndex >= totalBatches) { throw new Error( `Invalid batch index: ${batchIndex}. Must be between 0 and ${totalBatches - 1}` ) } } /** * 从数组中提取指定批次的数据 * * @param items 完整数据数组 * @param batchIndex 批次索引(从0开始) * @param chunkSize 批次大小 * @returns 该批次的数据 * * @example * ```typescript * const ids = [1,2,3,4,5,6,7,8,9,10] * getBatchItems(ids, 0, 3) // [1,2,3] * getBatchItems(ids, 1, 3) // [4,5,6] * getBatchItems(ids, 3, 3) // [10] * ``` */ export function getBatchItems( items: T[], batchIndex: number, chunkSize: number ): T[] { const start = batchIndex * chunkSize const end = Math.min(start + chunkSize, items.length) return items.slice(start, end) }