Debezium: Production Deployment Preparation
If you are working with Debezium and plan to move it to production, I suggest going through this post. It's all based on my experience with the MySQL connector.
Notes:
- To make things easier to follow, I have exploded the Debezium event payload and extracted before, after, op, ts_ms, and source (with only the few fields that I am interested in).
- I have used Docker to set up my local environment.
MySQL Binlog Files
MySQL has a binary log (binlog) that records all operations in the order in which they are committed to the database. This includes changes to table schemas and the data within tables. The Debezium MySQL connector reads the binlog and produces change events.
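Before deploying, it is worth confirming that the binlog is actually enabled and uses row-based logging (Debezium requires ROW format). A quick check from a MySQL shell:
mysql> SHOW VARIABLES LIKE 'log_bin';        -- should be ON
mysql> SHOW VARIABLES LIKE 'binlog_format';  -- should be ROW
mysql> SHOW MASTER STATUS;                   -- current binlog file and position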
Offset Management
The Kafka Connect service is configured to periodically record the position and offsets of each connector to a Kafka topic; in this setup the topic is “my_connect_offsets”.
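The topic name comes from the Connect worker configuration, via the standard offset.storage.topic property. For example:
# Connect worker properties (excerpt)
offset.storage.topic=my_connect_offsets
offset.flush.interval.ms=60000   # how often positions are flushed (default 60s)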
Command to get the offset using kafkacat:
kafkacat -b localhost:29092 -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
An offset record looks like this:
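(A sketch based on the offset-update command later in this post, using the kafkacat format string Partition(%p) %k %s:)
Partition(11) ["inventory-connector",{"server":"dbserver1"}] {"ts_sec":1530168950,"file":"mysql-bin.000003","pos":154}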
Use Case: When Connect is not available
Stopped gracefully
- All positions and offsets will be recorded in Kafka.
- When those connectors are restarted, they will continue recording events exactly where they left off, with no duplicate events being recorded.
- Downstream applications consuming from the topics will simply wait until new events are added.
Crashes unexpectedly
- A crash does not let connectors commit the latest position and offsets to Kafka.
- When those connectors are restarted, they will start recording events from the database at the position/offset last recorded by the connector before it crashed. This causes Connect to send duplicate events to Kafka and, eventually, to the downstream applications.
- Downstream applications consuming from the topics will simply wait until new events are added.
Kafka is not available
- The Debezium connector pauses until the connection is re-established, then resumes where it left off. No duplicate events are recorded.
- In some instances, a connector restart might be required. If the latest position and offset were recorded in Kafka, there will be no duplicate events; otherwise, duplicates can occur.
MySQL purges binlog files
- If the Debezium MySQL connector stops for too long, the MySQL server purges older binlog files and the connector’s last position may be lost. When the connector is restarted, the MySQL server no longer has the starting point.
- If the snapshot mode is “initial”, the connector fails with an error, and no further processing of events occurs.
The connector status will look like this:
{
  "name": "inventory-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "172.18.0.6:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "172.18.0.6:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at binlog file 'mysql-bin.000005', pos=526, skipping 2 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.\n\tat io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:133)\n\tat io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:77)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n"
    }
  ],
  "type": "source"
}
- If the snapshot mode is “when_needed”, the connector performs another initial snapshot. This can lead to duplicate events. With this mode, explore the connector config “snapshot.select.statement.overrides”:
Controls which rows from tables will be included in the snapshot.
This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME). Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]. The value of those properties is the SELECT statement to use when retrieving data from the specific table during snapshotting. A possible use case for large append-only tables is setting a specific point where to start (resume) snapshotting, in case a previous snapshotting was interrupted.
Note: This setting has an impact on snapshots only. Events captured from binlog are not affected by it at all.
Use Case: When the DB is not available
Stopped gracefully or crashed unexpectedly
- If your MySQL server becomes unavailable, the Debezium MySQL connector fails with an error and stops gracefully. You simply need to restart the connector when the server is available again.
- Since the connector stops gracefully, all positions and offsets will be recorded in Kafka.
- When those connectors are restarted, they will continue recording events exactly where they left off, with no duplicate events being recorded.
- Downstream applications consuming from the topics will simply wait until new events are added.
Use Case: Delete event (if Kafka is set up to be log-compacted)
For a delete, the connector emits two events.
- The first is the same as other events.
- The second is called a tombstone event; it has the same key but a null value. This means that Kafka will remove all prior messages with the same key.
The initial state of the sample table:
A record with the primary key ‘10040’ is deleted
Two Debezium events, sketched below:
- Delete event
- Tombstone event
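In simplified form (a sketch, assuming order_number is the primary key column; the real events carry the full Debezium envelope):
# Delete event: "before" is populated, "after" is null, op is "d"
Key:   {"order_number": 10040}
Value: {"before": { ... }, "after": null, "op": "d"}
# Tombstone event: same key, null value
Key:   {"order_number": 10040}
Value: null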
Use Case: Primary or unique key is updated
In this use case, the connector emits three events.
- A delete event
- A tombstone event with the old key for the row
- An insert event with the new key for the row
The initial state of the sample table:
The primary key (order_number) is updated from 10004 to 10005.
Three Connect events, sketched below:
- Delete event
- Tombstone event
- Insert Event
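Using the order_number values from this example, the sequence of keys and operations would roughly be (a sketch, not full payloads):
Key {"order_number": 10004}   op "d"        # delete with the old key
Key {"order_number": 10004}   value null    # tombstone for the old key
Key {"order_number": 10005}   op "c"        # insert with the new key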
Use Case: When data is bad and we want to skip a record, or we want to read or reprocess old messages
How to get the offset?
The kafkacat utility can be used to get the offset. The command looks like:
kafkacat -b localhost:29092 -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
How to update the offset?
1. Pause the connector (the pause endpoint takes no request body):
curl -X PUT \
  http://localhost:8083/connectors/inventory-connector/pause \
  -H 'Cache-Control: no-cache'
2. Use the kafkacat utility to write the new offset to the offsets topic. The command looks like:
echo '["inventory-connector",{"server":"dbserver1"}]|{"ts_sec":1530168950,"file":"mysql-bin.000003","pos":154}' | kafkacat -P -b localhost:29092 -t my_connect_offsets -K \| -p 11
Note: The offset values must come from the binlog history; a random assignment will fail. Also make sure you produce to the same partition that holds the key for this connector (partition 11 in this setup).
3. Restart the connector (the restart endpoint also takes no request body):
curl -X POST \
  http://localhost:8083/connectors/inventory-connector/restart \
  -H 'Cache-Control: no-cache'
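Note: restarting a connector does not change its target state, so if it is still shown as PAUSED after the restart, resume it explicitly via the standard Connect REST endpoint:
curl -X PUT http://localhost:8083/connectors/inventory-connector/resume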
Use Case: Understanding the initial snapshot
The initial state of the table after creation.
Update a value in a row.
Create the MySQL connector:
curl -X POST \
  http://localhost:8083/connectors/ \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.whitelist": "inventory",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.inventory",
      "decimal.handling.mode": "double",
      "snapshot.mode": "when_needed",
      "table.whitelist": "inventory.orders"
    }
  }'
Debezium Events
- source.ts_ms: is zero for all the snapshot records
- source.pos: is the same for all the snapshot records; it is the latest pos from the binlog file
- snapshot: is true for all records except the last one (see the sample source block below)
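A simplified source block from one of these snapshot events might look like this (a sketch; the file and pos values are illustrative, and op is "r" for snapshot reads):
"op": "r",
"source": {
  "file": "mysql-bin.000003",
  "pos": 154,
  "ts_ms": 0,
  "snapshot": "true"
}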
Update a record
Offsets
Get the offsets by executing:
kafkacat -b localhost:29092 -C -t my_connect_offsets -f 'Partition(%p) %k %s\n'
Use Case: Duplicate events
Update the offset
Let’s try to produce duplicate events by changing the offset to an old value:
echo '["inventory-connector",{"server":"dbserver1"}]|{"file":"mysql-bin.000003","pos":461}' | kafkacat -P -b localhost:29092 -t my_connect_offsets -K \| -p 11
Stop Kafka Connect and run it again.
Debezium events
Batch 2 is a duplicate of batch 1.
A close look shows that both batches have the same source.pos and source.ts_ms.
Use Case: View a binlog file
Get into the MySQL container:
docker exec -it mysql bash
List the files (mysql-bin.0000*), for example:
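Assuming the default data directory /var/lib/mysql, something like:
ls -l /var/lib/mysql/mysql-bin.0000*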
Decode the file
Execute:
mysqlbinlog --base64-output=decode-rows -vv --start-datetime="2015-01-12 21:40:00" --stop-datetime="2021-01-12 21:45:00" mysql-bin.000003
The file looks like:
SET TIMESTAMP=1588903405/*!*/;
SET @@session.pseudo_thread_id=2/*!*/;
SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/;
SET @@session.sql_mode=1436549152/*!*/;
SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/;
/*!\C utf8mb4 *//*!*/;
SET @@session.character_set_client=45,@@session.collation_connection=45,@@session.collation_server=8/*!*/;
SET @@session.lc_time_names=0/*!*/;
SET @@session.collation_database=DEFAULT/*!*/;
BEGIN
/*!*/;
# at 296
#200508 2:03:25 server id 223344 end_log_pos 354 CRC32 0x967d2016 Table_map: `inventory`.`orders` mapped to number 108
# at 354
#200508 2:03:25 server id 223344 end_log_pos 430 CRC32 0x3aec8548 Update_rows: table id 108 flags: STMT_END_F
### UPDATE `inventory`.`orders`
### WHERE
### @1=10004 /* INT meta=0 nullable=0 is_null=0 */
### @2='2016:02:21' /* DATE meta=0 nullable=0 is_null=0 */
### @3=1003 /* INT meta=0 nullable=0 is_null=0 */
### @4=1 /* INT meta=0 nullable=0 is_null=0 */
### @5=107 /* INT meta=0 nullable=0 is_null=0 */
### SET
### @1=10004 /* INT meta=0 nullable=0 is_null=0 */
### @2='2016:02:21' /* DATE meta=0 nullable=0 is_null=0 */
### @3=1003 /* INT meta=0 nullable=0 is_null=0 */
### @4=31 /* INT meta=0 nullable=0 is_null=0 */
### @5=107 /* INT meta=0 nullable=0 is_null=0 */
# at 430
#200508 2:03:25 server id 223344 end_log_pos 461 CRC32 0x7b02e4a7 Xid = 33
COMMIT/*!*/;
# at 461
#200508 2:08:21 server id 223344 end_log_pos 526 CRC32 0x7c7f0216 Anonymous_GTID last_committed=1 sequence_number=2 rbr_only=yes
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 526
#200508 2:08:21 server id 223344 end_log_pos 603 CRC32 0x20abb723 Query thread_id=2 exec_time=0 error_code=0
SET TIMESTAMP=1588903701/*!*/;
BEGIN
/*!*/;
# at 603
#200508 2:08:21 server id 223344 end_log_pos 661 CRC32 0x9a928ca5 Table_map: `inventory`.`orders` mapped to number 231
# at 661
#200508 2:08:21 server id 223344 end_log_pos 737 CRC32 0xa058effd Update_rows: table id 231 flags: STMT_END_F
### UPDATE `inventory`.`orders`
### WHERE
### @1=10004 /* INT meta=0 nullable=0 is_null=0 */
### @2='2016:02:21' /* DATE meta=0 nullable=0 is_null=0 */
### @3=1003 /* INT meta=0 nullable=0 is_null=0 */
### @4=31 /* INT meta=0 nullable=0 is_null=0 */
### @5=107 /* INT meta=0 nullable=0 is_null=0 */
### SET
### @1=10004 /* INT meta=0 nullable=0 is_null=0 */
### @2='2016:02:21' /* DATE meta=0 nullable=0 is_null=0 */
### @3=1003 /* INT meta=0 nullable=0 is_null=0 */
### @4=41 /* INT meta=0 nullable=0 is_null=0 */
### @5=107 /* INT meta=0 nullable=0 is_null=0 */
# at 737
#200508 2:08:21 server id 223344 end_log_pos 768 CRC32 0xfab6c10d Xid = 252
COMMIT/*!*/;
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by mysqlbinlog */ /*!*/;
DELIMITER ;
# End of log file
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;
Note: The “show binary logs;” command can also be used to list the log files.
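From a MySQL shell, the output looks roughly like this (the file size is illustrative):
mysql> SHOW BINARY LOGS;
+------------------+-----------+
| Log_name         | File_size |
+------------------+-----------+
| mysql-bin.000003 |       768 |
+------------------+-----------+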
Use Case: Config “snapshot.select.statement.overrides”
The initial state of the table.
Create a connector with the config “snapshot.select.statement.overrides”:
curl -X POST \
  http://localhost:8083/connectors/ \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.whitelist": "inventory",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.inventory",
      "decimal.handling.mode": "double",
      "snapshot.mode": "when_needed",
      "table.whitelist": "inventory.orders",
      "snapshot.select.statement.overrides": "inventory.orders",
      "snapshot.select.statement.overrides.inventory.orders": "SELECT orders.* FROM orders WHERE order_date > '\''2016-01-17'\''"
    }
  }'
Debezium events
In the initial snapshot, only two records are fetched, as they fit the criteria of the query “SELECT orders.* FROM orders WHERE order_date > '2016-01-17'”.
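To verify, you can consume the table’s topic with kafkacat; with Debezium’s default topic naming (serverName.databaseName.tableName), it would be:
kafkacat -b localhost:29092 -C -t dbserver1.inventory.orders -o beginning -e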
Hope this was helpful :)