Streaming events from Pub/Sub to BigQuery using Cloud Run
If you need to stream events from Pub/Sub to BigQuery, the BigQuery subscriptions solution is not enough because you need to apply some transformations, and you find Dataflow too complex to deploy and manage, then a custom solution might be the only option. This post is a step-by-step tutorial on how to deploy a service on Cloud Run that streams events from Pub/Sub to BigQuery using the new BigQuery Storage Write API.
TL;DR
You can find the full source code and the documentation about how to run and deploy at github.com/gabihodoroaga/pubsub-to-bigquery.
The solution
The overall solution architecture has 2 services or components:
- the event handler - the service that is responsible for reading and parsing the pubsub messages
- the sink - the service responsible for pushing the parsed message to the destination
Notice: The source code presented in this post is not complete and is not runnable if you just copy/paste it. For the full solution check out this repository github.com/gabihodoroaga/pubsub-to-bigquery.
Let’s start by defining our models.
// EventHandler is the abstraction of the event processor
type EventHandler interface {
    Start(ctx context.Context) error
    Stats(ctx context.Context) (HandlerStats, error)
}
// HandlerStats is the model for reporting the event processing statistics
type HandlerStats struct {
    Received int64 `json:"received"`
    Success  int64 `json:"success"`
    Errors   int64 `json:"error"`
}
// EventSink is the abstraction of the event destination
type EventSink interface {
    Save(ctx context.Context, events []*Event) error
}
Now let's implement the EventHandler.
// EventHandlerPubsub implements the EventHandler using Pub/Sub
type EventHandlerPubsub struct {
    host         string
    project      string
    subscription string
    sink         model.EventSink
    stats        *model.HandlerStats
}
and the Start() function implementation.
// Start implements model.EventHandler and starts listening for events
func (c *EventHandlerPubsub) Start(ctx context.Context) error {
    var client *pubsub.Client
    var err error
    client, err = pubsub.NewClient(ctx, c.project)
    if err != nil {
        return errors.Wrap(err, "failed to create PubSub client")
    }
    sub := client.Subscription(c.subscription)
    go func() {
        for {
            zap.L().Sugar().Infof("begin receive messages from subscription %s.", sub.String())
            err = sub.Receive(ctx, func(msgCtx context.Context, msg *pubsub.Message) {
                atomic.AddInt64(&c.stats.Received, 1)
                requestID := uniuri.NewLen(10)
                newCtx := context.WithValue(msgCtx, model.ContextKey("request_id"), requestID)
                newCtx, span := tracer.Start(newCtx, "pubsub/receive")
                defer span.End()
                err := c.handleMessage(newCtx, msg.Data)
                if err != nil {
                    msg.Nack()
                    atomic.AddInt64(&c.stats.Errors, 1)
                } else {
                    msg.Ack()
                    atomic.AddInt64(&c.stats.Success, 1)
                }
            })
            zap.L().Sugar().Infof("pubsub receive exit for subscription %s", sub.String())
            if err != nil {
                zap.L().Error(fmt.Sprintf("pubsub receive error for subscription %s, receive will be retried in 2 seconds", sub.String()), zap.Error(err))
                time.Sleep(time.Second * 2)
                continue
            }
            // if no error is returned then the context has been canceled and we just exit
            zap.L().Sugar().Infof("receive done on subscription %s. No messages will be processed.", sub.String())
            client.Close()
            return
        }
    }()
    return nil
}
The above code creates the client and starts a goroutine that listens for messages. There is also a simple retry logic in case the connection to the Pub/Sub server gets interrupted.
Next let’s define the handle message function.
func (c *EventHandlerPubsub) handleMessage(ctx context.Context, data []byte) error {
    logger := zap.L().With(zap.Any("request_id", ctx.Value(model.ContextKey("request_id"))))
    logger.Debug("handleMessage: begin request")
    ctx, span := tracer.Start(ctx, "pubsub/handleMessage")
    defer span.End()
    message := &model.Event{}
    if err := json.Unmarshal(data, message); err != nil {
        return err
    }
    // TODO: Do some processing here
    // Bundle the messages together
    events := []*model.Event{message}
    // send the message to the sink
    err := c.sink.Save(ctx, events)
    if err != nil {
        return errors.Wrap(err, "failed to send message to sink")
    }
    return nil
}
This function unmarshals the JSON payload into an Event and then calls c.sink.Save().
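The TODO in the handler above is the place for any custom transformation. As a purely hypothetical illustration (the EventType and ReceivedAt fields below do not exist in the repository's Event type; the real fields are defined in its proto file), a transformation step might look like this:
// transformEvent is a hypothetical transformation step; the fields used
// here (EventType, ReceivedAt) are illustrative only
func transformEvent(e *model.Event) error {
    if e.EventType == "" {
        return errors.New("missing event type")
    }
    // normalize and enrich the event before it is written to BigQuery
    e.EventType = strings.ToLower(e.EventType)
    e.ReceivedAt = time.Now().UTC().Unix()
    return nil
}
Calling such a function from handleMessage, right after json.Unmarshal, keeps the parsing and the transformation logic in small, testable pieces.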
Next let’s add the implementation for our sink service, the one responsible for pushing the message to the BigQuery table.
type EventSinkBigQuery struct {
    managedStream *managedwriter.ManagedStream
}
The managedStream needs to be created before we can send data to BigQuery.
func NewEventSinkBigQuery(ctx context.Context, project, dataset, table string) (model.EventSink, error) {
    client, err := managedwriter.NewClient(ctx, project)
    if err != nil {
        return nil, errors.Wrap(err, "failed to create managedwriter client")
    }
    m := &model.Event{}
    descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
    tableName := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table)
    managedStream, err := client.NewManagedStream(ctx,
        managedwriter.WithDestinationTable(tableName),
        managedwriter.WithType(managedwriter.DefaultStream),
        managedwriter.WithSchemaDescriptor(descriptorProto))
    if err != nil {
        return nil, errors.Wrap(err, "failed to create managedwriter stream")
    }
    return &EventSinkBigQuery{
        managedStream: managedStream,
    }, nil
}
The above code creates a new managed client and a new managed stream. In order to send data to BigQuery we need to provide the schema of the messages. The easiest way is to create a proto file, generate the Go code from it, and then use the google.golang.org/protobuf/reflect/protodesc package to generate the schema descriptor.
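As a side note, and only as a sketch, the descriptor can be derived in isolation once the proto code is generated (the model import path below is an assumption based on the repository name). The adapt package that ships with managedwriter also provides NormalizeDescriptor, which additionally inlines nested and well-known types; for a flat Event message the plain protodesc call used above is enough.
package main

import (
    "fmt"

    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
    "google.golang.org/protobuf/reflect/protodesc"

    // assumed import path for the generated Event type
    "github.com/gabihodoroaga/pubsub-to-bigquery/model"
)

func main() {
    m := &model.Event{}
    // the same call used in NewEventSinkBigQuery above
    dp := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())
    fmt.Println(dp.GetName())
    // alternative: a self-contained descriptor with nested types inlined
    ndp, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
    if err != nil {
        panic(err)
    }
    fmt.Println(ndp.GetName())
}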
Once the managed stream is created, let's add the Save() function implementation.
func (s *EventSinkBigQuery) Save(ctx context.Context, events []*model.Event) error {
    logger := zap.L().With(zap.Any("request_id", ctx.Value(model.ContextKey("request_id"))))
    logger.Debug("EventSinkBigQuery.save: begin request")
    ctx, span := tracer.Start(ctx, "bigquery/save")
    defer span.End()
    // Encode the messages into binary format.
    encoded := make([][]byte, len(events))
    for k, v := range events {
        b, err := proto.Marshal(v)
        if err != nil {
            panic(err)
        }
        encoded[k] = b
    }
    // Send the rows to the service
    result, err := s.managedStream.AppendRows(ctx, encoded)
    if err != nil {
        return errors.Wrap(err, "failed to append rows")
    }
    // Block until the write is complete and return the result.
    _, err = result.GetResult(ctx)
    if err != nil {
        return errors.Wrap(err, "failed to save rows")
    }
    return nil
}
Nothing special here, we just encode the messages into the binary format and use the AppendRows()
function to send them to BigQuery and wait for the result.
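One detail worth knowing about the managedwriter API: AppendRows returns an *AppendResult without waiting for the server acknowledgement, so calling GetResult() right away, as the code above does, trades throughput for simplicity. A minimal sketch of a higher-throughput variation (not what the repository does) issues several appends first and waits for all of them at the end; batches is a hypothetical [][][]byte prepared by the caller:
var results []*managedwriter.AppendResult
for _, batch := range batches {
    r, err := s.managedStream.AppendRows(ctx, batch)
    if err != nil {
        return errors.Wrap(err, "failed to append rows")
    }
    results = append(results, r)
}
// wait for all pending appends before reporting success
for _, r := range results {
    if _, err := r.GetResult(ctx); err != nil {
        return errors.Wrap(err, "failed to save rows")
    }
}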
Next let’s glue the 2 services together
// setup the required services
sink, err := service.NewEventSinkBigQuery(ctx, cfg.BigQueryProject, cfg.BigQueryDataset, cfg.BigQueryTable)
if err != nil {
    fmt.Printf("failed to create EventSinkBigQuery: %v", err)
    os.Exit(1)
    return
}
handler, err := service.NewEventHandlerPubsub("", cfg.PubsubProject, cfg.PubSubSubscription, sink)
if err != nil {
    fmt.Printf("failed to create EventHandlerPubsub: %v", err)
    os.Exit(1)
    return
}
err = handler.Start(ctx)
if err != nil {
    fmt.Printf("failed to start EventHandlerPubsub: %v", err)
    os.Exit(1)
    return
}
The final step is to create a very simple gin server to keep the process alive and serve HTTP requests, which Cloud Run requires. Later you can use this server to expose some useful statistics and status information.
r := gin.New()
r.GET("/", func(c *gin.Context) {
    c.Status(http.StatusOK)
})
r.Run()
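Since the handler already implements Stats(), a natural extension is to expose those counters over HTTP. This is just a sketch, registered before r.Run(), and it assumes the handler variable from the previous snippet is in scope:
// expose the processing counters defined by the EventHandler interface
r.GET("/stats", func(c *gin.Context) {
    stats, err := handler.Stats(c.Request.Context())
    if err != nil {
        c.AbortWithStatus(http.StatusInternalServerError)
        return
    }
    c.JSON(http.StatusOK, stats)
})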
Done. If you want to see it in action, the best way is to follow the next steps and deploy the service to Cloud Run.
Deploy to Cloud Run
Clone the repository
git clone https://github.com/gabihodoroaga/pubsub-to-bigquery.git
Setup your variables
PROJECT_ID=test-244421
REGION=us-central1
Make sure you have the necessary credentials in order to run the gcloud commands
gcloud auth login
Create PubSub topic and subscription
TOPIC_ID=demo-events
gcloud pubsub topics create $TOPIC_ID \
--project $PROJECT_ID
gcloud pubsub subscriptions create $TOPIC_ID-sub \
--topic=$TOPIC_ID \
--project $PROJECT_ID
Create BigQuery table from schema
# Create the dataset
DATASET_ID=demo_bq_dataset
bq --location=us-central1 mk \
--dataset \
$PROJECT_ID:$DATASET_ID
bq mk \
--table \
$PROJECT_ID:$DATASET_ID.demo_events \
model/bq_schema.json
Deploy on GCP
gcloud builds submit --config=cloudbuild-staging.yaml \
--substitutions=SHORT_SHA=local,_SERVICE_NAME=pubsub-to-bq,_PROJECT=$PROJECT_ID,_REGION=$REGION,_GIN_MODE=release,_LOG_LEVEL=info,_TRACE_SAMPLE=1,_PUBSUB_PROJECT=$PROJECT_ID,_PUBSUB_TOPIC=$TOPIC_ID,_PUBSUB_SUBSCRIPTION=$TOPIC_ID-sub,_BIGQUERY_PROJECT=$PROJECT_ID,_BIGQUERY_DATASET=$DATASET_ID,_BIGQUERY_TABLE=demo_events \
.
After the service has been deployed, publish some test messages.
PUBSUB_PROJECT=$PROJECT_ID \
PUBSUB_TOPIC=$TOPIC_ID \
python3 testdata/push_to_pubsub.py
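If you prefer not to use the Python script, the same thing can be done with a small Go publisher. This is only a sketch: the JSON payload is a made-up placeholder and must be replaced with fields that match the Event schema from the repository.
package main

import (
    "context"
    "log"
    "os"

    "cloud.google.com/go/pubsub"
)

func main() {
    ctx := context.Background()
    // reuse the same environment variables as the Python script
    client, err := pubsub.NewClient(ctx, os.Getenv("PUBSUB_PROJECT"))
    if err != nil {
        log.Fatalf("failed to create pubsub client: %v", err)
    }
    defer client.Close()
    topic := client.Topic(os.Getenv("PUBSUB_TOPIC"))
    defer topic.Stop()
    // hypothetical payload: replace with fields matching the Event schema
    res := topic.Publish(ctx, &pubsub.Message{Data: []byte(`{"id":"demo-1"}`)})
    id, err := res.Get(ctx)
    if err != nil {
        log.Fatalf("failed to publish: %v", err)
    }
    log.Printf("published message %s", id)
}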
Check the BigQuery table to see the events.
SELECT *
FROM `test-244421.demo_bq_dataset.demo_events`
Done.
Cleaning up
In order to remove all the resources created and avoid unnecessary charges, run the following commands:
# delete the Cloud Run service
gcloud -q run services delete pubsub-to-bq --project=$PROJECT_ID --region=$REGION
# delete the BigQuery table
bq rm -f -t $PROJECT_ID:$DATASET_ID.demo_events
# delete the BigQuery dataset
bq rm -f -d $PROJECT_ID:$DATASET_ID
# delete the subscription
gcloud -q pubsub subscriptions delete $TOPIC_ID-sub --project=$PROJECT_ID
# delete the pubsub topic
gcloud pubsub topics delete $TOPIC_ID --project=$PROJECT_ID
Conclusion
Most of the time I choose to write a solution from scratch that fits my needs perfectly rather than use a ready-made solution with some compromises. I hope someone else will find this project useful, and if so, please add a star to the GitHub project.