跳到主要内容

34.kubeletPrivate

PR #551

Private kubelet [私有化 kubelet]

将kubelet内部成员小写

如果我们将一个 struct 内的成员变量都小写,那我们就再也无法从包外给成员赋值,只能使用一个构造函数,所以这里新增了一个构造函数名叫 NewMainKubelet。这个函数名直到现在还在保留着。

PR #527

Adds a flag to specify root working directory.

这段代码主要是关于Kubernetes中的Volume管理的,特别是两种类型的Volume:EmptyDirectory和HostDirectory。

EmptyDirectory:EmptyDirectory是pod可以访问的临时目录。这些目录不会在pod的生命周期之外持久存在。它们会在pod创建时设置,并在pod删除时清理。在此类型中,SetUp方法用于创建新的目录,如果该目录已经存在则会打印一个警告。TearDown方法当前是空的,但在未来可能会用于删除由SetUp创建的目录。GetPath方法返回EmptyDirectory的完整路径。

HostDirectory:HostDirectory是主机上的目录,这些目录可以被pod访问。和EmptyDirectory不同,HostDirectory在pod生命周期外也能够持久存在。

createHostDirectory 和 createEmptyDirectory 函数:这两个函数是用来根据给定的api.Volume创建对应的HostDirectory或EmptyDirectory的。

CreateVolume:这个函数是根据提供的api.Volume创建一个能够挂载该Volume的Interface。首先检查Volume的源,如果源是nil,那么就返回nil,nil。然后,根据源的类型(HostDirectory或EmptyDirectory),调用相应的创建函数来创建Volume。如果Volume的类型不支持,那么会返回一个错误。

在这段特定的代码中,rootDir 被用作创建 "EmptyDirectory" 类型的卷的基础路径。EmptyDirectory 类型的卷是一种在 Pod 生命周期内存在的临时存储,可以被 Pod 中的容器使用。在 Pod 结束时,EmptyDirectory 卷的内容通常会被清除。

例如,如果你有一个 rootDir 路径 /var/lib/kubelet,一个 Pod ID 是 pod123,并且你正在创建一个名为 myvolume 的 EmptyDirectory 卷,那么该卷的完整路径可能是 /var/lib/kubelet/pods/pod123/volumes/kubernetes.io~empty-dir/myvolume。

PR #588

Sync pods asynchronously in the Kubelet.

我们首先来学习下下面的一段代码,这也是 podWorkers 雏形,这个结构体很重要:


// Per-pod workers.
type podWorkers struct {
lock sync.Mutex

// Set of pods with existing workers.
workers util.StringSet
}

func newPodWorkers() podWorkers {
return podWorkers{
workers: util.NewStringSet(),
}
}

// Runs a worker for "podFullName" asynchronously with the specified "action".
// If the worker for the "podFullName" is already running, functions as a no-op.
func (self *podWorkers) Run(podFullName string, action func()) {
self.lock.Lock()
defer self.lock.Unlock()

// This worker is already running, let it finish.
if self.workers.Has(podFullName) {
return
}
self.workers.Insert(podFullName)

// Run worker async.
go func() {
defer util.HandleCrash()
action()

self.lock.Lock()
defer self.lock.Unlock()
self.workers.Delete(podFullName)
}()
}

这段代码是 Go 语言的一个类,名为 podWorkers,用于管理 Kubernetes 的 pod 工作程序(worker)。在 Kubernetes 中,pod 是一组相关的容器的集合,这些容器共享一些资源,并作为一个单元运行。

podWorkers 有两个成员:一个互斥锁 lock,用于保护对 workers 成员的并发访问,以及一个 workers 集合,用于保存当前正在运行的工作程序的 pod 的全名。

newPodWorkers() 是 podWorkers 的构造函数,它创建并返回一个 podWorkers 实例,该实例的 workers 成员被初始化为空集合。

Run(podFullName string, action func()) 方法以异步方式为特定的 pod 运行一个工作程序。该工作程序执行提供的 action 函数。如果已经有一个工作程序在为该 pod 运行,那么这个方法就不做任何操作。如果没有为该 pod 运行的工作程序,那么这个方法就会为该 pod 启动一个新的工作程序,然后立即返回。这个新的工作程序会在一个新的 goroutine 中运行,这样它就可以并行执行。

这个新的工作程序首先会执行提供的 action 函数。然后,无论 action 函数是否成功完成(甚至如果它引发了 panic),它都会从 workers 集合中移除该 pod 的全名。这是通过 defer 关键字实现的,这个关键字会在函数返回时运行代码,无论函数是正常返回还是因为错误或 panic 而返回。


我们再来看看 kubelet 中同步 pods 的代码,使用 podWorkers 来异步执行:

    // Check for any containers that need starting
for i := range pods {
pod := &pods[i]
podFullName := GetPodFullName(pod)

// Add all containers (including net) to the map.
desiredContainers[podContainer{podFullName, networkContainerName}] = empty{}
for _, cont := range pod.Manifest.Containers {
desiredContainers[podContainer{podFullName, cont.Name}] = empty{}
}

// Run the sync in an async manifest worker.
kl.podWorkers.Run(podFullName, func() {
err := kl.syncPod(pod, dockerContainers)
if err != nil {
glog.Errorf("Error syncing pod: %v skipping.", err)
}
})
}

对于一个给定的 pods 列表,遍历每个 pod,获取其全名。

将所有的 pod 容器(包括网络容器)添加到 desiredContainers map 中。这个 map 用于追踪需要的容器。其中 podContainer 是一个键,由 pod 的全名和容器的名字构成,empty 是一个占位符类型。

为每个 pod 创建一个异步任务,以便在独立的工作线程中同步(sync)该 pod。具体的同步操作是通过调用 kl.syncPod 来进行的,这个函数将会根据期望的状态和当前的 Docker 容器状态来启动或停止容器。如果同步过程中出现错误,将会记录错误日志,并跳过该 pod。

PR #565

Fork API types.

开始区分内部版本,外部版本,以及做自动转换

type ConversionFunc func(input interface{}) (output interface{}, err error)

var versionMap = map[string]map[string]reflect.Type{}
var externalFuncs = map[string]ConversionFunc{}
var internalFuncs = map[string]ConversionFunc{}

func init() {
AddKnownTypes("",
PodList{},
Pod{},
ReplicationControllerList{},
ReplicationController{},
ServiceList{},
Service{},
MinionList{},
Minion{},
Status{},
ServerOpList{},
ServerOp{},
)
AddKnownTypes("v1beta1",
v1beta1.PodList{},
v1beta1.Pod{},
v1beta1.ReplicationControllerList{},
v1beta1.ReplicationController{},
v1beta1.ServiceList{},
v1beta1.Service{},
v1beta1.MinionList{},
v1beta1.Minion{},
v1beta1.Status{},
v1beta1.ServerOpList{},
v1beta1.ServerOp{},
)
}

这段代码中主要做了几个事情,以下是各部分的解释:

init():Go中的init函数是一种特殊的函数,它在每个Go源文件中都可以包含,主要用于程序的初始化。一个包中可以有多个init函数,这些函数在main函数执行前就会被调用,按照在源文件中的顺序执行。在这段代码中,init函数中调用了AddKnownTypes()函数,将一系列的数据类型注册到了已知类型的映射中。这是用于配置和初始化程序的一种方法,确保了程序开始执行时,这些类型已经在已知类型映射中注册了。

AddKnownTypes():这个函数用于向类型映射表中添加已知类型。函数接收一个版本字符串和一个类型列表,然后根据版本字符串查找对应的类型映射表。如果找不到就创建一个新的类型映射表。然后遍历类型列表,将每个类型添加到类型映射表中。最后,函数将类型的名称和类型本身作为键值对添加到已知类型的映射中。

AddExternalConversion()和AddInternalConversion():这两个函数用于向外部和内部的转换函数映射表中添加转换函数。转换函数是将一种类型的对象转换为另一种类型的对象的函数。这些函数接收一个名字和一个转换函数,然后将它们添加到对应的转换函数映射表中。

总体来看,这段代码是在进行一些初始化操作,为程序的运行做好准备。


    case *ServerOpList:
ops := make([]v1beta1.ServerOp, len(cObj.Items))
for ix := range cObj.Items {
o, err := externalize(cObj.Items[ix])
if err != nil {
return nil, err
}
ops[ix] = o.(v1beta1.ServerOp)
}
result := v1beta1.ServerOpList{
JSONBase: v1beta1.JSONBase(cObj.JSONBase),
Items: ops,
}
result.APIVersion = "v1beta1"
return &result, nil
case *ServerOp:
result := v1beta1.ServerOp{
JSONBase: v1beta1.JSONBase(cObj.JSONBase),
}
result.APIVersion = "v1beta1"
return &result, nil
default:
fn, ok := externalFuncs[reflect.ValueOf(cObj).Elem().Type().Name()]
if !ok {
panic(fmt.Sprintf("Unknown object to externalize: %#v %s", cObj, reflect.ValueOf(cObj).Type().Name()))
}
return fn(cObj)
}
panic(fmt.Sprintf("This should never happen %#v", obj))
return obj, nil

目前转化函数 只支持 从 “” <=> “v1beta1” , 并且注册的转化函数 var externalFuncs = map[string]ConversionFunc{} 仅仅是用来支持不认识的 对象,这个版本很繁琐,所以 作者加了个 TODO:witch to registered functions for each type. 后面会为每个类型注册函数。