跳到主要内容

61.kubelet

参数

const defaultRootDir = "/var/lib/kubelet"

var (
config = flag.String("config", "", "Path to the config file or directory of files")
syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config")
fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking config files for new data")
httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data")
manifestURL = flag.String("manifest_url", "", "URL for accessing the container manifest")
enableServer = flag.Bool("enable_server", true, "Enable the info server")
address = util.IP(net.ParseIP("127.0.0.1"))
port = flag.Uint("port", master.KubeletPort, "The port for the info server to serve on")
hostnameOverride = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.")
networkContainerImage = flag.String("network_container_image", kubelet.NetworkContainerImage, "The image that network containers in each pod will use.")
dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with")
etcdServerList util.StringList
etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers")
rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).")
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]")
registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers and --enable-server")
enableDebuggingHandlers = flag.Bool("enable_debugging_handlers", true, "Enables server endpoints for log collection and local running of containers and commands")
)
  1. config: 指定配置文件或目录的路径。
  2. syncFrequency: 定义同步运行的容器和配置之间的最大时间间隔。
  3. fileCheckFrequency: 定义检查配置文件以获取新数据的时间间隔。
  4. httpCheckFrequency: 定义检查 HTTP 获取新数据的时间间隔。
  5. manifestURL: 容器清单的 URL。
  6. enableServer: 是否启用信息服务器。
  7. address: 信息服务器要绑定的 IP 地址。
  8. port: 信息服务器要监听的端口号。
  9. hostnameOverride: 如果不为空,将使用该字符串作为识别而不是实际主机名。
  10. networkContainerImage: 每个 pod 中的网络容器将使用的镜像。
  11. dockerEndpoint: 如果不为空,将用于与 Docker 通信的端点。
  12. etcdServerList: etcd 服务器列表。
  13. etcdConfigFile: etcd 客户端的配置文件。与 etcdServerList 互斥。
  14. rootDirectory: 用于管理 kubelet 文件(例如卷挂载)的目录路径。
  15. allowPrivileged: 如果为 true,则允许容器请求特权模式。
  16. registryPullQPS: 限制从注册表拉取的 QPS(每秒查询率)。如果为 0,则不限制。
  17. registryBurst: 允许临时突发到此数量的拉取,同时仍不超过 registry_qps。仅在 registry_qps 大于 0 时使用。
  18. runonce: 如果为 true,则在从本地清单或远程 URL 生成 pod 后退出。与 etcdServerListenableServer 互斥。
  19. enableDebuggingHandlers: 启用用于日志收集和本地容器和命令运行的服务器端点。

main

  1. 解析标志和初始化日志系统 :程序开始时,它解析命令行标志并初始化日志系统。这使得用户可以通过命令行选项来定制 kubelet 的行为。
  2. 随机数种子的初始化 :用于确保程序的随机行为。
  3. 验证运行选项 :检查 runonce 标志,并确保与其他互斥的标志不会同时被设置。如果 runonce 被设置并且 enableServer 也被设置,则禁用服务器。
  4. 初始化 etcd :设置 etcd 的日志记录器,并初始化一些功能,如允许特权容器。
  5. 连接到 Docker :尝试连接到 Docker 客户端。如果失败,程序将终止。
  6. 处理主机名和根目录 :获取主机名并确保提供了有效的根目录。然后清理和创建这个目录。
  7. 配置来源的定义 :根据所提供的标志,设置从文件、URL 或 etcd 读取容器配置的方法。
  8. kubelet 的实例化 :创建新的 kubelet 实例,并设置各种参数,如 Docker 客户端、etcd 客户端、同步频率等。
  9. 设置 cadvisor 客户端 :尝试创建 cadvisor 客户端并将其设置在 kubelet 中。
  10. 添加健康检查器 :添加用于容器健康检查的检查器。
  11. 处理一次运行 :如果 runonce 标志被设置,则处理 pod 并退出。
  12. 启动 kubelet :启动 kubelet 的主循环,使其开始运行。
  13. 启动 kubelet 服务器 :如果启用,则启动一个服务器以便其他系统组件与 kubelet 进行通信。
  14. 无限运行 :主函数在这一点上被阻塞,因此 kubelet 继续运行,直到手动停止。

podConfig

这个代码片段定义了一个核心组件 podConfig ,它允许 kubelet 从多个源接收、合并和同步 pod 配置,并以有序的方式将这些更改通知给侦听器。这确保了 pod 的一致性和正确的运行状态。kubelet 通过监听 podConfig 的update Channel 来运行 syncLoop 保证一致性。


//这个类型描述了如何将更改发送到更新通道。它有三个不同的模式:
type PodConfigNotificationMode int

const (
//当发生任何更改时,都会通过 SET 操作传递完整的配置
PodConfigNotificationSnapshot = iota
// 当 pods 更改时,通过 UPDATE 消息传递,并且如果有任何添加或删除,则通过 SET 消息传递。
PodConfigNotificationSnapshotAndUpdates
// 将 ADD、UPDATE 和 REMOVE 操作传递到更新通道。
PodConfigNotificationIncremental



// PodConfig 是一个配置复用器,它将多个 pod 配置源合并为单一的一致结构,
// 然后按顺序向侦听器传递增量更改通知。
type PodConfig struct {
pods *podStorage
mux *config.Mux

// the channel of denormalized changes passed to listeners
updates chan kubelet.PodUpdate
}

// NewPodConfig 函数使用给定的通知模式创建 PodConfig 对象。它设置存储、复用器和更新通道,并返回新创建的对象。
func NewPodConfig(mode PodConfigNotificationMode) *PodConfig {
updates := make(chan kubelet.PodUpdate, 1)
storage := newPodStorage(updates, mode)
podConfig := &PodConfig{
pods: storage,
mux: config.NewMux(storage),
updates: updates,
}
return podConfig
}


// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
return c.mux.Channel(source)
}

// Updates returns a channel of updates to the configuration, properly denormalized.
func (c *PodConfig) Updates() <-chan kubelet.PodUpdate {
return c.updates
}

// Sync requests the full configuration be delivered to the update channel.
func (c *PodConfig) Sync() {
c.pods.Sync()
}

podConfig 有三个对外暴露的函数,分别是 Channel ,Updates , Sync

  • Channel(source string) chan<- interface{}:返回一个通道,配置源可以在该通道发送新配置的更新。对同一源进行多次调用将返回同一通道。这允许基于变更和状态的源使用相同的通道。

  • Updates() <-chan kubelet.PodUpdate:返回一个 PodUpdate Channel,当有 pod 更新时候会通知 kubelet 进行同步

  • Sync():将全部 pod 按照 Op == SET 放入 updates 中,目前只是测试代码在使用。

podStorage

// podStorage manages the current pod state at any point in time and ensures updates
// to the channel are delivered in order. Note that this object is an in-memory source of
// "truth" and on creation contains zero entries. Once all previously read sources are
// available, then this object should be considered authoritative.
type podStorage struct {
podLock sync.RWMutex
// map of source name to pod name to pod reference
pods map[string]map[string]*kubelet.Pod
mode PodConfigNotificationMode

// ensures that updates are delivered in strict order
// on the updates channel
updateLock sync.Mutex
updates chan<- kubelet.PodUpdate
}

podStorage 底层主要有三个数据结构

  • pods 存储各源对应的pods
  • mode 通知模式(默认使用 PodConfigNotificationSnapshotAndUpdates 模式)
  • updates 更新通道 (外部传入)

podStorage 有三个对外暴露的函数,分别是 Merge,Sync ,MergedStated

  • Merge(source string, change interface{}) error Merge 函数将来自不同源的一组传入更改规范化为所有 Pod 的映射,并确保过滤掉多余的更改,然后将零个或多个最小更新推送到更新通道。确保按顺序交付更新。
  • Sync() sends a copy of the current state through the update channel.
  • MergedState() returns a representation of the current merge state.