在 Spark Standalone 集群上部署#
本文档描述了在 Spark Standalone 集群上运行 Ray 集群的几个高级步骤。
运行一个基本示例#
这是一个 Spark 应用程序示例代码,它在 Spark 上启动 Ray 集群,然后执行 Ray 应用程序代码,最后关闭已启动的 Ray 集群。
1) 创建一个包含 Spark 应用程序代码的 Python 文件,假设文件名为 ‘ray-on-spark-example1.py’。
from pyspark.sql import SparkSession
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Ray on spark example 1") \
.config("spark.task.cpus", "4") \
.getOrCreate()
# Set up a ray cluster on this spark application, it creates a background
# spark job that each spark task launches one ray worker node.
# ray head node is launched in spark application driver side.
# Resources (CPU / GPU / memory) allocated to each ray worker node is equal
# to resources allocated to the corresponding spark task.
setup_ray_cluster(max_worker_nodes=MAX_NUM_WORKER_NODES)
# You can any ray application code here, the ray application will be executed
# on the ray cluster setup above.
# You don't need to set address for `ray.init`,
# it will connect to the cluster created above automatically.
ray.init()
...
# Terminate ray cluster explicitly.
# If you don't call it, when spark application is terminated, the ray cluster
# will also be terminated.
shutdown_ray_cluster()
将上面的 Spark 应用程序提交到 Spark Standalone 集群。
#!/bin/bash
spark-submit \
--master spark://{spark_master_IP}:{spark_master_port} \
path/to/ray-on-spark-example1.py
在 Spark 集群上创建长时间运行的 Ray 集群#
这是一个 Spark 应用程序示例代码,它在 Spark 上启动一个长时间运行的 Ray 集群。创建的 Ray 集群可以由远程 Python 进程访问。
1) 创建一个包含 Spark 应用程序代码的 Python 文件,假设文件名为 ‘long-running-ray-cluster-on-spark.py’。
from pyspark.sql import SparkSession
import time
from ray.util.spark import setup_ray_cluster, MAX_NUM_WORKER_NODES
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("long running ray cluster on spark") \
.config("spark.task.cpus", "4") \
.getOrCreate()
cluster_address = setup_ray_cluster(
max_worker_nodes=MAX_NUM_WORKER_NODES
)
print("Ray cluster is set up, you can connect to this ray cluster "
f"via address ray://{cluster_address}")
# Sleep forever until the spark application being terminated,
# at that time, the ray cluster will also be terminated.
while True:
time.sleep(10)
将上面的 Spark 应用程序提交到 Spark Standalone 集群。
#!/bin/bash
spark-submit \
--master spark://{spark_master_IP}:{spark_master_port} \
path/to/long-running-ray-cluster-on-spark.py
Ray on Spark API#
- ray.util.spark.setup_ray_cluster(*, max_worker_nodes: int, min_worker_nodes: int | None = None, num_cpus_worker_node: int | None = None, num_cpus_head_node: int | None = None, num_gpus_worker_node: int | None = None, num_gpus_head_node: int | None = None, memory_worker_node: int | None = None, memory_head_node: int | None = None, object_store_memory_worker_node: int | None = None, object_store_memory_head_node: int | None = None, head_node_options: Dict | None = None, worker_node_options: Dict | None = None, ray_temp_root_dir: str | None = None, strict_mode: bool = False, collect_log_to_path: str | None = None, autoscale_upscaling_speed: float | None = 1.0, autoscale_idle_timeout_minutes: float | None = 1.0, **kwargs) Tuple[str, str] [source]#
通过在 Spark 应用程序的驱动节点上启动 Ray 头节点,在 Spark 集群上设置 Ray 集群。创建头节点后,会创建一个后台 Spark 作业,生成一个
RayClusterOnSpark
实例,其中包含将在 Spark 集群工作节点上运行的 Ray 集群配置。Ray 集群设置完成后,会设置“RAY_ADDRESS”环境变量为集群地址,这样您就可以直接调用ray.init()
连接到集群而无需指定 Ray 集群地址。要关闭集群,您可以调用ray.util.spark.shutdown_ray_cluster()
。注意:如果活跃的 Ray 集群尚未关闭,您无法创建新的 Ray 集群。- 参数:
max_worker_nodes – 此参数表示为 Ray 集群启动的最大 Ray 工作节点数。您可以将
max_worker_nodes
指定为ray.util.spark.MAX_NUM_WORKER_NODES
,这表示一种 Ray 集群配置,它将使用为 Spark 应用程序配置的所有可用资源。要创建旨在在非扩展模式下独占运行共享 Ray 集群的 Spark 应用程序,建议将此参数设置为ray.util.spark.MAX_NUM_WORKER_NODES
。min_worker_nodes – 最小工作节点数(默认
None
)。如果“max_worker_nodes”值等于“min_worker_nodes”参数,或者“min_worker_nodes”参数值为 None,则禁用自动扩缩容,Ray 集群将启动固定数量(“max_worker_nodes”)的 Ray 工作节点,否则启用自动扩缩容。num_cpus_worker_node – 每个 Ray 工作节点可用的 CPU 数量。如果未提供,并且支持 Spark 阶段调度,则 ‘num_cpus_head_node’ 的值等于每个 Spark 工作节点的 CPU 核心数,否则使用 Spark 应用程序配置 ‘spark.task.cpus’。限制 仅 Spark 版本 >= 3.4 或 Databricks Runtime 12.x 支持设置此参数。
num_cpus_head_node – Ray 头节点可用的 CPU 数量。如果未提供,并且是全局模式的 Ray 集群,则使用 Spark 驱动节点中的 CPU 核心数,否则使用 0。使用 0 意味着需要 CPU 资源的任务不会调度到 Ray 头节点。
num_gpus_worker_node – 每个 Ray 工作节点可用的 GPU 数量。如果未提供,并且支持 Spark 阶段调度,则 ‘num_gpus_worker_node’ 的值等于每个 Spark 工作节点的 GPU 数量,否则使用 Spark 应用程序配置 ‘spark.task.resource.gpu.amount’ 的向下取整值。此参数仅在配置了 ‘gpu’ 资源的 Spark 集群上可用。限制 仅 Spark 版本 >= 3.4 或 Databricks Runtime 12.x 支持设置此参数。
num_gpus_head_node – Ray 头节点可用的 GPU 数量。如果未提供,并且是全局模式的 Ray 集群,则使用 Spark 驱动节点中的 GPU 数量,否则使用 0。此参数仅在 Spark 驱动节点具有 GPU 的 Spark 集群上可用。
memory_worker_node – Optional[int]:为 Ray 工作节点配置的堆内存。这基本上相当于使用
ray start
命令启动 Ray 节点时设置--memory
选项。memory_head_node – Optional[int]:为 Ray 头节点配置的堆内存。这基本上相当于使用
ray start
命令启动 Ray 节点时设置--memory
选项。object_store_memory_worker_node – 每个 Ray 工作节点可用的对象存储内存,但受“dev_shm_available_size * 0.8 / num_tasks_per_spark_worker”的限制。默认值等于“0.3 * spark_worker_physical_memory * 0.8 / num_tasks_per_spark_worker”。
object_store_memory_head_node – Ray 头节点可用的对象存储内存,但受“dev_shm_available_size * 0.8”的限制。默认值等于“0.3 * spark_driver_physical_memory * 0.8”。
head_node_options – 一个字典,表示 Ray 头节点的额外选项,这些选项将传递给
ray start
脚本。请注意,您需要将ray start
选项键从--foo-bar
格式转换为foo_bar
格式。对于标志选项(例如 ‘–disable-usage-stats’),您应该在选项字典中将值设置为 None,例如{"disable_usage_stats": None}
。注意:不支持短名称选项(例如 ‘-v’)。worker_node_options – 一个字典,表示 Ray 工作节点的额外选项,这些选项将传递给
ray start
脚本。请注意,您需要将ray start
选项键从--foo-bar
格式转换为foo_bar
格式。对于标志选项(例如 ‘–disable-usage-stats’),您应该在选项字典中将值设置为 None,例如{"disable_usage_stats": None}
。注意:不支持短名称选项(例如 ‘-v’)。ray_temp_root_dir – 用于存储 Ray 临时数据的本地磁盘路径。创建的集群将在此路径下创建一个子目录“ray-{head_port}-{random_suffix}”。
strict_mode – 布尔标志,如果可用的 Spark 集群没有足够的资源来满足内存、CPU 和 GPU 的资源分配,则会快速失败 Ray 集群的初始化。当设置为 true 时,如果请求的资源不足以满足建议的最低功能需求,则会引发异常,详细说明 Spark 集群配置设置不足之处。如果覆盖为
False
,则会引发警告。collect_log_to_path – 如果指定,在 Ray 头节点/工作节点终止后,将其日志收集到指定的路径。在 Databricks Runtime 上,建议您指定以 ‘/dbfs/’ 开头的本地路径,因为该路径挂载到集中式存储设备,并且存储的数据在 Databricks Spark 集群终止后仍然存在。
autoscale_upscaling_speed – 如果启用自动扩缩容,此参数表示允许挂起的节点数量,是当前节点数量的倍数。值越高,扩缩容越激进。例如,如果设置为 1.0,则集群在任何时候最多可增长其当前大小的 100%。如果集群当前有 20 个节点,则最多允许 20 个挂起的启动。无论此设置如何,最小挂起启动数为 5。默认值为 1.0,最小值为 1.0
autoscale_idle_timeout_minutes – 如果启用自动扩缩容,此参数表示自动扩缩容器删除空闲工作节点之前需要经过的分钟数。值越小,缩减越激进。当工作节点上没有活跃任务、Actor 或引用的对象(无论是内存中还是溢出到磁盘)时,它们被认为是空闲的。此参数不影响头节点。默认值为 1.0,最小值为 0
- 返回:
返回一个元组 (address, remote_connection_address),其中“address”格式为“<Ray 头节点 IP>:<端口>”,“remote_connection_address”格式为“ray://<Ray 头节点 IP>:<Ray 客户端服务器端口>”。如果您的客户端运行在也托管 Ray 集群节点的机器上,您可以通过
ray.init(address)
连接到 Ray 集群;否则,您可以通过ray.init(remote_connection_address)
连接到 Ray 集群。
- ray.util.spark.setup_global_ray_cluster(*, max_worker_nodes: int, is_blocking: bool = True, min_worker_nodes: int | None = None, num_cpus_worker_node: int | None = None, num_cpus_head_node: int | None = None, num_gpus_worker_node: int | None = None, num_gpus_head_node: int | None = None, memory_worker_node: int | None = None, memory_head_node: int | None = None, object_store_memory_worker_node: int | None = None, object_store_memory_head_node: int | None = None, head_node_options: Dict | None = None, worker_node_options: Dict | None = None, strict_mode: bool = False, collect_log_to_path: str | None = None, autoscale_upscaling_speed: float | None = 1.0, autoscale_idle_timeout_minutes: float | None = 1.0)[source]#
设置全局模式集群。全局 Ray on Spark 集群意味着:- 您一次只能创建一个活跃的全局 Ray on Spark 集群。在 Databricks 集群上,全局 Ray 集群可以被所有用户使用,- 相反,非全局 Ray 集群只能被当前笔记本用户使用。- 它会持续运行而不会自动关闭。- 在 Databricks 笔记本中,您无需指定地址即可通过调用
ray.init()
连接到全局集群,如果它正在运行,它会自动发现全局集群。对于全局模式,不支持
ray_temp_root_dir
参数。全局模式的 Ray 集群始终使用默认的 Ray 临时目录路径。所有参数与
setup_ray_cluster
API 相同,但以下情况除外:- 不支持ray_temp_root_dir
参数。全局模式的 Ray 集群始终使用默认的 Ray 临时目录路径。- 添加了一个新参数“is_blocking”(默认为True
)。如果“is_blocking”为 True,则调用会一直阻塞,直到被中断。一旦调用被中断,全局 Ray on Spark 集群就会关闭,并且setup_global_ray_cluster
调用终止。如果“is_blocking”为 False,则 Ray 集群设置完成后立即返回。