使用 Ray Core 实现简单的时间序列 AutoML#
提示
我们强烈建议使用 Ray Tune 进行超参数调优/AutoML,这将使你能够更快、更容易地构建它,并获得内置的日志记录、容错等许多好处。如果你认为你的用例 Ray Tune 无法支持,我们希望通过 Ray GitHub issue 等方式获取你的反馈。
AutoML (自动机器学习) 是一个广泛的主题,但本质上,它归结为为当前任务和数据集选择最佳模型(以及可能的预处理)。虽然存在多种高级 AutoML 框架,但我们可以仅使用 Ray Core 和无状态任务快速构建一个简单的解决方案。
如果你有兴趣应用更高级的优化算法,或者希望利用更高层次的抽象和多个内置功能,我们强烈建议使用 Ray Tune 的 Tuner。
在本 notebook 中,我们将构建一个 AutoML(或更准确地说,AutoTS)系统,该系统将为时间序列回归任务选择 statsforecast 模型和超参数的最佳组合——此处,我们将使用 M5 数据集 的一个分区。
简单的 AutoML 包括在相同数据上独立运行不同的函数(超参数配置)。我们将训练具有不同配置的模型并对其进行评估,以获得各种指标,例如均方误差。评估完所有配置后,我们将能够根据要使用的指标选择最佳配置。
为了使此示例更具实践性,我们将使用 时间序列交叉验证 (CV) 作为我们的评估策略。交叉验证的工作原理是对模型进行 k 次评估,每次选择数据的一个不同子集(折叠)进行训练和评估。这使得性能估计更可靠,并有助于防止过拟合,尤其是在数据量较小时。换句话说,我们将运行 n * k 次单独评估,其中 n 是配置数量,k 是折叠数量。
逐步指南#
让我们先导入 Ray 并初始化一个本地 Ray 集群。
from typing import List, Union, Callable, Dict, Type, Tuple
import time
import ray
import itertools
import pandas as pd
import numpy as np
from collections import defaultdict
from statsforecast import StatsForecast
from statsforecast.models import AutoETS, AutoARIMA, _TS
from pyarrow import parquet as pq
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error
ray.init(ignore_reinit_error=True)
我们将把逻辑分解为几个函数和一个 Ray 任务。
Ray 任务是 train_and_evaluate_fold
,它包含在数据的 CV 折叠上拟合和评估模型所需的所有逻辑。我们将任务结构化为接收数据集和将其分割为训练集和测试集的索引——这样,我们可以将数据集的一个实例保存在 Ray 对象存储中,并在每个任务中单独分割它。我们将此定义为 Ray 任务,因为我们希望所有折叠都在 Ray 集群上并行评估——Ray 将处理所有编排和执行。每个任务默认会保留 1 个 CPU 核心。
@ray.remote
def train_and_evaluate_fold(
model: _TS,
df: pd.DataFrame,
train_indices: np.ndarray,
test_indices: np.ndarray,
label_column: str,
metrics: Dict[str, Callable[[pd.Series, pd.Series], float]],
freq: str = "D",
) -> Dict[str, float]:
try:
# Create the StatsForecast object with train data & model.
statsforecast = StatsForecast(models=[model], freq=freq)
# Make a forecast and calculate metrics on test data.
# This will fit the model first automatically.
forecast = statsforecast.forecast(h=len(test_indices), df=df.iloc[train_indices])
return {
metric_name: metric(
df.iloc[test_indices][label_column], forecast[model.__class__.__name__]
)
for metric_name, metric in metrics.items()
}
except Exception as ex:
print(f"Exception generating forecast for model {model}: {ex}")
# In case the model fit or eval fails, return None for all metrics.
return {metric_name: None for metric_name, metric in metrics.items()}
evaluate_models_with_cv
是一个驱动函数,用于运行我们的优化循环。我们接收一个模型列表(参数已设置)和 dataframe。
dataframe 被放入 Ray 对象存储并被重用,这意味着我们只需要对其进行一次序列化。这样,我们就可以避免 反模式:重复按值传递相同的巨大参数会损害性能。
我们将每个折叠的拟合视为一个单独的任务。我们为每个模型生成 k 个任务,并通过调用 ray.get()
等待它们完成,ray.get()
会阻塞直到所有任务完成并收集结果。然后,我们汇总返回的指标,计算每个模型的每个折叠的平均指标。
def evaluate_models_with_cv(
models: List[_TS],
df: pd.DataFrame,
label_column: str,
metrics: Dict[str, Callable[[pd.Series, pd.Series], float]],
freq: str = "D",
cv: Union[int, TimeSeriesSplit] = 5,
) -> Dict[_TS, Dict[str, float]]:
# Obtain CV train-test indices for each fold.
if isinstance(cv, int):
cv = TimeSeriesSplit(cv)
train_test_indices = list(cv.split(df))
# Put df into Ray object store for better performance.
df_ref = ray.put(df)
# Add tasks to be executed for each fold.
fold_refs = []
for model in models:
fold_refs.extend(
[
train_and_evaluate_fold.remote(
model,
df_ref,
train_indices,
test_indices,
label_column,
metrics,
freq=freq,
)
for train_indices, test_indices in train_test_indices
]
)
# wait on all tasks
fold_results = []
while fold_refs:
ready_refs, fold_refs = ray.wait(fold_refs)
fold_results.extend(ray.get(ready_refs))
# Split fold results into a list of CV splits-sized chunks.
# Ray guarantees that order is preserved.
fold_results_per_model = [
fold_results[i : i + len(train_test_indices)]
for i in range(0, len(fold_results), len(train_test_indices))
]
# Aggregate and average results from all folds per model.
# We go from a list of dicts to a dict of lists and then
# get a mean of those lists.
mean_results_per_model = []
for model_results in fold_results_per_model:
aggregated_results = defaultdict(list)
for fold_result in model_results:
for metric, value in fold_result.items():
aggregated_results[metric].append(value)
mean_results = {
metric: np.mean(values) for metric, values in aggregated_results.items()
}
mean_results_per_model.append(mean_results)
# Join models and their metrics together.
mean_results_per_model = {
models[i]: mean_results_per_model[i] for i in range(len(mean_results_per_model))
}
return mean_results_per_model
最后,我们必须定义逻辑,将字典搜索空间转换为我们可以传递给 evaluate_models_with_cv
的实例化模型。
注意
scikit-learn 和 statsforecast 模型可以轻松序列化且非常小,这意味着实例化模型可以轻松地在 Ray 集群中传递。对于其他框架,例如 Torch,你可能需要在拟合模型的任务中实例化模型,以避免问题。
我们的 generate_configurations
生成器转换一个两级字典,其中键是模型类,值是参数字典及其可能值的列表。我们想运行网格搜索,这意味着我们要评估给定模型的每种可能的超参数组合。
我们稍后将使用的搜索空间如下所示
{
AutoARIMA: {},
AutoETS: {
"season_length": [6, 7],
"model": ["ZNA", "ZZZ"]
}
}
它将转换为以下模型
AutoARIMA(),
AutoETS(season_length=6, model="ZNA")
AutoETS(season_length=7, model="ZNA")
AutoETS(season_length=6, model="ZZZ")
AutoETS(season_length=7, model="ZZZ")
evaluate_search_space_with_cv
是我们 AutoML 系统的入口点,它接收搜索空间、dataframe、标签列、指标、用于选择最佳配置的指标、是要最小化还是最大化它、数据的频率以及要使用的 scikit-learn TimeSeriesSplit
交叉验证分割器。
def generate_configurations(search_space: Dict[Type[_TS], Dict[str, list]]) -> _TS:
# Convert dict search space into configurations - models instantiated with specific arguments.
for model, model_search_space in search_space.items():
kwargs, values = model_search_space.keys(), model_search_space.values()
# Get a product - all combinations in the per-model grid.
for configuration in itertools.product(*values):
yield model(**dict(zip(kwargs, configuration)))
def evaluate_search_space_with_cv(
search_space: Dict[Type[_TS], Dict[str, list]],
df: pd.DataFrame,
label_column: str,
metrics: Dict[str, Callable[[pd.Series, pd.Series], float]],
eval_metric: str,
mode: str = "min",
freq: str = "D",
cv: Union[int, TimeSeriesSplit] = 5,
) -> List[Tuple[_TS, Dict[str, float]]]:
assert eval_metric in metrics
assert mode in ("min", "max")
configurations = list(generate_configurations(search_space))
print(
f"Evaluating {len(configurations)} configurations with {cv.get_n_splits()} splits each, "
f"totalling {len(configurations)*cv.get_n_splits()} tasks..."
)
ret = evaluate_models_with_cv(
configurations, df, label_column, metrics, freq=freq, cv=cv
)
# Sort the results by eval_metric
ret = sorted(ret.items(), key=lambda x: x[1][eval_metric], reverse=(mode == "max"))
print("Evaluation complete!")
return ret
系统构建完成后,我们只需要一个快速辅助函数来从 S3 存储桶中获取数据,并将其预处理为 statsforecast 所需的格式。由于数据集相当大,我们使用 PyArrow 的下推谓词作为过滤器,只获取我们关注的行,而无需将所有数据加载到内存中。
def get_m5_partition(unique_id: str) -> pd.DataFrame:
ds1 = pq.read_table(
"s3://anonymous@m5-benchmarks/data/train/target.parquet",
filters=[("item_id", "=", unique_id)],
)
Y_df = ds1.to_pandas()
# StatsForecasts expects specific column names!
Y_df = Y_df.rename(
columns={"item_id": "unique_id", "timestamp": "ds", "demand": "y"}
)
Y_df["unique_id"] = Y_df["unique_id"].astype(str)
Y_df["ds"] = pd.to_datetime(Y_df["ds"])
Y_df = Y_df.dropna()
constant = 10
Y_df["y"] += constant
return Y_df[Y_df.unique_id == unique_id]
df = get_m5_partition("FOODS_1_001_CA_1")
df
unique_id | ds | y | |
---|---|---|---|
0 | FOODS_1_001_CA_1 | 2011-01-29 | 13.0 |
1 | FOODS_1_001_CA_1 | 2011-01-30 | 10.0 |
2 | FOODS_1_001_CA_1 | 2011-01-31 | 10.0 |
3 | FOODS_1_001_CA_1 | 2011-02-01 | 11.0 |
4 | FOODS_1_001_CA_1 | 2011-02-02 | 14.0 |
... | ... | ... | ... |
1936 | FOODS_1_001_CA_1 | 2016-05-18 | 10.0 |
1937 | FOODS_1_001_CA_1 | 2016-05-19 | 11.0 |
1938 | FOODS_1_001_CA_1 | 2016-05-20 | 10.0 |
1939 | FOODS_1_001_CA_1 | 2016-05-21 | 10.0 |
1940 | FOODS_1_001_CA_1 | 2016-05-22 | 10.0 |
1941 行 × 3 列
现在我们可以使用我们的搜索空间运行 AutoML 系统,并获得最佳模型及其配置。我们将使用 scikit-learn 实现的均方误差 (MSE) 和平均绝对误差 (MAE) 作为指标,其中前者是我们要优化的目标。
tuning_results = evaluate_search_space_with_cv(
{AutoARIMA: {}, AutoETS: {"season_length": [6, 7], "model": ["ZNA", "ZZZ"]}},
df,
"y",
{"mse": mean_squared_error, "mae": mean_absolute_error},
"mse",
cv=TimeSeriesSplit(test_size=1),
)
我们可以看到,在我们的搜索空间中,最小化 MSE 最好的模型是季节长度为 6 的 ZNA AutoETS 模型。
print(tuning_results[0])
# Print arguments of the model:
print(tuning_results[0][0].__dict__)