Dataflow - e2e test your pipeline

Dataflow is a managed Google Cloud service for data processing and transformation. It uses Apache Beam, an open source project, to define the pipeline: a series of data transformations and aggregations that turn an input into a final output. While it is easy to find documentation on how to unit test an Apache Beam pipeline, I was not able to find any on how to do end-to-end testing of a Dataflow pipeline in a real environment, so I decided to build a solution myself.


You can find the source code for this solution in this repository

The challenge

To run e2e tests on a Dataflow pipeline you need to use real resources such as Pub/Sub topics and BigQuery tables. All of these resources have to be created before the test runs and deleted when it ends, including the Dataflow job itself (a streaming pipeline needs to be explicitly canceled).

The solution

JUnit 5 supports the @BeforeAll, @AfterAll, @BeforeEach and @AfterEach annotations, and you can use them to create the resources at the beginning of your tests and delete them at the end. Doing so makes your tests ugly, though, because it takes a lot of code to create and delete the resources.

@BeforeAll
fun init() {
	println("BeforeAll init() method called")
}

JUnit 5 also has the concept of an “extension”. You can find the full documentation here. Using extensions we can hide the complexity of creating and deleting all the required GCP resources.

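// The @ExtendWith registration is assumed here; it is not shown in the original snippet.
@ExtendWith(
    PubsubResourceExtension::class,
    BigQueryResourceExtension::class,
    DataflowResourcesExtension::class)
class E2eTestsDemo {

    var pubsubTopic: String? = null
    var pubsubSubscription: String? = null
    var bigQueryOutputTable: String? = null
    var bigQueryOutput2Table: String? = null

    @Test
    fun testPipeline() {
        // use any of the above fields here
    }
}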

We declared some fields here: pubsubTopic, pubsubSubscription, bigQueryOutputTable and bigQueryOutput2Table. The values of these fields are injected by our custom extension classes PubsubResourceExtension, BigQueryResourceExtension and DataflowResourcesExtension using reflection before the tests run.

The tests look a lot cleaner now.

Let’s go into the details of how the extensions work.

For PubsubResourceExtension we will implement the BeforeAllCallback and AfterAllCallback interfaces.

  // Needs kotlin.reflect.KMutableProperty and kotlin.reflect.full.memberProperties.
  // LOG is a logger instance; its name is assumed, it was lost from the snippet.
  override fun beforeAll(context: ExtensionContext) {
    LOG.info("Setup required Pub/Sub resources...")
    // Get the test instance
    val testInstance = context.getTestInstance().get()
    val className = Utils.toDashCase(testInstance::class.simpleName!!)
    val projectId = Utils.readInstanceProperty<String>(testInstance, "projectId")
    val pubsubTopic: String?
    val pubsubSubscription: String?

    // Setup a Pub/Sub topic
    val pubsubTopicField = testInstance::class.memberProperties.find { it.name == "pubsubTopic" }
    if (pubsubTopicField != null && pubsubTopicField is KMutableProperty<*>) {
      pubsubTopic = createPubsubTopic(projectId, className)
      if (pubsubTopic == null) {
        fail("Cannot create Pub/Sub topic for project ${projectId}")
      }
      // Remember the topic for afterAll, then inject it into the test instance
      context.getStore(STORE).put(PUBSUB_TOPIC, pubsubTopic)
      pubsubTopicField.setter.call(testInstance, pubsubTopic)

      // Setup a Pub/Sub subscription
      val pubsubSubscriptionField =
          testInstance::class.memberProperties.find { it.name == "pubsubSubscription" }
      if (pubsubSubscriptionField != null && pubsubSubscriptionField is KMutableProperty<*>) {
        pubsubSubscription = createPubsubSubscription(projectId, pubsubTopic!!)
        if (pubsubSubscription == null) {
          fail("Cannot create Pub/Sub subscription for topic ${projectId}/${pubsubTopic}")
        }
        context.getStore(STORE).put(PUBSUB_SUBSCRIPTION, pubsubSubscription)
        pubsubSubscriptionField.setter.call(testInstance, pubsubSubscription)
      }
    }
  }

This method looks in the test class instance for a field called pubsubTopic. If it finds one, it creates a new topic with a random name and injects the value into this field. It also saves the name of the topic in the context store, so that it can be retrieved in the afterAll method in order to delete it. The same happens for pubsubSubscription.
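To make the injection step more concrete, here is a minimal, dependency-free sketch of the same idea. It is not the code from the repository: it swaps in plain Java reflection on the setter that Kotlin generates for a `var`, instead of kotlin-reflect's memberProperties, and both `DemoTest` and `injectIfPresent` are hypothetical names used only for illustration.

```kotlin
// Hypothetical test class: only the property names matter to the injector.
class DemoTest {
    var pubsubTopic: String? = null
    val projectId: String = "my-project" // read-only, so it has no setter
}

// Looks for a public one-argument String setter named after the property
// (Kotlin compiles `var pubsubTopic` to `setPubsubTopic`) and calls it.
// Returns true when the value was injected, false when no such setter exists.
fun injectIfPresent(instance: Any, property: String, value: String): Boolean {
    val setterName = "set" + property.replaceFirstChar { it.uppercaseChar() }
    val setter = instance.javaClass.methods.find {
        it.name == setterName &&
            it.parameterCount == 1 &&
            it.parameterTypes[0] == String::class.java
    } ?: return false
    setter.invoke(instance, value)
    return true
}

fun main() {
    val test = DemoTest()
    println(injectIfPresent(test, "pubsubTopic", "demo-test-topic")) // true
    println(test.pubsubTopic)                                        // demo-test-topic
    println(injectIfPresent(test, "projectId", "other"))             // false
}
```

A test class that does not declare the property is simply left alone, which is what lets each test opt in to only the resources it needs.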

  override fun afterAll(context: ExtensionContext) {
    LOG.info("Cleaning up the Pub/Sub resources...")
    val testInstance = context.getTestInstance().get()
    val projectId = Utils.readInstanceProperty<String>(testInstance, "projectId")

    val noCleanOptionPubSub = System.getenv("NO_CLEANUP_PUBSUB")
    if (noCleanOptionPubSub == null || noCleanOptionPubSub != "1") {
      // deletePubsubSubscription: name assumed, the call was lost from the snippet
      deletePubsubSubscription(
          projectId, context.getStore(STORE).get(PUBSUB_SUBSCRIPTION) as String?)
      deletePubsubTopic(projectId, context.getStore(STORE).get(PUBSUB_TOPIC) as String?)
    } else {
      LOG.warn("!!!NO_CLEANUP_PUBSUB requested. The resources must be deleted manually")
    }
  }

This method is called after all the tests have been executed, and here we delete the topic and the subscription that were created. There is also an option to keep the created resources, in case you need to debug the tests.
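The opt-out check can be isolated into a tiny function, which also makes it easy to see which values actually disable the cleanup. This is only a sketch: `shouldCleanUp` is a hypothetical helper, while NO_CLEANUP_PUBSUB is the variable used by the extension above.

```kotlin
// Returns true when the resources should be deleted. Only the exact value "1"
// disables the cleanup; an unset variable or any other value keeps it enabled.
fun shouldCleanUp(flagValue: String?): Boolean = flagValue != "1"

fun main() {
    val flag = System.getenv("NO_CLEANUP_PUBSUB")
    if (shouldCleanUp(flag)) {
        println("Deleting the Pub/Sub resources")
    } else {
        println("NO_CLEANUP_PUBSUB requested, the resources must be deleted manually")
    }
}
```

Note that a value such as "true" does not disable the cleanup, so the variable has to be set to exactly 1 when you want to keep the resources.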

The same principle is used for the BigQueryResourceExtension, with some added features. This class can handle multiple BigQuery tables: just add multiple fields that follow the pattern bigQuery{name}Table. The schema for each table is looked up in the resource folder using the pattern {name}Schema.json. If the schema does not exist, the extension only generates a table name and tries to delete that table at the end. This is useful for testing whether the pipeline is able to create the tables when they do not exist.
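The naming convention can be sketched with a regular expression. The function names below are hypothetical, and the sketch assumes that the schema file keeps the captured name exactly as it appears in the field; the extension in the repository may treat the casing differently.

```kotlin
// Matches property names such as bigQueryOutputTable and captures "Output".
val tableFieldPattern = Regex("^bigQuery(.+)Table$")

// Returns the {name} part of a bigQuery{name}Table property, or null when
// the property does not follow the pattern.
fun tableNameOf(property: String): String? =
    tableFieldPattern.matchEntire(property)?.groupValues?.get(1)

// Assumption: the schema resource keeps the captured name unchanged.
fun schemaResourceOf(property: String): String? =
    tableNameOf(property)?.let { "${it}Schema.json" }

fun main() {
    for (p in listOf("bigQueryOutputTable", "bigQueryOutput2Table", "pubsubTopic")) {
        println("$p -> ${schemaResourceOf(p)}")
    }
    // bigQueryOutputTable -> OutputSchema.json
    // bigQueryOutput2Table -> Output2Schema.json
    // pubsubTopic -> null
}
```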

The DataflowResourcesExtension creates and runs the Dataflow job:

  // Needs kotlin.reflect.full.memberFunctions; LOG is a logger whose name is assumed.
  override fun beforeAll(context: ExtensionContext) {
    LOG.info("Setup required Dataflow resources...")

    val testInstance = context.getTestInstance().get()
    val projectId = Utils.readInstanceProperty<String>(testInstance, "projectId")
    val region = Utils.readInstanceProperty<String>(testInstance, "region")
    val className = Utils.toDashCase(testInstance::class.simpleName!!)

    // build a job name from the class name and a timestamp
    val jobName = "$className-${SimpleDateFormat("yyMMddHHmmss").format(Date())}"

    val runDataFlowMethod = testInstance::class.memberFunctions.find { it.name == "runDataflowJob" }
    if (runDataFlowMethod == null) {
      LOG.warn(
          "function 'runDataflowJob(jobName: String)' not found. Skipping job run. You must define and use a function named 'runDataflowJob(jobName: String)' for streaming job setup and cleaning up. Ignore for batch jobs.")
      return
    }
    // save the job name here, even though creating the job may still fail
    context.getStore(STORE).put(DATAFLOW_JOB, jobName)
    LOG.info("Setup Dataflow job with name {} at {}/{}", jobName, projectId, region)

    // run the job
    runDataFlowMethod.call(testInstance, jobName)
  }

and it cancels the job at the end of the tests:

  override fun afterAll(context: ExtensionContext) {
    LOG.info("Cleaning up the Dataflow resources...")
    val testInstance = context.getTestInstance().get()
    val projectId = Utils.readInstanceProperty<String>(testInstance, "projectId")
    val region = Utils.readInstanceProperty<String>(testInstance, "region")

    deleteDataFlowJob(projectId, region, context.getStore(STORE).get(DATAFLOW_JOB) as String?)
  }
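The job name built in beforeAll deserves a closer look. As far as I know, Dataflow only accepts job names made of lowercase letters, digits and dashes that start with a letter and end with a letter or digit, which is presumably why the class name goes through Utils.toDashCase first. The sketch below shows one way this could work; `toDashCase` here is a hypothetical stand-in for the helper in the repository, and `buildJobName` and `isValidJobName` are illustrative names.

```kotlin
import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical stand-in for Utils.toDashCase: "E2eTestsDemo" -> "e2e-tests-demo".
// A dash is inserted wherever a lowercase letter or digit is followed by an
// uppercase letter, then everything is lowercased.
fun toDashCase(name: String): String =
    name.replace(Regex("(?<=[a-z0-9])(?=[A-Z])"), "-").lowercase()

// Same shape as the name built in the extension: <class-name>-<timestamp>.
fun buildJobName(className: String, now: Date): String =
    "${toDashCase(className)}-${SimpleDateFormat("yyMMddHHmmss").format(now)}"

// Assumed naming rule: lowercase letters, digits and dashes only, starting
// with a letter and ending with a letter or digit.
fun isValidJobName(name: String): Boolean =
    Regex("[a-z]([-a-z0-9]*[a-z0-9])?").matches(name)

fun main() {
    val jobName = buildJobName("E2eTestsDemo", Date())
    println(jobName)
    println(isValidJobName(jobName))          // true
    println(isValidJobName("E2eTestsDemo-1")) // false, it contains uppercase letters
}
```

Because the timestamp has a resolution of one second, two runs of the same test class started within the same second would get the same name.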

With all these classes in place the only thing remaining is to create the actual tests.
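One thing such a test usually needs with a streaming job is a way to wait for the output, because a message published to the topic does not reach the BigQuery table immediately. The helper below is a hypothetical sketch of that idea and is not taken from the repository: `pollUntil` keeps calling a probe until it returns a value or the timeout expires. In a real test the probe would query the output table; here it is simulated with a counter.

```kotlin
// Calls `probe` repeatedly until it returns a non-null value or the timeout
// has passed. Returns the value, or null when the timeout expired first.
fun <T : Any> pollUntil(timeoutMillis: Long, intervalMillis: Long, probe: () -> T?): T? {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (true) {
        val result = probe()
        if (result != null) return result
        if (System.currentTimeMillis() >= deadline) return null
        Thread.sleep(intervalMillis)
    }
}

fun main() {
    // Simulated probe: the "row" only shows up on the third attempt.
    var attempts = 0
    val row = pollUntil(timeoutMillis = 5_000, intervalMillis = 10) {
        attempts++
        if (attempts >= 3) "row-from-output-table" else null
    }
    println("$row after $attempts attempts") // row-from-output-table after 3 attempts
}
```

The probe always runs at least once, even with a timeout of zero, so a result that is already available is never missed.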

There are more features in this repository that I was not able to cover in this post. Check it out.


Unit tests do not cover all the situations you might face in production, such as API changes and deprecations or SDK changes. You need end-to-end tests for your pipeline to make sure you don’t get any surprises in production. The worst DevOps nightmare is a broken streaming pipeline that needs to be fixed in just a few hours.