How to Use Spark Operator with Kubernetes

Internal Implementation of Spark Operator

  1. The definition of the CRD abstraction, which describes features that can be included in the abstraction.
  2. CRD Controller, which parses content in the CRD definition and performs lifecycle management.
  3. The client-go SDK, which provides the SDKs used during code integration.
  • apis mainly defines different versions of APIs
  • the client directory mainly includes the auto-generated client-go SDKs
  • the two CRD types it generates clients for: sparkapplication and scheduledsparkapplication
  • the controller directory is where the lifecycle management logic of the Operator is defined
  • the config directory deals with the conversion of Spark config.
  • with separate lifecycle controllers for sparkapplication and scheduledsparkapplication.
func (c *Controller) submitSparkApplication(app *v1beta1.SparkApplication) *v1beta1.SparkApplication {    // Expose Prometheus monitoring metrics 
appToSubmit := app.DeepCopy()
if appToSubmit.Spec.Monitoring ! = nil && appToSubmit.Spec.Monitoring.Prometheus ! = nil {
if err := configPrometheusMonitoring(appToSubmit, c.kubeClient); err ! = nil {
glog.Error(err)
}
}
// Convert the definitions in CRDs into the spark-submit command
submissionCmdArgs, err := buildSubmissionCommandArgs(appToSubmit)
if err ! = nil {
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.FailedSubmissionState,
ErrorMessage: err.Error(),
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
return app
}
// Use spark-submit to submit a task in the Operator container
submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, appToSubmit))
if err ! = nil {
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.FailedSubmissionState,
ErrorMessage: err.Error(),
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
c.recordSparkApplicationEvent(app)
glog.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
return app
}

// Because Spark Operator also monitors the state of Pods, a failed driver Pod will be re-launched.
// This is a main difference from directly running spark-submit: It enables fault recovery.
if ! submitted {
// The application may not have been submitted even if err == nil, e.g., when some
// state update caused an attempt to re-submit the application, in which case no
// error gets returned from runSparkSubmit. If this is the case, we simply return.
return app
}
glog.Infof("SparkApplication %s/%s has been submitted", app.Namespace, app.Name)
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.SubmittedState,
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
ExecutionAttempts: app.Status.ExecutionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
c.recordSparkApplicationEvent(app)
// Expose spark-ui by using service
service, err := createSparkUIService(app, c.kubeClient)
if err ! = nil {
glog.Errorf("failed to create UI service for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIServiceName = service.serviceName
app.Status.DriverInfo.WebUIPort = service.nodePort
// Create UI Ingress if ingress-format is set.
if c.ingressURLFormat ! = "" {
ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient)
if err ! = nil {
glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL
app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
}
}
}
return app
}

The Task State Machine of Spark Operator

New -> Submitted -> Running -> Succeeding -> Completed

State Troubleshooting in Spark Operator

func runSparkSubmit(submission *submission) (bool, error) {
sparkHome, present := os.LookupEnv(sparkHomeEnvVar)
if ! present {
glog.Error("SPARK_HOME is not specified")
}
var command = filepath.Join(sparkHome, "/bin/spark-submit")
cmd := execCommand(command, submission.args...)
glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
if _, err := cmd.Output(); err ! = nil {
var errorMsg string
if exitErr, ok := err.(*exec.ExitError); ok {
errorMsg = string(exitErr.Stderr)
}
// The driver pod of the application already exists.
if strings.Contains(errorMsg, podAlreadyExistsErrorCode) {
glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name)
return false, nil
}
if errorMsg ! = "" {
return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg)
}
return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err)
}
return true, nil
}

Conclusion

Original Source

--

--

--

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com

Love podcasts or audiobooks? Learn on the go with our new app.

Recommended from Medium

👏Kotlin (over) Flow review 👏

Print names of test methods executed by JUnit 5

Flutter. Using environment variables.

Daily minutiae and record keeping

The Lotus Fax Of Knowledge

Chattie or how to build a decentralised chat app in 10 minutes

Why does my app send network requests when I open an SVG file?

Tech Journal: Day 5

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Alibaba Cloud

Alibaba Cloud

Follow me to keep abreast with the latest technology news, industry insights, and developer trends. Alibaba Cloud website: https://www.alibabacloud.com

More from Medium

Apache Spark on Kubernetes

Installation of Kubernetes cluster with docker containers

K8s — Access Pod Through Service

Apache Cassandra: CQL Commands