GCP BigQuery 向量搜索与 GCP 函数和 ChatGPT 中的 GPT Actions

,
2024 年 8 月 2 日
在 Github 中打开

本笔记本提供了关于如何使用 Google Cloud BigQuery 作为具有向量搜索功能的数据库的逐步说明,结合 OpenAI 嵌入,然后创建一个 Google Cloud Function 以插入 ChatGPT 中的自定义 GPT。

对于希望在 Google Cloud Platform (GCP) 内设置 RAG 基础设施,并将其作为端点公开以与其他平台(如 ChatGPT)集成的客户来说,这可能是一个解决方案。

Google Cloud BigQuery 是一种完全托管的无服务器数据仓库,它利用 Google 基础设施的处理能力实现超快速 SQL 查询。它允许开发人员轻松存储和分析海量数据集。

Google Cloud Functions 是一种轻量级、基于事件的异步计算解决方案,允许您创建小型、单一用途的函数,以响应云事件,而无需管理服务器或运行时环境。

先决条件

要运行此 Cookbook,您必须具备:

  • 您可以访问的 GCP 项目
  • 具有创建 BigQuery 数据集和 Google Cloud Function 权限的 GCP 用户
  • GCP CLI 已安装并连接
  • OpenAI API 密钥
  • ChatGPT Plus、Teams 或 Enterprise 订阅

架构

以下是此解决方案的架构图,我们将逐步介绍

bigquery-rag-architecture.png

目录

  1. 环境设置 通过安装和导入所需的库并配置我们的 GCP 设置来设置环境。包括:

  2. 准备数据 通过嵌入文档以及捕获其他元数据来准备用于上传的数据。我们将使用 OpenAI 文档的子集作为示例数据。

  3. 创建具有向量搜索功能的 BigQuery 表
    创建一个 BigQuery 表并上传我们准备好的数据。包括:

  4. 创建 GCP 函数 使用 gcloud CLI 和先前计算的环境变量

  5. 在 ChatGPT 的自定义 GPT 中输入 对 BigQuery 中嵌入的数据执行搜索

! pip install -q google-auth
! pip install -q openai
! pip install -q pandas
! pip install -q google-cloud-functions
! pip install -q python-dotenv
! pip install -q pyperclip
! pip install -q PyPDF2
! pip install -q tiktoken
! pip install -q google-cloud-bigquery
! pip install -q pyyaml
# Standard Libraries
import json  
import os
import csv
import shutil
from itertools import islice
import concurrent.futures
import yaml

# Third-Party Libraries
import pandas as pd
import numpy as np
from PyPDF2 import PdfReader
import tiktoken
from dotenv import load_dotenv
import pyperclip

# OpenAI Libraries
from openai import OpenAI

# Google Cloud Identity and Credentials
from google.auth import default
from google.cloud import bigquery
from google.cloud import functions_v1

配置 GCP 项目

如果尚未设置,我们将安装 GCP CLI,验证 GCP 身份并设置您的默认项目。

# Add gcloud to PATH
os.environ['PATH'] += os.pathsep + os.path.expanduser('~/google-cloud-sdk/bin')

# Verify gcloud is in PATH
! gcloud --version
! gcloud auth application-default login
project_id = "<insert_project_id>"  # Replace with your actual project ID
! gcloud config set project {project_id}
! gcloud services enable cloudfunctions.googleapis.com
! gcloud services enable cloudbuild.googleapis.com
! gcloud services enable bigquery.googleapis.com

配置 OpenAI 设置

本节指导您完成 OpenAI 的身份验证设置。在完成本节之前,请确保您拥有 OpenAI API 密钥。

openai_api_key = os.environ.get("OPENAI_API_KEY", "<your OpenAI API key if not set as an env var>") # Saving this as a variable to reference in function app in later step
openai_client = OpenAI(api_key=openai_api_key)
embeddings_model = "text-embedding-3-small" # We'll use this by default, but you can change to your text-embedding-3-large if desired
from google.auth import default

# Use default credentials
credentials, project_id = default()
region = "us-central1" # e.g: "us-central1"
print("Default Project ID:", project_id)

准备数据

我们将嵌入 OpenAI 文档的几个页面并将其存储在 oai_docs 文件夹中。我们将首先嵌入每个页面,将其添加到 CSV 文件,然后使用该 CSV 文件上传到索引。

我们将使用本 cookbook中突出显示的一些技术。这是一种快速嵌入文本的方法,无需考虑诸如章节之类的变量,使用我们的视觉模型来描述图像/图表/示意图,以及较长文档的块之间重叠的文本等。

为了处理超出 8191 个 token 上下文的较长文本文件,我们可以分别使用块嵌入,或者以某种方式组合它们,例如平均(按每个块的大小加权)。

我们将从 Python 自己的 cookbook 中获取一个将序列分解为块的函数。

def batched(iterable, n):
    """Batch data into tuples of length n. The last batch may be shorter."""
    # batched('ABCDEFG', 3) --> ABC DEF G
    if n < 1:
        raise ValueError('n must be at least one')
    it = iter(iterable)
    while (batch := tuple(islice(it, n))):
        yield batch

现在我们定义一个函数,该函数将字符串编码为 token,然后将其分解为块。我们将使用 tiktoken,这是 OpenAI 的一个快速开源分词器。

要了解有关使用 Tiktoken 计算 token 的更多信息,请查看本 cookbook

def chunked_tokens(text, chunk_length, encoding_name='cl100k_base'):
    # Get the encoding object for the specified encoding name. OpenAI's tiktoken library, which is used in this notebook, currently supports two encodings: 'bpe' and 'cl100k_base'. The 'bpe' encoding is used for GPT-3 and earlier models, while 'cl100k_base' is used for newer models like GPT-4.
    encoding = tiktoken.get_encoding(encoding_name)
    # Encode the input text into tokens
    tokens = encoding.encode(text)
    # Create an iterator that yields chunks of tokens of the specified length
    chunks_iterator = batched(tokens, chunk_length)
    # Yield each chunk from the iterator
    yield from chunks_iterator

最后,我们可以编写一个函数,该函数可以安全地处理嵌入请求,即使输入文本长度超过最大上下文长度,方法是将输入 token 分块并分别嵌入每个块。可以将 average 标志设置为 True 以返回块嵌入的加权平均值,或设置为 False 以仅返回未修改的块嵌入列表。

注意:您可以采取其他技术,包括:

  • 使用 GPT-4o 捕获图像/图表描述以进行嵌入
  • 基于段落或章节进行分块
  • 添加关于每篇文章的更具描述性的元数据。
EMBEDDING_CTX_LENGTH = 8191
EMBEDDING_ENCODING='cl100k_base'
def generate_embeddings(text, model):
    # Generate embeddings for the provided text using the specified model
    embeddings_response = openai_client.embeddings.create(model=model, input=text)
    # Extract the embedding data from the response
    embedding = embeddings_response.data[0].embedding
    return embedding

def len_safe_get_embedding(text, model=embeddings_model, max_tokens=EMBEDDING_CTX_LENGTH, encoding_name=EMBEDDING_ENCODING):
    # Initialize lists to store embeddings and corresponding text chunks
    chunk_embeddings = []
    chunk_texts = []
    # Iterate over chunks of tokens from the input text
    for chunk in chunked_tokens(text, chunk_length=max_tokens, encoding_name=encoding_name):
        # Generate embeddings for each chunk and append to the list
        chunk_embeddings.append(generate_embeddings(chunk, model=model))
        # Decode the chunk back to text and append to the list
        chunk_texts.append(tiktoken.get_encoding(encoding_name).decode(chunk))
    # Return the list of chunk embeddings and the corresponding text chunks
    return chunk_embeddings, chunk_texts

接下来,我们可以定义一个辅助函数,该函数将捕获关于文档的其他元数据。在本例中,我将从类别列表中选择一个类别,以便稍后在元数据过滤器中使用

categories = ['authentication','models','techniques','tools','setup','billing_limits','other']

def categorize_text(text, categories):

    # Create a prompt for categorization
    messages = [
        {"role": "system", "content": f"""You are an expert in LLMs, and you will be given text that corresponds to an article in OpenAI's documentation.
         Categorize the document into one of these categories: {', '.join(categories)}. Only respond with the category name and nothing else."""},
        {"role": "user", "content": text}
    ]
    try:
        # Call the OpenAI API to categorize the text
        response = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=messages
        )

        # Extract the category from the response
        category = response.choices[0].message.content
        return category
    except Exception as e:
        print(f"Error categorizing text: {str(e)}")
        return None

# Example usage

现在,我们可以定义一些辅助函数来处理 oai_docs 文件夹中的 .txt 文件。随意在您自己的数据上使用它,它支持 .txt 和 .pdf 文件。

def extract_text_from_pdf(pdf_path):
    # Initialize the PDF reader
    reader = PdfReader(pdf_path)
    text = ""
    # Iterate through each page in the PDF and extract text
    for page in reader.pages:
        text += page.extract_text()
    return text

def process_file(file_path, idx, categories, embeddings_model):
    file_name = os.path.basename(file_path)
    print(f"Processing file {idx + 1}: {file_name}")
    
    # Read text content from .txt files
    if file_name.endswith('.txt'):
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()
    # Extract text content from .pdf files
    elif file_name.endswith('.pdf'):
        text = extract_text_from_pdf(file_path)
    
    title = file_name
    # Generate embeddings for the title
    title_vectors, title_text = len_safe_get_embedding(title, embeddings_model)
    print(f"Generated title embeddings for {file_name}")
    
    # Generate embeddings for the content
    content_vectors, content_text = len_safe_get_embedding(text, embeddings_model)
    print(f"Generated content embeddings for {file_name}")
    
    category = categorize_text(' '.join(content_text), categories)
    print(f"Categorized {file_name} as {category}")
    
    # Prepare the data to be appended
    data = []
    for i, content_vector in enumerate(content_vectors):
        data.append({
            "id": f"{idx}_{i}",
            "vector_id": f"{idx}_{i}",
            "title": title_text[0],
            "text": content_text[i],
            "title_vector": json.dumps(title_vectors[0]),  # Assuming title is short and has only one chunk
            "content_vector": json.dumps(content_vector),
            "category": category
        })
        print(f"Appended data for chunk {i + 1}/{len(content_vectors)} of {file_name}")
    
    return data

我们现在将使用此辅助函数来处理我们的 OpenAI 文档。随意更新此函数以通过更改下面 process_files 中的文件夹来使用您自己的数据。

请注意,这将并发处理所选文件夹中的文档,因此如果使用 txt 文件,则应花费 <30 秒,如果使用 PDF,则时间稍长。

## Customize the location below if you are using different data besides the OpenAI documentation. Note that if you are using a different dataset, you will need to update the categories list as well.
folder_name = "../../../data/oai_docs"

files = [os.path.join(folder_name, f) for f in os.listdir(folder_name) if f.endswith('.txt') or f.endswith('.pdf')]
data = []

# Process each file concurrently
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(process_file, file_path, idx, categories, embeddings_model): idx for idx, file_path in enumerate(files)}
    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result()
            data.extend(result)
        except Exception as e:
            print(f"Error processing file: {str(e)}")

# Write the data to a CSV file
csv_file = os.path.join("..", "embedded_data.csv")
with open(csv_file, 'w', newline='', encoding='utf-8') as csvfile:
    fieldnames = ["id", "vector_id", "title", "text", "title_vector", "content_vector","category"]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()
    for row in data:
        writer.writerow(row)
        print(f"Wrote row with id {row['id']} to CSV")

# Convert the CSV file to a Dataframe
article_df = pd.read_csv("../embedded_data.csv")
# Read vectors from strings back into a list using json.loads
article_df["title_vector"] = article_df.title_vector.apply(json.loads)
article_df["content_vector"] = article_df.content_vector.apply(json.loads)
article_df["vector_id"] = article_df["vector_id"].apply(str)
article_df["category"] = article_df["category"].apply(str)
article_df.head()

我们现在有一个 embedded_data.csv 文件,其中包含六列,我们可以将其上传到我们的向量数据库!

创建具有向量搜索功能的 BigQuery 表

创建 BigQuery 数据集

我们将利用 Google SDK 并创建一个名为“oai_docs”的数据集,表名为“embedded_data”,但您可以随意更改这些变量(您也可以更改区域)。

PS:我们不会创建 BigQuery 索引,这可以提高向量搜索的性能,因为这样的索引在我们的数据集中需要超过 1k 行,而我们的示例中没有这么多行,但您可以随意在您自己的用例中利用它。

# Create bigquery table

from google.cloud import bigquery
from google.api_core.exceptions import Conflict

# Define the dataset ID (project_id.dataset_id)
raw_dataset_id = 'oai_docs'
dataset_id = project_id + '.' + raw_dataset_id

client = bigquery.Client(credentials=credentials, project=project_id)

# Construct a full Dataset object to send to the API
dataset = bigquery.Dataset(dataset_id)

# Specify the geographic location where the dataset should reside
dataset.location = "US"

# Send the dataset to the API for creation
try:
    dataset = client.create_dataset(dataset, timeout=30)
    print(f"Created dataset {client.project}.{dataset.dataset_id}")
except Conflict:
    print(f"dataset {dataset.dataset_id } already exists")
    
# Read the CSV file, properly handling multiline fields
csv_file_path = "../embedded_data.csv"
df = pd.read_csv(csv_file_path, engine='python', quotechar='"', quoting=1)

# Display the first few rows of the dataframe
df.head()

创建表并上传数据

我们将使用属性名称和类型创建表。请注意 'content_vector' 属性,该属性允许为单行存储浮点向量,我们将将其用于向量搜索。

然后,此代码将循环处理我们先前创建的 CSV 文件,以将行插入 Bigquery。如果您多次运行此代码,则会插入多个相同的行,这将在进行搜索时给出不太准确的结果(您可以在 ID 上设置唯一性或每次清理数据库)。

# Read the CSV file, properly handling multiline fields
dataset_id = project_id + '.' + raw_dataset_id
client = bigquery.Client(credentials=credentials, project=project_id)
csv_file_path = "../embedded_data.csv"
df = pd.read_csv(csv_file_path, engine='python', quotechar='"', quoting=1)

# Preprocess the data to ensure content_vector is correctly formatted
# removing last and first character which are brackets [], comma splitting and converting to float
def preprocess_content_vector(row):
    row['content_vector'] = [float(x) for x in row['content_vector'][1:-1].split(',')]
    return row

# Apply preprocessing to the dataframe
df = df.apply(preprocess_content_vector, axis=1)

# Define the schema of the final table
final_schema = [
    bigquery.SchemaField("id", "STRING"),
    bigquery.SchemaField("vector_id", "STRING"),
    bigquery.SchemaField("title", "STRING"),
    bigquery.SchemaField("text", "STRING"),
    bigquery.SchemaField("title_vector", "STRING"),
    bigquery.SchemaField("content_vector", "FLOAT64", mode="REPEATED"),
    bigquery.SchemaField("category", "STRING"),
]

# Define the final table ID
raw_table_id = 'embedded_data'
final_table_id = f'{dataset_id}.' + raw_table_id

# Create the final table object
final_table = bigquery.Table(final_table_id, schema=final_schema)

# Send the table to the API for creation
final_table = client.create_table(final_table, exists_ok=True)  # API request
print(f"Created final table {project_id}.{final_table.dataset_id}.{final_table.table_id}")

# Convert DataFrame to list of dictionaries for BigQuery insertion
rows_to_insert = df.to_dict(orient='records')

# Upload data to the final table
errors = client.insert_rows_json(f"{final_table.dataset_id}.{final_table.table_id}", rows_to_insert)  # API request

if errors:
    print(f"Encountered errors while inserting rows: {errors}")
else:
    print(f"Successfully loaded data into {dataset_id}:{final_table_id}")

测试搜索

现在数据已上传,我们将在本地测试纯向量相似度搜索和带有元数据过滤的搜索,以确保它按预期工作。

您可以测试纯向量搜索和元数据过滤。

以下查询是纯向量搜索,我们没有按类别进行过滤。

query = "What model should I use to embed?"
category = "models"

embedding_query = generate_embeddings(query, embeddings_model)
embedding_query_list = ', '.join(map(str, embedding_query))

query = f"""
WITH search_results AS (
  SELECT query.id AS query_id, base.id AS base_id, distance
  FROM VECTOR_SEARCH(
    TABLE oai_docs.embedded_data, 'content_vector',
    (SELECT ARRAY[{embedding_query_list}] AS content_vector, 'query_vector' AS id),
    top_k => 2, distance_type => 'COSINE', options => '{{"use_brute_force": true}}')
)
SELECT sr.query_id, sr.base_id, sr.distance, ed.text, ed.title
FROM search_results sr
JOIN oai_docs.embedded_data ed ON sr.base_id = ed.id
ORDER BY sr.distance ASC
"""

query_job = client.query(query)
results = query_job.result()  # Wait for the job to complete

for row in results:
    print(f"query_id: {row['query_id']}, base_id: {row['base_id']}, distance: {row['distance']}, text_truncated: {row['text'][0:100]}")

执行带有元数据过滤的搜索

元数据过滤允许在具有最接近的向量搜索语义结果之上,限制具有某些属性的查找结果。

提供的代码片段演示了如何执行带有元数据过滤的查询


query = "What model should I use to embed?"
category = "models"

embedding_query = generate_embeddings(query, embeddings_model)
embedding_query_list = ', '.join(map(str, embedding_query))


query = f"""
WITH search_results AS (
  SELECT query.id AS query_id, base.id AS base_id, distance
  FROM VECTOR_SEARCH(
    (SELECT * FROM oai_docs.embedded_data WHERE category = '{category}'), 
    'content_vector',
    (SELECT ARRAY[{embedding_query_list}] AS content_vector, 'query_vector' AS id),
    top_k => 4, distance_type => 'COSINE', options => '{{"use_brute_force": true}}')
)
SELECT sr.query_id, sr.base_id, sr.distance, ed.text, ed.title, ed.category
FROM search_results sr
JOIN oai_docs.embedded_data ed ON sr.base_id = ed.id
ORDER BY sr.distance ASC
"""


query_job = client.query(query)
results = query_job.result()  # Wait for the job to complete

for row in results:
    print(f"category: {row['category']}, title: {row['title']}, base_id: {row['base_id']}, distance: {row['distance']}, text_truncated: {row['text'][0:100]}")

创建 GCP 函数

导出变量

我们将在本文件夹的 main.py 中部署该函数(也可在此处找到:here)。

在第一步中,我们将导出变量以定位我们的表/数据集,并使用 OpenAI 的 API 生成嵌入。

# Create a dictionary to store the environment variables (they were used previously and are just retrieved)
env_variables = {
    'OPENAI_API_KEY': openai_api_key,
    'EMBEDDINGS_MODEL': embeddings_model,
    'PROJECT_ID': project_id,
    'DATASET_ID': raw_dataset_id,
    'TABLE_ID': raw_table_id
}

# Write the environment variables to a YAML file
with open('env.yml', 'w') as yaml_file:
    yaml.dump(env_variables, yaml_file, default_flow_style=False)

print("env.yml file created successfully.")

部署函数

我们现在将为我们当前的项目创建一个名为“openai_docs_search”的 google 函数,为此,我们将启动以下 CLI 命令,利用先前创建的环境变量。请注意,此函数可以从任何地方调用,无需身份验证,请勿将其用于生产环境,或添加额外的身份验证机制。

! gcloud functions deploy openai_docs_search \
  --runtime python39 \
  --trigger-http \
  --allow-unauthenticated \
  --env-vars-file env.yml

在 ChatGPT 的自定义 GPT 中输入

现在我们有了一个 GCP 函数,可以查询此向量搜索索引,让我们将其作为 GPT Action 放入!

请参阅关于 GPT 的文档此处和关于 GPT Actions 的文档此处。使用以下内容作为 GPT 的说明和 GPT Action 的 OpenAPI 规范。

创建 OpenAPI 规范

以下是 OpenAPI 规范示例。当我们运行下面的代码块时,应将功能规范复制到剪贴板以粘贴到 GPT Action 中。

请注意,默认情况下这没有任何身份验证,但您可以按照 GCP 的文档此处设置带有身份验证的 GCP 函数。

spec = f"""
openapi: 3.1.0
info:
  title: OpenAI API documentation search
  description: API to perform a semantic search over OpenAI APIs
  version: 1.0.0
servers:
  - url: https://{region}-{project_id}.cloudfunctions.net
    description: Main (production) server
paths:
  /openai_docs_search:
    post:
      operationId: openai_docs_search
      summary: Perform a search
      description: Returns search results for the given query parameters.
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              properties:
                query:
                  type: string
                  description: The search query string
                top_k:
                  type: integer
                  description: Number of top results to return. Maximum is 3.
                category:
                  type: string
                  description: The category to filter on, on top of similarity search (used for metadata filtering). Possible values are {categories}.
      responses:
        '200':
          description: A JSON response with the search results
          content:
            application/json:
              schema:
                type: object
                properties:
                  items:
                    type: array
                    items:
                      type: object
                      properties:
                        text:
                          type: string
                          example: "Learn how to turn text into numbers, unlocking use cases like search..."
                        title:
                          type: string
                          example: "embeddings.txt"
                        distance:
                          type: number
                          format: float
                          example: 0.484939891778730
                        category:
                          type: string
                          example: "models"
"""
print(spec)
pyperclip.copy(spec)
print("OpenAPI spec copied to clipboard")

创建 GPT 指令

随意修改指令以适合您的需求。请查看我们的文档此处,以获取有关提示工程的一些技巧。

instructions = f'''
You are an OpenAI docs assistant. You have an action in your knowledge base where you can make a POST request to search for information. The POST request should always include: {{
    "query": "<user_query>",
    "k_": <integer>,
    "category": <string, but optional>
}}. Your goal is to assist users by performing searches using this POST request and providing them with relevant information based on the query.

You must only include knowledge you get from your action in your response.
The category must be from the following list: {categories}, which you should determine based on the user's query. If you cannot determine, then do not include the category in the POST request.
'''
pyperclip.copy(instructions)
print("GPT Instructions copied to clipboard")
print(instructions)

回顾

我们现在已成功地将 GCP BigQuery 向量搜索与 ChatGPT 中的 GPT Actions 集成,方法如下:

  1. 使用 OpenAI 的嵌入嵌入文档,同时使用 gpt-4o 添加一些额外的元数据。
  2. 将该数据上传到 GCP BigQuery(原始数据和嵌入向量)
  3. 在 GCP Functions 上创建了一个端点来检索这些数据
  4. 将其合并到自定义 GPT 中。

我们的 GPT 现在可以检索信息以帮助回答用户查询,使其更准确并根据我们的数据进行定制。以下是 GPT 的实际应用

gcp-rag-quickstart-gpt.png