Spark Streaming - Spike

Archived (pre-2022)

Preserved for reference only -- likely outdated. Last updated: June 2020

Spark Kubernetes Operator

The operator allows Spark applications to be specified declaratively (e.g., in a YAML file) and run without dealing with the spark-submit process directly. It also lets the status of Spark applications be tracked and presented idiomatically, like other types of workloads on k8s. We use the k8s operator for Apache Spark made by GCP. Details and design information are available here: design.md (Github)

What's Spike

Spikes are Spark streams that consume data from Kafka and write it to a temporary folder in S3. If a batch is consumed and written successfully, the Spike sends a POST request to the appender with the list of written files and the last offsets. The appender moves the temporarily saved files to their final destination (the one later used for aggregation/reporting) and commits the offsets in the db. The Spike then consumes the next portion of data from Kafka, taking the committed offsets into account.
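
The handshake described above can be sketched in plain Python. This is only an in-memory illustration, not the actual Spike or appender code: the `Appender` class, the `run_batch` function, the `tmp/` and `final/` prefixes, and the convention that a committed offset means "next offset to read" are all assumptions made for the example.

```python
from dataclasses import dataclass, field


@dataclass
class Appender:
    """Hypothetical in-memory stand-in for the appender service."""
    final_files: list = field(default_factory=list)        # files in the final destination
    committed_offsets: dict = field(default_factory=dict)  # partition -> next offset to read

    def append(self, temp_files, last_offsets):
        # Move temporary files to the final destination, then commit the offsets.
        self.final_files.extend(f.replace("tmp/", "final/", 1) for f in temp_files)
        self.committed_offsets.update(last_offsets)


def run_batch(topic, appender, batch_size):
    """Process one batch: read from the committed offsets, write temp files, notify the appender."""
    temp_files, last_offsets = [], {}
    for partition, records in topic.items():
        start = appender.committed_offsets.get(partition, 0)
        batch = records[start:start + batch_size]
        if not batch:
            continue  # nothing new on this partition
        # Stand-in for writing the batch to the temporary S3 folder.
        temp_files.append(f"tmp/part-{partition}-{start}")
        last_offsets[partition] = start + len(batch)
    if temp_files:
        # Stand-in for the POST request carrying the written files and last offsets.
        appender.append(temp_files, last_offsets)
    return temp_files


if __name__ == "__main__":
    topic = {0: ["a", "b", "c"], 1: ["x"]}  # partition -> records (toy data)
    appender = Appender()
    print(run_batch(topic, appender, batch_size=2))  # first batch
    print(run_batch(topic, appender, batch_size=2))  # resumes from committed offsets
    print(appender.committed_offsets)
```

Because offsets are committed only after the files have been moved, a Spike that restarts mid-batch would simply re-read from the last committed offsets in this model, which is in line with the restart tolerance mentioned later on this page.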

Spike Streaming Architecture

Spike Application Deployment

We run Spike applications in a separate namespace on an EKS Ocean cluster using Spot Instances. For deployments we use Jenkins and Helm. As the base for the Helm chart we took the example provided by GCP and adjusted it to our needs: [Github] Spike Helm Chart

Each application's values.yaml configuration file describes:

  • Spark application parameters
  • application specific parameters
  • settings and resource requests for driver and executor

Since we run multiple Spike applications, we split the configuration per application: [Github] Spike Helm Configs

This hierarchy lets us change and deploy specific apps without affecting the others: [Jenkins] Spike Helm Deploy

The application file itself is stored in an S3 bucket:

mainApplicationFile: "s3a://aws-production-eu-west-1-ofw-data-bucket/spike_k8s/stream-eks-assembly.jar"

Right now developers update it by hand when needed, so there is room for automation.

The applications are written so that they survive restarts quite easily, which allowed us to leverage Spotinst and set RestartPolicy to Always.

In the end, the development process is as follows:

  1. Developers update the spike repo with code-related or application configuration changes
  2. They run a Jenkins job to apply these changes to the existing deployment

If a new Spark Streaming Spike application is needed, they follow the existing examples and create the configuration on their own.

Monitoring & Troubleshooting

The following commands let you troubleshoot a deployment when the application pod dies too quickly; check the Events section:

>>> kubectl get SparkApplication -n spike-streaming

NAME                                    AGE
cct-adtracking-failure-v1-1.1           14d
cct-adtracking-success-v1-1.0           14d
...
...
crt-payout-v1-1.0                       14d
crt-session-v1-1.0                      14d

>>> kubectl describe SparkApplication/crt-payout-v1-1.0 -n spike-streaming

Status:
  Application State:
    State:  RUNNING
  Driver Info:
    Pod Name:          crt-payout-v1-1.0-driver
  Execution Attempts:  20
  Executor State:
    crt-payout-v1-1-0-1593415050818-exec-1:  RUNNING
  Last Submission Attempt Time:              2020-06-29T07:17:33Z
  Spark Application Id:                      spark-e5faa313c29a4c5f8bd726e3678d3871
  Submission Attempts:                       1
  Submission ID:                             4ce144ad-c3e4-43f9-ac58-d59c4297c3a7
  Termination Time:                          <nil>
Events:                                      <none>
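
When many applications need checking, the same status can be read programmatically. The sketch below assumes the JSON form of the resource (for example from `kubectl get sparkapplication <name> -n spike-streaming -o json`) exposes the fields shown above as `status.applicationState.state` and `status.executionAttempts`; those camelCase names, the `summarize_status` helper, and the `max_attempts` threshold are assumptions for illustration, so verify them against your operator version.

```python
import json


def summarize_status(app_json, max_attempts=10):
    """Return (state, execution_attempts, needs_attention) for one SparkApplication.

    app_json is the JSON text of a single SparkApplication resource.
    max_attempts is an arbitrary, hypothetical threshold for "restarting too often".
    """
    status = json.loads(app_json).get("status", {})
    state = status.get("applicationState", {}).get("state", "UNKNOWN")
    attempts = status.get("executionAttempts", 0)
    needs_attention = state != "RUNNING" or attempts > max_attempts
    return state, attempts, needs_attention


if __name__ == "__main__":
    # Values taken from the describe output above, in the assumed JSON shape.
    sample = json.dumps({
        "status": {
            "applicationState": {"state": "RUNNING"},
            "executionAttempts": 20,
            "submissionAttempts": 1,
        }
    })
    print(summarize_status(sample))
```

With the default threshold, the application above is flagged even though it is RUNNING, because 20 execution attempts may be worth a look at the Events section and the driver logs.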

Right now we only have basic monitoring of the Pods running on our EKS cluster, but it is already very helpful. Based on it you can easily spot and resolve resource constraints:

[Grafana] EKS Common - Kubernetes / Pods

Screenshot 2020-06-29 at 15.06.01.png

The graph above shows that there is still room for further savings: CPU is underused.

From the application's perspective, the key metric is topic lag, which is also monitored: [Grafana] Spike Creation Timestamp

Screenshot 2020-06-30 at 11.56.04.png
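
How the dashboard derives lag is not described on this page. One common offset-based definition, given here only as an assumption, is the latest offset in each Kafka partition minus the offset already committed for it. The `topic_lag` and `total_lag` helpers below are hypothetical names for this illustration.

```python
def topic_lag(end_offsets, committed_offsets):
    """Per-partition lag: latest offset in the topic minus the committed offset.

    Partitions with no committed offset yet are treated as committed at 0,
    and negative differences are clamped to 0.
    """
    return {
        partition: max(end - committed_offsets.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


def total_lag(end_offsets, committed_offsets):
    """Sum of the per-partition lags for the whole topic."""
    return sum(topic_lag(end_offsets, committed_offsets).values())


if __name__ == "__main__":
    end = {0: 100, 1: 50}   # toy values: latest offsets per partition
    committed = {0: 90}     # toy values: partition 1 has nothing committed yet
    print(topic_lag(end, committed))
    print(total_lag(end, committed))
```

A lag that keeps growing means the Spike is consuming more slowly than data arrives, which is a different failure mode from the pod-level resource constraints visible on the Pods dashboard.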

History Server

One more layer of troubleshooting and monitoring data is the Spark History Server, which we deployed to the spike-streaming namespace. The Spark History Server is a monitoring tool that displays information about completed Spark applications. This information is pulled from the data that the applications write to an S3 bucket.

As a base we used the stable community Helm chart available in the Helm repo. The chart also creates an ingress, which we exposed through Route 53 so that the devs can use it for troubleshooting: [UI] Spark History Server

SparkCtl

The sparkctl utility can be very handy during development: [Github] Sparkctl

It lets you diagnose and troubleshoot Spike applications very quickly:

sparkctl status crt-payout-v1-1.0 --namespace spike-streaming
application state:
+---------+----------------+----------------+--------------------------+-----------+--------------------+-------------------+
|  STATE  | SUBMISSION AGE | COMPLETION AGE |        DRIVER POD        | DRIVER UI | SUBMISSIONATTEMPTS | EXECUTIONATTEMPTS |
+---------+----------------+----------------+--------------------------+-----------+--------------------+-------------------+
| RUNNING | 7h             | N.A.           | crt-payout-v1-1.0-driver | N.A.      |                  1 |                20 |
+---------+----------------+----------------+--------------------------+-----------+--------------------+-------------------+
executor state:
+----------------------------------------+---------+
|              EXECUTOR POD              |  STATE  |
+----------------------------------------+---------+
| crt-payout-v1-1-0-1593415050818-exec-1 | RUNNING |
+----------------------------------------+---------+

You can also forward a local port to the Spark web UI port on the driver, so that the Spark web UI served by the driver pod can be accessed locally:

sparkctl forward crt-payout-v1-1.0 --namespace spike-streaming