侧边栏壁纸
博主头像
gale-blog博主等级

少年一贯快马扬帆,道阻且长不转弯,要盛大,要绚烂,要哗然,要用理想的泰坦尼克号去撞现实冰川,要当烧赤壁的风,而非借箭草船,要为了一片海,就肯翻万山

  • 累计撰写 39 篇文章
  • 累计创建 5 个标签
  • 累计收到 5 条评论

目 录CONTENT

文章目录

Kafka配置

二月在这里
2024-01-27 / 0 评论 / 0 点赞 / 51 阅读 / 4946 字

Kafka配置中的kafka.listener.type 设置为batch 批量拿取 ,导致requestId中的数值拼凑在一起 ,当出现异常消费失败回滚,在下一次会拿取多个值,现在设置为signal,单个拿取

一个请求中看到多个requestID

在kafka里面

Producer : 消息生产者,就是向kafka broker发消息的客户端。 Consumer : 消息消费者,向kafka broker取消息的客户端 Topic : 可以理解为一个队列。 Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。 Broker : 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。 Partition :为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。 Offset: kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可

关于kafka的重复消费的记录

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

在里面显示提高session.timeout的时间或者减少max.poll.records的量 但是在配置里面将这两个的值已经提高了但是,还是有能出现重复消费的问题,后面发现线程里面存在两个线程,这在项目里面存在一个很消耗资源的程序中十分消耗资源,之前的一条消息,还没有被消费完,下一个线程就将消息拉取下来准备消费,导致重复消费还不能识别是否是存在重复消费.

这里讲述一下消费者几个参数

max.poll.records 一次拉取最大消息的数量 默认500

max.poll.interval.ms 指定拉取消息线程最长空闲时间 默认 300s

session.timeout.ms 检测消费者是否失效的超时时间 默认10s

heartbeat.interval.ms 消费者心跳时间 3s

在kafka的配置中没有明确的写出max.poll.interval.ms的参数,但是自己可以配置在consumer.properties自己定义写

max.poll.interval.ms >=session.timeout.ms

session.timeout.ms>=heartbeat.interval.ms

heartbeat.interval.ms不应大于session.timeout.ms的一半也不应小于三分之一

附带地址连接:

https://blog.csdn.net/BHSZZY/article/details/126757295

https://blog.csdn.net/zhanglh046/article/details/72845375

(10条消息) Kafka整体结构图、Consumer与topic关系、Kafka消息分发、Consumer的负载均衡、Kafka文件存储机制、Kafka partition segment等(来自学习资料)kafka+partition和consumer图涂作权的博客的博客-CSDN博客

0

评论区