编排代理:例程和交接

2024 年 10 月 10 日
在 Github 中打开

当使用语言模型时,通常情况下,获得出色性能所需的只是一个好的提示和合适的工具。然而,当处理许多独特的流程时,事情可能会变得棘手。这本食谱将引导您了解解决此问题的一种方法。

我们将介绍例程交接的概念,然后逐步介绍实现过程,并展示如何以简单、强大且可控的方式使用它们来编排多个代理。

最后,我们提供了一个示例仓库 Swarm,它实现了这些想法以及示例。

让我们从设置导入开始。

from openai import OpenAI
from pydantic import BaseModel
from typing import Optional
import json


client = OpenAI()

例程

“例程”的概念没有严格定义,而是旨在捕捉一系列步骤的想法。具体而言,让我们将例程定义为自然语言(我们将用系统提示表示)中的一系列指令,以及完成这些指令所需的工具。

让我们看一个例子。下面,我们为客户服务代理定义了一个例程,指示它对用户问题进行分类,然后建议修复或提供退款。我们还定义了必要的函数 execute_refundlook_up_item。我们可以称之为客户服务例程、代理、助手等——但其核心思想是相同的:一系列步骤和执行这些步骤的工具。

# Customer Service Routine

system_message = (
    "You are a customer support agent for ACME Inc."
    "Always answer in a sentence or less."
    "Follow the following routine with the user:"
    "1. First, ask probing questions and understand the user's problem deeper.\n"
    " - unless the user has already provided a reason.\n"
    "2. Propose a fix (make one up).\n"
    "3. ONLY if not satesfied, offer a refund.\n"
    "4. If accepted, search for the ID and then execute refund."
    ""
)

def look_up_item(search_query):
    """Use to find item ID.
    Search query can be a description or keywords."""

    # return hard-coded item ID - in reality would be a lookup
    return "item_132612938"


def execute_refund(item_id, reason="not provided"):

    print("Summary:", item_id, reason) # lazy summary
    return "success"

例程的主要优势在于其简单性和稳健性。请注意,这些指令包含条件语句,很像状态机或代码中的分支。对于中小型例程,LLM 实际上可以非常稳健地处理这些情况,并且具有“软”依从性的额外好处——LLM 可以自然地引导对话,而不会陷入死胡同。

执行例程

要执行例程,让我们实现一个简单的循环,该循环执行以下操作:

  1. 获取用户输入。
  2. 将用户消息附加到 messages
  3. 调用模型。
  4. 将模型响应附加到 messages
def run_full_turn(system_message, messages):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "system", "content": system_message}] + messages,
    )
    message = response.choices[0].message
    messages.append(message)

    if message.content: print("Assistant:", message.content)

    return message


messages = []
while True:
    user = input("User: ")
    messages.append({"role": "user", "content": user})

    run_full_turn(system_message, messages)

正如您所看到的,这目前忽略了函数调用,因此让我们添加它。

模型要求函数格式化为函数模式。为了方便起见,我们可以定义一个辅助函数,将 python 函数转换为相应的函数模式。

import inspect

def function_to_schema(func) -> dict:
    type_map = {
        str: "string",
        int: "integer",
        float: "number",
        bool: "boolean",
        list: "array",
        dict: "object",
        type(None): "null",
    }

    try:
        signature = inspect.signature(func)
    except ValueError as e:
        raise ValueError(
            f"Failed to get signature for function {func.__name__}: {str(e)}"
        )

    parameters = {}
    for param in signature.parameters.values():
        try:
            param_type = type_map.get(param.annotation, "string")
        except KeyError as e:
            raise KeyError(
                f"Unknown type annotation {param.annotation} for parameter {param.name}: {str(e)}"
            )
        parameters[param.name] = {"type": param_type}

    required = [
        param.name
        for param in signature.parameters.values()
        if param.default == inspect._empty
    ]

    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": (func.__doc__ or "").strip(),
            "parameters": {
                "type": "object",
                "properties": parameters,
                "required": required,
            },
        },
    }

例如

def sample_function(param_1, param_2, the_third_one: int, some_optional="John Doe"):
    """
    This is my docstring. Call this function when you want.
    """
    print("Hello, world")

schema =  function_to_schema(sample_function)
print(json.dumps(schema, indent=2))
{
  "type": "function",
  "function": {
    "name": "sample_function",
    "description": "This is my docstring. Call this function when you want.",
    "parameters": {
      "type": "object",
      "properties": {
        "param_1": {
          "type": "string"
        },
        "param_2": {
          "type": "string"
        },
        "the_third_one": {
          "type": "integer"
        },
        "some_optional": {
          "type": "string"
        }
      },
      "required": [
        "param_1",
        "param_2",
        "the_third_one"
      ]
    }
  }
}

现在,我们可以使用此函数在调用模型时将工具传递给模型。

messages = []

tools = [execute_refund, look_up_item]
tool_schemas = [function_to_schema(tool) for tool in tools]

response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Look up the black boot."}],
            tools=tool_schemas,
        )
message = response.choices[0].message

message.tool_calls[0].function
Function(arguments='{"search_query":"black boot"}', name='look_up_item')

最后,当模型调用工具时,我们需要执行相应的函数并将结果提供回模型。

我们可以通过将工具名称映射到 tool_map 中的 python 函数来做到这一点,然后在 execute_tool_call 中查找并调用它。最后,我们将结果添加到对话中。

tools_map = {tool.__name__: tool for tool in tools}

def execute_tool_call(tool_call, tools_map):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    print(f"Assistant: {name}({args})")

    # call corresponding function with provided arguments
    return tools_map[name](**args)

for tool_call in message.tool_calls:
            result = execute_tool_call(tool_call, tools_map)

            # add result back to conversation 
            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)
Assistant: look_up_item({'search_query': 'black boot'})

在实践中,我们还需要让模型使用结果来生成另一个响应。该响应可能包含工具调用,因此我们可以只在一个循环中运行它,直到没有更多的工具调用为止。

如果我们将所有内容放在一起,它看起来会像这样

tools = [execute_refund, look_up_item]


def run_full_turn(system_message, tools, messages):

    num_init_messages = len(messages)
    messages = messages.copy()

    while True:

        # turn python functions into tools and save a reverse map
        tool_schemas = [function_to_schema(tool) for tool in tools]
        tools_map = {tool.__name__: tool for tool in tools}

        # === 1. get openai completion ===
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "system", "content": system_message}] + messages,
            tools=tool_schemas or None,
        )
        message = response.choices[0].message
        messages.append(message)

        if message.content:  # print assistant response
            print("Assistant:", message.content)

        if not message.tool_calls:  # if finished handling tool calls, break
            break

        # === 2. handle tool calls ===

        for tool_call in message.tool_calls:
            result = execute_tool_call(tool_call, tools_map)

            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)

    # ==== 3. return new messages =====
    return messages[num_init_messages:]


def execute_tool_call(tool_call, tools_map):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    print(f"Assistant: {name}({args})")

    # call corresponding function with provided arguments
    return tools_map[name](**args)


messages = []
while True:
    user = input("User: ")
    messages.append({"role": "user", "content": user})

    new_messages = run_full_turn(system_message, tools, messages)
    messages.extend(new_messages)

现在我们有了一个例程,假设我们想要添加更多步骤和更多工具。我们可以在一定程度上做到这一点,但最终,如果我们尝试使用太多不同的任务来扩展例程,它可能会开始变得吃力。这就是我们可以利用多个例程概念的地方——给定用户请求,我们可以加载正确的例程,其中包含适当的步骤和工具来解决它。

动态交换系统指令和工具似乎令人望而生畏。但是,如果我们将“例程”视为“代理”,那么交接的概念使我们能够简单地表示这些交换——就像一个代理将对话移交给另一个代理一样。

交接

让我们将交接定义为一个代理(或例程)将正在进行的对话移交给另一个代理,很像您在电话中被转移到其他人一样。不同的是,在这种情况下,代理完全了解您之前的对话!

要了解交接的实际应用,让我们首先为一个代理定义一个基本类。

class Agent(BaseModel):
    name: str = "Agent"
    model: str = "gpt-4o-mini"
    instructions: str = "You are a helpful Agent"
    tools: list = []

现在为了使我们的代码支持它,我们可以更改 run_full_turn 以采用 Agent 而不是单独的 system_messagetools

def run_full_turn(agent, messages):

    num_init_messages = len(messages)
    messages = messages.copy()

    while True:

        # turn python functions into tools and save a reverse map
        tool_schemas = [function_to_schema(tool) for tool in agent.tools]
        tools_map = {tool.__name__: tool for tool in agent.tools}

        # === 1. get openai completion ===
        response = client.chat.completions.create(
            model=agent.model,
            messages=[{"role": "system", "content": agent.instructions}] + messages,
            tools=tool_schemas or None,
        )
        message = response.choices[0].message
        messages.append(message)

        if message.content:  # print assistant response
            print("Assistant:", message.content)

        if not message.tool_calls:  # if finished handling tool calls, break
            break

        # === 2. handle tool calls ===

        for tool_call in message.tool_calls:
            result = execute_tool_call(tool_call, tools_map)

            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)

    # ==== 3. return new messages =====
    return messages[num_init_messages:]


def execute_tool_call(tool_call, tools_map):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    print(f"Assistant: {name}({args})")

    # call corresponding function with provided arguments
    return tools_map[name](**args)

我们现在可以轻松运行多个代理

def execute_refund(item_name):
    return "success"

refund_agent = Agent(
    name="Refund Agent",
    instructions="You are a refund agent. Help the user with refunds.",
    tools=[execute_refund],
)

def place_order(item_name):
    return "success"

sales_assistant = Agent(
    name="Sales Assistant",
    instructions="You are a sales assistant. Sell the user a product.",
    tools=[place_order],
)


messages = []
user_query = "Place an order for a black boot."
print("User:", user_query)
messages.append({"role": "user", "content": user_query})

response = run_full_turn(sales_assistant, messages) # sales assistant
messages.extend(response)


user_query = "Actually, I want a refund." # implicitly refers to the last item
print("User:", user_query)
messages.append({"role": "user", "content": user_query})
response = run_full_turn(refund_agent, messages) # refund agent
User: Place an order for a black boot.
Assistant: place_order({'item_name': 'black boot'})
Assistant: Your order for a black boot has been successfully placed! If you need anything else, feel free to ask!
User: Actually, I want a refund.
Assistant: execute_refund({'item_name': 'black boot'})
Assistant: Your refund for the black boot has been successfully processed. If you need further assistance, just let me know!

太棒了!但是我们在这里手动进行了交接——我们希望代理本身来决定何时执行交接。一种简单但出奇有效的方法是给他们一个 transfer_to_XXX 函数,其中 XXX 是某个代理。模型足够聪明,知道何时调用此函数以进行交接!

交接函数

现在代理可以表达进行交接的意图,我们必须使其真正发生。有很多方法可以做到这一点,但有一种特别简洁的方法。

对于我们到目前为止定义的代理函数,例如 execute_refundplace_order,它们返回一个字符串,该字符串将提供给模型。如果相反,我们返回一个 Agent 对象来指示我们想要转移到哪个代理,会怎么样?像这样

refund_agent = Agent(
    name="Refund Agent",
    instructions="You are a refund agent. Help the user with refunds.",
    tools=[execute_refund],
)

def transfer_to_refunds():
    return refund_agent

sales_assistant = Agent(
    name="Sales Assistant",
    instructions="You are a sales assistant. Sell the user a product.",
    tools=[place_order],
)

然后,我们可以更新我们的代码以检查函数响应的返回类型,如果它是 Agent,则更新正在使用的代理!此外,现在 run_full_turn 将需要返回正在使用的最新代理,以防发生交接。(我们可以在 Response 类中执行此操作以保持整洁。)

class Response(BaseModel):
    agent: Optional[Agent]
    messages: list

现在是更新后的 run_full_turn

def run_full_turn(agent, messages):

    current_agent = agent
    num_init_messages = len(messages)
    messages = messages.copy()

    while True:

        # turn python functions into tools and save a reverse map
        tool_schemas = [function_to_schema(tool) for tool in current_agent.tools]
        tools = {tool.__name__: tool for tool in current_agent.tools}

        # === 1. get openai completion ===
        response = client.chat.completions.create(
            model=agent.model,
            messages=[{"role": "system", "content": current_agent.instructions}]
            + messages,
            tools=tool_schemas or None,
        )
        message = response.choices[0].message
        messages.append(message)

        if message.content:  # print agent response
            print(f"{current_agent.name}:", message.content)

        if not message.tool_calls:  # if finished handling tool calls, break
            break

        # === 2. handle tool calls ===

        for tool_call in message.tool_calls:
            result = execute_tool_call(tool_call, tools, current_agent.name)

            if type(result) is Agent:  # if agent transfer, update current agent
                current_agent = result
                result = (
                    f"Transfered to {current_agent.name}. Adopt persona immediately."
                )

            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)

    # ==== 3. return last agent used and new messages =====
    return Response(agent=current_agent, messages=messages[num_init_messages:])


def execute_tool_call(tool_call, tools, agent_name):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    print(f"{agent_name}:", f"{name}({args})")

    return tools[name](**args)  # call corresponding function with provided arguments

让我们看一个包含更多代理的示例。

def escalate_to_human(summary):
    """Only call this if explicitly asked to."""
    print("Escalating to human agent...")
    print("\n=== Escalation Report ===")
    print(f"Summary: {summary}")
    print("=========================\n")
    exit()


def transfer_to_sales_agent():
    """User for anything sales or buying related."""
    return sales_agent


def transfer_to_issues_and_repairs():
    """User for issues, repairs, or refunds."""
    return issues_and_repairs_agent


def transfer_back_to_triage():
    """Call this if the user brings up a topic outside of your purview,
    including escalating to human."""
    return triage_agent


triage_agent = Agent(
    name="Triage Agent",
    instructions=(
        "You are a customer service bot for ACME Inc. "
        "Introduce yourself. Always be very brief. "
        "Gather information to direct the customer to the right department. "
        "But make your questions subtle and natural."
    ),
    tools=[transfer_to_sales_agent, transfer_to_issues_and_repairs, escalate_to_human],
)


def execute_order(product, price: int):
    """Price should be in USD."""
    print("\n\n=== Order Summary ===")
    print(f"Product: {product}")
    print(f"Price: ${price}")
    print("=================\n")
    confirm = input("Confirm order? y/n: ").strip().lower()
    if confirm == "y":
        print("Order execution successful!")
        return "Success"
    else:
        print("Order cancelled!")
        return "User cancelled order."


sales_agent = Agent(
    name="Sales Agent",
    instructions=(
        "You are a sales agent for ACME Inc."
        "Always answer in a sentence or less."
        "Follow the following routine with the user:"
        "1. Ask them about any problems in their life related to catching roadrunners.\n"
        "2. Casually mention one of ACME's crazy made-up products can help.\n"
        " - Don't mention price.\n"
        "3. Once the user is bought in, drop a ridiculous price.\n"
        "4. Only after everything, and if the user says yes, "
        "tell them a crazy caveat and execute their order.\n"
        ""
    ),
    tools=[execute_order, transfer_back_to_triage],
)


def look_up_item(search_query):
    """Use to find item ID.
    Search query can be a description or keywords."""
    item_id = "item_132612938"
    print("Found item:", item_id)
    return item_id


def execute_refund(item_id, reason="not provided"):
    print("\n\n=== Refund Summary ===")
    print(f"Item ID: {item_id}")
    print(f"Reason: {reason}")
    print("=================\n")
    print("Refund execution successful!")
    return "success"


issues_and_repairs_agent = Agent(
    name="Issues and Repairs Agent",
    instructions=(
        "You are a customer support agent for ACME Inc."
        "Always answer in a sentence or less."
        "Follow the following routine with the user:"
        "1. First, ask probing questions and understand the user's problem deeper.\n"
        " - unless the user has already provided a reason.\n"
        "2. Propose a fix (make one up).\n"
        "3. ONLY if not satesfied, offer a refund.\n"
        "4. If accepted, search for the ID and then execute refund."
        ""
    ),
    tools=[execute_refund, look_up_item, transfer_back_to_triage],
)

最后,我们可以在循环中运行它(这不会在 python 笔记本中运行,因此您可以在单独的 python 文件中尝试此操作)

agent = triage_agent
messages = []

while True:
    user = input("User: ")
    messages.append({"role": "user", "content": user})

    response = run_full_turn(agent, messages)
    agent = response.agent
    messages.extend(response.messages)

Swarm

作为概念验证,我们将这些想法打包到一个名为 Swarm 的示例库中。它仅作为示例,不应直接用于生产环境。但是,请随意采用这些想法和代码来构建您自己的库!