Dashu's Troubleshooting Notes (2): Spark jobs intermittently fail with java.lang.NoSuchFieldError: HIVE_MOVE_FILES_THREAD_COUNT

Recently, Spark jobs submitted in yarn-cluster mode would occasionally fail; roughly 40% of runs hit the error below:

18/03/15 21:50:36 116 ERROR ApplicationMaster91: User class threw exception: org.apache.spark.sql.AnalysisException: java.lang.NoSuchFieldError: HIVE_MOVE_FILES_THREAD_COUNT;

org.apache.spark.sql.AnalysisException: java.lang.NoSuchFieldError: HIVE_MOVE_FILES_THREAD_COUNT;

         at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)

         at org.apache.spark.sql.hive.HiveExternalCatalog.loadTable(HiveExternalCatalog.scala:766)

         at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:374)

         at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:221)

         at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:407)

         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)

         at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)

         at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)

         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

         at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)

         at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)

         at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)

         at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)

         at org.apache.spark.sql.Dataset.<init>(Dataset.scala:185)

         at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)

         at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)

         at scala.util.control.Breaks.breakable(Breaks.scala:38)

         at app.package.APPClass$.main(APPClass.scala:177)

         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

         at java.lang.reflect.Method.invoke(Method.java:497)

         at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)

Caused by: java.lang.NoSuchFieldError: HIVE_MOVE_FILES_THREAD_COUNT

         at org.apache.hadoop.hive.ql.metadata.Hive.trashFilesUnderDir(Hive.java:1389)

         at org.apache.hadoop.hive.ql.metadata.Hive.replaceFiles(Hive.java:2873)

         at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1621)

         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

         at java.lang.reflect.Method.invoke(Method.java:497)

         at org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:728)

         at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply$mcV$sp(HiveClientImpl.scala:676)

         at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:676)

         at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadTable$1.apply(HiveClientImpl.scala:676)

         at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:279)

         at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:226)

         at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:225)

         at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:268)

         at org.apache.spark.sql.hive.client.HiveClientImpl.loadTable(HiveClientImpl.scala:675)

         at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply$mcV$sp(HiveExternalCatalog.scala:768)

         at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:766)

         at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadTable$1.apply(HiveExternalCatalog.scala:766)

         at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)

         ... 25 more

The rough flow: when Spark SQL executes InsertIntoHiveTable it calls loadTable, and that operation eventually invokes Hive's loadTable method through reflection:

  1. org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:407)
  2. org.apache.spark.sql.hive.HiveExternalCatalog.loadTable(HiveExternalCatalog.scala:766)
  3. org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:728)
  4. java.lang.reflect.Method.invoke(Method.java:497)
  5. org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1621)
  6. org.apache.hadoop.hive.ql.metadata.Hive.trashFilesUnderDir(Hive.java:1389)

The error, java.lang.NoSuchFieldError: HIVE_MOVE_FILES_THREAD_COUNT, is thrown at step 6.

 

This error is usually blamed on a missing setting in hive-site.xml:

  <property>

    <name>hive.mv.files.thread</name>

    <value>15</value>

  </property>

But looking at the code shows that Spark 2.1.1 depends on Hive 1.2.1, and Hive 1.2.1 does not have the hive.mv.files.thread setting at all; it only appeared in Hive 2. Moreover, the class that throws the error, org.apache.hadoop.hive.ql.metadata.Hive, is completely different in Hive 1.2.1 and Hive 2 around this code path. The details are as follows:

 

In Hive 1.2.1 the code is (trashFilesUnderDir here is a method of the FileUtils class):

    if (FileUtils.isSubDir(oldPath, destf, fs2)) {
      FileUtils.trashFilesUnderDir(fs2, oldPath, conf);
    }

In Hive 2 the code is (trashFilesUnderDir here is a method of the Hive class):

    private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf)
        throws IOException {
      FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER);
      boolean result = true;
      final List<Future<Boolean>> futures = new LinkedList<>();
      final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
          Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null;
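As an aside, a small reflection probe (a hypothetical helper class, not something from Spark or Hive) can show at runtime which ConfVars definition was actually loaded and which jar it came from:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ConfVarsProbe {
        public static void main(String[] args) {
            // which jar the ConfVars enum was loaded from
            Class<?> confVars = HiveConf.ConfVars.class;
            System.out.println("ConfVars loaded from: "
                    + confVars.getProtectionDomain().getCodeSource().getLocation());
            try {
                // enum constants are public static fields, so getField works here
                confVars.getField("HIVE_MOVE_FILES_THREAD_COUNT");
                System.out.println("HIVE_MOVE_FILES_THREAD_COUNT present (Hive 2.x ConfVars)");
            } catch (NoSuchFieldException e) {
                System.out.println("HIVE_MOVE_FILES_THREAD_COUNT missing (Hive 1.2.1 ConfVars)");
            }
        }
    }

Running it on the same classpath as the job tells you immediately whether the Hive 1 or the Hive 2 definition wins.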

 

So for the error at step 6, the code being executed must be the Hive 2 version, which suggested two possible causes:

1) Jar pollution: the JVM's classpath contains both Hive 1 and Hive 2 jars, and the class is sometimes loaded from the Hive 1 jar (job runs fine) and sometimes from the Hive 2 jar (job fails);

2) Differences in node configuration across the cluster: some servers have no Hive 2 jar on the classpath (job runs fine), while others do (job may fail).

 

Comparing the environment configuration and launch commands of healthy and failing servers showed they were identical, and no Hive 2 jar was found.

Adding -verbose:class when launching the job showed that in both the healthy and the failing case the Hive class was loaded from the Hive 1.2.1 jar:

[Loaded org.apache.hadoop.hive.ql.metadata.Hive from file:/export/Data/tmp/hadoop-tmp/nm-local-dir/filecache/98/hive-exec-1.2.1.spark2.jar]

which ruled out both guesses above.
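For reference, the class-loading trace can be enabled through the standard Spark JVM-option settings (a sketch; in yarn-cluster mode the driver options apply to the ApplicationMaster JVM):

    spark.driver.extraJavaOptions    -verbose:class
    spark.executor.extraJavaOptions  -verbose:class

These can go into spark-defaults.conf or be passed as --conf arguments to spark-submit.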

 

Analyzing the submit command showed that spark.yarn.jars was being used to avoid uploading the Spark jars on every submission; these jars are cached as a filecache under yarn.nodemanager.local-dirs.
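In spark-defaults.conf that setting typically looks like the following (the HDFS path here is only an example):

    spark.yarn.jars  hdfs:///spark/jars/*.jar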

Decompiling the hive-exec-1.2.1.spark2.jar found in the filecache of a healthy server and of a failing server finally exposed the problem.

On a healthy server the Hive class code is:

    if (FileUtils.isSubDir(oldPath, destf, fs2))
      FileUtils.trashFilesUnderDir(fs2, oldPath, conf);

On a failing server the Hive class code is:

    private static boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf) throws IOException {
      FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER);
      boolean result = true;
      List<Future<Boolean>> futures = new LinkedList();
      ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), (new ThreadFactoryBuilder()).setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null;

The Hive class on the failing server references ConfVars.HIVE_MOVE_FILES_THREAD_COUNT, but the ConfVars class in hive-common-1.2.1.jar has no such field, hence the java.lang.NoSuchFieldError.

So the likely story is: the hive-exec-1.2.1.spark2.jar on HDFS was correct at first, and every NodeManager downloaded it into its local filecache; later the jar was replaced with a broken build (Spark compiled against Hive 2), so NodeManagers added afterwards downloaded the bad jar into their filecache. The net effect is that the job runs fine on some servers and fails on others.
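A quick way to confirm this kind of filecache drift (a sketch; the HDFS path is an example, the local path is the one seen in the -verbose:class output above):

    # md5 of the jar currently on HDFS vs. the NodeManager's cached copy
    hdfs dfs -cat /spark/jars/hive-exec-1.2.1.spark2.jar | md5sum
    md5sum /export/Data/tmp/hadoop-tmp/nm-local-dir/filecache/98/hive-exec-1.2.1.spark2.jar

If the two checksums differ, the node is still serving a stale copy from its filecache.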

 

YARN's filecache cleanup is controlled by two settings:

yarn.nodemanager.localizer.cache.cleanup.interval-ms:600000  Interval in between cache cleanups.

yarn.nodemanager.localizer.cache.target-size-mb:10240    Target size of localizer cache in MB, per local directory.

Every cleanup.interval-ms, the NodeManager checks whether the local filecache exceeds target-size-mb; it is cleaned only if the limit is exceeded, otherwise the cached files keep being used.
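Expressed as yarn-site.xml entries (the values are the defaults quoted above):

    <property>
      <name>yarn.nodemanager.localizer.cache.cleanup.interval-ms</name>
      <value>600000</value>
    </property>
    <property>
      <name>yarn.nodemanager.localizer.cache.target-size-mb</name>
      <value>10240</value>
    </property>

In other words, cleanup is purely size-based: a jar in the filecache can live there indefinitely, which is exactly why nodes that cached hive-exec-1.2.1.spark2.jar before it was replaced kept using their old copy instead of picking up the new one.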