#' @tool_code ST_ANOVA_ONE #' @name 单因素方差分析 #' @version 1.0.0 #' @description 三组及以上独立样本的均值差异比较(含事后多重比较) #' @author SSA-Pro Team library(glue) library(ggplot2) library(base64enc) run_analysis <- function(input) { # ===== 初始化 ===== logs <- c() log_add <- function(msg) { logs <<- c(logs, paste0("[", Sys.time(), "] ", msg)) } on.exit({}, add = TRUE) # ===== 数据加载 ===== log_add("开始加载输入数据") df <- tryCatch( load_input_data(input), error = function(e) { log_add(paste("数据加载失败:", e$message)) return(NULL) } ) if (is.null(df)) { return(make_error(ERROR_CODES$E100_INTERNAL_ERROR, details = "数据加载失败")) } log_add(glue("数据加载成功: {nrow(df)} 行, {ncol(df)} 列")) p <- input$params guardrails_cfg <- input$guardrails group_var <- p$group_var value_var <- p$value_var # ===== 参数校验 ===== if (!(group_var %in% names(df))) { return(make_error(ERROR_CODES$E001_COLUMN_NOT_FOUND, col = group_var)) } if (!(value_var %in% names(df))) { return(make_error(ERROR_CODES$E001_COLUMN_NOT_FOUND, col = value_var)) } # ===== 数据清洗 ===== original_rows <- nrow(df) df <- df[!is.na(df[[group_var]]) & trimws(as.character(df[[group_var]])) != "", ] df <- df[!is.na(df[[value_var]]), ] removed_rows <- original_rows - nrow(df) if (removed_rows > 0) { log_add(glue("数据清洗: 移除 {removed_rows} 行缺失值 (剩余 {nrow(df)} 行)")) } # 确保数值型 if (!is.numeric(df[[value_var]])) { df[[value_var]] <- as.numeric(as.character(df[[value_var]])) df <- df[!is.na(df[[value_var]]), ] } # 分组信息 df[[group_var]] <- as.factor(df[[group_var]]) groups <- levels(df[[group_var]]) n_groups <- length(groups) if (n_groups < 3) { return(make_error(ERROR_CODES$E003_INSUFFICIENT_GROUPS, col = group_var, expected = "3+", actual = n_groups)) } log_add(glue("分组变量 '{group_var}' 有 {n_groups} 个水平: {paste(groups, collapse=', ')}")) # ===== 护栏检查 ===== guardrail_results <- list() warnings_list <- c() use_kruskal <- FALSE # 每组样本量检查 group_sizes <- table(df[[group_var]]) min_group_n <- min(group_sizes) sample_check <- check_sample_size(min_group_n, min_required = 3, action = ACTION_BLOCK) guardrail_results <- c(guardrail_results, list(sample_check)) log_add(glue("最小组样本量: {min_group_n}, {sample_check$reason}")) # 正态性检验(每组) if (isTRUE(guardrails_cfg$check_normality)) { log_add("执行正态性检验") normality_failed <- FALSE for (g in groups) { vals <- df[df[[group_var]] == g, value_var] if (length(vals) >= 3 && length(vals) <= 5000) { norm_check <- check_normality(vals, alpha = 0.05, action = ACTION_SWITCH, action_target = "Kruskal-Wallis") guardrail_results <- c(guardrail_results, list(norm_check)) log_add(glue("组[{g}] 正态性: p = {round(norm_check$p_value, 4)}, {norm_check$reason}")) if (!norm_check$passed) normality_failed <- TRUE } } if (normality_failed) use_kruskal <- TRUE } # 方差齐性检验 (Levene) if (!use_kruskal) { tryCatch({ homo_check <- check_homogeneity(df, group_var, value_var, alpha = 0.05, action = ACTION_WARN) guardrail_results <- c(guardrail_results, list(homo_check)) log_add(glue("方差齐性 (Levene): p = {round(homo_check$p_value, 4)}, {homo_check$reason}")) if (!homo_check$passed) { warnings_list <- c(warnings_list, "方差不齐性,使用 Welch 校正的 ANOVA") } }, error = function(e) { log_add(paste("方差齐性检验失败:", e$message)) }) } guardrail_status <- run_guardrail_chain(guardrail_results) if (guardrail_status$status == "blocked") { return(list(status = "blocked", message = guardrail_status$reason, trace_log = logs)) } if (guardrail_status$status == "switch") { use_kruskal <- TRUE warnings_list <- c(warnings_list, guardrail_status$reason) log_add(glue("正态性不满足,切换为 Kruskal-Wallis 检验")) } if (length(guardrail_status$warnings) > 0) { warnings_list <- c(warnings_list, guardrail_status$warnings) } # ===== 各组描述统计 ===== group_stats <- lapply(groups, function(g) { vals <- df[df[[group_var]] == g, value_var] list( group = g, n = length(vals), mean = round(mean(vals), 3), sd = round(sd(vals), 3), median = round(median(vals), 3), q1 = round(quantile(vals, 0.25), 3), q3 = round(quantile(vals, 0.75), 3) ) }) # ===== 核心计算 ===== if (use_kruskal) { log_add("执行 Kruskal-Wallis 检验") formula_obj <- as.formula(paste(value_var, "~", group_var)) result <- kruskal.test(formula_obj, data = df) method_used <- "Kruskal-Wallis rank sum test" stat_name <- "H" # 效应量: η² (eta-squared approximation for Kruskal-Wallis) eta_sq <- (result$statistic - n_groups + 1) / (nrow(df) - n_groups) eta_sq <- max(0, as.numeric(eta_sq)) output_results <- list( method = method_used, statistic = jsonlite::unbox(as.numeric(result$statistic)), statistic_name = stat_name, df = jsonlite::unbox(as.numeric(result$parameter)), p_value = jsonlite::unbox(as.numeric(result$p.value)), p_value_fmt = format_p_value(result$p.value), effect_size = list( eta_squared = jsonlite::unbox(round(eta_sq, 4)), interpretation = interpret_eta_sq(eta_sq) ), group_stats = group_stats ) # 事后多重比较: Dunn test (pairwise Wilcoxon) posthoc_result <- tryCatch({ pw <- pairwise.wilcox.test(df[[value_var]], df[[group_var]], p.adjust.method = "bonferroni") pw }, error = function(e) { log_add(paste("Dunn 事后检验失败:", e$message)) NULL }) } else { log_add("执行单因素 ANOVA") formula_obj <- as.formula(paste(value_var, "~", group_var)) # 检查方差齐性决定使用经典 ANOVA 还是 Welch ANOVA use_welch <- any(grepl("方差不齐性", warnings_list)) if (use_welch) { result <- oneway.test(formula_obj, data = df, var.equal = FALSE) method_used <- "One-way ANOVA (Welch correction)" } else { aov_result <- aov(formula_obj, data = df) result_summary <- summary(aov_result) result <- list( statistic = result_summary[[1]]$`F value`[1], parameter = c(result_summary[[1]]$Df[1], result_summary[[1]]$Df[2]), p.value = result_summary[[1]]$`Pr(>F)`[1] ) method_used <- "One-way ANOVA" } stat_name <- "F" # 效应量: η² (eta-squared) ss_between <- sum(tapply(df[[value_var]], df[[group_var]], function(x) length(x) * (mean(x) - mean(df[[value_var]]))^2)) ss_total <- sum((df[[value_var]] - mean(df[[value_var]]))^2) eta_sq <- ss_between / ss_total f_val <- if (is.list(result)) result$statistic else as.numeric(result$statistic) df_val <- if (is.list(result) && !is.null(result$parameter)) { if (length(result$parameter) == 2) result$parameter else as.numeric(result$parameter) } else { as.numeric(result$parameter) } p_val <- if (is.list(result)) result$p.value else as.numeric(result$p.value) output_results <- list( method = method_used, statistic = jsonlite::unbox(as.numeric(f_val)), statistic_name = stat_name, df = if (length(df_val) == 2) as.numeric(df_val) else jsonlite::unbox(as.numeric(df_val)), p_value = jsonlite::unbox(as.numeric(p_val)), p_value_fmt = format_p_value(p_val), effect_size = list( eta_squared = jsonlite::unbox(round(eta_sq, 4)), interpretation = interpret_eta_sq(eta_sq) ), group_stats = group_stats ) # 事后多重比较: Tukey HSD (if classic ANOVA) or pairwise t-test posthoc_result <- tryCatch({ if (use_welch) { pairwise.t.test(df[[value_var]], df[[group_var]], p.adjust.method = "bonferroni", pool.sd = FALSE) } else { TukeyHSD(aov(formula_obj, data = df)) } }, error = function(e) { log_add(paste("事后多重比较失败:", e$message)) NULL }) } log_add(glue("{stat_name} = {round(as.numeric(output_results$statistic), 3)}, P = {round(as.numeric(output_results$p_value), 4)}")) # 整理事后比较结果 posthoc_pairs <- NULL if (!is.null(posthoc_result)) { if (inherits(posthoc_result, "TukeyHSD")) { tukey_df <- as.data.frame(posthoc_result[[1]]) posthoc_pairs <- lapply(seq_len(nrow(tukey_df)), function(i) { list( comparison = rownames(tukey_df)[i], diff = round(tukey_df$diff[i], 3), ci_lower = round(tukey_df$lwr[i], 3), ci_upper = round(tukey_df$upr[i], 3), p_adj = round(tukey_df$`p adj`[i], 4), p_adj_fmt = format_p_value(tukey_df$`p adj`[i]), significant = tukey_df$`p adj`[i] < 0.05 ) }) } else if (inherits(posthoc_result, "pairwise.htest")) { p_matrix <- posthoc_result$p.value for (i in seq_len(nrow(p_matrix))) { for (j in seq_len(ncol(p_matrix))) { if (!is.na(p_matrix[i, j])) { if (is.null(posthoc_pairs)) posthoc_pairs <- list() posthoc_pairs[[length(posthoc_pairs) + 1]] <- list( comparison = paste(rownames(p_matrix)[i], "vs", colnames(p_matrix)[j]), p_adj = round(p_matrix[i, j], 4), p_adj_fmt = format_p_value(p_matrix[i, j]), significant = p_matrix[i, j] < 0.05 ) } } } } } output_results$posthoc <- posthoc_pairs # ===== 生成图表 ===== log_add("生成箱线图") plot_base64 <- tryCatch({ generate_anova_boxplot(df, group_var, value_var) }, error = function(e) { log_add(paste("图表生成失败:", e$message)) NULL }) # ===== 生成可复现代码 ===== original_filename <- if (!is.null(input$original_filename) && nchar(input$original_filename) > 0) { input$original_filename } else { "data.csv" } reproducible_code <- glue(' # SSA-Pro 自动生成代码 # 工具: 单因素方差分析 # 时间: {Sys.time()} # ================================ library(ggplot2) # 数据准备 df <- read.csv("{original_filename}") group_var <- "{group_var}" value_var <- "{value_var}" # 单因素 ANOVA result <- aov(as.formula(paste(value_var, "~", group_var)), data = df) summary(result) # 事后多重比较 (Tukey HSD) TukeyHSD(result) # 可视化 ggplot(df, aes(x = .data[[group_var]], y = .data[[value_var]], fill = .data[[group_var]])) + geom_boxplot(alpha = 0.7) + theme_minimal() + labs(title = paste("Distribution of", value_var, "by", group_var)) ') # ===== 构建 report_blocks ===== blocks <- list() # Block 1: 各组描述统计 desc_headers <- c("组别", "N", "均值", "标准差", "中位数") desc_rows <- lapply(group_stats, function(gs) { c(gs$group, as.character(gs$n), as.character(gs$mean), as.character(gs$sd), as.character(gs$median)) }) blocks[[length(blocks) + 1]] <- make_table_block(desc_headers, desc_rows, title = "各组描述统计") # Block 2: 检验结果 kv_items <- list( "方法" = method_used, "统计量" = paste0(stat_name, " = ", round(as.numeric(output_results$statistic), 3)), "P 值" = output_results$p_value_fmt, "η²" = as.character(output_results$effect_size$eta_squared), "效应量解释" = output_results$effect_size$interpretation ) blocks[[length(blocks) + 1]] <- make_kv_block(kv_items, title = "检验结果") # Block 3: 事后多重比较 if (!is.null(posthoc_pairs) && length(posthoc_pairs) > 0) { ph_headers <- c("比较", "P 值 (校正)", "显著性") ph_rows <- lapply(posthoc_pairs, function(pair) { sig <- if (pair$significant) "*" else "" c(pair$comparison, pair$p_adj_fmt, sig) }) blocks[[length(blocks) + 1]] <- make_table_block(ph_headers, ph_rows, title = "事后多重比较", footnote = if (use_kruskal) "Bonferroni 校正的 Wilcoxon 检验" else "Tukey HSD / Bonferroni 校正") } # Block 4: 箱线图 if (!is.null(plot_base64)) { blocks[[length(blocks) + 1]] <- make_image_block(plot_base64, title = paste(value_var, "by", group_var), alt = paste("箱线图:", value_var, "按", group_var, "分组")) } # Block 5: 结论摘要 p_val_num <- as.numeric(output_results$p_value) sig_text <- if (p_val_num < 0.05) "各组间存在统计学显著差异" else "各组间差异无统计学意义" conclusion <- glue("{method_used}: {stat_name} = {round(as.numeric(output_results$statistic), 3)}, P {output_results$p_value_fmt}。{sig_text}(η² = {output_results$effect_size$eta_squared},{output_results$effect_size$interpretation}效应)。") if (!is.null(posthoc_pairs) && p_val_num < 0.05) { sig_pairs <- Filter(function(x) x$significant, posthoc_pairs) if (length(sig_pairs) > 0) { pair_names <- sapply(sig_pairs, function(x) x$comparison) conclusion <- paste0(conclusion, glue("\n\n事后比较显示以下组间差异显著:{paste(pair_names, collapse = '、')}。")) } } blocks[[length(blocks) + 1]] <- make_markdown_block(conclusion, title = "结论摘要") # ===== 返回结果 ===== log_add("分析完成") return(list( status = "success", message = "分析完成", warnings = if (length(warnings_list) > 0) warnings_list else NULL, results = output_results, report_blocks = blocks, plots = if (!is.null(plot_base64)) list(plot_base64) else list(), trace_log = logs, reproducible_code = as.character(reproducible_code) )) } # η² 效应量解释 interpret_eta_sq <- function(eta_sq) { if (eta_sq < 0.01) return("微小") if (eta_sq < 0.06) return("小") if (eta_sq < 0.14) return("中等") return("大") } # NULL 合并运算符 `%||%` <- function(x, y) if (is.null(x)) y else x # 辅助函数:ANOVA 箱线图 generate_anova_boxplot <- function(df, group_var, value_var) { p <- ggplot(df, aes(x = .data[[group_var]], y = .data[[value_var]], fill = .data[[group_var]])) + geom_boxplot(alpha = 0.7, outlier.shape = 21) + stat_summary(fun = mean, geom = "point", shape = 18, size = 3, color = "red") + theme_minimal() + labs( title = paste("Distribution of", value_var, "by", group_var), x = group_var, y = value_var ) + scale_fill_brewer(palette = "Set2") + theme(legend.position = "none") tmp_file <- tempfile(fileext = ".png") ggsave(tmp_file, p, width = max(7, length(unique(df[[group_var]])) * 1.5), height = 5, dpi = 100) base64_str <- base64encode(tmp_file) unlink(tmp_file) return(paste0("data:image/png;base64,", base64_str)) }