利用SparkLauncher 提交Job

利用SparkLauncher 提交Job一 适用背景 在学习 Spark 过程中 资料中介绍的提交 Spark Job 的方式主要有两种 我所知道的 第一种是通过命令行的方式提交 Job 使用 spark 自带的 spark submit 工具提交 官网和大多数参考资料都是已这种方式提交的 提交命令示例如下 spark submit

大家好,我是讯享网,很高兴认识大家。

一. 适用背景

在学习Spark过程中,资料中介绍的提交Spark Job的方式主要有两种(我所知道的):第一种是通过命令行的方式提交Job,使用spark 自带的spark-submit工具提交,官网和大多数参考资料都是已这种方式提交的,提交命令示例如下:

./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar 

讯享网

第二种提交方式是已JAVA API编程的方式提交,这种方式不需要使用命令行,直接可以在IDEA中点击Run 运行包含Job的Main类就行,Spark 提供了以SparkLanuncher 作为唯一入口的API来实现。这种方式很方便(试想如果某个任务需要重复执行,但是又不会写linux 脚本怎么搞?我想到的是以JAV API的方式提交Job, 还可以和Spring整合,让应用在tomcat中运行),官网的示例:http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html

二. 文章的目地

官网已有demo和API的情况下写这篇文章的目地:官网给出的demo 放在本机跑不了。出现的现象是程序结束了,什么输出都没有或者输出JAVA_HOME is not set,虽然我调用方法设置了,然而没啥用,因此把我搜索和加上在自己思考后能够运行的demo记录下来。

三. 相关demo

根据官网的示例这里有两种方式:


讯享网

第一种是调用SparkLanuncher实例的startApplication方法,但是这种方式在所有配置都正确的情况下使用运行都会失败的,原因是startApplication方法会调用LauncherServer启动一个进程与集**互,这个操作貌似是异步的,所以可能结果是main主线程结束了这个进程都没有起起来,导致运行失败。解决办法是调用new SparkLanuncher().startApplication后需要让主线程休眠一定的时间后者是使用下面的例子:

讯享网package com.learn.spark; import org.apache.spark.launcher.SparkAppHandle; import org.apache.spark.launcher.SparkLauncher; import java.io.IOException; import java.util.HashMap; import java.util.concurrent.CountDownLatch; public class LanuncherAppV { 
    public static void main(String[] args) throws IOException, InterruptedException { 
    HashMap env = new HashMap(); //这两个属性必须设置 env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf"); env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151"); //可以不设置 //env.put("YARN_CONF_DIR",""); CountDownLatch countDownLatch = new CountDownLatch(1); //这里调用setJavaHome()方法后,JAVA_HOME is not set 错误依然存在 SparkAppHandle handle = new SparkLauncher(env) .setSparkHome("/usr/local/spark") .setAppResource("/usr/local/spark/spark-demo.jar") .setMainClass("com.learn.spark.SimpleApp") .setMaster("yarn") .setDeployMode("cluster") .setConf("spark.app.id", "11222") .setConf("spark.driver.memory", "2g") .setConf("spark.executor.memory", "1g") .setConf("spark.executor.instances", "32") .setConf("spark.executor.cores", "3") .setConf("spark.default.parallelism", "10") .setConf("spark.driver.allowMultipleContexts", "true") .setVerbose(true).startApplication(new SparkAppHandle.Listener() { 
    //这里监听任务状态,当任务结束时(不管是什么原因结束),isFinal()方法会返回true,否则返回false @Override public void stateChanged(SparkAppHandle sparkAppHandle) { 
    if (sparkAppHandle.getState().isFinal()) { 
    countDownLatch.countDown(); } System.out.println("state:" + sparkAppHandle.getState().toString()); } @Override public void infoChanged(SparkAppHandle sparkAppHandle) { 
    System.out.println("Info:" + sparkAppHandle.getState().toString()); } }); System.out.println("The task is executing, please wait ...."); //线程等待任务结束 countDownLatch.await(); System.out.println("The task is finished!"); } } 
package com.learn.spark; import org.apache.spark.launcher.SparkAppHandle; import org.apache.spark.launcher.SparkLauncher; import java.io.IOException; import java.util.HashMap; public class LauncherApp { 
    public static void main(String[] args) throws IOException, InterruptedException { 
    HashMap env = new HashMap(); //这两个属性必须设置 env.put("HADOOP_CONF_DIR","/usr/local/hadoop/etc/overriterHaoopConf"); env.put("JAVA_HOME","/usr/local/java/jdk1.8.0_151"); //env.put("YARN_CONF_DIR",""); SparkLauncher handle = new SparkLauncher(env) .setSparkHome("/usr/local/spark") .setAppResource("/usr/local/spark/spark-demo.jar") .setMainClass("com.learn.spark.SimpleApp") .setMaster("yarn") .setDeployMode("cluster") .setConf("spark.app.id", "11222") .setConf("spark.driver.memory", "2g") .setConf("spark.akka.frameSize", "200") .setConf("spark.executor.memory", "1g") .setConf("spark.executor.instances", "32") .setConf("spark.executor.cores", "3") .setConf("spark.default.parallelism", "10") .setConf("spark.driver.allowMultipleContexts","true") .setVerbose(true); Process process =handle.launch(); InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input"); Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input"); inputThread.start(); InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error"); Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error"); errorThread.start(); System.out.println("Waiting for finish..."); int exitCode = process.waitFor(); System.out.println("Finished! Exit code:" + exitCode); } } 

使用的自定义InputStreamReaderRunnable类实现如下:

讯享网package com.learn.spark; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; public class InputStreamReaderRunnable implements Runnable { 
    private BufferedReader reader; private String name; public InputStreamReaderRunnable(InputStream is, String name) { 
    this.reader = new BufferedReader(new InputStreamReader(is)); this.name = name; } public void run() { 
    System.out.println("InputStream " + name + ":"); try { 
    String line = reader.readLine(); while (line != null) { 
    System.out.println(line); line = reader.readLine(); } reader.close(); } catch (IOException e) { 
    e.printStackTrace(); } } } 
小讯
上一篇 2025-03-26 13:03
下一篇 2025-02-07 13:12

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/15625.html