跳到主要内容

第二十一课

PR #165

它实现了在删除一个复制控制器(Replication Controller)之前确保其最终状态得到应用的逻辑。这样可以避免在修改复制控制器后立即删除它(例如,将副本数从3减少到0),从而导致该复制控制器管理的Pods的存在状态变得不确定。具体来说:

PR #171

应用了podCache ,目前只有 List 的时候用到

func (storage *PodRegistryStorage) List(query labels.Query) (interface{}, error) {
var result api.PodList
pods, err := storage.registry.ListPods(query)
if err == nil {
result.Items = pods
// Get cached info for the list currently.
// TODO: Optionally use fresh info
if storage.podCache != nil {
for ix, pod := range pods {
info, err := storage.podCache.GetContainerInfo(pod.CurrentState.Host, pod.ID)
if err != nil {
log.Printf("Error getting container info: %#v", err)
continue
}
result.Items[ix].CurrentState.Info = info
}
}
}

result.Kind = "cluster#podList"
return result, err
}

PR #164

Standardize terminology on "selector"

  1. 在service 结构体中添加 query
  2. 把query 改成 selector

PR #166

Part #1 of synchronous requests: Add channels and a mechanism for waiting

删除 创建 更新的时候,使用 MakeAsync 函数,将同步函数改成异步函数。

使用 WaitFroObject 来 等待

func (server *ApiServer) waitForObject(out <-chan interface{}, timeout time.Duration) (interface{}, error) {
tick := time.After(timeout)
var obj interface{}
select {
case obj = <-out:
return obj, nil
case <-tick:
return nil, fmt.Errorf("Timed out waiting for synchronization.")
}
}

type RESTStorage interface:定义了一个名为RESTStorage的接口,它包含了六个方法:List, Get, Delete, Extract, Create, 和 Update。这些方法分别对应于常见的REST API操作,例如列出、获取、删除、解析请求体、创建和更新资源。

func MakeAsync(fn func() interface{}) <-chan interface{}:这是一个用于创建异步操作的函数。它接受一个返回interface{}类型值的无参数函数fn,然后在一个新的goroutine中执行该函数,并将结果发送到一个缓冲大小为1的通道。这个函数最后返回该通道。

func (server *ApiServer) waitForObject(out <-chan interface{}, timeout time.Duration) (interface{}, error):这个方法用于等待异步操作完成。它接受一个接收通道out,其中包含异步操作的结果,以及一个超时时间timeout。这个方法会等待从通道out中接收值,或者超时后返回错误。如果成功接收到值,返回接收到的值和nil错误。

以下代码片段展示了如何使用storage.Create(obj)创建资源,并根据sync变量决定是以同步还是异步方式执行。如果sync为true,那么会使用server.waitForObject(out, timeout)等待异步操作完成。最后,根据sync变量的值设置响应的HTTP状态码,然后调用server.write(statusCode, obj, w)写入响应。

out, err := storage.Create(obj)
if err == nil && sync {
obj, err = server.waitForObject(out, timeout)
}
if err != nil {
server.error(err, w)
return
}
var statusCode int
if sync {
statusCode = http.StatusOK
} else {
statusCode = http.StatusAccepted
}
server.write(statusCode, obj, w)

func (storage *PodRegistryStorage) Delete(id string) (<-chan interface{}, error):这是一个在PodRegistryStorage结构体上定义的方法,用于删除指定ID的Pod。它首先调用apiserver.MakeAsync函数创建一个异步操作,该操作返回一个包含删除成功状态的apiserver.Status对象。然后,它调用storage.registry.DeletePod(id)删除Pod,并返回异步操作通道和删除操作的错误。

问题1:请解释RESTStorage接口的目的和它的六个方法的作用。

问题2:请解释MakeAsync函数的功能,并描述它的输入参数和返回值。

问题3:请解释waitForObject函数的功能,并描述它的输入参数和返回值。

答案1:RESTStorage接口的目的是为不同类型的资源提供统一的REST API操作。它的六个方法对应于常见的REST API操作:

List:列出符合特定标签选择器的资源。 Get:根据资源ID获取资源。 Delete:根据资源ID删除资源。 Extract:从请求体中提取资源数据。 Create:创建一个新的资源。 Update:更新现有的资源。 答案2:MakeAsync函数的功能是创建一个异步操作。它接收一个返回interface{}类型值的无参数函数fn作为输入参数,然后在一个新的goroutine中执行该函数,并将结果发送到一个缓冲大小为1的通道。这个函数最后返回该通道,类型为<-chan interface{}。

答案3:waitForObject函数的功能是等待异步操作完成。它接受两个输入参数:

out:一个接收通道,类型为<-chan interface{},其中包含异步操作的结果。 timeout:一个time.Duration类型的值,表示等待操作完成的超时时间。 waitForObject函数返回两个值:一个interface{}类型的值,表示从通道中接收到的异步操作结果;一个error类型的值,表示操作过程中可能发生的错误。如果成功接收到值,返回接收到的值和nil错误。如果超时,返回nil值和一个包含超时错误信息的错误对象。

提示

作业: // 创建一个异步任务,使其在3秒后返回字符串 "Hello, Async!" 代码:playground