数据聚合#

Ray Data 提供了一个灵活且高性能的 API,用于在 Dataset 上执行聚合操作。

基本聚合#

Ray Data 提供了几种内置的聚合函数,如 maxminsum

这些可以直接在 Dataset 或 GroupedData 对象上使用,如下所示

import ray

# Create a sample dataset
ds = ray.data.range(100)
ds = ds.add_column("group_key", lambda x: x % 3)
# Schema: {'id': int64, 'group_key': int64}

# Find the max
result = ds.max("id")
# result: 99

# Find the minimum value per group
result = ds.groupby("group_key").min("id")
# result: [{'group_key': 0, 'min(id)': 0}, {'group_key': 1, 'min(id)': 1}, {'group_key': 2, 'min(id)': 2}]

完整的内置聚合函数列表可在 Dataset API 参考 中找到。

前面的每个方法都有一个对应的 AggregateFnV2 对象。这些对象可用于 aggregate()Dataset.groupby().aggregate()

聚合对象可以直接与 Dataset 一起使用,如下所示

import ray
from ray.data.aggregate import Count, Mean, Quantile

# Create a sample dataset
ds = ray.data.range(100)
ds = ds.add_column("group_key", lambda x: x % 3)

# Count all rows
result = ds.aggregate(Count())
# result: {'count()': 100}

# Calculate mean per group
result = ds.groupby("group_key").aggregate(Mean(on="id")).take_all()
# result: [{'group_key': 0, 'mean(id)': ...},
#          {'group_key': 1, 'mean(id)': ...},
#          {'group_key': 2, 'mean(id)': ...}]

# Calculate 75th percentile
result = ds.aggregate(Quantile(on="id", q=0.75))
# result: {'quantile(id)': 75.0}

也可以一次计算多个聚合

import ray
from ray.data.aggregate import Count, Mean, Min, Max, Std

ds = ray.data.range(100)
ds = ds.add_column("group_key", lambda x: x % 3)

# Compute multiple aggregations at once
result = ds.groupby("group_key").aggregate(
    Count(on="id"),
    Mean(on="id"),
    Min(on="id"),
    Max(on="id"),
    Std(on="id")
).take_all()
# result: [{'group_key': 0, 'count(id)': 34, 'mean(id)': ..., 'min(id)': ..., 'max(id)': ..., 'std(id)': ...},
#          {'group_key': 1, 'count(id)': 33, 'mean(id)': ..., 'min(id)': ..., 'max(id)': ..., 'std(id)': ...},
#          {'group_key': 2, 'count(id)': 33, 'mean(id)': ..., 'min(id)': ..., 'max(id)': ..., 'std(id)': ...}]

自定义聚合#

您可以通过实现 AggregateFnV2 接口来创建自定义聚合。AggregateFnV2 接口有三个关键方法需要实现

  1. aggregate_block:处理单个数据块并返回部分聚合结果

  2. combine:将两个部分聚合结果合并为一个结果

  3. _finalize:将最终累积结果转换为所需的输出格式

聚合过程遵循以下步骤

  1. 初始化:对于每个组(如果正在分组)或对于整个数据集,使用 zero_factory 创建一个初始累加器

  2. 块聚合aggregate_block 方法独立应用于每个块

  3. 合并combine 方法将部分结果合并到一个累加器中

  4. 最终化_finalize 方法将最终累加器转换为所需的输出

示例:创建自定义平均值聚合器#

这是一个创建自定义聚合器的示例,该聚合器计算列中值的平均值

import numpy as np
from ray.data.aggregate import AggregateFnV2
from ray.data._internal.util import is_null
from ray.data.block import Block, BlockAccessor, AggType, U
import pyarrow.compute as pc
from typing import List, Optional

class Mean(AggregateFnV2):
    """Defines mean aggregation."""

    def __init__(
        self,
        on: Optional[str] = None,
        ignore_nulls: bool = True,
        alias_name: Optional[str] = None,
    ):
        super().__init__(
            alias_name if alias_name else f"mean({str(on)})",
            on=on,
            ignore_nulls=ignore_nulls,
            # NOTE: We've to copy returned list here, as some
            #       aggregations might be modifying elements in-place
            zero_factory=lambda: list([0, 0]),  # noqa: C410
        )

    def aggregate_block(self, block: Block) -> AggType:
        block_acc = BlockAccessor.for_block(block)
        count = block_acc.count(self._target_col_name, self._ignore_nulls)

        if count == 0 or count is None:
            # Empty or all null.
            return None

        sum_ = block_acc.sum(self._target_col_name, self._ignore_nulls)

        if is_null(sum_):
            # In case of ignore_nulls=False and column containing 'null'
            # return as is (to prevent unnecessary type conversions, when, for ex,
            # using Pandas and returning None)
            return sum_

        return [sum_, count]

    def combine(self, current_accumulator: AggType, new: AggType) -> AggType:
        return [current_accumulator[0] + new[0], current_accumulator[1] + new[1]]

    def _finalize(self, accumulator: AggType) -> Optional[U]:
        if accumulator[1] == 0:
            return np.nan

        return accumulator[0] / accumulator[1]

注意

在内部,聚合支持 哈希混洗后端基于范围的后端

在某些情况下,哈希混洗可以为聚合提供更好的性能。有关更多信息,请参阅 哈希混洗与基于范围的混洗方法之间的比较

要使用哈希混洗算法进行聚合,您需要在创建 Dataset 之前显式设置混洗策略:ray.data.DataContext.get_current().shuffle_strategy = ShuffleStrategy.HASH_SHUFFLE