Debezium: Production Deployment Preparation

Suchit Gupta
May 8, 2020



If you are working with Debezium and plan to move it to production, I suggest you go through this post. It is all based on my experience with the MySQL connector.

Notes:

  • To make things easier to follow, I have exploded the Debezium event payload and extracted before, after, op, ts_ms, and source (with only the few fields I am interested in).
  • I have used Docker to set up my local environment.

MySQL Binlog Files

MySQL has a binary log (binlog) that records all operations in the order in which they are committed to the database. This includes changes to table schemas and the data within tables. The Debezium MySQL connector reads the binlog and produces change events.
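Since the connector depends entirely on the binlog, it is worth confirming before going to production that the binlog is enabled and row-based (Debezium requires binlog_format=ROW and binlog_row_image=FULL). A minimal check, assuming a Docker setup like mine with the MySQL container named mysql:

# Verify the binlog settings Debezium depends on
docker exec -it mysql mysql -u root -p -e \
"SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image');"

Expect log_bin=ON and binlog_format=ROW; anything else means the connector cannot stream changes reliably.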

Offset Management

The Kafka Connect service is configured to periodically record each connector's position and offsets to a Kafka topic; in this setup the topic is "my_connect_offsets".

Command to get the offset using kafkacat:

kafkacat -b localhost:29092 -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'

The offset records look like:

[Image: offset after the initial snapshot]
[Image: offset when snapshot is false]

Use Case: When Connect is not available

Stopped gracefully

  • All positions and offsets will be recorded in Kafka.
  • When those connectors are restarted, they will continue recording events exactly where they left off, with no duplicate events being recorded.
  • Downstream applications consuming from the topics will simply wait until new events are added.

Crashes unexpectedly

  • A crash does not let the connector commit the latest position and offsets to Kafka.
  • When those connectors are restarted, they will start recording events from the database at the position/offset last recorded before the crash. This will cause Connect to send duplicate events to Kafka, and eventually to the downstream applications.
  • Downstream applications consuming from the topics should therefore be prepared to handle these duplicates.

Kafka is not available

  • The Debezium connector pauses until the connection is re-established, then resumes where it last left off. No duplicate events are recorded.
  • In some instances, a connector restart might be required. If the latest position and offset were recorded in Kafka, there will not be any duplicate events; otherwise it can lead to duplicates.
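To find out whether a restart is actually needed, the Kafka Connect REST API can be queried; a minimal sketch against my local setup (a FAILED task state means a restart is required):

# Inspect the connector and task states
curl -s http://localhost:8083/connectors/inventory-connector/status

The restart command itself is covered later in this post.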

MySQL purges binlog files

  • If the Debezium MySQL connector stops for too long, the MySQL server purges older binlog files and the connector’s last position may be lost. When the connector is restarted, the MySQL server no longer has the starting point.
  • If snapshot mode is “initial”, the connector fails with an error. No further processing of events would occur.

The connector status will look like:

{
  "name": "inventory-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.0.6:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.18.0.6:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at binlog file 'mysql-bin.000005', pos=526, skipping 2 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:133)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:77)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n"
    }
  ],
  "type": "source"
}
  • If snapshot mode is "when_needed", the connector performs another initial snapshot. This would lead to duplicate events. With this mode, explore the use of the connector config "snapshot.select.statement.overrides":
Controls which rows from tables will be included in the snapshot.
This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME). SELECT statements for the individual tables are specified in further configuration properties, one for each table, identified by the id snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]. The value of those properties is the SELECT statement to use when retrieving data from the specific table during snapshotting. A possible use case for large append-only tables is setting a specific point where to start (resume) snapshotting, in case a previous snapshot was interrupted.
Note: This setting has an impact on snapshots only. Events captured from the binlog are not affected by it at all.
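One way to reduce the chance of hitting this failure in the first place is to make sure MySQL retains binlog files longer than your worst-case connector downtime. A minimal sketch for checking and raising the retention, assuming MySQL 5.7, where it is controlled by expire_logs_days (MySQL 8.0 uses binlog_expire_logs_seconds instead):

# Check the current retention (0 means binlogs are never purged automatically)
docker exec -it mysql mysql -u root -p -e "SHOW VARIABLES LIKE 'expire_logs_days';"

# Raise retention to 7 days (also set it in my.cnf so it survives a restart)
docker exec -it mysql mysql -u root -p -e "SET GLOBAL expire_logs_days = 7;"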

Use Case: When the DB is not available

Stopped gracefully or crashed unexpectedly

  • If your MySQL server becomes unavailable, the Debezium MySQL connector fails with an error and stops gracefully. You simply need to restart the connector when the server is available again.
  • Since the connector stops gracefully, all positions and offsets will be recorded in Kafka.
  • When those connectors are restarted, they will continue recording events exactly where they left off, with no duplicate events being recorded.
  • Downstream applications consuming from the topics will simply wait until new events are added.

Use Case: Delete event (if the Kafka topic is log compacted)

For a delete, the connector triggers two events.

  • The first is a regular change event, the delete itself (op = "d").
  • The second is called a tombstone event: it has the same key but a null value. This tells Kafka's log compaction that it can remove all prior messages with the same key.

The initial state of the sample table:

[Image: the initial state of the table]

A record with the primary key ‘10040’ is deleted

[Image: state after the row is deleted]

Two Debezium events:

  • Delete event
  • Tombstone event
[Image: the two Debezium events]
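To observe both events, including the null-valued tombstone, the topic can be dumped with kafkacat; a minimal sketch, assuming Debezium's default topic naming of serverName.database.table (dbserver1.inventory.orders here):

# Print key and value for every event; the tombstone shows the same key with an empty value
kafkacat -b localhost:29092 -C -t dbserver1.inventory.orders -e -f 'key: %k | value: %s\n'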

Use Case: Primary or unique key is updated

In this use case, the connector triggers three events.

  • A delete event with the old key for the row
  • A tombstone event with the old key for the row
  • An insert event with the new key for the row

The initial state of the sample table:

[Image: the initial state of the table]

The primary key (order_number) is updated from 10004 to 10005.

Three Debezium events:

  • Delete event
  • Tombstone event
  • Insert Event
[Image: the three Debezium events]

Use Case: When data is bad and we want to skip a record, or we want to re-read or reprocess old messages

How to get the offset?

The kafkacat utility can be used to get the offset. The command looks like:

kafkacat -b localhost:29092 -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'

How to update the offset?

  1. Pause your connector using:

curl -X PUT http://localhost:8083/connectors/inventory-connector/pause

Note: The pause endpoint takes no request body.

2. Use the kafkacat utility to produce the updated offset to the offsets topic. The command looks like:

echo '["inventory-connector",{"server":"dbserver1"}]|{"ts_sec":1530168950,"file":"mysql-bin.000003","pos":154}' | kafkacat -P -b localhost:29092 -t my_connect_offsets -K \| -p 11

Note: The offset values should be ones that actually exist in the binlog history; a random assignment will fail.
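If the goal is simply to skip everything up to the current end of the binlog, the current file and position can be read from MySQL itself and plugged into the command above; a minimal sketch (SHOW MASTER STATUS reports the File and Position columns):

docker exec -it mysql mysql -u root -p -e "SHOW MASTER STATUS;"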

3. Restart the connector.

curl -X POST http://localhost:8083/connectors/inventory-connector/restart

Note: Like pause, the restart endpoint takes no request body.

Use Case: Understanding the initial snapshot

[Image: the initial state of the table after creation]

[Image: a value in one row is updated]

Create the MySQL connector:

curl -X POST \
  http://localhost:8083/connectors/ \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.whitelist": "inventory",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.inventory",
      "decimal.handling.mode": "double",
      "snapshot.mode": "when_needed",
      "table.whitelist": "inventory.orders"
    }
  }'

[Image: Debezium snapshot events]

  • source.ts_ms: zero for all the snapshot records
  • source.pos: the same for all the snapshot records, and it is the latest pos from the binlog file
  • snapshot: true for all records except the last one

The connector snapshots the current state of the tables at the time it is created; it does not go back through history. These fields can be verified directly from the topic, as shown below.
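A minimal sketch for inspecting these fields, assuming the default JSON converter with schemas enabled (so each value is wrapped in a payload envelope) and the topic name dbserver1.inventory.orders:

# Dump every event and extract the source position fields
kafkacat -b localhost:29092 -C -t dbserver1.inventory.orders -e -f '%s\n' \
| jq '.payload.source | {file, pos, ts_ms, snapshot}'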

Update a record

[Image: the Debezium event for the update]

Offsets

Get offsets by executing:

kafkacat -b localhost:29092 -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
The first offset is for the snapshot and the second one is for the normal streaming cycle.

Use Case: Duplicate events

Update the offset

Let's try to produce duplicate events by changing the offset to an old value:

echo '["inventory-connector",{"server":"dbserver1"}]|{"file":"mysql-bin.000003","pos":461}' | kafkacat -P -b localhost:29092 -t my_connect_offsets -K \| -p 11
[Image: the offset update]

Stop Kafka Connect and run it again.

[Image: Debezium events; batch 2 is a duplicate of batch 1]

A closer look shows that both batches have the same source.pos and source.ts_ms.

Use Case: Viewing a binlog file

Get into the MySQL container:

docker exec -it mysql bash

List the files (mysql-bin.0000*).
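A minimal sketch, assuming the default data directory /var/lib/mysql inside the container:

ls -la /var/lib/mysql/mysql-bin.0000*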

Decode the file

Execute:

mysqlbinlog --base64-output=decode-rows -vv  --start-datetime="2015-01-12 21:40:00"  --stop-datetime="2021-01-12 21:45:00" mysql-bin.000003

The file looks like:

SET TIMESTAMP=1588903405/*!*/;
SET @@session.pseudo_thread_id=2/*!*/;
SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/;
SET @@session.sql_mode=1436549152/*!*/;
SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/;
/*!\C utf8mb4 *//*!*/;
SET @@session.character_set_client=45,@@session.collation_connection=45,@@session.collation_server=8/*!*/;
SET @@session.lc_time_names=0/*!*/;
SET @@session.collation_database=DEFAULT/*!*/;
BEGIN
/*!*/;
# at 296
#200508 2:03:25 server id 223344 end_log_pos 354 CRC32 0x967d2016 Table_map: `inventory`.`orders` mapped to number 108
# at 354
#200508 2:03:25 server id 223344 end_log_pos 430 CRC32 0x3aec8548 Update_rows: table id 108 flags: STMT_END_F
### UPDATE `inventory`.`orders`
### WHERE
### @1=10004 /* INT meta=0 nullable=0 is_null=0 */
### @2='2016:02:21' /* DATE meta=0 nullable=0 is_null=0 */
### @3=1003 /* INT meta=0 nullable=0 is_null=0 */
### @4=1 /* INT meta=0 nullable=0 is_null=0 */
### @5=107 /* INT meta=0 nullable=0 is_null=0 */
### SET
### @1=10004 /* INT meta=0 nullable=0 is_null=0 */
### @2='2016:02:21' /* DATE meta=0 nullable=0 is_null=0 */
### @3=1003 /* INT meta=0 nullable=0 is_null=0 */
### @4=31 /* INT meta=0 nullable=0 is_null=0 */
### @5=107 /* INT meta=0 nullable=0 is_null=0 */
# at 430
#200508 2:03:25 server id 223344 end_log_pos 461 CRC32 0x7b02e4a7 Xid = 33
COMMIT/*!*/;
# at 461
#200508 2:08:21 server id 223344 end_log_pos 526 CRC32 0x7c7f0216 Anonymous_GTID last_committed=1 sequence_number=2 rbr_only=yes
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 526
#200508 2:08:21 server id 223344 end_log_pos 603 CRC32 0x20abb723 Query thread_id=2 exec_time=0 error_code=0
SET TIMESTAMP=1588903701/*!*/;
BEGIN
/*!*/;
# at 603
#200508 2:08:21 server id 223344 end_log_pos 661 CRC32 0x9a928ca5 Table_map: `inventory`.`orders` mapped to number 231
# at 661
#200508 2:08:21 server id 223344 end_log_pos 737 CRC32 0xa058effd Update_rows: table id 231 flags: STMT_END_F
### UPDATE `inventory`.`orders`
### WHERE
### @1=10004 /* INT meta=0 nullable=0 is_null=0 */
### @2='2016:02:21' /* DATE meta=0 nullable=0 is_null=0 */
### @3=1003 /* INT meta=0 nullable=0 is_null=0 */
### @4=31 /* INT meta=0 nullable=0 is_null=0 */
### @5=107 /* INT meta=0 nullable=0 is_null=0 */
### SET
### @1=10004 /* INT meta=0 nullable=0 is_null=0 */
### @2='2016:02:21' /* DATE meta=0 nullable=0 is_null=0 */
### @3=1003 /* INT meta=0 nullable=0 is_null=0 */
### @4=41 /* INT meta=0 nullable=0 is_null=0 */
### @5=107 /* INT meta=0 nullable=0 is_null=0 */
# at 737
#200508 2:08:21 server id 223344 end_log_pos 768 CRC32 0xfab6c10d Xid = 252
COMMIT/*!*/;
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by mysqlbinlog */ /*!*/;
DELIMITER ;
# End of log file
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;

Note: The SHOW BINARY LOGS; command can be used to list the log files.
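For example, run from the host against my Docker setup:

docker exec -it mysql mysql -u root -p -e "SHOW BINARY LOGS;"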

Use Case: Config "snapshot.select.statement.overrides"

[Image: the initial state of the table]

Create a connector with the config "snapshot.select.statement.overrides":

curl -X POST \
  http://localhost:8083/connectors/ \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.whitelist": "inventory",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.inventory",
      "decimal.handling.mode": "double",
      "snapshot.mode": "when_needed",
      "table.whitelist": "inventory.orders",
      "snapshot.select.statement.overrides": "inventory.orders",
      "snapshot.select.statement.overrides.inventory.orders": "SELECT orders.* FROM orders WHERE order_date > '\''2016-01-17'\''"
    }
  }'

[Image: Debezium snapshot events]

In the initial snapshot, only two records are fetched, as they fit the criteria of the query "SELECT orders.* FROM orders WHERE order_date > '2016-01-17'".

[Image: only two events, matching the selection criteria]

Hope this was helpful :)
