
66.predicate

PR #1754

Add a predicate for persistent disk scheduling. This adds a volume-conflict check to the predicate (pre-selection) phase.

// isVolumeConflict reports whether pod already mounts the same GCE
// persistent disk named by volume.
func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
    if volume.Source.GCEPersistentDisk == nil {
        return false
    }
    pdName := volume.Source.GCEPersistentDisk.PDName

    manifest := &(pod.DesiredState.Manifest)
    for ix := range manifest.Volumes {
        if manifest.Volumes[ix].Source.GCEPersistentDisk != nil &&
            manifest.Volumes[ix].Source.GCEPersistentDisk.PDName == pdName {
            return true
        }
    }
    return false
}

// NoDiskConflict evaluates if a pod can fit due to the volumes it requests, and those that
// are already mounted. Some types of volumes are mounted onto node machines. For now, these mounts
// are exclusive so if there is already a volume mounted on that node, another pod can't schedule
// there. This is GCE specific for now.
// TODO: migrate this into some per-volume specific code?
func NoDiskConflict(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
    manifest := &(pod.DesiredState.Manifest)
    for ix := range manifest.Volumes {
        for podIx := range existingPods {
            if isVolumeConflict(manifest.Volumes[ix], &existingPods[podIx]) {
                return false, nil
            }
        }
    }
    return true, nil
}
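
A hedged usage sketch of the predicate: two pods that both name the GCE persistent disk "pd-1" cannot land on the same node. It assumes it sits in the same package as the code above; gcePDPod is a hypothetical helper, and the exact v1beta1-era field types are approximated from the snippet.

// gcePDPod is a hypothetical helper; field names approximated from the snippet.
func gcePDPod(pdName string) api.Pod {
    return api.Pod{
        DesiredState: api.PodState{
            Manifest: api.ContainerManifest{
                Volumes: []api.Volume{{
                    Source: api.VolumeSource{
                        GCEPersistentDisk: &api.GCEPersistentDisk{PDName: pdName},
                    },
                }},
            },
        },
    }
}

func ExampleNoDiskConflict() {
    pod := gcePDPod("pd-1")
    existing := []api.Pod{gcePDPod("pd-1")}
    fit, _ := NoDiskConflict(pod, existing, "node-1")
    fmt.Println(fit) // false: both pods mount "pd-1", and those mounts are exclusive
}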

PR #1676

Add OpenStack cloud provider. This PR shows what adding a cloud provider takes.

Only two files need to change: cmd/apiserver/plugin.go and pkg/cloudprovider/openstack/openstack.go.

// cmd/apiserver/plugin.go
// This file exists to force the desired plugin implementations to be linked
// into the given binary target.
import (
    _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
    _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce"
    _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/openstack"
    _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/ovirt"
    _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/vagrant"
)

// pkg/cloudprovider/openstack/openstack.go
// init registers the provider under the name "openstack"; the blank import
// above is what triggers it.
func init() {
    cloudprovider.RegisterCloudProvider("openstack", func(config io.Reader) (cloudprovider.Interface, error) {
        cfg, err := readConfig(config)
        if err != nil {
            return nil, err
        }
        return newOpenStack(cfg)
    })
}


// TCPLoadBalancer and Zones are not implemented yet; returning false tells
// callers the capability is unavailable.
func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
    return nil, false
}

func (os *OpenStack) Zones() (cloudprovider.Zones, bool) {
    return nil, false
}

// Instances returns an implementation of Instances for OpenStack.
func (os *OpenStack) Instances() (cloudprovider.Instances, bool) {
    // Body elided in these notes; the PR constructs the compute-API-backed
    // Instances implementation here.
}
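
For completeness, a hedged sketch of the consuming side: the blank imports in plugin.go run each provider's init(), which calls RegisterCloudProvider, and the server later looks the provider up by name. GetCloudProvider is assumed here to be the lookup counterpart of RegisterCloudProvider (provider name plus config reader), and initCloud is a hypothetical helper, not code from the PR.

import (
    "io"
    "os"

    "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
)

// initCloud opens the optional config file and asks the registry for the
// provider by name.
func initCloud(name, configPath string) (cloudprovider.Interface, error) {
    var config io.Reader
    if configPath != "" {
        f, err := os.Open(configPath)
        if err != nil {
            return nil, err
        }
        defer f.Close()
        config = f
    }
    // Assumed lookup counterpart of RegisterCloudProvider.
    return cloudprovider.GetCloudProvider(name, config)
}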

PR #1325

Proposal for new kubecfg design (kubectl)

rename to kubectl (it's a better name anyway) - DONE


$ kubecfg get pods
<list all pods in ps format>

$ kubecfg get pod PODID
<list single pod in ps format>

$ kubecfg get PODGUID
<list single pod in ps format>

$ kubecfg get -j p PODID
<print raw json of single pod>

$ kubecfg get --template=template.go pod PODID
<print single pod using template.go which can refer to resource fields in pkg/api/types.go>

$ kubecfg inspect pod PODID
<print Key: Value pairs of general information regarding the pod, joining lots of data together>
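
The --template flag above implies rendering a resource with Go's text/template package, where the template refers to fields of the api structs. A minimal, self-contained sketch; the Pod fields below are illustrative stand-ins, not the exact types from pkg/api/types.go.

package main

import (
    "os"
    "text/template"
)

type PodState struct{ Status string }

type Pod struct {
    ID           string
    CurrentState PodState
}

func main() {
    // The user-supplied template.go file would contain something like this.
    const tmpl = "{{.ID}}\t{{.CurrentState.Status}}\n"
    t := template.Must(template.New("pod").Parse(tmpl))
    t.Execute(os.Stdout, Pod{ID: "redis-master", CurrentState: PodState{Status: "Running"}})
    // Prints: redis-master	Running
}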

PR #1789

Add event creation library and implement in scheduler.

// EventRecorder knows how to store events. Currently the kubeclient is the
// implementation of this interface.
type EventRecorder interface {
    CreateEvent(event *api.Event) (*api.Event, error)
}


// StartRecording starts sending events to the given recorder. It should be
// called once when the binary initializes; later calls are ignored. The
// return value is a watch.Interface, which can be ignored or kept around to
// stop recording: save it and call its Stop method when you want event
// recording to end.
func StartRecording(recorder EventRecorder, sourceName string) watch.Interface {
    return GetEvents(func(event *api.Event) {
        // Make a copy before modification, because there could be multiple listeners.
        // Events are safe to copy like this.
        eventCopy := *event
        event = &eventCopy
        event.Source = sourceName
        // Retry forever: sleep and try again until the event is written.
        for {
            _, err := recorder.CreateEvent(event)
            if err == nil {
                break
            }
            glog.Errorf("Sleeping: Unable to write event: %v", err)
            time.Sleep(10 * time.Second)
        }
    })
}
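
Since any type with a CreateEvent method satisfies EventRecorder, tests do not need a real kubeclient. A hedged sketch (fakeRecorder is hypothetical):

type fakeRecorder struct {
    events []api.Event
}

func (f *fakeRecorder) CreateEvent(event *api.Event) (*api.Event, error) {
    // Store a copy locally instead of writing to the apiserver.
    f.events = append(f.events, *event)
    return event, nil
}

// In a test:
//   w := StartRecording(&fakeRecorder{}, "test-source")
//   defer w.Stop()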

// GetEvents takes a function that will be called with a pointer to each
// api.Event received, so you can run custom logic per event.
// This also makes testing convenient.
func GetEvents(f func(*api.Event)) watch.Interface {
    w := events.Watch()
    go func() {
        defer util.HandleCrash()
        for {
            watchEvent, open := <-w.ResultChan()
            if !open {
                return
            }
            event, ok := watchEvent.Object.(*api.Event)
            if !ok {
                // This is all local, so there's no reason this should
                // ever happen.
                continue
            }
            f(event)
        }
    }()
    return w
}

The producer is the Mux: distribute sends each event into every watcher's result channel, and the watch consumers read from there.


// distribute sends event to all watchers. Blocking.
func (m *Mux) distribute(event Event) {
    m.lock.Lock()
    defer m.lock.Unlock()
    for _, w := range m.watchers {
        select {
        case w.result <- event:
        case <-w.stopped:
        }
    }
}

// Watch adds a new watcher to the list and returns an Interface for it.
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events.
func (m *Mux) Watch() Interface {
    m.lock.Lock()
    defer m.lock.Unlock()
    id := m.nextWatcher
    m.nextWatcher++
    w := &muxWatcher{
        result:  make(chan Event),
        stopped: make(chan struct{}),
        id:      id,
        m:       m,
    }
    m.watchers[id] = w
    return w
}
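
To see why distribute selects on both the result and the stopped channel, here is a self-contained toy version of the pattern (simplified names, no relation to the real package): a watcher that stopped without ever reading must not block delivery to the others.

package main

import (
    "fmt"
    "sync"
)

type toyWatcher struct {
    result  chan string
    stopped chan struct{}
}

type toyMux struct {
    lock     sync.Mutex
    watchers []*toyWatcher
}

func (m *toyMux) watch() *toyWatcher {
    m.lock.Lock()
    defer m.lock.Unlock()
    w := &toyWatcher{result: make(chan string), stopped: make(chan struct{})}
    m.watchers = append(m.watchers, w)
    return w
}

func (m *toyMux) distribute(ev string) {
    m.lock.Lock()
    defer m.lock.Unlock()
    for _, w := range m.watchers {
        select {
        case w.result <- ev: // consumer is reading
        case <-w.stopped: // watcher gave up; skip it
        }
    }
}

func main() {
    m := &toyMux{}
    a, b := m.watch(), m.watch()
    close(a.stopped) // a stops without ever reading

    done := make(chan struct{})
    go func() {
        fmt.Println("b got:", <-b.result)
        close(done)
    }()
    m.distribute("event-1") // does not deadlock on a
    <-done
}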