

PR # 1754

Add a predicate for persistent disk scheduling. 预选阶段添加 volume 冲突的检查

func isVolumeConflict(volume api.Volume, pod *api.Pod) bool {
if volume.Source.GCEPersistentDisk == nil {
return false
pdName := volume.Source.GCEPersistentDisk.PDName

manifest := &(pod.DesiredState.Manifest)
for ix := range manifest.Volumes {
if manifest.Volumes[ix].Source.GCEPersistentDisk != nil &&
manifest.Volumes[ix].Source.GCEPersistentDisk.PDName == pdName {
return true
return false

// NoDiskConflict evaluates if a pod can fit due to the volumes it requests, and those that
// are already mounted. Some times of volumes are mounted onto node machines. For now, these mounts
// are exclusive so if there is already a volume mounted on that node, another pod can't schedule
// there. This is GCE specific for now.
// TODO: migrate this into some per-volume specific code?
func NoDiskConflict(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
manifest := &(pod.DesiredState.Manifest)
for ix := range manifest.Volumes {
for podIx := range existingPods {
if isVolumeConflict(manifest.Volumes[ix], &existingPods[podIx]) {
return false, nil
return true, nil

PR # 1676

Add OpenStack cloud provider 添加一个云提供商

只需要修改两个文件 cmd/apiserver/plugin.go 和 pkg/cloudprovider/openstack/openstack.go

// cmd/apiserver/plugin.go
// given binary target.
import (
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/openstack"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/ovirt"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/vagrant"

// pkg/cloudprovider/openstack/openstack.go
func init() {
cloudprovider.RegisterCloudProvider("openstack", func(config io.Reader) (cloudprovider.Interface, error) {
cfg, err := readConfig(config)
if err != nil {
return nil, err
return newOpenStack(cfg)

func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) {
return nil, false

func (os *OpenStack) Zones() (cloudprovider.Zones, bool) {
return nil, false

// Instances returns an implementation of Instances for OpenStack.
func (os *OpenStack) Instances() (cloudprovider.Instances, bool) {}

PR # 1325

Proposal for new kubecfg design (kubectl)

rename to kubectl (it's a better name anyway) - DONE

$ kubecfg get pods
<list all pods in ps format>

$ kubecfg get pod PODID
<list single pod in ps format>

$ kubecfg get PODGUID
<list single pod in ps format>

$ kubecfg get -j p PODID
<print raw json of single pod>

$ kubecfg get --template=template.go pod PODID
<print single pod using template.go which can refer to resource fields in pkg/api/types.go>

$ kubecfg inspect pod PODID
<print Key: Value pairs of general information regarding the pod, joining lots of data together>

PR #1789

Add event creation library and implement in scheduler.

// EventRecorder 知道如何存储事件,目前是 kubeclient 实现了这个接口
type EventRecorder interface {
CreateEvent(event *api.Event) (*api.Event, error)

func StartRecording(recorder EventRecorder, sourceName string) watch.Interface {
return GetEvents(func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy
event.Source = sourceName
for {
_, err := recorder.CreateEvent(event)
if err == nil {
glog.Errorf("Sleeping: Unable to write event: %v", err)
time.Sleep(10 * time.Second)

// 这个函数的参数是一个函数,它将被传递一个指向api.Event类型的指针。这允许您在接收事件时执行自定义逻辑
// 这个函数方便测试
func GetEvents(f func(*api.Event)) watch.Interface {
w := events.Watch()
go func() {
defer util.HandleCrash()
for {
watchEvent, open := <-w.ResultChan()
if !open {
event, ok := watchEvent.Object.(*api.Event)
if !ok {
// This is all local, so there's no reason this should
// ever happen.
return w

生产者 为 mux , 事件会被 distribte 进 result channel ,然后 watch 消费

// distribute sends event to all watchers. Blocking.
func (m *Mux) distribute(event Event) {
defer m.lock.Unlock()
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:

// Watch adds a new watcher to the list and returns an Interface for it.
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events.
func (m *Mux) Watch() Interface {
defer m.lock.Unlock()
id := m.nextWatcher
w := &muxWatcher{
result: make(chan Event),
stopped: make(chan struct{}),
id: id,
m: m,
m.watchers[id] = w
return w