Phase 2A: WorkflowPlannerService, WorkflowExecutorService, Python data quality, 6 bug fixes, DescriptiveResultView, multi-step R code/Word export, MVP UI reuse. V11 UI: Gemini-style, multi-task, single-page scroll, Word export. Architecture: Block-based rendering consensus (4 block types). New R tools: chi_square, correlation, descriptive, logistic_binary, mann_whitney, t_test_paired. Docs: dev summary, block-based plan, status updates, task list v2.0. Co-authored-by: Cursor <cursoragent@cursor.com>
165 lines
5.9 KiB
R
165 lines
5.9 KiB
R
# utils/data_loader.R
|
||
# 混合数据协议:自动识别 inline 数据 vs 预签名 URL
|
||
#
|
||
# 架构说明:
|
||
# - R 服务不持有 OSS 密钥,遵循平台 OSS 存储规范
|
||
# - Node.js 后端通过 storage.getUrl() 生成预签名 URL
|
||
# - R 服务直接访问预签名 URL 下载数据
|
||
# - 开发环境使用 ai-clinical-data-dev bucket,无需 Mock
|
||
|
||
library(httr)
|
||
library(jsonlite)
|
||
library(glue)
|
||
|
||
# 统一数据加载入口
|
||
load_input_data <- function(input) {
|
||
# 检查输入结构
|
||
if (is.null(input$data_source)) {
|
||
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
|
||
details = "请求缺少 data_source 字段"))
|
||
}
|
||
|
||
source_type <- input$data_source$type # "inline" | "oss"
|
||
|
||
if (source_type == "inline") {
|
||
# 方式1:内联 JSON 数据(< 2MB)
|
||
message("[DataLoader] 使用 inline 数据模式")
|
||
|
||
raw_data <- input$data_source$data
|
||
|
||
# 调试:打印原始数据结构
|
||
message(glue("[DataLoader] 原始数据类型: {class(raw_data)}"))
|
||
message(glue("[DataLoader] 原始数据长度: {length(raw_data)}"))
|
||
|
||
# 安全转换:处理不同的 JSON 解析结果
|
||
if (is.data.frame(raw_data)) {
|
||
# 已经是 data.frame
|
||
df <- raw_data
|
||
message("[DataLoader] 数据已是 data.frame")
|
||
|
||
} else if (is.list(raw_data) && length(raw_data) > 0) {
|
||
# 检查是行格式还是列格式
|
||
first_elem <- raw_data[[1]]
|
||
|
||
if (is.list(first_elem) && !is.null(names(first_elem))) {
|
||
# 行格式: [{"col1": val1, "col2": val2}, {...}, ...]
|
||
# 每个元素是一行数据
|
||
message("[DataLoader] 检测到行格式数据 (JSON array of objects)")
|
||
|
||
# 使用 jsonlite 的 bind_rows 功能
|
||
df <- tryCatch({
|
||
# 方法1:使用 do.call + rbind.data.frame
|
||
df_list <- lapply(raw_data, function(row) {
|
||
# 将每一行转为 data.frame
|
||
as.data.frame(lapply(row, function(val) {
|
||
if (is.null(val)) NA else val
|
||
}), stringsAsFactors = FALSE)
|
||
})
|
||
do.call(rbind, df_list)
|
||
}, error = function(e) {
|
||
# 方法2:如果上面失败,尝试 jsonlite 转换
|
||
message(glue("[DataLoader] rbind 失败,尝试 jsonlite 转换: {e$message}"))
|
||
jsonlite::fromJSON(jsonlite::toJSON(raw_data), flatten = TRUE)
|
||
})
|
||
|
||
} else if (!is.null(names(raw_data))) {
|
||
# 列格式: {"col1": [...], "col2": [...]}
|
||
message("[DataLoader] 检测到列格式数据 (JSON object with arrays)")
|
||
df <- data.frame(
|
||
lapply(raw_data, function(x) {
|
||
if (is.list(x)) unlist(x) else x
|
||
}),
|
||
stringsAsFactors = FALSE
|
||
)
|
||
|
||
} else {
|
||
# 未知格式
|
||
message(glue("[DataLoader] 未知数据格式,first_elem class: {class(first_elem)}"))
|
||
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
|
||
details = "无法识别的数据格式"))
|
||
}
|
||
|
||
} else {
|
||
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
|
||
details = paste("无法解析的数据类型:", class(raw_data), "或数据为空")))
|
||
}
|
||
|
||
message(glue("[DataLoader] 转换后: {nrow(df)} 行, {ncol(df)} 列, 列名: {paste(names(df), collapse=', ')}"))
|
||
return(df)
|
||
|
||
} else if (source_type == "oss") {
|
||
# 方式2:从预签名 URL 下载(2MB - 20MB)
|
||
# 注意:oss_url 是由 Node.js 后端生成的预签名 URL,不是 oss_key
|
||
oss_url <- input$data_source$oss_url
|
||
|
||
if (is.null(oss_url) || oss_url == "") {
|
||
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
|
||
details = "OSS 模式缺少 oss_url 字段"))
|
||
}
|
||
|
||
return(load_from_signed_url(oss_url))
|
||
|
||
} else {
|
||
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
|
||
details = paste("未知的 data_source.type:", source_type)))
|
||
}
|
||
}
|
||
|
||
# 从预签名 URL 下载数据
|
||
#
|
||
# @param url 预签名 URL(由 Node.js storage.getUrl() 生成)
|
||
# @return data.frame
|
||
#
|
||
# 说明:开发环境和生产环境都使用真实 OSS
|
||
# - 开发环境:ai-clinical-data-dev bucket
|
||
# - 生产环境:ai-clinical-data bucket
|
||
load_from_signed_url <- function(url) {
|
||
|
||
message(glue("[DataLoader] 从预签名 URL 下载数据"))
|
||
|
||
temp_file <- tempfile(fileext = ".csv")
|
||
on.exit(unlink(temp_file))
|
||
|
||
tryCatch({
|
||
# 预签名 URL 自带认证信息,直接 GET 即可
|
||
response <- GET(url, write_disk(temp_file, overwrite = TRUE))
|
||
|
||
status <- status_code(response)
|
||
if (status != 200) {
|
||
# 403 通常表示签名过期
|
||
if (status == 403) {
|
||
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
|
||
details = "预签名 URL 已过期,请重新上传数据"))
|
||
}
|
||
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
|
||
details = paste("OSS 下载失败,HTTP 状态码:", status)))
|
||
}
|
||
|
||
# 检测文件类型并读取
|
||
content_type <- headers(response)$`content-type`
|
||
|
||
if (grepl("csv", content_type, ignore.case = TRUE) ||
|
||
grepl("\\.csv", url, ignore.case = TRUE)) {
|
||
return(read.csv(temp_file, stringsAsFactors = FALSE))
|
||
} else if (grepl("excel|xlsx", content_type, ignore.case = TRUE) ||
|
||
grepl("\\.xlsx?", url, ignore.case = TRUE)) {
|
||
# 需要 readxl 包
|
||
if (!requireNamespace("readxl", quietly = TRUE)) {
|
||
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
|
||
details = "Excel 文件需要 readxl 包"))
|
||
}
|
||
return(as.data.frame(readxl::read_excel(temp_file)))
|
||
} else {
|
||
# 默认尝试 CSV
|
||
return(read.csv(temp_file, stringsAsFactors = FALSE))
|
||
}
|
||
|
||
}, error = function(e) {
|
||
if (grepl("make_error", deparse(e$call))) {
|
||
stop(e) # 重新抛出已格式化的错误
|
||
}
|
||
stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR,
|
||
details = paste("OSS 网络错误:", e$message)))
|
||
})
|
||
}
|