重复消费原因
根本原因:已经消费了数据,但是offset没提交 。
原因1:强行kill线程,导致消费后的数据,offset没有提交
生产者:准备3000条记录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class KafkaDemoMultiProducer { public static void main (String[] args) { produce(); } static void produce () { Properties kafkaProps = new Properties (); kafkaProps.put("bootstrap.servers" , "10.211.55.11:9092" ); kafkaProps.put("key.serializer" , StringSerializer.class.getName()); kafkaProps.put("value.serializer" , StringSerializer.class.getName()); KafkaProducer<String, String> producer = new KafkaProducer <>(kafkaProps); for (int i = 0 ; i < 3000 ; i++) { ProducerRecord<String, String> record = new ProducerRecord <>("demo9" , "good" + i, "java-kafka" + i); try { producer.send(record).get(); } catch (Exception e) { e.printStackTrace(); } } producer.close(); } }
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class KafkaDemoProducer { public static void main (String[] args) throws InterruptedException { consume(); } static void consume () throws InterruptedException { Properties kafkaProps = new Properties (); kafkaProps.put("bootstrap.servers" , "10.211.55.11:9092" ); kafkaProps.put("group.id" , "demo-group9" ); kafkaProps.put("key.deserializer" , StringDeserializer.class.getName()); kafkaProps.put("value.deserializer" , StringDeserializer.class.getName()); kafkaProps.put("enable.auto.commit" , "false" ); kafkaProps.put("max.poll.records" , "300" ); KafkaConsumer<String, String> consumer = new KafkaConsumer <>(kafkaProps); consumer.subscribe(Collections.singletonList("demo9" )); final int minBatchSize = 400 ; List<ConsumerRecord<String, String>> buffer = new ArrayList <>(); try { while (true ) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000 )); for (ConsumerRecord<String, String> record : records) { buffer.add(record); log.debug("+++++++++++++++topic = {}, partition = {}, offset = {}, key = {}, value = {}" , record.topic(), record.partition(), record.offset(), record.key(), record.value()); } if (buffer.size() >= minBatchSize) { Thread.sleep(3000 ); consumer.commitSync(); buffer.clear(); } } }finally { consumer.close(); } } }
kafkaProps.put("enable.auto.commit", "false");
手动提交offset
kafkaProps.put("max.poll.records", "300");
设置每次拉取300条记录
如上代码,拉取第一次buffer.size=300, 拉取第二次buffer.size=600,第二次数据拉取完后会进行第一次手动提交offset.
当第二次手动提交offset前,停止消费;下次启动消费者,还会消费以前的记录,造成数据重复消费
第一次:
第二次:在准备提交之前,kill掉消费者
再次启动消费者
由此可见:600-1199被重复消费两次
原因2:消费后的数据,当offset还没有提交时,partition就断开连接。 比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间,那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
// TODO 实战
解决办法 如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?
其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。
其实还是得结合业务来思考,我这里给几个思路:
比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。