Extending Airflow's BigQueryOperator to allow double substitution of parameters


This note describes the solution to a small problem with Airflow's BigQueryOperator that I encountered a while ago.
The solution was not entirely trivial for me to figure out, so I decided to share it here in case it is useful
to someone else. I assume some basic knowledge of Airflow throughout, as this note is not meant to be introductory.

What we mean by "double substitution"

I will start by describing the issue.
The BigQueryOperator contains the following templated parameters:

  • destination_dataset_table
  • sql
  • labels
  • query_params
  • bql (deprecated, so we will ignore it here)

As is common in Airflow, the user can pass a params dictionary to BigQueryOperator, which can later be referenced
in templates.
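
For illustration, a minimal sketch of this ordinary, single-level substitution could look as follows (the task id and
destination table are placeholders):

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# {{params.x}} is replaced with "value" when the task's sql is rendered.
query_task = BigQueryOperator(task_id="query",
                              sql='SELECT "{{params.x}}" AS x',
                              params={"x": "value"},
                              destination_dataset_table="xxx.yyy.zzz"
                              )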

However, what if one wants to pass a template string in params? That is, what if one wants to pass something like
{"x":"{{var.json.global_config.x}}"} in params, so as to reference some global_config
variable defined in Airflow as {"x":"value"}, and then
have the SQL template

SELECT "{{params.x}}" AS x

to be substituted to

SELECT "value" AS x

?

This will not work, as the sql you will in fact get is

SELECT "{{var.json.global_config.x}}" AS x

This makes sense, as substitution in Airflow (in its underlying Jinja template engine)
does not happen recursively. That is justified: otherwise one would have to face the possibility of endless substitution
loops. However, it is not always convenient.
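
The behaviour is easy to reproduce with Jinja alone; the small sketch below shows that a value which itself contains
template syntax is inserted verbatim rather than rendered a second time:

from jinja2 import Template

template = Template('SELECT "{{ x }}" AS x')

# The value is inserted as-is; Jinja does not render it again.
print(template.render(x="{{var.json.global_config.x}}"))
# -> SELECT "{{var.json.global_config.x}}" AS x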

Why we want double substitution

Now, the scenarios in which one wants to perform double substitution are more common than one might think at first glance.
Imagine that in our Airflow deployment every DAG we deploy has its own corresponding variable, and every DAG
may also reference a certain global_config variable containing global configuration. Now, to avoid duplication, it would be ideal
to be able to reference global_config within the configuration variables of individual DAGs, as in the sketch below.
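
For instance (the variable names and contents here are made up), the Airflow Variables in such a setup might look
like this:

# Hypothetical Airflow Variables, shown here as Python dicts for brevity.

# Variable "global_config": shared, deployment-wide settings.
global_config = {"template_data": {"environment": "production"}}

# Variable "my_dag_config": per-DAG settings that would ideally reference
# global_config via templates instead of duplicating its values.
my_dag_config = {"environment": "{{var.json.global_config.template_data.environment}}"}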

However, if double substitution
is unavailable, we have to do the substitution manually in every DAG's source code. This clutters the logic.
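
Concretely, without double substitution each DAG file has to resolve global_config itself at parse time, along the
lines of the following sketch (again with made-up variable names):

from airflow.models import Variable

# Manual workaround: merge global_config into the DAG's own configuration
# ourselves instead of letting the template engine do it.
global_config = Variable.get("global_config", deserialize_json=True)
my_dag_config = Variable.get("my_dag_config", deserialize_json=True)
my_dag_config["environment"] = global_config["template_data"]["environment"]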

How we can achieve double substitution

One solution is to create our own ExtendedBigQueryOperator, which one would then call, for example, as

ExtendedBigQueryOperator(task_id="query",
                         sql='SELECT "{{params.templated_params.x}}" AS x',
                         templated_params={
                             "x": "{{var.json.global_config.template_data.environment}}"
                         },
                         destination_dataset_table="xxx.yyy.zzz"
                         )

We want ExtendedBigQueryOperator to satisfy the following properties:

  1. mimic the constructor of BigQueryOperator,
  2. except that it accepts an additional dictionary parameter templated_params, containing the parameters we want to substitute twice, and
  3. behave identically to BigQueryOperator in every other respect.

Requirements 1. and 3. naturally call for ExtendedBigQueryOperator to inherit from BigQueryOperator, and we also
add the additional requirement that

  4. the amount of additional code in ExtendedBigQueryOperator should be as small as possible, to keep the inheritance from breaking in case BigQueryOperator changes in the future.

Finally, this is what I came up with, based on the requirements above (only a handful of lines of code):

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

def _deep_copy_dict(dst, src):
    # Copy src's entries into dst in place, so that every other reference to
    # dst (here, params["templated_params"]) sees the rendered values.
    for k in src:
        dst[k] = src[k]

class ExtendedBigQueryOperator(BigQueryOperator):
    def __init__(self, templated_params, *args, **kwargs):
        # Expose templated_params under params, so that templates can refer to
        # {{params.templated_params.<name>}}; params may not have been passed.
        kwargs["params"] = {**(kwargs.get("params") or {}),
                            "templated_params": templated_params}
        self.templated_params = templated_params
        # Prepend templated_params so that it is rendered before sql and the
        # other templated fields of BigQueryOperator.
        self.template_fields = ("templated_params",
                                *super(ExtendedBigQueryOperator, self).template_fields)
        super(ExtendedBigQueryOperator, self).__init__(*args, **kwargs)

    def render_template(self, attr, *args, **kwargs):
        # In the Airflow 1.x releases this targets, render_template receives
        # the name of the attribute being rendered as its first argument.
        res = super(ExtendedBigQueryOperator, self).render_template(attr, *args, **kwargs)
        if attr == "templated_params":
            # Copy the rendered values back into the original dict, so that the
            # later rendering of sql sees the substituted values.
            _deep_copy_dict(self.templated_params, res)
        return res