Python SDK 概述#

Ray Jobs Python SDK 是以编程方式提交作业的推荐方式。跳转到 API 参考,或继续阅读以获取快速概述。

设置#

Ray Jobs 可在 1.9+ 版本中使用,需要完整安装 Ray。您可以通过运行以下命令来完成此操作:

pip install "ray[default]"

有关安装 Ray 的更多详细信息,请参阅 安装指南

要运行 Ray 作业,我们还需要能够向 Ray 集群发送 HTTP 请求。为了方便起见,本指南将假设您正在使用本地 Ray 集群,我们可以通过运行以下命令来启动它:

ray start --head
# ...
# 2022-08-10 09:54:57,664   INFO services.py:1476 -- View the Ray dashboard at http://127.0.0.1:8265
# ...

这将会在我们的本地机器上创建一个 Ray Head 节点,供我们用于开发目的。请注意启动或连接到 Ray 集群时打印的 Ray Dashboard URL;我们稍后将使用此 URL 来提交 Ray 作业。如果使用远程集群,请参阅 使用远程集群 以获取端口转发的技巧。有关生产部署场景的更多详细信息,请查看在 VMKubernetes 上部署 Ray 的指南。

提交 Ray 作业#

让我们从一个可以在本地运行的示例脚本开始。以下脚本使用 Ray API 来提交一个任务并打印其返回值:

# script.py
import ray

@ray.remote
def hello_world():
    return "hello world"

ray.init()
print(ray.get(hello_world.remote()))

SDK 调用通过 JobSubmissionClient 对象进行。要初始化客户端,请提供 Ray 集群 Head 节点地址以及 Ray Dashboard 使用的端口(默认情况下为 8265)。在此示例中,我们将使用本地 Ray 集群,但相同的示例也适用于远程 Ray 集群地址;有关设置端口转发的详细信息,请参阅 使用远程集群

from ray.job_submission import JobSubmissionClient

# If using a remote cluster, replace 127.0.0.1 with the head node's IP address or set up port forwarding.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python script.py",
    # Path to the local directory that contains the script.py file
    runtime_env={"working_dir": "./"}
)
print(job_id)

提示

默认情况下,Ray 作业服务器将生成一个新的 job_id 并返回它,但您也可以先选择一个唯一的 job_id 字符串,然后将其传递给 submit_job。在这种情况下,作业将以您给定的 ID 执行,如果同一个 Ray 集群提交了相同的 job_id 多次,则会抛出错误。

由于作业提交是异步的,因此上述调用将立即返回,输出类似如下:

raysubmit_g8tDzJ6GqrCy7pd6

现在我们可以编写一个简单的轮询循环,该循环会检查作业状态,直到它达到终止状态(即 JobStatus.SUCCEEDEDJobStatus.STOPPEDJobStatus.FAILED)。我们还可以通过调用 client.get_job_logs 来获取作业的输出。

from ray.job_submission import JobSubmissionClient, JobStatus
import time

# If using a remote cluster, replace 127.0.0.1 with the head node's IP address.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python script.py",
    # Path to the local directory that contains the script.py file
    runtime_env={"working_dir": "./"}
)
print(job_id)

def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):
    start = time.time()
    while time.time() - start <= timeout_seconds:
        status = client.get_job_status(job_id)
        print(f"status: {status}")
        if status in status_to_wait_for:
            break
        time.sleep(1)


wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
logs = client.get_job_logs(job_id)
print(logs)

输出应类似于:

raysubmit_pBwfn5jqRE1E7Wmc
status: PENDING
status: PENDING
status: RUNNING
status: RUNNING
status: RUNNING
2022-08-22 15:05:55,652 INFO worker.py:1203 -- Using address 127.0.0.1:6379 set in the environment variable RAY_ADDRESS
2022-08-22 15:05:55,652 INFO worker.py:1312 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2022-08-22 15:05:55,660 INFO worker.py:1487 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265.
hello world

与长时间运行的作业交互#

除了获取作业的当前状态和输出之外,用户还可以在作业完成执行之前停止已提交的作业。

job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python -c 'import time; print(\"Sleeping...\"); time.sleep(60)'"
)
wait_until_status(job_id, {JobStatus.RUNNING})
print(f'Stopping job {job_id}')
client.stop_job(job_id)
wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
logs = client.get_job_logs(job_id)
print(logs)

输出应类似于:

status: PENDING
status: PENDING
status: RUNNING
Stopping job raysubmit_VYCZZ2BQb4tfeCjq
status: STOPPED
Sleeping...

要获取有关所有作业的信息,请调用 client.list_jobs()。这将返回一个 Dict[str, JobInfo] 对象,该对象将作业 ID 映射到其信息。

作业信息(状态和相关元数据)会无限期地存储在集群上。要删除这些信息,您可以调用 client.delete_job(job_id) 来删除任何已处于终止状态的作业。有关更多详细信息,请参阅 SDK API 参考

依赖管理#

Jobs CLI 类似,我们还可以通过使用 Ray 运行时环境 来打包应用程序的依赖项。使用 Python SDK,语法如下:

job_id = client.submit_job(
    # Entrypoint shell command to execute
    entrypoint="python script.py",
    # Runtime environment for the job, specifying a working directory and pip package
    runtime_env={
        "working_dir": "./",
        "pip": ["requests==2.26.0"]
    }
)

提示

除了本地目录(此示例中为 "./")之外,您还可以为作业的工作目录指定远程 URI,例如 S3 存储桶或 Git 存储库。有关详细信息,请参阅 远程 URI

有关完整详细信息,请参阅 API 参考

指定 CPU 和 GPU 资源#

默认情况下,作业入口点脚本始终在 Head 节点上运行。我们建议在 Ray 任务、Actor 或 Ray 库中进行繁重的计算,而不是直接在入口点脚本的顶层进行。执行此操作无需额外配置。

但是,如果您需要在入口点脚本中直接进行计算,并希望为入口点脚本预留 CPU 和 GPU 资源,您可以向 submit_job 指定 entrypoint_num_cpusentrypoint_num_gpusentrypoint_memoryentrypoint_resources 参数。这些参数的功能与 指定任务或 Actor 的资源需求 中描述的 @ray.remote() 装饰器的 num_cpusnum_gpusresources_memory 参数的功能相同。

如果指定了这些参数中的任何一个,入口点脚本将被调度到一个具有至少指定资源的节点上,而不是 Head 节点(默认)。例如,以下代码会将入口点脚本调度到一个至少具有 1 个 GPU 的节点上:

job_id = client.submit_job(
    entrypoint="python script.py",
    runtime_env={
        "working_dir": "./",
    }
    # Reserve 1 GPU for the entrypoint script
    entrypoint_num_gpus=1
)

这些相同的参数也作为选项 --entrypoint-num-cpus--entrypoint-num-gpus--entrypoint-memory--entrypoint-resources 提供给 ray job submit(在 Jobs CLI 中);有关详细信息,请参阅 Ray 作业提交 CLI 参考

如果未指定 num_gpus,GPU 仍然可用于入口点脚本,但 Ray 不会提供可见设备方面的隔离。精确地说,CUDA_VISIBLE_DEVICES 环境变量不会在入口点脚本中设置;它仅在具有 num_gpus 指定在 @ray.remote() 装饰器中的任务和 Actor 中设置。

注意

entrypoint_num_cpusentrypoint_num_gpusentrypoint-memoryentrypoint_resources 指定的资源与作业中为任务和 Actor 指定的任何资源是分开的。

例如,如果您指定 entrypoint_num_gpus=1,则入口点脚本将被调度到一个至少具有 1 个 GPU 的节点上,但如果您的脚本还包含一个定义为 @ray.remote(num_gpus=1) 的 Ray 任务,则该任务将被调度使用不同的 GPU(如果节点至少有 2 个 GPU,则在同一节点上;否则在不同节点上)。

注意

指定任务或 Actor 的资源需求 中描述的 @ray.remote()num_cpusnum_gpusresources_memory 参数一样,这些参数仅指用于调度的逻辑资源。实际的 CPU 和 GPU 利用率不受 Ray 控制或限制。

注意

默认情况下,为入口点脚本预留 0 个 CPU 和 0 个 GPU。

客户端配置#

其他客户端连接选项,例如自定义 HTTP 标头和 cookie,可以传递给 JobSubmissionClient 类。选项的完整列表可以在 API 参考 中找到。

TLS 验证#

默认情况下,任何 HTTPS 客户端连接都将使用底层 requestsaiohttp 库找到的系统证书进行验证。verify 参数可以设置为覆盖此行为。例如:

client = JobSubmissionClient("https://<job-server-url>", verify="/path/to/cert.pem")

将使用位于 /path/to/cert.pem 的证书来验证作业服务器的证书。通过将 verify 参数设置为 False 可以禁用证书验证。