使用 Ray Serve 部署 Java 应用

要使用 Java Ray Serve,你需要在 pom.xml 中添加以下依赖。

<!-- Ray Serve Java API. The ray.version property must be defined elsewhere in the POM.
     The "provided" scope makes the jar available at compile time only; the accompanying
     note states that the local Ray installation ships the Ray Serve Java jar. -->
<dependency>
  <groupId>io.ray</groupId>
  <artifactId>ray-serve</artifactId>
  <version>${ray.version}</version>
  <scope>provided</scope>
</dependency>

注意:安装 Python 版本的 Ray 后,本地环境中会包含 Ray Serve 的 Java jar 包。provided scope 确保你可以编译使用了 Ray Serve 的 Java 代码,并且在集群上部署时不会发生版本冲突。

示例模型

本示例的用例是一个金融应用的生产工作流。该应用需要针对单个任务,计算与不同银行交互的最佳策略。

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/**
 * Single-process demo that computes indicator results for a set of banks.
 *
 * <p>Each result is the string {@code bank-indicator-time}; the real bank calculation is only
 * stubbed out.
 */
public class Strategy {

  /**
   * Entry point of the calculation.
   *
   * @param time value appended to every result string
   * @param banksAndIndicators maps each bank name to its groups of indicator names
   * @return one result per (bank, indicator) pair, in the iteration order of the map and lists
   */
  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    List<String> collected = new ArrayList<>();
    banksAndIndicators.forEach(
        (bankName, indicatorGroups) ->
            indicatorGroups.forEach(
                group -> collected.addAll(calcBankIndicators(time, bankName, group))));
    return collected;
  }

  /**
   * Computes every indicator of one group for a single bank.
   *
   * @param time value appended to every result string
   * @param bank bank name
   * @param indicators indicator names to evaluate for {@code bank}
   * @return one result per indicator, in list order
   */
  public List<String> calcBankIndicators(Long time, String bank, List<String> indicators) {
    List<String> collected = new ArrayList<>();
    indicators.forEach(name -> collected.add(calcIndicator(time, bank, name)));
    return collected;
  }

  /**
   * Computes a single indicator for a single bank.
   *
   * @param time value appended to the result string
   * @param bank bank name
   * @param indicator indicator name
   * @return {@code bank + "-" + indicator + "-" + time}
   */
  public String calcIndicator(Long time, String bank, String indicator) {
    // Placeholder for the real bank data calculation: the demo only joins its inputs.
    return String.join("-", bank, indicator, String.valueOf(time));
  }
}

本示例使用 Strategy 类计算多家银行的指标。

  • calc 方法是计算的入口。输入参数是计算时间间隔以及银行及其指标的映射。calc 方法包含一个两层 for 循环,遍历每个银行的每个指标列表,并调用 calcBankIndicators 方法计算指定银行的指标。

  • calcBankIndicators 方法中还有一层 for 循环,遍历每个指标,然后调用 calcIndicator 方法计算银行的特定指标。

  • calcIndicator 方法是基于银行、指定时间间隔和指标的特定计算逻辑。

以下代码使用 Strategy 类:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StrategyCalc {

  public static void main(String[] args) {
    long time = System.currentTimeMillis();
    String bank1 = "demo_bank_1";
    String bank2 = "demo_bank_2";
    String indicator1 = "demo_indicator_1";
    String indicator2 = "demo_indicator_2";
    Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
    banksAndIndicators.put(bank1, Arrays.asList(Arrays.asList(indicator1, indicator2)));
    banksAndIndicators.put(
        bank2, Arrays.asList(Arrays.asList(indicator1), Arrays.asList(indicator2)));

    Strategy strategy = new Strategy();
    List<String> results = strategy.calc(time, banksAndIndicators);

    System.out.println(results);
  }
}

当银行和指标规模扩大时,三层 for 循环会减慢计算速度。即使使用线程池并行计算每个指标,也可能遇到单机性能瓶颈。此外,你不能将此 Strategy 对象用作常驻服务。

转换为 Ray Serve 部署

通过 Ray Serve,你可以将 Strategy 的核心计算逻辑部署为可扩展的分布式计算服务。

首先,将每家银行的指标计算提取到一个单独的 StrategyOnRayServe 类中:

/**
 * Holds the per-indicator calculation on its own so that it can be deployed as a Ray Serve
 * deployment.
 */
public class StrategyOnRayServe {

  /**
   * Computes a single indicator for a single bank.
   *
   * @param time value appended to the result string
   * @param bank bank name
   * @param indicator indicator name
   * @return {@code bank + "-" + indicator + "-" + time}
   */
  public String calcIndicator(Long time, String bank, String indicator) {
    // Placeholder for the real bank data calculation: the demo only joins its inputs.
    return String.join("-", bank, indicator, String.valueOf(time));
  }
}

接下来,启动 Ray Serve 运行时并将 StrategyOnRayServe 部署为一个 deployment。

  /**
   * Starts Ray Serve and runs {@code StrategyOnRayServe} as a deployment named
   * {@code "strategy"} with four replicas.
   *
   * <p>{@code Serve.start} is called with {@code null}; presumably this selects the default
   * configuration (confirm against the Ray Serve Java API).
   */
  public void deploy() {
    Serve.start(null);

    // Build the application and hand it straight to Serve.run.
    Serve.run(
        Serve.deployment()
            .setName("strategy")
            .setDeploymentDef(StrategyOnRayServe.class.getName())
            .setNumReplicas(4)
            .bind());
  }

Serve.deployment() 构建一个名为 strategy 的 deployment,bind() 将其绑定为 Application 对象。执行 Serve.run 后,Ray Serve 实例会部署此 strategy deployment 并带有四个副本,你可以访问它进行分布式并行计算。

测试 Ray Serve 部署

你可以在 Ray 内部通过 deployment 的 handle(由 deployment.getHandle() 获取)测试 strategy deployment:

  /**
   * Computes every indicator through the {@code "strategy"} deployment, one call at a time.
   *
   * <p>Each remote call's {@code result()} is read before the next call is issued, so the
   * requests are processed serially.
   *
   * @param time value forwarded to every {@code calcIndicator} call
   * @param banksAndIndicators maps each bank name to its groups of indicator names
   * @return one result per (bank, indicator) pair, in the iteration order of the map and lists
   */
  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    Deployment strategy = Serve.getDeployment("strategy");

    List<String> collected = new ArrayList<>();
    banksAndIndicators.forEach(
        (bankName, indicatorGroups) -> {
          for (List<String> group : indicatorGroups) {
            for (String name : group) {
              Object outcome =
                  strategy
                      .getHandle()
                      .method("calcIndicator")
                      .remote(time, bankName, name)
                      .result();
              collected.add((String) outcome);
            }
          }
        });
    return collected;
  }

此代码串行执行每个银行指标的计算,并将其发送到 Ray 执行。你可以使计算并发化,这不仅提高了计算效率,还解决了单机瓶颈。

  /**
   * Computes every indicator through the {@code "strategy"} deployment, issuing all remote calls
   * before reading any result.
   *
   * @param time value forwarded to every {@code calcIndicator} call
   * @param banksAndIndicators maps each bank name to its groups of indicator names
   * @return one result per (bank, indicator) pair, in the order the calls were issued
   */
  public List<String> parallelCalc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    Deployment strategy = Serve.getDeployment("strategy");

    // Phase 1: issue every request and keep only the response objects.
    List<DeploymentResponse> pending = new ArrayList<>();
    banksAndIndicators.forEach(
        (bankName, indicatorGroups) -> {
          for (List<String> group : indicatorGroups) {
            for (String name : group) {
              pending.add(
                  strategy.getHandle().method("calcIndicator").remote(time, bankName, name));
            }
          }
        });

    // Phase 2: read the results in submission order.
    List<String> collected = new ArrayList<>();
    pending.forEach(reply -> collected.add((String) reply.result()));
    return collected;
  }

你可以像 main 方法中的示例那样使用 StrategyCalcOnRayServe

  /**
   * Deploys the strategy deployment, runs the parallel calculation on a small hard-coded
   * workload, and prints the results.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    final long startedAt = System.currentTimeMillis();

    final String firstBank = "demo_bank_1";
    final String secondBank = "demo_bank_2";
    final String firstIndicator = "demo_indicator_1";
    final String secondIndicator = "demo_indicator_2";

    // The first bank has one group with both indicators; the second has one group per indicator.
    List<List<String>> firstBankGroups =
        Arrays.asList(Arrays.asList(firstIndicator, secondIndicator));
    List<List<String>> secondBankGroups =
        Arrays.asList(Arrays.asList(firstIndicator), Arrays.asList(secondIndicator));

    Map<String, List<List<String>>> workload = new HashMap<>();
    workload.put(firstBank, firstBankGroups);
    workload.put(secondBank, secondBankGroups);

    StrategyCalcOnRayServe calculator = new StrategyCalcOnRayServe();
    calculator.deploy();

    System.out.println(calculator.parallelCalc(startedAt, workload));
  }

使用 HTTP 调用 Ray Serve 部署

测试或调用 deployment 的另一种方式是通过 HTTP 请求。然而,Java deployment 存在两个限制

  • 只有用户类的 call 方法才能处理 HTTP 请求。

  • call 方法只能有一个输入参数,并且输入参数和返回值的类型只能是 String

如果你想通过 HTTP 调用 strategy deployment,那么你可以像这段代码一样重写该类

import com.google.gson.Gson;

/**
 * HTTP-callable variant of the strategy deployment: {@code call} takes the whole request as one
 * JSON string and returns a string.
 */
public class HttpStrategyOnRayServe {

  /**
   * Target type for parsing the JSON request body.
   *
   * <p>Gson maps JSON properties onto these fields by name, so the field names are part of the
   * request format.
   */
  static class BankIndicator {
    long time;
    String bank;
    String indicator;
  }

  private Gson gson = new Gson();

  /**
   * Parses the JSON payload and computes the indicator it describes.
   *
   * @param dataJson JSON object with {@code time}, {@code bank} and {@code indicator} properties
   * @return {@code bank + "-" + indicator + "-" + time}
   */
  public String call(String dataJson) {
    BankIndicator request = gson.fromJson(dataJson, BankIndicator.class);
    // Placeholder for the real bank data calculation: the demo only joins the parsed fields.
    return String.join("-", request.bank, request.indicator, Long.toString(request.time));
  }
}

部署此 deployment 后,你可以使用 curl 命令访问它

# Send the JSON payload as the request body (curl -d issues a POST) to the deployment on port 8000.
# NOTE(review): the Java HTTP client example posts to /http-strategy instead of /strategy;
# confirm which route the deployment is actually served on.
curl -d '{"time":1641038674, "bank":"test_bank", "indicator":"test_indicator"}' http://127.0.0.1:8000/strategy

你也可以在 Java 代码中使用 HTTP Client 访问它

  private Gson gson = new Gson();

  /**
   * Computes one indicator by POSTing a JSON payload to the deployment's HTTP endpoint.
   *
   * @param time value sent as the {@code "time"} property
   * @param bank value sent as the {@code "bank"} property
   * @param indicator value sent as the {@code "indicator"} property
   * @return the response body, or the literal {@code "error"} if the request fails with an
   *     {@link IOException}
   */
  public String httpCalc(Long time, String bank, String indicator) {
    Map<String, Object> payload = new HashMap<>();
    payload.put("time", time);
    payload.put("bank", bank);
    payload.put("indicator", indicator);

    try {
      return Request.post("http://127.0.0.1:8000/http-strategy")
          .bodyString(gson.toJson(payload), null)
          .execute()
          .returnContent()
          .asString();
    } catch (IOException ignored) {
      // Best-effort demo: an I/O failure is reported to the caller as the "error" marker.
      return "error";
    }
  }

使用 HTTP 访问 deployment 进行策略计算的示例如下

  /**
   * Computes every indicator over HTTP, one request at a time.
   *
   * @param time value forwarded to every {@code httpCalc} call
   * @param banksAndIndicators maps each bank name to its groups of indicator names
   * @return one result per (bank, indicator) pair, in the iteration order of the map and lists
   */
  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    List<String> collected = new ArrayList<>();
    banksAndIndicators.forEach(
        (bankName, indicatorGroups) ->
            indicatorGroups.forEach(
                group -> group.forEach(name -> collected.add(httpCalc(time, bankName, name)))));
    return collected;
  }

你也可以重写此代码以支持并发

  // Fixed pool of four workers used to issue the HTTP requests concurrently.
  // NOTE(review): nothing visible here shuts this pool down; its worker threads are non-daemon
  // and may keep the JVM alive after main returns. Confirm a shutdown exists elsewhere.
  private ExecutorService executorService = Executors.newFixedThreadPool(4);

  /**
   * Computes every indicator over HTTP, submitting all requests to the thread pool before
   * waiting for any of them.
   *
   * <p>A request whose task fails, or whose wait is interrupted, contributes the literal
   * {@code "error"}, so the returned list always has one entry per (bank, indicator) pair.
   * If the calling thread is interrupted while waiting, its interrupt status is restored so
   * that callers can still observe the interruption.
   *
   * @param time value forwarded to every {@code httpCalc} call
   * @param banksAndIndicators maps each bank name to its groups of indicator names
   * @return one result per (bank, indicator) pair, in submission order
   */
  public List<String> parallelCalc(Long time, Map<String, List<List<String>>> banksAndIndicators) {

    List<String> results = new ArrayList<>();
    List<Future<String>> futures = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        for (String indicator : indicators) {
          futures.add(executorService.submit(() -> httpCalc(time, bank, indicator)));
        }
      }
    }
    for (Future<String> future : futures) {
      try {
        results.add(future.get());
      } catch (InterruptedException e1) {
        // Catching InterruptedException clears the flag; set it again instead of losing it.
        Thread.currentThread().interrupt();
        results.add("error");
      } catch (ExecutionException e1) {
        results.add("error");
      }
    }
    return results;
  }

最后,HttpStrategyCalcOnRayServe 的完整用法类似于此代码

  /**
   * Deploys the HTTP strategy deployment, runs the parallel HTTP calculation on a small
   * hard-coded workload, and prints the results.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    final long startedAt = System.currentTimeMillis();

    final String firstBank = "demo_bank_1";
    final String secondBank = "demo_bank_2";
    final String firstIndicator = "demo_indicator_1";
    final String secondIndicator = "demo_indicator_2";

    // The first bank has one group with both indicators; the second has one group per indicator.
    List<List<String>> firstBankGroups =
        Arrays.asList(Arrays.asList(firstIndicator, secondIndicator));
    List<List<String>> secondBankGroups =
        Arrays.asList(Arrays.asList(firstIndicator), Arrays.asList(secondIndicator));

    Map<String, List<List<String>>> workload = new HashMap<>();
    workload.put(firstBank, firstBankGroups);
    workload.put(secondBank, secondBankGroups);

    HttpStrategyCalcOnRayServe calculator = new HttpStrategyCalcOnRayServe();
    calculator.deploy();

    System.out.println(calculator.parallelCalc(startedAt, workload));
  }