当前位置:首页 > 数码 > 基本原理揭秘-Informer-深化了解-Kubernetes (基本原理的作用)

基本原理揭秘-Informer-深化了解-Kubernetes (基本原理的作用)

admin7个月前 (04-26)数码48

本文剖析k8scontroller中informer启动的基本流程

不论是k8s自身组件,还是自己编写controller,都须要经过apiserver监听etcd事情来成功自己的控制循环逻辑。

如何高效牢靠启动事情监听,k8s客户端工具包client-go提供了一个通用的informer包,经过informer,可以繁难和高效的启动controller开发。

informer包提供了如下的一些性能:

1、本地缓存(store)

2、索引机制(indexer)

3、Handler注册性能(eventHandler)

1、informer架构

整个informer机制架构如下图(图片源自Client-go):

图片

可以看到这张图分为高低两个局部,上半局部由client-go提供,下半局部则是须要自己成功的控制循环逻辑

本文关键剖析上半局部的逻辑,包含上方几个组件:

1.1、Reflector:

从图上可以看到Reflector是一个和apiserver交互的组件,经过list和watchapi将资源对象压入队列

1.2、DeltaFifo:

DeltaFifo的结构体表示如下:

typeDeltaFIFOstruct{...//Wedependonthepropertythatitemsinthesetarein//thequeueandviceversa,andthatallDeltasinthis//maphaveatleastoneDelta.itemsmap[string]Deltasqueue[]string...}

关键分为两局部,fifo和delta

(1)fifo:先进先出队列

对应结构体中的queue,结构体示例如下:

[default/-fd77b5886-pfrgn,xxx,xxx]

(2)delta:对应结构体中的items,存储了资源对象并且携带了资源操作类型的一个map,结构体示例如下:

map:{"default/centos-fd77b5886-pfrgn":[{Replaced&Pod{ObjectMeta:${pod参数}],"xxx":[{},{}]}

消费者从queue中pop出对象启动消费,并从items失掉详细的消费操作(执执行作Update/Deleted/Sync,和执行的对象objectspec)

1.3、Indexer:

client-go用来存储资源对象并自带索引性能的本地存储,deltaFIFO中pop出的对象将存储到Indexer。

indexer与etcd集群中的数据坚持分歧,从而client-go可以间接从本地缓存失掉资源对象,缩小apiserver和etcd集群的压力。

2、一个基本例子

funcmn(){stopCh:=make(chanstruct{})deferclose(stopCh)//(1)Newak8sclientsetmasterUrl:="172.27.32.110:8080"config,err:=clientcmd.BuildConfigFromFlags(masterUrl,"")iferr!=nil{klog.Errorf("BuildConfigFromFlagserr,err:%v",err)}clientset,err:=k.NewForConfig(config)iferr!=nil{klog.Errorf("Getclientseterr,err:%v",err)}//(2)NewasharedInformersfactorysharedInformers:=informers.NewSharedInformerFactory(clientset,defaultResync)//(3)Registerainformer//f.informers[informerType]=informer,//thedetailforinformerisbuildinNewFilteredPodInformer()podInformer:=sharedInformers.Core().V1().Pods().Informer()//(4)RegistereventhandlerpodInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:func(objinterface{}){mObj:=obj.(v1.Object)klog.Infof("Getnewobj:%v",mObj)klog.Infof("Getnewobjname:%s",mObj.GetName())},})//(5)StartallinformerssharedInformers.Start(stopCh)//(6)Acronjobforcachesyncif!cache.WaitForCacheSync(stopCh,podInformer.HasSynced){klog.Infof("Cachesyncfail!")}//(7)UselisterpodLister:=sharedInformers.Core().V1().Pods().Lister()pods,err:=podLister.List(labels.Everything())iferr!=nil{klog.Infof("err:%v",err)}klog.Infof("len(pods),%d",len(pods))for_,v:=rangepods{klog.Infof("pod:%s",v.Name)}<-stopChan}

上方就是一个繁难的informer的经常使用例子,整个环节如上述几个步骤,着重说一下(2)、(3)、(4)、(5)四个步骤

3、流程剖析

3.1、NewasharedInformersfactory

sharedInformers:=informers.NewSharedInformerFactory(clientset,defaultResync)factory:=&sharedInformerFactory{client:client,namespace:v1.NamespaceAll,defaultResync:defaultResync,informers:make(map[reflect.Type]cache.SharedIndexInformer),startedInformers:make(map[reflect.Type]bool),customResync:make(map[reflect.Type]time.Duration),}

这个环节就是创立一个informer的工厂sharedInformerFactory,sharedInformerFactory中有一个informers对象,外面是一个informer的map,sharedInformerFactory是为了防止过多的重复informer监听apiserver,造成apiserver压力过大,在同一个服务中,不同的controller经常使用同一个informer

3.2、Registerainformer

这个环节关键是生成和注册informer到sharedInformerFactory

podInformer:=sharedInformers.Core().V1().Pods().Informer()func(f*podInformer)Informer()cache.SharedIndexInformer{returnf.factory.InformerFor(&corev1.Pod{},f.defaultInformer)}###f.factory.InformerFor:###注册informerfunc(f*sharedInformerFactory)InformerFor(objruntime.Object,newFuncinternalinterfaces.NewInformerFunc)cache.SharedIndexInformer{...informer=newFunc(f.client,resyncPeriod)f.informers[informerType]=informerreturninformer}###f.defaultInformer:###生成informerfunc(f*podInformer)defaultInformer(clientk.Interface,resyncPeriodtime.Duration)cache.SharedIndexInformer{returnNewFilteredPodInformer(client,f.namespace,resyncPeriod,cache.Indexers{cache.NamespaceIndex:cache.MetaNamespaceIndexFunc},f.tweakListOptions)}funcNewFilteredPodInformer(clientk.Interface,namespacestring,resyncPeriodtime.Duration,indexerscache.Indexers,tweakListOptionsinternalinterfaces.TweakListOptionsFunc)cache.SharedIndexInformer{returncache.NewSharedIndexInformer(&cache.ListWatch{ListFunc:func(optionsmetav1.ListOptions)(runtime.Object,error){iftweakListOptions!=nil{tweakListOptions(&options)}returnclient.CoreV1().Pods(namespace).List(context.TODO(),options)},WatchFunc:func(optionsmetav1.ListOptions)(watch.Interface,error){iftweakListOptions!=nil{tweakListOptions(&options)}returnclient.CoreV1().Pods(namespace).Watch(context.TODO(),options)},},&corev1.Pod{},resyncPeriod,indexers,)}###cache.NewSharedIndexInformer:funcNewSharedIndexInformer(lwListerWatcher,exampleObjectruntime.Object,defaultEventHandlerResyncPeriodtime.Duration,indexersIndexers)SharedIndexInformer{realClock:=&clock.RealClock{}sharedIndexInformer:=&sharedIndexInformer{processor:&sharedProcessor{clock:realClock},indexer:NewIndexer(DeletionHandlingMetaNamespaceKeyFunc,indexers),listerWatcher:lw,objectType:exampleObject,resyncCheckPeriod:defaultEventHandlerResyncPeriod,defaultEventHandlerResyncPeriod:defaultEventHandlerResyncPeriod,cacheMutationDetector:NewCacheMutationDetector(fmt.Sprintf("%T",exampleObject)),clock:realClock,}returnsharedIndexInformer}

首先经过f.defaultInformer方法生成informer,而后经过f.factory.InformerFor方法,将informer注册到sharedInformerFactory

3.3、Registereventhandler

这个环节展现如何注册一个回调函数,以及如何触发这个回调函数

###podInformer.AddEventHandler:func(s*sharedIndexInformer)AddEventHandler(handlerResourceEventHandler){s.AddEventHandlerWithResyncPeriod(handler,s.defaultEventHandlerResyncPeriod)}func(s*sharedIndexInformer)AddEventHandlerWithResyncPeriod(handlerResourceEventHandler,resyncPeriodtime.Duration){...listener:=newProcessListener(handler,resyncPeriod,determineResyncPeriod(resyncPeriod,s.resyncCheckPeriod),s.clock.Now(),initialBufferSize)if!s.started{s.processor.addListener(listener)return}...}###s.processor.addListener(listener):func(p*sharedProcessor)addListener(listener*processorListener){p.addListenerLocked(listener)ifp.listenersStarted{p.wg.Start(listener.run)p.wg.Start(listener.pop)}}###listener.run:func(p*processorListener)run(){//thiscallblocksuntilthechannelisclosed.Whenapanichensduringthenotification//wewillcatchit,**theoffendingitemwillbeskipped!**,andafterashortdelay(onesecond)//thenextnotificationwillbeattempted.Thisisusuallybetterthanthealternativeofnever//deliveringagain.stopCh:=make(chanstruct{})wait.Until(func(){fornext:=rangep.nextCh{switchnotification:=next.(type){//经过next结构体自身的类型来判别事情类型caseupdateNotification:p.handler.OnUpdate(notification.oldObj,notification.newObj)caseaddNotification:p.handler.OnAdd(notification.newObj)casedeleteNotification:p.handler.OnDelete(notification.oldObj)default:utilruntime.HandleError(fmt.Errorf("unrecognizednotification:%T",next))}}//theonlywaytogethereisifthep.nextChisemptyandclosedclose(stopCh)},1*time.Second,stopCh)}###listener.pop:func(p*processorListener)pop(){varnextChchan<-interface{}varnotificationinterface{}for{select{casenextCh<-notification://Notificationdispatchedvarokboolnotification,ok=p.pendingNotifications.ReadOne()if!ok{//NothingtopopnextCh=nil//Disablethisselectcase}casenotificationToAdd,ok:=<-p.addCh:if!ok{return}ifnotification==nil{//Nonotificationtopop(andpendingNotificationsisempty)//Optimizethecase-skipaddingtopendingNotificationsnotification=notificationToAddnextCh=p.nextCh}else{//Thereisalreadyanotificationwaitingtobedispatchedp.pendingNotifications.WriteOne(notificationToAdd)}}}}

这个环节总结就是:

(1)AddEventHandler到sharedProcessor,注册事情回调函数到sharedProcessor

(2)listenerpop方法里会监听p.addCh,经过nextCh=p.nextCh将addCh将事情传递给p.nextCh

(3)listenerrun方法里会监听p.nextCh,收到信号之后,判别是属于什么类型的方法,并且执行前面注册的Handler

所以后面须要关注当资源对象出现变卦时,是如何将变卦信号给p.addCh,进一步触发回调函数的

Kubernetes

3.4、Startallinformers

经过sharedInformers.Start(stopCh)启动一切的informer,代码如下:

//Startinitializesallrequestedinformers.func(f*sharedInformerFactory)Start(stopCh<-chanstruct{}){forinformerType,informer:=rangef.informers{if!f.startedInformers[informerType]{goinformer.Run(stopCh)f.startedInformers[informerType]=true}}}

咱们的例子中其实就只启动了PodInformer,接上去看到podInformer的Run方法做了什么

###goinformer.Run(stopCh):func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()fifo:=NewDeltaFIFOWithOptions(DeltaFIFOOptions{//DeltafifoKnownObjects:s.indexer,EmitDeltaTypeReplaced:true,})cfg:=&Config{Queue:fifo,//DeltafifoListerWatcher:s.listerWatcher,//listerWatcherObjectType:s.objectType,FullResyncPeriod:s.resyncCheckPeriod,RetryOnError:false,ShouldResync:s.processor.shouldResync,//HandleDeltas,addedtoprocess,anddoneinprocessloopProcess:s.HandleDeltas,WatchErrorHandler:s.watchErrorHandler,}func(){...s.controller=New(cfg)...}s.controller.Run(stopCh)}###s.controller.Run(stopCh)func(c*controller)Run(stopCh<-chanstruct{}){r:=NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)c.reflector=r//Runreflectorwg.StartWithChannel(stopCh,r.Run)//RunprocessLoop,popfromdeltafifoanddoProcessFunc,//ProcessFuncisthes.HandleDeltasbeforewait.Until(c.processLoop,time.Second,stopCh)}

可以看到上方的逻辑首先生成一个DeltaFifo,而后接上去的逻辑分为两块,消费和消费:

(1)消费—r.Run:

关键的逻辑就是应用listandwatch将资源对象包含操作类型压入队列DeltaFifo

####r.Run:func(r*Reflector)Run(stopCh<-chanstruct{}){//执行listAndWatchiferr:=r.ListAndWatch(stopCh);}//执行ListAndWatch流程func(r*Reflector)ListAndWatch(stopCh<-chanstruct{})error{//1、list://(1)、listpods,实践调用的是podInformer里的ListFunc方法,//client.CoreV1().Pods(namespace).List(context.TODO(),options)r.listerWatcher.List(opts)//(2)、失掉资源版本号,用于watchresourceVersion=listMetaInterface.GetResourceVersion()//(3)、数据转换,转换成列表items,err:=meta.ExtractList(list)//(4)、将资源列表中的资源对象和版本号存储到DeltaFifo中r.syncWith(items,resourceVersion);//2、watch,有限循环去watchapiserver,当watch到事情的时刻,执行watchHandler将event事情压入fifofor{//(1)、watchpods,实践调用的是podInformer里的WatchFunc方法,//client.CoreV1().Pods(namespace).Watch(context.TODO(),options)w,err:=r.listerWatcher.Watch(options)//(2)、watchHandler//watchHandlerwatchespod,降级DeltaFifo消息,并且降级resourceVersioniferr:=r.watchHandler(start,w,&resourceVersion,resyncerrc,stopCh);}}###r.watchHandler//watchHandlerwatcheswandkeeps*resourceVersionuptodate.func(r*Reflector)watchHandler(starttime.Time,wwatch.Interface,resourceVersion*string,errcchanerror,stopCh<-chanstruct{})error{...loop:for{select{caseevent,ok:=<-w.ResultChan():newResourceVersion:=meta.GetResourceVersion()switchevent.Type{casewatch.Added:err:=r.store.Add(event.Object)//Addeventtosrore,store的详细方法在fifo中iferr!=nil{utilruntime.HandleError(fmt.Errorf("%s:unabletoaddwatcheventobject(%#v)tostore:%v",r.name,event.Object,err))}...}*resourceVersion=newResourceVersionr.setLastSyncResourceVersion(newResourceVersion)eventCount++}}...}###r.store.Add:##即为deltaFifo的add方法:func(f*DeltaFIFO)Add(objinterface{})error{...returnf.queueActionLocked(Added,obj)...}func(f*DeltaFIFO)queueActionLocked(actionTypeDeltaType,objinterface{})error{id,err:=f.KeyOf(obj)iferr!=nil{returnKeyError{obj,err}}newDeltas:=append(f.items[id],Delta{actionType,obj})newDeltas=dedupDeltas(newDeltas)iflen(newDeltas)>0{if_,exists:=f.items[id];!exists{f.queue=append(f.queue,id)}f.items[id]=newDeltasf.cond.Broadcast()//通知一切阻塞住的消费者}...returnnil}
(2)消费—c.processLoop:

消费逻辑就是从DeltaFifopop出对象,而后做两件事情:(1)触发前面注册的eventhandler(2)降级本地索引缓存indexer,坚持数据和etcd分歧

func(c*controller)processLoop(){for{obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))}}###Queue.Pop:##Queue.Pop是一个带有处置函数的pod方法,首先先看Pod逻辑,即为deltaFifo的pop方法:func(f*DeltaFIFO)Pop(processPopProcessFunc)(interface{},error){for{//有限循环forlen(f.queue)==0{f.cond.Wait()//阻塞直到消费端broadcast方法通知}id:=f.queue[0]item,ok:=f.items[id]delete(f.items,id)err:=process(item)//执行处置方法ife,ok:=err.(ErrRequeue);ok{f.addIfNotPresent(id,item)//假设处置失败的从新添加到fifo中从新处置err=e.Err}returnitem,err}}###c.config.Process:##c.config.Process是在初始化controller的时刻赋值的,即为前面的s.HandleDeltas###s.HandleDeltas:func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{s.blockDeltas.Lock()defers.blockDeltas.Unlock()//fromoldesttonewestfor_,d:=rangeobj.(Deltas){switchd.Type{caseSync,Replaced,Added,Updated:s.cacheMutationDetector.AddObject(d.Object)ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&exists{iferr:=s.indexer.Update(d.Object);err!=nil{returnerr}isSync:=falseswitch{cased.Type==Sync://SynceventsareonlypropagatedtolistenersthatrequestedresyncisSync=truecased.Type==Replaced:ifaccessor,err:=meta.Accessor(d.Object);err==nil{ifoldAccessor,err:=meta.Accessor(old);err==nil{//Replacedeventsthatdidn'tchangeresourceVersionaretreatedasresyncevents//andonlypropagatedtolistenersthatrequestedresyncisSync=accessor.GetResourceVersion()==oldAccessor.GetResourceVersion()}}}s.processor.distribute(updateNotification{oldObj:old,newObj:d.Object},isSync)}else{iferr:=s.indexer.Add(d.Object);err!=nil{returnerr}s.processor.distribute(addNotification{newObj:d.Object},false)}caseDeleted:iferr:=s.indexer.Delete(d.Object);err!=nil{returnerr}s.processor.distribute(deleteNotification{oldObj:d.Object},false)}}returnnil}

可以看到上方关键执行两局部逻辑:

s.processor.distribute
####s.processor.distribute:###例如新增通知:s.processor.distribute(addNotification{newObj:d.Object},false)###其中addNotification就是add类型的通知,前面会经过notification结构体的类型来执行不同的eventHandlerfunc(p*sharedProcessor)distribute(objinterface{},syncbool){p.listenersLock.RLock()deferp.listenersLock.RUnlock()ifsync{for_,listener:=rangep.syncingListeners{listener.add(obj)}}else{for_,listener:=rangep.listeners{listener.add(obj)}}}func(p*processorListener)add(notificationinterface{}){p.addCh<-notification//新增notification到addCh}

这里p.addCh对应到前面说的关注对象p.addCh,processorListener收到addCh信号之后传递给nextCh,而后经过notification结构体的类型来执行不同的eventHandler

s.indexer的增删改:

这个就是本地数据的缓存和索引,自定义控制逻辑外面会经过indexer失掉操作对象的详细参数,这里就不开展细讲了。

4、总结

至此一个informer的client-go局部的流程就走完了,可以看到启动informer关键流程就是:

1、ReflectorListAndWatch:

(1)经过一个reflectorrun起来一个带有list和watchapi的client

(2)list到的pod列表经过DeltaFifo存储,并降级最新的ResourceVersion

(3)继续监听pod,监听到的pod操作事情继续存储到DeltaFifo中

2、DeltaFifo消费和消费:

(1)消费:listandwatch到的事情消费压入队列DeltaFifo

(2)消费:执行注册的eventHandler,并降级本地indexer

所以informer实质其实就是一个经过deltaFifo建设消费消费机制,并且带有本地缓存和索引,以及可以注册回调事情的apiServer的客户端库。

5、参考

免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。

标签: Kubernetes

“基本原理揭秘-Informer-深化了解-Kubernetes (基本原理的作用)” 的相关文章

Kubernetes-网关-战略的流量治理-基于-API (kubernetes)

Kubernetes-网关-战略的流量治理-基于-API (kubernetes)

Kubees网关API经过形象复杂性并提供申明式的方法来定义路由和流量战略,简化了性能流程。 译自EffectiveTrafficManagementwithKubernetesGatewa...

Kubernetes-集群的十年历程-管理-踩过的十个大坑 (kubernetes)

Kubernetes-集群的十年历程-管理-踩过的十个大坑 (kubernetes)

Kubernetes是容器技术的绝对王者,它允许我们在YAML文件中描述应用程序的外观,然后Kubernetes会完成其余的工作。 高效管理Kubernetes集群至关重要。本文总结了管理K...

分步实现指南-基于Kubernetes构建Nacos高可用集群 (分步实施的步骤)

分步实现指南-基于Kubernetes构建Nacos高可用集群 (分步实施的步骤)

前提条件 安装并配置 Kubernetes 集群。 准备持久化存储(如 NFS、PV 等)用于保存 Nacos 数据。 修改 Nacos 配置 按照以下步骤...

优秀Kubernetes工具的最终指南 (优秀库)

优秀Kubernetes工具的最终指南 (优秀库)

引言 Kubernetes 是用于管理容器化应用程序编排的领先平台。它提供了出色的功能,例如自动扩展、自动修复和负载平衡,这些功能使其成为软件工程师的首选解决方案。Kubernetes 的管理可...

100个常用命令-Kubernetes-提升集群管理和故障排除效率 (100个常用的关联词)

100个常用命令-Kubernetes-提升集群管理和故障排除效率 (100个常用的关联词)

本指南提供了全面的命令清单,用于诊断 Kubernetes 集群以及在其中运行的应用程序。请在使用这些命令时务必将占位符(如 <namespace> 和...

LTS-现状-常年支持-的-Kubernetes-解谜与揭秘 (ltsg)

LTS-现状-常年支持-的-Kubernetes-解谜与揭秘 (ltsg)

从一个幽默的疑问引出很多人都在关注的KubeesLTS的疑问。 幽默的疑问 2019年,一个名为apiserverLoopbackClientServercertexpire...

ChatGPT避而不谈-揭开Kubernetes消费运行的十大秘诀 (chatgpt官网)

ChatGPT避而不谈-揭开Kubernetes消费运行的十大秘诀 (chatgpt官网)

理想证实,生成式在许多相对基础的用例中已施展作用,然而当它须要在技术方面给予更多指点时,体现又如何呢? 在推出时,咱们也和大家一样想将它给出的答案与惯例网络搜查获取的答案启动比拟。咱们启动试...

Gateway-维护者透露未来规划-API-Kubernetes-上线-1.0 (gateway翻译成中文)

Gateway-维护者透露未来规划-API-Kubernetes-上线-1.0 (gateway翻译成中文)

经过四年的努力,Kubernetes Gateway API 现已达到生产就绪状态。它提供了标准化的方法来管理进出 Kubernetes 集群的网络流量。 新特性...