在 Ray 上使用 Dask#
Dask 是一个 Python 并行计算库,专为扩展分析和科学计算工作负载而设计。它提供了大数据集合,模仿了熟悉的NumPy 和 Pandas 库的 API,允许这些抽象表示大于内存的数据和/或允许在多机集群上运行对这些数据的操作,同时还提供自动数据并行、智能调度和优化操作。这些集合上的操作会创建任务图,由调度程序执行。
Ray 为 Dask 提供了一个调度程序(dask_on_ray),它允许您使用 Dask 的集合构建数据分析,并在 Ray 集群上执行底层任务。
dask_on_ray 使用 Dask 的调度程序 API,它允许您指定任何可调用对象作为调度程序,供 Dask 使用来执行您的工作负载。使用 Dask-on-Ray 调度程序,整个 Dask 生态系统都可以运行在 Ray 之上。
注意
我们始终确保最新的 Dask 版本与 Ray nightly 兼容。下表显示了与 Ray 版本一起测试的最新 Dask 版本。
Ray 版本 |
Dask 版本 |
|---|---|
|
2023.6.1 (Python 版本 < 3.12)2025.5.0 (Python 版本 >= 3.12) |
|
2022.10.2 (Python 版本 < 3.12)2024.6.0 (Python 版本 >= 3.12) |
|
2022.10.1 (Python 版本 < 3.12)2024.6.0 (Python 版本 >= 3.12) |
|
|
|
2022.2.0 (Python 版本 < 3.8)2022.10.1 (Python 版本 >= 3.8) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
调度程序#
Dask-on-Ray 调度程序可以执行任何有效的 Dask 图,并且可以与任何 Dask 的.compute() 调用一起使用。下面是一个示例。
import ray
from ray.util.dask import ray_dask_get, enable_dask_on_ray, disable_dask_on_ray
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()
d_arr = da.from_array(np.random.randint(0, 1000, size=(256, 256)))
# The Dask scheduler submits the underlying task graph to Ray.
d_arr.mean().compute(scheduler=ray_dask_get)
# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()
df = dd.from_pandas(
pd.DataFrame(np.random.randint(0, 100, size=(1024, 2)), columns=["age", "grade"]),
npartitions=2,
)
df.groupby(["age"]).mean().compute()
disable_dask_on_ray()
# The Dask config helper can be used as a context manager, limiting the scope
# of the Dask-on-Ray scheduler to the context.
with enable_dask_on_ray():
d_arr.mean().compute()
ray.shutdown()
注意
要在 Ray 集群上执行,您应该*不要*使用Dask.distributed 客户端;只需使用普通的 Dask 及其集合,并将 ray_dask_get 传递给 .compute() 调用,或者通过此处详细介绍的另一种方式设置调度程序,或者使用我们的enable_dask_on_ray 配置助手。请按照在集群上使用 Ray的说明修改 ray.init() 调用。
为什么要在 Ray 上使用 Dask?
如果您想在同一个应用程序中同时使用 Dask 和 Ray 库,而无需两个不同的集群。
如果您想使用 Dask 提供的熟悉的 NumPy 和 Pandas API 创建数据分析,并在像 Ray 这样专为生产而设计的快速、容错的分布式任务执行系统上执行它们。
Dask-on-Ray 是一个正在进行中的项目,预计不会达到直接使用 Ray 的性能。所有Dask 抽象应该能够通过此调度程序在 Ray 之上无缝运行,因此如果您发现其中一个抽象无法在 Ray 上运行,请打开一个 issue。
大规模工作负载的最佳实践#
对于 Ray 1.3,默认的调度策略是将任务尽可能地打包到同一个节点上。如果您在 Ray 上运行大规模/内存密集型的 Dask 工作负载,则更希望分散任务。
在这种情况下,有两种推荐的设置:- 降低 scheduler_spread_threshold 配置标志,告诉调度程序倾向于将任务分散到整个集群而不是打包。- 将头节点的 num-cpus 设置为 0,这样任务就不会被调度到头节点上。
# Head node. Set `num_cpus=0` to avoid tasks being scheduled on a head node.
RAY_scheduler_spread_threshold=0.0 ray start --head --num-cpus=0
# Worker node.
RAY_scheduler_spread_threshold=0.0 ray start --address=[head-node-address]
核外数据处理#
通过 Ray 的对象溢出支持处理大于集群内存的数据集:如果内存中的对象存储已满,对象将被溢出到外部存储(默认情况下为本地磁盘)。此功能在 Ray 1.2 中可用但默认关闭,在 Ray 1.3+ 中默认开启。请参阅您的 Ray 版本的对象溢出文档,了解启用和/或配置对象溢出的步骤。
持久化#
Dask-on-Ray 会修补 dask.persist() 以匹配Dask Distributed 的持久化语义;即,使用 Dask-on-Ray 调度程序调用 dask.persist() 将任务提交到 Ray 集群,并在 Dask 集合中内联返回 Ray Future。如果您想计算一些基础集合(例如 Dask 数组),然后进行多个不同的下游计算(例如聚合),这会很有用:这些下游计算会更快,因为基础集合的计算已提前启动,并被所有下游计算引用,通常通过共享内存。
import ray
from ray.util.dask import enable_dask_on_ray
import dask
import dask.array as da
# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()
# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()
d_arr = da.ones(100)
# Print the internal Dask graph. Replace this with `print(dask.base.collections_to_dsk([d_arr]))` when dask>=2024.11.0,<2025.4.0.
print(dask.base.collections_to_expr([d_arr]).dask)
# {('ones_like-5902a58f37d3b639948dee893f5c4f4a', 0):
# <Task ('ones_like-5902a58f37d3b639948dee893f5c4f4a', 0)
# ones_like(...)>}
# This submits all underlying Ray tasks to the cluster and returns
# a Dask array with the Ray futures inlined.
d_arr_p = d_arr.persist()
# Notice that the Ray ObjectRef is inlined. The dask.ones() task has
# been submitted to and is running on the Ray cluster.
# Replace this in a similar way when dask>=2024.11.0,<2025.4.0.
print(dask.base.collections_to_expr([d_arr_p]).dask)
# {('ones_like-5902a58f37d3b639948dee893f5c4f4a', 0):
# DataNode(ObjectRef(2c329aa28fcae64affffffffffffffffffffffff2c00000001000000))}
# Future computations on this persisted Dask Array will be fast since we
# already started computing d_arr_p in the background.
d_arr_p.sum().compute()
d_arr_p.min().compute()
d_arr_p.max().compute()
ray.shutdown()
注解、资源和任务选项#
Dask-on-Ray 支持通过Dask 的注解 API指定资源或任何其他 Ray 任务选项。此注解上下文管理器可用于将资源请求(或任何其他 Ray 任务选项)附加到特定的 Dask 操作,注解会向下传递到底层的 Ray 任务。资源请求和其他 Ray 任务选项也可以通过 .compute(ray_remote_args={...}) API 全局指定,该 API 将作为通过 Dask 工作负载启动的所有 Ray 任务的默认值。单个 Dask 操作上的注解将覆盖此全局默认值。
import ray
from ray.util.dask import enable_dask_on_ray
import dask
import dask.array as da
# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init(
resources={
"custom_resource": 1,
"other_custom_resource": 1,
"another_custom_resource": 1,
}
)
# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()
# All Ray tasks that underly the Dask operations performed in an annotation
# context will require the indicated resources: 2 CPUs and 0.01 of the custom
# resource.
with dask.annotate(
ray_remote_args=dict(num_cpus=2, resources={"custom_resource": 0.01})
):
d_arr = da.ones(100)
# Operations on the same collection can have different annotations.
with dask.annotate(ray_remote_args=dict(resources={"other_custom_resource": 0.01})):
d_arr = 2 * d_arr
# This happens outside of the annotation context, so no resource constraints
# will be attached to the underlying Ray tasks for the sum() operation.
sum_ = d_arr.sum()
# Compute the result, passing in a default resource request that will be
# applied to all operations that aren't already annotated with a resource
# request. In this case, only the sum() operation will get this default
# resource request.
# We also give ray_remote_args, which will be given to every Ray task that
# Dask-on-Ray submits; note that this can also be overridden for individual
# Dask operations via the dask.annotate API.
# NOTE: We disable graph optimization since it can break annotations,
# see this issue: https://github.com/dask/dask/issues/7036.
result = sum_.compute(
ray_remote_args=dict(max_retries=5, resources={"another_custom_resource": 0.01}),
optimize_graph=False,
)
print(result)
# 200
ray.shutdown()
请注意,您可能需要禁用图优化,因为它可能会破坏注解,请参阅此 Dask issue。
Dask DataFrame Shuffling 的自定义优化#
Dask-on-Ray 提供了一个 Dask DataFrame 优化器,它利用 Ray 执行多返回任务的能力,以将 Dask DataFrame Shuffling 的速度提高多达 4 倍。只需将 dataframe_optimize 配置选项设置为我们的优化器函数,类似于指定 Dask-on-Ray 调度程序的方式。
import ray
from ray.util.dask import dataframe_optimize, ray_dask_get
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()
# Set the Dask DataFrame optimizer to
# our custom optimization function, this time using the config setter as a
# context manager.
with dask.config.set(scheduler=ray_dask_get, dataframe_optimize=dataframe_optimize):
npartitions = 100
df = dd.from_pandas(
pd.DataFrame(
np.random.randint(0, 100, size=(10000, 2)), columns=["age", "grade"]
),
npartitions=npartitions,
)
# We set max_branch to infinity in order to ensure that the task-based
# shuffle happens in a single stage, which is required in order for our
# optimization to work.
df.set_index(["age"], shuffle="tasks", max_branch=float("inf")).head(
10, npartitions=-1
)
ray.shutdown()
回调#
Dask 的自定义回调抽象通过 Ray 特有的回调进行了扩展,允许用户挂钩到 Ray 任务提交和执行生命周期。有了这些钩子,实现 Dask 级别的调度程序和任务内省(例如进度报告、诊断、缓存等)就很简单。
下面是一个使用 ray_pretask 和 ray_posttask 钩子来测量和记录每个任务执行时间的示例。
from ray.util.dask import RayDaskCallback, ray_dask_get
from timeit import default_timer as timer
class MyTimerCallback(RayDaskCallback):
def _ray_pretask(self, key, object_refs):
# Executed at the start of the Ray task.
start_time = timer()
return start_time
def _ray_posttask(self, key, result, pre_state):
# Executed at the end of the Ray task.
execution_time = timer() - pre_state
print(f"Execution time for task {key}: {execution_time}s")
with MyTimerCallback():
# Any .compute() calls within this context will get MyTimerCallback()
# as a Dask-Ray callback.
z.compute(scheduler=ray_dask_get)
提供了以下 Ray 特有的回调:
ray_presubmit(task, key, deps):在提交 Ray 任务之前运行。如果此回调返回非None值,则不会创建 Ray 任务,并且此值将用作潜在任务的结果值。
ray_postsubmit(task, key, deps, object_ref):在提交 Ray 任务之后运行。
ray_pretask(key, object_refs):在 Ray 任务中执行 Dask 任务之前运行。这将在任务提交后,在 Ray 工作进程中执行。此任务的返回值将传递给 ray_posttask 回调(如果提供)。
ray_posttask(key, result, pre_state):在 Ray 任务中执行 Dask 任务之后运行。这将在 Ray 工作进程中执行。此回调接收 ray_pretask 回调的返回值(如果提供)。
ray_postsubmit_all(object_refs, dsk):在所有 Ray 任务都已提交后运行。
ray_finish(result):在所有 Ray 任务执行完毕并返回最终结果后运行。
有关这些回调、它们的参数和返回值的更多详细信息,请参阅 RayDaskCallback 的文档字符串。
创建自己的回调时,可以直接使用 RayDaskCallback,将回调函数作为构造函数参数传递。
def my_presubmit_cb(task, key, deps):
print(f"About to submit task {key}!")
with RayDaskCallback(ray_presubmit=my_presubmit_cb):
z.compute(scheduler=ray_dask_get)
或者可以继承它,实现所需的回调方法。
class MyPresubmitCallback(RayDaskCallback):
def _ray_presubmit(self, task, key, deps):
print(f"About to submit task {key}!")
with MyPresubmitCallback():
z.compute(scheduler=ray_dask_get)
您还可以指定多个回调。
# The hooks for both MyTimerCallback and MyPresubmitCallback will be
# called.
with MyTimerCallback(), MyPresubmitCallback():
z.compute(scheduler=ray_dask_get)
将 Dask 回调与 actor 结合使用,可以实现有状态数据聚合的简单模式,例如捕获任务执行统计信息和缓存结果。下面是一个同时执行这两项操作的示例,如果任务执行时间超过用户定义的阈值,则缓存任务的结果。
@ray.remote
class SimpleCacheActor:
def __init__(self):
self.cache = {}
def get(self, key):
# Raises KeyError if key isn't in cache.
return self.cache[key]
def put(self, key, value):
self.cache[key] = value
class SimpleCacheCallback(RayDaskCallback):
def __init__(self, cache_actor_handle, put_threshold=10):
self.cache_actor = cache_actor_handle
self.put_threshold = put_threshold
def _ray_presubmit(self, task, key, deps):
try:
return ray.get(self.cache_actor.get.remote(str(key)))
except KeyError:
return None
def _ray_pretask(self, key, object_refs):
start_time = timer()
return start_time
def _ray_posttask(self, key, result, pre_state):
execution_time = timer() - pre_state
if execution_time > self.put_threshold:
self.cache_actor.put.remote(str(key), result)
cache_actor = SimpleCacheActor.remote()
cache_callback = SimpleCacheCallback(cache_actor, put_threshold=2)
with cache_callback:
z.compute(scheduler=ray_dask_get)
注意
现有的 Dask 调度程序回调(start、start_state、pretask、posttask、finish)也可用,可用于内省 Dask 任务到 Ray 任务的转换过程,但请注意,pretask 和 posttask 钩子在 Ray 任务*提交*之前和之后执行,而不是在*执行*之前和之后执行,并且 finish 在所有 Ray 任务*提交*之后执行,而不是*执行*之后。
此回调 API 目前不稳定,可能会发生更改。
API#
使用 Ray 特有的钩子扩展了 Dask 的 |
|
在提交 Ray 任务之前运行。 |
|
在提交 Ray 任务之后运行。 |
|
在 Ray 任务中执行 Dask 任务之前运行。 |
|
在 Ray 任务中执行 Dask 任务之后运行。 |
|
在 Ray 提交所有任务之后运行。 |
|
在 Ray 完成所有 Ray 任务的执行并返回最终结果后运行。 |