AI Agent的构成
# NOTE(review): extraction dropped this tool's `def` line and the module-level
# `knowledge_base` dict it iterates; the signature below is inferred from the
# caller in create_simple_agent — confirm against the original article.
def search_knowledge(query: str) -> str:
    """Return the first knowledge-base value whose key appears in the query.

    Falls back to a not-found message (in Chinese, as the original emits)
    when no key matches. Fixes the fused `returnf` token from extraction.
    """
    lowered = query.lower()
    for key, value in knowledge_base.items():
        if key in lowered:
            return value
    return f"未找到与'{query}'相关的信息"
# Prompt template for the ReAct agent created in create_simple_agent.
# NOTE(review): extraction replaced the triple quotes with curly quotes
# (a syntax error) and appears to have dropped the ReAct format section
# (Thought / Action / Action Input / Observation lines and the {tools},
# {input}, {agent_scratchpad} placeholders). create_react_agent requires
# those variables — restore them from the original article.
AGENT_PROMPT = """你是一个智能助手,能够帮助用户完成各种任务。

工具名称列表: {tool_names}

请按照以下格式回答问题:

重要指导方针:
- 仔细分析用户的问题,选择合适的工具
- 如果问题可以直接回答,无需使用工具
- 回答要准确、简洁、有帮助
- 如果不确定,诚实地说明

开始!"""
def create_simple_agent() -> AgentExecutor:
    """Build and return a ReAct-style LangChain agent executor.

    Wires together the model, the three demo tools, and the prompt template.
    NOTE(review): the `def` line was lost in extraction; the name is grounded
    by the `create_simple_agent()` call in main().
    """
    # 1. Model — OpenAI chat model; swap for any other LLM if desired.
    llm = ChatOpenAI(
        model="gpt-3.5-turbo",  # or "gpt-4"
        temperature=0,  # deterministic output
        # api_key=os.getenv("OPENAI_API_KEY"),  # read from environment
    )
    # 2. Tools available to the agent.
    tools = [calculator, get_current_weather, search_knowledge]
    # 3. Prompt template (instructions).
    prompt = PromptTemplate.from_template(AGENT_PROMPT)
    # 4. ReAct agent.
    agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
    # 5. Executor around the agent.
    agent_executor = AgentExecutor(
        agent=agent,
        tools=tools,
        verbose=True,  # print the reasoning trace
        handle_parsing_errors=True,  # recover from malformed LLM output
        max_iterations=5,  # guard against infinite tool loops
    )
    return agent_executor
def main() -> None:
    """Run the LangChain agent demo over a fixed list of test questions.

    NOTE(review): the `def` line was lost in extraction; the name is grounded
    by the `main()` call in the __main__ guard. The leading-space prints were
    presumably "\n" before extraction — confirm against the original.
    """
    print("=" * 60)
    print("LangChain Agent 示例")
    print("=" * 60)
    agent = create_simple_agent()
    test_questions = [
        "计算 (15 + 25) * 3 等于多少?",
        "北京今天天气怎么样?",
        "什么是LangChain?",
        "帮我计算一下,如果北京温度是5度,上海是12度,它们的平均温度是多少?",
    ]
    for i, question in enumerate(test_questions, 1):
        print(f"\n{'=' * 60}")
        print(f"测试 {i}: {question}")
        print("=" * 60)
        try:
            result = agent.invoke({"input": question})
            print(f"\n最终答案: {result['output']}")
        except Exception as e:
            # Best-effort demo loop: report and continue with the next question.
            print(f"执行出错: {str(e)}")
# Script entry point; extraction stripped the dunder underscores and quotes.
if __name__ == "__main__":
    main()
# Module-level logging setup and the FastMCP server instance.
# Extraction turned __name__ into `name` and the quotes into curly quotes.
logger = logging.getLogger(__name__)
logging.basicConfig(format="[%(levelname)s]: %(message)s", level=logging.INFO)
# Trailing space in the original server name looked accidental and was dropped.
mcp = FastMCP("Currency MCP Server")
# NOTE(review): the `def` line (and likely an @mcp.tool() decorator) was lost
# in extraction; the signature below is reconstructed from the surviving
# docstring — confirm against the original A2A currency sample.
def get_exchange_rate(
    currency_from: str = 'USD',
    currency_to: str = 'EUR',
    currency_date: str = 'latest',
) -> dict:
    """Fetch an exchange rate from the Frankfurter API.

    Args:
        currency_from: The currency to convert from (e.g., "USD").
        currency_to: The currency to convert to (e.g., "EUR").
        currency_date: The date for the exchange rate or "latest".
            Defaults to "latest".

    Returns:
        A dictionary containing the exchange rate data, or an error message
        if the request fails.
    """
    logger.info(
        f"--- ️ Tool: get_exchange_rate called for converting {currency_from} to {currency_to} ---"
    )
    try:
        response = httpx.get(
            f'https://api.frankfurter.app/{currency_date}',
            params={'from': currency_from, 'to': currency_to},
        )
        response.raise_for_status()
        data = response.json()
        # Defensive check: a well-formed Frankfurter payload carries 'rates'.
        if 'rates' not in data:
            return {'error': 'Invalid API response format.'}
        logger.info(f'✅ API response: {data}')
        return data
    except httpx.HTTPError as e:
        return {'error': f'API request failed: {e}'}
    except ValueError:
        # response.json() raises ValueError on a non-JSON body.
        return {'error': 'Invalid JSON response from API.'}
# Script entry point for the MCP server; extraction mangled the dunders,
# quotes, and the port value interpolated into the log message.
if __name__ == "__main__":
    logger.info("🚀 MCP server started on port %s", os.getenv("PORT", "8080"))
    asyncio.run(
        mcp.run_async(
            transport="http",
            host="0.0.0.0",
            # os.getenv returns a string; coerce so the server binds an int port.
            port=int(os.getenv("PORT", "8080")),
        )
    )
A2A的工作原理:
import click
load_dotenv()


def main(host: str = 'localhost', port: int = 10011) -> None:
    """Build the AgentCard and start the A2A server for the image agent.

    NOTE(review): reconstructed from extraction-damaged text. The original
    `def` line (likely decorated with click options supplying host/port),
    the opening `try:`, and the statements building `capabilities`
    (AgentCapabilities) and `skill` (AgentSkill) were all lost — restore
    them from the original sample before running.
    """
    try:
        # HOST_OVERRIDE lets a deployment advertise a public URL instead of
        # the local bind address.
        agent_host_url = (
            os.getenv('HOST_OVERRIDE')
            if os.getenv('HOST_OVERRIDE')
            else f'http://{host}:{port}/'
        )
        agent_card = AgentCard(
            name='Image Generator Agent',
            description=(
                'Generate stunning, high-quality images on demand and leverage'
                ' powerful editing capabilities to modify, enhance, or completely'
                ' transform visuals.'
            ),
            url=agent_host_url,
            version='1.0.0',
            default_input_modes=ImageGenerationAgent.SUPPORTED_CONTENT_TYPES,
            default_output_modes=ImageGenerationAgent.SUPPORTED_CONTENT_TYPES,
            capabilities=capabilities,  # NOTE(review): definition lost in extraction
            skills=[skill],  # NOTE(review): definition lost in extraction
        )
        request_handler = DefaultRequestHandler(
            agent_executor=ImageGenerationAgentExecutor(),
            task_store=InMemoryTaskStore(),
        )
        server = A2AStarletteApplication(
            agent_card=agent_card, http_handler=request_handler
        )
        import uvicorn

        uvicorn.run(server.build(), host=host, port=port)
    except MissingAPIKeyError as e:
        logger.error(f'Error: {e}')
        exit(1)
    except Exception as e:
        logger.error(f'An error occurred during server startup: {e}')
        exit(1)
# Script entry point; extraction stripped the dunder underscores and quotes.
if __name__ == '__main__':
    main()
# Module-level wiring for the tool-calling agent example.
# Extraction had curly quotes on the model name and placed the `llm`
# definition after its first use; reordered so definitions precede use.
llm = ChatOpenAI(model="gpt-5", temperature=0)  # temperature=0 for determinism

tools = [search_information]

# NOTE(review): `agent_prompt` is defined elsewhere in the original article.
agent = create_tool_calling_agent(llm, tools, agent_prompt)
agent_executor = AgentExecutor(agent=agent, verbose=True, tools=tools)
def _create_executor(self) -> AgentExecutor:
    """Build the AgentExecutor that runs this sub-agent's tasks.

    The system message embeds the sub-agent's name and role so the LLM
    stays within its assigned responsibility.
    """
    prompt = ChatPromptTemplate.from_messages([
        ("system", f"你是{self.name},职责是{self.role}。专注完成分配的任务,返回简洁结果。"),
        ("human", "{input}"),
        # Required slot for intermediate tool-call traces.
        ("placeholder", "{agent_scratchpad}"),
    ])
    # `llm` is the module-level model shared by all sub-agents.
    agent = create_tool_calling_agent(llm, self.tools, prompt)
    return AgentExecutor(agent=agent, tools=self.tools, verbose=True)
async def execute(self, task: str) -> Dict[str, Any]:
    """Asynchronously run one task on this sub-agent's executor.

    Returns a dict with the agent name, the task text, and the executor's
    "output" field. Fixes the fused `asyncdef` token and the f-string
    newline that extraction split across lines.
    """
    print(f"\n{'=' * 50}")
    print(f"🤖 子智能体 [{self.name}] 开始执行任务: {task}")
    print(f"{'=' * 50}")
    result = await self.agent_executor.ainvoke({"input": task})
    return {"agent": self.name, "task": task, "result": result["output"]}
def __init__(self):
    """Initialize the master agent: build sub-agents and an empty memory.

    Extraction stripped the dunder underscores from `__init__`.
    """
    self.sub_agents = self._create_sub_agents()
    # Persists the plan and intermediate results across a run.
    self.memory = []
def _create_sub_agents(self) -> Dict[str, SubAgent]:
    """Create the specialized sub-agents, keyed by their dispatch name.

    The keys must match the agent types emitted by _decompose_task.
    """
    return {
        "researcher": SubAgent(
            name="研究员智能体",
            role="负责网络搜索和信息收集",
            tools=[search_web],
        ),
        "data_analyst": SubAgent(
            name="数据分析智能体",
            role="负责数据库查询和数据分析",
            tools=[search_database, analyze_data],
        ),
        "reporter": SubAgent(
            name="报告智能体",
            role="负责整合信息并生成最终报告",
            tools=[generate_report],
        ),
    }
async def _decompose_task(self, query: str) -> List[Dict[str, str]]:
    """Decompose the user query into sub-tasks using the LLM.

    Expected LLM output format is one "agent_type|task" pair per line
    (see the parser below). Falls back to a fixed three-step plan when
    parsing yields nothing, and records the plan in self.memory.
    """
    print(f"\n🧠 主智能体正在分析任务: {query}")
    # NOTE(review): extraction truncated the original message list; the
    # HumanMessage carrying the query is reconstructed — confirm against
    # the original article.
    response = await llm.ainvoke([
        SystemMessage(content="你是一个任务分解专家。将用户查询分解为子任务。"),
        HumanMessage(content=query),
    ])
    # Parse "agent_type|task" lines, keeping only known agent types.
    tasks = []
    for line in response.content.strip().split("\n"):
        if "|" in line:
            agent_type, task_desc = line.split("|", 1)
            agent_type = agent_type.strip().lower()
            if agent_type in self.sub_agents:
                tasks.append({"agent": agent_type, "task": task_desc.strip()})
    # Default pipeline when the LLM decomposition failed to parse.
    if not tasks:
        tasks = [
            {"agent": "researcher", "task": f"搜索关于'{query}'的信息"},
            {"agent": "data_analyst", "task": f"分析'{query}'相关数据"},
            {"agent": "reporter", "task": "整合以上结果生成报告"},
        ]
    # Persist the plan for later inspection.
    self.memory.append({"type": "plan", "tasks": tasks})
    print(f"📋 任务分解完成,共{len(tasks)}个子任务")
    return tasks
async def _execute_parallel_tasks(self, tasks: List[Dict]) -> List[Dict]:
    """Run every sub-task concurrently and return the successful results.

    Failed tasks are reported and dropped; successes are also appended to
    self.memory as intermediate results.
    """
    print(f"\n🚀 开始并行执行 {len(tasks)} 个子任务...")
    # Fan out: one coroutine per sub-agent task.
    coroutines = []
    for task_info in tasks:
        agent = self.sub_agents[task_info["agent"]]
        coroutines.append(agent.execute(task_info["task"]))
    # return_exceptions=True keeps one failing agent from aborting the rest.
    results = await asyncio.gather(*coroutines, return_exceptions=True)
    valid_results = []
    for r in results:
        if isinstance(r, Exception):
            print(f"⚠️ 子任务执行失败: {r}")
        else:
            valid_results.append(r)
            self.memory.append({"type": "result", "data": r})
    return valid_results
async def _synthesize_results(self, results: List[Dict]) -> str:
    """Merge all sub-agent results into one final answer via the LLM."""
    print(f"\n🔄 主智能体正在综合 {len(results)} 个结果...")
    # One bullet per sub-agent result, joined into a single prompt body.
    results_text = "\n".join([
        f"- {r['agent']}: {r['result']}" for r in results
    ])
    response = await llm.ainvoke([
        SystemMessage(content="你是一个信息综合专家,负责整合多个智能体的执行结果,生成完整、连贯的最终回答。"),
        HumanMessage(content=f"请综合以下各智能体的执行结果:\n{results_text}"),
    ])
    return response.content
async def run(self, query: str) -> str:
    """Run the multi-agent pipeline: decompose, execute in parallel, synthesize."""
    print(f"\n{'=' * 60}")
    print("🎯 多智能体系统启动")
    print(f"📝 用户查询: {query}")
    print(f"{'=' * 60}")
    # 1. Split the query into sub-tasks.
    tasks = await self._decompose_task(query)
    # 2. Run the sub-tasks concurrently.
    results = await self._execute_parallel_tasks(tasks)
    # 3. Merge everything into the final answer.
    final_answer = await self._synthesize_results(results)
    print(f"\n{'=' * 60}")
    print("✅ 多智能体系统执行完成")
    print(f"{'=' * 60}")
    return final_answer
async def main() -> None:
    """Demo driver for the multi-agent system.

    NOTE(review): reconstructed — the top-level `await` implies an async
    main, and the statement constructing `master` (the orchestrating
    master-agent instance) was lost in extraction; restore it, e.g.
    `master = MasterAgent()`, before running.
    """
    # Test query.
    query = "分析2025年人工智能行业的发展趋势,并给出投资建议"
    result = await master.run(query)
    print(f"\n{'=' * 60}")
    print("📊 最终报告")
    print(f"{'=' * 60}")
    print(result)
# Script entry point; extraction stripped the dunder underscores and quotes.
if __name__ == "__main__":
    asyncio.run(main())
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/276178.html