# utils/data_loader.R # 混合数据协议:自动识别 inline 数据 vs 预签名 URL # # 架构说明: # - R 服务不持有 OSS 密钥,遵循平台 OSS 存储规范 # - Node.js 后端通过 storage.getUrl() 生成预签名 URL # - R 服务直接访问预签名 URL 下载数据 # - 开发环境使用 ai-clinical-data-dev bucket,无需 Mock library(httr) library(jsonlite) library(glue) # 统一数据加载入口 load_input_data <- function(input) { # 检查输入结构 if (is.null(input$data_source)) { stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR, details = "请求缺少 data_source 字段")) } source_type <- input$data_source$type # "inline" | "oss" if (source_type == "inline") { # 方式1:内联 JSON 数据(< 2MB) message("[DataLoader] 使用 inline 数据模式") raw_data <- input$data_source$data # 调试:打印原始数据结构 message(glue("[DataLoader] 原始数据类型: {class(raw_data)}")) message(glue("[DataLoader] 原始数据长度: {length(raw_data)}")) # 安全转换:处理不同的 JSON 解析结果 if (is.data.frame(raw_data)) { # 已经是 data.frame df <- raw_data message("[DataLoader] 数据已是 data.frame") } else if (is.list(raw_data) && length(raw_data) > 0) { # 检查是行格式还是列格式 first_elem <- raw_data[[1]] if (is.list(first_elem) && !is.null(names(first_elem))) { # 行格式: [{"col1": val1, "col2": val2}, {...}, ...] # 每个元素是一行数据 message("[DataLoader] 检测到行格式数据 (JSON array of objects)") # 使用 jsonlite 的 bind_rows 功能 df <- tryCatch({ # 方法1:使用 do.call + rbind.data.frame df_list <- lapply(raw_data, function(row) { # 将每一行转为 data.frame as.data.frame(lapply(row, function(val) { if (is.null(val)) NA else val }), stringsAsFactors = FALSE) }) do.call(rbind, df_list) }, error = function(e) { # 方法2:如果上面失败,尝试 jsonlite 转换 message(glue("[DataLoader] rbind 失败,尝试 jsonlite 转换: {e$message}")) jsonlite::fromJSON(jsonlite::toJSON(raw_data), flatten = TRUE) }) } else if (!is.null(names(raw_data))) { # 列格式: {"col1": [...], "col2": [...]} message("[DataLoader] 检测到列格式数据 (JSON object with arrays)") df <- data.frame( lapply(raw_data, function(x) { if (is.list(x)) unlist(x) else x }), stringsAsFactors = FALSE ) } else { # 未知格式 message(glue("[DataLoader] 未知数据格式,first_elem class: {class(first_elem)}")) stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR, details = "无法识别的数据格式")) } } else { stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR, details = paste("无法解析的数据类型:", class(raw_data), "或数据为空"))) } message(glue("[DataLoader] 转换后: {nrow(df)} 行, {ncol(df)} 列, 列名: {paste(names(df), collapse=', ')}")) return(df) } else if (source_type == "oss") { # 方式2:从预签名 URL 下载(2MB - 20MB) # 注意:oss_url 是由 Node.js 后端生成的预签名 URL,不是 oss_key oss_url <- input$data_source$oss_url if (is.null(oss_url) || oss_url == "") { stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR, details = "OSS 模式缺少 oss_url 字段")) } return(load_from_signed_url(oss_url)) } else { stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR, details = paste("未知的 data_source.type:", source_type))) } } # 从预签名 URL 下载数据 # # @param url 预签名 URL(由 Node.js storage.getUrl() 生成) # @return data.frame # # 说明:开发环境和生产环境都使用真实 OSS # - 开发环境:ai-clinical-data-dev bucket # - 生产环境:ai-clinical-data bucket load_from_signed_url <- function(url) { message(glue("[DataLoader] 从预签名 URL 下载数据")) temp_file <- tempfile(fileext = ".csv") on.exit(unlink(temp_file)) tryCatch({ # 预签名 URL 自带认证信息,直接 GET 即可 response <- GET(url, write_disk(temp_file, overwrite = TRUE)) status <- status_code(response) if (status != 200) { # 403 通常表示签名过期 if (status == 403) { stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR, details = "预签名 URL 已过期,请重新上传数据")) } stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR, details = paste("OSS 下载失败,HTTP 状态码:", status))) } # 检测文件类型并读取 content_type <- headers(response)$`content-type` if (grepl("csv", content_type, ignore.case = TRUE) || grepl("\\.csv", url, ignore.case = TRUE)) { return(read.csv(temp_file, stringsAsFactors = FALSE)) } else if (grepl("excel|xlsx", content_type, ignore.case = TRUE) || grepl("\\.xlsx?", url, ignore.case = TRUE)) { # 需要 readxl 包 if (!requireNamespace("readxl", quietly = TRUE)) { stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR, details = "Excel 文件需要 readxl 包")) } return(as.data.frame(readxl::read_excel(temp_file))) } else { # 默认尝试 CSV return(read.csv(temp_file, stringsAsFactors = FALSE)) } }, error = function(e) { if (grepl("make_error", deparse(e$call))) { stop(e) # 重新抛出已格式化的错误 } stop(make_error(ERROR_CODES$E100_INTERNAL_ERROR, details = paste("OSS 网络错误:", e$message))) }) }