Scalable RAG data ingestion and parsing with Ray Data#

In the previous tutorial, we outlined the standard pipeline for RAG document ingestion.

This tutorial dives deeper into how to use Ray to efficiently process, chunk, embed, and store documents in a vector database, enabling fast embedding similarity search.

Ray can process large batches of files concurrently, making it an ideal solution for managing massive amounts of unstructured data at scale.

Here is the architecture diagram:

https://raw.githubusercontent.com/ray-project/ray/refs/heads/master/doc/source/ray-overview/examples/e2e-rag/images/rag-data-ingestion-with-ray-data.png
Anyscale-specific configuration#

Note: This tutorial is optimized for the Anyscale platform. Running it on open source Ray requires additional configuration; for example, you need to manually configure your Ray cluster, install dependencies on each node, and set up shared storage.

Load documents from S3#

In this step, we retrieve 100 documents from an AWS S3 bucket. We use Ray Data, which simplifies efficient processing of large-scale datasets.

We use the ray.data.read_binary_files function to read all files in the S3 bucket as raw binary data.

Additionally, setting include_paths=True lets us track each file's path (for example, your-bucket-name/folder/example.pdf), which is useful in later steps.

import ray
import io
from pathlib import Path
from typing import Dict, List
from unstructured.partition.auto import partition
import chromadb
from sentence_transformers import SentenceTransformer
import torch
import numpy as np
import uuid
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    NLTKTextSplitter,
)

SOURCE_DIRECTORY_S3 = "s3://anyscale-rag-application/100-docs/"
ds = ray.data.read_binary_files(SOURCE_DIRECTORY_S3, include_paths=True, concurrency=5)
ds.schema()
2025-04-28 14:02:42,918	INFO worker.py:1660 -- Connecting to existing Ray cluster at address: 10.0.41.124:6379...
2025-04-28 14:02:42,928	INFO worker.py:1843 -- Connected to Ray cluster. View the dashboard at https://session-xl5p5c8v2puhejgj5rjjn1g6ht.i.anyscaleuserdata.com 
2025-04-28 14:02:42,942	INFO packaging.py:367 -- Pushing file package 'gcs://_ray_pkg_72f4a5291407a334fea98932ca96e79412931b97.zip' (4.94MiB) to Ray cluster...
2025-04-28 14:02:42,961	INFO packaging.py:380 -- Successfully pushed file package 'gcs://_ray_pkg_72f4a5291407a334fea98932ca96e79412931b97.zip'.
2025-04-28 14:02:43,060	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-04-28_12-58-27_336835_47308/logs/ray-data
2025-04-28 14:02:43,061	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ListFiles] -> TaskPoolMapOperator[ReadFiles]
Column  Type
------  ----
bytes   binary
path    string

Scalable document processing and parsing#

After retrieving the files from S3, the next step is to extract the text from each page.

The S3 bucket contains 100 documents totaling more than 6,000 pages. Most of them are sample PDFs from this GitHub repository: https://github.com/tpn/pdfs. These PDFs include both text-based and image-based formats. In addition, there are five documents related to Anyscale Jobs in a variety of formats, including PDF, PPTX, HTML, TXT, and DOCX.

Note: Some documents are very large, containing more than 900 pages.

We define a function that loads and partitions these documents, using the Unstructured library for parsing. For more details, see the Unstructured documentation: https://docs.unstructured.io/welcome.

For image-based PDFs, the Unstructured library uses the Tesseract OCR engine to process and extract text.
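
If you want to force the OCR path rather than rely on automatic detection, unstructured's PDF-specific partitioner exposes a strategy parameter. A minimal sketch (the filename is a placeholder, and the Tesseract binary must be installed on the node):

from unstructured.partition.pdf import partition_pdf

# Force OCR for a scanned, image-only PDF (placeholder filename).
elements = partition_pdf(filename="scanned-example.pdf", strategy="ocr_only")
print(len(elements), "elements extracted via OCR")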

Each document is read from the specified directory and processed page by page. Each page's text is grouped together with its corresponding metadata, such as the source file, page number, and a unique document ID.

Here is the function that performs this task:

from pathlib import Path
import io
import uuid

def process_file(record: dict) -> dict:
    file_path = Path(record["path"])

    # Only process the following file extensions
    supported_extensions = {".pdf", ".docx", ".pptx", ".ppt", ".html", ".txt"}
    
    if file_path.suffix.lower() not in supported_extensions:
        # Not a supported file
        return {"pages": []}
    
    print(f"Processing file: {file_path}")
    
    try:
        with io.BytesIO(record["bytes"]) as stream:
            elements = partition(file=stream)  # This call may fail on some files
            
            # Generate a unique doc_id for this file
            doc_id = str(uuid.uuid4())

            # Group text by page
            page_texts = {}
            for el in elements:
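                # Formats without pagination (for example, HTML or TXT) report
                # page_number as None, so fall back to page 1.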
                page_number = getattr(el.metadata, "page_number", 1) or 1
                if page_number not in page_texts:
                    page_texts[page_number] = []
                page_texts[page_number].append(str(el))

            # Combine text for each page
            pages = []
            for page_number, texts in page_texts.items():
                combined_text = " ".join(texts)
                pages.append({
                    "text": combined_text.strip(),
                    "source": str(file_path),
                    "page_number": page_number,
                    "doc_id": doc_id
                })

            return {"pages": pages}

    except Exception as e:
        # Handle files that cause errors during parsing
        print(f"Cannot process file {file_path}: {e}")
        return {"pages": []}


# Assuming `ds` is your dataset, apply the function
ds = ds.map(process_file, concurrency=8, num_cpus=1)
ds = ds.flat_map(lambda x: x["pages"])

print(ds)
2025-04-28 14:02:54,705	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-04-28_12-58-27_336835_47308/logs/ray-data
2025-04-28 14:02:54,706	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ListFiles] -> TaskPoolMapOperator[ReadFiles]
FlatMap(<lambda>)
+- Map(process_file)
   +- Dataset(num_rows=?, schema={bytes: binary, path: string})

Text processing pipeline#

This pipeline streamlines the transformation from raw text to structured embeddings:

  • Chunking: break the text into smaller pieces.

  • Embedding: convert the text chunks into numerical vectors.

  • Storage: save the vectors and metadata to Chroma DB.

To build the text processing pipeline with Ray Data, we define three classes: Chunker, Embedder, and ChromaWriter.

We then use Ray Data operations such as map, flat_map, and map_batches to integrate these steps seamlessly.

For more details, see: https://docs.rayai.org.cn/en/latest/data/api/doc/ray.data.Dataset.map_batches.html. The toy example below shows how the three operations differ.
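
A toy illustration (not part of the tutorial's pipeline) on a two-row dataset:

import ray

demo = ray.data.from_items([{"x": 1}, {"x": 2}])

# map: one input row -> exactly one output row.
print(demo.map(lambda row: {"x": row["x"] * 10}).take_all())

# flat_map: one input row -> zero or more output rows (here, "x" copies of the row).
print(demo.flat_map(lambda row: [{"x": row["x"]}] * row["x"]).take_all())

# map_batches: transform a whole batch (a dict of column -> NumPy array) at once.
print(demo.map_batches(lambda batch: {"x": batch["x"] + 1}).take_all())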

Chunker#

The Chunker class is responsible for splitting document text into smaller chunks using one of several methods. Based on the specified method (fixed or recursive), it uses the corresponding splitter from LangChain's text splitters. Each chunk gets its own unique chunk_id while preserving the original doc_id, page_number, and other metadata.

class Chunker:
    def __init__(self, method: str = "recursive", chunk_size: int = 2048, chunk_overlap: int = 200):
        self.method = method
        if self.method == "fixed":
            splitter = CharacterTextSplitter(
                chunk_size=chunk_size, chunk_overlap=chunk_overlap  # chunk size is in characters, not tokens
            )
        elif self.method == "recursive":
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size, chunk_overlap=chunk_overlap  # chunk size is in characters, not tokens
            )
        else:
            raise ValueError("Invalid chunking method")
        self.splitter = splitter

    def __call__(self, page: Dict) -> List[Dict]:
        chunks = []
        texts = self.splitter.split_text(page["text"])
        for chunk_index, text in enumerate(texts):
            chunks.append({
                "text": text,
                "source": page["source"],
                "page_number": page.get("page_number", 1),
                "chunk_id": str(uuid.uuid4()),
                "doc_id": page["doc_id"]  
            })
        return chunks
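
As a quick sanity check, you can call the chunker directly on a single page dictionary (sample_page below is a hypothetical example, not one of the ingested documents):

# Chunk one synthetic page and inspect the output shape.
chunker = Chunker(method="recursive", chunk_size=2048, chunk_overlap=200)
sample_page = {
    "text": "Anyscale Jobs run discrete workloads in production. " * 100,
    "source": "example.pdf",
    "page_number": 1,
    "doc_id": "doc-123",
}
chunks = chunker(sample_page)
print(len(chunks), "chunks; first chunk length:", len(chunks[0]["text"]))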

Embedder#

The Embedder uses a SentenceTransformer model to convert text chunks into numerical embeddings.

This class initializes a SentenceTransformer model (using CUDA if available) and defines a callable that takes a batch of text chunks, encodes them into embeddings, and returns a dictionary containing the embeddings along with the metadata.

class Embedder:
    def __init__(self, model_name: str = "intfloat/multilingual-e5-large-instruct"):
        self.model_name = model_name
        self.model = SentenceTransformer(
            self.model_name,
            device="cuda" if torch.cuda.is_available() else "cpu"
        )

    def __call__(self, batch: Dict) -> Dict:
        # Generate embeddings for the batch of text chunks
        embeddings = self.model.encode(batch["text"], convert_to_numpy=True)

        return {
            "embeddings": embeddings,
            "text": batch["text"],
            "source": batch["source"],
            "doc_id": batch["doc_id"],
            "page_number": batch["page_number"],
            "chunk_id": batch["chunk_id"],
        }
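
A quick local test of the callable with a tiny hypothetical batch (the model downloads on first use; for intfloat/multilingual-e5-large-instruct the embedding dimension is 1024):

# Encode a one-chunk batch and check the embedding shape.
embedder = Embedder()
batch = {
    "text": ["How do I submit an Anyscale job?"],
    "source": ["example.pdf"],
    "doc_id": ["doc-123"],
    "page_number": [1],
    "chunk_id": ["chunk-456"],
}
out = embedder(batch)
print(out["embeddings"].shape)  # expected: (1, 1024)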

ChromaWriter#

The ChromaWriter is responsible for writing the embedded vectors, together with their metadata, to the Chroma vector store.

We implement two special methods, __getstate__ and __setstate__, which are hooks in Python's serialization (pickling) protocol.

  • __getstate__: prepares the object for pickling by removing attributes that can't be serialized, ensuring that only the necessary state is saved.

  • __setstate__: rebuilds the object after unpickling by restoring its state and re-initializing the unpicklable components, so the object remains fully functional.

These two methods prevent errors such as TypeError: cannot pickle 'weakref.ReferenceType' object when Ray Data runs map_batches during batch processing. A pickle round-trip check appears after the class below.

class ChromaWriter:
    def __init__(self, collection_name: str, chroma_path: str):
        self.collection_name = collection_name
        self.chroma_path = chroma_path
        self._init_chroma_client()

    def _init_chroma_client(self):
        self.chroma_client = chromadb.PersistentClient(path=self.chroma_path)
        self.collection = self.chroma_client.get_or_create_collection(
            name=self.collection_name,
            metadata={"hnsw:space": "cosine"}
        )

    def __getstate__(self):
        """
        Exclude the unpickleable chroma_client and collection from the state.
        """
        state = self.__dict__.copy()
        state.pop("chroma_client", None)
        state.pop("collection", None)
        return state

    def __setstate__(self, state):
        """
        Restore the state and reinitialize the chroma client and collection.
        """
        self.__dict__.update(state)
        self._init_chroma_client()

    def __call__(self, batch: dict) -> dict:
        """
        Process a batch of documents by adding them to the Chroma collection.
        """
        # Prepare metadata for each entry in the batch
        metadatas = []
        for i in range(len(batch["chunk_id"])):
            metadata = {
                "source": batch["source"][i],
                "doc_id": batch["doc_id"][i],
                "page_number": int(batch["page_number"][i]),
                "chunk_id": batch["chunk_id"][i]
            }
            metadatas.append(metadata)

        embeddings = batch["embeddings"].tolist()
        documents = list(batch["text"])
        ids = list(batch["chunk_id"])

        # Add the embeddings, documents, ids, and metadata to the collection
        self.collection.add(
            embeddings=embeddings,
            documents=documents,
            ids=ids,
            metadatas=metadatas
        )
        return {}
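
To see the two hooks in action, you can verify that the writer survives a pickle round-trip, which is what Ray does when it ships the callable class to worker processes (the collection name and path here are throwaway test values):

import pickle

# Round-trip the writer through pickle, as Ray does when distributing it.
writer = ChromaWriter(collection_name="pickle-test", chroma_path="/tmp/chroma_pickle_test")
restored = pickle.loads(pickle.dumps(writer))

# __setstate__ re-created the client and collection on the receiving side.
print(restored.collection.name)  # pickle-test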

Set up the text processing pipeline#

Finally, define constants for the data sources and the vector store configuration, then build the Ray pipeline, which:

  • Chunks the text.

  • Generates the embeddings.

  • Writes the results to Chroma DB.

# Constants for your data sources and vector store configuration
EMBEDDER_MODEL = "intfloat/multilingual-e5-large-instruct"
CHROMA_PATH = "/mnt/cluster_storage/vector_store"
CHROMA_COLLECTION_NAME = "anyscale_jobs_docs_embeddings"

# Build the processing pipeline using Ray Data API
processed_ds = (
    ds.flat_map(
        Chunker,
        fn_constructor_kwargs={"method": "recursive"},
        concurrency=5,
        num_cpus=1
    )
    .map_batches(
        Embedder,
        fn_constructor_kwargs={"model_name": EMBEDDER_MODEL},
        batch_size=800,
        concurrency=1,
        num_gpus=1
    )
    .map_batches(
        ChromaWriter,
        batch_size=500,
        concurrency=1,
        num_cpus=1,
        fn_constructor_kwargs={
            "collection_name": CHROMA_COLLECTION_NAME,
            "chroma_path": CHROMA_PATH
        }
    )
)

Execute the entire text processing pipeline#

Run the pipeline to process all the documents:

# Execute pipeline
processed_ds.take_all()
print("Data ingestion completed successfully!")

Validate and search the data#

Inspect the stored embeddings#

Once processing completes, you can verify how many vectors are stored in Chroma DB.

import chromadb
CHROMA_PATH = "/mnt/cluster_storage/vector_store"
CHROMA_COLLECTION_NAME = "anyscale_jobs_docs_embeddings"

# Initialize the Chroma client and retrieve (or create) your collection
chroma_client = chromadb.PersistentClient(path=CHROMA_PATH)
collection = chroma_client.get_or_create_collection(name=CHROMA_COLLECTION_NAME)

# Show how many vectors are stored in the collection.
vector_count = collection.count()
print("Total number of vectors in the collection:", vector_count)
Total number of vectors in the collection: 9504

You can also check the storage usage.

!ls -lh /mnt/cluster_storage/vector_store
total 106M
drwxr-xr-x 2 ray users 6.0K Apr 28 14:06 05e4ca13-4b3c-4db8-a68e-945320c94ef8
-rw-r--r-- 1 ray users 106M Apr 28 14:15 chroma.sqlite3

Reformat the search results#

The raw query results from Chroma are hard to read. Here, we improve the formatting for readability, making the results more user friendly. The results variable used below comes from a Chroma similarity search; a minimal sketch of that query step follows.
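
A minimal sketch of the query step, reusing the collection handle created above (the query cell itself isn't shown in this section, so the exact query text is an assumption based on the observations that follow):

from sentence_transformers import SentenceTransformer

# Embed the question with the same model used during ingestion.
model = SentenceTransformer("intfloat/multilingual-e5-large-instruct")
query_embedding = model.encode(["how to submit an Anyscale job?"], convert_to_numpy=True)

# Fetch the top matches from the Chroma collection.
results = collection.query(
    query_embeddings=query_embedding.tolist(),
    n_results=3,
    include=["documents", "metadatas", "distances"],
)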

from pprint import pprint
def reformat(chroma_results: dict) -> list:
    """
    Reformat chroma db results to a list of search items containing:
    - chunk_id
    - chunk_index
    - doc_id
    - page_number
    - source
    - text (from documents)
    - distance
    - score

    Parameters:
        chroma_results (dict): The raw results from the Chroma DB query.

    Returns:
        list: A list of dictionaries with the desired keys.
    """
    reformatted = []
    
    # Get the lists from the results. They are expected to be lists of lists.
    metadatas = chroma_results.get("metadatas", [])
    documents = chroma_results.get("documents", [])
    distances = chroma_results.get("distances", [])
    
    # Loop over each group (each inner list represents one set of matches)
    chunk_index = 1
    for meta_group, doc_group, distance_group in zip(metadatas, documents, distances):
        # Iterate over each item in the inner lists
        for meta, text, distance in zip(meta_group, doc_group, distance_group):
            item = {
                "chunk_index": chunk_index,
                "chunk_id": meta.get("chunk_id"),
                "doc_id": meta.get("doc_id"),
                "page_number": meta.get("page_number"),
                "source": meta.get("source"),
                "text": text,
                "distance": distance,
                "score": 1 - distance
            }
            reformatted.append(item)
            chunk_index += 1
    
    return reformatted


print("refromat Results:")
pprint(reformat(results))
refromat Results:
[{'chunk_id': 'd313cd7a-2835-49e4-80de-666a3e2f98df',
  'chunk_index': 1,
  'distance': 0.18700510263442993,
  'doc_id': 'b62b2ee6-f4f8-4d1c-a63f-4dc7415f9f69',
  'page_number': 1,
  'score': 0.8129948973655701,
  'source': 'anyscale-rag-application/100-docs/Job_schedules.html',
  'text': 'Create and manage jobs Submitting a job\u200b To submit your job to '
          'Anyscale, use the Python SDK or CLI and pass in any additional '
          'options or configurations for the job. By default, Anyscale uses '
          'your workspace or cloud to provision a cluster to run your job. You '
          'can define a custom cluster through a compute config or specify an '
          'existing cluster. Once submitted, Anyscale runs the job as '
          'specified in the entrypoint command, which is typically a Ray Job. '
          "If the run doesn't succeed, the job restarts using the same "
          'entrypoint up to the number of max_retries. CLI Python SDK anyscale '
          'job submit --name=my-job \\\n'
          '  --working-dir=. --max-retries=5 \\\n'
          '  --image-uri="anyscale/image/IMAGE_NAME:VERSION" \\\n'
          '  --compute-config=COMPUTE_CONFIG_NAME \\\n'
          '  -- python main.py With the CLI, you can either specify an '
          'existing compute config with --compute-config=COMPUTE_CONFIG_NAME '
          'or define a new one in a job YAML. For more information on '
          'submitting jobs with the CLI, see the reference docs. import '
          'anyscale\n'
          'from anyscale.job.models import JobConfig\n'
          '\n'
          'config = JobConfig(\n'
          '  name="my-job",\n'
          '  entrypoint="python main.py",\n'
          '  working_dir=".",\n'
          '  max_retries=5,\n'
          '  image_uri="anyscale/image/IMAGE_NAME:VERSION",\n'
          '  compute_config="COMPUTE_CONFIG_NAME"\n'
          ')'},
 {'chunk_id': '6f1ad3db-92cc-4582-b800-3ed44b5b99f7',
  'chunk_index': 2,
  'distance': 0.18982568383216858,
  'doc_id': '81b7cde5-74f1-4bd6-8397-352b62d22cec',
  'page_number': 1,
  'score': 0.8101743161678314,
  'source': 'anyscale-rag-application/100-docs/Create_and_manage_jobs.pdf',
  'text': '2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs Create and '
          'manage jobs Submitting a job To submit your job to Anyscale, use '
          'the Python SDK or CLI and pass in any additional options or '
          'configurations for the job. By default, Anyscale uses your '
          'workspace or cloud to provision a cluster to run your job. You can '
          'define a custom cluster through a compute config or specify an '
          'existing cluster. Once submitted, Anyscale runs the job as '
          'specified in the entrypoint command, which is typically a Ray Job. '
          "If the run doesn't succeed, the job restarts using the same "
          'entrypoint up to the number of max_retries . CLI Python SDK '
          'anyscale job submit --name=my-job \\ --working-dir=. '
          '--max-retries=5 \\ --image-uri="anyscale/image/IMAGE_NAME:VERSION" '
          '\\ --compute-config=COMPUTE_CONFIG_NAME \\ -- python main.py With '
          'the CLI, you can either specify an existing compute config with '
          '--compute- config=COMPUTE_CONFIG_NAME or define a new one in a job '
          'YAML. For more information on submitting jobs with the CLI, see the '
          'reference docs. TIP For large-scale, compute-intensive jobs, avoid '
          'scheduling Ray tasks onto the head node because it manages '
          'cluster-level orchestration. To do that, set the CPU resource on '
          'the head node to 0 in your compute config. Ask AI '
          'https://docs.anyscale.com/platform/jobs/manage-jobs 1/5'},
 {'chunk_id': '5cbd13dd-45af-4dd4-a8e4-2bd1ef12ec33',
  'chunk_index': 3,
  'distance': 0.19173364341259003,
  'doc_id': '618d723d-f9e5-4ec4-ab54-238ebe74c692',
  'page_number': 1,
  'score': 0.80826635658741,
  'source': 'anyscale-rag-application/100-docs/Jobs.txt',
  'text': '2/12/25, 9:48 AM Jobs | Anyscale Docs Jobs Run discrete workloads '
          'in production such as batch inference, bulk embeddings generation, '
          'or model fine-tuning. Anyscale Jobs allow you to submit '
          'applications developed on workspaces to a standalone Ray cluster '
          'for execution. Built for production and designed to fit into your '
          'CI/CD pipeline, jobs ensure scalable and reliable performance. How '
          'does it work? # When you’re ready to promote an app to production, '
          'submit a job from the workspace using anyscale job submit . '
          'Anyscale Jobs have the following features: Scalability: Rapid '
          'scaling to thousands of cloud instances, adjusting computing '
          'resources to match application demand. Fault tolerance: Retries for '
          'failures and automatic rescheduling to an alternative cluster for '
          'unexpected failures like running out of memory. Monitoring and '
          'observability: Persistent dashboards that allow you to observe '
          'tasks in real time and email alerts upon successf ul job '
          'completion. Get started 1. Sign in or sign up for an account. 2. '
          'Select the Intro to Jobs example. 3. Select Launch. This example '
          'runs in a Workspace. See Workspaces for background information. 4. '
          'Follow the notebook or view it in the docs. 5. Terminate the '
          "Workspace when you're done. Ask AI "
          'https://docs.anyscale.com/platform/jobs/ 1/2 2/12/25, 9:48 AM Jobs '
          '| Anyscale Docs https://docs.anyscale.com/platform/jobs/ 2/2'}]
(autoscaler +16m48s) Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.
(autoscaler +16m48s) [autoscaler] [4xA10G:48CPU-192GB] Attempting to add 1 node(s) to the cluster (increasing from 0 to 1).
(autoscaler +16m53s) [autoscaler] [4xA10G:48CPU-192GB] Launched 1 instances.
(autoscaler +17m53s) [autoscaler] Cluster upscaled to {60 CPU, 5 GPU}.
(autoscaler +21m33s) [autoscaler] Downscaling node i-0cb70f496c5966174 (node IP: 10.0.53.5) due to node idle termination.
(autoscaler +21m33s) [autoscaler] Cluster resized to {12 CPU, 1 GPU}.
(autoscaler +21m38s) [autoscaler] Cluster upscaled to {60 CPU, 5 GPU}.
(autoscaler +34m3s) [autoscaler] Downscaling node i-0cb70f496c5966174 (node IP: 10.0.53.5) due to node idle termination.
(autoscaler +34m3s) [autoscaler] Cluster resized to {12 CPU, 1 GPU}.
(autoscaler +37m8s) [autoscaler] [4xA10G:48CPU-192GB] Attempting to add 1 node(s) to the cluster (increasing from 0 to 1).
(autoscaler +37m13s) [autoscaler] [4xA10G:48CPU-192GB] Launched 1 instances.
(autoscaler +38m8s) [autoscaler] Cluster upscaled to {60 CPU, 5 GPU}.
(autoscaler +52m28s) [autoscaler] Downscaling node i-0fa50a0e4f892e7be (node IP: 10.0.10.57) due to node idle termination.
(autoscaler +52m28s) [autoscaler] Cluster resized to {12 CPU, 1 GPU}.
(autoscaler +55m28s) [autoscaler] Downscaling node i-027d8d6d4c4471230 (node IP: 10.0.37.194) due to node idle termination.
(autoscaler +55m28s) [autoscaler] Cluster resized to {8 CPU, 0 GPU}.

Observations#

The text chunks retrieved through vector search align closely with the user's query, "how to submit an Anyscale job?". Moreover, the top three chunks show relatively high similarity scores. This is impressive considering that we ingested 100 documents (more than 6,000 pages), most of whose content is irrelevant to the query. Despite that volume, the system still retrieved the relevant information.

Summary#

With Ray Data, you can ingest and process large volumes of documents in parallel. This tutorial demonstrated how to:

  • Retrieve documents from S3

  • Extract and partition text with Ray

  • Convert text into embeddings and store them in a vector database

  • Perform fast similarity search over the document data

This scalable pipeline is well suited to managing unstructured data and enabling fast, accurate document retrieval.