So you’re using BigQuery (BQ). It’s all set up and humming perfectly. Maybe now, you want to run an ELT job whenever a new table partition is created, or maybe you want to retrain your ML model whenever new rows are inserted into the BQ table.
In my previous article on Eventarc, we went through how Logging can help you create eventing-type functionality in your application. Let's take it a step further and walk through how we can couple BigQuery and Cloud Run.
In this article you will learn how to
Tie together BigQuery and Cloud Run
Use BigQuery’s audit log to trigger Cloud Run
With those triggers, run your required code
Let’s create a temporary dataset within BigQuery named tmp_bq_to_cr.
In that same dataset, let’s create a table in which we will insert some rows to test our BQ audit log. Let’s grab some rows from a BQ public dataset to create this table:
CREATE OR REPLACE TABLE tmp_bq_to_cr.cloud_run_trigger AS
SELECT date, country_name, new_persons_vaccinated, population
FROM `bigquery-public-data.covid19_open_data.covid19_open_data`
WHERE date > '2021-05-31'
Following this, let’s run an insert query that will help us build our mock database trigger:
INSERT INTO tmp_bq_to_cr.cloud_run_trigger
VALUES('2021-06-18', 'Australia', 3, 1000)
Now, in another browser tab let’s navigate to BQ Audit Events and look for our INSERT INTO event:
There will be several audit logs for any given BQ action. Only after a query is parsed does BQ know which table we want to interact with, so the initial log will not, for example, contain the table name.
We don’t want any old audit log, so we need to ensure we look for a unique set of attributes that clearly identify our action, such as in the diagram above.
In the case of inserting rows, the attributes are a combination of
The method is google.cloud.bigquery.v2.JobService.InsertJob
The name of the table being inserted to is the protoPayload.resourceName
The dataset id is available as resource.labels.dataset_id
The number of inserted rows is protoPayload.metadata.tableDataChange.insertedRowsCount
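Putting those attributes together, a Cloud Logging advanced filter for this event might look like the following (the project path inside resourceName is illustrative):

```
protoPayload.methodName="google.cloud.bigquery.v2.JobService.InsertJob"
protoPayload.resourceName:"tables/cloud_run_trigger"
resource.labels.dataset_id="tmp_bq_to_cr"
protoPayload.metadata.tableDataChange.insertedRowsCount>0
```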
Time for some code
Now that we’ve identified the payload that we’re looking for, we can write the action for Cloud Run. We’ve picked Python and Flask to help us in this instance. (full code is on GitHub).
First, let’s filter out the noise and find the event we want to process
# Gets the payload data from the audit log
content = request.json
try:
    ds = content['resource']['labels']['dataset_id']
    proj = content['resource']['labels']['project_id']
    tbl = content['protoPayload']['resourceName']
    rows = int(content['protoPayload']['metadata']
               ['tableDataChange']['insertedRowsCount'])
    if ds == 'tmp_bq_to_cr' and \
            tbl.endswith('tables/cloud_run_trigger') and rows > 0:
        query = create_agg()
        return "table created", 200
except KeyError:
    # if these fields are not in the JSON, ignore
    pass
return "ok", 200
Now that we’ve found the event we want, let’s execute the action we need. In this example, we’ll aggregate and write out to a new table created_by_trigger:
client = bigquery.Client()
query = """
CREATE OR REPLACE TABLE tmp_bq_to_cr.created_by_trigger AS
SELECT country_name, SUM(new_persons_vaccinated) AS n
FROM tmp_bq_to_cr.cloud_run_trigger
GROUP BY country_name
"""
query_job = client.query(query)
The Dockerfile for the container is simply a basic Python container into which we install Flask and the BigQuery client library:
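Something along these lines would do. This is a minimal sketch: the base image tag is an assumption, and it presumes the Flask app lives in app.py and is served with gunicorn.

```dockerfile
FROM python:3.9-slim

# Install the web framework, a WSGI server and the BigQuery client library
RUN pip install Flask gunicorn google-cloud-bigquery

WORKDIR /app
COPY . .

# Cloud Run sends requests to $PORT; app:app is the Flask module and object
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 app:app
```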
Metadata Hub (MDH) is intended to be the source of truth for metadata around the Company's platform. It has the ability to load metadata configuration from yaml and serve that information up via an API. It will also store pipeline information while files are being ingested into the platform.
Config-Driven. Anyone who has been authorized to do so should be able to add another 'table-info.yaml' into MDH without the need to update any code in the system
Here’s how table information makes its way into MDH:
summary: All tables in MDH
description: get the title of all tables that exist in MDH
summary: Creates a new table in MDH
description: Creates a new table in MDH
summary: Obtain information about specific table
summary: All columns for a particular table
description: Obtain information on columns for a particular table
summary: All information about a particular end-to-end batch run of file ingestion
summary: Update metadata on a batch load
description: Update metadata on a batch load
summary: Use this to save on calculation of business days.
description: This base response gives you today’s date in a string
summary: Will return a string of the previous business day
description: Will return a string of the previous business day, based on the date when it's called
summary: Will return a string of the next business day
description: Will return a string of the next business day, based on the date when it's called
Yaml to Datastore – Entity/Kind design
Before we jump right into Entity Groups in Datastore, it is important to first go over the basics and establish a common vocabulary. Datastore holds entities, which are objects, that can contain various key/value pairs, called properties. Each entity must contain a unique identifier, known as a key. When creating an entity, a user can choose to specify a custom key or let Datastore create a key. If a user decides to specify a custom key, it will contain two fields: a kind, which represents a category such as ‘Toy’ or ‘Marital Status’, and a name, which is the identifying value. If a user decides to only specify a kind when creating a key, and does not specify a unique identifier, Datastore automatically generates an ID behind the scenes. Below is an example of a Python3 script which illustrates this identifier concept.
from google.cloud import datastore
client = datastore.Client()

# Custom key: specify kind="table" and a unique id of "broker"
custom_key_entry = datastore.Entity(client.key("table", "broker"))

# Only specify kind="table"; let Datastore generate the unique id
datastore_gen_key_entry = datastore.Entity(client.key("table"))

# Save both entities so they appear in the console
client.put_multi([custom_key_entry, datastore_gen_key_entry])
In your GCP Console under Datastore, you will then see your two entities of kind “table”. One will contain your custom key and one will contain the automatically generated key.
Ancestors and Entity Groups
For highly related or hierarchical data, Datastore allows entities to be stored in a parent/child relationship. This is known as an entity group or ancestor/descendant relationship.
This is an example of an entity group with kinds of types: table, column, and classification. The ‘Grandparent’ in this relationship is the ‘table’. In order to configure this, one must first create the table entity. Then, a user can create a column, and specify that the parent is a table key. In order to create the grandchild, a user then creates a classification and sets its parent to be a column key. To further add customizable attributes, a user can specify additional key-value pairs such as pii and data_type. These key-value pairs are stored as properties. We model this diagram in Datastore in our working example below.
One can create entity groups by setting the ‘parent’ parameter while creating an entity key for a child. This command adds the parent key to be part of the child entity key. The child’s key is represented as a tuple (‘parent_key’, ‘child_key’), such that the parents’ key is the prefix of the key, which is followed by its own unique identifier. For example, follow the diagram above:
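The real calls here would be client.key("table", "broker") followed by client.key("column", "broker_legal_name", parent=table_key); the standalone sketch below mimics how Datastore composes the child key's flat path, with no GCP client needed:

```python
# Sketch of Datastore key composition. make_key is a stand-in for
# client.key(...): a child key's flat path is its parent's path
# followed by its own (kind, name) pair.
def make_key(*path, parent=None):
    """Mimic how a child key's flat path is prefixed by its parent's path."""
    prefix = tuple(parent) if parent else ()
    return prefix + path

table_key = make_key("table", "broker")
column_key = make_key("column", "broker_legal_name", parent=table_key)
print(column_key)  # ('table', 'broker', 'column', 'broker_legal_name')
```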
Printing the child column key will display: ("table", "broker", "column", "broker_legal_name")
Datastore also supports chaining of parents, which can lead to very large keys for descendants with a long lineage of ancestors. Additionally, parents can have multiple children (a one-to-many relationship). However, there is no native support for entities to have multiple parents (a many-to-many relationship). Once you have configured this ancestral hierarchy, it is easy to retrieve all descendants for a given parent by querying on the parent key using the 'ancestor' parameter. For example, given the table_key created above, I can query for all of the table's descendant columns.
As per our Key Philosophies – Config-Driven – anyone should be able to add a new table to be processed and landed in a target-table somewhere within MDH with our yaml syntax. Below is a full working python3 example of the table/column/classification hierarchical model described above.
Now that we have the column entity, let's locate its parent id.
column = list(query1.fetch())[0]
print("This column belongs to: " + str(column.key.parent.id_or_name))
Further to this, we can also get all data classification elements attributed to a single column using the ancestor clause query.
query2 = datastore_client.query(kind="classification", ancestor=column.key)
for classification in list(query2.fetch()):
    print(classification)
For more complex queries, Datastore has the concept of indexes, usually set via its index.yaml configuration. The following is an example of an index.yaml file:
indexes:

- kind: Cat
  properties:
  - name: name
  - name: age

- kind: Cat
  properties:
  - name: name
  - name: whiskers

- kind: Store
  properties:
  - name: business
  - name: owner
Indexes are important when attempting to add filters on more than one particular attribute within a Datastore entity. For example, the following code will fail:
# Adding a '>' filter will cause this to fail. Side note: it will work
# without an index if you add another '=' filter.
query2 = datastore_client.query(kind="classification", ancestor=column.key)
query2.add_filter("pii", ">", 0)
for classification in list(query2.fetch()):
    print(classification)
To rectify this issue, you need to create an index.yaml that looks like the following:
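For the pii filter above, a minimal index.yaml might look like this (kind and property names follow our working example):

```yaml
indexes:

- kind: classification
  ancestor: yes
  properties:
  - name: pii
```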
The official pypi package for google-cloud-datastore can be found here: https://pypi.org/project/google-cloud-datastore/. At the time of writing, Firestore in Datastore mode is the way forward, as per the release note from January 31, 2019.
Cloud Firestore is now Generally Available. Cloud Firestore is the new version of Cloud Datastore and includes a backwards-compatible Datastore mode.
If you intend to use the Cloud Datastore API in a new project, use Cloud Firestore in Datastore mode. Existing Cloud Datastore databases will be automatically upgraded to Cloud Firestore in Datastore mode.
Except where noted, the Cloud Datastore documentation now describes behavior for Cloud Firestore in Datastore mode.
We’ve purposefully created MDH in Datastore to show you how it was done originally, and we’ll be migrating the Datastore code to Firestore in an upcoming post.
Creating and deleting indexes within Datastore will need to be done through the REST API via googleapiclient.discovery, as this function doesn’t exist via the google-cloud-datastore API. Working with the discovery api client can be a bit daunting for a first-time user, so here’s the code to add an index on Datastore:
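Here is a sketch of that call, using the Datastore Admin API's projects.indexes.create endpoint. The helper names and the example project id are my own; the body shape (ancestor, kind, properties) follows the Index resource in the Admin API reference:

```python
def build_index_body(kind, properties, ancestor="NONE"):
    """Assemble the Index resource body for projects.indexes.create."""
    return {
        "ancestor": ancestor,  # "NONE" or "ALL_ANCESTORS"
        "kind": kind,
        "properties": [{"name": p, "direction": "ASCENDING"} for p in properties],
    }

def create_datastore_index(project_id, kind, properties):
    """Create a Datastore index via the discovery client (needs credentials)."""
    from googleapiclient.discovery import build  # lazy import
    admin = build("datastore", "v1")
    body = build_index_body(kind, properties, ancestor="ALL_ANCESTORS")
    return admin.projects().indexes().create(
        projectId=project_id, body=body).execute()

# Example (requires GCP credentials and a real project id):
# create_datastore_index("my-gcp-project", "classification", ["pii"])
```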
How did we craft this API request? We can use the Google API Discovery Service to build client libraries, IDE plugins, and other tools that interact with Google APIs. The Discovery API provides a list of Google APIs and a machine-readable "Discovery Document" for each API. Features of the Discovery API:
A directory of supported APIs schemas based on JSON Schema.
A machine-readable "Discovery Document" for each of the supported APIs. Each document contains:
A list of API methods and available parameters for each method.
A list of available OAuth 2.0 scopes.
Inline documentation of methods, parameters, and available parameter values.
Navigating to the API reference page for Datastore and going to the ‘Datastore Admin’ API page, we can see references to the Indexes and RESTful endpoints we can hit for those Indexes. Therefore, looking at the link for the Discovery document for Datastore:
From this, we can build out our instantiation of the Google API discovery object:
build('datastore', 'v1', credentials=credentials)
With respect to building out the body aspect of the request, I’ve found crafting that part within the ‘Try this API’ section of https://cloud.google.com/datastore/docs/reference/admin/rest/v1/projects.indexes/create pretty valuable.
With this code, your index should show up in your Datastore console! You can also retrieve them within gcloud with gcloud datastore indexes list if you’d like to verify the indexes outside our python code. So there you have it: a working example of entity groups, ancestors, indexes and Metadata within Datastore. Have fun coding!
When defining event-driven architectures, it’s always good to keep up with how the landscape is changing. How do you connect microservices in your architecture? Is Pub/Sub the end-game for all events? To dive a bit deeper, let’s talk through the benefits of having a single orchestrator, or perhaps a choreographer is better?
Orchestration versus choreography refresher
My colleague @jeffreyaven did a recent post explaining this concept in simple terms, which is worth reviewing, see:
Should there really be a central orchestrator controlling all interactions between services…..or, should each service work independently and only interact through events?
Orchestration is usually viewed as a domain-wide central service that defines the flow and control of communication between services. In this paradigm, it becomes easier to change and ultimately monitor policies across your org.
Choreography has each service registering and emitting events as it needs to. It doesn't direct or define the flow of communication; instead, a central broker usually passes messages around, and services remain truly independent.
Enter Workflows, which is suited to centrally orchestrating services: not only Google Cloud services such as Cloud Functions and Cloud Run, but also external services.
How about choreography? Pub/Sub and Eventarc are both suited for this. We all know and love Pub/Sub, but how do I use Eventarc?
What is Eventarc?
Announced in October 2020, Eventarc was introduced as eventing functionality that enables you, the developer, to send events to Cloud Run from more than 60 Google Cloud sources.
But how does it work?
Eventing is done by reading those sweet, sweet Audit Logs from various sources and sending them to Cloud Run services as events in the CloudEvents format. Quick primer on CloudEvents: it's a specification for describing event data in a common way. The specification is now under the Cloud Native Computing Foundation. Hooray! Eventarc can also read events from Pub/Sub topics for custom applications. Here's a diagram I graciously ripped from the Google Cloud Blog:
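To make the format concrete, here is a minimal, hand-written CloudEvents-style envelope of the kind Eventarc delivers to a Cloud Run service. The attribute names follow the CNCF spec; the project, id and payload values are purely illustrative:

```python
# A minimal CloudEvents v1.0-style envelope. Every CloudEvent carries a
# small set of required context attributes (specversion, type, source, id)
# plus the event payload under "data". All values below are made up.
event = {
    "specversion": "1.0",
    "type": "google.cloud.audit.log.v1.written",
    "source": "//cloudaudit.googleapis.com/projects/my-project/logs/data_access",
    "id": "abc-123",
    "data": {
        "protoPayload": {
            "methodName": "google.cloud.bigquery.v2.JobService.InsertJob"
        }
    },
}

def describe(ev):
    """Read the required context attributes common to every CloudEvent."""
    return f"{ev['type']} from {ev['source']} (id={ev['id']})"

print(describe(event))
```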
Why do I need Eventarc? I have the Pub/Sub
Good question. Eventarc provides an easier path to receive events, not only from Pub/Sub topics but from a number of Google Cloud sources, with its Audit Log and Pub/Sub integration. In fact, any service with Audit Log integration can be an event source for Eventarc. Beyond easy integration, it provides consistency and structure to how events are generated, routed and consumed. Things like:
Triggers specify routing rules from event sources to event sinks. Listen for new object creation in GCS and route that event to a service in Cloud Run by creating an Audit-Log trigger. Create triggers that also listen to Pub/Sub. Then list all triggers in one central place in Eventarc:
gcloud beta eventarc triggers list
Consistency with eventing format and libraries
Using the CloudEvents-compliant specification allows event data to be described in a common way, moving towards the goal of consistency, accessibility and portability. It makes it easier for different languages to read events and for the Google Events libraries to parse their fields.
The long-term vision is for Eventarc to be the hub of events, enabling a unified eventing story for Google Cloud and beyond.
In the future, you can expect to forgo Audit Logs, read these events directly, and send them out to even more sinks within GCP and to any HTTP target.
So what are expectations when it comes to data (and data quality)…
An expectation is a falsifiable, verifiable statement about data. Expectations provide a language to talk about data characteristics and data quality – humans to humans, humans to machines and machines to machines.
The Great Expectations project includes predefined, codified expectations such as:
Expectations are both data tests and docs! Expectations can be presented in a machine-friendly JSON, for example:
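For instance, an expectation serialised to JSON looks roughly like this (the column name and bounds are illustrative):

```json
{
  "expectation_type": "expect_column_values_to_be_between",
  "kwargs": {
    "column": "age",
    "min_value": 0,
    "max_value": 120
  }
}
```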
Great Expectations provides validation results of defined expectations, which can dramatically shorten your development cycle.
Nearly 50 built in expectations allow you to express how you understand your data, and you can add custom expectations if you need a new one. A machine can test if a dataset conforms to the expectation.
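To make the idea concrete, here's a hand-rolled sketch of what an expectation does under the hood. The function name mirrors the Great Expectations style, but this is not the library's API, just an illustration of a falsifiable check returning a machine-readable result:

```python
import random

def expect_column_values_to_be_between(values, min_value, max_value):
    """A falsifiable, verifiable check that returns a machine-readable result."""
    unexpected = [v for v in values if not (min_value <= v <= max_value)]
    return {
        "success": not unexpected,
        "result": {
            "element_count": len(values),
            "unexpected_count": len(unexpected),
            "unexpected_percent": 100.0 * len(unexpected) / len(values),
        },
    }

# A column of random integers, all within bounds
col = [random.randint(0, 100) for _ in range(1000)]
print(expect_column_values_to_be_between(col, 0, 100)["success"])  # True
```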
tried with Python 3.7.2, but had issues with library lgzm on my local machine
once installed, run the following in the python repl shell:
showing the data in the dataframe should give you the following:
as can be seen, a collection of random integers in each column for our initial testing. Let's pipe this data into great-expectations…
yields the following output…
this shows that there are 0 unexpected items in the data we are testing. Great!
Now let’s have a look at a negative test. Since we’ve picked the values at random, there are bound to be duplicates. Let’s test that:
The JSON schema has metadata information about the result, of note is the result section which is specific to our query, and shows the percentage that failed the expectation.
Let's progress to something more real-world, namely creating expectations that are run on databases. Armed with our basic understanding of great-expectations, let's…
set up a postgres database
initiate a new Data Context within great-expectations
write test-cases for the data
group those test-cases and run them as a checkpoint
Setting up a Database
if you don’t have it installed,
wait a few minutes for the image to download. Verify postgres is running with docker ps, then connect with
Create some data
Take data for a spin
Now time for great-expectations
Great Expectations relies on the sqlalchemy and psycopg2 libraries to connect to your data.
once done, let’s set up great-expectations
should look like below:
let’s set up a few other goodies while we’re here
Congratulations! Great Expectations is now set up
You should see a file structure as follows:
If you didn’t generate a suite during the set up based on app.order, you can do so now with
great_expectations suite new
when created, looking at great_expectations/expectations/app/order/warning.json should yield the following:
as noted in the content section, this expectation config is created by the tool by looking at 1000 rows of the data. We also have access to the data-doc site which we can open in the browser at great_expectations/uncommitted/data_docs/local_site/index.html
Clicking on app.order.warning, you’ll see the sample expectation shown in the UI
Now, let’s create our own expectation file and take it for a spin. We’ll call this one error.
This should also start a jupyter notebook. If for some reason you need to start it back up again, you can do so with
Go ahead and hit run on your first cell.
Let’s keep it simple and test the customer_order_id column is in a set with the values below:
using the following expectations function in your Table Expectation(s). You may need to click the + sign in the toolbar to insert a new cell, as below:
As we can see, appropriate json output that describes the output of our expectation. Go ahead and run the final cell, which will save our work and open a newly minted data documentation UI page, where you’ll see the expectations you defined in human readable form.
Running the test cases
In Great Expectations, running a set of expectations (test cases) is called a checkpoint. Let’s create a new checkpoint called first_checkpoint for our app.order.error expectation as shown below:
Let’s take a look at our checkpoint definition.
Above you can see the validation_operator_name which points to a definition in great_expectations.yml, and the batches where we defined the data source and what expectations to run against.
Let’s have a look at great_expectations.yml. We can see the action_list_operator defined and all the actions it contains:
Let's run our checkpoint using great_expectations checkpoint run first_checkpoint
Okay cool, we've set up an expectation and a checkpoint, and shown a successful status! But what does a failure look like? We can introduce a failure by logging in to postgres and inserting a row with a customer_order_id we know will fail, as we've specified in our expectation that the column should only contain two values.
Here are the commands to make that happen, as well as the command to re-run our checkpoint:
Run checkpoint again, this time it should fail
As expected, it failed.
In its current implementation (version 0.12.9), the supported databases out of the box are:
With respect to BigTable, it may not be possible, as SQLAlchemy can only manage SQL-based RDBMS-type systems, while BigTable (and HBase) are NoSQL, non-relational systems.
Now that we have seen how to run tests on our data, we can run our checkpoints from bash or a python script (generated using great_expectations checkpoint script first_checkpoint). This lends itself to easy integration with scheduling tools like airflow, cron, prefect, etc.
When deploying in production, you can store any sensitive information (credentials, validation results, etc.) which is part of the uncommitted folder in cloud storage systems, databases or data stores, depending on your infrastructure setup. Great Expectations has a lot of options here.
When not to use a data quality framework
This tool is great and provides a lot of advanced data quality validation functions, but it adds another layer of complexity to your infrastructure that you will have to maintain and troubleshoot in case of errors. It would be wise to use it only when needed.
Do not use a data quality framework, if simple SQL based tests at post load time works for your use case. Do not use a data quality framework, if you only have a few (usually < 5) simple data pipelines.
Do use it when you have data that needs to be tested in an automated and a repeatable fashion. As shown in this article, Great Expectations has a number of options that can be toggled to suit your particular use-case.
Great Expectations shows a lot of promise, and it's an active project, so expect to see features roll out frequently. It's been quite easy to use, but I'd like to see all its features work in a locked-down enterprise environment.
A refresher on the data plane, and what the userspace proxy can perform:
Routing: Given a REST request for /hello from the local service instance, where should that request be sent?
Load Balancing: Once routing has done its job, to which upstream service instance should the request be sent? With what timeout? If the request fails, should it be retried?
Authorisation and Authentication: For incoming requests, can cryptographic functions determine the authenticity of that request? Is the caller allowed to invoke the requested endpoint?
Observability: Detailed logging, statistics, distributed tracing data so that operators can understand the traffic flow and debug problems as they occur
Service Discovery: What backend/upstream service instances are available?
Health Checking: Are upstream service instances healthy and ready to accept traffic?
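To ground the first two items in that list, here's a toy sketch of routing plus load balancing with retries. The routing table, upstream addresses and stubbed request are invented for illustration; a real data-plane proxy does all of this per connection with timeouts:

```python
import random

# Hypothetical routing table: request path -> candidate upstream instances
ROUTES = {"/hello": ["10.0.0.1:8080", "10.0.0.2:8080"]}

def try_request(upstream):
    """Stub for the actual network call; a real proxy applies a timeout here."""
    return True

def send(path, attempts=3):
    backends = ROUTES.get(path)
    if not backends:
        return "404: no route"              # routing: no matching rule
    for _ in range(attempts):               # retry policy
        upstream = random.choice(backends)  # load balancing: pick an instance
        if try_request(upstream):
            return f"200 via {upstream}"
    return "503: retries exhausted"

print(send("/hello"))
```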
The control plane is slightly less complex. For the data plane to act in a coordinated fashion, the control plane gives you the machinery to make that happen. This is the magical part of the service mesh; the control plane takes a set of isolated sidecar proxies and turns them into a distributed system. The control plane in turn provides an API to allow the user to modify and inspect the behaviour of the data plane.
You can see from the diagram below the proxies are right next to the service in the same node. We usually call those ‘sidecar’ containers.
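In Kubernetes terms, a sidecar is simply a second container in the pod spec. A bare-bones sketch (both image names are hypothetical placeholders):

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: vendor-app
spec:
  containers:
  - name: vendor-app            # the business-logic container
    image: vendor/app:1.0       # hypothetical vendor image
    ports:
    - containerPort: 8080
  - name: proxy                 # the data-plane sidecar proxy
    image: envoyproxy/envoy:v1.18.3
    ports:
    - containerPort: 15001      # intercepts traffic to/from vendor-app
```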
The diagram above gives you a high level indication of what the service mesh would look like. What if I don’t have many services? Then the service mesh probably isn’t for you. That’s a whole lot of machinery to run a single proxy! Having said this, if your solution is running hundreds or thousands of services, then you’re going to require a whole heap of proxies.
So there you have it. The service mesh with its control and data plane. To put it simply, the goal of the control plane is to monitor and set a policy that will eventually be enacted by the data plane.
You’ve taken over a project, and the security team have mandated the use of the service mesh. You’ve never used it yourself before, and the confusion as to why we need another thing is getting you down. An additional thing next to my container that will add latency? And consume resources? And I have to maintain it?! Why would anyone need or want this?
While there are a few answers to this, the most important answer is something I alluded to in an example in part 1 of this series: this design is a great way to add additional logic into the system. Not only can you add additional logic (to containers possibly outside of your control) but you can do this uniformly across the entire mesh! The service mesh gives you features that are critical for running software that’s uniform across your whole stack
The set of features that the service mesh can provide include reliability features (Retries, timeouts etc), observability features (latencies, volume etc) and security features (mTLS, access control etc).
Let’s break it down
Server-side software relies on these critical features. If you're building any type of modern server-side software that's predicated on multiple services (think APIs and web-apps), and you're continually adding features in a short timeframe, then all the features listed above become critical for you. Your applications must be reliable, observable and, most importantly, secure. This is exactly what the service mesh helps you with.
One view to rule them all. The features mentioned above are language-agnostic and don't care about your framework, who wrote it, or any part of your development life cycle. They give you, your team and your company a consistent way to deploy changes across your service landscape.
Decoupled from application code. It's important to have a single place to include application and business logic, and not have the nightmare of managing that in multiple components of your system. The core stewardship of the functionality that the service mesh provides lies at the platform level. This includes maintenance, deployments, operation, etc. The application can be updated and deployed by developers maintaining the application, and the service mesh can change without the application being involved.
Yes, while the features of the service mesh could be implemented as application code, this solution would not help in driving uniform features sets across the whole system, which is the value proposition for the service mesh.
If you're a business-logic developer, you probably don't need to worry about the service mesh. Keep pumping out that newfangled business logic that makes the software oh-so-usable.
If you’re in a platform role and most likely using Kubernetes, then you should be right on top of the service mesh! That is unless your architecture dictates a monolith. You’re going to have a lot of services talking to one another, all tied together with an overarching dependency.
If you’re in a platform role with no Kubernetes but a bunch of microservices, you should maybe care a little bit about the service mesh, but without the power of Kubernetes and the ease of deployment it brings, you’ll have to weigh up how you intend to manage all those proxies.
I hope you enjoyed this article, please feel free to reach out to me at:
So you’ve started delivering a new project and it’s all about this “Cloud Native” or “Microservices” thing. You’re a Delivery Manager or Software Engineer at some type of company and someone has lightly peppered a meeting with a term, ‘Mesh’.
They possibly said event mesh. Or better yet, they mentioned a service mesh. As time went on you kept hearing more and more about the service mesh. You’ve attempted to read up about it, digested a whole bunch of new terms and still didn’t completely understand what the Mesh even does, why you would need it or why the hype train around this technology shows no sign of stopping. This article is an attempt to provide a focused guide to the service mesh, and why it is so interesting.
Ok, so what is this thing?
Truth be told, the service mesh is actually pretty simple. It's built around the idea of small, repeatable bits of software, in this case userspace proxies, stuck very close to your services. This is called the data plane. In addition to the userspace proxies, you also get a bunch of management processes, referred to as the control plane. Simply put, the data plane (userspace proxies) intercepts all calls between services, and the control plane (management processes) coordinates the wholesale behaviour of those proxies. This allows you to perform sweeping changes across your service landscape via the control plane's APIs and operators, and provides the capability to measure your mesh as a whole.
Before we get into the engineering of what the proxies are, let’s go with an example.
The business has bought some software.
The engineers are tasked with deploying this software in their Kubernetes cluster.
The engineers' first task is to containerise this application, expose its functionality to downstream applications and deploy it to the cluster in a repeatable, continuous fashion.
There's a requirement in your organisation that says 'I need all communications to this vendor's software to use TLS 1.3'. Or, 'I would like to measure all API latency from this application'.
The engineer replies 'I can't make changes to a third party application! What do I do?'. Service mesh to the rescue.
Using a service mesh, you can deploy a proxy right next to your vendor container and, in effect, abstract away the complexities of measurement and data transport mechanisms, allowing the vendor software to concentrate on its business logic.
This vendor container is now part of the service mesh.
When we talk about proxies, we usually discuss things in OSI model terminology, that is to say Layers 1 through 7. Most of the time when it comes to proxies, you’re comparing Layer 4 to Layer 7. Here’s a quick run-down:
Layer 4 (L4) -> operates with the delivery of messages with no regard to the content of the messages. They would simply forward network packets to and from the server without inspecting any part of the packets.
Layer 7 (L7) -> this is a higher level, application layer. This deals with the actual content of the message. If you were routing network traffic, you could do this at L7 in a much more sophisticated way because you can now make decisions based on the packets messages within.
Why pick between L4 and L7? Speed.
Back to the service mesh: these userspace proxies are L7-aware TCP proxies, think NGINX or haproxy. There are different proxies to choose from: Linkerd is an ultralight service mesh for Kubernetes, while the most popular is Envoy, created by the ride-share company Lyft. NGINX and haproxy, mentioned above, are also quite popular. So what differentiates NGINX proxies from the service mesh? Their focus. You would implement NGINX as an ingress proxy (traffic entering your network), but when it comes to proxies that focus on traffic between services, that's when the service mesh proxy comes into play.
Ok, probably time for a diagram now that we’ve explained the Data Plane.
Tune in for part 2 for when we discuss the Control Plane!