飙血推荐
  • HTML教程
  • MySQL教程
  • JavaScript基础教程
  • php入门教程
  • JavaScript正则表达式运用
  • Excel函数教程
  • UEditor使用文档
  • AngularJS教程
  • ThinkPHP5.0教程

k8sclient-go源码分析informer源码分析(4)-DeltaFIFO源码分析-

时间:2022-05-23  作者:lianngkyle  
k8s client-go k8s informers实现了持续获取集群的所有资源对象、监听集群的资源对象变化功能,并在本地维护了全量资源对象的内存缓存,以减少对apiserver、对etcd的请求压力。Informers在启动的时候会首先在客户端调用List接口来获取全量的对象集合,然后通过Watch接口来获取增量的对象,然后更新本地缓存。

client-go之DeltaFIFO源码分析

域名aFIFO概述

先从名字上来看,DeltaFIFO,首先它是一个FIFO,也就是一个先进先出的队列,而Delta代表变化的资源对象,其包含资源对象数据本身及其变化类型。

Delta的组成:

type Delta struct {
    Type   DeltaType
    Object interface{}
}

DeltaFIFO的组成:

type DeltaFIFO struct {
    ...
    items map[string]Deltas
	queue []string
    ...
}

type Deltas []Delta

具体来说,DeltaFIFO存储着map[object key]Deltas以及object key的queue,Delta装有对象数据及对象的变化类型。输入输出方面,Reflector负责DeltaFIFO的输入,Controller负责处理DeltaFIFO的输出。

一个对象能算出一个唯一的object key,其对应着一个Deltas,所以一个对象对应着一个Deltas。

而目前Delta有4种Type,分别是: Added、Updated、Deleted、Sync。针对同一个对象,可能有多个不同Type的Delta元素在Deltas中,表示对该对象做了不同的操作,另外,也可能有多个相同Type的Delta元素在Deltas中(除Deleted外,Delted类型会被去重),比如短时间内,多次对某一个对象进行了更新操作,那么就会有多个Updated类型的Delta放入Deltas中。

域名aFIFO的定义与初始化分析

2.1 DeltaFIFO struct

DeltaFIFO struct定义了DeltaFIFO的一些属性,下面挑几个重要的分析一下。

(1)lock:读写锁,操作DeltaFIFO中的items与queue之前都要先加锁;
(2)items:是个map,key根据对象算出,value为Deltas类型;
(3)queue:存储对象key的队列;
(4)keyFunc:计算对象key的函数;

// staging/src/域名/client-go/tools/cache/域名
type DeltaFIFO struct {
	// lock/cond protects access to \'items\' and \'queue\'.
	lock 域名tex
	cond 域名

	// We depend on the property that items in the set are in
	// the queue and vice versa, and that all Deltas in this
	// map have at least one Delta.
	items map[string]Deltas
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known", for the
	// purpose of figuring out which items have been deleted
	// when Replace() or Delete() is called.
	knownObjects KeyListerGetter

	// Indication the queue is closed.
	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRED operations.
	closed     bool
	closedLock 域名x

type Deltas

再来看一下Deltas类型,是Delta的切片类型。

type Deltas []Delta
type Delta

继续看到Delta类型,其包含两个属性:
(1)Type:代表的是Delta的类型,有Added、Updated、Deleted、Sync四个类型;
(2)Object:存储的资源对象,如pod等资源对象;

type Delta struct {
	Type   DeltaType
	Object interface{}
}
// staging/src/域名/client-go/tools/cache/域名
type DeltaType string

// Change type definition
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// The other types are obvious. You\'ll get Sync deltas when:
	//  * A watch expires/errors out and a new list/watch cycle is started.
	//  * You\'ve turned on periodic syncs.
	// (Anything that trigger\'s DeltaFIFO\'s Replace() method.)
	Sync DeltaType = "Sync"
)

2.2 DeltaFIFO初始化-NewDeltaFIFO

NewDeltaFIFO初始化了一个items和queue都为空的DeltaFIFO并返回。

// staging/src/域名/client-go/tools/cache/域名
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      keyFunc,
		knownObjects: knownObjects,
	}
	域名.L = &域名
	return f
}

域名aFIFO核心处理方法分析

在前面分析Reflector时,Reflector的核心处理方法里有调用过几个方法,分别是域名ace、域名、域名te、域名te,结合前面文章的k8s informer的初始化与启动分析,或者简要的看一下下面的代码调用,就可以知道Reflector里的域名e其实就是DeltaFIFO,而那几个方法其实就是DeltaFIFO的Replace、Add、Update、Delete方法。

域名方法中调用NewDeltaFIFO初始化了DeltaFIFO,随后将DeltaFIFO作为参数传入初始化Config;

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    ...
    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, 域名xer)
    
    cfg := &Config{
		Queue:            fifo,
		...
	}
	
	func() {
		...
		域名roller = New(cfg)
		...
	}()
	...
	域名(stopCh)

在controller的Run方法中,调用NewReflector初始化Reflector时,将之前的DeltaFIFO传入,赋值给Reflector的store属性,所以Reflector里的域名e其实就是DeltaFIFO,而调用的域名ace、域名、域名te、域名te方法其实就是DeltaFIFO的Replace、Add、Update、Delete方法。

func (c *controller) Run(stopCh <-chan struct{}) {
	...
	r := NewReflector(
		域名erWatcher,
		域名ctType,
		域名e,
		域名ResyncPeriod,
	)
	...
}
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod 域名tion) *Reflector {
	return NewNamedReflector(域名ameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod 域名tion) *Reflector {
	r := &Reflector{
		...
		store:         store,
		...
	}
	...
	return r
}

所以这里对DeltaFIFO核心处理方法进行分析,主要是分析DeltaFIFO的Replace、Add、Update、Delete方法。

3.1 域名

DeltaFIFO的Add操作,主要逻辑:
(1)加锁;
(2)调用域名eActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Added类型的新Delta追加到相应的Deltas中;
(3)释放锁。

func (f *DeltaFIFO) Add(obj interface{}) error {
	域名()
	defer 域名ck()
	域名lated = true
	return 域名eActionLocked(Added, obj)
}

可以看到基本上DeltaFIFO所有的操作都有加锁操作,所以都是并发安全的。

3.1.1 域名eActionLocked

queueActionLocked负责操作DeltaFIFO中的queue与Deltas,根据对象key构造新的Delta追加到对应的Deltas中,主要逻辑:
(1)计算出对象的key;
(2)构造新的Delta,将新的Delta追加到Deltas末尾;
(3)调用dedupDeltas将Delta去重(目前只将Deltas最末尾的两个delete类型的Delta去重);
(4)判断对象的key是否在queue中,不在则添加入queue中;
(5)根据对象key更新items中的Deltas;
(6)通知所有的消费者解除阻塞;

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    //(1)计算出对象的key
	id, err := 域名f(obj)
	if err != nil {
		return KeyError{obj, err}
	}
    //(2)构造新的Delta,将新的Delta追加到Deltas末尾
	newDeltas := append(域名s[id], Delta{actionType, obj})
	//(3)调用dedupDeltas将Delta去重(目前只将Deltas最末尾的两个delete类型的Delta去重)
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
	    //(4)判断对象的key是否在queue中,不在则添加入queue中
		if _, exists := 域名s[id]; !exists {
			域名e = append(域名e, id)
		}
		//(5)根据对象key更新items中的Deltas
		域名s[id] = newDeltas
		//(6)通知所有的消费者解除阻塞
		域名dcast()
	} else {
		// We need to remove this from our map (extra items in the queue are
		// ignored if they are not in the map).
		delete(域名s, id)
	}
	return nil
}

3.2 域名te

DeltaFIFO的Update操作,主要逻辑:
(1)加锁;
(2)调用域名eActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Updated类型的新Delta追加到相应的Deltas中;
(3)释放锁。

func (f *DeltaFIFO) Update(obj interface{}) error {
	域名()
	defer 域名ck()
	域名lated = true
	return 域名eActionLocked(Updated, obj)
}

3.3 域名te

DeltaFIFO的Delete操作,主要逻辑:
(1)计算出对象的key;
(2)加锁;
(3)items中不存在对象key,则直接return,跳过处理;
(4)调用域名eActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Deleted类型的新Delta追加到相应的Deltas中;
(5)释放锁。

func (f *DeltaFIFO) Delete(obj interface{}) error {
	id, err := 域名f(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	域名()
	defer 域名ck()
	域名lated = true
	// informer的用法中,域名nObjects不为nil
	if 域名nObjects == nil {
		if _, exists := 域名s[id]; !exists {
			// Presumably, this was deleted when a relist happened.
			// Don\'t provide a second report of the same deletion.
			return nil
		}
	} else {
		// We only want to skip the "deletion" action if the object doesn\'t
		// exist in knownObjects and it doesn\'t have corresponding item in items.
		// Note that even if there is a "deletion" action in items, we can ignore it,
		// because it will be deduped automatically in "queueActionLocked"
		_, exists, err := 域名yKey(id)
		_, itemsExist := 域名s[id]
		if err == nil && !exists && !itemsExist {
			// Presumably, this was deleted when a relist happened.
			// Don\'t provide a second report of the same deletion.
			return nil
		}
	}

	return 域名eActionLocked(Deleted, obj)
}

3.4 域名ace

DeltaFIFO的Replace操作,主要逻辑:
(1)加锁;
(2)遍历list,计算对象的key,循环调用域名eActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Sync类型的新Delta追加到相应的Deltas中;
(3)对比DeltaFIFO中的items与Replace方法的list,如果DeltaFIFO中的items有,但传进来Replace方法的list中没有某个key,则调用域名eActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Deleted类型的新Delta追加到相应的Deltas中(避免重复,使用DeletedFinalStateUnknown包装对象);
(4)释放锁;

// staging/src/域名/client-go/tools/cache/域名
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    //(1)加锁
	域名()
	//(4)释放锁
	defer 域名ck()
	keys := make(域名ng, len(list))

    //(2)遍历list,计算对象的key,循环调用域名eActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Sync类型的新Delta追加到相应的Deltas中
	for _, item := range list {
		key, err := 域名f(item)
		if err != nil {
			return KeyError{item, err}
		}
		域名rt(key)
		if err := 域名eActionLocked(Sync, item); err != nil {
			return 域名rf("couldn\'t enqueue object: %v", err)
		}
	}
    // informer的用法中,域名nObjects不为nil
	if 域名nObjects == nil {
		// Do deletion detection against our own list.
		queuedDeletions := 0
		for k, oldItem := range 域名s {
			if 域名(k) {
				continue
			}
			var deletedObj interface{}
			if n := 域名st(); n != nil {
				deletedObj = 域名ct
			}
			queuedDeletions++
			if err := 域名eActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
				return err
			}
		}
        
		if !域名lated {
			域名lated = true
			// While there shouldn\'t be any queued deletions in the initial
			// population of the queue, it\'s better to be on the safe side.
			域名ialPopulationCount = len(list) + queuedDeletions
		}

		return nil
	}
    
    //(3)找出DeltaFIFO中的items有,但传进来Replace方法的list中没有的key,调用域名eActionLocked,操作DeltaFIFO中的queue与Deltas,根据对象key构造Deleted类型的新Delta追加到相应的Deltas中(避免重复,使用DeletedFinalStateUnknown包装对象)
	// Detect deletions not already in the queue.
	knownKeys := 域名Keys()
	queuedDeletions := 0
	for _, k := range knownKeys {
		if 域名(k) {
			continue
		}

		deletedObj, exists, err := 域名yKey(k)
		if err != nil {
			deletedObj = nil
			域名rf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
		} else if !exists {
			deletedObj = nil
			域名f("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
		}
		queuedDeletions++
		if err := 域名eActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}
    
    // 第一次调用Replace方法后,populated值为true
	if !域名lated {
		域名lated = true
		// initialPopulationCount代表第一次调用Replace方法加入DeltaFIFO中的items数量
		域名ialPopulationCount = len(list) + queuedDeletions
	}

	return nil
}

3.5 域名

DeltaFIFO的Pop操作,queue为空时会阻塞,直至非空,主要逻辑:
(1)加锁;
(2)循环判断queue的长度是否为0,为0则阻塞住,调用域名(),等待通知(与queueActionLocked方法中的域名dcast()相对应,即queue中有对象key则发起通知);
(3)取出queue的队头对象key;
(4)更新queue,把queue中所有的对象key前移,相当于把第一个对象key给pop出去;
(5)initialPopulationCount变量减1,当减到0时则说明initialPopulationCount代表第一次调用Replace方法加入DeltaFIFO中的对象key已经被pop完成;
(6)根据对象key从items中获取Deltas;
(7)把Deltas从items中删除;
(8)调用PopProcessFunc处理获取到的Deltas;
(9)释放锁。

// staging/src/域名/client-go/tools/cache/域名
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
    //(1)加锁
	域名()
	//(9)释放锁
	defer 域名ck()
	//(2)循环判断queue的长度是否为0,为0则阻塞住,调用域名(),等待通知(与queueActionLocked方法中的域名dcast()相对应,即queue中有对象key则发起通知)
	for {
		for len(域名e) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the 域名ed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if 域名osed() {
				return nil, ErrFIFOClosed
			}

			域名()
		}
		//(3)取出queue的队头对象key
		id := 域名e[0]
		//(4)更新queue,把queue中所有的对象key前移,相当于把第一个对象key给pop出去
		域名e = 域名e[1:]
		//(5)initialPopulationCount变量减1,当减到0时则说明initialPopulationCount代表第一次调用Replace方法加入DeltaFIFO中的对象key已经被pop完成
		if 域名ialPopulationCount > 0 {
			域名ialPopulationCount--
		}
		//(6)根据对象key从items中获取对象
		item, ok := 域名s[id]
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
		//(7)把对象从items中删除
		delete(域名s, id)
		//(8)调用PopProcessFunc处理pop出来的对象
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			域名fNotPresent(id, item)
			err = 域名
		}
		// Don\'t need to copyDeltas here, because we\'re transferring
		// ownership to the caller.
		return item, err
	}
}

3.6 域名ynced

HasSynced从字面意思上看代表是否同步完成,是否同步完成其实是指第一次从kube-apiserver中获取到的全量的对象是否全部从DeltaFIFO中pop完成,全部pop完成,说明list回来的对象已经全部同步到了Indexer缓存中去了。

方法是否返回true是根据populated和initialPopulationCount两个变量来判断的,当且仅当populated为true且initialPopulationCount 为0的时候方法返回true,否则返回false。

populated属性值在第一次调用DeltaFIFO的Replace方法中就已经将其值设置为true。

而initialPopulationCount的值在第一次调用DeltaFIFO的Replace方法中设置值为加入到items中的Deltas的数量,然后每pop一个Deltas,则initialPopulationCount的值减1,pop完成时值则为0。

// staging/src/域名/client-go/tools/cache/域名
func (f *DeltaFIFO) HasSynced() bool {
	域名()
	defer 域名ck()
	return 域名lated && 域名ialPopulationCount == 0
}

在前面做informer的初始化与启动分析时也提到过,域名ynced方法的调用链如下:

域名ForCacheSync --> 域名ForCacheSync --> 域名ynced --> 域名域名ynced --> 域名ynced

至此DeltaFIFO的分析就结束了,最后来总结一下。

总结

DeltaFIFO核心处理方法

Reflector调用的域名ace域名域名te域名te方法其实就是DeltaFIFO的Replace、Add、Update、Delete方法。

(1)域名ace:构造Sync类型的Delta加入DeltaFIFO中,此外还会对比DeltaFIFO中的items与Replace方法的list,如果DeltaFIFO中的items有,但传进来Replace方法的list中没有某个key,则构造Deleted类型的Delta加入DeltaFIFO中;
(2)域名:构建Added类型的Delta加入DeltaFIFO中;
(3)域名te:构建Updated类型的Delta加入DeltaFIFO中;
(4)域名te:构建Deleted类型的Delta加入DeltaFIFO中;
(5)域名:从DeltaFIFO的queue中pop出队头key,从map中取出key对应的Deltas返回,并把该key:Deltas从map中移除;
(6)域名ynced:返回true代表同步完成,是否同步完成指第一次从kube-apiserver中获取到的全量的对象是否全部从DeltaFIFO中pop完成,全部pop完成,说明list回来的对象已经全部同步到了Indexer缓存中去了;

informer架构中的DeltaFIFO

在对informer中的DeltaFIFO分析完之后,接下来将分析informer中的Controller与Processor。

标签:编程
湘ICP备14001474号-3  投诉建议:234161800@qq.com   部分内容来源于网络,如有侵权,请联系删除。