Unstructured data ingestion and processing with Ray Data#

   

Time to complete: 35 minutes | Difficulty: Intermediate | Prerequisites: Data engineering experience, document processing, basic natural language processing (NLP) knowledge

Build a comprehensive document ingestion pipeline for enterprise data warehouse workflows using Ray Data's distributed processing capabilities, transforming unstructured documents in your data lake into structured, analytics-ready datasets.

Table of contents#

  1. Data lake document discovery (8 minutes)

  2. Document processing and classification (10 minutes)

  3. Text chunking and enrichment (8 minutes)

  4. Data warehouse schema and output (6 minutes)

  5. Validation and summary (3 minutes)

Learning objectives#

Why unstructured data ingestion matters: Enterprise data lakes contain vast volumes of unstructured documents (PDFs, Word documents, presentations, reports) that must be processed systematically to extract business value for analytics and reporting.

Ray Data's ingestion capabilities: Distribute document processing across a cluster to handle large document collections, extract structured data, and prepare analytics-ready datasets for data warehouse consumption.

Data lake to data warehouse patterns: Techniques data engineering teams use to systematically process document collections, extract structured information, and create queryable datasets ready for business intelligence.

Production ingestion workflows: Scalable document processing patterns that handle diverse file formats, extract metadata, and create structured schemas for downstream analytics systems.

Overview#

The challenge: Enterprise data lakes contain millions of unstructured documents (PDFs, Word documents, presentations) across diverse formats that require systematic processing to extract business value. Traditional document processing approaches struggle with:

  • Scale: Single-machine processing limits document volume

  • Consistency: Manual extraction leads to inconsistent schemas

  • Integration: Complex infrastructure is needed to feed analytics systems

  • Data warehouse integration: Manual data modeling and ETL processes

  • Growing data volumes: Unstructured data is growing rapidly, often cited as outpacing structured data by roughly 10x in volume

The solution: Ray Data supports end-to-end document ingestion pipelines with native distributed operations that process millions of documents efficiently.

  • Scale: Streaming execution and large-scale processing capabilities handle petabyte- and even exabyte-scale data

  • Consistency: A flexible API that supports quality checks through map operations and any PyArrow schema. All data read must conform to a consistent schema or the pipeline fails, with additional configuration options to control this behavior

  • Integration: Works with all data types and any AI workload, running on CPUs, GPUs, and accelerators in the cloud or on premises

  • Data warehouse integration: Pre-built connectors for popular data warehouses, plus the ability to easily build custom connectors

  • Growing data volumes: Scales from optimized single-node execution to clusters of 10k+ nodes

Prerequisite checklist#

Before starting, make sure you have:

  • [ ] An understanding of data lake and data warehouse concepts

  • [ ] Experience with document processing and text extraction

  • [ ] Familiarity with structured data formats (Parquet, Delta Lake, Iceberg)

  • [ ] A Python environment with Ray Data and document processing libraries

  • [ ] Access to S3 or other cloud storage as the document source

Install system dependencies#

Install the following system dependencies, and make sure they're included in your worker image.

sudo apt-get update && \
    sudo apt-get install -y libgl1-mesa-glx libmagic1 poppler-utils tesseract-ocr libreoffice && \
    sudo rm -rf /var/lib/apt/lists/*

Install Python dependencies#

Install the required Python packages (these can also be baked into your image).

pip install --force-reinstall --no-cache-dir "unstructured[all-docs]==0.18.21" "pandas==2.3.3"

Set up and initialize Ray Data#

import json 
import uuid  
from datetime import datetime  
from pathlib import Path 
from typing import Any, Dict, List

import numpy as np 
import pandas as pd 

import ray 
from ray.data.aggregate import Count, Max, Mean, Sum 
from ray.data.expressions import col, lit 

ctx = ray.data.DataContext.get_current()

# Disable progress bars for cleaner output in production
# You can enable these for debugging: set to True to see progress
ctx.enable_progress_bars = False
ctx.enable_operator_progress_bars = False

# Use runtime env to install pip dependencies across all workers
# You can skip this if your custom image already has these dependencies installed
runtime_env = {
    "pip": {
        "packages": ["unstructured[all-docs]==0.18.21", "pandas==2.3.3"],
        "pip_install_options": ["--force-reinstall", "--no-cache-dir"],
    }
}
# Initialize the Ray cluster connection to set the Ray Data context
ray.init(ignore_reinit_error=True, runtime_env=runtime_env)

Step 1: Data lake document discovery#

Discover document collections in the data lake#

# READ DOCUMENTS FROM DATA LAKE (S3)

# Use Ray Data's read_binary_files() to load documents from S3
# Why read_binary_files()?
#   - Reads files as raw bytes (works for PDFs, DOCX, images, etc.)
#   - Distributes file reading across cluster workers
#   - include_paths=True gives us the file path for each document
#
# Parameters explained:
#   - S3 path: Location of document collection in data lake
#   - include_paths=True: Adds 'path' column with file location
#   - ray_remote_args: Resource allocation per task
#     * num_cpus=0.025: Very low CPU since this is I/O-bound (reading files)
#     * This allows many concurrent reads without CPU bottleneck
#   - limit(100): Process only 100 documents for this demo
#     * Remove this limit to process entire collection

document_collection = ray.data.read_binary_files(
    "s3://anyscale-rag-application/1000-docs/",
    include_paths=True,
    ray_remote_args={"num_cpus": 0.025}
).limit(100)

# Display the schema to understand our data structure
# At this point, we have: 'bytes' (file content) and 'path' (file location)
print(f"Dataset schema: {document_collection.schema()}")

Document metadata extraction#

# TEXT EXTRACTION FUNCTION
# This function extracts text from various document formats (PDF, DOCX, etc.)
# It processes one document at a time (distributed by Ray) and returns structured metadata + text


def process_file(record: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extract text content from document files using the Unstructured library.
    
    BEGINNER NOTE:
    - Input: A dictionary (record) with 'bytes' (file content) and 'path' (file location)
    - Output: A dictionary with extracted text + metadata
    - This function runs on each worker node in parallel
    
    Why extract text immediately?
    - Avoids passing large binary data through multiple operations
    - Reduces memory usage in downstream processing
    - Enables faster processing by dropping binary data early
    """
    # Import libraries inside function so each Ray worker has access
    import io
    from pathlib import Path
    
    from unstructured.partition.auto import partition
    
    # Extract file metadata from the input record
    file_path = Path(record["path"])  # Convert path string to Path object
    file_bytes = record["bytes"]  # Raw file content (binary data)
    file_size = len(file_bytes)  # Size in bytes
    file_extension = file_path.suffix.lower()  # Get extension (.pdf, .docx, etc.)
    file_name = file_path.name  # Just the filename (not full path)
    
    # We can only extract text from certain file types
    # If unsupported, return metadata with empty text
    supported_extensions = {".pdf", ".docx", ".doc", ".pptx", ".ppt", ".html", ".txt"}
    
    if file_extension not in supported_extensions:
        # Return a record with metadata but no extracted text
        return {
            "document_id": str(uuid.uuid4()),  # Generate unique ID
            "file_path": str(file_path),
            "file_name": file_name,
            "file_extension": file_extension,
            "file_size_bytes": file_size,
            "file_size_mb": round(file_size / (1024 * 1024), 2),  # Convert to MB
            "discovery_timestamp": datetime.now().isoformat(),
            "extracted_text": "",  # Empty - unsupported format
            "text_length": 0,
            "word_count": 0,
            "extraction_status": "unsupported_format"
        }
    
    # Extract text using the Unstructured library
    try:
        # Create an in-memory file stream from bytes (avoids writing to disk)
        with io.BytesIO(file_bytes) as stream:
            # partition() automatically detects format and extracts text
            # It returns a list of text elements (paragraphs, tables, etc.)
            elements = partition(file=stream)
            
            # Combine all extracted text elements into one string
            extracted_text = " ".join([str(el) for el in elements]).strip()
            
            # Calculate text statistics for quality assessment
            text_length = len(extracted_text)  # Total characters
            word_count = len(extracted_text.split()) if extracted_text else 0
            extraction_status = "success"
            
    except Exception as e:
        # If extraction fails (corrupted file, unsupported format, etc.)
        # Log the error and continue processing other files
        print(f"Cannot process file {file_path}: {e}")
        extracted_text = ""
        text_length = 0
        word_count = 0
        extraction_status = f"error: {str(e)[:100]}"  # Store error message (truncated)
    
    # Return record with all metadata and extracted text
    return {
        "document_id": str(uuid.uuid4()),  # Unique identifier for this document
        "file_path": str(file_path),
        "file_name": file_name,
        "file_extension": file_extension,
        "file_size_bytes": file_size,
        "file_size_mb": round(file_size / (1024 * 1024), 2),
        "discovery_timestamp": datetime.now().isoformat(),
        "extracted_text": extracted_text,  # The actual text content
        "text_length": text_length,
        "word_count": word_count,
        "extraction_status": extraction_status
    }


# Ray Data's map() applies process_file() to each document in parallel
# This is the "embarrassingly parallel" pattern - each document is independent
print("Extracting text from documents...")

# Why map() instead of map_batches()?
#   - map(): Process one record at a time (good for variable-size documents)
#   - map_batches(): Process records in batches (better for vectorized operations)
#   - Text extraction is I/O-bound and document-specific, so map() is ideal

# Parameters:
#   - process_file: The function to apply to each record

documents_with_text = document_collection.map(
    process_file
)
# Convert a sample of documents to pandas DataFrame for easy viewing
documents_with_text.limit(25).to_pandas()
# BUSINESS METADATA ENRICHMENT FUNCTION (Modern Approach)
# Instead of simple string matching, apply basic NLP for categorization.
# For high-quality production, you'd use an ML or LLM model endpoint here.

# Note: this step requires the transformers library (with a PyTorch or TensorFlow backend).
# Make sure it's available on your workers, for example by adding it to the runtime_env above.
from transformers import pipeline

# For demonstration: Use a small zero-shot classification model.
# In a real pipeline, you should call a production LLM/ML endpoint or use a domain-specific model.
# The 'facebook/bart-large-mnli' model works for zero-shot/label classification tasks.
# You may swap with "typeform/distilbert-base-uncased-mnli" or another small MNLI model for lighter resource use.
# If working at scale or with large docs, consider using Ray Serve + LLM API instead.

zero_shot_classifier = pipeline(
    "zero-shot-classification",
    model="facebook/bart-large-mnli"  # Or another MNLI model if needed
)

# Define candidate classes (map to business categories)
CANDIDATE_LABELS = [
    "financial document",  # maps to 'finance'
    "legal document",      # maps to 'legal'
    "regulatory document", # maps to 'compliance'
    "client document",     # maps to 'client_services'
    "research document",   # maps to 'research'
    "general document"     # maps to 'general'
]

BUSINESS_CATEGORY_MAPPING = {
    "financial document":    ("financial_document", "finance"),
    "legal document":        ("legal_document", "legal"),
    "regulatory document":   ("regulatory_document", "compliance"),
    "client document":       ("client_document", "client_services"),
    "research document":     ("research_document", "research"),
    "general document":      ("general_document", "general"),
}


def enrich_business_metadata(record: Dict[str, Any]) -> Dict[str, Any]:
    """
    Uses zero-shot text classification to predict business category and assign processing priority.
    For production: Replace this with a call to your domain-tuned classifier or LLM endpoint.
    """

    file_size = record["file_size_bytes"]
    text = record.get("extracted_text", "") or ""
    filename = record.get("file_name", "")

    # Concatenate extracted text with filename for context, up to limit (to save inference cost)
    context_text = (filename + "\n\n" + text[:1000]).strip()  # Truncate to first 1000 chars for speed

    # Run zero-shot classification (useful even with short context)
    result = zero_shot_classifier(context_text, CANDIDATE_LABELS, multi_label=False)
    top_label = result["labels"][0] if result and "labels" in result and result["labels"] else "general document"

    doc_type, business_category = BUSINESS_CATEGORY_MAPPING.get(top_label, ("general_document", "general"))

    # Priority assignment using simple logic + NLP keyword search (feel free to LLM this too)
    lower_context = context_text.lower()
    if any(w in lower_context for w in ["urgent", "critical", "deadline"]):
        priority = "high"
        priority_score = 3
    elif any(w in lower_context for w in ["important", "quarterly", "annual"]):
        priority = "medium"
        priority_score = 2
    else:
        priority = "low"
        priority_score = 1
    
    return {
        **record,
        "document_type": doc_type,
        "business_category": business_category,
        "processing_priority": priority,
        "priority_score": priority_score,
        "estimated_pages": max(1, file_size // 50000),
        "processing_status": "classified"
    }


# Apply business metadata enrichment to all documents
print("Enriching with business metadata (using zero-shot NLP)...")

# Note: zero-shot and LLM models can be heavy.
# For fast local testing, use a smaller model, or replace this with a production endpoint.
# See the sketch below for a pattern that loads the model once per worker.

documents_with_metadata = documents_with_text.map(
    enrich_business_metadata
)
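
The classifier above is captured in a Python closure, so Ray serializes it and ships it with every map task. As an alternative sketch (not the tutorial's code path), you can wrap the model in a callable class so each worker actor loads it once. The class name is illustrative, it reuses CANDIDATE_LABELS and BUSINESS_CATEGORY_MAPPING from above, and it is a trimmed-down version that only assigns the category fields.

# Sketch: per-actor model loading with a callable class (illustrative, not used below)
class ClassifyDocument:
    def __init__(self):
        # Runs once per actor, so the model loads once per worker process
        from transformers import pipeline
        self.classifier = pipeline(
            "zero-shot-classification",
            model="facebook/bart-large-mnli",
        )

    def __call__(self, record: Dict[str, Any]) -> Dict[str, Any]:
        text = (record.get("extracted_text", "") or "")[:1000]
        context = (record.get("file_name", "") + "\n\n" + text).strip()
        result = self.classifier(context, CANDIDATE_LABELS, multi_label=False)
        top_label = result["labels"][0] if result.get("labels") else "general document"
        doc_type, category = BUSINESS_CATEGORY_MAPPING.get(
            top_label, ("general_document", "general")
        )
        return {**record, "document_type": doc_type, "business_category": category}

# Passing a class to map() creates an actor pool; concurrency controls its size:
# documents_with_metadata = documents_with_text.map(ClassifyDocument, concurrency=4)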


# View a few documents with business classification added
documents_with_metadata.limit(5).to_pandas()
# WHY AGGREGATIONS?
# Before writing to warehouse, understand what we're processing:
# - How many documents of each type?
# - What's the size distribution?
# - Which categories have the most content?

# AGGREGATION 1: Document type distribution
# Group by document_type and calculate statistics

doc_type_stats = documents_with_metadata.groupby("document_type").aggregate(
    Count(),  # How many documents of each type?
    Sum("file_size_bytes"),  # Total size per document type
    Mean("file_size_mb"),  # Average size per document type
    Max("estimated_pages")  # Largest document per type
)

# AGGREGATION 2: Business category analysis
# Understand the distribution across business categories
# This helps with warehouse partitioning strategy

category_stats = documents_with_metadata.groupby("business_category").aggregate(
    Count(),  # How many per category?
    Mean("priority_score"),  # Average priority per category
    Sum("file_size_mb")  # Total data volume per category
)
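
Both aggregations above are lazy, like every other Ray Data operation. To actually inspect the statistics, materialize them, which triggers execution of all upstream steps:

# Materialize the grouped statistics for inspection (triggers execution)
print(doc_type_stats.to_pandas())
print(category_stats.to_pandas())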

Step 2: Document processing and classification#

Text extraction and quality assessment#

# QUALITY ASSESSMENT FUNCTION
# Evaluate document quality to filter out low-quality or problematic documents

def assess_document_quality(batch: pd.DataFrame) -> pd.DataFrame:
    """
    Assess document quality for data warehouse ingestion.
    
    BEGINNER NOTE:
    - Input: pandas DataFrame with multiple documents (a "batch")
    - Output: Same DataFrame with quality assessment columns added
    - Uses map_batches() for efficiency (process many docs at once)
    
    Why use map_batches() instead of map()?
    - Batching is more efficient for lightweight operations
    - Pandas DataFrame operations are optimized
    - Reduces overhead from function calls
    
    Explicitly pass batch_format="pandas" to map_batches() so this function receives a pandas DataFrame.
    """
    quality_scores = np.zeros(len(batch), dtype=int)  # Numeric score (0-4)
    quality_ratings = []  # Text rating (high/medium/low)
    quality_issues_list = []  # List of issues found
    
    # Iterate through rows to apply business rules for quality
    # Each document gets a score from 0-4 based on quality criteria
    
    # enumerate() gives a positional index, so the assignment below works regardless of the batch's DataFrame index
    for idx, (_, row) in enumerate(batch.iterrows()):
        quality_score = 0
        quality_issues = []
        
        # CRITERION 1: File size check
        # Files smaller than 10KB might be empty or corrupt
        if row["file_size_mb"] > 0.01:  # More than 10KB
            quality_score += 1
        else:
            quality_issues.append("file_too_small")
        
        # CRITERION 2: Text length check
        # Documents should have meaningful text content
        if row["text_length"] > 100:  # At least 100 characters
            quality_score += 1
        else:
            quality_issues.append("insufficient_text")
        
        # CRITERION 3: Business relevance check
        # Classified documents are more valuable than unclassified
        if row["business_category"] != "general":
            quality_score += 1
        else:
            quality_issues.append("low_business_relevance")
        
        # CRITERION 4: Word count check
        # Documents should have substantial content
        if row["word_count"] > 20:  # At least 20 words
            quality_score += 1
        else:
            quality_issues.append("insufficient_content")
        
        # Score 4: All checks passed - high quality
        # Score 2-3: Some issues - medium quality
        # Score 0-1: Major issues - low quality
        quality_rating = "high" if quality_score >= 4 else "medium" if quality_score >= 2 else "low"
        
        # Store results for this document
        quality_scores[idx] = quality_score
        quality_ratings.append(quality_rating)
        quality_issues_list.append(json.dumps(quality_issues))  # Convert list to JSON string
    
    batch["quality_score"] = quality_scores
    batch["quality_rating"] = quality_ratings
    batch["quality_issues"] = quality_issues_list
    
    return batch


# Apply quality assessment to all documents

# Parameters:
#   - batch_format="pandas": Process as pandas DataFrame (easier than numpy arrays)
#   - num_cpus=0.25: Very low CPU (this is lightweight logic)
#   - batch_size=100: Process 100 documents at a time
#     * Larger batches = fewer function calls = better efficiency

quality_assessed_docs = documents_with_metadata.map_batches(
    assess_document_quality,
    batch_format="pandas",  # Ray Data pattern: explicit pandas format
    num_cpus=0.25,
    batch_size=100
)
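
If you only want to carry reasonably clean documents into the next step, an optional filtering pass (a sketch; the rest of this tutorial keeps all documents) could look like this:

# Optional: keep only documents that passed most quality checks
high_quality_docs = quality_assessed_docs.filter(
    lambda row: row["quality_rating"] in ("high", "medium")
)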

Step 3: Text chunking and enrichment#

# TEXT CHUNKING FUNCTION
# Split long documents into smaller chunks for downstream processing
# Why chunk? Many applications (LLMs, vector databases) have size limits

def create_text_chunks(record: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Create overlapping text chunks from each document.
    
    BEGINNER NOTE:
    - Input: ONE document record with full text
    - Output: MULTIPLE chunk records (one document → many chunks)
    - This is a "one-to-many" transformation
    
    Why chunking?
    - LLM APIs have token limits (e.g., 4096 tokens)
    - Vector databases work better with smaller chunks
    - Enables more granular search and analysis
    
    Why overlapping chunks?
    - Preserves context across chunk boundaries
    - Prevents splitting important information
    - 150 character overlap means each chunk shares text with neighbors
    """
    text = record["extracted_text"]  # The full document text
    chunk_size = 1500  # Each chunk will be ~1500 characters
    overlap = 150  # Adjacent chunks share 150 characters
    
    # Why these numbers?
    # - 1500 chars ≈ 300-400 tokens (good for most LLM APIs)
    # - 150 char overlap ≈ 10% overlap (preserves context without too much redundancy)
    # - You can adjust these based on your use case
    
    # Create chunks of data by a sliding window
    chunks = []
    start = 0  # Starting position in the text
    chunk_index = 0  # Track which chunk number this is

    # There are many more advanced chunking methods; this example uses a simple sliding-window approach for demo purposes.

    # Loop until all of the text has been processed
    while start < len(text):
        # Calculate end position (don't go past text length)
        end = min(start + chunk_size, len(text))
        
        # Extract this chunk's text
        chunk_text = text[start:end]
        
        # Create a new record for this chunk
        # It contains all the original document metadata PLUS chunk-specific data
        chunk_record = {
            **record,  # All original fields (document_id, business_category, etc.)
            "chunk_id": str(uuid.uuid4()),  # Unique ID for this specific chunk
            "chunk_index": chunk_index,  # Position in sequence (0, 1, 2, ...)
            "chunk_text": chunk_text,  # The actual text content of this chunk
            "chunk_length": len(chunk_text),  # Characters in this chunk
            "chunk_word_count": len(chunk_text.split())  # Words in this chunk
        }
        
        chunks.append(chunk_record)
        
        # If you've reached the end of the text, you're done
        if end >= len(text):
            break
        
        # Move to next chunk position (with overlap)
        # Example: If chunk_size=1500 and overlap=150
        # Chunk 1: chars 0-1500
        # Chunk 2: chars 1350-2850 (starts 150 before chunk 1 ended)
        # Chunk 3: chars 2700-4200 (starts 150 before chunk 2 ended)
        start = end - overlap
        chunk_index += 1

    # After creating all chunks, add how many chunks the document has
    # This helps with progress tracking and completeness checks
    for chunk in chunks:
        chunk["total_chunks"] = len(chunks)
    
    # Return the list of chunk records
    # Ray Data's flat_map() automatically flattens this list
    return chunks


# Apply text chunking to all documents
# Use flat_map() for one-to-many transformations
# One document becomes multiple chunks
print("Creating text chunks...")

# Why flat_map() instead of map()?
#   - map(): One input → One output (document → document)
#   - flat_map(): One input → Many outputs (document → multiple chunks)
#   - flat_map() automatically "flattens" the list of chunks
#
# Example:
#   Input: 100 documents
#   Output: 10,000+ chunks (each document becomes ~100 chunks on average)
#
# Parameters:
#   - num_cpus=0.5: Moderate CPU usage (string slicing is lightweight)

chunked_documents = quality_assessed_docs.flat_map(
    create_text_chunks,
    num_cpus=0.5
)

Step 4: Data warehouse schema and output#

Create the data warehouse schema#

# DATA WAREHOUSE SCHEMA TRANSFORMATION
# Transform the raw processing data into a clean warehouse schema
# This is the "ETL" part - Extract (done), Transform (now), Load (next)

print("Creating data warehouse schema...")


# Get today's date in ISO format (YYYY-MM-DD)
# Ray Data uses this to partition the data by date in the warehouse
processing_date = datetime.now().isoformat()[:10]


# Data warehouses need clean, organized schemas
# Select only the columns needed and organize them logically
#
# Why not keep all columns?
# - Cleaner schema = easier queries
# - Less storage space
# - Better performance
# - Clear data contracts for downstream users

warehouse_dataset = chunked_documents.select_columns([
    # PRIMARY IDENTIFIERS: Keys for joining and relationships
    "document_id",  # Links all chunks from same document
    "chunk_id",     # Unique identifier for this specific chunk
    
    # DIMENSIONAL ATTRIBUTES: Categorical data for filtering/grouping
    # These are typically used in WHERE clauses and GROUP BY
    "business_category",      # finance, legal, compliance, etc.
    "document_type",          # financial_document, legal_document, etc.
    "file_extension",         # .pdf, .docx, etc.
    "quality_rating",         # high, medium, low
    "processing_priority",    # high, medium, low
    
    # FACT MEASURES: Numeric values for aggregation and analysis
    # These are typically used in SUM(), AVG(), COUNT(), etc.
    "file_size_mb",           # Document size
    "word_count",             # Total words in document
    "chunk_word_count",       # Words in this chunk
    "quality_score",          # Numeric quality (0-4)
    "priority_score",         # Numeric priority (1-3)
    "estimated_pages",        # Page count estimate
    "chunk_index",            # Position in document (0, 1, 2, ...)
    "total_chunks",           # How many chunks total
    
    # CONTENT FIELDS: The actual data payload
    "chunk_text",             # The text content (will rename this)
    "file_name",              # Original filename
    "file_path",              # S3 location
    
    # METADATA: Processing provenance and status tracking
    "discovery_timestamp",    # When was file discovered
    "extraction_status",      # success, error, unsupported_format
    "processing_status"       # classified, processed, etc.
])

# RENAME COLUMNS for data warehouse conventions
# "chunk_text" → "text_content" (more descriptive)
warehouse_dataset = warehouse_dataset.rename_columns({
    "chunk_text": "text_content"
})

# ADD PIPELINE METADATA: Constant columns for all records
# These columns are the same for every record in this run
# They help with data lineage and debugging
warehouse_dataset = (
    warehouse_dataset
    .with_column("processing_date", lit(processing_date))       # When was this processed?
    .with_column("pipeline_version", lit("1.0"))               # Which version of pipeline?
    .with_column("processing_engine", lit("ray_data"))         # What tool processed it?
)

Write to the data warehouse with partitioning#

# WRITE TO DATA WAREHOUSE - MAIN TABLE
# Save all processed chunks to Parquet format with partitioning
# This is the "Load" part of ETL

# /mnt/cluster_storage is a shared storage volume accessible by all workers
# In production, this would typically be:
# - S3: s3://your-bucket/warehouse/
# - Azure: abfs://container@account.dfs.core.windows.net/
# - GCS: gs://your-bucket/warehouse/
OUTPUT_WAREHOUSE_PATH = "/mnt/cluster_storage"

# WRITE MAIN TABLE with PARTITIONING
# write_parquet() is Ray Data's native way to save data

# Key parameters explained:

# partition_cols=["business_category", "processing_date"]
#   - Creates folder structure: business_category=finance/processing_date=2025-10-15/
#   - Enables efficient querying: "SELECT * FROM table WHERE business_category='finance'"
#   - Query engines (Spark, Presto, Athena) can skip entire partitions
#   - Example structure:
#       main_table/
#       ├── business_category=finance/
#       │   └── processing_date=2025-10-15/
#       │       ├── part-001.parquet
#       │       └── part-002.parquet
#       ├── business_category=legal/
#       │   └── processing_date=2025-10-15/
#       │       └── part-001.parquet
#       └── business_category=compliance/
#           └── processing_date=2025-10-15/
#               └── part-001.parquet

# compression="snappy"
#   - Compress files to save storage space (50-70% reduction)
#   - Snappy is fast and well-supported by all query engines
#   - Alternatives: gzip (higher compression), zstd (good balance)

# ray_remote_args={"num_cpus": 0.1}
#   - Writing to storage is I/O-bound, not CPU-bound
#   - Low CPU allocation allows more parallel writes

warehouse_dataset.write_parquet(
    f"{OUTPUT_WAREHOUSE_PATH}/main_table/",
    partition_cols=["business_category", "processing_date"],
    compression="snappy",
    ray_remote_args={"num_cpus": 0.1}
)

print("Main warehouse table written successfully")
# CREATE BUSINESS-SPECIFIC DATASETS
# Create specialized datasets for specific business teams
# Each team gets only the data they need with relevant columns


# Example: Compliance analytics dataset
# Compliance team needs: document content, quality, and priority
# Priority matters for compliance review workflows

compliance_analytics = warehouse_dataset.filter(
    lambda row: row["business_category"] == "compliance"
).select_columns([
    "document_id",          # Link to main table
    "chunk_id",             # Unique chunk identifier
    "text_content",         # The actual text
    "quality_score",        # Data reliability
    "processing_priority",  # urgent/important/normal
    "processing_date"       # When processed
])

# Write to dedicated compliance folder
compliance_analytics.write_parquet(
    f"{OUTPUT_WAREHOUSE_PATH}/analytics/compliance/",
    partition_cols=["processing_date"],
    compression="snappy",
    ray_remote_args={"num_cpus": 0.1}
)

Create analytics summary tables#

# CREATE ANALYTICS SUMMARY TABLES
# Pre-compute common aggregations for fast dashboard queries
# Summary tables = faster analytics queries


# SUMMARY TABLE 1: Processing metrics by category and date

# Answer questions like:
# - How many documents processed per category per day?
# - What's the total data volume per category?
# - What's the average document quality by category?

# This summary makes dashboard queries instant instead of scanning all data

# groupby() groups data by multiple columns
# aggregate() calculates statistics for each group

processing_metrics = warehouse_dataset.groupby(["business_category", "processing_date"]).aggregate(
    Count(),                    # How many chunks per category+date?
    Sum("file_size_mb"),        # Total data volume
    Mean("word_count"),         # Average document size
    Mean("quality_score")       # Average quality
)

# Write summary table
# Smaller than main table, so queries are very fast
processing_metrics.write_parquet(
    f"{OUTPUT_WAREHOUSE_PATH}/summaries/processing_metrics/",
    partition_cols=["processing_date"],
    compression="snappy",
    ray_remote_args={"num_cpus": 0.1}
)
# SUMMARY TABLE 2: Quality distribution
# Answer questions like:
# - What percentage of documents are high/medium/low quality?
# - Which categories have the highest quality scores?
# - How does quality correlate with document size?

# This helps identify data quality issues by category

quality_distribution = warehouse_dataset.groupby(["quality_rating", "business_category"]).aggregate(
    Count(),                        # How many per quality+category?
    Mean("word_count"),             # Average document size by quality
    Mean("chunk_word_count")        # Average chunk size by quality
)

# Write quality summary
# Used for quality monitoring dashboards

quality_distribution.write_parquet(
    f"{OUTPUT_WAREHOUSE_PATH}/summaries/quality_distribution/",
    compression="snappy",
    ray_remote_args={"num_cpus": 0.1}
)

Validation#

After writing data to the data warehouse, verify that everything worked correctly. This section covers the following.

Why validation matters

  • Ensures the pipeline wrote data successfully

  • Verifies record counts match expectations

  • Confirms the schema is correct

  • Provides sample data for visual inspection

What to validate

  1. Main table record count (should be 10,000+ chunks)

  2. Summary tables exist and contain data

  3. Schema contains all expected columns

  4. Sample records look correct

Verify data warehouse output#

# VERIFY DATA WAREHOUSE OUTPUT
# Always verify your data pipeline worked correctly
# This is a critical production practice
print("Verifying data warehouse integration...")

# Use Ray Data's read_parquet() to read what was just written
# This verifies:
# 1. Files were written successfully
# 2. Partitioning works correctly
# 3. Data can be read back (no corruption)
#
# Ray Data automatically discovers all partitions:
# - main_table/business_category=finance/processing_date=2025-10-15/*.parquet
# - main_table/business_category=legal/processing_date=2025-10-15/*.parquet
# - etc.
main_table_verify = ray.data.read_parquet(
    f"{OUTPUT_WAREHOUSE_PATH}/main_table/",
    ray_remote_args={"num_cpus": 0.025}  # Low CPU for reading
)

# Verify our aggregated metrics tables also wrote successfully
metrics_verify = ray.data.read_parquet(
    f"{OUTPUT_WAREHOUSE_PATH}/summaries/processing_metrics/",
    ray_remote_args={"num_cpus": 0.025}
)

print(f"Data warehouse verification:")
print(f"  Main table records: {main_table_verify.count():,}")
print(f"  Processing metrics: {metrics_verify.count():,}")
print(f"  Schema compatibility: Verified")
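
To back the schema claim above with an actual check, compare the read-back schema against the columns you expect. The expected column set below is an assumption drawn from the warehouse schema defined earlier.

# Verify that key warehouse columns survived the write/read round trip
expected_columns = {
    "document_id", "chunk_id", "business_category", "document_type",
    "quality_rating", "text_content", "processing_date",
}
missing_columns = expected_columns - set(main_table_verify.schema().names)
print(f"  Missing columns: {missing_columns or 'none'}")
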
# INSPECT SAMPLE DATA

# take(10) gets first 10 records for manual inspection
# This helps catch issues like:
# - Wrong data types
# - Missing fields
# - Incorrect values
# - Encoding problems
samples = main_table_verify.take(10)

# Display key fields from each sample record
for i, record in enumerate(samples):
    # Show abbreviated document ID (first 8 characters)
    doc_id = record['document_id'][:8]
    category = record['business_category']
    words = record['word_count']
    quality = record['quality_rating']
    
    print(f"\t{i+1}. Doc: {doc_id}, Category: {category}, Words: {words}, Quality: {quality}")

Summary#

You've built a complete end-to-end document ingestion pipeline with Ray Data. This section reviews what you learned and what to do next.

What you built#

A complete ETL pipeline: Extract → Transform → Load

  1. Extract: Read 100 documents from the S3 data lake

  2. Transform: Extracted text, classified documents, assessed quality, and created chunks

  3. Load: Wrote partitioned warehouse output, including analytics tables

Final output: from raw documents to a structured data warehouse

  • Main table: 10,000+ text chunks ready for analytics

  • Business datasets: Dedicated views for specific teams, such as compliance

  • Summary tables: Pre-computed metrics for dashboards

  • Partitioned storage: Optimized for query performance

Ray Data operations you used#

This pipeline demonstrates all of the major Ray Data operations:

| Operation | Purpose | When to use |
| --- | --- | --- |
| read_binary_files() | Load documents from S3/storage | Reading PDFs, images, any binary files |
| map() | Transform each record individually | Variable-size processing, I/O-bound tasks |
| map_batches() | Transform records in batches | Batch-optimized operations, ML inference |
| flat_map() | One-to-many transformations | Chunking, splitting, exploding data |
| filter() | Keep or drop records | Selecting subsets, data quality filtering |
| select_columns() | Choose specific fields | Schema projection, reducing data size |
| rename_columns() | Change column names | Schema standardization |
| groupby().aggregate() | Compute statistics | Analytics, metrics, summaries |
| write_parquet() | Save to the data warehouse | Final output, checkpointing |

Key concepts for beginners#

1. Distributed processing

  • Your code runs on a cluster of machines, not just one

  • Ray Data automatically distributes work across workers

  • Each function (process_file, assess_document_quality) runs in parallel

  • Processing 100 documents concurrently can approach a 100x speedup for this embarrassingly parallel workload

2. Lazy evaluation

  • Operations like map() and filter() don't execute immediately

  • Ray Data builds an execution plan and optimizes it

  • Execution happens when you call write_parquet(), count(), or take()

  • This lets Ray Data optimize the entire pipeline (see the sketch below)
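
A minimal sketch of lazy evaluation on a toy dataset (separate from the document pipeline):

# Nothing executes while the plan is built
ds = ray.data.range(1000)                             # plan: produce 1,000 rows
ds = ds.map(lambda row: {"id": row["id"] * 2})        # plan: transform
ds = ds.filter(lambda row: row["id"] % 4 == 0)        # plan: filter

# A consuming operation (count, take, write_parquet) triggers execution
print(ds.count())  # 500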

3. Resource management

  • batch_size: How many records each batch contains

  • concurrency: How many tasks run in parallel (advanced)

  • num_cpus: How many CPU cores each task reserves (advanced)

  • Balance these parameters for your workload (see the sketch below)
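
A sketch of how these knobs fit together on the Step 2 quality-assessment call; the values are illustrative, not tuned recommendations:

# Example: tuning map_batches() for a lightweight pandas UDF
tuned_quality_docs = documents_with_metadata.map_batches(
    assess_document_quality,
    batch_format="pandas",
    batch_size=256,   # records per batch
    num_cpus=0.5,     # CPU cores reserved per task
    concurrency=8,    # cap on the number of concurrent tasks
)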

4. Partitioning strategy

  • Partitions = folders organized by column values

  • partition_cols=["business_category", "processing_date"]

  • Query engines skip entire folders when filtering

  • Efficient query performance by reducing the amount of data scanned (see the sketch below)
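
A small sketch of partition pruning in practice: because the folder layout encodes column values, you can read a single business category by pointing at its partition directory (using the warehouse path from earlier):

# Read only the finance partition instead of scanning the whole main table
finance_only = ray.data.read_parquet(
    f"{OUTPUT_WAREHOUSE_PATH}/main_table/business_category=finance/"
)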

Implementation patterns applied#

Code organization:

  • Separate functions for each processing stage

  • Clear docstrings explaining purpose

  • Type hints for inputs and outputs

  • Comments explaining the "why", not just the "what"

Ray Data implementation patterns:

  • Use batch_format="pandas" for clarity

  • Extract text early (don't pass binary data through the pipeline)

  • Allocate appropriate resources to different kinds of operations

  • Write with partitioning for query optimization

  • Use native Ray Data operations rather than custom code

Data engineering patterns:

  • Immediate text extraction (reduces memory footprint)

  • A separate classification stage (easier debugging)

  • Quality assessment (data validation)

  • Schema transformation (clean warehouse schema)

  • A validation step (always check your outputs)

Production recommendations#

Scaling to production

  1. Remove .limit(100) to process the full dataset

    • Currently processes 100 documents for the demo

    • Remove this line to process millions of documents

    • No code changes needed, just delete one line

  2. Tune resource parameters for your cluster

    # For larger clusters, increase parallelism:
    batch_size=5000    # Larger batches
    concurrency=50     # More parallel tasks (advanced)
    num_cpus=2         # More CPU per task (advanced)
    
  3. Add error handling and retry logic

    # For production, catch specific errors instead of letting the task fail:
    try:
        elements = partition(file=stream)
    except (ValueError, OSError):
        elements = []  # Corrupted or unreadable file: log and skip
    except TimeoutError:
        elements = []  # Transient failure: retry with backoff
    
  4. Use the Ray Dashboard for monitoring

    • View real-time progress

    • Check resource utilization

    • Identify bottlenecks

    • Debug failures

  5. Implement incremental processing

    # Only process new documents (last_run_date comes from your previous run's metadata):
    new_docs = all_docs.filter(
        lambda row: row["processing_date"] > last_run_date
    )
    
  6. Add data quality checks (see the sketch after this list)

    • Validate the schema before writing

    • Check for null values

    • Validate foreign key relationships

    • Monitor quality metrics over time
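
A sketch of lightweight pre-write checks along those lines, run on a sample of warehouse_dataset (fields and thresholds are illustrative):

# Pull a sample into pandas for quick validation
sample = warehouse_dataset.limit(1000).to_pandas()

# Null and empty-content checks on key fields
print("Null document_id values:", int(sample["document_id"].isna().sum()))
print("Empty text_content rows:", int((sample["text_content"].str.len() == 0).sum()))

# Simple relationship check: chunk_index must stay below total_chunks
bad_rows = sample[sample["chunk_index"] >= sample["total_chunks"]]
print("Rows with chunk_index >= total_chunks:", len(bad_rows))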

What you learned#

Ray Data fundamentals:

  • How to read from cloud storage (S3)

  • Distributed data processing patterns

  • Batch versus row-based operations

  • Resource management and tuning

  • Writing to data warehouses

Data engineering skills:

  • ETL pipeline design

  • Document processing at scale

  • Quality assessment strategies

  • Data warehouse schema design

  • Partitioning for performance

Production practices:

  • Validation and testing

  • Error handling approaches

  • Resource optimization

  • Monitoring and debugging

  • Scalability considerations