任务生命周期#

本文档介绍了 Ray Core 中任务的生命周期,包括任务如何定义、调度和执行。我们将使用以下代码作为示例,并且内部机制基于 Ray 2.48。

import ray

@ray.remote
def my_task(arg):
    return f"Hello, {arg}!"

obj_ref = my_task.remote("Ray")
print(ray.get(obj_ref))
Hello, Ray!

定义远程函数#

任务生命周期的第一步是使用 ray.remote() 装饰器定义一个远程函数。 ray.remote() 会包装 Python 函数并返回一个 RemoteFunction 实例。 RemoteFunction 存储底层函数以及用户指定的 Ray 任务 options,例如 num_cpus

调用远程函数#

定义好远程函数后,可以使用 .remote() 方法来调用它。每次调用远程函数都会创建一个 Ray 任务。此方法会提交任务以供执行,并返回一个对象引用 (ObjectRef),可用于稍后检索结果。在底层,.remote() 执行以下操作:

  1. 将底层函数进行 pickle 序列化成字节,并 存储在 GCS 键值存储中,使用一个 ,以便稍后远程执行器(将执行任务的核心工作进程)可以获取字节、反序列化并执行函数。这是针对每个远程函数定义执行一次,而不是每次调用都执行。

  2. 调用 Cython 的 submit_task,该函数 准备 参数(3种类型)并调用 C++ 的 CoreWorker::SubmitTask

    1. 按引用传递的参数:参数是 ObjectRef

    2. 按值内联传递的参数:参数是一个 小的 Python 对象,并且到目前为止此类参数的总大小低于 阈值。在这种情况下,它将被 pickle 序列化,并作为 PushTask RPC 的一部分发送到远程执行器,并在那里进行反序列化。这称为内联,在这种情况下不涉及 plasma 存储。

    3. 按值非内联传递的参数:参数是一个普通的 Python 对象,但它不符合内联条件(例如,大小过大),它被 put 到本地 plasma 存储中,并且该参数被生成的 ObjectRef 替换,因此它实际上等同于 .remote(ray.put(arg))

  3. CoreWorker 构建一个 TaskSpecification,其中包含任务的所有信息,包括函数的 ID、所有用户指定的选项和参数。此规范将发送给执行器进行执行。

  4. TaskSpecification 被异步 提交NormalTaskSubmitter。这意味着 .remote() 调用会立即返回,任务会被异步调度和执行。

调度任务#

任务被提交到 NormalTaskSubmitter 后,会选择一个 Ray 节点上的工作进程来执行该任务,这个过程称为调度。

  1. NormalTaskSubmitter 首先 等待 所有 ObjectRef 参数可用。可用意味着产生这些 ObjectRef 的任务已完成执行,并且数据已在集群中的某个位置可用。

    1. 如果 ObjectRef 指向的对象在 plasma 存储中,那么 ObjectRef 本身会被发送到执行器,执行器在调用用户函数之前会将 ObjectRef 解析为实际数据(如果需要,会从远程 plasma 存储拉取)。

    2. 如果 ObjectRef 指向的对象在调用者的内存存储中,那么数据会被 内联 并作为 PushTask RPC 的一部分发送到执行器,就像其他按值内联传递的参数一样。

  2. 一旦所有参数都可用,NormalTaskSubmitter 将尝试找到一个空闲的工作进程来执行任务。NormalTaskSubmitter 通过一个称为工作进程租约的过程从 raylet 获取工作进程以执行任务,而调度就在此处发生。具体来说,它将向 选定的(本地 raylet 或偏好数据局部性的 raylet)raylet 发送一个 RequestWorkerLease RPC 来请求工作进程租约。

  3. Raylet 处理 RequestWorkerLease RPC。

  4. RequestWorkerLease RPC 返回带有租用工作进程地址的响应时,就向调用者授予了执行任务的工作进程租约。如果 RequestWorkerLease 响应包含另一个 raylet 地址,那么 NormalTaskSubmitter 将从指定的 raylet 请求工作进程租约。这个过程会一直持续,直到获得工作进程租约。

执行任务#

获得租用的工作进程后,任务执行开始。

  1. NormalTaskSubmitter 带有 TaskSpecificationPushTask RPC 发送到租用的工作进程以执行任务。

  2. 执行器 接收 PushTask RPC 并执行(1 -> 2 -> 3 -> 4 -> 5)任务。

  3. 执行任务的第一步是 从本地 plasma 存储获取 所有按引用传递的参数(在调度期间数据已从远程 plasma 存储拉取到本地 plasma 存储)。

  4. 然后执行器 从 GCS 键值存储中获取 序列化后的函数字节并进行反序列化。

  5. 下一步是 反序列化 参数。

  6. 最后,调用 用户函数。

获取返回值#

用户函数执行后,调用者可以获取返回值。

  1. 用户函数返回后,执行器 获取并存储 所有返回值。如果返回值是一个 小的 对象,并且到目前为止此类返回值的总大小低于 阈值,则它将作为 PushTask RPC 响应的一部分直接返回给调用者。 否则,它将被 put 到本地 plasma 存储中,并且引用将返回给调用者。

  2. 当调用者 接收 PushTask RPC 响应时,它 返回值(如果返回值小则为实际数据,如果返回值大则为指示数据在 plasma 存储中的特殊值)存储在本地内存存储中。

  3. 当返回值被 添加到 本地内存存储时,ray.get() 会被 解除阻塞 并直接返回该值(如果对象较小),或者它将 本地 plasma 存储(如果需要,则先从远程 plasma 存储拉取)获取该对象(如果对象较大)。