Dataflow. End-to-end test your pipeline

Dataflow is a managed service from Google Cloud for data processing and data transformations. It uses Apache Beam, an open source project, to define the pipeline, which is essentially just a list of data transformation and aggregation rules applied from an input to a final output. While it’s easy to find documentation about how to create a new pipeline and write unit tests, I was not able to find any tutorial about how to do end-to-end testing of a Dataflow pipeline in a real environment.

TL;DR

You can find the source code for this solution in this repository: github.com/gabihodoroaga/dataflow-e2e-demo.

The challenge

To run e2e tests on a Dataflow pipeline you need to use real GCP resources such as Pub/Sub topics and BigQuery tables. All these resources need to be created (preferably with unique random names) before running the test and deleted at the end of the test, including the actual Dataflow job (a streaming pipeline needs to be explicitly canceled).

The solution

JUnit 5 supports the @BeforeAll, @AfterAll, @BeforeEach, and @AfterEach annotations, and you could use them to create the resources at the beginning of the test and delete them at the end. Doing so, however, makes the tests ugly, because a lot of code is required to create and delete the GCP resources.

@BeforeAll
fun init() {
	println("BeforeAll init() method called")
}

JUnit 5 also has the concept of an “extension”; you can find the full documentation in the JUnit 5 user guide. Using extensions, we can hide the complexity of creating and deleting all the required GCP resources in dedicated classes, or even move them into a library.

Our skeleton for a test class will look like this:


@TestInstance(Lifecycle.PER_CLASS)
@ExtendWith(
        PubsubResourceExtension::class,
        BigQueryResourceExtension::class,
        DataflowResourcesExtension::class
)
class E2eTestsDemo {

    var pubsubTopic: String? = null
    var pubsubSubscription: String? = null
    var pubsubOtherTopic: String? = null
    var pubsubOtherSubscription: String? = null
    var bigQueryOutputTable: String? = null
    var bigQueryOutput2Table: String? = null

    @Test
    fun testPipeline() {
        // use any of the above fields here
    }
}

We declared a few fields here: pubsubTopic, pubsubSubscription, pubsubOtherTopic, pubsubOtherSubscription, bigQueryOutputTable, and bigQueryOutput2Table. The values of these variables will be injected via reflection by our custom extension classes PubsubResourceExtension, BigQueryResourceExtension, and DataflowResourcesExtension before the tests run. Note that the fields must be declared as var so the extensions can set them.

The tests look a lot cleaner now, because all the code required to create the resources is hidden in our specialized extension classes.

Next, let’s go into the details of how the extensions work.
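Before diving in: the extensions read shared settings such as projectId and region from the test instance and derive resource names from the test class name, using two small reflection helpers, Utils.readInstanceProperty and Utils.toDashCase. The repository has its own implementation; a minimal sketch of what these helpers could look like:

import kotlin.reflect.KProperty1
import kotlin.reflect.full.memberProperties

object Utils {
    // Sketch only; the repository's actual Utils may differ.
    // Read a property value from any object instance by name, via Kotlin reflection.
    @Suppress("UNCHECKED_CAST")
    fun <T> readInstanceProperty(instance: Any, propertyName: String): T {
        val property = instance::class.memberProperties
                .first { it.name == propertyName } as KProperty1<Any, *>
        return property.get(instance) as T
    }

    // Turn a class name like "E2eTestsDemo" into a resource-friendly name
    // like "e2e-tests-demo".
    fun toDashCase(value: String): String =
            value.replace(Regex("([a-z0-9])([A-Z])"), "$1-$2").lowercase()
}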

For PubsubResourceExtension we will implement the BeforeAllCallback and AfterAllCallback interfaces.
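A sketch of the overall class shape, including the logger, the field-matching regex, and the context-store keys used by the two callbacks shown below (the exact key names and values are assumptions):

import org.junit.jupiter.api.extension.AfterAllCallback
import org.junit.jupiter.api.extension.BeforeAllCallback
import org.junit.jupiter.api.extension.ExtensionContext
import org.slf4j.LoggerFactory

class PubsubResourceExtension : BeforeAllCallback, AfterAllCallback {

    private val logger = LoggerFactory.getLogger(PubsubResourceExtension::class.java)

    // Matches injectable fields such as pubsubTopic or pubsubOtherTopic.
    private val searchRegexTopic = Regex("pubsub([a-zA-Z0-9]*)Topic")

    companion object {
        // A dedicated namespace so this extension's values do not collide with
        // other extensions sharing the same ExtensionContext store.
        private val STORE = ExtensionContext.Namespace.create(PubsubResourceExtension::class.java)
        private const val PUBSUB_TOPIC = "pubsub-topics"
        private const val PUBSUB_SUBSCRIPTION = "pubsub-subscriptions"
    }

    override fun beforeAll(context: ExtensionContext) { /* shown below */ }

    override fun afterAll(context: ExtensionContext) { /* shown below */ }
}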

    override fun beforeAll(context: ExtensionContext) {
        logger.info("Setup required Pub/Sub resources...")
        // Get the test instance
        val testInstance = context.getTestInstance().get()
        val className = Utils.toDashCase(testInstance::class.simpleName!!)
        val projectId = Utils.readInstanceProperty<String>(testInstance, "projectId")

        val pubsubTopicFields =
                testInstance::class.memberProperties.filter { it.name.matches(searchRegexTopic) }

        val pubsubTopics = mutableListOf<String?>()
        context.getStore(STORE).put(PUBSUB_TOPIC, pubsubTopics)

        val pubsubSubscriptions = mutableListOf<String?>()
        context.getStore(STORE).put(PUBSUB_SUBSCRIPTION, pubsubSubscriptions)

        pubsubTopicFields.forEach {
            if (it is KMutableProperty<*>) {
                // find the pubsub topic name
                var topicName = className
                val groupName = searchRegexTopic.find(it.name)!!.groupValues[1]
                if (groupName != "") {
                    topicName += "-" + groupName.lowercase()
                }

                val pubsubTopic = createPubsubTopic(projectId, topicName)
                if (pubsubTopic == null) {
                    fail("Cannot create Pub/Sub topic for project ${projectId}")
                }
                pubsubTopics.add(pubsubTopic)
                it.setter.call(testInstance, pubsubTopic)

                // Setup a Pub/Sub subscription
                val pubsubSubscriptionField =
                        testInstance::class.memberProperties.find {
                            it.name == "pubsub${groupName}Subscription"
                        }
                if (pubsubSubscriptionField != null &&
                                pubsubSubscriptionField is KMutableProperty<*>
                ) {
                    val pubsubSubscription = createPubsubSubscription(projectId, pubsubTopic!!)
                    if (pubsubSubscription == null) {
                        fail(
                                "Cannot create Pub/Sub subscription for topic ${projectId}/${pubsubTopic}"
                        )
                    }
                    pubsubSubscriptions.add(pubsubSubscription)
                    pubsubSubscriptionField.setter.call(testInstance, pubsubSubscription)
                }
            }
        }
    }

This method searches the test class instance for all fields that match the pattern pubsub([a-zA-Z0-9]*)Topic. For each one found, it creates a new topic with a random name and injects the value into the field. It also saves the names of the created topics in the context store, so they can be retrieved in the afterAll method and deleted. For the pubsub{name}Subscription fields, it looks up the corresponding topic to attach the subscription to.
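The createPubsubTopic and createPubsubSubscription helpers are not shown above. A minimal sketch of what they could look like, using the Pub/Sub admin clients and returning null on failure (the random-suffix naming and the returned short ids are assumptions, not the repository's exact implementation):

import com.google.cloud.pubsub.v1.SubscriptionAdminClient
import com.google.cloud.pubsub.v1.TopicAdminClient
import com.google.pubsub.v1.PushConfig
import com.google.pubsub.v1.SubscriptionName
import com.google.pubsub.v1.TopicName
import java.util.UUID

// Create a topic named "<baseName>-<random suffix>" and return its short id,
// or null if the call fails.
private fun createPubsubTopic(projectId: String, baseName: String): String? =
        runCatching {
            val topicId = "$baseName-${UUID.randomUUID().toString().substring(0, 8)}"
            TopicAdminClient.create().use { it.createTopic(TopicName.of(projectId, topicId)) }
            topicId
        }.getOrNull()

// Create a pull subscription attached to the given topic and return its short id.
private fun createPubsubSubscription(projectId: String, topicId: String): String? =
        runCatching {
            val subscriptionId = "$topicId-sub"
            SubscriptionAdminClient.create().use {
                it.createSubscription(
                        SubscriptionName.of(projectId, subscriptionId),
                        TopicName.of(projectId, topicId),
                        PushConfig.getDefaultInstance(),
                        10)
            }
            subscriptionId
        }.getOrNull()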

Next, the afterAll method is called after all the tests have been executed, and here we delete all the topics and subscriptions that were created. There is also an option to keep the created resources by setting the environment variable NO_CLEANUP_PUBSUB=1 (just in case you need to debug the test).

    override fun afterAll(context: ExtensionContext) {
        logger.info("Cleaning up the Pub/Sub resources...")
        val testInstance = context.getTestInstance().get()
        val projectId = Utils.readInstanceProperty<String>(testInstance, "projectId")

        val noCleanOptionPubSub = System.getenv("NO_CLEANUP_PUBSUB")
        if (noCleanOptionPubSub == null || noCleanOptionPubSub != "1") {
            val pubsubSubscriptions =
                    context.getStore(STORE).get(PUBSUB_SUBSCRIPTION) as List<String>
            pubsubSubscriptions.forEach { deletePubsubSubscription(projectId, it) }

            val pubsubTopics = context.getStore(STORE).get(PUBSUB_TOPIC) as List<String>
            pubsubTopics.forEach { deletePubsubTopic(projectId, it) }
        } else {
            logger.info("!!!NO_CLEANUP_PUBSUB requested. The resources must be deleted manually")
        }
    }
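The corresponding delete helpers are thin wrappers over the same admin clients. A sketch, assuming the short resource ids were stored:

// Best-effort cleanup: errors are ignored so one failed delete does not stop the rest.
private fun deletePubsubSubscription(projectId: String, subscriptionId: String?) {
    if (subscriptionId == null) return
    runCatching {
        SubscriptionAdminClient.create().use {
            it.deleteSubscription(SubscriptionName.of(projectId, subscriptionId))
        }
    }
}

private fun deletePubsubTopic(projectId: String, topicId: String?) {
    if (topicId == null) return
    runCatching {
        TopicAdminClient.create().use { it.deleteTopic(TopicName.of(projectId, topicId)) }
    }
}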

The same principle is used for the BigQueryResourceExtension class, with some added features. It can handle multiple BigQuery tables: just add multiple fields matching the pattern bigQuery{name}Table. The schema for each table is looked up in the resources folder using the pattern {name}Schema.json; if the schema file does not exist, only a table name is generated, and the extension will still try to delete that table at the end. This is useful for testing whether the pipeline is able to create the table itself. You can find the implementation of the BigQueryResourceExtension class in the repository github.com/gabihodoroaga/dataflow-e2e-demo.
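A minimal sketch of the per-field logic, assuming a fixed test dataset and a hypothetical parseSchemaJson helper that turns the JSON file into a com.google.cloud.bigquery.Schema (the table-spec format returned here is also an assumption):

import com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.bigquery.Schema
import com.google.cloud.bigquery.StandardTableDefinition
import com.google.cloud.bigquery.TableId
import com.google.cloud.bigquery.TableInfo
import java.util.UUID

// Hypothetical helper: parse the JSON schema file into a BigQuery Schema.
// The real project would implement this, e.g. with a JSON library.
private fun parseSchemaJson(json: String): Schema = TODO("parse $json into a Schema")

// Create (or just name) the table for a field such as bigQueryOutputTable.
private fun createBigQueryTable(projectId: String, dataset: String, name: String): String {
    val tableId = "${name}_${UUID.randomUUID().toString().replace("-", "")}"
    // Look for the schema resource {name}Schema.json on the test classpath.
    val schemaJson =
            Thread.currentThread().contextClassLoader.getResource("${name}Schema.json")?.readText()
    if (schemaJson != null) {
        val schema: Schema = parseSchemaJson(schemaJson)
        BigQueryOptions.getDefaultInstance().service.create(
                TableInfo.of(
                        TableId.of(projectId, dataset, tableId),
                        StandardTableDefinition.of(schema)))
    }
    // With no schema file, only the unique name is returned, so the test can check
    // that the pipeline creates the table; the cleanup still tries to delete it.
    return "$projectId:$dataset.$tableId"
}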

The DataflowResourcesExtension will create and run the Dataflow job.

  override fun beforeAll(context: ExtensionContext) {
    logger.info("Setup required Dataflow resources...")

    val testInstance = context.getTestInstance().get()
    val projectId = Utils.readInstanceProperty<String>(testInstance, "projectId")
    val region = Utils.readInstanceProperty<String>(testInstance, "region")
    val className = Utils.toDashCase(testInstance::class.simpleName!!)

    // run the job
    val jobName = "$className-${SimpleDateFormat("yyMMddHHmmss").format(Date())}"

    val runDataFlowMethod = testInstance::class.memberFunctions.find { it.name == "runDataflowJob" }
    if (runDataFlowMethod == null) {
      logger.info(
          "function 'runDataflowJob(jobName: String)') not found. Skipping job run. You must define and use a function named 'runDataflowJob(jobName: String)') for streaming job setup and cleaning up. Ignore for batch jobs."
      )
      return
    }
    // save the job name before running it, even though the run may fail, so afterAll can still clean up
    context.getStore(STORE).put(DATAFLOW_JOB, jobName)
    logger.info("Setup Dataflow job with name {} at {}/{}", jobName, projectId, region)

    // run the job
    runDataFlowMethod.call(testInstance, jobName)
  }

and it will cancel the job at the end of the tests.

  override fun afterAll(context: ExtensionContext) {
    logger.info("Cleaning up the Dataflow resources...")
    val testInstance = context.getTestInstance().get()
    val projectId = Utils.readInstanceProperty<String>(testInstance, "projectId")
    val region = Utils.readInstanceProperty<String>(testInstance, "region")

    deleteDataFlowJob(projectId, region, context.getStore(STORE).get(DATAFLOW_JOB) as String?)
  }
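The runDataflowJob function itself lives in the test class, because only the test knows which pipeline to start and with which options. A sketch, assuming a hypothetical PipelineApp entry point and pipeline-specific options named inputSubscription and outputTable (adjust both to your pipeline):

  fun runDataflowJob(jobName: String) {
    val args = arrayOf(
        "--runner=DataflowRunner",
        "--project=$projectId",
        "--region=$region",
        "--jobName=$jobName",
        // Pipeline-specific options pointing at the resources created by the
        // extensions; the option names here are assumptions.
        "--inputSubscription=projects/$projectId/subscriptions/$pubsubSubscription",
        "--outputTable=$bigQueryOutputTable")
    // PipelineApp is this project's (hypothetical) pipeline entry point. Assuming its
    // main() does not call waitUntilFinish(), DataflowRunner only submits the job and
    // returns, so the test can continue while the job starts up.
    PipelineApp.main(args)
  }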

With all these classes in place, the only thing remaining is to create the actual tests.
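As a rough sketch (assuming the pipeline reads JSON messages from the subscription and writes rows into the output table), a test publishes a message to the injected topic and then polls BigQuery until the row shows up. Streaming jobs can take a few minutes to start, so the polling has to be generous:

import com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.bigquery.QueryJobConfiguration
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage
import com.google.pubsub.v1.TopicName
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test

    @Test
    fun testPipeline() {
        // Publish a test message to the topic created by PubsubResourceExtension.
        // pubsubTopic is assumed to hold the short topic id.
        val publisher = Publisher.newBuilder(TopicName.of(projectId, pubsubTopic!!)).build()
        publisher.publish(
                PubsubMessage.newBuilder()
                        .setData(ByteString.copyFromUtf8("""{"id":1,"value":"test"}"""))
                        .build())
                .get()
        publisher.shutdown()

        // Poll BigQuery until the pipeline has written the row (or we give up).
        val bigquery = BigQueryOptions.getDefaultInstance().service
        val table = bigQueryOutputTable!!.replace(":", ".") // project.dataset.table
        var rows = 0L
        for (attempt in 1..60) {
            // The query fails while the table does not exist yet, so treat errors as 0 rows.
            rows = runCatching {
                bigquery.query(QueryJobConfiguration.of("SELECT COUNT(*) AS cnt FROM `$table`"))
                        .iterateAll().first().get("cnt").longValue
            }.getOrDefault(0L)
            if (rows > 0) break
            Thread.sleep(10_000)
        }
        assertTrue(rows > 0, "expected at least one row in $bigQueryOutputTable")
    }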

There are more features in the repository, github.com/gabihodoroaga/dataflow-e2e-demo, that I was not able to cover in this post. Check it out.

Conclusion

Unit tests do not cover all the situations you might face in production: API changes and deprecations, SDK changes, authentication and authorization of services, and so on. Having end-to-end tests for your pipeline, to make sure there are no surprises in production, is a requirement. The worst DevOps nightmare is a broken streaming pipeline that needs to be fixed in two hours.