Kubernetes 为什么选择ETCD做存储?

后端 Kubernetes 源码阅读

为什么用 ETCD 作为存储

这是一个技术选型的问题,首先得想清楚 存储需要支持什么样的功能?

Kubernetes 的功能出发,容器的编排和状态的管理都是需要大量的事件监听和消费,同时由于各个组件的抽象,需要大量的通信。

Kubernetes 要维护整体的集群状态一致,不能够说一个地方获取到 Pod 不可用不能提供服务,另一个地方获取到的却是可以提供服务,这样对集群的整体状态维护是不可接受的。

为什么不用 Mysql 和 Postgres SQL 这种关系型数据库?

因为 Kubernetes 不需要做复杂查询,它们致力于优化的方向并用不到。

在kubernetes 操作存储的内容主要有哪些?Pod的信息也只需要通过 kv 存储就可以实现

那既然 kv 存储的数据为什么Redis 不行?

Redis是一直高可用的存储,它更多保证的是最终一致性,但是在集群整体状态的一致性上它没办法做保证。

etcd 是 Kubernetes 存储有关集群状态的所有信息的地方;事实上,它是整个 Kubernetes 控制平面中唯一有状态的部分。

Kubernetes 在 Pod状态不一致会有较大的问题,如果 Pod 状态错误了之后没及时更新,这个时候就会导致请求打到不健康的 Pod 上,这个在 Kubernetes 是无法接受的。

所以综上所述 ETCD 的功能就十分契合 Kubernetes 的场景,它提供了 Watch 机制,能让我们快速监听到资源的变化情况,并且有 MVCC 多版本的管理机制,让整体的资源可以定时进行修复,降低了出现状态不一致情况的概率。

ETCD Watch 机制的实现

Watch 机制

为避免客户端不停的对key进行轮询,ETCD 提供了Watch,当客户端Watch 的key 更新时,ETCD会主动的去通知客户端。

etcd 的 watch 特性是 Kubernetes 控制器的工作基础。在 Kubernetes 中,各种各样的控制器实现了 Deployment、StatefulSet、Job 等功能强大的 Workload。控制器的核心思想是监听、比较资源实际状态与期望状态是否一致,若不一致则进行协调工作,使其最终一致,这主要依赖于 etcd 的 Watch 机制。 基于 Watch 特性,可以快速获取到感兴趣的数据变化事件 ,这也是 Kubernetes 控制器工作的核心基础。

kubernetes 通过 ResultChan 来监听key值的变化。etcd 同样提供了一套实现

创建Watch

etcd 启动时会创建多个 server ,其中一个就是 WatchServer ,用于处理 Watch 请求

每当收到一个 watch 的命令,就会创建一个 serverWatchStream ,负责接收客户端的 cancel/create 的watcher请求。

在 etcd 启动的时候,WatchableKV 模块会运行 syncWatchersLoop 和 syncVictimsLoop goroutine,分别负责不同场景下的事件推送,它们也是 Watch 特性可靠性的核心之一。

serverWatchStream 并分别启动了 sendLoop 和 recvLoop。

func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
	sws := serverWatchStream{...}

	sws.wg.Add(1)
	// 初始化发送协程
	go func() {
		sws.sendLoop()
		sws.wg.Done()
	}()

	// 初始化接收请求的协程
	go func() {
		if rerr := sws.recvLoop(); rerr != nil {}
	}()
}

这里 sws.wg 的作用是为了 close 的时候等待 sendLoop 正常结束后才关闭。

recvLoop

recvLoop 负责接收 client 的请求。 接收 client 的 create/cancel watcher 消息,并调用 watchStream 的对应方法 create/cancel watch

当 serverWatchStream 收到 create watcher 请求后,调用 Watch 方法。

Watch 方法实现流程是通过 watchableStore 实例化出来 watcher 后将其放回到 watchStream 里面,方便后续进行数据推送

// 创建一个监听,返回对应的监听 id,监听id是个int64类型
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
	// 获取 实例化监听者
	w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)

	ws.cancels[id] = c
	ws.watchers[id] = w
	return id, nil
}

func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
	// 实例化对应的监听者
	wa := &watcher{
		key:    key,
		end:    end,
		minRev: startRev,
		id:     id,
		ch:     ch,
		fcs:    fcs,
	}
	return wa, func() { s.cancelWatcher(wa) }
}

sendLoop

主要接收两种消息:sendLoop 主要负责把从 MVCC 模块接收的 Watch 事件转发给 client。

一个大的 for 循环不断从 watchStream Response chan 中获取 event,并转发给 client。

watchableStore

type watchableStore struct {
	*store

	// 被阻塞在 watch channel 中的 watcherBatch
	victims []watcherBatch
	victimc chan struct{}

	// 未同步的 watchers
	unsynced watcherGroup

	// 已同步的 watchers
	synced watcherGroup
}

数据修改后如何推送给 Client

在 put 事务结束时,会调用End方法,它会将 KeyValue 转换成 Event 事件。

func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
	tw := wv.kv.Write(traceutil.TODO())
	defer tw.End()
	return tw.Put(key, value, lease)
}

接着回调 watchableStore.notify 函数,将事件写入到 ch 中来传播出去

func (tw *watchableStoreTxnWrite) End() {
	//转成evs 时间后进行通知	
	tw.s.notify(rev, evs)
}

最后通过 syncWatchersLoop 方法来将未同步的数据同步给 watcher

syncWatchers

syncWatchers 主要处理流程如下:

  1. syncWatcher 从 unsynced 筛选出需要同步的任务
  2. 从 BoltDB 中取当前版本范围内的数据变更并将它们转换成事件
  3. watcherGroup 在打包之后会通过 send 方法发送到每一个 watcher 对应的 Channel 中

syncWatchersLoop 和 syncVictimsLoop 如何保证数据不丢失?

synced 的channel 是 1024 的长度,如果 channel 满了之后会将事件加入到 victims

然后不断地去尝试同步,如果synced队列没满,则重新加入到 unsynced chan中

最后 unsynced 根据revision 将历史数据同步完后加入到 synced 中。

整体流程如下图:

蓝色部分是初始化的时候 ETCD 需要维护的信息,用来接收 ETCD 的请求和启动一个循环来发送变更。

红色部分则是接收到变更的时候,将数据变更写入到 watchStream chan 中,让 sendLoop 能够读取到数据。

是否可以替代ETCD?

答案当然是可以的,在k8s的代码中或多或少都会有ETCD的硬编码内容,k3s是轻量级的k8s,并不需要使用ETCD,所以在中间加入一层Kine来对ETCD的操作进行转化。

只要我们使用的存储能够通过代码兼容来满足 k8s 功能和性能,存储如何进行选取都可以。

微信扫码立即使用「源自下载」小程序

「源自下载」小程序二维码

Copyright © 2019-2024 源自下载