Spark-submit脚本有什么用

这篇文章主要介绍Spark-submit脚本有什么用,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:主机域名网站空间、营销软件、网站建设、鹿寨网站维护、网站推广。

spark程序的提交是通过spark-submit脚本实现的,我们从它开始一步步解开spark提交集群的步骤。

spark-submit的主要命令行:exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

是执行spark-class脚本,并将spark.deploy.SparkSubmit类作为第一个参数。

1、 spark-class

最关键的就是下面这句了:

CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"

首先循环读取ARG参数,加入到CMD中。然后执行了"$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@ 这个是真正执行的第一个spark的类。

该类在launcher模块下,简单的浏览下代码:

public static void main(String[] argsArray) throws Exception {
   ...
    List args = new ArrayList(Arrays.asList(argsArray));
    String className = args.remove(0);
    ...
    //创建命令解析器
    AbstractCommandBuilder builder;
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
      try {
        builder = new SparkSubmitCommandBuilder(args);
      } catch (IllegalArgumentException e) {
        ...
      }
    } else {
      builder = new SparkClassCommandBuilder(className, args);
    }

    List cmd = builder.buildCommand(env);//解析器解析参数
    ...
    //返回有效的参数
    if (isWindows()) {
      System.out.println(prepareWindowsCommand(cmd, env));
    } else {
      List bashCmd = prepareBashCommand(cmd, env);
      for (String c : bashCmd) {
        System.out.print(c);
        System.out.print('\0');
      }
    }
  }

launcher.Main返回的数据存储到CMD中。

然后执行命令:

exec "${CMD[@]}"

这里开始真正执行某个Spark的类。

2、 deploy.SparkSubmit类

 private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(args, uninitLog)
            }
          })
        } catch {
          。。。
        }
      } else {
        runMain(args, uninitLog)
      }
    }    
      doRunMain()
  }

主要是通过runMain(args,unititLog)方法来提价spark jar包。

所以必须先看看runMain方法是干什么的:

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
        
    val loader = getSubmitClassLoader(sparkConf)
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }

    var mainClass: Class[_] = null

    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      ...
    }

    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      new JavaMainApplication(mainClass)
    }    

    try {
      app.start(childArgs.toArray, sparkConf)
    } catch {
      ...
    }
  }

这就很清楚了,要做的事情有以下这些:获取类加载器,添加jar包依赖。创建SparkApplication类的可执行程序或者是JavaMainApplication,创建出来的类叫app。最后执行app.start方法。

SparkApplication是一个抽象类,我们就看看默认的JavaMainApplication就可以了,代码非常简单:

private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }
    mainMethod.invoke(null, args)
  }

}

就是一个kclass的封装器,用来执行入参的kclass的main方法。这里的kclass就是我们编写的spark程序了,里面总有个main方法的。

以上是“Spark-submit脚本有什么用”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注创新互联行业资讯频道!


本文标题:Spark-submit脚本有什么用
标题URL:http://pwwzsj.com/article/pccdce.html