- 前言
- 目录
- 一、AI Agent 的核心架构
- 1.1 什么是AI Agent?
- 1.2 2026年Agent技术栈全景
- 二、从零搭建生产级Agent框架
- 2.1 项目结构设计
- 2.2 核心代码:Agent基类
- 2.3 记忆管理系统
- 三、三大核心技术实现
- 3.1 ReAct框架:推理+行动协同
- 3.2 工具调用系统
- 3.3 任务规划器
- 四、实战案例:智能客服Agent
- 4.1 场景分析
- 4.2 完整实现
- 4.3 性能对比
- 五、性能优化与成本控制
- 5.1 成本分析
- 5.2 优化策略
- 六、完整源码
- 快速开始
- 项目结构
- 总结
- 关键要点
- 下一步学习
- 参考资源
2023年是ChatGPT元年,2024年是多模态爆发之年,而2026年则是AI Agent的落地元年。
你可能用过Coze搭建过聊天机器人,也可能在Dify上配置过知识库问答,但当真正要把AI Agent投入生产环境时,你会发现:
- ❌ 上下文记忆经常丢失
- ❌ 工具调用成功率只有60%
- ❌ 成本控制一团混乱
- ❌ 无法处理复杂的多步任务
本文将带你从零开始,手写一个生产级AI Agent框架,解决上述所有问题。文末附完整源码。
- 一、AI Agent 的核心架构
- 二、从零搭建生产级Agent框架
- 三、三大核心技术实现
- 四、实战案例:智能客服Agent
- 五、性能优化与成本控制
- 六、完整源码
1.1 什么是AI Agent?
简单来说,AI Agent = LLM + 记忆 + 规划 + 工具
用户输入
感知层 Perception
决策层 Decision
行动层 Action
输出结果
1.2 2026年Agent技术栈全景
技术层级 主流框架/工具 推荐指数 适用场景
编排框架 LangChain / LangGraph ⭐⭐⭐⭐⭐ 复杂工作流编排
运行时 AutoGen / AgentScope ⭐⭐⭐⭐ 多Agent协作
向量数据库 Milvus / Chroma ⭐⭐⭐⭐⭐ RAG知识库
工具生态 OpenAI Function Calling ⭐⭐⭐⭐⭐ 结构化工具调用
记忆管理 MemGPT ⭐⭐⭐⭐ 长对话场景
评估框架 Ragas / TruLens ⭐⭐⭐⭐ 生产环境监控
2.1 项目结构设计
agent-framework/ ├── core/ │ ├── agent.py # Agent核心类 │ ├── memory.py # 记忆管理模块 │ ├── planner.py # 任务规划器 │ └── tools.py # 工具注册器 ├── memory/ │ ├── short_term.py # 短期记忆(Redis) │ ├── long_term.py # 长期记忆(向量DB) │ └── semantic.py # 语义记忆检索 ├── tools/ │ ├── base.py # 工具基类 │ ├── registry.py # 工具注册中心 │ └── builtin/ # 内置工具 ├── evaluators/ │ ├── cost.py # 成本评估 │ └── performance.py # 性能评估 └── utils/ ├── logger.py # 日志系统 └── retry.py # 重试机制
2.2 核心代码:Agent基类
from typing import List, Dict, Any, Optional from dataclasses import dataclass from enum import Enum import asyncio import json classAgentState(Enum):“”“Agent状态枚举”“” IDLE =“idle”# 空闲 THINKING =“thinking”# 思考中 ACTING =“acting”# 执行中 WAITING =“waiting”# 等待外部输入 ERROR =“error”# 错误状态@dataclassclassMessage:“”“消息数据结构”“” role:str# user / assistant / system / tool content:str# 消息内容 tool_calls: Optional[List[Dict]]=None# 工具调用 timestamp:float=None# 时间戳 metadata: Dict[str, Any]=None# 元数据classBaseAgent:“”“生产级Agent基类”““definit( self, llm_client: Any,# LLM客户端 memory_manager: Any =None,# 记忆管理器 tool_registry: Any =None,# 工具注册表 max_iterations:int=10,# 最大迭代次数 verbose:bool=True): self.llm = llm_client self.memory = memory_manager self.tools = tool_registry self.max_iterations = max_iterations self.verbose = verbose self.state = AgentState.IDLE self.conversation_history: List[Message]=[]asyncdefrun(self, user_input:str)->str:”““Agent主执行循环”“”# 添加用户消息到历史 self.conversation_history.append( Message(role=“user”, content=user_input)) self.state = AgentState.THINKING for iteration inrange(self.max_iterations): self._log(f”迭代 {iteration +1}/{self.max_iterations}“)# 1. 从记忆中检索相关信息 context =await self._retrieve_context(user_input)# 2. 构建提示词 prompt = self._build_prompt(context)# 3. LLM推理 response =await self._llm_inference(prompt)# 4. 检查是否需要调用工具if response.tool_calls: self.state = AgentState.ACTING # 执行工具调用 tool_results =await self._execute_tools(response.tool_calls)# 将工具结果添加到历史for result in tool_results: self.conversation_history.append( Message(role=“tool”, content=result[“content”], tool_name=result[“tool_name”]))else:# 5. 无需工具调用,返回最终答案 self.state = AgentState.IDLE self.conversation_history.append( Message(role=“assistant”, content=response.content))return response.content return”超过最大迭代次数,任务未完成”asyncdef_retrieve_context(self, query:str)->str:“”“从记忆中检索上下文”““ifnot self.memory:return”“# 检索相关记忆(这里简化处理)returnawait self.memory.search(query, top_k=3)def_build_prompt(self, context:str)->str:”““构建系统提示词”“” system_prompt =f”““你是一个智能AI助手。 # 可用工具 # 相关记忆 {context} # 任务要求 1. 分析用户需求 2. 如需信息查询或执行操作,调用相应工具 3. 基于工具结果给出准确答案 4. 如无法完成,明确说明原因 开始工作!”““return system_prompt asyncdef_llm_inference(self, prompt:str)-> Any:”““LLM推理(示例使用OpenAI格式)”“” messages =[{“role”:“system”,“content”: prompt},*[{“role”: m.role,“content”: m.content}for m in self.conversation_history]] response =await self.llm.chat.completions.create( model=“gpt-4”, messages=messages, tools=self.tools.get_tool_schemas()if self.tools elseNone, temperature=0.7)return response.choices[0].message asyncdef_execute_tools(self, tool_calls: List[Dict])-> List[Dict]:“”“执行工具调用”“” results =[]for call in tool_calls: tool_name = call[“function”][“name”] arguments = json.loads(call[“function”][“arguments”]) self._log(f”调用工具: {tool_name} | 参数: {arguments}“)try:# 从工具注册表获取工具并执行 tool = self.tools.get_tool(tool_name) result =await tool.execute(arguments) results.append({“tool_name”: tool_name,“content”: json.dumps(result, ensure_ascii=False)})except Exception as e: results.append({“tool_name”: tool_name,“content”: json.dumps({“error”:str(e)})})return results def_log(self, message:str):“”“日志输出”““if self.verbose:print(f”[Agent] {message}“)
2.3 记忆管理系统
from abc import ABC, abstractmethod from typing import List, Dict, Any import redis import numpy as np from datetime import datetime, timedelta classMemoryBackend(ABC):”““记忆后端抽象基类”“”@abstractmethodasyncdefadd(self, content:str, metadata: Dict =None)->str:“”“添加记忆”““pass@abstractmethodasyncdefsearch(self, query:str, top_k:int=5)-> List[Dict]:”““搜索记忆”““passclassShortTermMemory(MemoryBackend):”““短期记忆:基于Redis的会话记忆”““definit(self, redis_url:str=“redis://localhost:6379”, ttl:int=3600): self.client = redis.from_url(redis_url) self.ttl = ttl # 记忆过期时间(秒)asyncdefadd(self, content:str, metadata: Dict =None)->str:“”“添加会话记忆”“” memory_id =f”mem:{datetime.now().timestamp()}” memory_data ={“content”: content,“metadata”: metadata or{},“timestamp”: datetime.now().isoformat()} self.client.setex( memory_id, self.ttl, json.dumps(memory_data, ensure_ascii=False))return memory_id asyncdefsearch(self, query:str, top_k:int=5)-> List[Dict]:“”“检索最近的相关记忆”“”# 简化版:返回最近的记忆 keys = self.client.keys(“mem:*”) memories =[]for key in keys[-top_k:]: data = json.loads(self.client.get(key)) memories.append(data)return memories classLongTermMemory(MemoryBackend):“”“长期记忆:基于向量数据库的语义记忆”““definit(self, embedding_model: Any, vector_db: Any): self.embedding_model = embedding_model self.vector_db = vector_db asyncdefadd(self, content:str, metadata: Dict =None)->str:”““添加长期记忆”“”# 生成向量嵌入 embedding =await self.embedding_model.embed(content)# 存储到向量数据库 memory_id = self.vector_db.insert( vector=embedding, payload={“content”: content,“metadata”: metadata or{}})return memory_id asyncdefsearch(self, query:str, top_k:int=5)-> List[Dict]:“”“语义搜索长期记忆”“”# 查询向量嵌入 query_embedding =await self.embedding_model.embed(query)# 向量检索 results = self.vector_db.search( vector=query_embedding, top_k=top_k, score_threshold=0.7)return results classHybridMemory:“”“混合记忆管理器:整合短期和长期记忆”““definit(self, short_term: ShortTermMemory, long_term: LongTermMemory): self.short_term = short_term self.long_term = long_term asyncdefremember(self, content:str, importance:float=0.5, metadata: Dict =None):”““存储记忆(根据重要性决定存储位置)”“”# 始终存入短期记忆await self.short_term.add(content, metadata)# 重要记忆存入长期记忆if importance >0.7:await self.long_term.add(content, metadata)asyncdefrecall(self, query:str, top_k:int=5)-> List[Dict]:“”“回忆相关记忆(整合短期和长期)”“”# 并行检索 short_results =await self.short_term.search(query, top_k //2) long_results =await self.long_term.search(query, top_k //2)# 合并去重 all_results = short_results + long_results # 按相关性排序(这里简化)return all_results[:top_k]
3.1 ReAct框架:推理+行动协同
ReAct(Reasoning + Acting)是目前Agent最主流的推理范式。
是
否
用户问题
Thought: 分析问题
需要工具?
Action: 调用工具
Observation: 工具结果
Answer: 给出答案
代码实现:
classReActAgent(BaseAgent):“”“基于ReAct范式的Agent”““def_build_react_prompt(self, question:str)->str:returnf”““使用以下格式回答问题: Question: {question} Thought: 你应该思考做什么 Action: 要采取的操作,应该是 [] 中的一个 Observation: 操作的结果 … (这个 Thought/Action/Observation 可以重复N次) Thought: 我现在知道最终答案了 Answer: 对原始问题的最终答案 开始! Question: {question} Thought:”““asyncdefrun(self, user_input:str)->str:”““ReAct循环执行”“” prompt = self._build_react_prompt(user_input)for _ inrange(self.max_iterations):# LLM生成下一步动作 response =await self.llm.generate(prompt)# 解析响应 thought, action, action_input = self._parse_react_response(response)ifnot action:# 没有动作,说明已有答案return thought # 执行动作 observation =await self._execute_action(action, action_input)# 更新提示词 prompt +=f” {response} Observation: {observation} Thought:“return”无法在指定迭代次数内完成”def_parse_react_response(self, response:str)->tuple:“”“解析ReAct响应”“”# 解析 Thought、Action、Action Input# 这里简化处理,实际需要更复杂的解析 lines = response.strip().split(’ ‘) thought =“” action =None action_input =Nonefor line in lines:if line.startswith(“Thought:”): thought = line.replace(“Thought:”,“”).strip()elif line.startswith(“Action:”): action = line.replace(“Action:”,“”).strip()elif line.startswith(“Action Input:”): action_input = line.replace(“Action Input:”,“”).strip()return thought, action, action_input
3.2 工具调用系统
from typing import Callable, Dict, Any, List import inspect from pydantic import BaseModel, Field classTool(BaseModel):“”“工具基类”“” name:str= Field(description=“工具名称”) description:str= Field(description=“工具功能描述”) parameters: Dict[str, Any]= Field(default_factory=dict, description=“参数schema”) function: Callable = Field(description=“工具执行函数”)classConfig: arbitrary_types_allowed =Trueasyncdefexecute(self,kwargs)-> Any:“”“执行工具”““returnawait self.function(kwargs)defto_openai_schema(self)-> Dict:”““转换为OpenAI函数调用格式”““return{“type”:“function”,“function”:{“name”: self.name,“description”: self.description,“parameters”: self.parameters }}deftool(name:str=None, description:str=None):“”“工具装饰器”““defdecorator(func: Callable)-> Tool:# 提取函数签名 sig = inspect.signature(func) parameters ={}for param_name, param in sig.parameters.items(): param_type = param.annotation if param.annotation != inspect.Parameter.empty else”string” parameters[param_name]=“}return Tool( name=name or func.name, description=description or func.doc or”“, parameters=, function=func )return decorator # 工具使用示例@tool(name=“search_web”, description=“搜索网络信息”)asyncdefsearch_web(query:str, num_results:int=5):“”“搜索网络信息 Args: query: 搜索关键词 num_results: 返回结果数量 “”“# 实际实现调用搜索APIreturnf”找到 {num_results} 条关于 ‘{query}’ 的结果”@tool(name=“get_weather”, description=“获取天气信息”)asyncdefget_weather(location:str):“”“获取指定地点的天气信息 Args: location: 城市名称 “”“# 实际实现调用天气APIreturnf”{location} 今天晴,温度25°C”classToolRegistry:“”“工具注册中心”““definit(self): self._tools: Dict[str, Tool]={}defregister(self, tool: Tool):”““注册工具”“” self._tools[tool.name]= tool defget_tool(self, name:str)-> Tool:“”“获取工具”““return self._tools.get(name)defget_tool_names(self)-> List[str]:”““获取所有工具名称”““returnlist(self._tools.keys())defget_tool_descriptions(self)->str:”““获取工具描述文本”“” descriptions =[]for tool in self._tools.values(): descriptions.append(f”- {tool.name}: {tool.description}“)return” “.join(descriptions)defget_tool_schemas(self)-> List[Dict]:”““获取OpenAI格式的工具schema”““return[tool.to_openai_schema()for tool in self._tools.values()]
3.3 任务规划器
classTaskPlanner:”““任务分解与规划器”““definit(self, llm_client: Any): self.llm = llm_client asyncdefplan(self, goal:str)-> List[Dict]:”““将目标分解为子任务列表 Returns: [ {“task”: “任务描述”, “order”: 1, “dependencies”: []}, {“task”: “任务描述”, “order”: 2, “dependencies”: [1]}, … ] “”” prompt =f”““将以下目标分解为具体的、可执行的子任务列表。 目标:{goal} 请按以下格式输出: 1. [任务描述] 2. [任务描述] … 要求: - 每个任务应该独立且可执行 - 任务之间应该有逻辑顺序 - 尽量细化到可以直接执行”“” response =await self.llm.generate(prompt)# 解析任务列表 tasks =[]for line in response.strip().split(’ ‘):if line.strip():# 提取任务描述 task_desc = line.split(’.‘,1)[1].strip()if’.‘in line else line.strip() tasks.append({“task”: task_desc,“order”:len(tasks)+1,“status”:“pending”})return tasks asyncdefexecute_plan(self, agent: BaseAgent, tasks: List[Dict])-> Dict:“”“执行任务计划 Returns: { “success”: bool, “completed_tasks”: List[Dict], “failed_tasks”: List[Dict], “final_result”: Any } “”” completed =[] failed =[]for task in tasks:print(f” 执行任务 {task[‘order’]}: {task[‘task’]}“)try:# 使用Agent执行单个任务 result =await agent.run(task[‘task’]) task[‘status’]=‘completed’ task[‘result’]= result completed.append(task)except Exception as e: task[‘status’]=‘failed’ task[‘error’]=str(e) failed.append(task)return
4.1 场景分析
智能客服是AI Agent最典型的应用场景。我们来实现一个政务大厅智能客服,具备:
- 政策问答
- 办事流程引导
- 工单生成
- 人工转接
政策咨询
办事指引
投诉建议
复杂问题
用户咨询
意图识别
RAG检索知识库
流程引导Agent
工单生成
人工转接
生成回复
排队等待
4.2 完整实现
import asyncio from typing import Optional classCustomerServiceAgent(ReActAgent):”““智能客服Agent”““definit(self, knowledge_base, ticket_system,*args,kwargs):super().init(*args,kwargs) self.knowledge_base = knowledge_base self.ticket_system = ticket_system # 注册客服专用工具 self._register_customer_service_tools()def_register_customer_service_tools(self):”““注册客服工具”“”@self.tools.register@tool(name=“search_policy”, description=“搜索政策信息”)asyncdefsearch_policy(query:str):“”“从知识库搜索相关政策 Args: query: 政策关键词 “”” results =await self.knowledge_base.search(query, top_k=3)return” “.join([r[‘content’]for r in results])@self.tools.register@tool(name=“get_process_guide”, description=“获取办事流程”)asyncdefget_process_guide(service_type:str):“”“获取指定业务的办事流程 Args: service_type: 业务类型(如:身份证办理、社保卡申领) “”” guide =await self.knowledge_base.get_guide(service_type)return guide @self.tools.register@tool(name=“create_ticket”, description=“创建工单”)asyncdefcreate_ticket( category:str, description:str, priority:str=“normal”):“”“创建服务工单 Args: category: 工单类别 description: 问题描述 priority: 优先级(low/normal/high) “”” ticket_id =await self.ticket_system.create( category=category, description=description, priority=priority )returnf”工单已创建,编号:{ticket_id},我们将在1个工作日内处理”@self.tools.register@tool(name=“transfer_to_human”, description=“转人工客服”)asyncdeftransfer_to_human(reason:str):“”“转接到人工客服 Args: reason: 转接原因 “”” queue_number =await self.ticket_system.human_transfer(reason)returnf”已为您转接人工客服,当前排队人数:{queue_number}人,预计等待时间:{queue_number *2}分钟”asyncdefhandle_customer_query(self, user_input:str)->str:“”“处理客户咨询”“”# 意图识别 intent =await self._detect_intent(user_input)# 根据意图调整系统提示 system_prompt = self._get_system_prompt(intent)# 执行returnawait self.run(user_input)asyncdef_detect_intent(self, user_input:str)->str:“”“意图识别”“” intent_prompt =f”““分类以下用户咨询的意图类型: 用户输入:{user_input} 意图类型: 1. policy_inquiry - 政策咨询 2. process_guide - 办事流程咨询 3. complaint - 投诉建议 4. complex - 复杂问题需人工 只返回意图类型代码:”“” response =await self.llm.generate(intent_prompt)return response.strip()def_get_system_prompt(self, intent:str)->str:“”“根据意图获取系统提示”“” prompts ={“policy_inquiry”:“你是政策咨询专员,请准确引用政策文件内容…”,“process_guide”:“你是办事引导员,请给出清晰的办事步骤…”,“complaint”:“你是投诉处理专员,请先安抚情绪,再记录问题…”,“complex”:“你是客服助理,对于复杂问题,请主动建议转人工…”}return prompts.get(intent,“你是智能客服助手…”)# 使用示例asyncdefmain():from openai import AsyncOpenAI # 初始化 llm_client = AsyncOpenAI(api_key=“your-api-key”) knowledge_base = MockKnowledgeBase()# 模拟知识库 ticket_system = MockTicketSystem()# 模拟工单系统 agent = CustomerServiceAgent( llm_client=llm_client, memory_manager=HybridMemory( short_term=ShortTermMemory(), long_term=LongTermMemory()), tool_registry=ToolRegistry(), knowledge_base=knowledge_base, ticket_system=ticket_system )# 处理咨询 response =await agent.handle_customer_query(“我想办理社保卡,需要准备什么材料?”)print(response)if name ==”main”: asyncio.run(main())
4.3 性能对比
指标 传统规则客服 基础Chatbot 智能Agent
问题解决率 35% 60% 85%
平均响应时间 5分钟 2秒 3秒
多轮对话能力 ❌ ⚠️ ✅
工具调用能力 ❌ ❌ ✅
学习进化能力 ❌ ⚠️ ✅
运营成本 高 低 中
5.1 成本分析
AI Agent的主要成本来源:
45%25%12%10%8%Agent月度成本构成(万次调用)LLM Token消耗向量数据库Redis缓存API调用其他
5.2 优化策略
优化项 策略 预期节省
Prompt优化 精简系统提示词 20-30%
模型选择 混合使用GPT-4/GPT-3.5 40-50%
缓存策略 重复问题命中缓存 30-40%
Token限制 动态裁剪上下文 15-20%
批量处理 合并多个请求 10-15%
智能缓存实现:
import hashlib from functools import wraps defsmart_cache(ttl:int=3600):“”“智能缓存装饰器”“” cache ={}defdecorator(func):@wraps(func)asyncdefwrapper(*args,kwargs):# 生成缓存键 key = hashlib.md5(f”{func.name}{args}{kwargs}“.encode()).hexdigest()# 检查缓存if key in cache: cache_data = cache[key]if time.time()- cache_data[‘timestamp’]< ttl:print(“缓存命中!”)return cache_data[‘result’]# 执行函数 result =await func(*args,kwargs)# 存储缓存 cache[key]={‘result’: result,‘timestamp’: time.time()}return result return wrapper return decorator # 使用示例classOptimizedAgent(BaseAgent):@smart_cache(ttl=1800)asyncdef_llm_inference(self, prompt:str):“”“带缓存的LLM推理”““returnawaitsuper()._llm_inference(prompt)
项目地址: https://github.com/your-repo/agent-framework
Star支持: 如果这个项目对你有帮助,请给个⭐
快速开始
# 克隆项目git clone https://github.com/your-repo/agent-framework.git # 安装依赖 pip install-r requirements.txt # 配置环境变量cp .env.example .env # 编辑 .env 填入你的API密钥# 运行示例 python examples/customer_service.py
项目结构
agent-framework/ ├── src/ │ ├── agent/ │ │ ├── base.py # Agent基类 │ │ ├── react.py # ReAct实现 │ │ └── planner.py # 任务规划器 │ ├── memory/ │ │ ├── short_term.py # 短期记忆 │ │ ├── long_term.py # 长期记忆 │ │ └── hybrid.py # 混合记忆 │ ├── tools/ │ │ ├── base.py # 工具基类 │ │ ├── registry.py # 工具注册 │ │ └── builtin/ │ │ ├── search.py │ │ ├── weather.py │ │ └── calculator.py │ └── utils/ │ ├── cache.py # 缓存工具 │ ├── logger.py # 日志工具 │ └── retry.py # 重试机制 ├── examples/ │ ├── customer_service.py # 智能客服示例 │ ├── research_agent.py # 研究助手示例 │ └── code_agent.py # 代码助手示例 ├── tests/ │ ├── test_agent.py │ ├── test_memory.py │ └── test_tools.py ├── docs/ │ ├── API.md │ ├── ARCHITECTURE.md │ └── TUTORIAL.md ├── requirements.txt ├── setup.py └── README.md
AI Agent正在从”玩具”向”生产力工具”转变。2026年,掌握Agent开发将成为AI工程师的核心竞争力。
关键要点
- 记忆是Agent的核心竞争力 - 混合记忆架构(短期+长期)是**实践
- 工具调用决定Agent能力边界 - 丰富的工具生态 = 更强的Agent能力
- 成本控制是生产化关键 - 智能缓存+模型混合可节省50%+成本
- 评估体系必不可少 - 建立完善的监控和评估体系
下一步学习
- 📖 深入学习 LangChain / LangGraph
- 🔬 研究多Agent协作模式
- 🚀 探索 Agent + RAG **实践
- 📊 建立Agent评估体系
- OpenAI Function Calling文档
- LangChain Agent文档
- AutoGen论文
- AgentScope框架
✍️ 坚持用清晰易懂的图解+可落地的代码,让每个知识点都简单直观!💡 座右铭:“道路是曲折的,前途是光明的!”
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/258230.html