Dataflow: From BigQuery to Postgres
Dataflow is the unified stream and batch data processing service
from Google Cloud Platform that's serverless, fast, and cost-effective.
It is based on the open source project Apache Beam and has three supported SDKs: Java,
Python, and Go. The Go support is still experimental, and the databaseio package currently does not support
PostgreSQL. Luckily, the implementation for writing to Postgres is quite simple.
TL;DR
You can find the full example of how to transfer data from BigQuery to PostgreSQL on GitHub.
The solution
First, let’s create the main entry point:
package main

// Imports for the whole example, assuming one main.go and the Beam Go SDK v2 module path.
import (
	"context"
	"database/sql"
	"flag"
	"fmt"
	"os"
	"reflect"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	_ "github.com/lib/pq" // Postgres driver for database/sql
	"github.com/pkg/errors"
)

var pgconn = flag.String("pgconn", "", "postgres connection string (required).")

func main() {
	flag.Parse()
	beam.Init()
	if *pgconn == "" {
		fmt.Println("pgconn is required")
		os.Exit(1)
	}
	p := beam.NewPipeline()
	s := p.Root()
	query := "SELECT timestamp, userid FROM `demo_bq_dataset.users_log`"
	project := gcpopts.GetProject(context.Background())
	rows := bigqueryio.Query(s, project, query, reflect.TypeOf(bqRecord{}), bigqueryio.UseStandardSQL())
	beam.ParDo0(s, &writePostgresFn{Dsn: *pgconn}, rows)
	if err := beamx.Run(context.Background(), p); err != nil {
		fmt.Printf("Failed to execute job: %v\n", err)
		os.Exit(1)
	}
}
The only required input parameter here is the connection string for the Postgres database.
The BigQuery records are read into this type:
type bqRecord struct {
	Timestamp time.Time `json:"timestamp"`
	UserID    int       `json:"userid"`
}
Next, let's create a struct that implements the DoFn lifecycle methods used by beam.ParDo0.
type writePostgresFn struct {
	Dsn   string `json:"dsn"`
	Table string `json:"table"`

	db   *sql.DB
	tx   *sql.Tx
	stmt *sql.Stmt
}
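One addition worth making here: the Beam Go SDK recommends registering structural DoFn types so that remote runners such as Dataflow can decode them on the workers. A minimal sketch of that registration (not part of the original snippet):
func init() {
	// Register the DoFn type with the Beam type registry so remote
	// runners can reconstruct it on worker machines.
	beam.RegisterType(reflect.TypeOf((*writePostgresFn)(nil)).Elem())
}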
There are five lifecycle methods for a beam.ParDo DoFn: Setup, StartBundle, ProcessElement, FinishBundle, and Teardown.
In Setup, a database connection is created:
func (f *writePostgresFn) Setup(ctx context.Context) error {
	// create the db connection; note that sql.Open only validates the
	// DSN, the actual connection is established lazily on first use
	db, err := sql.Open("postgres", f.Dsn)
	if err != nil {
		return errors.Wrap(err, "failed to open database")
	}
	f.db = db
	return nil
}
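Because the open is lazy, a bad connection string only surfaces on the first insert. If you prefer to fail fast, a variant of Setup could ping the database first (an optional sketch, not in the original example):
func (f *writePostgresFn) Setup(ctx context.Context) error {
	db, err := sql.Open("postgres", f.Dsn)
	if err != nil {
		return errors.Wrap(err, "failed to open database")
	}
	// verify connectivity eagerly so a bad DSN fails in Setup
	if err := db.PingContext(ctx); err != nil {
		return errors.Wrap(err, "failed to ping database")
	}
	f.db = db
	return nil
}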
In StartBundle, a statement is prepared within a new SQL transaction:
func (f *writePostgresFn) StartBundle(ctx context.Context) error {
	tx, err := f.db.BeginTx(ctx, nil)
	if err != nil {
		return errors.Wrap(err, "failed to open transaction")
	}
	stmt, err := tx.PrepareContext(ctx, "INSERT INTO users_log(timestamp,userid) VALUES($1,$2)")
	if err != nil {
		return errors.Wrap(err, "failed to prepare statement")
	}
	f.tx = tx
	f.stmt = stmt
	return nil
}
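This assumes the users_log table already exists in the target bqdemo database. A minimal sketch of matching DDL (the column types here are my assumption, derived from bqRecord):
CREATE TABLE IF NOT EXISTS users_log (
    timestamp timestamptz,
    userid    integer
);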
In ProcessElement, the statement prepared in the previous step is executed for each record:
func (f *writePostgresFn) ProcessElement(ctx context.Context, elem bqRecord) error {
	// logging every element is verbose, but handy for a demo
	log.Infof(ctx, "elem %v", elem)
	_, err := f.stmt.ExecContext(ctx, elem.Timestamp, elem.UserID)
	if err != nil {
		return errors.Wrapf(err, "failed to execute statement for element %v", elem)
	}
	return nil
}
In FinishBundle, the statement is closed and the transaction is committed:
func (f *writePostgresFn) FinishBundle(ctx context.Context) error {
	if err := f.stmt.Close(); err != nil {
		return errors.Wrap(err, "failed to close statement")
	}
	if err := f.tx.Commit(); err != nil {
		return errors.Wrap(err, "failed to commit transaction")
	}
	log.Infof(ctx, "bundle committed")
	return nil
}
And finally, in Teardown, the connection is closed:
func (f *writePostgresFn) Teardown(ctx context.Context) error {
	return f.db.Close()
}
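To smoke-test the DoFn locally without touching BigQuery, you can feed it an in-memory PCollection on the direct runner. A hypothetical test sketch; TEST_PG_DSN is an environment variable I made up for the test, and it assumes the users_log table exists:
// write_postgres_test.go
package main

import (
	"os"
	"testing"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func TestMain(m *testing.M) {
	// ptest.Main initializes Beam for the test binary
	ptest.Main(m)
}

func TestWritePostgresFn(t *testing.T) {
	dsn := os.Getenv("TEST_PG_DSN") // hypothetical test-only env var
	if dsn == "" {
		t.Skip("TEST_PG_DSN not set")
	}
	p, s := beam.NewPipelineWithRoot()
	rows := beam.CreateList(s, []bqRecord{
		{Timestamp: time.Now(), UserID: 1},
		{Timestamp: time.Now(), UserID: 2},
	})
	beam.ParDo0(s, &writePostgresFn{Dsn: dsn}, rows)
	if err := ptest.Run(p); err != nil {
		t.Fatalf("pipeline failed: %v", err)
	}
}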
Build the project:
env CGO_ENABLED=0 go build -o bqtopostgres
Then run it locally:
export PROJECT_ID="${PROJECT_ID:-demo-project-123}"
export REGION="${REGION:-us-central1}"
export BUCKET_NAME="${BUCKET_NAME:-demo-project-123-storage}"
export POSTGRES_HOST="${POSTGRES_HOST:-localhost}"
export POSTGRES_PORT="${POSTGRES_PORT:-5432}"
export POSTGRES_USER="${POSTGRES_USER:-postgres}"
export POSTGRES_PASS="${POSTGRES_PASS:-password}"
./bqtopostgres \
    --project $PROJECT_ID \
    --region=$REGION \
    --pgconn="postgres://$POSTGRES_USER:$POSTGRES_PASS@$POSTGRES_HOST:$POSTGRES_PORT/bqdemo?sslmode=disable"
or using Dataflow. Note that in this case POSTGRES_HOST must point to an instance the Dataflow workers can reach, so localhost will not work. The CGO_ENABLED=0 prefix matters here because the SDK compiles a Linux worker binary at job submission:
export PROJECT_ID="${PROJECT_ID:-demo-project-123}"
export REGION="${REGION:-us-central1}"
export BUCKET_NAME="${BUCKET_NAME:-demo-project-123-storage}"
export POSTGRES_HOST="${POSTGRES_HOST:-localhost}"
export POSTGRES_PORT="${POSTGRES_PORT:-5432}"
export POSTGRES_USER="${POSTGRES_USER:-postgres}"
export POSTGRES_PASS="${POSTGRES_PASS:-password}"
JOB_NAME="bqdemo-test-$(date -u +%Y%m%d-%H%M%S)"
env CGO_ENABLED=0 ./bqtopostgres \
    --runner dataflow \
    --execute_async \
    --job_name $JOB_NAME \
    --project $PROJECT_ID \
    --region=$REGION \
    --worker_machine_type=e2-medium \
    --max_num_workers=1 \
    --temp_location gs://$BUCKET_NAME/temp/bqdemo \
    --staging_location gs://$BUCKET_NAME/staging/bqdemo \
    --worker_harness_container_image=apache/beam_go_sdk:latest \
    --pgconn="postgres://$POSTGRES_USER:$POSTGRES_PASS@$POSTGRES_HOST:$POSTGRES_PORT/bqdemo?sslmode=disable"
You can find the full example on GitHub.
Conclusion
Using Go for Apache Beam looks promising, and I hope it will not stay experimental for long.