#' @tool_code ST_CHI_SQUARE #' @name 卡方检验 #' @version 1.0.0 #' @description 两个分类变量的独立性检验 #' @author SSA-Pro Team library(glue) library(ggplot2) library(base64enc) run_analysis <- function(input) { # ===== 初始化 ===== logs <- c() log_add <- function(msg) { logs <<- c(logs, paste0("[", Sys.time(), "] ", msg)) } on.exit({}, add = TRUE) # ===== 数据加载 ===== log_add("开始加载输入数据") df <- tryCatch( load_input_data(input), error = function(e) { log_add(paste("数据加载失败:", e$message)) return(NULL) } ) if (is.null(df)) { return(make_error(ERROR_CODES$E100_INTERNAL_ERROR, details = "数据加载失败")) } log_add(glue("数据加载成功: {nrow(df)} 行, {ncol(df)} 列")) p <- input$params var1 <- p$var1 var2 <- p$var2 # ===== 参数校验 ===== if (!(var1 %in% names(df))) { return(make_error(ERROR_CODES$E001_COLUMN_NOT_FOUND, col = var1)) } if (!(var2 %in% names(df))) { return(make_error(ERROR_CODES$E001_COLUMN_NOT_FOUND, col = var2)) } # ===== 数据清洗 ===== original_rows <- nrow(df) df <- df[!is.na(df[[var1]]) & trimws(as.character(df[[var1]])) != "", ] df <- df[!is.na(df[[var2]]) & trimws(as.character(df[[var2]])) != "", ] removed_rows <- original_rows - nrow(df) if (removed_rows > 0) { log_add(glue("数据清洗: 移除 {removed_rows} 行缺失值 (剩余 {nrow(df)} 行)")) } # ===== 护栏检查 ===== guardrail_results <- list() warnings_list <- c() # 样本量检查 sample_check <- check_sample_size(nrow(df), min_required = 10, action = ACTION_BLOCK) guardrail_results <- c(guardrail_results, list(sample_check)) log_add(glue("样本量检查: N = {nrow(df)}, {sample_check$reason}")) guardrail_status <- run_guardrail_chain(guardrail_results) if (guardrail_status$status == "blocked") { return(list( status = "blocked", message = guardrail_status$reason, trace_log = logs )) } # ===== 构建列联表 ===== contingency_table <- table(df[[var1]], df[[var2]]) log_add(glue("列联表维度: {nrow(contingency_table)} x {ncol(contingency_table)}")) # 检查列联表有效性 if (nrow(contingency_table) < 2 || ncol(contingency_table) < 2) { return(make_error(ERROR_CODES$E003_INSUFFICIENT_GROUPS, col = paste(var1, "或", var2), expected = 2, actual = min(nrow(contingency_table), ncol(contingency_table)))) } # ===== 期望频数检查(决定使用卡方还是 Fisher) ===== expected <- chisq.test(contingency_table)$expected low_expected_count <- sum(expected < 5) total_cells <- length(expected) low_expected_pct <- low_expected_count / total_cells use_fisher <- FALSE is_2x2 <- nrow(contingency_table) == 2 && ncol(contingency_table) == 2 if (low_expected_pct > 0.2) { warnings_list <- c(warnings_list, glue("期望频数 < 5 的格子占 {round(low_expected_pct * 100, 1)}%")) if (is_2x2) { use_fisher <- TRUE log_add("2x2 表且期望频数不足,自动切换为 Fisher 精确检验") } else { log_add("期望频数不足,但非 2x2 表,继续使用卡方检验(结果需谨慎解读)") } } # ===== 核心计算 ===== if (use_fisher) { log_add("执行 Fisher 精确检验") result <- fisher.test(contingency_table) method_used <- "Fisher's Exact Test" output_results <- list( method = method_used, p_value = jsonlite::unbox(as.numeric(result$p.value)), p_value_fmt = format_p_value(result$p.value), odds_ratio = if (!is.null(result$estimate)) jsonlite::unbox(as.numeric(result$estimate)) else NULL, conf_int = if (!is.null(result$conf.int)) as.numeric(result$conf.int) else NULL ) } else { log_add("执行 Pearson 卡方检验") result <- chisq.test(contingency_table, correct = is_2x2) # 2x2表使用Yates连续性校正 method_used <- if (is_2x2) "Pearson's Chi-squared test with Yates' continuity correction" else "Pearson's Chi-squared test" # 计算 Cramér's V n <- sum(contingency_table) k <- min(nrow(contingency_table), ncol(contingency_table)) cramers_v <- sqrt(result$statistic / (n * (k - 1))) # 效应量解释 v_interpretation <- if (cramers_v < 0.1) "微小" else if (cramers_v < 0.3) "小" else if (cramers_v < 0.5) "中等" else "大" log_add(glue("χ² = {round(result$statistic, 3)}, df = {result$parameter}, p = {round(result$p.value, 4)}, Cramér's V = {round(cramers_v, 3)}")) output_results <- list( method = method_used, statistic = jsonlite::unbox(as.numeric(result$statistic)), df = jsonlite::unbox(as.numeric(result$parameter)), p_value = jsonlite::unbox(as.numeric(result$p.value)), p_value_fmt = format_p_value(result$p.value), effect_size = list( cramers_v = jsonlite::unbox(round(as.numeric(cramers_v), 4)), interpretation = v_interpretation ) ) } # 添加列联表信息(精简版,不含原始数据) # 将 table 转为纯数值矩阵以便 JSON 序列化 observed_matrix <- matrix( as.numeric(contingency_table), nrow = nrow(contingency_table), ncol = ncol(contingency_table), dimnames = list(rownames(contingency_table), colnames(contingency_table)) ) output_results$contingency_table <- list( row_var = var1, col_var = var2, row_levels = as.character(rownames(contingency_table)), col_levels = as.character(colnames(contingency_table)), observed = observed_matrix, row_totals = as.numeric(rowSums(contingency_table)), col_totals = as.numeric(colSums(contingency_table)), grand_total = jsonlite::unbox(sum(contingency_table)) ) # ===== 生成图表 ===== log_add("生成马赛克图") plot_base64 <- tryCatch({ generate_mosaic_plot(contingency_table, var1, var2) }, error = function(e) { log_add(paste("图表生成失败:", e$message)) NULL }) # ===== 生成可复现代码 ===== original_filename <- if (!is.null(input$original_filename) && nchar(input$original_filename) > 0) { input$original_filename } else { "data.csv" } reproducible_code <- glue(' # SSA-Pro 自动生成代码 # 工具: 卡方检验 # 时间: {Sys.time()} # ================================ library(ggplot2) # 数据准备 df <- read.csv("{original_filename}") var1 <- "{var1}" var2 <- "{var2}" # 数据清洗 df <- df[!is.na(df[[var1]]) & !is.na(df[[var2]]), ] # 构建列联表 contingency_table <- table(df[[var1]], df[[var2]]) print(contingency_table) # 卡方检验 result <- chisq.test(contingency_table) print(result) # 计算 Cramer V (效应量) n <- sum(contingency_table) k <- min(nrow(contingency_table), ncol(contingency_table)) cramers_v <- sqrt(result$statistic / (n * (k - 1))) cat("Cramer V =", round(cramers_v, 3), "\\n") # 可视化(马赛克图) mosaicplot(contingency_table, main = "Mosaic Plot", color = TRUE) ') # ===== 构建 report_blocks ===== # Block 1: 列联表 table_headers <- c(var1, as.character(colnames(contingency_table))) table_rows <- lapply(seq_len(nrow(contingency_table)), function(i) { c(as.character(rownames(contingency_table)[i]), as.character(contingency_table[i, ])) }) blocks <- list( make_table_block(table_headers, table_rows, title = "列联表") ) # Block 2: 检验结果键值对 if (use_fisher) { kv_items <- list( "方法" = method_used, "P 值" = output_results$p_value_fmt ) if (!is.null(output_results$odds_ratio)) { kv_items[["比值比"]] <- as.character(round(as.numeric(output_results$odds_ratio), 4)) } if (!is.null(output_results$conf_int)) { kv_items[["95% 置信区间"]] <- sprintf("[%.4f, %.4f]", output_results$conf_int[1], output_results$conf_int[2]) } } else { kv_items <- list( "方法" = method_used, "χ² 统计量" = as.character(round(as.numeric(output_results$statistic), 4)), "自由度" = as.character(output_results$df), "P 值" = output_results$p_value_fmt, "Cramér's V" = as.character(output_results$effect_size$cramers_v), "效应量解释" = output_results$effect_size$interpretation ) } blocks[[length(blocks) + 1]] <- make_kv_block(kv_items, title = "检验结果") # Block 3: 马赛克图(若有) if (!is.null(plot_base64)) { blocks[[length(blocks) + 1]] <- make_image_block(plot_base64, title = "马赛克图") } # Block 4: 结论摘要 p_val <- as.numeric(output_results$p_value) conclusion <- if (p_val < 0.05) { glue("在 α=0.05 水平下,{var1} 与 {var2} 之间存在显著关联(P {output_results$p_value_fmt})。") } else { glue("在 α=0.05 水平下,未发现 {var1} 与 {var2} 之间的显著关联(P {output_results$p_value_fmt})。") } if (!use_fisher) { conclusion <- paste0(conclusion, " 效应量为", output_results$effect_size$interpretation, "(Cramér's V = ", output_results$effect_size$cramers_v, ")。") } else if (!is.null(output_results$odds_ratio)) { conclusion <- paste0(conclusion, " 比值比 = ", round(as.numeric(output_results$odds_ratio), 4), "。") } blocks[[length(blocks) + 1]] <- make_markdown_block(conclusion, title = "结论摘要") # ===== 返回结果 ===== log_add("分析完成") return(list( status = "success", message = "分析完成", warnings = if (length(warnings_list) > 0) warnings_list else NULL, results = output_results, report_blocks = blocks, plots = if (!is.null(plot_base64)) list(plot_base64) else list(), trace_log = logs, reproducible_code = as.character(reproducible_code) )) } # 辅助函数:生成马赛克图(使用 ggplot2 模拟) generate_mosaic_plot <- function(contingency_table, var1, var2) { # 转换为长格式数据 df_plot <- as.data.frame(contingency_table) names(df_plot) <- c("Var1", "Var2", "Freq") p <- ggplot(df_plot, aes(x = Var1, y = Freq, fill = Var2)) + geom_bar(stat = "identity", position = "fill") + scale_y_continuous(labels = scales::percent) + theme_minimal() + labs( title = paste("Association between", var1, "and", var2), x = var1, y = "Proportion", fill = var2 ) + scale_fill_brewer(palette = "Set2") tmp_file <- tempfile(fileext = ".png") ggsave(tmp_file, p, width = 7, height = 5, dpi = 100) base64_str <- base64encode(tmp_file) unlink(tmp_file) return(paste0("data:image/png;base64,", base64_str)) }