数据聚合#
Ray Data 提供了一个灵活且高性能的 API,用于在 Dataset 上执行聚合操作。
基本聚合#
Ray Data 提供了几种内置的聚合函数,如 max、min、sum。
这些可以直接在 Dataset 或 GroupedData 对象上使用,如下所示
import ray
# Create a sample dataset
ds = ray.data.range(100)
ds = ds.add_column("group_key", lambda x: x % 3)
# Schema: {'id': int64, 'group_key': int64}
# Find the max
result = ds.max("id")
# result: 99
# Find the minimum value per group
result = ds.groupby("group_key").min("id")
# result: [{'group_key': 0, 'min(id)': 0}, {'group_key': 1, 'min(id)': 1}, {'group_key': 2, 'min(id)': 2}]
完整的内置聚合函数列表可在 Dataset API 参考 中找到。
前面的每个方法都有一个对应的 AggregateFnV2 对象。这些对象可用于 aggregate() 或 Dataset.groupby().aggregate()。
聚合对象可以直接与 Dataset 一起使用,如下所示
import ray
from ray.data.aggregate import Count, Mean, Quantile
# Create a sample dataset
ds = ray.data.range(100)
ds = ds.add_column("group_key", lambda x: x % 3)
# Count all rows
result = ds.aggregate(Count())
# result: {'count()': 100}
# Calculate mean per group
result = ds.groupby("group_key").aggregate(Mean(on="id")).take_all()
# result: [{'group_key': 0, 'mean(id)': ...},
# {'group_key': 1, 'mean(id)': ...},
# {'group_key': 2, 'mean(id)': ...}]
# Calculate 75th percentile
result = ds.aggregate(Quantile(on="id", q=0.75))
# result: {'quantile(id)': 75.0}
也可以一次计算多个聚合
import ray
from ray.data.aggregate import Count, Mean, Min, Max, Std
ds = ray.data.range(100)
ds = ds.add_column("group_key", lambda x: x % 3)
# Compute multiple aggregations at once
result = ds.groupby("group_key").aggregate(
Count(on="id"),
Mean(on="id"),
Min(on="id"),
Max(on="id"),
Std(on="id")
).take_all()
# result: [{'group_key': 0, 'count(id)': 34, 'mean(id)': ..., 'min(id)': ..., 'max(id)': ..., 'std(id)': ...},
# {'group_key': 1, 'count(id)': 33, 'mean(id)': ..., 'min(id)': ..., 'max(id)': ..., 'std(id)': ...},
# {'group_key': 2, 'count(id)': 33, 'mean(id)': ..., 'min(id)': ..., 'max(id)': ..., 'std(id)': ...}]
自定义聚合#
您可以通过实现 AggregateFnV2 接口来创建自定义聚合。AggregateFnV2 接口有三个关键方法需要实现
aggregate_block:处理单个数据块并返回部分聚合结果combine:将两个部分聚合结果合并为一个结果_finalize:将最终累积结果转换为所需的输出格式
聚合过程遵循以下步骤
初始化:对于每个组(如果正在分组)或对于整个数据集,使用
zero_factory创建一个初始累加器块聚合:
aggregate_block方法独立应用于每个块合并:
combine方法将部分结果合并到一个累加器中最终化:
_finalize方法将最终累加器转换为所需的输出
示例:创建自定义平均值聚合器#
这是一个创建自定义聚合器的示例,该聚合器计算列中值的平均值
import numpy as np
from ray.data.aggregate import AggregateFnV2
from ray.data._internal.util import is_null
from ray.data.block import Block, BlockAccessor, AggType, U
import pyarrow.compute as pc
from typing import List, Optional
class Mean(AggregateFnV2):
"""Defines mean aggregation."""
def __init__(
self,
on: Optional[str] = None,
ignore_nulls: bool = True,
alias_name: Optional[str] = None,
):
super().__init__(
alias_name if alias_name else f"mean({str(on)})",
on=on,
ignore_nulls=ignore_nulls,
# NOTE: We've to copy returned list here, as some
# aggregations might be modifying elements in-place
zero_factory=lambda: list([0, 0]), # noqa: C410
)
def aggregate_block(self, block: Block) -> AggType:
block_acc = BlockAccessor.for_block(block)
count = block_acc.count(self._target_col_name, self._ignore_nulls)
if count == 0 or count is None:
# Empty or all null.
return None
sum_ = block_acc.sum(self._target_col_name, self._ignore_nulls)
if is_null(sum_):
# In case of ignore_nulls=False and column containing 'null'
# return as is (to prevent unnecessary type conversions, when, for ex,
# using Pandas and returning None)
return sum_
return [sum_, count]
def combine(self, current_accumulator: AggType, new: AggType) -> AggType:
return [current_accumulator[0] + new[0], current_accumulator[1] + new[1]]
def _finalize(self, accumulator: AggType) -> Optional[U]:
if accumulator[1] == 0:
return np.nan
return accumulator[0] / accumulator[1]
注意
在某些情况下,哈希混洗可以为聚合提供更好的性能。有关更多信息,请参阅 哈希混洗与基于范围的混洗方法之间的比较。
要使用哈希混洗算法进行聚合,您需要在创建 Dataset 之前显式设置混洗策略:ray.data.DataContext.get_current().shuffle_strategy = ShuffleStrategy.HASH_SHUFFLE