如何使用函数调用自动化 AWS 任务

2023 年 9 月 27 日
在 Github 中打开

此代码演示了如何使用 ChatGPT 的函数调用功能来执行与 Amazon S3 存储桶相关的任务。该笔记本涵盖了 S3 存储桶的主要功能,例如运行简单的列表命令、在所有存储桶中搜索特定文件、将文件上传到存储桶以及从存储桶下载文件。OpenAI Chat API 理解用户指令,生成自然语言响应,并根据用户输入提取适当的函数调用。

要求:要运行笔记本,请生成具有 S3 存储桶写入权限的 AWS 访问密钥,并将其与 OpenAI 密钥一起存储在本地环境文件中。“.env”文件的格式如下:

AWS_ACCESS_KEY_ID=<your-key>
AWS_SECRET_ACCESS_KEY=<your-key>
OPENAI_API_KEY=<your-key>
! pip install openai
! pip install boto3
! pip install tenacity
! pip install python-dotenv
from openai import OpenAI
import json
import boto3
import os
import datetime
from urllib.request import urlretrieve

# load environment variables
from dotenv import load_dotenv
load_dotenv() 
True
# Model used for every chat completion request in this notebook
GPT_MODEL = "gpt-3.5-turbo"
# Optional - if you had issues loading the environment file, you can set the AWS values using the below code
# os.environ['AWS_ACCESS_KEY_ID'] = ''
# os.environ['AWS_SECRET_ACCESS_KEY'] = ''

# Create S3 client
s3_client = boto3.client('s3')

# Create openai client. The key is handed to the constructor explicitly:
# assigning it to the `OpenAI` class attribute is not read by client instances.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

为了将用户问题或命令连接到适当的函数,我们需要向 ChatGPT 提供必要的函数详细信息和预期参数。

# Tool schemas that describe each S3 operation to the GPT model
def _tool(name, description, properties=None, required=None):
    """Build one entry of the ``tools`` list for a single S3 operation.

    The ``required`` key is emitted only when a value is given, so an
    operation without parameters carries just an empty ``properties`` map.
    """
    parameters = {"type": "object", "properties": dict(properties or {})}
    if required is not None:
        parameters["required"] = list(required)
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": parameters,
        },
    }


def _text(description):
    """Schema for a string parameter."""
    return {"type": "string", "description": description}


def _flag(description):
    """Schema for a boolean parameter."""
    return {"type": "boolean", "description": description}


functions = [
    _tool("list_buckets", "List all available S3 buckets"),
    _tool(
        "list_objects",
        "List the objects or files inside a given S3 bucket",
        properties={
            "bucket": _text("The name of the S3 bucket"),
            "prefix": _text("The folder path in the S3 bucket"),
        },
        required=["bucket"],
    ),
    _tool(
        "download_file",
        "Download a specific file from an S3 bucket to a local distribution folder.",
        properties={
            "bucket": _text("The name of the S3 bucket"),
            "key": _text("The path to the file inside the bucket"),
            "directory": _text("The local destination directory to download the file, should be specificed by the user."),
        },
        required=["bucket", "key", "directory"],
    ),
    _tool(
        "upload_file",
        "Upload a file to an S3 bucket",
        properties={
            "source": _text("The local source path or remote URL"),
            "bucket": _text("The name of the S3 bucket"),
            "key": _text("The path to the file inside the bucket"),
            "is_remote_url": _flag("Is the provided source a URL (True) or local path (False)"),
        },
        required=["source", "bucket", "key", "is_remote_url"],
    ),
    _tool(
        "search_s3_objects",
        "Search for a specific file name inside an S3 bucket",
        properties={
            "search_name": _text("The name of the file you want to search for"),
            "bucket": _text("The name of the S3 bucket"),
            "prefix": _text("The folder path in the S3 bucket"),
            "exact_match": _flag("Set exact_match to True if the search should match the exact file name. Set exact_match to False to compare part of the file name string (the file contains)"),
        },
        required=["search_name"],
    ),
]

创建辅助函数以与 S3 服务交互,例如列出存储桶、列出对象、下载和上传文件以及搜索特定文件。

def datetime_converter(obj):
    """``json.dumps`` ``default=`` hook that renders datetimes as ISO-8601 text.

    Raises:
        TypeError: for any value that is not a ``datetime.datetime``.
    """
    if not isinstance(obj, datetime.datetime):
        raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
    return obj.isoformat()
def list_buckets():
    """Return the client's ``list_buckets`` ``Buckets`` entries as a JSON string."""
    buckets = s3_client.list_buckets()['Buckets']
    # datetime values in the entries are encoded through datetime_converter
    return json.dumps(buckets, default=datetime_converter)

def list_objects(bucket, prefix=''):
    """Return every object under ``prefix`` in ``bucket`` as a JSON string.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix to restrict the listing to; empty lists everything.

    Returns:
        JSON array of the ``Contents`` entries (empty array when nothing
        matches), with datetime values rendered by ``datetime_converter``.
    """
    # A single list_objects_v2 call returns at most 1000 keys, so walk all
    # pages to avoid silently truncating large listings and searches.
    paginator = s3_client.get_paginator('list_objects_v2')
    contents = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        contents.extend(page.get('Contents', []))
    return json.dumps(contents, default=datetime_converter)

def download_file(bucket, key, directory):
    """Download ``key`` from ``bucket`` into the local ``directory``.

    The local file keeps the last path segment of ``key`` as its name.

    Args:
        bucket: Name of the S3 bucket.
        key: Path of the object inside the bucket.
        directory: Local destination directory; created if it is missing.

    Returns:
        JSON string with ``status``, ``bucket``, ``key`` and the resolved
        ``destination`` path.
    """
    filename = os.path.basename(key)

    # Create the target folder on demand so a missing directory does not make
    # the download fail; an empty string means the current working directory.
    if directory:
        os.makedirs(directory, exist_ok=True)

    # Resolve destination to the correct file path
    destination = os.path.join(directory, filename)

    s3_client.download_file(bucket, key, destination)
    return json.dumps({"status": "success", "bucket": bucket, "key": key, "destination": destination})

def upload_file(source, bucket, key, is_remote_url=False):
    """Upload a local file, or the content behind a URL, to ``bucket``/``key``.

    Args:
        source: Local file path, or a URL when ``is_remote_url`` is true.
        bucket: Name of the destination S3 bucket.
        key: Destination path of the object inside the bucket.
        is_remote_url: Whether ``source`` must first be fetched over the network.

    Returns:
        JSON string with ``status``, ``source``, ``bucket`` and ``key``. For a
        remote upload ``source`` is the last path segment of the URL.
    """
    import tempfile

    if is_remote_url:
        file_name = os.path.basename(source)
        # Stage the download in a throwaway directory: this avoids overwriting
        # a same-named file in the working directory and leaves nothing behind.
        with tempfile.TemporaryDirectory() as staging_dir:
            staged_path = os.path.join(staging_dir, "payload")
            urlretrieve(source, staged_path)
            s3_client.upload_file(staged_path, bucket, key)
        source = file_name
    else:
        s3_client.upload_file(source, bucket, key)

    return json.dumps({"status": "success", "source": source, "bucket": bucket, "key": key})

def search_s3_objects(search_name, bucket=None, prefix='', exact_match=True):
    """Search object keys for ``search_name``, case-insensitively.

    Args:
        search_name: File name (or part of it) to look for.
        bucket: Bucket to search; ``None`` searches every bucket returned by
            ``list_buckets``.
        prefix: Key prefix that limits the search inside each bucket.
        exact_match: When true, the name must equal the whole key or its last
            path segment; when false, a substring of the key is enough.

    Returns:
        JSON array of ``{"Bucket": ..., "Object": ...}`` entries.
    """
    search_name = search_name.lower()

    if bucket is None:
        buckets = [bucket_info["Name"] for bucket_info in json.loads(list_buckets())]
    else:
        buckets = [bucket]

    def is_match(key):
        key = key.lower()
        if exact_match:
            # Comparing only against the full key would miss "docs/report.pdf"
            # when the user asks for "report.pdf", so the file-name segment is
            # accepted as well as the complete key.
            return search_name in (key, key.rsplit('/', 1)[-1])
        return search_name in key

    results = []
    for bucket_name in buckets:
        objects_response = json.loads(list_objects(bucket_name, prefix))
        results.extend(
            {"Bucket": bucket_name, "Object": obj}
            for obj in objects_response
            if is_match(obj['Key'])
        )

    return json.dumps(results)

以下字典将函数名称映射到对应的函数,以便根据 ChatGPT 的响应执行相应的函数。

# Dispatch table: function name -> local callable, keyed by each function's own name
available_functions = {
    handler.__name__: handler
    for handler in (
        list_buckets,
        list_objects,
        download_file,
        upload_file,
        search_s3_objects,
    )
}
def chat_completion_request(messages, functions=None, function_call='auto', 
                            model_name=GPT_MODEL):
    """Send ``messages`` to the chat completions endpoint and return the response.

    ``functions`` and ``function_call`` are forwarded as ``tools`` and
    ``tool_choice``; both are left out of the request when ``functions`` is None.
    """
    request_kwargs = {"model": model_name, "messages": messages}
    if functions is not None:
        request_kwargs["tools"] = functions
        request_kwargs["tool_choice"] = function_call
    return client.chat.completions.create(**request_kwargs)

为聊天机器人创建一个主函数,该函数接受用户输入,将其发送到 OpenAI Chat API,接收响应,执行 API 生成的任何函数调用,并向用户返回最终响应。

def run_conversation(user_input, topic="S3 bucket functions.", is_log=False):
    """Run one user turn through the model, executing any requested S3 calls.

    Args:
        user_input: The user's question or command.
        topic: Scope stated in the system message; the model is told to
            decline anything outside it.
        is_log: When true, print the raw choices of the first model response.

    Returns:
        The model's final text reply: either its direct answer, or its summary
        after the requested functions have been executed.
    """
    system_message=f"Don't make assumptions about what values to plug into functions. Ask for clarification if a user request is ambiguous. If the user ask question not related to {topic} response your scope is {topic} only."

    messages = [{"role": "system", "content": system_message},
                {"role": "user", "content": user_input}]

    # Call the model to get a response
    response = chat_completion_request(messages, functions=functions)
    response_message = response.choices[0].message

    if is_log:
        print(response.choices)

    # No function requested: the first reply is already the final answer
    if not response_message.tool_calls:
        return response_message.content

    # Add the assistant turn, then one "tool" message per requested call.
    # Answering only the first call would leave the other tool_call ids
    # unanswered in the follow-up request.
    messages.append(response_message)
    for tool_call in response_message.tool_calls:
        function_name = tool_call.function.name
        handler = available_functions.get(function_name)
        if handler is None:
            # Report an unknown name back to the model instead of raising KeyError
            function_response = json.dumps(
                {"status": "error", "message": f"Unknown function: {function_name}"}
            )
        else:
            # Treat an empty arguments string as "no arguments"
            function_args = json.loads(tool_call.function.arguments or "{}")
            function_response = handler(**function_args)

        messages.append({
            "role": "tool",
            "content": function_response,
            "tool_call_id": tool_call.id,
        })

    # Call the model again to summarize the results
    second_response = chat_completion_request(messages)
    return second_response.choices[0].message.content

S3 存储桶机器人测试

在以下示例中,请务必在执行前将占位符(例如 <file_name>、<bucket_name>、<directory_path>)替换为您自己的实际值。

让我们首先列出所有可用的存储桶。

# Ask the assistant to list the buckets and print its final text reply
print(run_conversation('list my S3 buckets'))

您可以要求助手在所有存储桶或特定存储桶中搜索特定文件名。

# Search by file name without naming a bucket ("in all buckets")
search_file = '<file_name>'
print(run_conversation(f'search for a file {search_file} in all buckets'))
# Search by part of a file name ("contains") inside one named bucket
search_word = '<file_name_part>'
bucket_name = '<bucket_name>'
print(run_conversation(f'search for a file contains {search_word} in {bucket_name}'))

如果参数值存在歧义,模型应按照系统消息中的指示,请用户澄清其要求。

# Deliberately incomplete request: no file name or bucket is given
print(run_conversation('search for a file'))
Sure, to help me find what you're looking for, could you please provide the name of the file you want to search for and the name of the S3 bucket? Also, should the search match the file name exactly, or should it also consider partial matches?

验证边缘情况

我们还指示模型拒绝不相关的任务。让我们测试一下,看看它在实际操作中是如何工作的。

# The system message limits the assistant to the S3 topic, so the model
# should not answer a question that is unrelated to that scope
print(run_conversation('what is the weather today'))
Apologies for the misunderstanding, but I am only able to assist with S3 bucket functions. Can you please ask a question related to S3 bucket functions?

提供的函数不仅限于检索信息。它们还可以帮助用户上传或下载文件。

# Download: the prompt names the file, the source bucket and the local directory
search_file = '<file_name>'
bucket_name = '<bucket_name>'
local_directory = '<directory_path>'
print(run_conversation(f'download {search_file} from {bucket_name} bucket to {local_directory} directory'))
# Upload: the prompt names a local file and the destination bucket
local_file = '<file_name>'
bucket_name = '<bucket_name>'
print(run_conversation(f'upload {local_file} to {bucket_name} bucket'))