在 Spark Standalone 集群上部署#

本文档描述了在 Spark Standalone 集群上运行 Ray 集群的几个高级步骤。

运行一个基本示例#

这是一个 Spark 应用程序示例代码,它在 Spark 上启动 Ray 集群,然后执行 Ray 应用程序代码,最后关闭已启动的 Ray 集群。

1) 创建一个包含 Spark 应用程序代码的 Python 文件,假设文件名为 ‘ray-on-spark-example1.py’。

from pyspark.sql import SparkSession
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Ray on spark example 1") \
        .config("spark.task.cpus", "4") \
        .getOrCreate()

    # Set up a ray cluster on this spark application, it creates a background
    # spark job that each spark task launches one ray worker node.
    # ray head node is launched in spark application driver side.
    # Resources (CPU / GPU / memory) allocated to each ray worker node is equal
    # to resources allocated to the corresponding spark task.
    setup_ray_cluster(max_worker_nodes=MAX_NUM_WORKER_NODES)

    # You can any ray application code here, the ray application will be executed
    # on the ray cluster setup above.
    # You don't need to set address for `ray.init`,
    # it will connect to the cluster created above automatically.
    ray.init()
    ...

    # Terminate ray cluster explicitly.
    # If you don't call it, when spark application is terminated, the ray cluster
    # will also be terminated.
    shutdown_ray_cluster()
  1. 将上面的 Spark 应用程序提交到 Spark Standalone 集群。

#!/bin/bash
spark-submit \
  --master spark://{spark_master_IP}:{spark_master_port} \
  path/to/ray-on-spark-example1.py

在 Spark 集群上创建长时间运行的 Ray 集群#

这是一个 Spark 应用程序示例代码,它在 Spark 上启动一个长时间运行的 Ray 集群。创建的 Ray 集群可以由远程 Python 进程访问。

1) 创建一个包含 Spark 应用程序代码的 Python 文件,假设文件名为 ‘long-running-ray-cluster-on-spark.py’。

from pyspark.sql import SparkSession
import time
from ray.util.spark import setup_ray_cluster, MAX_NUM_WORKER_NODES

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("long running ray cluster on spark") \
        .config("spark.task.cpus", "4") \
        .getOrCreate()

    cluster_address = setup_ray_cluster(
        max_worker_nodes=MAX_NUM_WORKER_NODES
    )
    print("Ray cluster is set up, you can connect to this ray cluster "
          f"via address ray://{cluster_address}")

    # Sleep forever until the spark application being terminated,
    # at that time, the ray cluster will also be terminated.
    while True:
        time.sleep(10)
  1. 将上面的 Spark 应用程序提交到 Spark Standalone 集群。

#!/bin/bash
spark-submit \
  --master spark://{spark_master_IP}:{spark_master_port} \
  path/to/long-running-ray-cluster-on-spark.py

Ray on Spark API#

ray.util.spark.setup_ray_cluster(*, max_worker_nodes: int, min_worker_nodes: int | None = None, num_cpus_worker_node: int | None = None, num_cpus_head_node: int | None = None, num_gpus_worker_node: int | None = None, num_gpus_head_node: int | None = None, memory_worker_node: int | None = None, memory_head_node: int | None = None, object_store_memory_worker_node: int | None = None, object_store_memory_head_node: int | None = None, head_node_options: Dict | None = None, worker_node_options: Dict | None = None, ray_temp_root_dir: str | None = None, strict_mode: bool = False, collect_log_to_path: str | None = None, autoscale_upscaling_speed: float | None = 1.0, autoscale_idle_timeout_minutes: float | None = 1.0, **kwargs) Tuple[str, str][source]#

通过在 Spark 应用程序的驱动节点上启动 Ray 头节点,在 Spark 集群上设置 Ray 集群。创建头节点后,会创建一个后台 Spark 作业,生成一个 RayClusterOnSpark 实例,其中包含将在 Spark 集群工作节点上运行的 Ray 集群配置。Ray 集群设置完成后,会设置“RAY_ADDRESS”环境变量为集群地址,这样您就可以直接调用 ray.init() 连接到集群而无需指定 Ray 集群地址。要关闭集群,您可以调用 ray.util.spark.shutdown_ray_cluster()。注意:如果活跃的 Ray 集群尚未关闭,您无法创建新的 Ray 集群。

参数:
  • max_worker_nodes – 此参数表示为 Ray 集群启动的最大 Ray 工作节点数。您可以将 max_worker_nodes 指定为 ray.util.spark.MAX_NUM_WORKER_NODES,这表示一种 Ray 集群配置,它将使用为 Spark 应用程序配置的所有可用资源。要创建旨在在非扩展模式下独占运行共享 Ray 集群的 Spark 应用程序,建议将此参数设置为 ray.util.spark.MAX_NUM_WORKER_NODES

  • min_worker_nodes – 最小工作节点数(默认 None)。如果“max_worker_nodes”值等于“min_worker_nodes”参数,或者“min_worker_nodes”参数值为 None,则禁用自动扩缩容,Ray 集群将启动固定数量(“max_worker_nodes”)的 Ray 工作节点,否则启用自动扩缩容。

  • num_cpus_worker_node – 每个 Ray 工作节点可用的 CPU 数量。如果未提供,并且支持 Spark 阶段调度,则 ‘num_cpus_head_node’ 的值等于每个 Spark 工作节点的 CPU 核心数,否则使用 Spark 应用程序配置 ‘spark.task.cpus’。限制 仅 Spark 版本 >= 3.4 或 Databricks Runtime 12.x 支持设置此参数。

  • num_cpus_head_node – Ray 头节点可用的 CPU 数量。如果未提供,并且是全局模式的 Ray 集群,则使用 Spark 驱动节点中的 CPU 核心数,否则使用 0。使用 0 意味着需要 CPU 资源的任务不会调度到 Ray 头节点。

  • num_gpus_worker_node – 每个 Ray 工作节点可用的 GPU 数量。如果未提供,并且支持 Spark 阶段调度,则 ‘num_gpus_worker_node’ 的值等于每个 Spark 工作节点的 GPU 数量,否则使用 Spark 应用程序配置 ‘spark.task.resource.gpu.amount’ 的向下取整值。此参数仅在配置了 ‘gpu’ 资源的 Spark 集群上可用。限制 仅 Spark 版本 >= 3.4 或 Databricks Runtime 12.x 支持设置此参数。

  • num_gpus_head_node – Ray 头节点可用的 GPU 数量。如果未提供,并且是全局模式的 Ray 集群,则使用 Spark 驱动节点中的 GPU 数量,否则使用 0。此参数仅在 Spark 驱动节点具有 GPU 的 Spark 集群上可用。

  • memory_worker_node – Optional[int]:为 Ray 工作节点配置的堆内存。这基本上相当于使用 ray start 命令启动 Ray 节点时设置 --memory 选项。

  • memory_head_node – Optional[int]:为 Ray 头节点配置的堆内存。这基本上相当于使用 ray start 命令启动 Ray 节点时设置 --memory 选项。

  • object_store_memory_worker_node – 每个 Ray 工作节点可用的对象存储内存,但受“dev_shm_available_size * 0.8 / num_tasks_per_spark_worker”的限制。默认值等于“0.3 * spark_worker_physical_memory * 0.8 / num_tasks_per_spark_worker”。

  • object_store_memory_head_node – Ray 头节点可用的对象存储内存,但受“dev_shm_available_size * 0.8”的限制。默认值等于“0.3 * spark_driver_physical_memory * 0.8”。

  • head_node_options – 一个字典,表示 Ray 头节点的额外选项,这些选项将传递给 ray start 脚本。请注意,您需要将 ray start 选项键从 --foo-bar 格式转换为 foo_bar 格式。对于标志选项(例如 ‘–disable-usage-stats’),您应该在选项字典中将值设置为 None,例如 {"disable_usage_stats": None}。注意:不支持短名称选项(例如 ‘-v’)。

  • worker_node_options – 一个字典,表示 Ray 工作节点的额外选项,这些选项将传递给 ray start 脚本。请注意,您需要将 ray start 选项键从 --foo-bar 格式转换为 foo_bar 格式。对于标志选项(例如 ‘–disable-usage-stats’),您应该在选项字典中将值设置为 None,例如 {"disable_usage_stats": None}。注意:不支持短名称选项(例如 ‘-v’)。

  • ray_temp_root_dir – 用于存储 Ray 临时数据的本地磁盘路径。创建的集群将在此路径下创建一个子目录“ray-{head_port}-{random_suffix}”。

  • strict_mode – 布尔标志,如果可用的 Spark 集群没有足够的资源来满足内存、CPU 和 GPU 的资源分配,则会快速失败 Ray 集群的初始化。当设置为 true 时,如果请求的资源不足以满足建议的最低功能需求,则会引发异常,详细说明 Spark 集群配置设置不足之处。如果覆盖为 False,则会引发警告。

  • collect_log_to_path – 如果指定,在 Ray 头节点/工作节点终止后,将其日志收集到指定的路径。在 Databricks Runtime 上,建议您指定以 ‘/dbfs/’ 开头的本地路径,因为该路径挂载到集中式存储设备,并且存储的数据在 Databricks Spark 集群终止后仍然存在。

  • autoscale_upscaling_speed – 如果启用自动扩缩容,此参数表示允许挂起的节点数量,是当前节点数量的倍数。值越高,扩缩容越激进。例如,如果设置为 1.0,则集群在任何时候最多可增长其当前大小的 100%。如果集群当前有 20 个节点,则最多允许 20 个挂起的启动。无论此设置如何,最小挂起启动数为 5。默认值为 1.0,最小值为 1.0

  • autoscale_idle_timeout_minutes – 如果启用自动扩缩容,此参数表示自动扩缩容器删除空闲工作节点之前需要经过的分钟数。值越小,缩减越激进。当工作节点上没有活跃任务、Actor 或引用的对象(无论是内存中还是溢出到磁盘)时,它们被认为是空闲的。此参数不影响头节点。默认值为 1.0,最小值为 0

返回:

返回一个元组 (address, remote_connection_address),其中“address”格式为“<Ray 头节点 IP>:<端口>”,“remote_connection_address”格式为“ray://<Ray 头节点 IP>:<Ray 客户端服务器端口>”。如果您的客户端运行在也托管 Ray 集群节点的机器上,您可以通过 ray.init(address) 连接到 Ray 集群;否则,您可以通过 ray.init(remote_connection_address) 连接到 Ray 集群。

ray.util.spark.shutdown_ray_cluster() None[source]#

关闭当前活跃的 Ray 集群。

ray.util.spark.setup_global_ray_cluster(*, max_worker_nodes: int, is_blocking: bool = True, min_worker_nodes: int | None = None, num_cpus_worker_node: int | None = None, num_cpus_head_node: int | None = None, num_gpus_worker_node: int | None = None, num_gpus_head_node: int | None = None, memory_worker_node: int | None = None, memory_head_node: int | None = None, object_store_memory_worker_node: int | None = None, object_store_memory_head_node: int | None = None, head_node_options: Dict | None = None, worker_node_options: Dict | None = None, strict_mode: bool = False, collect_log_to_path: str | None = None, autoscale_upscaling_speed: float | None = 1.0, autoscale_idle_timeout_minutes: float | None = 1.0)[source]#

设置全局模式集群。全局 Ray on Spark 集群意味着:- 您一次只能创建一个活跃的全局 Ray on Spark 集群。在 Databricks 集群上,全局 Ray 集群可以被所有用户使用,- 相反,非全局 Ray 集群只能被当前笔记本用户使用。- 它会持续运行而不会自动关闭。- 在 Databricks 笔记本中,您无需指定地址即可通过调用 ray.init() 连接到全局集群,如果它正在运行,它会自动发现全局集群。

对于全局模式,不支持 ray_temp_root_dir 参数。全局模式的 Ray 集群始终使用默认的 Ray 临时目录路径。

所有参数与 setup_ray_cluster API 相同,但以下情况除外:- 不支持 ray_temp_root_dir 参数。全局模式的 Ray 集群始终使用默认的 Ray 临时目录路径。- 添加了一个新参数“is_blocking”(默认为 True)。如果“is_blocking”为 True,则调用会一直阻塞,直到被中断。一旦调用被中断,全局 Ray on Spark 集群就会关闭,并且 setup_global_ray_cluster 调用终止。如果“is_blocking”为 False,则 Ray 集群设置完成后立即返回。