Dataflow: From BigQuery to Postgres

Dataflow is Google Cloud Platform's unified stream and batch data processing service: serverless, fast, and cost-effective. It is based on the open source project Apache Beam and supports three SDKs: Java, Python, and Go. The Go SDK is still experimental, and its databaseio package does not currently support PostgreSQL. Luckily, writing to Postgres yourself is quite simple.


You can find the full example of how you can transfer data from BigQuery to PostgreSQL on GitHub.

The solution

First, let’s create the main entry point:

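import (
	"context"
	"database/sql"
	"flag"
	"fmt"
	"os"
	"reflect"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam" // Beam Go SDK v2 path assumed; older releases omit /v2
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	_ "github.com/lib/pq" // assumed driver: registers "postgres" for sql.Open
	"github.com/pkg/errors"
)

var (
	pgconn = flag.String("pgconn", "", "postgres connection string (required).")
)

func main() {
	flag.Parse()
	beam.Init() // must run after flag parsing; on Dataflow workers it runs the worker harness

	if *pgconn == "" {
		fmt.Println("pgconn is required")
		os.Exit(1)
	}

	p := beam.NewPipeline()
	s := p.Root()

	query := "SELECT timestamp, userid FROM `demo_bq_dataset.users_log`"
	project := gcpopts.GetProject(context.Background())
	rows := bigqueryio.Query(s, project, query, reflect.TypeOf(bqRecord{}), bigqueryio.UseStandardSQL())
	beam.ParDo0(s, &writePostgresFn{Dsn: *pgconn}, rows)

	if err := beamx.Run(context.Background(), p); err != nil {
		fmt.Printf("Failed to execute job: %v\n", err)
		os.Exit(1)
	}
}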

The only required input parameter is --pgconn, the connection string for the Postgres database.
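If you prefer to assemble the connection string from the POSTGRES_* variables used in the run scripts, here is a minimal sketch. The buildDSN helper is hypothetical, and it assumes the lib/pq driver, which accepts postgres:// URLs. Building the URL with net/url keeps characters such as @ or / in a password from breaking it.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildDSN is a hypothetical helper that assembles a postgres:// URL
// (a format lib/pq accepts). url.UserPassword makes String() percent-encode
// characters such as '@' and '/' that would otherwise corrupt the URL.
func buildDSN(host, user, pass, dbname, sslmode string) string {
	u := url.URL{
		Scheme: "postgres",
		User:   url.UserPassword(user, pass),
		Host:   host,
		Path:   "/" + dbname,
	}
	q := url.Values{}
	q.Set("sslmode", sslmode)
	u.RawQuery = q.Encode()
	return u.String()
}

func main() {
	// prints postgres://postgres:p%40ss%2Fword@localhost:5432/postgres?sslmode=disable
	fmt.Println(buildDSN("localhost:5432", "postgres", "p@ss/word", "postgres", "disable"))
}
```

The result can be passed as --pgconn; the database name and sslmode here are example values, not taken from the original project.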

Each BigQuery row is read into this type (the BigQuery client matches column names to exported struct fields, ignoring case):

type bqRecord struct {
	Timestamp time.Time `json:"timestamp"`
	UserID    int       `json:"userid"`
}

Next, let's create a struct that implements the DoFn passed to beam.ParDo0, which applies a DoFn that produces no output:

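type writePostgresFn struct {
	Dsn   string `json:"dsn"` // exported, so it is serialized and sent to the workers
	db    *sql.DB             // unexported fields are created on each worker
	stmt  *sql.Stmt
	tx    *sql.Tx
	Table string `json:"table"` // not used below; the INSERT names users_log directly
}

func init() {
	// Register the DoFn type so remote runners such as Dataflow can rebuild it on workers.
	beam.RegisterType(reflect.TypeOf((*writePostgresFn)(nil)).Elem())
}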

A DoFn can implement five lifecycle methods: Setup, StartBundle, ProcessElement, FinishBundle, and Teardown. Only ProcessElement is required.

In Setup, which runs once per DoFn instance, the database connection pool is created:

func (f *writePostgresFn) Setup(ctx context.Context) error {
	// Create the connection pool; sql.Open may only validate the DSN,
	// actual connections are opened lazily on first use.
	db, err := sql.Open("postgres", f.Dsn)
	if err != nil {
		return errors.Wrap(err, "failed to open database")
	}
	f.db = db

	return nil
}

In StartBundle, a new SQL transaction is opened and the INSERT statement is prepared within it:

func (f *writePostgresFn) StartBundle(ctx context.Context) error {
	// One transaction per bundle: the bundle's rows are committed together in FinishBundle.
	tx, err := f.db.Begin()
	if err != nil {
		return errors.Wrap(err, "failed to open transaction")
	}

	stmt, err := tx.PrepareContext(ctx, "INSERT INTO users_log(timestamp,userid) VALUES($1,$2)")
	if err != nil {
		tx.Rollback() // don't leave the transaction open
		return errors.Wrap(err, "failed to prepare statement")
	}
	f.tx = tx
	f.stmt = stmt
	return nil
}

In ProcessElement, the statement prepared in the previous step is executed once for each row:

func (f *writePostgresFn) ProcessElement(ctx context.Context, elem bqRecord) error {
	// Logging every element helps while debugging but is noisy on large inputs.
	log.Infof(ctx, "elem %v", elem)
	_, err := f.stmt.ExecContext(ctx, elem.Timestamp, elem.UserID)
	if err != nil {
		return errors.Wrapf(err, "failed to execute statement for element %v", elem)
	}
	return nil
}
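Executing one INSERT per element costs one database round trip per row. One alternative, not used in this article, is to buffer a bundle's rows in ProcessElement and flush them in FinishBundle with a single multi-row INSERT. The hypothetical helper below only builds the SQL text with numbered PostgreSQL placeholders; the table and column names are interpolated directly, so they must come from trusted code, never from the data.

```go
package main

import (
	"fmt"
	"strings"
)

// buildBatchInsert is a hypothetical helper that returns one INSERT for
// nRows rows, numbering the placeholders $1, $2, ... row by row.
// table and cols are interpolated as-is and must be trusted identifiers.
func buildBatchInsert(table string, cols []string, nRows int) string {
	var b strings.Builder
	fmt.Fprintf(&b, "INSERT INTO %s(%s) VALUES ", table, strings.Join(cols, ","))
	n := 1
	for r := 0; r < nRows; r++ {
		if r > 0 {
			b.WriteString(",")
		}
		b.WriteString("(")
		for c := range cols {
			if c > 0 {
				b.WriteString(",")
			}
			fmt.Fprintf(&b, "$%d", n)
			n++
		}
		b.WriteString(")")
	}
	return b.String()
}

func main() {
	// prints INSERT INTO users_log(timestamp,userid) VALUES ($1,$2),($3,$4),($5,$6)
	fmt.Println(buildBatchInsert("users_log", []string{"timestamp", "userid"}, 3))
}
```

The matching arguments would be flattened row by row (timestamp, userid, timestamp, userid, ...) and passed to ExecContext. PostgreSQL accepts at most 65,535 bind parameters per statement, so large bundles need to be split into chunks; for bulk loads, lib/pq also supports COPY through pq.CopyIn.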

In FinishBundle, the statement is closed and the transaction is committed:

func (f *writePostgresFn) FinishBundle(ctx context.Context) error {
	if err := f.stmt.Close(); err != nil {
		f.tx.Rollback() // discard the bundle's writes instead of leaving the transaction open
		return errors.Wrap(err, "failed to close statement")
	}

	if err := f.tx.Commit(); err != nil {
		return errors.Wrap(err, "failed to commit transaction")
	}

	log.Infof(ctx, "bundle committed")
	return nil
}

Finally, in Teardown, the connection pool is closed:

func (f *writePostgresFn) Teardown(ctx context.Context) error {
	// Close the connection pool opened in Setup.
	return f.db.Close()
}

Build the project

env CGO_ENABLED=0 go build -o bqtopostgres

Then run it locally

export PROJECT_ID="${PROJECT_ID:-demo-project-123}"
export REGION="${REGION:-us-central1}"
export BUCKET_NAME="${BUCKET_NAME:-demo-project-123-storage}"
export POSTGRES_HOST="${POSTGRES_HOST:-localhost}"
export POSTGRES_USER="${POSTGRES_USER:-postgres}"
export POSTGRES_PASS="${POSTGRES_PASS:-password}"

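# --pgconn assumes a lib/pq key/value DSN built from the POSTGRES_* variables above.
./bqtopostgres  \
    --project $PROJECT_ID \
    --region=$REGION \
    --pgconn="host=$POSTGRES_HOST user=$POSTGRES_USER password=$POSTGRES_PASS sslmode=disable"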

Or run it on Dataflow:

export PROJECT_ID="${PROJECT_ID:-demo-project-123}"
export REGION="${REGION:-us-central1}"
export BUCKET_NAME="${BUCKET_NAME:-demo-project-123-storage}"
export POSTGRES_HOST="${POSTGRES_HOST:-localhost}"
export POSTGRES_USER="${POSTGRES_USER:-postgres}"
export POSTGRES_PASS="${POSTGRES_PASS:-password}"

JOB_NAME=bqdemo-test-`date -u +"%Y%m%d-%H%M%S"`

# --pgconn assumes a lib/pq key/value DSN built from the POSTGRES_* variables above.
env CGO_ENABLED=0 ./bqtopostgres  \
            --runner dataflow \
            --execute_async \
            --job_name $JOB_NAME \
            --project $PROJECT_ID \
            --region=$REGION \
            --worker_machine_type=e2-medium \
            --max_num_workers=1 \
            --temp_location gs://$BUCKET_NAME/temp/bqdemo \
            --staging_location gs://$BUCKET_NAME/staging/bqdemo \
            --worker_harness_container_image=apache/beam_go_sdk:latest \
            --pgconn="host=$POSTGRES_HOST user=$POSTGRES_USER password=$POSTGRES_PASS sslmode=disable"

Using Go with Apache Beam looks promising, and I hope its support will not stay experimental for long.