SparkContext addJar Not Taking Effect: A Troubleshooting Log
Background
I recently wanted to build a dynamic UDF feature: a user uploads a jar through the web UI, fills in the class name, function name, and other basic info, and can then use that UDF directly in SQL.
Problem
My first attempt at this feature used the native Spark API directly:
sparkSession.sparkContext().addJar("/Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar");
sparkSession.udf().registerJava("mj_demo_test","com.junbo.udf.TestConcat",DataTypes.StringType);
Sadly, reality fell short of the ideal, and it threw an error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Can not load class com.junbo.udf.TestConcat, please make sure it is on the classpath;
at org.apache.spark.sql.UDFRegistration.registerJava(UDFRegistration.scala:613)
at com.mj.web.listener.TestSpark.main(TestSpark.java:52)
Troubleshooting
At first I assumed I had written the class name or jar path wrong, but careful checking turned up nothing; I even considered non-ASCII characters in the path, and that wasn't it either. So I turned to the Spark API docs, which describe addJar as follows:
public void addJar(String path)
Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
If a jar is added during execution, it will not be available until the next TaskSet starts.
Parameters:
path - can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
Note:
A path can be added only once. Subsequent additions of the same path are ignored.
According to the docs, addJar supports local jars, HDFS paths, and local: paths, so I tried again with a file:/// prefix:
sparkSession.sparkContext().addJar("file:///Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar");
Same error. As a sanity check, I deliberately pointed it at a nonexistent jar name and got an error for that too, which ruled out the path as the problem.
Puzzled, I traced through the code. The error shows that the classloader never loaded the jar, and stepping into sparkContext.addJar() confirmed that it indeed never adds the jar to any classloader. Strange!
Further searching turned up https://blog.csdn.net/vfgbv/article/details/51241881 , which says this approach doesn't work, but running an add jar command through Spark SQL does:
sparkSession.sql("add jar /Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar");
sparkSession.sql("CREATE TEMPORARY FUNCTION mj_demo_test AS 'com.junbo.udf.TestConcat'");
Not ready to let it go, I dug into why this variant works. Spark's detailed logs contain some extra lines:
20/03/31 09:58:59 INFO audit: ugi=ych ip=unknown-ip-addr cmd=get_database: default
20/03/31 09:59:00 INFO SessionState: Added [/Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar] to class path
20/03/31 09:59:00 INFO SessionState: Added resources: [/Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar]
20/03/31 09:59:00 INFO SparkContext: Added JAR /Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar at spark://127.0.0.1:58770/jars/udf-1.0.jar with timestamp 1585619940118
20/03/31 09:59:00 INFO HiveMetaStore: 0: get_database: default
20/03/31 09:59:00 INFO audit: ugi=ych ip=unknown-ip-addr cmd=get_database: default
Just as suspected: this path additionally adds the jar to the classpath. Looking at the SessionState source:
/**
 * Add a jar path to [[SparkContext]] and the classloader.
 *
 * Note: this method seems not access any session state, but a Hive based `SessionState` needs
 * to add the jar to its hive client for the current session. Hence, it still needs to be in
 * [[SessionState]].
 */
def addJar(path: String): Unit = {
  session.sparkContext.addJar(path)
  val uri = new Path(path).toUri
  val jarURL = if (uri.getScheme == null) {
    // `path` is a local file path without a URL scheme
    new File(path).toURI.toURL
  } else {
    // `path` is a URL with a scheme
    uri.toURL
  }
  session.sharedState.jarClassLoader.addURL(jarURL)
  Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
}
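The scheme check in that snippet can be mirrored in plain Java. This is only an illustrative sketch of the URL-resolution step (using java.net.URI where Spark uses Hadoop's Path, which normalizes paths slightly differently):

```java
import java.io.File;
import java.net.URI;
import java.net.URL;

public class JarUrlResolver {
    // Same idea as SessionState.addJar: a bare local path becomes a file: URL,
    // while a path that already carries a scheme is used as-is.
    static URL toJarUrl(String path) throws Exception {
        URI uri = URI.create(path);
        if (uri.getScheme() == null) {
            // e.g. /Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar
            return new File(path).toURI().toURL();
        }
        // e.g. file:///... or http://... -- the scheme is kept
        return uri.toURL();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(toJarUrl("/tmp/udf-1.0.jar")); // file:/tmp/udf-1.0.jar
        System.out.println(toJarUrl("http://example.com/udf-1.0.jar"));
    }
}
```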
Sure enough, this method not only calls sparkContext.addJar() but also adds the jar's URL to the classloader.
Since I personally prefer sticking close to the native Spark API, I changed my code to:
sparkSession.sessionState().resourceLoader().addJar("/Users/ych/IdeaProjects/test-udf/target/udf-1.0.jar");
sparkSession.udf().registerJava("mj_demo_test","com.junbo.udf.TestConcat",DataTypes.StringType);
Tested, works. Done!
Conclusion:
- sparkContext.addJar does not add the jar to any classloader on the driver. My reading of the SessionState source above (take it with a grain of salt) is that addJar only distributes the jar to executors for running tasks, while loading a class on the driver, as registerJava needs to, requires the extra jarClassLoader.addURL step. If any expert can confirm or correct this, please leave a comment.
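To convince myself of the classloader half of this, here is a small self-contained sketch (pure JDK, no Spark): a resource inside a freshly built jar is invisible to the application classloader until the jar's URL is handed to a URLClassLoader, which is essentially the step sharedState.jarClassLoader.addURL performs inside Spark.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

public class ClassLoaderDemo {
    // Builds a throwaway jar containing a single resource entry.
    static File buildJar() throws Exception {
        File jar = File.createTempFile("demo", ".jar");
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(jar))) {
            out.putNextEntry(new JarEntry("marker.txt"));
            out.write("hello".getBytes());
            out.closeEntry();
        }
        return jar;
    }

    public static void main(String[] args) throws Exception {
        File jar = buildJar();
        ClassLoader app = ClassLoaderDemo.class.getClassLoader();
        // The application classloader knows nothing about the new jar...
        System.out.println(app.getResource("marker.txt")); // null
        // ...until its URL is added to a classloader the code actually uses.
        try (URLClassLoader loader =
                 new URLClassLoader(new URL[]{jar.toURI().toURL()}, app)) {
            System.out.println(loader.getResource("marker.txt") != null); // true
        }
        jar.delete();
    }
}
```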
Permanent link: Yang Chenhui's personal blog » SparkContext addJar Not Taking Effect: A Troubleshooting Log
Unless otherwise noted, all content on this site is original. Reposting is welcome, but please keep the attribution!