Use BigQuery to trigger Cloud Run

So you’re using BigQuery (BQ). It’s all set up and humming perfectly. Maybe now, you want to run an ELT job whenever a new table partition is created, or maybe you want to retrain your ML model whenever new rows are inserted into the BQ table.

In my previous article on EventArc, we went through how Logging can help us create eventing-type functionality in your application. Let’s take it a step further and walk through how we can couple BigQuery and Cloud Run.

In this article you will learn how to

  • Tie together BigQuery and Cloud Run
  • Use BigQuery’s audit log to trigger Cloud Run
  • With those triggers, run your required code

Let’s go!

Let’s create a temporary dataset within BigQuery named tmp_bq_to_cr.

In that same dataset, let’s create a table in which we will insert some rows to test our BQ audit log. Let’s grab some rows from a BQ public dataset to create this table:

CREATE OR REPLACE TABLE tmp_bq_to_cr.cloud_run_trigger AS
SELECT
 date, country_name, new_persons_vaccinated, population
 from `bigquery-public-data.covid19_open_data.covid19_open_data`
 where country_name='Australia'
 AND
 date > '2021-05-31'
LIMIT 100

Following this, let’s run an insert query that will help us build our mock database trigger:

INSERT INTO tmp_bq_to_cr.cloud_run_trigger
VALUES('2021-06-18', 'Australia', 3, 1000)

Now, in another browser tab let’s navigate to BQ Audit Events and look for our INSERT INTO event:

BQ-insert-event

There will be several audit logs for any given BQ action. Only after a query is parsed does BQ know which table we want to interact with, so the initial log will not, for example, have the table name.

We don’t want any old audit log, so we need to ensure we look for a unique set of attributes that clearly identify our action, such as in the diagram above.

In the case of inserting rows, the attributes are a combination of the following (a trimmed payload sketch follows the list):

  • The method is google.cloud.bigquery.v2.JobService.InsertJob
  • The name of the table being inserted to is the protoPayload.resourceName
  • The dataset id is available as resource.labels.dataset_id
  • The number of inserted rows is protoPayload.metadata.tableDataChange.insertedRowsCount
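Pieced together from the attributes above, a trimmed sketch of the relevant parts of the log entry looks something like this (the project name is a placeholder and unrelated fields are omitted):

{
  "resource": {
    "labels": {
      "project_id": "my-project",
      "dataset_id": "tmp_bq_to_cr"
    }
  },
  "protoPayload": {
    "methodName": "google.cloud.bigquery.v2.JobService.InsertJob",
    "resourceName": "projects/my-project/datasets/tmp_bq_to_cr/tables/cloud_run_trigger",
    "metadata": {
      "tableDataChange": {
        "insertedRowsCount": "1"
      }
    }
  }
}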

Time for some code

Now that we’ve identified the payload that we’re looking for, we can write the action for Cloud Run. We’ve picked Python and Flask to help us in this instance. (full code is on GitHub).

First, let’s filter out the noise and find the event we want to process

# main.py (excerpt) - imports and app setup shown for completeness
from flask import Flask, request
from google.cloud import bigquery

app = Flask(__name__)


@app.route('/', methods=['POST'])
def index():
    # Gets the payload data from the audit log
    content = request.json
    try:
        ds = content['resource']['labels']['dataset_id']
        proj = content['resource']['labels']['project_id']
        tbl = content['protoPayload']['resourceName']
        rows = int(content['protoPayload']['metadata']
                   ['tableDataChange']['insertedRowsCount'])
        if ds == 'tmp_bq_to_cr' and \
           tbl.endswith('tables/cloud_run_trigger') and rows > 0:
            query = create_agg()
            return "table created", 200
    except (KeyError, TypeError, ValueError):
        # if these fields are not in the JSON, ignore the event
        pass
    return "ok", 200

Now that we’ve found the event we want, let’s execute the action we need. In this example, we’ll aggregate and write out to a new table created_by_trigger:

def create_agg():
    client = bigquery.Client()
    query = """
    CREATE OR REPLACE TABLE tmp_bq_to_cr.created_by_trigger AS
    SELECT
      country_name, SUM(new_persons_vaccinated) AS n
    FROM tmp_bq_to_cr.cloud_run_trigger
    GROUP BY country_name
    """
    client.query(query).result()  # wait for the query job to complete
    return query

The Dockerfile for the container is simply a basic Python container into which we install Flask and the BigQuery client library:

FROM python:3.9-slim
RUN pip install Flask==1.1.2 gunicorn==20.0.4 google-cloud-bigquery
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY *.py ./
CMD exec gunicorn --bind :$PORT main:app

Now we Cloud Run

Build the container and deploy it using a couple of gcloud commands:

SERVICE=bq-cloud-run
PROJECT=$(gcloud config get-value project)
CONTAINER="gcr.io/${PROJECT}/${SERVICE}"
gcloud builds submit --tag ${CONTAINER}
gcloud run deploy ${SERVICE} --image $CONTAINER --platform managed

I always forget about the permissions

In order for the trigger to work, a couple of IAM bindings are required: one for the Pub/Sub service agent (which Eventarc uses under the hood) and one for the service account used by the trigger:

gcloud projects add-iam-policy-binding $PROJECT \
    --member="serviceAccount:service-${PROJECT_NO}@gcp-sa-pubsub.iam.gserviceaccount.com"\
    --role='roles/iam.serviceAccountTokenCreator'

gcloud projects add-iam-policy-binding $PROJECT \
    --member=serviceAccount:${SVC_ACCOUNT} \
    --role='roles/eventarc.admin'

Finally, the event trigger

gcloud eventarc triggers create ${SERVICE}-trigger \
  --location ${REGION} --service-account ${SVC_ACCOUNT} \
  --destination-run-service ${SERVICE}  \
  --event-filters type=google.cloud.audit.log.v1.written \
  --event-filters methodName=google.cloud.bigquery.v2.JobService.InsertJob \
  --event-filters serviceName=bigquery.googleapis.com

Important to note here is that we’re triggering on any insert log created by BQ. That’s why, in this action, we had to filter the events based on the payload.

Take it for a spin

Now, try out the BigQuery -> Cloud Run trigger and action. Go to the BigQuery console and insert a row or two:

INSERT INTO tmp_bq_to_cr.cloud_run_trigger
VALUES('2021-06-18', 'Australia', 5, 25000)

Watch as a new table called created_by_trigger gets created! You have successfully triggered a Cloud Run action on a database event in BigQuery.

Enjoy!

Azure Static Web App Review

The Azure Static Web App feature is a relatively new addition to the Azure estate and has recently become generally available, so I thought I would take it for a test drive and discuss my findings.

I am a proponent of the JAMStack architecture for front end applications and a user of CD enabled CDN services like Netlify, so this Azure feature was naturally appealing to me.

Azure SWAs allow you to serve static assets (like JavaScript) without an origin server, meaning you don’t need a web server, can streamline content distribution and web app performance, and can reduce the attack surface area of your application.

The major advantage of using them is simplicity: there are no scaffolding or infrastructure requirements, and deployment is seamlessly integrated into your CI/CD processes (natively, if you are using GitHub).

Deploying Static Web Apps in Azure

Setup is pretty simple; aside from a name and a resource group, you just need to supply:

  • a location (the Azure region used for serverless back-end APIs via Azure Function Apps); note that this is not necessarily where the static content is served from
  • a GitHub or GitLab repo URL
  • the branch you wish to use to trigger production deployments (e.g. main)
  • a path to your code within your app (e.g. where your package.json file is located)
  • an output folder (e.g. dist); this should not exist in your repo
  • a project or personal access token for your GitHub account (alternatively you can perform an interactive OAuth2.0 consent if using the portal)

An example is shown here:
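If you prefer to script it, a rough equivalent using the Azure CLI might look like the sketch below (resource names and values are placeholders, and the flag set is an assumption against the az staticwebapp command group):

az staticwebapp create \
  --name my-swa \
  --resource-group my-rg \
  --location westus2 \
  --source https://github.com/myorg/my-spa \
  --branch main \
  --app-location "/" \
  --output-location "dist" \
  --token $GITHUB_PAT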

GitHub Actions

Using the consent provided (either using the OAuth flow or by providing a token), Azure Static Web Apps will automagically create the GitHub Actions workflow to deploy your application on a push or merge event to your repo. This includes providing scoped API credentials to Azure to allow access to the Static Web App resource using secrets in GitHub (which are created automagically as well). An example workflow is shown here:
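The generated workflow typically resembles the following sketch; the exact action version, job layout and the auto-generated secret name will vary per app:

name: Azure Static Web Apps CI/CD

on:
  push:
    branches: [ main ]
  pull_request:
    types: [opened, synchronize, reopened, closed]
    branches: [ main ]

jobs:
  build_and_deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - uses: Azure/static-web-apps-deploy@v1
        with:
          # secret created automatically when the Static Web App is provisioned
          azure_static_web_apps_api_token: ${{ secrets.AZURE_STATIC_WEB_APPS_API_TOKEN }}
          repo_token: ${{ secrets.GITHUB_TOKEN }}
          action: upload
          app_location: "/"
          output_location: "dist"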

Preview or Staging Releases

Similar to the functionality in analogous services like Netlify, you can configure preview releases of your application to be deployed from specified branches on pull request events.

Routes and Authorization

Routes (for SPAs) need to be provided to Azure using a file named staticwebapp.config.json located in the application root of your repo (at the same level as your package.json file). You can also specify response codes and whether the route requires authentication, as shown here:
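An illustrative staticwebapp.config.json (the routes, roles and overrides here are hypothetical):

{
  "routes": [
    { "route": "/profile", "allowedRoles": ["authenticated"] },
    { "route": "/admin/*", "allowedRoles": ["administrator"] },
    { "route": "/api/*", "methods": ["GET"], "allowedRoles": ["anonymous"] }
  ],
  "responseOverrides": {
    "404": { "rewrite": "/index.html", "statusCode": 200 }
  }
}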

Pros

  • Globally distributed CDN
  • Increased security posture, reduced attack surface area
  • Simplified architecture and deployment
  • No App Service Plan required – cost reduction
  • Enables Continuous Deployment – including preview/staging environments
  • TLS and DNS can be easily configured for your app

Cons

  • Serverless API locations are limited
  • Integration with other VCS/CI/CD systems like GitLab would need to be custom built (GitHub and Azure DevOps are integrated)

Overall, this is a good feature for deploying SPAs or PWAs in Azure.

Simple Tasker: Configuration driven orchestration

Recently I found myself at a client that was using a third party tool to scan all their enterprise applications in order to collate their data lineage. They had spent two years onboarding applications to the tool, resulting in a large technical mess that was hard to debug and impossible to extend. As new applications were integrated onto the platform, developers were forced to think of new ways of connecting and transforming the data so it could be consumed.

The general approach was: setup scanner -> scan application -> modify results -> upload results -> backup results -> cleanup workspace -> delete anything older than 'X' days

Each developer had their own style of doing this – involving shell scripts, Python scripts, SQL and everything in between. Worse, there were slabs of code replicated across the entire repository, with variables and paths changed depending on the use case.

My task was to create a framework that could orchestrate the scanning and that adhered to the following philosophies:

  • DRY (Don’t Repeat Yourself)
  • Config driven
  • Version controlled
  • Simple to extend
  • Idempotent

It also had to be written in Python as that was all the client was skilled in.

After looking at what was on the market (Airflow and Prefect being the main contenders), I decided to roll my own simplified orchestrator that required as little actual coding as possible and could be set up through configuration.

In choosing a configuration format, I settled on HOCON as it closely resembles JSON but has advanced features such as interpolation, substitutions and the ability to include other HOCON files – this would drastically reduce the amount of boilerplate configuration required.

Because I had focused so heavily on being configuration driven, I also needed the following characteristics to be delivered:

  • Self discovery of task types (more on this later)
  • Configuration validation at startup

Tasks and self discovery

As I wanted anyone to be able to rapidly extend the framework by adding tasks, I needed to reduce as much repetition and boilerplate as possible. Ideally, I wanted a developer to just have to think about writing code and not have to deal with how to integrate this.

To achieve this, we needed a way of registering new ‘tasks’ that would become available to the framework. I wanted a developer to simply have to subclass the main Task class and implement a run function – the rest would be taken care of.

from typing import List

# ClassNotRegisteredException is a small custom exception defined elsewhere in the project
class TaskRegistry:

    def __init__(self) -> None:
        self._registry = {}

    def register(self, cls: type) -> None:
        n = getattr(cls, 'task_name', cls.__name__).lower()
        self._registry[n] = cls

    def registered(self) -> List[str]:
        return list(self._registry.keys())

    def has(self, name: str) -> bool:
        return name in self._registry

    def get(self, name: str) -> type:
        return self._registry[name]

    def create(self, name: str, *args, **kwargs) -> object:
        try:
            return self._registry[name](*args, **kwargs)
        except KeyError:
            raise ClassNotRegisteredException(name)


registry = TaskRegistry()

Once the registry was instantiated, any new task that inherited from Task would automatically be added to the registry. We could then use the create(name) function to instantiate any class – essentially a Pythonic factory method.

import logging
from abc import ABC, abstractmethod


class Task(ABC):

    def __init__(self) -> None:
        self.logger = logging.getLogger(self.__class__.__name__)

    def __init_subclass__(cls) -> None:
        registry.register(cls)

    @abstractmethod
    def run(self, **kwargs) -> bool:
        raise NotImplementedError

For the framework to automatically register the classes, it was important to follow the project structure. As long as the task resided in the ‘tasks’ module, we could scan this at runtime and register each task.

└── simple_tasker
    ├── __init__.py
    ├── cli.py
    └── tasks
        ├── __init__.py
        ├── archive.py
        └── shell_script.py

This was achieved with a simple dynamic module importer

import glob
from os.path import basename, dirname, isfile, join

# import every module in the tasks package so that each Task subclass registers itself
modules = glob.glob(join(dirname(__file__), "*.py"))

for f in modules:
    if isfile(f) and not f.endswith("__init__.py"):
        __import__(f"{Task.__module__}.{basename(f)[:-3]}")

The configuration

In designing how the configuration would bind to the task, I needed to capture the name (which object to instantiate) and the args to pass to the instantiated run function. I decided to model it as below, with everything under a ‘tasks’ array:

tasks: [
    {
        name: shell_script
        args: {
            script_path: uname
            script_args: -a
        }
    },
    {
        name: shell_script
        args: {
            script_path: find
            script_args: [${CWD}/simple_tasker/tasks, -name, "*.py"]
        }
    },
    {
        name: archive
        args: {
            input_directory_path: ${CWD}/simple_tasker/tasks
            target_file_path: /tmp/${PLATFORM}_${TODAY}.tar.gz
        }
    }
]

Orchestration and validation

As mentioned previously, one of the goals was to ensure the configuration was valid prior to any execution. This meant that the framework needed to validate whether the task name referred to a registered task, and that all mandatory arguments were provided in the configuration. Determining whether the task was registered was a simple key check; however, validating the arguments to run required some inspection – I needed to get all args for the run function and filter out ‘self’ and any asterisk args (*args, **kwargs).

import inspect
from typing import List


def get_mandatory_args(func) -> List[str]:

    mandatory_args = []
    for k, v in inspect.signature(func).parameters.items():
        if (
            k != "self"
            and v.default is inspect.Parameter.empty
            and not str(v).startswith("*")
        ):
            mandatory_args.append(k)

    return mandatory_args

And finally onto the actual execution bit. The main functionality required here is to validate that the config was defined correctly, then loop through all tasks and execute them – passing in any args.

import logging
from pathlib import Path
from typing import Dict

from pyhocon import ConfigFactory

# wrap_environment is assumed to be a project helper: a context manager that
# temporarily applies the supplied environment variables while the config is parsed
class Tasker:

    def __init__(self, path: Path, env: Dict[str, str] = None) -> None:

        self.logger = logging.getLogger(self.__class__.__name__)
        self._tasks = []

        with wrap_environment(env):
            self._config = ConfigFactory.parse_file(path)


    def __validate_config(self) -> bool:

        error_count = 0

        for task in self._config.get("tasks", []):
            name, args = task["name"].lower(), task.get("args", {})

            if registry.has(name):
                for arg in get_mandatory_args(registry.get(name).run):
                    if arg not in args:
                        print(f"Missing arg '{arg}' for task '{name}'")
                        error_count += 1
            else:
                print(f"Unknown tasks '{name}'")
                error_count += 1

            self._tasks.append((name, args))

        return error_count == 0

    def run(self) -> bool:

        if self.__validate_config():

            for name, args in self._tasks:
                exe = registry.create(name)
                self.logger.info(f"About to execute: '{name}'")
                if not exe.run(**args):
                    self.logger.error(f"Failed tasks '{name}'")
                    return False

            return True
        return False
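With that in place, running a pipeline is just a matter of pointing the Tasker at a config file. A hypothetical invocation (the module path, config file name and environment values are assumptions):

from pathlib import Path

from simple_tasker.tasker import Tasker  # module path assumed

if __name__ == "__main__":
    # parse the HOCON config, validate it, then run each task in order
    success = Tasker(Path("config/scan.conf"), env={"CWD": "/tmp"}).run()
    print("all tasks completed" if success else "pipeline failed")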

Putting it together – sample tasks

Below are two examples of how easy it is to configure the framework. We have a simple folder archiver that will tar/gzip a directory based on two input parameters.

import os
import tarfile

# Task is the abstract base class shown earlier
class Archive(Task):

    def __init__(self) -> None:
        super().__init__()

    def run(self, input_directory_path: str, target_file_path: str) -> bool:

        self.logger.info(f"Archiving '{input_directory_path}' to '{target_file_path}'")

        with tarfile.open(target_file_path, "w:gz") as tar:
            tar.add(
                input_directory_path,
                arcname=os.path.basename(input_directory_path)
            )
        return True

A more complex example is the ability to execute shell scripts (or OS commands), passing in optional arguments that can be either a string or a list of strings.

import subprocess
from typing import List, Union


class ShellScript(Task):

    task_name = "shell_script"

    def __init__(self) -> None:
        super().__init__()

    def run(
        self,
        script_path: str,
        script_args: Union[str, List[str]] = None,
        working_directory_path: str = None
    ) -> bool:

        cmd = [script_path]

        if isinstance(script_args, str):
            cmd.append(script_args)
        elif script_args:
            cmd += script_args

        try:

            result = subprocess.check_output(
                cmd,
                stderr=subprocess.STDOUT,
                cwd=working_directory_path
            ).decode("utf-8").splitlines()

            for o in result:
                self.logger.info(o)

        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            self.logger.error(e)
            return False

        return True

You can view the entire implementation here

Masking Private Keys in CI/CD Pipelines in GitLab

Big fan of GitLab (and GitLab CI in particular). I had a recent requirement to push changes to a wiki repo associated with a GitLab project through a GitLab CI pipeline (using the SaaS version of GitLab) and ran into a conundrum…

Using the GitLab SaaS version, deploy tokens can’t have write API access, so the next best solution is to use deploy keys. Adding your public key as a deploy key and granting this key write access to repositories is relatively straightforward.

The issue arises when you attempt to create a masked GitLab CI variable using the private key from your keypair; you get this…

I was a bit astonished to see this to be honest… Looks like it has been raised as an issue several times over the last few years but never resolved (the root cause of which is something to do with newline characters or base64 encoding or the overall length of the string).

I came up with a solution! It’s not pretty, but it’s effective: it masks the variable so that it cannot be printed in CI logs, as shown here:

Setup

Add a masked and protected GitLab variable for each line in the private key, for example:

The Code

Add the following block to your .gitlab-ci.yml file:
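A sketch of what that block might look like, assuming one masked, protected variable per line of the private key named SSH_KEY_LINE_01 through SSH_KEY_LINE_NN (the variable names, host and file paths are hypothetical):

.write_ssh_key: &write_ssh_key
  - mkdir -p ~/.ssh && chmod 700 ~/.ssh
  - echo "$SSH_KEY_LINE_01" >  ~/.ssh/id_rsa
  - echo "$SSH_KEY_LINE_02" >> ~/.ssh/id_rsa
  # ...one echo per masked line variable...
  - echo "$SSH_KEY_LINE_NN" >> ~/.ssh/id_rsa
  - chmod 600 ~/.ssh/id_rsa
  - ssh-keyscan gitlab.com >> ~/.ssh/known_hosts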

Now, within jobs in your pipeline, you can simply do this to clone, push or pull from a remote GitLab repo:
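For example, a hypothetical job that reuses the anchor from the block above to clone and push to the project’s wiki repo (the repo path, stage and commands are placeholders):

update_wiki:
  stage: deploy
  before_script: *write_ssh_key
  script:
    - git config --global user.email "ci@example.com"
    - git config --global user.name "gitlab-ci"
    - git clone git@gitlab.com:mygroup/myproject.wiki.git
    - cd myproject.wiki
    - echo "updated by pipeline $CI_PIPELINE_ID" >> Home.md
    - git add . && git commit -m "ci update" && git push origin master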

As mentioned, it’s not pretty, but it’s effective, and I couldn’t see any cleaner options…

Okta Admin Command Line Interface

Identity and Access Management is a critical component of any application or SaaS architecture. I’m currently doing a spike of the Okta solution for an application development project I am on. Okta is a comprehensive solution built on the open OAuth2 and OIDC protocols, as well as supporting more conventional identity federation approaches such as SAML.

Okta has a clean and easy to use web-based Admin interface which can be used to create applications, users, claims, identity providers and more.

During my spike, which was done in a crash and burn test Okta organisation, I had associated my user account with a Microsoft Identity Provider for SSO, and subsequently had issues accessing the Microsoft account my user was associated with; as a result, I managed to lock myself (the super admin) out of the Okta Admin Console.

Fortunately, prior to doing this I had created an API token for my user. So, I went about looking at ways I could interact with Okta programmatically. My first inclination was to use a simple CLI for Okta to get me out of jail… but I found there wasn’t one that suited. There are, however, a wealth of SDKs for Okta across multiple front-end and back-end oriented programming languages (such as JavaScript, Golang, Python and more).

Being in lockdown and having some free time on my hands, I decided to create a simple open source command line tool which could be used to administer an Okta organisation. The result of this weekend lockdown is okta-admin

okta-admin cli

For this project I used the Golang SDK for Okta, along with the Cobra and Viper Golang packages (used by docker, kubectl and other popular command line utilities). To provide a query interface to JSON response payloads I use GJson.

Will keep adding to this so stay tuned…

Complete source code for this project is available at https://github.com/gammastudios/okta-admin

Enumerating all roles for a user in Snowflake

Snowflake allows roles to be assigned to other roles, so when a user is assigned to a role, they may inherit the ability to use countless other roles.

Challenge: recursively enumerate all roles for a given user

One solution would be to create a complex query on the "SNOWFLAKE"."ACCOUNT_USAGE"."GRANTS_TO_ROLES" object.

An easier solution is to use a stored procedure to recurse through grants for a given user and return an ARRAY of roles for that user.

This is a good programming exercise in tail call recursion (sort of) in JavaScript. Here is the code:
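The original procedure isn’t reproduced here, so the following is a minimal sketch of the approach described above: recursively walking SHOW GRANTS output and returning an ARRAY. The procedure name is hypothetical, the column names are assumed from standard SHOW GRANTS output, and EXECUTE AS CALLER is used so that SHOW commands can run inside the procedure:

CREATE OR REPLACE PROCEDURE GET_ALL_ROLES_FOR_USER(USER_NAME STRING)
RETURNS ARRAY
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$
  var allRoles = [];

  // roles granted to a role show up in SHOW GRANTS TO ROLE <role>
  // as privilege = 'USAGE' with granted_on = 'ROLE'
  function expandRole(roleName) {
    if (allRoles.indexOf(roleName) >= 0) {
      return;  // already visited - avoids cycles and duplicates
    }
    allRoles.push(roleName);
    var rs = snowflake.createStatement(
      {sqlText: "SHOW GRANTS TO ROLE \"" + roleName + "\""}
    ).execute();
    while (rs.next()) {
      if (rs.getColumnValue("privilege") === "USAGE" &&
          rs.getColumnValue("granted_on") === "ROLE") {
        expandRole(rs.getColumnValue("name"));
      }
    }
  }

  // roles granted directly to the user
  var rs = snowflake.createStatement(
    {sqlText: "SHOW GRANTS TO USER \"" + USER_NAME + "\""}
  ).execute();
  while (rs.next()) {
    expandRole(rs.getColumnValue("role"));
  }

  return allRoles;
$$;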

To call the stored proc, execute:
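For example, using the hypothetical procedure name from the sketch above:

CALL GET_ALL_ROLES_FOR_USER('JSMITH');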

One drawback of stored procedures in Snowflake is that they can only have scalar or array return types and cannot be used directly in a SQL query. However, you can use the table(result_scan(last_query_id())) trick to get around this, as shown below, where we pivot the ARRAY into a record set with the array elements as rows:
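A sketch of the pivot, assuming the CALL above was the immediately preceding statement (the returned ARRAY is the first column of the result):

SELECT f.VALUE::STRING AS ROLE_NAME
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) t,
     LATERAL FLATTEN(INPUT => t.$1) f;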

IMPORTANT

This query must be the next statement run immediately after the CALL statement and cannot be run again until you run another CALL statement.

More adventures with Snowflake soon!

EventArc: The state of eventing in Google Cloud

When defining event-driven architectures, it’s always good to keep up with how the landscape is changing. How do you connect microservices in your architecture? Is Pub/Sub the end-game for all events? To dive a bit deeper, let’s talk through the benefits of having a single orchestrator, or whether a choreographer is a better fit.

Orchestration versus choreography refresher

My colleague @jeffreyaven did a recent post explaining this concept in simple terms, which is worth reviewing, see:

Should there really be a central orchestrator controlling all interactions between services…..or, should each service work independently and only interact through events?

  • Orchestration is usually viewed as a domain-wide central service that defines the flow and control of communication between services. In this paradigm, it becomes easier to change and ultimately monitor policies across your org.
  • Choreography has each service registering and emitting events as they need to. It doesn’t direct or define the flow of communication; instead, a central broker usually passes messages around, and services remain truly independent.

Enter Workflows, which is suited to centrally orchestrating services: not only Google Cloud services such as Cloud Functions and Cloud Run, but also external services.

How about choreography? Pub/Sub and Eventarc are both suited for this. We all know and love Pub/Sub, but how do I use EventArc?

What is Eventarc?

Announced in October 2020, Eventarc was introduced as eventing functionality that enables you, the developer, to send events to Cloud Run from more than 60 Google Cloud sources.

But how does it work?

Eventing is done by reading those sweet sweet Audit Logs, from various sources, and sending them to Cloud Run services as events in the CloudEvents format. Quick primer on CloudEvents: it’s a specification for describing event data in a common way. The specification is now under the Cloud Native Computing Foundation. Hooray! Eventarc can also read events from Pub/Sub topics for custom applications. Here’s a diagram I graciously ripped from the Google Cloud blog:

Eventarc

Why do I need Eventarc? I have the Pub/Sub

Good question. Eventarc provides an easier path to receive events not only from Pub/Sub topics but from a number of Google Cloud sources with its Audit Log and Pub/Sub integration. In fact, any service that has Audit Log integration can be an event source for Eventarc. Beyond easy integration, it provides consistency and structure to how events are generated, routed and consumed. Things like:

Triggers

Triggers specify routing rules from event sources to event sinks. Listen for new object creation in GCS and route that event to a service in Cloud Run by creating an audit log trigger. Create triggers that also listen to Pub/Sub. Then list all triggers in one central place in Eventarc:

gcloud beta eventarc triggers list
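For example, a sketch of an audit-log trigger for new GCS objects, following the same pattern as the BigQuery trigger earlier (the region, service account and destination service are placeholders):

gcloud eventarc triggers create gcs-create-trigger \
  --location ${REGION} --service-account ${SVC_ACCOUNT} \
  --destination-run-service my-run-service \
  --event-filters type=google.cloud.audit.log.v1.written \
  --event-filters serviceName=storage.googleapis.com \
  --event-filters methodName=storage.objects.create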

Consistency with eventing format and libraries

Using the CloudEvents-compliant specification allows event data to be described in a common way, moving towards the goal of consistency, accessibility and portability. It makes it easier for different languages to read the event, and for the Google Events libraries to parse fields.

This feeds into the long-term vision of Eventarc as the hub of events, enabling a unified eventing story for Google Cloud and beyond.

Eventarc producers and consumers

In the future, you can expect to forgo Audit Logs and read these events directly, sending them out to even more sinks within GCP and to any HTTP target.


This article was written with inspiration from https://cloud.google.com/blog/topics/developers-practitioners/eventarc-unified-eventing-experience-google-cloud. Thanks Mete Atamel!

Microservices Concepts: Orchestration versus Choreography

One of the foundational concepts in microservices architecture and design patterns is the concept of Orchestration versus Choreography. Before we look at a reference implementation of each of these patterns, it is worthwhile starting with an analogy.

This is often likened to a Jazz band versus a Symphony Orchestra.

A modern symphony orchestra is normally comprised of sections such as strings, brass, woodwind and percussion sections. The sections are orchestrated by a conductor, usually placed at a central point with respect to each of the sections. The conductor instructs each section to perform their components of the overall symphony.

By contrast, a Jazz band does not have a conductor and also features improvisation, with different musicians improvising based upon each other. Choreography, although more aligned to dance, can involve improvisation. In both cases there is still an intended output and a general framework as to how the composition will be executed, however unlike a symphony orchestra there is a degree of spontaneity.

Now back to technology and microservices…

In the Orchestration model, there is a central orchestration service which controls the interactions between other services, in other words the flow and control of communication and/or message passing between services is controlled by an orchestrator (much like the conductor in a symphony orchestra). On the plus side, this model enables easier monitoring and policy enforcement across the system. A generalisation of the Orchestration model is shown below:

Orchestration model

By contrast, in the Choreography model, each service works independently and interacts with other services through events. In this model each service registers and emits events as they need to. The flow (of communication between services) is not predefined, much like a Jazz band. This model often includes a central broker for message passing between services, but the services operate independently of each other and are not controlled by a central service (an orchestrator). A generalisation of the Choreography model is shown below:

Choreography model

We will post subsequent articles with implementations of these patterns, but it is worthwhile getting a foundational understanding first.

Using the Azure CLI to Create an API using a Function App within API Management

Function Apps, Logic Apps and App Services can be used to expose APIs within Azure API Management, which is an easy way to deploy serverless microservices. You can see this capability in the Azure portal below within API Management:

Add a new API using a Function App as a back end

Like most readers, I like to script everything, so I was initially frustrated when I couldn’t replicate this operation in the Azure CLI, the REST API, PowerShell, or any of the other SDKs or IaC tools. Others shared my frustration, as seen here.

I was nearly resigned to using click ops in the portal (arrrgh) before I worked out this workaround.

The Solution

There is a bit more prep work required to automate this process, but it is well worth it.

1. Create an OpenAPI (or Swagger or WADL) specification document, as seen below (use the absolute URL of your Function App in the url parameter):
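A minimal example of what this might look like (the Function App host name and operation are placeholders):

{
  "openapi": "3.0.1",
  "info": { "title": "my-function-api", "version": "1.0" },
  "servers": [
    { "url": "https://my-function-app.azurewebsites.net/api" }
  ],
  "paths": {
    "/hello": {
      "get": {
        "summary": "hello function",
        "responses": { "200": { "description": "OK" } }
      }
    }
  }
}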

2. Use the az apim api import command (not the az apim api create command), as shown here:
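Something along these lines (resource names are placeholders, and the flag set assumes the current az apim api import syntax):

az apim api import \
  --resource-group my-rg \
  --service-name my-apim-instance \
  --api-id my-function-api \
  --path myfunctions \
  --specification-format OpenApiJson \
  --specification-path ./openapi.json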

3. Associate the API with a product (which is how you can rate limit APIs)
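For example, assuming an existing product (the product and API ids are placeholders):

az apim product api add \
  --resource-group my-rg \
  --service-name my-apim-instance \
  --product-id unlimited \
  --api-id my-function-api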

That’s it! You can now access your function via the API gateway using the gateway url or via the developer portal as seen below:

Function App API in API Management in the Azure Portal
Function App API in the Dev Portal