feat(platform): Implement platform infrastructure with cloud-native support

- Add storage service (LocalAdapter + OSSAdapter stub)
- Add database connection pool with graceful shutdown
- Add logging system with winston (JSON format)
- Add environment config management
- Add async job queue (MemoryQueue + DatabaseQueue stub)
- Add cache service (MemoryCache + RedisCache stub)
- Add health check endpoints for SAE
- Add monitoring metrics for DB, memory, API

Key Features:
- Zero-code switching between local and cloud environments
- Adapter pattern for multi-environment support
- Backward compatible with legacy modules
- Ready for Aliyun Serverless deployment

Related: Platform Infrastructure Planning (docs/09-鏋舵瀯瀹炴柦/04-骞冲彴鍩虹璁炬柦瑙勫垝.md)
This commit is contained in:
2025-11-17 08:31:23 +08:00
parent a79abf88db
commit 8bba33ac89
28 changed files with 3716 additions and 51 deletions

View File

@@ -0,0 +1,82 @@
import { JobQueue } from './types.js'
import { MemoryQueue } from './MemoryQueue.js'
/**
* 任务队列工厂类
*
* 根据环境变量自动选择队列实现:
* - QUEUE_TYPE=memory: 使用MemoryQueue内存队列
* - QUEUE_TYPE=database: 使用DatabaseQueue数据库队列待实现
*
* 零代码切换:
* - 本地开发不配置QUEUE_TYPE默认使用memory
* - 云端部署配置QUEUE_TYPE=database多实例共享
*
* @example
* ```typescript
* import { jobQueue } from '@/common/jobs'
*
* // 业务代码不关心是memory还是database
* const job = await jobQueue.push('asl:screening', { projectId: 123 })
* ```
*/
export class JobFactory {
private static instance: JobQueue | null = null
/**
* 获取任务队列实例(单例模式)
*/
static getInstance(): JobQueue {
if (!this.instance) {
this.instance = this.createQueue()
}
return this.instance
}
/**
* 创建任务队列
*/
private static createQueue(): JobQueue {
const queueType = process.env.QUEUE_TYPE || 'memory'
switch (queueType) {
case 'memory':
return this.createMemoryQueue()
case 'database':
// TODO: 实现DatabaseQueue
console.warn('[JobFactory] DatabaseQueue not implemented yet, fallback to MemoryQueue')
return this.createMemoryQueue()
default:
console.warn(`[JobFactory] Unknown QUEUE_TYPE: ${queueType}, fallback to memory`)
return this.createMemoryQueue()
}
}
/**
* 创建内存队列
*/
private static createMemoryQueue(): MemoryQueue {
console.log('[JobFactory] Using MemoryQueue')
const queue = new MemoryQueue()
// 定期清理已完成的任务(避免内存泄漏)
if (process.env.NODE_ENV !== 'test') {
setInterval(() => {
queue.cleanup()
}, 60 * 60 * 1000) // 每小时清理一次
}
return queue
}
/**
* 重置实例(用于测试)
*/
static reset(): void {
this.instance = null
}
}

View File

@@ -0,0 +1,204 @@
import { Job, JobQueue, JobHandler } from './types.js'
import { randomUUID } from 'crypto'
/**
* 内存队列实现
*
* 适用场景:
* - 本地开发环境
* - 单实例部署
* - 非关键任务(重启会丢失)
*
* 特点:
* - ✅ 简单易用,无需外部依赖
* - ✅ 性能高效
* - ⚠️ 进程重启后任务丢失
* - ⚠️ 不支持多实例共享
*
* @example
* ```typescript
* const queue = new MemoryQueue()
*
* // 注册处理函数
* queue.process('email:send', async (job) => {
* await sendEmail(job.data.to, job.data.subject)
* })
*
* // 创建任务
* const job = await queue.push('email:send', { to: 'user@example.com', subject: 'Hello' })
*
* // 查询任务
* const status = await queue.getJob(job.id)
* ```
*/
export class MemoryQueue implements JobQueue {
private jobs: Map<string, Job> = new Map()
private handlers: Map<string, JobHandler> = new Map()
private processing: boolean = false
/**
* 添加任务到队列
*/
async push<T>(type: string, data: T): Promise<Job<T>> {
const job: Job<T> = {
id: randomUUID(),
type,
data,
status: 'pending',
progress: 0,
createdAt: new Date(),
updatedAt: new Date()
}
this.jobs.set(job.id, job)
// 触发任务处理
this.processNextJob()
return job
}
/**
* 注册任务处理函数
*/
process<T>(type: string, handler: JobHandler<T>): void {
this.handlers.set(type, handler)
console.log(`[MemoryQueue] Registered handler for job type: ${type}`)
// 开始处理队列中的待处理任务
this.processNextJob()
}
/**
* 获取任务信息
*/
async getJob(id: string): Promise<Job | null> {
return this.jobs.get(id) || null
}
/**
* 更新任务进度
*/
async updateProgress(id: string, progress: number): Promise<void> {
const job = this.jobs.get(id)
if (job) {
job.progress = Math.min(100, Math.max(0, progress))
job.updatedAt = new Date()
this.jobs.set(id, job)
}
}
/**
* 标记任务为完成
*/
async completeJob(id: string, result: any): Promise<void> {
const job = this.jobs.get(id)
if (job) {
job.status = 'completed'
job.progress = 100
job.result = result
job.completedAt = new Date()
job.updatedAt = new Date()
this.jobs.set(id, job)
console.log(`[MemoryQueue] Job completed: ${id} (type: ${job.type})`)
}
}
/**
* 标记任务为失败
*/
async failJob(id: string, error: string): Promise<void> {
const job = this.jobs.get(id)
if (job) {
job.status = 'failed'
job.error = error
job.completedAt = new Date()
job.updatedAt = new Date()
this.jobs.set(id, job)
console.error(`[MemoryQueue] Job failed: ${id} (type: ${job.type})`, error)
}
}
/**
* 处理下一个待处理任务
*/
private async processNextJob(): Promise<void> {
if (this.processing) return
// 查找第一个待处理的任务
const pendingJob = Array.from(this.jobs.values()).find(
job => job.status === 'pending'
)
if (!pendingJob) return
// 获取对应的处理函数
const handler = this.handlers.get(pendingJob.type)
if (!handler) {
// 没有注册处理函数,跳过
return
}
// 标记为处理中
this.processing = true
pendingJob.status = 'processing'
pendingJob.startedAt = new Date()
pendingJob.updatedAt = new Date()
this.jobs.set(pendingJob.id, pendingJob)
console.log(`[MemoryQueue] Processing job: ${pendingJob.id} (type: ${pendingJob.type})`)
try {
// 执行处理函数
const result = await handler(pendingJob)
// 标记为完成
await this.completeJob(pendingJob.id, result)
} catch (error: any) {
// 标记为失败
await this.failJob(pendingJob.id, error.message)
} finally {
this.processing = false
// 继续处理下一个任务
setImmediate(() => this.processNextJob())
}
}
/**
* 获取队列统计信息
*/
getStats() {
const jobs = Array.from(this.jobs.values())
return {
total: jobs.length,
pending: jobs.filter(j => j.status === 'pending').length,
processing: jobs.filter(j => j.status === 'processing').length,
completed: jobs.filter(j => j.status === 'completed').length,
failed: jobs.filter(j => j.status === 'failed').length
}
}
/**
* 清理已完成的任务(避免内存泄漏)
* 建议定期调用
*/
cleanup(olderThan: Date = new Date(Date.now() - 24 * 60 * 60 * 1000)) {
let removed = 0
for (const [id, job] of this.jobs) {
if (
(job.status === 'completed' || job.status === 'failed') &&
job.completedAt &&
job.completedAt < olderThan
) {
this.jobs.delete(id)
removed++
}
}
console.log(`[MemoryQueue] Cleanup: removed ${removed} old jobs`)
return removed
}
}

View File

@@ -0,0 +1,53 @@
/**
* 异步任务系统统一导出
*
* 提供平台级的异步任务处理能力避免Serverless超时。
*
* @module jobs
*
* @example
* ```typescript
* // 方式1使用全局队列推荐
* import { jobQueue } from '@/common/jobs'
*
* // 创建任务
* const job = await jobQueue.push('asl:screening', {
* projectId: 123,
* literatureIds: [1, 2, 3]
* })
*
* // 返回任务ID给前端
* res.send({ jobId: job.id })
*
* // 前端轮询任务状态
* const status = await jobQueue.getJob(job.id)
*
* // 注册处理函数(在应用启动时)
* jobQueue.process('asl:screening', async (job) => {
* // 处理任务
* for (const id of job.data.literatureIds) {
* await processLiterature(id)
* await jobQueue.updateProgress(job.id, ...)
* }
* // 返回结果
* return { success: true, processed: 3 }
* })
* ```
*/
export type { Job, JobStatus, JobHandler, JobQueue } from './types.js'
export { MemoryQueue } from './MemoryQueue.js'
export { JobFactory } from './JobFactory.js'
// Import for usage below
import { JobFactory } from './JobFactory.js'
/**
* 全局任务队列实例(推荐使用)
*
* 自动根据环境变量选择队列实现:
* - QUEUE_TYPE=memory: 内存队列(本地开发)
* - QUEUE_TYPE=database: 数据库队列(生产环境,待实现)
*/
export const jobQueue = JobFactory.getInstance()

View File

@@ -0,0 +1,89 @@
/**
* 异步任务系统类型定义
*
* 用于长时间任务的异步处理避免Serverless超时。
*/
/**
* 任务状态
*/
export type JobStatus = 'pending' | 'processing' | 'completed' | 'failed'
/**
* 任务对象
*/
export interface Job<T = any> {
/** 任务唯一ID */
id: string
/** 任务类型asl:screening, asl:extraction */
type: string
/** 任务数据 */
data: T
/** 任务状态 */
status: JobStatus
/** 任务进度0-100 */
progress: number
/** 任务结果(完成后) */
result?: any
/** 错误信息(失败时) */
error?: string
/** 创建时间 */
createdAt: Date
/** 更新时间 */
updatedAt: Date
/** 开始执行时间 */
startedAt?: Date
/** 完成时间 */
completedAt?: Date
}
/**
* 任务处理函数
*/
export type JobHandler<T = any> = (job: Job<T>) => Promise<any>
/**
* 任务队列接口
*/
export interface JobQueue {
/**
* 添加任务到队列
*/
push<T>(type: string, data: T): Promise<Job<T>>
/**
* 注册任务处理函数
*/
process<T>(type: string, handler: JobHandler<T>): void
/**
* 获取任务信息
*/
getJob(id: string): Promise<Job | null>
/**
* 更新任务进度
*/
updateProgress(id: string, progress: number): Promise<void>
/**
* 标记任务为完成
*/
completeJob(id: string, result: any): Promise<void>
/**
* 标记任务为失败
*/
failJob(id: string, error: string): Promise<void>
}