How to Use Spark Operator with Kubernetes

Internal Implementation of Spark Operator

func (c *Controller) submitSparkApplication(app *v1beta1.SparkApplication) *v1beta1.SparkApplication {    // Expose Prometheus monitoring metrics 
appToSubmit := app.DeepCopy()
if appToSubmit.Spec.Monitoring ! = nil && appToSubmit.Spec.Monitoring.Prometheus ! = nil {
if err := configPrometheusMonitoring(appToSubmit, c.kubeClient); err ! = nil {
glog.Error(err)
}
}
// Convert the definitions in CRDs into the spark-submit command
submissionCmdArgs, err := buildSubmissionCommandArgs(appToSubmit)
if err ! = nil {
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.FailedSubmissionState,
ErrorMessage: err.Error(),
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
return app
}
// Use spark-submit to submit a task in the Operator container
submitted, err := runSparkSubmit(newSubmission(submissionCmdArgs, appToSubmit))
if err ! = nil {
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.FailedSubmissionState,
ErrorMessage: err.Error(),
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
c.recordSparkApplicationEvent(app)
glog.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
return app
}

// Because Spark Operator also monitors the state of Pods, a failed driver Pod will be re-launched.
// This is a main difference from directly running spark-submit: It enables fault recovery.
if ! submitted {
// The application may not have been submitted even if err == nil, e.g., when some
// state update caused an attempt to re-submit the application, in which case no
// error gets returned from runSparkSubmit. If this is the case, we simply return.
return app
}
glog.Infof("SparkApplication %s/%s has been submitted", app.Namespace, app.Name)
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
State: v1beta1.SubmittedState,
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
ExecutionAttempts: app.Status.ExecutionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
c.recordSparkApplicationEvent(app)
// Expose spark-ui by using service
service, err := createSparkUIService(app, c.kubeClient)
if err ! = nil {
glog.Errorf("failed to create UI service for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIServiceName = service.serviceName
app.Status.DriverInfo.WebUIPort = service.nodePort
// Create UI Ingress if ingress-format is set.
if c.ingressURLFormat ! = "" {
ingress, err := createSparkUIIngress(app, *service, c.ingressURLFormat, c.kubeClient)
if err ! = nil {
glog.Errorf("failed to create UI Ingress for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
} else {
app.Status.DriverInfo.WebUIIngressAddress = ingress.ingressURL
app.Status.DriverInfo.WebUIIngressName = ingress.ingressName
}
}
}
return app
}

The Task State Machine of Spark Operator

New -> Submitted -> Running -> Succeeding -> Completed

State Troubleshooting in Spark Operator

func runSparkSubmit(submission *submission) (bool, error) {
sparkHome, present := os.LookupEnv(sparkHomeEnvVar)
if ! present {
glog.Error("SPARK_HOME is not specified")
}
var command = filepath.Join(sparkHome, "/bin/spark-submit")
cmd := execCommand(command, submission.args...)
glog.V(2).Infof("spark-submit arguments: %v", cmd.Args)
if _, err := cmd.Output(); err ! = nil {
var errorMsg string
if exitErr, ok := err.(*exec.ExitError); ok {
errorMsg = string(exitErr.Stderr)
}
// The driver pod of the application already exists.
if strings.Contains(errorMsg, podAlreadyExistsErrorCode) {
glog.Warningf("trying to resubmit an already submitted SparkApplication %s/%s", submission.namespace, submission.name)
return false, nil
}
if errorMsg ! = "" {
return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %s", submission.namespace, submission.name, errorMsg)
}
return false, fmt.Errorf("failed to run spark-submit for SparkApplication %s/%s: %v", submission.namespace, submission.name, err)
}
return true, nil
}

Conclusion

Original Source

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store