Dataflow: From BigQuery to Postgres

Dataflow is Google Cloud Platform's unified stream and batch data processing service: serverless, fast, and cost-effective. It is based on the open source Apache Beam project and has three supported SDKs: Java, Python, and Go. The Go SDK is still experimental, and its databaseio package currently does not support PostgreSQL. Luckily, the implementation for writing to Postgres is quite simple.

TL;DR

You can find the full example of transferring data from BigQuery to PostgreSQL on GitHub.

The solution

First, let’s create the main entry point:

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"reflect"

	// Import paths assume the Beam Go SDK v2 module layout.
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	_ "github.com/lib/pq" // registers the "postgres" driver for database/sql
)

var (
	pgconn = flag.String("pgconn", "", "postgres connection string (required).")
)

func main() {

	flag.Parse()
	beam.Init()

	if *pgconn == "" {
		fmt.Println("pgconn is required")
		os.Exit(1)
	}

	p := beam.NewPipeline()
	s := p.Root()

	query := "SELECT timestamp, userid FROM `demo_bq_dataset.users_log`"

	project := gcpopts.GetProject(context.Background())

	rows := bigqueryio.Query(s, project, query, reflect.TypeOf(bqRecord{}), bigqueryio.UseStandardSQL())

	beam.ParDo0(s, &writePostgresFn{Dsn: *pgconn}, rows)

	if err := beamx.Run(context.Background(), p); err != nil {
		fmt.Printf("Failed to execute job: %v", err)
		os.Exit(1)
	}
}

The only required input parameter here is the connection string for the Postgres database. The --project flag used later is registered by the gcpopts package and read via gcpopts.GetProject.

The BigQuery rows are read into this type:

type bqRecord struct {
	Timestamp time.Time `json:"timestamp"`
	UserID    int       `json:"userid"`
}

Next, let's create a struct that implements the DoFn lifecycle methods and pass a pointer to it to beam.ParDo0. The exported fields are serialized and shipped to the workers; the unexported fields hold per-worker runtime state.

type writePostgresFn struct {
	Dsn   string `json:"dsn"`
	Table string `json:"table"`

	db   *sql.DB
	stmt *sql.Stmt
	tx   *sql.Tx
}

There are five lifecycle methods for a beam.ParDo DoFn: Setup, StartBundle, ProcessElement, FinishBundle, and Teardown.

In Setup, a database connection is created:

func (f *writePostgresFn) Setup(ctx context.Context) error {
	// create the db connection
	db, err := sql.Open("postgres", f.Dsn)
	if err != nil {
		return errors.Wrap(err, "failed to open database")
	}
	f.db = db

	return nil
}
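Note that sql.Open only validates the connection string; it doesn't actually connect. If you want Setup to fail fast on an unreachable database, a minimal variant (the ping is my addition, not part of the original example) looks like this:

func (f *writePostgresFn) Setup(ctx context.Context) error {
	db, err := sql.Open("postgres", f.Dsn)
	if err != nil {
		return errors.Wrap(err, "failed to open database")
	}
	// sql.Open is lazy; ping to verify connectivity before processing bundles.
	if err := db.PingContext(ctx); err != nil {
		return errors.Wrap(err, "failed to ping database")
	}
	f.db = db

	return nil
}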

In StartBundle, a statement is prepared within a new SQL transaction:

func (f *writePostgresFn) StartBundle(ctx context.Context) error {
	tx, err := f.db.Begin()
	if err != nil {
		return errors.Wrap(err, "failed to open transaction")
	}

	stmt, err := tx.PrepareContext(ctx, "INSERT INTO users_log(timestamp,userid) VALUES($1,$2)")
	if err != nil {
		return errors.Wrap(err, "failed to prepare statement")
	}
	f.tx = tx
	f.stmt = stmt
	return nil
}
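The INSERT assumes the users_log table already exists in the target database. As a sketch, a one-off helper along these lines could create it before the pipeline runs; the column types here are an assumption derived from bqRecord:

// ensureTable is a hypothetical setup helper, not part of the pipeline itself.
// The column types mirror bqRecord and are an assumption.
func ensureTable(db *sql.DB) error {
	_, err := db.Exec(`CREATE TABLE IF NOT EXISTS users_log (
		timestamp TIMESTAMPTZ NOT NULL,
		userid    INTEGER     NOT NULL
	)`)
	return err
}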

In ProcessElement, the statement prepared in the previous step is executed for each element:

func (f *writePostgresFn) ProcessElement(ctx context.Context, elem bqRecord) error {
	log.Infof(ctx, "elem %v", elem)
	_, err := f.stmt.ExecContext(ctx, elem.Timestamp, elem.UserID)
	if err != nil {
		return errors.Wrapf(err, "failed to execute statement for element %v", elem)
	}
	return nil
}

In FinishBundle, the statement is closed and the transaction is committed:

func (f *writePostgresFn) FinishBundle(ctx context.Context) error {
	if err := f.stmt.Close(); err != nil {
		return errors.Wrap(err, "failed to close statement")
	}

	if err := f.tx.Commit(); err != nil {
		return errors.Wrap(err, "failed to commit transaction")
	}

	log.Infof(ctx, "from FinishBundle %v", true)
	return nil
}

And finally, in Teardown, the connection is closed:

func (f *writePostgresFn) Teardown(ctx context.Context) error {
	return f.db.Close()
}
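Per-row INSERTs are fine for modest volumes. For larger loads, lib/pq's COPY support is a drop-in alternative; here is a hedged sketch of what StartBundle and FinishBundle would look like with pq.CopyIn (it requires a named import of "github.com/lib/pq"; ProcessElement stays unchanged, since each Exec call buffers one row):

// StartBundle prepares a COPY statement instead of a plain INSERT.
func (f *writePostgresFn) StartBundle(ctx context.Context) error {
	tx, err := f.db.Begin()
	if err != nil {
		return errors.Wrap(err, "failed to open transaction")
	}
	// pq.CopyIn builds a "COPY users_log (timestamp, userid) FROM STDIN" statement.
	stmt, err := tx.PrepareContext(ctx, pq.CopyIn("users_log", "timestamp", "userid"))
	if err != nil {
		return errors.Wrap(err, "failed to prepare COPY statement")
	}
	f.tx = tx
	f.stmt = stmt
	return nil
}

// FinishBundle flushes the buffered rows, closes the statement, and commits.
func (f *writePostgresFn) FinishBundle(ctx context.Context) error {
	// An Exec with no arguments flushes the COPY buffer to the server.
	if _, err := f.stmt.ExecContext(ctx); err != nil {
		return errors.Wrap(err, "failed to flush COPY buffer")
	}
	if err := f.stmt.Close(); err != nil {
		return errors.Wrap(err, "failed to close statement")
	}
	return errors.Wrap(f.tx.Commit(), "failed to commit transaction")
}

COPY avoids a network round trip per row, which starts to matter once bundles grow to thousands of elements.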

Build the project

env CGO_ENABLED=0 go build -o bqtopostgres

Then run it locally:

export PROJECT_ID="${PROJECT_ID:-demo-project-123}"
export REGION="${REGION:-us-central1}"
export BUCKET_NAME="${BUCKET_NAME:-demo-project-123-storage}"
export POSTGRES_HOST="${POSTGRES_HOST:-localhost}"
export POSTGRES_PORT="${POSTGRES_PORT:-5432}"
export POSTGRES_USER="${POSTGRES_USER:-postgres}"
export POSTGRES_PASS="${POSTGRES_PASS:-password}"

./bqtopostgres \
    --project $PROJECT_ID \
    --region=$REGION \
    --pgconn="postgres://$POSTGRES_USER:$POSTGRES_PASS@$POSTGRES_HOST:$POSTGRES_PORT/bqdemo?sslmode=disable"

Or submit it as a Dataflow job:

export PROJECT_ID="${PROJECT_ID:-demo-project-123}"
export REGION="${REGION:-us-central1}"
export BUCKET_NAME="${BUCKET_NAME:-demo-project-123-storage}"
export POSTGRES_HOST="${POSTGRES_HOST:-localhost}"
export POSTGRES_PORT="${POSTGRES_PORT:-5432}"
export POSTGRES_USER="${POSTGRES_USER:-postgres}"
export POSTGRES_PASS="${POSTGRES_PASS:-password}"

JOB_NAME=bqdemo-test-$(date -u +"%Y%m%d-%H%M%S")

./bqtopostgres \
            --runner dataflow \
            --execute_async \
            --job_name $JOB_NAME \
            --project $PROJECT_ID \
            --region=$REGION \
            --worker_machine_type=e2-medium \
            --max_num_workers=1 \
            --temp_location gs://$BUCKET_NAME/temp/bqdemo \
            --staging_location gs://$BUCKET_NAME/staging/bqdemo \
            --worker_harness_container_image=apache/beam_go_sdk:latest \
            --pgconn="postgres://$POSTGRES_USER:$POSTGRES_PASS@$POSTGRES_HOST:$POSTGRES_PORT/bqdemo?sslmode=disable"

You can find the full example on GitHub.

Conclusion

Using Go for Apache Beam looks promising, and I hope it will not stay experimental for long.