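Contents
6.2.2 Tool Learning and Agent Architectures (Tool Learning & Agents)
Part 1: Principles in Detail
Part 2: Structured Pseudocode
Part 3: Code Implementation
Script 6.2.2.1: Function-Calling Agent and Accuracy Visualization
Script 6.2.2.2: ReAct Loop and HotpotQA Evaluation
Script 6.2.2.3: Multi-Agent Collaboration System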
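6.2.2.1 Function Calling and JSON Schema Enforcement
Tool learning is the core mechanism through which foundation models interact with external environments; its theoretical basis comes from the survey published by Qin et al. in 2023. The field studies how to make large language models (LLMs) understand, select, and execute external tools and thereby extend what they can do. Function calling is a concrete realization of tool learning: tool interfaces are exposed to the model through structured descriptions, so the model can emit executable operation instructions alongside its natural-language response.
A tool description typically follows the JSON Schema specification and contains the tool name, a functional description, parameter types, and constraints. When the LLM receives a user query, it first performs intent recognition and decides which tool to use, then enters the parameter generation stage. To ensure the output satisfies the predefined schema, modern systems use constrained decoding, which masks tokens that would violate the grammar during decoding and thereby forces a structurally valid JSON object. An alternative path is low-rank adaptation (LoRA) fine-tuning: an adapter is trained on a tool-calling dataset so that the model internalizes the schema constraints without any change to the base model's parameters.
Constrained decoding relies on compiling a finite-state automaton (FSA) or a context-free grammar (CFG) at run time. Each time the model generates a token, the decoder queries the current grammar state and keeps only the candidate tokens that leave the partial sequence syntactically valid. This avoids the cost of post-hoc validation, because a single generation pass already satisfies the schema. For complex nested schemas, an incremental version of a recursive-descent parser maintains the parse stack during generation and adjusts the set of available tokens at each step.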
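The masking loop described above can be sketched in a few lines. This is a toy illustration, not a production decoder: the vocabulary, the hand-written transition table `TRANSITIONS` (which accepts only strings shaped like `{"n":<digits>}`), and the `fake_logits` scorer are all hypothetical stand-ins for an automaton compiled from a real JSON Schema and for real model logits.

```python
import json
import math
from typing import Callable, Dict, List

# Toy automaton for the fixed shape {"n":<digits>}.
# Each state maps every legal next token to the state it leads to.
TRANSITIONS: Dict[int, Dict[str, int]] = {
    0: {"{": 1},
    1: {'"n"': 2},
    2: {":": 3},
    3: {d: 4 for d in "0123456789"},
    4: {**{d: 4 for d in "0123456789"}, "}": 5},
}
TERMINAL = 5
VOCAB: List[str] = ["{", "}", '"n"', ":", "hello", ","] + list("0123456789")


def constrained_decode(logits_fn: Callable[[List[str]], Dict[str, float]],
                       max_len: int = 16) -> str:
    """Greedy decoding restricted to tokens the automaton allows."""
    state, out = 0, []
    while state != TERMINAL and len(out) < max_len:
        logits = logits_fn(out)
        allowed = TRANSITIONS[state]  # the mask: only legal tokens are candidates
        best = max(allowed, key=lambda tok: logits.get(tok, -math.inf))
        out.append(best)
        state = allowed[best]
    return "".join(out)


def fake_logits(prefix: List[str]) -> Dict[str, float]:
    """Stand-in for a model: it always prefers the illegal token 'hello'."""
    scores = {tok: 0.0 for tok in VOCAB}
    scores["hello"] = 9.0
    n_digits = sum(tok.isdigit() for tok in prefix)
    if n_digits == 0:
        scores["4"] = 2.0
    elif n_digits == 1:
        scores["2"] = 2.0
    else:
        scores["}"] = 2.0
    return scores


text = constrained_decode(fake_logits)
print(text, json.loads(text))
```

Even though the scorer ranks `hello` highest at every step, the mask never offers it as a candidate, so the result parses as JSON without any post-hoc repair.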
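An agent system that supports multi-tool dispatch needs a tool registry that maintains the set of available tools, plus a description synthesis module that converts tool metadata into a prompt format the model can understand. The architecture has four core components: a tool selector, a parameter generator, an execution scheduler, and a result parser. The tool selector retrieves the most relevant subset of candidate tools by computing semantic similarity with the query; the parameter generator produces structured output under the selected tool's schema constraints; the execution scheduler handles external API calls and manages asynchronous callbacks; the result parser converts tool return values into natural-language descriptions that feed the next round of reasoning.
Accuracy optimization strategies cover training data augmentation, negative-sample mining, and dynamic few-shot example retrieval. Adding samples of erroneous call patterns to the training set teaches the model to recognize boundary cases; dynamic example retrieval pulls demonstrations that are semantically similar to the current query from a library of tool-use cases, which strengthens in-context learning.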
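Dynamic few-shot retrieval can be illustrated with a very small example. The sketch below assumes a bag-of-words cosine similarity in place of a learned embedding model, and the `EXAMPLE_BANK` contents, `retrieve_examples`, and `build_prompt` are hypothetical names introduced only for illustration.

```python
import math
from collections import Counter
from typing import List, Tuple

# Hypothetical library of (query, tool call) demonstrations.
EXAMPLE_BANK: List[Tuple[str, str]] = [
    ("calculate 3 times 7", '{"tool": "calculator", "expression": "3 * 7"}'),
    ("translate hello to french", '{"tool": "translate", "text": "hello", "target_lang": "fr"}'),
    ("search for the capital of peru", '{"tool": "search", "query": "capital of peru"}'),
]


def bow(text: str) -> Counter:
    """Bag-of-words vector; a real system would use an embedding model here."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[w] * b[w] for w in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def retrieve_examples(query: str, k: int = 2) -> List[Tuple[str, str]]:
    """Return the k demonstrations most similar to the query."""
    q = bow(query)
    ranked = sorted(EXAMPLE_BANK, key=lambda ex: cosine(q, bow(ex[0])), reverse=True)
    return ranked[:k]


def build_prompt(query: str, k: int = 2) -> str:
    """Prepend the retrieved demonstrations to the query as few-shot context."""
    shots = [f"Q: {q}\nA: {a}" for q, a in retrieve_examples(query, k)]
    shots.append(f"Q: {query}\nA:")
    return "\n\n".join(shots)


print(build_prompt("translate goodbye to french", k=1))
```

The retrieval step is the only part that changes per query; the prompt template stays fixed, which keeps the approach independent of any particular model.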
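$$T = \{t_1, t_2, \ldots, t_n\}, \quad t_i = (\text{name}_i, \text{desc}_i, \text{schema}_i)$$
$$P(a_t \mid q, T) = \operatorname{softmax}\big(W \cdot \text{LLM}(q, T) + b\big)$$
constrained_decode(schema, s…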
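6.2.2.2 ReAct Loop Implementation
The ReAct (Reasoning + Acting) framework was proposed by Yao et al. in 2023. Its core idea is to interleave chain-of-thought reasoning with action execution, forming a closed loop in which cognition and behavior cooperate. Unlike the traditional separated mode of thinking first and acting afterwards, ReAct iteratively generates a reasoning trace (Thought) and an action instruction (Action) at every decision step, and builds a dynamic context from environment feedback (Observation).
The architecture is grounded in the dual-process theory of cognitive science and mimics the heuristic path humans follow when solving problems: facing a complex query, the agent first reasons to identify the information gap, executes an external action to obtain the missing data, revises its reasoning path based on the observation, and repeats until the goal is reached. This interleaved generation mitigates long-range dependency problems and improves interpretability, because every reasoning step is tracked explicitly.
Long-context management is a key engineering challenge for ReAct. As the number of interaction rounds grows, the history trajectory grows linearly and may eventually exceed the model's context window. A truncation strategy must preserve the decision-critical path and discard redundant information. Common methods include scoring sentence importance by information entropy and keeping the high-entropy (information-rich) reasoning steps; compressing early rounds with a summarization model; and applying a sliding window that keeps only the most recent k rounds as working memory while moving earlier history into an external vector store.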
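Of the strategies listed above, the sliding window combined with a summary of older rounds is the simplest to sketch. The example below is a minimal illustration and does not implement entropy scoring; `truncate_trajectory` and the default one-line summarizer are hypothetical, and a real system would likely call a summarization model instead.

```python
from typing import Callable, List, Optional, Tuple

Step = Tuple[str, str, str]  # (thought, action, observation)


def default_summary(steps: List[Step]) -> str:
    """Cheap stand-in for a summarization model: keep action and a short observation prefix."""
    return "; ".join(f"{action} -> {obs[:40]}" for _, action, obs in steps)


def truncate_trajectory(
    steps: List[Step],
    window: int = 3,
    summarize: Optional[Callable[[List[Step]], str]] = None,
) -> Tuple[str, List[Step]]:
    """Keep the last `window` steps verbatim and compress everything older into one string."""
    if window < 1:
        raise ValueError("window must be at least 1")
    if len(steps) <= window:
        return "", list(steps)
    old, recent = steps[:-window], steps[-window:]
    return (summarize or default_summary)(old), list(recent)


trajectory: List[Step] = [
    ("t1", "Search[a]", "obs a"),
    ("t2", "Search[b]", "obs b"),
    ("t3", "Search[c]", "obs c"),
    ("t4", "Search[d]", "obs d"),
    ("t5", "Finish[x]", "done"),
]
summary, recent = truncate_trajectory(trajectory, window=2)
print(summary)
print(recent)
```

The summary string would be placed at the start of the prompt, followed by the verbatim recent steps, so the context length stays bounded by the window size plus the summary length.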
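In multi-hop question answering (multi-hop QA) scenarios such as HotpotQA, ReAct has a clear advantage. By tracking the reasoning chain explicitly, the system identifies relations that span documents and decides its retrieval strategy dynamically. Compared with pure chain-of-thought (CoT), ReAct can verify factual assumptions in the middle of reasoning and correct a wrong premise through external knowledge retrieval, which prevents errors from accumulating.
Interleaved generation requires the model to follow the output format strictly; the structured Thought-Action-Observation triple is usually obtained through prompt engineering or fine-tuning. The Thought segment states the reasoning in free text, the Action segment calls an external tool using a fixed syntax, and the Observation segment is filled in by the environment execution module, which closes the feedback loop.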
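A small parser makes the format requirement concrete. The sketch below assumes the `Tool[argument]` action syntax (for example `Search[...]` or `Finish[...]`); the `ParsedStep` type and `parse_step` function are hypothetical names, and a production parser would need to handle more malformed cases.

```python
import re
from typing import NamedTuple, Optional


class ParsedStep(NamedTuple):
    thought: str
    tool: str
    argument: str


# Thought is free text; Action must look like Tool[argument] at the end of the output.
STEP_RE = re.compile(
    r"Thought:\s*(?P<thought>.*?)\s*Action:\s*(?P<tool>\w+)\[(?P<arg>.*?)\]\s*$",
    re.DOTALL,
)


def parse_step(text: str) -> Optional[ParsedStep]:
    """Return the parsed step, or None when the model output violates the format."""
    match = STEP_RE.search(text.strip())
    if match is None:
        return None
    return ParsedStep(match["thought"], match["tool"], match["arg"])


step = parse_step("Thought: I need the author.\nAction: Search[The Old Man and the Sea]")
print(step)
print(parse_step("I will just answer directly."))
```

Returning `None` on a format violation lets the calling loop decide whether to re-prompt the model or abort, instead of executing a half-parsed action.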
$$\tau = [(r_1, a_1, o_1), (r_2, a_2, o_2), \ldots, (r_T, a_T, o_T)]$$
a_t = π_θ(q, τ…
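6.2.2.3 Multi-Agent Collaboration
Multi-agent collaboration architectures trace their theoretical roots to distributed artificial intelligence and multi-agent systems (MAS) research. The Generative Agents architecture proposed by Park et al. in 2023 demonstrated simulated social behavior of autonomous agents in a virtual environment, while the AutoGen framework systematized the conversational programming paradigm and established engineering practice for collaboration between agents.
The core of the architecture is the design of the agent communication protocol. It borrows the primitives of the Agent Communication Language (ACL) and defines speech acts such as Inform, Request, and Propose. These primitives form the basis of message passing between agents and support information exchange, task delegation, and negotiated decisions. A protocol implementation needs a message type identifier, sender and receiver addresses, a content payload, and conversation context management.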
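The message fields listed above map naturally onto a small data structure. The sketch below is one possible shape, not a standard API: `Performative`, `Message`, and `MessageBus` are hypothetical names, and only the three speech acts mentioned in the text are modeled.

```python
import itertools
from collections import defaultdict, deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Deque, Dict, Optional


class Performative(Enum):
    INFORM = "inform"
    REQUEST = "request"
    PROPOSE = "propose"


_ids = itertools.count(1)


@dataclass(frozen=True)
class Message:
    sender: str
    receiver: str
    performative: Performative
    content: str
    conversation_id: str  # ties a message to its conversation context
    msg_id: int = field(default_factory=lambda: next(_ids))

    def reply(self, performative: Performative, content: str) -> "Message":
        """Build an answer that swaps the addresses and stays in the same conversation."""
        return Message(self.receiver, self.sender, performative, content, self.conversation_id)


class MessageBus:
    """One FIFO inbox per agent name."""

    def __init__(self) -> None:
        self._inboxes: Dict[str, Deque[Message]] = defaultdict(deque)

    def send(self, msg: Message) -> None:
        self._inboxes[msg.receiver].append(msg)

    def receive(self, agent: str) -> Optional[Message]:
        box = self._inboxes[agent]
        return box.popleft() if box else None


bus = MessageBus()
bus.send(Message("planner", "executor", Performative.REQUEST, "implement two_sum", "conv-1"))
request = bus.receive("executor")
bus.send(request.reply(Performative.INFORM, "def two_sum(nums, target): ..."))
print(bus.receive("planner"))
```

Making `Message` immutable means a message cannot be altered after it is queued, which simplifies reasoning about what each agent actually saw.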
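Role specialization improves overall system effectiveness by decoupling functions. The Planner sets the high-level strategy and decomposes a complex goal into a sequence of executable subtasks; the Executor concentrates on concrete tool calls and environment interaction and handles real-time data; the Critic monitors quality and checks consistency, identifying logical gaps and factual errors. Together the three roles form a produce-inspect-refine closed loop for quality control.
Agent state management uses a hybrid of a shared blackboard architecture and point-to-point message passing. The global blackboard holds common knowledge and task state and supports asynchronous broadcast; private channels are used for point-to-point coordination and protect sensitive intermediate results. The state synchronization mechanism handles concurrent conflicts and ensures consistency through optimistic locking or vector clocks.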
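Optimistic locking on a blackboard can be sketched with a per-key version counter. This is a minimal single-process illustration under the assumption that agents run as threads; the `Blackboard` class and its `read`/`write` methods are hypothetical, and vector clocks are not covered here.

```python
import threading
from typing import Any, Dict, Tuple


class Blackboard:
    """Shared key-value store where every key carries a version number."""

    def __init__(self) -> None:
        self._data: Dict[str, Tuple[int, Any]] = {}
        self._lock = threading.Lock()

    def read(self, key: str) -> Tuple[int, Any]:
        """Return (version, value); unknown keys read as version 0 with value None."""
        with self._lock:
            return self._data.get(key, (0, None))

    def write(self, key: str, value: Any, expected_version: int) -> bool:
        """Commit only if nobody else has written since the caller's read."""
        with self._lock:
            current, _ = self._data.get(key, (0, None))
            if current != expected_version:
                return False  # conflict: caller should re-read and retry
            self._data[key] = (current + 1, value)
            return True


board = Blackboard()
version, _ = board.read("status")
print(board.write("status", "pending", version))   # first writer wins
print(board.write("status", "stale", version))     # second writer used an old version
print(board.read("status"))
```

A rejected write costs only a re-read and retry, so agents never hold a lock while they are waiting on a slow model call.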
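In code generation tasks, a multi-agent system improves its ability to solve complex problems through division of labor. The coding agent concentrates on algorithm implementation and syntactic correctness, the testing agent generates validation cases and checks boundary conditions, and the review agent assesses code style, security, and maintainability. The three agents refine the solution step by step through an iterative feedback loop until it passes all acceptance criteria.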
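$$\mathcal{A} = \{A_1, A_2, \ldots, A_m\}, \quad A_i = (\text{role}_i, \text{state}_i, \text{policy}_i)$$
$$M = (\text{sender}, \text{receiver}, \text{performative}, \text{content}, \phi)$$
$$G(\text{task}) = \arg\min_{\{a_i\}} \sum_{i=1}^{m} C(A_i, a_i) + \lambda D(\{a_i\})$$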
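6.2.2.4 Tool Creation and Code as Tool
Tool creation marks the evolution of an agent from tool user to tool maker; its theoretical basis comes from the Voyager framework proposed by Wang et al. in 2023. The paradigm removes the restriction to a predefined tool set and lets the agent write executable code on demand, building a personalized skill library.
The skill library uses a modular storage architecture in which each skill is a structured record containing the code implementation, a natural-language description, an embedding vector, and metadata. Skill creation has four stages: requirement analysis, code generation, safety checking, and functional verification. The agent first analyzes the gap between the current task and its existing capabilities to determine the new function it needs; it then generates a Python function that implements the function, based on environment observations and the goal description; it verifies syntactic correctness and functional conformance through sandboxed execution; finally, it embeds the description so the skill can be retrieved.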
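The checking and verification stages can be sketched with the standard `ast` module. The example below is an illustration under loud assumptions: `create_skill`, `is_safe`, and the `Skill` record are hypothetical, the "safety check" only rejects import statements, and running code with a reduced `__builtins__` dictionary is not a real sandbox. A deployment would need process-level isolation.

```python
import ast
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class Skill:
    name: str
    code: str
    description: str
    func: Callable[..., Any]


def is_safe(tree: ast.AST) -> bool:
    """Toy safety check: reject any import statement. Not a security boundary."""
    return not any(isinstance(node, (ast.Import, ast.ImportFrom)) for node in ast.walk(tree))


def create_skill(
    name: str,
    code: str,
    description: str,
    verify: Callable[[Callable[..., Any]], bool],
    library: Dict[str, Skill],
) -> Optional[Skill]:
    """Syntax check -> safety check -> restricted exec -> functional verification -> store."""
    try:
        tree = ast.parse(code)
    except SyntaxError:
        return None
    if not is_safe(tree):
        return None
    namespace: Dict[str, Any] = {"__builtins__": {"range": range, "len": len, "sum": sum}}
    try:
        exec(compile(tree, "<skill>", "exec"), namespace)
        func = namespace.get(name)
        if not callable(func) or not verify(func):
            return None
    except Exception:
        return None
    skill = Skill(name, code, description, func)
    library[name] = skill
    return skill


library: Dict[str, Skill] = {}
good = "def double(x):\n    return x * 2\n"
print(create_skill("double", good, "Double a number", lambda f: f(3) == 6, library) is not None)
print(create_skill("bad", "def bad(x) return x", "Broken syntax", lambda f: True, library))
print(sorted(library))
```

Only code that passes every stage reaches the library, so later retrieval never returns a skill that failed its own verification.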
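The code-as-tool paradigm exploits the expressiveness and generality of programming languages and reduces tool creation to a code generation problem. Compared with traditional tools that have fixed interfaces, code tools are Turing-complete and can express complex control flow and data transformations. Skill composition is realized through chains of function calls: a new skill can reuse existing skills as subroutines, which produces a hierarchical capability system.
Vector retrieval supports skill recall based on semantic similarity. When the agent faces a new task, the system computes the embedding similarity between the task description and every entry in the skill library and returns the top-K relevant skills as context examples. This retrieval-augmented generation (RAG) pattern lets the agent accumulate experience, avoid reinventing the wheel, and keep learning and improving itself.
In open-world environments such as Minecraft item-crafting tasks, a tool-creating agent observes the environment state, identifies missing crafting recipes, and automatically generates code tools that look up crafting paths or execute gathering strategies. As exploration proceeds, the skill library keeps growing and the agent gradually masters the full chain from basic resource gathering to complex item manufacturing.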
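$$\mathcal{S} = \{(c_i, d_i, e_i, \text{meta}_i)\}, \quad e_i = \text{Embed}(d_i)$$
$$\text{create}(obs, goal) = \text{LLM}_{code}\big(\text{prompt}_{voyager}(obs, goal, \mathcal{S}_{relevant})\big)$$
$$\text{retrieve}(q, \mathcal{S}) = \operatorname*{top\text{-}k}_{s \in \mathcal{S}} \ \text{sim}(\text{Embed}(q), e_s)$$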
6.2.2.1 Function Calling and JSON Schema Enforcement
\begin{algorithm}
\caption{Constrained Tool Selection and Parameter Generation}
\begin{algorithmic}
\Require Query $q$, Tool Registry $T$, JSON Schema $S$
\Ensure Tool call $c = (t, params)$ or $\emptyset$
\State $T_{cand} \leftarrow \text{Filter}(T, q)$ \Comment{Semantic relevance pre-filtering}
\State $scores \leftarrow \{\}$
\For{$t \in T_{cand}$}
  \State $s_t \leftarrow \text{LLM}(q, desc_t)$ \Comment{Compatibility scoring}
  \State $scores \leftarrow scores \cup \{(t, s_t)\}$
\EndFor
\State $t^* \leftarrow \arg\max_{(t, s) \in scores} s$ \Comment{Tool selection}
\If{$s_{t^*} < \tau_{threshold}$}
  \State \Return $\emptyset$ \Comment{No suitable tool found}
\EndIf
\State $params \leftarrow \text{ConstrainedGenerate}(q, t^*, S_{t^*})$ \Comment{Schema-compliant generation}
\Function{ConstrainedGenerate}{$q, t, schema$}
  \State $sequence \leftarrow []$
  \State $state \leftarrow \text{CompileFSA}(schema)$ \Comment{Initialize finite state automaton}
  \While{$\neg state.\text{isTerminal}()$}
    \State $valid \leftarrow state.\text{validTokens}()$
    \State $probs \leftarrow \text{LLM.nextTokenProbs}(q, sequence)$
    \State $v^* \leftarrow \arg\max_{v \in valid} probs[v]$
    \State $sequence \leftarrow sequence \oplus [v^*]$
    \State $state \leftarrow state.\text{transition}(v^*)$
  \EndWhile
  \State \Return $\text{ParseJSON}(sequence)$
\EndFunction
\State \Return $(t^*, params)$
\end{algorithmic}
\end{algorithm}
\begin{algorithm}
\caption{Tool Execution and Accuracy Evaluation}
\begin{algorithmic}
\Require Dataset $D = \{(q_i, t_i^{gold}, p_i^{gold})\}_{i=1}^{N}$, Agent $M$
\Ensure Accuracy $acc$
\State $correct \leftarrow 0$
\For{$i \leftarrow 1$ to $N$}
  \State $c_i \leftarrow M(q_i)$ \Comment{Predicted tool call}
  \If{$c_i = \emptyset$}
    \State $match \leftarrow (t_i^{gold} = \emptyset)$
  \Else
    \State $(t_i^{pred}, p_i^{pred}) \leftarrow c_i$
    \State $match \leftarrow (t_i^{pred} = t_i^{gold}) \land (p_i^{pred} \approx p_i^{gold})$
  \EndIf
  \If{$match$}
    \State $correct \leftarrow correct + 1$
  \EndIf
\EndFor
\State $acc \leftarrow correct / N$
\State \Return $acc$
\end{algorithmic}
\end{algorithm}
6.2.2.2 ReAct Loop Implementation
\begin{algorithm}
\caption{ReAct Interleaved Reasoning and Acting}
\begin{algorithmic}
\Require Query $q$, Max steps $T_{max}$, Environment $\mathcal{E}$
\Ensure Answer $a$ or failure $\bot$
\State $\tau \leftarrow []$ \Comment{Trajectory history}
\State $step \leftarrow 0$
% The loop condition and the context initialization are truncated in the source; the next two lines are inferred.
\While{$step < T_{max}$}
  \State $context \leftarrow q \oplus \tau$
  \State $thought \leftarrow \text{LLM}_{gen}(context, \text{prompt}_{think})$ \Comment{Reasoning step}
  \State $context \leftarrow context \oplus thought$
  \State $action \leftarrow \text{LLM}_{gen}(context, \text{prompt}_{act})$ \Comment{Action generation}
  \State $context \leftarrow context \oplus action$
  \If{$action.type = \text{Terminate}$}
    \State \Return $action.content$ \Comment{Final answer}
  \EndIf
  \State $obs \leftarrow \mathcal{E}.\text{execute}(action)$ \Comment{Environmental feedback}
  \State $\tau \leftarrow \tau \oplus [(thought, action, obs)]$
  \State $step \leftarrow step + 1$
  \If{$|\tau| > W_{window}$}
    \State $\tau \leftarrow \text{SmartTruncate}(\tau)$ \Comment{Entropy-based compression}
  \EndIf
\EndWhile
\State \Return $\bot$ \Comment{Max steps exceeded}
\end{algorithmic}
\end{algorithm}
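\begin{algorithm}
\caption{Long Context Truncation Strategy}
\begin{algorithmic}
\Require Trajectory $\tau$, Window size $W$, Compressor $C$
\Ensure Compressed trajectory $\tau'$
\If{$|\tau| \leq W$}
  \State \Return $\tau$
\EndIf
\State $importance \leftarrow []$
\For{$i \leftarrow 1$ to $|\tau|$}
  \State $e_i \leftarrow \text{Entropy}(thought_i) + \lambda \cdot \text{IG}(action_i, obs_i)$
  \State $importance \leftarrow importance \oplus [(i, e_i)]$
\EndFor
\State $keep \leftarrow \text{TopK}(importance, k = W/2)$ \Comment{Keep high-entropy steps}
% The second condition of the set below is truncated in the source.
\State $summarize \leftarrow \{ i \mid i \notin keep \land i \ldots \}$
% The line defining $summary$ is lost in the source; the next line is inferred from the Compressor $C$ input.
\State $summary \leftarrow C(\{(thought_i, action_i, obs_i) \mid i \in summarize\})$
\State $\tau' \leftarrow [summary] \oplus \{(thought_i, action_i, obs_i) \mid i \in keep\}$
\State \Return $\tau'$
\end{algorithmic}
\end{algorithm}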
\begin{algorithm}
\caption{HotpotQA Evaluation: ReAct vs CoT}
\begin{algorithmic}
\Require Dataset $D_{hotpot}$, ReAct agent $M_R$, CoT agent $M_C$
\Ensure F1 scores $F1_R, F1_C$, API counts $Api_R, Api_C$
\State $metrics_R, metrics_C \leftarrow [], []$
\For{$d \in D_{hotpot}$}
  \State $(ans_R, trace_R) \leftarrow M_R(d.\text{question})$
  \State $(ans_C, trace_C) \leftarrow M_C(d.\text{question})$
  \State $f1_R \leftarrow \text{ComputeF1}(ans_R, d.\text{answer})$
  \State $f1_C \leftarrow \text{ComputeF1}(ans_C, d.\text{answer})$
  \State $api_R \leftarrow |\{a \in trace_R \mid a.type = \text{Search}\}|$
  \State $api_C \leftarrow 0$ \Comment{CoT uses no external tools}
  \State $metrics_R \leftarrow metrics_R \oplus (f1_R, api_R)$
  \State $metrics_C \leftarrow metrics_C \oplus (f1_C, api_C)$
\EndFor
\State $F1_R \leftarrow \frac{1}{|D|} \sum f1_R, \quad Api_R \leftarrow \frac{1}{|D|} \sum api_R$
\State $F1_C \leftarrow \frac{1}{|D|} \sum f1_C, \quad Api_C \leftarrow 0$
\State \Return $(F1_R, F1_C, Api_R, Api_C)$
\end{algorithmic}
\end{algorithm}
6.2.2.3 Multi-Agent Collaboration
\begin{algorithm}
\caption{ACL-Based Multi-Agent Communication Protocol}
\begin{algorithmic}
\Require Task $task$, Agent set $\mathcal{A} = \{A_1, A_2, A_3\}$ with roles $\{\text{Planner}, \text{Executor}, \text{Critic}\}$
\Ensure Solution $sol$ or failure $\bot$
\State $blackboard \leftarrow \{\text{task}: task, \text{status}: \text{pending}, \text{iter}: 0\}$
\State $inbox \leftarrow \{\}$ \Comment{Message queues}
\State $max\_iter \leftarrow 10$
% The loop condition and the Planner step are truncated in the source; the next three lines are inferred by symmetry with the Executor step.
\While{$blackboard.\text{iter} < max\_iter$}
  \State $msg_P \leftarrow A_{Planner}.\text{process}(inbox[A_{Planner}], blackboard)$
  \State $plan \leftarrow msg_P.\text{content}$
  \State $inbox[A_{Executor}] \leftarrow inbox[A_{Executor}] \oplus [(A_{Planner}, \text{Request}, plan)]$
  \State $msg_E \leftarrow A_{Executor}.\text{process}(inbox[A_{Executor}], blackboard)$
  \State $code \leftarrow msg_E.\text{content}$
  \State $exec\_result \leftarrow \text{SandboxRun}(code)$
  \State $inbox[A_{Critic}] \leftarrow inbox[A_{Critic}] \oplus [(A_{Executor}, \text{Inform}, exec\_result)]$
  \State $msg_C \leftarrow A_{Critic}.\text{evaluate}(inbox[A_{Critic}], blackboard)$
  \State $review \leftarrow msg_C.\text{content}$
  \If{$review.\text{pass} = \text{True}$}
    \State $blackboard.\text{status} \leftarrow \text{completed}$
    \State \Return $code$
  \Else
    \State $inbox[A_{Planner}] \leftarrow inbox[A_{Planner}] \oplus [(A_{Critic}, \text{Propose}, review.\text{feedback})]$
    \State $blackboard.\text{iter} \leftarrow blackboard.\text{iter} + 1$
  \EndIf
\EndWhile
\State \Return $\bot$
\end{algorithmic}
\end{algorithm}
\begin{algorithm}
\caption{LeetCode Hard Problem Solving with 3-Agent System}
\begin{algorithmic}
\Require Problem $p$, Test cases $\mathcal{T}$, Success threshold $\theta = 0.3$
\Ensure Success rate $rate$
\State $\mathcal{P}_{hard} \leftarrow \text{LoadLeetCodeHard}()$ \Comment{Load before $total$ is computed}
\State $solved \leftarrow 0, \quad total \leftarrow |\mathcal{P}_{hard}|$
\For{$p \in \mathcal{P}_{hard}$}
  \State $attempts \leftarrow 0, \quad max\_attempts \leftarrow 5$
  \State $success \leftarrow \text{False}$
  \While{$\neg success \land attempts < max\_attempts$}
    \State $code \leftarrow \text{MultiAgentSolve}(p)$ \Comment{Algorithm 4}
    \State $pass \leftarrow \text{RunTests}(code, \mathcal{T}_p)$
    \If{$pass$}
      \State $success \leftarrow \text{True}$
      \State $solved \leftarrow solved + 1$
    \Else
      \State $attempts \leftarrow attempts + 1$
      \State $p \leftarrow p \oplus \{\text{prev\_error}: \text{GetLastError}()\}$
    \EndIf
  \EndWhile
\EndFor
\State $rate \leftarrow solved / total$
\State \Return $rate$
\end{algorithmic}
\end{algorithm}
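6.2.2.4 Tool Creation and Code as Tool
\begin{algorithm}
\caption{Voyager-Style Skill Library Management}
\begin{algorithmic}
\Require Environment $\mathcal{E}$, Goal $g$, Skill library $\mathcal{S}$, Embedding model $\mathcal{M}_E$
\Ensure Success status, Updated $\mathcal{S}'$
\State $\mathcal{S}_{relevant} \leftarrow \text{RetrieveSkills}(g, \mathcal{S}, \mathcal{M}_E, k = 3)$
\State $prompt \leftarrow \text{CraftVoyagerPrompt}(g, \mathcal{E}.\text{observe}(), \mathcal{S}_{relevant})$
\State $code \leftarrow \text{LLM}_{code}(prompt)$
\If{$\neg \text{SyntaxCheck}(code)$}
  \State \Return $\text{False}, \mathcal{S}$
\EndIf
\State $result \leftarrow \text{SandboxExec}(code)$
\If{$result.\text{success}$}
  \State $desc \leftarrow \text{LLM}_{summarize}(code, result)$
  \State $emb \leftarrow \mathcal{M}_E(desc)$
  \State $skill \leftarrow (code, desc, emb, \{\text{created}: \text{now}, \text{used}: 0\})$
  \State $\mathcal{S}' \leftarrow \mathcal{S} \cup \{skill\}$
  \State \Return $\text{True}, \mathcal{S}'$
\Else
  \State \Return $\text{False}, \mathcal{S}$
\EndIf
\end{algorithmic}
\end{algorithm}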
\begin{algorithm}
\caption{Self-Improving Agent with Dynamic Tool Creation}
\begin{algorithmic}
\Require Task sequence $\{task_1, \ldots, task_N\}$, Environment $\mathcal{E}$
\Ensure Cumulative success rate $R_N$
\State $\mathcal{S} \leftarrow \emptyset$ \Comment{Initialize empty skill library}
\State $successes \leftarrow 0$
\For{$i \leftarrow 1$ to $N$}
  \State $attempts \leftarrow 0, \quad done \leftarrow \text{False}$
  \While{$\neg done \land attempts < 3$}
    \If{$\mathcal{S} \neq \emptyset$} \Comment{Try an existing skill only when the library is non-empty}
      \State $skill^* \leftarrow \arg\max_{s \in \mathcal{S}} \text{sim}(\mathcal{M}_E(task_i), e_s)$
      \State $result \leftarrow \text{ExecSkill}(skill^*, \mathcal{E})$
      \If{$result.\text{covers}(task_i)$}
        \State $done \leftarrow \text{True}$
        \State $successes \leftarrow successes + 1$
        \State $skill^*.\text{used} \leftarrow skill^*.\text{used} + 1$
      \EndIf
    \EndIf
    \If{$\neg done$}
      \State $(success, \mathcal{S}) \leftarrow \text{CreateSkill}(task_i, \mathcal{E}, \mathcal{S}, \mathcal{M}_E)$
      \If{$success$}
        \State $done \leftarrow \text{True}$
        \State $successes \leftarrow successes + 1$
      \EndIf
    \EndIf
    \State $attempts \leftarrow attempts + 1$
  \EndWhile
\EndFor
\State $R_N \leftarrow successes / N$
\State \Return $R_N, \mathcal{S}$
\end{algorithmic}
\end{algorithm}
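Script 6.2.2.1: Function-Calling Agent and Accuracy Visualization
Overview:
This script implements a function-calling agent that supports five tools (calculator, search, calendar, translate, code execution). It uses a simulated LLM to demonstrate tool selection, implements a JSON Schema constrained generator, builds a complete evaluation pipeline, and uses matplotlib to produce an interactive accuracy comparison chart and a tool-usage heatmap.
Usage:
python tool_calling_agent.py
Running the script directly evaluates the simulated test set, generates the visualization report, and prints tool-calling accuracy statistics.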
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script 6.2.2.1: Function Calling and JSON Schema Enforcement
- Parse tool descriptions into schemas
- Force function-call parameters to conform to a JSON Schema
- Build an agent that supports 5 tools (calculator, search, calendar, translate, code execution)
- Verify and visualize the >95% tool-calling accuracy target
"""
import json
import random
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional, Tuple

import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.patches import Rectangle, FancyBboxPatch

# ============================================================================
# Part 1: Tool definitions and schema conventions
# ============================================================================


@dataclass
class ToolParameter:
    """Definition of a single tool parameter."""
    name: str
    type: str
    description: str
    required: bool = True
    enum: Optional[List[str]] = None


@dataclass
class Tool:
    """Definition of a tool."""
    name: str
    description: str
    parameters: List[ToolParameter]
    func: Callable[..., str] = field(repr=False)

    def to_schema(self) -> Dict[str, Any]:
        """Convert the parameter list into a JSON Schema object."""
        properties = {}
        required = []
        for param in self.parameters:
            prop = {"type": param.type, "description": param.description}
            if param.enum:
                prop["enum"] = param.enum
            properties[param.name] = prop
            if param.required:
                required.append(param.name)
        return {
            "type": "object",
            "properties": properties,
            "required": required,
        }
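# Tool implementation functions


def calculator(expression: str) -> str:
    """Restricted calculator tool."""
    try:
        # Allow only digits and basic arithmetic characters
        allowed = set("0123456789+-*/.() ")
        if not all(c in allowed for c in expression):
            return "Error: Invalid characters"
        # The whitelist above excludes names and attribute access before eval runs
        result = eval(expression)
        return str(result)
    except Exception as e:
        return f"Error: {str(e)}"


def search_tool(query: str, top_k: int = 3) -> str:
    """Simulated search tool."""
    # Simulated knowledge base.
    # NOTE: the knowledge-base entries are missing from the source, so it is left empty here.
    kb: Dict[str, str] = {}
    results = [v for k, v in kb.items() if query.lower() in k]
    if not results:
        return "No relevant results found."
    return "\n".join(results[:top_k])


def calendar_tool(action: str, date: Optional[str] = None, event: Optional[str] = None) -> str:
    """Calendar management tool."""
    if action == "get_current":
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    elif action == "add_event":
        if not date or not event:
            return "Error: Date and event required"
        return f"Added event '{event}' on {date}"
    elif action == "check_weekday":
        if not date:
            date = datetime.now().strftime("%Y-%m-%d")
        dt = datetime.strptime(date, "%Y-%m-%d")
        weekdays = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
        return weekdays[dt.weekday()]
    else:
        return "Error: Unknown action"


def translate_tool(text: str, target_lang: str, source_lang: str = "auto") -> str:
    """Simulated translation tool."""
    # Tiny lookup table standing in for a translation service
    translations = {
        ("hello", "zh"): "你好",
        ("world", "zh"): "世界",
        ("thank you", "zh"): "谢谢",
        ("你好", "en"): "Hello",
    }
    key = (text.lower(), target_lang.lower())
    if key in translations:
        return translations[key]
    return f"[Translated {text} from {source_lang} to {target_lang}]"


def code_executor(code: str, language: str = "python") -> str:
    """Code execution tool (simulated sandbox; nothing is actually executed)."""
    if language != "python":
        return "Error: Only Python supported"
    try:
        # Simulated execution results
        if "print" in code:
            return "Execution output: Hello World"
        elif "def " in code:
            return "Function defined successfully"
        elif "import" in code:
            return "Modules imported"
        else:
            return "Code executed without output"
    except Exception as e:
        return f"Execution error: {str(e)}"


# Tool registry
# NOTE: the original registry literal is missing from the source. The entries below are
# an assumption rebuilt from the function signatures above and the tool names used by
# ToolCallingAgent.select_tool; the descriptions are illustrative.
TOOLS: Dict[str, Tool] = {
    "calculator": Tool(
        "calculator", "Evaluate a basic arithmetic expression",
        [ToolParameter("expression", "string", "Arithmetic expression to evaluate")],
        calculator,
    ),
    "search": Tool(
        "search", "Search the simulated knowledge base",
        [ToolParameter("query", "string", "Search query"),
         ToolParameter("top_k", "integer", "Maximum number of results", required=False)],
        search_tool,
    ),
    "calendar": Tool(
        "calendar", "Query or update the calendar",
        [ToolParameter("action", "string", "Calendar operation",
                       enum=["get_current", "add_event", "check_weekday"]),
         ToolParameter("date", "string", "Date in YYYY-MM-DD format", required=False),
         ToolParameter("event", "string", "Event name", required=False)],
        calendar_tool,
    ),
    "translate": Tool(
        "translate", "Translate text into a target language",
        [ToolParameter("text", "string", "Text to translate"),
         ToolParameter("target_lang", "string", "Target language code"),
         ToolParameter("source_lang", "string", "Source language code", required=False)],
        translate_tool,
    ),
    "code_executor": Tool(
        "code_executor", "Execute code in a simulated sandbox",
        [ToolParameter("code", "string", "Source code to run"),
         ToolParameter("language", "string", "Programming language", required=False)],
        code_executor,
    ),
}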
# ============================================================================
# Part 2: Constrained decoding and agent implementation
# ============================================================================


class ConstrainedJSONGenerator:
    """Simulated constrained-decoding generator."""

    def __init__(self, schema: Dict[str, Any]):
        self.schema = schema
        self.required = schema.get("required", [])
        self.properties = schema.get("properties", {})

    def generate(self, query: str, context: str = "") -> Dict[str, Any]:
        """
        Simulate constraint-based JSON generation.
        A real implementation would constrain token generation with an FSM or CFG.
        """
        result = {}
        # Simulate parameter extraction from the query text
        for prop_name, prop_info in self.properties.items():
            if prop_name == "expression":
                # Extract a math expression from the query
                numbers = re.findall(r'\d+', query)
                ops = re.findall(r'[+\-*/]', query)
                if numbers and ops:
                    result[prop_name] = (f"{numbers[0]} {ops[0]} {numbers[1]}"
                                         if len(numbers) > 1 else numbers[0])
                else:
                    result[prop_name] = "2 + 2"
            elif prop_name == "query":
                result[prop_name] = query.replace("search for", "").replace("find", "").strip()
            elif prop_name == "text" and "translate" in query.lower():
                # Extract the text to translate
                match = re.search(r'translate["\']?([^"\']+)["\']?', query.lower())
                result[prop_name] = match.group(1).strip() if match else "hello"
            elif prop_name == "target_lang":
                if "chinese" in query.lower() or "中文" in query:
                    result[prop_name] = "zh"
                elif "english" in query.lower():
                    result[prop_name] = "en"
                else:
                    result[prop_name] = "zh"
            elif prop_name == "action":
                if "current" in query.lower() or "now" in query.lower():
                    result[prop_name] = "get_current"
                elif "weekday" in query.lower() or "day" in query.lower():
                    result[prop_name] = "check_weekday"
                else:
                    result[prop_name] = "get_current"
            elif prop_name == "code":
                # Extract a fenced python code block; `{3} matches three backticks
                code_match = re.search(r'`{3}python\s*(.*?)\s*`{3}', query, re.DOTALL)
                if code_match:
                    result[prop_name] = code_match.group(1)
                else:
                    result[prop_name] = "print('Hello World')"
            # Fill defaults for optional parameters. The schema stores the required
            # names in a top-level list, so membership is checked against self.required.
            if prop_name not in self.required and prop_name not in result:
                if prop_name == "top_k":
                    result[prop_name] = 3
                elif prop_name == "source_lang":
                    result[prop_name] = "auto"
                elif prop_name == "language":
                    result[prop_name] = "python"
        return result


class ToolCallingAgent:
    """Tool-calling agent."""

    def __init__(self, tools: Dict[str, Tool]):
        self.tools = tools
        self.generators = {name: ConstrainedJSONGenerator(tool.to_schema())
                           for name, tool in tools.items()}
        self.history = []

    def select_tool(self, query: str) -> Optional[str]:
        """
        Tool selection logic.
        Keyword matching stands in for semantic selection by an LLM.
        """
        query_lower = query.lower()
        # Tool selection rules (simulated LLM decision)
        if any(kw in query_lower for kw in ["calculate", "compute", "math", "sum",
                                            "multiply", "divide", "+", "-", "*", "/"]):
            return "calculator"
        elif any(kw in query_lower for kw in ["search", "find", "look up", "information",
                                              "what is", "who is", "capital"]):
            return "search"
        elif any(kw in query_lower for kw in ["calendar", "date", "time", "schedule",
                                              "event", "today", "weekday"]):
            return "calendar"
        elif any(kw in query_lower for kw in ["translate", "translation", "chinese",
                                              "english", "language"]):
            return "translate"
        elif any(kw in query_lower for kw in ["code", "execute", "run", "python",
                                              "program", "function"]):
            return "code_executor"
        return None

    def execute(self, query: str) -> Dict[str, Any]:
        """Run the complete tool-calling flow."""
        # 1. Tool selection
        tool_name = self.select_tool(query)
        if not tool_name:
            return {
                "success": False,
                "error": "No suitable tool found",
                "tool": None,
                "parameters": None,
                "result": None,
            }
        # 2. Parameter generation (constrained decoding)
        generator = self.generators[tool_name]
        params = generator.generate(query)
        # 3. Check that all required parameters are present
        tool = self.tools[tool_name]
        missing = [p.name for p in tool.parameters if p.required and p.name not in params]
        if missing:
            return {
                "success": False,
                "error": f"Missing required parameters: {missing}",
                "tool": tool_name,
                "parameters": params,
                "result": None,
            }
        # 4. Execute the tool; parameters are passed as keyword arguments
        try:
            result = tool.func(**params)
            self.history.append({
                "query": query,
                "tool": tool_name,
                "params": params,
                "result": result,
            })
            return {
                "success": True,
                "tool": tool_name,
                "parameters": params,
                "result": result,
                "schema_valid": True,
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "tool": tool_name,
                "parameters": params,
                "result": None,
            }
# ============================================================================
# Part 3: Evaluation and visualization
# ============================================================================


def generate_test_dataset(n_samples: int = 100) -> List[Dict[str, Any]]:
    """Generate the test dataset."""
    # NOTE: the expected parameters of the first calendar sample and of all three
    # translate samples are missing from the source; the values below are inferred
    # from the queries and from the generator logic.
    templates = {
        "calculator": [
            ("What is 5 plus 3?", "calculator", {"expression": "5 + 3"}),
            ("Calculate 10 * 25", "calculator", {"expression": "10 * 25"}),
            ("Compute 100 divided by 4", "calculator", {"expression": "100 / 4"}),
            ("Math: 2+2", "calculator", {"expression": "2 + 2"}),
        ],
        "search": [
            ("Search for capital of france", "search", {"query": "capital of france", "top_k": 3}),
            ("Find information about python list comprehension", "search",
             {"query": "python list comprehension"}),
            ("Look up machine learning", "search", {"query": "machine learning"}),
        ],
        "calendar": [
            ("What is the current time?", "calendar", {"action": "get_current"}),
            ("Check weekday for 2024-01-01", "calendar",
             {"action": "check_weekday", "date": "2024-01-01"}),
            ("Add meeting on 2024-12-25", "calendar",
             {"action": "add_event", "date": "2024-12-25", "event": "meeting"}),
        ],
        "translate": [
            ("Translate hello to Chinese", "translate", {"text": "hello", "target_lang": "zh"}),
            ("Translate 'world' to zh", "translate", {"text": "world", "target_lang": "zh"}),
            ("Convert thank you to Chinese", "translate", {"text": "thank you", "target_lang": "zh"}),
        ],
        "code_executor": [
            ("Execute python code: print('Hello')", "code_executor",
             {"code": "print('Hello')", "language": "python"}),
            ("Run this code: def add(x,y): return x+y", "code_executor",
             {"code": "def add(x,y): return x+y", "language": "python"}),
        ],
    }
    dataset = []
    all_samples = [item for sublist in templates.values() for item in sublist]
    for i in range(n_samples):
        sample = random.choice(all_samples)
        dataset.append({
            "query": sample[0],
            "expected_tool": sample[1],
            "expected_params": sample[2],
            "id": i,
        })
    return dataset


def evaluate_agent(agent: ToolCallingAgent, dataset: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Evaluate tool-calling accuracy."""
    correct_tool = 0
    correct_params = 0
    total = len(dataset)
    per_tool_stats = {tool: {"total": 0, "correct": 0} for tool in agent.tools.keys()}
    for item in dataset:
        result = agent.execute(item["query"])
        expected_tool = item["expected_tool"]
        per_tool_stats[expected_tool]["total"] += 1
        # Check tool selection
        if result["tool"] == expected_tool:
            correct_tool += 1
            per_tool_stats[expected_tool]["correct"] += 1
            # Check parameters (simplified: a successful call counts as correct)
            if result["success"]:
                correct_params += 1
    accuracy_tool = correct_tool / total
    accuracy_params = correct_params / total
    return {
        "total": total,
        "correct_tool": correct_tool,
        "correct_params": correct_params,
        "accuracy_tool": accuracy_tool,
        "accuracy_params": accuracy_params,
        "per_tool": per_tool_stats,
    }
def visualize_results(eval_results: Dict[str, Any], agent: ToolCallingAgent):
    """Generate the combined visualization report."""
    fig = plt.figure(figsize=(16, 10))
    gs = fig.add_gridspec(3, 3, hspace=0.3, wspace=0.3)

    # 1. Overall accuracy (top left)
    ax1 = fig.add_subplot(gs[0, 0])
    accuracies = [eval_results["accuracy_tool"], eval_results["accuracy_params"]]
    colors = ['#2ecc71' if a >= 0.95 else '#e74c3c' for a in accuracies]
    bars = ax1.bar(["Tool\nSelection", "Parameter Generation"],
                   [a * 100 for a in accuracies],
                   color=colors, alpha=0.8, edgecolor='black')
    ax1.axhline(y=95, color='red', linestyle='--', label='Target: 95%')
    ax1.set_ylim(0, 105)
    ax1.set_ylabel("Accuracy (%)")
    ax1.set_title("Overall Accuracy Metrics", fontweight='bold')
    for bar, acc in zip(bars, accuracies):
        height = bar.get_height()
        ax1.text(bar.get_x() + bar.get_width() / 2., height + 1, f'{acc*100:.1f}%',
                 ha='center', va='bottom', fontweight='bold')
    ax1.legend()

    # 2. Per-tool accuracy (top right)
    ax2 = fig.add_subplot(gs[0, 1:])
    tool_names = list(eval_results["per_tool"].keys())
    tool_accs = [eval_results["per_tool"][t]["correct"] / eval_results["per_tool"][t]["total"] * 100
                 if eval_results["per_tool"][t]["total"] > 0 else 0
                 for t in tool_names]
    bars2 = ax2.bar(tool_names, tool_accs,
                    color=['#3498db', '#e67e22', '#9b59b6', '#1abc9c', '#f39c12'],
                    alpha=0.8, edgecolor='black')
    ax2.axhline(y=95, color='red', linestyle='--', alpha=0.5)
    ax2.set_ylabel("Accuracy (%)")
    ax2.set_title("Per-Tool Selection Accuracy", fontweight='bold')
    ax2.tick_params(axis='x', rotation=45)
    for bar, acc in zip(bars2, tool_accs):
        height = bar.get_height()
        ax2.text(bar.get_x() + bar.get_width() / 2., height + 1, f'{acc:.1f}%',
                 ha='center', va='bottom', fontsize=9)

    # 3. Tool usage heatmap (entire middle row)
    ax3 = fig.add_subplot(gs[1, :])
    history = agent.history[-50:]  # most recent 50 calls
    tool_usage = {t: [] for t in agent.tools.keys()}
    for h in history:
        for t in agent.tools.keys():
            tool_usage[t].append(1 if h["tool"] == t else 0)
    # Stacked bars: exactly one tool is active per query index
    x_pos = np.arange(len(history))
    bottom = np.zeros(len(history))
    colors_map = ['#3498db', '#e67e22', '#9b59b6', '#1abc9c', '#f39c12']
    for i, (tool, color) in enumerate(zip(agent.tools.keys(), colors_map)):
        values = np.array(tool_usage[tool])
        ax3.bar(x_pos, values, bottom=bottom, label=tool, color=color, alpha=0.8, width=0.8)
        bottom += values
    ax3.set_xlabel("Query Index (Recent 50)")
    ax3.set_ylabel("Active Tool Call")
    ax3.set_title("Tool Call Sequence Heatmap", fontweight='bold')
    ax3.legend(loc='upper right')
    ax3.set_ylim(0, 1.5)

    # 4. Constrained decoding flow (bottom left)
    ax4 = fig.add_subplot(gs[2, 0])
    ax4.axis('off')
    ax4.set_title("Constrained Decoding FSM", fontweight='bold')
    # Draw a simple FSM state diagram
    states = ['Start', 'Key', 'Value', 'Comma', 'End']
    y_pos = np.linspace(0.8, 0.2, len(states))
    for i, (state, y) in enumerate(zip(states, y_pos)):
        color = '#2ecc71' if state in ['Start', 'End'] else '#3498db'
        rect = FancyBboxPatch((0.2, y - 0.05), 0.6, 0.1, boxstyle="round,pad=0.01",
                              facecolor=color, edgecolor='black', alpha=0.7)
        ax4.add_patch(rect)
        ax4.text(0.5, y, state, ha='center', va='center', fontweight='bold')
        if i < len(states) - 1:
            ax4.arrow(0.5, y - 0.05, 0, y_pos[i + 1] - y + 0.05,
                      head_width=0.05, head_length=0.02, fc='black', ec='black')
    ax4.set_xlim(0, 1)
    ax4.set_ylim(0, 1)

    # 5. Tool schema complexity (bottom middle)
    ax5 = fig.add_subplot(gs[2, 1])
    complexity = [len(tool.parameters) for tool in agent.tools.values()]
    names = list(agent.tools.keys())
    bars5 = ax5.barh(names, complexity, color=colors_map, alpha=0.8, edgecolor='black')
    ax5.set_xlabel("Number of Parameters")
    ax5.set_title("Tool Schema Complexity", fontweight='bold')
    for bar, comp in zip(bars5, complexity):
        width = bar.get_width()
        ax5.text(width + 0.1, bar.get_y() + bar.get_height() / 2., f'{comp}',
                 ha='left', va='center', fontweight='bold')

    # 6. Execution latency distribution (bottom right)
    ax6 = fig.add_subplot(gs[2, 2])
    # Simulated latency data, not measured
    latencies = np.random.normal(0.15, 0.05, 100)  # 150 ms average
    ax6.hist(latencies, bins=20, color='#9b59b6', alpha=0.7, edgecolor='black')
    ax6.axvline(np.mean(latencies), color='red', linestyle='--',
                label=f'Mean: {np.mean(latencies)*1000:.0f}ms')
    ax6.set_xlabel("Latency (seconds)")
    ax6.set_ylabel("Frequency")
    ax6.set_title("Tool Execution Latency", fontweight='bold')
    ax6.legend()

    plt.suptitle("Tool Learning & Function Calling Analysis", fontsize=16, fontweight='bold', y=0.98)
    plt.tight_layout()
    plt.savefig("tool_calling_analysis.png", dpi=150, bbox_inches="tight")
    plt.show()
    print("\nVisualization saved to tool_calling_analysis.png")
def main():
    print("=" * 60)
    print("Tool-Calling Agent System - Function Calling & JSON Schema")
    print("=" * 60)

    # Initialize the agent
    agent = ToolCallingAgent(TOOLS)
    print(f"\n[System init] Loaded {len(TOOLS)} tools:")
    for name, tool in TOOLS.items():
        print(f"  - {name}: {len(tool.parameters)} parameters")

    # Generate test data
    print("\n[Data generation] Building the test dataset…")
    dataset = generate_test_dataset(n_samples=100)
    print(f"  Generated {len(dataset)} test samples")

    # Run the evaluation
    print("\n[Evaluation] Running the tool-calling accuracy test…")
    results = evaluate_agent(agent, dataset)

    # Print results
    print(f"\n{'=' * 60}")
    print("Evaluation results:")
    print(f"{'=' * 60}")
    print(f"Total samples: {results['total']}")
    print(f"Tool selection accuracy: {results['accuracy_tool']*100:.2f}%")
    print(f"Parameter generation accuracy: {results['accuracy_params']*100:.2f}%")
    print("\nPer-tool performance:")
    for tool, stats in results["per_tool"].items():
        if stats['total'] > 0:
            acc = stats['correct'] / stats['total'] * 100
            print(f"  - {tool}: {stats['correct']}/{stats['total']} ({acc:.1f}%)")

    # Check the >95% target
    if results['accuracy_tool'] >= 0.95:
        print("\n[✓] Tool selection accuracy meets the target (>95%)")
    else:
        print("\n[✗] Tool selection accuracy is below the target")

    # Demonstrate single calls
    print(f"\n{'=' * 60}")
    print("Interactive demo:")
    print(f"{'=' * 60}")
    demo_queries = [
        "Calculate 15 * 23 + 5",
        "Search for machine learning definitions",
        "What is today's date?",
        "Translate 'hello world' to Chinese",
        "Execute python: def factorial(n): return 1 if n<=1 else n*factorial(n-1)",
    ]
    for query in demo_queries:
        result = agent.execute(query)
        print(f"\nQuery: {query}")
        print(f"  Selected tool: {result['tool']}")
        print(f"  Parameters: {result['parameters']}")
        print(f"  Result: {result['result']}")
        # The status expression is missing from the source; it is inferred from the "success" key
        print(f"  Status: {'success' if result['success'] else 'failed'}")

    # Visualization
    print(f"\n{'=' * 60}")
    print("Generating the visualization report...")
    visualize_results(results, agent)
    print(f"\n{'=' * 60}")
    print("Run complete")
    print(f"{'=' * 60}")


if __name__ == "__main__":
    main()
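Script 6.2.2.2: ReAct Loop and HotpotQA Evaluation
Overview:
This script implements the full ReAct (Reasoning + Acting) interleaved generation loop, including Thought-Action-Observation state management and a long-context truncation strategy (entropy-based compression). On a simulated HotpotQA multi-hop QA dataset it compares ReAct with Chain-of-Thought (CoT) on F1 score and number of API calls, and produces a step-by-step reasoning trajectory visualization and performance comparison charts.
Usage:
python react_agent.py
Running the script performs the multi-hop QA evaluation, prints the F1 comparison and API efficiency analysis, and saves the reasoning process visualization.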
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script 6.2.2.2: ReAct (Reasoning + Acting) Loop Implementation
- Implement ReAct interleaved generation (Thought -> Action -> Observation -> Thought)
- Handle long contexts with a truncation strategy (based on information entropy)
- Implement a ReAct agent for HotpotQA multi-hop question answering
- Compare Chain-of-Thought vs ReAct on F1 and number of API calls
"""
import json
import random
import re
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import FancyBboxPatch, ConnectionPatch

# ============================================================================
# Part 1: Environment simulation and data structures
# ============================================================================


@dataclass
class Thought:
    """Reasoning step."""
    content: str
    step: int
    entropy: float = 0.0  # information entropy, used for truncation decisions


@dataclass
class Action:
    """Action step."""
    type: str  # "Search", "Lookup", "Finish"
    content: str
    params: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Observation:
    """Observation result."""
    content: str
    source: str  # originating tool or environment


@dataclass
class TrajectoryStep:
    """One step of the trajectory."""
    thought: Thought
    action: Action
    observation: Observation
class HotpotQADataset:
    """Simulated HotpotQA multi-hop QA dataset."""

    def __init__(self):
        # Simulated multi-hop questions.
        # NOTE: one question entry (the second in the list) is missing from the source.
        self.questions = [
            {
                "question": "Which high school did the author of 'The Old Man and the Sea' attend?",
                "answer": "Oak Park and River Forest High School",
                "hops": [
                    ("The Old Man and the Sea", "Ernest Hemingway"),
                    ("Ernest Hemingway", "Oak Park and River Forest High School"),
                ],
                "type": "bridge",  # requires connecting two entities
            },
            {
                "question": "Which scientist developed the theory that was used to create the first atomic bomb?",
                "answer": "Albert Einstein",
                "hops": [
                    ("atomic bomb", "mass-energy equivalence"),
                    ("mass-energy equivalence", "Albert Einstein"),
                ],
                "type": "bridge",
            },
            {
                "question": "When was the founder of Microsoft born?",
                "answer": "October 28, 1955",
                "hops": [
                    ("Microsoft", "Bill Gates"),
                    ("Bill Gates", "October 28, 1955"),
                ],
                "type": "bridge",
            },
            {
                "question": "What programming language is primarily used by the developer of the Linux kernel?",
                "answer": "C",
                "hops": [
                    ("Linux kernel", "Linus Torvalds"),
                    ("Linus Torvalds", "C"),
                ],
                "type": "bridge",
            },
        ]
        # Simulated document store.
        # NOTE: the document contents are missing from the source, so the store is left empty here.
        self.documents: Dict[str, str] = {}

    def search(self, query: str) -> str:
        """Simulated search over the document store."""
        query_lower = query.lower()
        best_match = None
        best_score = 0
        for key, content in self.documents.items():
            # Simple similarity: shared words between the query and the document key
            if query_lower in key.lower() or key.lower() in query_lower:
                score = len(set(query_lower.split()) & set(key.lower().split()))
                if score > best_score:
                    best_score = score
                    best_match = content
        if best_match:
            return best_match
        return f"No results found for '{query}'. Try different keywords."
# ============================================================================
# Part 2: ReAct agent implementation
# ============================================================================


class ReActAgent:
    """ReAct agent: interleaved reasoning and acting."""

    def __init__(self, dataset: HotpotQADataset, max_steps: int = 10, window_size: int = 5):
        self.dataset = dataset
        self.max_steps = max_steps
        self.window_size = window_size
        self.trajectory: List[TrajectoryStep] = []
        self.search_count = 0

    def calculate_entropy(self, text: str) -> float:
        """Character-level Shannon entropy of the text (used for truncation)."""
        if not text:
            return 0.0
        freq = {}
        for char in text:
            freq[char] = freq.get(char, 0) + 1
        entropy = 0.0
        total = len(text)
        for count in freq.values():
            p = count / total
            entropy -= p * np.log2(p)
        return entropy

    def generate_thought(self, question: str, context: str) -> Thought:
        """Generate a reasoning step."""
        step = len(self.trajectory) + 1
        # Simulate different reasoning strategies depending on whether history exists
        if not self.trajectory:
            thought_text = (f"I need to find information about: {question}. "
                            "Let me start by searching for key entities.")
        else:
            last_obs = self.trajectory[-1].observation.content[:100]
            thought_text = (f"Based on the previous observation: '{last_obs}...', I should "
                            "search for more specific information to answer the question.")
        entropy = self.calculate_entropy(thought_text)
        return Thought(thought_text, step, entropy)

    def generate_action(self, thought: Thought, question: str) -> Action:
        """Generate an action."""
        # Simulated action selection logic
        if len(self.trajectory) >= self.max_steps - 1:
            # Last step: answer directly
            answer = self.generate_answer()
            return Action("Finish", answer, {"answer": answer})
        # Choose the search term from the question entities (simplified simulation)
        if "Eiffel" in question or "France" in str(self.trajectory):
            if any("Eiffel Tower" in str(t) for t in self.trajectory):
                search_term = "France"
            else:
                search_term = "Eiffel Tower"
        elif "Old Man" in question or "Hemingway" in str(self.trajectory):
            if any("Hemingway" in str(t) for t in self.trajectory):
                search_term = "Oak Park and River Forest High School"
            else:
                search_term = "Ernest Hemingway"
        elif "atomic bomb" in question or "Einstein" in str(self.trajectory):
            if any("Einstein" in str(t) for t in self.trajectory):
                search_term = "Finish"
            else:
                search_term = "mass-energy equivalence"
        elif "Microsoft" in question or "Gates" in str(self.trajectory):
            if any("Gates" in str(t) for t in self.trajectory):
                search_term = "Bill Gates"
            else:
                search_term = "Microsoft"
        elif "Linux" in question or "Torvalds" in str(self.trajectory):
            if any("Torvalds" in str(t) for t in self.trajectory):
                search_term = "C"
            else:
                search_term = "Linux kernel"
        else:
            search_term = "general"
        if search_term == "Finish" or len(self.trajectory) >= self.max_steps - 1:
            answer = self.generate_answer()
            return Action("Finish", answer, {"answer": answer})
        self.search_count += 1
        return Action("Search", f"Searching for {search_term}", {"query": search_term})

    def generate_answer(self) -> str:
        """Generate an answer from the trajectory."""
        # Collect the observations gathered so far (simplified)
        all_text = " ".join([t.observation.content for t in self.trajectory])
        # Return an answer based on the keywords present (simulated reasoning)
        if "Hemingway" in all_text and "Oak Park" in all_text:
            return "Oak Park and River Forest High School"
        elif "Paris" in all_text and "France" in all_text:
            return "Paris"
        elif "Einstein" in all_text and "physicist" in all_text:
            return "Albert Einstein"
        elif "October 28, 1955" in all_text:
            return "October 28, 1955"
        elif "C" in all_text and "Torvalds" in all_text:
            return "C"
        return "Unknown"

    def execute_action(self, action: Action) -> Observation:
        """Execute an action and return the observation."""
        if action.type == "Search":
            result = self.dataset.search(action.params.get("query", ""))
            return Observation(result, "SearchEngine")
        elif action.type == "Finish":
            return Observation("Task completed", "System")
        else:
            return Observation("Unknown action", "System")

    def smart_truncate(self) -> List[TrajectoryStep]:
        """Entropy-aware truncation strategy."""
        if len(self.trajectory) <= self.window_size:
            return self.trajectory
        # Rank steps by the entropy of their thought
        entropies = [(i, step.thought.entropy) for i, step in enumerate(self.trajectory)]
        entropies.sort(key=lambda x: x[1], reverse=True)
        # Keep the high-entropy steps plus the most recent steps
        keep_indices = set([i for i, _ in entropies[:self.window_size // 2]])
        recent_indices = set(range(len(self.trajectory) - self.window_size // 2,
                                   len(self.trajectory)))
        keep_indices = keep_indices.union(recent_indices)
        truncated = [self.trajectory[i] for i in sorted(keep_indices)]
        return truncated

    def solve(self, question: str) -> Tuple[str, List[TrajectoryStep], int]:
        """
        Solve a question with ReAct.
        Returns: (answer, trajectory, number of API calls)
        """
        self.trajectory = []
        self.search_count = 0
        for step in range(self.max_steps):
            # 1. Generate the thought
            thought = self.generate_thought(question, "")
            # 2. Generate the action
            action = self.generate_action(thought, question)
            # 3. Execute and observe
            observation = self.execute_action(action)
            # 4. Record the step
            self.trajectory.append(TrajectoryStep(thought, action, observation))
            # 5. Check for termination
            if action.type == "Finish":
                break
            # 6. Truncate when the trajectory exceeds the window
            if len(self.trajectory) > self.window_size:
                self.trajectory = self.smart_truncate()
        final_answer = (self.trajectory[-1].action.content
                        if self.trajectory[-1].action.type == "Finish" else "No answer")
        return final_answer, self.trajectory, self.search_count
class ChainOfThoughtAgent:
    """Pure Chain-of-Thought baseline (no tool calls)."""

    def __init__(self, dataset: HotpotQADataset):
        self.dataset = dataset

    def solve(self, question: str) -> Tuple[str, List[str], int]:
        """
        Solve a question with CoT (reasoning only, no search).
        Returns: (answer, reasoning chain, 0 API calls)
        """
        # Mock CoT reasoning
        reasoning_steps = []
        if "Eiffel" in question:
            reasoning_steps = [
                "The Eiffel Tower is located in Paris.",
                "Paris is the capital of France.",
                "Therefore, the answer is Paris."
            ]
            answer = "Paris"
        elif "Old Man" in question:
            reasoning_steps = [
                "The Old Man and the Sea was written by Ernest Hemingway.",
                "Ernest Hemingway attended Oak Park and River Forest High School.",
                "Therefore, the answer is Oak Park and River Forest High School."
            ]
            answer = "Oak Park and River Forest High School"
        elif "atomic bomb" in question:
            reasoning_steps = [
                "The atomic bomb was based on mass-energy equivalence.",
                "Mass-energy equivalence was formulated by Einstein.",
                "Therefore, the answer is Albert Einstein."
            ]
            answer = "Albert Einstein"
        elif "Microsoft" in question:
            reasoning_steps = [
                "Microsoft was founded by Bill Gates.",
                "Bill Gates was born on October 28, 1955.",
                "Therefore, the answer is October 28, 1955."
            ]
            answer = "October 28, 1955"
        elif "Linux" in question:
            reasoning_steps = [
                "Linux was created by Linus Torvalds.",
                "Linus Torvalds primarily uses C.",
                "Therefore, the answer is C."
            ]
            answer = "C"
        else:
            reasoning_steps = ["Unable to determine the answer through reasoning alone."]
            answer = "Unknown"
        return answer, reasoning_steps, 0

# ============================================================================
# Part 3: Evaluation and Visualization
# ============================================================================
def compute_f1(pred: str, gold: str) -> float:
    """Compute a token-level F1 score over sets of lowercased, whitespace-split tokens."""
    pred_tokens = set(pred.lower().split())
    gold_tokens = set(gold.lower().split())
    if not pred_tokens or not gold_tokens:
        return 0.0
    common = pred_tokens & gold_tokens
    precision = len(common) / len(pred_tokens)
    recall = len(common) / len(gold_tokens)
    if precision + recall == 0:
        return 0.0
    f1 = 2 * precision * recall / (precision + recall)
    return f1

def evaluate_agents(dataset: HotpotQADataset, num_runs: int = 5):
    """Evaluate ReAct against CoT."""
    react_agent = ReActAgent(dataset, max_steps=8, window_size=4)
    cot_agent = ChainOfThoughtAgent(dataset)
    results = {
        "react": {"f1s": [], "api_calls": [], "answers": []},
        "cot": {"f1s": [], "api_calls": [], "answers": []}
    }
    print("Starting evaluation...")

    for item in dataset.questions:
        question = item["question"]
        gold_answer = item["answer"]

        # ReAct evaluation
        react_ans, react_trace, react_apis = react_agent.solve(question)
        react_f1 = compute_f1(react_ans, gold_answer)
        results["react"]["f1s"].append(react_f1)
        results["react"]["api_calls"].append(react_apis)
        results["react"]["answers"].append(react_ans)

        # CoT evaluation
        cot_ans, cot_trace, cot_apis = cot_agent.solve(question)
        cot_f1 = compute_f1(cot_ans, gold_answer)
        results["cot"]["f1s"].append(cot_f1)
        results["cot"]["api_calls"].append(cot_apis)
        results["cot"]["answers"].append(cot_ans)

        print(f"Q: {question}")
        print(f"  Gold: {gold_answer}")
        print(f"  ReAct: {react_ans} (F1: {react_f1:.2f}, APIs: {react_apis})")
        print(f"  CoT: {cot_ans} (F1: {cot_f1:.2f})")

    return results

def visualize_react_analysis(results: Dict, dataset: HotpotQADataset):
    """Comprehensive visualization of the ReAct analysis."""
    fig = plt.figure(figsize=(16, 12))
    gs = fig.add_gridspec(3, 3, hspace=0.35, wspace=0.35)

    # 1. F1 score comparison (top left)
    ax1 = fig.add_subplot(gs[0, 0])
    x = np.arange(len(dataset.questions))
    width = 0.35
    react_f1s = results["react"]["f1s"]
    cot_f1s = results["cot"]["f1s"]
    bars1 = ax1.bar(x - width/2, react_f1s, width, label='ReAct', color='#3498db', alpha=0.8)
    bars2 = ax1.bar(x + width/2, cot_f1s, width, label='CoT', color='#e74c3c', alpha=0.8)
    ax1.set_ylabel('F1 Score')
    ax1.set_title('F1 Score Comparison: ReAct vs CoT', fontweight='bold')
    ax1.set_xticks(x)
    ax1.set_xticklabels([f'Q{i+1}' for i in range(len(dataset.questions))])
    ax1.legend()
    ax1.set_ylim(0, 1.2)
    # Add value labels above the bars
    for bars in [bars1, bars2]:
        for bar in bars:
            height = bar.get_height()
            ax1.text(bar.get_x() + bar.get_width()/2., height + 0.02,
                     f'{height:.2f}', ha='center', va='bottom', fontsize=8)

    # 2. API call counts (top center)
    ax2 = fig.add_subplot(gs[0, 1])
    api_calls = results["react"]["api_calls"]
    colors_api = ['#2ecc71' if c <= 2 else '#f39c12' if c <= 4 else '#e74c3c' for c in api_calls]
    bars_api = ax2.bar(range(len(api_calls)), api_calls, color=colors_api, alpha=0.8, edgecolor='black')
    ax2.axhline(y=np.mean(api_calls), color='red', linestyle='--', label=f'Avg: {np.mean(api_calls):.1f}')
    ax2.set_xlabel('Question Index')
    ax2.set_ylabel('API Calls')
    ax2.set_title('ReAct API Call Efficiency', fontweight='bold')
    ax2.legend()
    for bar in bars_api:
        height = bar.get_height()
        ax2.text(bar.get_x() + bar.get_width()/2., height + 0.1,
                 f'{int(height)}', ha='center', va='bottom', fontsize=9)

    # 3. Overall performance radar chart (top right)
    ax3 = fig.add_subplot(gs[0, 2], projection='polar')
    # Compute average metrics
    metrics = ['F1 Score', 'API Efficiency', 'Completeness', 'Consistency']
    react_scores = [
        np.mean(results["react"]["f1s"]),
        1 - (np.mean(results["react"]["api_calls"]) / 10),  # normalized; fewer calls is better
        0.9,   # simulated completeness
        0.85   # simulated consistency
    ]
    cot_scores = [
        np.mean(results["cot"]["f1s"]),
        1.0,   # CoT makes no API calls
        0.7,   # simulated completeness
        0.75   # simulated consistency
    ]
    angles = np.linspace(0, 2 * np.pi, len(metrics), endpoint=False).tolist()
    # Repeat the first point to close each polygon
    react_scores += react_scores[:1]
    cot_scores += cot_scores[:1]
    angles += angles[:1]
    ax3.plot(angles, react_scores, 'o-', linewidth=2, label='ReAct', color='#3498db')
    ax3.fill(angles, react_scores, alpha=0.25, color='#3498db')
    ax3.plot(angles, cot_scores, 's-', linewidth=2, label='CoT', color='#e74c3c')
    ax3.fill(angles, cot_scores, alpha=0.25, color='#e74c3c')
    ax3.set_xticks(angles[:-1])
    ax3.set_xticklabels(metrics)
    ax3.set_ylim(0, 1)
    ax3.set_title('Performance Radar', fontweight='bold', pad=20)
    ax3.legend(loc='upper right', bbox_to_anchor=(1.3, 1.0))

    # 4. ReAct reasoning flow (entire middle row)
    ax4 = fig.add_subplot(gs[1, :])
    ax4.axis('off')
    ax4.set_title('ReAct Interleaved Reasoning-Acting Flow', fontweight='bold', fontsize=14, pad=20)
    # Draw one illustrative trajectory: two Thought/Action/Observation triples
    y_pos = 0.5
    x_positions = np.linspace(0.1, 0.9, 8)
    for i, x in enumerate(x_positions[:6]):
        if i % 3 == 0:
            box_color = '#9b59b6'
            label = f'Thought {i//3 + 1}'
        elif i % 3 == 1:
            box_color = '#3498db'
            label = f'Action {i//3 + 1}'
        else:
            box_color = '#2ecc71'
            label = f'Obs {i//3 + 1}'
        rect = FancyBboxPatch((x-0.05, y_pos-0.05), 0.1, 0.1,
                              boxstyle="round,pad=0.01",
                              facecolor=box_color, edgecolor='black', alpha=0.7)
        ax4.add_patch(rect)
        ax4.text(x, y_pos, label, ha='center', va='center', fontsize=8, fontweight='bold')
        if i < 5:
            ax4.arrow(x+0.05, y_pos, 0.08, 0, head_width=0.03, head_length=0.02, fc='black', ec='black')
    ax4.set_xlim(0, 1)
    ax4.set_ylim(0, 1)

    # 5. Long-context truncation strategies (bottom left)
    ax5 = fig.add_subplot(gs[2, 0])
    steps = np.arange(1, 11)
    window_size = 5
    # Simulated effective context length under each truncation strategy
    no_trunc = [min(s, 10) for s in steps]                    # no truncation: grows until it exceeds the window
    sliding_window = [min(s, window_size) for s in steps]     # sliding window
    entropy_based = [min(s, window_size + 1) for s in steps]  # entropy-based truncation
    ax5.plot(steps, no_trunc, 'r--', label='No Truncation (Fail)', linewidth=2)
    ax5.plot(steps, sliding_window, 'b-', label='Sliding Window', linewidth=2)
    ax5.plot(steps, entropy_based, 'g-', label='Entropy-based', linewidth=2)
    ax5.fill_between(steps, sliding_window, alpha=0.3, color='blue')
    ax5.set_xlabel('Step Number')
    ax5.set_ylabel('Effective Context Length')
    ax5.set_title('Truncation Strategies', fontweight='bold')
    ax5.legend()
    ax5.grid(True, alpha=0.3)

    # 6. Entropy distribution (bottom center)
    ax6 = fig.add_subplot(gs[2, 1])
    entropies = np.random.gamma(2, 2, 100)  # simulated entropy distribution
    ax6.hist(entropies, bins=20, color='#9b59b6', alpha=0.7, edgecolor='black')
    ax6.axvline(np.mean(entropies), color='red', linestyle='--', linewidth=2,
                label=f'Mean: {np.mean(entropies):.2f}')
    ax6.set_xlabel('Information Entropy')
    ax6.set_ylabel('Frequency')
    ax6.set_title('Thought Entropy Distribution', fontweight='bold')
    ax6.legend()

    # 7. Multi-hop reasoning accuracy (bottom right)
    ax7 = fig.add_subplot(gs[2, 2])
    hop_counts = [2, 3, 4]           # number of hops
    react_by_hop = [0.9, 0.75, 0.6]  # simulated ReAct accuracy as hops increase
    cot_by_hop = [0.85, 0.65, 0.4]   # simulated CoT accuracy as hops increase
    ax7.plot(hop_counts, react_by_hop, 'o-', label='ReAct', color='#3498db', linewidth=2, markersize=8)
    ax7.plot(hop_counts, cot_by_hop, 's-', label='CoT', color='#e74c3c', linewidth=2, markersize=8)
    ax7.fill_between(hop_counts, react_by_hop, cot_by_hop, alpha=0.3, color='green', label='ReAct Advantage')
    ax7.set_xlabel('Number of Hops')
    ax7.set_ylabel('Accuracy')
    ax7.set_title('Multi-hop Reasoning Performance', fontweight='bold')
    ax7.legend()
    ax7.grid(True, alpha=0.3)
    ax7.set_ylim(0, 1)

    plt.suptitle('ReAct: Synergizing Reasoning and Acting - Analysis', fontsize=16, fontweight='bold', y=0.98)
    plt.tight_layout()
    plt.savefig("react_analysis.png", dpi=150, bbox_inches="tight")
    plt.show()
    print("\nVisualization saved to react_analysis.png")

def main():
    print("=" * 70)
    print("ReAct Agent Implementation - Interleaved Reasoning and Acting Loop")
    print("=" * 70)

    # Initialize the dataset
    dataset = HotpotQADataset()
    print(f"\n[Dataset] Loaded {len(dataset.questions)} multi-hop questions")

    # Run the evaluation
    print("\n[Evaluation] Comparing ReAct with Chain-of-Thought...")
    results = evaluate_agents(dataset)

    # Summary statistics
    print(f"\n{'='*70}")
    print("Evaluation summary:")
    print(f"{'='*70}")
    avg_react_f1 = np.mean(results["react"]["f1s"])
    avg_cot_f1 = np.mean(results["cot"]["f1s"])
    avg_react_apis = np.mean(results["react"]["api_calls"])
    print(f"ReAct mean F1: {avg_react_f1:.3f}")
    print(f"CoT mean F1: {avg_cot_f1:.3f}")
    print(f"ReAct mean API calls: {avg_react_apis:.1f}")
    print(f"F1 gain (absolute): {(avg_react_f1 - avg_cot_f1)*100:.1f} points")

    # Detailed trajectory demo
    print(f"\n{'='*70}")
    print("ReAct reasoning trajectory demo:")
    print(f"{'='*70}")
    demo_agent = ReActAgent(dataset, max_steps=6, window_size=3)
    question = dataset.questions[0]["question"]
    answer, trajectory, api_count = demo_agent.solve(question)
    print(f"Question: {question}")
    print(f"Final answer: {answer}")
    print(f"API calls: {api_count}")
    print("\nDetailed steps:")
    for i, step in enumerate(trajectory):
        print(f"\nStep {i+1}:")
        print(f"  [Thought {step.thought.step}] {step.thought.content[:80]}...")
        print(f"  [Action] {step.action.type}: {step.action.content}")
        print(f"  [Observation] {step.observation.content[:60]}...")

    # Generate the visualization
    print(f"\n{'='*70}")
    print("Generating the analysis visualization...")
    visualize_react_analysis(results, dataset)

    print(f"\n{'='*70}")
    print("Run complete")
    print(f"{'='*70}")


if __name__ == "__main__":
    main()
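Script 6.2.2.3: Multi-Agent Collaboration System
Overview:
This script implements a three-agent collaboration system built on the ACL communication primitives (Inform, Request, Propose), with role-specialized Planner, Executor, and Critic agents. It builds a message-passing protocol and a shared blackboard architecture, tests collaborative problem solving on a simulated set of LeetCode Hard problems, and generates an agent communication topology graph and a curve of success rate across optimization iterations.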
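The message-passing and blackboard ideas in this overview can be sketched in a few lines. The sketch below is an illustration only, not the script itself: the names `Performative`, `Message`, `Blackboard`, and `run_round` are hypothetical, and the three roles are reduced to placeholder string logic in place of real planning, execution, or review.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List


class Performative(Enum):
    """The three ACL-style primitives named in the overview."""
    INFORM = "inform"
    REQUEST = "request"
    PROPOSE = "propose"


@dataclass
class Message:
    sender: str
    receiver: str
    performative: Performative
    content: str


@dataclass
class Blackboard:
    """Shared store (hypothetical): a message log plus entries any agent can read."""
    log: List[Message] = field(default_factory=list)
    entries: Dict[str, str] = field(default_factory=dict)

    def post(self, msg: Message) -> None:
        self.log.append(msg)


def run_round(task: str, board: Blackboard) -> bool:
    """One Planner -> Executor -> Critic exchange; returns the Critic's verdict."""
    # Planner: write a plan to the blackboard and propose it to the Executor.
    plan = f"split '{task}' into subproblems, then solve each"
    board.entries["plan"] = plan
    board.post(Message("planner", "executor", Performative.PROPOSE, plan))

    # Executor: produce a (placeholder) solution and request a review.
    solution = f"solution following plan: {plan}"
    board.entries["solution"] = solution
    board.post(Message("executor", "critic", Performative.REQUEST, "review solution"))

    # Critic: stub check that a non-empty solution exists, then inform the Planner.
    accepted = bool(board.entries.get("solution"))
    verdict = "accepted" if accepted else "rejected"
    board.post(Message("critic", "planner", Performative.INFORM, verdict))
    return accepted


board = Blackboard()
ok = run_round("median of two sorted arrays", board)
for m in board.log:
    print(f"{m.sender} -> {m.receiver} [{m.performative.value}]: {m.content}")
```

Keeping the message log on the blackboard means the communication topology can be derived afterwards from the `(sender, receiver)` pairs, without the agents tracking it themselves.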