MySQL to PostgreSQL using Kafka Connect

Suchit Gupta
May 31, 2020

Our objective is to quickly set up a data pipeline that moves data from MySQL to PostgreSQL.

Debezium

We will use the Debezium MySQL connector to read the MySQL database logs (the binlog). Debezium publishes the change events to Kafka.

Kafka-Connect

We will use the JDBC sink connector to read the events from the Kafka topic and write them to PostgreSQL.

Code

The code can be downloaded from GitHub.

Build the pipeline

We will use docker-compose to set up the following (a sketch of the compose file follows the list):

  1. MySQL: the source DB.
  2. PostgreSQL: the destination DB.
  3. Kafka-Connect (Debezium and JDBC connector): Debezium for reading the MySQL logs and the JDBC connector for pushing the changes to PostgreSQL.
  4. Kafka
  5. Zookeeper
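
For reference, here is a minimal sketch of what the docker-compose.yml could look like. The actual file is in the GitHub repo; the images, ports, and credentials below are assumptions pieced together from the Debezium tutorial images and the connector configs further down in this post.

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:1.0
  kafka:
    image: debezium/kafka:1.0
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    # Debezium's example image, preloaded with the "inventory" database
    image: debezium/example-mysql:1.0
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
  postgresql:
    image: postgres:11
    ports:
      - "5432:5432"
    environment:
      # Must match the JDBC sink connector's connection settings below
      - POSTGRES_USER=test_user
      - POSTGRES_PASSWORD=Welcome123
      - POSTGRES_DB=test_db
  connect:
    # Built from the Dockerfile shown in the next section
    build: .
    ports:
      - "8083:8083"
    depends_on:
      - kafka
      - mysql
      - postgresql
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses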

Build the image

To build the Kafka-Connect image, we add the JDBC connector and the PostgreSQL JDBC driver to the base Debezium image.

docker-compose build

Dockerfile

FROM debezium/connect:1.0
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc

ARG POSTGRES_VERSION=42.2.8
ARG KAFKA_JDBC_VERSION=5.5.0

# Deploy PostgreSQL JDBC Driver
RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar

# Deploy Kafka Connect JDBC
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR &&\
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
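
After the build you can sanity-check that both jars landed in the image. The image name below is a placeholder, since docker-compose tags it <project-dir>_<service>, and the plugin path assumes the layout of the debezium/connect base image:

# Image name is a placeholder; adjust to what "docker images" shows
docker run --rm --entrypoint sh <project>_connect -c \
  "ls /kafka/connect/kafka-connect-jdbc && ls /kafka/libs | grep postgresql"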

Run the containers

docker-compose up -d
All five containers should now be running.
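
You can confirm this with:

# List the services started by docker-compose and their state
docker-compose ps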

MySQL

The MySQL image ships with a sample database named inventory.
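
You can peek at the sample tables. The service name mysql and the root password debezium are assumptions based on Debezium's example MySQL image:

# Password "debezium" is the default of debezium/example-mysql
docker-compose exec mysql mysql -uroot -pdebezium inventory -e "SHOW TABLES;"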

Debezium Connector

Execute the following curl command to set up the Debezium connector for reading the logs from MySQL.

This will create the following topics:

  1. orders
  2. addresses
  3. customers
  4. products
  5. products_on_hand

curl -X POST \
  http://localhost:8083/connectors/ \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "inventory-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "decimal.handling.mode": "double",
        "snapshot.mode": "when_needed",
        "transforms": "route",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
        "transforms.route.replacement": "$3"
    }
}'
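
Once the connector is registered, you can verify that the topics exist (the RegexRouter transform above strips the server and database prefixes, so the topics carry the bare table names). The script paths below assume the debezium/kafka image:

# List the topics created by the snapshot
docker-compose exec kafka bin/kafka-topics.sh --bootstrap-server kafka:9092 --list

# Tail the change events flowing into the orders topic
docker-compose exec kafka bin/kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 --topic orders --from-beginning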

JDBC connector

Execute the following curl command to set up the JDBC sink connector for writing the events from the orders topic (populated from the MySQL orders table) to PostgreSQL.

We have set "auto.create": "true" so that it automatically creates the table in PostgreSQL.

curl -X POST \
  http://localhost:8083/connectors/ \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "rtw-connector-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "orders",
        "connection.url": "jdbc:postgresql://postgresql:5432/test_db",
        "connection.user": "test_user",
        "connection.password": "Welcome123",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "auto.create": "true",
        "insert.mode": "insert",
        "pk.fields": "order_number",
        "pk.mode": "none"
    }
}'
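
Kafka Connect's REST API reports the status of both connectors, which is a quick way to catch configuration mistakes:

# Both connectors and their tasks should report RUNNING
curl -s http://localhost:8083/connectors/inventory-source/status
curl -s http://localhost:8083/connectors/rtw-connector-sink/status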

PostgreSQL

Once the connector is set up, it automatically creates a table named orders in PostgreSQL.
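
You can confirm that the snapshot landed. The service name postgresql and the credentials match the sink connector's connection settings above:

# Query the auto-created table in PostgreSQL
docker-compose exec postgresql psql -U test_user -d test_db -c "SELECT * FROM orders;"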

Play Around

Make changes in the MySQL orders table and watch them get reflected in the PostgreSQL orders table.
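
For example, insert a row on the MySQL side and re-run the query on the PostgreSQL side. The column list and the referenced purchaser/product IDs below are assumptions based on the schema of the inventory sample database:

# Insert a new order in MySQL (order_number is auto-incremented)
docker-compose exec mysql mysql -uroot -pdebezium inventory -e \
  "INSERT INTO orders (order_date, purchaser, quantity, product_id) VALUES ('2020-05-31', 1002, 3, 107);"

# The row should appear in PostgreSQL moments later
docker-compose exec postgresql psql -U test_user -d test_db -c \
  "SELECT * FROM orders ORDER BY order_number DESC LIMIT 1;"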
