Building an enterprise data warehouse can be either relatively straightforward or highly involved, depending on factors such as the complexity of the conceptual data model and the variety of source systems. In many cases, applying Change Data Capture (CDC) makes the data integration simpler. Fortunately, there are plenty of CDC tools available on the market; many are easy to use and affordable, while others are cumbersome and expensive for what they offer.
What I am interested in is moving data from a SQL Server instance to Google BigQuery without too much hassle, such as changing firewall rules. Many enterprise solutions, such as Alooma and Qlik Replicate, can achieve this out of the box, but the selection process should still weigh several factors.
To test this out, I use the Stack Overflow database (~10 GB) provided by Brent Ozar, with a simple setup as follows.
Configure a SQL Server
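Debezium's SQL Server connector reads from the change tables that SQL Server maintains, so CDC has to be enabled on the source database and on every table to capture, and the SQL Server Agent must be running. The snippet below is a minimal sketch using pyodbc; the ODBC driver name, the connection placeholders, and the Users table are assumptions, and the same stored procedures can just as well be run from SSMS.

# A minimal sketch of enabling CDC with pyodbc; the driver name, the
# connection placeholders, and the dbo.Users table are assumptions.
# SQL Server Agent must be running for the capture job to work.
import pyodbc

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=<IP_ADDRESS_OR_HOSTNAME>,1433;"
    "DATABASE=<DB_NAME>;UID=<USERNAME>;PWD=<PASSWORD>",
    autocommit=True,
)
cursor = conn.cursor()

# Enable CDC at the database level.
cursor.execute("EXEC sys.sp_cdc_enable_db")

# Enable CDC for each table to be captured (Users shown as an example).
cursor.execute(
    "EXEC sys.sp_cdc_enable_table "
    "@source_schema = N'dbo', @source_name = N'Users', @role_name = NULL"
)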
Set Up a Debezium Server
debezium.sink.type=pubsub
debezium.sink.pubsub.projectid=<PROJECT_ID>
debezium.sink.pubsub.ordering.enabled=true
debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=<IP_ADDRESS_OR_HOSTNAME>
debezium.source.database.port=1433
debezium.source.database.user=<USERNAME>
debezium.source.database.password=<PASSWORD>
debezium.source.database.dbname=<DB_NAME>
debezium.source.database.server.name=<SERVER_NAME>
debezium.source.schema.whitelist=<SCHEMA>
debezium.source.database.history.kafka.bootstrap.servers=<KAFKA_SERVER>
debezium.source.database.history.kafka.topic=<KAFKA_TOPIC>
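The Debezium Server reads this configuration from conf/application.properties. The Pub/Sub sink publishes each table's change events to a topic named after Debezium's <SERVER_NAME>.<schema>.<table> convention and expects those topics to exist before the server starts. Below is a rough sketch of pre-creating them with the Pub/Sub client library; the project ID and the table list are placeholders, using an illustrative subset of the Stack Overflow tables.

# A minimal sketch for pre-creating the Pub/Sub topics the sink publishes to.
# Topic names follow Debezium's <SERVER_NAME>.<schema>.<table> convention;
# the project ID and table list below are placeholders.
from google.cloud import pubsub_v1

PROJECT_ID = "<PROJECT_ID>"
SERVER_NAME = "<SERVER_NAME>"
TABLES = ["dbo.Users", "dbo.Posts", "dbo.Comments"]  # illustrative subset

publisher = pubsub_v1.PublisherClient()
for table in TABLES:
    topic_path = publisher.topic_path(PROJECT_ID, f"{SERVER_NAME}.{table}")
    publisher.create_topic(request={"name": topic_path})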
Create and Start the Processing Pipeline
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
This HTTP function is responsible for:
- Parsing the messages published by the Debezium Server.
- Inserting the records into the relevant BigQuery tables.

This HTTP function is not responsible for:
- Handling or logging errors.
- Deduplicating data.
- Validating the input.
'''

import base64
import json

from google.cloud import bigquery

BQ = bigquery.Client()


def stream_dbz_message(request):
    '''This function is executed whenever the endpoint is called'''
    request_json = request.get_json(silent=True)

    # Pub/Sub push requests wrap the Debezium event in a base64-encoded payload.
    data = request_json["message"]["data"]
    data = base64.b64decode(data).decode('utf-8')
    data = json.loads(data)

    payload = data["payload"]
    record = payload["after"]
    if record is None:
        # Delete events carry no "after" image; skip them in this quick test.
        return ('', 204)

    # Keep the row image together with the operation type and its timestamp.
    record["op"] = payload["op"]
    record["ts_ms"] = payload["ts_ms"]

    # Route the record to the BigQuery table matching the source database and table.
    dataset_id = payload["source"]["db"]
    table_id = payload["source"]["table"]
    table = BQ.dataset(dataset_id).table(table_id)
    _ = BQ.insert_rows_json(table=table, json_rows=[record])

    return ('', 204)
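For the events to reach this function, each topic also needs a push subscription pointing at the function's HTTPS trigger. The sketch below reuses the illustrative table list from above; the project ID, server name, and function URL are placeholders.

# A minimal sketch wiring each Debezium topic to the HTTP function via a
# Pub/Sub push subscription. Project, server name, tables, and the function
# URL are placeholders.
from google.cloud import pubsub_v1

PROJECT_ID = "<PROJECT_ID>"
SERVER_NAME = "<SERVER_NAME>"
FUNCTION_URL = "https://<REGION>-<PROJECT_ID>.cloudfunctions.net/stream_dbz_message"
TABLES = ["dbo.Users", "dbo.Posts", "dbo.Comments"]  # illustrative subset

subscriber = pubsub_v1.SubscriberClient()
for table in TABLES:
    topic_path = subscriber.topic_path(PROJECT_ID, f"{SERVER_NAME}.{table}")
    sub_path = subscriber.subscription_path(PROJECT_ID, f"{SERVER_NAME}.{table}.to-bq")
    subscriber.create_subscription(
        request={
            "name": sub_path,
            "topic": topic_path,
            "push_config": {"push_endpoint": FUNCTION_URL},
        }
    )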
The initial load of all the tables took quite a while to complete, as it was configured to run sequentially. The HTTP function above was written for a quick test and should not be used in production. To make it production-ready, it would need better error handling, a recovery mechanism, and a deduplication method to guarantee high-quality delivery.
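To illustrate the deduplication part, one option is to keep the raw change records as they arrive and expose a view that returns only the latest version of each row. The sketch below assumes a Users table keyed by an Id column and orders by the ts_ms field added by the function above; the project and dataset names are placeholders.

# A rough sketch: expose only the latest version of each row through a view.
# The Id key column and the naming are assumptions about the target schema.
from google.cloud import bigquery

client = bigquery.Client()

query = """
CREATE OR REPLACE VIEW `<PROJECT_ID>.<DB_NAME>.Users_latest` AS
SELECT * EXCEPT (row_rank)
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY Id ORDER BY ts_ms DESC) AS row_rank
  FROM `<PROJECT_ID>.<DB_NAME>.Users`
)
WHERE row_rank = 1
"""
client.query(query).result()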
With SQL Server support coming soon, it might be more convenient for those already on GCP to use the Dataflow CDC Example to stream data directly into BigQuery in a more scalable and controllable way. For example, the embedded connector can be run in a single-topic mode, publishing all updates for a database to a single Pub/Sub topic.
I would also like to try out a framework like PipelineWise, which is based on the Singer.io specification and already has a few log-based connectors. It seems that the industry is rapidly shying away from locked-in ecosystems and aggressive price points. Debezium and many other open-source tools are still somewhat cumbersome, but they are evolving in both quality and usability. When fully managed services are not feasible, these can become great options.
Previously published at https://www.suksant.com/debezium/