06.informer
Informer创建
首先,我们回顾下Informer创建的代码流程,比较简单,仅需要两行代码:
- 第一行根据参数创建 sharedInformerFactory对象,这个逻辑很简单,大家直接看代码即可
- 第二行,通过sharedInformerFactory 创建Informer对象,这个流程中涉及到多个关键对象的创建,我们来细致分解下
factory := informers.NewSharedInformerFactory(clientset, 0)
informer := factory.Core().V1().Pods().Informer()
- 由于我们创建的是共享型的Informer Factory,因此在创建Informer时,首先会判断factory的informers中是否已经存在对应的informer type对象,如果存在则不再创建
- 如果不存在,则调用newFunc函数进行创建,帮放入informers map中。
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
// 判断是否已经存在informerType
informer, exists := f.informers[informerType]
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
// 执行informer的创建逻辑
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
newFunc的实现函数如下,由于我们创建的是podInformer,因此会来到podInformer的defaultInformer实现中
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
PodInformer的真实创建通过如下函数来完成,在这个阶段我们重点关注下ListWatch的创建。
可以清晰的看到,ListFunc的实现是通过调用k8s client的List接口来实现的,同样WatchFunc是通过k8s client的Watch接口来实现
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
我们在深入一层查看NewSharedIndexInformer的实现,到这里就看到了最底层的创建逻辑了,几个上文提到的关键对象陆续出场了:
- processor:上文提到是用来处理资源对象event handler的模块,这里构建为sharedProcessor,后面我们会对其进一步分析
- indexer:上文提到的资源对象cache;ListWatch获得的资源对象信息会更新到该indexer中,Lister查询资源对象信息也是从这里获取
- listerWatcher:上文已经提到,SharedIndexInformer通过其与API Server进行交互,获取最新的资源对象信息
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
Informer Factory启动
sharedInformerFactory的Start函数实现如下所示:
- 逻辑很更简单,遍历factory中的每一个informer,并在通过协程调用inform的Run函数进行启动;
- 因此,Factory的start本质是其管理的所有informer的start;
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
下面我们以 sharedIndexInformer为例,来看看Informer的Run函数都做了哪些事情:
- 根据参数创建一个DeltaFIFO对象,Reflector会将监听到的资源信息存储到DeltaFIFO中,Controller则不断从DeltaFIFO中Pop资源对象进行处理
- 创建config,该config用于controller的创建
- 根据config的参数来创建controller
- 启动processor
- 启动controller
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 1. 创建DeltaFIFO
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
// 2. 创建controller的config
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
// 3. 创建controller
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// 4. 启动processer
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// 5. 启动controller
s.controller.Run(stopCh)
}
到本小节,基本所有Informer的关键对象都已经出现过了,接下来我们在看看这些对象之间是如何配合来实现Informer的功能的
Controller
上文已经提到controller的两个主要功能:
其首先不断的从DeltaFIFO中Pop Deltas(先理解为是一个资源的Event即可), 然后将Event的变化一方面同步到Indexer中(也就是cache),另一方面来触发用户在informer.AddEventHandler 注册的Handler。
但是在代码层面上,Reflector的实现也放在了Controller中。我们来看下controller 核心Run函数。其主要逻辑如下:
创建Reflector,这里可以看到将上层创建的ListerWatcher和Queue传给了Reflector 调用Reflector的Run函数启动Reflector主流程,其实现中会通过ListerWatcher获得最新的资源对象信息,然后存储到Queue中 controller的processLoop执行Controller的主流程,其不断从Queue中Pop Event Object,然后分别丢给Indexer和processer进行处理
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 1. 创建Reflector
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
// 2. 启动Reflector
wg.StartWithChannel(stopCh, r.Run)
// 3. 执行processLoop
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
Controller的processLoop的行为同名称保持一致,在一个Loop中不断的从DeltaFIFO中获取Item进行处理
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
- DeltaFIFO每次Pop出来的是当前这个对象的所有event 集合,因此是一个Deltas;
- Delta的Type属性描述了Event的类型,当前支持的有Added、Updated、Deleted、Replaced、Sync、
- Delta的Object属性,描述了对应资源Object的信息,对于Added和Updated为new Object的信息,Deleted为删除前Object的信息
- 对于Event为Sync, Replaced, Added, Updated的情况
- 若Indexer中已经存在,则使用最新的Object来更新Indexer,然后给processer发送update notification 并指定old 和new两个object
- 若Indexser中不存在,则将Object添加到Indexer中,然后给processer发送add notification并指定new object
- 对于Event为Delete的情况
- 在Index中删除对应Object,然后给proceser发送delete notification并指定删除的Object
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.stopped {
klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
return
}
// 处理resyncPeriod
if resyncPeriod > 0 {
if resyncPeriod < minimumResyncPeriod {
klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
resyncPeriod = minimumResyncPeriod
}
if resyncPeriod < s.resyncCheckPeriod {
if s.started {
klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
resyncPeriod = s.resyncCheckPeriod
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
// accordingly
s.resyncCheckPeriod = resyncPeriod
s.processor.resyncCheckPeriodChanged(resyncPeriod)
}
}
}
// 创建listener
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
// 在informer还没有调用run的时候,直接添加listener到processor
if !s.started {
s.processor.addListener(listener)
return
}
// in order to safely join, we have to
// 1. stop sending add/update/delete notifications
// 2. do a list against the store
// 3. send synthetic "Add" events to the new handler
// 4. unblock
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// 此时informer已经调用过run了,若再有新的handler register,需要将index中的所有object从新按照add event添加一次
s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}
event handler 调用
如前文所述,Event Handler的调用由processor来完成,在代码层面则对应到sharedProcessor 对象。前文在介绍Informer 启动的逻辑中已经说明,sharedIndexInformer的run接口中会通过调用sharedProcessor的run接口来启动processor的处理流程。因此,Event Handler的调用是在sharedProcessor的run接口中完成的。
下面我们来看下sharedProcessor run接口的实现
- 其会针对processor中的每个listener启动两个goroutine,分别是listener.run和listerner.pop
- listerner.pop会接收controller 发送的event notification,然后交给listener.run执行event handler
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
// 启动listener的run逻辑
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}