1.【kafka源码】Topic的码修创建源码分析(附视频)
2.sarama 源码解析--Kafka的重平衡
3.Kafka消费者源码:重平衡(1)-初始化与FIND_COORDINATOR
4.源码解析kafka删除topic
5.部署Kafka监控
6.Kafka Logcleaner源码分析
【kafka源码】Topic的创建源码分析(附视频)
关于Kafka Topic创建的源码分析,可以从kafka-topic.sh脚本的码修入口开始,它执行了kafka.admin.TopicCommand类。码修在创建Topic时,码修主要涉及AdminClientTopicService对象的码修创建和AdminClientClient创建Topics方法的调用,其中Controller负责处理客户端的码修尊云目录源码CreateTopics请求。
服务端的码修处理逻辑在KafkaRequestHandler.run()方法中,通过apis.handle(request)调用对应接口,码修如KafkaApis.handleCreateTopicsRequest,码修这个方法会触发adminManager.createTopics(),码修创建主题并监控其完成状态。码修创建的码修Topic配置和分区副本信息会被写入Zookeeper,如Topic配置和Topic的码修分区副本分配。
当Controller监听到/brokers/topics/Topic名称的码修变更后,会触发Broker在磁盘上创建相关Log文件。码修如果Controller在创建过程中失败,如Controller挂掉,待重新选举后,创建过程会继续,直到Log文件被创建并同步到zk中。
创建Topic时,zk上会创建特定节点,包括主题配置和分区信息。手动添加或删除/brokers/topics/节点将影响Topic的创建和管理。完整参数可通过sh bin/kafka-topic -help查看。
sarama 源码解析--Kafka的重平衡
重平衡操作
重平衡是动态调整Consumer Group下的Consumer订阅Topic的分区的一个关键操作。Sarama中的画质增强源码BalanceStrategyRange和BalanceStrategySticky策略具体实施这一操作。
重平衡触发条件之一是成员数变更。这一过程包括以下步骤:
1. 启动一个新的消费者实例。
2. 调用Consume方法。
3. Consume方法初始化连接信息,并启动一个goroutine。程序会阻塞在sess.ctx.Done()上。
4. 在newSession方法中找到协调者信息,并发起join请求和syncgroup请求。Consumer Leader执行一次重平衡。
5. 创建consumer group session,并初始化offset manager和开启心跳goroutine。
6. 当心跳超时或收到coordinator的重平衡通知时,调用cancel()方法取消操作,退出Consume逻辑。
7. 此时,Consume函数优雅退出。由于外层循环的存在,会重新执行Consume,实现一次重平衡。
另一个触发重平衡的条件是订阅主题分区数发生变更。这一过程如下:
1. 在Consume方法中开启心跳goroutine,并将consumer group session传递给它。
2. 分区数发生变化时,调用sess.cancel(),Consume优雅退出并重新执行,实现重平衡。智能网站源码
Kafka消费者源码:重平衡(1)-初始化与FIND_COORDINATOR
在Kafka 2.5.2的消费者组中,重平衡是关键,它定义了消费者如何根据订阅关系调整对Topic分区的分配。当消费者数量、订阅的Topic或GroupCoordinator所在的Broker发生变更时,会触发重平衡。
消费者组状态由GroupState类管理,共有五个状态:Empty(无成员)、PreparingRebalance(加入中)、CompletingRebalance(等待分配)、Stable(已平衡)和Dead(元数据已删除)。状态间的转换基于预先定义的前置状态。例如,从Empty到PreparingRebalance,预示着重平衡的开始。
重平衡过程分为几个步骤,首先是消费者和Broker之间的协调。服务端启动时,GroupCoordinator组件即已就绪,而Consumer通过ConsumerCoordinator与之通信。在启动时,消费者首先会通过FindCoordinatorRequest找到GroupCoordinator,通过最小负载节点发送请求,然后服务端确定哪个Broker负责协调,如groupId的hash值对consumer_offsets分区数取模确定。
一旦找到GroupCoordinator,影视ios源码消费者会发送JoinGroupRequest。后续步骤如SYNC_GROUP和HEARTBEAT确保消费者与协调器保持同步。这部分详细内容在后续的文章中会进一步探讨。
源码解析kafka删除topic
本文以kafka0.8.2.2为例,解析如何删除一个topic以及其背后的关键技术和源码实现过程。
删除一个topic涉及两个关键点:配置删除参数以及执行删除操作。
首先,配置参数`delete.topic.enable`为`True`,这是Broker级别的配置,用于指示kafka是否允许执行topic删除操作。
其次,执行命令`bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name`,此命令指示kafka删除指定的topic。
若未配置`delete.topic.enable`为`True`,topic仅被标记为删除状态,而非立即清除。此时,通常的做法是手动删除Zookeeper中的topic信息和日志,但这仅会清除Zookeeper的数据,并不会真正清除kafkaBroker内存中的topic数据。因此,最佳做法是配置`delete.topic.enable`为`True`,然后重启kafka。
接下来,我们介绍几个关键类和它们在删除topic过程中的作用。
1. **PartitionStateMachine**:该类代表分区的广告点播源码状态机,决定分区的当前状态及其转移。状态包括:NonExistentPartition、NewPartition、OnlinePartition、OfflinePartition。
2. **ReplicaManager**:负责管理当前机器的所有副本,处理读写、删除等具体操作。读写操作流程包括获取partition对象,再获取Replica对象,接着获取Log对象,并通过其管理的Segment对象将数据写入、读出。
3. **ReplicaStateMachine**:副本的状态机,决定副本的当前状态和状态之间的转移。状态包括:NewReplica、OnlineReplica、OfflineReplica、ReplicaDeletionStarted、ReplicaDeletionSuccessful、ReplicaDeletionIneligible、NonExistentReplica。
4. **TopicDeletionManager**:管理topic删除的状态机,包括发布删除命令、监听并开始删除topic、以及执行删除操作。
在删除topic的过程中,分为四个阶段:客户端执行删除命令、未配置`delete.topic.enable`的流水、配置了`delete.topic.enable`的流水、以及手动删除Zookeeper上topic信息和磁盘数据。
客户端执行删除命令时,会在"/admin/delete_topics"目录下创建topicName节点。
未配置`delete.topic.enable`时,topic删除流程涉及监听topic删除命令、判断`delete.topic.enable`状态、标记topic为不可删除、以及队列删除topic任务。
配置了`delete.topic.enable`时,额外步骤包括停止删除topic、检查特定条件、更新删除topic集合、激活删除线程、执行删除操作,如解除分区变动监听、清除内存数据结构、删除副本数据、删除Zookeeper节点信息等。
关于手动删除Zookeeper上topic信息和磁盘数据,通常做法是删除Zookeeper的topic相关信息及磁盘数据,但这可能导致部分内存数据未清除。是否会有隐患,需要进一步测试。
总结而言,kafka的topic删除流程基于Zookeeper实现,通过配置参数、执行命令、管理状态机以及清理相关数据,以实现topic的有序删除。正确配置`delete.topic.enable`并执行删除操作是确保topic完全清除的关键步骤。
部署Kafka监控
在Kafka部署过程中,监控系统的设置至关重要。本文将简述搭建Kafka监控的实践经验,包括所选工具和环境配置步骤。
首先,确保Kafka实例在本地部署了三个实例,未使用Docker。监控方案选择了kafka_exporter、Prometheus和Grafana组合,详细选择理由可自行查阅网络资源。kafka_exporter在本地编译部署,因遇到go环境不匹配问题,最终选择源码编译,通过git克隆v1.7.0版本,设置goproxy以获取依赖库。编译过程中,对`go mod vendor`指令进行了修改,成功编译出kafka_exporter可执行文件,并针对多个Kafka实例制定了启动命令。
同时,为了监控系统负载,部署了node-exporter在Docker中,确保其固定IP以方便Prometheus的配置。node-exporter的IP设为..0.2,端口为。
接下来是Prometheus的部署。首先通过Docker拉取prom/prometheus镜像,配置文件中包含了Prometheus自身、node-exporter(.网段)和kafka_exporter(..0.1)的采集项。使用命令`docker run`启动Prometheus,监听端口,与node-exporter和kafka_exporter通信。
Grafana的安装则在另一个目录B中进行,设置了读写权限后通过Docker拉取grafana/grafana镜像。部署时,Grafana容器的IP设为..0.4,监听端口。登录Grafana后,首先添加DataSource,指向Prometheus实例,然后导入官网提供的Linux系统模板(如、),Kafka监控模板(如),以及Prometheus模板()以设置Dashboard。
总结,通过这些步骤,成功搭建了Kafka的监控系统,包括本地部署的kafka_exporter、Docker中的node-exporter和Prometheus,以及Grafana用于可视化监控数据。
Kafka Logcleaner源码分析
Kafka日志保留策略包括按时间/大小和compact两种。Logcleaner遵循compact策略清理日志,只保留最新的消息,当多个消息具有相同key时,只保留最新的一个。
每个日志由两部分组成:clean和dirty。dirty部分可以进一步划分为cleanable和uncleanable。uncleanable部分不允许清理,包括活跃段和未达到compact延迟时间的段。
清理过程由后台线程定期执行,选择最脏的日志进行清理,脏度由dirty部分字节数与总字节数的比例决定。清理前,Logcleaner构建一个key->last_offset映射,包含dirty部分的所有消息。清理后,日志文件过滤掉过期消息,并合并较小的连续段为较大文件。
payload为null的消息被Logcleaner删除,这类消息在topic配置的时间内保留,然后被清理。清理过程需与幂等性和事务性生产者兼容,保留活跃生产者最后一批消息,直到产生新消息或生产者不活跃。只清理提交或终止事物中的消息,未提交事物中的消息不清理。
Logcleaner通过cleanOrSleep方法启动清理,选择最脏日志,调用clean清理并合并段。在清理前计算tombstone的移除时间,确保在clean部分驻留一定时间后移除。清理过程包括构建offset映射,分组段文件并清理合并。
Logcleaner的清理逻辑确保了高效和一致的日志管理,助力Kafka系统稳定运行。