Unstructured Data Ingestion and Processing with Ray Data#
Time to complete: 35 minutes | Difficulty: Intermediate | Prerequisites: data engineering experience, document processing, basic natural language processing (NLP) knowledge
Build a comprehensive document ingestion pipeline for enterprise data warehouse workflows with Ray Data's distributed processing, turning unstructured documents in a data lake into structured, analytics-ready datasets.
Table of Contents#
Learning Objectives#
Why unstructured data ingestion matters: Enterprise data lakes hold large volumes of unstructured documents (PDFs, Word documents, presentations, reports) that need systematic processing to extract business value for analytics and reporting.
Ray Data's ingestion capabilities: Distribute document processing across a cluster to handle large document collections, extract structured data, and prepare analytics-ready datasets for data warehouse consumption.
Data lake to data warehouse patterns: Techniques data engineering teams use to systematically process document collections, extract structured information, and create queryable datasets for business intelligence.
Production ingestion workflows: Scalable document processing patterns that handle diverse file formats, extract metadata, and create structured schemas for downstream analytics systems.
Overview#
The challenge: Enterprise data lakes contain millions of unstructured documents in many formats (PDFs, Word documents, presentations) that need systematic processing to extract business value. Traditional document processing approaches struggle with:
Scale: single-machine processing limits document volume
Consistency: manual extraction produces inconsistent schemas
Integration: complex infrastructure for analytics
Data warehouse integration: manual data modeling and ETL processes
Growing data volumes: unstructured data grows quickly, often outpacing structured data by 10x
The solution: Ray Data supports end-to-end document ingestion pipelines with native distributed operations that process millions of documents efficiently.
Scale: streaming execution and large-scale processing handle petabyte- and even exabyte-scale data.
Consistency: a flexible API supports quality checks through map and any PyArrow schema. All data read must also be consistent or the pipeline fails, with additional configuration options to control this behavior.
Integration: works with all data types and any AI workload, running on CPUs, GPUs, and accelerators in the cloud or on premises.
Data warehouse integration: pre-built connectors for popular data warehouses, plus the ability to easily build custom connectors.
Growing data volumes: scales from single-node optimization to 10,000+ nodes.
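The rest of this tutorial builds that pipeline step by step. As a preview, here is a minimal sketch of its overall shape; process_file, enrich_business_metadata, assess_document_quality, and create_text_chunks are defined in the steps below, so read this block as an outline rather than running it ahead of them.
# Preview of the pipeline shape built in this tutorial (functions are defined in later steps).
docs = ray.data.read_binary_files("s3://anyscale-rag-application/1000-docs/", include_paths=True)
docs = docs.map(process_file)                  # extract text + metadata
docs = docs.map(enrich_business_metadata)      # classify into business categories
docs = docs.map_batches(assess_document_quality, batch_format="pandas")  # score quality
chunks = docs.flat_map(create_text_chunks)     # one document -> many chunks
chunks.write_parquet(                          # write partitioned output
    "/mnt/cluster_storage/main_table/",
    partition_cols=["business_category"],
)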
Prerequisites Checklist#
Before you begin, make sure you have:
[ ] An understanding of data lake and data warehouse concepts
[ ] Experience with document processing and text extraction
[ ] Familiarity with structured data formats (Parquet, Delta Lake, Iceberg)
[ ] A Python environment with Ray Data and document processing libraries
[ ] Access to S3 or other cloud storage as the document source
Install System Dependencies#
Install the following system dependencies. Make sure to include them in your worker image.
sudo apt-get update && \
sudo apt-get install -y libgl1-mesa-glx libmagic1 poppler-utils tesseract-ocr libreoffice && \
sudo rm -rf /var/lib/apt/lists/*
Install Python Dependencies#
Install the required Python packages (these can also be baked into your image).
pip install --force-reinstall --no-cache-dir "unstructured[all-docs]==0.18.21" "pandas==2.3.3"
Set Up and Initialize Ray Data#
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
import numpy as np
import pandas as pd
import ray
from ray.data.aggregate import Count, Max, Mean, Sum
from ray.data.expressions import col, lit
ctx = ray.data.DataContext.get_current()
# Disable progress bars for cleaner output in production
# You can enable these for debugging: set to True to see progress
ctx.enable_progress_bars = False
ctx.enable_operator_progress_bars = False
# Use runtime env to install pip dependencies across all workers
# You can skip this if your custom image already has these dependencies installed
runtime_env = dict(
pip= {
"packages": ["unstructured[all-docs]==0.18.21", "pandas==2.3.3"],
"pip_install_options": ["--force-reinstall", "--no-cache-dir"]
}
)
# Initialize Ray cluster connection to set the Ray Data context
ray.init(ignore_reinit_error=True, runtime_env= runtime_env)
Step 1: Document Discovery in the Data Lake#
Discover Document Collections in the Data Lake#
# READ DOCUMENTS FROM DATA LAKE (S3)
# Use Ray Data's read_binary_files() to load documents from S3
# Why read_binary_files()?
# - Reads files as raw bytes (works for PDFs, DOCX, images, etc.)
# - Distributes file reading across cluster workers
# - include_paths=True gives us the file path for each document
#
# Parameters explained:
# - S3 path: Location of document collection in data lake
# - include_paths=True: Adds 'path' column with file location
# - ray_remote_args: Resource allocation per task
# * num_cpus=0.025: Very low CPU since this is I/O-bound (reading files)
# * This allows many concurrent reads without CPU bottleneck
# - limit(100): Process only 100 documents for this demo
# * Remove this limit to process entire collection
document_collection = ray.data.read_binary_files(
"s3://anyscale-rag-application/1000-docs/",
include_paths=True,
ray_remote_args={"num_cpus": 0.025}
).limit(100)
# Display the schema to understand our data structure
# At this point, we have: 'bytes' (file content) and 'path' (file location)
print(f"Dataset schema: {document_collection.schema()}")
Document Metadata Extraction#
# TEXT EXTRACTION FUNCTION
# This function extracts text from various document formats (PDF, DOCX, etc.)
# It processes one document at a time (distributed by Ray) and returns structured metadata + text
def process_file(record: Dict[str, Any]) -> Dict[str, Any]:
"""
Extract text content from document files using the Unstructured library.
BEGINNER NOTE:
- Input: A dictionary (record) with 'bytes' (file content) and 'path' (file location)
- Output: A dictionary with extracted text + metadata
- This function runs on each worker node in parallel
Why extract text immediately?
- Avoids passing large binary data through multiple operations
- Reduces memory usage in downstream processing
- Enables faster processing by dropping binary data early
"""
# Import libraries inside function so each Ray worker has access
import io
from pathlib import Path
from unstructured.partition.auto import partition
# Extract file metadata from the input record
file_path = Path(record["path"]) # Convert path string to Path object
file_bytes = record["bytes"] # Raw file content (binary data)
file_size = len(file_bytes) # Size in bytes
file_extension = file_path.suffix.lower() # Get extension (.pdf, .docx, etc.)
file_name = file_path.name # Just the filename (not full path)
# We can only extract text from certain file types
# If unsupported, return metadata with empty text
supported_extensions = {".pdf", ".docx", ".doc", ".pptx", ".ppt", ".html", ".txt"}
if file_extension not in supported_extensions:
# Return a record with metadata but no extracted text
return {
"document_id": str(uuid.uuid4()), # Generate unique ID
"file_path": str(file_path),
"file_name": file_name,
"file_extension": file_extension,
"file_size_bytes": file_size,
"file_size_mb": round(file_size / (1024 * 1024), 2), # Convert to MB
"discovery_timestamp": datetime.now().isoformat(),
"extracted_text": "", # Empty - unsupported format
"text_length": 0,
"word_count": 0,
"extraction_status": "unsupported_format"
}
# Extract text using the Unstructured library
try:
# Create an in-memory file stream from bytes (avoids writing to disk)
with io.BytesIO(file_bytes) as stream:
# partition() automatically detects format and extracts text
# It returns a list of text elements (paragraphs, tables, etc.)
elements = partition(file=stream)
# Combine all extracted text elements into one string
extracted_text = " ".join([str(el) for el in elements]).strip()
# Calculate text statistics for quality assessment
text_length = len(extracted_text) # Total characters
word_count = len(extracted_text.split()) if extracted_text else 0
extraction_status = "success"
except Exception as e:
# If extraction fails (corrupted file, unsupported format, etc.)
# Log the error and continue processing other files
print(f"Cannot process file {file_path}: {e}")
extracted_text = ""
text_length = 0
word_count = 0
extraction_status = f"error: {str(e)[:100]}" # Store error message (truncated)
# Return record with all metadata and extracted text
return {
"document_id": str(uuid.uuid4()), # Unique identifier for this document
"file_path": str(file_path),
"file_name": file_name,
"file_extension": file_extension,
"file_size_bytes": file_size,
"file_size_mb": round(file_size / (1024 * 1024), 2),
"discovery_timestamp": datetime.now().isoformat(),
"extracted_text": extracted_text, # The actual text content
"text_length": text_length,
"word_count": word_count,
"extraction_status": extraction_status
}
# Ray Data's map() applies process_file() to each document in parallel
# This is the "embarrassingly parallel" pattern - each document is independent
print("Extracting text from documents...")
# Why map() instead of map_batches()?
# - map(): Process one record at a time (good for variable-size documents)
# - map_batches(): Process records in batches (better for vectorized operations)
# - Text extraction is I/O-bound and document-specific, so map() is ideal
# Parameters:
# - process_file: The function to apply to each record
documents_with_text = document_collection.map(
process_file
)
# Convert a sample of documents to pandas DataFrame for easy viewing
documents_with_text.limit(25).to_pandas()
# BUSINESS METADATA ENRICHMENT FUNCTION (Modern Approach)
# Instead of simple string matching, apply basic NLP for categorization.
# For high-quality production, you'd use an ML or LLM model endpoint here.
from transformers import pipeline
# For demonstration: Use a small zero-shot classification model.
# In a real pipeline, you should call a production LLM/ML endpoint or use a domain-specific model.
# The 'facebook/bart-large-mnli' model works for zero-shot/label classification tasks.
# You may swap with "typeform/distilbert-base-uncased-mnli" or another small MNLI model for lighter resource use.
# If working at scale or with large docs, consider using Ray Serve + LLM API instead.
zero_shot_classifier = pipeline(
"zero-shot-classification",
model="facebook/bart-large-mnli" # Or another MNLI model if needed
)
# Define candidate classes (map to business categories)
CANDIDATE_LABELS = [
"financial document", # maps to 'finance'
"legal document", # maps to 'legal'
"regulatory document", # maps to 'compliance'
"client document", # maps to 'client_services'
"research document", # maps to 'research'
"general document" # maps to 'general'
]
BUSINESS_CATEGORY_MAPPING = {
"financial document": ("financial_document", "finance"),
"legal document": ("legal_document", "legal"),
"regulatory document": ("regulatory_document", "compliance"),
"client document": ("client_document", "client_services"),
"research document": ("research_document", "research"),
"general document": ("general_document", "general"),
}
def enrich_business_metadata(record: Dict[str, Any]) -> Dict[str, Any]:
"""
Uses zero-shot text classification to predict business category and assign processing priority.
For production: Replace this with a call to your domain-tuned classifier or LLM endpoint.
"""
file_size = record["file_size_bytes"]
text = record.get("extracted_text", "") or ""
filename = record.get("file_name", "")
# Concatenate extracted text with filename for context, up to limit (to save inference cost)
context_text = (filename + "\n\n" + text[:1000]).strip() # Truncate to first 1000 chars for speed
# Run zero-shot classification (useful even with short context)
result = zero_shot_classifier(context_text, CANDIDATE_LABELS, multi_label=False)
top_label = result["labels"][0] if result and "labels" in result and result["labels"] else "general document"
doc_type, business_category = BUSINESS_CATEGORY_MAPPING.get(top_label, ("general_document", "general"))
# Priority assignment using simple logic + NLP keyword search (feel free to LLM this too)
lower_context = context_text.lower()
if any(w in lower_context for w in ["urgent", "critical", "deadline"]):
priority = "high"
priority_score = 3
elif any(w in lower_context for w in ["important", "quarterly", "annual"]):
priority = "medium"
priority_score = 2
else:
priority = "low"
priority_score = 1
return {
**record,
"document_type": doc_type,
"business_category": business_category,
"processing_priority": priority,
"priority_score": priority_score,
"estimated_pages": max(1, file_size // 50000),
"processing_status": "classified"
}
# Apply business metadata enrichment to all documents
print("Enriching with business metadata (using zero-shot NLP)...")
# Note: zero-shot and LLM models can be heavy;
# For fast local testing, use a smaller model, or replace with a production endpoint.
documents_with_metadata = documents_with_text.map(
enrich_business_metadata
)
# View a few documents with business classification added
documents_with_metadata.limit(5).to_pandas()
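# NOTE: The classifier above is created on the driver and shipped to workers inside the map function.
# For heavier models, a common Ray Data pattern is to wrap the model in a callable class so each
# worker loads it once. The sketch below is illustrative (class name, batch size, and concurrency
# are placeholders) and is NOT used by the rest of this tutorial.
class ZeroShotEnricher:
    def __init__(self):
        from transformers import pipeline
        # Each worker builds its own pipeline instead of receiving a serialized copy from the driver.
        self.clf = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        contexts = [
            (str(name) + "\n\n" + (text or "")[:1000]).strip()
            for name, text in zip(batch["file_name"], batch["extracted_text"])
        ]
        results = self.clf(contexts, CANDIDATE_LABELS, multi_label=False)
        batch["top_label"] = [r["labels"][0] for r in results]
        return batch

# Example usage (actor pool; tune batch_size/concurrency for your cluster):
# enriched = documents_with_text.map_batches(
#     ZeroShotEnricher, batch_format="pandas", batch_size=16, concurrency=2
# )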
# WHY AGGREGATIONS?
# Before writing to warehouse, understand what we're processing:
# - How many documents of each type?
# - What's the size distribution?
# - Which categories have the most content?
# AGGREGATION 1: Document type distribution
# Group by document_type and calculate statistics
doc_type_stats = documents_with_metadata.groupby("document_type").aggregate(
Count(), # How many documents of each type?
Sum("file_size_bytes"), # Total size per document type
Mean("file_size_mb"), # Average size per document type
Max("estimated_pages") # Largest document per type
)
# AGGREGATION 2: Business category analysis
# Understand the distribution across business categories
# This helps with warehouse partitioning strategy
category_stats = documents_with_metadata.groupby("business_category").aggregate(
Count(), # How many per category?
Mean("priority_score"), # Average priority per category
Sum("file_size_mb") # Total data volume per category
)
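# Like other Ray Data operations, these aggregations are lazy. To compute and inspect them,
# you can materialize the (small) results, for example:
print(doc_type_stats.to_pandas())
print(category_stats.to_pandas())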
Step 2: Document Processing and Classification#
Text Extraction and Quality Assessment#
# QUALITY ASSESSMENT FUNCTION
# Evaluate document quality to filter out low-quality or problematic documents
def assess_document_quality(batch: pd.DataFrame) -> pd.DataFrame:
"""
Assess document quality for data warehouse ingestion.
BEGINNER NOTE:
- Input: pandas DataFrame with multiple documents (a "batch")
- Output: Same DataFrame with quality assessment columns added
- Uses map_batches() for efficiency (process many docs at once)
Why use map_batches() instead of map()?
- Batching is more efficient for lightweight operations
- Pandas DataFrame operations are optimized
- Reduces overhead from function calls
Explicitly use batch_format="pandas"
"""
quality_scores = np.zeros(len(batch), dtype=int) # Numeric score (0-4)
quality_ratings = [] # Text rating (high/medium/low)
quality_issues_list = [] # List of issues found
# Iterate through rows to apply business rules for quality
# Each document gets a score from 0-4 based on quality criteria
    for pos, (_, row) in enumerate(batch.iterrows()):
quality_score = 0
quality_issues = []
# CRITERION 1: File size check
# Files smaller than 10KB might be empty or corrupt
if row["file_size_mb"] > 0.01: # More than 10KB
quality_score += 1
else:
quality_issues.append("file_too_small")
# CRITERION 2: Text length check
# Documents should have meaningful text content
if row["text_length"] > 100: # At least 100 characters
quality_score += 1
else:
quality_issues.append("insufficient_text")
# CRITERION 3: Business relevance check
# Classified documents are more valuable than unclassified
if row["business_category"] != "general":
quality_score += 1
else:
quality_issues.append("low_business_relevance")
# CRITERION 4: Word count check
# Documents should have substantial content
if row["word_count"] > 20: # At least 20 words
quality_score += 1
else:
quality_issues.append("insufficient_content")
# Score 4: All checks passed - high quality
# Score 2-3: Some issues - medium quality
# Score 0-1: Major issues - low quality
quality_rating = "high" if quality_score >= 4 else "medium" if quality_score >= 2 else "low"
# Store results for this document
        quality_scores[pos] = quality_score  # Positional index, independent of the DataFrame's index labels
quality_ratings.append(quality_rating)
quality_issues_list.append(json.dumps(quality_issues)) # Convert list to JSON string
batch["quality_score"] = quality_scores
batch["quality_rating"] = quality_ratings
batch["quality_issues"] = quality_issues_list
return batch
# Apply quality assessment to all documents
# Parameters:
# - batch_format="pandas": Process as pandas DataFrame (easier than numpy arrays)
# - num_cpus=0.25: Very low CPU (this is lightweight logic)
# - batch_size=100: Process 100 documents at a time
# * Larger batches = fewer function calls = better efficiency
quality_assessed_docs = documents_with_metadata.map_batches(
assess_document_quality,
batch_format="pandas", # Ray Data pattern: explicit pandas format
num_cpus=0.25,
batch_size=100
)
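# OPTIONAL: If you only want documents that passed the checks to flow into chunking,
# you could filter here. This step is illustrative and is not applied in the rest of the tutorial.
high_quality_docs = quality_assessed_docs.filter(
    lambda row: row["quality_rating"] != "low"
)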
Step 3: Text Chunking and Enrichment#
# TEXT CHUNKING FUNCTION
# Split long documents into smaller chunks for downstream processing
# Why chunk? Many applications (LLMs, vector databases) have size limits
def create_text_chunks(record: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Create overlapping text chunks from each document.
BEGINNER NOTE:
- Input: ONE document record with full text
- Output: MULTIPLE chunk records (one document → many chunks)
- This is a "one-to-many" transformation
Why chunking?
- LLM APIs have token limits (e.g., 4096 tokens)
- Vector databases work better with smaller chunks
- Enables more granular search and analysis
Why overlapping chunks?
- Preserves context across chunk boundaries
- Prevents splitting important information
- 150 character overlap means each chunk shares text with neighbors
"""
text = record["extracted_text"] # The full document text
chunk_size = 1500 # Each chunk will be ~1500 characters
overlap = 150 # Adjacent chunks share 150 characters
# Why these numbers?
# - 1500 chars ≈ 300-400 tokens (good for most LLM APIs)
# - 150 char overlap ≈ 10% overlap (preserves context without too much redundancy)
# - You can adjust these based on your use case
# Create chunks of data by a sliding window
chunks = []
start = 0 # Starting position in the text
chunk_index = 0 # Track which chunk number this is
# There are many more advanced chunking methods, this example uses a simple technique for demo purposes
# Loop until processing all the text
while start < len(text):
# Calculate end position (don't go past text length)
end = min(start + chunk_size, len(text))
# Extract this chunk's text
chunk_text = text[start:end]
# Create a new record for this chunk
# It contains all the original document metadata PLUS chunk-specific data
chunk_record = {
**record, # All original fields (document_id, business_category, etc.)
"chunk_id": str(uuid.uuid4()), # Unique ID for this specific chunk
"chunk_index": chunk_index, # Position in sequence (0, 1, 2, ...)
"chunk_text": chunk_text, # The actual text content of this chunk
"chunk_length": len(chunk_text), # Characters in this chunk
"chunk_word_count": len(chunk_text.split()) # Words in this chunk
}
chunks.append(chunk_record)
# If you've reached the end of the text, you're done
if end >= len(text):
break
# Move to next chunk position (with overlap)
# Example: If chunk_size=1500 and overlap=150
# Chunk 1: chars 0-1500
# Chunk 2: chars 1350-2850 (starts 150 before chunk 1 ended)
# Chunk 3: chars 2700-4200 (starts 150 before chunk 2 ended)
start = end - overlap
chunk_index += 1
# After creating all chunks, add how many chunks the document has
# This helps with progress tracking and completeness checks
for chunk in chunks:
chunk["total_chunks"] = len(chunks)
# Return the list of chunk records
# Ray Data's flat_map() automatically flattens this list
return chunks
# Apply text chunking to all documents
# Use flat_map() for one-to-many transformations
# One document becomes multiple chunks
print("Creating text chunks...")
# Why flat_map() instead of map()?
# - map(): One input → One output (document → document)
# - flat_map(): One input → Many outputs (document → multiple chunks)
# - flat_map() automatically "flattens" the list of chunks
#
# Example:
# Input: 100 documents
# Output: 10,000+ chunks (each document becomes ~100 chunks on average)
#
# Parameters:
# - num_cpus=0.5: Moderate CPU usage (string slicing is lightweight)
chunked_documents = quality_assessed_docs.flat_map(
create_text_chunks,
num_cpus=0.5
)
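# OPTIONAL: To see the document-to-chunk fan-out described above, compare the counts.
# count() triggers execution of the pipeline up to this point, so skip it on very large runs.
print(f"Documents: {quality_assessed_docs.count()}, Chunks: {chunked_documents.count()}")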
Step 4: Data Warehouse Schema and Output#
Create the Data Warehouse Schema#
# DATA WAREHOUSE SCHEMA TRANSFORMATION
# Transform the raw processing data into a clean warehouse schema
# This is the "ETL" part - Extract (done), Transform (now), Load (next)
print("Creating data warehouse schema...")
# Get today's date in ISO format (YYYY-MM-DD)
# Ray Data uses this to partition the data by date in the warehouse
processing_date = datetime.now().isoformat()[:10]
# Data warehouses need clean, organized schemas
# Select only the columns needed and organize them logically
#
# Why not keep all columns?
# - Cleaner schema = easier queries
# - Less storage space
# - Better performance
# - Clear data contracts for downstream users
warehouse_dataset = chunked_documents.select_columns([
# PRIMARY IDENTIFIERS: Keys for joining and relationships
"document_id", # Links all chunks from same document
"chunk_id", # Unique identifier for this specific chunk
# DIMENSIONAL ATTRIBUTES: Categorical data for filtering/grouping
# These are typically used in WHERE clauses and GROUP BY
"business_category", # finance, legal, compliance, etc.
"document_type", # financial_document, legal_document, etc.
"file_extension", # .pdf, .docx, etc.
"quality_rating", # high, medium, low
"processing_priority", # high, medium, low
# FACT MEASURES: Numeric values for aggregation and analysis
# These are typically used in SUM(), AVG(), COUNT(), etc.
"file_size_mb", # Document size
"word_count", # Total words in document
"chunk_word_count", # Words in this chunk
"quality_score", # Numeric quality (0-4)
"priority_score", # Numeric priority (1-3)
"estimated_pages", # Page count estimate
"chunk_index", # Position in document (0, 1, 2, ...)
"total_chunks", # How many chunks total
# CONTENT FIELDS: The actual data payload
"chunk_text", # The text content (will rename this)
"file_name", # Original filename
"file_path", # S3 location
# METADATA: Processing provenance and status tracking
"discovery_timestamp", # When was file discovered
"extraction_status", # success, error, unsupported_format
"processing_status" # classified, processed, etc.
])
# RENAME COLUMNS for data warehouse conventions
# "chunk_text" → "text_content" (more descriptive)
warehouse_dataset = warehouse_dataset.rename_columns({
"chunk_text": "text_content"
})
# ADD PIPELINE METADATA: Constant columns for all records
# These columns are the same for every record in this run
# They help with data lineage and debugging
warehouse_dataset = (
warehouse_dataset
.with_column("processing_date", lit(processing_date)) # When was this processed?
.with_column("pipeline_version", lit("1.0")) # Which version of pipeline?
.with_column("processing_engine", lit("ray_data")) # What tool processed it?
)
Write to the Data Warehouse with Partitioning#
# WRITE TO DATA WAREHOUSE - MAIN TABLE
# Save all processed chunks to Parquet format with partitioning
# This is the "Load" part of ETL
# /mnt/cluster_storage is a shared storage volume accessible by all workers
# In production, this would typically be:
# - S3: s3://your-bucket/warehouse/
# - Azure: abfs://container@account.dfs.core.windows.net/
# - GCS: gs://your-bucket/warehouse/
OUTPUT_WAREHOUSE_PATH = "/mnt/cluster_storage"
# WRITE MAIN TABLE with PARTITIONING
# write_parquet() is Ray Data's native way to save data
# Key parameters explained:
# partition_cols=["business_category", "processing_date"]
# - Creates folder structure: business_category=finance/processing_date=2025-10-15/
# - Enables efficient querying: "SELECT * FROM table WHERE business_category='finance'"
# - Query engines (Spark, Presto, Athena) can skip entire partitions
# - Example structure:
# main_table/
# ├── business_category=finance/
# │ └── processing_date=2025-10-15/
# │ ├── part-001.parquet
# │ └── part-002.parquet
# ├── business_category=legal/
# │ └── processing_date=2025-10-15/
# │ └── part-001.parquet
# └── business_category=compliance/
# └── processing_date=2025-10-15/
# └── part-001.parquet
# compression="snappy"
# - Compress files to save storage space (50-70% reduction)
# - Snappy is fast and well-supported by all query engines
# - Alternatives: gzip (higher compression), zstd (good balance)
# ray_remote_args={"num_cpus": 0.1}
# - Writing to storage is I/O-bound, not CPU-bound
# - Low CPU allocation allows more parallel writes
warehouse_dataset.write_parquet(
f"{OUTPUT_WAREHOUSE_PATH}/main_table/",
partition_cols=["business_category", "processing_date"],
compression="snappy",
ray_remote_args={"num_cpus": 0.1}
)
print("Main warehouse table written successfully")
# CREATE BUSINESS-SPECIFIC DATASETS
# Create specialized datasets for specific business teams
# Each team gets only the data they need with relevant columns
# Example: Compliance analytics dataset
# Compliance team needs: document content, quality, and priority
# Priority matters for compliance review workflows
compliance_analytics = warehouse_dataset.filter(
lambda row: row["business_category"] == "compliance"
).select_columns([
"document_id", # Link to main table
"chunk_id", # Unique chunk identifier
"text_content", # The actual text
"quality_score", # Data reliability
"processing_priority", # urgent/important/normal
"processing_date" # When processed
])
# Write to dedicated compliance folder
compliance_analytics.write_parquet(
f"{OUTPUT_WAREHOUSE_PATH}/analytics/compliance/",
partition_cols=["processing_date"],
compression="snappy",
ray_remote_args={"num_cpus": 0.1}
)
Create Analytics Summary Tables#
# CREATE ANALYTICS SUMMARY TABLES
# Pre-compute common aggregations for fast dashboard queries
# Summary tables = faster analytics queries
# SUMMARY TABLE 1: Processing metrics by category and date
# Answer questions like:
# - How many documents processed per category per day?
# - What's the total data volume per category?
# - What's the average document quality by category?
# This summary makes dashboard queries instant instead of scanning all data
# groupby() groups data by multiple columns
# aggregate() calculates statistics for each group
processing_metrics = warehouse_dataset.groupby(["business_category", "processing_date"]).aggregate(
Count(), # How many chunks per category+date?
Sum("file_size_mb"), # Total data volume
Mean("word_count"), # Average document size
Mean("quality_score") # Average quality
)
# Write summary table
# Smaller than main table, so queries are very fast
processing_metrics.write_parquet(
f"{OUTPUT_WAREHOUSE_PATH}/summaries/processing_metrics/",
partition_cols=["processing_date"],
compression="snappy",
ray_remote_args={"num_cpus": 0.1}
)
# SUMMARY TABLE 2: Quality distribution
# Answer questions like:
# - What percentage of documents are high/medium/low quality?
# - Which categories have the highest quality scores?
# - How does quality correlate with document size?
# This helps identify data quality issues by category
quality_distribution = warehouse_dataset.groupby(["quality_rating", "business_category"]).aggregate(
Count(), # How many per quality+category?
Mean("word_count"), # Average document size by quality
Mean("chunk_word_count") # Average chunk size by quality
)
# Write quality summary
# Used for quality monitoring dashboards
quality_distribution.write_parquet(
f"{OUTPUT_WAREHOUSE_PATH}/summaries/quality_distribution/",
compression="snappy",
ray_remote_args={"num_cpus": 0.1}
)
Verification#
After writing data to the warehouse, verify that everything worked. This section demonstrates:
Why verification matters
Ensures the pipeline wrote data successfully
Verifies record counts match expectations
Confirms the schema is correct
Provides sample data for visual inspection
What to verify
Main table record count (should be 10,000+ chunks)
Summary tables exist and contain data
Schema contains all expected columns (a sketch for checking this appears in the verification code below)
Sample records look correct
Verify the Data Warehouse Output#
# VERIFY DATA WAREHOUSE OUTPUT
# Always verify your data pipeline worked correctly
# This is a critical production practice
print("Verifying data warehouse integration...")
# Use Ray Data's read_parquet() to read what was just written
# This verifies:
# 1. Files were written successfully
# 2. Partitioning works correctly
# 3. Data can be read back (no corruption)
#
# Ray Data automatically discovers all partitions:
# - main_table/business_category=finance/processing_date=2025-10-15/*.parquet
# - main_table/business_category=legal/processing_date=2025-10-15/*.parquet
# - etc.
main_table_verify = ray.data.read_parquet(
f"{OUTPUT_WAREHOUSE_PATH}/main_table/",
ray_remote_args={"num_cpus": 0.025} # Low CPU for reading
)
# Verify our aggregated metrics tables also wrote successfully
metrics_verify = ray.data.read_parquet(
f"{OUTPUT_WAREHOUSE_PATH}/summaries/processing_metrics/",
ray_remote_args={"num_cpus": 0.025}
)
print(f"Data warehouse verification:")
print(f" Main table records: {main_table_verify.count():,}")
print(f" Processing metrics: {metrics_verify.count():,}")
print(f" Schema compatibility: Verified")
# INSPECT SAMPLE DATA
# take(10) gets first 10 records for manual inspection
# This helps catch issues like:
# - Wrong data types
# - Missing fields
# - Incorrect values
# - Encoding problems
samples = main_table_verify.take(10)
# Display key fields from each sample record
for i, record in enumerate(samples):
# Show abbreviated document ID (first 8 characters)
doc_id = record['document_id'][:8]
category = record['business_category']
words = record['word_count']
quality = record['quality_rating']
print(f"\t{i+1}. Doc: {doc_id}, Category: {category}, Words: {words}, Quality: {quality}")
Summary#
You've built a complete end-to-end document ingestion pipeline with Ray Data. This section reviews what you learned and what to do next.
What You Built#
A complete ETL pipeline: Extract → Transform → Load
Extract: read 100 documents from the S3 data lake
Transform: extracted text, classified documents, assessed quality, created chunks
Load: wrote a partitioned warehouse, including analytics tables
Final output: from raw documents to a structured data warehouse
Main table: 10,000+ text chunks ready for analysis
Business datasets: category-specific views, such as the compliance analytics dataset
Summary tables: pre-computed metrics for dashboards
Partitioned storage: optimized for query performance
Ray Data Operations You Used#
The pipeline demonstrates all of the major Ray Data operations:
| Operation | Purpose | When to use |
|---|---|---|
| read_binary_files() | Load documents from S3/storage | Reading PDFs, images, any binary files |
| map() | Transform each record individually | Variable-size processing, I/O-bound tasks |
| map_batches() | Transform records in batches | Batch-optimized operations, ML inference |
| flat_map() | One-to-many transformations | Chunking, splitting, expanding data |
| filter() | Keep or drop records | Selecting subsets, data quality filtering |
| select_columns() | Select specific fields | Schema projection, reducing data size |
| rename_columns() | Rename columns | Schema standardization |
| groupby().aggregate() | Compute statistics | Analytics, metrics, summaries |
| write_parquet() | Save to the data warehouse | Final output, checkpointing |
Key Concepts for Beginners#
1. Distributed processing
Your code runs on a cluster of machines, not just one
Ray Data automatically distributes work across workers
Each function (process_file, assess_document_quality) runs in parallel
Processing 100 documents at the same time can be up to 100x faster than processing them one by one
2. Lazy evaluation
Operations like map() and filter() don't execute immediately
Ray builds an execution plan and optimizes it
Execution happens when you call write_parquet(), count(), or take()
This lets Ray optimize the entire pipeline
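A small illustration, reusing the chunked_documents dataset defined earlier: the first line only records the operation, and nothing runs until a consuming call.
# Defining the transformation is instant; no data is processed yet.
lazy_high_quality = chunked_documents.filter(lambda row: row["quality_rating"] == "high")
# Execution happens only when a consuming operation runs, for example:
# lazy_high_quality.count()   # or .take(), .to_pandas(), .write_parquet(...)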
3. Resource management
batch_size: how many records each batch contains
concurrency: how many tasks run in parallel (advanced)
num_cpus: how many CPU cores each task gets (advanced)
Balance these parameters for your workload, as in the example below
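For example, all three knobs appear on map_batches; the values below are placeholders to tune for your own cluster, shown commented out so they don't re-run the pipeline.
# Illustrative resource settings on a batch transformation (values are placeholders):
# tuned = documents_with_metadata.map_batches(
#     assess_document_quality,
#     batch_format="pandas",
#     batch_size=500,     # records per batch
#     num_cpus=0.5,       # CPU cores reserved per task
#     concurrency=8,      # maximum parallel tasks (advanced)
# )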
4. Partitioning strategy
Partitions = folders organized by column values
partition_cols=["business_category", "processing_date"]
Query engines skip entire folders when filtering
Efficient query performance by reducing the amount of data scanned
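Because partitions are just folders, a reader can target one directly; for example, assuming a finance partition exists in the output written earlier:
# Read only one partition instead of scanning the whole main table.
finance_only = ray.data.read_parquet(
    f"{OUTPUT_WAREHOUSE_PATH}/main_table/business_category=finance/"
)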
Implementation Patterns Applied#
Code organization:
Separate functions for each processing stage
Clear docstrings that explain purpose
Type hints on inputs and outputs
Comments that explain the "why", not just the "what"
Ray Data patterns:
Use batch_format="pandas" for clarity
Extract text early (don't pass binary data through the pipeline)
Allocate appropriate resources for different kinds of operations
Write with partitioning for query optimization
Use native Ray Data operations rather than custom code
Data engineering patterns:
Immediate text extraction (reduces memory footprint)
A separate classification stage (easier debugging)
Quality assessment (data validation)
Schema transformation (a clean warehouse schema)
A verification step (always check the output)
Production Recommendations#
Scale to production
Remove .limit(100) to process the full dataset
The demo currently processes only 100 documents
Remove that line to process millions of documents
No code changes needed, just delete one line
Tune resource parameters for your cluster
# For larger clusters, increase parallelism:
batch_size=5000   # Larger batches
concurrency=50    # More parallel tasks (advanced)
num_cpus=2        # More CPU per task (advanced)
Add error handling and retry logic
# For production, catch specific errors:
try:
    elements = partition(file=stream)
except CorruptedFileError:
    ...  # Log and skip
except TimeoutError:
    ...  # Retry with backoff
Monitor with the Ray Dashboard
View real-time progress
Check resource utilization
Identify bottlenecks
Debug failures
Implement incremental processing
# Only process new documents:
new_docs = all_docs.filter(
    lambda row: row["processing_date"] > last_run_date
)
Add data quality checks (a sketch follows this list)
Validate the schema before writing
Check for null values
Verify foreign key relationships
Monitor quality metrics over time
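A minimal sketch of what such pre-write checks could look like in this pipeline; the column set and the empty-text check are illustrative.
# Illustrative pre-write checks on the warehouse dataset.
required_columns = {"document_id", "chunk_id", "text_content", "business_category"}
assert required_columns.issubset(set(warehouse_dataset.schema().names)), "Schema drift detected"
# Count chunks with empty text content (triggers execution, so use sparingly on large runs).
empty_text_chunks = warehouse_dataset.filter(lambda row: not row["text_content"]).count()
print(f"Chunks with empty text: {empty_text_chunks}")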
What You Learned#
Ray Data fundamentals:
How to read from cloud storage (S3)
Distributed data processing patterns
Batch versus per-row operations
Resource management and tuning
Writing to a data warehouse
Data engineering skills:
ETL pipeline design
Document processing at scale
Quality assessment strategies
Data warehouse schema design
Partitioning for performance
Production practices:
Verification and testing
Error handling approaches
Resource optimization
Monitoring and debugging
Scalability considerations