跳到主要内容

43.wait

PR #943

Clients must wait for completion of actions

// podsOnMinions returns true when all of the selected pods exist on a minion.
func podsOnMinions(c *client.Client, pods api.PodList) wait.ConditionFunc {
podInfo := fakePodInfoGetter{}
return func() (bool, error) {
for i := range pods.Items {
host, id := pods.Items[i].CurrentState.Host, pods.Items[i].ID
if len(host) == 0 {
return false, nil
}
if _, err := podInfo.GetPodInfo(host, id); err != nil {
return false, nil
}
}
return true, nil
}
}

// wait for minions to indicate they have info about the desired pods
pods, err := c.ListPods(labels.Set(controllerRequest.DesiredState.ReplicaSelector).AsSelector())
if err != nil {
glog.Fatalf("FAILED: unable to get pods to list: %v", err)
}
if err := wait.Poll(time.Second, 10, podsOnMinions(c, pods)); err != nil {
glog.Fatalf("FAILED: pods never started running %v", err)
}

这段代码中定义了一些函数类型以及函数,它们可以在 Go 语言中被用来实现轮询(polling)的机制。

ConditionFunc 是一个函数类型,它返回一个布尔值以及一个错误。这个函数类型主要用于判断某个条件是否满足,如果满足则返回 true,如果不满足则返回 false,如果在检查条件过程中出现错误,则直接返回错误。

Poll 函数会在一个间隔(interval)时间内,不断地执行 ConditionFunc 函数,直到函数返回 true 或出现错误,或者达到了超时时间(timeout)。

WaitFunc 是一个函数类型,它返回一个只读的 struct{} 类型的 channel。这个 channel 在每次应该执行测试时都会接收到一个项目,并在应执行最后一个测试时关闭。

WaitFor 函数会从 WaitFunc 得到一个 channel,然后对该 channel 的每个值以及 channel 关闭时调用一次 ConditionFunc。如果 ConditionFunc 返回错误,循环就会结束并返回该错误;如果 ConditionFunc 返回 true,循环就会结束并返回 nil;如果 channel 关闭时 ConditionFunc 还未返回 true,那么就会返回 ErrWaitTimeout 错误。

poller 函数返回一个 WaitFunc,该函数会向 channel 发送信号,每隔一段时间(interval)发送一次,直到超时(timeout),然后关闭 channel。如果间隔时间非常短,可能在 channel 关闭前都没有收到 tick。如果设定的超时时间为 0,那么这个 channel 将永远不会被关闭。

这种模式通常用于你需要等待某个条件满足,但又不想立即阻塞当前的执行线程的情况。这通常发生在处理 I/O,网络请求或者其他需要等待的操作。

PR #909

Scheduler plugin v1

func main() {
flag.Parse()
util.InitLogs()
defer util.FlushLogs()

verflag.PrintAndExitIfRequested()

// TODO: security story for plugins!
kubeClient := client.New("http://"+*master, nil)

configFactory := &factory.ConfigFactory{Client: kubeClient}
config := configFactory.Create()
s := scheduler.New(config)
s.Run()

select {}
}

// Run begins watching and scheduling. It starts a goroutine and returns immediately.
func (s *Scheduler) Run() {
go util.Forever(s.scheduleOne, 0)
}

func (s *Scheduler) scheduleOne() {
pod := s.config.NextPod()
dest, err := s.config.Algorithm.Schedule(*pod, s.config.MinionLister)
if err != nil {
s.config.Error(pod, err)
return
}
b := &api.Binding{
PodID: pod.ID,
Host: dest,
}
if err := s.config.Binder.Bind(b); err != nil {
s.config.Error(pod, err)
}
}

main:主函数首先解析命令行参数,然后初始化日志系统。之后,它创建一个 Kubernetes 客户端 kubeClient,该客户端用于与 Kubernetes API 服务器交互。然后,主函数创建一个配置工厂 configFactory 并用该工厂创建一个调度器配置 config。最后,主函数创建并运行一个调度器,然后挂起(由于 select {},程序会阻塞在这里,不会立即结束,因为调度器在后台运行)。

Run:此方法在新的 goroutine 中启动调度器,使用 util.Forever(s.scheduleOne, 0) 循环执行 scheduleOne 方法,实现对 Pod 的持续调度。

scheduleOne:此方法处理单个 Pod 的调度。它首先从配置中获取下一个待调度的 Pod,然后使用调度算法为这个 Pod 选择一个 Node(节点)。如果调度成功,就创建一个 Binding 对象并尝试进行绑定操作。如果绑定操作或者调度算法返回错误,就使用 s.config.Error(pod, err) 方法处理错误。

PR #985

Make rolling update be blocking.

这是 Kubernetes 中一种简单的 Pod 更新策略的实现。在 Kubernetes 中,ReplicationController 保证运行指定数量的 Pod 副本。如果有 Pod 停止运行,ReplicationController 会创建新的 Pod 以替代。这种机制被用于实现简单的更新策略。

以下是对代码的解析:

Update 函数接受三个参数:ReplicationController 的名称,一个 Kubernetes 客户端,以及更新周期(即每个 Pod 更新间的等待时间)。

首先,函数通过客户端获取指定的 ReplicationController。

然后,函数使用 ReplicationController 的 ReplicaSelector 创建一个选择器 s。这个选择器可以选择与 ReplicationController 关联的所有 Pod。

函数使用这个选择器列出所有的 Pod,并获取预期的 Pod 数量 expected。

然后,函数遍历每一个 Pod,将其删除。由于 ReplicationController 的机制,每一个被删除的 Pod 都会被新的 Pod 替代。这就意味着每个 Pod 会重新拉取 Docker 镜像,以实现更新。在删除每个 Pod 后,函数会等待指定的更新周期。

最后,函数使用 wait.Poll 等待所有的 Pod 都被成功更新。它会每 5 秒检查一次,最多等待 300 秒。每次检查,都会使用选择器列出所有的 Pod,然后检查 Pod 的数量是否和预期的数量相等。