在 Ray 上使用 Dask#

Dask 是一个 Python 并行计算库,专为扩展分析和科学计算工作负载而设计。它提供了大数据集合,模仿了熟悉的NumPyPandas 库的 API,允许这些抽象表示大于内存的数据和/或允许在多机集群上运行对这些数据的操作,同时还提供自动数据并行、智能调度和优化操作。这些集合上的操作会创建任务图,由调度程序执行。

Ray 为 Dask 提供了一个调度程序(dask_on_ray),它允许您使用 Dask 的集合构建数据分析,并在 Ray 集群上执行底层任务。

dask_on_ray 使用 Dask 的调度程序 API,它允许您指定任何可调用对象作为调度程序,供 Dask 使用来执行您的工作负载。使用 Dask-on-Ray 调度程序,整个 Dask 生态系统都可以运行在 Ray 之上。

注意

我们始终确保最新的 Dask 版本与 Ray nightly 兼容。下表显示了与 Ray 版本一起测试的最新 Dask 版本。

每个 Ray 版本的最新 Dask 版本。#

Ray 版本

Dask 版本

2.48.0 或更高版本

2023.6.1 (Python 版本 < 3.12)
2025.5.0 (Python 版本 >= 3.12)

2.40.02.47.1

2022.10.2 (Python 版本 < 3.12)
2024.6.0 (Python 版本 >= 3.12)

2.34.02.39.0

2022.10.1 (Python 版本 < 3.12)
2024.6.0 (Python 版本 >= 3.12)

2.8.02.33.x

2022.10.1

2.5.02.7.x

2022.2.0 (Python 版本 < 3.8)
2022.10.1 (Python 版本 >= 3.8)

2.4.0

2022.10.1

2.3.0

2022.10.1

2.2.0

2022.10.1

2.1.0

2022.2.0

2.0.0

2022.2.0

1.13.0

2022.2.0

1.12.0

2022.2.0

1.11.0

2022.1.0

1.10.0

2021.12.0

1.9.2

2021.11.0

1.9.1

2021.11.0

1.9.0

2021.11.0

1.8.0

2021.9.1

1.7.0

2021.9.1

1.6.0

2021.8.1

1.5.0

2021.7.0

1.4.1

2021.6.1

1.4.0

2021.5.0

调度程序#

Dask-on-Ray 调度程序可以执行任何有效的 Dask 图,并且可以与任何 Dask 的.compute() 调用一起使用。下面是一个示例。

import ray
from ray.util.dask import ray_dask_get, enable_dask_on_ray, disable_dask_on_ray
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd

# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()

d_arr = da.from_array(np.random.randint(0, 1000, size=(256, 256)))

# The Dask scheduler submits the underlying task graph to Ray.
d_arr.mean().compute(scheduler=ray_dask_get)

# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()

df = dd.from_pandas(
    pd.DataFrame(np.random.randint(0, 100, size=(1024, 2)), columns=["age", "grade"]),
    npartitions=2,
)
df.groupby(["age"]).mean().compute()

disable_dask_on_ray()

# The Dask config helper can be used as a context manager, limiting the scope
# of the Dask-on-Ray scheduler to the context.
with enable_dask_on_ray():
    d_arr.mean().compute()

ray.shutdown()

注意

要在 Ray 集群上执行,您应该*不要*使用Dask.distributed 客户端;只需使用普通的 Dask 及其集合,并将 ray_dask_get 传递给 .compute() 调用,或者通过此处详细介绍的另一种方式设置调度程序,或者使用我们的enable_dask_on_ray 配置助手。请按照在集群上使用 Ray的说明修改 ray.init() 调用。

为什么要在 Ray 上使用 Dask?

  1. 利用 Ray 特有的功能,例如

    启动云集群共享内存存储

  2. 如果您想在同一个应用程序中同时使用 Dask 和 Ray 库,而无需两个不同的集群。

  3. 如果您想使用 Dask 提供的熟悉的 NumPy 和 Pandas API 创建数据分析,并在像 Ray 这样专为生产而设计的快速、容错的分布式任务执行系统上执行它们。

Dask-on-Ray 是一个正在进行中的项目,预计不会达到直接使用 Ray 的性能。所有Dask 抽象应该能够通过此调度程序在 Ray 之上无缝运行,因此如果您发现其中一个抽象无法在 Ray 上运行,请打开一个 issue

大规模工作负载的最佳实践#

对于 Ray 1.3,默认的调度策略是将任务尽可能地打包到同一个节点上。如果您在 Ray 上运行大规模/内存密集型的 Dask 工作负载,则更希望分散任务。

在这种情况下,有两种推荐的设置:- 降低 scheduler_spread_threshold 配置标志,告诉调度程序倾向于将任务分散到整个集群而不是打包。- 将头节点的 num-cpus 设置为 0,这样任务就不会被调度到头节点上。

# Head node. Set `num_cpus=0` to avoid tasks being scheduled on a head node.
RAY_scheduler_spread_threshold=0.0 ray start --head --num-cpus=0

# Worker node.
RAY_scheduler_spread_threshold=0.0 ray start --address=[head-node-address]

核外数据处理#

通过 Ray 的对象溢出支持处理大于集群内存的数据集:如果内存中的对象存储已满,对象将被溢出到外部存储(默认情况下为本地磁盘)。此功能在 Ray 1.2 中可用但默认关闭,在 Ray 1.3+ 中默认开启。请参阅您的 Ray 版本的对象溢出文档,了解启用和/或配置对象溢出的步骤。

持久化#

Dask-on-Ray 会修补 dask.persist() 以匹配Dask Distributed 的持久化语义;即,使用 Dask-on-Ray 调度程序调用 dask.persist() 将任务提交到 Ray 集群,并在 Dask 集合中内联返回 Ray Future。如果您想计算一些基础集合(例如 Dask 数组),然后进行多个不同的下游计算(例如聚合),这会很有用:这些下游计算会更快,因为基础集合的计算已提前启动,并被所有下游计算引用,通常通过共享内存。

import ray
from ray.util.dask import enable_dask_on_ray
import dask
import dask.array as da

# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()

# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()

d_arr = da.ones(100)

# Print the internal Dask graph. Replace this with `print(dask.base.collections_to_dsk([d_arr]))` when dask>=2024.11.0,<2025.4.0.
print(dask.base.collections_to_expr([d_arr]).dask)
# {('ones_like-5902a58f37d3b639948dee893f5c4f4a', 0):
# <Task ('ones_like-5902a58f37d3b639948dee893f5c4f4a', 0)
# ones_like(...)>}

# This submits all underlying Ray tasks to the cluster and returns
# a Dask array with the Ray futures inlined.
d_arr_p = d_arr.persist()

# Notice that the Ray ObjectRef is inlined. The dask.ones() task has
# been submitted to and is running on the Ray cluster.
# Replace this in a similar way when dask>=2024.11.0,<2025.4.0.
print(dask.base.collections_to_expr([d_arr_p]).dask)
# {('ones_like-5902a58f37d3b639948dee893f5c4f4a', 0):
# DataNode(ObjectRef(2c329aa28fcae64affffffffffffffffffffffff2c00000001000000))}

# Future computations on this persisted Dask Array will be fast since we
# already started computing d_arr_p in the background.
d_arr_p.sum().compute()
d_arr_p.min().compute()
d_arr_p.max().compute()

ray.shutdown()

注解、资源和任务选项#

Dask-on-Ray 支持通过Dask 的注解 API指定资源或任何其他 Ray 任务选项。此注解上下文管理器可用于将资源请求(或任何其他 Ray 任务选项)附加到特定的 Dask 操作,注解会向下传递到底层的 Ray 任务。资源请求和其他 Ray 任务选项也可以通过 .compute(ray_remote_args={...}) API 全局指定,该 API 将作为通过 Dask 工作负载启动的所有 Ray 任务的默认值。单个 Dask 操作上的注解将覆盖此全局默认值。

import ray
from ray.util.dask import enable_dask_on_ray
import dask
import dask.array as da

# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init(
    resources={
        "custom_resource": 1,
        "other_custom_resource": 1,
        "another_custom_resource": 1,
    }
)

# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()

# All Ray tasks that underly the Dask operations performed in an annotation
# context will require the indicated resources: 2 CPUs and 0.01 of the custom
# resource.
with dask.annotate(
    ray_remote_args=dict(num_cpus=2, resources={"custom_resource": 0.01})
):
    d_arr = da.ones(100)

# Operations on the same collection can have different annotations.
with dask.annotate(ray_remote_args=dict(resources={"other_custom_resource": 0.01})):
    d_arr = 2 * d_arr

# This happens outside of the annotation context, so no resource constraints
# will be attached to the underlying Ray tasks for the sum() operation.
sum_ = d_arr.sum()

# Compute the result, passing in a default resource request that will be
# applied to all operations that aren't already annotated with a resource
# request. In this case, only the sum() operation will get this default
# resource request.
# We also give ray_remote_args, which will be given to every Ray task that
# Dask-on-Ray submits; note that this can also be overridden for individual
# Dask operations via the dask.annotate API.
# NOTE: We disable graph optimization since it can break annotations,
# see this issue: https://github.com/dask/dask/issues/7036.
result = sum_.compute(
    ray_remote_args=dict(max_retries=5, resources={"another_custom_resource": 0.01}),
    optimize_graph=False,
)
print(result)
# 200

ray.shutdown()

请注意,您可能需要禁用图优化,因为它可能会破坏注解,请参阅此 Dask issue

Dask DataFrame Shuffling 的自定义优化#

Dask-on-Ray 提供了一个 Dask DataFrame 优化器,它利用 Ray 执行多返回任务的能力,以将 Dask DataFrame Shuffling 的速度提高多达 4 倍。只需将 dataframe_optimize 配置选项设置为我们的优化器函数,类似于指定 Dask-on-Ray 调度程序的方式。

import ray
from ray.util.dask import dataframe_optimize, ray_dask_get
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd

# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()

# Set the Dask DataFrame optimizer to
# our custom optimization function, this time using the config setter as a
# context manager.
with dask.config.set(scheduler=ray_dask_get, dataframe_optimize=dataframe_optimize):
    npartitions = 100
    df = dd.from_pandas(
        pd.DataFrame(
            np.random.randint(0, 100, size=(10000, 2)), columns=["age", "grade"]
        ),
        npartitions=npartitions,
    )
    # We set max_branch to infinity in order to ensure that the task-based
    # shuffle happens in a single stage, which is required in order for our
    # optimization to work.
    df.set_index(["age"], shuffle="tasks", max_branch=float("inf")).head(
        10, npartitions=-1
    )

ray.shutdown()

回调#

Dask 的自定义回调抽象通过 Ray 特有的回调进行了扩展,允许用户挂钩到 Ray 任务提交和执行生命周期。有了这些钩子,实现 Dask 级别的调度程序和任务内省(例如进度报告、诊断、缓存等)就很简单。

下面是一个使用 ray_pretaskray_posttask 钩子来测量和记录每个任务执行时间的示例。

from ray.util.dask import RayDaskCallback, ray_dask_get
from timeit import default_timer as timer


class MyTimerCallback(RayDaskCallback):
    def _ray_pretask(self, key, object_refs):
        # Executed at the start of the Ray task.
        start_time = timer()
        return start_time

    def _ray_posttask(self, key, result, pre_state):
        # Executed at the end of the Ray task.
        execution_time = timer() - pre_state
        print(f"Execution time for task {key}: {execution_time}s")


with MyTimerCallback():
    # Any .compute() calls within this context will get MyTimerCallback()
    # as a Dask-Ray callback.
    z.compute(scheduler=ray_dask_get)

提供了以下 Ray 特有的回调:

  1. ray_presubmit(task, key, deps):在提交 Ray 任务之前运行。如果此回调返回非 None 值,则不会创建 Ray 任务,并且此值将用作潜在任务的结果值。

  2. ray_postsubmit(task, key, deps, object_ref):在提交 Ray 任务之后运行。

  3. ray_pretask(key, object_refs):在 Ray 任务中执行 Dask 任务之前运行。这将在任务提交后,在 Ray 工作进程中执行。此任务的返回值将传递给 ray_posttask 回调(如果提供)。

  4. ray_posttask(key, result, pre_state):在 Ray 任务中执行 Dask 任务之后运行。这将在 Ray 工作进程中执行。此回调接收 ray_pretask 回调的返回值(如果提供)。

  5. ray_postsubmit_all(object_refs, dsk):在所有 Ray 任务都已提交后运行。

  6. ray_finish(result):在所有 Ray 任务执行完毕并返回最终结果后运行。

有关这些回调、它们的参数和返回值的更多详细信息,请参阅 RayDaskCallback 的文档字符串。

创建自己的回调时,可以直接使用 RayDaskCallback,将回调函数作为构造函数参数传递。

def my_presubmit_cb(task, key, deps):
    print(f"About to submit task {key}!")

with RayDaskCallback(ray_presubmit=my_presubmit_cb):
    z.compute(scheduler=ray_dask_get)

或者可以继承它,实现所需的回调方法。

class MyPresubmitCallback(RayDaskCallback):
    def _ray_presubmit(self, task, key, deps):
        print(f"About to submit task {key}!")

with MyPresubmitCallback():
    z.compute(scheduler=ray_dask_get)

您还可以指定多个回调。

# The hooks for both MyTimerCallback and MyPresubmitCallback will be
# called.
with MyTimerCallback(), MyPresubmitCallback():
    z.compute(scheduler=ray_dask_get)

将 Dask 回调与 actor 结合使用,可以实现有状态数据聚合的简单模式,例如捕获任务执行统计信息和缓存结果。下面是一个同时执行这两项操作的示例,如果任务执行时间超过用户定义的阈值,则缓存任务的结果。

@ray.remote
class SimpleCacheActor:
    def __init__(self):
        self.cache = {}

    def get(self, key):
        # Raises KeyError if key isn't in cache.
        return self.cache[key]

    def put(self, key, value):
        self.cache[key] = value


class SimpleCacheCallback(RayDaskCallback):
    def __init__(self, cache_actor_handle, put_threshold=10):
        self.cache_actor = cache_actor_handle
        self.put_threshold = put_threshold

    def _ray_presubmit(self, task, key, deps):
        try:
            return ray.get(self.cache_actor.get.remote(str(key)))
        except KeyError:
            return None

    def _ray_pretask(self, key, object_refs):
        start_time = timer()
        return start_time

    def _ray_posttask(self, key, result, pre_state):
        execution_time = timer() - pre_state
        if execution_time > self.put_threshold:
            self.cache_actor.put.remote(str(key), result)


cache_actor = SimpleCacheActor.remote()
cache_callback = SimpleCacheCallback(cache_actor, put_threshold=2)
with cache_callback:
    z.compute(scheduler=ray_dask_get)

注意

现有的 Dask 调度程序回调(startstart_statepretaskposttaskfinish)也可用,可用于内省 Dask 任务到 Ray 任务的转换过程,但请注意,pretaskposttask 钩子在 Ray 任务*提交*之前和之后执行,而不是在*执行*之前和之后执行,并且 finish 在所有 Ray 任务*提交*之后执行,而不是*执行*之后。

此回调 API 目前不稳定,可能会发生更改。

API#

RayDaskCallback

使用 Ray 特有的钩子扩展了 Dask 的 Callback 类。

_ray_presubmit

在提交 Ray 任务之前运行。

_ray_postsubmit

在提交 Ray 任务之后运行。

_ray_pretask

在 Ray 任务中执行 Dask 任务之前运行。

_ray_posttask

在 Ray 任务中执行 Dask 任务之后运行。

_ray_postsubmit_all

在 Ray 提交所有任务之后运行。

_ray_finish

在 Ray 完成所有 Ray 任务的执行并返回最终结果后运行。