SparkDataset DropDuplicate去重无法保证取最新数据问题

技术总结 yangchenhui 5412℃ 1评论

问题描述

还是之前kafka消费数据到hive中丢失数据的问题,为了更进一步定位问题,我先将flush之后的临时文件存储到一个临时目录,然后针对丢失的数据从临时文件中查找是否存在,结果文件中数据是全的。

解决思路

仔细检查代码,发现后面可能引起丢失数据的过程就剩一个去重,业务逻辑是:这里消费的是mysqlbinlog推送过来的数据,对于同一个订单,数据可能有不同的状态之前变化,也就是说同一个主键的数据会推送过来N条数据,我们只需要拿最新的一条存储就可以了,其他的中间状态可以丢弃掉。

去重的代码

fileDataSet.orderBy(fileDataSet.col("pt_datetime").desc()).dropDuplicates(pkList.toArray(new String[pkList.size()]))

查询资料,这是spark官方提供去重方法,理论上不会有什么问题,百度上搜相关关键字”spark dataset dropDuplicates 丢数据“发现也没什么有用线索,再回想错误场景,有一个细节”数据的总量一直都是对的,只是个别数据的状态不对“,结合文件中数据是全的,那是不是说,其实不是丢数据,是取最新状态这个过程出错了,然后谷歌 ”spark dataset dropDuplicates keep firset“,果然找到stackoverflow上有相关问题。

https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first

那就验证一下吧,验证分为两步:

  1. 验证大神的结论。
  2. 修改之后线上验证。(目前跑了一两天,数据基本正常了)

准备数据

List<String> jsonString = new ArrayList<>();
jsonString.add("{\"dateStr\":\"2019-01-01\",\"columnVal\":0}");
jsonString.add("{\"dateStr\":\"2019-01-01\",\"columnVal\":1}");
jsonString.add("{\"dateStr\":\"2019-01-01\",\"columnVal\":2}");
jsonString.add("{\"dateStr\":\"2019-01-01\",\"columnVal\":3}");
jsonString.add("{\"dateStr\":\"2019-01-01\",\"columnVal\":4}");

jsonString.add("{\"dateStr\":\"2019-02-01\",\"columnVal\":0}");
jsonString.add("{\"dateStr\":\"2019-02-01\",\"columnVal\":1}");
jsonString.add("{\"dateStr\":\"2019-02-01\",\"columnVal\":2}");
jsonString.add("{\"dateStr\":\"2019-02-01\",\"columnVal\":3}");
jsonString.add("{\"dateStr\":\"2019-02-01\",\"columnVal\":4}");

jsonString.add("{\"dateStr\":\"2019-03-01\",\"columnVal\":0}");
jsonString.add("{\"dateStr\":\"2019-03-01\",\"columnVal\":1}");
jsonString.add("{\"dateStr\":\"2019-03-01\",\"columnVal\":2}");
jsonString.add("{\"dateStr\":\"2019-03-01\",\"columnVal\":3}");
jsonString.add("{\"dateStr\":\"2019-03-01\",\"columnVal\":4}");

Dataset dataset = sparkSession.read().json(sparkSession.createDataset(jsonString,Encoders.STRING()));

dataset.show();

结果

+---------+----------+
|columnVal|   dateStr|
+---------+----------+
|        0|2019-01-01|
|        1|2019-01-01|
|        2|2019-01-01|
|        3|2019-01-01|
|        4|2019-01-01|
|        0|2019-02-01|
|        1|2019-02-01|
|        2|2019-02-01|
|        3|2019-02-01|
|        4|2019-02-01|
|        0|2019-03-01|
|        1|2019-03-01|
|        2|2019-03-01|
|        3|2019-03-01|
|        4|2019-03-01|
+---------+----------+

先看一下直接使用的效果

dataset.orderBy("dateStr").dropDuplicates("columnVal").show();

结果

+---------+----------+
|columnVal|   dateStr|
+---------+----------+
|        4|2019-01-01|
|        0|2019-01-01|
|        3|2019-01-01|
|        2|2019-01-01|
|        1|2019-01-01|
+---------+----------+

貌似数据是正确的,跟大神的说法不同,回想一下,因为自己用的是local模式,partition默认为1 ,是不是这个原因? 手工将partition设置为3试一下

dataset.orderBy("dateStr").repartition(3).dropDuplicates("columnVal").show();

结果果然开始出问题了!!!

+---------+----------+
|columnVal|   dateStr|
+---------+----------+
|        4|2019-03-01|
|        0|2019-02-01|
|        3|2019-01-01|
|        2|2019-02-01|
|        1|2019-03-01|
+---------+----------+

同时在改为partition 1尝试
dataset.orderBy("dateStr").repartition(1).dropDuplicates("columnVal").show();

结果:又正常了。

+---------+----------+
|columnVal|   dateStr|
+---------+----------+
|        4|2019-01-01|
|        0|2019-01-01|
|        3|2019-01-01|
|        2|2019-01-01|
|        1|2019-01-01|
+---------+----------+

结论

大神的实验是正确的,在dataset计算是partition数量大于1的时候,先order by在dropDuplicates是没办法保证从排序结果里面取第一条的,实际是先分区内排序,然后随机取一条!!!如果想用这种方式,必须加上repatition(1) 或者 coalesce(1) 保证现在一个分区内排序才能保证结果正确!!

但是在一个分区内排序,如果dataset比较大,会不会有性能问题也是需要考虑的点,知道有这个坑,那肯定还可以找到其他更好的方案来代替repatition(1)来提高性能,只是我们目前业务这样做基本可以了

参考文档:https://stackoverflow.com/questions/38687212/spark-dataframe-drop-duplicates-and-keep-first


本文固定链接:杨晨辉的个人博客 » SparkDataset DropDuplicate去重无法保证取最新数据问题
本站内容除特别标注外均为原创,欢迎转载,但请保留出处!

喜欢 (4)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
(1)个小伙伴在吐槽
  1. 博主V587。这都被你找到了。看来是折磨你很久了。
    ta2019-11-13 09:37回复