问题产生背景
kafka消费之后存储到hdfs,在持续消费过程中,hdfs重启了,导致kafka消费线程中的hdfs租约未被释放。
问题日志
ava.lang.RuntimeException: 移动文件夹失败!
at com.mj.dsl.dataSource.writer.HDFSDataWriter.moveFileToDir(HDFSDataWriter.java:115)
at com.mj.dsl.task.KafkaRecordFlushJob.hdfsUpdate(KafkaRecordFlushJob.java:228)
at com.mj.dsl.task.KafkaRecordFlushJob.execute(KafkaRecordFlushJob.java:180)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
Caused by: org.apache.hadoop.hdfs.CannotObtainBlockLengthException: Cannot obtain block length for LocatedBlock{BP-102182688-192.168.2.54-1559041234013:blk_1467676073_393943259; getBlockSize()=4300736; corrupt=false; offset=0; locs=[DatanodeInfoWithStorage[192.168.4.161:50010,DS-5fa6f2f4-98b6-4f42-9c2b-2f5c1c1a53b4,DISK], DatanodeInfoWithStorage[192.168.4.54:50010,DS-ccab6327-485d-409f-a6f4-55f0a4fd7447,DISK], DatanodeInfoWithStorage[192.168.4.52:50010,DS-f444081c-c549-4fb7-9e80-12f83a451ee3,DISK]]}
at org.apache.hadoop.hdfs.DFSInputStream.readBlockLength(DFSInputStream.java:360)
at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:267)
at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:198)
at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:182)
at org.apache.hadoop.hdfs.DFSClient.openInternal(DFSClient.java:1036)
at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:999)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:326)
at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:334)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:950)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:413)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:387)
at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:337)
at com.mj.dsl.dataSource.writer.HDFSDataWriter.moveFileToDir(HDFSDataWriter.java:111)
问题解决思路
这里就出现了 hdfs 的租约未被释放的问题,租约就是 在HDFS中,当每次客户端用户往某个文件中写入数据的时候,为了保持数据的一致性,此时其它客户端程序是不允许向此文件同时写入数据的,租约的信息是存在 namenode 中的,也就是说当 hdfs 系统被关闭时,flume 还在继续写该文件,同时也会报错,文件还是处于打开状态。
所以我们要解决这个问题就需要释放租约。
恢复租约的方式
首先先查看有哪些文件是租约没有释放的
hadoop fsck /hafs/path -openforwrite
然后执行
hdfs debug recoverLease -path
释放租约
如果未被释放租约的文件太多的话,可以执行批量释放操作
hadoop fsck /hafs/path -openforwrite | egrep -v '^.+$' | egrep "MISSING|OPENFORWRITE" | grep -o "/[^ ]*" | sed -e "s/:$//" | xargs -i hdfs debug recoverLease -path {}
总结
该问题的出现主要是下游关闭时上游还在写,导致租约没有释放掉。在以后的升级或者重启 hdfs 时,需要提前先把 flume 或者其他上游写 hdfs 操作停止后,在执行 hdfs 系统的操作,才可以避免该问题的出现。
内容基本全部转载,因为已经非常详细:https://hacpai.com/article/1556626214725
本文固定链接:杨晨辉的个人博客 » 【魔镜】kafka增量更新失败 Cannot obtain block length for LocatedBlock
本站内容除特别标注外均为原创,欢迎转载,但请保留出处!