从零开始搭建基于LangChain的流量风险审计系统

原文:https://mp.weixin.qq.com/s/vvyWi_BX5jB8q6_ko6NJrw

本文探讨基于Langchain生态的Agent、MCP开发,设计一套简单的流量风险审计系统,该系统通过四个模块实现从静态风险识别到动态漏洞验证的完整闭环。

数据流动如下:

总体流程:

  • 风险提取:使用正则规则扫描流量,标记高风险记录。
  • Agent判断:AI Agent分析风险流量,生成POC。
  • POC执行:通过MCP平台执行Curl命令模拟攻击。
  • 验证成功:SSRF靶场监听并记录是否触发漏洞。

技术栈:re、concurrent.futures、LangChain、FastMCP、BaseHTTPRequestHandler、异步/并发处理(asyncio + ThreadPoolExecutor)、安全考虑(命令验证、鉴权、超时)

AdvancedRiskTrafficAnalyzer

这个模块使用Python类实现流量风险检测,主要通过正则表达式匹配常见攻击模式(如SQL注入、RCE、SSRF等)。主要通过下面三个方式计算流量风险:置信度因子、风险评分权重、风险检测规则。简单记录实现代码如下:

以SQL类风险配置为例

'SQL_INJECTION': 0.80,
'SQL_INJECTION': {'base': 15, 'multiplier': 1.2, 'max_single': 30}
(r"(?i)\binsert\s+into\s+\w+\s*\([^)]*\)\s*values", 15, "SQL操作-INSERT语句")
 ...
combined_content = f"{decoded_url} {decoded_body} {user_agent} {headers}"

多层URL解码机制

def _decode_url(self, url: str) -> str:
    """多层URL解码"""
    try:
        decoded = url
        for _ in range(5):
            new_decoded = urllib.parse.unquote(decoded, errors='ignore')
            if new_decoded == decoded:
                break
            decoded = new_decoded
        return decoded
    except Exception:
        return url

通过ThreadPoolExecutor多线程执行

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_batch = {
            executor.submit(process_batch, batch): i 
            for i, batch in enumerate(batches)
        }

        for future in as_completed(future_to_batch): #字典在 Python 中是可迭代的,默认迭代其键
            batch_results = future.result()
            results.extend(batch_results)

AutonomousAuditAgent

这个模块构建AI Agent,使用LangChain和LangGraph自主决策:分析风险流量、生成POC、执行验证(最多3次poc优化尝试),输出严格JSON格式,简单记录实现代码如下:

Agent初始化

self.llm = ChatOpenAI(openai_api_base=...,openai_api_key=...,model=..., max_tokens=10240, temperature=0.3)
mcp_servers = {
                "curl_executor": {
                    "url": "",
                    "transport": "streamable_http",
                    "headers": {"Authorization": MCP_TOKEN}
                },
                "login_service": {
                    "url": "",
                    "transport": "streamable_http",
                    "headers": {"Authorization": API_KEY}
                }
            }
self.mcp_client = MultiServerMCPClient(mcp_servers)
tools = await asyncio.wait_for(self.mcp_client.get_tools(),timeout=30.0)
tools=tools+[localtool]
self.prompt = ChatPromptTemplate.from_messages([("system", 
"""
...
- SSRF漏洞验证: 必须使用http://.../[marker],并检查SSRF标记
- RCE类漏洞验证:必须通过使其执行curl http://.../[marker]命令,并检查SSRF标记
- SQL类漏洞验证:必须执行SQL语句查询数据库版本或者查询其他无害信息
...
"""),
 ("human", "{input}"),
("placeholder","{agent_scratchpad}")])
agent = create_openai_tools_agent(self.llm, tools, self.prompt)
self.agent_executor = AgentExecutor(agent=agent, tools=tools, max_iterations=10,handle_parsing_errors=True,verbose=True)
result = await asyncio.wait_for(self.agent_executor.ainvoke({"input": audit_request}),timeout=240.0)
output = result.get("output", "").strip()

多线程

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_row = {}
        for index, row in merged_df.iterrows():
            future = executor.submit(platform.audit_record_sync, row, index)
            future_to_row[future] = (index, row)

        for future in as_completed(future_to_row):
            result = future.result(timeout=300)
            audit_results.append(result)
create_openai_tools_agent/create_react_agent
特性 create_openai_tools_agent create_react_agent
底层机制 OpenAI 函数调用(JSON 输出) ReAct 框架(Thought-Action-Observation 循环)
模型依赖 需支持函数调用的 OpenAI 模型(如 GPT-3.5/4) 任意支持文本输出的 LLM
推理过程 隐式推理,模型内部决定工具调用 显式推理,输出详细的思考步骤
效率 更高(更少 token,单次调用为主) 较低(多轮迭代,更多 token)
灵活性 适合简单、结构化任务 适合复杂、多步推理任务
定制化 定制化较弱(依赖模型函数调用逻辑) 高度可定制(通过提示模板控制推理和工具使用)
输出格式 结构化 JSON(工具调用)、json.loads加载 文本(包含 Thought/Action/Observation)
调试透明性 较低(推理过程不透明) 较高(显式记录推理步骤)

curl-executor_mcp

MCP平台执行Curl命令,支持SSRF检查。使用FastMCP框架,提供工具接口。简单记录实现代码如下:

鉴权

def auth():
    request: Request = get_http_request()
    token = request.headers.get("Authorization", "Unknown")
    if token != mcp_token:
        raise HTTPException(status_code=401, detail="Unauthorized")

执行命令

safe_command = f"{command} --max-time {timeout} --connect-timeout 15 --compressed --show-error"
if self.validate_curl_command(command):
  result = subprocess.run(safe_command, shell=True, capture_output=True, text=True, timeout=timeout + 5)
  return {
                'success': result.returncode == 0,
                'execution_id': execution_id,
                'command': command,
                'safe_command': safe_command,
                'stdout': result.stdout,
                'stderr': result.stderr,
                'return_code': result.returncode,
                'execution_time': execution_time,
                'timestamp': datetime.now().isoformat()
            }

mcp框架

from fastmcp import FastMCP

sandbox = CurlSandbox()
# MCP工具函数
@mcp.tool()
def execute_curl_command(command: str, check_ssrf: bool = False, ssrf_marker: str = "") -> Dict[str, Any]:
    """
    执行curl命令并可选检查SSRF触发
    Args:
        command: curl命令字符串
        check_ssrf: 是否检查SSRF触发
        ssrf_marker: SSRF标记(如果为空则自动提取)
    Returns:
        执行结果字典
    """
   auth()
   ifnot command ornot isinstance(command, str):
        return {'success': False, 'error': '无效的curl命令'}

    result = sandbox.execute_curl(command)
    if check_ssrf:
        ifnot ssrf_marker:
            ssrf_marker = sandbox.extract_ssrf_marker(command)

        if ssrf_marker:
            ssrf_result = sandbox.ssrf_checker.check_ssrf_triggered(ssrf_marker)
            result['ssrf_check'] = ssrf_result

if __name__ == "__main__":
    try:
        mcp.run(transport="http", host="0.0.0.0", port=8000, path="/curlmcp")
    except KeyboardInterrupt:
        logger.info("服务停止")
    except Exception as e:
        logger.error(f"服务异常: {str(e)}")
        raise

SSRFReceiver

实现一个简化的SSRF(服务器端请求伪造)测试接收平台,实现如下功能

  • 监听和记录HTTP请求(包括GET、POST等方法),用于接收SSRF测试的连接。
  • 提供查询API,用于查看连接记录和统计信息。
  • 支持自动清理旧数据(每24小时一次)。
  • 提供简单的Web界面展示API文档。

主要是使用 BaseHTTPRequestHandler

作用:BaseHTTPRequestHandler 是一个抽象基类,用于处理单个 HTTP 请求。开发者通过继承该类并重写其方法来定义自定义的请求处理逻辑。

工作机制:

  • 每次客户端发起 HTTP 请求(如 GET、POST 等),HTTPServer 会创建一个 BaseHTTPRequestHandler 实例来处理该请求。
  • 它解析请求(方法、路径、头、数据等),并提供方法供开发者自定义响应。

典型用法:

  • 继承 BaseHTTPRequestHandler,重写 do_<METHOD> 方法(如 do_GET、do_POST)来处理特定 HTTP 方法的请求。
  • 使用提供的属性(如 self.path、self.headers)访问请求信息。
  • 通过 self.wfile 发送响应内容,通过 self.send_response 和 self.send_header 设置响应状态和头。

重写的方法:

  • do_GET、do_POST、do_PUT、do_DELETE、do_OPTIONS、do_HEAD:统一调用自定义的 handlerequest 处理各种 HTTP 方法。
  • log_message:pass禁用默认日志,使用 logging 模块记录。

使用的属性:

  • self.client_address:获取客户端 IP 和端口。
  • self.path:解析请求路径。
  • self.headers:获取请求头。
  • self.rfile:读取请求体。request_body = self.rfile.read(content_length).decode(‘utf-8’, errors=’ignore’)
  • self.wfile:发送响应内容。self.wfile.write(json.dumps(response_data, indent=2, ensure_ascii=False).encode(‘utf-8’))
  • self.server.server_port:获取服务器端口。

简单记录实现代码如下:

定时清理

    def start_cleanup_thread(self):
        """启动定时清理线程"""

        def cleanup_worker():
            whileTrue:
                try:
                    # 检查是否需要清理(24小时间隔)
                    if datetime.now() - self.last_cleanup > timedelta(hours=24):
                        self.cleanup_old_data()
                    time.sleep(3600)  # 每小时检查一次
                except Exception as e:
                    logging.error(f"清理线程错误: {e}")
                    time.sleep(3600)

        cleanup_thread = threading.Thread(target=cleanup_worker)
        cleanup_thread.daemon = True
        cleanup_thread.start()
        logging.info("定时清理线程已启动")

接受连接

class HTTPRequestHandler(BaseHTTPRequestHandler):
    """HTTP请求处理器 - 用于接收SSRF测试"""

    def __init__(self, *args, logger=None, **kwargs):
        self.logger = logger
        super().__init__(*args, **kwargs)

    def do_GET(self):
        self._handle_request('GET')

    def do_POST(self):
        self._handle_request('POST')

    def do_PUT(self):
        self._handle_request('PUT')

    def do_DELETE(self):
        self._handle_request('DELETE')

    def do_OPTIONS(self):
        self._handle_request('OPTIONS')

    def do_HEAD(self):
        self._handle_request('HEAD')

    def _handle_request(self, method):
        """处理HTTP请求"""
        client_ip = self.client_address[0]
        client_port = self.client_address[1]
        server_port = self.server.server_port

        # 获取URL路径
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path
        # 获取请求数据
        content_length = int(self.headers.get('Content-Length', 0))
        request_body = self.rfile.read(content_length).decode('utf-8', errors='ignore')

        # 构建请求信息
        request_info = {
            'method': method,
            'path': path,
            'query': urllib.parse.parse_qs(parsed_path.query),
            'body': request_body[:1000] if request_body else''# 限制body长度
        }

        # 记录连接
        if self.logger:
            self.logger.log_connection(
                source_ip=client_ip,
                source_port=client_port,
                target_port=server_port,
                path=path,
                request_data=json.dumps(request_info),
                headers=self.headers,
                user_agent=self.headers.get('User-Agent')
            )

        # 发送响应
        self.send_response(200)
        self.send_header('Content-type', 'application/json; charset=utf-8')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.end_headers()

        response_data = {
            'status': 'success',
            'message': 'SSRF received',
            'timestamp': datetime.now().isoformat(),
            'client_info': {
                'ip': client_ip,
                'port': client_port,
                'user_agent': self.headers.get('User-Agent')
            },
            'request_info': {
                'method': method,
                'path': path,
                'server_port': server_port
            }
        }

        self.wfile.write(json.dumps(response_data, indent=2, ensure_ascii=False).encode('utf-8'))

    def log_message(self, format, *args):
        """重写日志方法"""
        pass

查询连接

class QueryAPIHandler(BaseHTTPRequestHandler):
    """查询API处理器"""

    def __init__(self, *args, logger=None, auth_token=None, **kwargs):
        self.logger = logger
        self.auth_token = auth_token or"ssrf-auth"
        super().__init__(*args, **kwargs)

    def _check_auth(self):
        """检查鉴权"""
        auth_header = self.headers.get('Auth') or self.headers.get('Authorization')
        ...

    def do_GET(self):
        parsed_path = urllib.parse.urlparse(self.path)
        path = parsed_path.path
        query_params = urllib.parse.parse_qs(parsed_path.query)

        try:
            if path == '/':
                self.send_response(200)
                self.send_header('Content-type', 'text/html; charset=utf-8')
                self.send_header('Access-Control-Allow-Origin', '*')
                self.end_headers()
                self._handle_index()
            elif path.startswith('/api/'):
                is_auth, auth_message = self._check_auth()
                ifnot is_auth:
                    logging.warning(f"鉴权失败: {self.client_address[0]} - {auth_message}")
                    self._send_auth_error()
                    return

                self.send_response(200)
                self.send_header('Content-type', 'application/json; charset=utf-8')
                self.send_header('Access-Control-Allow-Origin', '*')
                self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
                self.send_header('Access-Control-Allow-Headers', 'Content-Type, Auth, Authorization')
                self.end_headers()

                if path == '/api/query':
                    self._handle_path_query(query_params)
                elif path == '/api/all':
                    self._handle_all_query(query_params)
                elif path == '/api/stats':
                    self._handle_stats_query()
                else:
                    self._send_error(404, "API endpoint not found")
            else:
                self.send_response(404)
                self.send_header('Content-type', 'application/json; charset=utf-8')
                self.send_header('Access-Control-Allow-Origin', '*')
                self.end_headers()
                self._send_error(404, "Endpoint not found")

                if path == '/api/query':
                    self._handle_path_query(query_params)

后记

以下表格总结了风险流量审计平台中所有导入的Python库及其用到的具体函数/方法,包含每个库的模块、作用、使用的函数及其功能描述。表格旨在为开发、复习和分享提供清晰参考。

库名称 模块/子模块 作用 使用的函数/方法 功能描述
os os.path 提供操作系统接口,用于文件路径操作 os.path.join 拼接文件路径,确保跨平台兼容(如生成CSV输出路径)
os os.path 提供操作系统接口,用于文件路径操作 os.path.exists 检查文件是否存在(如检查patterns_file是否有效)
datetime datetime 处理日期和时间,用于时间戳和清理 datetime.now 获取当前时间,用于记录连接时间戳或清理旧数据
datetime datetime 处理日期和时间,用于时间戳和清理 datetime.fromisoformat 解析ISO格式时间字符串(如SSRFReceiver中处理时间查询)
datetime timedelta 表示时间间隔,用于定时任务 timedelta 创建时间差(如SSRFReceiver中设置24小时清理间隔)
re re 正则表达式处理,用于风险流量模式匹配 re.compile 编译正则表达式,提高匹配效率(如SQL注入模式)
re re 正则表达式处理,用于风险流量模式匹配 re.findall 查找所有匹配的模式(如检测攻击特征)
re re 正则表达式处理,用于风险流量模式匹配 re.search 查找首个匹配模式(如提取SSRF标记)
concurrent.futures ThreadPoolExecutor 提供线程池,支持并行处理 ThreadPoolExecutor 创建线程池(如批量处理流量或并行审计)
concurrent.futures concurrent.futures 提供线程池,支持并行处理 as_completed 按完成顺序获取线程结果(如处理批次结果)
asyncio asyncio 异步编程,支持Agent异步执行 asyncio.wait_for 设置异步任务超时(如Agent审计单条记录)
asyncio asyncio 异步编程,支持Agent异步执行 asyncio.ensure_future 调度异步任务(如并行审计)
subprocess subprocess 执行外部命令,用于Curl沙箱 subprocess.run 运行Curl命令,捕获输出和错误(如execute_curl)
requests requests HTTP请求,用于SSRF检查和API调用 requests.get 发送GET请求(如查询SSRF触发状态)
urllib.parse urllib.parse URL解析和解码 urllib.parse.unquote 解码URL编码字符串(如decodeurl)
urllib.parse urllib.parse URL解析和解码 urllib.parse.parse_qs 解析URL查询参数(如SSRFReceiver处理API查询)
json json JSON数据处理,用于结果存储和解析 json.loads 解析JSON字符串(如提取Agent输出)
json json JSON数据处理,用于结果存储和解析 json.dump 将数据写入JSON文件(如SSRFReceiver保存连接记录)
time time 时间操作,用于清理线程睡眠 time.sleep 暂停执行(如SSRFReceiver清理线程等待1小时)
threading threading 线程管理,用于SSRFReceiver并发安全 threading.Lock 创建锁,确保连接记录线程安全
threading threading 线程管理,用于SSRFReceiver并发安全 threading.Thread 创建守护线程(如定时清理旧数据)
http.server http.server 构建HTTP服务器,用于SSRF接收 HTTPServer 创建HTTP服务器,监听SSRF请求
http.server http.server 构建HTTP服务器,用于SSRF接收 BaseHTTPRequestHandler 基类,用于自定义SSRF查询处理
csv csv 处理CSV文件,用于流量输入/输出 csv.DictReader 读取CSV为字典列表(如read_traffic_from_csv_optimized)
csv csv 处理CSV文件,用于流量输入/输出 csv.DictWriter 写入字典列表到CSV(如export_results_to_csv_enhanced)
pandas pandas 数据处理,用于合并和分析数据 pandas.DataFrame 创建数据框(如autonomous_parallel_audit处理merged_df)
pandas pandas.DataFrame 数据处理,用于合并和分析数据 DataFrame.iterrows 迭代数据框行(如并行审计记录)
base64 base64 Base64编码解码,用于流量内容处理 base64.b64decode 解码Base64字符串(如decodecontent)
binascii binascii 十六进制解码,用于流量内容处理 binascii.unhexlify 解码Hex字符串(如decodecontent)
langchain langchain.chat_models LLM集成,用于Agent初始化 BaseChatOpenAI 初始化OpenAI模型(如gpt-4.1-mini)
langchain langchain.prompts 提示模板,用于Agent指令 ChatPromptTemplate.from_messages 定义Agent提示(如指定JSON输出格式)
langchain langchain.agents Agent框架,支持ReAct循环 create_openai_tools_agent 创建工具调用Agent
langchain langchain.agents Agent框架,支持ReAct循环 AgentExecutor 执行Agent任务,限制迭代次数
fastmcp fastmcp MCP框架,提供工具接口 @mcp.tool 装饰器,定义MCP工具(如execute_curl_command)
fastmcp fastmcp MCP框架,提供工具接口 MultiServerMCPClient 初始化MCP客户端,连接多个服务
作者:Jeebiz  创建时间:2025-10-19 11:54
最后编辑:Jeebiz  更新时间:2025-10-19 12:18