63.kubelet-03
Syncing pods
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []Pod) error {
	glog.V(4).Infof("Desired [%s]: %+v", kl.hostname, pods)
	var err error
	desiredContainers := make(map[podContainer]empty)

	dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
	if err != nil {
		glog.Errorf("Error listing containers %#v", dockerContainers)
		return err
	}

	// Check for any containers that need starting
	for ix := range pods {
		pod := &pods[ix]
		podFullName := GetPodFullName(pod)
		uuid := pod.Manifest.UUID

		// Add all containers (including net) to the map.
		desiredContainers[podContainer{podFullName, uuid, networkContainerName}] = empty{}
		for _, cont := range pod.Manifest.Containers {
			desiredContainers[podContainer{podFullName, uuid, cont.Name}] = empty{}
		}

		// Run the sync in an async manifest worker.
		kl.podWorkers.Run(podFullName, func() {
			err := kl.syncPod(pod, dockerContainers)
			if err != nil {
				glog.Errorf("Error syncing pod: %v skipping.", err)
			}
		})
	}

	// Kill any containers we don't need
	existingContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
	if err != nil {
		glog.Errorf("Error listing containers: %v", err)
		return err
	}
	for _, container := range existingContainers {
		// Don't kill containers that are in the desired pods.
		podFullName, uuid, containerName, _ := dockertools.ParseDockerName(container.Names[0])
		if _, ok := desiredContainers[podContainer{podFullName, uuid, containerName}]; !ok {
			err = kl.killContainer(container)
			if err != nil {
				glog.Errorf("Error killing container: %v", err)
			}
		}
	}

	// Remove any orphaned volumes.
	kl.reconcileVolumes(pods)
	return err
}
- Initialization: build a `desiredContainers` map describing which containers should be running, and fetch the list of kubelet-managed containers currently running from Docker.
- Start containers that are needed: iterate over every desired pod, adding all of its containers (including the network container) to the desired-containers map, then dispatch `syncPod` on an asynchronous pod worker; `syncPod` starts, stops, or restarts containers so that they converge on the desired state (a sketch of the worker pattern follows this list).
- Kill containers that are not needed: re-list the running containers from Docker and kill any whose parsed name is not in the desired-containers map, so that only desired containers keep running (see the name-parsing sketch below).
- Clean up orphaned volumes: call `reconcileVolumes` to remove any volumes that no longer match the desired pod list.
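The pod worker serializes sync work per pod: at most one sync runs for a given pod at a time, and each runs off the main SyncPods loop. The excerpt does not include `podWorkers` itself, so the following is a minimal sketch of one plausible implementation; the field names and the deduplicate-by-pod-name behavior are assumptions for illustration, not the actual source.

package main

import (
	"fmt"
	"sync"
	"time"
)

// podWorkers is a hypothetical sketch of the async worker behind
// kl.podWorkers.Run: it deduplicates by pod full name so that at most
// one sync goroutine runs per pod at any moment.
type podWorkers struct {
	lock    sync.Mutex
	running map[string]bool // pod full names with an in-flight sync
}

func newPodWorkers() *podWorkers {
	return &podWorkers{running: make(map[string]bool)}
}

// Run starts action in a goroutine unless a sync for podFullName is
// already in flight, in which case the call is dropped.
func (p *podWorkers) Run(podFullName string, action func()) {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.running[podFullName] {
		return
	}
	p.running[podFullName] = true
	go func() {
		defer func() {
			p.lock.Lock()
			defer p.lock.Unlock()
			delete(p.running, podFullName)
		}()
		action()
	}()
}

func main() {
	w := newPodWorkers()
	w.Run("pod-a.default", func() { time.Sleep(10 * time.Millisecond); fmt.Println("synced pod-a") })
	w.Run("pod-a.default", func() { fmt.Println("never runs: pod-a already syncing") })
	time.Sleep(50 * time.Millisecond)
}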
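The kill loop decides ownership by parsing identity back out of the Docker container name: the kubelet encodes the pod full name, UUID, and container name into the name it hands Docker, and dockertools.ParseDockerName reverses that. The exact encoding is not part of this excerpt; the sketch below assumes a hypothetical k8s_<container>_<podFullName>_<uuid> scheme purely to illustrate the round trip.

package main

import (
	"fmt"
	"strings"
)

// Hypothetical name scheme for illustration only; the real encoding lives in
// dockertools.BuildDockerName/ParseDockerName and differs in detail.
const managedPrefix = "k8s"

func buildDockerName(podFullName, uuid, containerName string) string {
	return strings.Join([]string{managedPrefix, containerName, podFullName, uuid}, "_")
}

func parseDockerName(name string) (podFullName, uuid, containerName string, err error) {
	// Docker reports names with a leading "/".
	parts := strings.Split(strings.TrimPrefix(name, "/"), "_")
	if len(parts) != 4 || parts[0] != managedPrefix {
		return "", "", "", fmt.Errorf("%q is not a kubelet-managed name", name)
	}
	return parts[2], parts[3], parts[1], nil
}

func main() {
	name := buildDockerName("mypod.default", "1234-abcd", "web")
	pod, uuid, cont, _ := parseDockerName("/" + name)
	fmt.Println(pod, uuid, cont) // mypod.default 1234-abcd web
}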
Syncing a pod
SyncPods hands each pod to a worker, and the worker's callback is what invokes syncPod:

// Run the sync in an async manifest worker.
kl.podWorkers.Run(podFullName, func() {
	err := kl.syncPod(pod, dockerContainers)
	if err != nil {
		glog.Errorf("Error syncing pod: %v skipping.", err)
	}
})
func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContainers) error {
	podFullName := GetPodFullName(pod)
	uuid := pod.Manifest.UUID
	containersToKeep := make(map[dockertools.DockerID]empty)
	killedContainers := make(map[dockertools.DockerID]empty)

	// Make sure we have a network container
	var netID dockertools.DockerID
	if networkDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found {
		netID = dockertools.DockerID(networkDockerContainer.ID)
	} else {
		glog.V(3).Infof("Network container doesn't exist, creating")
		count, err := kl.deleteAllContainers(pod, podFullName, dockerContainers)
		if err != nil {
			return err
		}
		dockerNetworkID, err := kl.createNetworkContainer(pod)
		if err != nil {
			glog.Errorf("Failed to introspect network container. (%v) Skipping pod %s", err, podFullName)
			return err
		}
		netID = dockerNetworkID
		if count > 0 {
			// relist everything, otherwise we'll think we're ok
			dockerContainers, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
			if err != nil {
				glog.Errorf("Error listing containers %#v", dockerContainers)
				return err
			}
		}
	}
	containersToKeep[netID] = empty{}

	podVolumes, err := kl.mountExternalVolumes(&pod.Manifest)
	if err != nil {
		glog.Errorf("Unable to mount volumes for pod %s: (%v) Skipping pod.", podFullName, err)
		return err
	}

	podState := api.PodState{}
	info, err := kl.GetPodInfo(podFullName, uuid)
	if err != nil {
		glog.Errorf("Unable to get pod with name %s and uuid %s info, health checks may be invalid.",
			podFullName, uuid)
	}
	netInfo, found := info[networkContainerName]
	if found {
		podState.PodIP = netInfo.PodIP
	}

	for _, container := range pod.Manifest.Containers {
		expectedHash := dockertools.HashContainer(&container)
		if dockerContainer, found, hash := dockerContainers.FindPodContainer(podFullName, uuid, container.Name); found {
			containerID := dockertools.DockerID(dockerContainer.ID)
			glog.V(3).Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID)

			// look for changes in the container.
			if hash == 0 || hash == expectedHash {
				// TODO: This should probably be separated out into a separate goroutine.
				healthy, err := kl.healthy(podFullName, uuid, podState, container, dockerContainer)
				if err != nil {
					glog.V(1).Infof("health check errored: %v", err)
					containersToKeep[containerID] = empty{}
					continue
				}
				if healthy == health.Healthy {
					containersToKeep[containerID] = empty{}
					continue
				}
				glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy)
			} else {
				glog.V(3).Infof("container hash changed %d vs %d.", hash, expectedHash)
			}
			if err := kl.killContainer(dockerContainer); err != nil {
				glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err)
				continue
			}
			killedContainers[containerID] = empty{}
		}

		// Check RestartPolicy for container
		recentContainers, err := dockertools.GetRecentDockerContainersWithNameAndUUID(kl.dockerClient, podFullName, uuid, container.Name)
		if err != nil {
			glog.Errorf("Error listing recent containers with name and uuid:%s--%s--%s", podFullName, uuid, container.Name)
			// TODO(dawnchen): error handling here?
		}

		if len(recentContainers) > 0 && pod.Manifest.RestartPolicy.Always == nil {
			if pod.Manifest.RestartPolicy.Never != nil {
				glog.V(3).Infof("Already ran container with name %s--%s--%s, do nothing",
					podFullName, uuid, container.Name)
				continue
			}
			if pod.Manifest.RestartPolicy.OnFailure != nil {
				// Check the exit code of last run
				if recentContainers[0].State.ExitCode == 0 {
					glog.V(3).Infof("Already successfully ran container with name %s--%s--%s, do nothing",
						podFullName, uuid, container.Name)
					continue
				}
			}
		}

		glog.V(3).Infof("Container with name %s--%s--%s doesn't exist, creating %#v", podFullName, uuid, container.Name, container)
		if !api.IsPullNever(container.ImagePullPolicy) {
			present, err := kl.dockerPuller.IsImagePresent(container.Image)
			if err != nil {
				glog.Errorf("Failed to inspect image: %s: %#v skipping pod %s container %s", container.Image, err, podFullName, container.Name)
				continue
			}
			if api.IsPullAlways(container.ImagePullPolicy) || !present {
				if err := kl.dockerPuller.Pull(container.Image); err != nil {
					glog.Errorf("Failed to pull image %s: %v skipping pod %s container %s.", container.Image, err, podFullName, container.Name)
					continue
				}
			}
		}

		// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
		containerID, err := kl.runContainer(pod, &container, podVolumes, "container:"+string(netID))
		if err != nil {
			// TODO(bburns) : Perhaps blacklist a container after N failures?
			glog.Errorf("Error running pod %s container %s: %v", podFullName, container.Name, err)
			continue
		}
		containersToKeep[containerID] = empty{}
	}

	// Kill any containers in this pod which were not identified above (guards against duplicates).
	for id, container := range dockerContainers {
		curPodFullName, curUUID, _, _ := dockertools.ParseDockerName(container.Names[0])
		if curPodFullName == podFullName && curUUID == uuid {
			// Don't kill containers we want to keep or those we already killed.
			_, keep := containersToKeep[id]
			_, killed := killedContainers[id]
			if !keep && !killed {
				err = kl.killContainer(container)
				if err != nil {
					glog.Errorf("Error killing container: %v", err)
				}
			}
		}
	}
	return nil
}
- Set up the keep/killed maps: `containersToKeep` tracks containers that should stay running, and `killedContainers` tracks containers killed during this pass so the final cleanup loop does not try to kill them again.
- Check the network container: if the pod's network container already exists, keep it; if not, delete all of the pod's containers and create a fresh network container. App containers are later started with Docker network mode `container:<netID>`, so every container in the pod joins the network container's network namespace and shares its IP.
- Mount external volumes: mount every volume declared in the pod manifest.
- Fetch pod state: look up info about the pod and its containers, and record the pod IP from the network container when available, since health checks may need it.
- Process each container in the manifest (condensed sketches of the hash, restart-policy, and pull-policy decisions follow this list):
  - If the container exists and its recorded hash matches the hash of the desired spec, run a health check; a healthy result (or a health-check error) keeps the container.
  - If the container is unhealthy or its hash has changed, kill it.
  - Consult the restart policy to decide whether a container that has already run should be started again.
  - Pull the image when the pull policy requires it, then run the container inside the pod's network namespace.
- Kill anything unidentified: finally, kill any containers that belong to this pod but were neither kept nor just killed above; this guards against duplicates.
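Spec-change detection hinges on dockertools.HashContainer: the hash of the desired container spec is compared with the hash recorded when the running container was created (a hash of 0 means "unknown" and is treated as a match). A minimal sketch of hashing a spec's Go representation, under the assumption that the real code hashes something like the %#v rendering of the struct:

package main

import (
	"fmt"
	"hash/adler32"
)

// containerSpec is a stand-in for api.Container, for illustration only.
type containerSpec struct {
	Name  string
	Image string
}

// hashSpec sketches change detection: any change to the spec's fields
// changes the %#v rendering and therefore the hash.
func hashSpec(spec containerSpec) uint64 {
	h := adler32.New()
	fmt.Fprintf(h, "%#v", spec)
	return uint64(h.Sum32())
}

func main() {
	a := containerSpec{Name: "web", Image: "nginx:1.0"}
	b := containerSpec{Name: "web", Image: "nginx:1.1"}
	fmt.Println(hashSpec(a) == hashSpec(a)) // true: identical specs
	fmt.Println(hashSpec(a) == hashSpec(b)) // false: image changed
}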
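The restart-policy branch in syncPod reduces to a small decision table: Always restarts unconditionally, Never only allows a first run, and OnFailure reruns only when the last exit code was non-zero. A condensed sketch of that logic; the helper name and the local policy struct are assumptions for illustration:

package main

import "fmt"

// restartPolicy mirrors the one-of-three shape of pod.Manifest.RestartPolicy;
// exactly one field is non-nil. Hypothetical local type for illustration.
type restartPolicy struct {
	Always    *struct{}
	OnFailure *struct{}
	Never     *struct{}
}

// shouldRun captures the decision made inline in syncPod: ranBefore is
// whether a recent container with this name/uuid exists, and lastExitCode
// is the exit code of the most recent run.
func shouldRun(p restartPolicy, ranBefore bool, lastExitCode int) bool {
	if !ranBefore || p.Always != nil {
		return true
	}
	if p.Never != nil {
		return false
	}
	if p.OnFailure != nil {
		return lastExitCode != 0
	}
	return true
}

func main() {
	onFailure := restartPolicy{OnFailure: &struct{}{}}
	fmt.Println(shouldRun(onFailure, true, 0)) // false: last run succeeded
	fmt.Println(shouldRun(onFailure, true, 1)) // true: last run failed
}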
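Image pulling follows the three pull policies checked via api.IsPullNever and api.IsPullAlways: never-pull skips pulling entirely, always-pull pulls every time, and the default pulls only when the image is missing locally. A sketch of the decision; the helper and the local enum are hypothetical, introduced only to make the branching explicit:

package main

import "fmt"

type pullPolicy int

const (
	pullIfNotPresent pullPolicy = iota // default: pull only when missing
	pullAlways
	pullNever
)

// shouldPull condenses the check in syncPod: never-pull short-circuits,
// always-pull forces a pull, otherwise pull only if the image is absent.
func shouldPull(policy pullPolicy, present bool) bool {
	switch policy {
	case pullNever:
		return false
	case pullAlways:
		return true
	default:
		return !present
	}
}

func main() {
	fmt.Println(shouldPull(pullIfNotPresent, true))  // false: already present
	fmt.Println(shouldPull(pullIfNotPresent, false)) // true: missing locally
	fmt.Println(shouldPull(pullAlways, true))        // true: forced
}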