使用 Ray Core 的简单 MapReduce 示例#
本示例演示了如何使用 Ray 来处理一个常见的分布式计算示例——跨多个文档计数单词的出现次数。复杂性在于处理大型语料库,这需要多个计算节点来处理数据。使用 Ray 实现 MapReduce 的简便性是分布式计算的一个重要里程碑。许多流行的海量数据技术,例如 Hadoop,都建立在此编程模型之上,凸显了使用 Ray Core 的影响力。
MapReduce 方法有三个阶段
Map 阶段 Map 阶段将指定的函数应用于一组数据中的元素,以转换或*映射*这些元素。它会生成键值对:键代表一个元素,值是为该元素计算的度量。为了计算每个单词在文档中出现的次数,当一个单词出现时,map 函数会输出对
(word, 1),表示该单词出现了一次。Shuffle 阶段 Shuffle 阶段收集 Map 阶段的所有输出,并按键进行组织。当在多个计算节点上找到相同的键时,此阶段包括在不同节点之间传输或*混洗*数据。如果 Map 阶段生成了四个
(word, 1)对,Shuffle 阶段会将所有相同单词的出现次数集中到同一个节点上。Reduce 阶段 Reduce 阶段聚合 Shuffle 阶段的元素。每个单词出现次数的总数是每个节点上出现次数的总和。例如,四个
(word, 1)实例会合并为最终计数word: 4。
第一个和最后一个阶段是 MapReduce 名称的一部分,但中间的阶段同样至关重要。这些阶段看起来很简单,但它们的强大之处在于可以在多台机器上并发运行。下图说明了在一组文档上的三个 MapReduce 阶段。

加载数据#
我们使用 Python 来实现 MapReduce 算法进行单词计数,并使用 Ray 来并行化计算。我们首先从“Python 之禅”(Python 社区的一系列编码指南)加载一些示例数据。根据彩蛋传统,通过在 Python 会话中键入 import this 来访问“Python 之禅”。我们将“Python 之禅”分成三个独立的“文档”,将每一行视为一个单独的实体,然后将这些行分成三个分区。
import subprocess
zen_of_python = subprocess.check_output(["python", "-c", "import this"])
corpus = zen_of_python.split()
num_partitions = 3
chunk = len(corpus) // num_partitions
partitions = [
corpus[i * chunk: (i + 1) * chunk] for i in range(num_partitions)
]
映射数据#
为了确定 Map 阶段,我们需要一个 map 函数来应用于每个文档。对于在文档中找到的每个单词,输出都是 (word, 1) 对。对于我们加载为 Python 字符串的基本文本文档,过程如下:
def map_function(document):
for word in document.lower().split():
yield word, 1
我们将 apply_map 函数应用于大量文档集合,方法是使用 @ray.remote 装饰器将其标记为 Ray 中的一个任务。当我们调用 apply_map 时,我们会将其应用于三组文档数据(num_partitions=3)。apply_map 函数返回三个列表,每个列表对应一个分区,以便 Ray 可以重新排列 Map 阶段的结果并将其分发到适当的节点。
import ray
@ray.remote
def apply_map(corpus, num_partitions=3):
map_results = [list() for _ in range(num_partitions)]
for document in corpus:
for result in map_function(document):
first_letter = result[0].decode("utf-8")[0]
word_index = ord(first_letter) % num_partitions
map_results[word_index].append(result)
return map_results
对于可以存储在单台机器上的文本语料库,Map 阶段不是必需的。但是,当数据需要跨多个节点分发时,Map 阶段很有用。要并行地将 Map 阶段应用于语料库,我们使用对 apply_map 的远程调用,这与前面的示例类似。主要区别在于我们希望通过 num_returns 参数返回三个结果(每个分区一个)。
map_results = [
apply_map.options(num_returns=num_partitions)
.remote(data, num_partitions)
for data in partitions
]
for i in range(num_partitions):
mapper_results = ray.get(map_results[i])
for j, result in enumerate(mapper_results):
print(f"Mapper {i}, return value {j}: {result[:2]}")
Mapper 0, return value 0: [(b'of', 1), (b'is', 1)]
Mapper 0, return value 1: [(b'python,', 1), (b'peters', 1)]
Mapper 0, return value 2: [(b'the', 1), (b'zen', 1)]
Mapper 1, return value 0: [(b'unless', 1), (b'in', 1)]
Mapper 1, return value 1: [(b'although', 1), (b'practicality', 1)]
Mapper 1, return value 2: [(b'beats', 1), (b'errors', 1)]
Mapper 2, return value 0: [(b'is', 1), (b'is', 1)]
Mapper 2, return value 1: [(b'although', 1), (b'a', 1)]
Mapper 2, return value 2: [(b'better', 1), (b'than', 1)]
此示例演示了如何使用 ray.get 在驱动程序上收集数据。要完成 Map 阶段后的另一个任务,您将不这样做。下一节将展示如何高效地将所有阶段一起运行。
混洗和归约数据#
Reduce 阶段的目标是将来自第 j 个返回值的所有对传输到同一节点。在 Reduce 阶段,我们创建一个字典,该字典将每个分区上的所有单词出现次数相加。
@ray.remote
def apply_reduce(*results):
reduce_results = dict()
for res in results:
for key, value in res:
if key not in reduce_results:
reduce_results[key] = 0
reduce_results[key] += value
return reduce_results
我们可以使用以下方法获取来自每个 mapper 的第 j 个返回值并将其发送到第 j 个 reducer。请注意,此代码适用于无法容纳在一台机器上的大型数据集,因为我们传递的是 Ray 对象的*数据引用*而不是实际数据本身。Map 和 Reduce 阶段都可以运行在任何 Ray 集群上,Ray 会处理数据混洗。
outputs = []
for i in range(num_partitions):
outputs.append(
apply_reduce.remote(*[partition[i] for partition in map_results])
)
counts = {k: v for output in ray.get(outputs) for k, v in output.items()}
sorted_counts = sorted(counts.items(), key=lambda item: item[1], reverse=True)
for count in sorted_counts:
print(f"{count[0].decode('utf-8')}: {count[1]}")
is: 10
better: 8
than: 8
the: 6
to: 5
of: 3
although: 3
be: 3
unless: 2
one: 2
if: 2
implementation: 2
idea.: 2
special: 2
should: 2
do: 2
may: 2
a: 2
never: 2
way: 2
explain,: 2
ugly.: 1
implicit.: 1
complex.: 1
complex: 1
complicated.: 1
flat: 1
readability: 1
counts.: 1
cases: 1
rules.: 1
in: 1
face: 1
refuse: 1
one--: 1
only: 1
--obvious: 1
it.: 1
obvious: 1
first: 1
often: 1
*right*: 1
it's: 1
it: 1
idea: 1
--: 1
let's: 1
python,: 1
peters: 1
simple: 1
sparse: 1
dense.: 1
aren't: 1
practicality: 1
purity.: 1
pass: 1
silently.: 1
silenced.: 1
ambiguity,: 1
guess.: 1
and: 1
preferably: 1
at: 1
you're: 1
dutch.: 1
good: 1
are: 1
great: 1
more: 1
zen: 1
by: 1
tim: 1
beautiful: 1
explicit: 1
nested.: 1
enough: 1
break: 1
beats: 1
errors: 1
explicitly: 1
temptation: 1
there: 1
that: 1
not: 1
now: 1
never.: 1
now.: 1
hard: 1
bad: 1
easy: 1
namespaces: 1
honking: 1
those!: 1
要深入了解如何使用 Ray 在多台机器上扩展 MapReduce 任务,包括内存管理,请阅读相关主题的博客文章。
总结#
此 MapReduce 示例演示了 Ray 编程模型的灵活性。生产级的 MapReduce 实现需要更多的工作,但能够*快速*重现常见的算法(如本例)大有裨益。在 MapReduce 的早期(大约 2010 年),这种范式通常是表达工作负载的唯一模型。有了 Ray,任何中等水平的 Python 程序员都可以使用各种有趣的分布式计算模式。
要了解有关 Ray,特别是 Ray Core 的更多信息,请参阅Ray Core 示例库,或我们用例库中的 ML 工作负载。此 MapReduce 示例可以在《学习 Ray》中找到,其中包含更多类似的示例。