GPT Actions library - Snowflake Middleware

2024 年 8 月 14 日
在 Github 中打开

本指南详细介绍了如何将 ChatGPT 连接到 Snowflake 数据仓库,以便将 SQL 查询返回给 ChatGPT,以用于 数据分析。GPT 需要一个与中间件(即 Azure 函数)交互的 action,以便该 action 可以正确格式化来自 Snowflake 的响应,以供 Python notebook 环境使用。数据必须以文件形式返回,因此中间件函数应将 SQL 响应转换为 CSV/Excel 文件,大小在 10MB 以下。

本文档将概述中间件函数 GPT action。有关设置中间件函数本身的详细信息,请参阅GPT Actions library (Middleware) - Azure Functions。您可以将此 Snowflake 中间件 action 与直接连接到 Snowflake 的 action 结合使用,以启用一个 GPT,该 GPT 可以在执行 SQL 查询之前形成和测试它们。

现有的 Snowflake 客户可以利用这些指南从他们的数据仓库中查询数据,并将该数据加载到数据分析 Python 环境中以进行进一步的洞察分析。这使得 ChatGPT 驱动的分析成为可能,例如可视化数据集、识别模式/异常或识别数据清理目的的差距。此 GPT 可用于从相对较小的数据集中驱动业务决策,或通过 AI 探索数据子集以在 BI 工具中探索整体数据集时生成假设,从而节省时间和金钱,同时识别以前未见的模式。

在开始之前,请确保在您的应用环境中完成以下步骤

  • 配置 Snowflake 数据仓库
  • 确保通过 ChatGPT 验证身份进入 Snowflake 的用户具有访问数据库、模式和表的权限以及必要的角色

此外,在 Azure Function App 中创建应用程序之前,您需要一种处理用户身份验证的方法。您需要在 Azure Entra ID 中设置一个 OAuth 应用注册,该注册可以与 Snowflake 外部 OAuth 安全集成相关联。Snowflake 的外部 OAuth 安全集成允许外部系统颁发访问令牌,Snowflake 可以使用这些令牌来确定访问级别。在本例中,该外部令牌提供程序是 Azure Entra ID。由于 ChatGPT 将连接到 Azure 而不是 Snowflake,因此 GPT 用户的 OAuth 令牌将由 Azure 提供,并与其在 Entra ID 中的用户关联。因此,您需要一种方法将 Snowflake 中的用户映射到他们在 Azure 中的对应用户。

Azure 端和 Snowflake 端的所有必要步骤都列在下面。

我们将设置一个新的应用注册,在 Azure 中配置将要使用的必要的 Snowflake 范围,并检索 Snowflake 和 ChatGPT 中都需要的所有 OAuth 配置参数。本节将全部在 Azure 中完成,以便在下一节中,您将拥有必要的信息在 Snowflake 端配置时链接到此应用注册。

  1. 导航到 Microsoft Azure 门户 并进行身份验证。
  2. 导航到 Azure Entra ID(以前称为 Active Directory)。
  3. 单击 管理 下的 应用注册
  4. 单击 新注册
  5. 输入 Snowflake GPT OAuth Client 或类似的值作为 名称
  6. 验证 支持的帐户类型 是否设置为 单租户
  7. 暂时忽略重定向 URI。您将在配置 GPT 时返回此处
  8. 单击 注册
  9. 记下 目录(租户)ID (TENANT_ID) 在 基本信息 下。您将使用它来生成您的 AZURE_AD_ISSUERAZURE_AD_JWS_KEY_ENDPOINT
    • AZURE_AD_ISSUERhttps://sts.windows.net/TENANT_ID/
    • AZURE_AD_JWS_KEY_ENDPOINThttps://login.microsoftonline.com/TENANT_ID/discovery/v2.0/keys
  10. 单击 概述 界面中的 终结点
  11. 在右侧,记下 OAuth 2.0 授权终结点 (v2) 作为 AZURE_AD_OAUTH_AUTHORIZATION_ENDPOINTOAuth 2.0 令牌终结点 (v2) 作为 AZURE_AD_OAUTH_TOKEN_ENDPOINT
    • 终结点应类似于 https://login.microsoftonline.com/90288a9b-97df-4c6d-b025-95713f21cef9/oauth2/v2.0/authorizationhttps://login.microsoftonline.com/90288a9b-97df-4c6d-b025-95713f21cef9/oauth2/v2.0/token
  12. 单击 管理 下的 **公开 API **。
  13. 单击 应用程序 ID URI 旁边的 设置 链接以设置 Application ID URI
    • Application ID URI 在您组织的目录中必须是唯一的,例如 https://your.company.com/4d2a8c2b-a5f4-4b86-93ca-294185f45f2e。此值在后续配置步骤中将被称为 <SNOWFLAKE_APPLICATION_ID_URI>
  14. 要将 Snowflake 角色添加为 OAuth 范围,以便程序化客户端代表用户执行 OAuth 流程,请单击 添加范围 以添加表示 Snowflake 角色的范围。
    • 通过使用带有 session:scope: 前缀的 Snowflake 角色名称输入范围。例如,对于 Snowflake Analyst 角色,输入 session:scope:analyst
    • 选择谁可以同意。
    • 输入范围的 显示名称(例如:Account Admin)。
    • 输入范围的 描述(例如:可以管理 Snowflake 帐户)。
    • 单击 添加范围
    • 将范围保存为 AZURE_AD_SCOPE。它应该是您的 Application ID URI 和您的 Scope name 的串联
  15. 概述 部分中,从 应用程序(客户端)ID 字段复制 ClientID。这在以下步骤中将被称为 OAUTH_CLIENT_ID
  16. 单击 证书和机密,然后单击 新客户端机密
  17. 添加机密的描述。
  18. 选择 730 天(24 个月)。出于测试目的,选择不会很快过期的机密。
  19. 单击 添加。复制机密。这在以下步骤中将被称为 OAUTH_CLIENT_SECRET
  20. 对于将代表用户请求访问令牌的程序化客户端,请按如下方式配置应用程序的委派权限。
    • 单击 API 权限
    • 单击 添加权限
    • 单击 我的 API
    • 单击您在 在 Azure AD 中配置 OAuth 资源 中创建的 Snowflake OAuth 资源
    • 单击 委派权限 框。
    • 选中与您希望授予此客户端的应用程序中定义的范围相关的权限。
    • 单击 添加权限
    • 单击 授予管理员同意 按钮以授予客户端权限。请注意,出于测试目的,权限以这种方式配置。但是,在生产环境中,不建议以这种方式授予权限。
    • 单击

Azure Entra ID 中的应用注册完成后,下一步是将该应用注册通过外部 OAuth 安全集成链接到 Snowflake。安全集成的 external_oauth_audience_list 参数必须与您在配置 Azure Entra ID 时指定的 应用程序 ID URI 匹配。

IssuerJWS Keys endpoint 也将来自先前步骤中收集的值。用户映射属性 可以设置为 EMAIL_ADDRESSLOGIN_NAME,这就是用户的 Microsoft 登录凭据如何映射到他们在 Snowflake 中的用户,以确保 Snowflake 中的权限受到颁发给 ChatGPT 的访问令牌的尊重。

CREATE OR REPLACE SECURITY INTEGRATION AZURE_OAUTH_INTEGRATION
  TYPE = EXTERNAL_OAUTH
  ENABLED = TRUE
  EXTERNAL_OAUTH_TYPE = 'AZURE'
  EXTERNAL_OAUTH_ISSUER = '<AZURE_AD_ISSUER>'
  EXTERNAL_OAUTH_JWS_KEYS_URL = '<AZURE_AD_JWS_KEY_ENDPOINT>'
  EXTERNAL_OAUTH_AUDIENCE_LIST = ('<SNOWFLAKE_APPLICATION_ID_URI>')
  EXTERNAL_OAUTH_TOKEN_USER_MAPPING_CLAIM = 'upn'
  EXTERNAL_OAUTH_SNOWFLAKE_USER_MAPPING_ATTRIBUTE = 'EMAIL_ADDRESS';

确保在您的 Azure 环境中完成以下步骤

  • Azure 门户或 VS Code,具有创建 Azure Function Apps 和 Azure Entra App Registrations 的访问权限
  • 本指南中有一个详细部分,涉及部署和设计包装来自 Snowflake 的响应所需的函数,以便将查询结果作为 CSV 返回给 ChatGPT。Azure Function App 允许您的 GPT 摄取更大的数据集,因为 ChatGPT 可以从文件响应而不是从 application/json 有效负载中摄取更多数据。此外,这些数据集仅适用于数据分析(又名 Code Interpreter),响应格式为 CSV 文件。

现在我们已经创建了 GPT 并处理了 Azure/Snowflake 身份验证,我们可以创建 Azure Function App 本身来执行 SQL 查询并处理响应格式化,从而使 GPT 能够下载结果作为 CSV 以用于数据分析。

请按照此Azure Cookbook 指南了解有关部署 Azure Function App 的更多详细信息。您将在下面找到添加到该函数的示例代码。

此代码旨在提供方向性指导 - 虽然它应该可以开箱即用,但您应该根据您的 GPT 和 IT 设置的具体需求对其进行自定义。

您需要在 Azure Function App 中设置以下流程

  • 从 HTTP 请求中提取令牌并使用它连接到 Snowflake
  • 执行 SQL 查询并将结果写入 CSV
  • 临时将该 CSV 存储在 Blob Storage 中*
  • 生成一个预签名 URL 以安全地访问该 CSV*
  • 使用 openaiFileResponse 进行响应

*如果您使用文件流选项而不是url选项来将文件返回到您的 GPT,则可能不需要这些步骤。下面将对此进行更多介绍。

确保您已安装必要的库并将其导入到您的脚本中。除了 Python 标准库之外,此示例脚本还利用了以下库

import azure.functions as func
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions, ContentSettings
import snowflake.connector
import jwt    # pyjwt for token decoding

要连接到 Snowflake,您需要从 Authorization 标头中提取从 Azure Entra ID 分配的访问令牌,并在连接到 Snowflake 服务器时使用该令牌。

在此示例中,Snowflake 用户名是电子邮件地址,这简化了将从 HTTP 访问令牌中提取的 Entra ID 用户映射到连接所需的 Snowflake 用户 ID 的过程。如果您的组织情况并非如此,您可以在 Python 应用程序中将电子邮件地址映射到 Snowflake 用户 ID。

我的应用程序构建为与单个 Snowflake 帐户(即 ab12345.eastus2.azure)和 Warehouse 交互。如果您需要访问多个帐户或仓库,您可以考虑在您的 GPT action 参数中传递这些参数,以便您可以从 HTTP 请求中提取它们。

# Extract the token from the Authorization header
auth_header = req.headers.get('Authorization')
token_type, token = auth_header.split()

try:
    # Extract email address from token to use for Snowflake user mapping
    # If Snowflake usernames are not emails, then identify the username accordingly
    decoded_token = jwt.decode(token, options={"verify_signature": False})
    email = decoded_token.get('upn') 
    
    conn = snowflake.connector.connect(
        user=email, # Snowflake username, i.e., user's email in my example
        account=SNOWFLAKE_ACCOUNT, # Snowflake account, i.e., ab12345.eastus2.azure
        authenticator="oauth",
        token=token,
        warehouse=SNOWFLAKE_WAREHOUSE # Replace with Snowflake warehouse
    )
    logging.info("Successfully connected to Snowflake.")
except Exception as e:
    logging.error(f"Failed to connect to Snowflake: {e}")

连接到 Snowflake 后,您需要执行查询并将结果存储到 CSV 中。虽然 Snowflake 中的角色应防止任何有害查询的可能性,但您可能希望在您的应用程序中清理您的查询(下面未包含),就像您对任何其他程序化 SQL 查询执行一样。

# Extract SQL query from request parameters or body
sql_query = req.params.get('sql_query')

try:
    # Use the specified warehouse
    cursor = conn.cursor()

    # Execute the query
    cursor.execute(sql_query)
    results = cursor.fetchall()
    column_names = [desc[0] for desc in cursor.description]
    logger.info(f"Query executed successfully: {sql_query}")

    # Convert results to CSV
    csv_file_path = write_results_to_csv(results, column_names)
except Exception as e:
    logger.error(f"Error executing query or processing data: {e}")


def write_results_to_csv(results, column_names):
    try:
        # Create a temporary file
        temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w', newline='')
        csv_writer = csv.writer(temp_file)
        csv_writer.writerow(column_names)  # Write the column headers
        csv_writer.writerows(results)      # Write the data rows
        temp_file.close()  # Close the file to flush the contents
        return temp_file.name  # Return file path
    except Exception as e:
        logger.error(f"Error writing results to CSV: {e}")

有两种方法可以将文件返回给 ChatGPT 进行处理。您可以流式传输 base64 编码的数据以及 openaiFileResponse 列表响应中的 mimeType 和文件名,或者您可以返回URL 列表。在此解决方案中,我们将重点关注后者。

为此,您需要将 CSV 上传到 Azure Blob Storage 并返回一个预签名 URL,以便在 ChatGPT 中安全地访问该文件。重要的是要注意,为了在 ChatGPT 中下载 URL,您需要确保 URL 包括 content_type 和 content_disposition,如下例所示。如果您想检查 URL 是否具有必要的标头,可以使用任何终端中的 curl -I <url>

您需要获取 Azure 存储桶的连接字符串,如此处的说明。

def upload_csv_to_azure(file_path, container_name, blob_name, connect_str):
    try:
        # Create the BlobServiceClient object which will be used to create a container client
        blob_service_client = BlobServiceClient.from_connection_string(connect_str)
        
        # Create a blob client using the local file name as the name for the blob
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

        # Upload the file with specified content settings
        with open(file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True, content_settings=ContentSettings(
                content_type='text/csv',
                content_disposition=f'attachment; filename="{blob_name}"'
            ))
        logger.info(f"Successfully uploaded {file_path} to {container_name}/{blob_name}")

        # Generate a SAS token for the blob
        sas_token = generate_blob_sas(
            account_name=blob_service_client.account_name,
            container_name=container_name,
            blob_name=blob_name,
            account_key=blob_service_client.credential.account_key,
            permission=BlobSasPermissions(read=True),
            expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=1)  # Token valid for 1 hour
        )

        # Generate a presigned URL using the SAS token
        url = f"https://{blob_service_client.account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_token}"
        logger.info(f"Generated presigned URL: {url}")

        return url
    except Exception as e:
        logger.error(f"Error uploading file to Azure Blob Storage: {e}")
        raise

最后,您需要适当地格式化响应,以指示 ChatGPT 将该响应处理为一个或一系列文件。openaiFileResponse 是一个列表,最多可以包含 10 个 URL(或者在使用内联选项时为 base64 编码)。

# Format the response so ChatGPT treats it as a file
response = {
    'openaiFileResponse': [csv_url]
}
cursor.close()
conn.close()
return func.HttpResponse(
    json.dumps(response), 
    status_code=200
)

此应用程序有很多活动部件,因此测试您的 Azure Function App 可能很重要。ChatGPT 可能是一个难以测试的平台,因为请求和响应有时可能比调试所需的更不透明。通过 cURL 或 Postman 从更受控的环境中调用 HTTP 请求来对您的应用程序进行初始测试,将使您能够更轻松地调试和分类问题。一旦您确定响应在这些工具中按预期返回,您就可以构建您的 GPT 了。

创建自定义 GPT 后,在“指令”面板中使用以下文本作为灵感。有疑问吗?查看入门示例以了解此步骤如何更详细地工作。

重要的是 ChatGPT 了解您的表架构,以便正确形成 SQL 查询。有不同的方法可以做到这一点,并且此指令集代表最直接的方法。我们正在努力发布针对您可能想要构建的不同版本的 Snowflake GPT 的其他说明,以允许使用多个不同的表、模式和数据库,甚至动态学习随时间变化的模式。

以下是一些在使用单个模式和表时的基本说明。此 GPT 已针对单个用例(分析 2013 年 1 月从纽约市出发的航班数据)进行了优化,这允许最简单的指令提供最可靠的 GPT 性能。

您是编写 SQL 查询以从 Snowflake 获取数据的专家。您帮助用户将其提示转换为 SQL 查询。任何关于航班数据的问题都将转换为 Snowflake SQL 查询,该查询命中表 FLIGHTS.PUBLIC.JAN_2013_NYC。将任何查询传递到“sql_query”参数

表的架构包括

ID	NUMBER	A unique identifier for each flight
YEAR	NUMBER	The year of the flight
MONTH	NUMBER	The month of the flight
DAY		NUMBER	The day of the month on which the flight departed
DEP_TIME	NUMBER	The actual departure time of the flight
SCHED_DEP_TIME	NUMBER	The scheduled departure time of the flight
DEP_DELAY	NUMBER	The departure delay in minutes (negative values indicate early departures)
ARR_TIME	NUMBER	The actual arrival time of the flight
SCHED_ARR_TIME	NUMBER	The scheduled arrival time of the flight
ARR_DELAY	NUMBER	The arrival delay in minutes (negative values indicate early arrivals)
CARRIER_CODE	TEXT	The carrier code of the airline
FLIGHT	NUMBER	The flight number
TAILNUM	TEXT	The aircraft tail number
ORIGIN_AIRPORT_CODE	TEXT	The origin airport code
DEST_AIRPORT_CODE	TEXT	The destination airport code
AIR_TIME	NUMBER	The total airtime of the flight in minutes
DISTANCE	NUMBER	The distance traveled by the flight in miles
HOUR	NUMBER	The hour part of the scheduled departure time
MINUTE	NUMBER	The minute part of the scheduled departure time
TIME_HOUR	NUMBER	The time at which the flight departed (rounded to the nearest hour)
CARRIER_NAME	TEXT	The full name of the airline carrier
ORIGIN_AIRPORT_NAME	TEXT	The full name of the origin airport
ORIGIN_REGION	TEXT	The region code of the origin airport
ORIGIN_MUNICIPALITY	TEXT	The city where the origin airport is located
ORIGIN_COORDINATES	TEXT	The geographical coordinates of the origin airport
DEST_AIRPORT_NAME	TEXT	The full name of the destination airport
DEST_REGION	TEXT	The region code of the destination airport
DEST_MUNICIPALITY	TEXT	The city where the destination airport is located
DEST_COORDINATES	TEXT	The geographical coordinates of the destination airport

当用户询问有关航班数据时,请执行以下操作

  1. 使用 executeSQL action 向 Azure 函数终结点发送 POST 请求
  2. 接收作为 Action 响应一部分返回的文件。将其显示为电子表格
  3. 对文件执行分析并提供用户要求的必要信息

用户希望在代码解释器中询问有关数据的问题,因此对于您提取的数据集中的任何数据分析见解,请使用代码解释器。

创建自定义 GPT 后,将以下文本复制到“Actions”面板中,将占位符值替换为您特定的函数详细信息,并根据您构建到 Azure Function App 中的任何其他输入来更新您的参数。

有疑问吗?查看入门示例以了解此步骤如何更详细地工作。

openapi: 3.1.0
info:
  title: Snowflake GPT API
  description: API to execute SQL queries on Snowflake and get the results as a CSV file URL.
  version: 1.0.0
servers:
  - url: https://<server-name>.azurewebsites.net
    description: Azure Function App server running Snowflake integration application
paths:
  /api/<function_name>?code=<code>:
    post:
      operationId: executeSQL
      summary: Executes a SQL query on Snowflake and returns the result file URL as a CSV.
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              properties:
                sql_query:
                  type: string
                  description: The SQL query to be executed on Snowflake.
              required:
                - sql_query
      responses:
        '200':
          description: Successfully executed the query.
          content:
            application/json:
              schema:
                type: object
                properties:
                  openaiFileResponse:
                    type: array
                    items:
                      type: string
                      format: uri
                    description: Array of URLs pointing to the result files.
        '401':
          description: Unauthorized. Missing or invalid authentication token.
        '400':
          description: Bad Request. The request was invalid or cannot be otherwise served.
        '500':
          description: Internal Server Error. An error occurred on the server.
components:
  schemas: {} 
  • 返回给 ChatGPT 的文件大小限制为 10MB。如果返回的文件大于此限制,您的请求可能会失败。如果您发现自己遇到这些限制,请确保在您的 SQL 命令中包含 LIMIT。
  • 为什么首先需要 Azure Function App? ChatGPT 的数据分析功能(又名 Code Interpreter)依赖于一个安全的 Python 环境,该环境与模型的上下文窗口是分开的。今天,传递到数据分析的数据必须通过上传文件来完成。GPT action 返回数据然后必须将数据作为 CSV 或其他数据文件类型返回。为了通过 GPT action 返回文件,响应必须包装在 openaiFileResponse 对象中。这需要自定义代码来正确格式化响应。
  • 我的公司使用的云提供商与 Azure 不同。 对于通过 GPT action 将其他中间件函数连接到 ChatGPT,请参阅其他 AWSGCP 中间件 cookbook。您可以使用本 cookbook 中讨论的概念来为构建中间件应用程序时的注意事项提供建议,但将该中间件连接到 Snowflake 对于不同的云提供商可能会有所不同。例如,Snowflake 构建了一个专门用于与 Azure Entra ID 链接的外部 OAuth 集成。
  • 如何限制我的 GPT 可以访问的数据集? 限制 ChatGPT 在 Snowflake 中拥有的访问范围可能很重要。有几种方法可以做到这一点
    • Snowflake 角色可以限制谁有权访问哪些表,并且将受到 Azure Entra ID 提供的 GPT 用户访问令牌的尊重
    • 在您的中间件函数中,您可以添加健全性检查以验证访问的表是否已获得该应用程序的批准
    • 您可能想要生成一个全新的数据库/仓库,专门用于与 ChatGPT 集成,该数据库/仓库已清理掉任何敏感信息,例如 PII。
  • Schema 调用错误的仓库或数据集: 如果 ChatGPT 调用错误的仓库或数据库,请考虑更新您的指令,使其更明确 (a) 应调用哪个仓库/数据库,或者 (b) 要求用户在运行查询之前提供这些确切的详细信息

您希望我们优先考虑哪些集成?我们的集成中是否存在错误?在我们的 github 中提交 PR 或 issue,我们将查看。