在 Spark Standalone 集群上部署#

本文档概述了在 Spark Standalone 集群 上运行 Ray 集群的几个高级步骤。

运行一个基本示例#

这是一个 Spark 应用程序示例代码,它会在 Spark 上启动 Ray 集群,然后执行 Ray 应用程序代码,最后关闭启动的 Ray 集群。

1) 创建一个包含 Spark 应用程序代码的 Python 文件。假设 Python 文件名为 ‘ray-on-spark-example1.py’。

from pyspark.sql import SparkSession
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Ray on spark example 1") \
        .config("spark.task.cpus", "4") \
        .getOrCreate()

    # Set up a ray cluster on this spark application, it creates a background
    # spark job that each spark task launches one ray worker node.
    # ray head node is launched in spark application driver side.
    # Resources (CPU / GPU / memory) allocated to each ray worker node is equal
    # to resources allocated to the corresponding spark task.
    setup_ray_cluster(max_worker_nodes=MAX_NUM_WORKER_NODES)

    # You can any ray application code here, the ray application will be executed
    # on the ray cluster setup above.
    # You don't need to set address for `ray.init`,
    # it will connect to the cluster created above automatically.
    ray.init()
    ...

    # Terminate ray cluster explicitly.
    # If you don't call it, when spark application is terminated, the ray cluster
    # will also be terminated.
    shutdown_ray_cluster()
  1. 将上述 Spark 应用程序提交到 Spark Standalone 集群。

#!/bin/bash
spark-submit \
  --master spark://{spark_master_IP}:{spark_master_port} \
  path/to/ray-on-spark-example1.py

在 Spark 集群上创建长期运行的 Ray 集群#

这是一个 Spark 应用程序示例代码,它会在 Spark 上启动一个长期运行的 Ray 集群。创建的 Ray 集群可以被远程 Python 进程访问。

1) 创建一个包含 Spark 应用程序代码的 Python 文件。假设 Python 文件名为 ‘long-running-ray-cluster-on-spark.py’。

from pyspark.sql import SparkSession
import time
from ray.util.spark import setup_ray_cluster, MAX_NUM_WORKER_NODES

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("long running ray cluster on spark") \
        .config("spark.task.cpus", "4") \
        .getOrCreate()

    cluster_address = setup_ray_cluster(
        max_worker_nodes=MAX_NUM_WORKER_NODES
    )
    print("Ray cluster is set up, you can connect to this ray cluster "
          f"via address ray://{cluster_address}")

    # Sleep forever until the spark application being terminated,
    # at that time, the ray cluster will also be terminated.
    while True:
        time.sleep(10)
  1. 将上述 Spark 应用程序提交到 Spark Standalone 集群。

#!/bin/bash
spark-submit \
  --master spark://{spark_master_IP}:{spark_master_port} \
  path/to/long-running-ray-cluster-on-spark.py

Ray on Spark API#

ray.util.spark.setup_ray_cluster(*, max_worker_nodes: int, min_worker_nodes: int | None = None, num_cpus_worker_node: int | None = None, num_cpus_head_node: int | None = None, num_gpus_worker_node: int | None = None, num_gpus_head_node: int | None = None, memory_worker_node: int | None = None, memory_head_node: int | None = None, object_store_memory_worker_node: int | None = None, object_store_memory_head_node: int | None = None, head_node_options: Dict | None = None, worker_node_options: Dict | None = None, ray_temp_root_dir: str | None = None, strict_mode: bool = False, collect_log_to_path: str | None = None, autoscale_upscaling_speed: float | None = 1.0, autoscale_idle_timeout_minutes: float | None = 1.0, **kwargs) Tuple[str, str][source]#

通过在 Spark 应用程序的驱动程序节点上启动 Ray 头节点来设置 Spark 集群上的 Ray 集群。创建头节点后,会创建一个后台 Spark 作业,该作业会生成一个 RayClusterOnSpark 实例,其中包含将在 Spark 集群的 worker 节点上运行的 Ray 集群的配置。在 Ray 集群设置完成后,“RAY_ADDRESS”环境变量会被设置为集群地址,因此您可以调用 ray.init() 而无需指定 Ray 集群地址即可连接到集群。要关闭集群,可以调用 ray.util.spark.shutdown_ray_cluster()。注意:如果活动的 Ray 集群尚未关闭,则无法创建新的 Ray 集群。

参数:
  • max_worker_nodes – 此参数表示为 Ray 集群启动的最大 Ray worker 节点数。您可以将 max_worker_nodes 指定为 ray.util.spark.MAX_NUM_WORKER_NODES,它表示一个 Ray 集群配置,该配置将使用 Spark 应用程序配置的所有可用资源。要创建一个旨在专用于非扩展模式下的共享 Ray 集群的 Spark 应用程序,建议将此参数设置为 ray.util.spark.MAX_NUM_WORKER_NODES

  • min_worker_nodes – Worker 节点的最少数量(默认为 None)。如果“max_worker_nodes”的值等于“min_worker_nodes”参数,或者“min_worker_nodes”参数值为 None,则禁用自动扩展,Ray 集群将以固定数量“max_worker_nodes”的 Ray worker 节点启动;否则,启用自动扩展。

  • num_cpus_worker_node – 每个 Ray worker 节点可用的 CPU 数量。如果未提供,则如果支持 Spark 阶段调度,“num_cpus_head_node”的值等于每个 Spark worker 节点的 CPU 核心数,否则它将使用 Spark 应用程序配置“spark.task.cpus”。限制 只有 Spark 版本 >= 3.4 或 Databricks Runtime 12.x 支持设置此参数。

  • num_cpus_head_node – Ray 头节点可用的 CPU 数量。如果未提供,如果是全局模式 Ray 集群,则使用 Spark 驱动程序节点的 CPU 核心数,否则使用 0。使用 0 表示需要 CPU 资源的任务不会被调度到 Ray 头节点。

  • num_gpus_worker_node – 每个 Ray worker 节点可用的 GPU 数量。如果未提供,则如果支持 Spark 阶段调度,“num_gpus_worker_node”的值等于每个 Spark worker 节点的 GPU 数量,否则它将使用 Spark 应用程序配置“spark.task.resource.gpu.amount”的向下取整值。此参数仅在配置了“gpu”资源的 Spark 集群上可用。限制 只有 Spark 版本 >= 3.4 或 Databricks Runtime 12.x 支持设置此参数。

  • num_gpus_head_node – Ray 头节点可用的 GPU 数量。如果未提供,如果是全局模式 Ray 集群,则使用 Spark 驱动程序节点的 GPU 数量,否则使用 0。此参数仅在 Spark 驱动程序节点具有 GPU 的 Spark 集群上可用。

  • memory_worker_node – Optional[int]: 为 Ray worker 节点配置的堆内存。这基本上是在使用 ray start 命令启动 Ray 节点时设置 --memory 选项。

  • memory_head_node – Optional[int]: 为 Ray 头节点配置的堆内存。这基本上是在使用 ray start 命令启动 Ray 节点时设置 --memory 选项。

  • object_store_memory_worker_node – 可用于每个 Ray worker 节点的对象存储内存,但其上限为“dev_shm_available_size * 0.8 / num_tasks_per_spark_worker”。默认值等于“0.3 * spark_worker_physical_memory * 0.8 / num_tasks_per_spark_worker”。

  • object_store_memory_head_node – 可用于 Ray 头节点的对象存储内存,但其上限为“dev_shm_available_size * 0.8”。默认值等于“0.3 * spark_driver_physical_memory * 0.8”。

  • head_node_options – 一个字典,表示 Ray 头节点的额外选项。这些选项将传递给 ray start 脚本。请注意,您需要将 ray start 选项的键从 --foo-bar 格式转换为 foo_bar 格式。对于标志选项(例如 ‘–disable-usage-stats’),您应将值设置为 None,例如 {"disable_usage_stats": None}。注意:不支持短名称选项(例如 ‘-v’)。

  • worker_node_options – 一个字典,表示 Ray worker 节点的额外选项。这些选项将传递给 ray start 脚本。请注意,您需要将 ray start 选项的键从 --foo-bar 格式转换为 foo_bar 格式。对于标志选项(例如 ‘–disable-usage-stats’),您应将值设置为 None,例如 {"disable_usage_stats": None}。注意:不支持短名称选项(例如 ‘-v’)。

  • ray_temp_root_dir – 用于存储 Ray 临时数据的本地磁盘路径。创建的集群将在该路径下创建一个子目录“ray-{head_port}-{random_suffix}”。

  • strict_mode – 布尔标志,如果可用的 Spark 集群没有足够的资源来满足内存、CPU 和 GPU 的资源分配,则会快速失败 Ray 集群的初始化。如果设置为 True,并且请求的资源不足以满足建议的最低推荐功能,则会引发详细说明 Spark 集群配置设置不足的异常。如果覆盖为 False,则会发出警告。

  • collect_log_to_path – 如果指定,则在 Ray 头/worker 节点终止后,将它们的日志收集到指定路径。在 Databricks Runtime 上,我们建议您指定一个以 ‘/dbfs/’ 开头的本地路径,因为该路径挂载了集中式存储设备,并且数据在 Databricks Spark 集群终止后仍然保留。

  • autoscale_upscaling_speed – 如果启用了自动扩展,则表示允许挂起的节点数量是当前节点数量的倍数。值越高,扩展速度越快。例如,如果设置为 1.0,集群的大小在任何时候最多可以增长 100%,因此如果集群当前有 20 个节点,最多允许 20 个挂起的启动。最小挂起启动数量为 5,与此设置无关。默认值为 1.0,最小值为 1.0。

  • autoscale_idle_timeout_minutes – 如果启用了自动扩展,则表示在空闲 worker 节点被自动扩展程序移除之前需要经过的分钟数。值越小,缩减速度越快。当 worker 节点不持有任何活动任务、actor 或引用对象(在内存中或已溢出到磁盘)时,它们被视为空闲。此参数不影响头节点。默认值为 1.0,最小值为 0。

返回:

返回一个元组 (address, remote_connection_address)。“address”的格式为“<ray_head_node_ip>:<port>”。“remote_connection_address”的格式为“ray://<ray_head_node_ip>:<ray-client-server-port>”。如果您的客户端运行在也本地托管 Ray 集群节点的机器上,您可以通过 ray.init(address) 连接到 Ray 集群;否则,您可以通过 ray.init(remote_connection_address) 连接到 Ray 集群。

ray.util.spark.shutdown_ray_cluster() None[source]#

关闭活动的 Ray 集群。

ray.util.spark.setup_global_ray_cluster(*, max_worker_nodes: int, is_blocking: bool = True, min_worker_nodes: int | None = None, num_cpus_worker_node: int | None = None, num_cpus_head_node: int | None = None, num_gpus_worker_node: int | None = None, num_gpus_head_node: int | None = None, memory_worker_node: int | None = None, memory_head_node: int | None = None, object_store_memory_worker_node: int | None = None, object_store_memory_head_node: int | None = None, head_node_options: Dict | None = None, worker_node_options: Dict | None = None, strict_mode: bool = False, collect_log_to_path: str | None = None, autoscale_upscaling_speed: float | None = 1.0, autoscale_idle_timeout_minutes: float | None = 1.0)[source]#

设置全局模式集群。全局 Ray on Spark 集群意味着:- 您一次只能创建一个活动的全局 Ray on Spark 集群。在 Databricks 集群上,全局 Ray 集群可供所有用户使用;- 相反,非全局 Ray 集群只能由当前笔记本用户使用;- 它会持续运行,不会自动关闭;- 在 Databricks 笔记本上,您可以通过调用 ray.init() 而无需指定其地址来连接到全局集群,它会自动发现已启动的全局集群。

对于全局模式,不支持 ray_temp_root_dir 参数。全局模式 Ray 集群始终使用默认的 Ray 临时目录路径。

setup_ray_cluster API 的所有参数都相同,除了:- 不支持 ray_temp_root_dir 参数。全局模式 Ray 集群始终使用默认的 Ray 临时目录路径。- 添加了一个新参数“is_blocking”(默认为 True)。如果“is_blocking”为 True,则调用将保持阻塞状态直到被中断。一旦调用被中断,全局 Ray on Spark 集群将被关闭,并且 setup_global_ray_cluster 调用终止。如果“is_blocking”为 False,则 Ray 集群设置完成后立即返回。