反模式:在应用代码中 Fork 新进程#

总结: 不要在 Ray 应用代码中 Fork 新进程,例如在 driver、任务或 actor 中。而是使用“spawn”方法启动新进程,或者使用 Ray 任务和 actor 来并行化您的工作负载。

Ray 为您管理进程的生命周期。Ray 对象、任务和 Actor 管理与 Raylet 和 GCS 通信的套接字。如果您在应用代码中 Fork 新进程,这些进程可能会共享相同的套接字而没有任何同步。这可能导致消息损坏和意外行为。

解决方案是:1. 使用“spawn”方法启动新进程,以便父进程的内存空间不会被复制到子进程;或 2. 使用 Ray 任务和 actor 来并行化您的工作负载,并让 Ray 为您管理进程的生命周期。

代码示例#

import os

os.environ["RAY_DEDUP_LOGS"] = "0"

import ray
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
import numpy as np


@ray.remote
def generate_response(request):
    print(request)
    array = np.ones(100000)
    return array


def process_response(response, idx):
    print(f"Processing response {idx}")
    return response


def main():
    ray.init()
    responses = ray.get([generate_response.remote(f"request {i}") for i in range(4)])

    # Better approach: Set the start method to "spawn"
    multiprocessing.set_start_method("spawn", force=True)

    with ProcessPoolExecutor(max_workers=4) as executor:
        future_to_task = {}
        for idx, response in enumerate(responses):
            future_to_task[executor.submit(process_response, response, idx)] = idx

        for future in as_completed(future_to_task):
            idx = future_to_task[future]
            response_entry = future.result()
            print(f"Response {idx} processed: {response_entry}")

    ray.shutdown()


if __name__ == "__main__":
    main()