基于 kubernetes 1.2

一、简介

pv controller 是 kcm 的组件之一,它负责处理集群中的 pvc/pv 对象,对 pvc/pv 对象进行状态转换。

二、pvController 初始化

初始化代码在 pkg/controller/volume/persistentvolume/pv_controller_base.go 文件中,NewController 主要做了如下几件事情:
- 初始化 eventRecorder;
- 初始化 PersistentVolumeController 对象;
- 调用 VolumePluginMgr.InitPlugins() 方法初始化存储插件,代码存在于 pkg/volume/plugins.go 文件中;
- 开始创建 informer 监听集群内的资源,初始化了如下 informer:PersistentVolumeInformer、PersistentVolumeClaimInformer、StorageClassInformer、PodInformer、NodeInformer;
- 将 PV、PVC 的 event 分别放入 volumeQueue、claimQueue;
- 为了不每次都迭代 pods,自定义一个通过 pvc 键索引 pod 的索引器;
- 初始化 in-tree 存储到 CSI 迁移相关功能的 manager。

NewController 在 cmd/kube-controller-manager 的代码里面被调用,初始化成功之后紧接着调用 go Run() 方法运行 pvController。

三、开始运行
//开始运行pvControllerfunc(ctrl*PersistentVolumeController)Run(stopCh-chanstruct{}){deferutilruntime.HandleCrash()deferctrl.claimQueue.ShutDown()deferctrl.volumeQueue.ShutDown()klog.Infof("Startingpersistentvolumecontroller")deferklog.Infof("Shuttingdownpersistentvolumecontroller")if!cache.WaitForNamedCacheSync("persistentvolume",stopCh,ctrl.volumeListerSynced,ctrl.claimListerSynced,ctrl.classListerSynced,ctrl.podListerSynced,ctrl.NodeListerSynced){turn}ctrl.initializeCaches(ctrl.volumeLister,ctrl.claimLister)gowait.Until(ctrl.sync,ctrl.syncPeriod,stopCh)gowait.Until(ctrl.volumeWorker,time.Second,stopCh)gowait.Until(ctrl.claimWorker,time.Second,stopCh)metrics.Register(ctrl.volumes.sto,ctrl.claims,ctrl.volumePluginMgr)-stopCh}
同步缓存之后开始周期性执行 ctrl.resync、ctrl.volumeWorker、ctrl.claimWorker。我们先看下 initializeCaches 方法:

func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
	// 这里不访问 apiserver,是从本地缓存拿出的对象,这些对象不可以被外部函数修改
	volumeList, err := volumeLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
		return
	}
	for _, volume := range volumeList {
		// 我们不能改变 volume 对象,所以这里我们 copy 一份新对象,对新对象进行操作
		volumeClone := volume.DeepCopy()
		if _, err = ctrl.storeVolumeUpdate(volumeClone); err != nil {
			klog.Errorf("error updating volume cache: %v", err)
		}
	}

	claimList, err := claimLister.List(labels.Everything())
	if err != nil {
		klog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
		return
	}
	for _, claim := range claimList {
		if _, err = ctrl.storeClaimUpdate(claim.DeepCopy()); err != nil {
			klog.Errorf("error updating claim cache: %v", err)
		}
	}
	klog.V(4).Infof("controller initialized")
}

type persistentVolumeOrderedIndex struct {
	store cache.Indexer
}
该方法将 lister 里面的缓存转存到 persistentVolumeOrderedIndex 中,它是按 AccessModes 索引并按存储容量排序的 persistentVolume 的缓存。

1. resync

func (ctrl *PersistentVolumeController) resync() {
	klog.V(4).Infof("resyncing PV controller")

	pvcs, err := ctrl.claimLister.List(labels.NewSelector())
	if err != nil {
		klog.Warningf("cannot list claims: %s", err)
		return
	}
	for _, pvc := range pvcs {
		ctrl.enqueueWork(ctrl.claimQueue, pvc)
	}

	pvs, err := ctrl.volumeLister.List(labels.NewSelector())
	if err != nil {
		klog.Warningf("cannot list persistent volumes: %s", err)
		return
	}
	for _, pv := range pvs {
		ctrl.enqueueWork(ctrl.volumeQueue, pv)
	}
}
这里将集群内所有的 pvc/pv 统一都放到对应的 claimQueue、volumeQueue 里面重新处理。这个 resyncPeriod 等于一个 random time.Duration * config.time(在 kcm 启动时设置)。

2. volumeWorker

一个无限循环,不断地处理从 volumeQueue 里面获取到的 PersistentVolume:

workFunc := func() bool {
	keyObj, quit := ctrl.volumeQueue.Get()
	if quit {
		return true
	}
	defer ctrl.volumeQueue.Done(keyObj)
	key := keyObj.(string)
	klog.V(5).Infof("volumeWorker[%s]", key)

	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
		return false
	}
	volume, err := ctrl.volumeLister.Get(name)
	if err == nil {
		// The volume still exists in informer cache, the event must have
		// been add/update/sync
		ctrl.updateVolume(volume)
		return false
	}
	if !errors.IsNotFound(err) {
		klog.V(2).Infof("error getting volume %q from informer: %v", key, err)
		return false
	}

	// The volume is not in informer cache, the event must have been
	// "delete"
	volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
	if err != nil {
		klog.V(2).Infof("error getting volume %q from cache: %v", key, err)
		return false
	}
	if !found {
		// The controller has already processed the delete event and
		// deleted the volume from its cache
		klog.V(2).Infof("deletion of volume %q was already processed", key)
		return false
	}
	volume, ok := volumeObj.(*v1.PersistentVolume)
	if !ok {
		klog.Errorf("expected volume, got %+v", volumeObj)
		return false
	}
	ctrl.deleteVolume(volume)
	return false
}
我们主要