跳到主要内容

76.release0.5-01

kube-apiserver

var (
// Note: the weird ""+ in below lines seems to be the only way to get gofmt to
// arrange these text blocks sensibly. Grrr.
port = flag.Int("port", 8080, ""+
"The port to listen on. Default 8080. It is assumed that firewall rules are "+
"set up such that this port is not reachable from outside of the cluster. It is "+
"further assumed that port 443 on the cluster's public address is proxied to this "+
"port. This is performed by nginx in the default setup.")
address = util.IP(net.ParseIP("127.0.0.1"))
publicAddressOverride = flag.String("public_address_override", "", ""+
"Public serving address. Read only port will be opened on this address, "+
"and it is assumed that port 443 at this address will be proxied/redirected "+
"to '-address':'-port'. If blank, the address in the first listed interface "+
"will be used.")
readOnlyPort = flag.Int("read_only_port", 7080, ""+
"The port from which to serve read-only resources. If 0, don't serve on a "+
"read-only address. It is assumed that firewall rules are set up such that "+
"this port is not reachable from outside of the cluster.")
securePort = flag.Int("secure_port", 0, "The port from which to serve HTTPS with authentication and authorization. If 0, don't serve HTTPS ")
tlsCertFile = flag.String("tls_cert_file", "", "File containing x509 Certificate for HTTPS. (CA cert, if any, concatenated after server cert).")
tlsPrivateKeyFile = flag.String("tls_private_key_file", "", "File containing x509 private key matching --tls_cert_file.")
apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'.")
storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred")
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.")
eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.")
tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the secure port of the API server via token authentication.")
authorizationMode = flag.String("authorization_mode", "AlwaysAllow", "Selects how to do authorization on the secure port. One of: "+strings.Join(apiserver.AuthorizationModeChoices, ","))
authorizationPolicyFile = flag.String("authorization_policy_file", "", "File with authorization policy in csv format, used with --authorization_mode=ABAC, on the secure port.")
etcdServerList util.StringList
etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers.")
corsAllowedOriginList util.StringList
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.")
portalNet util.IPNet // TODO: make this a list
enableLogsSupport = flag.Bool("enable_logs_support", true, "Enables server endpoint for log collection")
kubeletConfig = client.KubeletConfig{
Port: 10250,
EnableHttps: false,
}
)
  1. port : API Server 的监听端口,默认为 8080。
  2. address : 默认监听地址,这里设为 localhost (127.0.0.1)。
  3. publicAddressOverride : 公共服务地址,如果为空,则使用第一个列出的接口地址。
  4. readOnlyPort : 为只读资源提供服务的端口,默认为 7080。
  5. securePort : 提供带认证和授权的 HTTPS 服务的端口。默认不提供 HTTPS 服务。
  6. tlsCertFile & tlsPrivateKeyFile : HTTPS 的证书和私钥文件。
  7. apiPrefix : 服务器上 API 请求的前缀,默认为 "/api"。
  8. storageVersion : 存储资源的版本。默认为服务器偏好版本。
  9. cloudProvider & cloudConfigFile : 云服务提供商及其配置文件。
  10. healthCheckMinions : 是否对 minion 进行健康检查,默认为 true。
  11. eventTTL : 事件的保留时间,默认为 48 小时。
  12. tokenAuthFile : 用于通过 token 认证来保护 API Server 安全端口的文件。
  13. authorizationMode & authorizationPolicyFile : 认证方式及其策略文件。
  14. etcdServerList & etcdConfigFile : etcd 服务器列表及其配置文件。
  15. corsAllowedOriginList : 允许的 CORS 源列表。
  16. allowPrivileged : 是否允许特权容器。
  17. portalNet : 分配 portal IPs 的 CIDR 表示的 IP 范围。
  18. enableLogsSupport : 是否启用日志收集的服务器端点。
  19. kubeletConfig : kubelet 配置。
    flag.Parse()
util.InitLogs()
defer util.FlushLogs()

verflag.PrintAndExitIfRequested()
verifyPortalFlags()

if (*etcdConfigFile != "" && len(etcdServerList) != 0) || (*etcdConfigFile == "" && len(etcdServerList) == 0) {
glog.Fatalf("specify either -etcd_servers or -etcd_config")
}

capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: *allowPrivileged,
})

cloud := cloudprovider.InitCloudProvider(*cloudProvider, *cloudConfigFile)

kubeletClient, err := client.NewKubeletClient(&kubeletConfig)
if err != nil {
glog.Fatalf("Failure to start kubelet client: %v", err)
}

// TODO: expose same flags as client.BindClientConfigFlags but for a server
clientConfig := &client.Config{
Host: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))),
Version: *storageVersion,
}
client, err := client.New(clientConfig)
if err != nil {
glog.Fatalf("Invalid server address: %v", err)
}

helper, err := newEtcd(*etcdConfigFile, etcdServerList)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}

n := net.IPNet(portalNet)

authorizer, err := apiserver.NewAuthorizerFromAuthorizationConfig(*authorizationMode, *authorizationPolicyFile)
if err != nil {
glog.Fatalf("Invalid Authorization Config: %v", err)
}

config := &master.Config{
Client: client,
Cloud: cloud,
EtcdHelper: helper,
HealthCheckMinions: *healthCheckMinions,
EventTTL: *eventTTL,
KubeletClient: kubeletClient,
PortalNet: &n,
EnableLogsSupport: *enableLogsSupport,
EnableUISupport: true,
APIPrefix: *apiPrefix,
CorsAllowedOriginList: corsAllowedOriginList,
TokenAuthFile: *tokenAuthFile,
ReadOnlyPort: *readOnlyPort,
ReadWritePort: *port,
PublicAddress: *publicAddressOverride,
Authorizer: authorizer,
}
m := master.New(config)
  1. flag.Parse(): 解析命令行参数。所有之前使用 flag 定义的变量现在都会被填充。
  2. util.InitLogs(): 初始化日志系统,使得日志能够被写入。
  3. defer util.FlushLogs(): 确保程序结束前刷新日志。defer 关键字使得此语句在包含它的函数(通常是 main)返回之前执行。
  4. verflag.PrintAndExitIfRequested(): 检查命令行参数是否请求显示版本信息。如果是,则显示版本信息并退出。
  5. verifyPortalFlags(): 验证 portalNet 参数是否正确配置。
  6. 验证 etcd 配置:只能提供 etcdConfigFileetcdServerList 中的一个,不能都提供或都不提供。
  7. capabilities.Initialize(...): 初始化 API Server 的能力,例如是否允许特权容器。
  8. cloudprovider.InitCloudProvider(...): 初始化云提供商接口。这允许 Kubernetes 与各种云提供商(如 AWS、GCE、Azure 等)交互。
  9. 创建 kubeletClient: 与 kubelet 交互的客户端。
  10. 设置 clientConfig 并创建一个新的 Kubernetes 客户端:用于与 API Server 交互。
  11. newEtcd(...): 初始化 etcd 的帮助函数,用于数据存储。
  12. portalNet 从自定义类型转换为标准的 net.IPNet
  13. 创建一个新的授权器,根据提供的配置确定如何进行 API 访问授权。
  14. 初始化 master 的配置:这涵盖了 API Server 运行所需的大多数配置。
  15. master.New(config): 使用上面的配置创建一个新的 Kubernetes master,它处理所有核心的 API 请求。

clientConfig := &client.Config{
Host: net.JoinHostPort(address.String(), strconv.Itoa(int(*port))),
Version: *storageVersion,
}
client, err := client.New(clientConfig)
if err != nil {
glog.Fatalf("Invalid server address: %v", err)
}


// New creates a Kubernetes client for the given config. This client works with pods,
// replication controllers and services. It allows operations such as list, get, update
// and delete on these objects. An error is returned if the provided configuration
// is not valid.
func New(c *Config) (*Client, error) {
config := *c
if config.Prefix == "" {
config.Prefix = "/api"
}
client, err := RESTClientFor(&config)
if err != nil {
return nil, err
}
return &Client{client}, nil
}


初始化 client ,我们来看看 RESTClientFor

// RESTClientFor returns a RESTClient that satisfies the requested attributes on a client Config
// object.
func RESTClientFor(config *Config) (*RESTClient, error) {
version := defaultVersionFor(config)

// Set version
versionInterfaces, err := latest.InterfacesFor(version)
if err != nil {
return nil, fmt.Errorf("API version '%s' is not recognized (valid values: %s)", version, strings.Join(latest.Versions, ", "))
}

baseURL, err := defaultServerUrlFor(config)
if err != nil {
return nil, err
}

client := NewRESTClient(baseURL, versionInterfaces.Codec)

transport, err := TransportFor(config)
if err != nil {
return nil, err
}

if transport != http.DefaultTransport {
client.Client = &http.Client{Transport: transport}
}
return client, nil
}
  • 这个函数 RESTClientFor 的目的是为了根据给定的 Config 返回一个符合要求属性的 RESTClient。让我们深入了解其内部工作:
  1. 版本确定 首先,函数调用 defaultVersionFor(config) 来确定要使用的 API 版本。这里的 defaultVersionFor 似乎是一个辅助函数,它可能会检查 config 中是否指定了版本,并可能返回默认版本。
  2. 检查版本 函数接着使用 latest.InterfacesFor(version) 来检查所选版本是否有效并获取相应的接口。这里的 latest 可能是一个全局变量或者包级别的对象,它知道所有支持的 API 版本。如果版本无效,函数会返回一个错误。
  3. 获取基本 URL 然后,它调用 defaultServerUrlFor(config) 来获取服务器的基本 URL。这个函数可能与我们之前看到的 DefaultServerURL 类似,但可能根据 Config 中的其他信息进行了更多的处理。
  4. 创建新的 RESTClient 一旦有了基本的 URL 和正确的编解码器(从 versionInterfaces.Codec 获取),函数就会使用它们创建一个新的 RESTClient
  5. 设置传输 接下来,函数使用 TransportFor(config) 来为客户端设置传输。这可能涉及到处理各种配置选项,如代理设置、TLS 证书、身份验证等。 如果得到的传输与默认的 HTTP 传输不同,那么它将为 RESTClient 的内部 HTTP 客户端设置这个新的传输。
  6. 返回客户端 最后,函数返回这个配置好的 RESTClient
// defaultServerUrlFor is shared between IsConfigTransportTLS and RESTClientFor
func defaultServerUrlFor(config *Config) (*url.URL, error) {
version := defaultVersionFor(config)
// TODO: move the default to secure when the apiserver supports TLS by default
// config.Insecure is taken to mean "I want HTTPS but don't bother checking the certs against a CA."
defaultTLS := config.CertFile != "" || config.Insecure
host := config.Host
if host == "" {
host = "localhost"
}
return DefaultServerURL(host, config.Prefix, version, defaultTLS)
}

// DefaultServerURL converts a host, host:port, or URL string to the default base server API path
// to use with a Client at a given API version following the standard conventions for a
// Kubernetes API.
func DefaultServerURL(host, prefix, version string, defaultTLS bool) (*url.URL, error) {
if host == "" {
return nil, fmt.Errorf("host must be a URL or a host:port pair")
}
if version == "" {
return nil, fmt.Errorf("version must be set")
}
base := host
hostURL, err := url.Parse(base)
if err != nil {
return nil, err
}
if hostURL.Scheme == "" {
scheme := "http://"
if defaultTLS {
scheme = "https://"
}
hostURL, err = url.Parse(scheme + base)
if err != nil {
return nil, err
}
if hostURL.Path != "" && hostURL.Path != "/" {
return nil, fmt.Errorf("host must be a URL or a host:port pair: %s", base)
}
}

// If the user specified a URL without a path component (http://server.com), automatically
// append the default prefix
if hostURL.Path == "" {
if prefix == "" {
prefix = "/"
}
hostURL.Path = prefix
}

// Add the version to the end of the path
hostURL.Path = path.Join(hostURL.Path, version)

return hostURL, nil
}
  1. 检查 hostversion。如果它们中的任何一个是空的,函数会返回错误。
  2. 尝试解析 host 到一个 URL。如果它不是一个有效的 URL,url.Parse 会返回错误。
  3. 如果解析的 URL 没有指定协议(也称为方案),函数将为其选择一个默认的。如果 defaultTLStrue,则选择 "https://";否则选择 "http://"。然后再次尝试解析这个带有方案的新 URL。
  4. 检查解析的 URL 的路径是否存在。如果用户指定了一个没有路径的 URL(例如,http://server.com),则会自动追加默认的前缀。
  5. 最后,函数会将版本添加到路径的末尾。
  6. 返回构建的 URL。

一个实际的应用示例: 假设我们有以下参数调用函数:

  • host: example.com:8080
  • prefix: /api
  • version: v1
  • defaultTLS: false

返回的 URL 将是 http://example.com:8080/api/v1


// Attributes implements authorizer.Attributes interface.
type Attributes struct {
// TODO: add fields and methods when authorizer.Attributes is completed.
}

// alwaysAllowAuthorizer is an implementation of authorizer.Attributes
// which always says yes to an authorization request.
// It is useful in tests and when using kubernetes in an open manner.
type alwaysAllowAuthorizer struct{}

func (alwaysAllowAuthorizer) Authorize(a authorizer.Attributes) (err error) {
return nil
}

func NewAlwaysAllowAuthorizer() authorizer.Authorizer {
return new(alwaysAllowAuthorizer)
}

// alwaysDenyAuthorizer is an implementation of authorizer.Attributes
// which always says no to an authorization request.
// It is useful in unit tests to force an operation to be forbidden.
type alwaysDenyAuthorizer struct{}

func (alwaysDenyAuthorizer) Authorize(a authorizer.Attributes) (err error) {
return errors.New("Everything is forbidden.")
}

func NewAlwaysDenyAuthorizer() authorizer.Authorizer {
return new(alwaysDenyAuthorizer)
}

const (
ModeAlwaysAllow string = "AlwaysAllow"
ModeAlwaysDeny string = "AlwaysDeny"
ModeABAC string = "ABAC"
)

// Keep this list in sync with constant list above.
var AuthorizationModeChoices = []string{ModeAlwaysAllow, ModeAlwaysDeny, ModeABAC}

// NewAuthorizerFromAuthorizationConfig returns the right sort of authorizer.Authorizer
// based on the authorizationMode xor an error. authorizationMode should be one of AuthorizationModeChoices.
func NewAuthorizerFromAuthorizationConfig(authorizationMode string, authorizationPolicyFile string) (authorizer.Authorizer, error) {
if authorizationPolicyFile != "" && authorizationMode != "ABAC" {
return nil, errors.New("Cannot specify --authorization_policy_file without mode ABAC")
}
// Keep cases in sync with constant list above.
switch authorizationMode {
case ModeAlwaysAllow:
return NewAlwaysAllowAuthorizer(), nil
case ModeAlwaysDeny:
return NewAlwaysDenyAuthorizer(), nil
case ModeABAC:
return abac.NewFromFile(authorizationPolicyFile)
default:
return nil, errors.New("Unknown authorization mode")
}
}

  1. Attributes 结构 :这是一个实现 authorizer.Attributes 接口的结构,但它目前是空的,并有一个待办事项说明要在完善 authorizer.Attributes 接口后添加字段和方法。
  2. alwaysAllowAuthorizer 结构 :这是 authorizer.Attributes 的一个实现,无论何时都会同意授权请求。这在测试中很有用,也在开放的 Kubernetes 使用场景中很有用。
  3. alwaysDenyAuthorizer 结构 :与 alwaysAllowAuthorizer 相反,这是另一个实现,它总是拒绝授权请求。这在单元测试中很有用,用于强制某个操作被禁止。
  4. 授权模式常量 :定义了几个字符串常量来表示授权模式,如 "AlwaysAllow", "AlwaysDeny", 和 "ABAC"。
  5. NewAuthorizerFromAuthorizationConfig 函数 :这个函数是基于传入的 authorizationModeauthorizationPolicyFile 参数来返回合适的 authorizer.Authorizer。如果 authorizationMode 不在预定义的模式中,函数会返回一个错误。这是一个工厂方法,用于根据所选模式创建合适的授权器。