广州某客户kafka flush异常解决

魔镜问题汇总 yangchenhui 2399℃ 0评论
问题描述

最近kafka总是报flush异常,异常大概如下,用户反馈最近两周总是有数据丢失情况,每次都需要手工进行重新同步全量数据。

Caused by: java.io.FileNotFoundException: File does not exist: /guoyundata1/kafka/ds1140_sc_order/target/data-1570755600657
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1931)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:738)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:426)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:524)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1025)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:876)
at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:822)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2682)

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
魔镜kafka增量更新逻辑
  1. kafka->KafkaConsumerListener消费->hdfs(/guoyundata1/表名/随机数文件名)
  2. 一旦接收到数据,会在hdfs上对应的表的文件夹建立一个随机文件名的文件,然后再zookeeper上注册。
  3. 一旦重启或者手工flush时,会将zookeeper上的节点删掉,同时注册的监听事件会close掉当前文件,然后重新创建一个随机文件名的文件继续存储。
  4. flush时,会先将该文件目录下的所有文件移动到target目录,然后将target目录的数据加载成dataset,然后通过pk/时间进行去重,最后将新的数据集和旧的数据集进行合并,加载旧数据集时会只查询需要用到的partition,减少数据加载量。
解决思路
  1. 首先仔细查看堆栈详情,看看魔镜到底是哪里抛出来的异常,找到
    at com.mj.dsl.data.engine.spark.SparkCommonEngine.hdfsInsertOverWriteData(SparkCommonEngine.java:1155)
  2. 对照代码发现是在第一步加载hdfs新数据集(target)目录的时候就报错了,那就会有两种可能,一是在创建dataset的时候传入的文件名有问题(传入了旧的文件名),二是可能文件名是正确的,但是加载前被删除掉了,按照这两种思路继续排查代码。
    1. 第一种情况排查发现基本没有问题。
    2. 第二种排查,直接查看这张表的更新记录,发现更新记录有异常,都是29分执行一次,30分执行一次,这就不对了,说明定时任务不正常,有并发执行的可能性,查看代码,如果第一次更新时间稍微长一点,就会并发执行,那极限情况下,第一次更新把target目录删除的时候就可能影响第二次更新了。
    3. 然后继续排查定时任务,发现该表有2次定时任务在跑,一个是ds_order[1130],一个是**订单表[1130],回忆之前的升级记录,如果表被改名,并且修改过增量配置,则可能产生这种情况。
    4. 删除其中一个定时任务,然后观察一段时间试试。
  3. 观察了几天,没有再出现这个问题,至此,问题解决。

本文固定链接:杨晨辉的个人博客 » 广州某客户kafka flush异常解决
本站内容除特别标注外均为原创,欢迎转载,但请保留出处!

喜欢 (2)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址