Spark in the Google Cloud Platform Part 2

In the previous post in this series Spark in the Google Cloud Platform Part 1, we started to explore the various ways in which we could deploy Apache Spark applications in GCP. The first option we looked at was deploying Spark using Cloud DataProc, a managed Hadoop cluster with various ecosystem components included.

In this post, we will look at another option for deploying Spark in GCP – a Spark Standalone cluster running on GKE.

Spark Standalone refers to the in-built cluster manager provided with each Spark release. Standalone can be a bit of a misnomer as it sounds like a single instance – which it is not, standalone simply refers to the fact that it is not dependent upon any other projects or components – such as Apache YARN, Mesos, etc.

A Spark Standalone cluster consists of a Master node or instance and one of more Worker nodes. The Master node serves as both a master and a cluster manager in the Spark runtime architecture.

The Master process is responsible for marshalling resource requests on behalf of applications and monitoring cluster resources.

The Worker nodes host one or many Executor instances which are responsible for carrying out tasks.

Deploying a Spark Standalone cluster on GKE is reasonably straightforward. In the example provided in this post we will set up a private network (VPC), create a GKE cluster, and deploy a Spark Master pod and two Spark Worker pods (in a real scenario you would typically have many Worker pods).

Once the network and GKE cluster have been deployed, the first step is to create Docker images for both the Master and Workers.

The Dockerfile below can be used to create an image capable or running either the Worker or Master daemons:

Note the shell scripts included in the Dockerfile: spark-master and spark-worker. These will be used later on by K8S deployments to start the relative Master and Worker daemon processes in each of the pods.

Next, we will use Cloud Build to build an image using the Dockerfile are store this in GCR (Google Container Registry), from the Cloud Build directory in our project we will run:

gcloud builds submit --tag gcr.io/spark-demo-266309/spark-standalone

Next, we will create Kubernetes deployments for our Master and Worker pods.

Firstly, we need to get cluster credentials for our GKE cluster named ‘spark-cluster’:

gcloud container clusters get-credentials spark-cluster --zone australia-southeast1-a --project spark-demo-266309

Now from within the k8s-deployments\deploy folder of our project we will use the kubectl command to deploy the Master pod, service and the Worker pods

Starting with the Master deployment, this will deploy our Spark Standalone image into a container running the Master daemon process:

To deploy the Master, run the following:

kubectl create -f spark-master-deployment.yaml

The Master will expose a web UI on port 8080 and an RPC service on port 7077, we will need to deploy a K8S service for this, the YAML required to do this is shown here:

To deploy the Master service, run the following:

kubectl create -f spark-master-service.yaml

Now that we have a Master pod and service up and running, we need to deploy our Workers which are preconfigured to communicate with the Master service.

The YAML required to deploy the two Worker pods is shown here:

To deploy the Worker pods, run the following:

kubectl create -f spark-worker-deployment.yaml

You can now inspect the Spark processes running on your GKE cluster.

kubectl get deployments

Shows…

NAME           READY   UP-TO-DATE   AVAILABLE   AGE
 spark-master   1/1     1            1           7m45s
 spark-worker   2/2     2            2           9s
kubectl get pods

Shows…

NAME                            READY   STATUS    RESTARTS   AGE
 spark-master-f69d7d9bc-7jgmj    1/1     Running   0          8m
 spark-worker-55965f669c-rm59p   1/1     Running   0          24s
 spark-worker-55965f669c-wsb2f   1/1     Running   0          24s

Next, as we need to expose the Web UI for the Master process we will create a LoadBalancer resource. The YAML used to do this is provided here:

To deploy the LB, you would run the following:

kubectl create -f spark-ui-lb.yaml

NOTE This is just an example, for simplicity we are creating an external LoadBalancer with a public IP, this configuration is likely not be appropriate in most real scenarios, alternatives would include an internal LoadBalancer, retraction of Authorized Networks, a jump host, SSH tunnelling or IAP.

Now you’re up and running!

You can access the Master web UI from the Google Console link shown here:

Accessing the Spark Master UI from the Google Cloud Console

The Spark Master UI should look like this:

Spark Master UI

Next we will exec into a Worker pod, get a shell:

kubectl exec -it spark-worker-55965f669c-rm59p -- sh

Now from within the shell environment of a Worker – which includes all of the Spark client libraries, we will submit a simple Spark application:

spark-submit --class org.apache.spark.examples.SparkPi \
 --master spark://10.11.250.98:7077 \
/opt/spark/examples/jars/spark-examples*.jar 10000

You can see the results in the shell, as shown here:

Spark Pi Estimator Example

Additionally, as all of the container logs go to Stackdriver, you can view the application logs there as well:

Container Logs in StackDriver

This is a simple way to get a Spark cluster running, it is not without its downsides and shortcomings however, which include the limited security mechanisms available (SASL, network security, shared secrets).

In the final post in this series we will look at Spark on Kubernetes, using Kubernetes as the Spark cluster manager and interacting with Spark using the Kubernetes API and control plane, see you then.

Full source code for this article is available at: https://github.com/gamma-data/spark-on-gcp

The infrastructure coding for this example uses Powershell and Terraform, and is deployed as follows:

PS > .\run.ps1 private-network apply <<gcp-project>> <<region>>
PS > .\run.ps1 gke apply <<gcp-project>> <<region>> 

Spark in the Google Cloud Platform Part 1

I have been an avid Spark enthusiast since 2014 (the early days..). Spark has featured heavily in every project I have been involved with from data warehousing, ETL, feature extraction, advanced analytics to event processing and IoT applications. I like to think of it as a Swiss army knife for distributed processing.

Curiously enough, the first project I had been involved with for some years that did not feature the Apache Spark project was a green field GCP project which got me thinking… where does Spark fit into the GCP landscape?

Unlike the other major providers who use Spark as the backbone of their managed distributed ETL services with examples such as AWS Glue or the Spark integration runtime option in Azure Data Factory, Google’s managed ETL solution is Cloud DataFlow. Cloud DataFlow which is a managed Apache Beam service does not use a Spark runtime (there is a Spark Runner however this is not an option when using CDF). So where does this leave Spark?

My summation is that although Spark is not a first-class citizen in GCP (as far as managed ETL), it is not a second-class citizen either. This article will discuss the various ways Spark clusters and applications can be deployed within the GCP ecosystem.

Quick Primer on Spark

Every Spark application contains several components regardless of deployment mode, the components in the Spark runtime architecture are:

  • the Driver
  • the Master
  • the Cluster Manager
  • the Executor(s), which run on worker nodes or Workers

Each component has a specific role in executing a Spark program and all of the Spark components run in Java virtual machines (JVMs).

Spark Runtime Architecture

Cluster Managers schedule and manage distributed resources (compute and memory) across the nodes of the cluster. Cluster Managers available for Spark include:

  • Standalone
  • YARN (Hadoop)
  • Mesos
  • Kubernetes

Spark on DataProc

This is perhaps the simplest and most integrated approach to using Spark in the GCP ecosystem.

DataProc is GCP’s managed Hadoop Service (akin to AWS EMR or HDInsight on Azure). DataProc uses Hadoop/YARN as the Cluster Manager. DataProc clusters can be deployed on a private network (VPC using RFC1918 address space), supports encryption at Rest using Google Managed or Customer Managed Keys in KMS, supports autoscaling and the use of Preemptible Workers, and can be deployed in a HA config.

Furthermore, DataProc clusters can enforce strong authentication using Kerberos which can be integrated into other directory services such as Active Directory through the use of cross realm trusts.

Deployment

DataProc clusters can be deployed using the gcloud dataproc clusters create command or using IaC solutions such as Terraform. For this article I have included an example in the source code using the gcloud command to deploy a DataProc cluster on a private network which was created using Terraform.

Integration

The beauty of DataProc is its native integration into IAM and the GCP service plane. Having been a long-time user of AWS EMR, I have found that the usability and integration are in many ways superior in GCP DataProc. Let’s look at some examples…

IAM and IAP (TCP Forwarding)

DataProc is integrated into Cloud IAM using various coarse grained permissions use as dataproc.clusters.use and simplified IAM Roles such as dataproc.editor or dataproc.admin. Members with bindings to the these roles can perform tasks such as submitting jobs and creating workflow templates (which we will discuss shortly), as well as accessing instances such as the master node instance or instances in the cluster using IAP (TCP Forwarding) without requiring a public IP address or a bastion host.

DataProc Jobs and Workflows

Spark jobs can be submitted using the console or via gcloud dataproc jobs submit as shown here:

Submitting a Spark Job using gcloud dataproc jobs submit

Cluster logs are natively available in StackDriver and standard out from the Spark Driver is visible from the console as well as via gcloud commands.

Complex Workflows can be created by adding Jobs as Steps in Workflow Templates using the following command:

gcloud dataproc workflow-templates add-job spark

Optional Components and the Component Gateway

DataProc provides you with a Hadoop cluster including YARN and HDFS, a Spark runtine – which includes Spark SQL and SparkR. DataProc also supports several optional components including Anaconda, Jupyter, Zeppelin, Druid, Presto, and more.

Web interfaces to some of these components as well as the management interfaces such as the Resource Manager UI or the Spark History Server UI can be accessed through the Component Gateway.

This is a Cloud IAM integrated gateway (much like IAP) which can allow access through an authenticated and authorized console session to web UIs in the cluster – without the need for SSH tunnels, additional firewall rules, bastion hosts, or public IPs. Very cool.

Links to the component UIs as well as built in UIs like the YARN Resource Manager UI are available directly from through the console.

Jupyter

Jupyter is a popular notebook application in the data science and analytics communities used for reproducible research. DataProc’s Jupyter component provides a ready-made Spark application vector using PySpark. If you have also installed the Anaconda component you will have access to the full complement of scientific and mathematic Python packages such as Pandas and NumPy which can be used in Jupyter notebooks as well. Using the Component Gateway, Jupyer notebooks can be accessed directly from the Google console as shown here:

Jupyter Notebooks using DataProc

From this example you can see that I accessed source data from a GCS bucket and used HDFS as local scratch space.

Furthermore, notebooks are automagically saved in your integrated Cloud Storage DataProc staging bucket and can be shared amongst analysts or accessed at a later time. These notebooks also persist beyond the lifespan of the cluster.

Next up we will look at deploying a Spark Standalone cluster on a GKE cluster, see you then!

Query Cloud SQL through Big Query

This article demonstrates Cloud SQL federated queries for Big Query, a neat and simple to use feature.

Connecting to Cloud SQL

One of the challenges presented when using Cloud SQL on a private network (VPC) is providing access to users. There are several ways to accomplish this which include:

  • open the database port on the VPC Firewall (5432 for example for Postgres) and let users access the database using a command line or locally installed GUI tool (may not be allowed in your environment)
  • provide a web based interface deployed on your VPC such as PGAdmin deployed on a GCE instance or GKE pod (adds security and management overhead)
  • use the Cloud SQL proxy (requires additional software to be installed and configured)

In additional, all of the above solutions require direct IP connectivity to the instance which may not always be available. Furthermore each of these operations requires the user to present some form of authentication – in many cases the database user and password which then must be managed at an individual level.

Enter Cloud SQL federated queries for Big Query…

Big Query Federated Queries for Cloud SQL

Big Query allows you to query tables and views in Cloud SQL (currently MySQL and Postgres) using the Federated Queries feature. The queries could be authorized views in Big Query datasets for example.

This has the following advantages:

  • Allows users to authenticate and use the GCP console to query Cloud SQL
  • Does not require direct IP connectivity to the user or additional routes or firewall rules
  • Leverages Cloud IAM as the authorization mechanism – rather that unmanaged db user accounts and object level permissions
  • External queries can be executed against a read replica of the Cloud SQL instance to offload query IO from the master instance

Setting it up

Setting up big query federated queries for Cloud SQL is exceptionally straightforward, a summary of the steps are provided below:

Step 1. Enable a Public IP on the Cloud SQL instance

This sounds bad, but it isn’t really that bad. You need to enable a public interface for Big Query to be able to establish a connection to Cloud SQL, however this is not accessed through the actual public internet – rather it is accessed through the Google network using the back end of the front end if you will.

Furthermore, you configure an empty list of authorized networks which effectively shields the instance from the public network, this can be configured in Terraform as shown here:

This configuration change can be made to a running instance as well as during the initial provisioning of the instance.

As shown below you will get a warning dialog in the console saying that you have no authorized networks – this is by design.

Cloud SQL Public IP Enabled with No Authorized Networks

Step 2. Create a Big Query dataset which will be used to execute the queries to Cloud SQL

Connections to Cloud SQL are defined in a Big Query dataset, this can also be use to control access to Cloud SQL using authorized views controlled by IAM roles.

Step 3. Create a connection to Cloud SQL

To create a connection to Cloud SQL from Big Query you must first enable the BigQuery Connection API, this is done at a project level.

As this is a fairly recent feature there isn’t great coverage with either the bq tool or any of the Big Query client libraries to do this so we will need to use the console for now…

Under the Resources -> Add Data link in the left hand panel of the Big Query console UI, select Create Connection. You will see a side info panel with a form to enter connection details for your Cloud SQL instance.

In this example I will setup a connection to a Cloud SQL read replica instance I have created:

Creating a Big Query Connection to Cloud SQL

More information on the Big Query Connections API can be found at: https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest

The following permissions are associated with connections in Big Query:

bigquery.connections.create
bigquery.connections.get
bigquery.connections.list
bigquery.connections.use
bigquery.connections.update
bigquery.connections.delete

These permissions are conveniently combined into the following predefined roles:

roles/bigquery.connectionAdmin    (BigQuery Connection Admin)         
roles/bigquery.connectionUser     (BigQuery Connection User)          

Step 4. Query away!

Now the connection to Cloud SQL can be accessed using the EXTERNAL_QUERY function in Big Query, as shown here:

Querying Cloud SQL from Big Query

Google Cloud SQL – Availability for PostgreSQL – Part II (Read Replicas)

In this post we will look at read replicas as an additional method to achieve multi zone availability for Cloud SQL, which gives us – in turn – the ability to offload (potentially expensive) IO operations such as user created backups or read operations without adding load to the master instance.

In the previous post in this series we looked at Regional availability for PostgreSQL HA using Cloud SQL:

Recall that this option was simple to implement and worked relatively seamlessly and transparently with respect to zonal failover.

Now let’s look at read replicas in Cloud SQL as an additional measure for availability.

Deploying Read Replica(s)

Deploying read replicas is slightly more involved than simple regional (high) availability, as you will need to define each replica or replicas as a separate Cloud SQL instance which is a slave to the primary instance (the master instance).

An example using Terraform is provided here, starting by creating the master instance:

Next you would specify one or more read replicas (typically in a zone other than the zone the master is in):

Note that several of the options supplied are omitted when creating a read replica database instance, such as the backup and maintenance options – as these operations cannot be performed on a read replica as we will see later.

Cloud SQL Instances – showing master and replica
Cloud SQL Master Instance

Voila! You have just set up a master instance (the primary instance your application and/or users will connect to) along with a read replica in a different zone which will be asynchronously updated as changes occur on the master instance.

Read Replicas in Action

Now that we have created a read replica, lets see it in action. After connecting to the read replica (like you would any other instance), attempt to access a table that has not been created on the master as shown here:

SELECT operation from the replica instance

Now create the table and insert some data on the master instance:

Create a table and insert a record on the master instance

Now try the select operation on the replica instance:

SELECT operation from the replica instance (after changes have been made on the master)

It works!

Some Points to Note about Cloud SQL Read Replicas

  • Users connect to a read replica as a normal database connection (as shown above)
  • Google managed backups (using the console or gcloud sql backups create .. ) can NOT be performed against replica instances
  • Read replicas can be used to offload IO intensive operations from the the master instance – such as user managed backup operations (e.g. pg_dump)
pg_dump operation against a replica instance
  • BE CAREFUL Despite their name, read replicas are NOT read only, updates can be made which will NOT propagate back to the master instance – you could get yourself in an awful mess if you allow users to perform INSERT, UPDATE, DELETE, CREATE or DROP operations against replica instances.

Promoting a Read Replica

If required a read replica can be promoted to a standalone Cloud SQL instance, which is another DR option. Keep in mind however as the read replica is updated in an asynchronous manner, promotion of a read replica may result in a loss of data (hopefully not much but a loss nonetheless). Your application RPO will dictate if this is acceptable or not.

Promotion of a read replica is reasonably straightforward as demonstrated here using the console:

Promoting a read replica using the console

You can also use the following gcloud command:

 gcloud sql instances promote-replica  <replica_instance_name>

Once you click on the Promote Replica button you will see the following warning:

Promoting a read replica using the console

This simply states that once you promote the replica instance your instance will become an independent instance with no further relationship with the master instance. Once accepted and the promotion process is complete, you can see that you now have two independent Cloud SQL instances (as advertised!):

Promoted Cloud SQL instance

Some of the options you would normally configure with a master instance would need to be configured on the promoted replica instance – such as high availability, maintenance and scheduled backups – but in the event of a zonal failure you would be back up and running with virtually no data loss!

Full source code for this article is available at: https://github.com/gamma-data/cloud-sql-postgres-availability-tutorial

Google Cloud SQL – Availability, Replication, Failover for PostgreSQL – Part I

In this multi part blog we will explore the features available in Google Cloud SQL for High Availability, Backup and Recovery, Replication and Failover and Security (at rest and in transit) for the PostgreSQL DBMS engine. Some of these features are relatively hot of the press and in Beta – which still makes them available for general use.

We will start by looking at the High Availability (HA) options available to you when using the PostgreSQL engine in Google Cloud SQL.

Most of you would be familiar with the concepts of High Availability, Redundancy, Fault Tolerance, etc but let’s start with a definition of HA anyway:

High availability (HA) is a characteristic of a system, which aims to ensure an agreed level of operational performance, usually uptime, for a higher than normal period.

Wikipedia

Higher than a normal period is quite subjective, typically this is quantified by a percentage represented by a number of “9s”, that is 99.99% (which would be quoted as “four nines”), this would allot you 52.60 minutes of downtime over a one-year period.

Essentially the number of 9’s required will drive your bias towards the options available to you for Cloud SQL HA.

We will start with Cloud SQL HA in its simplest form, Regional Availability.

Regional Availability

Knowing what we know about the Google Cloud Platform, regional availability means that our application or service (in this case Cloud SQL) should be resilient to a failure of any one zone in our region. In fact, as all GCP regions have at least 3 zones – two zones could fail, and our application would still be available.

Regional availability for Cloud SQL (which Google refers to as High Availability), creates a standby instance in addition to the primary instance and uses a regional Persistent Disk resource to store the database instance data, transaction log and other state files, which is synchronously replicated to a Persistent Disk resource local to the zones that the primary and standby instances are located in.

A shared IP address (like a Virtual IP) is used to serve traffic to the healthy (normally primary) Cloud SQL instance.

An overview of Cloud SQL HA is shown here:

Cloud SQL High Availability

Implementing High Availability for Cloud SQL

Implementing Regional Availability for Cloud SQL is dead simple, it is one argument:

availability_type = "REGIONAL"

Using the gcloud command line utility, this would be:

gcloud sql instances create postgresql-instance-1234 \
  --availability-type=REGIONAL \
  --database-version= POSTGRES_9_6

Using Terraform (with a complete set of options) it would look like:

resource "google_sql_database_instance" "postgres_ha" {
  provider = google-beta
  region = var.region
  project = var.project
  name = "postgresql-instance-${random_id.instance_suffix.hex}"
  database_version = "POSTGRES_9_6"
  settings {
   tier = var.tier
   disk_size = var.disk_size
   activation_policy = "ALWAYS"
   disk_autoresize = true
   disk_type = "PD_SSD"
   availability_type = "REGIONAL"
   backup_configuration {
     enabled = true
     start_time = "00:00"
   }
   ip_configuration  {
     ipv4_enabled = false
     private_network = google_compute_network.private_network.self_link
   }
   maintenance_window  {
     day = 7
     hour = 0
     update_track = "stable"
   }
  }
 } 

Once deployed you will notice a few different items in the console, first from the instance overview page you can see that the High Availability option is ENABLED for your instance.

Second, you will see a Failover button enabled on the detailed management view for this instance.

Failover

Failovers and failbacks can be initiated manually or automatically (should the primary be unresponsive). A manual failover can be invoked by executing the command:

gcloud sql instances failover postgresql-instance-1234

There is an --async option which will return immediately, invoking the failover operation asynchronously.

Failover can also be invoked from the Cloud Console using the Failover button shown previously. As an example I have created a connection to a regionally available Cloud SQL instance and started a command which runs a loop and prints out a counter:

Now using the gcloud command shown earlier, I have invoked a manual failover of the Cloud SQL instance.

Once the failover is initiated, the client connection is dropped (as the server is momentarily unavailable):

The connection can be immediately re-established afterwards, the state of the running query is lost – importantly no data is lost however. If your application clients had retry logic in their code and they weren’t executing a long running query, chances are no one would notice! Once reconnecting normal database activities can be resumed:

A quick check of the instance logs will show that the failover event has occured:

Now when you return to the instance page in the console you will see a Failback button, which indicates that your instance is being served by the standby:

Note that there may be a slight delay in the availability of this option as the replica is still being synched.

It is worth noting that nothing comes for free! When you run in REGIONAL or High Availability mode – you are effectively paying double the costs as compared to running in ZONAL mode. However availability and cost have always been trade-offs against one another – you get what you pay for…

More information can be found at: https://cloud.google.com/sql/docs/postgres/high-availability

Next up we will look at read replicas (and their ability to be promoted) as another high availability alternative in Cloud SQL.

The Ultimate AWS to GCP Thesaurus

There are many posts available which map analogous services between the different cloud providers, but this post attempts to go a step further and map additional concepts, terms, and configuration options to be the definitive thesaurus for cloud practitioners familiar with AWS looking to fast track their familiarisation with GCP.

It should be noted that AWS and GCP are fundamentally different platforms, nowhere is this more apparent than in the way networking is implemented between the two providers, see:

This post is focused on the core infrastructure, networking and security services offered by the two major cloud providers, I will do a future post on higher level services such as the ML/AI offerings from the respective providers.

Furthermore this will be a living post which I will continue to update, I encourage comments from readers on additional mappings which I will incorporate into the post as well.

I have broken this down into sections based upon the layout of the AWS Console.

Compute

EC2 (Elastic Compute Cloud)GCE (Google Compute Engine)
Availability ZoneZone
InstanceVM Instance
Instance FamilyMachine Family
Instance TypeMachine Type
Amazon Machine Image (AMI)Image
IAM Role (for an EC2 Instance)Service Account
Security GroupsVPC Firewall Rules (ALLOW)
TagLabel
Termination ProtectionDeletion Protection
Reserved InstancesCommitted Use Discounts
Capacity ReservationReservation
User DataStartup Script
Spot InstancesPreemptible VMs
Dedicated InstancesSole Tenancy
EBS VolumePersistent Disk
Auto Scaling GroupManaged Instance Group
Launch ConfigurationInstance Template
ELB ListenerURL Map (Load Balancer)
ELB Target GroupBackend/ Instance Group
Instance Storage (ephemeral)Local SSDs
EBS SnapshotsSnapshots
KeypairSSH Keys
Elastic IPExternal IP
LambdaGoogle Cloud Functions
Elastic BeanstalkGoogle App Engine
Elastic Container Registry (ECR)Google Container Registry (GCR)
Elastic Container Service (ECS)Google Kubernetes Engine (GKE)
Elastic Kubernetes Service (EKS)Google Kubernetes Engine (GKE)
AWS FargateCloud Run
AWS Service QuotasAllocation Quotas
Account (within an Organisation)†Project
RegionRegion
AWS Cloud​FormationCloud Deployment Manager

Storage

Simple Storage Service (S3)Google Cloud Storage (GCS)
Standard Storage ClassStandard Storage Class
Infrequent Access Storage ClassNearline Storage Class
Amazon GlacierColdline Storage Class
Lifecycle PolicyRetention Policy
TagsLabels
SnowballTransfer Appliance
Requester PaysRequester Pays
RegionLocation Type/Location
Object LockHold
Vault Lock (Glacier)Bucket Lock
Multi Part UploadParallel Composite Transfer
Cross-Origin Resource Sharing (CORS)Cross-Origin Resource Sharing (CORS)
Static Website HostingBucket Website Configuration
S3 Access PointsVPC Service Controls
Object NotificationsPub/Sub Notifications for Cloud Storage
Presigned URLSigned URL
Transfer AccelerationStorage Transfer Service
Elastic File System (EFS)Cloud Filestore
AWS DataSyncTransfer Service for on-premises data
ETagETag
BucketBucket
aws s3gsutil

Database

Relational Database Service (RDS)Cloud SQL
DynamoDBCloud Datastore
ElastiCacheCloud Memorystore
Table (DynamoDB)Kind (Cloud Datastore)
Item (DynamoDB)Entity (Cloud Datastore)
Partition Key (DynamoDB)Key (Cloud Datastore)
Attributes (DynamoDB)Properties (Cloud Datastore)
Local Secondary Index (DynamoDB)Composite Index (Cloud Datastore)
Elastic Map Reduce (EMR)Cloud DataProc
AthenaBig Query
AWS GlueCloud DataFlow
Glue CatalogData Catalog
Amazon Simple Notification Service (SNS)Cloud PubSub (push subscription)
Amazon KinesisCloud PubSub
Amazon Simple Queue Service (SQS)Cloud PubSub (poll and pull mode)

Networking & Content Delivery

Virtual Private Cloud (VPC) (Regional)VPC Network (Global or Regional)
Subnet (Zonal)Subnet (Regional)
Route TablesRoutes
Network ACLs (NACLS)VPC Firewall Rules (ALLOW or DENY)
CloudFrontCloud CDN
Route 53Cloud DNS/Google Domains
Direct ConnectDedicated (or Partner) Interconnect
Virtual Private Network (VPN)Cloud VPN
AWS PrivateLinkGoogle Private Access
NAT GatewayCloud NAT
Elastic Load BalancerLoad Balancer
AWS WAFCloud Armour
VPC Peering ConnectionVPC Network Peering
Amazon API GatewayApigee API Gateway
Amazon API GatewayCloud Endpoints

Security, Identity, & Compliance

Root AccountSuper Admin
IAM UserMember
IAM PolicyRole (Collection of Permissions)
IAM Policy AttachmentIAM Role Binding (or IAM Binding)
Key Management Service (KMS)Cloud KMS
CloudHSMCloud HSM
Amazon Inspector (agent based)Cloud Security Scanner (scan based)
AWS Security HubCloud Security Command Center (SCC)
Secrets ManagerSecret Manager
Amazon MacieCloud Data Loss Prevention (DLP)
AWS WAFCloud Armour
AWS ShieldCloud Armour

† No direct equivalent, this is the closest equivalent

Google Cloud Storage Object Notifications using Slack

This article describes the steps to integrate Slack with Google Cloud Functions to get notified about object events within a specified Google Cloud Storage bucket.

Google Cloud Storage Object Notifications using Slack

Events could include the creation of new objects, as well as delete, archive or metadata operations performed on a given bucket.

This pattern could be easily extended to other event sources supported by Cloud Functions including:

  • Cloud Pub/Sub messages
  • Cloud Firestore and Firebase events
  • Stackdriver log entries

More information can be found at https://cloud.google.com/functions/docs/concepts/events-triggers.

The prerequisite steps to configure Slack are provided here:

1. First you will need to create a Slack app (assuming you have already set up an account and a workspace). The following screenshots demonstrate this process:

Create a Slack app
Give the app a name and associate it with an existing Slack workspace

2. Next you need to Enable and Activate Incoming Webhooks to your app and add this to your workspace. The following screenshots demonstrate this process:

Enable Incoming Web Hooks for the app
Activate incoming webhooks
Add the webhook to your workspace

3. Next you need to specify a channel for notifications generated from object events.

Select a channel for the webhook

4. Now you need to copy the Webhook url provided, you will use this later in your Cloud Function.

Copy the webhook URL to the clipboard

Treat your webhook url as a secret, do not upload this to a public source code repository

Next you need to create your Cloud Function, this example uses Python but you can use an alternative runtime including Node.js or Go.

This example templates the source code using the Terraform template_file data source. The function source code is shown here:

Within your Terraform code you need to render your Cloud Function code substituting the slack_webhook_url for it’s value which you will supply as a Terraform variable. The rendered template file is then placed in a local directory along with a requirements.txt file and zipped up. The resulting Zip archive is uploaded to a specified bucket where it will be sourced to create the Cloud Function.

Now you need to create the Cloud Function, the following HCL snippet demonstrates this:

The event_trigger block in particular specifies which GCS bucket to watch and what events will trigger invocation of the function. Bucket events include:

  • google.storage.object.finalize (the creation of a new object)
  • google.storage.object.delete
  • google.storage.object.archive
  • google.storage.object.metadataUpdate

You could add additional logic to the Cloud Function code to look for specific object names or naming patterns, but keep in mind the function will fire upon every event matching the event_type and resource criteria.

To deploy the function, you would simply run:

terraform apply -var="slack_webhook_url=https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXX/XXXXXXXXXXXXXXXXXXXXXXXX"

Now once you upload a file named test-object.txt, voilà!:

Slack notification for a new object created

Full source code is available at: https://github.com/gamma-data/gcs-object-notifications-using-slack

Map Reduce is Dead, Long Live Map Reduce

Firstly, this is not another Hadoop obituary, there are enough of those out there already.

The generalized title of this article has been used as an expression to convey the idea that something old has been replaced by something new. In the case of the expression “the King is dead, long live the King” the inference is that although one monarch has passed, another monarch instantly succeeds him.

In the age of instant gratification and hype cycle driven ‘pump and dump’ investment we are very quick to discard technologies that don’t realise overzealous targets for sales or market share. In our continuous attempts to find the next big thing, we are quick to throw out the last big thing and everything associated with it.

The Reports of My Death Have Been Greatly Exaggerated

A classic example of this is the notion that Map Reduce is dead. Largely proliferated by the Hadoop obituaries which seem to be growing exponentially with each day.

A common e-myth is that Google invented the Map Reduce pattern, which is completely incorrect. In 2004, Google described a framework distributed systems implementation of the Map Reduce pattern in a white paper named “MapReduce: Simplified Data Processing on Large Clusters.” – this would inspire the first-generation processing framework (MapReduce) in the Hadoop project. But neither Google nor Yahoo! nor contributors to the Hadoop project (which include the pure play vendors) created the Map Reduce algorithm or processing pattern and neither shall any one of these have the rights to kill it.

The origins of the Map Reduce pattern can be traced all the way back to the early foundations of functional programming beginning with Lambda Calculus in the 1930s to LISP in the 1960s. Map Reduce is an integral pattern in all of today’s functional and distributed systems programming. You only need to look at the support for map() and reduce() operators in some of the most popular languages today including Python, JavaScript, Scala, and many more languages that support functional programming.

As far as distributed processing frameworks go, the Map Reduce pattern and its map() and reduce() methods are very prominent as higher order functions in APIs such as Spark, Kafka Streams, Apache Samza and Apache Flink to name a few.

While the initial Hadoop adaptation of Map Reduce has been supplanted by superior approaches, the Map Reduce processing pattern is far from dead.

On the fall of Hadoop…

There is so much hysteria around the fall of Hadoop, we need to be careful not to toss the baby out with the bath water. Hadoop served a significant role in bringing open source, distributed systems from search engine providers to academia all the way to the mainstream, and still serves an important purpose in many organizations data ecosystems today and will continue to do so for some time.

OK, it wasn’t the panacea to everything, but who said it was supposed to be? The Hadoop movement was hijacked by hysteria, hype, venture capital, over ambitious sales targets and financial engineering – this does not mean the technology was bad.

Hadoop spawned many significant related projects such as Spark, Kafka and Presto to name a few. These projects paved the way for cloud integration, which is now the dominant vector for data storage, processing, and analysis.

While the quest for world domination by the Hadoop pure play vendors may be over, the Hadoop movement (and the impact it has had on the enterprise data landscape) will live on.

Change Data Capture at Scale using Spark

Change Data Capture (CDC) is one of the most challenging processing patterns to implement at scale. I personally have had several cracks at this using various different frameworks and approaches, the most recent of which was implemented using Spark – and I think I have finally found the best approach. Even though the code examples referenced use Spark, the pattern is language agnostic – the focus is on the approach not the specific implementation (as this could be applied to any framework or runtime).

The first challenge you are faced with, is to compare a very large dataset (representing the current state of an object) with another potentially very large dataset (representing new or incoming data). Ideally, you would like the process to be configuration driven and accommodate such things as composite primary keys, or operational columns which you would like to restrict from change detection. You may also want to implement a pattern to segregate sensitive attributes from non-sensitive attributes.

Overview

This pattern (and all my other recent attempts) is fundamentally based upon calculating a deterministic hash of the key and non-key attribute(s), and then using this hash as the basis for comparison. The difference between this pattern and my other attempts is in the distillation and reconstitution of data during the process, as well as breaking the pattern into discrete stages (designed to minimize the impact to other applications). This pattern can be used to process delta or full datasets.

A high-level flowchart representing the basic pattern is shown here:

CDC Flowchart

The Example

The example provided uses the Synthetic CDC Data Generator application, configuring an incoming set with 5 uuid columns acting as a composite key, and 10 random number columns acting as non key values. The initial days payload consists of 10,000 records, the subsequent days payload consists of another 10,000 records. From the initial dataset, a DELETE operation was performed at the source system for 20% of records, an UPDATE was performed on 40% of the records and the remaining 40% of records were unchanged. In this case the 20% of records that were deleted at the source, were replaced by new INSERT operations creating new keys.

After creating the synthesized day 1 and day 2 datasets, the files are processed as follows:

$ spark-submit cdc.py config.yaml data/day1 2019-06-18
$ spark-submit cdc.py config.yaml data/day2 2019-06-19

Where config.yaml is the configuration for the dataset, data/day1 and data/day2 represent the different data files, and 2019-06-18 and 2019-06-19 represent a business effective date.

The Results

You should see the following output from running the preceding commands for day 1 and day 2 respectively:

Day 1:

Day 2:

A summary analysis of the resultant dataset should show:

Pattern Details

Details about the pattern and its implementation follow.

Current and Historical Datasets

The output of each operation will yield a current dataset (that is the current stateful representation of a give object) and a historical dataset partition (capturing the net changes from the previous state in an appended partition).

This is useful, because often consumers will primarily query the latest state of an object. The change sets (or historical dataset partitions) can be used for more advanced analysis by sophisticated users.

Type 2 SCDs (sort of)

Two operational columns are added to each current and historical object:

  • OPERATION : Represents the last known operation to the record, valid values include :
    • I (INSERT)
    • U (UPDATE)
    • D (DELETE – hard DELETEs, applies to full datasets only)
    • X (Not supplied, applies to delta processing only)
    • N (No change)
  • EFF_START_DATE

Since data structures on most big data or cloud storage platforms are immutable, we only store the effective start date for each record, this is changed as needed with each coarse-grained operation on the current object. The effective end date is inferred by the presence of a new effective start date (or change in the EFF_START_DATE value for a given record).

The Configuration

I am using a YAML document to store the configuration for the pattern. Important attributes to include in your configuration are a list of keys and non keys and their datatype (this implementation does type casting as well). Other important attributes include the table names and file paths for the current and historical data structures.

The configuration is read at the beginning of a routine as an input along with the path of an incoming data file (a CSV file in this case) and a business effective date (which will be used as the EFF_START_DATE for new or updated records).

Processing is performed using the specified key and non key attributes and the output datasets (current and historical) are written to columnar storage files (parquet in this case). This is designed to make subsequent access and processing more efficient.

The Algorithm

I have broken the process into stages as follows:

Stage 1 – Type Cast and Hash Incoming Data

The first step is to create deterministic hashes of the configured key and non key values for incoming data. The hashes are calculated based upon a list of elements representing the key and non key values using the MD5 algorithm. The hashes for each record are then stored with the respective record. Furthermore, the fields are casted their target datatype as specified in the configuration. Both of these operations can be performed in a single pass of each row using a map() operation.

Importantly we only calculate hashes once upon arrival of new data, as the hashes are persisted for the life of the data – and the data structures are immutable – the hashes should never change or be invalidated.

Stage 2 – Determine INSERTs

We now compare Incoming Hashes with previously calculated hash values for the (previous day’s) current object. If no current object exists for the dataset, then it can be assumed this is a first run. In this case every record is considered as an INSERT with an EFF_START_DATE of the business effective date supplied.

If there is a current object, then the key and non key hash values (only the hash values) are read from the current object. These are then compared to the respective hashes of the incoming data (which should still be in memory).

Given the full outer join:

incoming_data(keyhash, nonkeyhash) 
FULL OUTER JOIN  
current_data(keyhash, nonkeyhash) 
ON keyhash

Keys which exist in the left entity which do not exist in the right entity must be the results of an INSERT operation.

Tag these records with an operation of I with an EFF_START_DATE of the business effective date, then rejoin only these records with their full attribute payload from the incoming dataset. Finally, write out these records to the current and historical partition in overwrite mode.

Stage 3 – Determine DELETEs or Missing Records

Referring the previous full outer join operation, keys which exist in the right entity (current object) which do not appear in the left entity (incoming data) will be the result of a (hard) DELETE operation if you are processing full snapshots, otherwise if you are processing deltas these would be missing records (possibly because there were no changes at the source).

Tag these records as D or X respectively with an EFF_START_DATE of the business effective date, rejoin these records with their full attribute payload from the current dataset, then write out these records to the current and historical partition in append mode.

Stage 4 – Determine UPDATEs or Unchanged Records

Again, referring to the previous full outer join, keys which exist in both the incoming and current datasets must be either the result of an UPDATE or they could be unchanged. To determine which case they fall under, compare the non key hashes. If the non key hashes differ, it must have been a result of an UPDATE operation at the source, otherwise the record would be unchanged.

Tag these records as U or N respectively with an EFF_START_DATE of the business effective date (in the case of an update – otherwise maintain the current EFF_START_DATE), rejoin these records with their full attribute payload from the incoming dataset, then write out these records to the current and historical partition in append mode.

Key Callouts

A summary of the key callouts from this pattern are:

  • Use the RDD API for iterative record operations (such as type casting and hashing)
  • Persist hashes with the records
  • Use Dataframes for JOIN operations
  • Only perform JOINs with the keyhash and nonkeyhash columns – this minimizes the amount of data shuffled across the network
  • Write output data in columnar (Parquet) format
  • Break the routine into stages, covering each operation, culminating with a saveAsParquet() action – this may seem expensive but for large datsets it is more efficient to break down DAGs for each operation
  • Use caching for objects which will be reused between actions

Metastore Integration

Although I did not include this in my example, you could easily integrate this pattern with a metastore (such as a Hive metastore or AWS Glue Catalog), by using table objects and ALTER TABLE statements to add historical partitions.

Further optimisations

If the incoming data is known to be relatively small (in the case of delta processing for instance), you could consider a broadcast join where the smaller incoming data is distributed to all of the different Executors hosting partitions from the current dataset.

Also you could add a key to the column config to configure a column to be nullable or not.

Happy CDCing!

Full source code for this article can be found at: https://github.com/avensolutions/cdc-at-scale-using-spark

Synthetic CDC Data Generator

This is a simple routine to generate random data with a configurable number or records, key fields and non key fields to be used to create synthetic data for source change data capture (CDC) processing. The output includes an initial directory containing CSV files representing an initial data load, and an incremental directory containing CSV files representing incremental data.

Arguments (by position) include:

  • no_init_recs : the number of initial records to generate
  • no_incr_recs : the number of incremental records on the second run – should be >= no_init_recs
  • no_keys : number of key columns in the dataset – keys are generated as UUIDs
  • no_nonkeys : number of non-key columns in the dataset – nonkey values are generated as random numbers
  • pct_del : percentage of initial records deleted on the second run – between 0.0 and 1.0
  • pct_upd : percentage of initial records updated on the second run – between 0.0 and 1.0
  • pct_unchanged : percentage of records unchanged on the second run – between 0.0 and 1.0
  • initial_output : folder for initial output in CSV format
  • incremental_output : folder for incremental output in CSV format

NOTE : pct_del + pct_upd + pct_unchanged must equal 1.0

Example usage:

$ spark-submit synthetic-cdc-data-generator.py 100000 100000 2 3 0.2 0.4 0.4 data/day1 data/day2

Example output from the day1 run for the above configuration would look like this:

Note that this routine can be run subsequent times producing different key and non key values each time, as the keys are UUIDs and the values are random numbers.

We will use this application to generate random input data to demonstrate CDC using Spark in a subsequent post, see you soon!

Full source code can be found at: https://github.com/avensolutions/synthetic-cdc-data-generator