跳到主要内容

33.cacheMinion

PR #483

Add a caching minion registry. [增加了缓存,不用每次调用 cloudprovider]

PR #486

Make RESTful operations return 404 Not Found when the target resource does not exist. [如果资源没找到,返回404]

PR #452

Initial framework for external volumes [抽象 volume 接口]

// All volume types are expected to implement this interface
type Interface interface {
// Prepares and mounts/unpacks the volume to a directory path.
// This procedure must be idempotent.
SetUp()
// Returns the directory path the volume is mounted to.
GetPath() string
// Unmounts the volume and removes traces of the SetUp procedure.
// This procedure must be idempotent.
TearDown()
}

PR #522 525

Added HasAll utility method for string set.

// HasAll returns true iff all items are contained in the set.
func (s StringSet) HasAll(items ...string) bool {
for _, item := range items {
if !s.Has(item) {
return false
}
}
return true
}

// IsSuperset returns true iff s1 is a superset of s2.
func (s1 StringSet) IsSuperset(s2 StringSet) bool {
for item, _ := range s2 {
if !s1.Has(item) {
return false
}
}
return true
}

PR #524

Make the service reconciller use the API, not a PodRegistry

重大突破,组件开始从apiserver 获取数据,而不是直接从etcd

PR #493 #505

Implemented via HTTP and websocket Add client watch capability

493 实现了 websocket 和 chunked

// ResourceWatcher should be implemented by all RESTStorage objects that
// want to offer the ability to watch for changes through the watch api.
type ResourceWatcher interface {
WatchAll() (watch.Interface, error)
WatchSingle(id string) (watch.Interface, error)
}
505 实现了 watch 客户端
// StreamWatcher turns any stream for which you can write a Decoder interface
// into a watch.Interface.
type StreamWatcher struct {
source Decoder
result chan Event
sync.Mutex
stopped bool
}

// NewStreamWatcher creates a StreamWatcher from the given decoder.
func NewStreamWatcher(d Decoder) *StreamWatcher {
sw := &StreamWatcher{
source: d,
// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan Event),
}
go sw.receive()
return sw
}
提示

拓展一: 在HTTP 1.1中,Transfer-Encoding是一个可选的消息头,用于指定消息主体的编码变换。"chunked"是一种Transfer-Encoding的方式,允许消息主体以分块形式发送。

在Go语言中,我们可以通过http包来实现一个简单的chunked传输编码的服务器。以下是一个基本的示例,它使用chunked编码方式向客户端发送数据块

package main

import (
"fmt"
"net/http"
"time"
)

func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}

w.Header().Set("Transfer-Encoding", "chunked")

for i := 1; i <= 10; i++ {
fmt.Fprintf(w, "Chunk #%d\n", i)
flusher.Flush() // 发送当前缓冲的数据给客户端
time.Sleep(time.Second)
}
})

http.ListenAndServe(":8080", nil)
}

在这个代码中,我们使用了一个 http.Flusher 对象,它可以将数据立即发送到客户端,而不是等待整个响应完成。这样,我们就可以在每次写入数据并调用 Flush() 方法后看到数据,即使响应还没有完成。

你可以通过 curl 命令查看这个服务器的输出,就像这样:

curl http://localhost:8080 这样,你应该能看到每个数据块是如何在每秒钟被发送出去的。

拓展二: 在Go语言中,如果你要处理的JSON文件或数据流非常大,或者你需要处理数据流,那么你可能需要使用流式解码(Streaming Decode)。流式解码能够让你一次处理一个JSON对象,而不需要一次加载整个JSON数据。

你可以通过 json.Decoder 类型来实现流式解码。以下是一个基本的例子:

package main

import (
"encoding/json"
"fmt"
"strings"
)

type Person struct {
Name string `json:"name"`
Age int `json:"age"`
}

func main() {
const jsonStream = `
{"name": "Ed", "age": 24}
{"name": "Sam", "age": 30}
{"name": "Jack", "age": 28}
`
decoder := json.NewDecoder(strings.NewReader(jsonStream))

for {
var p Person
if err := decoder.Decode(&p); err != nil {
fmt.Println("Error:", err)
return
}
fmt.Printf("Person: %+v\n", p)
}
}

在上面的代码中,我们创建了一个新的 json.Decoder 对象,然后在循环中不断调用 Decode() 方法来读取并解码每个JSON对象。一旦 Decode() 方法返回错误(例如,读到数据流的结尾),我们就退出循环。

这种方法的优点是内存占用更小,并且可以处理无法一次性加载到内存中的大型JSON数据或数据流。

这是一个简单的Go客户端代码,该代码会连接到服务器并持续接收并解码JSON数据流:

package main

import (
"encoding/json"
"fmt"
"net"
)

type Person struct {
Name string `json:"name"`
Age int `json:"age"`
}

func main() {
conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
fmt.Println("Error connecting to the server:", err)
return
}
defer conn.Close()

decoder := json.NewDecoder(conn)

for {
var p Person
if err := decoder.Decode(&p); err != nil {
fmt.Println("Error decoding JSON:", err)
break
}

fmt.Printf("Received: %+v\n", p)
}
}

在这个例子中,我们首先使用net.Dial连接到服务器(这里是本地的8080端口)。然后,我们创建一个新的json.Decoder,并将其连接到服务器的连接。然后,我们进入一个循环,在这个循环中,我们调用Decode()来读取和解码每个接收到的JSON对象。一旦我们遇到任何错误(例如,连接关闭或无法解码数据),我们就退出循环。

注意,你需要将上述代码中的localhost:8080替换为你的服务器地址和端口。

这种方式适用于服务器以流的方式发送JSON数据,每个JSON对象后面跟着一个换行符