跳到主要内容

04.refactor

介绍

我们接下来看看 refactor.go 这个文件

Reflector 是保证 Informer 可靠性的核心组件,在丢失事件,收到异常事件,处理事件失败等多种异常情况下需要考虑的细节很多。单独的listwatcher缺少重新连接和重新同步机制,有可能出现数据不一致问题。其对事件响应是同步的,如果执行复杂的操作会引起阻塞,需要引入队列。

Reflector可以成为反射器,将etcd中的数据反射到存储(DeltaFIFO)中。Reflector通过其内部的List操作获取所有资源对象数据,保存到本地存储,之后Watch监视资源变化,触发对应事件处理,例如Add、Update、Delete等。

结构体

// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// 标识此 Reflector 的名称,默认为文件名和行号
name string

// 我们期望存储在存储中的类型的名称。如果提供了 expectedGVK,则名称将是其字符串化形式,否则为 expectedType 的字符串化形式。仅用于显示,不应用于解析或比较
expectedTypeName string
// 我们期望存储在存储中的类型的示例对象。只有 type 需要正确,但当类型为 unstructured.Unstructured 时,对象的 "apiVersion" 和 "kind" 也必须正确。
expectedType reflect.Type
// 如果对象类型为 unstructured,我们期望存储在存储中的对象的 GroupVersionKind
expectedGVK *schema.GroupVersionKind
// 与监视源同步的目标
store Store
// 用于执行列表和监视操作的对象。
listerWatcher ListerWatcher

// 管理 ListWatch 的退避
backoffManager wait.BackoffManager
// 管理与 ListAndWatch 的 Watch 调用的初始连接的退避。
initConnBackoffManager wait.BackoffManager

//重新同步周期。
resyncPeriod time.Duration
// 定期调用此函数,当返回 true 时,将调用 Store 的 Resync 操作。
ShouldResync func() bool
// 允许测试操纵时间的时钟对象。
clock clock.Clock
// 定义是否应为列表调用强制分页。根据初始列表调用的结果进行设置。
paginatedResult bool
// 上次同步时观察到的资源版本令牌,与底层存储同步,但是线程安全的
lastSyncResourceVersion string
// 如果上次使用 lastSyncResourceVersion 的列表或监视请求由于“过期”或“资源版本过大”错误而失败,则为 true
isLastSyncResourceVersionUnavailable bool
// 保护对 lastSyncResourceVersion 的读/写访问的互斥锁。
lastSyncResourceVersionMutex sync.RWMutex
// 初始和重新同步监视列表的请求块大小。如果未设置,对于一致性读取(RV = "")或选择任意旧数据的读取(RV = "0"),它将默认为 pager.PageSize,对于其他情况(RV != "" && RV != "0"),它将关闭分页以允许从监视缓存中提供服务。
WatchListPageSize int64
//当 ListAndWatch 因错误而中断连接时调用的函数
watchErrorHandler WatchErrorHandler

Run流程图

详细

// ResourceVersionUpdater is an interface that allows store implementation to
// track the current resource version of the reflector. This is especially
// important if storage bookmarks are enabled.
type ResourceVersionUpdater interface {
// UpdateResourceVersion is called each time current resource version of the reflector
// is updated.
UpdateResourceVersion(resourceVersion string)
}

ResourceVersionUpdater 接口:允许存储实现跟踪 reflector 的当前资源版本。当启用存储书签时,这尤为重要。接口定义了一个方法 UpdateResourceVersion,每次 reflector 的当前资源版本更新时都会调用它。

// The WatchErrorHandler is called whenever ListAndWatch drops the
// connection with an error. After calling this handler, the informer
// will backoff and retry.
//
// The default implementation looks at the error type and tries to log
// the error message at an appropriate level.
//
// Implementations of this handler may display the error message in other
// ways. Implementations should return quickly - any expensive processing
// should be offloaded.
type WatchErrorHandler func(r *Reflector, err error)

// DefaultWatchErrorHandler is the default implementation of WatchErrorHandler
func DefaultWatchErrorHandler(r *Reflector, err error) {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case err == io.EOF:
// watch closed normally
case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
}

WatchErrorHandler 类型:一个函数类型,当 ListAndWatch 由于错误而断开连接时调用。调用此处理程序后,informer 将退避并重试。默认实现根据错误类型尝试以适当的级别记录错误消息。此处理程序的实现可以以其他方式显示错误消息。实现应快速返回 - 任何昂贵的处理都应卸载。

DefaultWatchErrorHandler 函数:是 WatchErrorHandler 的默认实现。

// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
// The indexer is configured to key on namespace
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
return indexer, reflector
}

// NewReflector creates a new Reflector object which will keep the
// given store up to date with the server's contents for the given
// resource. Reflector promises to only put things in the store that
// have the type of expectedType, unless expectedType is nil. If
// resyncPeriod is non-zero, then the reflector will periodically
// consult its ShouldResync function to determine whether to invoke
// the Store's Resync operation; `ShouldResync==nil` means always
// "yes". This enables you to use reflectors to periodically process
// everything as well as incrementally processing the things that
// change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

NewNamespaceKeyedIndexerAndReflector 函数:创建一个配置为基于命名空间键的 Indexer 和一个 Reflector。

NewReflector 函数:创建一个新的 Reflector 对象,该对象将保持给定的存储与服务器的内容保持一致。Reflector 保证只将具有 expectedType 类型的对象放入存储,除非 expectedType 为 nil。如果 resyncPeriod 不为零,则 reflector 将定期调用其 ShouldResync 函数,以确定是否调用 Store 的 Resync 操作;ShouldResync == nil 表示始终为 "yes"。这使您能够使用 reflector 来定期处理所有内容以及增量处理发生更改的内容。

NewNamedReflector 函数:与 NewReflector 相同,但具有指定的用于记录的名称。

// Run repeatedly uses the reflector's ListAndWatch to fetch all the
// objects and subsequent deltas.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

这段代码定义了一个名为 Run 的方法,它属于 Reflector 结构体。该方法使用反射器的 ListAndWatch 方法重复获取所有对象及其后续增量。当 stopCh 被关闭时,Run 将退出。

以下是此方法的详细解释:

在开始运行 reflector 时,会使用 klog.V(3).Infof() 打印日志,记录 reflector 的开始、期望的类型名和反射器名称。

wait.BackoffUntil() 是一个带退避策略的无限循环,直到 stopCh 被关闭。该函数接受一个函数、一个退避管理器、一个布尔值(指示是否在启动时立即执行函数),以及一个表示停止信号的通道。在这个例子中,它会重复执行以下操作:

调用 r.ListAndWatch(stopCh),尝试获取对象及其后续增量。如果执行过程中发生错误,返回该错误。 调用 r.watchErrorHandler(r, err) 来处理 ListAndWatch 返回的错误。错误处理器可能会记录错误或采取其他措施。 当 stopCh 被关闭时,wait.BackoffUntil() 函数将退出,最后使用 klog.V(3).Infof() 打印日志,记录 reflector 停止的信息。

总之,Run 方法用于启动 reflector,它会持续运行,直到接收到停止信号。在运行过程中,它将尝试获取对象及其后续增量,并处理可能出现的错误

核心函数 ListAndWatch

这段代码实现了 Reflector 结构体的 ListAndWatch 方法,该方法首先列出所有对象及其资源版本,然后使用资源版本进行监听。以下是代码的主要部分及其功能:

ListAndWatch 方法接收一个 stopCh 通道作为参数,用于在需要停止的时候通知方法终止。

定义一个名为 resourceVersion 的变量,用于存储资源版本。

使用当前资源版本创建一个 metav1.ListOptions 对象。

使用一个匿名函数并立即执行来列出对象。在此过程中,会尝试分页列出对象,并处理可能出现的错误,例如资源版本不可用或过大。成功列出对象后,会提取对象和资源版本,然后将提取到的对象与存储中的数据同步。

创建一个 goroutine 来处理重新同步操作。当接收到重新同步信号或停止信号时,根据 r.ShouldResync 的值强制进行重新同步,并调用 r.store.Resync()。

创建一个无限循环来监听对象更改。在循环中,首先创建一个带有超时时间和允许使用书签的 ListOptions 对象。然后调用 r.listerWatcher.Watch(options) 进行监听。如果监听失败,根据错误类型采取不同的措施,例如重新连接、等待退避时间等。接着调用 r.watchHandler() 进行监听处理,如果监听处理失败,根据错误类型采取不同的措施,例如重新连接、等待退避时间等。

ListAndWatch 方法的主要目的是列出并监听对象的更改,以便在运行过程中同步数据并处理可能出现的错误。通过这种方式,Reflector 可以确保其缓存数据始终与 API 服务器上的数据保持一致。