Spark in the Google Cloud Platform Part 2

In the previous post in this series, Spark in the Google Cloud Platform Part 1, we started to explore the various ways in which we could deploy Apache Spark applications in GCP. The first option we looked at was deploying Spark using Cloud DataProc, a managed Hadoop cluster with various ecosystem components included.

In this post, we will look at another option for deploying Spark in GCP – a Spark Standalone cluster running on GKE.

Spark Standalone refers to the built-in cluster manager provided with each Spark release. Standalone can be a bit of a misnomer, as it sounds like a single instance, which it is not. Standalone simply means that it does not depend upon any other projects or components such as Apache YARN or Mesos.

A Spark Standalone cluster consists of a Master node or instance and one or more Worker nodes. The Master node serves as both a master and a cluster manager in the Spark runtime architecture.

The Master process is responsible for marshalling resource requests on behalf of applications and monitoring cluster resources.

The Worker nodes host one or many Executor instances which are responsible for carrying out tasks.

Deploying a Spark Standalone cluster on GKE is reasonably straightforward. In the example provided in this post we will set up a private network (VPC), create a GKE cluster, and deploy a Spark Master pod and two Spark Worker pods (in a real scenario you would typically have many Worker pods).

Once the network and GKE cluster have been deployed, the first step is to create Docker images for both the Master and Workers.

The Dockerfile below can be used to create an image capable of running either the Worker or Master daemons:

Note the shell scripts included in the Dockerfile: spark-master and spark-worker. These will be used later on by K8S deployments to start the respective Master and Worker daemon processes in each of the pods.

Next, we will use Cloud Build to build an image using the Dockerfile and store it in GCR (Google Container Registry). From the Cloud Build directory in our project we will run:

gcloud builds submit --tag gcr.io/spark-demo-266309/spark-standalone

Next, we will create Kubernetes deployments for our Master and Worker pods.

Firstly, we need to get cluster credentials for our GKE cluster named ‘spark-cluster’:

gcloud container clusters get-credentials spark-cluster --zone australia-southeast1-a --project spark-demo-266309

Now, from within the k8s-deployments\deploy folder of our project, we will use the kubectl command to deploy the Master pod, the Master service and the Worker pods.

We start with the Master deployment, which deploys our Spark Standalone image into a container running the Master daemon process:

To deploy the Master, run the following:

kubectl create -f spark-master-deployment.yaml

The Master will expose a web UI on port 8080 and an RPC service on port 7077. We will need to deploy a K8S service for this; the YAML required to do this is shown here:

To deploy the Master service, run the following:

kubectl create -f spark-master-service.yaml

Now that we have a Master pod and service up and running, we need to deploy our Workers which are preconfigured to communicate with the Master service.

The YAML required to deploy the two Worker pods is shown here:

To deploy the Worker pods, run the following:

kubectl create -f spark-worker-deployment.yaml

You can now inspect the Spark processes running on your GKE cluster.

kubectl get deployments

Shows…

NAME           READY   UP-TO-DATE   AVAILABLE   AGE
spark-master   1/1     1            1           7m45s
spark-worker   2/2     2            2           9s

kubectl get pods

Shows…

NAME                            READY   STATUS    RESTARTS   AGE
spark-master-f69d7d9bc-7jgmj    1/1     Running   0          8m
spark-worker-55965f669c-rm59p   1/1     Running   0          24s
spark-worker-55965f669c-wsb2f   1/1     Running   0          24s

Next, as we need to expose the Web UI for the Master process we will create a LoadBalancer resource. The YAML used to do this is provided here:

To deploy the LB, you would run the following:

kubectl create -f spark-ui-lb.yaml

NOTE: This is just an example. For simplicity we are creating an external LoadBalancer with a public IP; this configuration is likely not appropriate in most real scenarios. Alternatives would include an internal LoadBalancer, restricting access using Authorized Networks, a jump host, SSH tunnelling or IAP.

Now you’re up and running!

You can access the Master web UI from the Google Console link shown here:

Accessing the Spark Master UI from the Google Cloud Console

The Spark Master UI should look like this:

Spark Master UI

Next, we will exec into a Worker pod to get a shell:

kubectl exec -it spark-worker-55965f669c-rm59p -- sh

Now from within the shell environment of a Worker – which includes all of the Spark client libraries, we will submit a simple Spark application:

spark-submit --class org.apache.spark.examples.SparkPi \
 --master spark://10.11.250.98:7077 \
/opt/spark/examples/jars/spark-examples*.jar 10000

You can see the results in the shell, as shown here:

Spark Pi Estimator Example
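To give a feel for what the SparkPi example is computing, here is a minimal single-process sketch of a Monte Carlo estimate of Pi in plain Python. This is not the code from the Spark examples jar; the function name estimate_pi and the fixed seed are my own assumptions for illustration. The Spark version spreads the same kind of sampling across the Executors on the Worker pods.

```python
import random


def estimate_pi(num_samples: int, seed: int = 42) -> float:
    """Estimate Pi by sampling random points in the square [-1, 1] x [-1, 1].

    The fraction of points that land inside the unit circle approximates
    Pi / 4, so multiplying that fraction by 4 approximates Pi.
    """
    rng = random.Random(seed)  # fixed seed (an assumption) for repeatable runs
    inside = 0
    for _ in range(num_samples):
        x = rng.uniform(-1.0, 1.0)
        y = rng.uniform(-1.0, 1.0)
        if x * x + y * y <= 1.0:
            inside += 1
    return 4.0 * inside / num_samples


if __name__ == "__main__":
    print(f"Pi is roughly {estimate_pi(200_000)}")
```

More samples give a tighter estimate, which is why this workload is a convenient smoke test for a cluster: it is trivially parallel and needs no input data.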

Additionally, as all of the container logs go to Stackdriver, you can view the application logs there as well:

Container Logs in StackDriver

This is a simple way to get a Spark cluster running. It is not without its downsides and shortcomings, however, which include the limited security mechanisms available (SASL, network security, shared secrets).

In the final post in this series we will look at Spark on Kubernetes, using Kubernetes as the Spark cluster manager and interacting with Spark using the Kubernetes API and control plane. See you then.

Full source code for this article is available at: https://github.com/gamma-data/spark-on-gcp

The infrastructure code for this example uses PowerShell and Terraform, and is deployed as follows:

PS > .\run.ps1 private-network apply <<gcp-project>> <<region>>
PS > .\run.ps1 gke apply <<gcp-project>> <<region>> 

Spark in the Google Cloud Platform Part 1

I have been an avid Spark enthusiast since 2014 (the early days..). Spark has featured heavily in every project I have been involved with from data warehousing, ETL, feature extraction, advanced analytics to event processing and IoT applications. I like to think of it as a Swiss army knife for distributed processing.

Curiously enough, the first project I had been involved with for some years that did not feature the Apache Spark project was a green field GCP project which got me thinking… where does Spark fit into the GCP landscape?

Unlike the other major providers, who use Spark as the backbone of their managed distributed ETL services (examples include AWS Glue and the Spark integration runtime option in Azure Data Factory), Google’s managed ETL solution is Cloud DataFlow. Cloud DataFlow, which is a managed Apache Beam service, does not use a Spark runtime (Beam has a Spark Runner, however this is not an option when using Cloud DataFlow). So where does this leave Spark?

My summation is that although Spark is not a first-class citizen in GCP (as far as managed ETL is concerned), it is not a second-class citizen either. This article will discuss the various ways Spark clusters and applications can be deployed within the GCP ecosystem.

Quick Primer on Spark

Every Spark application contains several components regardless of deployment mode. The components in the Spark runtime architecture are:

  • the Driver
  • the Master
  • the Cluster Manager
  • the Executor(s), which run on worker nodes or Workers

Each component has a specific role in executing a Spark program and all of the Spark components run in Java virtual machines (JVMs).

Spark Runtime Architecture

Cluster Managers schedule and manage distributed resources (compute and memory) across the nodes of the cluster. Cluster Managers available for Spark include:

  • Standalone
  • YARN (Hadoop)
  • Mesos
  • Kubernetes

Spark on DataProc

This is perhaps the simplest and most integrated approach to using Spark in the GCP ecosystem.

DataProc is GCP’s managed Hadoop service (akin to AWS EMR or HDInsight on Azure). DataProc uses Hadoop/YARN as the Cluster Manager. DataProc clusters can be deployed on a private network (VPC using RFC1918 address space), support encryption at rest using Google Managed or Customer Managed Keys in KMS, support autoscaling and the use of Preemptible Workers, and can be deployed in a HA config.

Furthermore, DataProc clusters can enforce strong authentication using Kerberos which can be integrated into other directory services such as Active Directory through the use of cross realm trusts.

Deployment

DataProc clusters can be deployed using the gcloud dataproc clusters create command or using IaC solutions such as Terraform. For this article I have included an example in the source code using the gcloud command to deploy a DataProc cluster on a private network which was created using Terraform.

Integration

The beauty of DataProc is its native integration into IAM and the GCP service plane. Having been a long-time user of AWS EMR, I have found that the usability and integration are in many ways superior in GCP DataProc. Let’s look at some examples…

IAM and IAP (TCP Forwarding)

DataProc is integrated into Cloud IAM using various coarse-grained permissions such as dataproc.clusters.use and simplified IAM Roles such as dataproc.editor or dataproc.admin. Members with bindings to these roles can perform tasks such as submitting jobs and creating workflow templates (which we will discuss shortly), as well as accessing instances such as the master node instance or instances in the cluster using IAP (TCP Forwarding) without requiring a public IP address or a bastion host.

DataProc Jobs and Workflows

Spark jobs can be submitted using the console or via gcloud dataproc jobs submit as shown here:

Submitting a Spark Job using gcloud dataproc jobs submit

Cluster logs are natively available in StackDriver and standard out from the Spark Driver is visible from the console as well as via gcloud commands.

Complex Workflows can be created by adding Jobs as Steps in Workflow Templates using the following command:

gcloud dataproc workflow-templates add-job spark

Optional Components and the Component Gateway

DataProc provides you with a Hadoop cluster including YARN and HDFS, and a Spark runtime which includes Spark SQL and SparkR. DataProc also supports several optional components including Anaconda, Jupyter, Zeppelin, Druid, Presto, and more.

Web interfaces to some of these components as well as the management interfaces such as the Resource Manager UI or the Spark History Server UI can be accessed through the Component Gateway.

This is a Cloud IAM integrated gateway (much like IAP) which can allow access through an authenticated and authorized console session to web UIs in the cluster – without the need for SSH tunnels, additional firewall rules, bastion hosts, or public IPs. Very cool.

Links to the component UIs, as well as built-in UIs like the YARN Resource Manager UI, are available directly from the console.

Jupyter

Jupyter is a popular notebook application in the data science and analytics communities used for reproducible research. DataProc’s Jupyter component provides a ready-made Spark application vector using PySpark. If you have also installed the Anaconda component you will have access to the full complement of scientific and mathematical Python packages such as Pandas and NumPy, which can be used in Jupyter notebooks as well. Using the Component Gateway, Jupyter notebooks can be accessed directly from the Google console as shown here:

Jupyter Notebooks using DataProc

From this example you can see that I accessed source data from a GCS bucket and used HDFS as local scratch space.

Furthermore, notebooks are automagically saved in your integrated Cloud Storage DataProc staging bucket and can be shared amongst analysts or accessed at a later time. These notebooks also persist beyond the lifespan of the cluster.

Next up we will look at deploying a Spark Standalone cluster on a GKE cluster. See you then!

Change Data Capture at Scale using Spark

Change Data Capture (CDC) is one of the most challenging processing patterns to implement at scale. I personally have had several cracks at this using various frameworks and approaches, the most recent of which was implemented using Spark, and I think I have finally found the best approach. Even though the code examples referenced use Spark, the pattern is language agnostic: the focus is on the approach, not the specific implementation (as this could be applied to any framework or runtime).

The first challenge you are faced with is to compare a very large dataset (representing the current state of an object) with another potentially very large dataset (representing new or incoming data). Ideally, you would like the process to be configuration driven and accommodate such things as composite primary keys, or operational columns which you would like to exclude from change detection. You may also want to implement a pattern to segregate sensitive attributes from non-sensitive attributes.

Overview

This pattern (and all my other recent attempts) is fundamentally based upon calculating a deterministic hash of the key and non-key attribute(s), and then using this hash as the basis for comparison. The difference between this pattern and my other attempts is in the distillation and reconstitution of data during the process, as well as breaking the pattern into discrete stages (designed to minimize the impact to other applications). This pattern can be used to process delta or full datasets.

A high-level flowchart representing the basic pattern is shown here:

CDC Flowchart

The Example

The example provided uses the Synthetic CDC Data Generator application, configuring an incoming set with 5 uuid columns acting as a composite key, and 10 random number columns acting as non key values. The initial day's payload consists of 10,000 records, and the subsequent day's payload consists of another 10,000 records. From the initial dataset, a DELETE operation was performed at the source system for 20% of records, an UPDATE was performed on 40% of the records, and the remaining 40% of records were unchanged. In this case the 20% of records that were deleted at the source were replaced by new INSERT operations creating new keys.

After creating the synthesized day 1 and day 2 datasets, the files are processed as follows:

$ spark-submit cdc.py config.yaml data/day1 2019-06-18
$ spark-submit cdc.py config.yaml data/day2 2019-06-19

Where config.yaml is the configuration for the dataset, data/day1 and data/day2 represent the different data files, and 2019-06-18 and 2019-06-19 represent a business effective date.

The Results

You should see the following output from running the preceding commands for day 1 and day 2 respectively:

Day 1:

Day 2:

A summary analysis of the resultant dataset should show:

Pattern Details

Details about the pattern and its implementation follow.

Current and Historical Datasets

The output of each operation will yield a current dataset (that is, the current stateful representation of a given object) and a historical dataset partition (capturing the net changes from the previous state in an appended partition).

This is useful, because often consumers will primarily query the latest state of an object. The change sets (or historical dataset partitions) can be used for more advanced analysis by sophisticated users.

Type 2 SCDs (sort of)

Two operational columns are added to each current and historical object:

  • OPERATION : Represents the last known operation to the record, valid values include :
    • I (INSERT)
    • U (UPDATE)
    • D (DELETE – hard DELETEs, applies to full datasets only)
    • X (Not supplied, applies to delta processing only)
    • N (No change)
  • EFF_START_DATE

Since data structures on most big data or cloud storage platforms are immutable, we only store the effective start date for each record. This is changed as needed with each coarse-grained operation on the current object. The effective end date is inferred by the presence of a new effective start date (or a change in the EFF_START_DATE value for a given record).
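As a rough illustration of how an effective end date can be inferred rather than stored, here is a small plain-Python sketch. The function name infer_end_dates is hypothetical, and it assumes the end date of a version is simply the start date of the next version (an exclusive end date), with None meaning the version is still current. Your convention may differ, for example the day before the next start date.

```python
from datetime import date
from typing import List, Optional, Tuple


def infer_end_dates(start_dates: List[date]) -> List[Tuple[date, Optional[date]]]:
    """Pair each EFF_START_DATE of one record key with an inferred end date.

    Assumption: a version ends when the next version starts. The latest
    version has no successor, so its end date is None (still current).
    """
    ordered = sorted(set(start_dates))
    next_starts: List[Optional[date]] = list(ordered[1:]) + [None]
    return list(zip(ordered, next_starts))


if __name__ == "__main__":
    history = [date(2019, 6, 19), date(2019, 6, 18)]
    for start, end in infer_end_dates(history):
        print(start, "->", end if end else "current")
```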

The Configuration

I am using a YAML document to store the configuration for the pattern. Important attributes to include in your configuration are a list of keys and non keys and their datatype (this implementation does type casting as well). Other important attributes include the table names and file paths for the current and historical data structures.

The configuration is read at the beginning of a routine as an input along with the path of an incoming data file (a CSV file in this case) and a business effective date (which will be used as the EFF_START_DATE for new or updated records).

Processing is performed using the specified key and non key attributes and the output datasets (current and historical) are written to columnar storage files (parquet in this case). This is designed to make subsequent access and processing more efficient.

The Algorithm

I have broken the process into stages as follows:

Stage 1 – Type Cast and Hash Incoming Data

The first step is to create deterministic hashes of the configured key and non key values for incoming data. The hashes are calculated based upon a list of elements representing the key and non key values using the MD5 algorithm. The hashes for each record are then stored with the respective record. Furthermore, the fields are cast to their target datatype as specified in the configuration. Both of these operations can be performed in a single pass of each row using a map() operation.

Importantly we only calculate hashes once upon arrival of new data, as the hashes are persisted for the life of the data – and the data structures are immutable – the hashes should never change or be invalidated.
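A minimal sketch of this stage in plain Python (without Spark) might look like the following. The names cast_and_hash, md5_of and CASTS, the column names, and the use of a separator character between values are all my own assumptions for illustration; the actual implementation in the repository may differ. In Spark, the same function could be applied to each row with a map() operation.

```python
import hashlib
from typing import Any, Dict, List

# Hypothetical mapping from configured type names to Python casts.
CASTS = {"string": str, "int": int, "double": float}


def md5_of(values: List[str]) -> str:
    """Deterministic MD5 over a list of values.

    A separator is used so that ["ab", "c"] and ["a", "bc"] hash differently.
    """
    joined = "\x1f".join(values)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


def cast_and_hash(row: Dict[str, str],
                  keys: Dict[str, str],
                  nonkeys: Dict[str, str]) -> Dict[str, Any]:
    """Type cast one incoming row and attach its keyhash and nonkeyhash."""
    out: Dict[str, Any] = {}
    for column, type_name in {**keys, **nonkeys}.items():
        out[column] = CASTS[type_name](row[column])
    # Hash the raw incoming strings, in configured column order.
    out["keyhash"] = md5_of([row[c] for c in keys])
    out["nonkeyhash"] = md5_of([row[c] for c in nonkeys])
    return out


if __name__ == "__main__":
    keys = {"id": "string"}
    nonkeys = {"amount": "double"}
    incoming = [{"id": "a1", "amount": "3.5"}]
    print(list(map(lambda r: cast_and_hash(r, keys, nonkeys), incoming)))
```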

Stage 2 – Determine INSERTs

We now compare Incoming Hashes with previously calculated hash values for the (previous day’s) current object. If no current object exists for the dataset, then it can be assumed this is a first run. In this case every record is considered as an INSERT with an EFF_START_DATE of the business effective date supplied.

If there is a current object, then the key and non key hash values (only the hash values) are read from the current object. These are then compared to the respective hashes of the incoming data (which should still be in memory).

Given the full outer join:

incoming_data(keyhash, nonkeyhash) 
FULL OUTER JOIN  
current_data(keyhash, nonkeyhash) 
ON keyhash

Keys which exist in the left entity which do not exist in the right entity must be the results of an INSERT operation.

Tag these records with an operation of I with an EFF_START_DATE of the business effective date, then rejoin only these records with their full attribute payload from the incoming dataset. Finally, write out these records to the current and historical partition in overwrite mode.

Stage 3 – Determine DELETEs or Missing Records

Referring to the previous full outer join operation, keys which exist in the right entity (current object) but do not appear in the left entity (incoming data) will be the result of a (hard) DELETE operation if you are processing full snapshots. Otherwise, if you are processing deltas, these would be missing records (possibly because there were no changes at the source).

Tag these records as D or X respectively with an EFF_START_DATE of the business effective date, rejoin these records with their full attribute payload from the current dataset, then write out these records to the current and historical partition in append mode.

Stage 4 – Determine UPDATEs or Unchanged Records

Again, referring to the previous full outer join, keys which exist in both the incoming and current datasets must be either the result of an UPDATE or they could be unchanged. To determine which case they fall under, compare the non key hashes. If the non key hashes differ, it must have been a result of an UPDATE operation at the source, otherwise the record would be unchanged.

Tag these records as U or N respectively with an EFF_START_DATE of the business effective date (in the case of an update – otherwise maintain the current EFF_START_DATE), rejoin these records with their full attribute payload from the incoming dataset, then write out these records to the current and historical partition in append mode.
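Stages 2 to 4 all hang off the same full outer join on keyhash, so the tagging logic can be sketched in a few lines of plain Python. This is only an illustration of the decision rules, not the Spark implementation: the function name classify_changes is hypothetical, and each dataset is represented as a dict of keyhash to nonkeyhash rather than a Dataframe.

```python
from typing import Dict


def classify_changes(incoming: Dict[str, str],
                     current: Dict[str, str],
                     full_snapshot: bool = True) -> Dict[str, str]:
    """Tag every keyhash with an OPERATION code.

    incoming / current map keyhash -> nonkeyhash. Iterating over the union
    of both key sets plays the role of the full outer join on keyhash.
    """
    operations: Dict[str, str] = {}
    for keyhash in incoming.keys() | current.keys():
        if keyhash not in current:
            operations[keyhash] = "I"  # only in incoming: INSERT
        elif keyhash not in incoming:
            # only in current: hard DELETE (full snapshot) or not supplied (delta)
            operations[keyhash] = "D" if full_snapshot else "X"
        elif incoming[keyhash] != current[keyhash]:
            operations[keyhash] = "U"  # same key, different non key hash: UPDATE
        else:
            operations[keyhash] = "N"  # same key, same non key hash: no change
    return operations


if __name__ == "__main__":
    current = {"k1": "aaa", "k2": "bbb", "k3": "ccc"}
    incoming = {"k2": "bbb", "k3": "zzz", "k4": "ddd"}
    print(sorted(classify_changes(incoming, current).items()))
```

Because only the two hash columns are needed to make these decisions, the full attribute payload can be left out of the join and rejoined afterwards for just the records in each category.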

Key Callouts

The key callouts from this pattern are:

  • Use the RDD API for iterative record operations (such as type casting and hashing)
  • Persist hashes with the records
  • Use Dataframes for JOIN operations
  • Only perform JOINs with the keyhash and nonkeyhash columns – this minimizes the amount of data shuffled across the network
  • Write output data in columnar (Parquet) format
  • Break the routine into stages, covering each operation, culminating with a saveAsParquet() action – this may seem expensive but for large datasets it is more efficient to break down DAGs for each operation
  • Use caching for objects which will be reused between actions

Metastore Integration

Although I did not include this in my example, you could easily integrate this pattern with a metastore (such as a Hive metastore or AWS Glue Catalog), by using table objects and ALTER TABLE statements to add historical partitions.

Further optimisations

If the incoming data is known to be relatively small (in the case of delta processing for instance), you could consider a broadcast join where the smaller incoming data is distributed to all of the different Executors hosting partitions from the current dataset.
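To illustrate the idea behind a broadcast join without needing a Spark cluster, here is a plain-Python sketch. The function name broadcast_hash_join is hypothetical and the "partitions" are just lists standing in for the partitions of the current dataset held by different Executors. Note this sketch only returns matching keys (an inner join), which corresponds to the UPDATE or unchanged candidates; it is not a drop-in replacement for the full outer join used in the pattern.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

Pair = Tuple[str, str]  # (keyhash, nonkeyhash)


def broadcast_hash_join(small: Iterable[Pair],
                        partitions: Iterable[List[Pair]]
                        ) -> Iterator[Tuple[str, str, str]]:
    """Join a small dataset against a large, partitioned one.

    The small side is turned into an in-memory lookup table once and is
    conceptually shipped to every partition, so the large side never has
    to be shuffled across the network.
    """
    lookup: Dict[str, str] = dict(small)  # the "broadcast" copy
    for partition in partitions:          # each partition is probed locally
        for keyhash, current_nonkeyhash in partition:
            if keyhash in lookup:
                yield keyhash, lookup[keyhash], current_nonkeyhash


if __name__ == "__main__":
    incoming_delta = [("k1", "new")]
    current_partitions = [[("k1", "old"), ("k2", "x")], [("k3", "y")]]
    print(list(broadcast_hash_join(incoming_delta, current_partitions)))
```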

Also you could add a key to the column config to configure a column to be nullable or not.

Happy CDCing!

Full source code for this article can be found at: https://github.com/avensolutions/cdc-at-scale-using-spark

Synthetic CDC Data Generator

This is a simple routine to generate random data with a configurable number of records, key fields and non key fields to be used to create synthetic data for source change data capture (CDC) processing. The output includes an initial directory containing CSV files representing an initial data load, and an incremental directory containing CSV files representing incremental data.

Arguments (by position) include:

  • no_init_recs : the number of initial records to generate
  • no_incr_recs : the number of incremental records on the second run – should be >= no_init_recs
  • no_keys : number of key columns in the dataset – keys are generated as UUIDs
  • no_nonkeys : number of non-key columns in the dataset – nonkey values are generated as random numbers
  • pct_del : percentage of initial records deleted on the second run – between 0.0 and 1.0
  • pct_upd : percentage of initial records updated on the second run – between 0.0 and 1.0
  • pct_unchanged : percentage of records unchanged on the second run – between 0.0 and 1.0
  • initial_output : folder for initial output in CSV format
  • incremental_output : folder for incremental output in CSV format

NOTE : pct_del + pct_upd + pct_unchanged must equal 1.0
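As a sketch of how the positional arguments and the constraint above could be checked, here is a small plain-Python example. The names GeneratorArgs and parse_args are hypothetical and this is not the argument handling from the repository; it simply mirrors the argument list and the NOTE above.

```python
import math
from typing import List, NamedTuple


class GeneratorArgs(NamedTuple):
    no_init_recs: int
    no_incr_recs: int
    no_keys: int
    no_nonkeys: int
    pct_del: float
    pct_upd: float
    pct_unchanged: float
    initial_output: str
    incremental_output: str


def parse_args(argv: List[str]) -> GeneratorArgs:
    """Parse the nine positional arguments and validate the percentages."""
    if len(argv) != 9:
        raise ValueError(f"expected 9 arguments, got {len(argv)}")
    args = GeneratorArgs(
        int(argv[0]), int(argv[1]), int(argv[2]), int(argv[3]),
        float(argv[4]), float(argv[5]), float(argv[6]),
        argv[7], argv[8],
    )
    fractions = (args.pct_del, args.pct_upd, args.pct_unchanged)
    if any(not 0.0 <= f <= 1.0 for f in fractions):
        raise ValueError("each percentage must be between 0.0 and 1.0")
    # Compare with a tolerance, as floating point sums are rarely exact.
    if not math.isclose(sum(fractions), 1.0, abs_tol=1e-9):
        raise ValueError("pct_del + pct_upd + pct_unchanged must equal 1.0")
    return args


if __name__ == "__main__":
    print(parse_args("100000 100000 2 3 0.2 0.4 0.4 data/day1 data/day2".split()))
```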

Example usage:

$ spark-submit synthetic-cdc-data-generator.py 100000 100000 2 3 0.2 0.4 0.4 data/day1 data/day2

Example output from the day1 run for the above configuration would look like this:

Note that this routine can be run subsequent times producing different key and non key values each time, as the keys are UUIDs and the values are random numbers.
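The general idea of the generator can be sketched in plain Python as follows. This is a simplified, in-memory illustration under my own assumptions, not the Spark routine from the repository: the function names new_record and generate_days are hypothetical, the first records of day 1 are the ones chosen for deletion, and day 2 is topped up with brand new keys until it reaches no_incr_recs.

```python
import random
import uuid
from typing import List, Tuple

Record = Tuple[Tuple[str, ...], Tuple[float, ...]]  # (key values, non key values)


def new_record(no_keys: int, no_nonkeys: int) -> Record:
    """Create a record with UUID keys and random number non key values."""
    keys = tuple(str(uuid.uuid4()) for _ in range(no_keys))
    nonkeys = tuple(random.random() for _ in range(no_nonkeys))
    return keys, nonkeys


def generate_days(no_init_recs: int, no_incr_recs: int,
                  no_keys: int, no_nonkeys: int,
                  pct_del: float, pct_upd: float
                  ) -> Tuple[List[Record], List[Record]]:
    """Return (day1, day2) datasets with deletes, updates and inserts applied."""
    day1 = [new_record(no_keys, no_nonkeys) for _ in range(no_init_recs)]
    n_del = round(no_init_recs * pct_del)
    n_upd = round(no_init_recs * pct_upd)
    survivors = day1[n_del:]  # the first n_del records are "deleted"
    # Updated records keep their keys but get fresh non key values.
    updated = [(keys, tuple(random.random() for _ in nonkeys))
               for keys, nonkeys in survivors[:n_upd]]
    unchanged = survivors[n_upd:]
    n_new = max(no_incr_recs - len(survivors), 0)
    inserts = [new_record(no_keys, no_nonkeys) for _ in range(n_new)]
    return day1, updated + unchanged + inserts


if __name__ == "__main__":
    day1, day2 = generate_days(10, 10, 2, 3, 0.2, 0.4)
    print(len(day1), len(day2))
```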

We will use this application to generate random input data to demonstrate CDC using Spark in a subsequent post. See you soon!

Full source code can be found at: https://github.com/avensolutions/synthetic-cdc-data-generator

Multi Stage ETL Framework using Spark SQL

Most traditional data warehouse or datamart ETL routines consist of multi stage SQL transformations, often a series of CTAS (CREATE TABLE AS SELECT) statements usually creating transient or temporary tables, such as volatile tables in Teradata, or Common Table Expressions (CTEs).

The initial challenge when moving from a SQL/MPP based ETL framework hosted on Oracle, Teradata, SQL Server, etc. to a Spark based ETL framework is what to do with this…

Multi Stage SQL Based ETL

One approach is to use the lightweight, configuration driven, multi stage Spark SQL based ETL framework described in this post.

This framework is driven from a YAML configuration document. YAML was preferred over JSON as a document format as it allows for multi-line statements (SQL statements), as well as comments – which are very useful as SQL can sometimes be undecipherable even for the person that wrote it.

The YAML config document has three main sections: sources, transforms and targets.

The sources section is used to configure the input data source(s) including optional column and row filters. In this case the data sources are tables available in the Spark catalog (for instance the AWS Glue Catalog or a Hive Metastore), this could easily be extended to read from other datasources using the Spark DataFrameReader API.

The transforms section contains the multiple SQL statements to be run in sequence where each statement creates a temporary view using objects created by preceding statements.

Finally the targets section writes out the final object or objects to a specified destination (S3, HDFS, etc).

The process_sql_statements.py script that is used to execute the framework is very simple (30 lines of code not including comments, etc). It loads the sources into Spark Dataframes and then creates temporary views to reference these datasets in the transforms section, then sequentially executes the SQL statements in the list of transforms. Lastly the script writes out the final view or views to the desired destination – in this case parquet files stored in S3 were used as the target.
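To show the shape of this flow without needing Spark or a YAML parser, here is a minimal sketch that swaps in Python's built-in sqlite3 module for Spark SQL and a plain dict for the parsed YAML document. It is not the process_sql_statements.py script; the config keys (view, table, columns, filter, sql), the table and view names, and the functions run_pipeline and demo_connection are all assumptions made for illustration.

```python
import sqlite3
from typing import Any, Dict, List, Tuple

# Stand-in for the parsed YAML config: sources, transforms and targets.
CONFIG: Dict[str, List[Dict[str, Any]]] = {
    "sources": [
        {"view": "sv_orders", "table": "orders",
         "columns": ["customer", "amount"], "filter": "amount > 0"},
    ],
    "transforms": [
        {"view": "iv_totals",
         "sql": "SELECT customer, SUM(amount) AS total "
                "FROM sv_orders GROUP BY customer"},
        {"view": "fv_big_spenders",
         "sql": "SELECT customer, total FROM iv_totals WHERE total >= 100"},
    ],
    "targets": [{"view": "fv_big_spenders"}],
}


def run_pipeline(conn: sqlite3.Connection,
                 config: Dict[str, List[Dict[str, Any]]]
                 ) -> Dict[str, List[Tuple]]:
    """Create source views, run each transform in order, return target rows."""
    for src in config["sources"]:
        cols = ", ".join(src["columns"])
        where = f" WHERE {src['filter']}" if src.get("filter") else ""
        conn.execute(f"CREATE TEMP VIEW {src['view']} AS "
                     f"SELECT {cols} FROM {src['table']}{where}")
    for step in config["transforms"]:
        # Each statement can reference views created by preceding statements.
        conn.execute(f"CREATE TEMP VIEW {step['view']} AS {step['sql']}")
    # A real target would write files; here we just fetch the rows.
    return {t["view"]: conn.execute(
                f"SELECT * FROM {t['view']} ORDER BY 1").fetchall()
            for t in config["targets"]}


def demo_connection() -> sqlite3.Connection:
    """Build an in-memory database with a small hypothetical orders table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (customer TEXT, amount INTEGER)")
    conn.executemany("INSERT INTO orders VALUES (?, ?)",
                     [("alice", 60), ("alice", 50), ("bob", 30), ("bob", -5)])
    return conn


if __name__ == "__main__":
    print(run_pipeline(demo_connection(), CONFIG))
```

The SQL in the config is trusted input here and is interpolated directly into the statements, which is acceptable for an ETL config you control but not for user-supplied values.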

You could implement an object naming convention such as prefixing object names with sv_, iv_, fv_ (for source view, intermediate view and final view respectively) if this helps you differentiate between the different objects.

To use this framework you would simply use spark-submit as follows:

spark-submit process_sql_statements.py config.yml

Full source code can be found at: https://github.com/avensolutions/spark-sql-etl-framework