Integration Testing in Spark Structured Streaming

Suchit Gupta
5 min read · May 26, 2020

A guide to writing integration tests for a Spark Structured Streaming application.

A bird’s-eye view of the architecture:

[Diagram: the basic architecture — MySQL → Debezium (Kafka Connect) → Kafka → Spark Structured Streaming → PostgreSQL]

The focus of this post is entirely on the integration test.

For setting up and running the Spark application, please read my earlier post.

The Code

  1. Spark: 2.4.5
  2. Scala: 2.12.8
  3. Testcontainers (Scala): 0.37.0
  4. ScalaTest: 3.0.1

Project Structure
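The project follows the standard sbt layout; Defaults.itSettings in build.sbt (shown below) keeps the integration test in a separate it source set. Roughly (a sketch based on the package names in the code, not the exact repository layout):

src/
  main/scala/com/sg/    -- SparkSessionWrapper and the StreamingJob classes
  test/scala/           -- unit tests
  it/scala/com/sg/      -- StreamingIT and the test wrapper traits
build.sbt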

The Spark Application

The Debezium connector reads MySQL DB changes from the binlog and pushes them as Debezium events to a Kafka topic.

The Spark application reads the data from the Kafka topic and pushes the changes to PostgreSQL.

package com.sg.job.streaming

import com.sg.wrapper.SparkSessionWrapper
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object StreamingJob extends App with SparkSessionWrapper {

  val currentDirectory = new java.io.File(".").getCanonicalPath
  val kafkaReaderConfig = KafkaReaderConfig("localhost:29092", "dbserver1.inventory.orders")
  val jdbcConfig = JDBCConfig(url = "jdbc:postgresql://localhost:5432/test")

  new StreamingJobExecutor(spark, kafkaReaderConfig, currentDirectory + "/checkpoint/job", jdbcConfig).execute()
}

case class JDBCConfig(url: String, user: String = "test", password: String = "Test123", tableName: String = "orders_it")

case class KafkaReaderConfig(kafkaBootstrapServers: String, topics: String, startingOffsets: String = "latest")

case class StreamingJobConfig(checkpointLocation: String, kafkaReaderConfig: KafkaReaderConfig)

class StreamingJobExecutor(spark: SparkSession, kafkaReaderConfig: KafkaReaderConfig, checkpointLocation: String, jdbcConfig: JDBCConfig) {

  def execute(): Unit = {
    // read data from Kafka and parse it
    val transformDF = read()
      .selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value", "topic")

    transformDF
      .writeStream
      .option("checkpointLocation", checkpointLocation)
      .foreachBatch { (batchDF: DataFrame, _: Long) =>
        batchDF.write
          .format("jdbc")
          .option("url", jdbcConfig.url)
          .option("user", jdbcConfig.user)
          .option("password", jdbcConfig.password)
          .option("driver", "org.postgresql.Driver")
          .option(JDBCOptions.JDBC_TABLE_NAME, jdbcConfig.tableName)
          .option("stringtype", "unspecified")
          .mode(SaveMode.Append)
          .save()
      }
      .start()
      .awaitTermination()
  }

  def read(): DataFrame = {
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaReaderConfig.kafkaBootstrapServers)
      .option("subscribe", kafkaReaderConfig.topics)
      .option("startingOffsets", kafkaReaderConfig.startingOffsets)
      .load()
  }
}
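The job mixes in SparkSessionWrapper, which is not shown above. Here is a minimal sketch of what such a trait might look like; the appName and master values are assumptions, not the original code:

package com.sg.wrapper

import org.apache.spark.sql.SparkSession

// Hypothetical sketch: provides a SparkSession to anything that mixes it in.
trait SparkSessionWrapper {

  lazy val spark: SparkSession = SparkSession
    .builder()
    .appName("spark-streaming-with-debezium") // assumed application name
    .master("local[*]")                       // assumed: local mode for development runs
    .getOrCreate()
}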

Integration Test For the Spark Application

To write the integration test for the Spark application discussed above, we need the following:

  1. MySQL DB: With Binary Logging
  2. Kafka-Connect with Debezium connector
  3. Kafka
  4. Zookeeper
  5. PostgreSQL

I will use Docker Testcontainers to build the required infrastructure.

build.sbt

name := "spark-streaming-with-debezium"

version := "0.1"
scalaVersion := "2.12.8"

val sparkVersion = "2.4.5"
val testcontainersScalaVersion = "0.37.0"

resolvers += "confluent" at "http://packages.confluent.io/maven/"
resolvers += Resolver.jcenterRepo
resolvers += "Spark Packages Repo" at "http://dl.bintray.com/spark-packages/maven"

assemblyMergeStrategy in assembly := {
  case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
  case PathList("META-INF", xs@_*) => MergeStrategy.discard
  case "application.conf" => MergeStrategy.concat
  case x => MergeStrategy.first
}

coverageEnabled.in(ThisBuild, IntegrationTest, test) := true

// skip test cases during packaging
test in assembly := {}

lazy val server = (project in file("."))
  .configs(IntegrationTest)
  .settings(Defaults.itSettings)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "org.apache.kafka" % "kafka-clients" % "2.2.1",
  "org.postgresql" % "postgresql" % "42.2.5",
  "com.typesafe" % "config" % "1.4.0",
  "org.slf4j" % "slf4j-simple" % "1.7.30",
  "org.scalatest" % "scalatest_2.12" % "3.0.1" % "it,test",
  "com.dimafeng" %% "testcontainers-scala-scalatest" % testcontainersScalaVersion % IntegrationTest,
  "com.dimafeng" %% "testcontainers-scala-postgresql" % testcontainersScalaVersion % IntegrationTest,
  "com.dimafeng" %% "testcontainers-scala-kafka" % testcontainersScalaVersion % IntegrationTest,
  "com.dimafeng" %% "testcontainers-scala-mysql" % testcontainersScalaVersion % IntegrationTest,
  "io.debezium" % "debezium-testing-testcontainers" % "1.0.3.Final" % IntegrationTest,
  "mysql" % "mysql-connector-java" % "5.1.49" % IntegrationTest
)

Integration Test Flow

  1. At the start of the test case execution, we will start our Docker test containers.
  2. We will perform some write operations on our MySQL database.
  3. We will register the Debezium connector with Kafka Connect.
  4. We will wait for some time to ensure that the Kafka Connect connector is ready to read changes from the MySQL database.
  5. The Kafka Connect (Debezium) connector will read the MySQL changes and put them as events in Kafka.
  6. We will execute our Spark Structured Streaming job.
  7. The job will stream the Kafka messages and, after a small transformation, put them in PostgreSQL.
  8. After some time, a thread will stop the streaming job.
  9. Finally, we will assert the records in PostgreSQL against the expected data.

Integration Test class

package com.sg.job.streaming

import java.sql.{Connection, DriverManager, Statement}
import java.time.LocalDateTime

import com.dimafeng.testcontainers.{ForAllTestContainer, MultipleContainers}
import com.sg.wrapper.{ContainerTestWrapper, SparkSessionITWrapper}
import io.debezium.testing.testcontainers.ConnectorConfiguration
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FlatSpec}
import org.slf4j.{Logger, LoggerFactory}

/**
 * An integration test for [[StreamingJob]]
 */
class StreamingIT extends FlatSpec with ForAllTestContainer with BeforeAndAfterAll with ContainerTestWrapper with SparkSessionITWrapper {

  val sparkTest: SparkSession = spark
  override val container: MultipleContainers = MultipleContainers(kafkaContainer, postgresContainer, mySQLContainer)
  private val logger: Logger = LoggerFactory.getLogger(this.getClass.getName)

  it must "integration test for streaming job" in {

    logger.info("Kafka-Connect is ready to send events...")
    createBinLogDBRecords(mySQLContainer.driverClassName, mySQLContainer.jdbcUrl, mySQLContainer.username, mySQLContainer.password)

    logger.info("launching the job...")

    val checkpoint = new java.io.File(".").getCanonicalPath + "/checkpoint/it" + LocalDateTime.now()
    val kafkaReaderConfig = KafkaReaderConfig("localhost:29092", "dbserver1.inventory.orders")
    val jdbcConfig = JDBCConfig(url = postgresContainer.jdbcUrl)

    new StreamingJobExecutor(sparkTest, kafkaReaderConfig, checkpoint, jdbcConfig).execute()

    logger.info("Assert the records")
    Class.forName(postgresContainer.driverClassName)
    val connection: Connection = DriverManager.getConnection(postgresContainer.jdbcUrl, postgresContainer.username, postgresContainer.password)
    connection.setAutoCommit(true)
    connection.setSchema("test")
    val statement: Statement = connection.createStatement()

    val sql = "select * from public.orders_it"
    val rs = statement.executeQuery(sql)
    assert(rs != null)
    try {
      while (rs.next()) {
        // Put the logic for assertion here
        println(rs.getString("key"), rs.getString("value"), rs.getString("topic"))
      }
    } finally {
      rs.close()
    }
  }

  override def beforeAll() {
    super.beforeAll()
    debeziumContainer.start()
    createSnapshotDBRecords(mySQLContainer.driverClassName, mySQLContainer.jdbcUrl, mySQLContainer.username, mySQLContainer.password)

    val connector = ConnectorConfiguration.forJdbcContainer(mySQLContainer.container)
      .`with`("database.server.name", "dbserver1")
      .`with`("connector.class", "io.debezium.connector.mysql.MySqlConnector")
      .`with`("database.whitelist", "inventory")
      .`with`("database.user", "root")
      .`with`("database.password", "test")
      .`with`("database.hostname", mySQLContainer.networkAliases.head)
      .`with`("database.server.id", "123")
      .`with`("database.port", "3306")
      .`with`("name", "my-connector")
      .`with`("database.history.kafka.bootstrap.servers", s"${kafkaContainer.networkAliases.head}:9092")
      .`with`("database.history.kafka.topic", "schema-changes.inventory")

    logger.info("Trying to connect to Kafka-Connect")
    debeziumContainer.registerConnector("my-connector", connector)

    // Give Kafka Connect time to settle down.
    Thread.sleep(30000)

    // A way of stopping the streaming job after a while
    val runnable: Runnable = () =>
      try {
        Thread.sleep(60000)
        logger.info("Stopping the job...")
        sparkTest.streams.active.foreach(q => q.stop())
        sparkTest.sparkContext.stop()
        sparkTest.stop()
      } catch {
        case e: InterruptedException => e.printStackTrace()
      }
    val thread = new Thread(runnable)
    thread.start()
  }

  override def afterAll() {
    sparkTest.stop()
    debeziumContainer.stop()
    super.afterAll()
  }

  /**
   * DB changes for snapshot mode
   */
  def createSnapshotDBRecords(driverClassName: String, jdbcUrl: String, username: String, password: String): Unit = {
    Class.forName(driverClassName)
    val connection: Connection = DriverManager.getConnection(jdbcUrl, username, password)
    val statement: Statement = connection.createStatement()
    statement.execute("create table inventory.orders (id int not null, title varchar(255), primary key (id))")
    statement.execute("insert into inventory.orders values (1, 'Learn ADC')")
    statement.execute("insert into inventory.orders values (2, 'Learn BDC')")
    statement.closeOnCompletion()
    connection.close()
  }

  /**
   * DB changes for binlog mode
   */
  def createBinLogDBRecords(driverClassName: String, jdbcUrl: String, username: String, password: String): Unit = {
    Class.forName(driverClassName)
    val connection: Connection = DriverManager.getConnection(jdbcUrl, username, password)
    val statement: Statement = connection.createStatement()
    statement.execute("insert into inventory.orders values (3, 'Learn CDC')")
    statement.execute("insert into inventory.orders values (4, 'Learn DDC')")
    statement.execute("update inventory.orders set title = 'Learn EDC' where id = 4")
    statement.execute("delete from inventory.orders where id = 3")
    statement.closeOnCompletion()
    connection.close()
  }
}
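The test mixes in ContainerTestWrapper and SparkSessionITWrapper, which are not shown above. SparkSessionITWrapper can be assumed to look like the SparkSessionWrapper sketched earlier. Below is a minimal, hypothetical sketch of ContainerTestWrapper; the container setup details are assumptions (in particular, the MySQL container must run with binary logging enabled and an inventory database/user for Debezium, and the DebeziumContainer construction is left out because its constructor arguments vary between releases of debezium-testing-testcontainers):

package com.sg.wrapper

import com.dimafeng.testcontainers.{KafkaContainer, MySQLContainer, PostgreSQLContainer}
import io.debezium.testing.testcontainers.DebeziumContainer

// Hypothetical sketch: the containers the integration test wires together.
trait ContainerTestWrapper {

  // Kafka and PostgreSQL with the library defaults.
  val kafkaContainer: KafkaContainer = KafkaContainer()
  val postgresContainer: PostgreSQLContainer = PostgreSQLContainer()

  // MySQL needs binary logging enabled plus an `inventory` database and a user
  // Debezium can connect with (e.g. Debezium's example MySQL image or a custom
  // my.cnf); MySQLContainer() alone is only a placeholder for that setup.
  val mySQLContainer: MySQLContainer = MySQLContainer()

  // Kafka Connect with the Debezium connectors, attached to the Kafka container
  // above; the concrete image/version arguments are left out of this sketch.
  val debeziumContainer: DebeziumContainer = ???
}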

When you run the test case, it will first bring up the containers.

[Screenshot: logs from the integration test run]

Once the containers are up, we insert our test data into MySQL.

The Debezium connector ensures that the MySQL changes are available in Kafka as events.

The Spark application reads the Kafka topic and, after doing the required transformation, inserts the data into PostgreSQL.

After about 90 seconds, the stopper thread shuts the streaming job down and control comes back to the integration test.

Now, you can create a connection to the PostgreSQL DB and perform the required assertions.

sbt

Execute the following command to run the integration test:

sbt clean coverage test coverage it:test coverageReport

I hope this was precise and helpful. The code can be downloaded from GitHub.
