使用蒙特卡洛方法估算 π#

Run on Anyscale

本教程将向您展示如何使用一种蒙特卡洛方法估算 π 的值。该方法通过在 2x2 的正方形内随机采样点来工作。我们可以使用落在以原点为中心的单位圆内的点的比例来估算圆面积与正方形面积的比例。鉴于我们知道真实比例为 π/4,我们可以将估算的比例乘以 4 来近似 π 的值。我们采样更多的点来计算这个近似值,那么估算值应该会更接近 π 的真实值。

../../_images/monte_carlo_pi.png

我们使用 Ray 的任务 (tasks)来分发采样工作,并使用 Ray 的Actor来跟踪这些分布式采样任务的进度。该代码可以在您的笔记本电脑上运行,并且可以轻松地扩展到大型集群 (clusters)以提高估算的准确性。

要开始,请通过 pip install -U ray 安装 Ray。有关更多安装选项,请参阅 安装 Ray

启动 Ray#

首先,我们包含本教程所需的所有模块,并使用 ray.init() 启动一个本地 Ray 集群。

import ray
import math
import time
import random

ray.init()

定义进度 Actor#

接下来,我们定义一个 Ray Actor,它可以被采样任务调用以更新进度。Ray Actor 本质上是状态服务,任何拥有 Actor 实例(句柄)的人都可以调用其方法。

@ray.remote
class ProgressActor:
    def __init__(self, total_num_samples: int):
        self.total_num_samples = total_num_samples
        self.num_samples_completed_per_task = {}

    def report_progress(self, task_id: int, num_samples_completed: int) -> None:
        self.num_samples_completed_per_task[task_id] = num_samples_completed

    def get_progress(self) -> float:
        return (
            sum(self.num_samples_completed_per_task.values()) / self.total_num_samples
        )

我们通过使用 ray.remote 装饰一个普通的 Python 类来定义一个 Ray Actor。进度 Actor 具有 report_progress() 方法,该方法将被采样任务调用以单独更新其进度,以及一个 get_progress() 方法来获取总体进度。

定义采样任务#

定义好 Actor 后,我们现在定义一个 Ray 任务,该任务进行最多 `num_samples` 次采样,并返回落在圆内的采样数。Ray 任务是无状态函数。它们异步执行,并并行运行。

@ray.remote
def sampling_task(num_samples: int, task_id: int,
                  progress_actor: ray.actor.ActorHandle) -> int:
    num_inside = 0
    for i in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1

        # Report progress every 1 million samples.
        if (i + 1) % 1_000_000 == 0:
            # This is async.
            progress_actor.report_progress.remote(task_id, i + 1)

    # Report the final progress.
    progress_actor.report_progress.remote(task_id, num_samples)
    return num_inside

要将普通 Python 函数转换为 Ray 任务,我们使用 ray.remote 装饰该函数。采样任务以进度 Actor 句柄作为输入,并向其报告进度。上面的代码展示了一个从任务中调用 Actor 方法的示例。

创建进度 Actor#

定义好 Actor 后,我们就可以创建一个实例了。

# Change this to match your cluster scale.
NUM_SAMPLING_TASKS = 10
NUM_SAMPLES_PER_TASK = 10_000_000
TOTAL_NUM_SAMPLES = NUM_SAMPLING_TASKS * NUM_SAMPLES_PER_TASK

# Create the progress actor.
progress_actor = ProgressActor.remote(TOTAL_NUM_SAMPLES)

要创建进度 Actor 的实例,只需调用 `ActorClass.remote()` 方法并传入构造函数的参数。这将创建一个 Actor 并在远程工作进程上运行它。`ActorClass.remote(...)` 的返回值是一个 Actor 句柄,可用于调用其方法。

执行采样任务#

任务定义完毕后,我们就可以异步执行它了。

# Create and execute all sampling tasks in parallel.
results = [
    sampling_task.remote(NUM_SAMPLES_PER_TASK, i, progress_actor)
    for i in range(NUM_SAMPLING_TASKS)
]

我们通过调用 `remote()` 方法并传入函数的参数来执行采样任务。这将立即返回一个 `ObjectRef` 作为 Future,然后异步在远程工作进程上执行函数。

调用进度 Actor#

在采样任务运行时,我们可以通过调用 Actor 的 `get_progress()` 方法来定期查询进度。

# Query progress periodically.
while True:
    progress = ray.get(progress_actor.get_progress.remote())
    print(f"Progress: {int(progress * 100)}%")

    if progress == 1:
        break

    time.sleep(1)

要调用 Actor 方法,请使用 `actor_handle.method.remote()`。此调用将立即返回一个 `ObjectRef` 作为 Future,然后异步在远程 Actor 进程上执行该方法。要获取 `ObjectRef` 的实际返回值,我们使用阻塞的 ray.get()

计算 π#

最后,我们从远程采样任务中获取圆内的采样数,并计算 π。

# Get all the sampling tasks results.
total_num_inside = sum(ray.get(results))
pi = (total_num_inside * 4) / TOTAL_NUM_SAMPLES
print(f"Estimated value of π is: {pi}")

正如我们在上面的代码中看到的,除了单个 `ObjectRef` 之外,ray.get() 还可以接受一个 `ObjectRef` 列表并返回一个结果列表。

如果您运行本教程,您将看到如下输出:

Progress: 0%
Progress: 15%
Progress: 28%
Progress: 40%
Progress: 50%
Progress: 60%
Progress: 70%
Progress: 80%
Progress: 90%
Progress: 100%
Estimated value of π is: 3.1412202