反模式:在应用代码中 Fork 新进程#
总结: 不要在 Ray 应用代码中 Fork 新进程,例如在 driver、任务或 actor 中。而是使用“spawn”方法启动新进程,或者使用 Ray 任务和 actor 来并行化您的工作负载。
Ray 为您管理进程的生命周期。Ray 对象、任务和 Actor 管理与 Raylet 和 GCS 通信的套接字。如果您在应用代码中 Fork 新进程,这些进程可能会共享相同的套接字而没有任何同步。这可能导致消息损坏和意外行为。
解决方案是:1. 使用“spawn”方法启动新进程,以便父进程的内存空间不会被复制到子进程;或 2. 使用 Ray 任务和 actor 来并行化您的工作负载,并让 Ray 为您管理进程的生命周期。
代码示例#
import os
os.environ["RAY_DEDUP_LOGS"] = "0"
import ray
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
import numpy as np
@ray.remote
def generate_response(request):
print(request)
array = np.ones(100000)
return array
def process_response(response, idx):
print(f"Processing response {idx}")
return response
def main():
ray.init()
responses = ray.get([generate_response.remote(f"request {i}") for i in range(4)])
# Better approach: Set the start method to "spawn"
multiprocessing.set_start_method("spawn", force=True)
with ProcessPoolExecutor(max_workers=4) as executor:
future_to_task = {}
for idx, response in enumerate(responses):
future_to_task[executor.submit(process_response, response, idx)] = idx
for future in as_completed(future_to_task):
idx = future_to_task[future]
response_entry = future.result()
print(f"Response {idx} processed: {response_entry}")
ray.shutdown()
if __name__ == "__main__":
main()