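Tasks
Ray enables arbitrary functions to be executed asynchronously on separate Python workers. Such functions are called Ray remote functions, and their asynchronous invocations are called Ray tasks. Here is an example.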
import ray
import time
# A regular Python function.
def normal_function():
    return 1
# By adding the `@ray.remote` decorator, a regular Python function
# becomes a Ray remote function.
@ray.remote
def my_function():
    return 1
# To invoke this remote function, use the `remote` method.
# This will immediately return an object ref (a future) and then create
# a task that will be executed on a worker process.
obj_ref = my_function.remote()
# The result can be retrieved with ``ray.get``.
assert ray.get(obj_ref) == 1
@ray.remote
def slow_function():
    time.sleep(10)
    return 1
# Ray tasks are executed in parallel.
# All computation is performed in the background, driven by Ray's internal event loop.
for _ in range(4):
    # This doesn't block.
    slow_function.remote()
See the ray.remote API for more details.
public class MyRayApp {
  // A regular Java static method.
  public static int myFunction() {
    return 1;
  }
}
// Invoke the above method as a Ray task.
// This will immediately return an object ref (a future) and then create
// a task that will be executed on a worker process.
ObjectRef<Integer> res = Ray.task(MyRayApp::myFunction).remote();
// The result can be retrieved with ``ObjectRef::get``.
Assert.assertTrue(res.get() == 1);
public class MyRayApp {
  public static int slowFunction() throws InterruptedException {
    TimeUnit.SECONDS.sleep(10);
    return 1;
  }
}
// Ray tasks are executed in parallel.
// All computation is performed in the background, driven by Ray's internal event loop.
for(int i = 0; i < 4; i++) {
  // This doesn't block.
  Ray.task(MyRayApp::slowFunction).remote();
}
// A regular C++ function.
int MyFunction() {
  return 1;
}
// Register as a remote function by `RAY_REMOTE`.
RAY_REMOTE(MyFunction);
// Invoke the above method as a Ray task.
// This will immediately return an object ref (a future) and then create
// a task that will be executed on a worker process.
auto res = ray::Task(MyFunction).Remote();
// The result can be retrieved with ``ray::ObjectRef::Get``.
assert(*res.Get() == 1);
int SlowFunction() {
  std::this_thread::sleep_for(std::chrono::seconds(10));
  return 1;
}
RAY_REMOTE(SlowFunction);
// Ray tasks are executed in parallel.
// All computation is performed in the background, driven by Ray's internal event loop.
for(int i = 0; i < 4; i++) {
  // This doesn't block.
  ray::Task(SlowFunction).Remote();
}
Use the State API command ray summary tasks to view running and finished tasks and their counts:
# This API is only available when you download Ray via `pip install "ray[default]"`
ray summary tasks
======== Tasks Summary: 2023-05-26 11:09:32.092546 ========
Stats:
------------------------------------
total_actor_scheduled: 0
total_actor_tasks: 0
total_tasks: 5
Table (group by func_name):
------------------------------------
FUNC_OR_CLASS_NAME STATE_COUNTS TYPE
0 slow_function RUNNING: 4 NORMAL_TASK
1 my_function FINISHED: 1 NORMAL_TASK
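Specifying required resources
You can specify resource requirements in tasks (see Specifying Task or Actor Resource Requirements for more details).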
# Specify required resources.
@ray.remote(num_cpus=4, num_gpus=2)
def my_function():
    return 1
# Override the default resource requirements.
my_function.options(num_cpus=3).remote()
// Specify required resources.
Ray.task(MyRayApp::myFunction).setResource("CPU", 4.0).setResource("GPU", 2.0).remote();
// Specify required resources.
ray::Task(MyFunction).SetResource("CPU", 4.0).SetResource("GPU", 2.0).Remote();
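Passing object refs to Ray tasks
In addition to values, object refs can also be passed into remote functions. When the task is executed, the argument inside the function body will be the underlying value. For example, take this function: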
@ray.remote
def function_with_an_argument(value):
    return value + 1
obj_ref1 = my_function.remote()
assert ray.get(obj_ref1) == 1
# You can pass an object ref as an argument to another Ray task.
obj_ref2 = function_with_an_argument.remote(obj_ref1)
assert ray.get(obj_ref2) == 2
public class MyRayApp {
  public static int functionWithAnArgument(int value) {
    return value + 1;
  }
}
ObjectRef<Integer> objRef1 = Ray.task(MyRayApp::myFunction).remote();
Assert.assertTrue(objRef1.get() == 1);
// You can pass an object ref as an argument to another Ray task.
ObjectRef<Integer> objRef2 = Ray.task(MyRayApp::functionWithAnArgument, objRef1).remote();
Assert.assertTrue(objRef2.get() == 2);
static int FunctionWithAnArgument(int value) {
  return value + 1;
}
RAY_REMOTE(FunctionWithAnArgument);
auto obj_ref1 = ray::Task(MyFunction).Remote();
assert(*obj_ref1.Get() == 1);
// You can pass an object ref as an argument to another Ray task.
auto obj_ref2 = ray::Task(FunctionWithAnArgument).Remote(obj_ref1);
assert(*obj_ref2.Get() == 2);
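Note the following behaviors:
- The second task will not be executed until the first task has finished executing, because the second task depends on the output of the first task.
- If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to obj_ref1/objRef1) will be sent over the network to the machine where the second task is scheduled.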
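Waiting for partial results
Calling ray.get on Ray task results blocks until the task has finished execution. After launching a number of tasks, you may want to know which ones have finished executing without blocking on all of them. This can be achieved with ray.wait(). The function works as follows: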
object_refs = [slow_function.remote() for _ in range(2)]
# Return as soon as one of the tasks finished execution.
ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)
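List<ObjectRef<Integer>> objectRefs = new ArrayList<>();
for (int i = 0; i < 2; i++) objectRefs.add(Ray.task(MyRayApp::slowFunction).remote());
// Return as soon as one of the tasks finished execution, or after 1000 ms.
WaitResult<Integer> waitResult = Ray.wait(objectRefs, /*num_returns=*/1, /*timeoutMs=*/1000);
System.out.println(waitResult.getReady());  // List of ready objects.
System.out.println(waitResult.getUnready());  // List of unready objects.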
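std::vector<ray::ObjectRef<int>> object_refs{ray::Task(SlowFunction).Remote(), ray::Task(SlowFunction).Remote()};
ray::WaitResult<int> wait_result = ray::Wait(object_refs, /*num_objects=*/1, /*timeout_ms=*/1000);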
Generators
Ray is compatible with Python generator syntax. See Ray Generators for more details.
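Multiple returns
By default, a Ray task returns only a single object ref. However, you can configure Ray tasks to return multiple object refs by setting the num_returns option.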
# By default, a Ray task only returns a single Object Ref.
@ray.remote
def return_single():
    return 0, 1, 2
object_ref = return_single.remote()
assert ray.get(object_ref) == (0, 1, 2)
# However, you can configure Ray tasks to return multiple Object Refs.
@ray.remote(num_returns=3)
def return_multiple():
    return 0, 1, 2
object_ref0, object_ref1, object_ref2 = return_multiple.remote()
assert ray.get(object_ref0) == 0
assert ray.get(object_ref1) == 1
assert ray.get(object_ref2) == 2
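For tasks that return multiple objects, Ray also supports remote generators, which allow a task to return one object at a time to reduce memory usage on the worker. Ray also supports an option to set the number of return values dynamically, which can be useful when the task caller does not know how many return values to expect. See the user guide for more details on use cases.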
@ray.remote(num_returns=3)
def return_multiple_as_generator():
    for i in range(3):
        yield i
# NOTE: Similar to normal functions, these objects will not be available
# until the full task is complete and all returns have been generated.
a, b, c = return_multiple_as_generator.remote()
Cancelling tasks
Ray tasks can be cancelled by calling ray.cancel() on the returned object ref.
@ray.remote
def blocking_operation():
    time.sleep(10e6)
obj_ref = blocking_operation.remote()
ray.cancel(obj_ref)
try:
    ray.get(obj_ref)
except ray.exceptions.TaskCancelledError:
    print("Object reference was cancelled.")
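Scheduling
For each task, Ray chooses a node to run it on. The scheduling decision is based on a few factors, such as the task's resource requirements, the specified scheduling strategy, and the locations of the task arguments. See Ray scheduling for more details.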
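Fault tolerance
By default, Ray retries failed tasks due to system failures and specified application-level failures. You can change this behavior by setting the max_retries and retry_exceptions options in ray.remote() and .options(). See Ray fault tolerance for more details.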
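Task events
By default, Ray traces the execution of tasks, reporting task status events and profiling events that the Ray Dashboard and the State API use.
You can disable task events by setting the enable_task_events option in ray.remote() and .options(). This reduces the overhead of task execution and the amount of data the task sends to the Ray Dashboard. Nested tasks don't inherit the task events setting from the parent task, so you need to set it for each task separately.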