在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习。
lessor 作用是用于控制某个键值的有效期,watch 作用是监控某个 key 值(或者某段范围)的数据变动。
数据变动主要有两种情况,一是客户端调用API 对 key 进行修改,二是 key 过期,本章主要介绍的是 lessor 与 watch 直接的调用关系。
主要文件
/mvcc/kv_view.go 读写事务的实现
/mvcc/watchable_store.go 基于 mvcc.store 实现 watch 版本的 KV 存储实现
/mvcc/watchable_store_txn.go 实现事务提交后 End() 函数的处理
/lease/lease.go 定义 lessor 相关的数据结构和方法
流程

1.在 etcdserver 中, server 实例化了 watchableStore 的 kv 存储 和 lessor。
2.这里有个点需要说明的时候,watchableStore 重新实现了读事务的部分方法,如 End, End 函数会在一个写事务结束时被调用。
3.lessor 在撤销某个键值时,会开启写事务,调用End(), 同时删除相关 key 的数据。
4.End() 会调用 notify 方法,并最终通知的 client
源码说明
//etcdserver/server.go //Server 实例化相关实例 func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { .... srv.lessor = lease.NewLessor( srv.getLogger(), srv.be, lease.LessorConfig{ MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())), CheckpointInterval: cfg.LeaseCheckpointInterval, ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(), }) ... //创建 KV 存储控制(watchableStore) srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) kvindex := srv.consistIndex.ConsistentIndex() ... }
讯享网
讯享网 // /mvcc/watchable_store.go func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *watchableStore { if lg == nil { lg = zap.NewNop() } s := &watchableStore{ ... } s.store.ReadView = &readView{s} //读事务(相关代码参考 kv_view) s.store.WriteView = &writeView{s} //写事务(相关代码参考 kv_view) if s.le != nil { //这个是设置 lease 被 Revoke 时的回调函数。 s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) }) } ... return s }
//写事务: // /mvcc/kv_view.go func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) { tw := wv.kv.Write(traceutil.TODO()) defer tw.End() return tw.DeleteRange(key, end) } func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) { tw := wv.kv.Write(traceutil.TODO()) defer tw.End() return tw.Put(key, value, lease) }
讯享网//watchableStore 写事务 End 实现 // /mvcc/watchable_store_txn.go func (tw *watchableStoreTxnWrite) End() { ... tw.s.mu.Lock() tw.s.notify(rev, evs) //通知 tw.TxnWrite.End() tw.s.mu.Unlock() }
// /lease/lease.go //设置 Revoke 之后调用的实例 func (le *lessor) SetRangeDeleter(rd RangeDeleter) { le.mu.Lock() defer le.mu.Unlock() le.rd = rd } func (le *lessor) Revoke(id LeaseID) error { ... if le.rd == nil { return nil } txn := le.rd() keys := l.Keys() //获取 lease 绑定的所有 key sort.StringSlice(keys).Sort() for _, key := range keys { //删除键值 txn.DeleteRange([]byte(key), nil) } le.mu.Lock() defer le.mu.Unlock() delete(le.leaseMap, l.ID) le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) //删除 lease txn.End() //提交事务 leaseRevoked.Inc() return nil }
总结
key 的变动通知主要是 watchableStore 重新实现的写事务的 End() 函数,它会调用 notify 通知变化,最后发送给客户端。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/121474.html