A Brief Analysis on the Implementation of the Kubernetes Scheduler

Kubernetes Scheduler

How the Scheduler Works

  • The scheduler maintains a scheduling queue, podQueue, and listens to the APIServer.
  • When we create a Pod, we first write Pod metadata to etcd through the APIServer.
  • The scheduler watches Pod status through an Informer. When a new Pod is created, it is added to the podQueue.
  • The main process continuously takes Pods from the podQueue and assigns a node to each Pod.
  • The scheduling process consists of two steps. First, filter the nodes that match the Pod. Second, score the remaining nodes based on the Pod configuration (for example, by metrics such as resource usage and affinity) and select the node with the highest score.
  • After a node is selected, the scheduler calls the Pod binding interface of the APIServer, which sets pod.Spec.NodeName to the assigned node.
  • The kubelet on each node also watches the APIServer. When it finds that a new Pod has been scheduled to its node, it invokes the local container runtime (for example, the Docker daemon) to run the containers.
  • If the scheduler fails to schedule a Pod and priority and preemption are enabled, it first makes a preemption attempt: lower-priority Pods on a node are deleted so that the pending Pod can be scheduled to that node. If preemption is disabled or the attempt fails, the failure is recorded in the logs and the Pod is added to the end of the podQueue.
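The steps above can be sketched as a toy scheduling loop. This is a simplified illustration, not the scheduler's actual code: the Node and Pod types, the single CPU resource, and the "most free CPU wins" scoring rule are all assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Node and Pod are simplified, hypothetical stand-ins for the real API objects.
type Node struct {
	Name    string
	FreeCPU int // free CPU in millicores
}

type Pod struct {
	Name     string
	CPU      int    // requested CPU in millicores
	NodeName string // empty until the Pod is bound
}

// scheduleOne filters out nodes that cannot hold the Pod, scores the rest by
// free CPU, and "binds" the Pod to the best node by setting NodeName.
func scheduleOne(pod *Pod, nodes []*Node) error {
	var best *Node
	for _, n := range nodes {
		if n.FreeCPU < pod.CPU { // filter step
			continue
		}
		if best == nil || n.FreeCPU > best.FreeCPU { // score step
			best = n
		}
	}
	if best == nil {
		return errors.New("no node fits pod " + pod.Name)
	}
	best.FreeCPU -= pod.CPU
	pod.NodeName = best.Name // bind step
	return nil
}

func main() {
	nodes := []*Node{{"node-a", 1000}, {"node-b", 2000}}
	queue := []*Pod{{Name: "web", CPU: 1500}, {Name: "db", CPU: 800}, {Name: "big", CPU: 4000}}
	// Take Pods from the head of the queue, one per scheduling cycle.
	for len(queue) > 0 {
		pod := queue[0]
		queue = queue[1:]
		if err := scheduleOne(pod, nodes); err != nil {
			fmt.Println("unschedulable:", err)
			continue
		}
		fmt.Printf("%s -> %s\n", pod.Name, pod.NodeName)
	}
}
```

In this run, web lands on node-b, db lands on node-a, and big is reported as unschedulable because no node has 4000 millicores free.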

Implementation Details

  • Initialize a Scheduler instance sched, pass in the various Informers, set up listeners for the target resources, and register the event handlers, such as the one that maintains the podQueue.
  • Register the “events” component and set up logging.
  • Register the HTTP/HTTPS listeners that serve health checks and metrics requests.
  • Run sched.Run(), the main entry point of the scheduling logic. If --leader-elect is set to true, multiple instances can be started and a leader is elected among them through a lock object held in the APIServer. An instance runs sched.Run() only after it is elected as the leader.
The core of a scheduling task lies in the `sched.Run()` function, which starts a goroutine that continuously runs `sched.scheduleOne`. Each run represents one scheduling cycle.

func (sched *Scheduler) Run() {
	if !sched.config.WaitForCacheSync() {
		return
	}
	go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
func (sched *Scheduler) scheduleOne() {
	pod := sched.config.NextPod()
	.... // do some pre check
	scheduleResult, err := sched.schedule(pod)
	if err != nil {
		if fitError, ok := err.(*core.FitError); ok {
			if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
				..... // do some log
			} else {
				sched.preempt(pod, fitError)
			}
		}
		// Scheduling failed, so there is nothing to bind in this cycle.
		return
	}
	...
	// Assume volumes first before assuming the pod.
	allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
	...
	// Bind asynchronously so that the next Pod can be scheduled in the meantime.
	go func() {
		// Bind volumes first before Pod
		if !allBound {
			err := sched.bindVolumes(assumedPod)
			if err != nil {
				klog.Errorf("error binding volumes: %v", err)
				metrics.PodScheduleErrors.Inc()
				return
			}
		}
		err := sched.bind(assumedPod, &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
			Target: v1.ObjectReference{
				Kind: "Node",
				Name: scheduleResult.SuggestedHost,
			},
		})
		...
	}()
}
  • Pods are taken from the podQueue through sched.config.NextPod().
  • sched.schedule is run to make a scheduling attempt.
  • If scheduling fails and the preemption feature is enabled, sched.preempt makes a preemption attempt and evicts some lower-priority Pods to make room for the pending Pod. The preemption takes effect in a later scheduling cycle, when the Pod is retried.
  • If scheduling succeeds, the Pod is bound to the selected node. Before the Pod itself is bound, the volumes declared by the Pod (its PVCs) are bound first.
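To make the preemption step more concrete, here is a minimal sketch of how victims could be chosen on a single node. It is an assumption-laden illustration rather than what sched.preempt really does: the Pod type, the pickVictims helper, and the "evict lowest priority first" rule are hypothetical, and only CPU is considered.

```go
package main

import (
	"fmt"
	"sort"
)

// Pod is a simplified, hypothetical stand-in for the real API object.
type Pod struct {
	Name     string
	Priority int
	CPU      int // requested CPU in millicores
}

// pickVictims returns the lower-priority Pods to evict from one node so that
// pending fits. It returns false if evicting every candidate is not enough.
func pickVictims(pending Pod, running []Pod, freeCPU int) ([]Pod, bool) {
	if freeCPU >= pending.CPU {
		return nil, true // fits already, nothing to evict
	}
	// Only Pods with a strictly lower priority may be preempted.
	candidates := make([]Pod, 0, len(running))
	for _, p := range running {
		if p.Priority < pending.Priority {
			candidates = append(candidates, p)
		}
	}
	// Evict the least important Pods first.
	sort.SliceStable(candidates, func(i, j int) bool {
		return candidates[i].Priority < candidates[j].Priority
	})
	var victims []Pod
	for _, p := range candidates {
		victims = append(victims, p)
		freeCPU += p.CPU
		if freeCPU >= pending.CPU {
			return victims, true
		}
	}
	return nil, false
}

func main() {
	running := []Pod{{"api", 10, 1000}, {"cache", 5, 500}, {"batch", 1, 500}}
	pending := Pod{Name: "urgent", Priority: 8, CPU: 1000}
	victims, ok := pickVictims(pending, running, 200)
	for _, v := range victims {
		fmt.Println("evict:", v.Name)
	}
	fmt.Println("preemption possible:", ok)
}
```

Here batch and cache are evicted, while api is left alone because its priority is higher than that of the pending Pod.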
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
	// Get node list
	nodes, err := nodeLister.List()
	// Filter
	filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
	if err != nil {
		return result, err
	}
	// Priority
	priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
	if err != nil {
		return result, err
	}

	// SelectHost
	host, err := g.selectHost(priorityList)
	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
		FeasibleNodes:  len(filteredNodes),
	}, err
}
  • Filters: Filter out the nodes that do not meet the specified conditions.
  • PrioritizeNodes: Score the matching nodes and obtain a final score list called priorityList.
  • selectHost: Find the nodes with the highest score in the priorityList and then pick one of them in round-robin fashion.
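The selectHost step can be sketched as follows. This is a simplified version written for illustration: the HostPriority and selector types and the lastNodeIndex counter are assumptions modeled on the description above, not a copy of the scheduler source.

```go
package main

import (
	"errors"
	"fmt"
)

// HostPriority pairs a node name with its final score (hypothetical type).
type HostPriority struct {
	Host  string
	Score int
}

// selector remembers how many selections it has made so that ties between
// top-scoring nodes are broken in round-robin order.
type selector struct {
	lastNodeIndex uint64
}

func (s *selector) selectHost(list []HostPriority) (string, error) {
	if len(list) == 0 {
		return "", errors.New("empty priority list")
	}
	// Find the highest score.
	maxScore := list[0].Score
	for _, hp := range list[1:] {
		if hp.Score > maxScore {
			maxScore = hp.Score
		}
	}
	// Collect every node that reaches it.
	var top []string
	for _, hp := range list {
		if hp.Score == maxScore {
			top = append(top, hp.Host)
		}
	}
	// Rotate through the tied nodes on successive calls.
	host := top[s.lastNodeIndex%uint64(len(top))]
	s.lastNodeIndex++
	return host, nil
}

func main() {
	s := &selector{}
	list := []HostPriority{{"node-a", 10}, {"node-b", 7}, {"node-c", 10}}
	for i := 0; i < 3; i++ {
		host, _ := s.selectHost(list)
		fmt.Println(host)
	}
}
```

With two nodes tied at score 10, three consecutive calls return node-a, node-c, and node-a again, so Pods with identical scores are spread across the tied nodes instead of piling onto the first one.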

Filters

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
	if len(g.predicates) == 0 {
		filtered = nodes
	} else {
		allNodes := int32(g.cache.NodeTree().NumNodes())
		numNodesToFind := g.numFeasibleNodesToFind(allNodes)
		checkNode := func(i int) {
			nodeName := g.cache.NodeTree().Next()
			// All the predicates of this node are invoked at this point.
			fits, failedPredicates, err := podFitsOnNode(
				pod,
				meta,
				g.cachedNodeInfoMap[nodeName],
				g.predicates,
				g.schedulingQueue,
				g.alwaysCheckAllPredicates,
			)
			if fits {
				length := atomic.AddInt32(&filteredLen, 1)
				if length > numNodesToFind {
					// If enough nodes already meet the conditions, the calculation stops.
					cancel()
					atomic.AddInt32(&filteredLen, -1)
				} else {
					filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
				}
			}
		}
		// Invoke the checkNode method in parallel.
		workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
		filtered = filtered[:filteredLen]
	}
	return filtered, failedPredicateMap, nil
}
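The early-stop pattern used in findNodesThatFit (an atomic counter plus a cancellable context) can be tried out in isolation. The sketch below is a standalone approximation using only the standard library: findFeasible, its channel-based worker pool, and the string node names are assumptions standing in for workqueue.ParallelizeUntil and the real node objects.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// findFeasible checks nodes with a fixed pool of workers and stops handing
// out work once numToFind feasible nodes have been collected.
func findFeasible(nodes []string, fits func(string) bool, numToFind, workers int) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	filtered := make([]string, numToFind)
	var filteredLen int32

	// The producer feeds node names to the workers until it is cancelled.
	work := make(chan string)
	go func() {
		defer close(work)
		for _, n := range nodes {
			select {
			case work <- n:
			case <-ctx.Done():
				return
			}
		}
	}()

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range work {
				if !fits(n) {
					continue
				}
				// Reserve a slot; undo the reservation if we overshot.
				length := atomic.AddInt32(&filteredLen, 1)
				if int(length) > numToFind {
					cancel()
					atomic.AddInt32(&filteredLen, -1)
				} else {
					filtered[length-1] = n
				}
			}
		}()
	}
	wg.Wait()
	return filtered[:filteredLen]
}

func main() {
	nodes := []string{"n0", "n1", "n2", "n3", "n4", "n5", "n6", "n7"}
	always := func(string) bool { return true }
	fmt.Println(len(findFeasible(nodes, always, 3, 4)))
}
```

Which three nodes end up in the result depends on goroutine timing, but the count never exceeds numToFind, and fewer nodes are returned if not enough of them fit.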

Prioritize

// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
	Name   string
	Map    PriorityMapFunction
	Reduce PriorityReduceFunction
	// TODO: Remove it after migrating all functions to
	// Map-Reduce pattern.
	Function PriorityFunction
	Weight   int
}
  • Map calculates the score of an individual node.
  • Reduce post-processes the scores of all nodes for the current PriorityConfig, for example, to normalize them.
// Map: score every node under every priority config, in parallel over nodes.
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
	nodeInfo := nodeNameToInfo[nodes[index].Name]
	for i := range priorityConfigs {
		var err error
		results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
	}
})
// Reduce: post-process the scores of each priority config in its own goroutine.
for i := range priorityConfigs {
	wg.Add(1)
	go func(index int) {
		defer wg.Done()
		if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
			... // record the error
		}
	}(i)
}
wg.Wait()
// Summarize all scores.
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
	result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
	for j := range priorityConfigs {
		result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
	}
}
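A small worked example may help to show how Map, Reduce, and Weight combine into a final score. The sketch below is sequential and heavily simplified: the priorityConfig type, the normalize helper, and the two example priorities (free CPU and image locality) are hypothetical and chosen only to make the arithmetic easy to follow.

```go
package main

import "fmt"

// priorityConfig is a reduced, hypothetical version of PriorityConfig.
type priorityConfig struct {
	Name   string
	Weight int
	Map    func(node string) int // raw score for one node
	Reduce func(scores []int)    // optional: rescale all scores in place
}

// normalize rescales scores in place so that the highest score becomes 10.
func normalize(scores []int) {
	highest := 0
	for _, s := range scores {
		if s > highest {
			highest = s
		}
	}
	if highest == 0 {
		return
	}
	for i := range scores {
		scores[i] = scores[i] * 10 / highest
	}
}

// prioritize runs Map per node, Reduce per config, then sums weighted scores.
func prioritize(nodes []string, configs []priorityConfig) []int {
	total := make([]int, len(nodes))
	for _, c := range configs {
		scores := make([]int, len(nodes))
		for i, n := range nodes {
			scores[i] = c.Map(n)
		}
		if c.Reduce != nil {
			c.Reduce(scores)
		}
		for i := range nodes {
			total[i] += scores[i] * c.Weight
		}
	}
	return total
}

func main() {
	freeCPU := map[string]int{"node-a": 500, "node-b": 2000}
	hasImage := map[string]bool{"node-a": true}
	configs := []priorityConfig{
		{Name: "FreeCPU", Weight: 1, Map: func(n string) int { return freeCPU[n] }, Reduce: normalize},
		{Name: "ImageLocality", Weight: 2, Map: func(n string) int {
			if hasImage[n] {
				return 10
			}
			return 0
		}},
	}
	fmt.Println(prioritize([]string{"node-a", "node-b"}, configs))
}
```

For node-a the free CPU score normalizes to 2 and the image score is 10 with weight 2, giving 22. For node-b the free CPU score normalizes to 10 and the image score is 0, giving 10. The program therefore prints [22 10], and node-a wins despite having less free CPU.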

Status Quo

  • By default, the Kubernetes scheduler evaluates every node for each Pod, so Pod scheduling becomes very slow in a large cluster with many nodes. The percentage-of-nodes-to-score option tries to solve this problem by stopping the search once enough feasible nodes have been found.
  • The pod-by-pod method is not suitable for some machine learning scenarios. Kubernetes was initially designed to support online tasks. Some offline task scenarios, for example, distributed machine learning, need a different algorithm called gang scheduling. Such Pods may not need to be scheduled immediately, but once a job is submitted, computing can start only after all the workers in that batch are running. In these scenarios, pod-by-pod scheduling easily leads to resource deadlock when resources are insufficient.
  • Currently, the scheduler is not very extensible. Scheduling logic for some specific scenarios has to be hard-coded into the main process. For example, the hard-coded bindVolume step prevents a gang scheduler from being implemented natively in the current scheduling framework.
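The all-or-nothing idea behind gang scheduling can be illustrated with a short sketch. It is a toy model under stated assumptions, not how kube-batch or any real gang scheduler is implemented: scheduleGang is a hypothetical helper, nodes are reduced to a slice of free CPU cores, and every worker of a job requests the same amount.

```go
package main

import "fmt"

// scheduleGang places all workers of a job (each needing cpuPerWorker) or
// none of them. free holds the free CPU per node and changes only on success.
func scheduleGang(free []int, workers, cpuPerWorker int) bool {
	trial := append([]int(nil), free...) // plan on a copy first
	placed := 0
	for i := range trial {
		for placed < workers && trial[i] >= cpuPerWorker {
			trial[i] -= cpuPerWorker
			placed++
		}
	}
	if placed < workers {
		return false // not every worker fits, so reserve nothing
	}
	copy(free, trial) // commit the whole placement at once
	return true
}

func main() {
	free := []int{2, 2} // two nodes with two free cores each
	fmt.Println("job A admitted:", scheduleGang(free, 3, 1), free)
	fmt.Println("job B admitted:", scheduleGang(free, 3, 1), free)
}
```

With pod-by-pod scheduling, two jobs of three workers each could end up holding two cores apiece, so neither job could ever start. In this sketch job A is admitted as a whole, and job B is rejected without reserving anything, which leaves the remaining core available to other work.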

Development of the Kubernetes Scheduler

  • The scheduler V2 framework improves extensibility and opens the door to implementing gang scheduling in the native scheduler.
  • Kube-batch: an implementation of gang scheduling. For more information, visit https://github.com/kubernetes-sigs/kube-batch
  • Poseidon: Firmament is a scheduler based on a network-graph scheduling algorithm. Poseidon integrates the Firmament scheduler into Kubernetes. For more information, visit https://github.com/kubernetes-sigs/poseidon

Alibaba Cloud