Files
HaHafeng 428a22adf2 feat(ssa): Complete Phase 2A frontend integration - multi-step workflow end-to-end
Phase 2A: WorkflowPlannerService, WorkflowExecutorService, Python data quality, 6 bug fixes, DescriptiveResultView, multi-step R code/Word export, MVP UI reuse. V11 UI: Gemini-style, multi-task, single-page scroll, Word export. Architecture: Block-based rendering consensus (4 block types). New R tools: chi_square, correlation, descriptive, logistic_binary, mann_whitney, t_test_paired. Docs: dev summary, block-based plan, status updates, task list v2.0.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-20 23:09:27 +08:00

165 lines
5.9 KiB
R
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# utils/data_loader.R
# 混合数据协议:自动识别 inline 数据 vs 预签名 URL
#
# 架构说明:
# - R 服务不持有 OSS 密钥,遵循平台 OSS 存储规范
# - Node.js 后端通过 storage.getUrl() 生成预签名 URL
# - R 服务直接访问预签名 URL 下载数据
# - 开发环境使用 ai-clinical-data-dev bucket无需 Mock
library(httr)
library(jsonlite)
library(glue)
# 统一数据加载入口
load_input_data <- function(input) {
# 检查输入结构
if (is.null(input$data_source)) {
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
details = "请求缺少 data_source 字段"))
}
source_type <- input$data_source$type # "inline" | "oss"
if (source_type == "inline") {
# 方式1内联 JSON 数据(< 2MB
message("[DataLoader] 使用 inline 数据模式")
raw_data <- input$data_source$data
# 调试:打印原始数据结构
message(glue("[DataLoader] 原始数据类型: {class(raw_data)}"))
message(glue("[DataLoader] 原始数据长度: {length(raw_data)}"))
# 安全转换:处理不同的 JSON 解析结果
if (is.data.frame(raw_data)) {
# 已经是 data.frame
df <- raw_data
message("[DataLoader] 数据已是 data.frame")
} else if (is.list(raw_data) && length(raw_data) > 0) {
# 检查是行格式还是列格式
first_elem <- raw_data[[1]]
if (is.list(first_elem) && !is.null(names(first_elem))) {
# 行格式: [{"col1": val1, "col2": val2}, {...}, ...]
# 每个元素是一行数据
message("[DataLoader] 检测到行格式数据 (JSON array of objects)")
# 使用 jsonlite 的 bind_rows 功能
df <- tryCatch({
# 方法1使用 do.call + rbind.data.frame
df_list <- lapply(raw_data, function(row) {
# 将每一行转为 data.frame
as.data.frame(lapply(row, function(val) {
if (is.null(val)) NA else val
}), stringsAsFactors = FALSE)
})
do.call(rbind, df_list)
}, error = function(e) {
# 方法2如果上面失败尝试 jsonlite 转换
message(glue("[DataLoader] rbind 失败,尝试 jsonlite 转换: {e$message}"))
jsonlite::fromJSON(jsonlite::toJSON(raw_data), flatten = TRUE)
})
} else if (!is.null(names(raw_data))) {
# 列格式: {"col1": [...], "col2": [...]}
message("[DataLoader] 检测到列格式数据 (JSON object with arrays)")
df <- data.frame(
lapply(raw_data, function(x) {
if (is.list(x)) unlist(x) else x
}),
stringsAsFactors = FALSE
)
} else {
# 未知格式
message(glue("[DataLoader] 未知数据格式first_elem class: {class(first_elem)}"))
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
details = "无法识别的数据格式"))
}
} else {
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
details = paste("无法解析的数据类型:", class(raw_data), "或数据为空")))
}
message(glue("[DataLoader] 转换后: {nrow(df)} 行, {ncol(df)} 列, 列名: {paste(names(df), collapse=', ')}"))
return(df)
} else if (source_type == "oss") {
# 方式2从预签名 URL 下载2MB - 20MB
# 注意oss_url 是由 Node.js 后端生成的预签名 URL不是 oss_key
oss_url <- input$data_source$oss_url
if (is.null(oss_url) || oss_url == "") {
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
details = "OSS 模式缺少 oss_url 字段"))
}
return(load_from_signed_url(oss_url))
} else {
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
details = paste("未知的 data_source.type:", source_type)))
}
}
# 从预签名 URL 下载数据
#
# @param url 预签名 URL由 Node.js storage.getUrl() 生成)
# @return data.frame
#
# 说明:开发环境和生产环境都使用真实 OSS
# - 开发环境ai-clinical-data-dev bucket
# - 生产环境ai-clinical-data bucket
load_from_signed_url <- function(url) {
message(glue("[DataLoader] 从预签名 URL 下载数据"))
temp_file <- tempfile(fileext = ".csv")
on.exit(unlink(temp_file))
tryCatch({
# 预签名 URL 自带认证信息,直接 GET 即可
response <- GET(url, write_disk(temp_file, overwrite = TRUE))
status <- status_code(response)
if (status != 200) {
# 403 通常表示签名过期
if (status == 403) {
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
details = "预签名 URL 已过期,请重新上传数据"))
}
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
details = paste("OSS 下载失败HTTP 状态码:", status)))
}
# 检测文件类型并读取
content_type <- headers(response)$`content-type`
if (grepl("csv", content_type, ignore.case = TRUE) ||
grepl("\\.csv", url, ignore.case = TRUE)) {
return(read.csv(temp_file, stringsAsFactors = FALSE))
} else if (grepl("excel|xlsx", content_type, ignore.case = TRUE) ||
grepl("\\.xlsx?", url, ignore.case = TRUE)) {
# 需要 readxl 包
if (!requireNamespace("readxl", quietly = TRUE)) {
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
details = "Excel 文件需要 readxl 包"))
}
return(as.data.frame(readxl::read_excel(temp_file)))
} else {
# 默认尝试 CSV
return(read.csv(temp_file, stringsAsFactors = FALSE))
}
}, error = function(e) {
if (grepl("make_error", deparse(e$call))) {
stop(e) # 重新抛出已格式化的错误
}
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
details = paste("OSS 网络错误:", e$message)))
})
}