spring-ai-alibaba-deepsearch-学习四源码学习之RagNode
spring-ai-alibaba-deepsearch 学习(四)——源码学习之RagNode
本篇为spring-ai-alibaba学习系列第三十篇
前面介绍 rewrite_multi_query 节点最后会根据用户上传文件标识 user_upload_file 决定下一节点
现在来看一下第一个分支,当 user_upload_file 为 true 时,转入 user_file_rag 节点
该节点用来对用户自建知识库进行检索,实现了查询重写、查询扩展、条件过滤、ES混合检索、结果去重、重排序等功能,并最终根据检索内容生成对用户问题的回答
但使用时注意测试,可能尚存在一些问题
使用方法
需要添加配置,来启用各项功能
# RAG 开关
spring.ai.alibaba.deepresearch.rag.enabled=true
# 是否启用查询扩展功能,默认为false
spring.ai.alibaba.deepresearch.rag.pipeline.queryExpansionEnabled=
# 是否启用查询翻译功能,默认为false
spring.ai.alibaba.deepresearch.rag.pipeline.queryTranslationEnabled=
spring.ai.alibaba.deepresearch.rag.pipeline.queryTranslationLanguage=
# 是否启用后处理选择第一个结果功能,默认为false,与重排序二选一
spring.ai.alibaba.deepresearch.rag.pipeline.postProcessingSelectFirstEnabled=
# 是否启用去重功能,默认为 true
spring.ai.alibaba.deepresearch.rag.pipeline.deduplicationEnabled=
# 是否启用重排序,默认为 true
spring.ai.alibaba.deepresearch.rag.pipeline.rerankEnabled=
spring.ai.alibaba.deepresearch.rag.pipeline.rerankTopK=
spring.ai.alibaba.deepresearch.rag.pipeline.rerankThreshold=
# RAG 简单向量存储相关配置,与ES二选一
#使用 simple
spring.ai.alibaba.deepresearch.rag.vectorStoreType=simple
# 简单向量存储的存储路径,默认为"vector_store.json"
spring.ai.alibaba.deepresearch.rag.simple.storagePath=
spring.ai.alibaba.deepresearch.rag.pipeline.topK=
spring.ai.alibaba.deepresearch.rag.pipeline.similarityThreshold=
# RAG ES 相关配置
#使用 ES,默认为 simple
spring.ai.alibaba.deepresearch.rag.vectorStoreType=elasticsearch
# 索引名称,默认为 spring-ai-rag-es-index
spring.ai.alibaba.deepresearch.rag.elasticsearch.indexName=
# 向量维度,默认为1536
spring.ai.alibaba.deepresearch.rag.elasticsearch.dimensions=
# Elasticsearch连接URI
spring.ai.alibaba.deepresearch.rag.elasticsearch.uris=
# Elasticsearch用户名
spring.ai.alibaba.deepresearch.rag.elasticsearch.username=
# Elasticsearch密码
spring.ai.alibaba.deepresearch.rag.elasticsearch.password=
# 相似度函数配置,可选值 l2_norm,dot_product,cosine
spring.ai.alibaba.deepresearch.rag.elasticsearch.similarityFunction=
# 混合搜索配置
# 是否启用混合搜索,默认false
spring.ai.alibaba.deepresearch.rag.elasticsearch.hybrid.enabled=
# BM25搜索的权重,默认 1.0f
spring.ai.alibaba.deepresearch.rag.elasticsearch.hybrid.bm25Boost=
# KNN搜索的权重,默认 1.0f
spring.ai.alibaba.deepresearch.rag.elasticsearch.hybrid.knnBoost=
# RRF算法中的窗口大小,默认 10
spring.ai.alibaba.deepresearch.rag.elasticsearch.hybrid.rrfWindowSize=
# RRF算法中的排名常数,默认 60
spring.ai.alibaba.deepresearch.rag.elasticsearch.hybrid.rrfRankConstant=
输入项:
user_upload_file:进入当前节点的前提(true)
还需添加参数,以区分数据来源
user_id:用户id
session_id:会话id
节点产出
rag_content:大模型结合用户知识库检索结果做出的回答
提示词模板
# 角色
你是一个专门负责根据所提供的上下文信息来回答用户问题的 AI 助手。你没有先验知识,所有回答都必须严格基于上下文。
# 核心任务
你的任务是基于下方提供的 [上下文],为 [用户问题] 生成一份结构清晰、格式严谨、专业中立的 Markdown 格式回答。
# 回答规则
### 1. 内容与准确性
- **严格忠于原文**:只使用上下文中包含的信息。如果上下文没有足够信息来回答问题,必须明确回答:“根据所提供的资料,我无法找到相关问题的答案。”
- **切中要点**:只提供与问题直接相关的信息,避免无关内容和重复信息。
- **语言一致**:回答的语言必须与用户问题的语言保持一致(专有名词和引用除外)。
### 2. 结构与格式
- **结论先行**:在回答的开头,首先给出最核心的结论或要点,无需任何前缀标题。
- **层级清晰**:
- 使用 Markdown 的二级标题 (`## 标题`) 和三级标题 (`### 子标题`) 来组织内容,形成逻辑清晰的层次结构。
- 确保每个部分都有一个简明扼要的标题。
- **格式严谨**:整个回答必须是美观且规范的 Markdown 格式。
### 3. 风格与引用
- **专业口吻**:以专家、客观、中立的语气进行陈述。
- **直接陈述**:直接给出答案和信息,避免使用“根据上下文...”、“所提供的资料显示...”等引导性短语。
- **标注来源**:当引用上下文中的具体信息时,必须以 Markdown 超链接的形式 `[来源文档A](链接)` 附上来源,以便用户点击查证。
---
[上下文]:
"""
{context}
"""
[用户问题]:
"""
{question}
"""
在获取到用户知识库的检索结果后,会调用大模型进行回答,以上即回答时使用的提示词,指定了角色,任务,规则等内容
源码跟踪
跟踪:在 DeepResearchConfiguration 中,user_file_rag 节点是通过 ragNodeService 的 createUserFileRagNode 方法来创建的
跟踪:在 RagNodeService 的 createUserFileRagNode 方法中,若 hyBridRagProcessor 不为空,则使用 hyBridRagProcessor 和 ragAgent 来创建 RagNode,否则使用 userFileRetrievalStrategy,fusionStrategy,ragAgent 来创建RagNode
跟踪:hyBridRagProcessor 是一个类似 RetrievalAugmentationAdvisor 的检索类,实现了查询重写、查询扩展、条件过滤、ES混合检索、结果去重、重排序等功能,并支持通过配置来控制这些功能的启用与否,hyBridRagProcessor 本身的启用也是通过配置文件的 spring.ai.alibaba.deepresearch.rag.enabled 来控制的
ps:1.0.0.3版本这里有个问题,hyBridRagProcessor 并不会读取前序节点 rewrite_multi_query 处理好的optimize_queries,而是重新进行问题重写和扩展,这样浪费了token;如果关闭问题重写和扩展,则使用的是用户的原始问题进行后续操作,没有经过问题重写和扩展
跟踪:ragAgent 是一个定义了系统消息的 ChatClient,系统rag消息模板在文末
跟踪:userFileRetrievalStrategy 指定搜索条件source_type=user_upload,底层使用 hybridRagProcessor 进行检索
ps:userFileRetrievalStrategy 也是通过spring.ai.alibaba.deepresearch.rag.enabled 来控制启用的,与 hyBridRagProcessor 相同,也就是说 ragNodeService 中永远会走 hyBridRagProcessor 不为空的分支,这里应该是有问题
跟踪:fusionStrategy 是用来对多个检索策略的结果进行融合的
ps:当前情景下,仅userFileRetrievalStrategy 一个检索策略,实际用不上
跟踪:回到 RagNode ,看一下 apply 方法的整体流程:
1)从 OverAllState 中获取用户原始查询,以及 session_id 和 user_id
2)调用 hyBridRagProcessor 的 process 方法,获取检索结果
3)调用 ragAgent,将检索结果和用户原始问题作为 UserMessage 传入,得到大模型响应,并最终放入 OverAllState 的 rag_content 中
ps:这里也有点问题,rag模板中设置了上下文和用户问题的占位符,但实际并没有用到
总结
user_file_rag 这个节点问题不少,使用时可以自行优化一下,总体作用本来应该是如果满足条件(前序节点对用户上传文件标识的判断),就对指定的数据源数据进行检索(source_type=user_upload),但实际由于都走的 hyBridRagProcessor,并没有对数据源进行区分
实际代码流程梳理
ragNodeService 使用 hyBridRagProcessor 和 ragAgent 创建 RagNode
apply 方法调用 hyBridRagProcessor 方法,传入用户id、会话id、用户原始问题
hyBridRagProcessor 根据用户配置,调用或跳过查询重写、查询扩展、条件过滤、ES混合检索、结果去重、重排序等功能,返回检索结果
ragAgent 调用大模型,传入系统消息(rag模板)、检索结果、用户问题,并将响应结果放入OverAllState 的 rag_content 中
后续节点
后续节点的条件边也有点问题,UserFileRagDispatcher 判断 user_upload_file 为 true 返回 user_file_rag,否则返回 background_investigator,但实际指向 background_investigator 和 END节点
附 RagNode.apply 代码
public Map<String, Object> apply(OverAllState state) throws Exception {
logger.info("rag_node is running.");
String queryText = StateUtil.getQuery(state);
Map<String, Object> options = new HashMap<>();
state.value("session_id", String.class).ifPresent(v -> options.put("session_id", v));
state.value("user_id", String.class).ifPresent(v -> options.put("user_id", v));
options.put("query", queryText); // 添加查询文本供后处理使用
List<Document> documents = new ArrayList<>();
// 使用统一的RAG处理器或传统的策略模式
if (hybridRagProcessor != null) {
// 使用统一的RAG处理器,包含完整的前后处理逻辑
Query query = new Query(queryText);
documents = hybridRagProcessor.process(query, options);
}
else if (retrievalStrategies != null && fusionStrategy != null) {
// 传统策略模式(向后兼容)
List<List<Document>> allResults = new ArrayList<>();
for (RetrievalStrategy strategy : retrievalStrategies) {
allResults.add(strategy.retrieve(queryText, options));
}
documents = fusionStrategy.fuse(allResults);
}
// 构建上下文
StringBuilder contextBuilder = new StringBuilder();
for (Document doc : documents) {
contextBuilder.append(doc.getText()).append("\n");
}
// 生成响应
Flux<ChatResponse> streamResult = ragAgent.prompt()
.messages(new UserMessage(contextBuilder.toString()))
.user(queryText)
.stream()
.chatResponse()
.timeout(Duration.ofSeconds(180))
.retry(2);
logger.info("RAG node produced a result.");
var generatedContent = StreamingChatGenerator.builder()
.startingNode("rag_llm_stream")
.startingState(state)
.mapResult(response -> Map.of("rag_content",
Objects.requireNonNull(response.getResult().getOutput().getText())))
.buildWithChatResponse(streamResult);
logger.info("RAG node produced a streaming result.");
Map<String, Object> updated = new HashMap<>();
updated.put("rag_content", generatedContent);
return updated;
}