A Brief Analysis on the Implementation of the Kubernetes Scheduler

By Xiao Yuan

Kubernetes Scheduler

Kubernetes is a distributed container orchestration system, and it implements its own scheduling module.

In a Kubernetes cluster, the scheduler runs as an independent component, typically deployed as a Pod. This article describes the Kubernetes scheduler from several perspectives.

How the Scheduler Works

The scheduler in Kubernetes runs as an independent component, usually on the master nodes, and the number of scheduler instances normally matches the number of masters. One instance is elected leader and the other instances act as standbys. Note that the schedulers do not run Raft themselves: the election is done with a lock object held through the API server, and Raft is what etcd uses underneath. When the leader fails, a new leader is elected from the remaining instances.

The scheduler basically works like this:

Implementation Details

kube-scheduler is a component that runs independently, and most of its work is done in its run function.

This involves several things:

The core of a scheduling task lies in the `sched.Run()` function, which starts a goroutine that continuously runs `sched.scheduleOne`. Each run represents one scheduling cycle.

func (sched *Scheduler) Run() {
    // Do not start scheduling until the caches have synced.
    if !sched.config.WaitForCacheSync() {
        return
    }
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

Let’s see what sched.scheduleOne mainly does.

func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    ... // do some pre check
    scheduleResult, err := sched.schedule(pod)
    if err != nil {
        if fitError, ok := err.(*core.FitError); ok {
            if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
                ... // do some log
            } else {
                sched.preempt(pod, fitError)
            }
        }
    }
    ...
    // Assume volumes first before assuming the pod.
    allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
    ...
    // Bind in a separate goroutine so the scheduling loop is not blocked.
    go func() {
        // Bind volumes first before Pod
        if !allBound {
            err := sched.bindVolumes(assumedPod)
            if err != nil {
                klog.Errorf("error binding volumes: %v", err)
                metrics.PodScheduleErrors.Inc()
                return
            }
        }
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: scheduleResult.SuggestedHost,
            },
        })
    }()
}

Several things are done in sched.scheduleOne.

sched.schedule is the main Pod scheduling logic.

func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (result ScheduleResult, err error) {
    // Get node list
    nodes, err := nodeLister.List()
    // Filter
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return result, err
    }
    // Priority
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return result, err
    }

    // SelectHost
    host, err := g.selectHost(priorityList)
    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
        FeasibleNodes:  len(filteredNodes),
    }, err
}

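A scheduling task is completed in three steps: filtering the nodes, prioritizing (scoring) the nodes that passed the filter, and selecting a host from the scored list.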

Next, let’s take a closer look at the three steps.

Filters

Filtering is relatively simple. By default, the scheduler registers a series of predicates. During scheduling, the predicates are evaluated for each node, with nodes checked in parallel, and the result is a list of the nodes that satisfy the specified conditions.

func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
    if len(g.predicates) == 0 {
        filtered = nodes
    } else {
        allNodes := int32(g.cache.NodeTree().NumNodes())
        numNodesToFind := g.numFeasibleNodesToFind(allNodes)
        checkNode := func(i int) {
            nodeName := g.cache.NodeTree().Next()
            // All the predicates of this node are invoked at this point.
            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                g.cachedNodeInfoMap[nodeName],
                g.predicates,
                g.schedulingQueue,
                g.alwaysCheckAllPredicates,
            )
            if fits {
                length := atomic.AddInt32(&filteredLen, 1)
                if length > numNodesToFind {
                    // If enough nodes already meet the conditions, the calculation stops.
                    cancel()
                    atomic.AddInt32(&filteredLen, -1)
                } else {
                    filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
                }
            }
        }
        // Invoke the checkNode method in parallel.
        workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)
        filtered = filtered[:filteredLen]
    }
    return filtered, failedPredicateMap, nil
}

Note that the FeasibleNodes mechanism was introduced in Kubernetes 1.13 to improve scheduling performance in large clusters. With this feature, the percentage of nodes to score during filtering (50% by default) can be set with the percentage-of-nodes-to-score parameter (`percentageOfNodesToScore` in the scheduler configuration). When the cluster has more than 100 nodes, filtering stops once the number of matching nodes reaches this percentage of the total, so the predicates are not evaluated on every node.

For example, if the total number of nodes is 1,000 and the percentage is 30%, the scheduler only needs to find 300 matching nodes in the filtering process. When 300 matching nodes are found, the filtering process stops. This makes it unnecessary to filter all the nodes and reduces the number of nodes to be prioritized. However, it also has a shortcoming: the Pod may not be scheduled to the most appropriate node.

Prioritize

The prioritize step scores the nodes that passed filtering and helps find the most appropriate node for a Pod. As with predicates, the scheduler registers a series of priority functions. This is the data structure that describes one of them:

// PriorityConfig is a config used for a priority function.
type PriorityConfig struct {
Name string
Map PriorityMapFunction
Reduce PriorityReduceFunction
// TODO: Remove it after migrating all functions to
// Map-Reduce pattern.
Function PriorityFunction
Weight int
}

Each PriorityConfig represents a scoring metric and takes account of factors like service balance and resource assignment for nodes. The main scoring process for a PriorityConfig includes Map and Reduce.

After all PriorityConfigs have been calculated, the result of each PriorityConfig is multiplied by its weight, and the weighted results are summed per node.

// Map: compute the raw score of every node for every PriorityConfig, in parallel.
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
    nodeInfo := nodeNameToInfo[nodes[index].Name]
    for i := range priorityConfigs {
        var err error
        results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
        ... // handle err
    }
})
// Reduce: post-process the scores of each PriorityConfig across all nodes.
for i := range priorityConfigs {
    wg.Add(1)
    go func(index int) {
        defer wg.Done()
        if err := priorityConfigs[index].Reduce(pod, meta, nodeNameToInfo, results[index]); err != nil {
            ... // handle err
        }
    }(i)
}
wg.Wait()
// Summarize all scores.
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
    result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
    for j := range priorityConfigs {
        result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
    }
}

In addition, both the filter and prioritize steps support calling extender schedulers, which are not described further in this article.

Status Quo

Currently, the Kubernetes scheduler mainly schedules Pod by Pod, which is also one of its shortcomings. The main performance bottlenecks are as follows:

Development of the Kubernetes Scheduler

Many schedulers have been developed to solve these problems.

Next, we will analyze the implementation of a specific scheduler to help you understand how the scheduler works. We will also pay attention to the trends in the scheduler community.
