跳到主要内容

02.workqueue

目录

├── default_rate_limiters.go
├── default_rate_limiters_test.go
├── delaying_queue.go
├── delaying_queue_test.go
├── doc.go
├── main_test.go
├── metrics.go
├── metrics_test.go
├── parallelizer.go
├── parallelizer_test.go
├── queue.go
├── queue_test.go
├── rate_limiting_queue.go
└── rate_limiting_queue_test.go

包功能

workqueue 是 Kubernetes 源码中的一个包,它为并发处理任务提供了一个简单的队列实现。workqueue 是一种典型的生产者-消费者模式,允许多个生产者向队列中添加任务,同时多个消费者从队列中获取任务并进行处理。

在 Kubernetes 中,workqueue 主要用于控制器(controller)来处理来自 API 服务器的事件。例如,当一个资源发生变更时,控制器需要对这个变更作出响应。为了实现这一目标,控制器通常使用 workqueue 来缓存并处理这些事件。

workqueue 的主要优点:

  • 有序处理:workqueue 可以保证按照先进先出(FIFO)的顺序处理任务,从而确保事件按照顺序处理。
  • 去重处理:workqueue 支持对任务进行去重,确保同一任务不会被多次处理。
  • 限速和重试:workqueue 提供了限速功能,以防止在短时间内过多地处理任务。同时,它还支持自动重试失败的任务。
  • 延时处理:workqueue 支持延时处理任务,可以将任务推迟到未来的某个时间点再执行。

workqueue 包中提供了以下几种队列类型:

  • SimpleQueue:一个简单的、无延时的队列。
  • DelayingQueue:一个支持延时处理任务的队列。
  • RateLimitingQueue:一个支持限速和延时处理任务的队列。
  • PriorityQueue:一个支持根据任务优先级处理任务的队列。

在 Kubernetes 的控制器中,通常会使用 RateLimitingQueue 类型的队列,因为它提供了限速和重试机制。这些功能有助于防止控制器在处理任务时发生过载和错误。

queue.go

type Interface interface {
Add(item interface{})
Len() int
Get() (item interface{}, shutdown bool)
Done(item interface{})
ShutDown()
ShuttingDown() bool
}

定义 Interface,它是 workqueue 的主要接口,包括以下方法:

  • Add(item interface{}):添加任务到队列。
  • Len() int:返回队列中的任务数量。
  • Get() (item interface{}, shutdown bool):从队列中获取一个任务,如果 shutdown 为 true,则表示队列已关闭,不应再添加任务。
  • Done(item interface{}):表示任务已完成处理,可以从队列中移除。
  • ShutDown():关闭队列,不再接受新任务。
  • ShuttingDown() bool:检查队列是否正在关闭。

New() 和 NewNamed(name string) 函数:创建一个新的队列实例。

newQueue() 函数:初始化并返回一个新的队列实例。

这个 Type struct 实现 了 Interface

type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t

// dirty defines all of the items that need to be processed.
dirty set

// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set

cond *sync.Cond

shuttingDown bool

metrics queueMetrics

unfinishedWorkUpdatePeriod time.Duration
clock clock.Clock
}

Type 结构体:表示队列的主要结构。包含以下字段:

  • queue:一个数组,按照先进先出(FIFO)的顺序保存任务。
  • dirty:一个集合(set),包含所有需要处理的任务。
  • processing:一个集合,包含正在处理的任务。
  • cond:一个条件变量,用于协调多个 goroutine 对队列的访问。
  • shuttingDown:一个布尔值,表示队列是否正在关闭。
  • metrics:一个接口,用于收集队列的度量数据。
  • unfinishedWorkUpdatePeriod:一个时间间隔,表示更新未完成任务的频率。
  • clock:一个时钟接口,用于处理时间相关操作。

在上面提供的代码中,我们已经了解了 workqueue 的核心数据结构。现在,我们来深入了解一下 workqueue 的一些关键部分。

添加任务:当调用 Add() 方法时,代码会首先检查 shuttingDown 标志以确定队列是否已关闭。如果队列已关闭,则不会添加任务。接下来,代码会检查任务是否已存在于 dirty 集合中,如果存在,则不会再次添加。然后,如果任务未在 processing 集合中,会将其添加到 queue 数组中。最后,会调用 cond.Signal() 通知其他等待中的 goroutines。

获取任务:Get() 方法用于从队列中获取任务。在获取任务之前,代码会等待直到 queue 中有任务可用。当 queue 为空且 shuttingDown 为 true 时,Get() 会返回 shutdown 为 true,表示不应再处理任务。当有任务可用时,会将任务从 queue 中移除,并将其添加到 processing 集合中。

完成任务:当任务处理完成后,需要调用 Done() 方法。Done() 方法会将任务从 processing 集合中移除。如果任务仍存在于 dirty 集合中,表示在处理过程中任务已被重新添加,因此需要将任务再次添加到 queue 中以进行重新处理。

关闭队列:ShutDown() 方法会将 shuttingDown 标志设置为 true,并通过 cond.Broadcast() 唤醒所有等待中的 goroutines。在关闭队列后,workqueue 不再接受新任务。通过 ShuttingDown() 方法可以检查队列是否正在关闭。

delaying_queue.go

delaying_queue.go 文件是 Kubernetes 源码中 workqueue 包的一部分。这个文件实现了一个具有延时功能的队列,称为 DelayingQueue。DelayingQueue 可以用于延时处理任务,即将任务推迟到未来的某个时间点再执行。下面是该文件中代码的解析:

  • 定义了一个名为 DelayingInterface 的接口,它继承自 Interface 并添加了一个 AddAfter 方法,用于在指定的延迟时间后将项目添加到队列中。
  • 实现了一个名为 DelayingQueue 的结构,它实现了 DelayingInterface。它包含一个优先级队列,用于根据到期时间对任务进行排序,以及一个内部的 Interface 实例,用于在任务到期后将其添加到工作队列中。
  • 提供了一些创建 DelayingQueue 实例的工厂方法,如 NewDelayingQueue、NewNamedDelayingQueue 和 NewDelayingQueueWithCustomClock。
  • 定义了 waitingLoop 方法,该方法在一个单独的 Goroutine 中运行,负责监控待处理任务,根据它们的到期时间将其添加到工作队列中。
  • 实现了 AddAfter 方法,该方法将任务添加到延迟队列,等待指定的延迟时间后再将其添加到工作队列中。
  • 实现了一些辅助方法和类型,如 waitFor 和 waitForPriorityQueue,以支持延迟队列的功能。
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
Interface
// AddAfter adds an item to the workqueue after the indicated duration has passed
AddAfter(item interface{}, duration time.Duration)
}

接下来我将对一些关键的代码片段进行详细解释

  1. AddAfter 方法:这个方法用于在指定的延迟时间后将项目添加到队列中。如果延迟时间小于等于0,则项目立即被添加到工作队列中。否则,项目将被封装为一个 waitFor 结构体,并发送到 waitingForAddCh 通道。
func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
if q.ShuttingDown() {
return
}

q.metrics.retry()

if duration <= 0 {
q.Add(item)
return
}

select {
case <-q.stopCh:
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}
  1. waitingLoop 方法:这个方法在单独的 Goroutine 中运行,主要负责监控待处理任务,根据它们的到期时间将其添加到工作队列中。
func (q *delayingType) waitingLoop() {
defer utilruntime.HandleCrash()

never := make(<-chan time.Time)
var nextReadyAtTimer clock.Timer
waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)
waitingEntryByData := map[t]*waitFor{}

for {
if q.Interface.ShuttingDown() {
return
}

now := q.clock.Now()

for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
if entry.readyAt.After(now) {
break
}

entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(waitingEntryByData, entry.data)
}

nextReadyAt := never
if waitingForQueue.Len() > 0 {
if nextReadyAtTimer != nil {
nextReadyAtTimer.Stop()
}
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
nextReadyAt = nextReadyAtTimer.C()
}

// 上面的代码中的select方法,满足心跳时间 或者 pop后的heap的第一个元素的时间已经到了 或者q.waitingForAddCh channel有数据,就进入下一次的for循环。
select {
case <-q.stopCh:
return

case <-q.heartbeat.C():
case <-nextReadyAt:
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}

drained := false
for !drained {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
default:
drained = true
}
}
}
}
}

waitingLoop 方法首先初始化一个名为 waitingForQueue 的优先级队列,用于存储待处理的任务。接下来,进入一个无限循环,不断地检查队列的头部元素,如果已经到期,则将其添加到工作队列中。此外,waitingLoop 还使用了一个名为 nextReadyAt 的计时器,用于设置等待下一个到期任务的时间。当任务到期时,计时器会触发,将到期的任务添加到工作队列中。

同时,waitingLoop 方法使用 select 语句监控多个通道:

  • 如果收到 q.stopCh 通道的信号,表示队列已关闭,此时 waitingLoop 也需要退出。
  • 如果收到 q.heartbeat.C() 通道的信号,表示心跳计时器到期,此时将继续循环,检查队列中是否有到期的任务需要添加到工作队列中。
  • 如果收到 nextReadyAt 通道的信号,表示下一个任务已经到期,此时将继续循环,检查队列中是否有到期的任务需要添加到工作队列中。
  • 如果收到 q.waitingForAddCh 通道的信号,表示有新任务需要加入到 waitingForQueue,此时会将任务添加到 waitingForQueue,并继续检查队列中是否有到期的任务需要添加到工作队列中。

在处理 q.waitingForAddCh 通道时,waitingLoop 方法还会尝试将通道中的所有待处理任务一次性添加到 waitingForQueue,以提高处理效率。这个过程称为“draining”。

waitingLoop方法,随着delayingType实例的创建而启动,并一直运行下去直到workqueue被shutdown。waitingLoop方法一直在做的事情就是 不停的将上面的 AddAfter方法 放进 q.waitingForAddCh channel的item取出来,根据item的时间是早于现在还是晚于现在,早于现在就加入工作队列,晚于现在就放到heap上。并不断的从heap pop出第一个item,检测item的到期时间,根据item的时间是早于现在还是晚于现在,早于现在就加入工作队列,晚于现在啥也不做,item继续保留在heap上。

insert方法往heap添加元素,分两种情况。若元素存在则update,若不存在,push该元素到heap中,并将入参的 knownEntries(即waitingLoop方法的waitingEntryByData) set集合添加该元素的值,为了保证不重复。

rate_limiting_queue.go

这段代码定义了一个名为 RateLimitingInterface 的接口,它扩展了 DelayingInterface 接口,为工作队列添加了速率限制功能。RateLimitingInterface 接口具有以下方法:

AddRateLimited(item interface{}):在速率限制器允许的情况下,将元素添加到工作队列。 Forget(item interface{}):表示一个元素已经完成重试。无论是因为永久失败还是成功,我们将停止速率限制器对它的跟踪。这个方法仅清除 rateLimiter,您仍然需要在队列上调用 Done 方法。 NumRequeues(item interface{}) int:返回元素被重新加入队列的次数。 NewRateLimitingQueue 和 NewNamedRateLimitingQueue 是两个构造函数,用于创建具有速率限制功能的工作队列。

rateLimitingType 结构体实现了 RateLimitingInterface,它包装了一个 DelayingInterface,并提供了速率限制的重新入队功能。

以下是 rateLimitingType 结构体的方法实现:

  • AddRateLimited(item interface{}):基于速率限制器计算的时间间隔,使用 AddAfter 方法将元素添加到工作队列。
  • NumRequeues(item interface{}) int:调用 rateLimiter.NumRequeues(item) 方法,返回元素被重新加入队列的次数。
  • Forget(item interface{}):调用 rateLimiter.Forget(item) 方法,忘记对该元素的跟踪。这样可以确保在元素处理完成后,不再对其进行限速。

接下来,我们继续深入解释这段代码中的 RateLimiter 接口及其相关实现。

在这段代码中,RateLimiter 是一个接口,用于实现速率限制功能。它包含以下方法:

  • When(item interface{}) time.Duration:根据 item 计算下一次允许入队的时间间隔。
  • NumRequeues(item interface{}) int:返回 item 被重新入队的次数。
  • Forget(item interface{}):清除对 item 的跟踪,不再对其进行限速。
  • NewRateLimitingQueue 和 NewNamedRateLimitingQueue 是两个用于构造具有速率限制功能的工作队列的函数。它们接受一个 RateLimiter 实例作为参数,并返回一个实现了 RateLimitingInterface 的队列实例。

在实际使用中,您需要提供一个实现了 RateLimiter 接口的具体实例。Kubernetes 工作队列库中提供了一些现成的 RateLimiter 实现,例如 ItemExponentialFailureRateLimiter,它对失败的元素实施指数级退避策略,以减少对失败元素的尝试次数。

总结一下,这段代码提供了一个名为 RateLimitingInterface 的接口,它扩展了 DelayingInterface 接口,为工作队列提供了速率限制功能。通过使用 NewRateLimitingQueue 或 NewNamedRateLimitingQueue 函数,您可以创建一个具有速率限制功能的工作队列。为了实现速率限制功能,您需要提供一个实现了 RateLimiter 接口的具体实例。在处理元素时,根据 RateLimiter 的计算结果,队列会在适当的时间间隔后将元素重新添加到队列。当元素处理完成后,您需要调用 Forget 方法以停止对其进行限速。