
64.proxy-01

main

func main() {
	flag.Parse()
	util.InitLogs()
	defer util.FlushLogs()

	verflag.PrintAndExitIfRequested()

	serviceConfig := config.NewServiceConfig()
	endpointsConfig := config.NewEndpointsConfig()

	// define api config source
	if clientConfig.Host != "" {
		glog.Infof("Using api calls to get config %v", clientConfig.Host)
		client, err := client.New(clientConfig)
		if err != nil {
			glog.Fatalf("Invalid API configuration: %v", err)
		}
		config.NewSourceAPI(
			client,
			30*time.Second,
			serviceConfig.Channel("api"),
			endpointsConfig.Channel("api"),
		)
	}

	loadBalancer := proxy.NewLoadBalancerRR()
	proxier := proxy.NewProxier(loadBalancer, net.IP(bindAddress))
	// Wire proxier to handle changes to services
	serviceConfig.RegisterHandler(proxier)
	// And wire loadBalancer to handle changes to endpoints to services
	endpointsConfig.RegisterHandler(loadBalancer)

	// Just loop forever for now...
	select {}
}
  1. Parse command-line flags: flag.Parse() parses the arguments passed to the program.
  2. Initialize and flush logs: util.InitLogs() sets up logging, and defer util.FlushLogs() ensures any pending log entries are written out when the program exits.
  3. Print version information: verflag.PrintAndExitIfRequested() prints the version and exits if that was requested.
  4. Create the service and endpoints configs: config.NewServiceConfig() and config.NewEndpointsConfig() create the configuration objects for services and endpoints.
  5. Define the API config source: this part uses a Kubernetes API client to create a source that pulls configuration from the API server.
  6. Create the load balancer and the proxier: proxy.NewLoadBalancerRR() creates a round-robin load balancer, and proxy.NewProxier() creates a proxier instance that handles incoming connections (a minimal round-robin sketch follows this list).
  7. Register handlers: serviceConfig.RegisterHandler(proxier) and endpointsConfig.RegisterHandler(loadBalancer) register the proxier and the load balancer with the service and endpoints configs respectively.
  8. Loop forever: finally, select {} keeps the program alive, waiting for service and endpoint changes.
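
As a rough mental model for step 6, here is a minimal round-robin picker in the spirit of proxy.NewLoadBalancerRR(). This is only an illustrative sketch: the type, field, and method names below (rrBalancer, setEndpoints, nextEndpoint) are assumptions for demonstration, not the actual LoadBalancerRR implementation.

// Sketch only: a toy round-robin balancer, not the real LoadBalancerRR.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type rrBalancer struct {
	mu        sync.Mutex
	endpoints map[string][]string // service name -> endpoint list
	index     map[string]int      // service name -> next endpoint index
}

func newRRBalancer() *rrBalancer {
	return &rrBalancer{
		endpoints: map[string][]string{},
		index:     map[string]int{},
	}
}

// setEndpoints replaces the endpoint list for a service (the role that an
// endpoints update handler plays in kube-proxy).
func (b *rrBalancer) setEndpoints(service string, eps []string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.endpoints[service] = eps
	b.index[service] = 0
}

// nextEndpoint returns endpoints for a service in round-robin order.
func (b *rrBalancer) nextEndpoint(service string) (string, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	eps := b.endpoints[service]
	if len(eps) == 0 {
		return "", errors.New("no endpoints for " + service)
	}
	i := b.index[service] % len(eps)
	b.index[service] = i + 1
	return eps[i], nil
}

func main() {
	b := newRRBalancer()
	b.setEndpoints("svc-a", []string{"10.0.0.1:80", "10.0.0.2:80"})
	for i := 0; i < 4; i++ {
		ep, _ := b.nextEndpoint("svc-a")
		fmt.Println(ep) // alternates between the two endpoints
	}
}

In the real kube-proxy the load balancer is fed endpoint updates through endpointsConfig.RegisterHandler(loadBalancer), which plays the role of setEndpoints in this sketch.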

serviceConfig

// ServiceConfig tracks a set of service configurations.
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
type ServiceConfig struct {
	mux     *config.Mux
	watcher *config.Watcher
	store   *serviceStore
}


type Watcher struct {
	// Listeners for changes and their lock.
	listenerLock sync.RWMutex
	listeners    []Listener
}

Compared with the kubelet's podConfig, there is an extra watcher here. This is essentially the observer (publish/subscribe) pattern: when the configuration changes, every registered listener is notified, and each listener's OnUpdate method is called.
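
To make the subscribe/notify flow concrete, here is a minimal, self-contained sketch of the pattern. Listener, Register, and Notify below are simplified stand-ins for config.Watcher and RegisterHandler, not the actual Kubernetes code.

// Sketch only: a toy observer pattern mirroring Watcher's role.
package main

import (
	"fmt"
	"sync"
)

type Service struct{ ID string }

// Listener mirrors the idea of an OnUpdate handler.
type Listener interface {
	OnUpdate(services []Service)
}

type Watcher struct {
	listenerLock sync.RWMutex
	listeners    []Listener
}

// Register adds a subscriber that wants to hear about config changes.
func (w *Watcher) Register(l Listener) {
	w.listenerLock.Lock()
	defer w.listenerLock.Unlock()
	w.listeners = append(w.listeners, l)
}

// Notify fans a new snapshot of services out to every listener.
func (w *Watcher) Notify(services []Service) {
	w.listenerLock.RLock()
	defer w.listenerLock.RUnlock()
	for _, l := range w.listeners {
		l.OnUpdate(services)
	}
}

type printer struct{ name string }

func (p printer) OnUpdate(services []Service) {
	fmt.Printf("%s received %d services\n", p.name, len(services))
}

func main() {
	w := &Watcher{}
	w.Register(printer{"proxier"})
	w.Register(printer{"loadBalancer"})
	w.Notify([]Service{{ID: "svc-a"}, {ID: "svc-b"}})
}

In kube-proxy the two listeners are the Proxier (for services) and the round-robin load balancer (for endpoints), which is exactly what the RegisterHandler calls in main() wire up.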

// OnUpdate manages the active set of service proxies.
// Active service proxies are reinitialized if found in the update set or
// shutdown if missing from the update set.
func (proxier *Proxier) OnUpdate(services []api.Service) {
	glog.V(4).Infof("Received update notice: %+v", services)
	activeServices := util.StringSet{}
	for _, service := range services {
		activeServices.Insert(service.ID)
		info, exists := proxier.getServiceInfo(service.ID)
		// TODO: check health of the socket? What if ProxyLoop exited?
		if exists && info.isActive() && info.port == service.Port {
			continue
		}
		if exists && info.port != service.Port {
			err := proxier.stopProxy(service.ID, info)
			if err != nil {
				glog.Errorf("error stopping %s: %v", service.ID, err)
			}
		}
		glog.V(3).Infof("Adding a new service %s on %s port %d", service.ID, service.Protocol, service.Port)
		sock, err := newProxySocket(service.Protocol, proxier.address, service.Port)
		if err != nil {
			glog.Errorf("Failed to get a socket for %s: %+v", service.ID, err)
			continue
		}
		proxier.setServiceInfo(service.ID, &serviceInfo{
			port:     service.Port,
			protocol: service.Protocol,
			active:   true,
			socket:   sock,
			timeout:  udpIdleTimeout,
		})
		proxier.startAccepting(service.ID, sock)
	}
	proxier.mu.Lock()
	defer proxier.mu.Unlock()
	for name, info := range proxier.serviceMap {
		if !activeServices.Has(name) {
			err := proxier.stopProxyInternal(name, info)
			if err != nil {
				glog.Errorf("error stopping %s: %v", name, err)
			}
		}
	}
}
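
OnUpdate treats each notification as the full desired set of services: it records every service ID in activeServices, leaves an existing, active proxy alone when the port is unchanged, stops and recreates the proxy when the port has changed, opens a new proxySocket and starts accepting connections for services it has not seen before, and finally walks serviceMap to shut down any proxy whose service no longer appears in the update.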