



Store Interface

// Store is a generic object storage and processing interface.  A
// Store holds a map from string keys to accumulators, and has
// operations to add, update, and delete a given object to/from the
// accumulator currently associated with a given key. A Store also
// knows how to extract the key from a given object, so many operations
// are given only the object.
//
// In the simplest Store implementations each accumulator is simply
// the last given object, or empty after Delete, and thus the Store's
// behavior is simple storage.
//
// Reflector knows how to watch a server and update a Store. This
// package provides a variety of implementations of Store.
type Store interface {

	// Add adds the given object to the accumulator associated with the given object's key
	Add(obj interface{}) error

	// Update updates the given object in the accumulator associated with the given object's key
	Update(obj interface{}) error

	// Delete deletes the given object from the accumulator associated with the given object's key
	Delete(obj interface{}) error

	// List returns a list of all the currently non-empty accumulators
	List() []interface{}

	// ListKeys returns a list of all the keys currently associated with non-empty accumulators
	ListKeys() []string

	// Get returns the accumulator associated with the given object's key.
	// exists reports whether such an accumulator was found.
	Get(obj interface{}) (item interface{}, exists bool, err error)

	// GetByKey returns the accumulator associated with the given key.
	// exists reports whether such an accumulator was found.
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	// NOTE(review): the meaning of the unnamed string argument is not visible
	// here (presumably a version identifier for the list) — confirm against
	// the implementations.
	Replace([]interface{}, string) error

	// Resync is meaningless in the terms appearing here but has
	// meaning in some implementations that have non-trivial
	// additional behavior (e.g., DeltaFIFO).
	Resync() error
}




type (
	// ExplicitKey is a string-based type declared alongside KeyFunc.
	// NOTE(review): presumably it carries a storage key that is already
	// known, so a KeyFunc can return it as-is — confirm against the KeyFunc
	// implementations, which are not visible here.
	ExplicitKey string

	// KeyFunc derives the storage key for an object. Implementations should
	// be deterministic.
	KeyFunc func(obj interface{}) (string, error)
)


Indexer Interface


// Indexer extends Store with multiple indices and restricts each
// accumulator to simply hold the current object (and be empty after
// Delete).
// There are three kinds of strings here:
// 1. a storage key, as defined in the Store interface,
// 2. a name of an index, and
// 3. an "indexed value", which is produced by an IndexFunc and
// can be a field value or any other string computed from the object.
type Indexer interface {
// Index returns the stored objects whose set of indexed values
// intersects the set of indexed values of the given object, for
// the named index
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexName string) []string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers

// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error




Indexer 接口包含以下方法:

  • Index(indexName string, obj interface{}) ([]interface{}, error):针对指定的索引名称,返回其索引值集合与给定对象的索引值集合存在交集的所有存储对象。
  • IndexKeys(indexName, indexedValue string) ([]string, error):返回存储对象的存储键,这些对象的指定索引名称的索引值集包含给定的索引值。
  • ListIndexFuncValues(indexName string) []string:返回给定索引的所有索引值。
  • ByIndex(indexName, indexedValue string) ([]interface{}, error):返回指定索引名称的索引值集包含给定索引值的存储对象。
  • GetIndexers() Indexers:返回索引器。
  • AddIndexers(newIndexers Indexers) error:向该存储中添加更多索引器。如果在已经有数据的存储中调用此方法,结果是未定义的。
  • 补充说明:IndexFunc 并不是 Indexer 接口的方法,而是一个函数类型。该函数接受一个对象,并返回由该对象计算出的一组索引值。可以注册多个 IndexFunc,从而按不同维度对同一对象建立索引。





cache 结构体实现了 Store 和 Indexer 接口。它使用 ThreadSafeStore 作为缓存存储,确保线程安全。keyFunc 用于从对象中生成键。

NewStore 和 NewIndexer 分别返回实现了 Store 和 Indexer 接口的 cache 实例。这些实例可以用于存储、更新、删除、查询对象,并根据索引器进行索引操作。

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}

// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
type (
	// Index maps an indexed value to the set of storage keys whose objects
	// produce that value.
	Index map[string]sets.String

	// Indexers maps an index name to the IndexFunc that computes it.
	Indexers map[string]IndexFunc

	// Indices maps an index name to the Index built for it.
	Indices map[string]Index
)





// Index returns a list of items that match the given object on the index function.
// Index is thread-safe so long as you treat all items as immutable.
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
defer c.lock.RUnlock()

indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)

indexedValues, err := indexFunc(obj)
if err != nil {
return nil, err
index := c.indices[indexName]

var storeKeySet sets.String
if len(indexedValues) == 1 {
// In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here.
storeKeySet = index[indexedValues[0]]
} else {
// Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen.
storeKeySet = sets.String{}
for _, indexedValue := range indexedValues {
for key := range index[indexedValue] {

list := make([]interface{}, 0, storeKeySet.Len())
for storeKey := range storeKeySet {
list = append(list, c.items[storeKey])
return list, nil


  • 会先从indexers查找到此IndexName对应的索引函数,使用此索引函数计算出obj的索引值。
  • 从indices中取出该IndexName对应的index实例对象,即按照indexName代表索引方法建立的索引结果。
  • 最后经过聚合,返回符合该索引值的所有对象。



// Queue extends Store with a collection of Store keys to "process".
// Every Add, Update, or Delete may put the object's key in that collection.
// A Queue has a way to derive the corresponding key given an accumulator.
// A Queue can be accessed concurrently from multiple goroutines.
// A Queue can be "closed", after which Pop operations return an error.
type Queue interface {

// Pop blocks until there is at least one key to process or the
// Queue is closed. In the latter case Pop returns with an error.
// In the former case Pop atomically picks one key to process,
// removes that (key, accumulator) association from the Store, and
// processes the accumulator. Pop returns the accumulator that
// was processed and the result of processing. The PopProcessFunc
// may return an ErrRequeue{inner} and in this case Pop will (a)
// return that (key, accumulator) association to the Queue as part
// of the atomic processing and (b) return the inner error from
// Pop.
Pop(PopProcessFunc) (interface{}, error)

// AddIfNotPresent puts the given accumulator into the Queue (in
// association with the accumulator's key) if and only if that key
// is not already associated with a non-empty accumulator.
AddIfNotPresent(interface{}) error

// HasSynced returns true if the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, AddIfNotPresent,
// Update, or Delete; otherwise the first batch is empty.
HasSynced() bool

// Close the queue


        // Pop blocks for as long as the queue has no elements; the block ends when
// Pop returns an error (per the Queue contract, that is when the queue is closed).
// When the queue has elements, Pop takes the storage key of an object, removes
// that object from the cache, and finally processes it.
// All of these steps together form one atomic operation.
// Because the object is taken out of the Store and removed from the cache
// before it is processed, a failing process function may return an error of
// type ErrRequeue; in that case the object is put back in the queue to wait
// for later processing.
// The processing logic applied to the element is defined by the PopProcessFunc.
Pop(PopProcessFunc) (interface{}, error)


// PopProcessFunc is the callback handed to the Pop() method of the Queue
// interface. It receives the accumulator that was just popped from the queue
// and is expected to process it, returning any processing error.
type PopProcessFunc func(obj interface{}) error


// FIFO is a Queue in which (a) each accumulator is simply the most
// recently provided object and (b) the collection of keys to process
// is a FIFO. The accumulators all start out empty, and deleting an
// object from its accumulator empties the accumulator. The Resync
// operation is a no-op.
// Thus: if multiple adds/updates of a single object happen while that
// object's key is in the queue before it has been processed then it
// will only be processed once, and when it is processed the most
// recent version will be processed. This can't be done with a channel
// FIFO solves this use case:
// - You want to process every object (exactly) once.
// - You want to process the most recent version of the object when you process it.
// - You do not want to process deleted objects, they should be removed from the queue.
// - You do not want to periodically reprocess objects.
// Compare with DeltaFIFO for other use cases.
type FIFO struct {
lock sync.RWMutex
cond sync.Cond

// 在Store接口中提到,Store接口的实现中都会维护这样一个map,键为存储键,值为存储对象
items map[string]interface{}
// queue决定了存储对象被处理的顺序
queue []string

// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// 记录当Replace()方法被调用时队列中的对象数量
initialPopulationCount int

// 用于在对象存入缓存中时生成对象的存储键
keyFunc KeyFunc

// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRUD operations.
closed bool

由FIFO的定义可知,FIFO存储的数据类型为interface{},可以存储任意类型的存储对象。其Pop方法的实现如下:

// Pop waits until an item is ready and processes it. If multiple items are
// ready, they are returned in the order in which they were added/updated.
// The item is removed from the queue (and the store) before it is processed,
// so if you don't successfully process it, it should be added back with
// AddIfNotPresent(). process function is called under lock, so it is safe
// update data structures in it that need to be in sync with the queue.
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed

id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
return item, err


  • 检查队列queue中元素个数,当元素数量为0时阻塞等待
  • 获取队首存储键,并执行出队操作
  • 如果initialPopulationCount大于0,说明首次调用Replace()时写入的那批对象尚未全部弹出,此时对initialPopulationCount执行减一操作
  • 通过存储键从缓存items中取出存储对象
  • 调用Pop函数的入参process方法进行处理存储对象
  • 校验处理结果,若处理错误类型为ErrRequeue,则经过参数校验后对该存储对象再次入队



// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string

// Change type definition
const (
	// Added marks an addition.
	Added DeltaType = "Added"
	// Updated marks an update.
	Updated DeltaType = "Updated"
	// Deleted marks a deletion.
	Deleted DeltaType = "Deleted"
	// Replaced is emitted when we encountered watch errors and had to do a
	// relist. We don't know if the replaced object has changed.
	//
	// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
	// as well. Hence, Replaced is only emitted when the option
	// EmitDeltaTypeReplaced is true.
	Replaced DeltaType = "Replaced"
	// Sync is for synthetic events during a periodic resync.
	Sync DeltaType = "Sync"
)

type Delta struct {
Type DeltaType
Object interface{}

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta



type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond

// `items` maps a key to a Deltas.
// Each such Deltas has at least one Delta.
items map[string]Deltas

// `queue` maintains FIFO order of keys for consumption in Pop().
// There are no duplicates in `queue`.
// A key is in `queue` if and only if it is in `items`.
queue []string

// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update/AddIfNotPresent was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int

// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc

// knownObjects list keys that are "known" --- affecting Delete(),
// Replace(), and Resync()
knownObjects KeyListerGetter

// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRUD operations.
closed bool

// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
// DeltaType when Replace() is called (to preserve backwards compat).
emitDeltaTypeReplaced bool




func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.closed {
return nil, ErrFIFOClosed

id := f.queue[0]
f.queue = f.queue[1:]
depth := len(f.queue)
if f.initialPopulationCount > 0 {
item, ok := f.items[id]
if !ok {
// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
delete(f.items, id)
// Only log traces if the queue depth is greater than 10 and it takes more than
// 100 milliseconds to process one item from the queue.
// Queue depth never goes high because processing an item is locking the queue,
// and new items can't be added until processing finish.
// https://github.com/kubernetes/kubernetes/issues/103789
if depth > 10 {
trace := utiltrace.New("DeltaFIFO Pop Process",
utiltrace.Field{Key: "ID", Value: id},
utiltrace.Field{Key: "Depth", Value: depth},
utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
defer trace.LogIfLong(100 * time.Millisecond)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
