跳到主要内容

第十八课

PR #135

Add load balancing support to services

在 Service 上添加是否创建 loadbalance 的标识符。

创建service时候判断是否设置了 loadbalance ,如果设置了 true ,调用云厂商的 api 接口来创建 loadbalance 。

云厂商需要实现的接口函数:

在main 函数中创建 实现了接口的对象,目前只支持gce:

PR #144

rafactor controller

 // Begin watching and syncing. 开闭原则,对外暴露的更少了! 
func (rm *ReplicationManager) Run(period time.Duration) {
rm.syncTime = time.Tick(period)
go util.Forever(func() { rm.watchControllers() }, period)
}
func (rm *ReplicationManager) watchControllers() {
watchChannel := make(chan *etcd.Response)
stop := make(chan bool)
defer func() {
// Ensure that the call to watch ends.
close(stop)
}()
go func() {
defer util.HandleCrash()
defer func() {
close(watchChannel)
}()
_, err := rm.etcdClient.Watch("/registry/controllers", 0, true, watchChannel, stop)
if err != etcd.ErrWatchStoppedByUser {
log.Printf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
}
}()

for {
select {
case <-rm.syncTime:
rm.synchronize()
case watchResponse, open := <-watchChannel:
if !open || watchResponse == nil {
// watchChannel has been closed, or something else went
// wrong with our etcd watch call. Let the util.Forever()
// that called us call us again.
return
}
log.Printf("Got watch: %#v", watchResponse)
controller, err := rm.handleWatchResponse(watchResponse)
if err != nil {
log.Printf("Error handling data: %#v, %#v", err, watchResponse)
continue
}
rm.syncHandler(*controller)
}
}
}
这个方法用于监视etcd中的控制器数据,并在数据发生变化时触发相应的处理。让我们一步步解释代码的功能:

首先,创建一个名为watchChannel的通道,用于接收etcd的响应。另外,创建一个名为stop的通道,用于停止etcd的监听。
defer代码块中,确保在方法结束时关闭stop通道,这会停止etcd的监听。
使用go关键字启动一个新的goroutine来执行以下操作:
a. 使用defer调用util.HandleCrash(),确保在goroutine崩溃时记录错误信息。
b. 使用defer关闭watchChannel通道。
c. 调用rm.etcdClient.Watch()来监视etcd中的/registry/controllers路径,watchChannel用于接收响应,stop用于停止监听。如果监听意外停止,会打印错误信息。
使用for循环来处理以下情况:
a. 当rm.syncTime通道接收到信号时,调用rm.synchronize()方法来同步控制器状态。
b. 从watchChannel接收etcd的响应。如果通道已关闭或响应有误,则返回并允许调用watchControllers的util.Forever()方法重新调用。
c. 如果接收到有效的响应,则打印响应信息,调用rm.handleWatchResponse()处理响应,并将处理后的控制器传递给rm.syncHandler()方法进行同步。
func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error {
podList, err := rm.kubeClient.ListPods(controllerSpec.DesiredState.ReplicasInSet)
if err != nil {
return err
}
filteredList := rm.filterActivePods(podList.Items)
diff := len(filteredList) - controllerSpec.DesiredState.Replicas
log.Printf("%#v", filteredList)
if diff < 0 {
diff *= -1
log.Printf("Too few replicas, creating %d\n", diff)
for i := 0; i < diff; i++ {
rm.podControl.createReplica(controllerSpec)
}
} else if diff > 0 {
log.Print("Too many replicas, deleting")
for i := 0; i < diff; i++ {
rm.podControl.deletePod(filteredList[i].ID)
}
}
return nil
}
使用rm.kubeClient.ListPods()方法获取所有与controllerSpec相关的Pod,并将结果存储在podList变量中。如果发生错误,直接返回错误。
调用rm.filterActivePods()方法,筛选出活跃的Pod,并将结果存储在filteredList变量中。
计算diff,即filteredList中的Pod数量与controllerSpec.DesiredState.Replicas的期望副本数之间的差值。
打印筛选后的Pod列表。
判断diff的值:
a. 如果diff小于0,说明当前副本数小于期望副本数。将diff的绝对值赋值给diff,打印需要创建的副本数量,然后根据差值创建相应数量的副本。在这个过程中,使用rm.podControl.createReplica()方法创建新的副本。
b. 如果diff大于0,说明当前副本数大于期望副本数。打印删除副本的信息,然后根据差值删除多余的副本。在这个过程中,使用rm.podControl.deletePod()方法删除指定的Pod。
如果一切顺利,返回nil表示没有错误。
提示

题目1:实现一个filterActivePods方法

在ReplicationManager结构体中,实现一个名为filterActivePods的方法。此方法接收一个[]api.Pod类型的参数(表示Pod列表),并返回一个过滤后的活跃Pod列表。

要求:只保留状态为api.PodRunning的Pod。

示例代码框架:代码

题目2:实现一个简化版的watchControllers方法

要求:在ReplicationManager结构体中,实现一个简化版的watchControllers方法。这个方法应该接收一个通道,用于接收代表副本控制器变更的字符串(例如:"create"、"update"或"delete")。当收到一个变更时,方法应输出相应的变更信息。

示例代码框架

请注意,此示例中省略了与etcd交互的部分,因此只是一个简化版的watchControllers方法,用于练习处理通道中的事件。 另外你能使用 for select 模式吗?