在 Spark Standalone 集群上部署#
本文档概述了在 Spark Standalone 集群 上运行 Ray 集群的几个高级步骤。
运行一个基本示例#
这是一个 Spark 应用程序示例代码,它会在 Spark 上启动 Ray 集群,然后执行 Ray 应用程序代码,最后关闭启动的 Ray 集群。
1) 创建一个包含 Spark 应用程序代码的 Python 文件。假设 Python 文件名为 ‘ray-on-spark-example1.py’。
from pyspark.sql import SparkSession
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Ray on spark example 1") \
.config("spark.task.cpus", "4") \
.getOrCreate()
# Set up a ray cluster on this spark application, it creates a background
# spark job that each spark task launches one ray worker node.
# ray head node is launched in spark application driver side.
# Resources (CPU / GPU / memory) allocated to each ray worker node is equal
# to resources allocated to the corresponding spark task.
setup_ray_cluster(max_worker_nodes=MAX_NUM_WORKER_NODES)
# You can any ray application code here, the ray application will be executed
# on the ray cluster setup above.
# You don't need to set address for `ray.init`,
# it will connect to the cluster created above automatically.
ray.init()
...
# Terminate ray cluster explicitly.
# If you don't call it, when spark application is terminated, the ray cluster
# will also be terminated.
shutdown_ray_cluster()
将上述 Spark 应用程序提交到 Spark Standalone 集群。
#!/bin/bash
spark-submit \
--master spark://{spark_master_IP}:{spark_master_port} \
path/to/ray-on-spark-example1.py
在 Spark 集群上创建长期运行的 Ray 集群#
这是一个 Spark 应用程序示例代码,它会在 Spark 上启动一个长期运行的 Ray 集群。创建的 Ray 集群可以被远程 Python 进程访问。
1) 创建一个包含 Spark 应用程序代码的 Python 文件。假设 Python 文件名为 ‘long-running-ray-cluster-on-spark.py’。
from pyspark.sql import SparkSession
import time
from ray.util.spark import setup_ray_cluster, MAX_NUM_WORKER_NODES
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("long running ray cluster on spark") \
.config("spark.task.cpus", "4") \
.getOrCreate()
cluster_address = setup_ray_cluster(
max_worker_nodes=MAX_NUM_WORKER_NODES
)
print("Ray cluster is set up, you can connect to this ray cluster "
f"via address ray://{cluster_address}")
# Sleep forever until the spark application being terminated,
# at that time, the ray cluster will also be terminated.
while True:
time.sleep(10)
将上述 Spark 应用程序提交到 Spark Standalone 集群。
#!/bin/bash
spark-submit \
--master spark://{spark_master_IP}:{spark_master_port} \
path/to/long-running-ray-cluster-on-spark.py
Ray on Spark API#
- ray.util.spark.setup_ray_cluster(*, max_worker_nodes: int, min_worker_nodes: int | None = None, num_cpus_worker_node: int | None = None, num_cpus_head_node: int | None = None, num_gpus_worker_node: int | None = None, num_gpus_head_node: int | None = None, memory_worker_node: int | None = None, memory_head_node: int | None = None, object_store_memory_worker_node: int | None = None, object_store_memory_head_node: int | None = None, head_node_options: Dict | None = None, worker_node_options: Dict | None = None, ray_temp_root_dir: str | None = None, strict_mode: bool = False, collect_log_to_path: str | None = None, autoscale_upscaling_speed: float | None = 1.0, autoscale_idle_timeout_minutes: float | None = 1.0, **kwargs) Tuple[str, str][source]#
通过在 Spark 应用程序的驱动程序节点上启动 Ray 头节点来设置 Spark 集群上的 Ray 集群。创建头节点后,会创建一个后台 Spark 作业,该作业会生成一个
RayClusterOnSpark实例,其中包含将在 Spark 集群的 worker 节点上运行的 Ray 集群的配置。在 Ray 集群设置完成后,“RAY_ADDRESS”环境变量会被设置为集群地址,因此您可以调用ray.init()而无需指定 Ray 集群地址即可连接到集群。要关闭集群,可以调用ray.util.spark.shutdown_ray_cluster()。注意:如果活动的 Ray 集群尚未关闭,则无法创建新的 Ray 集群。- 参数:
max_worker_nodes – 此参数表示为 Ray 集群启动的最大 Ray worker 节点数。您可以将
max_worker_nodes指定为ray.util.spark.MAX_NUM_WORKER_NODES,它表示一个 Ray 集群配置,该配置将使用 Spark 应用程序配置的所有可用资源。要创建一个旨在专用于非扩展模式下的共享 Ray 集群的 Spark 应用程序,建议将此参数设置为ray.util.spark.MAX_NUM_WORKER_NODES。min_worker_nodes – Worker 节点的最少数量(默认为
None)。如果“max_worker_nodes”的值等于“min_worker_nodes”参数,或者“min_worker_nodes”参数值为 None,则禁用自动扩展,Ray 集群将以固定数量“max_worker_nodes”的 Ray worker 节点启动;否则,启用自动扩展。num_cpus_worker_node – 每个 Ray worker 节点可用的 CPU 数量。如果未提供,则如果支持 Spark 阶段调度,“num_cpus_head_node”的值等于每个 Spark worker 节点的 CPU 核心数,否则它将使用 Spark 应用程序配置“spark.task.cpus”。限制 只有 Spark 版本 >= 3.4 或 Databricks Runtime 12.x 支持设置此参数。
num_cpus_head_node – Ray 头节点可用的 CPU 数量。如果未提供,如果是全局模式 Ray 集群,则使用 Spark 驱动程序节点的 CPU 核心数,否则使用 0。使用 0 表示需要 CPU 资源的任务不会被调度到 Ray 头节点。
num_gpus_worker_node – 每个 Ray worker 节点可用的 GPU 数量。如果未提供,则如果支持 Spark 阶段调度,“num_gpus_worker_node”的值等于每个 Spark worker 节点的 GPU 数量,否则它将使用 Spark 应用程序配置“spark.task.resource.gpu.amount”的向下取整值。此参数仅在配置了“gpu”资源的 Spark 集群上可用。限制 只有 Spark 版本 >= 3.4 或 Databricks Runtime 12.x 支持设置此参数。
num_gpus_head_node – Ray 头节点可用的 GPU 数量。如果未提供,如果是全局模式 Ray 集群,则使用 Spark 驱动程序节点的 GPU 数量,否则使用 0。此参数仅在 Spark 驱动程序节点具有 GPU 的 Spark 集群上可用。
memory_worker_node – Optional[int]: 为 Ray worker 节点配置的堆内存。这基本上是在使用
ray start命令启动 Ray 节点时设置--memory选项。memory_head_node – Optional[int]: 为 Ray 头节点配置的堆内存。这基本上是在使用
ray start命令启动 Ray 节点时设置--memory选项。object_store_memory_worker_node – 可用于每个 Ray worker 节点的对象存储内存,但其上限为“dev_shm_available_size * 0.8 / num_tasks_per_spark_worker”。默认值等于“0.3 * spark_worker_physical_memory * 0.8 / num_tasks_per_spark_worker”。
object_store_memory_head_node – 可用于 Ray 头节点的对象存储内存,但其上限为“dev_shm_available_size * 0.8”。默认值等于“0.3 * spark_driver_physical_memory * 0.8”。
head_node_options – 一个字典,表示 Ray 头节点的额外选项。这些选项将传递给
ray start脚本。请注意,您需要将ray start选项的键从--foo-bar格式转换为foo_bar格式。对于标志选项(例如 ‘–disable-usage-stats’),您应将值设置为 None,例如{"disable_usage_stats": None}。注意:不支持短名称选项(例如 ‘-v’)。worker_node_options – 一个字典,表示 Ray worker 节点的额外选项。这些选项将传递给
ray start脚本。请注意,您需要将ray start选项的键从--foo-bar格式转换为foo_bar格式。对于标志选项(例如 ‘–disable-usage-stats’),您应将值设置为 None,例如{"disable_usage_stats": None}。注意:不支持短名称选项(例如 ‘-v’)。ray_temp_root_dir – 用于存储 Ray 临时数据的本地磁盘路径。创建的集群将在该路径下创建一个子目录“ray-{head_port}-{random_suffix}”。
strict_mode – 布尔标志,如果可用的 Spark 集群没有足够的资源来满足内存、CPU 和 GPU 的资源分配,则会快速失败 Ray 集群的初始化。如果设置为 True,并且请求的资源不足以满足建议的最低推荐功能,则会引发详细说明 Spark 集群配置设置不足的异常。如果覆盖为
False,则会发出警告。collect_log_to_path – 如果指定,则在 Ray 头/worker 节点终止后,将它们的日志收集到指定路径。在 Databricks Runtime 上,我们建议您指定一个以 ‘/dbfs/’ 开头的本地路径,因为该路径挂载了集中式存储设备,并且数据在 Databricks Spark 集群终止后仍然保留。
autoscale_upscaling_speed – 如果启用了自动扩展,则表示允许挂起的节点数量是当前节点数量的倍数。值越高,扩展速度越快。例如,如果设置为 1.0,集群的大小在任何时候最多可以增长 100%,因此如果集群当前有 20 个节点,最多允许 20 个挂起的启动。最小挂起启动数量为 5,与此设置无关。默认值为 1.0,最小值为 1.0。
autoscale_idle_timeout_minutes – 如果启用了自动扩展,则表示在空闲 worker 节点被自动扩展程序移除之前需要经过的分钟数。值越小,缩减速度越快。当 worker 节点不持有任何活动任务、actor 或引用对象(在内存中或已溢出到磁盘)时,它们被视为空闲。此参数不影响头节点。默认值为 1.0,最小值为 0。
- 返回:
返回一个元组 (address, remote_connection_address)。“address”的格式为“<ray_head_node_ip>:<port>”。“remote_connection_address”的格式为“ray://<ray_head_node_ip>:<ray-client-server-port>”。如果您的客户端运行在也本地托管 Ray 集群节点的机器上,您可以通过
ray.init(address)连接到 Ray 集群;否则,您可以通过ray.init(remote_connection_address)连接到 Ray 集群。
- ray.util.spark.setup_global_ray_cluster(*, max_worker_nodes: int, is_blocking: bool = True, min_worker_nodes: int | None = None, num_cpus_worker_node: int | None = None, num_cpus_head_node: int | None = None, num_gpus_worker_node: int | None = None, num_gpus_head_node: int | None = None, memory_worker_node: int | None = None, memory_head_node: int | None = None, object_store_memory_worker_node: int | None = None, object_store_memory_head_node: int | None = None, head_node_options: Dict | None = None, worker_node_options: Dict | None = None, strict_mode: bool = False, collect_log_to_path: str | None = None, autoscale_upscaling_speed: float | None = 1.0, autoscale_idle_timeout_minutes: float | None = 1.0)[source]#
设置全局模式集群。全局 Ray on Spark 集群意味着:- 您一次只能创建一个活动的全局 Ray on Spark 集群。在 Databricks 集群上,全局 Ray 集群可供所有用户使用;- 相反,非全局 Ray 集群只能由当前笔记本用户使用;- 它会持续运行,不会自动关闭;- 在 Databricks 笔记本上,您可以通过调用
ray.init()而无需指定其地址来连接到全局集群,它会自动发现已启动的全局集群。对于全局模式,不支持
ray_temp_root_dir参数。全局模式 Ray 集群始终使用默认的 Ray 临时目录路径。setup_ray_clusterAPI 的所有参数都相同,除了:- 不支持ray_temp_root_dir参数。全局模式 Ray 集群始终使用默认的 Ray 临时目录路径。- 添加了一个新参数“is_blocking”(默认为True)。如果“is_blocking”为 True,则调用将保持阻塞状态直到被中断。一旦调用被中断,全局 Ray on Spark 集群将被关闭,并且setup_global_ray_cluster调用终止。如果“is_blocking”为 False,则 Ray 集群设置完成后立即返回。