目录

spring-ai-alibaba-deepsearch-学习四源码学习之RagNode

spring-ai-alibaba-deepsearch 学习(四)——源码学习之RagNode

本篇为spring-ai-alibaba学习系列第三十篇

前面介绍 rewrite_multi_query 节点最后会根据用户上传文件标识 user_upload_file 决定下一节点

现在来看一下第一个分支,当 user_upload_file 为 true 时,转入 user_file_rag 节点

该节点用来对用户自建知识库进行检索,实现了查询重写、查询扩展、条件过滤、ES混合检索、结果去重、重排序等功能,并最终根据检索内容生成对用户问题的回答

但使用时注意测试,可能尚存在一些问题

使用方法

需要添加配置,来启用各项功能


# RAG 开关
spring.ai.alibaba.deepresearch.rag.enabled=true
# 是否启用查询扩展功能,默认为false
spring.ai.alibaba.deepresearch.rag.pipeline.queryExpansionEnabled=
# 是否启用查询翻译功能,默认为false
spring.ai.alibaba.deepresearch.rag.pipeline.queryTranslationEnabled=
spring.ai.alibaba.deepresearch.rag.pipeline.queryTranslationLanguage=
# 是否启用后处理选择第一个结果功能,默认为false,与重排序二选一
spring.ai.alibaba.deepresearch.rag.pipeline.postProcessingSelectFirstEnabled=
# 是否启用去重功能,默认为 true
spring.ai.alibaba.deepresearch.rag.pipeline.deduplicationEnabled=
# 是否启用重排序,默认为 true
spring.ai.alibaba.deepresearch.rag.pipeline.rerankEnabled=
spring.ai.alibaba.deepresearch.rag.pipeline.rerankTopK=
spring.ai.alibaba.deepresearch.rag.pipeline.rerankThreshold=


# RAG 简单向量存储相关配置,与ES二选一
#使用 simple
spring.ai.alibaba.deepresearch.rag.vectorStoreType=simple
# 简单向量存储的存储路径,默认为"vector_store.json"
spring.ai.alibaba.deepresearch.rag.simple.storagePath=
spring.ai.alibaba.deepresearch.rag.pipeline.topK=
spring.ai.alibaba.deepresearch.rag.pipeline.similarityThreshold=


# RAG ES 相关配置
#使用 ES,默认为 simple
spring.ai.alibaba.deepresearch.rag.vectorStoreType=elasticsearch
# 索引名称,默认为 spring-ai-rag-es-index
spring.ai.alibaba.deepresearch.rag.elasticsearch.indexName=
# 向量维度,默认为1536
spring.ai.alibaba.deepresearch.rag.elasticsearch.dimensions=
# Elasticsearch连接URI
spring.ai.alibaba.deepresearch.rag.elasticsearch.uris=
# Elasticsearch用户名
spring.ai.alibaba.deepresearch.rag.elasticsearch.username=
# Elasticsearch密码
spring.ai.alibaba.deepresearch.rag.elasticsearch.password=
# 相似度函数配置,可选值 l2_norm,dot_product,cosine
spring.ai.alibaba.deepresearch.rag.elasticsearch.similarityFunction=
# 混合搜索配置
# 是否启用混合搜索,默认false
spring.ai.alibaba.deepresearch.rag.elasticsearch.hybrid.enabled=
# BM25搜索的权重,默认 1.0f
spring.ai.alibaba.deepresearch.rag.elasticsearch.hybrid.bm25Boost=
# KNN搜索的权重,默认 1.0f
spring.ai.alibaba.deepresearch.rag.elasticsearch.hybrid.knnBoost=
# RRF算法中的窗口大小,默认 10
spring.ai.alibaba.deepresearch.rag.elasticsearch.hybrid.rrfWindowSize=
# RRF算法中的排名常数,默认 60
spring.ai.alibaba.deepresearch.rag.elasticsearch.hybrid.rrfRankConstant=

输入项:

user_upload_file:进入当前节点的前提(true)

还需添加参数,以区分数据来源

user_id:用户id

session_id:会话id

节点产出

rag_content:大模型结合用户知识库检索结果做出的回答

提示词模板


# 角色
你是一个专门负责根据所提供的上下文信息来回答用户问题的 AI 助手。你没有先验知识,所有回答都必须严格基于上下文。

# 核心任务
你的任务是基于下方提供的 [上下文],为 [用户问题] 生成一份结构清晰、格式严谨、专业中立的 Markdown 格式回答。

# 回答规则

### 1. 内容与准确性
- **严格忠于原文**:只使用上下文中包含的信息。如果上下文没有足够信息来回答问题,必须明确回答:“根据所提供的资料,我无法找到相关问题的答案。”
- **切中要点**:只提供与问题直接相关的信息,避免无关内容和重复信息。
- **语言一致**:回答的语言必须与用户问题的语言保持一致(专有名词和引用除外)。

### 2. 结构与格式
- **结论先行**:在回答的开头,首先给出最核心的结论或要点,无需任何前缀标题。
- **层级清晰**:
    - 使用 Markdown 的二级标题 (`## 标题`) 和三级标题 (`### 子标题`) 来组织内容,形成逻辑清晰的层次结构。
    - 确保每个部分都有一个简明扼要的标题。
- **格式严谨**:整个回答必须是美观且规范的 Markdown 格式。

### 3. 风格与引用
- **专业口吻**:以专家、客观、中立的语气进行陈述。
- **直接陈述**:直接给出答案和信息,避免使用“根据上下文...”、“所提供的资料显示...”等引导性短语。
- **标注来源**:当引用上下文中的具体信息时,必须以 Markdown 超链接的形式 `[来源文档A](链接)` 附上来源,以便用户点击查证。

---
[上下文]:
"""
{context}
"""

[用户问题]:
"""
{question}
"""

在获取到用户知识库的检索结果后,会调用大模型进行回答,以上即回答时使用的提示词,指定了角色,任务,规则等内容

源码跟踪

跟踪:在 DeepResearchConfiguration 中,user_file_rag 节点是通过 ragNodeService 的 createUserFileRagNode 方法来创建的

跟踪:在 RagNodeService 的 createUserFileRagNode 方法中,若 hyBridRagProcessor 不为空,则使用 hyBridRagProcessorragAgent 来创建 RagNode,否则使用 userFileRetrievalStrategyfusionStrategyragAgent 来创建RagNode

跟踪:hyBridRagProcessor 是一个类似 RetrievalAugmentationAdvisor 的检索类,实现了查询重写、查询扩展、条件过滤、ES混合检索、结果去重、重排序等功能,并支持通过配置来控制这些功能的启用与否,hyBridRagProcessor 本身的启用也是通过配置文件的 spring.ai.alibaba.deepresearch.rag.enabled 来控制的

ps:1.0.0.3版本这里有个问题,hyBridRagProcessor 并不会读取前序节点 rewrite_multi_query 处理好的optimize_queries,而是重新进行问题重写和扩展,这样浪费了token;如果关闭问题重写和扩展,则使用的是用户的原始问题进行后续操作,没有经过问题重写和扩展

跟踪:ragAgent 是一个定义了系统消息的 ChatClient,系统rag消息模板在文末

跟踪:userFileRetrievalStrategy 指定搜索条件source_type=user_upload,底层使用 hybridRagProcessor 进行检索

ps:userFileRetrievalStrategy 也是通过spring.ai.alibaba.deepresearch.rag.enabled 来控制启用的,与 hyBridRagProcessor 相同,也就是说 ragNodeService 中永远会走 hyBridRagProcessor 不为空的分支,这里应该是有问题

跟踪:fusionStrategy 是用来对多个检索策略的结果进行融合的

ps:当前情景下,仅userFileRetrievalStrategy 一个检索策略,实际用不上

跟踪:回到 RagNode ,看一下 apply 方法的整体流程:

1)从 OverAllState 中获取用户原始查询,以及 session_id 和 user_id

2)调用 hyBridRagProcessor 的 process 方法,获取检索结果

3)调用 ragAgent,将检索结果和用户原始问题作为 UserMessage 传入,得到大模型响应,并最终放入 OverAllState 的 rag_content 中

ps:这里也有点问题,rag模板中设置了上下文和用户问题的占位符,但实际并没有用到

总结

user_file_rag 这个节点问题不少,使用时可以自行优化一下,总体作用本来应该是如果满足条件(前序节点对用户上传文件标识的判断),就对指定的数据源数据进行检索(source_type=user_upload),但实际由于都走的 hyBridRagProcessor,并没有对数据源进行区分

实际代码流程梳理

ragNodeService 使用 hyBridRagProcessor 和 ragAgent 创建 RagNode

apply 方法调用 hyBridRagProcessor 方法,传入用户id、会话id、用户原始问题

hyBridRagProcessor 根据用户配置,调用或跳过查询重写、查询扩展、条件过滤、ES混合检索、结果去重、重排序等功能,返回检索结果

ragAgent 调用大模型,传入系统消息(rag模板)、检索结果、用户问题,并将响应结果放入OverAllState 的 rag_content 中

后续节点

后续节点的条件边也有点问题,UserFileRagDispatcher 判断 user_upload_file 为 true 返回 user_file_rag,否则返回 background_investigator,但实际指向 background_investigator 和 END节点

附 RagNode.apply 代码


public Map<String, Object> apply(OverAllState state) throws Exception {
		logger.info("rag_node is running.");
		String queryText = StateUtil.getQuery(state);

		Map<String, Object> options = new HashMap<>();
		state.value("session_id", String.class).ifPresent(v -> options.put("session_id", v));
		state.value("user_id", String.class).ifPresent(v -> options.put("user_id", v));
		options.put("query", queryText); // 添加查询文本供后处理使用

		List<Document> documents = new ArrayList<>();

		// 使用统一的RAG处理器或传统的策略模式
		if (hybridRagProcessor != null) {
			// 使用统一的RAG处理器,包含完整的前后处理逻辑
			Query query = new Query(queryText);
			documents = hybridRagProcessor.process(query, options);
		}
		else if (retrievalStrategies != null && fusionStrategy != null) {
			// 传统策略模式(向后兼容)
			List<List<Document>> allResults = new ArrayList<>();
			for (RetrievalStrategy strategy : retrievalStrategies) {
				allResults.add(strategy.retrieve(queryText, options));
			}
			documents = fusionStrategy.fuse(allResults);
		}

		// 构建上下文
		StringBuilder contextBuilder = new StringBuilder();
		for (Document doc : documents) {
			contextBuilder.append(doc.getText()).append("\n");
		}

		// 生成响应
		Flux<ChatResponse> streamResult = ragAgent.prompt()
			.messages(new UserMessage(contextBuilder.toString()))
			.user(queryText)
			.stream()
			.chatResponse()
			.timeout(Duration.ofSeconds(180))
			.retry(2);

		logger.info("RAG node produced a result.");

		var generatedContent = StreamingChatGenerator.builder()
			.startingNode("rag_llm_stream")
			.startingState(state)
			.mapResult(response -> Map.of("rag_content",
					Objects.requireNonNull(response.getResult().getOutput().getText())))
			.buildWithChatResponse(streamResult);

		logger.info("RAG node produced a streaming result.");

		Map<String, Object> updated = new HashMap<>();
		updated.put("rag_content", generatedContent);

		return updated;
	}