监控您的工作负载#
本节通过查看 Dataset 的 Ray Data progress bars,Ray Data dashboard,Ray Data logs,Ray Data stats 来帮助您调试和监控 Dataset 的执行。
Ray Data progress bars#
当您执行一个 Dataset 时,Ray Data 会在控制台中显示一组进度条。这些进度条显示了各种执行和进度相关的指标,包括已完成/剩余的行数、资源使用情况以及任务/Actor 状态。请参阅带有注释的图像,了解如何解释进度条输出的详细信息。
关于进度条的一些附加说明
进度条每秒更新一次;资源使用情况、指标和任务/Actor 状态可能需要长达 5 秒才会更新。
当任务部分包含标签
[backpressure]时,表示该算子处于反压(backpressured)状态,这意味着该算子在下游算子准备好接受更多数据之前不会提交更多任务。全局资源使用情况是所有算子(活动和请求的)使用的资源总和(包括待调度和待节点分配)。
配置进度条#
根据您的用例,您可能不关心完整的进度条输出,或者希望完全关闭它们。Ray Data 提供了几种方法来实现这一点:
禁用算子级进度条:将
DataContext.get_current().enable_operator_progress_bars = False设置为False。这只会显示全局进度条,并省略算子级进度条。禁用所有进度条:将
DataContext.get_current().enable_progress_bars = False设置为False。这将禁用 Ray Data 与数据集执行相关的所有进度条。禁用
ray_tqdm:将DataContext.get_current().use_ray_tqdm = False设置为False。这将配置 Ray Data 使用基础的tqdm库而不是自定义的分布式tqdm实现,这在调试分布式环境中的日志记录问题时可能很有用。
对于长度超过 100 个字符阈值的算子名称,Ray Data 默认会截断名称,以防止出现算子名称过长导致进度条太宽而无法显示在屏幕上的情况。
要关闭此行为并显示完整的算子名称,请将
DataContext.get_current().enable_progress_bar_name_truncation = False设置为False。要更改截断名称的阈值,请更新常量
ray.data._internal.progress_bar.ProgressBar.MAX_NAME_LENGTH = 42。
提示
有一个新的实验性控制台 UI 来显示进度条。通过将 DataContext.get_current().enable_rich_progress_bars = True 设置为 True 或设置 RAY_DATA_ENABLE_RICH_PROGRESS_BARS=1 环境变量来启用。
Ray Data dashboard#
Ray Data 在数据集执行期间实时发出 Prometheus 指标。这些指标按数据集和算子进行标记,并在 Ray dashboard 的多个视图中显示。
注意
大多数指标仅适用于使用 map 操作的物理算子。例如,由 map_batches()、map() 和 flat_map() 创建的物理算子。
作业:Ray Data 概览#
要概览您集群上运行的所有数据集,请在 作业视图 中查看 Ray Data 概览。当第一个数据集在集群上开始执行后,此表就会出现,并显示数据集的详细信息,例如:
执行进度(以块为单位)
执行状态(运行中、失败或已完成)
数据集开始/结束时间
数据集级指标(例如,所有算子上已处理的总行数)
为了获得更精细化的概览,表中的每个数据集行都可以展开以显示单个算子的相同详细信息。
提示
要评估一个数据集级指标,当不适合对所有单个算子的值求和时,查看最后一个算子的算子级指标可能更有用。例如,要计算数据集的吞吐量,请使用数据集最后一个算子的“输出行数”,因为数据集级指标包含所有算子输出的行数总和。
Ray dashboard 指标#
有关这些指标的时间序列视图,请参阅 Metrics view 中的 Ray Data 部分。本节包含 Ray Data 发出的所有指标的时间序列图。执行指标按数据集和算子分组,迭代指标按数据集分组。
记录的指标包括:
从对象存储到磁盘溢出的对象字节数
对象存储中分配的对象字节数
对象存储中已释放的对象字节数
对象存储中当前对象总字节数
分配给数据集算子的逻辑 CPU 数量
分配给数据集算子的逻辑 GPU 数量
数据集算子输出的字节数
数据集算子输出的行数
数据算子接收的输入块
数据算子任务中处理的输入块/字节数
提交给任务的输入字节数
数据算子任务中生成的输出块/字节数/行数
下游算子接收的输出块/字节数
已完成任务的输出块/字节数
已提交的任务数
正在运行的任务数
具有至少一个输出块的任务数
已完成的任务数
失败的任务数
算子内部队列大小(块/字节)
算子内部出队大小(块/字节)
待处理任务中使用的块大小
对象存储中已释放的内存
对象存储中溢出的内存
生成块所花费的时间
任务提交反压所花费的时间
初始化迭代所花费的时间。
迭代期间用户代码被阻塞的时间。
迭代期间用户代码所花费的时间。
要了解有关 Ray dashboard 的更多信息,包括详细的设置说明,请参阅 Ray Dashboard。
Prometheus 指标#
Ray Data 发出 Prometheus 指标,可用于监控数据集执行。指标用 dataset 和 operator 标签进行标记,以帮助您识别指标来自哪个数据集和算子。
要访问这些指标,您可以查询运行在 Ray head 节点上的 Prometheus 服务器。默认的 Prometheus 服务器 URL 是 http://<head-node-ip>:8080。
下表按类别列出了所有可用的 Ray Data 指标。
概览指标#
这些指标提供有关数据集执行和资源使用情况的高级别信息。
指标名称 |
描述 |
|---|---|
|
数据集算子溢出的字节数。将 |
|
数据集算子释放的字节数 |
|
数据集算子使用的对象存储内存的字节数 |
|
分配给数据集算子的 CPU 数量 |
|
分配给数据集算子的 GPU 数量 |
|
数据集算子输出的字节数 |
|
数据集算子输出的行数 |
输入指标#
这些指标跟踪流入算子的输入数据。
指标名称 |
描述 |
|---|---|
|
算子接收的输入块数量 |
|
算子接收的输入行数量 |
|
算子接收的输入块的字节大小 |
|
算子任务已完成处理的输入块数量 |
|
算子任务已完成处理的输入块的字节大小 |
|
传递给已提交任务的输入块的字节大小 |
|
传递给已提交任务的输入块中的行数 |
|
每个任务的平均输入块数量,如果没有任务完成则为 |
|
传递给任务的 ref bundles 的平均字节大小,如果没有任务提交则为 |
|
输入块中每任务的平均行数,如果没有任务提交则为 |
输出指标#
这些指标跟踪算子生成的输出数据。
指标名称 |
描述 |
|---|---|
|
任务生成的输出块数量 |
|
任务生成的输出块的字节大小 |
|
任务生成的输出行数量 |
|
已被下游算子接收的行数 |
|
已被下游算子接收的块数量 |
|
已被下游算子接收的输出块数量 |
|
已被下游算子接收的输出块的字节大小 |
|
已完成任务生成的输出块数量 |
|
已完成任务生成的输出块的总字节大小 |
|
已完成任务生成的行数 |
|
外部输入队列中的块数量 |
|
外部输入队列中块的字节大小 |
|
外部输出队列中的块数量 |
|
外部输出队列中块的字节大小 |
|
每个任务的平均输出块数量,如果没有任务完成则为 |
|
输出块的平均字节大小 |
|
任务的平均总输出大小(字节),如果没有任务完成则为 |
|
每个任务的平均输出行数,如果没有任务完成则为 |
|
每秒每任务的平均输出块数量 |
任务指标#
这些指标跟踪任务的执行和调度。
指标名称 |
描述 |
|---|---|
|
已提交的任务数量 |
|
正在运行的任务数量 |
|
具有至少一个输出的任务数量 |
|
已完成的任务数量 |
|
失败的任务数量 |
|
任务中生成块所花费的时间 |
|
任务提交反压所花费的时间 |
|
任务输出反压所花费的时间 |
|
任务完成运行时间直方图 |
|
单个块完成运行时间直方图。如果每个任务生成多个块,Ray Data 会通过假设每个块的处理时间相等来近似。 |
|
任务完成运行所花费的时间(秒) |
|
任务完成运行时间(秒),不包括反压 |
|
任务生成的块大小(字节)直方图 |
|
任务生成的块中的行数直方图 |
|
平均任务完成时间(秒),包括节流。这包括 Ray Core 和 Ray Data 的反压。 |
|
平均任务完成时间(秒),不包括节流 |
|
任务的平均唯一集大小(USS)内存使用量。USS 是进程独有的内存量,如果进程终止,这些内存将被释放。 |
Actor 指标#
这些指标跟踪使用 Actor 的操作的 Actor 生命周期。
指标名称 |
描述 |
|---|---|
|
存活 Actor 数量 |
|
正在重启的 Actor 数量 |
|
待处理 Actor 数量 |
对象存储内存指标#
这些指标跟踪 Ray 对象存储中的内存使用情况。
指标名称 |
描述 |
|---|---|
|
算子内部输入队列中的块数量 |
|
算子内部输出队列中的块数量 |
|
对象存储中已释放内存的字节大小 |
|
对象存储中溢出内存的字节大小 |
|
对象存储中已用内存的字节大小 |
|
算子内部输入队列中输入块的字节大小 |
|
算子内部输出队列中输出块的字节大小 |
|
待处理任务使用的输入块的字节大小 |
调度和资源指标#
这些指标跟踪流式执行器中的资源分配和调度行为。
指标名称 |
描述 |
|---|---|
|
调度循环的持续时间(秒) |
|
每个算子分配的 CPU 预算 |
|
每个算子分配的 GPU 预算 |
|
每个算子分配的内存预算 |
|
每个算子分配的对象存储内存预算 |
|
每个算子从流式生成器缓冲区读取的最大字节数 |
Ray Data 日志#
执行期间,Ray Data 会定期将更新记录到 ray-data.log。
每五秒钟,Ray Data 会记录数据集中每个算子的执行进度。如需更频繁的更新,请将 RAY_DATA_TRACE_SCHEDULING=1 设置为 1,以便在每个任务分派后记录进度。
Execution Progress:
0: - Input: 0 active, 0 queued, 0.0 MiB objects, Blocks Outputted: 200/200
1: - ReadRange->MapBatches(<lambda>): 10 active, 190 queued, 381.47 MiB objects, Blocks Outputted: 100/200
当算子完成时,该算子的指标也会被记录。
Operator InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange->MapBatches(<lambda>)] completed. Operator Metrics:
{'num_inputs_received': 20, 'bytes_inputs_received': 46440, 'num_task_inputs_processed': 20, 'bytes_task_inputs_processed': 46440, 'num_task_outputs_generated': 20, 'bytes_task_outputs_generated': 800, 'rows_task_outputs_generated': 100, 'num_outputs_taken': 20, 'bytes_outputs_taken': 800, 'num_outputs_of_finished_tasks': 20, 'bytes_outputs_of_finished_tasks': 800, 'num_tasks_submitted': 20, 'num_tasks_running': 0, 'num_tasks_have_outputs': 20, 'num_tasks_finished': 20, 'obj_store_mem_freed': 46440, 'obj_store_mem_spilled': 0, 'block_generation_time': 1.191296085, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}
此日志文件可以在本地找到:/tmp/ray/{SESSION_NAME}/logs/ray-data/ray-data.log。它也可以在 Ray Dashboard 的 head 节点的日志中找到,位于 Logs view。
Ray Data 统计信息#
要查看数据集执行的详细统计信息,您可以使用 stats() 方法。
算子统计信息#
统计输出包括每个算子单个算子执行统计信息的摘要。Ray Data 在许多不同的块上计算此摘要,因此某些统计信息显示在所有块上聚合的统计信息的最小值、最大值、平均值和总和。以下是算子级别包含的各种统计信息的描述:
远程墙上时间:墙上时间是指算子的开始到结束时间。它包括算子未处理数据、睡眠、等待 I/O 等的时间。
远程 CPU 时间:CPU 时间是指算子的进程时间,不包括睡眠时间。此时间包括用户 CPU 时间和系统 CPU 时间。
UDF 时间:UDF 时间是指在用户定义的函数中花费的时间。此时间包括您传递给 Ray Data 方法的函数的时间,例如
map()、map_batches()、filter()等。您可以使用此统计信息来跟踪在您定义的函数中花费的时间,以及优化这些函数可以节省多少时间。内存使用量:输出以 MiB 为单位显示每个块的内存使用量。
输出统计信息:输出包括每个块的输出行数和输出大小(字节)的统计信息。每任务的输出行数也包括在内。所有这些信息共同让您了解 Ray Data 在每个块和每任务级别输出多少数据。
任务统计信息:输出显示了任务到节点的调度情况,让您可以查看是否按预期利用了所有节点。
吞吐量:摘要计算了算子的吞吐量,并作为比较点,它还计算了相同任务在单个节点上的吞吐量估算。此估算假定工作总时间保持不变,但没有并发。总体摘要还计算了数据集级别的吞吐量,包括单个节点估算。
迭代器统计信息#
如果您遍历数据,Ray Data 还会生成迭代统计信息。即使您不直接遍历数据,也可能会看到迭代统计信息,例如,如果您调用 take_all()。Ray Data 在迭代器级别包含的一些统计信息是:
迭代器初始化:Ray Data 花费时间初始化迭代器。此时间是 Ray Data 内部的。
用户线程阻塞时间:Ray Data 花费时间在迭代器中生成数据。如果您尚未物化数据集,此时间通常是数据集的主要执行时间。
用户线程时间:在迭代数据集的用户线程中花费的时间,不包括 Ray Data 代码。如果此时间较长,请考虑优化迭代数据集的循环体。
批处理迭代统计信息:Ray Data 还包括有关批处理预取的统计信息。这些时间是 Ray Data 代码内部的,但您可以通过调整预取过程来进一步优化这些时间。
详细统计信息#
默认情况下,Ray Data 只记录最重要的顶级统计信息。要启用详细统计信息输出,请在您的 Ray Data 代码中包含以下代码片段:
from ray.data import DataContext
context = DataContext.get_current()
context.verbose_stats_logs = True
通过启用详细信息,Ray Data 会添加一些额外的输出:
额外指标:算子、执行器等可以添加到此字典中各种指标。默认输出和此字典之间存在一些统计信息的重复,但对于高级用户,此统计信息提供了对数据集执行的更多洞察。
运行时指标:这些指标是数据集执行运行时的顶级细分。这些统计信息是每个算子的摘要,显示每个算子完成所需的时间以及该算子完成数据集总执行时间所占的比例。由于可能存在多个并发算子,这些百分比不一定加起来是 100%。相反,它们显示了每个算子在整个数据集执行中的运行时间。
示例统计信息#
作为一个具体的例子,下面是从 使用 PyTorch ResNet18 进行图像分类批处理推理 的统计信息输出:
Operator 1 ReadImage->Map(preprocess_image): 384 tasks executed, 386 blocks produced in 9.21s
* Remote wall time: 33.55ms min, 2.22s max, 1.03s mean, 395.65s total
* Remote cpu time: 34.93ms min, 3.36s max, 1.64s mean, 632.26s total
* UDF time: 535.1ms min, 2.16s max, 975.7ms mean, 376.62s total
* Peak heap memory usage (MiB): 556.32 min, 1126.95 max, 655 mean
* Output num rows per block: 4 min, 25 max, 24 mean, 9469 total
* Output size bytes per block: 6060399 min, 105223020 max, 31525416 mean, 12168810909 total
* Output rows per task: 24 min, 25 max, 24 mean, 384 tasks used
* Tasks per node: 32 min, 64 max, 48 mean; 8 nodes used
* Operator throughput:
* Ray Data throughput: 1028.5218637702708 rows/s
* Estimated single node throughput: 23.932674100499128 rows/s
Operator 2 MapBatches(ResnetModel): 14 tasks executed, 48 blocks produced in 27.43s
* Remote wall time: 523.93us min, 7.01s max, 1.82s mean, 87.18s total
* Remote cpu time: 523.23us min, 6.23s max, 1.76s mean, 84.61s total
* UDF time: 4.49s min, 17.81s max, 10.52s mean, 505.08s total
* Peak heap memory usage (MiB): 4025.42 min, 7920.44 max, 5803 mean
* Output num rows per block: 84 min, 334 max, 197 mean, 9469 total
* Output size bytes per block: 72317976 min, 215806447 max, 134739694 mean, 6467505318 total
* Output rows per task: 319 min, 720 max, 676 mean, 14 tasks used
* Tasks per node: 3 min, 4 max, 3 mean; 4 nodes used
* Operator throughput:
* Ray Data throughput: 345.1533728632648 rows/s
* Estimated single node throughput: 108.62003864820711 rows/s
Dataset iterator time breakdown:
* Total time overall: 38.53s
* Total time in Ray Data iterator initialization code: 16.86s
* Total time user thread is blocked by Ray Data iter_batches: 19.76s
* Total execution time for user thread: 1.9s
* Batch iteration time breakdown (summed across prefetch threads):
* In ray.get(): 70.49ms min, 2.16s max, 272.8ms avg, 13.09s total
* In batch creation: 3.6us min, 5.95us max, 4.26us avg, 204.41us total
* In batch formatting: 4.81us min, 7.88us max, 5.5us avg, 263.94us total
Dataset throughput:
* Ray Data throughput: 1026.5318925757008 rows/s
* Estimated single node throughput: 19.611578909587674 rows/s
对于启用了详细信息功能的同一示例,统计信息输出是:
Operator 1 ReadImage->Map(preprocess_image): 384 tasks executed, 387 blocks produced in 9.49s
* Remote wall time: 22.81ms min, 2.5s max, 999.95ms mean, 386.98s total
* Remote cpu time: 24.06ms min, 3.36s max, 1.63s mean, 629.93s total
* UDF time: 552.79ms min, 2.41s max, 956.84ms mean, 370.3s total
* Peak heap memory usage (MiB): 550.95 min, 1186.28 max, 651 mean
* Output num rows per block: 4 min, 25 max, 24 mean, 9469 total
* Output size bytes per block: 4444092 min, 105223020 max, 31443955 mean, 12168810909 total
* Output rows per task: 24 min, 25 max, 24 mean, 384 tasks used
* Tasks per node: 39 min, 60 max, 48 mean; 8 nodes used
* Operator throughput:
* Ray Data throughput: 997.9207015895857 rows/s
* Estimated single node throughput: 24.46899945870273 rows/s
* Extra metrics: {'num_inputs_received': 384, 'bytes_inputs_received': 1104723940, 'num_task_inputs_processed': 384, 'bytes_task_inputs_processed': 1104723940, 'bytes_inputs_of_submitted_tasks': 1104723940, 'num_task_outputs_generated': 387, 'bytes_task_outputs_generated': 12168810909, 'rows_task_outputs_generated': 9469, 'num_outputs_taken': 387, 'bytes_outputs_taken': 12168810909, 'num_outputs_of_finished_tasks': 387, 'bytes_outputs_of_finished_tasks': 12168810909, 'num_tasks_submitted': 384, 'num_tasks_running': 0, 'num_tasks_have_outputs': 384, 'num_tasks_finished': 384, 'num_tasks_failed': 0, 'block_generation_time': 386.97945193799995, 'task_submission_backpressure_time': 7.263684450000142, 'obj_store_mem_internal_inqueue_blocks': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 0, 'obj_store_mem_freed': 1104723940, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 12582535566, 'cpu_usage': 0, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}
Operator 2 MapBatches(ResnetModel): 14 tasks executed, 48 blocks produced in 28.81s
* Remote wall time: 134.84us min, 7.23s max, 1.82s mean, 87.16s total
* Remote cpu time: 133.78us min, 6.28s max, 1.75s mean, 83.98s total
* UDF time: 4.56s min, 17.78s max, 10.28s mean, 493.48s total
* Peak heap memory usage (MiB): 3925.88 min, 7713.01 max, 5688 mean
* Output num rows per block: 125 min, 259 max, 197 mean, 9469 total
* Output size bytes per block: 75531617 min, 187889580 max, 134739694 mean, 6467505318 total
* Output rows per task: 325 min, 719 max, 676 mean, 14 tasks used
* Tasks per node: 3 min, 4 max, 3 mean; 4 nodes used
* Operator throughput:
* Ray Data throughput: 328.71474145609153 rows/s
* Estimated single node throughput: 108.6352856660782 rows/s
* Extra metrics: {'num_inputs_received': 387, 'bytes_inputs_received': 12168810909, 'num_task_inputs_processed': 0, 'bytes_task_inputs_processed': 0, 'bytes_inputs_of_submitted_tasks': 12168810909, 'num_task_outputs_generated': 1, 'bytes_task_outputs_generated': 135681874, 'rows_task_outputs_generated': 252, 'num_outputs_taken': 1, 'bytes_outputs_taken': 135681874, 'num_outputs_of_finished_tasks': 0, 'bytes_outputs_of_finished_tasks': 0, 'num_tasks_submitted': 14, 'num_tasks_running': 14, 'num_tasks_have_outputs': 1, 'num_tasks_finished': 0, 'num_tasks_failed': 0, 'block_generation_time': 7.229860895999991, 'task_submission_backpressure_time': 0, 'obj_store_mem_internal_inqueue_blocks': 13, 'obj_store_mem_internal_inqueue': 413724657, 'obj_store_mem_internal_outqueue_blocks': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 12168810909, 'obj_store_mem_freed': 0, 'obj_store_mem_spilled': 0, 'obj_store_mem_used': 1221136866.0, 'cpu_usage': 0, 'gpu_usage': 4}
Dataset iterator time breakdown:
* Total time overall: 42.29s
* Total time in Ray Data iterator initialization code: 20.24s
* Total time user thread is blocked by Ray Data iter_batches: 19.96s
* Total execution time for user thread: 2.08s
* Batch iteration time breakdown (summed across prefetch threads):
* In ray.get(): 73.0ms min, 2.15s max, 246.3ms avg, 11.82s total
* In batch creation: 3.62us min, 6.6us max, 4.39us avg, 210.7us total
* In batch formatting: 4.75us min, 8.67us max, 5.52us avg, 264.98us total
Dataset throughput:
* Ray Data throughput: 468.11051989434594 rows/s
* Estimated single node throughput: 972.8197093015862 rows/s
Runtime Metrics:
* ReadImage->Map(preprocess_image): 9.49s (46.909%)
* MapBatches(ResnetModel): 28.81s (142.406%)
* Scheduling: 6.16s (30.448%)
* Total: 20.23s (100.000%)