Data-Juicer 中的分布式数据处理#
Data-Juicer 支持基于 Ray 和阿里云 AI 平台 的大规模分布式数据处理。
通过专门的设计,您可以无缝地在 Ray 分布式模式下执行 Data-Juicer 在独立模式下实现的所有操作符。Data-Juicer 团队持续针对大规模场景进行引擎特定的优化,例如平衡文件数量和工作节点的数据子集拆分策略,以及针对 Ray 和 Apache Arrow 的 JSON 文件流式 I/O 补丁。
作为参考,在 25 到 100 个阿里云节点进行的实验中,Ray 模式下的 Data-Juicer 可以在 2 小时内处理包含 700 亿个样本的数据集(使用 6400 个 CPU 核心),在 0.45 小时内处理 70 亿个样本(使用 3200 个 CPU 核心)。此外,Ray 模式下基于 MinHash-LSH 的去重操作符可以在 3 小时内对 8 个节点(1280 个 CPU 核心)上的 TB 级数据集进行去重。
有关更多详细信息,请参阅 Data-Juicer 2.0:面向基础模型的云规模自适应数据处理 论文。
实现与优化#
Data-Juicer 中的 Ray 模式#
对于 Data-Juicer 操作符 的大多数实现,核心处理功能是与引擎无关的。操作符的互操作性主要在 RayDataset 和 RayExecutor 中管理,它们分别是基类
DJDataset和BaseExecutor的子类,并支持 Ray 任务 和 Actor。例外是去重操作符,它们在独立模式下难以扩展。这些操作符的名称遵循
ray_xx_deduplicator的模式。
子集拆分#
当一个集群拥有数万个节点但只有少量数据集文件时,Ray 会根据可用资源拆分数据集文件,并将块分发到所有节点,这会产生高昂的网络通信成本并降低 CPU 利用率。有关更多详细信息,请参阅 Ray 的 _autodetect_parallelism 函数 和 为 Ray 调优输出块。
默认的执行计划在节点数量众多的情况下可能效率不高。为了优化这种情况下的性能,Data-Juicer 会提前将原始数据集自动拆分成更小的文件,并考虑 Ray 和 Arrow 的特性。当遇到此类性能问题时,您可以使用此功能,或根据自己的偏好拆分数据集。在此自动拆分策略中,单个文件大小约为 128 MB,结果应确保拆分后的子文件数量至少是集群总可用 CPU 核心数的两倍。
JSON 文件流式读取#
JSON 文件流式读取是基础模型数据处理中的常见需求,因为许多数据集是 JSONL 格式且规模庞大。然而,Ray Datasets 的当前实现(依赖于底层 Arrow 库,支持到 Ray 版本 2.40 和 Arrow 版本 18.1.0)不支持 JSON 文件的流式读取。
为了解决缺乏对流式 JSON 数据原生支持的问题,Data-Juicer 团队开发了一个流式加载接口,并为 Apache Arrow 贡献了一个内部 补丁(向仓库提交的 PR)。该补丁有助于缓解内存不足问题。有了这个补丁,Ray 模式下的 Data-Juicer 默认使用流式加载接口加载 JSON 文件。此外,CSV 和 Parquet 文件的流式读取支持已经启用。
去重#
Data-Juicer 在 Ray 模式下提供了一个优化的 MinHash-LSH 去重操作符。它是一个 Ray Actor 中的多进程并查集,以及一个负载均衡的分布式算法 BTS,用于完成等价类的合并。该操作符可以在 3 小时内对 TB 级数据集使用 1280 个 CPU 核心进行去重。Data-Juicer 团队的消融研究表明,与此去重操作符的原始版本相比,其针对 Ray 模式的专用优化可实现 2 倍到 3 倍的速度提升。
性能结果#
不同规模的数据处理#
Data-Juicer 团队对包含数十亿样本的数据集进行了实验。他们准备了一个 560k 样本的多模态数据集,并通过不同的倍数(1x 到 125000x)进行扩展,以创建不同规模的数据集。实验结果如图所示,显示了良好的可扩展性。
大规模数据集上的分布式去重#
Data-Juicer 团队使用 640 到 1280 个 CPU 核心,对 200 GB、1 TB 和 5 TB 的数据集测试了基于 MinHash 的 RayDeduplicator。如下表所示,当数据量增加 5 倍时,处理时间增加 4.02 倍到 5.62 倍。当 CPU 核心数翻倍时,处理时间减少到原来的 58.9% 到 67.1%。
# CPU |
200 GB 时间 |
1 TB 时间 |
5 TB 时间 |
|---|---|---|---|
4 * 160 |
11.13 分钟 |
50.83 分钟 |
285.43 分钟 |
8 * 160 |
7.47 分钟 |
30.08 分钟 |
168.10 分钟 |
快速入门#
开始之前,您应该安装 Data-Juicer 及其 dist 依赖项。
pip install -v -e . # Install the minimal requirements of Data-Juicer
pip install -v -e ".[dist]" # Include dependencies on Ray and other distributed libraries
然后启动一个 Ray 集群(有关更多详细信息,请参阅 Ray 文档)。
# Start a cluster as the head node
ray start --head
# (Optional) Connect to the cluster on other nodes/machines.
ray start --address='{head_ip}:6379'
Data-Juicer 在 demos/process_on_ray/ 目录中提供简单的演示,其中包含两个配置文件和两个测试数据集。
demos/process_on_ray
├── configs
│ ├── demo.yaml
│ └── dedup.yaml
└── data
├── demo-dataset.json
└── demo-dataset.jsonl
重要提示: 如果您在多个节点上运行这些演示,则需要将演示数据集放置在共享磁盘(例如,网络附加存储)上,并通过修改配置文件中的
dataset_path和export_path参数将其导出到该磁盘。
Ray 模式运行示例#
在 demo.yaml 配置文件中,将执行器类型设置为 “ray” 并指定自动 Ray 地址。
...
dataset_path: './demos/process_on_ray/data/demo-dataset.jsonl'
export_path: './outputs/demo/demo-processed'
executor_type: 'ray' # Set the executor type to "ray"
ray_address: 'auto' # Set an automatic Ray address
...
运行演示以使用 12 个常规 OP 处理数据集。
# Run the tool from source
python tools/process_data.py --config demos/process_on_ray/configs/demo.yaml
# Use the command-line tool
dj-process --config demos/process_on_ray/configs/demo.yaml
Data-Juicer 使用演示配置文件处理演示数据集,并将结果数据集导出到配置文件中 export_path 参数指定的目录。
分布式去重运行示例#
在 dedup.yaml 配置文件中,将执行器类型设置为 “ray” 并指定自动 Ray 地址。它使用专用分布式版本的 MinHash 去重操作符来去重数据集。
project_name: 'demo-dedup'
dataset_path: './demos/process_on_ray/data/'
export_path: './outputs/demo-dedup/demo-ray-bts-dedup-processed'
executor_type: 'ray' # Set the executor type to "ray"
ray_address: 'auto' # Set an automatic Ray address
# process schedule
# a list of several process operators with their arguments
process:
- ray_bts_minhash_deduplicator: # a distributed version of minhash deduplicator
tokenization: 'character'
运行演示以去重数据集。
# Run the tool from source
python tools/process_data.py --config demos/process_on_ray/configs/dedup.yaml
# Use the command-line tool
dj-process --config demos/process_on_ray/configs/dedup.yaml
Data-Juicer 使用演示配置文件去重演示数据集,并将结果数据集导出到配置文件中 export_path 参数指定的目录。