So you’re using BigQuery (BQ). It’s all set up and humming perfectly. Maybe now, you want to run an ELT job whenever a new table partition is created, or maybe you want to retrain your ML model whenever new rows are inserted into the BQ table.
In my previous article on Eventarc, we went through how Cloud Logging can help create eventing-type functionality in your application. Let’s take it a step further and walk through how we can couple BigQuery and Cloud Run.
In this article you will learn how to
Tie together BigQuery and Cloud Run
Use BigQuery’s audit log to trigger Cloud Run
With those triggers, run your required code
Let’s create a temporary dataset within BigQuery named tmp_bq_to_cr.
In that same dataset, let’s create a table in which we will insert some rows to test our BQ audit log. Let’s grab some rows from a BQ public dataset to create this table:
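CREATE OR REPLACE TABLE tmp_bq_to_cr.cloud_run_trigger AS
SELECT
  date, country_name, new_persons_vaccinated, population
-- assumed source: the COVID-19 Open Data public dataset
FROM `bigquery-public-data.covid19_open_data.covid19_open_data`
WHERE
  date > '2021-05-31'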
Following this, let’s run an insert query that will help us build our mock database trigger:
INSERT INTO tmp_bq_to_cr.cloud_run_trigger
VALUES('2021-06-18', 'Australia', 3, 1000)
Now, in another browser tab let’s navigate to BQ Audit Events and look for our INSERT INTO event:
There will be several audit logs for any given BQ action. Only after a query is parsed does BQ know which table we want to interact with, so the initial log will, for example, not have the table name.
We don’t want any old audit log, so we need to ensure we look for a unique set of attributes that clearly identify our action, such as in the diagram above.
In the case of inserting rows, the attributes are a combination of:
The method is google.cloud.bigquery.v2.JobService.InsertJob
The name of the table being inserted to is the protoPayload.resourceName
The dataset id is available as resource.labels.dataset_id
The number of inserted rows is protoPayload.metadata.tableDataChange.insertedRowsCount
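To make the matching rule above concrete, here is a minimal sketch of a predicate over one audit-log entry. It assumes the entry has already been decoded into a Python dict shaped like the Cloud Logging JSON, using the four attributes listed above; the function name `is_target_insert` and the sample entry are hypothetical.

```python
def is_target_insert(entry: dict, dataset: str = "tmp_bq_to_cr",
                     table: str = "cloud_run_trigger") -> bool:
    """Return True only for audit-log entries describing rows inserted into dataset.table."""
    payload = entry.get("protoPayload", {})
    labels = entry.get("resource", {}).get("labels", {})
    if payload.get("methodName") != "google.cloud.bigquery.v2.JobService.InsertJob":
        return False
    if labels.get("dataset_id") != dataset:
        return False
    if not payload.get("resourceName", "").endswith(f"tables/{table}"):
        return False
    change = payload.get("metadata", {}).get("tableDataChange", {})
    # int64 counters are serialized as strings in the JSON form of a log entry
    return int(change.get("insertedRowsCount", 0)) > 0


sample = {
    "resource": {"labels": {"dataset_id": "tmp_bq_to_cr", "project_id": "my-project"}},
    "protoPayload": {
        "methodName": "google.cloud.bigquery.v2.JobService.InsertJob",
        "resourceName": "projects/my-project/datasets/tmp_bq_to_cr/tables/cloud_run_trigger",
        "metadata": {"tableDataChange": {"insertedRowsCount": "1"}},
    },
}
print(is_target_insert(sample))  # True
```

Checking every attribute, rather than just the method name, is what filters out the earlier, table-less log entries for the same job.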
Time for some code
Now that we’ve identified the payload that we’re looking for, we can write the action for Cloud Run. We’ve picked Python and Flask to help us in this instance (the full code is on GitHub).
First, let’s filter out the noise and find the event we want to process:
from flask import Flask, request

app = Flask(__name__)


@app.route('/', methods=['POST'])
def index():
    # Gets the Payload data from the Audit Log
    content = request.json
    try:
        ds = content['resource']['labels']['dataset_id']
        proj = content['resource']['labels']['project_id']
        tbl = content['protoPayload']['resourceName']
        rows = int(content['protoPayload']['metadata']
                   ['tableDataChange']['insertedRowsCount'])
        if ds == 'tmp_bq_to_cr' and \
           tbl.endswith('tables/cloud_run_trigger') and rows > 0:
            query = create_agg()
            return "table created", 200
    except (KeyError, TypeError, ValueError):
        # if these fields are not in the JSON, ignore
        pass
    return "ok", 200
Now that we’ve found the event we want, let’s execute the action we need. In this example, we’ll aggregate and write out to a new table created_by_trigger:
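from google.cloud import bigquery

def create_agg():
    client = bigquery.Client()
    query = """
    CREATE OR REPLACE TABLE tmp_bq_to_cr.created_by_trigger AS
    SELECT country_name, SUM(new_persons_vaccinated) AS n
    FROM tmp_bq_to_cr.cloud_run_trigger GROUP BY country_name
    """
    client.query(query).result()  # run the job and wait for it to finish
    return query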
The Dockerfile for the container is simply a basic Python container into which we install Flask and the BigQuery client library.
Metadata Hub (MDH) is intended to be the source of truth for metadata around the Company’s platform. It can load metadata configuration from YAML and serve that information via an API. It will also store pipeline information while files are being ingested into the platform.
Config-Driven. Anyone who has been authorized to do so should be able to add another ‘table-info.yaml’ into MDH without needing to update any code in the system.
Here’s how table information makes its way into MDH:
summary: All tables in MDH
description: get the title of all tables that exist in MDH
summary: Creates a new table in MDH
description: Creates a new table in MDH
summary: Obtain information about specific table
summary: All columns for a particular table
description: Obtain information on columns for a particular table
summary: All information about a particular end-to-end batch run of file ingestion
summary: Update metadata on a batch load
description: Update metadata on a batch load
summary: Use this to save on calculation of business days.
description: This base response gives you today’s date in a string
summary: Will return a string of the previous business day
description: Will return a string of the previous business day, based on the date on which it is called
summary: Will return a string of the next business day
description: Will return a string of the next business day, based on the date on which it is called
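The business-day endpoints above imply some simple date arithmetic. A minimal sketch follows, assuming only weekends are skipped (no public-holiday calendar) and that results are ISO-formatted date strings; the function names are hypothetical and not taken from MDH.

```python
from datetime import date, timedelta


def previous_business_day(today: date) -> str:
    """Step back one day at a time until we land on Monday to Friday."""
    day = today - timedelta(days=1)
    while day.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        day -= timedelta(days=1)
    return day.isoformat()


def next_business_day(today: date) -> str:
    """Step forward one day at a time until we land on Monday to Friday."""
    day = today + timedelta(days=1)
    while day.weekday() >= 5:
        day += timedelta(days=1)
    return day.isoformat()


print(previous_business_day(date(2021, 6, 21)))  # Monday -> 2021-06-18
print(next_business_day(date(2021, 6, 18)))      # Friday -> 2021-06-21
```

An endpoint would typically call these with `date.today()`; adding a holiday set to the `while` conditions is the natural extension.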
Yaml to Datastore – Entity/Kind design
Before we jump right into Entity Groups in Datastore, it is important to first go over the basics and establish a common vocabulary. Datastore holds entities: objects that can contain various key/value pairs, called properties. Each entity must have a unique identifier, known as a key. When creating an entity, a user can either specify a custom key or let Datastore create one. A custom key contains two fields: a kind, which represents a category such as ‘Toy’ or ‘Marital Status’, and a name, which is the identifying value. If a user specifies only a kind when creating a key, without a unique identifier, Datastore automatically generates a numeric ID behind the scenes. Below is an example of a Python 3 script that illustrates this identifier concept.
from google.cloud import datastore

client = datastore.Client()

# Custom key: kind="table" with a unique name of "broker"
custom_key_entry = datastore.Entity(client.key("table", "broker"))
client.put(custom_key_entry)

# Only specify kind="table" and let Datastore generate a unique numeric ID
datastore_gen_key_entry = datastore.Entity(client.key("table"))
client.put(datastore_gen_key_entry)
In your GCP Console under Datastore, you will then see your two entities of kind “table”. One will contain your custom key and one will contain the automatically generated key.
Ancestors and Entity Groups
For highly related or hierarchical data, Datastore allows entities to be stored in a parent/child relationship. This is known as an entity group or ancestor/descendant relationship.
This is an example of an entity group with three kinds: table, column, and classification. The ‘grandparent’ in this relationship is the ‘table’. To configure this, one must first create the table entity. Then, a user can create a column and specify a table key as its parent. To create the grandchild, a user then creates a classification and sets its parent to be a column key. To add customizable attributes, a user can specify additional key-value pairs such as pii and data_type; these key-value pairs are stored as properties. We model this diagram in Datastore in our working example below.
One can create entity groups by setting the ‘parent’ parameter when creating the key for a child entity. This makes the parent key part of the child entity’s key: the child’s key path is represented as a tuple in which the parent’s key is the prefix, followed by the child’s own kind and unique identifier. For example, following the diagram above:
Printing the variable table_key will display: ("table", "broker","column", "broker_legal_name")
Datastore also supports chaining of parents, which can lead to very large keys for descendants with a long lineage of ancestors. Additionally, parents can have multiple children (a one-to-many relationship). However, there is no native support for entities with multiple parents (a many-to-many relationship). Once you have configured this ancestral hierarchy, it is easy to retrieve all descendants of a given parent: query with the parent key passed as the ‘ancestor’ parameter. For example, given the table key created above, I can query for all of the columns that belong to that table.
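Datastore builds these key paths for you; the stdlib-only sketch below simply models the idea so the prefix relationship is easy to see. It is not the google-cloud-datastore API: keys are plain tuples, a child key is its parent's path plus its own (kind, name) pair, and an ancestor query keeps keys of the requested kind whose path starts with the ancestor's path. The helper names `make_key` and `ancestor_query` and the sample "pii_flag" and "trade" entities are hypothetical.

```python
from typing import List, Optional, Tuple

Key = Tuple[str, ...]


def make_key(kind: str, name: str, parent: Optional[Key] = None) -> Key:
    """A child key path is its parent's path followed by its own (kind, name)."""
    return (parent or ()) + (kind, name)


def ancestor_query(keys: List[Key], kind: str, ancestor: Key) -> List[Key]:
    """Keys of the given kind whose path begins with the ancestor's path."""
    return [k for k in keys if k[-2] == kind and k[:len(ancestor)] == ancestor]


table_key = make_key("table", "broker")
column_key = make_key("column", "broker_legal_name", parent=table_key)
classification_key = make_key("classification", "pii_flag", parent=column_key)
other_column = make_key("column", "id", parent=make_key("table", "trade"))

print(column_key)  # ('table', 'broker', 'column', 'broker_legal_name')

all_keys = [table_key, column_key, classification_key, other_column]
print(ancestor_query(all_keys, "column", table_key))
# [('table', 'broker', 'column', 'broker_legal_name')]
```

Because the ancestor is a prefix of every descendant's path, the lookup never needs a separate "parent" column, which is also why long lineages produce long keys.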
As per our Key Philosophies – Config-Driven – anyone should be able to add a new table to be processed and landed in a target-table somewhere within MDH with our yaml syntax. Below is a full working python3 example of the table/column/classification hierarchical model described above.
Now that we have the column entity, let’s locate its parent ID.
# fetch() returns an iterator of entities; take the first match
column = list(query1.fetch())[0]
print("This column belongs to: " +str(column.key.parent.id_or_name))
Further to this, we can also get all data classification elements attributed to a single column using the ancestor clause query.
query2 = datastore_client.query(kind="classification", ancestor=column.key)
for classification in list(query2.fetch()):
    print(classification)
For more complex queries, Datastore relies on indexes, usually configured via its index.yaml file. The following is an example of an index.yaml file:
indexes:
- kind: Cat
  properties:
  - name: name
  - name: age
- kind: Cat
  properties:
  - name: name
  - name: whiskers
- kind: Store
  properties:
  - name: business
  - name: owner
Indexes are important when a query combines filters on more than one attribute of a Datastore entity (here, an ancestor filter plus an inequality filter on pii). For example, the following code will fail:
# Adding a '>' filter will cause this to fail without a composite index.
# Side note: it works without one if you use an '=' filter instead.
query2 = datastore_client.query(kind="classification", ancestor=column.key)
query2.add_filter("pii", ">", 0)
for classification in list(query2.fetch()):
    print(classification)
To rectify this issue, you need to create an index.yaml that looks like the following:
The official PyPI package for google-cloud-datastore can be found here: https://pypi.org/project/google-cloud-datastore/. At the time of writing, Firestore in Datastore mode will be the way forward, as per the release note from January 31, 2019.
Cloud Firestore is now Generally Available. Cloud Firestore is the new version of Cloud Datastore and includes a backwards-compatible Datastore mode.
If you intend to use the Cloud Datastore API in a new project, use Cloud Firestore in Datastore mode. Existing Cloud Datastore databases will be automatically upgraded to Cloud Firestore in Datastore mode.
Except where noted, the Cloud Datastore documentation now describes behavior for Cloud Firestore in Datastore mode.
We’ve purposefully created MDH in Datastore to show you how it was done originally, and we’ll be migrating the Datastore code to Firestore in an upcoming post.
Creating and deleting indexes within Datastore needs to be done through the REST API via googleapiclient.discovery, as this functionality doesn’t exist in the google-cloud-datastore API. Working with the Discovery API client can be a bit daunting for a first-time user, so here’s the code to add an index on Datastore:
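As a hedged sketch rather than the original code: the request body for the Datastore Admin API `projects.indexes.create` method is an Index resource with a `kind`, an `ancestor` mode (`NONE` or `ALL_ANCESTORS`) and a list of `properties`, each with a `name` and a `direction`. The helper below only builds that body (the name `build_index_body` is hypothetical); the commented lines show how it would be passed to a client from `googleapiclient.discovery.build`, assuming `credentials` and `project_id` already exist.

```python
import json
from typing import Dict, List, Tuple


def build_index_body(kind: str, properties: List[Tuple[str, str]],
                     ancestor: bool = False) -> Dict:
    """Build an Index resource body for the Datastore Admin API (projects.indexes.create)."""
    return {
        "kind": kind,
        "ancestor": "ALL_ANCESTORS" if ancestor else "NONE",
        "properties": [
            {"name": name, "direction": direction} for name, direction in properties
        ],
    }


# Index for the failing query above: ancestor filter plus an inequality on "pii".
body = build_index_body("classification", [("pii", "ASCENDING")], ancestor=True)
print(json.dumps(body, indent=2))

# Sending it (requires google-api-python-client and valid credentials):
# from googleapiclient.discovery import build
# service = build("datastore", "v1", credentials=credentials)
# operation = service.projects().indexes().create(projectId=project_id, body=body).execute()
```

Keeping the body construction separate from the API call makes it easy to generate index definitions from MDH's own YAML config and test them offline.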
How did we craft this API request? We can use the Google API Discovery Service to build client libraries, IDE plugins, and other tools that interact with Google APIs. The Discovery API provides a list of Google APIs and a machine-readable "Discovery Document" for each API. Features of the Discovery API:
A directory of supported API schemas based on JSON Schema.
A machine-readable "Discovery Document" for each of the supported APIs. Each document contains:
A list of API methods and available parameters for each method.
A list of available OAuth 2.0 scopes.
Inline documentation of methods, parameters, and available parameter values.
Navigating to the API reference page for Datastore and going to the ‘Datastore Admin’ API page, we can see references to Indexes and the RESTful endpoints we can hit for them. From there, we can find the link to the Discovery document for Datastore:
From this, we can build out our instantiation for the google api discovery object
from googleapiclient.discovery import build
service = build('datastore', 'v1', credentials=credentials)
With respect to building out the body aspect of the request, I’ve found crafting that part within the ‘Try this API’ section of https://cloud.google.com/datastore/docs/reference/admin/rest/v1/projects.indexes/create pretty valuable.
With this code, your index should show up in your Datastore console! You can also retrieve them within gcloud with gcloud datastore indexes list if you’d like to verify the indexes outside our Python code. So there you have it: a working example of entity groups, ancestors, indexes and Metadata within Datastore. Have fun coding!
Snowflake allows roles to be assigned to other roles, so when a user is assigned to a role, they may inherit the ability to use countless other roles.
Challenge: recursively enumerate all roles for a given user
One solution would be to create a complex query on the "SNOWFLAKE"."ACCOUNT_USAGE"."GRANTS_TO_ROLES" object.
An easier solution is to use a stored procedure to recurse through grants for a given user and return an ARRAY of roles for that user.
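The recursion the stored procedure performs can be sketched in Python as a graph walk; this is an illustration, not the author's procedure. It assumes the role-to-role grants have already been read (for example from GRANTS_TO_ROLES) into (grantee_role, granted_role) pairs and that the user's directly granted roles are known; the function name and sample roles are hypothetical. An explicit stack and a visited set keep it safe for deep hierarchies.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def effective_roles(direct_roles: Iterable[str],
                    role_grants: Iterable[Tuple[str, str]]) -> List[str]:
    """Every role reachable from a user's directly granted roles.

    role_grants holds (grantee_role, granted_role) pairs: grantee_role inherits granted_role.
    """
    children: Dict[str, Set[str]] = defaultdict(set)
    for grantee, granted in role_grants:
        children[grantee].add(granted)

    seen: Set[str] = set()
    stack = list(direct_roles)
    while stack:  # iterative depth-first walk; 'seen' prevents revisiting a role
        role = stack.pop()
        if role in seen:
            continue
        seen.add(role)
        stack.extend(children[role] - seen)
    return sorted(seen)


grants = [
    ("ANALYST", "REPORTING_READ"),
    ("REPORTING_READ", "PUBLIC"),
    ("ENGINEER", "ANALYST"),
]
print(effective_roles(["ENGINEER"], grants))
# ['ANALYST', 'ENGINEER', 'PUBLIC', 'REPORTING_READ']
```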
To call the stored proc, execute:
One drawback of stored procedures in Snowflake is that they can only have scalar or array return types and cannot be used directly in a SQL query. However, you can use the table(result_scan(last_query_id())) trick to get around this, as shown below, where we pivot the ARRAY into a record set with the array elements as rows:
This query must be the next statement run immediately after the CALL statement and cannot be run again until you run another CALL statement.
This is a follow up to the original Cloud Bigtable primer where we discussed the basics of Cloud Bigtable:
In this article we will cover schema design and row key selection in Bigtable – arguably the most critical design decision to make when employing Bigtable in a cloud data architecture.
Recall from the previous post, where the Bigtable data model was introduced, that tables in Bigtable are composed of rows and columns, much like a table in any other RDBMS. Every row is uniquely identified by a row key, again akin to a primary key in an RDBMS table. But this is where the similarities end.
Unlike a table in an RDBMS, columns only ever exist when they are inserted, and NULLs are not stored. See the illustration below:
Row Key Selection
Data in Bigtable is distributed by row key. Rows are physically stored in tablets in lexicographic order of their row keys. Recall that row keys are your ONLY indexes to data in Bigtable.
As row keys are your only indexes to retrieve or update rows in Bigtable, row key design must take into consideration the access patterns for the data to be stored and served via Bigtable. Specifically, the following must be considered when designing a Bigtable application:
Search patterns (returning data for a specific entity)
Scan patterns (returning batches of data)
Queries that use the row key, a row prefix, or a row range are the most efficient. Queries that do not include a row key will typically scan GB or TB of data and would not be suitable for operational use cases.
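Because rows are stored in lexicographic key order, a prefix query is really a row-range read: from the prefix (inclusive) to the first key that no longer shares it (exclusive). The stdlib-only sketch below models that over a sorted Python list; it is not the Bigtable client API, and the helper names and sample keys are hypothetical. It also shows why a delimiter matters: without the trailing '#', the prefix cust1 also matches cust10.

```python
from bisect import bisect_left
from typing import List


def prefix_end(prefix: bytes) -> bytes:
    """Smallest key greater than every key that starts with prefix (exclusive end)."""
    trimmed = prefix.rstrip(b"\xff")
    if not trimmed:
        return b""  # empty end key means "scan to the end of the table"
    return trimmed[:-1] + bytes([trimmed[-1] + 1])


def prefix_scan(sorted_keys: List[bytes], prefix: bytes) -> List[bytes]:
    """Emulate a prefix scan as a row-range read over lexicographically sorted keys."""
    start = bisect_left(sorted_keys, prefix)
    end_key = prefix_end(prefix)
    end = bisect_left(sorted_keys, end_key) if end_key else len(sorted_keys)
    return sorted_keys[start:end]


rows = sorted([b"cust1#a", b"cust1#b", b"cust10#a", b"cust2#a"])
print(prefix_scan(rows, b"cust1#"))  # [b'cust1#a', b'cust1#b']
print(prefix_scan(rows, b"cust1"))   # [b'cust1#a', b'cust1#b', b'cust10#a']
```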
Row Key Performance
Row key performance will be biased towards your specific access patterns and application functional requirements. For example, if you are performing sequential reads or scan operations, sequential keys will perform best; however, their write performance will not be optimal. Conversely, random keys (such as a UUID) will perform best for writes but poorly for scans or sequential reads.
Adding salts (additional data) to keys, similar to the use of salts in cryptography, and promoting other fields to be part of a composite row key can help achieve a “Goldilocks” scenario for both reads and writes; see the diagram below:
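One common way to apply a salt, shown here as a sketch rather than a prescribed scheme: derive a small bucket number from a stable hash of the natural key and prefix it to the row key, so consecutive keys land in different key ranges. The bucket count, the CRC32 hash and the `<salt>#<key>` layout are all assumptions. The trade-off is on the read side: a scan over a natural-key prefix has to fan out to one prefix per bucket and merge the results.

```python
import zlib
from typing import List

NUM_SALT_BUCKETS = 4  # hypothetical; size it to how widely writes should spread


def salted_row_key(natural_key: str, buckets: int = NUM_SALT_BUCKETS) -> str:
    """Prefix the key with a deterministic bucket so sequential keys spread out."""
    # zlib.crc32 is stable across processes, unlike Python's built-in hash() for str
    salt = zlib.crc32(natural_key.encode("utf-8")) % buckets
    return f"{salt:02d}#{natural_key}"


def scan_prefixes(prefix: str, buckets: int = NUM_SALT_BUCKETS) -> List[str]:
    """A reader must issue one prefix scan per bucket and merge the results."""
    return [f"{b:02d}#{prefix}" for b in range(buckets)]


for ts in ("20210618T0900", "20210618T0901", "20210618T0902"):
    print(salted_row_key(f"sensor42#{ts}"))
print(scan_prefixes("sensor42#20210618"))
```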
Using Reverse Timestamps
Use reverse timestamps when your most common query is for the latest values. Typically you would append the reverse timestamp to the key so that related records stay grouped together. For instance, if you are storing events for a customer, a row key made of the customer ID with an appended reverse timestamp (for example <customer_id>#<reverse_ts>) lets you quickly serve the latest events for a customer in descending order: within each customer_id group, rows are sorted so that the most recent insert is at the top. A reverse timestamp can be generalised as:
Long.MAX_VALUE - System.currentTimeMillis()
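Here is the same idea in Python, as a sketch: 2**63 - 1 stands in for Java's Long.MAX_VALUE and time.time() for System.currentTimeMillis(). It assumes the <customer_id>#<reverse_ts> layout described above and zero-pads the reversed value to 19 digits (the width of Long.MAX_VALUE) so that string order in Bigtable matches numeric order; the function name is hypothetical.

```python
import time
from typing import Optional

JAVA_LONG_MAX = 2**63 - 1  # Long.MAX_VALUE


def reverse_ts_row_key(customer_id: str, epoch_millis: Optional[int] = None) -> str:
    """Build <customer_id>#<reverse_ts> so the newest event sorts first per customer."""
    if epoch_millis is None:
        epoch_millis = int(time.time() * 1000)  # System.currentTimeMillis() equivalent
    reverse_ts = JAVA_LONG_MAX - epoch_millis
    # Zero-pad so lexicographic (string) order equals numeric order
    return f"{customer_id}#{reverse_ts:019d}"


older = reverse_ts_row_key("cust42", 1_623_974_400_000)
newer = reverse_ts_row_key("cust42", 1_623_974_460_000)
print(sorted([older, newer])[0] == newer)  # True: the latest event comes first in a scan
```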
Schema Design Tips
Some general tips for good schema design using Bigtable are summarised below:
Group related data for more efficient reads using column families
Distribute data evenly for more efficient writes
Place identical values in the adjoining rows for more efficient compression using row keys
Following these tips should go a long way towards getting good performance from Bigtable.
Use the Key Visualizer to profile performance
Google provides a neat tool to visualize your row key distribution in Cloud Bigtable. You need to have at least 30 GB of data in your table to enable this feature.
The Key Visualizer is shown here:
The Key Visualizer will help you find and prevent hotspots, find rows with too much data and show if your key schema is balanced.
Bigtable is one of the original and best (massively) distributed NoSQL platforms available. Schema and moreover row key design play a massive part in ensuring low latency and query performance. Go forth and conquer with Cloud Bigtable!
I have been an avid Spark enthusiast since 2014 (the early days..). Spark has featured heavily in every project I have been involved with from data warehousing, ETL, feature extraction, advanced analytics to event processing and IoT applications. I like to think of it as a Swiss army knife for distributed processing.
Spark Training Courses from the AlphaZetta Academy
Curiously enough, the first project in some years that did not feature Apache Spark was a greenfield GCP project, which got me thinking… where does Spark fit into the GCP landscape?
Unlike the other major providers, which use Spark as the backbone of their managed distributed ETL services (for example AWS Glue or the Spark integration runtime option in Azure Data Factory), Google’s managed ETL solution is Cloud DataFlow. Cloud DataFlow, a managed Apache Beam service, does not use a Spark runtime (Beam does have a Spark Runner, but it is not an option when using Cloud DataFlow). So where does this leave Spark?
My summation is that although Spark is not a first-class citizen in GCP (as far as managed ETL), it is not a second-class citizen either. This article will discuss the various ways Spark clusters and applications can be deployed within the GCP ecosystem.
Quick Primer on Spark
Every Spark application contains several components regardless of deployment mode. The components in the Spark runtime architecture are:
the Driver
the Cluster Manager
the Executor(s), which run on worker nodes or Workers
Each component has a specific role in executing a Spark program and all of the Spark components run in Java virtual machines (JVMs).
Cluster Managers schedule and manage distributed resources (compute and memory) across the nodes of the cluster. Cluster Managers available for Spark include:
Spark on DataProc
This is perhaps the simplest and most integrated approach to using Spark in the GCP ecosystem.
DataProc is GCP’s managed Hadoop service (akin to AWS EMR or HDInsight on Azure) and uses Hadoop/YARN as the Cluster Manager. DataProc clusters can be deployed on a private network (a VPC using RFC1918 address space), support encryption at rest using Google-managed or customer-managed keys in KMS, support autoscaling and the use of preemptible workers, and can be deployed in an HA configuration.
Furthermore, DataProc clusters can enforce strong authentication using Kerberos which can be integrated into other directory services such as Active Directory through the use of cross realm trusts.
DataProc clusters can be deployed using the gcloud dataproc clusters create command or using IaC solutions such as Terraform. For this article I have included an example in the source code using the gcloud command to deploy a DataProc cluster on a private network which was created using Terraform.
The beauty of DataProc is its native integration into IAM and the GCP service plane. Having been a long-time user of AWS EMR, I have found that the usability and integration are in many ways superior in GCP DataProc. Let’s look at some examples…
IAM and IAP (TCP Forwarding)
DataProc is integrated into Cloud IAM using coarse-grained permissions such as dataproc.clusters.use and simplified IAM roles such as dataproc.editor or dataproc.admin. Members with bindings to these roles can perform tasks such as submitting jobs and creating workflow templates (which we will discuss shortly), as well as accessing instances such as the master node instance or other instances in the cluster using IAP (TCP Forwarding) without requiring a public IP address or a bastion host.
DataProc Jobs and Workflows
Spark jobs can be submitted using the console or via gcloud dataproc jobs submit as shown here:
Cluster logs are natively available in Stackdriver, and standard out from the Spark Driver is visible from the console as well as via gcloud commands.
Complex Workflows can be created by adding Jobs as Steps in Workflow Templates using the following command:
gcloud dataproc workflow-templates add-job spark
Optional Components and the Component Gateway
DataProc provides you with a Hadoop cluster, including YARN and HDFS, and a Spark runtime, which includes Spark SQL and SparkR. DataProc also supports several optional components, including Anaconda, Jupyter, Zeppelin, Druid, Presto, and more.
Web interfaces to some of these components as well as the management interfaces such as the Resource Manager UI or the Spark History Server UI can be accessed through the Component Gateway.
This is a Cloud IAM integrated gateway (much like IAP) which can allow access through an authenticated and authorized console session to web UIs in the cluster – without the need for SSH tunnels, additional firewall rules, bastion hosts, or public IPs. Very cool.
Links to the component UIs, as well as built-in UIs like the YARN Resource Manager UI, are available directly through the console.
Jupyter is a popular notebook application in the data science and analytics communities, used for reproducible research. DataProc’s Jupyter component provides a ready-made Spark application vector using PySpark. If you have also installed the Anaconda component, you will have access to the full complement of scientific and mathematical Python packages, such as Pandas and NumPy, which can be used in Jupyter notebooks as well. Using the Component Gateway, Jupyter notebooks can be accessed directly from the Google console as shown here:
From this example you can see that I accessed source data from a GCS bucket and used HDFS as local scratch space.
Furthermore, notebooks are automagically saved in your integrated Cloud Storage DataProc staging bucket and can be shared amongst analysts or accessed at a later time. These notebooks also persist beyond the lifespan of the cluster.
Next up we will look at deploying a Spark Standalone cluster on a GKE cluster, see you then!
Change Data Capture (CDC) is one of the most challenging processing patterns to implement at scale. I personally have had several cracks at this using various different frameworks and approaches, the most recent of which was implemented using Spark – and I think I have finally found the best approach. Even though the code examples referenced use Spark, the pattern is language agnostic – the focus is on the approach not the specific implementation (as this could be applied to any framework or runtime).
The first challenge you are faced with is to compare a very large dataset (representing the current state of an object) with another potentially very large dataset (representing new or incoming data). Ideally, you would like the process to be configuration driven and accommodate such things as composite primary keys, or operational columns which you would like to restrict from change detection. You may also want to implement a pattern to segregate sensitive attributes from non-sensitive attributes.
This pattern (and all my other recent attempts) is fundamentally based upon calculating a deterministic hash of the key and non-key attribute(s), and then using this hash as the basis for comparison. The difference between this pattern and my other attempts is in the distillation and reconstitution of data during the process, as well as breaking the pattern into discrete stages (designed to minimize the impact to other applications). This pattern can be used to process delta or full datasets.
A high-level flowchart representing the basic pattern is shown here:
The example provided uses the Synthetic CDC Data Generator application, configuring an incoming set with 5 uuid columns acting as a composite key and 10 random number columns acting as non key values. The initial day’s payload consists of 10,000 records, and the subsequent day’s payload consists of another 10,000 records. From the initial dataset, a DELETE operation was performed at the source system for 20% of records, an UPDATE was performed on 40% of the records, and the remaining 40% of records were unchanged. In this case the 20% of records that were deleted at the source were replaced by new INSERT operations creating new keys.
After creating the synthesized day 1 and day 2 datasets, the files are processed as follows:
Where config.yaml is the configuration for the dataset, data/day1 and data/day2 represent the different data files, and 2019-06-18 and 2019-06-19 represent a business effective date.
You should see the following output from running the preceding commands for day 1 and day 2 respectively:
A summary analysis of the resultant dataset should show:
Details about the pattern and its implementation follow.
Current and Historical Datasets
The output of each operation will yield a current dataset (that is, the current stateful representation of a given object) and a historical dataset partition (capturing the net changes from the previous state in an appended partition).
This is useful, because often consumers will primarily query the latest state of an object. The change sets (or historical dataset partitions) can be used for more advanced analysis by sophisticated users.
Type 2 SCDs (sort of)
Two operational columns are added to each current and historical object:
OPERATION: Represents the last known operation to the record; valid values include:
I (INSERT)
U (UPDATE)
D (DELETE – hard DELETEs, applies to full datasets only)
X (Not supplied, applies to delta processing only)
N (No change)
Since data structures on most big data or cloud storage platforms are immutable, we only store the effective start date (EFF_START_DATE) for each record; this is changed as needed with each coarse-grained operation on the current object. The effective end date is inferred from the presence of a new effective start date (a change in the EFF_START_DATE value for a given record).
I am using a YAML document to store the configuration for the pattern. Important attributes to include in your configuration are a list of keys and non keys and their datatype (this implementation does type casting as well). Other important attributes include the table names and file paths for the current and historical data structures.
The configuration is read at the beginning of a routine as an input along with the path of an incoming data file (a CSV file in this case) and a business effective date (which will be used as the EFF_START_DATE for new or updated records).
Processing is performed using the specified key and non key attributes and the output datasets (current and historical) are written to columnar storage files (parquet in this case). This is designed to make subsequent access and processing more efficient.
I have broken the process into stages as follows:
Stage 1 – Type Cast and Hash Incoming Data
The first step is to create deterministic hashes of the configured key and non key values for incoming data. The hashes are calculated with the MD5 algorithm over a list of elements representing the key and non key values, and are then stored with their respective records. Furthermore, the fields are cast to their target datatypes as specified in the configuration. Both of these operations can be performed in a single pass over each row using a map() operation.
Importantly we only calculate hashes once upon arrival of new data, as the hashes are persisted for the life of the data – and the data structures are immutable – the hashes should never change or be invalidated.
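Stage 1 can be sketched for a single record in plain Python. This is not the author's Spark code: it assumes a hypothetical config with lists of (name, type) pairs for keys and non keys, supports only str/int/float casts, and joins values with a control-character separator before hashing so that ("ab", "c") and ("a", "bc") cannot produce the same hash. MD5 is used here, as in the text, for change detection rather than security.

```python
import hashlib
from typing import Any, Dict, List

# Hypothetical config shape; the original keeps similar information in a YAML document.
CONFIG = {
    "keys": [("id", "str")],
    "non_keys": [("amount", "float"), ("qty", "int")],
}
CASTS = {"str": str, "int": int, "float": float}
SEP = "\x1f"  # ASCII unit separator between values


def md5_of(values: List[Any]) -> str:
    """MD5 over a delimiter-joined list of values; None is hashed as an empty string."""
    joined = SEP.join("" if v is None else str(v) for v in values)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


def cast_and_hash(raw: Dict[str, str], config=CONFIG) -> Dict[str, Any]:
    """Cast each configured field, then attach keyhash and nonkeyhash in one pass."""
    record: Dict[str, Any] = {}
    for name, typ in config["keys"] + config["non_keys"]:
        value = raw.get(name)
        record[name] = None if value in (None, "") else CASTS[typ](value)
    record["keyhash"] = md5_of([record[n] for n, _ in config["keys"]])
    record["nonkeyhash"] = md5_of([record[n] for n, _ in config["non_keys"]])
    return record


row = cast_and_hash({"id": "a1", "amount": "10.5", "qty": "3"})
print(row["amount"], row["qty"], len(row["keyhash"]))  # 10.5 3 32
```

Casting before hashing also normalizes formatting differences, so "10.50" and "10.5" produce the same non key hash. In Spark, a function like this is what you would pass to rdd.map().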
Stage 2 – Determine INSERTs
We now compare Incoming Hashes with previously calculated hash values for the (previous day’s) current object. If no current object exists for the dataset, then it can be assumed this is a first run. In this case every record is considered as an INSERT with an EFF_START_DATE of the business effective date supplied.
If there is a current object, then the key and non key hash values (only the hash values) are read from the current object. These are then compared to the respective hashes of the incoming data (which should still be in memory).
Given the full outer join:
incoming (left) FULL OUTER JOIN current (right) ON keyhash
Keys which exist in the left entity which do not exist in the right entity must be the results of an INSERT operation.
Tag these records with an operation of I with an EFF_START_DATE of the business effective date, then rejoin only these records with their full attribute payload from the incoming dataset. Finally, write out these records to the current and historical partition in overwrite mode.
Stage 3 – Determine DELETEs or Missing Records
Referring to the previous full outer join operation, keys which exist in the right entity (current object) but do not appear in the left entity (incoming data) will be the result of a (hard) DELETE operation if you are processing full snapshots; if you are processing deltas, these would instead be missing records (possibly because there were no changes at the source).
Tag these records as D or X respectively with an EFF_START_DATE of the business effective date, rejoin these records with their full attribute payload from the current dataset, then write out these records to the current and historical partition in append mode.
Stage 4 – Determine UPDATEs or Unchanged Records
Again, referring to the previous full outer join, keys which exist in both the incoming and current datasets must be either the result of an UPDATE or they could be unchanged. To determine which case they fall under, compare the non key hashes. If the non key hashes differ, it must have been a result of an UPDATE operation at the source, otherwise the record would be unchanged.
Tag these records as U or N respectively with an EFF_START_DATE of the business effective date (in the case of an update – otherwise maintain the current EFF_START_DATE), rejoin these records with their full attribute payload from the incoming dataset, then write out these records to the current and historical partition in append mode.
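The decisions in Stages 2 to 4 reduce to a few comparisons on hashes alone. The sketch below is a plain-Python stand-in for the Spark full outer join, not the author's code: it assumes the incoming side is a mapping of keyhash to nonkeyhash and the current side maps keyhash to (nonkeyhash, EFF_START_DATE); the function name and sample hashes are hypothetical. Rejoining the full attribute payloads and writing the current and historical partitions are left out.

```python
from typing import Dict, Tuple


def classify(incoming: Dict[str, str], current: Dict[str, Tuple[str, str]],
             business_date: str, full_snapshot: bool = True) -> Dict[str, Tuple[str, str]]:
    """Return {keyhash: (OPERATION, EFF_START_DATE)} for every key on either side.

    incoming: keyhash -> nonkeyhash for new data (left side of the join)
    current:  keyhash -> (nonkeyhash, EFF_START_DATE) for the current object (right side)
    """
    result = {}
    for keyhash in incoming.keys() | current.keys():  # emulates FULL OUTER JOIN on keyhash
        if keyhash not in current:
            result[keyhash] = ("I", business_date)          # Stage 2: INSERT
        elif keyhash not in incoming:
            op = "D" if full_snapshot else "X"              # Stage 3: DELETE or missing
            result[keyhash] = (op, business_date)
        elif incoming[keyhash] != current[keyhash][0]:
            result[keyhash] = ("U", business_date)          # Stage 4: UPDATE
        else:
            result[keyhash] = ("N", current[keyhash][1])    # Stage 4: unchanged, keep date
    return result


current = {"k1": ("h1", "2019-06-18"), "k2": ("h2", "2019-06-18"), "k3": ("h3", "2019-06-18")}
incoming = {"k1": "h1", "k2": "h2-changed", "k4": "h4"}
for k, v in sorted(classify(incoming, current, "2019-06-19").items()):
    print(k, v)
# k1 ('N', '2019-06-18')
# k2 ('U', '2019-06-19')
# k3 ('D', '2019-06-19')
# k4 ('I', '2019-06-19')
```

On a first run the current mapping is empty, so every incoming key falls into the INSERT branch, matching the behaviour described in Stage 2.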
The key callouts from this pattern are:
Use the RDD API for iterative record operations (such as type casting and hashing)
Persist hashes with the records
Use Dataframes for JOIN operations
Only perform JOINs with the keyhash and nonkeyhash columns – this minimizes the amount of data shuffled across the network
Write output data in columnar (Parquet) format
Break the routine into stages, covering each operation, culminating in a Parquet write action (for example DataFrame.write.parquet()). This may seem expensive, but for large datasets it is more efficient to break down DAGs for each operation
Use caching for objects which will be reused between actions
Although I did not include this in my example, you could easily integrate this pattern with a metastore (such as a Hive metastore or AWS Glue Catalog), by using table objects and ALTER TABLE statements to add historical partitions.
If the incoming data is known to be relatively small (in the case of delta processing for instance), you could consider a broadcast join where the smaller incoming data is distributed to all of the different Executors hosting partitions from the current dataset.
Also you could add a key to the column config to configure a column to be nullable or not.
This is a simple routine to generate random data with a configurable number of records, key fields and non-key fields, which can be used to create synthetic data for source change data capture (CDC) processing. The output includes an initial directory containing CSV files representing an initial data load, and an incremental directory containing CSV files representing incremental data.
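A minimal sketch of such a generator follows, using only the standard library. It is not the original routine: the column config format (including the `nullable` key suggested above), the file names, the 10% null rate and the shape of the incremental load (about 20% re-randomised existing rows plus about 10% new keys) are all assumptions for illustration.

```python
import csv
import os
import random
import string
import tempfile

# Hypothetical column config: name, key flag and optional nullable flag.
COLUMNS = [
    {"name": "id", "key": True},
    {"name": "region", "key": True},
    {"name": "name", "key": False, "nullable": False},
    {"name": "balance", "key": False, "nullable": True},
]

def random_value(col, rng):
    """Return a random 6-letter string, or None for nullable columns about 10% of the time."""
    if col.get("nullable") and rng.random() < 0.1:
        return None
    return "".join(rng.choices(string.ascii_uppercase, k=6))

def write_csv(path, rows):
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=[c["name"] for c in COLUMNS])
        writer.writeheader()
        writer.writerows(rows)  # None is written as an empty field

def generate(out_dir, num_records, seed=42):
    """Write initial/ and incremental/ CSV files; return (initial_rows, incremental_rows)."""
    rng = random.Random(seed)
    key_cols = [c for c in COLUMNS if c["key"]]
    non_key_cols = [c for c in COLUMNS if not c["key"]]

    def make_row(i):
        row = {c["name"]: f"{c['name']}_{i}" for c in key_cols}  # unique composite key
        row.update({c["name"]: random_value(c, rng) for c in non_key_cols})
        return row

    initial = [make_row(i) for i in range(num_records)]

    # Assumed incremental shape: re-randomised non-key values for a sample of keys...
    updates = []
    for row in rng.sample(initial, k=num_records // 5):
        changed = dict(row)
        changed.update({c["name"]: random_value(c, rng) for c in non_key_cols})
        updates.append(changed)
    # ...plus some brand-new keys.
    inserts = [make_row(i) for i in range(num_records, num_records + num_records // 10)]

    for sub, rows in (("initial", initial), ("incremental", updates + inserts)):
        os.makedirs(os.path.join(out_dir, sub), exist_ok=True)
        write_csv(os.path.join(out_dir, sub, "part-0000.csv"), rows)
    return len(initial), len(updates) + len(inserts)

out = tempfile.mkdtemp()
print(generate(out, num_records=100))  # (100, 30)
```

A fixed seed keeps test data reproducible between runs, which helps when comparing CDC outputs.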
Spark Training Courses from the AlphaZetta Academy
The aspiration to extend data analysis (predictive, descriptive or otherwise) to streaming event data has been common across every enterprise-scale program I have been involved with. Often, however, this aspiration goes unrealised, as it tends to slide down the priority list while we still grapple with legacy batch-oriented integration patterns and processes.
Event processing is not a new concept: real-time event and transaction processing has been a standard feature of security, digital and operations functions for some time. However, in the Data Warehousing, BI and Advanced Analytics worlds it is often spoken about but rarely implemented (except at tech companies, of course). In many cases personalization is still a batch-oriented process, e.g. train a model from a feature set built from historical data, generate recommendations in batch, and serve these recommendations upon the next visit – wash, rinse, and repeat.
Lambda Architecture has existed for several years as a data processing pattern designed to incorporate both batch and stream processing capabilities. Moreover, messaging platforms have existed for decades, from point-to-point messaging systems, to message-oriented middleware, to distributed pub-sub messaging systems such as Apache Kafka.
Additionally, open source streaming data processing frameworks and tools have proliferated in recent years with projects such as Storm, Samza, Flink and Spark Streaming becoming established solutions.
Kafka in particular, with its focus on durability, resiliency, availability and consistency, has graduated into a fully fledged data platform, not simply a transient messaging system. In many cases Kafka serves as a back end for operational processes, such as applications implementing the CQRS (Command Query Responsibility Segregation) design pattern.
In other words, it is not the technology that holds us back, it’s our lack of imagination.
Enter Kappa Architecture, where we no longer have to attempt to integrate streaming data with batch processes: everything is a stream. The ultimate embodiment of Kappa Architecture is the Streaming Data Warehouse.
In the Streaming Data Warehouse, tables are represented by topics. Topics represent either:
unbounded event or change streams; or
stateful representations of data (such as master, reference or summary data sets).
This approach makes possible the enrichment and/or summarisation of transaction or event data with master or reference data. Furthermore, many of the patterns used in data warehousing and master data management are inherent in Kafka, as you can represent the current state of an object as well as its complete change history (in other words, change data capture and the associated slowly changing dimensions from a single inbound stream).
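To make the idea of "current state plus complete history from one stream" concrete, the plain-Python sketch below folds a keyed change stream into two things: a current-state table (the view a compacted topic or a Kafka Streams KTable provides) and an SCD Type 2 style history with effective start and end dates. The event tuples and field names are assumptions. In Kafka a record with a null value (a tombstone) conventionally marks a delete, which the sketch mimics with `None`.

```python
def apply_changes(events):
    """Fold (key, value, timestamp) change events into current state and SCD2-style history.

    value is a dict of attributes, or None for a delete (like a Kafka tombstone).
    Returns (current, history); history rows carry eff_start and eff_end.
    """
    current = {}
    history = []
    open_rows = {}  # key -> index of that key's open (eff_end=None) history row
    for key, value, ts in events:
        if key in open_rows:
            # Close the previous version of this key at the new event's timestamp.
            history[open_rows.pop(key)]["eff_end"] = ts
        if value is None:
            current.pop(key, None)
        else:
            current[key] = value
            history.append({"key": key, **value, "eff_start": ts, "eff_end": None})
            open_rows[key] = len(history) - 1
    return current, history

events = [
    ("cust1", {"tier": "silver"}, "2021-06-01"),
    ("cust2", {"tier": "gold"}, "2021-06-02"),
    ("cust1", {"tier": "gold"}, "2021-06-10"),
    ("cust2", None, "2021-06-15"),  # delete
]
current, history = apply_changes(events)
print(current)  # {'cust1': {'tier': 'gold'}}
for row in history:
    print(row)
```

The same inbound stream answers both "what is true now" and "what was true when", which is the point the paragraph above makes about CDC and slowly changing dimensions.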
Data is acquired from source systems either in real time or through a scheduled extract process; in either case, the data is presented to Kafka as a stream.
The Kafka Avro Schema Registry provides a systematic contract with source systems. It also serves as a data dictionary for consumers and supports schema evolution with backward and forward compatibility. Data is retained on the Kafka platform for a designated period of time (days or weeks), during which it is available for applications and processes to consume. These processes can include data summarisation or sliding window operations for reporting or notification, as well as data integration or datamart building processes which sink data to other systems, including relational or non-relational data stores.
Real time applications can be built using the KStreams API and emerging tools such as KSQL can be used to provide a well-known interface for sampling streaming data or performing windowed processing operations on streams. Structured Streaming in Spark or Spark Streaming in its original RDD/DStream implementation can be used to prepare and enrich data for machine learning operations using Spark ML or Spark MLlib.
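As a rough illustration of what a windowed operation computes, here is a plain-Python tumbling-window count (fixed, non-overlapping windows). KSQL, Kafka Streams and Spark Structured Streaming provide windowing natively and do much more (overlapping windows, late-arriving data, fault-tolerant state). The event tuples and the 60-second window size are assumptions.

```python
from collections import Counter

def tumbling_counts(events, window_seconds):
    """Count events per (window_start, key) using fixed, non-overlapping windows.

    events: iterable of (epoch_seconds, key) tuples, in any order.
    """
    counts = Counter()
    for ts, key in events:
        window_start = ts - (ts % window_seconds)  # align to the window boundary
        counts[(window_start, key)] += 1
    return dict(sorted(counts.items()))

clicks = [(0, "home"), (15, "home"), (59, "cart"), (60, "home"), (125, "cart")]
print(tumbling_counts(clicks, window_seconds=60))
# {(0, 'cart'): 1, (0, 'home'): 2, (60, 'home'): 1, (120, 'cart'): 1}
```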
In addition, data sinks can operate concurrently to sink datasets to S3, Google Cloud Storage or both (multi-cloud, like real-time analytics, is something which is talked about more than it is implemented…).
In the Streaming Data Warehouse architecture, Kafka is much more than a messaging platform; it is a distributed data platform which could easily replace major components of a legacy (or even a modern) data architecture. It just takes a little imagination…
Most traditional data warehouse or datamart ETL routines consist of multi-stage SQL transformations, often a series of CTAS (CREATE TABLE AS SELECT) statements, usually creating transient or temporary objects – such as volatile tables in Teradata or Common Table Expressions (CTEs).
The initial challenge when moving from a SQL/MPP-based ETL framework platformed on Oracle, Teradata, SQL Server, etc. to a Spark-based ETL framework is what to do with this…
One approach is to use the lightweight, configuration-driven, multi-stage Spark SQL-based ETL framework described in this post.
This framework is driven from a YAML configuration document. YAML was preferred over JSON as a document format because it allows for multi-line statements (SQL statements) as well as comments – which are very useful, as SQL can sometimes be undecipherable even for the person who wrote it.
The YAML config document has three main sections: sources, transforms and targets.
The sources section is used to configure the input data source(s), including optional column and row filters. In this case the data sources are tables available in the Spark catalog (for instance the AWS Glue Catalog or a Hive Metastore); this could easily be extended to read from other data sources using the Spark DataFrameReader API.
The transforms section contains multiple SQL statements to be run in sequence, where each statement creates a temporary view using objects created by preceding statements.
Finally the targets section writes out the final object or objects to a specified destination (S3, HDFS, etc).
The process_sql_statements.py script that is used to execute the framework is very simple (30 lines of code, not including comments, etc.). It loads the sources into Spark DataFrames and creates temporary views to reference these datasets in the transforms section, then sequentially executes the SQL statements in the list of transforms. Lastly, the script writes out the final view or views to the desired destination; in this case, Parquet files stored in S3 were used as the target.
You could implement an object naming convention such as prefixing object names with sv_, iv_, fv_ (for source view, intermediate view and final view respectively) if this helps you differentiate between the different objects.
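The following is a hedged, self-contained sketch of the same sources, transforms and targets flow. It is not the actual process_sql_statements.py. To stay runnable without a cluster, it swaps Spark for Python's built-in `sqlite3` and represents the YAML document as an equivalent Python dict. The config keys, the sv_/iv_/fv_ view names and the CSV-text target are assumptions. In the Spark version, each step would be a `spark.sql(...)` call followed by `createOrReplaceTempView(...)`, and the targets would be written with the DataFrameWriter.

```python
import csv
import io
import sqlite3

# Python equivalent of the YAML config (keys are assumptions for illustration).
CONFIG = {
    "sources": [
        {"table": "orders", "view": "sv_orders",
         "columns": ["order_id", "customer", "amount"], "filter": "amount > 0"},
    ],
    "transforms": [
        {"view": "iv_customer_totals",
         "sql": "SELECT customer, SUM(amount) AS total FROM sv_orders GROUP BY customer"},
        {"view": "fv_top_customers",
         "sql": "SELECT customer, total FROM iv_customer_totals WHERE total >= 100"},
    ],
    "targets": [{"view": "fv_top_customers"}],
}

def run(conn, config):
    """Run sources, transforms and targets in order; return CSV text per target view."""
    for src in config["sources"]:
        # Source views apply the optional column and row filters.
        cols = ", ".join(src.get("columns", ["*"]))
        where = f" WHERE {src['filter']}" if src.get("filter") else ""
        conn.execute(f"CREATE TEMP VIEW {src['view']} AS SELECT {cols} FROM {src['table']}{where}")
    for step in config["transforms"]:
        # Each statement may reference any view created by the preceding ones.
        conn.execute(f"CREATE TEMP VIEW {step['view']} AS {step['sql']}")
    outputs = {}
    for tgt in config["targets"]:
        # Stand-in for writing the final view to S3/HDFS: render it as CSV text.
        cur = conn.execute(f"SELECT * FROM {tgt['view']}")
        buf = io.StringIO()
        writer = csv.writer(buf)
        writer.writerow([d[0] for d in cur.description])
        writer.writerows(cur.fetchall())
        outputs[tgt["view"]] = buf.getvalue()
    return outputs

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, customer TEXT, amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)",
                 [(1, "acme", 80.0), (2, "acme", 40.0), (3, "zeta", 50.0), (4, "zeta", -5.0)])
result = run(conn, CONFIG)
print(result["fv_top_customers"])  # customer,total / acme,120.0
```

Keeping every step as a named view means any intermediate result can be inspected by querying its view, which is a large part of what makes this style easier to debug than a single monolithic query.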
To use this framework you would simply use spark-submit as follows: