Spark Streaming with Kafka-Connect Debezium Connector

Suchit Gupta
5 min read · May 17, 2020

A quick guide to setting up Debezium and using a Spark Streaming job as a Kafka consumer.

Bird’s eye view of the architecture:

The Architecture

Debezium

Debezium is an open-source distributed platform for change data capture (CDC). It provides a set of Kafka connectors that read the database binlog files and produce the changes as events in Kafka.

As a CDC tool, it gives you the state of a database row before and after the commit, and also tells you what kind of operation it was, i.e. create, update, or delete.

A Debezium Kafka message looks like:

{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "type":"struct",
        "fields":[
          { "type":"int32", "optional":false, "field":"order_number" },
          { "type":"int32", "optional":false, "name":"io.debezium.time.Date", "version":1, "field":"order_date" },
          { "type":"int32", "optional":false, "field":"purchaser" },
          { "type":"int32", "optional":false, "field":"quantity" },
          { "type":"int32", "optional":false, "field":"product_id" }
        ],
        "optional":true,
        "name":"dbserver1.inventory.orders.Value",
        "field":"before"
      },
      {
        "type":"struct",
        "fields":[
          { "type":"int32", "optional":false, "field":"order_number" },
          { "type":"int32", "optional":false, "name":"io.debezium.time.Date", "version":1, "field":"order_date" },
          { "type":"int32", "optional":false, "field":"purchaser" },
          { "type":"int32", "optional":false, "field":"quantity" },
          { "type":"int32", "optional":false, "field":"product_id" }
        ],
        "optional":true,
        "name":"dbserver1.inventory.orders.Value",
        "field":"after"
      },
      {
        "type":"struct",
        "fields":[
          { "type":"string", "optional":false, "field":"version" },
          { "type":"string", "optional":false, "field":"connector" },
          { "type":"string", "optional":false, "field":"name" },
          { "type":"int64", "optional":false, "field":"ts_ms" },
          { "type":"string", "optional":true, "name":"io.debezium.data.Enum", "version":1, "parameters":{ "allowed":"true,last,false" }, "default":"false", "field":"snapshot" },
          { "type":"string", "optional":false, "field":"db" },
          { "type":"string", "optional":true, "field":"table" },
          { "type":"int64", "optional":false, "field":"server_id" },
          { "type":"string", "optional":true, "field":"gtid" },
          { "type":"string", "optional":false, "field":"file" },
          { "type":"int64", "optional":false, "field":"pos" },
          { "type":"int32", "optional":false, "field":"row" },
          { "type":"int64", "optional":true, "field":"thread" },
          { "type":"string", "optional":true, "field":"query" }
        ],
        "optional":false,
        "name":"io.debezium.connector.mysql.Source",
        "field":"source"
      },
      { "type":"string", "optional":false, "field":"op" },
      { "type":"int64", "optional":true, "field":"ts_ms" }
    ],
    "optional":false,
    "name":"dbserver1.inventory.orders.Envelope"
  },
  "payload":{
    "before":{
      "order_number":10004,
      "order_date":16852,
      "purchaser":1003,
      "quantity":1,
      "product_id":107
    },
    "after":{
      "order_number":10004,
      "order_date":16852,
      "purchaser":1003,
      "quantity":3,
      "product_id":107
    },
    "source":{
      "version":"1.0.3.Final",
      "connector":"mysql",
      "name":"dbserver1",
      "ts_ms":1589660547000,
      "snapshot":"false",
      "db":"inventory",
      "table":"orders",
      "server_id":223344,
      "gtid":null,
      "file":"mysql-bin.000003",
      "pos":354,
      "row":0,
      "thread":12,
      "query":null
    },
    "op":"u",
    "ts_ms":1589660547884
  }
}

Spark Streaming

We use Spark Streaming as the processing engine: it reads the Debezium events from the Kafka topic and pushes the changes to PostgreSQL.

Sample Code

The sample code under discussion can be cloned from GitHub.

How to get the infrastructure up and running

docker-compose

version: '3.0'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    ports:
      - 9092:9092
      - 29092:29092
    depends_on:
      - zookeeper
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
  mysql:
    image: debezium/example-mysql:1.0
    container_name: mysql
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  connect:
    image: debezium/connect:1.0
    container_name: connect
    ports:
      - 8083:8083
    depends_on:
      - kafka
      - mysql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  postgresql:
    image: sameersbn/postgresql:9.4
    container_name: postgresql
    environment:
      - DEBUG=false
      - DB_USER=test
      - DB_PASS=Test123
      - DB_NAME=test
    ports:
      - "5432:5432"

Execute:

docker-compose up -d

It will bring up the following containers:

containers
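
You can also verify the state of the stack from the command line; something like the following should list all five services as Up:

docker-compose ps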

Add Debezium-Connector

Once the “connect” container is up and running, execute the following command to add the MySqlConnector:

curl -X POST \
  http://localhost:8083/connectors/ \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "1",
      "database.server.name": "dbserver1",
      "database.whitelist": "inventory",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.inventory",
      "decimal.handling.mode": "double",
      "snapshot.mode": "when_needed",
      "table.whitelist": "inventory.orders"
    }
  }'

The connector is now ready to read MySQL Binlog files.
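
To double-check that the connector was registered and its task is running, you can query the standard Kafka Connect REST API, for example:

curl http://localhost:8083/connectors/inventory-connector/status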

MySQL Database

The MySQL image in the docker-compose file comes with a sample database and already has the configuration changes required for the connector to read the DB logs.

Table in MySQL
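
If you want to look at the sample data yourself, you can open a MySQL shell inside the container; the command below assumes the root password from the docker-compose file above:

docker exec -it mysql mysql -uroot -pdebezium -e "SELECT * FROM inventory.orders;"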

If you plan to use your own database, ensure the following:

  1. The MySQL variable ‘binlog_format’ should be ROW:
show variables like 'binlog_format'; -- should be ROW

2. Binary logging should be enabled. In the MySQL cnf file:

# Uncomment the following if you want to log updates
log-bin=mysql-bin
expire_logs_days = 10
binlog_format = row

Kafka Topic

Once the connector is added via the curl command, it starts pushing the snapshot to Kafka; once that is done, it switches to binlog mode, where it watches for changes in the binlog file and pushes them to Kafka.

Kafka Topic:

kafkacat -b localhost:29092 -t dbserver1.inventory.orders
Sample Kafka Message

Spark Streaming Job

The Spark Streaming job listens to the Kafka topic, deserializes the messages, and pushes the changes to PostgreSQL.

package com.sg.job.streaming

import com.sg.wrapper.SparkSessionWrapper
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object StreamingJob extends App with SparkSessionWrapper {

  val currentDirectory = new java.io.File(".").getCanonicalPath
  val kafkaReaderConfig = KafkaReaderConfig("localhost:29092", "dbserver1.inventory.orders")
  val jdbcConfig = JDBCConfig(url = "jdbc:postgresql://localhost:5432/test")
  new StreamingJobExecutor(spark, kafkaReaderConfig, currentDirectory + "/checkpoint/job", jdbcConfig).execute()
}

case class JDBCConfig(url: String, user: String = "test", password: String = "Test123", tableName: String = "orders_it")

case class KafkaReaderConfig(kafkaBootstrapServers: String, topics: String, startingOffsets: String = "latest")

case class StreamingJobConfig(checkpointLocation: String, kafkaReaderConfig: KafkaReaderConfig)

class StreamingJobExecutor(spark: SparkSession, kafkaReaderConfig: KafkaReaderConfig, checkpointLocation: String, jdbcConfig: JDBCConfig) {

  def execute(): Unit = {
    // Read data from Kafka and keep the key, value and topic as strings.
    val transformDF = read()
      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "topic")

    // Write each micro-batch to PostgreSQL over JDBC.
    transformDF
      .writeStream
      .option("checkpointLocation", checkpointLocation)
      .foreachBatch { (batchDF: DataFrame, _: Long) =>
        batchDF.write
          .format("jdbc")
          .option("url", jdbcConfig.url)
          .option("user", jdbcConfig.user)
          .option("password", jdbcConfig.password)
          .option("driver", "org.postgresql.Driver")
          .option(JDBCOptions.JDBC_TABLE_NAME, jdbcConfig.tableName)
          .option("stringtype", "unspecified")
          .mode(SaveMode.Append)
          .save()
      }
      .start()
      .awaitTermination()
  }

  def read(): DataFrame = {
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaReaderConfig.kafkaBootstrapServers)
      .option("subscribe", kafkaReaderConfig.topics)
      .option("startingOffsets", kafkaReaderConfig.startingOffsets)
      .load()
  }
}
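
The job above writes the raw Debezium JSON string into PostgreSQL. If you would rather store flattened columns, a minimal sketch (assuming the orders schema shown earlier and reusing transformDF from the job above) could parse the envelope with from_json before writing:

// Hypothetical extension: flatten the Debezium envelope instead of keeping the raw JSON string.
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val orderSchema = new StructType()
  .add("order_number", IntegerType)
  .add("order_date", IntegerType) // io.debezium.time.Date arrives as days since epoch
  .add("purchaser", IntegerType)
  .add("quantity", IntegerType)
  .add("product_id", IntegerType)

val envelopeSchema = new StructType()
  .add("payload", new StructType()
    .add("before", orderSchema)
    .add("after", orderSchema)
    .add("op", StringType)
    .add("ts_ms", LongType))

// transformDF is the key/value/topic DataFrame built in execute() above.
val parsedDF = transformDF
  .withColumn("message", from_json(col("value"), envelopeSchema))
  .select(col("message.payload.after.*"), col("message.payload.op").as("op"))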

PostgreSQL

Once the job is running, you can connect to the PostgreSQL DB and see that a table with the name orders_it has been created.

Table order_it
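
One quick way to check from the host (assuming the psql client is installed locally and using the credentials from the docker-compose file) is:

psql -h localhost -p 5432 -U test -d test -c "SELECT * FROM orders_it LIMIT 5;"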

Now, play around with MySQL DB and see the changes getting pushed to PostgreSQL.
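
For example, an update like the one captured in the sample message above (quantity changed from 1 to 3) can be triggered in MySQL and should appear as a new row in orders_it within a few seconds:

UPDATE inventory.orders SET quantity = 3 WHERE order_number = 10004;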

The same can be simulated via an Integration Test.
