Dataflow: S3 credentials from Secret Manager

Google Dataflow is Google Cloud Platform's unified stream and batch data processing service: serverless, fast, and cost-effective. It is based on the open source project Apache Beam. One of the challenges when deploying a pipeline to Dataflow is handling access credentials, in particular for the FileIO implementation for AWS S3, which can leak the credentials into the template file.

TL;DR

You can find the full example of how to provide the AWS credentials at runtime on GitHub.

The solution

A common way to pass the AWS credentials to a Dataflow pipeline is the --awsCredentialsProvider pipeline option.

export AWS_ACCESS_KEY_ID=DUMMY
export AWS_SECRET_KEY=dummySecretKey123
gradle run --args="--runner=DataflowRunner \
    --project=$PROJECT_ID \
    --region=$REGION \
    --workerMachineType=e2-medium \
    --maxNumWorkers=1 \
    --stagingLocation=gs://$BUCKET_NAME/staging/demo-test \
    --tempLocation=gs://$BUCKET_NAME/temp/demo-test \
    --templateLocation=gs://$BUCKET_NAME/demo-test-template.json \
    --awsRegion=$AWS_REGION \
    --sourceBucket=$AWS_SOURCE_BUCKET \
    --awsCredentialsProvider='{\"@type\":\"AWSStaticCredentialsProvider\",\"awsAccessKeyId\":\"$AWS_ACCESS_KEY_ID\",\"awsSecretKey\":\"$AWS_SECRET_KEY\"}' \
    --startDate=2022-01-02T03:00:00Z \
    --lookBack=6 \
    --outputTable=$BIGQUERY_DATASET.$BIGQUERY_TABLE"

If you pass the credentials like this, as parameters at deploy time, the values will be visible in clear text in the pipeline template file gs://$BUCKET_NAME/demo-test-template.json. This is a serious security issue, because anyone who has access to this bucket can potentially see the credentials. The leak happens even if the credentials were originally retrieved from Google Secret Manager or another vault solution.

Note: Providing the parameter when running the job from a template with
gcloud dataflow jobs run --parameters=""
does not work, because the awsCredentialsProvider option does not implement the ValueProvider interface.
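
To illustrate the difference, here is a minimal sketch with hypothetical option names (not taken from the real S3Options): a plain option is serialized into the template when the template is created, while a ValueProvider option is resolved only when the job runs and can therefore be supplied with --parameters.

import org.apache.beam.sdk.options.Description
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.ValueProvider

// Hypothetical options, for illustration only.
interface ExampleOptions : PipelineOptions {
    // Plain option: baked into the template at creation time,
    // cannot be changed with --parameters.
    @get:Description("A template-time option")
    var staticOption: String

    // ValueProvider option: resolved at runtime, so it can be
    // supplied with --parameters when the template is run.
    @get:Description("A runtime option")
    var runtimeOption: ValueProvider<String>
}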

To overcome this issue and be able to read the credentials at runtime, a custom implementation of the S3ClientBuilderFactory is required.
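
For orientation, this factory interface comes from Beam's AWS (SDK v1) IO module (beam-sdks-java-io-amazon-web-services) and has a single method. A sketch of its shape, shown here only for reference:

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import org.apache.beam.sdk.io.aws.options.S3Options

// Sketch of org.apache.beam.sdk.io.aws.options.S3ClientBuilderFactory; in a real
// project the interface is imported from the Beam AWS module, not redeclared.
interface S3ClientBuilderFactory {
    fun createBuilder(s3Options: S3Options): AmazonS3ClientBuilder
}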

First, let’s save the credentials to Secret Manager:

echo -n "AWS_ACCESS_KEY_ID" | gcloud secrets create \
    aws_access_key_id --data-file=-
echo -n "AWS_SECRET_KEY" | gcloud secrets create \
    aws_secret_key --data-file=-

and then set the variables to point to the resource IDs of the created secrets:

export AWS_ACCESS_KEY_ID=secret:$(gcloud secrets describe \
    aws_access_key_id --format='value(name)')/versions/latest
export AWS_SECRET_KEY=secret:$(gcloud secrets describe \
    aws_secret_key --format='value(name)')/versions/latest

Next, the implementation of the S3ClientBuilderFactory:

import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient
import org.apache.beam.sdk.io.aws.options.S3ClientBuilderFactory
import org.apache.beam.sdk.io.aws.options.S3Options
import org.slf4j.LoggerFactory

class S3ClientFactory : S3ClientBuilderFactory {

    companion object {
        val LOGGER = LoggerFactory.getLogger("s3tobigquery.S3ClientFactory")
    }

    // Resolves a value of the form "secret:<secret version resource name>" by reading it
    // from Secret Manager; any other value is returned unchanged.
    fun getSecretValue(secret: String): String {
        var secretName = secret
        if (!secretName.startsWith("secret:")) {
            return secretName
        } else {
            secretName = secretName.removePrefix("secret:")
        }

        var client: SecretManagerServiceClient? = null
        try {
            client = SecretManagerServiceClient.create()
            val response = client.accessSecretVersion(secretName)
            return response.getPayload().getData().toStringUtf8()
        } catch (e: Exception) {
            LOGGER.error("Unable to read secret $secretName, error ", e)
        } finally {
            client?.close()
        }
        return ""
    }

    // Called by Beam's S3 filesystem on the workers: builds the S3 client with the
    // credentials resolved from Secret Manager at runtime.
    override fun createBuilder(s3Options: S3Options): AmazonS3ClientBuilder {

        val options = s3Options.`as`(CustomPipelineOptions::class.java)
        val awsKey =
                if (options.awsAccessKeyId.isAccessible)
                        getSecretValue(options.awsAccessKeyId.get())
                else ""
        val awsSecret =
                if (options.awsSecretKey.isAccessible) getSecretValue(options.awsSecretKey.get())
                else ""

        var builder =
                AmazonS3ClientBuilder.standard()
                        .withCredentials(
                                AWSStaticCredentialsProvider(BasicAWSCredentials(awsKey, awsSecret))
                        )

        builder = builder.withRegion(s3Options.getAwsRegion())

        return builder
    }
}

This class creates the S3 client using the credentials retrieved from Secret Manager.
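
As a quick sanity check (not part of the pipeline), the secret resolution can be exercised locally before deploying, assuming Application Default Credentials with access to the secret; the resource name below is hypothetical:

// Resolve a secret reference to confirm the account running this has access.
fun verifySecretAccess() {
    val value = S3ClientFactory()
            .getSecretValue("secret:projects/123456789/secrets/aws_access_key_id/versions/latest")
    check(value.isNotEmpty()) { "Secret could not be read" }
}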

Next, the pipeline options must be configured to use this class as the S3 client factory:

fun main(args: Array<String>) {

    val options =
            PipelineOptionsFactory.fromArgs(*args)
                    .withValidation()
                    .`as`(CustomPipelineOptions::class.java)

    options.setS3ClientFactoryClass(S3ClientFactory::class.java)

    // all other pipeline options
}
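
For context, a minimal sketch of how the registered factory ends up being used: once setS3ClientFactoryClass is called, any s3:// path read through TextIO or FileIO goes through the client built by S3ClientFactory, i.e. with the credentials pulled from Secret Manager on the workers. The bucket and file pattern below are hypothetical:

import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.TextIO
import org.apache.beam.sdk.values.PCollection

// A minimal sketch, assuming a hypothetical bucket and file pattern.
fun readFromS3(pipeline: Pipeline): PCollection<String> =
        pipeline.apply(
                "ReadFromS3",
                TextIO.read().from("s3://my-source-bucket/input/*.json")
        )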

After this, the runtime options must be added to be able to pass the credentials:

interface CustomPipelineOptions : S3Options {
    @get:Description("The awsAccessKeyId") 
    var awsAccessKeyId: ValueProvider<String>
    @get:Description("The awsSecretKey") 
    var awsSecretKey: ValueProvider<String>

    // other pipeline options below
}

Now the pipeline template can be created and a job started from it with the following commands:

gradle run --args="--runner=DataflowRunner \
    --project=$PROJECT_ID \
    --region=$REGION \
    --workerMachineType=e2-medium \
    --maxNumWorkers=1 \
    --stagingLocation=gs://$BUCKET_NAME/staging/demo-test \
    --tempLocation=gs://$BUCKET_NAME/temp/demo-test \
    --templateLocation=gs://$BUCKET_NAME/demo-test-template.json \
    --awsAccessKeyId=\"$AWS_ACCESS_KEY_ID\" \
    --awsSecretKey=\"$AWS_SECRET_KEY\" \
    --awsRegion=$AWS_REGION \
    --sourceBucket=$AWS_SOURCE_BUCKET \
    --outputTable=$BIGQUERY_DATASET.$BIGQUERY_TABLE"

JOB_NAME=demo-test-`date -u +"%Y%m%d-%H%M%S"`

gcloud dataflow jobs run ${JOB_NAME} \
    --gcs-location=gs://$BUCKET_NAME/demo-test-template.json \
    --region=$REGION \
    --parameters="startDate=2022-01-02T03:00:00Z,lookBack=6"

With this approach the AWS credentials are safe and are read from Secret Manager only when the pipeline runs. As a bonus, it is now possible to update the credentials without having to regenerate the pipeline template.

You can find the working example on GitHub.

Conclusion

Security is hard, especially in a cloud environment, and sometimes a leak is not obvious, as in this case, where the credentials end up in a template file somewhere in a bucket. But with a little digging into the Apache Beam source code it was possible to overcome this issue.