Use BigQuery to trigger Cloud Run

So you’re using BigQuery (BQ). It’s all set up and humming perfectly. Maybe now, you want to run an ELT job whenever a new table partition is created, or maybe you want to retrain your ML model whenever new rows are inserted into the BQ table.

In my previous article on EventArc, we went through how Logging can help us create eventing-type functionality in your application. Let’s take it a step further and walk through how we can couple BigQuery and Cloud Run.

In this article you will learn how to

  • Tie together BigQuery and Cloud Run
  • Use BigQuery’s audit log to trigger Cloud Run
  • With those triggers, run your required code

Let’s go!

Let’s create a temporary dataset within BigQuery named tmp_bq_to_cr.

In that same dataset, let’s create a table in which we will insert some rows to test our BQ audit log. Let’s grab some rows from a BQ public dataset to create this table:

CREATE OR REPLACE TABLE tmp_bq_to_cr.cloud_run_trigger AS
SELECT
 date, country_name, new_persons_vaccinated, population
 from `bigquery-public-data.covid19_open_data.covid19_open_data`
 where country_name='Australia'
 AND
 date > '2021-05-31'
LIMIT 100

Following this, let’s run an insert query that will help us build our mock database trigger:

INSERT INTO tmp_bq_to_cr.cloud_run_trigger
VALUES('2021-06-18', 'Australia', 3, 1000)

Now, in another browser tab let’s navigate to BQ Audit Events and look for our INSERT INTO event:

BQ-insert-event

There will be several audit logs for any given BQ action. Only after a query is parsed does BQ know which table we want to interact with, so the initial log will, for e.g., not have the table name.

We don’t want any old audit log, so we need to ensure we look for a unique set of attributes that clearly identify our action, such as in the diagram above.

In the case of inserting rows, the attributes are a combination of

  • The method is google.cloud.bigquery.v2.JobService.InsertJob
  • The name of the table being inserted to is the protoPayload.resourceName
  • The dataset id is available as resource.labels.dataset_id
  • The number of inserted rows is protoPayload.metadata.tableDataChanged.insertedRowsCount

Time for some code

Now that we’ve identified the payload that we’re looking for, we can write the action for Cloud Run. We’ve picked Python and Flask to help us in this instance. (full code is on GitHub).

First, let’s filter out the noise and find the event we want to process

@app.route('/', methods=['POST'])
def index():
    # Gets the Payload data from the Audit Log
    content = request.json
    try:
        ds = content['resource']['labels']['dataset_id']
        proj = content['resource']['labels']['project_id']
        tbl = content['protoPayload']['resourceName']
        rows = int(content['protoPayload']['metadata']
                   ['tableDataChange']['insertedRowsCount'])
        if ds == 'cloud_run_tmp' and \
           tbl.endswith('tables/cloud_run_trigger') and rows > 0:
            query = create_agg()
            return "table created", 200
    except:
        # if these fields are not in the JSON, ignore
        pass
    return "ok", 200

Now that we’ve found the event we want, let’s execute the action we need. In this example, we’ll aggregate and write out to a new table created_by_trigger:

def create_agg():
    client = bigquery.Client()
    query = """
    CREATE OR REPLACE TABLE tmp_bq_to_cr.created_by_trigger AS
    SELECT
      count_name, SUM(new_persons_vaccinated) AS n
    FROM tmp_bq_to_cr.cloud_run_trigger
    """
    client.query(query)
    return query

The Dockerfile for the container is simply a basic Python container into which we install Flask and the BigQuery client library:

FROM python:3.9-slim
RUN pip install Flask==1.1.2 gunicorn==20.0.4 google-cloud-bigquery
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY *.py ./
CMD exec gunicorn --bind :$PORT main:app

Now we Cloud Run

Build the container and deploy it using a couple of gcloud commands:

SERVICE=bq-cloud-run
PROJECT=$(gcloud config get-value project)
CONTAINER="gcr.io/${PROJECT}/${SERVICE}"
gcloud builds submit --tag ${CONTAINER}
gcloud run deploy ${SERVICE} --image $CONTAINER --platform managed

I always forget about the permissions

In order for the trigger to work, the Cloud Run service account will need the following permissions:

gcloud projects add-iam-policy-binding $PROJECT \
    --member="serviceAccount:service-${PROJECT_NO}@gcp-sa-pubsub.iam.gserviceaccount.com"\
    --role='roles/iam.serviceAccountTokenCreator'

gcloud projects add-iam-policy-binding $PROJECT \
    --member=serviceAccount:${SVC_ACCOUNT} \
    --role='roles/eventarc.admin'

Finally, the event trigger

gcloud eventarc triggers create ${SERVICE}-trigger \
  --location ${REGION} --service-account ${SVC_ACCOUNT} \
  --destination-run-service ${SERVICE}  \
  --event-filters type=google.cloud.audit.log.v1.written \
  --event-filters methodName=google.cloud.bigquery.v2.JobService.InsertJob \
  --event-filters serviceName=bigquery.googleapis.com

Important to note here is that we’re triggering on any Insert log created by BQ That’s why in this action we had to filter these events based on the payload.

Take it for a spin

Now, try out the BigQuery -> Cloud Run trigger and action. Go to the BigQuery console and insert a row or two:

INSERT INTO tmp_bq_to_cr.cloud_run_trigger
VALUES('2021-06-18', 'Australia', 5, 25000)

Watch as a new table called created_by_trigger gets created! You have successfully triggered a Cloud Run action on a database event in BigQuery.

Enjoy!

Query Cloud SQL through Big Query

This article demonstrates Cloud SQL federated queries for Big Query, a neat and simple to use feature.

Connecting to Cloud SQL

One of the challenges presented when using Cloud SQL on a private network (VPC) is providing access to users. There are several ways to accomplish this which include:

  • open the database port on the VPC Firewall (5432 for example for Postgres) and let users access the database using a command line or locally installed GUI tool (may not be allowed in your environment)
  • provide a web based interface deployed on your VPC such as PGAdmin deployed on a GCE instance or GKE pod (adds security and management overhead)
  • use the Cloud SQL proxy (requires additional software to be installed and configured)

In additional, all of the above solutions require direct IP connectivity to the instance which may not always be available. Furthermore each of these operations requires the user to present some form of authentication – in many cases the database user and password which then must be managed at an individual level.

Enter Cloud SQL federated queries for Big Query…

Big Query Federated Queries for Cloud SQL

Big Query allows you to query tables and views in Cloud SQL (currently MySQL and Postgres) using the Federated Queries feature. The queries could be authorized views in Big Query datasets for example.

This has the following advantages:

  • Allows users to authenticate and use the GCP console to query Cloud SQL
  • Does not require direct IP connectivity to the user or additional routes or firewall rules
  • Leverages Cloud IAM as the authorization mechanism – rather that unmanaged db user accounts and object level permissions
  • External queries can be executed against a read replica of the Cloud SQL instance to offload query IO from the master instance

Setting it up

Setting up big query federated queries for Cloud SQL is exceptionally straightforward, a summary of the steps are provided below:

Step 1. Enable a Public IP on the Cloud SQL instance

This sounds bad, but it isn’t really that bad. You need to enable a public interface for Big Query to be able to establish a connection to Cloud SQL, however this is not accessed through the actual public internet – rather it is accessed through the Google network using the back end of the front end if you will.

Furthermore, you configure an empty list of authorized networks which effectively shields the instance from the public network, this can be configured in Terraform as shown here:

This configuration change can be made to a running instance as well as during the initial provisioning of the instance.

As shown below you will get a warning dialog in the console saying that you have no authorized networks – this is by design.

Cloud SQL Public IP Enabled with No Authorized Networks

Step 2. Create a Big Query dataset which will be used to execute the queries to Cloud SQL

Connections to Cloud SQL are defined in a Big Query dataset, this can also be use to control access to Cloud SQL using authorized views controlled by IAM roles.

Step 3. Create a connection to Cloud SQL

To create a connection to Cloud SQL from Big Query you must first enable the BigQuery Connection API, this is done at a project level.

As this is a fairly recent feature there isn’t great coverage with either the bq tool or any of the Big Query client libraries to do this so we will need to use the console for now…

Under the Resources -> Add Data link in the left hand panel of the Big Query console UI, select Create Connection. You will see a side info panel with a form to enter connection details for your Cloud SQL instance.

In this example I will setup a connection to a Cloud SQL read replica instance I have created:

Creating a Big Query Connection to Cloud SQL

More information on the Big Query Connections API can be found at: https://cloud.google.com/bigquery/docs/reference/bigqueryconnection/rest

The following permissions are associated with connections in Big Query:

bigquery.connections.create
bigquery.connections.get
bigquery.connections.list
bigquery.connections.use
bigquery.connections.update
bigquery.connections.delete

These permissions are conveniently combined into the following predefined roles:

roles/bigquery.connectionAdmin    (BigQuery Connection Admin)         
roles/bigquery.connectionUser     (BigQuery Connection User)          

Step 4. Query away!

Now the connection to Cloud SQL can be accessed using the EXTERNAL_QUERY function in Big Query, as shown here:

Querying Cloud SQL from Big Query