Python SDK 概览#

Ray Jobs Python SDK 是以编程方式提交作业的推荐方式。跳转到API 参考,或继续阅读快速概览。

设置#

Ray Jobs 在 1.9+ 版本中可用,并需要完整安装 Ray。你可以通过运行以下命令来完成安装:

pip install "ray[default]"

有关安装 Ray 的更多详细信息,请参阅安装指南

要运行 Ray Job,我们还需要能够向 Ray 集群发送 HTTP 请求。为方便起见,本指南假设你正在使用本地 Ray 集群,我们可以通过运行以下命令启动它:

ray start --head
# ...
# 2022-08-10 09:54:57,664   INFO services.py:1476 -- View the Ray dashboard at http://127.0.0.1:8265
# ...

这将会在我们的本地机器上创建一个 Ray head 节点,我们可以将其用于开发目的。请注意启动或连接 Ray 集群时打印出的 Ray Dashboard URL;我们稍后会使用此 URL 提交 Ray Job。如果使用远程集群,请参阅使用远程集群获取有关端口转发的提示。有关生产部署场景的更多详细信息,请查阅在虚拟机Kubernetes上部署 Ray 的指南。

提交 Ray Job#

让我们从一个可以在本地运行的示例脚本开始。以下脚本使用 Ray API 提交一个任务并打印其返回值:

# script.py
import ray

@ray.remote
def hello_world():
    return "hello world"

ray.init()
print(ray.get(hello_world.remote()))

SDK 调用通过一个 JobSubmissionClient 对象进行。要初始化客户端,请提供 Ray 集群 head 节点的地址以及 Ray Dashboard 使用的端口(默认为 8265)。对于此示例,我们将使用本地 Ray 集群,但相同的示例也适用于远程 Ray 集群地址;有关设置端口转发的详细信息,请参阅使用远程集群

from ray.job_submission import JobSubmissionClient

# If using a remote cluster, replace 127.0.0.1 with the head node's IP address or set up port forwarding.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python script.py",
    # Path to the local directory that contains the script.py file
    runtime_env={"working_dir": "./"}
)
print(job_id)

提示

默认情况下,Ray 作业服务器将生成并返回一个新的 job_id,但你也可以选择一个唯一的 job_id 字符串并将其传递给 submit_job。在这种情况下,作业将使用你给定的 ID 执行,并且如果针对同一个 Ray 集群多次提交相同的 job_id,将抛出错误。

由于作业提交是异步的,上面的调用将立即返回,并输出如下内容:

raysubmit_g8tDzJ6GqrCy7pd6

现在我们可以编写一个简单的轮询循环,检查作业状态直到其达到终端状态(即 JobStatus.SUCCEEDEDJobStatus.STOPPEDJobStatus.FAILED)。我们还可以通过调用 client.get_job_logs 获取作业的输出。

from ray.job_submission import JobSubmissionClient, JobStatus
import time

# If using a remote cluster, replace 127.0.0.1 with the head node's IP address.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python script.py",
    # Path to the local directory that contains the script.py file
    runtime_env={"working_dir": "./"}
)
print(job_id)

def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):
    start = time.time()
    while time.time() - start <= timeout_seconds:
        status = client.get_job_status(job_id)
        print(f"status: {status}")
        if status in status_to_wait_for:
            break
        time.sleep(1)


wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
logs = client.get_job_logs(job_id)
print(logs)

输出应如下所示:

raysubmit_pBwfn5jqRE1E7Wmc
status: PENDING
status: PENDING
status: RUNNING
status: RUNNING
status: RUNNING
2022-08-22 15:05:55,652 INFO worker.py:1203 -- Using address 127.0.0.1:6379 set in the environment variable RAY_ADDRESS
2022-08-22 15:05:55,652 INFO worker.py:1312 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2022-08-22 15:05:55,660 INFO worker.py:1487 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265.
hello world

与长时间运行的作业交互#

除了获取作业的当前状态和输出外,提交的作业还可以在执行完成之前由用户停止。

job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python -c 'import time; print(\"Sleeping...\"); time.sleep(60)'"
)
wait_until_status(job_id, {JobStatus.RUNNING})
print(f'Stopping job {job_id}')
client.stop_job(job_id)
wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
logs = client.get_job_logs(job_id)
print(logs)

输出应如下所示:

status: PENDING
status: PENDING
status: RUNNING
Stopping job raysubmit_VYCZZ2BQb4tfeCjq
status: STOPPED
Sleeping...

要获取所有作业的信息,请调用 client.list_jobs()。这将返回一个 Dict[str, JobInfo] 对象,将作业 ID 映射到其信息。

作业信息(状态及相关元数据)无限期地存储在集群上。要删除此信息,你可以对任何已处于终端状态的作业调用 client.delete_job(job_id)。有关更多详细信息,请参阅SDK API 参考

依赖管理#

类似于Jobs CLI,我们也可以使用 Ray 运行时环境打包应用程序的依赖项。使用 Python SDK,语法如下所示:

job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python script.py",
    # Runtime environment for the job, specifying a working directory and pip package
    runtime_env={
        "working_dir": "./",
        "pip": ["requests==2.26.0"]
    }
)

提示

除了本地目录(本例中的 "./"),你还可以为作业的工作目录指定远程 URI,例如 S3 存储桶或 Git 仓库。详细信息请参阅远程 URI

更多详细信息,请参阅API 参考

指定 CPU 和 GPU 资源#

默认情况下,作业入口点脚本始终在 head 节点上运行。我们建议在 Ray 任务、actor 或 Ray 库中进行繁重计算,而不是直接在入口点脚本的顶层进行。无需额外配置即可完成此操作。

然而,如果你需要在入口点脚本中直接进行计算,并且希望为入口点脚本保留 CPU 和 GPU 资源,你可以向 submit_job 指定 entrypoint_num_cpusentrypoint_num_gpusentrypoint_memoryentrypoint_resources 参数。这些参数的功能与 指定任务或 Actor 资源需求 中描述的 @ray.remote() 装饰器的 num_cpusnum_gpusresources_memory 参数相同。

如果指定了这些参数中的任何一个,入口点脚本将调度到具有至少指定资源的节点上,而不是默认的 head 节点。例如,以下代码会将入口点脚本调度到至少具有 1 个 GPU 的节点上:

job_id = client.submit_job(
    entrypoint="python script.py",
    runtime_env={
        "working_dir": "./",
    }
    # Reserve 1 GPU for the entrypoint script
    entrypoint_num_gpus=1
)

这些相同的参数也可以在 Jobs CLI 中作为 ray job submit 命令的选项 --entrypoint-num-cpus--entrypoint-num-gpus--entrypoint-memory--entrypoint-resources 使用;参见Ray Job Submission CLI 参考

如果未指定 num_gpus,GPU 仍可用于入口点脚本,但 Ray 不会提供可见设备的隔离。确切地说,环境变量 CUDA_VISIBLE_DEVICES 不会在入口点脚本中设置;它只会设置在那些在 @ray.remote() 装饰器中指定了 num_gpus 的任务和 actor 内部。

注意

entrypoint_num_cpusentrypoint_num_gpusentrypoint-memoryentrypoint_resources 指定的资源与作业内部为任务和 actor 指定的任何资源是分开的。

例如,如果你指定了 entrypoint_num_gpus=1,那么入口点脚本将调度到至少有 1 个 GPU 的节点上,但如果你的脚本中还包含一个使用 @ray.remote(num_gpus=1) 定义的 Ray 任务,那么该任务将被调度使用不同的 GPU(如果节点至少有 2 个 GPU,则在同一节点上;否则在不同的节点上)。

注意

指定任务或 Actor 资源需求中描述的 @ray.remote()num_cpusnum_gpusresources_memory 参数一样,这些参数仅指用于调度目的的逻辑资源。实际的 CPU 和 GPU 利用率不受 Ray 控制或限制。

注意

默认情况下,为入口点脚本保留 0 个 CPU 和 0 个 GPU。

客户端配置#

附加的客户端连接选项,例如自定义 HTTP 头和 cookies,可以传递给 JobSubmissionClient 类。完整的选项列表可以在API 参考中找到。

TLS 验证#

默认情况下,任何 HTTPS 客户端连接将使用底层 requestsaiohttp 库找到的系统证书进行验证。verify 参数可以设置来覆盖此行为。例如:

client = JobSubmissionClient("https://<job-server-url>", verify="/path/to/cert.pem")

将使用在 /path/to/cert.pem 找到的证书来验证作业服务器的证书。通过将 verify 参数设置为 False 可以禁用证书验证。