leader选举在kubernetes controller中是如何实现的

在 kubernetes 的 kube-controller-manager , kube-scheduler, 以及使用 operator 的底层实现 controller-rumtime 都支持高可用系统中的 leader 选举,本文将以理解 controller-rumtime (底层的实现是 client-go) 中的 leader 选举以在 kubernetes controller 中是如何实现的。
background
在运行 kube-controller-manager 时,是有一些参数提供给 cm 进行 leader 选举使用的,可以参考官方文档提供的 参数来了解相关参数。
--leader-elect                               default: true--leader-elect-renew-deadline duration       default: 10s--leader-elect-resource-lock string          default: leases--leader-elect-resource-name string       default: kube-controller-manager--leader-elect-resource-namespace string     default: kube-system--leader-elect-retry-period duration         default: 2s... 本身以为这些组件的选举动作时通过 etcd 进行的,但是后面对 controller-runtime 学习时,发现并没有配置其相关的 etcd 相关参数,这就引起了对选举机制的好奇。怀着这种好奇心搜索了下有关于 kubernetes 的选举,发现官网是这么介绍的,下面是对官方的说明进行一个通俗总结。simple leader election with kubernetes

通过阅读文章得知,kubernetes api 提供了一中选举机制,只要运行在集群内的容器,都是可以实现选举功能的。
kubernetes api 通过提供了两个属性来完成选举动作的
resourceversions:每个 api 对象唯一一个 resourceversion
annotations:每个 api 对象都可以对这些 key 进行注释
注:这种选举会增加 apiserver 的压力。也就对 etcd 会产生影响
那么有了这些信息之后,我们来看一下,在 kubernetes 集群中,谁是 cm 的 leader(我们提供的集群只有一个节点,所以本节点就是 leader)。
在 kubernetes 中所有启用了 leader 选举的服务都会生成一个 endpoint ,在这个 endpoint 中会有上面提到的 label(annotations)来标识谁是 leader。
$ kubectl get ep -n kube-systemname                      endpoints   agekube-controller-manager         3d4hkube-dns                              3d4hkube-scheduler                  3d4h 这里以 kube-controller-manager 为例,来看下这个 endpoint 有什么信息
[root@master-machine ~]# kubectl describe ep kube-controller-manager -n kube-systemname:         kube-controller-managernamespace:    kube-systemlabels:       annotations:  control-plane.alpha.kubernetes.io/leader:                {holderidentity:master-machine_06730140-a503-487d-850b-1fe1619f1fe1,leasedurationseconds:15,acquiretime:2022-06-27t1546z,re...subsets:events:  type    reason          age    from                     message  ----    ------          ----   ----                     -------  normal  leaderelection  2d22h  kube-controller-manager  master-machine_76aabcb5-49ff-45ff-bd18-4afa61fbc5af became leader  normal  leaderelection  9m     kube-controller-manager  master-machine_06730140-a503-487d-850b-1fe1619f1fe1 became leader  
可以看出 annotations: control-plane.alpha.kubernetes.io/leader: 标出了哪个 node 是 leader。
election in controller-runtime
controller-runtime 有关 leader 选举的部分在 pkg/leaderelection下面,总共 100 行代码,我们来看下做了些什么?
可以看到,这里只提供了创建资源锁的一些选项
type options struct { // 在manager启动时,决定是否进行选举 leaderelection bool // 使用那种资源锁 默认为租用 lease leaderelectionresourcelock string // 选举发生的名称空间 leaderelectionnamespace string // 该属性将决定持有leader锁资源的名称 leaderelectionid string} 通过 newresourcelock 可以看到,这里是走的 client-go/tools/leaderelection下面,而这个 leaderelection 也有一个 example来学习如何使用它。
通过 example 可以看到,进入选举的入口是一个 runordie() 的函数
// 这里使用了一个lease锁,注释中说愿意为集群中存在lease的监听较少lock := &resourcelock.leaselock{    leasemeta: metav1.objectmeta{        name:      leaselockname,        namespace: leaselocknamespace,    },    client: client.coordinationv1(),    lockconfig: resourcelock.resourcelockconfig{        identity: id,    },}// 开启选举循环leaderelection.runordie(ctx, leaderelection.leaderelectionconfig{    lock: lock,    // 这里必须保证拥有的租约在调用cancel()前终止,否则会仍有一个loop在运行    releaseoncancel: true,    leaseduration:   60 * time.second,    renewdeadline:   15 * time.second,    retryperiod:     5 * time.second,    callbacks: leaderelection.leadercallbacks{        onstartedleading: func(ctx context.context) {            // 这里填写你的代码,            // usually put your code            run(ctx)        },        onstoppedleading: func() {            // 这里清理你的lease            klog.infof(leader lost: %s, id)            os.exit(0)        },        onnewleader: func(identity string) {            // we're notified when new leader elected            if identity == id {                // i just got the lock                return            }            klog.infof(new leader elected: %s, identity)        },    },})  
到这里,我们了解了锁的概念和如何启动一个锁,下面看下,client-go 都提供了那些锁。
在代码 tools/leaderelection/resourcelock/interface.go[6] 定义了一个锁抽象,interface 提供了一个通用接口,用于锁定 leader 选举中使用的资源。
type interface interface { // get 返回选举记录 get(ctx context.context) (*leaderelectionrecord, []byte, error) // create 创建一个leaderelectionrecord create(ctx context.context, ler leaderelectionrecord) error // update will update and existing leaderelectionrecord update(ctx context.context, ler leaderelectionrecord) error // recordevent is used to record events recordevent(string) // identity 返回锁的标识 identity() string // describe is used to convert details on current resource lock into a string describe() string} 那么实现这个抽象接口的就是,实现的资源锁,我们可以看到,client-go 提供了四种资源锁
leaselock
configmaplock
multilock
endpointlock
leaselock
lease 是 kubernetes 控制平面中的通过 etcd 来实现的一个 leases 的资源,主要为了提供分布式租约的一种控制机制。相关对这个 api 的描述可以参考于:lease 。
在 kubernetes 集群中,我们可以使用如下命令来查看对应的 lease
$ kubectl get leases -anamespace         name                      holder                                                agekube-node-lease   master-machine            master-machine                                        3d19hkube-system       kube-controller-manager   master-machine_06730140-a503-487d-850b-1fe1619f1fe1   3d19hkube-system       kube-scheduler            master-machine_1724e2d9-c19c-48d7-ae47-ee4217b27073   3d19h$ kubectl describe leases kube-controller-manager -n kube-systemname:         kube-controller-managernamespace:    kube-systemlabels:       annotations:  api version:  coordination.k8s.io/v1kind:         leasemetadata:  creation timestamp:  2022-06-24t1151z  managed fields:    api version:  coordination.k8s.io/v1    fields type:  fieldsv1    fieldsv1:      f        f        f        f        f        f    manager:         kube-controller-manager    operation:       update    time:            2022-06-24t1151z  resource version:  56012  self link:         /apis/coordination.k8s.io/v1/namespaces/kube-system/leases/kube-controller-manager  uid:               851a32d2-25dc-49b6-a3f7-7a76f152f071spec:  acquire time:            2022-06-27t1546.000000z  holder identity:         master-machine_06730140-a503-487d-850b-1fe1619f1fe1  lease duration seconds:  15  lease transitions:       2  renew time:              2022-06-28t0626.837773zevents:                      
下面来看下 leaselock 的实现,leaselock 会实现了作为资源锁的抽象
type leaselock struct { // leasemeta 就是类似于其他资源类型的属性,包含name ns 以及其他关于lease的属性 leasemeta  metav1.objectmeta client     coordinationv1client.leasesgetter // client 就是提供了informer中的功能 // lockconfig包含上面通过 describe 看到的 identity与recoder用于记录资源锁的更改    lockconfig resourcelockconfig    // lease 就是 api中的lease资源,可以参考下上面给出的这个api的使用 lease      *coordinationv1.lease}  
下面来看下 leaselock 实现了那些方法?
get
get是从 spec 中返回选举的记录
func (ll *leaselock) get(ctx context.context) (*leaderelectionrecord, []byte, error) { var err error ll.lease, err = ll.client.leases(ll.leasemeta.namespace).get(ctx, ll.leasemeta.name, metav1.getoptions{}) if err != nil {  return nil, nil, err } record := leasespectoleaderelectionrecord(&ll.lease.spec) recordbyte, err := json.marshal(*record) if err != nil {  return nil, nil, err } return record, recordbyte, nil}// 可以看出是返回这个资源spec里面填充的值func leasespectoleaderelectionrecord(spec *coordinationv1.leasespec) *leaderelectionrecord { var r leaderelectionrecord if spec.holderidentity != nil {  r.holderidentity = *spec.holderidentity } if spec.leasedurationseconds != nil {  r.leasedurationseconds = int(*spec.leasedurationseconds) } if spec.leasetransitions != nil {  r.leadertransitions = int(*spec.leasetransitions) } if spec.acquiretime != nil {  r.acquiretime = metav1.time{spec.acquiretime.time} } if spec.renewtime != nil {  r.renewtime = metav1.time{spec.renewtime.time} } return &r}  
create
create是在 kubernetes 集群中尝试去创建一个租约,可以看到,client 就是 api 提供的对应资源的 rest 客户端,结果会在 kubernetes 集群中创建这个 lease
func (ll *leaselock) create(ctx context.context, ler leaderelectionrecord) error { var err error ll.lease, err = ll.client.leases(ll.leasemeta.namespace).create(ctx, &coordinationv1.lease{  objectmeta: metav1.objectmeta{   name:      ll.leasemeta.name,   namespace: ll.leasemeta.namespace,  },  spec: leaderelectionrecordtoleasespec(&ler), }, metav1.createoptions{}) return err}  
update
update是更新 lease 的 spec
func (ll *leaselock) update(ctx context.context, ler leaderelectionrecord) error { if ll.lease == nil {  return errors.new(lease not initialized, call get or create first) } ll.lease.spec = leaderelectionrecordtoleasespec(&ler) lease, err := ll.client.leases(ll.leasemeta.namespace).update(ctx, ll.lease, metav1.updateoptions{}) if err != nil {  return err } ll.lease = lease return nil}  
recordevent
recordevent是记录选举时出现的事件,这时候我们回到上部分 在 kubernetes 集群中查看 ep 的信息时可以看到的 event 中存在 became leader 的事件,这里就是将产生的这个 event 添加到 meta-data 中。
func (ll *leaselock) recordevent(s string) {   if ll.lockconfig.eventrecorder == nil {      return   }   events := fmt.sprintf(%v %v, ll.lockconfig.identity, s)   subject := &coordinationv1.lease{objectmeta: ll.lease.objectmeta}   // populate the type meta, so we don't have to get it from the schema   subject.kind = lease   subject.apiversion = coordinationv1.schemegroupversion.string()   ll.lockconfig.eventrecorder.eventf(subject, corev1.eventtypenormal, leaderelection, events)}  
到这里大致上了解了资源锁究竟是什么了,其他种类的资源锁也是相同的实现的方式,这里就不过多阐述了;下面的我们来看看选举的过程。
election workflow
选举的代码入口是在 leaderelection.go,这里会继续上面的 example 向下分析整个选举的过程。
前面我们看到了进入选举的入口是一个 runordie()的函数,那么就继续从这里开始来了解。进入 runordie,看到其实只有几行而已,大致上了解到了 runordie 会使用提供的配置来启动选举的客户端,之后会阻塞,直到 ctx 退出,或停止持有 leader 的租约。
func runordie(ctx context.context, lec leaderelectionconfig) { le, err := newleaderelector(lec) if err != nil {  panic(err) } if lec.watchdog != nil {  lec.watchdog.setleaderelection(le) } le.run(ctx)}  
下面看下 newleaderelector做了些什么?可以看到,leaderelector 是一个结构体,这里只是创建他,这个结构体提供了我们选举中所需要的一切(leaderelector 就是 runordie 创建的选举客户端)。
func newleaderelector(lec leaderelectionconfig) (*leaderelector, error) { if lec.leaseduration <= lec.renewdeadline {  return nil, fmt.errorf(leaseduration must be greater than renewdeadline) } if lec.renewdeadline <= time.duration(jitterfactor*float64(lec.retryperiod)) {  return nil, fmt.errorf(renewdeadline must be greater than retryperiod*jitterfactor) } if lec.leaseduration < 1 {  return nil, fmt.errorf(leaseduration must be greater than zero) } if lec.renewdeadline < 1 {  return nil, fmt.errorf(renewdeadline must be greater than zero) } if lec.retryperiod  0 &&  le.observedtime.add(le.config.leaseduration).after(now.time) &&  !le.isleader() { // 不是leader,进行holderidentity比较,再加上时间,这个时候没有到竞选其,跳出  klog.v(4).infof(lock is held by %v and has not yet expired, oldleaderelectionrecord.holderidentity)  return false } // 3.我们将尝试更新。 在这里leaderelectionrecord设置为默认值。让我们在更新之前更正它。 if le.isleader() { // 到这就说明是leader,修正他的时间  leaderelectionrecord.acquiretime = oldleaderelectionrecord.acquiretime  leaderelectionrecord.leadertransitions = oldleaderelectionrecord.leadertransitions } else { // leadertransitions 就是指leader调整(转变为其他)了几次,如果是,  // 则为发生转变,保持原有值  // 反之,则+1  leaderelectionrecord.leadertransitions = oldleaderelectionrecord.leadertransitions + 1 } // 完事之后更新apiserver中的锁资源,也就是更新对应的资源的属性信息 if err = le.config.lock.update(ctx, leaderelectionrecord); err != nil {  klog.errorf(failed to update lock: %v, err)  return false } // setobservedrecord 是通过一个新的record来更新这个锁中的record // 操作是安全的,会上锁保证临界区仅可以被一个线程/进程操作 le.setobservedrecord(&leaderelectionrecord) return true}  
到这里,已经完整知道利用 kubernetes 进行选举的流程都是什么了;下面简单回顾下,上述 leader 选举所有的步骤:
首选创建的服务就是该服务的 leader,锁可以为 lease , endpoint 等资源进行上锁
已经是 leader 的实例会不断续租,租约的默认值是 15 秒 (leaseduration);leader 在租约满时更新租约时间(renewtime)。
其他的 follower,会不断检查对应资源锁的存在,如果已经有 leader,那么则检查 renewtime,如果超过了租用时间(),则表明 leader 存在问题需要重新启动选举,直到有 follower 提升为 leader。
而为了避免资源被抢占,kubernetes api 使用了 resourceversion 来避免被重复修改(如果版本号与请求版本号不一致,则表示已经被修改了,那么 apiserver 将返回错误)
利用 leader 机制实现 ha 应用
下面就通过一个 example 来实现一个,利用 kubernetes 提供的选举机制完成的高可用应用。
代码实现
如果仅仅是使用 kubernetes 中的锁,实现的代码也只有几行而已。
package mainimport ( context flag fmt os os/signal syscall time metav1 k8s.io/apimachinery/pkg/apis/meta/v1 clientset k8s.io/client-go/kubernetes k8s.io/client-go/rest k8s.io/client-go/tools/clientcmd k8s.io/client-go/tools/leaderelection k8s.io/client-go/tools/leaderelection/resourcelock k8s.io/klog/v2)func buildconfig(kubeconfig string) (*rest.config, error) { if kubeconfig !=  {  cfg, err := clientcmd.buildconfigfromflags(, kubeconfig)  if err != nil {   return nil, err  }  return cfg, nil } cfg, err := rest.inclusterconfig() if err != nil {  return nil, err } return cfg, nil}func main() { klog.initflags(nil) var kubeconfig string var leaselockname string var leaselocknamespace string var id string // 初始化客户端的部分 flag.stringvar(&kubeconfig, kubeconfig, , absolute path to the kubeconfig file) flag.stringvar(&id, id, , the holder identity name) flag.stringvar(&leaselockname, lease-lock-name, , the lease lock resource name) flag.stringvar(&leaselocknamespace, lease-lock-namespace, , the lease lock resource namespace) flag.parse() if leaselockname ==  {  klog.fatal(unable to get lease lock resource name (missing lease-lock-name flag).) } if leaselocknamespace ==  {  klog.fatal(unable to get lease lock resource namespace (missing lease-lock-namespace flag).) } config, err := buildconfig(kubeconfig) if err != nil {  klog.fatal(err) } client := clientset.newforconfigordie(config) run := func(ctx context.context) {  // 实现的业务逻辑,这里仅仅为实验,就直接打印了  klog.info(controller loop...)  for {   fmt.println(i am leader, i was working.)   time.sleep(time.second * 5)  } } // use a go context so we can tell the leaderelection code when we // want to step down ctx, cancel := context.withcancel(context.background()) defer cancel() // 监听系统中断 ch := make(chan os.signal, 1) signal.notify(ch, os.interrupt, syscall.sigterm) go func() {  <-ch  klog.info(received termination, signaling shutdown)  cancel() }() // 创建一个资源锁 lock := &resourcelock.leaselock{  leasemeta: metav1.objectmeta{   name:      leaselockname,   namespace: leaselocknamespace,  },  client: client.coordinationv1(),  lockconfig: resourcelock.resourcelockconfig{   identity: id,  }, } // 开启一个选举的循环 leaderelection.runordie(ctx, leaderelection.leaderelectionconfig{  lock:            lock,  releaseoncancel: true,  leaseduration:   60 * time.second,  renewdeadline:   15 * time.second,  retryperiod:     5 * time.second,  callbacks: leaderelection.leadercallbacks{   onstartedleading: func(ctx context.context) {    // 当选举为leader后所运行的业务逻辑    run(ctx)   },   onstoppedleading: func() {    // we can do cleanup here    klog.infof(leader lost: %s, id)    os.exit(0)   },   onnewleader: func(identity string) { // 申请一个选举时的动作    if identity == id {     return    }    klog.infof(new leader elected: %s, identity)   },  }, })}  

注:这种 lease 锁只能在 in-cluster 模式下运行,如果需要类似二进制部署的程序,可以选择 endpoint 类型的资源锁。
生成镜像
这里已经制作好了镜像并上传到 dockerhub(cylonchau/leaderelection:v0.0.2)上了,如果只要学习运行原理,则忽略此步骤
from golang:alpine as buildermaintainer cylonworkdir /electioncopy . /electionenv goproxy https://goproxy.cn,directrun goos=linux goarch=amd64 cgo_enabled=0 go build -o elector main.gofrom alpine as runnerworkdir /go/electorcopy --from=builder /election/elector .volume [/election]entrypoint [./elector]  
准备资源清单
默认情况下,kubernetes 运行的 pod 在请求 kubernetes 集群内资源时,默认的账户是没有权限的,默认服务帐户无权访问协调  api,因此我们需要创建另一个 serviceaccount 并相应地设置  对应的 rbac 权限绑定;在清单中配置上这个 sa,此时所有的 pod 就会有协调锁的权限了。
apiversion: v1kind: serviceaccountmetadata:  name: sa-leaderelection---apiversion: rbac.authorization.k8s.io/v1kind: rolemetadata:  name: leaderelectionrules:  - apigroups:      - coordination.k8s.io    resources:      - leases    verbs:      - '*'---apiversion: rbac.authorization.k8s.io/v1kind: rolebindingmetadata:  name: leaderelectionroleref:  apigroup: rbac.authorization.k8s.io  kind: role  name: leaderelectionsubjects:  - kind: serviceaccount    name: sa-leaderelection---apiversion: apps/v1kind: deploymentmetadata:  labels:    app: leaderelection  name: leaderelection  namespace: defaultspec:  replicas: 3  selector:    matchlabels:      app: leaderelection  template:    metadata:      labels:        app: leaderelection    spec:      containers:        - image: cylonchau/leaderelection:v0.0.2          imagepullpolicy: ifnotpresent          command: [./elector]          args:          - -id=$(pod_name)          - -lease-lock-name=test          - -lease-lock-namespace=default          env:          - name: pod_name            valuefrom:              fieldref:                apiversion: v1                fieldpath: metadata.name          name: elector      serviceaccountname: sa-leaderelection  
集群中运行
执行完清单后,当 pod 启动后,可以看到会创建出一个 lease。
$ kubectl get leasename   holder                            agetest   leaderelection-5644c5f84f-frs5n   1s$ kubectl describe leasename:         testnamespace:    defaultlabels:       annotations:  api version:  coordination.k8s.io/v1kind:         leasemetadata:  creation timestamp:  2022-06-28t1645z  managed fields:    api version:  coordination.k8s.io/v1    fields type:  fieldsv1    fieldsv1:      f        f        f        f        f        f    manager:         elector    operation:       update    time:            2022-06-28t1645z  resource version:  131693  self link:         /apis/coordination.k8s.io/v1/namespaces/default/leases/test  uid:               bef2b164-a117-44bd-bad3-3e651c94c97bspec:  acquire time:            2022-06-28t1645.931873z  holder identity:         leaderelection-5644c5f84f-frs5n  lease duration seconds:  60  lease transitions:       0  renew time:              2022-06-28t1655.963537zevents:                      
通过其持有者的信息查看对应 pod(因为程序中对 holder identity 设置的是 pod 的名称),实际上是工作的 pod。
如上实例所述,这是利用 kubernetes 集群完成的 leader 选举的方案,虽然这不是最完美解决方案,但这是一种简单的方法,因为可以无需在集群上部署更多东西或者进行大量的代码工作就可以利用 kubernetes 集群来实现一个高可用的 ha 应用。


紫光发布全新一代自主研发人工智能服务器
采用Imagination的OmniShield™多域技术的解决方案
新基建引力 PCB或迎来快速增长
工业智能网关BL110应用之三十五: 如何连接配置金鸽MQTT云服务器
Overcurrent/Overdischarge Prot
leader选举在kubernetes controller中是如何实现的
FPC物料中英文对照表
LG正式发布LG Gram 16笔记本
许斌发表了《RGB全系列突进》的主题演讲
美国国防预先研究计划局:开发量子传感器以改变战场通信
Timing Considerations When Usi
海得控制12.78亿元收购新能源工控领域企业75%股权
恩智浦与华为携手成立射频功放联合研发及创新实验中心
高速PCB设计指南---PCB的可靠性设计
汽车连接器的生产技术水平如何?
台湾高技 GAOJ-K与您相约DMP大湾区工博会
使用单片机设计一款功率测试仪
三星W20 5G开启预约该机搭载骁龙855平台辅以12GB+512GB内存组合
哪些将会推动统一通信和协作的发展
Sovit3D数字孪生智慧水务三维可视化系统