60.controller-manager
启动 goroutine 来监听 endpoint 和 replication controller
endpoints := service.NewEndpointController(kubeClient)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
controllerManager := controller.NewReplicationManager(kubeClient)
controllerManager.Run(10 * time.Second)
控制器监听 apiserver
func (rm *ReplicationManager) watchControllers(resourceVersion *string) {
ctx := api.NewContext()
watching, err := rm.kubeClient.WatchReplicationControllers(
ctx,
labels.Everything(),
labels.Everything(),
*resourceVersion,
)
if err != nil {
glog.Errorf("Unexpected failure to watch: %v", err)
time.Sleep(5 * time.Second)
return
}
for {
select {
case <-rm.syncTime:
rm.synchronize()
case event, open := <-watching.ResultChan():
if !open {
// watchChannel has been closed, or something else went
// wrong with our etcd watch call. Let the util.Forever()
// that called us call us again.
return
}
glog.V(4).Infof("Got watch: %#v", event)
rc, ok := event.Object.(*api.ReplicationController)
if !ok {
glog.Errorf("unexpected object: %#v", event.Object)
continue
}
// If we get disconnected, start where we left off.
*resourceVersion = rc.ResourceVersion
// Sync even if this is a deletion event, to ensure that we leave
// it in the desired state.
glog.V(4).Infof("About to sync from watch: %v", rc.ID)
rm.syncHandler(*rc)
}
}
}
同步单个 replication controller 的 pod 数量匹配预期
func (rm *ReplicationManager) syncReplicationController(controllerSpec api.ReplicationController) error {
s := labels.Set(controllerSpec.DesiredState.ReplicaSelector).AsSelector()
ctx := api.WithNamespace(api.NewContext(), controllerSpec.Namespace)
podList, err := rm.kubeClient.ListPods(ctx, s)
if err != nil {
return err
}
filteredList := rm.filterActivePods(podList.Items)
diff := len(filteredList) - controllerSpec.DesiredState.Replicas
if diff < 0 {
diff *= -1
wait := sync.WaitGroup{}
wait.Add(diff)
glog.V(2).Infof("Too few replicas, creating %d\n", diff)
for i := 0; i < diff; i++ {
go func() {
defer wait.Done()
rm.podControl.createReplica(ctx, controllerSpec)
}()
}
wait.Wait()
} else if diff > 0 {
glog.V(2).Infof("Too many replicas, deleting %d\n", diff)
wait := sync.WaitGroup{}
wait.Add(diff)
for i := 0; i < diff; i++ {
go func(ix int) {
defer wait.Done()
rm.podControl.deletePod(ctx, filteredList[ix].ID)
}(i)
}
wait.Wait()
}
return nil
}