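Tasks

Ray enables arbitrary functions to be executed asynchronously on separate Python workers. Such functions are called Ray remote functions, and their asynchronous invocations are called Ray tasks. Here is an example.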

import ray
import time


# A regular Python function.
def normal_function():
    return 1


# By adding the `@ray.remote` decorator, a regular Python function
# becomes a Ray remote function.
@ray.remote
def my_function():
    return 1


# To invoke this remote function, use the `remote` method.
# This will immediately return an object ref (a future) and then create
# a task that will be executed on a worker process.
obj_ref = my_function.remote()

# The result can be retrieved with ``ray.get``.
assert ray.get(obj_ref) == 1


@ray.remote
def slow_function():
    time.sleep(10)
    return 1


# Ray tasks are executed in parallel.
# All computation is performed in the background, driven by Ray's internal event loop.
for _ in range(4):
    # This doesn't block.
    slow_function.remote()

See the ray.remote API for more details.

public class MyRayApp {
  // A regular Java static method.
  public static int myFunction() {
    return 1;
  }
}

// Invoke the above method as a Ray task.
// This will immediately return an object ref (a future) and then create
// a task that will be executed on a worker process.
ObjectRef<Integer> res = Ray.task(MyRayApp::myFunction).remote();

// The result can be retrieved with ``ObjectRef::get``.
Assert.assertTrue(res.get() == 1);

public class MyRayApp {
  public static int slowFunction() throws InterruptedException {
    TimeUnit.SECONDS.sleep(10);
    return 1;
  }
}

// Ray tasks are executed in parallel.
// All computation is performed in the background, driven by Ray's internal event loop.
for(int i = 0; i < 4; i++) {
  // This doesn't block.
  Ray.task(MyRayApp::slowFunction).remote();
}

// A regular C++ function.
int MyFunction() {
  return 1;
}
// Register as a remote function by `RAY_REMOTE`.
RAY_REMOTE(MyFunction);

// Invoke the above method as a Ray task.
// This will immediately return an object ref (a future) and then create
// a task that will be executed on a worker process.
auto res = ray::Task(MyFunction).Remote();

// The result can be retrieved with ``ray::ObjectRef::Get``.
assert(*res.Get() == 1);

int SlowFunction() {
  std::this_thread::sleep_for(std::chrono::seconds(10));
  return 1;
}
RAY_REMOTE(SlowFunction);

// Ray tasks are executed in parallel.
// All computation is performed in the background, driven by Ray's internal event loop.
for(int i = 0; i < 4; i++) {
  // This doesn't block.
  ray::Task(SlowFunction).Remote();
}

Use the State API command ray summary tasks to see running and finished tasks and their counts:

# This API is only available when you download Ray via `pip install "ray[default]"`
ray summary tasks
======== Tasks Summary: 2023-05-26 11:09:32.092546 ========
Stats:
------------------------------------
total_actor_scheduled: 0
total_actor_tasks: 0
total_tasks: 5


Table (group by func_name):
------------------------------------
    FUNC_OR_CLASS_NAME    STATE_COUNTS    TYPE
0   slow_function         RUNNING: 4      NORMAL_TASK
1   my_function           FINISHED: 1     NORMAL_TASK

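Specifying required resources

You can specify resource requirements in tasks (see Specifying Task or Actor Resource Requirements for more details).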

# Specify required resources.
@ray.remote(num_cpus=4, num_gpus=2)
def my_function():
    return 1


# Override the default resource requirements.
my_function.options(num_cpus=3).remote()

// Specify required resources (Java).
Ray.task(MyRayApp::myFunction).setResource("CPU", 4.0).setResource("GPU", 2.0).remote();

// Specify required resources (C++).
ray::Task(MyFunction).SetResource("CPU", 4.0).SetResource("GPU", 2.0).Remote();

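Passing object refs to Ray tasks

In addition to values, object refs can also be passed into remote functions. When the task executes, the argument inside the function body is the underlying value. For example, take this function: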

@ray.remote
def function_with_an_argument(value):
    return value + 1


obj_ref1 = my_function.remote()
assert ray.get(obj_ref1) == 1

# You can pass an object ref as an argument to another Ray task.
obj_ref2 = function_with_an_argument.remote(obj_ref1)
assert ray.get(obj_ref2) == 2

public class MyRayApp {
    public static int functionWithAnArgument(int value) {
        return value + 1;
    }
}

ObjectRef<Integer> objRef1 = Ray.task(MyRayApp::myFunction).remote();
Assert.assertTrue(objRef1.get() == 1);

// You can pass an object ref as an argument to another Ray task.
ObjectRef<Integer> objRef2 = Ray.task(MyRayApp::functionWithAnArgument, objRef1).remote();
Assert.assertTrue(objRef2.get() == 2);

static int FunctionWithAnArgument(int value) {
    return value + 1;
}
RAY_REMOTE(FunctionWithAnArgument);

auto obj_ref1 = ray::Task(MyFunction).Remote();
assert(*obj_ref1.Get() == 1);

// You can pass an object ref as an argument to another Ray task.
auto obj_ref2 = ray::Task(FunctionWithAnArgument).Remote(obj_ref1);
assert(*obj_ref2.Get() == 2);

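Note the following behaviors:

  • Because the second task depends on the output of the first task, Ray does not execute the second task until the first task has finished.

  • If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to obj_ref1/objRef1) is sent over the network to the machine where the second task is scheduled.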

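Waiting for partial results

Calling ray.get on Ray task results blocks until the task finishes execution. After launching a number of tasks, you may want to know which ones have finished executing without blocking on all of them. You can do this with ray.wait(). The function works as follows: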

object_refs = [slow_function.remote() for _ in range(2)]
# Return as soon as one of the tasks finished execution.
ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)

// Java
WaitResult<Integer> waitResult = Ray.wait(objectRefs, /*num_returns=*/0, /*timeoutMs=*/1000);
System.out.println(waitResult.getReady());  // List of ready objects.
System.out.println(waitResult.getUnready());  // List of unready objects.

// C++
ray::WaitResult<int> wait_result = ray::Wait(object_refs, /*num_objects=*/0, /*timeout_ms=*/1000);

Generators

Ray is compatible with Python generator syntax. See Ray Generators for more details.

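Multiple returns

By default, a Ray task returns only a single object ref. However, you can configure Ray tasks to return multiple object refs by setting the num_returns option.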

# By default, a Ray task only returns a single Object Ref.
@ray.remote
def return_single():
    return 0, 1, 2


object_ref = return_single.remote()
assert ray.get(object_ref) == (0, 1, 2)


# However, you can configure Ray tasks to return multiple Object Refs.
@ray.remote(num_returns=3)
def return_multiple():
    return 0, 1, 2


object_ref0, object_ref1, object_ref2 = return_multiple.remote()
assert ray.get(object_ref0) == 0
assert ray.get(object_ref1) == 1
assert ray.get(object_ref2) == 2

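For tasks that return multiple objects, Ray also supports remote generators, which allow a task to return one object at a time to reduce memory usage at the worker. Ray also supports an option to set the number of return values dynamically, which is useful when the task caller does not know how many return values to expect. See the user guide for more details on use cases.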

@ray.remote(num_returns=3)
def return_multiple_as_generator():
    for i in range(3):
        yield i


# NOTE: Similar to normal functions, these objects will not be available
# until the full task is complete and all returns have been generated.
a, b, c = return_multiple_as_generator.remote()

Cancelling tasks

Ray tasks can be cancelled by calling ray.cancel() on the returned object ref.

@ray.remote
def blocking_operation():
    time.sleep(10e6)


obj_ref = blocking_operation.remote()
ray.cancel(obj_ref)

try:
    ray.get(obj_ref)
except ray.exceptions.TaskCancelledError:
    print("Object reference was cancelled.")

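Scheduling

For each task, Ray chooses a node to run it on. The scheduling decision is based on several factors, such as the task's resource requirements, the specified scheduling strategy, and the locations of the task's arguments. See Ray scheduling for more details.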

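Fault tolerance

By default, Ray retries failed tasks due to system failures and specified application-level failures. You can change this behavior by setting the max_retries and retry_exceptions options in ray.remote() and .options(). See Ray fault tolerance for more details.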

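Task events

By default, Ray traces the execution of tasks, reporting task status events and profiling events that the Ray Dashboard and the State API use.

You can disable task events by setting the enable_task_events option to False in ray.remote() and .options(). Doing so reduces the overhead of task execution and the amount of data the task sends to the Ray Dashboard. Nested tasks don't inherit the task events setting from the parent task, so you need to set it for each task separately.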

More about Ray tasks