Dataflow: From BigQuery to Postgres
Dataflow is Google Cloud Platform's unified stream and batch data processing service: serverless, fast, and cost-effective.
It is based on the open-source Apache Beam project and has three supported SDKs: Java, Python, and Go.
The Go SDK is still experimental, and its databaseio package currently does not support PostgreSQL.
Luckily, the implementation for writing to Postgres is quite simple.
TL;DR
You can find the full example of how to transfer data from BigQuery to PostgreSQL on GitHub.
The solution
First, let’s create the main entry point:
import (
    "context"
    "flag"
    "fmt"
    "os"
    "reflect"

    // import paths assume Beam Go SDK v2
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"

    _ "github.com/lib/pq" // Postgres driver for database/sql (lib/pq assumed; any driver registering "postgres" works)
)

var (
    pgconn = flag.String("pgconn", "", "postgres connection string (required).")
)

func main() {
    flag.Parse()
    beam.Init()
    if *pgconn == "" {
        fmt.Println("pgconn is required")
        os.Exit(1)
    }
    p := beam.NewPipeline()
    s := p.Root()
    query := "SELECT timestamp, userid FROM `demo_bq_dataset.users_log`"
    project := gcpopts.GetProject(context.Background())
    rows := bigqueryio.Query(s, project, query, reflect.TypeOf(bqRecord{}), bigqueryio.UseStandardSQL())
    beam.ParDo0(s, &writePostgresFn{Dsn: *pgconn}, rows)
    if err := beamx.Run(context.Background(), p); err != nil {
        fmt.Printf("Failed to execute job: %v\n", err)
        os.Exit(1)
    }
}
The only required input parameter here is the connection string for the Postgres database.
The BigQuery records are stored using this type:
type bqRecord struct {
    Timestamp time.Time `json:"timestamp"`
    UserID    int       `json:"userid"`
}
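A note on field mapping: the json tags above are not what matches columns to fields. bigqueryio reads rows with the cloud.google.com/go/bigquery client, whose struct loader matches column names to exported field names case-insensitively (so userid finds UserID). If the names didn't line up, the struct could declare explicit bigquery tags instead; a minimal sketch:

// Hypothetical variant of bqRecord with explicit column mapping.
// The BigQuery struct loader honors `bigquery` tags and otherwise falls
// back to a case-insensitive match on the exported field name.
type bqRecord struct {
    Timestamp time.Time `bigquery:"timestamp"`
    UserID    int       `bigquery:"userid"`
}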
Next, let's create the DoFn struct that beam.ParDo0 applies to each element.
type writePostgresFn struct {
    // Exported fields are serialized and shipped to workers.
    Dsn   string `json:"dsn"`
    Table string `json:"table"` // unused in this example; the INSERT below hardcodes users_log

    // Unexported fields hold per-worker state and are not serialized.
    db   *sql.DB
    stmt *sql.Stmt
    tx   *sql.Tx
}
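One caveat for distributed runners like Dataflow: the element type and the DoFn struct generally have to be registered with the SDK so they can be serialized and reconstructed on workers. A minimal sketch using beam.RegisterType (the exact registration mechanism varies across SDK versions):

func init() {
    // Register pipeline types with the Beam runtime type registry so
    // remote workers can decode bqRecord elements and the DoFn's fields.
    beam.RegisterType(reflect.TypeOf((*bqRecord)(nil)).Elem())
    beam.RegisterType(reflect.TypeOf((*writePostgresFn)(nil)).Elem())
}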
A DoFn has five lifecycle methods: Setup, StartBundle, ProcessElement, FinishBundle, and Teardown.
In Setup, a database connection is created:
func (f *writePostgresFn) Setup(ctx context.Context) error {
    // create the db connection
    db, err := sql.Open("postgres", f.Dsn)
    if err != nil {
        return errors.Wrap(err, "failed to open database")
    }
    f.db = db
    return nil
}
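One subtlety: sql.Open does not actually establish a connection, it only prepares a handle, so a bad DSN or unreachable host would otherwise surface later in StartBundle. To fail fast, an optional ping can be added right after sql.Open in Setup:

// Optional: verify connectivity during Setup instead of failing later.
if err := db.PingContext(ctx); err != nil {
    return errors.Wrap(err, "failed to ping database")
}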
In StartBundle, a statement is prepared within a new SQL transaction:
func (f *writePostgresFn) StartBundle(ctx context.Context) error {
    tx, err := f.db.Begin()
    if err != nil {
        return errors.Wrap(err, "failed to open transaction")
    }
    stmt, err := tx.PrepareContext(ctx, "INSERT INTO users_log(timestamp,userid) VALUES($1,$2)")
    if err != nil {
        return errors.Wrap(err, "failed to prepare statement")
    }
    f.tx = tx
    f.stmt = stmt
    return nil
}
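The prepared INSERT assumes the users_log table already exists on the Postgres side. A minimal sketch of matching DDL (the column types are inferred from bqRecord, so treat them as an assumption):

-- Hypothetical target table for the INSERT above.
CREATE TABLE IF NOT EXISTS users_log (
    timestamp timestamptz NOT NULL,
    userid    integer     NOT NULL
);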
In ProcessElement, the statement prepared in the previous step is executed once per element:
func (f *writePostgresFn) ProcessElement(ctx context.Context, elem bqRecord) error {
    log.Infof(ctx, "elem %v", elem)
    if _, err := f.stmt.ExecContext(ctx, elem.Timestamp, elem.UserID); err != nil {
        return errors.Wrapf(err, "failed to execute statement for element %v", elem)
    }
    return nil
}
In FinishBundle, the statement is closed and the transaction is committed:
func (f *writePostgresFn) FinishBundle(ctx context.Context) error {
    if err := f.stmt.Close(); err != nil {
        return errors.Wrap(err, "failed to close statement")
    }
    if err := f.tx.Commit(); err != nil {
        return errors.Wrap(err, "failed to commit transaction")
    }
    log.Infof(ctx, "bundle committed")
    return nil
}
And finally, in Teardown the connection is closed:
func (f *writePostgresFn) Teardown(ctx context.Context) error {
    return f.db.Close()
}
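Per-row INSERTs inside one transaction are fine for modest volumes; for larger loads, Postgres COPY is much faster. A sketch of the two methods that would change, assuming the lib/pq driver (imported as github.com/lib/pq):

// StartBundle variant using COPY (lib/pq only): pq.CopyIn builds a
// `COPY users_log (timestamp, userid) FROM STDIN` statement.
func (f *writePostgresFn) StartBundle(ctx context.Context) error {
    tx, err := f.db.Begin()
    if err != nil {
        return errors.Wrap(err, "failed to open transaction")
    }
    stmt, err := tx.PrepareContext(ctx, pq.CopyIn("users_log", "timestamp", "userid"))
    if err != nil {
        return errors.Wrap(err, "failed to prepare COPY")
    }
    f.tx = tx
    f.stmt = stmt
    return nil
}

// ProcessElement is unchanged: each Exec now buffers one row client-side.

// FinishBundle variant: an Exec with no arguments flushes the buffered rows,
// then the statement is closed and the transaction committed as before.
func (f *writePostgresFn) FinishBundle(ctx context.Context) error {
    if _, err := f.stmt.Exec(); err != nil {
        return errors.Wrap(err, "failed to flush COPY")
    }
    if err := f.stmt.Close(); err != nil {
        return errors.Wrap(err, "failed to close statement")
    }
    if err := f.tx.Commit(); err != nil {
        return errors.Wrap(err, "failed to commit transaction")
    }
    return nil
}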
Build the project
env CGO_ENABLED=0 go build -o bqtopostgres
Then run it locally:
export PROJECT_ID="${PROJECT_ID:-demo-project-123}"
export REGION="${REGION:-us-central1}"
export BUCKET_NAME="${BUCKET_NAME:-demo-project-123-storage}"
export POSTGRES_HOST="${POSTGRES_HOST:-localhost}"
export POSTGRES_PORT="${POSTGRES_PORT:-5432}"
export POSTGRES_USER="${POSTGRES_USER:-postgres}"
export POSTGRES_PASS="${POSTGRES_PASS:-password}"
./bqtopostgres \
--project $PROJECT_ID \
--region=$REGION \
--pgconn="postgres://$POSTGRES_USER:$POSTGRES_PASS@$POSTGRES_HOST:$POSTGRES_PORT/bqdemo?sslmode=disable"
Or run it on Dataflow:
export PROJECT_ID="${PROJECT_ID:-demo-project-123}"
export REGION="${REGION:-us-central1}"
export BUCKET_NAME="${BUCKET_NAME:-demo-project-123-storage}"
export POSTGRES_HOST="${POSTGRES_HOST:-localhost}"
export POSTGRES_PORT="${POSTGRES_PORT:-5432}"
export POSTGRES_USER="${POSTGRES_USER:-postgres}"
export POSTGRES_PASS="${POSTGRES_PASS:-password}"
JOB_NAME=bqdemo-test-`date -u +"%Y%m%d-%H%M%S"`
./bqtopostgres \
--runner dataflow \
--execute_async \
--job_name $JOB_NAME \
--project $PROJECT_ID \
--region=$REGION \
--worker_machine_type=e2-medium \
--max_num_workers=1 \
--temp_location gs://$BUCKET_NAME/temp/bqdemo \
--staging_location gs://$BUCKET_NAME/staging/bqdemo \
--worker_harness_container_image=apache/beam_go_sdk:latest \
--pgconn="postgres://$POSTGRES_USER:$POSTGRES_PASS@$POSTGRES_HOST:$POSTGRES_PORT/bqdemo?sslmode=disable"
You can find the full example on GitHub.
Conclusion
Using Go for Apache Beam looks promising, and I hope it will not stay experimental for long.