Johuer's Blog

多学知识,精简代码

0%

kafka重复消费

重复消费原因

根本原因:已经消费了数据,但是offset没提交

原因1:强行kill线程,导致消费后的数据,offset没有提交

生产者:准备3000条记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class KafkaDemoMultiProducer {

public static void main(String[] args) {
produce();
}

static void produce() {
Properties kafkaProps = new Properties();

kafkaProps.put("bootstrap.servers", "10.211.55.11:9092");
kafkaProps.put("key.serializer", StringSerializer.class.getName());
kafkaProps.put("value.serializer", StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

for (int i = 0; i < 3000; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("demo9", "good" + i, "java-kafka" + i);

try {
producer.send(record).get();

} catch (Exception e) {
e.printStackTrace();
}
}

producer.close();
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class KafkaDemoProducer {


public static void main(String[] args) throws InterruptedException {
consume();
}

static void consume() throws InterruptedException {
// 1.创建消费者
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "10.211.55.11:9092");
kafkaProps.put("group.id", "demo-group9");
kafkaProps.put("key.deserializer", StringDeserializer.class.getName());
kafkaProps.put("value.deserializer", StringDeserializer.class.getName());
kafkaProps.put("enable.auto.commit", "false");
kafkaProps.put("max.poll.records", "300");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);

// 2.订阅主题
consumer.subscribe(Collections.singletonList("demo9"));

final int minBatchSize = 400;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
// 3.轮询
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
log.debug("+++++++++++++++topic = {}, partition = {}, offset = {}, key = {}, value = {}",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
if (buffer.size() >= minBatchSize) {
// doSomething(buffer).....
Thread.sleep(3000);
consumer.commitSync();
buffer.clear();
}
}
}finally {
consumer.close();
}
}
}

kafkaProps.put("enable.auto.commit", "false"); 手动提交offset

kafkaProps.put("max.poll.records", "300"); 设置每次拉取300条记录

如上代码,拉取第一次buffer.size=300, 拉取第二次buffer.size=600,第二次数据拉取完后会进行第一次手动提交offset.

当第二次手动提交offset前,停止消费;下次启动消费者,还会消费以前的记录,造成数据重复消费

第一次:

logo

第二次:在准备提交之前,kill掉消费者

logo

再次启动消费者

logo

由此可见:600-1199被重复消费两次

原因2:消费后的数据,当offset还没有提交时,partition就断开连接。

比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间,那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。

// TODO 实战

解决办法

如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

其实还是得结合业务来思考,我这里给几个思路:

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。