使用 Ray Serve 部署 Java 应用
要使用 Java Ray Serve,你需要在 pom.xml 中添加以下依赖。
<!-- Ray Serve Java API dependency. ${ray.version} must be defined as a property elsewhere in the pom. -->
<!-- The provided scope is used because, per the surrounding text, a Ray Python installation already includes this jar. -->
<dependency>
<groupId>io.ray</groupId>
<artifactId>ray-serve</artifactId>
<version>${ray.version}</version>
<scope>provided</scope>
</dependency>
注意:安装 Python 版 Ray 后,本地环境中已包含 Ray Serve 的 Java jar 包。`provided` 作用域确保你可以使用 Ray Serve 编译 Java 代码,并且在集群上部署时不会发生版本冲突。
示例模型#
本示例用例是金融应用的生产工作流。该应用需要计算单个任务与不同银行交互的最佳策略。
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
/**
 * Demo calculator that produces one result string per (bank, indicator) pair.
 *
 * <p>The real bank calculation is stubbed out; each result is simply
 * {@code bank-indicator-time}.
 */
public class Strategy {

  /**
   * Entry point of the calculation.
   *
   * @param time value appended to every result string
   * @param banksAndIndicators maps each bank to its groups of indicator names
   * @return results for every bank, group and indicator, in iteration order of the map
   */
  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    List<String> collected = new ArrayList<>();
    banksAndIndicators.forEach(
        (bankName, indicatorGroups) ->
            indicatorGroups.forEach(
                group -> collected.addAll(calcBankIndicators(time, bankName, group))));
    return collected;
  }

  /**
   * Computes every indicator in one group for a single bank.
   *
   * @param time value appended to every result string
   * @param bank bank name
   * @param indicators indicator names to compute for that bank
   * @return one result per indicator, in list order
   */
  public List<String> calcBankIndicators(Long time, String bank, List<String> indicators) {
    List<String> perIndicator = new ArrayList<>();
    indicators.forEach(name -> perIndicator.add(calcIndicator(time, bank, name)));
    return perIndicator;
  }

  /**
   * Computes a single indicator for a single bank.
   *
   * @param time value appended to the result string
   * @param bank bank name
   * @param indicator indicator name
   * @return {@code bank-indicator-time}
   */
  public String calcIndicator(Long time, String bank, String indicator) {
    // do bank data calculation
    return String.join("-", bank, indicator, String.valueOf(time)); // Demo;
  }
}
本示例使用 `Strategy` 类计算多家银行的指标。
`calc` 方法是计算的入口,输入参数是计算时间间隔以及银行与其指标的映射。`calc` 方法包含两层 `for` 循环,遍历每家银行的每个指标列表,并调用 `calcBankIndicators` 方法计算指定银行的指标。
`calcBankIndicators` 方法中还有一层 `for` 循环,遍历每个指标,并调用 `calcIndicator` 方法计算该银行的特定指标。
`calcIndicator` 方法是基于银行、指定时间间隔和指标的具体计算逻辑。
以下代码使用了 `Strategy` 类:
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** Runs the local {@code Strategy} calculation on a small demo data set and prints the results. */
public class StrategyCalc {

  /**
   * Builds two demo banks with two demo indicators and prints what {@code Strategy.calc} returns.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    long time = System.currentTimeMillis();

    final String firstIndicator = "demo_indicator_1";
    final String secondIndicator = "demo_indicator_2";

    // First bank: a single group holding both indicators.
    List<List<String>> groupsForFirstBank =
        Arrays.asList(Arrays.asList(firstIndicator, secondIndicator));
    // Second bank: two groups with one indicator each.
    List<List<String>> groupsForSecondBank =
        Arrays.asList(Arrays.asList(firstIndicator), Arrays.asList(secondIndicator));

    Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
    banksAndIndicators.put("demo_bank_1", groupsForFirstBank);
    banksAndIndicators.put("demo_bank_2", groupsForSecondBank);

    Strategy calculator = new Strategy();
    System.out.println(calculator.calc(time, banksAndIndicators));
  }
}
当银行和指标的规模扩大时,三层 `for` 循环会拖慢计算速度。即使使用线程池并行计算每个指标,也可能遇到单机性能瓶颈。此外,你无法将这个 `Strategy` 对象用作常驻服务。
转换为 Ray Serve 部署#
通过 Ray Serve,你可以将 `Strategy` 的核心计算逻辑部署为可扩展的分布式计算服务。
首先,将每个机构的指标计算提取到一个单独的 `StrategyOnRayServe` 类中:
/**
 * Holds the per-indicator calculation on its own so it can be used as a deployment class.
 *
 * <p>The real bank calculation is stubbed out for the demo.
 */
public class StrategyOnRayServe {

  /**
   * Computes a single indicator for a single bank.
   *
   * @param time value appended to the result string
   * @param bank bank name
   * @param indicator indicator name
   * @return {@code bank-indicator-time}
   */
  public String calcIndicator(Long time, String bank, String indicator) {
    // do bank data calculation
    return String.join("-", bank, indicator, String.valueOf(time)); // Demo;
  }
}
接下来,启动 Ray Serve 运行时,并将 `StrategyOnRayServe` 部署为一个 deployment:
/**
 * Starts Ray Serve and runs {@code StrategyOnRayServe} as a deployment named {@code strategy}
 * configured with four replicas.
 */
public void deploy() {
  Serve.start(null);

  String deploymentClass = StrategyOnRayServe.class.getName();
  Application strategyApp =
      Serve.deployment()
          .setName("strategy")
          .setDeploymentDef(deploymentClass)
          .setNumReplicas(4)
          .bind();

  Serve.run(strategyApp);
}
`Serve.deployment()` 配合 `setName("strategy")` 和 `bind()` 创建一个名为 `strategy` 的应用(`Application`)对象。执行 `Serve.run` 后,Ray Serve 实例会部署此 `strategy` deployment,并带有四个副本,你可以访问它进行分布式并行计算。
测试 Ray Serve 部署#
你可以在 Ray 内部通过 deployment 的 handle(`deployment.getHandle()`)测试 `strategy` deployment:
/**
 * Computes every indicator through the {@code strategy} deployment, one call at a time.
 *
 * <p>Each remote call is resolved with {@code result()} before the next one is issued.
 *
 * @param time value forwarded to every {@code calcIndicator} call
 * @param banksAndIndicators maps each bank to its groups of indicator names
 * @return the results in the order the calls were made
 */
public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
  Deployment strategyDeployment = Serve.getDeployment("strategy");
  List<String> collected = new ArrayList<>();
  for (Entry<String, List<List<String>>> bankEntry : banksAndIndicators.entrySet()) {
    String bank = bankEntry.getKey();
    for (List<String> group : bankEntry.getValue()) {
      for (String indicator : group) {
        Object value =
            strategyDeployment
                .getHandle()
                .method("calcIndicator")
                .remote(time, bank, indicator)
                .result();
        collected.add((String) value);
      }
    }
  }
  return collected;
}
此代码串行执行每个银行指标的计算,并将其发送到 Ray 执行。你可以使计算并发化,这不仅提高了计算效率,还解决了单机瓶颈。
/**
 * Computes every indicator through the {@code strategy} deployment without waiting between calls.
 *
 * <p>All remote calls are issued first; their results are collected afterwards in the same order.
 *
 * @param time value forwarded to every {@code calcIndicator} call
 * @param banksAndIndicators maps each bank to its groups of indicator names
 * @return the results in the order the calls were issued
 */
public List<String> parallelCalc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
  Deployment strategyDeployment = Serve.getDeployment("strategy");

  // Phase 1: issue every call and keep the pending responses.
  List<DeploymentResponse> pending = new ArrayList<>();
  for (Entry<String, List<List<String>>> bankEntry : banksAndIndicators.entrySet()) {
    String bank = bankEntry.getKey();
    for (List<String> group : bankEntry.getValue()) {
      for (String indicator : group) {
        DeploymentResponse response =
            strategyDeployment.getHandle().method("calcIndicator").remote(time, bank, indicator);
        pending.add(response);
      }
    }
  }

  // Phase 2: resolve the responses in submission order.
  List<String> collected = new ArrayList<>();
  for (DeploymentResponse response : pending) {
    collected.add((String) response.result());
  }
  return collected;
}
你可以像下面 `main` 方法中的示例那样使用 `StrategyCalcOnRayServe`:
/**
 * Deploys the strategy deployment, runs the concurrent calculation on a small demo data set and
 * prints the results.
 *
 * @param args unused
 */
public static void main(String[] args) {
  long time = System.currentTimeMillis();

  final String firstIndicator = "demo_indicator_1";
  final String secondIndicator = "demo_indicator_2";

  // First bank: a single group holding both indicators.
  List<List<String>> groupsForFirstBank =
      Arrays.asList(Arrays.asList(firstIndicator, secondIndicator));
  // Second bank: two groups with one indicator each.
  List<List<String>> groupsForSecondBank =
      Arrays.asList(Arrays.asList(firstIndicator), Arrays.asList(secondIndicator));

  Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
  banksAndIndicators.put("demo_bank_1", groupsForFirstBank);
  banksAndIndicators.put("demo_bank_2", groupsForSecondBank);

  StrategyCalcOnRayServe calculator = new StrategyCalcOnRayServe();
  calculator.deploy();
  System.out.println(calculator.parallelCalc(time, banksAndIndicators));
}
使用 HTTP 调用 Ray Serve 部署#
测试或调用 deployment 的另一种方式是通过 HTTP 请求。不过,Java deployment 存在两个限制:
- 只有用户类的 `call` 方法才能处理 HTTP 请求。
- `call` 方法只能有一个输入参数,并且输入参数和返回值的类型都只能是 `String`。
如果你想通过 HTTP 调用 `strategy` deployment,可以像下面的代码这样重写该类:
import com.google.gson.Gson;
/**
 * Variant of the indicator calculation whose single {@code call} method takes and returns a
 * {@code String}, with the input passed as JSON.
 */
public class HttpStrategyOnRayServe {

  /** JSON request payload; the field names are the JSON keys read by Gson. */
  static class BankIndicator {
    long time;
    String bank;
    String indicator;
  }

  private Gson gson = new Gson();

  /**
   * Parses the JSON request and computes the indicator.
   *
   * @param dataJson JSON object with {@code time}, {@code bank} and {@code indicator} fields
   * @return {@code bank-indicator-time}
   */
  public String call(String dataJson) {
    BankIndicator request = gson.fromJson(dataJson, BankIndicator.class);
    // do bank data calculation
    return String.join("-", request.bank, request.indicator, String.valueOf(request.time)); // Demo;
  }
}
部署此 deployment 后,你可以使用 `curl` 命令访问它:
# Send a JSON body with the time, bank and indicator fields to the deployment over HTTP.
# NOTE(review): the Java HTTP client example in this document posts to /http-strategy rather than /strategy — confirm which route the deployment actually serves.
curl -d '{"time":1641038674, "bank":"test_bank", "indicator":"test_indicator"}' http://127.0.0.1:8000/strategy
你也可以在 Java 代码中使用 HTTP Client 访问它
private Gson gson = new Gson();

/**
 * Posts one indicator request as JSON to the HTTP endpoint and returns the response body.
 *
 * @param time value sent as the {@code time} field
 * @param bank value sent as the {@code bank} field
 * @param indicator value sent as the {@code indicator} field
 * @return the response body as a string, or {@code "error"} if the request fails with an
 *     {@link IOException}
 */
public String httpCalc(Long time, String bank, String indicator) {
  Map<String, Object> payload = new HashMap<>();
  payload.put("time", time);
  payload.put("bank", bank);
  payload.put("indicator", indicator);

  try {
    return Request.post("http://127.0.0.1:8000/http-strategy")
        .bodyString(gson.toJson(payload), null)
        .execute()
        .returnContent()
        .asString();
  } catch (IOException e) {
    // Demo fallback: report the failure as a plain marker string instead of propagating it.
    return "error";
  }
}
使用 HTTP 访问 deployment 进行策略计算的示例如下
/**
 * Computes every indicator over HTTP, one request at a time.
 *
 * @param time value forwarded to every {@code httpCalc} call
 * @param banksAndIndicators maps each bank to its groups of indicator names
 * @return the {@code httpCalc} results in the order the requests were made
 */
public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
  List<String> responses = new ArrayList<>();
  banksAndIndicators.forEach(
      (bank, indicatorGroups) -> {
        for (List<String> group : indicatorGroups) {
          for (String indicator : group) {
            responses.add(httpCalc(time, bank, indicator));
          }
        }
      });
  return responses;
}
你也可以重写此代码以支持并发
private final ExecutorService executorService = Executors.newFixedThreadPool(4);

/**
 * Computes every indicator over HTTP, running the requests on a fixed pool of four threads.
 *
 * <p>All requests are submitted first; their results are then collected in submission order. A
 * request whose task fails, or whose wait is interrupted, contributes {@code "error"}.
 *
 * @param time value forwarded to every {@code httpCalc} call
 * @param banksAndIndicators maps each bank to its groups of indicator names
 * @return one entry per submitted request, in submission order
 */
public List<String> parallelCalc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
  List<String> results = new ArrayList<>();
  List<Future<String>> futures = new ArrayList<>();
  for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
    String bank = e.getKey();
    for (List<String> indicators : e.getValue()) {
      for (String indicator : indicators) {
        futures.add(executorService.submit(() -> httpCalc(time, bank, indicator)));
      }
    }
  }
  for (Future<String> future : futures) {
    try {
      results.add(future.get());
    } catch (InterruptedException e1) {
      // Catching InterruptedException clears the interrupt flag; restore it so callers can
      // still observe that this thread was asked to stop.
      Thread.currentThread().interrupt();
      results.add("error");
    } catch (ExecutionException e1) {
      results.add("error");
    }
  }
  return results;
}
最后,`HttpStrategyCalcOnRayServe` 的完整用法类似于以下代码:
/**
 * Deploys the HTTP strategy deployment, runs the concurrent HTTP calculation on a small demo
 * data set and prints the results.
 *
 * @param args unused
 */
public static void main(String[] args) {
  long time = System.currentTimeMillis();

  final String firstIndicator = "demo_indicator_1";
  final String secondIndicator = "demo_indicator_2";

  // First bank: a single group holding both indicators.
  List<List<String>> groupsForFirstBank =
      Arrays.asList(Arrays.asList(firstIndicator, secondIndicator));
  // Second bank: two groups with one indicator each.
  List<List<String>> groupsForSecondBank =
      Arrays.asList(Arrays.asList(firstIndicator), Arrays.asList(secondIndicator));

  Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
  banksAndIndicators.put("demo_bank_1", groupsForFirstBank);
  banksAndIndicators.put("demo_bank_2", groupsForSecondBank);

  HttpStrategyCalcOnRayServe calculator = new HttpStrategyCalcOnRayServe();
  calculator.deploy();
  System.out.println(calculator.parallelCalc(time, banksAndIndicators));
}