Python SDK 概述#
Ray Jobs Python SDK 是以编程方式提交作业的推荐方式。跳转到 API 参考,或继续阅读以获取快速概述。
设置#
Ray Jobs 可在 1.9+ 版本中使用,需要完整安装 Ray。您可以通过运行以下命令来完成此操作:
pip install "ray[default]"
有关安装 Ray 的更多详细信息,请参阅 安装指南。
要运行 Ray 作业,我们还需要能够向 Ray 集群发送 HTTP 请求。为了方便起见,本指南将假设您正在使用本地 Ray 集群,我们可以通过运行以下命令来启动它:
ray start --head
# ...
# 2022-08-10 09:54:57,664 INFO services.py:1476 -- View the Ray dashboard at http://127.0.0.1:8265
# ...
这将会在我们的本地机器上创建一个 Ray Head 节点,供我们用于开发目的。请注意启动或连接到 Ray 集群时打印的 Ray Dashboard URL;我们稍后将使用此 URL 来提交 Ray 作业。如果使用远程集群,请参阅 使用远程集群 以获取端口转发的技巧。有关生产部署场景的更多详细信息,请查看在 VM 和 Kubernetes 上部署 Ray 的指南。
提交 Ray 作业#
让我们从一个可以在本地运行的示例脚本开始。以下脚本使用 Ray API 来提交一个任务并打印其返回值:
# script.py
import ray
@ray.remote
def hello_world():
return "hello world"
ray.init()
print(ray.get(hello_world.remote()))
SDK 调用通过 JobSubmissionClient 对象进行。要初始化客户端,请提供 Ray 集群 Head 节点地址以及 Ray Dashboard 使用的端口(默认情况下为 8265)。在此示例中,我们将使用本地 Ray 集群,但相同的示例也适用于远程 Ray 集群地址;有关设置端口转发的详细信息,请参阅 使用远程集群。
from ray.job_submission import JobSubmissionClient
# If using a remote cluster, replace 127.0.0.1 with the head node's IP address or set up port forwarding.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
# Entrypoint shell command to execute
entrypoint="python script.py",
# Path to the local directory that contains the script.py file
runtime_env={"working_dir": "./"}
)
print(job_id)
提示
默认情况下,Ray 作业服务器将生成一个新的 job_id 并返回它,但您也可以先选择一个唯一的 job_id 字符串,然后将其传递给 submit_job。在这种情况下,作业将以您给定的 ID 执行,如果同一个 Ray 集群提交了相同的 job_id 多次,则会抛出错误。
由于作业提交是异步的,因此上述调用将立即返回,输出类似如下:
raysubmit_g8tDzJ6GqrCy7pd6
现在我们可以编写一个简单的轮询循环,该循环会检查作业状态,直到它达到终止状态(即 JobStatus.SUCCEEDED、JobStatus.STOPPED 或 JobStatus.FAILED)。我们还可以通过调用 client.get_job_logs 来获取作业的输出。
from ray.job_submission import JobSubmissionClient, JobStatus
import time
# If using a remote cluster, replace 127.0.0.1 with the head node's IP address.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
# Entrypoint shell command to execute
entrypoint="python script.py",
# Path to the local directory that contains the script.py file
runtime_env={"working_dir": "./"}
)
print(job_id)
def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):
start = time.time()
while time.time() - start <= timeout_seconds:
status = client.get_job_status(job_id)
print(f"status: {status}")
if status in status_to_wait_for:
break
time.sleep(1)
wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
logs = client.get_job_logs(job_id)
print(logs)
输出应类似于:
raysubmit_pBwfn5jqRE1E7Wmc
status: PENDING
status: PENDING
status: RUNNING
status: RUNNING
status: RUNNING
2022-08-22 15:05:55,652 INFO worker.py:1203 -- Using address 127.0.0.1:6379 set in the environment variable RAY_ADDRESS
2022-08-22 15:05:55,652 INFO worker.py:1312 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2022-08-22 15:05:55,660 INFO worker.py:1487 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265.
hello world
与长时间运行的作业交互#
除了获取作业的当前状态和输出之外,用户还可以在作业完成执行之前停止已提交的作业。
job_id = client.submit_job(
# Entrypoint shell command to execute
entrypoint="python -c 'import time; print(\"Sleeping...\"); time.sleep(60)'"
)
wait_until_status(job_id, {JobStatus.RUNNING})
print(f'Stopping job {job_id}')
client.stop_job(job_id)
wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
logs = client.get_job_logs(job_id)
print(logs)
输出应类似于:
status: PENDING
status: PENDING
status: RUNNING
Stopping job raysubmit_VYCZZ2BQb4tfeCjq
status: STOPPED
Sleeping...
要获取有关所有作业的信息,请调用 client.list_jobs()。这将返回一个 Dict[str, JobInfo] 对象,该对象将作业 ID 映射到其信息。
作业信息(状态和相关元数据)会无限期地存储在集群上。要删除这些信息,您可以调用 client.delete_job(job_id) 来删除任何已处于终止状态的作业。有关更多详细信息,请参阅 SDK API 参考。
依赖管理#
与 Jobs CLI 类似,我们还可以通过使用 Ray 运行时环境 来打包应用程序的依赖项。使用 Python SDK,语法如下:
job_id = client.submit_job(
# Entrypoint shell command to execute
entrypoint="python script.py",
# Runtime environment for the job, specifying a working directory and pip package
runtime_env={
"working_dir": "./",
"pip": ["requests==2.26.0"]
}
)
提示
除了本地目录(此示例中为 "./")之外,您还可以为作业的工作目录指定远程 URI,例如 S3 存储桶或 Git 存储库。有关详细信息,请参阅 远程 URI。
有关完整详细信息,请参阅 API 参考。
指定 CPU 和 GPU 资源#
默认情况下,作业入口点脚本始终在 Head 节点上运行。我们建议在 Ray 任务、Actor 或 Ray 库中进行繁重的计算,而不是直接在入口点脚本的顶层进行。执行此操作无需额外配置。
但是,如果您需要在入口点脚本中直接进行计算,并希望为入口点脚本预留 CPU 和 GPU 资源,您可以向 submit_job 指定 entrypoint_num_cpus、entrypoint_num_gpus、entrypoint_memory 和 entrypoint_resources 参数。这些参数的功能与 指定任务或 Actor 的资源需求 中描述的 @ray.remote() 装饰器的 num_cpus、num_gpus、resources 和 _memory 参数的功能相同。
如果指定了这些参数中的任何一个,入口点脚本将被调度到一个具有至少指定资源的节点上,而不是 Head 节点(默认)。例如,以下代码会将入口点脚本调度到一个至少具有 1 个 GPU 的节点上:
job_id = client.submit_job(
entrypoint="python script.py",
runtime_env={
"working_dir": "./",
}
# Reserve 1 GPU for the entrypoint script
entrypoint_num_gpus=1
)
这些相同的参数也作为选项 --entrypoint-num-cpus、--entrypoint-num-gpus、--entrypoint-memory 和 --entrypoint-resources 提供给 ray job submit(在 Jobs CLI 中);有关详细信息,请参阅 Ray 作业提交 CLI 参考。
如果未指定 num_gpus,GPU 仍然可用于入口点脚本,但 Ray 不会提供可见设备方面的隔离。精确地说,CUDA_VISIBLE_DEVICES 环境变量不会在入口点脚本中设置;它仅在具有 num_gpus 指定在 @ray.remote() 装饰器中的任务和 Actor 中设置。
注意
entrypoint_num_cpus、entrypoint_num_gpus、entrypoint-memory 和 entrypoint_resources 指定的资源与作业中为任务和 Actor 指定的任何资源是分开的。
例如,如果您指定 entrypoint_num_gpus=1,则入口点脚本将被调度到一个至少具有 1 个 GPU 的节点上,但如果您的脚本还包含一个定义为 @ray.remote(num_gpus=1) 的 Ray 任务,则该任务将被调度使用不同的 GPU(如果节点至少有 2 个 GPU,则在同一节点上;否则在不同节点上)。
注意
与 指定任务或 Actor 的资源需求 中描述的 @ray.remote() 的 num_cpus、num_gpus、resources 和 _memory 参数一样,这些参数仅指用于调度的逻辑资源。实际的 CPU 和 GPU 利用率不受 Ray 控制或限制。
注意
默认情况下,为入口点脚本预留 0 个 CPU 和 0 个 GPU。
客户端配置#
其他客户端连接选项,例如自定义 HTTP 标头和 cookie,可以传递给 JobSubmissionClient 类。选项的完整列表可以在 API 参考 中找到。
TLS 验证#
默认情况下,任何 HTTPS 客户端连接都将使用底层 requests 和 aiohttp 库找到的系统证书进行验证。verify 参数可以设置为覆盖此行为。例如:
client = JobSubmissionClient("https://<job-server-url>", verify="/path/to/cert.pem")
将使用位于 /path/to/cert.pem 的证书来验证作业服务器的证书。通过将 verify 参数设置为 False 可以禁用证书验证。