Python SDK 概览#
Ray Jobs Python SDK 是以编程方式提交作业的推荐方式。跳转到API 参考,或继续阅读快速概览。
设置#
Ray Jobs 在 1.9+ 版本中可用,并需要完整安装 Ray。你可以通过运行以下命令来完成安装:
pip install "ray[default]"
有关安装 Ray 的更多详细信息,请参阅安装指南。
要运行 Ray Job,我们还需要能够向 Ray 集群发送 HTTP 请求。为方便起见,本指南假设你正在使用本地 Ray 集群,我们可以通过运行以下命令启动它:
ray start --head
# ...
# 2022-08-10 09:54:57,664 INFO services.py:1476 -- View the Ray dashboard at http://127.0.0.1:8265
# ...
这将会在我们的本地机器上创建一个 Ray head 节点,我们可以将其用于开发目的。请注意启动或连接 Ray 集群时打印出的 Ray Dashboard URL;我们稍后会使用此 URL 提交 Ray Job。如果使用远程集群,请参阅使用远程集群获取有关端口转发的提示。有关生产部署场景的更多详细信息,请查阅在虚拟机和Kubernetes上部署 Ray 的指南。
提交 Ray Job#
让我们从一个可以在本地运行的示例脚本开始。以下脚本使用 Ray API 提交一个任务并打印其返回值:
# script.py
import ray
@ray.remote
def hello_world():
return "hello world"
ray.init()
print(ray.get(hello_world.remote()))
SDK 调用通过一个 JobSubmissionClient
对象进行。要初始化客户端,请提供 Ray 集群 head 节点的地址以及 Ray Dashboard 使用的端口(默认为 8265
)。对于此示例,我们将使用本地 Ray 集群,但相同的示例也适用于远程 Ray 集群地址;有关设置端口转发的详细信息,请参阅使用远程集群。
from ray.job_submission import JobSubmissionClient
# If using a remote cluster, replace 127.0.0.1 with the head node's IP address or set up port forwarding.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
# Entrypoint shell command to execute
entrypoint="python script.py",
# Path to the local directory that contains the script.py file
runtime_env={"working_dir": "./"}
)
print(job_id)
提示
默认情况下,Ray 作业服务器将生成并返回一个新的 job_id
,但你也可以选择一个唯一的 job_id
字符串并将其传递给 submit_job
。在这种情况下,作业将使用你给定的 ID 执行,并且如果针对同一个 Ray 集群多次提交相同的 job_id
,将抛出错误。
由于作业提交是异步的,上面的调用将立即返回,并输出如下内容:
raysubmit_g8tDzJ6GqrCy7pd6
现在我们可以编写一个简单的轮询循环,检查作业状态直到其达到终端状态(即 JobStatus.SUCCEEDED
、JobStatus.STOPPED
或 JobStatus.FAILED
)。我们还可以通过调用 client.get_job_logs
获取作业的输出。
from ray.job_submission import JobSubmissionClient, JobStatus
import time
# If using a remote cluster, replace 127.0.0.1 with the head node's IP address.
client = JobSubmissionClient("http://127.0.0.1:8265")
job_id = client.submit_job(
# Entrypoint shell command to execute
entrypoint="python script.py",
# Path to the local directory that contains the script.py file
runtime_env={"working_dir": "./"}
)
print(job_id)
def wait_until_status(job_id, status_to_wait_for, timeout_seconds=5):
start = time.time()
while time.time() - start <= timeout_seconds:
status = client.get_job_status(job_id)
print(f"status: {status}")
if status in status_to_wait_for:
break
time.sleep(1)
wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
logs = client.get_job_logs(job_id)
print(logs)
输出应如下所示:
raysubmit_pBwfn5jqRE1E7Wmc
status: PENDING
status: PENDING
status: RUNNING
status: RUNNING
status: RUNNING
2022-08-22 15:05:55,652 INFO worker.py:1203 -- Using address 127.0.0.1:6379 set in the environment variable RAY_ADDRESS
2022-08-22 15:05:55,652 INFO worker.py:1312 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2022-08-22 15:05:55,660 INFO worker.py:1487 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265.
hello world
与长时间运行的作业交互#
除了获取作业的当前状态和输出外,提交的作业还可以在执行完成之前由用户停止。
job_id = client.submit_job(
# Entrypoint shell command to execute
entrypoint="python -c 'import time; print(\"Sleeping...\"); time.sleep(60)'"
)
wait_until_status(job_id, {JobStatus.RUNNING})
print(f'Stopping job {job_id}')
client.stop_job(job_id)
wait_until_status(job_id, {JobStatus.SUCCEEDED, JobStatus.STOPPED, JobStatus.FAILED})
logs = client.get_job_logs(job_id)
print(logs)
输出应如下所示:
status: PENDING
status: PENDING
status: RUNNING
Stopping job raysubmit_VYCZZ2BQb4tfeCjq
status: STOPPED
Sleeping...
要获取所有作业的信息,请调用 client.list_jobs()
。这将返回一个 Dict[str, JobInfo]
对象,将作业 ID 映射到其信息。
作业信息(状态及相关元数据)无限期地存储在集群上。要删除此信息,你可以对任何已处于终端状态的作业调用 client.delete_job(job_id)
。有关更多详细信息,请参阅SDK API 参考。
依赖管理#
类似于Jobs CLI,我们也可以使用 Ray 运行时环境打包应用程序的依赖项。使用 Python SDK,语法如下所示:
job_id = client.submit_job(
# Entrypoint shell command to execute
entrypoint="python script.py",
# Runtime environment for the job, specifying a working directory and pip package
runtime_env={
"working_dir": "./",
"pip": ["requests==2.26.0"]
}
)
提示
除了本地目录(本例中的 "./"
),你还可以为作业的工作目录指定远程 URI,例如 S3 存储桶或 Git 仓库。详细信息请参阅远程 URI。
更多详细信息,请参阅API 参考。
指定 CPU 和 GPU 资源#
默认情况下,作业入口点脚本始终在 head 节点上运行。我们建议在 Ray 任务、actor 或 Ray 库中进行繁重计算,而不是直接在入口点脚本的顶层进行。无需额外配置即可完成此操作。
然而,如果你需要在入口点脚本中直接进行计算,并且希望为入口点脚本保留 CPU 和 GPU 资源,你可以向 submit_job
指定 entrypoint_num_cpus
、entrypoint_num_gpus
、entrypoint_memory
和 entrypoint_resources
参数。这些参数的功能与 指定任务或 Actor 资源需求 中描述的 @ray.remote()
装饰器的 num_cpus
、num_gpus
、resources
和 _memory
参数相同。
如果指定了这些参数中的任何一个,入口点脚本将调度到具有至少指定资源的节点上,而不是默认的 head 节点。例如,以下代码会将入口点脚本调度到至少具有 1 个 GPU 的节点上:
job_id = client.submit_job(
entrypoint="python script.py",
runtime_env={
"working_dir": "./",
}
# Reserve 1 GPU for the entrypoint script
entrypoint_num_gpus=1
)
这些相同的参数也可以在 Jobs CLI 中作为 ray job submit
命令的选项 --entrypoint-num-cpus
、--entrypoint-num-gpus
、--entrypoint-memory
和 --entrypoint-resources
使用;参见Ray Job Submission CLI 参考。
如果未指定 num_gpus
,GPU 仍可用于入口点脚本,但 Ray 不会提供可见设备的隔离。确切地说,环境变量 CUDA_VISIBLE_DEVICES
不会在入口点脚本中设置;它只会设置在那些在 @ray.remote()
装饰器中指定了 num_gpus
的任务和 actor 内部。
注意
由 entrypoint_num_cpus
、entrypoint_num_gpus
、entrypoint-memory
和 entrypoint_resources
指定的资源与作业内部为任务和 actor 指定的任何资源是分开的。
例如,如果你指定了 entrypoint_num_gpus=1
,那么入口点脚本将调度到至少有 1 个 GPU 的节点上,但如果你的脚本中还包含一个使用 @ray.remote(num_gpus=1)
定义的 Ray 任务,那么该任务将被调度使用不同的 GPU(如果节点至少有 2 个 GPU,则在同一节点上;否则在不同的节点上)。
注意
与指定任务或 Actor 资源需求中描述的 @ray.remote()
的 num_cpus
、num_gpus
、resources
和 _memory
参数一样,这些参数仅指用于调度目的的逻辑资源。实际的 CPU 和 GPU 利用率不受 Ray 控制或限制。
注意
默认情况下,为入口点脚本保留 0 个 CPU 和 0 个 GPU。
客户端配置#
附加的客户端连接选项,例如自定义 HTTP 头和 cookies,可以传递给 JobSubmissionClient
类。完整的选项列表可以在API 参考中找到。
TLS 验证#
默认情况下,任何 HTTPS 客户端连接将使用底层 requests
和 aiohttp
库找到的系统证书进行验证。verify
参数可以设置来覆盖此行为。例如:
client = JobSubmissionClient("https://<job-server-url>", verify="/path/to/cert.pem")
将使用在 /path/to/cert.pem
找到的证书来验证作业服务器的证书。通过将 verify
参数设置为 False
可以禁用证书验证。