加速器支持#

GPU 等加速器对于许多机器学习应用至关重要。Ray Core 本身支持多种加速器作为预定义的资源类型,并允许任务和 Actor 指定其加速器资源需求

Ray Core 本地支持的加速器包括:

加速器

Ray 资源名称

支持级别

英伟达 GPU

GPU

经完全测试,由 Ray 团队支持

AMD GPU

GPU

实验性,由社区支持

英特尔 GPU

GPU

实验性,由社区支持

AWS Neuron Core

neuron_cores

实验性,由社区支持

谷歌 TPU

TPU

实验性,由社区支持

英特尔 Gaudi

HPU

实验性,由社区支持

华为昇腾

NPU

实验性,由社区支持

使用加速器启动 Ray 节点#

默认情况下,Ray 会将节点的加速器资源数量设置为 Ray 自动检测到的物理加速器数量。如果需要,你可以覆盖此设置。

提示

在启动 Ray 节点之前,可以设置 CUDA_VISIBLE_DEVICES 环境变量来限制 Ray 可见的英伟达 GPU。例如,CUDA_VISIBLE_DEVICES=1,3 ray start --head --num-gpus=2 让 Ray 只看到设备 1 和 3。

提示

在启动 Ray 节点之前,可以设置 ROCR_VISIBLE_DEVICES 环境变量来限制 Ray 可见的 AMD GPU。例如,ROCR_VISIBLE_DEVICES=1,3 ray start --head --num-gpus=2 让 Ray 只看到设备 1 和 3。

提示

在启动 Ray 节点之前,可以设置 ONEAPI_DEVICE_SELECTOR 环境变量来限制 Ray 可见的英特尔 GPU。例如,ONEAPI_DEVICE_SELECTOR=1,3 ray start --head --num-gpus=2 让 Ray 只看到设备 1 和 3。

提示

在启动 Ray 节点之前,可以设置 NEURON_RT_VISIBLE_CORES 环境变量来限制 Ray 可见的 AWS Neuron Core。例如,NEURON_RT_VISIBLE_CORES=1,3 ray start --head --resources='{"neuron_cores": 2}' 让 Ray 只看到设备 1 和 3。

更多关于在 EKS 作为编排平台的 Neuron 上使用 Ray 的示例,请参阅 Amazon 文档<https://awslabs.github.io/data-on-eks/docs/category/inference-on-eks>

提示

在启动 Ray 节点之前,可以设置 TPU_VISIBLE_CHIPS 环境变量来限制 Ray 可见的谷歌 TPU。例如,TPU_VISIBLE_CHIPS=1,3 ray start --head --resources='{"TPU": 2}' 让 Ray 只看到设备 1 和 3。

提示

在启动 Ray 节点之前,可以设置 HABANA_VISIBLE_MODULES 环境变量来限制 Ray 可见的英特尔 Gaudi HPU。例如,HABANA_VISIBLE_MODULES=1,3 ray start --head --resources='{"HPU": 2}' 让 Ray 只看到设备 1 和 3。

提示

在启动 Ray 节点之前,可以设置 ASCEND_RT_VISIBLE_DEVICES 环境变量来限制 Ray 可见的华为昇腾 NPU。例如,ASCEND_RT_VISIBLE_DEVICES=1,3 ray start --head --resources='{"NPU": 2}' 让 Ray 只看到设备 1 和 3。

注意

鉴于 Ray 资源是逻辑的,没有什么能阻止你指定比机器上实际加速器数量更大的加速器资源数量(例如,num_gpus)。在这种情况下,Ray 会像机器具有你指定的加速器数量一样来调度需要加速器的任务和 Actor。只有当这些任务和 Actor 尝试实际使用不存在的加速器时,才会出现问题。

在任务和 Actor 中使用加速器#

如果任务或 Actor 需要加速器,可以指定相应的资源需求(例如 @ray.remote(num_gpus=1))。然后 Ray 会将该任务或 Actor 调度到具有足够空闲加速器资源的节点,并在运行任务或 Actor 代码之前通过设置相应的环境变量(例如 CUDA_VISIBLE_DEVICES)来为任务或 Actor 分配加速器。

import os
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class GPUActor:
    def ping(self):
        print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
        print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

@ray.remote(num_gpus=1)
def gpu_task():
    print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
    print("CUDA_VISIBLE_DEVICES: {}".format(os.environ["CUDA_VISIBLE_DEVICES"]))

gpu_actor = GPUActor.remote()
ray.get(gpu_actor.ping.remote())
# The actor uses the first GPU so the task uses the second one.
ray.get(gpu_task.remote())
(GPUActor pid=52420) GPU IDs: [0]
(GPUActor pid=52420) CUDA_VISIBLE_DEVICES: 0
(gpu_task pid=51830) GPU IDs: [1]
(gpu_task pid=51830) CUDA_VISIBLE_DEVICES: 1
import os
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class GPUActor:
    def ping(self):
        print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
        print("ROCR_VISIBLE_DEVICES: {}".format(os.environ["ROCR_VISIBLE_DEVICES"]))

@ray.remote(num_gpus=1)
def gpu_task():
    print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
    print("ROCR_VISIBLE_DEVICES: {}".format(os.environ["ROCR_VISIBLE_DEVICES"]))

gpu_actor = GPUActor.remote()
ray.get(gpu_actor.ping.remote())
# The actor uses the first GPU so the task uses the second one.
ray.get(gpu_task.remote())
(GPUActor pid=52420) GPU IDs: [0]
(GPUActor pid=52420) ROCR_VISIBLE_DEVICES: 0
(gpu_task pid=51830) GPU IDs: [1]
(gpu_task pid=51830) ROCR_VISIBLE_DEVICES: 1
import os
import ray

ray.init(num_gpus=2)

@ray.remote(num_gpus=1)
class GPUActor:
    def ping(self):
        print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
        print("ONEAPI_DEVICE_SELECTOR: {}".format(os.environ["ONEAPI_DEVICE_SELECTOR"]))

@ray.remote(num_gpus=1)
def gpu_task():
    print("GPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))
    print("ONEAPI_DEVICE_SELECTOR: {}".format(os.environ["ONEAPI_DEVICE_SELECTOR"]))

gpu_actor = GPUActor.remote()
ray.get(gpu_actor.ping.remote())
# The actor uses the first GPU so the task uses the second one.
ray.get(gpu_task.remote())
(GPUActor pid=52420) GPU IDs: [0]
(GPUActor pid=52420) ONEAPI_DEVICE_SELECTOR: 0
(gpu_task pid=51830) GPU IDs: [1]
(gpu_task pid=51830) ONEAPI_DEVICE_SELECTOR: 1
import os
import ray

ray.init(resources={"neuron_cores": 2})

@ray.remote(resources={"neuron_cores": 1})
class NeuronCoreActor:
    def ping(self):
        print("Neuron Core IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["neuron_cores"]))
        print("NEURON_RT_VISIBLE_CORES: {}".format(os.environ["NEURON_RT_VISIBLE_CORES"]))

@ray.remote(resources={"neuron_cores": 1})
def neuron_core_task():
    print("Neuron Core IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["neuron_cores"]))
    print("NEURON_RT_VISIBLE_CORES: {}".format(os.environ["NEURON_RT_VISIBLE_CORES"]))

neuron_core_actor = NeuronCoreActor.remote()
ray.get(neuron_core_actor.ping.remote())
# The actor uses the first Neuron Core so the task uses the second one.
ray.get(neuron_core_task.remote())
(NeuronCoreActor pid=52420) Neuron Core IDs: [0]
(NeuronCoreActor pid=52420) NEURON_RT_VISIBLE_CORES: 0
(neuron_core_task pid=51830) Neuron Core IDs: [1]
(neuron_core_task pid=51830) NEURON_RT_VISIBLE_CORES: 1
import os
import ray

ray.init(resources={"TPU": 2})

@ray.remote(resources={"TPU": 1})
class TPUActor:
    def ping(self):
        print("TPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["TPU"]))
        print("TPU_VISIBLE_CHIPS: {}".format(os.environ["TPU_VISIBLE_CHIPS"]))

@ray.remote(resources={"TPU": 1})
def tpu_task():
    print("TPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["TPU"]))
    print("TPU_VISIBLE_CHIPS: {}".format(os.environ["TPU_VISIBLE_CHIPS"]))

tpu_actor = TPUActor.remote()
ray.get(tpu_actor.ping.remote())
# The actor uses the first TPU so the task uses the second one.
ray.get(tpu_task.remote())
(TPUActor pid=52420) TPU IDs: [0]
(TPUActor pid=52420) TPU_VISIBLE_CHIPS: 0
(tpu_task pid=51830) TPU IDs: [1]
(tpu_task pid=51830) TPU_VISIBLE_CHIPS: 1
import os
import ray

ray.init(resources={"HPU": 2})

@ray.remote(resources={"HPU": 1})
class HPUActor:
    def ping(self):
        print("HPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["HPU"]))
        print("HABANA_VISIBLE_MODULES: {}".format(os.environ["HABANA_VISIBLE_MODULES"]))

@ray.remote(resources={"HPU": 1})
def hpu_task():
    print("HPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["HPU"]))
    print("HABANA_VISIBLE_MODULES: {}".format(os.environ["HABANA_VISIBLE_MODULES"]))

hpu_actor = HPUActor.remote()
ray.get(hpu_actor.ping.remote())
# The actor uses the first HPU so the task uses the second one.
ray.get(hpu_task.remote())
(HPUActor pid=52420) HPU IDs: [0]
(HPUActor pid=52420) HABANA_VISIBLE_MODULES: 0
(hpu_task pid=51830) HPU IDs: [1]
(hpu_task pid=51830) HABANA_VISIBLE_MODULES: 1
import os
import ray

ray.init(resources={"NPU": 2})

@ray.remote(resources={"NPU": 1})
class NPUActor:
    def ping(self):
        print("NPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["NPU"]))
        print("ASCEND_RT_VISIBLE_DEVICES: {}".format(os.environ["ASCEND_RT_VISIBLE_DEVICES"]))

@ray.remote(resources={"NPU": 1})
def npu_task():
    print("NPU IDs: {}".format(ray.get_runtime_context().get_accelerator_ids()["NPU"]))
    print("ASCEND_RT_VISIBLE_DEVICES: {}".format(os.environ["ASCEND_RT_VISIBLE_DEVICES"]))

npu_actor = NPUActor.remote()
ray.get(npu_actor.ping.remote())
# The actor uses the first NPU so the task uses the second one.
ray.get(npu_task.remote())
(NPUActor pid=52420) NPU IDs: [0]
(NPUActor pid=52420) ASCEND_RT_VISIBLE_DEVICES: 0
(npu_task pid=51830) NPU IDs: [1]
(npu_task pid=51830) ASCEND_RT_VISIBLE_DEVICES: 1

在任务或 Actor 内部,ray.get_runtime_context().get_accelerator_ids() 返回可用于该任务或 Actor 的加速器 ID 列表。通常不需要调用 get_accelerator_ids(),因为 Ray 会自动设置相应的环境变量(例如 CUDA_VISIBLE_DEVICES),大多数 ML 框架都会遵循此环境变量来分配加速器。

注意: 上述定义的远程函数或 Actor 实际上并未使用任何加速器。Ray 将其调度到至少有一个加速器的节点上,并在执行期间为其保留一个加速器,但实际使用加速器取决于函数本身。这通常通过 TensorFlow 等外部库来完成。以下是一个实际使用加速器的示例。为了使此示例正常工作,你需要安装 TensorFlow 的 GPU 版本。

@ray.remote(num_gpus=1)
def gpu_task():
    import tensorflow as tf

    # Create a TensorFlow session. TensorFlow restricts itself to use the
    # GPUs specified by the CUDA_VISIBLE_DEVICES environment variable.
    tf.Session()

注意: 用户完全有可能忽略分配的加速器并使用机器上的所有加速器。Ray 并未阻止这种情况发生,这可能导致过多任务或 Actor 同时使用同一加速器。然而,Ray 确实会自动设置环境变量(例如 CUDA_VISIBLE_DEVICES),在用户未覆盖的情况下,大多数深度学习框架会遵循此环境变量来限制使用的加速器。

分数加速器#

Ray 支持分数资源需求,这样多个任务和 Actor 可以共享同一加速器。

ray.init(num_cpus=4, num_gpus=1)

@ray.remote(num_gpus=0.25)
def f():
    import time

    time.sleep(1)

# The four tasks created here can execute concurrently
# and share the same GPU.
ray.get([f.remote() for _ in range(4)])
ray.init(num_cpus=4, num_gpus=1)

@ray.remote(num_gpus=0.25)
def f():
    import time

    time.sleep(1)

# The four tasks created here can execute concurrently
# and share the same GPU.
ray.get([f.remote() for _ in range(4)])
ray.init(num_cpus=4, num_gpus=1)

@ray.remote(num_gpus=0.25)
def f():
    import time

    time.sleep(1)

# The four tasks created here can execute concurrently
# and share the same GPU.
ray.get([f.remote() for _ in range(4)])

AWS Neuron Core 不支持分数资源。

谷歌 TPU 不支持分数资源。

英特尔 Gaudi 不支持分数资源。

ray.init(num_cpus=4, resources={"NPU": 1})

@ray.remote(resources={"NPU": 0.25})
def f():
    import time

    time.sleep(1)

# The four tasks created here can execute concurrently
# and share the same NPU.
ray.get([f.remote() for _ in range(4)])

注意: 用户有责任确保单个任务不会使用超过其分配的加速器内存。可以配置 Pytorch 和 TensorFlow 来限制其内存使用量。

当 Ray 将节点的加速器分配给具有分数资源需求的任务或 Actor 时,它会在分配下一个加速器之前打包一个加速器,以避免碎片化。

ray.init(num_gpus=3)

@ray.remote(num_gpus=0.5)
class FractionalGPUActor:
    def ping(self):
        print("GPU id: {}".format(ray.get_runtime_context().get_accelerator_ids()["GPU"]))

fractional_gpu_actors = [FractionalGPUActor.remote() for _ in range(3)]
# Ray tries to pack GPUs if possible.
[ray.get(fractional_gpu_actors[i].ping.remote()) for i in range(3)]
(FractionalGPUActor pid=57417) GPU id: [0]
(FractionalGPUActor pid=57416) GPU id: [0]
(FractionalGPUActor pid=57418) GPU id: [1]

Worker 不释放 GPU 资源#

目前,当 Worker 执行使用 GPU 的任务(例如,通过 TensorFlow)时,任务可能会在 GPU 上分配内存,并且在任务执行完成后可能不会释放。这会导致下次任务尝试使用同一 GPU 时出现问题。为解决此问题,Ray 默认禁用 GPU 任务之间的 Worker 进程复用,这意味着 GPU 资源会在任务进程退出后释放。由于这会增加 GPU 任务调度的开销,你可以通过在 ray.remote 装饰器中设置 max_calls=0 来重新启用 Worker 复用。

# By default, ray does not reuse workers for GPU tasks to prevent
# GPU resource leakage.
@ray.remote(num_gpus=1)
def leak_gpus():
    import tensorflow as tf

    # This task allocates memory on the GPU and then never release it.
    tf.Session()

加速器类型#

Ray 支持特定资源的加速器类型。accelerator_type 选项可用于强制任务或 Actor 在具有特定类型加速器的节点上运行。其内部实现是将其作为 "accelerator_type:<type>": 0.001自定义资源需求。这会强制将任务或 Actor 放置在具有该特定加速器类型的节点上。这也能让多节点类型自动伸缩器知道对该类型资源的需求,从而可能触发启动提供该加速器的新节点。

from ray.util.accelerators import NVIDIA_TESLA_V100

@ray.remote(num_gpus=1, accelerator_type=NVIDIA_TESLA_V100)
def train(data):
    return "This function was run on a node with a Tesla V100 GPU"

ray.get(train.remote(1))

有关可用的加速器类型,请参阅ray.util.accelerators