在 Ray 上使用 Dask#
Dask 是一个 Python 并行计算库,旨在扩展分析和科学计算工作负载。它提供模仿熟悉的 NumPy 和 Pandas 库 API 的大数据集合,允许这些抽象表示大于内存的数据,并/或允许在多机器集群上运行对这些数据的操作,同时还提供自动数据并行、智能调度和优化操作。对这些集合的操作会创建一个任务图,由调度器执行。
Ray 为 Dask 提供了一个调度器 (dask_on_ray
),允许您使用 Dask 的集合构建数据分析,并在 Ray 集群上执行底层任务。
dask_on_ray
使用 Dask 的调度器 API,允许您指定任何可调用对象作为 Dask 用于执行您的工作负载的调度器。使用 Dask-on-Ray 调度器,整个 Dask 生态系统可以在 Ray 上运行。
注意
我们始终确保最新的 Dask 版本与 Ray nightly 版本兼容。下表显示了与各 Ray 版本测试兼容的最新 Dask 版本。
Ray 版本 |
Dask 版本 |
---|---|
|
2022.10.2 (Python 版本 < 3.12) 2024.6.0 (Python 版本 >= 3.12) |
|
2022.10.1 (Python 版本 < 3.12) 2024.6.0 (Python 版本 >= 3.12) |
|
|
|
2022.2.0 (Python 版本 < 3.8) 2022.10.1 (Python 版本 >= 3.8) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
调度器#
Dask-on-Ray 调度器可以执行任何有效的 Dask 图,并可与任何 Dask .compute() 调用一起使用。这里有一个例子
import ray
from ray.util.dask import ray_dask_get, enable_dask_on_ray, disable_dask_on_ray
import dask.array as da
import dask.dataframe as dd
import numpy as np
import pandas as pd
# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()
d_arr = da.from_array(np.random.randint(0, 1000, size=(256, 256)))
# The Dask scheduler submits the underlying task graph to Ray.
d_arr.mean().compute(scheduler=ray_dask_get)
# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()
df = dd.from_pandas(
pd.DataFrame(np.random.randint(0, 100, size=(1024, 2)), columns=["age", "grade"]),
npartitions=2,
)
df.groupby(["age"]).mean().compute()
disable_dask_on_ray()
# The Dask config helper can be used as a context manager, limiting the scope
# of the Dask-on-Ray scheduler to the context.
with enable_dask_on_ray():
d_arr.mean().compute()
ray.shutdown()
注意
要在 Ray 集群上执行,您不应使用 Dask.distributed 客户端;只需使用普通的 Dask 及其集合,并将 ray_dask_get
传递给 .compute()
调用,通过 此处 详述的其他方式之一设置调度器,或使用我们的 enable_dask_on_ray
配置助手。按照在集群上使用 Ray 的说明修改 ray.init()
调用。
为何在 Ray 上使用 Dask?
如果您希望在同一个应用中使用 Dask 和 Ray 库,而无需维护两个不同的集群。
如果您希望使用 Dask 提供的熟悉的 NumPy 和 Pandas API 创建数据分析,并在 Ray 这样面向生产、快速、容错的分布式任务执行系统上执行它们。
Dask-on-Ray 是一个正在进行中的项目,预计性能不会与直接使用 Ray 相同。所有Dask 抽象都应该能使用此调度器在 Ray 上无缝运行,因此如果您发现其中一个抽象无法在 Ray 上运行,请提交问题。
大规模工作负载的最佳实践#
对于 Ray 1.3,默认的调度策略是尽可能将任务打包到同一节点。如果您运行大规模/内存密集型的 Dask on Ray 工作负载,则更希望分散任务。
在这种情况下,有两种推荐的设置方式。- 降低配置标志 scheduler_spread_threshold
,以告诉调度器优先将任务分散到集群而不是打包。- 将头节点的 num-cpus
设置为 0,以便任务不会调度到头节点上。
# Head node. Set `num_cpus=0` to avoid tasks are being scheduled on a head node.
RAY_scheduler_spread_threshold=0.0 ray start --head --num-cpus=0
# Worker node.
RAY_scheduler_spread_threshold=0.0 ray start --address=[head-node-address]
核外数据处理#
通过 Ray 的对象溢出支持处理大于集群内存的数据集:如果内存对象存储已满,对象将溢出到外部存储(默认为本地磁盘)。此功能在 Ray 1.2 中可用但默认关闭,在 Ray 1.3+ 中默认开启。请参阅您的 Ray 版本对象溢出文档以了解启用和/或配置对象溢出的步骤。
持久化#
Dask-on-Ray 修补了 dask.persist() 以匹配 Dask Distributed 的持久化语义;也就是说,使用 Dask-on-Ray 调度器调用 dask.persist()
会将任务提交到 Ray 集群,并返回 Ray future,这些 future 内联在 Dask 集合中。如果您希望计算某个基础集合(例如 Dask 数组),然后进行多个不同的下游计算(例如聚合),这将非常有用:由于该基础集合计算已提前启动并被所有下游计算引用(通常通过共享内存),这些下游计算将更快。
import ray
from ray.util.dask import enable_dask_on_ray
import dask
import dask.array as da
# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()
# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()
d_arr = da.ones(100)
print(dask.base.collections_to_dsk([d_arr]))
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
# 0): (functools.partial(<function _broadcast_trick_inner at 0x7f27f1a71f80>,
# dtype=dtype('float64')), (5,))}
# This submits all underlying Ray tasks to the cluster and returns
# a Dask array with the Ray futures inlined.
d_arr_p = d_arr.persist()
# Notice that the Ray ObjectRef is inlined. The dask.ones() task has
# been submitted to and is running on the Ray cluster.
dask.base.collections_to_dsk([d_arr_p])
# {('ones-c345e6f8436ff9bcd68ddf25287d27f3',
# 0): ObjectRef(8b4e50dc1ddac855ffffffffffffffffffffffff0100000001000000)}
# Future computations on this persisted Dask Array will be fast since we
# already started computing d_arr_p in the background.
d_arr_p.sum().compute()
d_arr_p.min().compute()
d_arr_p.max().compute()
ray.shutdown()
注解、资源和任务选项#
Dask-on-Ray 支持通过Dask 的注解 API 指定资源或任何其他 Ray 任务选项。此注解上下文管理器可用于将资源请求(或任何其他 Ray 任务选项)附加到特定的 Dask 操作,注解会传递到底层 Ray 任务。资源请求和其他 Ray 任务选项也可以通过 .compute(ray_remote_args={...})
API 进行全局指定,这将作为通过 Dask 工作负载启动的所有 Ray 任务的默认值。单个 Dask 操作上的注解将覆盖此全局默认值。
import ray
from ray.util.dask import enable_dask_on_ray
import dask
import dask.array as da
# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init(
resources={
"custom_resource": 1,
"other_custom_resource": 1,
"another_custom_resource": 1,
}
)
# Use our Dask config helper to set the scheduler to ray_dask_get globally,
# without having to specify it on each compute call.
enable_dask_on_ray()
# All Ray tasks that underly the Dask operations performed in an annotation
# context will require the indicated resources: 2 CPUs and 0.01 of the custom
# resource.
with dask.annotate(
ray_remote_args=dict(num_cpus=2, resources={"custom_resource": 0.01})
):
d_arr = da.ones(100)
# Operations on the same collection can have different annotations.
with dask.annotate(ray_remote_args=dict(resources={"other_custom_resource": 0.01})):
d_arr = 2 * d_arr
# This happens outside of the annotation context, so no resource constraints
# will be attached to the underlying Ray tasks for the sum() operation.
sum_ = d_arr.sum()
# Compute the result, passing in a default resource request that will be
# applied to all operations that aren't already annotated with a resource
# request. In this case, only the sum() operation will get this default
# resource request.
# We also give ray_remote_args, which will be given to every Ray task that
# Dask-on-Ray submits; note that this can also be overridden for individual
# Dask operations via the dask.annotate API.
# NOTE: We disable graph optimization since it can break annotations,
# see this issue: https://github.com/dask/dask/issues/7036.
result = sum_.compute(
ray_remote_args=dict(max_retries=5, resources={"another_custom_resource": 0.01}),
optimize_graph=False,
)
print(result)
# 200
ray.shutdown()
请注意,您可能需要禁用图优化,因为它可能会破坏注解,请参阅此 Dask 问题。
Dask DataFrame 洗牌的自定义优化#
Dask-on-Ray 提供了一个 Dask DataFrame 优化器,它利用 Ray 执行多返回任务的能力,在 Ray 上将洗牌速度提高多达 4 倍。只需将 dataframe_optimize
配置选项设置为我们的优化器函数,类似于您指定 Dask-on-Ray 调度器的方式
import ray
from ray.util.dask import dataframe_optimize, ray_dask_get
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
# Start Ray.
# Tip: If connecting to an existing cluster, use ray.init(address="auto").
ray.init()
# Set the Dask DataFrame optimizer to
# our custom optimization function, this time using the config setter as a
# context manager.
with dask.config.set(scheduler=ray_dask_get, dataframe_optimize=dataframe_optimize):
npartitions = 100
df = dd.from_pandas(
pd.DataFrame(
np.random.randint(0, 100, size=(10000, 2)), columns=["age", "grade"]
),
npartitions=npartitions,
)
# We set max_branch to infinity in order to ensure that the task-based
# shuffle happens in a single stage, which is required in order for our
# optimization to work.
df.set_index(["age"], shuffle="tasks", max_branch=float("inf")).head(
10, npartitions=-1
)
ray.shutdown()
回调#
Dask 的自定义回调抽象通过 Ray 特有的回调进行了扩展,允许用户挂钩到 Ray 任务提交和执行生命周期。通过这些钩子,实现 Dask 级别的调度器和任务自省,例如进度报告、诊断、缓存等,变得简单。
这里有一个示例,使用 ray_pretask
和 ray_posttask
钩子测量并记录每个任务的执行时间
from ray.util.dask import RayDaskCallback, ray_dask_get
from timeit import default_timer as timer
class MyTimerCallback(RayDaskCallback):
def _ray_pretask(self, key, object_refs):
# Executed at the start of the Ray task.
start_time = timer()
return start_time
def _ray_posttask(self, key, result, pre_state):
# Executed at the end of the Ray task.
execution_time = timer() - pre_state
print(f"Execution time for task {key}: {execution_time}s")
with MyTimerCallback():
# Any .compute() calls within this context will get MyTimerCallback()
# as a Dask-Ray callback.
z.compute(scheduler=ray_dask_get)
提供以下 Ray 特有的回调
ray_presubmit(task, key, deps)
:在提交 Ray 任务之前运行。如果此回调返回非None
值,则不会创建 Ray 任务,并且此值将用作可能产生的任务的结果值。
ray_postsubmit(task, key, deps, object_ref)
:在提交 Ray 任务之后运行。
ray_pretask(key, object_refs)
:在 Ray 任务中执行 Dask 任务之前运行。这在任务提交后,在 Ray worker 内部执行。此任务的返回值(如果提供)将传递给 ray_posttask 回调。
ray_posttask(key, result, pre_state)
:在 Ray 任务中执行 Dask 任务之后运行。这在 Ray worker 内部执行。此回调接收 ray_pretask 回调的返回值(如果提供)。
ray_postsubmit_all(object_refs, dsk)
:在所有 Ray 任务提交后运行。
ray_finish(result)
:在所有 Ray 任务执行完成并返回最终结果后运行。
有关这些回调、其参数及其返回值的更多详细信息,请参阅 RayDaskCallback
的文档字符串。
创建自己的回调时,可以直接使用 RayDaskCallback
,将回调函数作为构造函数参数传递
def my_presubmit_cb(task, key, deps):
print(f"About to submit task {key}!")
with RayDaskCallback(ray_presubmit=my_presubmit_cb):
z.compute(scheduler=ray_dask_get)
或者您可以继承它,实现您需要的回调方法
class MyPresubmitCallback(RayDaskCallback):
def _ray_presubmit(self, task, key, deps):
print(f"About to submit task {key}!")
with MyPresubmitCallback():
z.compute(scheduler=ray_dask_get)
您也可以指定多个回调
# The hooks for both MyTimerCallback and MyPresubmitCallback will be
# called.
with MyTimerCallback(), MyPresubmitCallback():
z.compute(scheduler=ray_dask_get)
将 Dask 回调与 actor 结合使用,可以产生有状态数据聚合的简单模式,例如捕获任务执行统计信息和缓存结果。这里有一个示例,它同时执行这两项操作,如果任务的执行时间超过用户定义的阈值,则缓存任务的结果。
@ray.remote
class SimpleCacheActor:
def __init__(self):
self.cache = {}
def get(self, key):
# Raises KeyError if key isn't in cache.
return self.cache[key]
def put(self, key, value):
self.cache[key] = value
class SimpleCacheCallback(RayDaskCallback):
def __init__(self, cache_actor_handle, put_threshold=10):
self.cache_actor = cache_actor_handle
self.put_threshold = put_threshold
def _ray_presubmit(self, task, key, deps):
try:
return ray.get(self.cache_actor.get.remote(str(key)))
except KeyError:
return None
def _ray_pretask(self, key, object_refs):
start_time = timer()
return start_time
def _ray_posttask(self, key, result, pre_state):
execution_time = timer() - pre_state
if execution_time > self.put_threshold:
self.cache_actor.put.remote(str(key), result)
cache_actor = SimpleCacheActor.remote()
cache_callback = SimpleCacheCallback(cache_actor, put_threshold=2)
with cache_callback:
z.compute(scheduler=ray_dask_get)
注意
现有的 Dask 调度器回调 (start
, start_state
, pretask
, posttask
, finish
) 也可用,可用于自省 Dask 任务到 Ray 任务的转换过程,但请注意,pretask
和 posttask
钩子在 Ray 任务提交之前和之后执行,而不是执行之后,并且 finish
在所有 Ray 任务提交之后执行,而不是执行之后。
此回调 API 目前不稳定,未来可能会发生变化。
API#
扩展 Dask 的 |
|
在提交 Ray 任务之前运行。 |
|
在提交 Ray 任务之后运行。 |
|
在 Ray 任务中执行 Dask 任务之前运行。 |
|
在 Ray 任务中执行 Dask 任务之后运行。 |
|
在 Ray 提交所有任务之后运行。 |
|
在 Ray 完成执行所有 Ray 任务并返回最终结果之后运行。 |