Joining Data#

注意

This is a new feature released in Ray 2.46. Note that this is an experimental feature and some things might not work as expected.

Ray Data 允许使用不同的连接类型连接多个 Dataset 实例,具体取决于提供的键列,如下所示:

import ray

doubles_ds = ray.data.range(4).map(
    lambda row: {"id": row["id"], "double": int(row["id"]) * 2}
)

squares_ds = ray.data.range(4).map(
    lambda row: {"id": row["id"], "square": int(row["id"]) ** 2}
)

doubles_and_squares_ds = doubles_ds.join(
    squares_ds,
    join_type="inner",
    num_partitions=2,
    on=("id",),
)

Ray Data 支持以下连接类型(请参阅 Dataset.join 文档以获取最新列表)

内连接/外连接: - 内连接、左外连接、右外连接、全外连接

半连接: - 左半连接、右半连接(返回在另一表中至少有一个匹配行的所有行,仅返回来自请求侧的列)

反连接: - 左反连接、右反连接(返回在另一表中没有匹配行的行,仅返回来自请求侧的列)

连接目前由 hash-shuffle 后端 提供支持。

配置连接#

连接通常是内存密集型操作,需要准确的内存计算和投影,因此对数据集中的倾斜和不平衡敏感。

Ray Data 提供了以下选项来调整连接性能以适应您的工作负载:

  • num_partitions:(必需) 指定两个输入数据集将被哈希分区成的分区数。请参阅 配置分区数 部分以获取有关如何调整此参数的指导。

  • partition_size_hint:(可选) 向连接操作符提供有关单个分区估计平均预期大小(以字节为单位)的提示。如果未指定,则默认为 DataContext.target_max_block_size(默认为 128Mb)。- 请注意,num_partitions * partition_size_hint 理想情况下应近似于实际数据集大小,即 partition_size_hint 可以估计为数据集大小除以 num_partitions(假设分区大小相对均匀)- 但是,在数据集分区预计会严重倾斜的情况下,partition_size_hint 应近似于最大分区大小,以防止出现内存不足 (OOM) 错误。

注意

请注意,默认情况下,Ray 只为其对象存储保留 30% 的内存。建议至少将此值设置为 **50%** 以用于所有 Ray Data 工作负载,特别是对于使用连接的工作负载。

要将对象存储配置为 50%,请添加到您的镜像中:

RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION=0.5

配置分区数#

分区数(也称为块)在单个任务处理的行批次大小与对它们执行的操作的内存要求之间提供了重要的权衡。

经验法则保持分区大,但不要太大以免导致内存不足 (OOM) 错误

  1. 不要为连接“过大”分区很重要,因为这可能会导致 OOM 错误(如果连接的分区可能太大而无法放入内存)。

  2. 同样重要的是不要创建太多小分区,因为这会产生处理大量较小对象的开销。

配置聚合器数量#

“聚合器”是执行实际连接/聚合/混洗的 worker actor,它们接收传入块的单个分区块,然后以执行给定操作所需的方式对其进行“聚合”。

以下是成功配置聚合器池中聚合器数量的重要考虑因素:

  • 默认为 64 或 num_partitions(当分区数少于 64 个时)。

  • 单个聚合器可能被分配来处理一个以上的分区(分区以轮询方式均匀地分配给聚合器)。

  • 聚合器是保持状态(分区)在内存中进行混洗的 有状态组件

注意

经验法则是避免设置 num_partitions >> 聚合器数量,因为它可能会创建瓶颈。

  1. 设置 DataContext.max_hash_shuffle_aggregators 会限制聚合器的数量。

  2. 将其设置为足够大的值会产生将 1 个分区分配给 1 个聚合器的效果(当 max_hash_shuffle_aggregators >= num_partitions 时)。