反模式:在应用程序代码中fork新进程#

摘要:请勿在Ray应用程序代码中fork新进程—例如,在驱动程序、任务或Actor中。相反,请使用“spawn”方法启动新进程,或使用Ray任务和Actor来并行化您的工作负载。

Ray会为您管理进程的生命周期。Ray对象、任务和Actor会管理与Raylet和GCS通信的套接字。如果在应用程序代码中fork新进程,这些进程可能会在没有任何同步的情况下共享相同的套接字。这可能导致消息损坏和意外行为。

解决方案是:1. 使用“spawn”方法启动新进程,以便父进程的内存空间不会复制到子进程;或者2. 使用Ray任务和Actor来并行化您的工作负载,让Ray为您管理进程的生命周期。

代码示例#

import os

os.environ["RAY_DEDUP_LOGS"] = "0"

import ray
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
import numpy as np


@ray.remote
def generate_response(request):
    print(request)
    array = np.ones(100000)
    return array


def process_response(response, idx):
    print(f"Processing response {idx}")
    return response


def main():
    ray.init()
    responses = ray.get([generate_response.remote(f"request {i}") for i in range(4)])

    # Better approach: Set the start method to "spawn"
    multiprocessing.set_start_method("spawn", force=True)

    with ProcessPoolExecutor(max_workers=4) as executor:
        future_to_task = {}
        for idx, response in enumerate(responses):
            future_to_task[executor.submit(process_response, response, idx)] = idx

        for future in as_completed(future_to_task):
            idx = future_to_task[future]
            response_entry = future.result()
            print(f"Response {idx} processed: {response_entry}")

    ray.shutdown()


if __name__ == "__main__":
    main()