02.workqueue
目录
├── default_rate_limiters.go
├── default_rate_limiters_test.go
├── delaying_queue.go
├── delaying_queue_test.go
├── doc.go
├── main_test.go
├── metrics.go
├── metrics_test.go
├── parallelizer.go
├── parallelizer_test.go
├── queue.go
├── queue_test.go
├── rate_limiting_queue.go
└── rate_limiting_queue_test.go
包功能
workqueue 是 Kubernetes 源码中的一个包,它为并发处理任务提供了一个简单的队列实现。workqueue 是一种典型的生产者-消费者模式,允许多个生产者向队列中添加任务,同时多个消费者从队列中获取任务并进行处理。
在 Kubernetes 中,workqueue 主要用于控制器(controller)来处理来自 API 服务器的事件。例如,当一个资源发生变更时,控制器需要对这个变更作出响应。为了实现这一目标,控制器通常使用 workqueue 来缓存并处理这些事件。
workqueue 的主要优点:
- 有序处理:workqueue 可以保证按照先进先出(FIFO)的顺序处理任务,从而确保事件按照顺序处理。
- 去重处理:workqueue 支持对任务进行去重,确保同一任务不会被多次处理。
- 限速和重试:workqueue 提供了限速功能,以防止在短时间内过多地处理任务。同时,它还支持自动重试失败的任务。
- 延时处理:workqueue 支持延时处理任务,可以将任务推迟到未来的某个时间点再执行。
workqueue 包中提供了以下几种队列类型:
- SimpleQueue:一个简单的、无延时的队列。
- DelayingQueue:一个支持延时处理任务的队列。
- RateLimitingQueue:一个支持限速和延时处理任务的队列。
- PriorityQueue:一个支持根据任务优先级处理任务的队列。
在 Kubernetes 的控制器中,通常会使用 RateLimitingQueue 类型的队列,因为它提供了限速和重试机制。这些功能有助于防止控制器在处理任务时发生过载和错误。
queue.go
type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}
定义 Interface,它是 workqueue 的主要接口,包括以下方法:
- Add(item interface{}):添加任务到队列。
- Len() int:返回队列中的任务数量。
- Get() (item interface{}, shutdown bool):从队列中获取一个任务,如果 shutdown 为 true,则表示队列已关闭,不应再添加任务。
- Done(item interface{}):表示任务已完成处理,可以从队列中移除。
- ShutDown():关闭队列,不再接受新任务。
- ShuttingDown() bool:检查队列是否正在关闭。
New() 和 NewNamed(name string) 函数:创建一个新的队列实例。
newQueue() 函数:初始化并返回一个新的队列实例。
这个 Type struct 实现 了 Interface
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t
// dirty defines all of the items that need to be processed.
dirty set
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
cond *sync.Cond
shuttingDown bool
metrics queueMetrics
unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}
Type 结构体:表示队列的主要结构。包含以下字段:
- queue:一个数组,按照先进先出(FIFO)的顺序保存任务。
- dirty:一个集合(set),包含所有需要处理的任务。
- processing:一个集合,包含正在处理的任务。
- cond:一个条件变量,用于协调多个 goroutine 对队列的访问。
- shuttingDown:一个布尔值,表示队列是否正在关闭。
- metrics:一个接口,用于收集队列的度量数据。
- unfinishedWorkUpdatePeriod:一个时间间隔,表示更新未完成任务的频率。
- clock:一个时钟接口,用于处理时间相关操作。
在上面提供的代码中,我们已经了解了 workqueue 的核心数据结构。现在,我们来深入了解一下 workqueue 的一些关键部分。
添加任务:当调用 Add() 方法时,代码会首先检查 shuttingDown 标志以确定队列是否已关闭。如果队列已关闭,则不会添加任务。接下来,代码会检查任务是否已存在于 dirty 集合中,如果存在,则不会再次添加。然后,如果任务未在 processing 集合中,会将其添加到 queue 数组中。最后,会调用 cond.Signal() 通知其他等待中的 goroutines。
获取任务:Get() 方法用于从队列中获取任务。在获取任务之前,代码会等待直到 queue 中有任务可用。当 queue 为空且 shuttingDown 为 true 时,Get() 会返回 shutdown 为 true,表示不应再处理任务。当有任务可用时,会将任务从 queue 中移除,并将其添加到 processing 集合中。
完成任务:当任务处理完成后,需要调用 Done() 方法。Done() 方法会将任务从 processing 集合中移除。如果任务仍存在于 dirty 集合中,表示在处理过程中任务已被重新添加,因此需要将任务再次添加到 queue 中以进行重新处理。
关闭队列:ShutDown() 方法会将 shuttingDown 标志设置为 true,并通过 cond.Broadcast() 唤醒所有等待中的 goroutines。在关闭队列后,workqueue 不再接受新任务。通过 ShuttingDown() 方法可以检查队列是否正在关闭。
delaying_queue.go
delaying_queue.go 文件是 Kubernetes 源码中 workqueue 包的一部分。这个文件实现了一个具有延时功能的队列,称为 DelayingQueue。DelayingQueue 可以用于延时处理任务,即将任务推迟到未来的某个时间点再执行。下面是该文件中代码的解析:
- 定义了一个名为 DelayingInterface 的接口,它继承自 Interface 并添加了一个 AddAfter 方法,用于在指定的延迟时间后将项目添加到队列中。
- 实现了一个名为 DelayingQueue 的结构,它实现了 DelayingInterface。它包含一个优先级队列,用于根据到期时间对任务进行排序,以及一个内部的 Interface 实例,用于在任务到期后将其添加到工作队列中。
- 提供了一些创建 DelayingQueue 实例的工厂方法,如 NewDelayingQueue、NewNamedDelayingQueue 和 NewDelayingQueueWithCustomClock。
- 定义了 waitingLoop 方法,该方法在一个单独的 Goroutine 中运行,负责监控待处理任务,根据它们的到期时间将其添加到工作队列中。
- 实现了 AddAfter 方法,该方法将任务添加到延迟队列,等待指定的延迟时间后再将其添加到工作队列中。
- 实现了一些辅助方法和类型,如 waitFor 和 waitForPriorityQueue,以支持延迟队列的功能。
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}
接下来我将对一些关键的代码片段进行详细解释
- AddAfter 方法:这个方法用于在指定的延迟时间后将项目添加到队列中。如果延迟时间小于等于0,则项目立即被添加到工作队列中。否则,项目将被封装为一个 waitFor 结构体,并发送到 waitingForAddCh 通道。
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
if q.ShuttingDown() {
return
}
q.metrics.retry()
if duration <= 0 {
q.Add(item)
return
}
select {
case <-q.stopCh:
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
- waitingLoop 方法:这个方法在单独的 Goroutine 中运行,主要负责监控待处理任务,根据它们的到期时间将其添加到工作队列中。
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()
never := make(<-chan time.Time)
var nextReadyAtTimer clock.Timer
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}
for {
if q.Interface.ShuttingDown() {
return
}
now := q.clock.Now()
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
break
}
entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}
nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}
// 上面的代码中的select方法,满足心跳时间 或者 pop后的heap的第一个元素的时间已经到了 或者q.waitingForAddCh channel有数据,就进入下一次的for循环。
select {
case <-q.stopCh:
return
case <-q.heartbeat.C():
case <-nextReadyAt:
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}
waitingLoop 方法首先初始化一个名为 waitingForQueue 的优先级队列,用于存储待处理的任务。接下来,进入一个无限循环,不断地检查队列的头部元素,如果已经到期,则将其添加到工作队列中。此外,waitingLoop 还使用了一个名为 nextReadyAt
的计时器,用于设置等待下一个到期任务的时间。当任务到期时,计时器会触发,将到期的任务添加到工作队列中。
同时,waitingLoop 方法使用 select 语句监控多个通道:
- 如果收到 q.stopCh 通道的信号,表示队列已关闭,此时 waitingLoop 也需要退出。
- 如果收到 q.heartbeat.C() 通道的信号,表示心跳计时器到期,此时将继续循环,检查队列中是否有到期的任务需要添加到工作队列中。
- 如果收到 nextReadyAt 通道的信号,表示下一个任务已经到期,此时将继续循环,检查队列中是否有到期的任务需要添加到工作队列中。
- 如果收到 q.waitingForAddCh 通道的信号,表示有新任务需要加入到 waitingForQueue,此时会将任务添加到 waitingForQueue,并继续检查队列中是否有到期的任务需要添加到工作队列中。
在处理 q.waitingForAddCh 通道时,waitingLoop 方法还会尝试将通道中的所有待处理任务一次性添加到 waitingForQueue,以提高处理效率。这个过程称为“draining”。
waitingLoop方法,随着delayingType实例的创建而启动,并一直运行下去直到workqueue被shutdown。waitingLoop方法一直在做的事情就是 不停的将上面的 AddAfter方法 放进 q.waitingForAddCh channel的item取出来,根据item的时间是早于现在还是晚于现在,早于现在就加入工作队列,晚于现在就放到heap上。并不断的从heap pop出第一个item,检测item的到期时间,根据item的时间是早于现在还是晚于现在,早于现在就加入工作队列,晚于现在啥也不做,item继续保留在heap上。
insert方法往heap添加元素,分两种情况。若元素存在则update,若不存在,push该元素到heap中,并将入参的 knownEntries(即waitingLoop方法的waitingEntryByData) set集合添加该元素的值,为了保证不重复。
rate_limiting_queue.go
这段代码定义了一个名为 RateLimitingInterface 的接口,它扩展了 DelayingInterface 接口,为工作队列添加了速率限制功能。RateLimitingInterface 接口具有以下方法:
AddRateLimited(item interface{}):在速率限制器允许的情况下,将元素添加到工作队列。 Forget(item interface{}):表示一个元素已经完成重试。无论是因为永久失败还是成功,我们将停止速率限制器对它的跟踪。这个方法仅清除 rateLimiter,您仍然需要在队列上调用 Done 方法。 NumRequeues(item interface{}) int:返回元素被重新加入队列的次数。 NewRateLimitingQueue 和 NewNamedRateLimitingQueue 是两个构造函数,用于创建具有速率限制功能的工作队列。
rateLimitingType 结构体实现了 RateLimitingInterface,它包装了一个 DelayingInterface,并提供了速率限制的重新入队功能。
以下是 rateLimitingType 结构体的方法实现:
- AddRateLimited(item interface{}):基于速率限制器计算的时间间隔,使用 AddAfter 方法将元素添加到工作队列。
- NumRequeues(item interface{}) int:调用 rateLimiter.NumRequeues(item) 方法,返回元素被重新加入队列的次数。
- Forget(item interface{}):调用 rateLimiter.Forget(item) 方法,忘记对该元素的跟踪。这样可以确保在元素处理完成后,不再对其进行限速。
接下来,我们继续深入解释这段代码中的 RateLimiter 接口及其相关实现。
在这段代码中,RateLimiter 是一个接口,用于实现速率限制功能。它包含以下方法:
- When(item interface{}) time.Duration:根据 item 计算下一次允许入队的时间间隔。
- NumRequeues(item interface{}) int:返回 item 被重新入队的次数。
- Forget(item interface{}):清除对 item 的跟踪,不再对其进行限速。
- NewRateLimitingQueue 和 NewNamedRateLimitingQueue 是两个用于构造具有速率限制功能的工作队列的函数。它们接受一个 RateLimiter 实例作为参数,并返回一个实现了 RateLimitingInterface 的队列实例。
在实际使用中,您需要提供一个实现了 RateLimiter 接口的具体实例。Kubernetes 工作队列库中提供了一些现成的 RateLimiter 实现,例如 ItemExponentialFailureRateLimiter,它对失败的元素实施指数级退避策略,以减少对失败元素的尝试次数。
总结一下,这段代码提供了一个名为 RateLimitingInterface 的接口,它扩展了 DelayingInterface 接口,为工作队列提供了速率限制功能。通过使用 NewRateLimitingQueue 或 NewNamedRateLimitingQueue 函数,您可以创建一个具有速率限制功能的工作队列。为了实现速率限制功能,您需要提供一个实现了 RateLimiter 接口的具体实例。在处理元素时,根据 RateLimiter 的计算结果,队列会在适当的时间间隔后将元素重新添加到队列。当元素处理完成后,您需要调用 Forget 方法以停止对其进行限速。