LangGraph实战:手把手搭建一个可迭代的DeepResearch多智能体系统
引言:为什么我们需要 DeepResearch?
在大模型时代,LLM(大语言模型)虽然知识渊博,但存在两大致命缺陷:
- 知识滞后:训练数据截止,无法获取最新信息。
 - 幻觉问题:容易“一本正经地胡说八道”。
 
为解决这些问题,OpenAI 推出了 DeepResearch 功能,通过“多智能体协作 + 自我反思 + 互联网检索”的模式,实现高质量、可验证的长篇报告生成。
本文将带你从零开始,使用 LangGraph 搭建一个具备完整研究能力的 DeepResearch 系统,并逐行拆解核心代码,助你掌握多智能体系统的构建精髓。
🎬 运行效果预览
🧩 一、整体架构概览
本系统基于 LangGraph 构建了一个有状态的多节点工作流(StateGraph),其核心流程如下:
[Planner]
↓
[Researcher] → 并行搜索多个子问题
↓
[Analyzer] → 整合信息进行结构化分析
↓
[Writer] → 生成专业研究报告
↓
[Reflector] → 自我评估与反馈
↖_________↓_________↙
是否需要修改?
↓ 是
返回 [Researcher]
↓ 否
[END]系统具备以下特性:
- ✅ 任务分解:自动将复杂问题拆解为子问题
 - ✅ 并行检索:使用 Tavily 工具并发获取信息
 - ✅ 深度分析:结构化整合多源信息
 - ✅ 自我反思:可迭代优化报告质量
 - ✅ 流式输出:实时观察每一步执行过程
 
🧱 二、环境与依赖准备
1. 安装所需库
pip install langgraph langchain-openai tavily-python python-dotenv2. 配置 .env 文件
# 模型配置(以阿里云通义千问为例)
MODEL_NAME=qwen-plus
MODEL_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1
MODEL_API_KEY=sk-your-api-key
MODEL_TEMPERATURE=0.7
TOP_P=0.8
MAX_TOKENS=4000000
# Tavily 搜索 API
TAVILY_API_KEY=tvly-your-tavily-key
# 其他参数
STREAMING=True
MAX_RETRIES=2
REQUEST_TIMEOUT=120.0💡 提示:Tavily 是专为 AI Agent 设计的搜索引擎,支持 basic 和 deep 搜索模式,响应快且内容结构化。
🔍 三、核心模块拆解
步骤 1:初始化 LLM 与工具
from langchain_openai import ChatOpenAI
from tavily import TavilyClient
from dotenv import load_dotenv
import os
load_dotenv()
# 初始化大模型
llm = ChatOpenAI(
    model=os.getenv("MODEL_NAME", "qwen-plus"),
    temperature=float(os.getenv("MODEL_TEMPERATURE", 0.7)),
    base_url=os.getenv("MODEL_BASE_URL"),
    api_key=os.getenv("MODEL_API_KEY"),
    max_tokens=int(os.getenv("MAX_TOKENS")),
    timeout=float(os.getenv("REQUEST_TIMEOUT")),
    max_retries=int(os.getenv("MAX_RETRIES")),
    streaming=os.getenv("STREAMING", "True").lower() == "true",
)
# 初始化搜索工具
tavily = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))步骤 2:定义状态结构(AgentState)
from typing import TypedDict, List
class AgentState(TypedDict):
    question: str          # 原始问题
    plan: str              # 研究计划
    results: List[str]     # 搜索结果列表
    analysis: str          # 分析内容
    report: str            # 初步报告
    revision_notes: str    # 反馈意见
    final_report: str      # 最终报告(可选)
    iterations: int        # 当前迭代次数步骤 3:节点 1 - Planner(规划器)
def planner_node(state: AgentState):
    prompt = f"""
    请为以下研究问题制定一个分步计划,分解为3-5个子问题,并建议搜索关键词:
    问题:{state['question']}
    输出格式:
    1. 子问题1 - 关键词1, 关键词2
    2. 子问题2 - 关键词3, 关键词4
    ...
    """
    message = HumanMessage(content=prompt)
    plan = llm.invoke([message]).content
    return {"plan": plan}📌 功能:
将用户问题拆解为可执行的子任务。 输出格式规范,便于后续解析关键词。
✅ 示例输出:
- 什么是量子计算?- 量子比特, 叠加态, 量子门
 
- RSA加密原理是什么?- 公钥加密, 因数分解, 欧拉函数
 
- 量子算法如何破解RSA?- Shor算法, 量子傅里叶变换
 
步骤 4:节点 2 - Researcher(研究员)
调用Tavily互联网搜索信息
async def researcher_node_async(state: AgentState):
    lines = state["plan"].strip().split("\n")
    queries = [line.split("-")[1].strip() if"-"in line else line.strip() for line in lines]
    asyncdef search_query(query):
        try:
            result = tavily.search(query=query, search_depth="basic", include_raw_content=False)
            context = "\n".join([f"{r['title']}: {r['content']}"for r in result["results"]])
            return context
        except Exception as e:
            returnf"搜索失败: {str(e)}"
    results = await asyncio.gather(*[search_query(q) for q in queries])
    return {"results": results}步骤 5:节点 3 - Analyzer(分析器)
def analyzer_node(state: AgentState):
    context = "\n\n".join(state["results"])
    prompt = f"""
    请基于以下信息分析问题:{state['question']}
    搜索结果摘要:
    {context}
    请结构化输出:
    - 背景
    - 核心发现
    - 争议或不确定性
    - 数据/案例支持
    """
    message = HumanMessage(content=prompt)
    analysis = llm.invoke([message]).content
    return {"analysis": analysis}📌 作用:
- 整合多份搜索结果,提炼关键信息。
 - 结构化输出便于后续撰写报告。
 
步骤 6:节点 4 - Writer(写作者)
def writer_node(state: AgentState):
    prompt = f"""
    请撰写一篇专业的深度研究报告,主题:{state['question']}
    分析内容:
    {state['analysis']}
    要求:
    - 结构清晰:引言、研究方法、核心发现、讨论、结论
    - 语言专业,适合学术或商业场景
    - 使用中文
    - 在结尾列出参考来源
    """
    message = HumanMessage(content=prompt)
    report = llm.invoke([message]).content
    return {"report": report}步骤 7:节点 5 - Reflector(反思器)
def reflector_node(state: AgentState):
    prompt = f"""
    请评估以下研究报告的质量:
    问题:{state['question']}
    报告:{state['report']}
    请指出:
    1. 是否遗漏关键信息?
    2. 逻辑是否严密?
    3. 是否需要补充数据或案例?
    4. 建议如何改进?
    如果报告已足够完整,请返回"无需修改"。
    """
    message = HumanMessage(content=prompt)
    feedback = llm.invoke([message]).content
    new_iterations = state.get("iterations", 0) + 1
    return {"revision_notes": feedback, "iterations": new_iterations}📌 意义:
实现“自我反思”机制,提升报告质量。 为迭代优化提供依据。
步骤 8:条件边(Conditional Edge)
def should_continue(state: AgentState):
    notes = state.get("revision_notes", "")
    if "无需修改" in notes or state.get("iterations", 0) >= 1:
        return "end"
    else:
        return "revise"📌 逻辑说明:
若反馈包含“无需修改”或已迭代1次以上 → 结束 否则 → 返回 researcher 重新搜索补充信息
“🔁 支持最多1次迭代(可调),避免无限循环。
步骤 9:构建 LangGraph 工作流
async def create_deep_research_graph_async():
    workflow = StateGraph(AgentState)
    # 添加节点
    workflow.add_node("planner", planner_node)
    workflow.add_node("researcher", researcher_node_async)
    workflow.add_node("analyzer", analyzer_node)
    workflow.add_node("writer", writer_node)
    workflow.add_node("reflector", reflector_node)
    # 设置入口
    workflow.set_entry_point("planner")
    # 连接流程
    workflow.add_edge("planner", "researcher")
    workflow.add_edge("researcher", "analyzer")
    workflow.add_edge("analyzer", "writer")
    workflow.add_edge("writer", "reflector")
    # 条件跳转
    workflow.add_conditional_edges(
        "reflector", 
        should_continue, 
        {"revise": "researcher", "end": END}
    )
    # 编译图
    app = workflow.compile()
    return app步骤 10:异步流式运行与实时输出
async def run_async_research_stream(question: str):
    deep_researcher = await create_deep_research_graph_async()
    print("🔍 开始深度研究...")
    print(f"问题:{question}\n")
    config = {"recursion_limit": 50}
    current_iteration = 0
    async for step_output in deep_researcher.astream(
        {"question": question, "iterations": 0, "results": []}, 
        config
    ):
        for node_name, state in step_output.items():
            if"iterations"in state and state["iterations"] > current_iteration:
                current_iteration = state["iterations"]
                print(f"\n🔄 开始第 {current_iteration} 轮迭代...")
            print(f"\n--- 🧩 执行节点: {node_name} ---")
            if"plan"in state: print("📋 研究计划:\n", state["plan"])
            if"results"in state: 
                print("🔍 搜索结果摘录:")
                for i, r in enumerate(state["results"][:2]): 
                    print(f"  [{i+1}] {r[:300]}...")
            if"analysis"in state: 
                print("📊 分析结果:\n", state["analysis"][:600], "...")
            if"report"in state: 
                print("📝 初步报告:\n", state["report"][:600], "...")
            if"revision_notes"in state: 
                print("🔄 修订意见:\n", state["revision_notes"])
            if node_name == "reflector":
                if"无需修改"in state.get("revision_notes", ""):
                    print("\n✅ 研究完成,无需进一步修改")
                elif state.get("iterations", 0) >= 1:
                    print("\n⏹️ 达到最大迭代次数,结束研究")
    # 获取最终报告
    final_state = await deep_researcher.ainvoke(
        {"question": question, "iterations": 0, "results": []}, 
        config
    )
    print("\n" + "="*80)
    print("✅ 最终研究报告")
    print("="*80)
    print(final_state["report"])
    print("\n" + "="*80)
    print(f"📈 总迭代次数: {final_state['iterations']}")
    print("="*80)
    return final_state
if __name__ == "__main__":
    question = "量子计算对当前RSA加密体系的威胁有多大?未来应如何应对?"
    asyncio.run(run_async_research_stream(question))📌 运行效果示例:
🔍 开始深度研究...
问题:量子计算对当前RSA加密体系的威胁有多大?未来应如何应对?
--- 🧩 执行节点: planner ---
📋 研究计划:
1. 量子计算的基本原理是什么?- 量子比特, 叠加, 纠缠
2. RSA加密依赖的数学难题是什么?- 大数分解, 欧拉函数
3. Shor算法如何破解RSA?- 量子傅里叶变换, 周期查找
...
--- 🧩 执行节点: researcher ---
🔍 搜索结果摘录:
  [1] 量子计算原理简介: 量子比特可以同时处于0和1...
  [2] RSA加密安全性分析: 依赖大整数质因数分解的困难性...
--- 🧩 执行节点: analyzer ---
📊 分析结果:
- 背景:量子计算利用叠加态实现并行计算...
...
✅ 最终研究报告
==================================================
标题:量子计算对RSA加密的威胁与应对策略
引言:随着量子计算的发展,传统公钥加密面临严峻挑战...
...
==================================================
📈 总迭代次数: 1📚 四、总结
本文使用 LangGraph 搭建了一个可迭代的多智能体 DeepResearch 系统,完整实现了:
- ✅ 问题拆解(Planner)
 - ✅ 并行检索(Researcher + Tavily)
 - ✅ 信息整合(Analyzer)
 - ✅ 报告生成(Writer)
 - ✅ 自我反思与迭代优化(Reflector)
 
通过“规划 → 搜索 → 分析 → 写作 → 反思”的闭环流程,系统能生成高质量、可验证的研究报告,有效缓解大模型的知识滞后与幻觉问题。
推荐大家参考Langchain官方开源的DeepResearch项目:
https://github.com/langchain-ai/open_deep_research/tree/main
📚 项目介绍
🌈 项目亮点
- ✅ 集成 MCP 多智能体架构
 - ✅ 支持 Dify / LangChain / LlamaIndex / Ollama / vLLM / Neo4j
 - ✅ 前端采用 Vue3 + TypeScript + Vite5,现代化交互体验
 - ✅ 内置 ECharts / AntV 图表问答 + CSV 表格问答
 - ✅ 支持对接主流 RAG 系统 与 Text2SQL 引擎
 - ✅ 轻量级 Sanic 后端,适合快速部署与二次开发
 - ✅ 项目已被蚂蚁官方推荐收录
 

运行效果:

最后编辑:Jeebiz 更新时间:2025-10-21 09:18