How to Use Spark Operator with Kubernetes

By Mo Yuan

Internal Implementation of Spark Operator

Before exploring Spark Operator further, let’s first take a look at Kubernetes Operators. Many Kubernetes Operators emerged during 2018, and Operators are one of the best ways to extend Kubernetes and integrate other systems with it. Abstraction is an important concept in the design of Kubernetes. Examples include storage abstraction, application workload abstraction, and access layer abstraction. Each abstraction has a corresponding controller that manages its lifecycle. The YAML files that developers submit describe the desired final state of an abstraction. Controllers listen for changes to the abstraction, parse and process them, and try to bring the actual state in line with that desired final state.
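To make the controller idea concrete, here is a toy Go sketch of a level-triggered reconcile loop. It is not Kubernetes or Spark Operator code: the desired and actual states are plain integers (think of a replica count), and the hypothetical functions reconcileOnce and converge stand in for a controller that would call the API server instead of modifying a local variable.

```go
package main

// Toy model of a Kubernetes-style controller loop (hypothetical, illustrative).
// "desired" stands for what a user declares in YAML (e.g. spec.replicas);
// "actual" stands for what currently exists in the cluster.

// reconcileOnce compares desired and actual state and takes at most one
// corrective action, returning its name ("" when already converged).
func reconcileOnce(desired int, actual *int) string {
	switch {
	case *actual < desired:
		*actual++ // e.g. create a missing Pod
		return "create"
	case *actual > desired:
		*actual-- // e.g. delete a surplus Pod
		return "delete"
	default:
		return ""
	}
}

// converge keeps reconciling until no action is needed, returning the final
// actual state and the list of actions taken along the way.
func converge(desired, actual int) (int, []string) {
	var actions []string
	for {
		action := reconcileOnce(desired, &actual)
		if action == "" {
			return actual, actions
		}
		actions = append(actions, action)
	}
}
```

For example, converge(3, 1) ends at 3 after two "create" actions. Acting one step at a time and re-checking is the point of the pattern: a controller does not need to know how the drift happened, only how far the actual state is from the desired one.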


What handles abstractions that Kubernetes does not define natively? The answer is Operators. A standard Operator usually consists of the following parts:

  1. The CRD (CustomResourceDefinition), which defines the new abstraction and the fields it can contain.
  2. The CRD controller, which parses the content of the custom resources and manages their lifecycle.
  3. The client-go SDK, which provides the client code used to access the custom resources when integrating with other code.
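The three parts can be sketched in Go as follows. Everything here is hypothetical and simplified: Widget is an invented custom resource, and the in-memory fakeClient only imitates the shape of a generated client-go clientset (typed Get and UpdateStatus methods). A real Operator would use the generated SDK and talk to the API server.

```go
package main

import (
	"errors"
	"fmt"
)

// Part 1 (CRD abstraction): a hypothetical "Widget" resource with a
// user-written Spec and a controller-written Status.
type WidgetSpec struct {
	Size int
}

type WidgetStatus struct {
	Phase string
}

type Widget struct {
	Name   string
	Spec   WidgetSpec
	Status WidgetStatus
}

// Part 2 (client SDK): generated clientsets expose typed methods like these.
// This interface and its in-memory implementation only imitate that shape.
type WidgetClient interface {
	Get(name string) (*Widget, error)
	UpdateStatus(w *Widget) error
}

type fakeClient struct{ store map[string]*Widget }

func (c *fakeClient) Get(name string) (*Widget, error) {
	w, ok := c.store[name]
	if !ok {
		return nil, fmt.Errorf("widget %q not found", name)
	}
	copied := *w // hand out a copy, as a real API read would
	return &copied, nil
}

func (c *fakeClient) UpdateStatus(w *Widget) error {
	if _, ok := c.store[w.Name]; !ok {
		return errors.New("cannot update a missing widget")
	}
	copied := *w
	c.store[w.Name] = &copied
	return nil
}

// Part 3 (controller): read the Spec, act on it, write back the Status.
func syncWidget(client WidgetClient, name string) error {
	w, err := client.Get(name)
	if err != nil {
		return err
	}
	if w.Spec.Size > 0 {
		w.Status.Phase = "Ready"
	} else {
		w.Status.Phase = "Invalid"
	}
	return client.UpdateStatus(w)
}
```

The split between Spec (written by the user) and Status (written by the controller) is the same pattern the Spark Operator code follows when it updates app.Status.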

With this background, the structure of the Spark Operator code is easy to understand. The core code logic is all located under the pkg directory.

The pkg directory is organized as follows:

  • apis defines the different versions of the APIs.
  • client contains the auto-generated client-go SDKs.
  • crd describes the structures of the two custom resources, sparkapplication and scheduledsparkapplication.
  • controller defines the lifecycle management logic of the Operator.
  • config handles the conversion of Spark configuration.

The fastest way to learn the features of an Operator is to look at its CRD definitions. Spark Operator defines two CRDs:

  • sparkapplication
  • scheduledsparkapplication

So, what are the differences between them? sparkapplication is the abstraction of a regular, one-off Spark task: it runs once, and when it finishes, its Pods end in either the Succeeded or the Failed phase. scheduledsparkapplication is the abstraction of scheduled offline tasks. Developers can define a crontab-style schedule in a scheduledsparkapplication to run offline Spark tasks periodically.
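The relationship between the two resources can be sketched as follows. These are simplified, hypothetical Go types rather than the real ones in the apis directory, and newRun is an assumed helper: conceptually, each time the schedule of a scheduledsparkapplication fires, a new sparkapplication is created from its template. The naming scheme (name plus a timestamp) is also an assumption, and the cron expression is stored but not parsed here.

```go
package main

import "fmt"

// Hypothetical, simplified stand-in for the spec of a Spark task.
type SparkApplicationSpec struct {
	Image     string
	MainClass string
	Executors int
}

// SparkApplication: a one-off Spark task.
type SparkApplication struct {
	Name string
	Spec SparkApplicationSpec
}

// ScheduledSparkApplication: a template plus a crontab-style schedule.
type ScheduledSparkApplication struct {
	Name     string
	Schedule string // e.g. "0 2 * * *" (stored only; not parsed in this sketch)
	Template SparkApplicationSpec
}

// newRun creates the SparkApplication for one firing of the schedule.
// firedAtUnixNano is the trigger time in nanoseconds since the Unix epoch.
func newRun(s ScheduledSparkApplication, firedAtUnixNano int64) SparkApplication {
	return SparkApplication{
		Name: fmt.Sprintf("%s-%d", s.Name, firedAtUnixNano),
		Spec: s.Template, // every run is stamped from the same template
	}
}
```

In a running system the trigger time would come from a cron scheduler, for example newRun(s, time.Now().UnixNano()); each resulting SparkApplication then goes through the normal one-off lifecycle.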

The above figure shows how Spark integrates with Kubernetes natively: when we use spark-submit to submit a task, a Driver Pod is created, and the driver then launches several Executor Pods.

What does the process look like once Spark Operator is introduced? The following controller code shows the submission flow.

func (c *Controller) submitSparkApplication(app *v1beta1.SparkApplication) *v1beta1.SparkApplication {
	// Expose Prometheus monitoring metrics
	appToSubmit := app.DeepCopy()
	if appToSubmit.Spec.Monitoring != nil && appToSubmit.Spec.Monitoring.Prometheus != nil {
		if err := configPrometheusMonitoring(appToSubmit, c.kubeClient); err != nil {
			glog.Error(err)
		}
	}

	// Convert the definitions in CRDs into the spark-submit command
	submissionCmdArgs, err := buildSubmissionCommandArgs(appToSubmit)
	if err != nil {
		app.Status = v1beta1.SparkApplicationStatus{
			AppState: v1beta1.ApplicationState{
				State:        v1beta1.FailedSubmissionState,
				ErrorMessage: err.Error(),
			},
			SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
			LastSubmissionAttemptTime: metav1.Now(),
		}
		return app
	}

	// Use spark-submit to submit a task in the Operator container
	submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, appToSubmit))
	if err != nil {
		app.Status = v1beta1.SparkApplicationStatus{
			AppState: v1beta1.ApplicationState{
				State:        v1beta1.FailedSubmissionState,
				ErrorMessage: err.Error(),
			},
			SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
			LastSubmissionAttemptTime: metav1.Now(),
		}
		c.recordSparkApplicationEvent(app)
		glog.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
		return app
	}

	// Because Spark Operator also monitors the state of Pods, a failed driver Pod will be re-launched.
	// This is the main difference from running spark-submit directly: it enables fault recovery.
	if !submitted {
		// The application may not have been submitted even if err == nil, e.g., when some
		// state update caused an attempt to re-submit the application, in which case no
		// error gets returned from runSparkSubmit. If this is the case, we simply return.
		return app
	}

	glog.Infof("SparkApplication %s/%s has been submitted", app.Namespace, app.Name)
	app.Status = v1beta1.SparkApplicationStatus{
		AppState: v1beta1.ApplicationState{
			State: v1beta1.SubmittedState,
		},
		SubmissionAttempts:        app.Status.SubmissionAttempts + 1,
		ExecutionAttempts:         app.Status.ExecutionAttempts + 1,
		LastSubmissionAttemptTime: metav1.Now(),
	}
	c.recordSparkApplicationEvent(app)

	// Expose spark-ui by using a Service
	service, err := createSparkUIService(app, c.kubeClient)
	if err != nil {
		glog.Errorf("failed to create UI service for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
	} else {
		app.Status.DriverInfo.WebUIServiceName = service.serviceName
		app.Status.DriverInfo.WebUIPort = service.nodePort
		// Create UI Ingress if ingress-format is set.
		if c.ingressURLFormat != "" {
			ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient)
			if err != nil {
				glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
			} else {
				app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL
				app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
			}
		}
	}
	return app
}

From the preceding code, you can see the basics of what Spark Operator does. To summarize: first, Spark Operator defines two CRD objects, which correspond to one-off computing tasks and to scheduled, periodic computing tasks. It then parses the configuration of a CRD object and converts it into a spark-submit command, configures Prometheus monitoring so that metrics can be collected, and creates a Service to provide access to the Spark UI. Finally, it watches the state of the Pods and continuously writes status updates back to the CRD objects, which implements lifecycle management for Spark tasks.
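The conversion from a CRD object to a spark-submit command line can be sketched as below. AppSpec and buildArgs are hypothetical simplifications of what buildSubmissionCommandArgs does; the flags (--master, --deploy-mode, --name, --class) and the spark.* configuration keys are standard Spark-on-Kubernetes options, but the exact set and order of arguments the operator generates may differ.

```go
package main

import "fmt"

// AppSpec is a hypothetical, heavily simplified stand-in for the fields of a
// SparkApplication that influence the spark-submit command line.
type AppSpec struct {
	Namespace      string
	Name           string
	Image          string
	MainClass      string
	MainJar        string
	Executors      int
	ExecutorMemory string // optional; omitted from the command when empty
}

// buildArgs converts the spec into spark-submit arguments.
// master is the Kubernetes API server URL with a "k8s://" prefix.
func buildArgs(master string, s AppSpec) []string {
	args := []string{
		"--master", master,
		"--deploy-mode", "cluster",
		"--name", s.Name,
		"--class", s.MainClass,
	}
	// conf appends one "--conf key=value" pair.
	conf := func(key, value string) {
		args = append(args, "--conf", fmt.Sprintf("%s=%s", key, value))
	}
	conf("spark.kubernetes.namespace", s.Namespace)
	conf("spark.kubernetes.container.image", s.Image)
	conf("spark.executor.instances", fmt.Sprint(s.Executors))
	if s.ExecutorMemory != "" {
		conf("spark.executor.memory", s.ExecutorMemory)
	}
	// The application jar is the last positional argument.
	return append(args, s.MainJar)
}
```

The resulting slice is what a controller would pass to exec.Command together with the path of the spark-submit binary, which is the role runSparkSubmit plays in the operator.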

The Task State Machine of Spark Operator

Now that we know the design concepts and basic processes of Spark Operator, we also need to understand what states a sparkapplication can be in and how it transitions between them. This is the most important lifecycle management enhancement that Spark Operator provides.

The lifecycle of a Spark task is described by the preceding state machine transition diagram. A normal task goes through the following states:

New -> Submitted -> Running -> Succeeding -> Completed

When a task fails, the system retries it. Once the number of retries reaches the maximum limit, the task ends up in the Failed state. In other words, if a task is rejected or evicted because of resource shortages, scheduling problems, or other factors, Spark Operator uses its state machine to move the task between states and retry it.
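The lifecycle above can be modeled as a small transition function. This is a simplified sketch, not the operator's actual state machine: the Event names and the next function are hypothetical, a retry simply sends the application back to New, and the real operator defines additional states (such as the FailedSubmissionState used in the submission code).

```go
package main

// State is a simplified application state using the names from the text.
type State string

const (
	New        State = "NEW"
	Submitted  State = "SUBMITTED"
	Running    State = "RUNNING"
	Succeeding State = "SUCCEEDING"
	Completed  State = "COMPLETED"
	Failing    State = "FAILING"
	Failed     State = "FAILED"
)

// Event is what the controller observes, e.g. derived from the driver Pod phase.
type Event string

const (
	SubmitOK     Event = "submit-ok"
	DriverUp     Event = "driver-running"
	DriverDone   Event = "driver-succeeded"
	DriverFailed Event = "driver-failed"
	Finalized    Event = "finalized"
)

// next returns the state after ev and the updated retry count. A failed run
// goes back to New (to be resubmitted) until maxRetries is exhausted, after
// which the application ends up Failed.
func next(s State, ev Event, retries, maxRetries int) (State, int) {
	switch {
	case s == New && ev == SubmitOK:
		return Submitted, retries
	case s == Submitted && ev == DriverUp:
		return Running, retries
	case s == Running && ev == DriverDone:
		return Succeeding, retries
	case s == Succeeding && ev == Finalized:
		return Completed, retries
	case (s == Submitted || s == Running) && ev == DriverFailed:
		return Failing, retries
	case s == Failing && ev == Finalized:
		if retries < maxRetries {
			return New, retries + 1 // retry: resubmit the task
		}
		return Failed, retries
	}
	return s, retries // events that do not apply leave the state unchanged
}
```

Feeding SubmitOK, DriverUp, DriverDone, and Finalized from New reaches Completed; with maxRetries set to 2, a task that fails three times in a row ends up Failed after two retries.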

State Troubleshooting in Spark Operator

As we have seen, the most important job of Spark Operator is to convert CRD configurations into a spark-submit command. When a task does not run as expected, how can we find the cause? First, check whether the arguments generated for spark-submit are what you expect. Although the YAML configuration of CRDs is more expressive, it also makes configuration more complex and errors more likely.

func runSparkSubmit(submission *submission) (bool, error) {
	sparkHome, present := os.LookupEnv(sparkHomeEnvVar)
	if !present {
		glog.Error("SPARK_HOME is not specified")
	}
	var command = filepath.Join(sparkHome, "/bin/spark-submit")
	cmd := execCommand(command, submission.args...)
	// The full argument list is logged at verbosity level 2.
	glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
	if _, err := cmd.Output(); err != nil {
		var errorMsg string
		if exitErr, ok := err.(*exec.ExitError); ok {
			errorMsg = string(exitErr.Stderr)
		}
		// The driver pod of the application already exists.
		if strings.Contains(errorMsg, podAlreadyExistsErrorCode) {
			glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name)
			return false, nil
		}
		if errorMsg != "" {
			return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg)
		}
		return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err)
	}
	return true, nil
}

Each time Spark Operator submits a task, it logs the generated spark-submit arguments at glog verbosity level 2, and its default glog level is 2, so these arguments appear in the log by default. Checking the logs of the Spark Operator Pod therefore helps developers troubleshoot problems quickly. In addition, state changes are recorded as events on the sparkapplication object: each transition of the state machine described above is reflected in these events. Together, these two methods can significantly reduce troubleshooting time.
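When checking the logged arguments, it can help to extract the --conf pairs and compare them with the values you expect. The helpers confMap and diffConf below are hypothetical, written only to illustrate this check; they assume the arguments are available as a string slice (the operator logs cmd.Args, which is a []string).

```go
package main

import (
	"sort"
	"strings"
)

// confMap extracts the "--conf key=value" pairs from a spark-submit argument
// list. Values may themselves contain '=', so only the first '=' separates
// the key from the value.
func confMap(args []string) map[string]string {
	conf := make(map[string]string)
	for i := 0; i+1 < len(args); i++ {
		if args[i] != "--conf" {
			continue
		}
		if key, value, ok := strings.Cut(args[i+1], "="); ok {
			conf[key] = value
		}
		i++ // skip the pair we just consumed
	}
	return conf
}

// diffConf reports, in sorted order, every expected key whose value is
// missing from got or differs from the expected value.
func diffConf(want, got map[string]string) []string {
	var problems []string
	for key, w := range want {
		g, ok := got[key]
		switch {
		case !ok:
			problems = append(problems, key+": missing")
		case g != w:
			problems = append(problems, key+": got "+g+", want "+w)
		}
	}
	sort.Strings(problems)
	return problems
}
```

Sorting the report keeps the output stable across runs, because Go map iteration order is not defined.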

Conclusion

Spark Operator is the best way to run Spark tasks on Kubernetes. Compared with the traditional spark-submit, Spark Operator provides better fault recovery and reliability and also supports the integration of capabilities such as monitoring, logging, and UI.

Original Source
