基本原理揭秘-Informer-深化了解-Kubernetes (基本原理的作用)
本文剖析k8scontroller中informer启动的基本流程
不论是k8s自身组件,还是自己编写controller,都须要经过apiserver监听etcd事情来成功自己的控制循环逻辑。
如何高效牢靠启动事情监听,k8s客户端工具包client-go提供了一个通用的informer包,经过informer,可以繁难和高效的启动controller开发。
informer包提供了如下的一些性能:
1、本地缓存(store)
2、索引机制(indexer)
3、Handler注册性能(eventHandler)
1、informer架构
整个informer机制架构如下图(图片源自Client-go):
图片
可以看到这张图分为高低两个局部,上半局部由client-go提供,下半局部则是须要自己成功的控制循环逻辑
本文关键剖析上半局部的逻辑,包含上方几个组件:
1.1、Reflector:
从图上可以看到Reflector是一个和apiserver交互的组件,经过list和watchapi将资源对象压入队列
1.2、DeltaFifo:
DeltaFifo的结构体表示如下:
typeDeltaFIFOstruct{...//Wedependonthepropertythatitemsinthesetarein//thequeueandviceversa,andthatallDeltasinthis//maphaveatleastoneDelta.itemsmap[string]Deltasqueue[]string...}
关键分为两局部,fifo和delta
(1)fifo:先进先出队列
对应结构体中的queue,结构体示例如下:
[default/-fd77b5886-pfrgn,xxx,xxx]
(2)delta:对应结构体中的items,存储了资源对象并且携带了资源操作类型的一个map,结构体示例如下:
map:{"default/centos-fd77b5886-pfrgn":[{Replaced&Pod{ObjectMeta:${pod参数}],"xxx":[{},{}]}
消费者从queue中pop出对象启动消费,并从items失掉详细的消费操作(执执行作Update/Deleted/Sync,和执行的对象objectspec)
1.3、Indexer:
client-go用来存储资源对象并自带索引性能的本地存储,deltaFIFO中pop出的对象将存储到Indexer。
indexer与etcd集群中的数据坚持分歧,从而client-go可以间接从本地缓存失掉资源对象,缩小apiserver和etcd集群的压力。
2、一个基本例子
funcmn(){stopCh:=make(chanstruct{})deferclose(stopCh)//(1)Newak8sclientsetmasterUrl:="172.27.32.110:8080"config,err:=clientcmd.BuildConfigFromFlags(masterUrl,"")iferr!=nil{klog.Errorf("BuildConfigFromFlagserr,err:%v",err)}clientset,err:=k.NewForConfig(config)iferr!=nil{klog.Errorf("Getclientseterr,err:%v",err)}//(2)NewasharedInformersfactorysharedInformers:=informers.NewSharedInformerFactory(clientset,defaultResync)//(3)Registerainformer//f.informers[informerType]=informer,//thedetailforinformerisbuildinNewFilteredPodInformer()podInformer:=sharedInformers.Core().V1().Pods().Informer()//(4)RegistereventhandlerpodInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:func(objinterface{}){mObj:=obj.(v1.Object)klog.Infof("Getnewobj:%v",mObj)klog.Infof("Getnewobjname:%s",mObj.GetName())},})//(5)StartallinformerssharedInformers.Start(stopCh)//(6)Acronjobforcachesyncif!cache.WaitForCacheSync(stopCh,podInformer.HasSynced){klog.Infof("Cachesyncfail!")}//(7)UselisterpodLister:=sharedInformers.Core().V1().Pods().Lister()pods,err:=podLister.List(labels.Everything())iferr!=nil{klog.Infof("err:%v",err)}klog.Infof("len(pods),%d",len(pods))for_,v:=rangepods{klog.Infof("pod:%s",v.Name)}<-stopChan}
上方就是一个繁难的informer的经常使用例子,整个环节如上述几个步骤,着重说一下(2)、(3)、(4)、(5)四个步骤
3、流程剖析
3.1、NewasharedInformersfactory
sharedInformers:=informers.NewSharedInformerFactory(clientset,defaultResync)factory:=&sharedInformerFactory{client:client,namespace:v1.NamespaceAll,defaultResync:defaultResync,informers:make(map[reflect.Type]cache.SharedIndexInformer),startedInformers:make(map[reflect.Type]bool),customResync:make(map[reflect.Type]time.Duration),}
这个环节就是创立一个informer的工厂sharedInformerFactory,sharedInformerFactory中有一个informers对象,外面是一个informer的map,sharedInformerFactory是为了防止过多的重复informer监听apiserver,造成apiserver压力过大,在同一个服务中,不同的controller经常使用同一个informer
3.2、Registerainformer
这个环节关键是生成和注册informer到sharedInformerFactory
podInformer:=sharedInformers.Core().V1().Pods().Informer()func(f*podInformer)Informer()cache.SharedIndexInformer{returnf.factory.InformerFor(&corev1.Pod{},f.defaultInformer)}###f.factory.InformerFor:###注册informerfunc(f*sharedInformerFactory)InformerFor(objruntime.Object,newFuncinternalinterfaces.NewInformerFunc)cache.SharedIndexInformer{...informer=newFunc(f.client,resyncPeriod)f.informers[informerType]=informerreturninformer}###f.defaultInformer:###生成informerfunc(f*podInformer)defaultInformer(clientk.Interface,resyncPeriodtime.Duration)cache.SharedIndexInformer{returnNewFilteredPodInformer(client,f.namespace,resyncPeriod,cache.Indexers{cache.NamespaceIndex:cache.MetaNamespaceIndexFunc},f.tweakListOptions)}funcNewFilteredPodInformer(clientk.Interface,namespacestring,resyncPeriodtime.Duration,indexerscache.Indexers,tweakListOptionsinternalinterfaces.TweakListOptionsFunc)cache.SharedIndexInformer{returncache.NewSharedIndexInformer(&cache.ListWatch{ListFunc:func(optionsmetav1.ListOptions)(runtime.Object,error){iftweakListOptions!=nil{tweakListOptions(&options)}returnclient.CoreV1().Pods(namespace).List(context.TODO(),options)},WatchFunc:func(optionsmetav1.ListOptions)(watch.Interface,error){iftweakListOptions!=nil{tweakListOptions(&options)}returnclient.CoreV1().Pods(namespace).Watch(context.TODO(),options)},},&corev1.Pod{},resyncPeriod,indexers,)}###cache.NewSharedIndexInformer:funcNewSharedIndexInformer(lwListerWatcher,exampleObjectruntime.Object,defaultEventHandlerResyncPeriodtime.Duration,indexersIndexers)SharedIndexInformer{realClock:=&clock.RealClock{}sharedIndexInformer:=&sharedIndexInformer{processor:&sharedProcessor{clock:realClock},indexer:NewIndexer(DeletionHandlingMetaNamespaceKeyFunc,indexers),listerWatcher:lw,objectType:exampleObject,resyncCheckPeriod:defaultEventHandlerResyncPeriod,defaultEventHandlerResyncPeriod:defaultEventHandlerResyncPeriod,cacheMutationDetector:NewCacheMutationDetector(fmt.Sprintf("%T",exampleObject)),clock:realClock,}returnsharedIndexInformer}
首先经过f.defaultInformer方法生成informer,而后经过f.factory.InformerFor方法,将informer注册到sharedInformerFactory
3.3、Registereventhandler
这个环节展现如何注册一个回调函数,以及如何触发这个回调函数
###podInformer.AddEventHandler:func(s*sharedIndexInformer)AddEventHandler(handlerResourceEventHandler){s.AddEventHandlerWithResyncPeriod(handler,s.defaultEventHandlerResyncPeriod)}func(s*sharedIndexInformer)AddEventHandlerWithResyncPeriod(handlerResourceEventHandler,resyncPeriodtime.Duration){...listener:=newProcessListener(handler,resyncPeriod,determineResyncPeriod(resyncPeriod,s.resyncCheckPeriod),s.clock.Now(),initialBufferSize)if!s.started{s.processor.addListener(listener)return}...}###s.processor.addListener(listener):func(p*sharedProcessor)addListener(listener*processorListener){p.addListenerLocked(listener)ifp.listenersStarted{p.wg.Start(listener.run)p.wg.Start(listener.pop)}}###listener.run:func(p*processorListener)run(){//thiscallblocksuntilthechannelisclosed.Whenapanichensduringthenotification//wewillcatchit,**theoffendingitemwillbeskipped!**,andafterashortdelay(onesecond)//thenextnotificationwillbeattempted.Thisisusuallybetterthanthealternativeofnever//deliveringagain.stopCh:=make(chanstruct{})wait.Until(func(){fornext:=rangep.nextCh{switchnotification:=next.(type){//经过next结构体自身的类型来判别事情类型caseupdateNotification:p.handler.OnUpdate(notification.oldObj,notification.newObj)caseaddNotification:p.handler.OnAdd(notification.newObj)casedeleteNotification:p.handler.OnDelete(notification.oldObj)default:utilruntime.HandleError(fmt.Errorf("unrecognizednotification:%T",next))}}//theonlywaytogethereisifthep.nextChisemptyandclosedclose(stopCh)},1*time.Second,stopCh)}###listener.pop:func(p*processorListener)pop(){varnextChchan<-interface{}varnotificationinterface{}for{select{casenextCh<-notification://Notificationdispatchedvarokboolnotification,ok=p.pendingNotifications.ReadOne()if!ok{//NothingtopopnextCh=nil//Disablethisselectcase}casenotificationToAdd,ok:=<-p.addCh:if!ok{return}ifnotification==nil{//Nonotificationtopop(andpendingNotificationsisempty)//Optimizethecase-skipaddingtopendingNotificationsnotification=notificationToAddnextCh=p.nextCh}else{//Thereisalreadyanotificationwaitingtobedispatchedp.pendingNotifications.WriteOne(notificationToAdd)}}}}
这个环节总结就是:
(1)AddEventHandler到sharedProcessor,注册事情回调函数到sharedProcessor
(2)listenerpop方法里会监听p.addCh,经过nextCh=p.nextCh将addCh将事情传递给p.nextCh
(3)listenerrun方法里会监听p.nextCh,收到信号之后,判别是属于什么类型的方法,并且执行前面注册的Handler
所以后面须要关注当资源对象出现变卦时,是如何将变卦信号给p.addCh,进一步触发回调函数的
3.4、Startallinformers
经过sharedInformers.Start(stopCh)启动一切的informer,代码如下:
//Startinitializesallrequestedinformers.func(f*sharedInformerFactory)Start(stopCh<-chanstruct{}){forinformerType,informer:=rangef.informers{if!f.startedInformers[informerType]{goinformer.Run(stopCh)f.startedInformers[informerType]=true}}}
咱们的例子中其实就只启动了PodInformer,接上去看到podInformer的Run方法做了什么
###goinformer.Run(stopCh):func(s*sharedIndexInformer)Run(stopCh<-chanstruct{}){deferutilruntime.HandleCrash()fifo:=NewDeltaFIFOWithOptions(DeltaFIFOOptions{//DeltafifoKnownObjects:s.indexer,EmitDeltaTypeReplaced:true,})cfg:=&Config{Queue:fifo,//DeltafifoListerWatcher:s.listerWatcher,//listerWatcherObjectType:s.objectType,FullResyncPeriod:s.resyncCheckPeriod,RetryOnError:false,ShouldResync:s.processor.shouldResync,//HandleDeltas,addedtoprocess,anddoneinprocessloopProcess:s.HandleDeltas,WatchErrorHandler:s.watchErrorHandler,}func(){...s.controller=New(cfg)...}s.controller.Run(stopCh)}###s.controller.Run(stopCh)func(c*controller)Run(stopCh<-chanstruct{}){r:=NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)c.reflector=r//Runreflectorwg.StartWithChannel(stopCh,r.Run)//RunprocessLoop,popfromdeltafifoanddoProcessFunc,//ProcessFuncisthes.HandleDeltasbeforewait.Until(c.processLoop,time.Second,stopCh)}
可以看到上方的逻辑首先生成一个DeltaFifo,而后接上去的逻辑分为两块,消费和消费:
(1)消费—r.Run:
关键的逻辑就是应用listandwatch将资源对象包含操作类型压入队列DeltaFifo
####r.Run:func(r*Reflector)Run(stopCh<-chanstruct{}){//执行listAndWatchiferr:=r.ListAndWatch(stopCh);}//执行ListAndWatch流程func(r*Reflector)ListAndWatch(stopCh<-chanstruct{})error{//1、list://(1)、listpods,实践调用的是podInformer里的ListFunc方法,//client.CoreV1().Pods(namespace).List(context.TODO(),options)r.listerWatcher.List(opts)//(2)、失掉资源版本号,用于watchresourceVersion=listMetaInterface.GetResourceVersion()//(3)、数据转换,转换成列表items,err:=meta.ExtractList(list)//(4)、将资源列表中的资源对象和版本号存储到DeltaFifo中r.syncWith(items,resourceVersion);//2、watch,有限循环去watchapiserver,当watch到事情的时刻,执行watchHandler将event事情压入fifofor{//(1)、watchpods,实践调用的是podInformer里的WatchFunc方法,//client.CoreV1().Pods(namespace).Watch(context.TODO(),options)w,err:=r.listerWatcher.Watch(options)//(2)、watchHandler//watchHandlerwatchespod,降级DeltaFifo消息,并且降级resourceVersioniferr:=r.watchHandler(start,w,&resourceVersion,resyncerrc,stopCh);}}###r.watchHandler//watchHandlerwatcheswandkeeps*resourceVersionuptodate.func(r*Reflector)watchHandler(starttime.Time,wwatch.Interface,resourceVersion*string,errcchanerror,stopCh<-chanstruct{})error{...loop:for{select{caseevent,ok:=<-w.ResultChan():newResourceVersion:=meta.GetResourceVersion()switchevent.Type{casewatch.Added:err:=r.store.Add(event.Object)//Addeventtosrore,store的详细方法在fifo中iferr!=nil{utilruntime.HandleError(fmt.Errorf("%s:unabletoaddwatcheventobject(%#v)tostore:%v",r.name,event.Object,err))}...}*resourceVersion=newResourceVersionr.setLastSyncResourceVersion(newResourceVersion)eventCount++}}...}###r.store.Add:##即为deltaFifo的add方法:func(f*DeltaFIFO)Add(objinterface{})error{...returnf.queueActionLocked(Added,obj)...}func(f*DeltaFIFO)queueActionLocked(actionTypeDeltaType,objinterface{})error{id,err:=f.KeyOf(obj)iferr!=nil{returnKeyError{obj,err}}newDeltas:=append(f.items[id],Delta{actionType,obj})newDeltas=dedupDeltas(newDeltas)iflen(newDeltas)>0{if_,exists:=f.items[id];!exists{f.queue=append(f.queue,id)}f.items[id]=newDeltasf.cond.Broadcast()//通知一切阻塞住的消费者}...returnnil}
(2)消费—c.processLoop:
消费逻辑就是从DeltaFifopop出对象,而后做两件事情:(1)触发前面注册的eventhandler(2)降级本地索引缓存indexer,坚持数据和etcd分歧
func(c*controller)processLoop(){for{obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))}}###Queue.Pop:##Queue.Pop是一个带有处置函数的pod方法,首先先看Pod逻辑,即为deltaFifo的pop方法:func(f*DeltaFIFO)Pop(processPopProcessFunc)(interface{},error){for{//有限循环forlen(f.queue)==0{f.cond.Wait()//阻塞直到消费端broadcast方法通知}id:=f.queue[0]item,ok:=f.items[id]delete(f.items,id)err:=process(item)//执行处置方法ife,ok:=err.(ErrRequeue);ok{f.addIfNotPresent(id,item)//假设处置失败的从新添加到fifo中从新处置err=e.Err}returnitem,err}}###c.config.Process:##c.config.Process是在初始化controller的时刻赋值的,即为前面的s.HandleDeltas###s.HandleDeltas:func(s*sharedIndexInformer)HandleDeltas(objinterface{})error{s.blockDeltas.Lock()defers.blockDeltas.Unlock()//fromoldesttonewestfor_,d:=rangeobj.(Deltas){switchd.Type{caseSync,Replaced,Added,Updated:s.cacheMutationDetector.AddObject(d.Object)ifold,exists,err:=s.indexer.Get(d.Object);err==nil&&exists{iferr:=s.indexer.Update(d.Object);err!=nil{returnerr}isSync:=falseswitch{cased.Type==Sync://SynceventsareonlypropagatedtolistenersthatrequestedresyncisSync=truecased.Type==Replaced:ifaccessor,err:=meta.Accessor(d.Object);err==nil{ifoldAccessor,err:=meta.Accessor(old);err==nil{//Replacedeventsthatdidn'tchangeresourceVersionaretreatedasresyncevents//andonlypropagatedtolistenersthatrequestedresyncisSync=accessor.GetResourceVersion()==oldAccessor.GetResourceVersion()}}}s.processor.distribute(updateNotification{oldObj:old,newObj:d.Object},isSync)}else{iferr:=s.indexer.Add(d.Object);err!=nil{returnerr}s.processor.distribute(addNotification{newObj:d.Object},false)}caseDeleted:iferr:=s.indexer.Delete(d.Object);err!=nil{returnerr}s.processor.distribute(deleteNotification{oldObj:d.Object},false)}}returnnil}
可以看到上方关键执行两局部逻辑:
s.processor.distribute
####s.processor.distribute:###例如新增通知:s.processor.distribute(addNotification{newObj:d.Object},false)###其中addNotification就是add类型的通知,前面会经过notification结构体的类型来执行不同的eventHandlerfunc(p*sharedProcessor)distribute(objinterface{},syncbool){p.listenersLock.RLock()deferp.listenersLock.RUnlock()ifsync{for_,listener:=rangep.syncingListeners{listener.add(obj)}}else{for_,listener:=rangep.listeners{listener.add(obj)}}}func(p*processorListener)add(notificationinterface{}){p.addCh<-notification//新增notification到addCh}
这里p.addCh对应到前面说的关注对象p.addCh,processorListener收到addCh信号之后传递给nextCh,而后经过notification结构体的类型来执行不同的eventHandler
s.indexer的增删改:
这个就是本地数据的缓存和索引,自定义控制逻辑外面会经过indexer失掉操作对象的详细参数,这里就不开展细讲了。
4、总结
至此一个informer的client-go局部的流程就走完了,可以看到启动informer关键流程就是:
1、ReflectorListAndWatch:
(1)经过一个reflectorrun起来一个带有list和watchapi的client
(2)list到的pod列表经过DeltaFifo存储,并降级最新的ResourceVersion
(3)继续监听pod,监听到的pod操作事情继续存储到DeltaFifo中
2、DeltaFifo消费和消费:
(1)消费:listandwatch到的事情消费压入队列DeltaFifo
(2)消费:执行注册的eventHandler,并降级本地indexer
所以informer实质其实就是一个经过deltaFifo建设消费消费机制,并且带有本地缓存和索引,以及可以注册回调事情的apiServer的客户端库。
5、参考
免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。