kafka中堆积大量消息, 一直不消费.
查看报错, 关键信息是:
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE] at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1368) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na] at java.base/java.lang.Thread.run(Thread.java:836) ~[na:na] Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1116) ~[kafka-clients-2.5.1.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:983) ~[kafka-clients-2.5.1.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510) ~[kafka-clients-2.5.1.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2324) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2319) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2305) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2119) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1104) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE] ... 3 common frames omitted 2021-06-21 11:08:59.025 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-data-statistics-service-19, groupId=data-statistics-service] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group 2021-06-21 11:08:59.025 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-data-statistics-service-19, groupId=data-statistics-service] Lost previously assigned partitions production.creams_building.room-0
从异常: CommitFailedException 可以看出消费完成后提交offset异常, 由于业务代码消费的比较慢, 在consumer拉取到消息之后, 一直没有提交, 超过阈值:
max.poll.interval.ms: 使用消费者组管理时poll()调用之间的最大延迟。消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。300000
session.timeout.ms: 用于发现消费者故障的超时时间。消费者周期性的发送心跳到broker,表示其还活着。如果会话超时期满之前没有收到心跳,那么broker将从分组中移除消费者,并启动重新平衡。请注意,该值必须在broker配置的group.min.session.timeout.ms和group.max.session.timeout.ms允许的范围内。 默认10000
max.poll.records: 在单次调用poll()中返回的最大记录数。默认50
最后通过把
max.poll.records设置为50
max.poll.interval.ms=30000
sesstion.timeout.ms=20000