Dataflow, Data Engineering, GCP, BigQuery

Converting table schemas from any database to BigQuery for Dataflow

Why do this?

When using the Google-provided Dataflow templates for loading an RDBMS into BigQuery (such as PostgreSQL to BigQuery), the target table in BigQuery must already exist, and it must have the correct column names and data types.

[Image: bigquery ddl helper flex]

We want a way to automate the process of reading a source table schema, converting it to a BigQuery schema, and then deploying it.

Example source system

For demonstration purposes we will use a Postgres database. It is hosted on a Cloud VM, and the firewall rules have been set so that both my local machine and the Dataflow worker nodes can access it.

I am using the HeidiSQL IDE to connect to one of the databases on the instance.

[Image: bigquery ddl helper postgres heidi]

The table schema can be obtained from the information_schema.columns view.
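For example, something along these lines pulls the column names and types for a single table with pandas and psycopg2; the connection details and the schema/table names are illustrative rather than the real environment.

import pandas as pd
import psycopg2

# Connect to the Postgres instance on the Cloud VM (details are placeholders).
conn = psycopg2.connect(
    host="10.0.0.5",
    dbname="mortimer_nfl",
    user="reader",
    password="********",
)

# information_schema.columns holds the column names and data types we need.
schema_df = pd.read_sql(
    """
    select column_name, data_type
    from information_schema.columns
    where table_schema = %(schema)s
      and table_name = %(table)s
    order by ordinal_position
    """,
    conn,
    params={"schema": "public", "table": "weather"},
)
print(schema_df)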

BeamMeUp

Managing many table loads that use the Google-provided Dataflow templates across many environments is not straightforward. One such issue, outlined in this blog post, is the need to encrypt and encode credentials.

In an attempt to not repeat myself (DRY) while also trying not to over-engineer things (YAGNI), I developed a Python package with some reusable helper functions for working with Dataflow. I called it BeamMeUp.

One of the functions within BeamMeUp that this process relies on is the get_db_type_encoder function.

import pandas as pd


def get_db_type_encoder(
    db_type: str,
) -> pd.DataFrame:
    """Returns an encoder dataframe for source system to target system (BigQuery in this case)

    Args:
        db_type (str): The type of database (postgres or mssql)

    Returns:
        pd.DataFrame: a dataframe of the data type mappings
    """
    columns = ["db_type", "source_type", "target_type"]
    data = [
        ("mssql", "char", "string"),
        ...
        ("mssql", "date", "date"),
        ("mssql", "varchar", "string"),
        ("postgres", "character", "string"),
        ...
        ("postgres", "date", "date"),
    ]
    db_type_encoder = pd.DataFrame(data, columns=columns)
    return db_type_encoder[db_type_encoder["db_type"] == db_type]
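A quick sketch of how the encoder might be applied, reusing the schema_df pulled from information_schema.columns earlier; the merge columns here are an assumption about how the helper stitches things together.

encoder = get_db_type_encoder("postgres")

# Attach the BigQuery type to every source column; anything without a
# mapping comes through as NaN so it can be dealt with explicitly.
mapped = schema_df.merge(
    encoder, how="left", left_on="data_type", right_on="source_type"
)

unmapped = mapped[mapped["target_type"].isna()]
if not unmapped.empty:
    raise ValueError(f"No BigQuery mapping for: {sorted(unmapped['data_type'])}")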

DDL Helper

I developed a helper script that can be used for both PostgreSQL and Microsoft SQL Server source systems.

bqddl-postgres-from-source.py
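The script takes an environment and an optional deploy switch. A rough sketch of the argument handling, based only on the flags shown below (the real script may differ):

import argparse


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Generate BigQuery DDL from a source table and optionally deploy it"
    )
    parser.add_argument("--env", required=True, help="target environment, e.g. dev")
    parser.add_argument(
        "--run_flag",
        action="store_true",
        help="deploy the generated DDL to BigQuery as well as writing it to the repo",
    )
    return parser.parse_args()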

python3 bqddl-postgres-from-source.py --env dev
[11:25:21] Creating table DDL from source 🚀  mortimer_nfl.weather       dbtools.py:409
               writing DDL to repo                                       dbtools.py:435

This will generate a DDL script in the appropriate folder within the repo.

📁 ddl/
│   ├── 📁 bigquery/
│   │   ├── 📁 fruit-0/
│   │   │   ├── 📄 nfl.weather.sql

The generated DDL file contains:
create table nfl.weather (
`weather_game_id` int64
  ,`game_id` int64
  ,`temperature` int64
  ,`weather_condition` string
  ,`wind_speed` int64
  ,`humidity` int64
  ,`wind_direction` string
  ,`created_date` timestamp
  ,`create_user` string
)
;
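For reference, a statement like the one above can be rendered from the mapped dataframe with a little string formatting; this is only a rough approximation of what the helper script does internally.

import pandas as pd


def render_ddl(dataset: str, table: str, mapped: pd.DataFrame) -> str:
    """Render a BigQuery create table statement from mapped column/type pairs."""
    columns = [
        f"`{row.column_name}` {row.target_type}"
        for row in mapped.itertuples(index=False)
    ]
    return f"create table {dataset}.{table} (\n" + "\n  ,".join(columns) + "\n)\n;\n"


print(render_ddl("nfl", "weather", mapped))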

We can now deploy this to the appropriate project in GCP by adding the --run_flag option.

python3 bqddl-postgres-from-source.py --env dev --run_flag
[11:25:21] Creating table DDL from source 🚀  mortimer_nfl.weather       dbtools.py:409
[11:25:22]     writing DDL to repo                                       dbtools.py:435
               Deploying TABLE DDL to BigQuery                           dbtools.py:451

This will have dropped the table if it existed and created a new one, ready to be loaded by Dataflow.
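Under the hood this amounts to a drop-and-create against BigQuery. A minimal sketch of that step, assuming application-default credentials; the project id and file path are taken from the repo tree above and may differ in practice.

from pathlib import Path

from google.cloud import bigquery

client = bigquery.Client(project="fruit-0")

# Drop the existing table (if any) and recreate it from the generated DDL.
client.query("drop table if exists nfl.weather").result()
ddl = Path("ddl/bigquery/fruit-0/nfl.weather.sql").read_text()
client.query(ddl).result()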

[Image: bigquery ddl helper weather]