Kafka动态配置实现原理解析

使用Bundle在Activity之间交换数据

问题导读

Apache Kafka在全球各个领域各大公司获得广泛使用,得益于它强大的功能和不断完善的生态。其中Kafka动态配置是一个比较高频好用的功能,下面我们就来一探究竟。

  • 动态配置是如何设计的?
  • 动态配置优先级是怎样的?
  • Broker初始化是如何读取配置的?
  • 动态配置支持哪些特性功能?
  • 动态配置如何使用呢?

前言介绍

 Kafka初始开源的几个版本,当broker初始化启动时,所有配置信息只能从server.properties静态文件读取,以后不再发生任何更改,随着Kafka逐步迭代,在线业务对稳定性和个性化要求越来越突出,需要能支持在线修改功能动态生效的需求应运而生。例如:按照topic维度清理数据,依据clientid限流,根据用户名称进行访问权限控制等等。目前Kafka最新版本支持以下几类动态配置。

 Kafka动态配置实现原理解析

发展历程

Kafka动态配置实现原理解析

动态配置文件发展历程:

kafka在0.8.0对topic的管理功能分布在三个shell中,它们分别是kafka-list-topic.sh、kafka-create-topic.sh、kafka-delete-topic.sh、kafka-add-partitions.sh,后来社区考虑到topic管理功能过于分散,到了0.8.1版本有关topic所有功能收敛到kafka-topics.sh中。0.8.0中只有topic的创建、删除和列表及添加分区功能,到了0.8.1开始支持topics动态配置了。 

0.9.0.0开始支持client(producer和consumer)客户端配额限流支持,确保不因为某个或少数几个topic的客户端占满了broker带宽资源和磁盘IO资源,影响其他客户端的正常读写,导致集群内主从同步也受到影响。这个功能对确保系统SLA大有好处,通过服务降级,保证写/生产不受影响,降低或暂停读/消费流量更容易解决系统资源瓶颈。

0.9.0版本动态配置与topic管理分离,为了保持向下兼容kafka-topics.sh依然包含操作topic动态配置功能,新增kafka-configs.sh支持clients和topics动态配置功能,所以kafka-topics.sh和kafka-configs.sh任意一个都可以修改topic动态配置

0.10.1.0版本新增支持users和brokers动态配置功能,user动态配置用于访问资源的权限控制,提升集群的访问和数据安全性,例如:用户对读/写/创建/删除等操作和API、topic、group资源访问控制。broker动态配置,在不用重启及影响服务运行情况下,broker级别功能实现动态生效,例如:副本注册复制速率、磁盘内挂载点间数据迁移速率、网络请求的线程数、处理请求的I/O线程数等等全局参数等等。

0.10.1.0~2.3.1版本都支持topics、clients、users、brokers四类型动态配置的11种粒度配置对象,只是配置模块和属性字段有增减与调整。

动态配置设计原理

Kafka动态配置实现原理解析

用户使用kafka-configs.sh脚本,根据格式和参数规范要求,ConfigCommand类进行相关逻辑处理、json格式和内容校验,生成notification json,写入到序列化持久节点上,zk路径为xxx/config/changes/config_change_seqNo,节点名称为config_change_seqNo,其中seqNo从1开始的自增序号。kafka集群中所有broker通过监听zk上xxx/config/changes的children变化,每次获得比当前内存中last_seqNo大的seqNo的json内容,从中读取entity_type/entity_name相对路径,由此判断如何从xxx/config/topics|clients|users|brokers四种类型中读取哪个配置路径。同一个Broker在操作过程中任何时刻只能串行读写一种类型的配置,多种配置需要串行操作。

各个角色的作用:

kafka-config.sh: 负责写dynamic config和notification,写顺序上图有先后标识。

broker:负责监听xxx/config/changes子节点变化和读取entity_type/entity_name路径节点上内容

zk:负责存储notification和dynamic config及下发配置给相应的broker

notification json内容:

V1 0.10.0.1及以前版本有效

{
    "version": 1,
    "entity_type": "topics",
    "entity_name":  "finalTest"
}

V2 0.10.1.0~2.3.1 当前最新版本都有效 

以上不管是version 1还是version2,本质上没有变化。都是通过entity_type/entity_name获得entity_path的zk相对路径,全路径为xxx/config/entityType/entityName,具体请看如下详图

Kafka动态配置实现原理解析

entity_type=topics | clients | users | brokers

entity_name=topicName | clientId | userId | (brokerId | <default>)

当entity_type为brokers时,brokerId为broker编号与自己的server.properties对应,只对某个broker生效。“<default>”指对所有broker生效。而entity_type为topics | clients | users对所有broker都生效。通过以上entity_type/entity_name六种组合成六个zk相对路径。

topics和clients组合原理一样,但users和brokers却略有不同,他们各自有2个组合,除了普通组合还有复合组合,两种类型组合在一起,例如users有users与clients组合,zk路径为users/<user>/clients/<clientId>;brokers动态配置非常实用,不需要重启就能动态更改任意数量brokers配置,更改所有brokers为xxx/brokers/<default>

四类动态配置11种zk相对路径,根据11种zk相对路径可以读取11种粒度配置对象dynamic config。

<default>说明:某种类型下所有作用域生效,例如xxx/clients/<default>和xxx/brokers/<default>就是集群内所有All clients和集群内所有All brokers配置都会生效,其他同理。

dynamic config内容示例:

entity_type/entity_name=topics/<topic_name>=topics/finalTest

Kafka动态配置实现原理解析

hadoop-2.10.0安装hive-2.3.6,centos7搭建hadoop2.10高可用(HA)

{
    "version": 1,
    "config": {
        "retention.bytes": "102400000",
        "flush.ms": "5000",
        "cleanup.policy": "compact",
        "flush.messages",
        ...
    }
}

entity_type/entity_name=clients/<clientId>=clients/camusall

Kafka动态配置实现原理解析

{
    "version": 1,
    "config": {
        "producer_byte_rate": "20971520",
        "consumer_byte_rate": "20971520"
    }
}

entity_type/entity_name=brokers/all brokers=brokers/<default>

Kafka动态配置实现原理解析

{
    "version": 1,
    "config": {
        "leader.replication.throttled.rate": "5000",
        "follower.replication.throttled.rate": "60000",
        "replica.alter.log.dirs.io.max.bytes.per.second": "5000",
        "log.retention.hours": "24",
        "log.flush.interval.messages": "5000",
        "min.insync.replicas": "2",
        ...
    }
}

entity_type/entity_name=brokers/brokerId     与all brokers的配置形成完全一样,只是作用域范围不同而已,此处省略。

写配置格式校验

如果写入配置不进行规范校验,broker就会读取处理过程中,就会卡住或阻塞,影响服务运行稳定性。所以配置校验至关重要,校验规则如下:

  • 配置参数格式必须合法,否则报错不予接收
  • 输入配置项进行校验,输入参数必须是kafka包含的配置项,否则过滤掉

config_change_seqNo生成规则

Kafka动态配置实现原理解析

Kafka动态配置实现原理解析

 kafka-configs.sh脚本每成功执行一次,在zk上就创建一个新的seqNode节点(即/xxx/config/changes/config_change_seqNo),seqNode是zk的持久顺序节点(PERSISTENT_SEQUENTIAL),它的组成是seqNode = seqNodePrefix + seqNodeSuffix,config_change_固定为seqNode的前缀,seqNodeSuffix = seqNo为seqNode的后缀,seqNo是10位数字的序列号,这个序列号后缀是自增的,由zk服务端自动生成和维护,每次事务请求成功就加1,它与MySQL的自增id原理一样。

config_change_seqno清除规则 

Kafka动态配置实现原理解析

集群经过长期运行积累,xxx/config/changes下会留存大量历史节点,如果不及时清理,会有以下影响:

  • 大量无用的seqNode进行传输,会增加网络带宽负担
  • 占用zk服务端内存及存储资源
  • Kafka会做大量无效判断和计算

综上所述,因此必须要及时清除无用的seqNode集合,清除公式步骤如下:

  • 当broker监听到notification变化回调时,记录系统时间。
  • 获取xxx/config/changes下所有子节点,读取每个seqNode的创建时间
  • 系统时间减去seqNode创建时间,如果时间差值大于过期时间,即changeExpirationMs,就会被删除
  • changeExpirationMs默认为15分钟,可由broker配置。

config_change_seqno判断处理

Kafka动态配置实现原理解析

Kafka动态配置实现原理解析

前面提到每当触发回调处理,seqNode节点创建时间过期15分钟会被删除,删除条件是触发才会被执行,如果长时间不创建就可能有少数几个seqNode一直保留。如果短时间内(15分钟内)创建大量seqNode,又不会立即被删除,只有等到下次触发达到条件才行,那怎么判断哪些会被处理呢?broker缓存中维护一个变量lastExecutedSeqNo,它负责保存执行历史中seqNode最大顺序号,所以每当触发回调获取seqNodeSet列表时,都能轻易判断出哪些需要处理计算,也会同步更新lastExecutedSeqNo。

notification作用

  • 通知broker有新的动态配置产生,读取相应的动态配置
  • 不用监听大量四种类型配置下子节点,每个broker只需要监听一个notification节点即可,高效且性能也高
  • 大大减少broker监听数量,如果像controller监听/xxx/partitions/[0-N]/state一样,监听数量就是四种类型配置zk路径乘以broker数量了

静态与动态的配置及优先级

Kafka动态配置实现原理解析

 

 Kafka动态配置实现原理解析

静态与动态区别。静态配置是Broker内置默认配置和静态配置文件server.properties,broker启动前可以任意修改,启动后不可修改。动态配置是broker启动运行后,可以在线更新生效,偷偷说一句离线也可以改,就是不生效而已。

配置优先级以上4个图包含4类型配置既有动态也有静态,那优先级如何呢?动态配置优先级高于静态配置。如上图1、2、4,环越小优先级越高,对于动态配置来说,修改配置作用域范围越小优先级越高,反之亦然。优先级最高的,会逐级覆盖相同配置项。

当broker启动时读取顺序依次为broker内置默认配置,broker静态配置文件,动态配置。当配置项相同时,高优先级覆盖次优先的,其他依次类推

图示说明上图1、2、3、4中,其中图1、2、4中环数字表示配置优先级关系,数字从1~5表示优先级从高到低。图3为两级关联。Users和Clients组合实现配置管理,这两者组合用于客户端配额限流,Users与Clients就像两级目录一样,一个User可以包含一个、多个clientId或所有clientId。图4中既有优先级关系也有配置参数包含关系,topics类型配置是brokers类型配置的子集,brokers除了包含topic配置外还有DynamicThreadPool、DynamicListenerConfig、DynamicConnectionQuota、LogCleaner配置。

如何使用动态配置

  • 使用脚本kafka-configs.sh和kafka-topics.sh,kafka-configs.sh支持四种类型,kafka-topics.sh仅支持topics类型
  • 使用Apache-kafka官方提供的java版本客户端API调用
  • 直接写zk实现,具体遵循如上写notification和dynamic_config规范

如想更深入了解kafka-configs.sh用法,请查看

总结

  • Kafka配置参数分为静态配置和动态配置,静态分为内置默认与外置用户配置,用户配置优先于内置配置
  • 动态配置为4类型11个zk相对路径,即11种粒度配置对象生效,同类型作用域范围越小优先级越高
  • 动态配置优先级比静态配置优先级高,动态配置中Users与Clients可以组合配置
  • 从设计原理中了解config_change_seqNo生成规则

  • 写上文中理解了写Notification的作用,从而理解什么场景会适合使用zk中持久顺序节点(PERSISTENT_SEQUENTIAL)

注意事项:以上配置解析,是基于Kafka-2.3.1版本分析

参考资料

Release Notes – Kafka – Version 0.8.1:https://archive.apache.org/dist/kafka/0.8.1/RELEASE_NOTES.html
Release Notes – Kafka – Version 0.10.1.0:https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html
Release Notes – Kafka – Version 0.10.2.0:https://archive.apache.org/dist/kafka/0.10.2.0/RELEASE_NOTES.html
Release Notes – Kafka – Version 0.10.2.1:https://archive.apache.org/dist/kafka/0.10.2.1/RELEASE_NOTES.html
Release Notes – Kafka – Version 1.1.0:https://archive.apache.org/dist/kafka/1.1.0/RELEASE_NOTES.html
Release Notes – Kafka – Version 1.1.1:https://archive.apache.org/dist/kafka/1.1.1/RELEASE_NOTES.html
Release Notes – Kafka – Version 2.0.0:https://archive.apache.org/dist/kafka/2.0.0/RELEASE_NOTES.html
KIP-21 – Dynamic Configuration:https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration
KIP-226 – Dynamic Broker Configuration:https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
Make DynamicConfigManager to use the ZkNodeChangeNotificationListener introduced as part of KAFKA-2211:https://issues.apache.org/jira/browse/KAFKA-2547
KIP-257 – Configurable Quota Management:https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management
KIP-73 Replication Quotas:https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas

图像增强之空间域锐化

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享