Troubleshooting SparkContext addJar Not Taking Effect


Background

Recently I wanted to build a dynamic UDF feature: a user uploads a jar through the web UI, fills in the class name, function name, and other basic information, and can then use that UDF directly in SQL.

Problem Description

My first attempt at implementing this used the native Spark API:

sparkSession.sparkContext().addJar("/Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar");
sparkSession.udf().registerJava("mj_demo_test","com.junbo.udf.TestConcat",DataTypes.StringType);
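
For context, registerJava expects the named class to implement one of Spark's org.apache.spark.sql.api.java.UDF0 through UDF22 interfaces. The real TestConcat isn't shown in this post; a minimal hypothetical sketch of what udf-1.0.jar might contain (assuming a two-argument string concat) looks like:

package com.junbo.udf;

import org.apache.spark.sql.api.java.UDF2;

// Hypothetical sketch of the UDF packaged in udf-1.0.jar; the real class
// isn't shown in this post. registerJava(..., DataTypes.StringType)
// implies a UDF whose call() returns String.
public class TestConcat implements UDF2<String, String, String> {
    @Override
    public String call(String left, String right) throws Exception {
        return left + right; // simple two-argument concatenation
    }
}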

Unfortunately, reality didn't live up to expectations. It failed with:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Can not load class com.junbo.udf.TestConcat, please make sure it is on the classpath;
    at org.apache.spark.sql.UDFRegistration.registerJava(UDFRegistration.scala:613)
    at com.mj.web.listener.TestSpark.main(TestSpark.java:52)

Troubleshooting

At first I assumed I had gotten the class name or the jar path wrong, but careful checking turned up nothing; I even considered whether Chinese characters in the path were the culprit, and they weren't. So the problem had to be elsewhere. I looked up the Spark API docs, which describe addJar as follows:

public void addJar(String path)
Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
If a jar is added during execution, it will not be available until the next TaskSet starts.

Parameters:
path - can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
Note:
A path can be added only once. Subsequent additions of the same path are ignored.
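
For illustration, these schemes map to calls like the following (all paths except the first are hypothetical):

// Local file on the driver machine; Spark's file server ships it to executors.
sparkSession.sparkContext().addJar("/Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar");
// Jar already sitting on HDFS (hypothetical path).
sparkSession.sparkContext().addJar("hdfs:///user/ych/jars/udf-1.0.jar");
// local:/ means the jar already exists at this path on every worker node (hypothetical path).
sparkSession.sparkContext().addJar("local:/opt/jars/udf-1.0.jar");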

According to the docs, local jars, HDFS paths, and local: paths are all supported, so I changed the code to try a file:/// prefix:

sparkSession.sparkContext().addJar("file:///Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar");

The error persisted. As a sanity check, I swapped in a nonexistent jar name and got a different error, which meant the real jar was being found and the path itself was not the problem.

Puzzled, I stepped through the code. The error message says the classloader failed to load the class, and tracing sparkContext.addJar() confirmed that it indeed never adds the jar to any classloader. Strange!
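
A quick way to confirm this on the driver (a minimal sketch; Class.forName here resolves the class roughly the way registerJava does internally):

sparkSession.sparkContext().addJar("/Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar");
try {
    // Attempt to resolve the class on the driver via the context classloader.
    Class.forName("com.junbo.udf.TestConcat", true,
            Thread.currentThread().getContextClassLoader());
    System.out.println("class visible on the driver");
} catch (ClassNotFoundException e) {
    // This branch is hit: addJar only ships the jar to executors for future
    // tasks; it never touches the driver-side classloader.
    System.out.println("driver classloader cannot see the class: " + e);
}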

More searching turned up https://blog.csdn.net/vfgbv/article/details/51241881 , which describes the same dead end: this approach doesn't work, but running the add jar command through Spark SQL does:

sparkSession.sql("add jar /Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar");
sparkSession.sql("CREATE TEMPORARY FUNCTION mj_demo_test AS 'com.junbo.udf.TestConcat'");

Still not satisfied, I wanted to know why this works. Checking Spark's detailed runtime logs, I found these extra lines:

20/03/31 09:58:59 INFO audit: ugi=ych   ip=unknown-ip-addr  cmd=get_database: default   
20/03/31 09:59:00 INFO SessionState: Added [/Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar] to class path
20/03/31 09:59:00 INFO SessionState: Added resources: [/Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar]
20/03/31 09:59:00 INFO SparkContext: Added JAR /Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar at spark://127.0.0.1:58770/jars/udf-1.0.jar with timestamp 1585619940118
20/03/31 09:59:00 INFO HiveMetaStore: 0: get_database: default
20/03/31 09:59:00 INFO audit: ugi=ych   ip=unknown-ip-addr  cmd=get_database: default   

Just as suspected, there is an extra step here that adds the jar to the classpath. Looking at the SessionState source:

/**
 * Add a jar path to [[SparkContext]] and the classloader.
 *
 * Note: this method seems not access any session state, but a Hive based `SessionState` needs
 * to add the jar to its hive client for the current session. Hence, it still needs to be in
 * [[SessionState]].
 */
def addJar(path: String): Unit = {
  session.sparkContext.addJar(path)
  val uri = new Path(path).toUri
  val jarURL = if (uri.getScheme == null) {
    // `path` is a local file path without a URL scheme
    new File(path).toURI.toURL
  } else {
    // `path` is a URL with a scheme
    uri.toURL
  }
  session.sharedState.jarClassLoader.addURL(jarURL)
  Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
}

Sure enough, this not only calls sparkContext.addJar() but also adds the jar URL to the shared jarClassLoader and sets that classloader as the current thread's context classloader.
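
The same two steps can be mirrored manually in Java if needed (a sketch that assumes you manage your own URLClassLoader and handle MalformedURLException yourself; the built-in resource loader used below is the cleaner route):

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

String path = "/Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar";
sparkSession.sparkContext().addJar(path);             // ship the jar to executors
URL jarUrl = new File(path).toURI().toURL();          // local path -> URL
URLClassLoader loader = new URLClassLoader(           // chain onto the current loader
        new URL[]{jarUrl}, Thread.currentThread().getContextClassLoader());
Thread.currentThread().setContextClassLoader(loader); // make the driver see the class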

Since I personally prefer the native Spark API, the final code becomes:

sparkSession.sessionState().resourceLoader().addJar("/Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar");
sparkSession.udf().registerJava("mj_demo_test","com.junbo.udf.TestConcat",DataTypes.StringType);
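
With the jar now visible to both the driver classloader and the executors, the UDF works directly in SQL (hypothetical query, assuming the two-argument concat sketched earlier):

sparkSession.sql("SELECT mj_demo_test('hello', ' world') AS result").show();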

Tested it; everything works. Done!

Conclusion:

  1. sparkContext.addJar does not add the jar to the driver's classloader. Judging from the API doc quoted above ("for all tasks to be executed on this SparkContext") and the SessionState source, it only ships the jar to executors for future tasks; anything that resolves the class on the driver, such as registerJava's reflective lookup, needs the jar added to the driver classloader separately, which is exactly what sessionState().resourceLoader().addJar() does. Corrections and additions are welcome in the comments.
