任务生命周期#
本文档介绍了 Ray Core 中任务的生命周期,包括任务如何定义、调度和执行。我们将使用以下代码作为示例,并且内部机制基于 Ray 2.48。
import ray
@ray.remote
def my_task(arg):
return f"Hello, {arg}!"
obj_ref = my_task.remote("Ray")
print(ray.get(obj_ref))
Hello, Ray!
定义远程函数#
任务生命周期的第一步是使用 ray.remote() 装饰器定义一个远程函数。 ray.remote() 会包装 Python 函数并返回一个 RemoteFunction 实例。 RemoteFunction 存储底层函数以及用户指定的 Ray 任务 options,例如 num_cpus。
调用远程函数#
定义好远程函数后,可以使用 .remote() 方法来调用它。每次调用远程函数都会创建一个 Ray 任务。此方法会提交任务以供执行,并返回一个对象引用 (ObjectRef),可用于稍后检索结果。在底层,.remote() 执行以下操作:
将底层函数进行 pickle 序列化成字节,并 存储在 GCS 键值存储中,使用一个 键,以便稍后远程执行器(将执行任务的核心工作进程)可以获取字节、反序列化并执行函数。这是针对每个远程函数定义执行一次,而不是每次调用都执行。
调用 Cython 的 submit_task,该函数 准备 参数(3种类型)并调用 C++ 的 CoreWorker::SubmitTask。
CoreWorker构建一个 TaskSpecification,其中包含任务的所有信息,包括函数的 ID、所有用户指定的选项和参数。此规范将发送给执行器进行执行。TaskSpecification 被异步 提交 到 NormalTaskSubmitter。这意味着
.remote()调用会立即返回,任务会被异步调度和执行。
调度任务#
任务被提交到 NormalTaskSubmitter 后,会选择一个 Ray 节点上的工作进程来执行该任务,这个过程称为调度。
NormalTaskSubmitter首先 等待 所有ObjectRef参数可用。可用意味着产生这些ObjectRef的任务已完成执行,并且数据已在集群中的某个位置可用。如果
ObjectRef指向的对象在 plasma 存储中,那么ObjectRef本身会被发送到执行器,执行器在调用用户函数之前会将ObjectRef解析为实际数据(如果需要,会从远程 plasma 存储拉取)。如果
ObjectRef指向的对象在调用者的内存存储中,那么数据会被 内联 并作为PushTaskRPC 的一部分发送到执行器,就像其他按值内联传递的参数一样。
一旦所有参数都可用,
NormalTaskSubmitter将尝试找到一个空闲的工作进程来执行任务。NormalTaskSubmitter通过一个称为工作进程租约的过程从 raylet 获取工作进程以执行任务,而调度就在此处发生。具体来说,它将向 选定的(本地 raylet 或偏好数据局部性的 raylet)raylet 发送一个RequestWorkerLeaseRPC 来请求工作进程租约。Raylet 处理
RequestWorkerLeaseRPC。当
RequestWorkerLeaseRPC 返回带有租用工作进程地址的响应时,就向调用者授予了执行任务的工作进程租约。如果RequestWorkerLease响应包含另一个 raylet 地址,那么NormalTaskSubmitter将从指定的 raylet 请求工作进程租约。这个过程会一直持续,直到获得工作进程租约。
执行任务#
获得租用的工作进程后,任务执行开始。
获取返回值#
用户函数执行后,调用者可以获取返回值。
用户函数返回后,执行器 获取并存储 所有返回值。如果返回值是一个 小的 对象,并且到目前为止此类返回值的总大小低于 阈值,则它将作为
PushTaskRPC 响应的一部分直接返回给调用者。 否则,它将被 put 到本地 plasma 存储中,并且引用将返回给调用者。当调用者 接收
PushTaskRPC 响应时,它 将 返回值(如果返回值小则为实际数据,如果返回值大则为指示数据在 plasma 存储中的特殊值)存储在本地内存存储中。当返回值被 添加到 本地内存存储时,
ray.get()会被 解除阻塞 并直接返回该值(如果对象较小),或者它将 从 本地 plasma 存储(如果需要,则先从远程 plasma 存储拉取)获取该对象(如果对象较大)。