跳到主要内容

08.indexer

什么是 informers

Kubernetes 控制器的重要作用是监视对象的期望状态和实际状态,然后发送指令以使系统实际状态与期望状态保持一致,这就需要控制器去频繁的实时获取系统中的信息,但是控制器如何获取对象的信息呢? 我们知道 Apiserver 能够提供集群的实时信息。但是,为获取有关资源的信息而进行的连续轮询可能会降低 Apiserver 的性能。为了随时了解这些事件何时被触发,client-go 提供了解决此问题的informer。informer查询资源数据并将其存储在本地缓存中。存储后,仅当检测到对象(或资源)状态的更改时,才会生成事件。

如果您对如何缓存信息感到困惑,这里有一个图表来解释该流程。

单个informer为自己创建一个本地缓存。但实际上,单个资源可以由多个控制器监视。如果每个控制器为自己创建一个缓存,则存在同步问题,因为多个控制器在自己的缓存上有一个监视。 client-go 提供了一个共享告密者,用于在所有控制器之间共享缓存。每个内置的 Kubernetes 资源都有一个 Informer。

informer 机制有三个部分:

  • Reflector :监视特定资源(如某些 CRD),并将 Added 、 Updated 和 Deleted 等事件放入本地缓存 DeltaFIFO 中
  • DeltaFIFO : 用于存储相关资源事件的 FIFO 队列
  • Indexer : 它是 client-go 实现的本地存储,与 etcd 保持一致,减轻了 API 服务器和 etcd 的压力。

Indexer

Indexer 是 Informer 中一部分,本节我们重点讨论 Indexer 这个组件原理以及源码。Indexer 和数据库类似索引类似,索引构建在存储之上,用来加速查询。 就本质上来说,Index 就是资源对象作为某一个维度的索引值和相关对象 id 集合的对应关系 下面我们来看下什么是 Indexer:

// Indexer extends Store with multiple indices and restricts each
// accumulator to simply hold the current object (and be empty after
// Delete).
//
// There are three kinds of strings here:
// 1. a storage key, as defined in the Store interface,
// 2. a name of an index, and
// 3. an "indexed value", which is produced by an IndexFunc and
// can be a field value or any other string computed from the object.
type Indexer interface {
Store
// Index returns the stored objects whose set of indexed values
// intersects the set of indexed values of the given object, for
// the named index
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexName string) []string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexers return the indexers
GetIndexers() Indexers

// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexers Indexers) error
}

我们看到 Indexer 其实是一个接口的名字,该接口组合了 Store 接口(负责存储数据)。在client-go中实现 Indexer 接口的结构体叫 cache ,我们来看下 cache 结构体的定义:

// `*cache` implements Indexer in terms of a ThreadSafeStore and an
// associated KeyFunc.
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFunc KeyFunc
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}

// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Add(key, obj)
return nil
}

当我们调用 Indexer 接口的Add方法时,我们首先调用 KeyFunc 来获取对象的 Key(通常是namespace/name) ,然后会调用 c.cacheStorage.Add(key, obj) 方法,该方法底层会先把对象存储到内存,然后重建索引,我们主要着重看下重建索引的部分,这也是网上大部分文章在分析的重点,:

// storeIndex implements the indexing functionality for Store interface
type storeIndex struct {
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}


// Index maps the indexed value to a set of keys in the store that match on that value
type Index map[string]sets.String

// Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc
type IndexFunc func(obj interface{}) ([]string, error)

// Indices maps a name to an Index
type Indices map[string]Index


func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
var oldIndexValues, indexValues []string
var err error
for name, indexFunc := range i.indexers {
if oldObj != nil {
oldIndexValues, err = indexFunc(oldObj)
} else {
oldIndexValues = oldIndexValues[:0]
}
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}

if newObj != nil {
indexValues, err = indexFunc(newObj)
} else {
indexValues = indexValues[:0]
}
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}

index := i.indices[name]
if index == nil {
index = Index{}
i.indices[name] = index
}

if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
// We optimize for the most common case where indexFunc returns a single value which has not been changed
continue
}

for _, value := range oldIndexValues {
i.deleteKeyFromIndex(key, value, index)
}
for _, value := range indexValues {
i.addKeyToIndex(key, value, index)
}
}

上面代码中比较重要的是storeIndex,这个结构体有两个成员: Indexers 和 Indices 。Indexers 负责存储索引方法映射关系,在执行NewIndexer 构造函数时候会传 Indexers。 Indices 负责存储 索引和对象的映射关系,方便日后查询,我们来看下面一段具体示例:

    index := NewIndexer(MetaNamespaceKeyFunc, Indexers{"byUser": testUsersIndexFunc})

pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}

index.Add(pod1)
index.Add(pod2)
index.Add(pod3)

indexResults, err := index.ByIndex("byUser", "ernie")
fmt.Println(indexResults)

一图胜千言:

结构

Indexers包含了所有索引器(索引分类)及其索引器函数IndexFunc,IndexFunc为计算某个索引键下的所有对象键列表的方法;

Indexers: {  
"索引器1": 索引函数1,
"索引器2": 索引函数2,
}

数据示例:

Indexers: {  
"namespace": MetaNamespaceIndexFunc,
"nodeName": NodeNameIndexFunc,
}
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}

func NodeNameIndexFunc(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{""}, fmt.Errorf("object is not a pod)
}
return []string{pod.Spec.NodeName}, nil
}

Indices包含了所有索引器(索引分类)及其所有的索引数据Index;而Index则包含了索引键以及索引键下的所有对象键的列表;

Indices: {
"索引器1": {
"索引键1": ["对象键1", "对象键2"],
"索引键2": ["对象键3"],
},
"索引器2": {
"索引键3": ["对象键1"],
"索引键4": ["对象键2", "对象键3"],
}
}

pod1 := &v1.Pod {
ObjectMeta: metav1.ObjectMeta {
Name: "pod-1",
Namespace: "default",
},
Spec: v1.PodSpec{
NodeName: "node1",
}
}

pod2 := &v1.Pod {
ObjectMeta: metav1.ObjectMeta {
Name: "pod-2",
Namespace: "default",
},
Spec: v1.PodSpec{
NodeName: "node2",
}
}

pod3 := &v1.Pod {
ObjectMeta: metav1.ObjectMeta {
Name: "pod-3",
Namespace: "kube-system",
},
Spec: v1.PodSpec{
NodeName: "node2",
}
}

Indices: {
"namespace": {
"default": ["pod-1", "pod-2"],
"kube-system": ["pod-3"],
},
"nodeName": {
"node1": ["pod-1"],
"node2": ["pod-2", "pod-3"],
}
}

函数

一开始提到Indexer interface,除了继承的Store外,其他的几个方法声明均与索引功能相关,下面对几个常用方法进行介绍。

// staging/src/k8s.io/client-go/tools/cache/index.go
type Indexer interface {
Store

Index(indexName string, obj interface{}) ([]interface{}, error)

IndexKeys(indexName, indexedValue string) ([]string, error)

ListIndexFuncValues(indexName string) []string

ByIndex(indexName, indexedValue string) ([]interface{}, error)

GetIndexers() Indexers

AddIndexers(newIndexers Indexers) error
}

下面的方法介绍基于以下数据:

Indexers: {  
"namespace": MetaNamespaceIndexFunc,
"nodeName": NodeNameIndexFunc,
}

Indices: {
"namespace": {
"default": ["pod-1", "pod-2"],
"kube-system": ["pod-3"],
},
"nodeName": {
"node1": ["pod-1"],
"node2": ["pod-2", "pod-3"],
}
}

ByIndex(indexName, indexedValue string) ([]interface{}, error)

调用ByIndex方法,传入索引器名称indexName,以及索引键名称indexedValue,方法寻找该索引器下,索引键对应的对象键列表,然后根据对象键列表,到Indexer缓存(即threadSafeMap中的items属性)中获取出相应的对象列表。

// staging/src/k8s.io/client-go/tools/cache/store.go
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error) {
return c.cacheStorage.ByIndex(indexName, indexKey)
}

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()

indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}

index := c.indices[indexName]

set := index[indexKey]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}

return list, nil
}

使用示例:

pods, err := index.ByIndex("namespace", "default")
if err != nil {
panic(err)
}
for _, pod := range pods {
fmt.Println(pod.(*v1.Pod).Name)
}

fmt.Println("=====")

pods, err := index.ByIndex("nodename", "node1")
if err != nil {
panic(err)
}
for _, pod := range pods {
fmt.Println(pod.(*v1.Pod).Name)
}

输出:

pod-1
pod-2
=====
pod-1

IndexKeys(indexName, indexedValue string) ([]string, error)

IndexKeys方法与ByIndex方法类似,只不过只返回对象键列表,不会根据对象键列表,到Indexer缓存(即threadSafeMap中的items属性)中获取出相应的对象列表。

// staging/src/k8s.io/client-go/tools/cache/store.go
func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error) {
return c.cacheStorage.IndexKeys(indexName, indexKey)
}

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go
func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
c.lock.RLock()
defer c.lock.RUnlock()

indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}

index := c.indices[indexName]

set := index[indexKey]
return set.List(), nil
}

AddIndexers(newIndexers Indexers) error

添加更多的索引器函数

总结

ndexer中有informer维护的指定资源对象的相对于etcd数据的一份本地内存缓存,可通过该缓存获取资源对象,以减少对apiserver、对etcd的请求压力。

informer所维护的缓存依赖于threadSafeMap结构体中的items属性,其本质上是一个用map构建的键值对,资源对象都存在items这个map中,key为资源对象的namespace/name组成,value为资源对象本身,这些构成了informer的本地缓存。

Indexer除了维护了一份本地内存缓存外,还有一个很重要的功能,便是索引功能了。索引的目的就是为了快速查找,比如我们需要查找某个node节点上的所有pod、查找某个命名空间下的所有pod等,利用到索引,可以实现快速查找。关于索引功能,则依赖于threadSafeMap结构体中的indexers与indices属性