Preemption: How Does Preemptive Scheduling Work in Kubernetes?


Changes to ScheduleAlgorithm

In Kubernetes 1.8, the definition of the ScheduleAlgorithm interface changed: a Preempt(...) method was added. As a result, the one-sentence summary of the scheduling process I gave in my earlier post Kubernetes Scheduler原理解析 (written against Kubernetes 1.5) — "for each Pod whose PodSpec.NodeName is empty, run the Predicates and Priorities phases and pick the most suitable Node as that Pod's destination" — is no longer accurate.

An accurate one-sentence description now reads: "for each Pod whose PodSpec.NodeName is empty, run the Predicates and Priorities phases and pick the most suitable Node as that Pod's destination. If no suitable node is found after Predicates and Priorities, and Pod Priority is enabled, the Pod goes through Preempt (preemptive scheduling) to find the most suitable node and the set of Pods that must be evicted."

// ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
// onto machines.
type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
    // Preempt receives scheduling errors for a pod and tries to create room for
    // the pod by preempting lower priority pods if possible.
    // It returns the node where preemption happened, a list of preempted pods, and error if any.
    Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, err error)
    // Predicates() returns a pointer to a map of predicate functions. This is
    // exposed for testing.
    Predicates() map[string]FitPredicate
    // Prioritizers returns a slice of priority config. This is exposed for
    // testing.
    Prioritizers() []PriorityConfig
}
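As background: Pod Priority is an alpha feature in 1.8 and must be switched on via the PodPriority feature gate (--feature-gates=PodPriority=true on the apiserver and scheduler, if I remember the flag correctly) before any of the preemption code below runs. Here is a minimal sketch of how a workload opts in, assuming the 1.8-era scheduling/v1alpha1 API group; names such as "high-priority" and "critical-app" are made up for illustration:

package main

import (
    "fmt"

    "k8s.io/api/core/v1"
    schedulingv1alpha1 "k8s.io/api/scheduling/v1alpha1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // A PriorityClass maps a name to an integer priority; pods reference it
    // by name, and admission resolves Spec.Priority from it.
    pc := schedulingv1alpha1.PriorityClass{
        ObjectMeta:  metav1.ObjectMeta{Name: "high-priority"},
        Value:       1000000,
        Description: "for business-critical pods",
    }
    pod := v1.Pod{
        ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "critical-app"},
        Spec: v1.PodSpec{
            PriorityClassName: pc.Name, // ties the pod to the class above
            Containers:        []v1.Container{{Name: "app", Image: "nginx"}},
        },
    }
    fmt.Printf("pod %s/%s uses priority class %q (value %d)\n",
        pod.Namespace, pod.Name, pod.Spec.PriorityClassName, pc.Value)
}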

Scheduler.scheduleOne is where the real scheduling work starts; each call handles the scheduling of one Pod:

  • Take a Pod from the PodQueue.

  • Run the configured Algorithm's Schedule, i.e. the Predicates and Priorities phases.

  • AssumePod: optimistically record in the scheduler cache that the Pod is running on the chosen node, so scheduling can continue without waiting for the binding.

  • Bind the Pod; if the Bind fails, ForgetPod.

New in 1.8: when Predicates and Priorities fail to find a suitable node (in fact it is always the Predicates phase that finds no feasible node; Priorities merely ranks the feasible ones), scheduleOne additionally calls sched.preempt to attempt preemptive scheduling.

plugin/pkg/scheduler/scheduler.go:293

func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }
    glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    suggestedHost, err := sched.schedule(pod)
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
    if err != nil {
        // schedule() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        if fitError, ok := err.(*core.FitError); ok {
            sched.preempt(pod, fitError)
        }
        return
    }

    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    assumedPod := *pod
    // assume modifies `assumedPod` by setting NodeName=suggestedHost
    err = sched.assume(&assumedPod, suggestedHost)
    if err != nil {
        return
    }

    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        err := sched.bind(&assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
        metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
        if err != nil {
            glog.Errorf("Internal error binding pod: (%v)", err)
        }
    }()
}

Scheduler.preempt

I will not go into Predicates and Priorities here, because their overall logic is the same as in 1.5; the difference is that 1.8 adds more Predicate and Priority policies and their implementations. Below we look only at the preemptive-scheduling code, Preempt.

plugin/pkg/scheduler/scheduler.go:191

func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {
    if !utilfeature.DefaultFeatureGate.Enabled(features.PodPriority) {
        glog.V(3).Infof("Pod priority feature is not enabled. No preemption is performed.")
        return "", nil
    }
    preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
    if err != nil {
        glog.Errorf("Error getting the updated preemptor pod object: %v", err)
        return "", err
    }
    node, victims, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
    if err != nil {
        glog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
        return "", err
    }
    if node == nil {
        return "", err
    }
    glog.Infof("Preempting %d pod(s) on node %v to make room for %v/%v.", len(victims), node.Name, preemptor.Namespace, preemptor.Name)
    annotations := map[string]string{core.NominatedNodeAnnotationKey: node.Name}
    err = sched.config.PodPreemptor.UpdatePodAnnotations(preemptor, annotations)
    if err != nil {
        glog.Errorf("Error in preemption process. Cannot update pod %v annotations: %v", preemptor.Name, err)
        return "", err
    }
    for _, victim := range victims {
        if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
            glog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
            return "", err
        }
        sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, node.Name)
    }
    return node.Name, err
}
  • Check whether PodPriority is enabled in the FeatureGates; if it is not, no preemption is performed;

  • Because the Pod's PodCondition is updated with the failure status and reason when Predicate/Priority scheduling fails, the scheduler re-fetches the Pod object from the apiserver to get the Pod with the updated PodCondition;

  • Call ScheduleAlgorithm.Preempt to perform preemptive scheduling, selecting the best node and the pods to be preempted on it (called victims);

  • Call the apiserver to annotate the pod (called the preemptor) with NominatedNodeName=nodeName;

  • Iterate over the victims and call the apiserver to delete them one by one. (The apiserver interactions in these steps go through a small PodPreemptor helper; see the sketch below.)
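The GetUpdatedPod, UpdatePodAnnotations and DeletePod calls above all go through sched.config.PodPreemptor. From my reading of the 1.8 sources it is roughly the following interface (a sketch; treat the exact method set as approximate):

// PodPreemptor abstracts the apiserver operations that preemption needs.
// (Sketch based on the 1.8 code; details may differ slightly.)
type PodPreemptor interface {
    // GetUpdatedPod re-fetches the pod to pick up the PodCondition written
    // after the failed scheduling attempt.
    GetUpdatedPod(pod *v1.Pod) (*v1.Pod, error)
    // DeletePod evicts a victim pod.
    DeletePod(pod *v1.Pod) error
    // UpdatePodAnnotations writes annotations such as NominatedNodeName
    // onto the preemptor pod.
    UpdatePodAnnotations(pod *v1.Pod, annotations map[string]string) error
}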

Note: when sched.schedule(pod) fails in the Predicates/Priorities phases, the Pod is not bound to any node and is requeued into the unscheduled-pod queue. If new pods entered the queue while this pod was being scheduled, they now sit ahead of it and will be scheduled first — possibly onto the node whose resources the preemption just freed, consuming what the preemptor released. When the preemptor's turn comes around again, it may therefore have to trigger a preemption on some node once more.

genericScheduler.Preempt

ScheduleAlgorithm.Preempt is the key piece of preemptive scheduling; its implementation lives in genericScheduler:

plugin/pkg/scheduler/core/generic_scheduler.go:181

// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns the node and the list of preempted pods if such a node is found.
// TODO(bsalamat): Add priority-based scheduling. More info: today one or more
// pending pods (different from the pod that triggered the preemption(s)) may
// schedule into some portion of the resources freed up by the preemption(s)
// before the pod that triggered the preemption(s) has a chance to schedule
// there, thereby preventing the pod that triggered the preemption(s) from
// scheduling. Solution is given at:
// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/pod-preemption.md#preemption-mechanics
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, error) {
    // Scheduler may return various types of errors. Consider preemption only if
    // the error is of type FitError.
    fitError, ok := scheduleErr.(*FitError)
    if !ok || fitError == nil {
        return nil, nil, nil
    }
    err := g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
    if err != nil {
        return nil, nil, err
    }
    if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
        glog.V(5).Infof("Pod %v is not eligible for more preemption.", pod.Name)
        return nil, nil, nil
    }
    allNodes, err := nodeLister.List()
    if err != nil {
        return nil, nil, err
    }
    if len(allNodes) == 0 {
        return nil, nil, ErrNoNodesAvailable
    }
    potentialNodes := nodesWherePreemptionMightHelp(pod, allNodes, fitError.FailedPredicates)
    if len(potentialNodes) == 0 {
        glog.V(3).Infof("Preemption will not help schedule pod %v on any node.", pod.Name)
        return nil, nil, nil
    }
    nodeToPods, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer)
    if err != nil {
        return nil, nil, err
    }
    for len(nodeToPods) > 0 {
        node := pickOneNodeForPreemption(nodeToPods)
        if node == nil {
            return nil, nil, err
        }
        passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToPods[node], g.cachedNodeInfoMap, g.extenders)
        if passes && pErr == nil {
            return node, nodeToPods[node], err
        }
        if pErr != nil {
            glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr)
        }
        // Remove the node from the map and try to pick a different node.
        delete(nodeToPods, node)
    }
    return nil, nil, err
}

Checking the error from sched.schedule

  • Preemption is triggered only when the error returned by the earlier sched.schedule() call is of type FitError, which means the pod failed one or more PredicateFuncs during the Predicates phase. In other words, only pods that failed predicate filtering go through preemptive scheduling.
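For reference, FitError in 1.8 is roughly the following (a sketch from memory; treat the field details as approximate). The important part is that it records, per node, which predicates rejected the pod — exactly the information nodesWherePreemptionMightHelp consumes later:

// FitError is returned when no node passes the predicates for a pod.
// (Sketch of the 1.8 definition.)
type FitError struct {
    Pod              *v1.Pod
    FailedPredicates FailedPredicateMap // node name -> why that node was rejected
}

// FailedPredicateMap maps a node's name to the predicate failure reasons
// reported for it during the Predicates phase.
type FailedPredicateMap map[string][]algorithm.PredicateFailureReason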

Updating NodeInfo in the scheduler cache

  • Update the NodeInfo in the scheduler cache, chiefly the scheduled and assumed Pods on each Node; these form the candidate set considered when preempting Pods later and ensure the preemption decision is made against current state.

podEligibleToPreemptOthers checks whether the pod is eligible to preempt

  • Invoke podEligibleToPreemptOthers to decide whether this pod is a suitable candidate for further preemption. The logic is:

    • If the Pod already carries the annotation NominatedNodeName=nodeName (meaning it triggered a preemption earlier) and that node has a pod of lower priority than this pod that is still terminating, the pod is not considered for further preemption and the flow ends.

    • Otherwise, continue with the rest of the flow.

    • The corresponding code:

plugin/pkg/scheduler/core/generic_scheduler.go:756

func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo) bool {
    if nodeName, found := pod.Annotations[NominatedNodeAnnotationKey]; found {
        if nodeInfo, found := nodeNameToInfo[nodeName]; found {
            for _, p := range nodeInfo.Pods() {
                if p.DeletionTimestamp != nil && util.GetPodPriority(p) < util.GetPodPriority(pod) {
                    // There is a terminating pod on the nominated node.
                    return false
                }
            }
        }
    }
    return true
}

nodesWherePreemptionMightHelp selects the Potential Nodes

  • Invoke nodesWherePreemptionMightHelp to collect the potential nodes. Its logic is:

    • Iterate over all nodes. For each node, scan the Predicate policies it failed during sched.schedule()'s Predicates phase (failedPredicates). If the failedPredicates contain any of the following policies, the node is unsuitable as a preemption candidate, since evicting pods cannot fix these failures:

      • NodeSelectorNotMatch,

      • PodNotMatchHostName,

      • TaintsTolerationsNotMatch,

      • NodeLabelPresenceViolated,

      • NodeNotReady,

      • NodeNetworkUnavailable,

      • NodeUnschedulable,

      • NodeUnknownCondition

    • Every other node is kept as a Potential Node.

    • The corresponding code:

    • 对应代码如下:

func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicatesMap FailedPredicateMap) []*v1.Node {
    potentialNodes := []*v1.Node{}
    for _, node := range nodes {
        unresolvableReasonExist := false
        failedPredicates, found := failedPredicatesMap[node.Name]
        // If we assume that scheduler looks at all nodes and populates the failedPredicateMap
        // (which is the case today), the !found case should never happen, but we'd prefer
        // to rely less on such assumptions in the code when checking does not impose
        // significant overhead.
        for _, failedPredicate := range failedPredicates {
            switch failedPredicate {
            case
                predicates.ErrNodeSelectorNotMatch,
                predicates.ErrPodNotMatchHostName,
                predicates.ErrTaintsTolerationsNotMatch,
                predicates.ErrNodeLabelPresenceViolated,
                predicates.ErrNodeNotReady,
                predicates.ErrNodeNetworkUnavailable,
                predicates.ErrNodeUnschedulable,
                predicates.ErrNodeUnknownCondition:
                unresolvableReasonExist = true
                break
                // TODO(bsalamat): Please add affinity failure cases once we have specific affinity failure errors.
            }
        }
        if !found || !unresolvableReasonExist {
            glog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
            potentialNodes = append(potentialNodes, node)
        }
    }
    return potentialNodes
}

selectNodesForPreemption and selectVictimsOnNode select the feasible Nodes and their victims

  • Invoke selectNodesForPreemption to find, among the Potential Nodes, all feasible Nodes and their victim Pods. It starts up to 16 worker goroutines (via workqueue.Parallelize, which waits on a WaitGroup until every node's check completes) and, for each node, does the following:

    • Iterate over all scheduled pods on the node (including assumed pods) and add every pod with lower priority than the preemptor to the potential-victims list, removing each of them from the NodeInfoCopy, so that the next Predicate run sees the node with correspondingly more free resources.

    • Sort the potential victims by priority, from highest to lowest; index 0 holds the highest-priority pod.

    • Check whether the preemptor now passes all Predicate policies configured for the scheduler (since the previous step removed all lower-priority pods from the NodeInfoCopy, all of their resources are freed). If it does not pass, return: this node is unsuitable. If all Predicates pass, continue with the flow below.

    • Walk the sorted potential-victims list and try to reprieve pods one at a time: add the first (highest-priority) pod back into the NodeInfoCopy, then re-check whether the preemptor still passes all configured Predicate policies. If it no longer passes, remove that pod from the NodeInfoCopy again and commit it to the final victims list. Repeat the same treatment for the 2nd, 3rd, ... potential victim. This keeps as many higher-priority pods as possible and deletes as few pods as possible.

    • Finally, return every feasible node together with its victims list.

    • The selectNodesForPreemption code follows; the core logic is in selectVictimsOnNode. (A note on how pod priority is read follows the two listings.)

plugin/pkg/scheduler/core/generic_scheduler.go:583

func selectNodesForPreemption(pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    potentialNodes []*v1.Node,
    predicates map[string]algorithm.FitPredicate,
    metadataProducer algorithm.PredicateMetadataProducer,
) (map[*v1.Node][]*v1.Pod, error) {

    nodeNameToPods := map[*v1.Node][]*v1.Pod{}
    var resultLock sync.Mutex

    // We can use the same metadata producer for all nodes.
    meta := metadataProducer(pod, nodeNameToInfo)
    checkNode := func(i int) {
        nodeName := potentialNodes[i].Name
        var metaCopy algorithm.PredicateMetadata
        if meta != nil {
            metaCopy = meta.ShallowCopy()
        }
        pods, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates)
        if fits {
            resultLock.Lock()
            nodeNameToPods[potentialNodes[i]] = pods
            resultLock.Unlock()
        }
    }
    workqueue.Parallelize(16, len(potentialNodes), checkNode)
    return nodeNameToPods, nil
}
plugin/pkg/scheduler/core/generic_scheduler.go:659

func selectVictimsOnNode(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo, fitPredicates map[string]algorithm.FitPredicate) ([]*v1.Pod, bool) {
    potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
    nodeInfoCopy := nodeInfo.Clone()

    removePod := func(rp *v1.Pod) {
        nodeInfoCopy.RemovePod(rp)
        if meta != nil {
            meta.RemovePod(rp)
        }
    }
    addPod := func(ap *v1.Pod) {
        nodeInfoCopy.AddPod(ap)
        if meta != nil {
            meta.AddPod(ap, nodeInfoCopy)
        }
    }
    // As the first step, remove all the lower priority pods from the node and
    // check if the given pod can be scheduled.
    podPriority := util.GetPodPriority(pod)
    for _, p := range nodeInfoCopy.Pods() {
        if util.GetPodPriority(p) < podPriority {
            potentialVictims.Items = append(potentialVictims.Items, p)
            removePod(p)
        }
    }
    potentialVictims.Sort()
    // If the new pod does not fit after removing all the lower priority pods,
    // we are almost done and this node is not suitable for preemption. The only condition
    // that we should check is if the "pod" is failing to schedule due to pod affinity
    // failure.
    // TODO(bsalamat): Consider checking affinity to lower priority pods if feasible with reasonable performance.
    if fits, _, err := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
        if err != nil {
            glog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
        }
        return nil, false
    }
    victims := []*v1.Pod{}
    // Try to reprieve as many pods as possible starting from the highest priority one.
    for _, p := range potentialVictims.Items {
        lpp := p.(*v1.Pod)
        addPod(lpp)
        if fits, _, _ := podFitsOnNode(pod, meta, nodeInfoCopy, fitPredicates, nil); !fits {
            removePod(lpp)
            victims = append(victims, lpp)
            glog.V(5).Infof("Pod %v is a potential preemption victim on node %v.", lpp.Name, nodeInfo.Node().Name)
        }
    }
    return victims, true
}
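All of the priority comparisons above go through util.GetPodPriority. A minimal sketch of what it does, assuming the 1.8 behavior (the fallback detail is approximate): return pod.Spec.Priority when the admission controller has resolved it, otherwise fall back to a static default.

// GetPodPriority returns the pod's effective priority.
// (Sketch of the 1.8 helper; the exact default constant is approximate.)
func GetPodPriority(pod *v1.Pod) int32 {
    if pod.Spec.Priority != nil {
        return *pod.Spec.Priority
    }
    // Pods created without (or before) a default PriorityClass have a nil
    // Spec.Priority; they fall back to a static default (0 in practice).
    return 0
}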

pickOneNodeForPreemption picks the best Node among the feasible ones

  • If the previous step found at least one feasible node, call pickOneNodeForPreemption to choose the best one by the following rules:

    • Pick the Node whose highest victim priority is the lowest.

    • If more than one Node satisfies the previous rule, pick the one with the smallest sum of victim priorities.

    • If more than one Node still remains, pick the one with the fewest victim pods.

    • If there is still a tie, pick the first of the remaining Nodes (the code simply returns the first entry; since Go map iteration order is randomized, this is effectively a random choice). A small runnable walk-through of these tie-breakers follows the listing below.

    • The corresponding code:

plugin/pkg/scheduler/core/generic_scheduler.go:501

func pickOneNodeForPreemption(nodesToPods map[*v1.Node][]*v1.Pod) *v1.Node {
    type nodeScore struct {
        node            *v1.Node
        highestPriority int32
        sumPriorities   int64
        numPods         int
    }
    if len(nodesToPods) == 0 {
        return nil
    }
    minHighestPriority := int32(math.MaxInt32)
    minPriorityScores := []*nodeScore{}
    for node, pods := range nodesToPods {
        if len(pods) == 0 {
            // We found a node that doesn't need any preemption. Return it!
            // This should happen rarely when one or more pods are terminated between
            // the time that scheduler tries to schedule the pod and the time that
            // preemption logic tries to find nodes for preemption.
            return node
        }
        // highestPodPriority is the highest priority among the victims on this node.
        highestPodPriority := util.GetPodPriority(pods[0])
        if highestPodPriority < minHighestPriority {
            minHighestPriority = highestPodPriority
            minPriorityScores = nil
        }
        if highestPodPriority == minHighestPriority {
            minPriorityScores = append(minPriorityScores, &nodeScore{node: node, highestPriority: highestPodPriority, numPods: len(pods)})
        }
    }
    if len(minPriorityScores) == 1 {
        return minPriorityScores[0].node
    }
    // There are a few nodes with minimum highest priority victim. Find the
    // smallest sum of priorities.
    minSumPriorities := int64(math.MaxInt64)
    minSumPriorityScores := []*nodeScore{}
    for _, nodeScore := range minPriorityScores {
        var sumPriorities int64
        for _, pod := range nodesToPods[nodeScore.node] {
            // We add MaxInt32+1 to all priorities to make all of them >= 0. This is
            // needed so that a node with a few pods with negative priority is not
            // picked over a node with a smaller number of pods with the same negative
            // priority (and similar scenarios).
            sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
        }
        if sumPriorities < minSumPriorities {
            minSumPriorities = sumPriorities
            minSumPriorityScores = nil
        }
        nodeScore.sumPriorities = sumPriorities
        if sumPriorities == minSumPriorities {
            minSumPriorityScores = append(minSumPriorityScores, nodeScore)
        }
    }
    if len(minSumPriorityScores) == 1 {
        return minSumPriorityScores[0].node
    }
    // There are a few nodes with minimum highest priority victim and sum of priorities.
    // Find one with the minimum number of pods.
    minNumPods := math.MaxInt32
    minNumPodScores := []*nodeScore{}
    for _, nodeScore := range minSumPriorityScores {
        if nodeScore.numPods < minNumPods {
            minNumPods = nodeScore.numPods
            minNumPodScores = nil
        }
        if nodeScore.numPods == minNumPods {
            minNumPodScores = append(minNumPodScores, nodeScore)
        }
    }
    // At this point, even if there are more than one node with the same score,
    // return the first one.
    if len(minNumPodScores) > 0 {
        return minNumPodScores[0].node
    }
    glog.Errorf("Error in logic of node scoring for preemption. We should never reach here!")
    return nil
}
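To make the three tie-breakers concrete, here is a small self-contained walk-through on hypothetical data (not scheduler code; plain ints stand in for victim priorities, sorted high to low as the real implementation guarantees):

package main

import "fmt"

func sum(vs []int) int {
    s := 0
    for _, v := range vs {
        s += v
    }
    return s
}

// better reports whether victim list a is preferable to b under the
// pickOneNodeForPreemption tie-breakers.
func better(a, b []int) bool {
    if a[0] != b[0] {
        return a[0] < b[0] // rule 1: lowest "highest victim priority"
    }
    if sum(a) != sum(b) {
        return sum(a) < sum(b) // rule 2: lowest sum of victim priorities
    }
    return len(a) < len(b) // rule 3: fewest victims
}

func main() {
    nodes := map[string][]int{
        "node-a": {900, 500},
        "node-b": {500, 400},
        "node-c": {500, 400, 100},
    }
    best := ""
    for name, vs := range nodes {
        if best == "" || better(vs, nodes[best]) {
            best = name
        }
    }
    // node-a loses rule 1 (900 > 500); node-b beats node-c on rule 2 (900 < 1000).
    fmt.Println("picked:", best) // picked: node-b
}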

The best Node must still pass the extenders (if configured)

  • If the scheduler is configured with extender schedulers, nodePassesExtendersForPreemption additionally hands the pod and the chosen node (with the victims hypothetically removed) to each extender's Filter for a final check; only if that check passes is the node returned as the final preemption target. (A sketch of this check follows this list.)

  • For background on extenders, see 如何对kubernetes scheduler进行二次开发 and Kubernetes Scheduler源码分析. In practice extenders are rarely used, and now that custom schedulers are supported there is even less need for a scheduler extender.
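From my reading of the 1.8 code, nodePassesExtendersForPreemption clones the node's NodeInfo, removes the victims from the clone, and then runs the single candidate node through every extender's Filter; if any extender rejects the node (or reports it in its failed-nodes map), the node is rejected. A simplified sketch, with the restore-on-exit bookkeeping and error handling trimmed; treat the details as approximate:

// nodePassesExtendersForPreemption (simplified sketch of the 1.8 logic).
// It pretends the victims are already gone from the node, then asks every
// configured extender whether the preemptor would still fit there.
func nodePassesExtendersForPreemption(
    pod *v1.Pod,
    nodeName string,
    victims []*v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    extenders []algorithm.SchedulerExtender,
) (bool, error) {
    if len(extenders) == 0 {
        return true, nil
    }
    // Hypothetically remove the victims from a copy of the node's info.
    nodeInfoCopy := nodeNameToInfo[nodeName].Clone()
    for _, victim := range victims {
        nodeInfoCopy.RemovePod(victim)
    }
    nodeNameToInfo[nodeName] = nodeInfoCopy
    // (The real code restores the original NodeInfo before returning.)
    nodes := []*v1.Node{nodeInfoCopy.Node()}
    for _, extender := range extenders {
        filtered, failedMap, err := extender.Filter(pod, nodes, nodeNameToInfo)
        if err != nil {
            return false, err
        }
        if _, failed := failedMap[nodeName]; failed || len(filtered) == 0 {
            return false, nil
        }
        nodes = filtered
    }
    return true, nil
}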

