65.proxy-02
loadbalancer
// LoadBalancer is an interface for distributing incoming requests to service endpoints.
type LoadBalancer interface {
// NextEndpoint returns the endpoint to handle a request for the given
// service and source address.
NextEndpoint(service string, srcAddr net.Addr) (string, error)
}
我们来看下具体的轮训 loadbalance 实现:
// LoadBalancerRR is a round-robin load balancer.
type LoadBalancerRR struct {
lock sync.RWMutex
endpointsMap map[string][]string
rrIndex map[string]int
}
// NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) {
lb.lock.RLock()
endpoints, exists := lb.endpointsMap[service]
index := lb.rrIndex[service]
lb.lock.RUnlock()
if !exists {
return "", ErrMissingServiceEntry
}
if len(endpoints) == 0 {
return "", ErrMissingEndpoints
}
endpoint := endpoints[index]
lb.lock.Lock()
lb.rrIndex[service] = (index + 1) % len(endpoints)
lb.lock.Unlock()
return endpoint, nil
}
loadbalancer 也有一个 update 用来监听 endpoint 的变化,本质就是维护 service 到 endpoints 的映射关系。
// OnUpdate manages the registered service endpoints.
// Registered endpoints are updated if found in the update set or
// unregistered if missing from the update set.
func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
registeredEndpoints := make(map[string]bool)
lb.lock.Lock()
defer lb.lock.Unlock()
// Update endpoints for services.
for _, endpoint := range endpoints {
existingEndpoints, exists := lb.endpointsMap[endpoint.ID]
validEndpoints := filterValidEndpoints(endpoint.Endpoints)
if !exists || !reflect.DeepEqual(existingEndpoints, validEndpoints) {
glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.ID, endpoint.Endpoints)
lb.endpointsMap[endpoint.ID] = validEndpoints
// Reset the round-robin index.
lb.rrIndex[endpoint.ID] = 0
}
registeredEndpoints[endpoint.ID] = true
}
// Remove endpoints missing from the update.
for k, v := range lb.endpointsMap {
if _, exists := registeredEndpoints[k]; !exists {
glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s -> %+v", k, v)
delete(lb.endpointsMap, k)
}
}
}
proxier
// Abstraction over TCP/UDP sockets which are proxied.
type proxySocket interface {
// Addr gets the net.Addr for a proxySocket.
Addr() net.Addr
// Close stops the proxySocket from accepting incoming connections. Each implementation should comment
// on the impact of calling Close while sessions are active.
Close() error
// ProxyLoop proxies incoming connections for the specified service to the service endpoints.
ProxyLoop(service string, proxier *Proxier)
}
目前 tcpProxySocket 和 udpProxySocket 实现了这个 interface ,这里详解下 tcp 的 proxyLoop 方法:
func (tcp *tcpProxySocket) ProxyLoop(service string, proxier *Proxier) {
info, found := proxier.getServiceInfo(service)
if !found {
glog.Errorf("Failed to find service: %s", service)
return
}
for {
if !info.isActive() {
break
}
// Block until a connection is made.
inConn, err := tcp.Accept()
if err != nil {
glog.Errorf("Accept failed: %v", err)
continue
}
glog.V(2).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
endpoint, err := proxier.loadBalancer.NextEndpoint(service, inConn.RemoteAddr())
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s %v", service, err)
inConn.Close()
continue
}
glog.V(3).Infof("Mapped service %s to endpoint %s", service, endpoint)
// TODO: This could spin up a new goroutine to make the outbound connection,
// and keep accepting inbound traffic.
outConn, err := net.DialTimeout("tcp", endpoint, endpointDialTimeout)
if err != nil {
// TODO: Try another endpoint?
glog.Errorf("Dial failed: %v", err)
inConn.Close()
continue
}
// Spin up an async copy loop.
go proxyTCP(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
}
- 获取服务信息 : 通过
proxier.getServiceInfo
从代理中获取特定服务的信息。如果找不到服务,则记录错误并返回。 - 监听连接 : 这个方法通过一个无限循环来监听连接,其中每个迭代都会尝试接受新的TCP连接。
- 检查服务活动状态 : 如果服务不再活动,则退出循环。
- 接受连接 : 使用
tcp.Accept()
来接受新的TCP连接。如果出现错误,则记录错误并继续下一个迭代。 - 获取下一个端点 : 调用
proxier.loadBalancer.NextEndpoint
来确定要将连接映射到的端点。如果找不到端点,则关闭连接并继续下一个迭代。 - 连接到端点 : 使用
net.DialTimeout
连接到选择的端点。这里还有一个待办事项(TODO),表示可能会在新的goroutine中执行此操作,并尝试连接到另一个端点(如果连接失败)。 - 代理TCP连接 : 最后,调用
proxyTCP
异步复制输入和输出连接之间的数据。