跳到主要内容

03.cache

在client-go中,informer从DeltaFifo中取出基于list-watch机制捕获的资源对象之后,首先会将资源对象存储到本地缓存indexer中。这里对indexer做出简单的介绍

Indexer就是带索引功能的本地缓存。Indexer继承了通用存储接口Store。因此,在介绍Indexer之前,对通用接口Store做出简单介绍。

store Interface

// Store is a generic object storage and processing interface.  A
// Store holds a map from string keys to accumulators, and has
// operations to add, update, and delete a given object to/from the
// accumulator currently associated with a given key. A Store also
// knows how to extract the key from a given object, so many operations
// are given only the object.
//
// In the simplest Store implementations each accumulator is simply
// the last given object, or empty after Delete, and thus the Store's
// behavior is simple storage.
//
// Reflector knows how to watch a server and update a Store. This
// package provides a variety of implementations of Store.
type Store interface {

// Add adds the given object to the accumulator associated with the given object's key
Add(obj interface{}) error

// Update updates the given object in the accumulator associated with the given object's key
Update(obj interface{}) error

// Delete deletes the given object from the accumulator associated with the given object's key
Delete(obj interface{}) error

// List returns a list of all the currently non-empty accumulators
List() []interface{}

// ListKeys returns a list of all the keys currently associated with non-empty accumulators
ListKeys() []string

// Get returns the accumulator associated with the given object's key
Get(obj interface{}) (item interface{}, exists bool, err error)

// GetByKey returns the accumulator associated with the given key
GetByKey(key string) (item interface{}, exists bool, err error)

// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error

// Resync is meaningless in the terms appearing here but has
// meaning in some implementations that have non-trivial
// additional behavior (e.g., DeltaFIFO).
Resync() error
}

由接口定义可知,Store中定义的方法均是通用的存储和处理接口。但值得注意的是,在Store接口中出现了key,这是资源对象的存储键。对象在被存储到缓存中时,首先会调用KeyFunc方法为资源对象生成存储键。每个存储键可以唯一地标识缓存中存储的资源对象,可以通过此存储键而查找对应的资源对象。

与此同时,在Store接口的实现类中均会维护一个map用于存储资源对象。此map的key就是对象的存储键,value就是此存储对象。

在client-go中,将资源对象存储到缓存中时,会调用函数方法为资源对象生成存储键。函数方法类型如下所示:

type ExplicitKey string
// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
type KeyFunc func(obj interface{}) (string, error)

Store只是一个通用接口,只是定义了存储资源对象的最基本方法。在client-go中,并没有单独的数据类型来实现此接口。Store接口往往与其他方法共同组合成为存储和处理的新接口,如Indexer、DeltaFIFO,并提供专门的实现。这里将Store抽象出来会使代码更有层的概念。

Indexer Interface

Indexer接口扩展了通用接口Store

// Indexer extends Store with multiple indices and restricts each
// accumulator to simply hold the current object (and be empty after
// Delete).
//
// There are three kinds of strings here:
// 1. a storage key, as defined in the Store interface,
// 2. a name of an index, and
// 3. an "indexed value", which is produced by an IndexFunc and
// can be a field value or any other string computed from the object.
type Indexer interface {
Store
// Index returns the stored objects whose set of indexed values
// intersects the set of indexed values of the given object, for
// the named index
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexName string) []string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers

// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}

在Indexer中,存在三种字符串类型的数据

存储键:这个和Store接口中定义相同,由KeyFunc为资源对象生成的存储键。

索引名称和索引值:可以用IndexFunc为资源对象生成索引名称下对应的索引值,也可以是通过其它对象计算方法计算得到。由函数定义可知,通过IndexFunc可能会为一个资源对象生成多个索引值

Indexer 接口包含以下方法:

  • Index(indexName string, obj interface{}) ([]interface{}, error):返回给定对象的索引值集与存储对象的索引值集相交的存储对象,针对指定的索引名称。
  • IndexKeys(indexName, indexedValue string) ([]string, error):返回存储对象的存储键,这些对象的指定索引名称的索引值集包含给定的索引值。
  • ListIndexFuncValues(indexName string) []string:返回给定索引的所有索引值。
  • ByIndex(indexName, indexedValue string) ([]interface{}, error):返回指定索引名称的索引值集包含给定索引值的存储对象。
  • GetIndexers() Indexers:返回索引器。
  • AddIndexers(newIndexers Indexers) error:向该存储中添加更多索引器。如果在已经有数据的存储中调用此方法,结果是未定义的。
  • IndexFunc 类型定义了一个函数,该函数接受一个对象并返回由该对象计算出的一组索引值。这类函数用于根据需要对对象进行多个不同类型的索引。

作为初学者,我们主要是要理解这三个结构,可以进行代码调试

cache

进一步看Indexer的实现结构体cache

cache是线程安全的,它继承并实现了Indexer接口,其中keyFunc用于存储资源对象时生成存储键。关于cache在Store接口的实现,这里不再赘述,看源代码即可明白,这里给出Add函数代码。

cache 结构体实现了 Store 和 Indexer 接口。它使用 ThreadSafeStore 作为缓存存储,确保线程安全。keyFunc 用于从对象中生成键。

NewStore 和 NewIndexer 分别返回实现了 Store 和 Indexer 接口的 cache 实例。这些实例可以用于存储、更新、删除、查询对象,并根据索引器进行索引操作。

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}

// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String

// Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc

// Indices maps a name to an Index
type Indices map[string]Index

Index类型实例对象的名称为索引名称,其map的key为索引值,其value为存储键集合。每一个键值对元素都表示符合该索引值的所有存储对象;那么Index类型实例对象就表示了此索引名称下所有的索引值,以及不同索引值下的存储对象。

所有的存储对象都存在于缓存中,按照不同索引名称代表的索引方法进行计算得到不同的Index实例对象。Indices可以理解为多叉树,树的每一个分支都是一个Index实例对象,也就代表着一种索引方法建立的索引结果集。可以理解为Indices代表的是对缓存中所有存储对象按照不同的索引方法进行分类罢了。

Indexers中维护了索引名称与建立该索引名称对应函数方法的映射关系。

再看threadSafeMap关于Index的实现

// Index returns a list of items that match the given object on the index function.
// Index is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()

indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}

indexedValues, err := indexFunc(obj)
if err != nil {
return nil, err
}
index := c.indices[indexName]

var storeKeySet sets.String
if len(indexedValues) == 1 {
// In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here.
storeKeySet = index[indexedValues[0]]
} else {
// Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen.
storeKeySet = sets.String{}
for _, indexedValue := range indexedValues {
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
}

list := make([]interface{}, 0, storeKeySet.Len())
for storeKey := range storeKeySet {
list = append(list, c.items[storeKey])
}
return list, nil
}

此方法执行流程

  • 会先从indexers查找到此IndexName对应的索引函数,使用此索引函数计算出obj的索引值。
  • 从indices中取出该IndexName对应的index实例对象,即按照indexName代表索引方法建立的索引结果。
  • 最后经过聚合,返回符合该索引值的所有对象。

DeltaFIFO

讲解DeltaFIFO,自然要先从接口Queue开始。查看定义:

// Queue extends Store with a collection of Store keys to "process".
// Every Add, Update, or Delete may put the object's key in that collection.
// A Queue has a way to derive the corresponding key given an accumulator.
// A Queue can be accessed concurrently from multiple goroutines.
// A Queue can be "closed", after which Pop operations return an error.
type Queue interface {
Store

// Pop blocks until there is at least one key to process or the
// Queue is closed. In the latter case Pop returns with an error.
// In the former case Pop atomically picks one key to process,
// removes that (key, accumulator) association from the Store, and
// processes the accumulator. Pop returns the accumulator that
// was processed and the result of processing. The PopProcessFunc
// may return an ErrRequeue{inner} and in this case Pop will (a)
// return that (key, accumulator) association to the Queue as part
// of the atomic processing and (b) return the inner error from
// Pop.
Pop(PopProcessFunc) (interface{}, error)

// AddIfNotPresent puts the given accumulator into the Queue (in
// association with the accumulator's key) if and only if that key
// is not already associated with a non-empty accumulator.
AddIfNotPresent(interface{}) error

// HasSynced returns true if the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, AddIfNotPresent,
// Update, or Delete; otherwise the first batch is empty.
HasSynced() bool

// Close the queue
Close()
}

Queue接口扩展了Store接口。在Store接口中定义了存储和处理的通用方法方法,具体描述可以参考前面关于Indexer。这里重点讲述Pop方法,它是理解后续内容的关键。

        // Pop方法在队列中没有元素时会一直阻塞。或者当Pop操作返回错误时,结束阻塞
// 当队列中存在元素时,Pop方法会从取出对象的存储键、然后将对象从缓存中删除、最后进行处理。
// 在上述处理过程中都属于原子操作
// 由于对象是从Store中取出并删除缓存后,再进行处理。
// 所以当处理失败时,处理函数可能会返回ErrRequeue类型错误,此时需要对此对象重新入队,等待后续处理
// 对元素执行处理逻辑的定义在PopProcessFunc中
Pop(PopProcessFunc) (interface{}, error)

在Pop方法中,入参为PopProcessFunc类型的方法,此方法中定义了对DeltaFIFO中弹出元素的处理逻辑。

// PopProcessFunc is passed to Pop() method of Queue interface.
// It is supposed to process the accumulator popped from the queue.
type PopProcessFunc func(interface{}) error

为了更好理解DeltaFIFO,这里先对FIFO结构体进行讲解。DeltaFIFO和FIFO结构体都实现了Queue接口,先明白FIFO后,再明白DeltaFIFO就会比较容易。

// FIFO is a Queue in which (a) each accumulator is simply the most
// recently provided object and (b) the collection of keys to process
// is a FIFO. The accumulators all start out empty, and deleting an
// object from its accumulator empties the accumulator. The Resync
// operation is a no-op.
//
// Thus: if multiple adds/updates of a single object happen while that
// object's key is in the queue before it has been processed then it
// will only be processed once, and when it is processed the most
// recent version will be processed. This can't be done with a channel
//
// FIFO solves this use case:
// - You want to process every object (exactly) once.
// - You want to process the most recent version of the object when you process it.
// - You do not want to process deleted objects, they should be removed from the queue.
// - You do not want to periodically reprocess objects.
//
// Compare with DeltaFIFO for other use cases.
type FIFO struct {
lock sync.RWMutex
cond sync.Cond

// 在Store接口中提到,Store接口的实现中都会维护这样一个map,键为存储键,值为存储对象
items map[string]interface{}
// queue决定了存储对象被处理的顺序
queue []string

// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// 记录当Replace()方法被调用时队列中的对象数量
initialPopulationCount int

// 用于在对象存入缓存中时生成对象的存储键
keyFunc KeyFunc

// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRUD operations.
closed bool
}

由FIFO的定义可知,FIFO存储的数据类型为interface{}, 可以存储任意类型的存储对象。其Pop实现

// Pop waits until an item is ready and processes it. If multiple items are
// ready, they are returned in the order in which they were added/updated.
// The item is removed from the queue (and the store) before it is processed,
// so if you don't successfully process it, it should be added back with
// AddIfNotPresent(). process function is called under lock, so it is safe
// update data structures in it that need to be in sync with the queue.
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed
}

f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}

在Pop方法中,维持了一个for循环。此循环在队列中元素个数为0时会调用sync.Cond的wait()方法进行阻塞等待。在进入Pop方法中时,首先会调用sync.RWMutex的lock方法进行上锁,保证了整个操作的原子性。在循环中,主要有以下几个步骤:

  • 检查队列queue中元素个数,当元素数量为0时阻塞等待
  • 获取队首存储键,并执行出队操作
  • 如果initialPopulationCount大于0,表示replace()方法初次调用,对initialPopulationCount减一操作
  • 通过存储键从缓存items中取出存储对象
  • 调用Pop函数的入参process方法进行处理存储对象
  • 校验处理结果,若处理错误类型为ErrRequeue,则经过参数校验后对该存储对象再次入队

有了FIFO的铺垫之后,再查看Queue的另一个实现:DeltaFIFO。

由于DeltaFIFO队列中的存储对象是Deltas,这里先看Deltas的定义

// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string

// Change type definition
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// Replaced is emitted when we encountered watch errors and had to do a
// relist. We don't know if the replaced object has changed.
//
// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
// as well. Hence, Replaced is only emitted when the option
// EmitDeltaTypeReplaced is true.
Replaced DeltaType = "Replaced"
// Sync is for synthetic events during a periodic resync.
Sync DeltaType = "Sync"
)

type Delta struct {
Type DeltaType
Object interface{}
}

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta

由上可知,Delta结构体包含两个成员变量,表示存储对象在经过变动类型为Type的一次变更事件后,其状态变更为Object(delete类型不在此范围内)。Deltas数组包含了同一个存储对象在多次变动事件中的变化历程。下标为0的Delta表示了存储对象最老的那个变动状态,最后一个元素表示了存储对象的最新状态。

再查看DeltaFIFO定义

type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond

// `items` maps a key to a Deltas.
// Each such Deltas has at least one Delta.
items map[string]Deltas

// `queue` maintains FIFO order of keys for consumption in Pop().
// There are no duplicates in `queue`.
// A key is in `queue` if and only if it is in `items`.
queue []string

// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update/AddIfNotPresent was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int

// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc

// knownObjects list keys that are "known" --- affecting Delete(),
// Replace(), and Resync()
knownObjects KeyListerGetter

// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRUD operations.
closed bool

// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool
}

可以发现,DeltaFIFO相对于FIFO而言,其存储对象类型由interface{}变化成为了Deltas。

initialPopulationCount则是记录了当第一次调用Replace()方法时缓存中的元素个数;knownObjects对象后续讲解。

DeltaFIFO的Pop函数

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed
}

f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
delete(f.items, id)
// Only log traces if the queue depth is greater than 10 and it takes more than
// 100 milliseconds to process one item from the queue.
// Queue depth never goes high because processing an item is locking the queue,
// and new items can't be added until processing finish.
// https://github.com/kubernetes/kubernetes/issues/103789
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
}
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}

由函数实现可知,DeltaFIFO的Pop函数与FIFO的Pop函数实现相类似,且Deltas对象弹出后执行的处理逻辑也是在PopProcessFunc定义。唯一不同的地方在于不同的是,当queue中元素数量超过10时会进行log记录。