oracle数据库做的网站,上海公司网站建设以子,劳务公司注册流程和费用,wordpress双域名协调的艺术#xff1a;Pipeline 与消息系统核心解析
请关注公众号【碳硅化合物AI】
摘要
多智能体系统的核心是协调。AgentScope 通过 Pipeline 和消息系统实现了优雅的多智能体编排。本文将深入分析 MsgHub、Pipeline 模式以及消息系统的设计。你会发现#xff0c;消息Pipeline 与消息系统核心解析请关注公众号【碳硅化合物AI】摘要多智能体系统的核心是协调。AgentScope 通过 Pipeline 和消息系统实现了优雅的多智能体编排。本文将深入分析 MsgHub、Pipeline 模式以及消息系统的设计。你会发现消息Msg不仅是数据载体更是整个框架的统一接口MsgHub 通过订阅机制实现了自动消息广播Pipeline 提供了多种编排模式让多智能体协作变得简单而强大。通过阅读本文你会理解多智能体如何通过消息进行通信如何通过 Pipeline 进行编排以及这些机制背后的设计考量。入口类与类关系消息系统的类层次消息系统非常简单但设计精妙Pipeline 系统的类层次Pipeline 提供了多种编排模式关键代码Msg 数据结构Msg 是框架的核心数据结构class Msg: The message class in agentscope. def __init__( self, name: str, content: str | Sequence[ContentBlock], role: Literal[user, assistant, system], metadata: dict[str, JSONSerializableObject] | None None, timestamp: str | None None, invocation_id: str | None None, ) - None: self.name name self.content content self.role role self.metadata metadata self.id shortuuid.uuid() self.timestamp timestamp or datetime.now().strftime(...)这个设计非常巧妙统一接口所有智能体间通信都使用 Msg多模态支持content 可以是字符串或 ContentBlock 列表元数据支持metadata 字段可以存储结构化输出等额外信息自动标识每个消息都有唯一的 id 和 timestamp关键流程分析消息创建和传递流程消息在智能体间的传递非常直接多智能体对话流程使用 MsgHub 的多智能体对话流程Pipeline 执行流程顺序 Pipeline 的执行流程关键技术点1. 消息作为统一数据结构的设计Msg 类在 AgentScope 中扮演着核心角色它是智能体间通信的媒介所有智能体都通过 Msg 交换信息与 LLM API 的桥梁Formatter 将 Msg 转换为 API 所需格式记忆存储的单元Memory 存储的是 Msg 对象列表UI 显示的数据源前端可以直接显示 Msg 对象这种统一设计避免了数据格式转换的复杂性。无论消息来自哪里、要去哪里都是同一个数据结构。2. 多模态消息块系统Msg 支持多模态内容通过 ContentBlock 系统实现class TextBlock(TypedDict, totalFalse): type: Required[Literal[text]] text: str class ToolUseBlock(TypedDict, totalFalse): type: Required[Literal[tool_use]] id: Required[str] name: Required[str] input: Required[dict[str, object]] class ToolResultBlock(TypedDict, totalFalse): type: Required[Literal[tool_result]] id: Required[str] name: Required[str] output: Required[str | List[ContentBlock]] class ImageBlock(TypedDict, totalFalse): type: Required[Literal[image]] source: Required[Base64Source | URLSource] class AudioBlock(TypedDict, totalFalse): type: Required[Literal[audio]] source: Required[Base64Source | URLSource] ContentBlock ( ToolUseBlock | ToolResultBlock | TextBlock | ThinkingBlock | ImageBlock | AudioBlock | VideoBlock )这种设计让消息可以包含文本内容工具调用和结果图像、音频、视频等多媒体内容思考过程thinking block所有内容都统一在一个消息对象中非常灵活。3. 消息中心MsgHub的设计MsgHub 通过订阅机制实现自动消息广播。关键代码def _reset_subscriber(self) - None: Reset the subscriber for agent in self.participant if self.enable_auto_broadcast: for agent in self.participants: agent.reset_subscribers(self.name, self.participants)当智能体在 MsgHub 中回复时消息会自动广播给其他参与者。这个机制通过 AgentBase 的订阅系统实现# 在 AgentBase 中defreset_subscribers(self,hub_name:str,subscribers:list[AgentBase])-None:设置订阅者当智能体回复时消息会自动发送给订阅者self._subscribers[hub_name]subscribers这种设计让多智能体对话变得非常简单asyncwithMsgHub([agent1,agent2,agent3]):awaitagent1()# agent2 和 agent3 自动收到消息awaitagent2()# agent1 和 agent3 自动收到消息awaitagent3()# agent1 和 agent2 自动收到消息4. 不同 Pipeline 模式的实现AgentScope 提供了多种 Pipeline 模式顺序 PipelineSequentialPipelineasync def sequential_pipeline( agents: list[AgentBase], msg: Msg | list[Msg] | None None, ) - Msg | list[Msg] | None: 执行智能体序列前一个的输出作为下一个的输入 for agent in agents: msg await agent(msg) return msg扇出 PipelineFanoutPipelineasync def fanout_pipeline( agents: list[AgentBase], msg: Msg | list[Msg] | None None, enable_gather: bool True, ) - list[Msg]: 将同一个输入分发给多个智能体 if enable_gather: # 并发执行 tasks [asyncio.create_task(agent(deepcopy(msg))) for agent in agents] return await asyncio.gather(*tasks) else: # 顺序执行 return [await agent(deepcopy(msg)) for agent in agents]这些 Pipeline 模式让多智能体编排变得非常灵活。你可以顺序执行前一个智能体的输出作为下一个的输入适合流水线场景并发执行多个智能体同时处理同一个输入适合并行分析场景动态编排在 MsgHub 中动态添加或删除参与者5. 消息的序列化和反序列化Msg 支持完整的序列化def to_dict(self) - dict: Convert the message into JSON dict data. return { id: self.id, name: self.name, role: self.role, content: self.content, metadata: self.metadata, timestamp: self.timestamp, } classmethod def from_dict(cls, json_data: dict) - Msg: Load a message object from the given JSON data. new_obj cls( namejson_data[name], contentjson_data[content], rolejson_data[role], metadatajson_data.get(metadata, None), timestampjson_data.get(timestamp, None), invocation_idjson_data.get(invocation_id, None), ) new_obj.id json_data.get(id, new_obj.id) return new_obj这种设计让消息可以保存到文件或数据库通过网络传输在不同进程间共享用于调试和日志记录总结Pipeline 和消息系统是 AgentScope 框架中实现多智能体协调的核心消息系统通过统一的 Msg 数据结构实现了智能体间通信、API 交互、记忆存储的统一接口MsgHub通过订阅机制实现了自动消息广播让多智能体对话变得简单优雅Pipeline提供了多种编排模式支持顺序、并发、动态等多种场景这些系统的设计都体现了 AgentScope 的核心理念透明、模块化、易用。在下一篇文章中我们会分析扩展机制的实现了解如何为框架添加新功能。