更新时间:2022-07-19 GMT+08:00

如何采用Java命令提交Spark应用

问题

除了spark-submit命令提交应用外,如何采用Java命令提交Spark应用?

回答

您可以通过org.apache.spark.launcher.SparkLauncher类采用java命令方式提交Spark应用。详细步骤如下:

  1. 定义org.apache.spark.launcher.SparkLauncher类。默认提供了SparkLauncherJavaExample和SparkLauncherScalaExample示例,您需要根据实际业务应用程序修改示例代码中的传入参数。

    • 如果您使用Java语言开发程序,您可以参考如下示例,编写SparkLauncher类。
          public static void main(String[] args) throws Exception {
              System.out.println("com.huawei.bigdata.spark.examples.SparkLauncherExample <mode> <jarParh> <app_main_class> <appArgs>");
              SparkLauncher launcher = new SparkLauncher();
              launcher.setMaster(args[0])
                  .setAppResource(args[1]) // Specify user app jar path
                  .setMainClass(args[2]);
              if (args.length > 3) {
                  String[] list = new String[args.length - 3];
                  for (int i = 3; i < args.length; i++) {
                      list[i-3] = args[i];
                  }
                  // Set app args
                  launcher.addAppArgs(list);
              }
      
              // Launch the app
              Process process = launcher.launch();
              // Get Spark driver log
              new Thread(new ISRRunnable(process.getErrorStream())).start();
              int exitCode = process.waitFor();
              System.out.println("Finished! Exit code is "  + exitCode);
          }
    • 如果您使用Scala语言开发程序,您可以参考如下示例,编写SparkLauncher类。
        def main(args: Array[String]) {
          println(s"com.huawei.bigdata.spark.examples.SparkLauncherExample <mode> <jarParh>  <app_main_class> <appArgs>")
          val launcher = new SparkLauncher()
          launcher.setMaster(args(0))
            .setAppResource(args(1)) // Specify user app jar path
            .setMainClass(args(2))
            if (args.drop(3).length > 0) {
              // Set app args
              launcher.addAppArgs(args.drop(3): _*)
            }
      
      
          // Launch the app
          val process = launcher.launch()
          // Get Spark driver log
          new Thread(new ISRRunnable(process.getErrorStream)).start()
          val exitCode = process.waitFor()
          println(s"Finished! Exit code is $exitCode")
        }

  2. 根据业务逻辑,开发对应的Spark应用程序,并设置用户编写的Spark应用程序的主类等常数。

    如果您使用的是普通模式,准备业务应用代码及其相关配置即可。

  3. 调用org.apache.spark.launcher.SparkLauncher.launch()方法,将用户的应用程序提交。

    1. 将SparkLauncher程序和用户应用程序分别生成Jar包,并上传至运行此应用的Spark节点中。
      • SparkLauncher程序的编译依赖包为spark-launcher_2.10-1.5.1.jar。
      • 用户应用程序的编译依赖包根据代码不同而不同,需用户根据自己编写的代码进行加载。
    2. 将运行程序的依赖Jar包上传至需要运行此应用的节点中,例如“$SPARK_HOME/lib”路径。

      用户需要将SparkLauncher类的运行依赖包和应用程序运行依赖包上传至客户端的lib路径。文档中提供的示例代码,其运行依赖包在客户端lib中已存在。

      SparkLauncher的方式依赖Spark客户端,即运行程序的节点必须已安装Spark客户端,且客户端可用。运行过程中依赖客户端已配置好的环境变量、运行依赖包和配置文件,

    3. 在Spark应用程序运行节点,执行如下命令使用SparkLauncher方式提交。

      java -cp $SPARK_HOME/conf:$SPARK_HOME/lib/*:SparkLauncherExample.jar com.huawei.bigdata.spark.examples.SparkLauncherExample yarn-client /opt/female/FemaleInfoCollection.jar com.huawei.bigdata.spark.examples.FemaleInfoCollection <inputPath>