Converting table schemas from any database to BigQuery for Dataflow

Why do this?
When using the Google-provided Dataflow templates for loading any RDBMS into BigQuery (such as PostgreSQL to BigQuery), the target table in BigQuery must already exist, and it must have the correct column names and data types.
We want a way to automate the process of reading a source table schema, converting it to a BigQuery schema, and deploying it.
Example source system
For demonstration purposes we will use a Postgres database hosted on a Cloud VM, with firewall rules set so that both my local machine and the Dataflow worker node can access it.
I am using the HeidiSQL IDE to connect to one of the databases on the instance.
The table schema can be obtained from the information_schema.columns view.
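For example, something along these lines pulls the column names and data types for a single table. This is just a sketch, not BeamMeUp code: the connection details are placeholders and the mortimer_nfl.weather names are borrowed from the demo table used later.

import pandas as pd
import psycopg2

# Placeholder connection details for the demo Postgres instance.
conn = psycopg2.connect(
    host="10.0.0.5", dbname="postgres", user="dataflow", password="..."
)

# information_schema.columns holds one row per column, including its data type.
schema_df = pd.read_sql(
    """
    select column_name, data_type, is_nullable
    from information_schema.columns
    where table_schema = %(schema)s and table_name = %(table)s
    order by ordinal_position
    """,
    conn,
    params={"schema": "mortimer_nfl", "table": "weather"},
)
print(schema_df)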
BeamMeUp
Managing many table loads that use the Google-provided Dataflow templates across many environments is not straightforward. One such issue is the one outlined in this blog post regarding the need to encrypt and encode credentials.
In an attempt to not repeat myself (DRY) while also trying not to over-engineer things (YAGNI), I developed a Python package with some reusable helper functions for working with Dataflow. I called it BeamMeUp.
One of the functions within BeamMeUp that this process relies on is the get_db_type_encoder function.
import pandas as pd


def get_db_type_encoder(
    db_type: str,
) -> pd.DataFrame:
    """Returns an encoder dataframe for source system to target system (BigQuery in this case)

    Args:
        db_type (str): The type of database (postgres or mssql)

    Returns:
        pd.DataFrame: a dataframe of the data type mappings
    """
    columns = ["db_type", "source_type", "target_type"]
    data = [
        ("mssql", "char", "string"),
        ...
        ("mssql", "date", "date"),
        ("mssql", "varchar", "string"),
        ("postgres", "character", "string"),
        ...
        ("postgres", "date", "date"),
    ]
    db_type_encoder = pd.DataFrame(data, columns=columns)
    return db_type_encoder[db_type_encoder["db_type"] == db_type]
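To use the encoder, the column list pulled from information_schema.columns can be joined onto it to get the BigQuery type for each column. A minimal sketch, continuing from the earlier query where schema_df held the source columns:

# Map each source column's Postgres type to its BigQuery type.
encoder = get_db_type_encoder("postgres")
mapped = schema_df.merge(
    encoder, left_on="data_type", right_on="source_type", how="left"
)
# Any type without a match shows up as NaN in target_type and needs adding to the encoder.
print(mapped[["column_name", "data_type", "target_type"]])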
DDL Helper
I developed a helper script that can be used for both PostgreSQL and Microsoft SQL Server source systems.
python3 bqddl-postgres-from-source.py --env dev
[11:25:21] Creating table DDL from source 🚀 mortimer_nfl.weather dbtools.py:409
writing DDL to repo dbtools.py:435
This will generate a script in the appropriate folder within the repo.
📁 ddl/
├── 📁 bigquery/
│   └── 📁 fruit-0/
│       └── 📄 nfl.weather.sql
create table nfl.weather (
`weather_game_id` int64
,`game_id` int64
,`temperature` int64
,`weather_condition` string
,`wind_speed` int64
,`humidity` int64
,`wind_direction` string
,`created_date` timestamp
,`create_user` string
)
;
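Once the types are mapped, building the DDL itself is mostly string assembly. Here is a minimal sketch of the idea, not the actual dbtools.py code: build_ddl, the nfl dataset name and the output path are assumptions based on the output above, and mapped is the dataframe from the earlier type-mapping sketch.

from pathlib import Path

import pandas as pd


def build_ddl(dataset: str, table: str, mapped: pd.DataFrame) -> str:
    """Builds a BigQuery create table statement from the mapped column dataframe."""
    cols = [
        f"`{row.column_name}` {row.target_type}"
        for row in mapped.itertuples(index=False)
    ]
    return (
        f"create table {dataset}.{table} (\n  "
        + "\n  ,".join(cols)
        + "\n)\n;"
    )


ddl = build_ddl("nfl", "weather", mapped)
Path("ddl/bigquery/fruit-0/nfl.weather.sql").write_text(ddl)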
We can now deploy this to the appropriate project in GCP using the --run_flag option.
python3 bqddl-postgres-from-source.py --env dev --run_flag
[11:25:21] Creating table DDL from source 🚀 mortimer_nfl.weather dbtools.py:409
[11:25:22] writing DDL to repo dbtools.py:435
Deploying TABLE DDL to BigQuery dbtools.py:451
This will drop the table if it already exists and create the new one, ready to be loaded by Dataflow.
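For reference, the deploy step only needs the google-cloud-bigquery client. A minimal sketch of what the --run_flag path could look like, not the actual dbtools.py code: the fruit-0 project id is taken from the folder name above, and the drop-and-recreate behaviour is done here with an explicit drop table if exists.

from pathlib import Path

from google.cloud import bigquery

# Placeholder project id for the dev environment.
client = bigquery.Client(project="fruit-0")

ddl = Path("ddl/bigquery/fruit-0/nfl.weather.sql").read_text()

# Drop the existing table (if any), then run the generated create table DDL.
client.query("drop table if exists nfl.weather").result()
client.query(ddl).result()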