跳到主要内容

62.kubelet-02

健康检查

    // Register the three health checker implementations with the health
    // package: an exec-based checker constructed from k, an HTTP checker backed
    // by a zero-value http.Client, and a TCP checker.
    // NOTE(review): a zero-value http.Client has no Timeout set — confirm that
    // the HTTP checker bounds its requests some other way.
    health.AddHealthChecker(health.NewExecHealthChecker(k))
health.AddHealthChecker(health.NewHTTPHealthChecker(&http.Client{}))
health.AddHealthChecker(&health.TCPHealthChecker{})


// HealthChecker defines an abstract interface for checking container health.
// Each implementation handles one kind of probe; implementations are
// registered through AddHealthChecker.
type HealthChecker interface {
// HealthCheck checks the given container of the pod identified by
// podFullName and podUUID, using currentState as the pod's current state,
// and returns the resulting Status or an error if the check itself could
// not be carried out.
HealthCheck(podFullName, podUUID string, currentState api.PodState, container api.Container) (Status, error)
// CanCheck reports whether this checker applies to the given liveness probe.
// NOTE(review): presumably decided by which probe fields are set — confirm
// against the implementations.
CanCheck(probe *api.LivenessProbe) bool
}

server

// InstallDefaultHandlers wires the always-available endpoints into the
// server's mux: the healthz handler first, followed by the /podInfo, /stats/
// and /spec/ routes.
func (s *Server) InstallDefaultHandlers() {
	healthz.InstallHandler(s.mux)

	// Bind the registration method once so each route reads as a single entry.
	register := s.mux.HandleFunc
	register("/podInfo", s.handlePodInfo)
	register("/stats/", s.handleStats)
	register("/spec/", s.handleSpec)
}

// InstallDebuggingHandlers adds the routes that serve logs or run
// commands/containers to the server's mux: /container, /containers, /run/,
// /logs/ and /containerLogs/.
func (s *Server) InstallDebuggingHandlers() {
	register := s.mux.HandleFunc

	// ToDo: /container, /run, and /containers aren't debugging options, should probably be handled separately
	register("/container", s.handleContainer)
	register("/containers", s.handleContainers)
	register("/run/", s.handleRun)

	// Log-serving endpoints.
	register("/logs/", s.handleLogs)
	register("/containerLogs/", s.handleContainerLogs)
}

// HostInterface lists the operations the HTTP server needs from its host.
// NOTE(review): presumably implemented by the Kubelet, which defines a
// GetPodInfo method with the same name — confirm against the Server
// constructor.
type HostInterface interface {
// GetContainerInfo returns container info for containerName in the pod
// identified by podFullName and uuid, as requested by req.
GetContainerInfo(podFullName, uuid, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
// GetRootInfo returns container info as requested by req.
// NOTE(review): presumably for the root container of the machine — confirm.
GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
// GetMachineInfo returns machine-level info.
GetMachineInfo() (*info.MachineInfo, error)
// GetPodInfo returns the PodInfo for the pod identified by name and uuid.
GetPodInfo(name, uuid string) (api.PodInfo, error)
// RunInContainer runs cmd in the named container of the pod identified by
// name and uuid and returns the resulting bytes.
// NOTE(review): presumably the command's combined output — confirm.
RunInContainer(name, uuid, container string, cmd []string) ([]byte, error)
// GetKubeletContainerLogs writes the logs of containerName in podFullName to
// stdout and stderr; tail and follow select how much is returned and whether
// the stream stays open.
// NOTE(review): exact tail semantics are not visible here — confirm.
GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
// ServeLogs handles an HTTP request for logs, writing the response to w.
ServeLogs(w http.ResponseWriter, req *http.Request)
}

syncPod

func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContainers) error {
podFullName := GetPodFullName(pod)
uuid := pod.Manifest.UUID
containersToKeep := make(map[dockertools.DockerID]empty)
killedContainers := make(map[dockertools.DockerID]empty)

// Make sure we have a network container
var netID dockertools.DockerID
if networkDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found {
netID = dockertools.DockerID(networkDockerContainer.ID)
} else {
glog.V(3).Infof("Network container doesn't exist, creating")
count, err := kl.deleteAllContainers(pod, podFullName, dockerContainers)
if err != nil {
return err
}
dockerNetworkID, err := kl.createNetworkContainer(pod)
if err != nil {
glog.Errorf("Failed to introspect network container. (%v) Skipping pod %s", err, podFullName)
return err
}
netID = dockerNetworkID
if count > 0 {
// relist everything, otherwise we'll think we're ok
dockerContainers, err = dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
glog.Errorf("Error listing containers %#v", dockerContainers)
return err
}
}
}
containersToKeep[netID] = empty{}

podVolumes, err := kl.mountExternalVolumes(&pod.Manifest)
if err != nil {
glog.Errorf("Unable to mount volumes for pod %s: (%v) Skipping pod.", podFullName, err)
return err
}

podState := api.PodState{}
info, err := kl.GetPodInfo(podFullName, uuid)
if err != nil {
glog.Errorf("Unable to get pod with name %s and uuid %s info, health checks may be invalid.",
podFullName, uuid)
}
netInfo, found := info[networkContainerName]
if found {
podState.PodIP = netInfo.PodIP
}

for _, container := range pod.Manifest.Containers {
expectedHash := dockertools.HashContainer(&container)
if dockerContainer, found, hash := dockerContainers.FindPodContainer(podFullName, uuid, container.Name); found {
containerID := dockertools.DockerID(dockerContainer.ID)
glog.V(3).Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID)

// look for changes in the container.
if hash == 0 || hash == expectedHash {
// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(podFullName, uuid, podState, container, dockerContainer)
if err != nil {
glog.V(1).Infof("health check errored: %v", err)
containersToKeep[containerID] = empty{}
continue
}
if healthy == health.Healthy {
containersToKeep[containerID] = empty{}
continue
}
glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy)
} else {
glog.V(3).Infof("container hash changed %d vs %d.", hash, expectedHash)
}
if err := kl.killContainer(dockerContainer); err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err)
continue
}
killedContainers[containerID] = empty{}
}

// Check RestartPolicy for container
recentContainers, err := dockertools.GetRecentDockerContainersWithNameAndUUID(kl.dockerClient, podFullName, uuid, container.Name)
if err != nil {
glog.Errorf("Error listing recent containers with name and uuid:%s--%s--%s", podFullName, uuid, container.Name)
// TODO(dawnchen): error handling here?
}

if len(recentContainers) > 0 && pod.Manifest.RestartPolicy.Always == nil {
if pod.Manifest.RestartPolicy.Never != nil {
glog.V(3).Infof("Already ran container with name %s--%s--%s, do nothing",
podFullName, uuid, container.Name)
continue
}
if pod.Manifest.RestartPolicy.OnFailure != nil {
// Check the exit code of last run
if recentContainers[0].State.ExitCode == 0 {
glog.V(3).Infof("Already successfully ran container with name %s--%s--%s, do nothing",
podFullName, uuid, container.Name)
continue
}
}
}

glog.V(3).Infof("Container with name %s--%s--%s doesn't exist, creating %#v", podFullName, uuid, container.Name, container)
if !api.IsPullNever(container.ImagePullPolicy) {
present, err := kl.dockerPuller.IsImagePresent(container.Image)
if err != nil {
glog.Errorf("Failed to inspect image: %s: %#v skipping pod %s container %s", container.Image, err, podFullName, container.Name)
continue
}
if api.IsPullAlways(container.ImagePullPolicy) || !present {
if err := kl.dockerPuller.Pull(container.Image); err != nil {
glog.Errorf("Failed to pull image %s: %v skipping pod %s container %s.", container.Image, err, podFullName, container.Name)
continue
}
}
}
// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
containerID, err := kl.runContainer(pod, &container, podVolumes, "container:"+string(netID))
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running pod %s container %s: %v", podFullName, container.Name, err)
continue
}
containersToKeep[containerID] = empty{}
}

// Kill any containers in this pod which were not identified above (guards against duplicates).
for id, container := range dockerContainers {
curPodFullName, curUUID, _, _ := dockertools.ParseDockerName(container.Names[0])
if curPodFullName == podFullName && curUUID == uuid {
// Don't kill containers we want to keep or those we already killed.
_, keep := containersToKeep[id]
_, killed := killedContainers[id]
if !keep && !killed {
err = kl.killContainer(container)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}
}
}
}

return nil

这个函数是Kubernetes的Kubelet组件的一部分,它负责同步pod的状态,包括确保所需的容器正在运行,以及杀死任何不再需要的容器。下面是对这个函数的详细解释:

  1. 确保网络容器存在 : 如果网络容器不存在,则会删除所有容器并创建一个新的网络容器。网络容器用于管理容器之间的网络通信。
  2. 挂载外部卷 : 该函数还负责挂载pod所需的所有外部卷。如果无法挂载这些卷,则返回错误并跳过pod。
  3. 检查容器健康状况 : 对于pod中的每个容器,该函数检查它们是否存在,并与预期的哈希值进行比较以查看是否有任何更改。它还执行健康检查,并根据结果决定是否保留或杀死容器。
  4. 处理重启策略 : 根据pod的重启策略(总是、从不或失败时)以及容器的最近状态,决定是否要重新启动容器。
  5. 拉取和运行容器 : 对于需要的容器,该函数确保镜像已经存在,如果需要,则会拉取它。然后,它将运行容器,并将其ID添加到containersToKeep映射中。
  6. 杀死不再需要的容器 : 最后,该函数会遍历传入的容器列表(dockerContainers),只处理属于当前pod的容器(pod完整名称和UUID都匹配)。其中既不在containersToKeep中、也不在killedContainers中的容器会被杀死。这样可以清理重复或不再需要的容器,同时避免重复杀死前面已经处理过的容器。

func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContainers) error定义了一个名为syncPod的方法,该方法用于同步给定的pod状态。

  • podFullName := GetPodFullName(pod):获取pod的完整名称。
  • uuid := pod.Manifest.UUID:获取pod的UUID。
  • containersToKeep := make(map[dockertools.DockerID]empty) 和 killedContainers := make(map[dockertools.DockerID]empty):分别定义需要保留的容器和已经被杀死的容器的映射。
  • if networkDockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, uuid, networkContainerName); found { ... }:确定网络容器是否存在。如果找不到,则创建一个。
  • podVolumes, err := kl.mountExternalVolumes(&pod.Manifest):挂载pod所需的外部卷。
  • info, err := kl.GetPodInfo(podFullName, uuid):获取关于pod的信息。
  • 对于pod的每个容器,确定容器是否存在、是否健康、是否需要重新启动等,并相应地杀死或启动容器。
  • 如果该容器最近运行过,且重启策略不是Always:策略为Never时直接跳过;策略为OnFailure且最近一次运行的退出码为0时也跳过;其余情况下会重新创建并运行该容器。
  • for id, container := range dockerContainers { ... }:杀死不再需要的容器。
  • return nil:返回nil表示同步成功完成。

整个函数的目的是确保给定pod的容器状态与期望的状态一致。如果有任何差异,例如容器不健康或不存在,则采取适当的措施来纠正这些差异,例如通过重新启动容器或杀死不再需要的容器。