Welcome to Ray
An open-source framework to easily build and scale your ML and Python applications.
Scale with Ray
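Underlying all of the library examples below is Ray's core task API, which turns ordinary Python functions into units of distributed work. The following is a minimal sketch of that idea; the `square` function and its arguments are purely illustrative and not part of the examples that follow.
import ray

ray.init()  # Start, or connect to, a Ray cluster.

# A plain Python function becomes a remote task with the @ray.remote decorator.
@ray.remote
def square(x: int) -> int:
    return x * x

# Launch four tasks in parallel; ray.get blocks until the results are ready.
futures = [square.remote(i) for i in range(4)]
print(ray.get(futures))  # [0, 1, 4, 9]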
from typing import Dict
import numpy as np
import ray
# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
ds = ray.data.from_numpy(np.asarray(["Complete this", "for me"]))
# Step 2: Define a Predictor class for inference.
class HuggingFacePredictor:
    def __init__(self):
        from transformers import pipeline
        # Initialize a pre-trained GPT2 Huggingface pipeline.
        self.model = pipeline("text-generation", model="gpt2")

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        # Get the predictions from the input batch.
        predictions = self.model(
            list(batch["data"]), max_length=20, num_return_sequences=1)
        # `predictions` is a list of length-one lists. For example:
        # [[{"generated_text": "output_1"}], ..., [{"generated_text": "output_2"}]]
        # Modify the output to get it into the following format instead:
        # ["output_1", "output_2"]
        batch["output"] = [sequences[0]["generated_text"] for sequences in predictions]
        return batch

# Step 3: Map the Predictor over the Dataset to get predictions.
# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
predictions = ds.map_batches(HuggingFacePredictor, concurrency=2)
# Step 4: Show one prediction output.
predictions.show(limit=1)
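The same `map_batches` call can also place each predictor actor on a GPU, and the resulting Dataset can be written out instead of shown. This is a sketch under assumed arguments: the GPU count, batch size, and output path below are illustrative, not part of the example above.
# Sketch: one GPU per predictor actor, and persist the outputs as Parquet.
# num_gpus, batch_size, and the output path are illustrative placeholders.
predictions = ds.map_batches(
    HuggingFacePredictor,
    concurrency=2,
    num_gpus=1,       # request one GPU for each actor
    batch_size=32,    # number of rows passed to __call__ at a time
)
predictions.write_parquet("local:///tmp/predictions")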
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
# Step 1: Set up PyTorch model training as you normally would.
def train_func():
    model = ...
    train_dataset = ...
    for epoch in range(num_epochs):
        ...  # model training logic

# Step 2: Set up Ray's PyTorch Trainer to run on 32 GPUs.
trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    scaling_config=ScalingConfig(num_workers=32, use_gpu=True),
    datasets={"train": train_dataset},
)
# Step 3: Run distributed model training on 32 GPUs.
result = trainer.fit()
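The elided `train_func` body can use Ray Train's utilities to consume the dataset passed via `datasets` and to report metrics back to the trainer. The sketch below assumes a toy `torch.nn.Linear` model and an illustrative batch size and epoch count; it is one possible shape of the training loop, not the prescribed one.
import torch
from ray import train
from ray.train.torch import prepare_model

def train_func():
    # Placeholder model; prepare_model wraps it for distributed training.
    model = prepare_model(torch.nn.Linear(4, 1))
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    # Each worker reads its own shard of the "train" dataset passed to the Trainer.
    shard = train.get_dataset_shard("train")
    for epoch in range(2):
        for batch in shard.iter_torch_batches(batch_size=32):
            ...  # forward pass, loss, backward pass, optimizer step
        # Report per-epoch metrics back to Ray Train.
        train.report({"epoch": epoch})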
from ray import tune
from ray.train import ScalingConfig
from ray.train.lightgbm import LightGBMTrainer
train_dataset, eval_dataset = ...
# Step 1: Set up Ray's LightGBM Trainer to train on 64 CPUs.
trainer = LightGBMTrainer(
    ...,
    scaling_config=ScalingConfig(num_workers=64),
    datasets={"train": train_dataset, "eval": eval_dataset},
)
# Step 2: Set up Ray Tuner to run 1000 trials.
tuner = tune.Tuner(
    trainer,
    param_space=hyper_param_space,
    tune_config=tune.TuneConfig(num_samples=1000),
)
# Step 3: Run distributed HPO with 1000 trials; each trial runs on 64 CPUs.
result_grid = tuner.fit()
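The search space `hyper_param_space` is left elided above. One way to define it for the LightGBM trainer, and to read back the best trial afterwards, is sketched below; the parameter names and the metric name are illustrative assumptions.
# Sketch: an illustrative LightGBM search space. Trainer parameters are tuned
# under the "params" key when the trainable is a LightGBMTrainer.
hyper_param_space = {
    "params": {
        "num_leaves": tune.randint(16, 256),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
    }
}

# After tuner.fit(), pick the best trial by a metric logged during training
# (the metric name here is a placeholder).
best_result = result_grid.get_best_result(metric="eval-binary_logloss", mode="min")
print(best_result.config)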
from io import BytesIO
from fastapi import FastAPI
from fastapi.responses import Response
import torch
from ray import serve
from ray.serve.handle import DeploymentHandle
app = FastAPI()
@serve.deployment(num_replicas=1)
@serve.ingress(app)
class APIIngress:
    def __init__(self, diffusion_model_handle: DeploymentHandle) -> None:
        self.handle = diffusion_model_handle

    @app.get(
        "/imagine",
        responses={200: {"content": {"image/png": {}}}},
        response_class=Response,
    )
    async def generate(self, prompt: str, img_size: int = 512):
        assert len(prompt), "prompt parameter cannot be empty"
        image = await self.handle.generate.remote(prompt, img_size=img_size)
        file_stream = BytesIO()
        image.save(file_stream, "PNG")
        return Response(content=file_stream.getvalue(), media_type="image/png")

@serve.deployment(
    ray_actor_options={"num_gpus": 1},
    autoscaling_config={"min_replicas": 0, "max_replicas": 2},
)
class StableDiffusionV2:
    def __init__(self):
        from diffusers import EulerDiscreteScheduler, StableDiffusionPipeline

        model_id = "stabilityai/stable-diffusion-2"
        scheduler = EulerDiscreteScheduler.from_pretrained(
            model_id, subfolder="scheduler"
        )
        self.pipe = StableDiffusionPipeline.from_pretrained(
            model_id, scheduler=scheduler, revision="fp16", torch_dtype=torch.float16
        )
        self.pipe = self.pipe.to("cuda")

    def generate(self, prompt: str, img_size: int = 512):
        assert len(prompt), "prompt parameter cannot be empty"
        with torch.autocast("cuda"):
            image = self.pipe(prompt, height=img_size, width=img_size).images[0]
        return image
entrypoint = APIIngress.bind(StableDiffusionV2.bind())
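To try the deployment graph locally, one option is to run it in-process with `serve.run` and query the HTTP endpoint; by default Serve listens on port 8000. The prompt and output filename below are placeholders.
import requests
from ray import serve

# Deploy the graph defined above on the local Ray cluster.
serve.run(entrypoint)

# Request an image from the /imagine endpoint and save it to disk.
resp = requests.get(
    "http://localhost:8000/imagine",
    params={"prompt": "a red panda reading a book, watercolor"},
)
with open("output.png", "wb") as f:
    f.write(resp.content)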
from ray.rllib.algorithms.ppo import PPOConfig
# Step 1: Configure PPO to run 64 parallel workers to collect samples from the env.
ppo_config = (
    PPOConfig()
    .environment(env="Taxi-v3")
    .rollouts(num_rollout_workers=64)
    .framework("torch")
    # Example model config; adjust the network architecture to your needs.
    .training(model={"fcnet_hiddens": [64, 64]})
)
# Step 2: Build the PPO algorithm.
ppo_algo = ppo_config.build()
# Step 3: Train and evaluate PPO.
for _ in range(5):
    print(ppo_algo.train())

ppo_algo.evaluate()
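Once trained, the algorithm object can act in an environment and be checkpointed. The sketch below uses Gymnasium's Taxi-v3 and a placeholder checkpoint directory; it is an illustration of the algorithm API, not part of the example above.
import gymnasium as gym

# Sketch: query the trained policy for a single action.
env = gym.make("Taxi-v3")
obs, info = env.reset()
action = ppo_algo.compute_single_action(obs)
print("sampled action:", action)

# Save a checkpoint of the trained algorithm (directory is a placeholder).
ppo_algo.save("/tmp/ppo_taxi_checkpoint")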