Streaming events from Pub/Sub to BigQuery using Cloud Run

If you need to stream events from Pub/Sub to BigQuery, and the BigQuery subscriptions feature is not enough because you need to apply some transformations, and you find Dataflow too complex to deploy and manage, then a custom solution might be the only option. This post is a step-by-step tutorial on how to deploy a service on Cloud Run that streams events from Pub/Sub to BigQuery using the new BigQuery Storage Write API.

TL;DR

You can find the full source code and the documentation on how to run and deploy it at github.com/gabihodoroaga/pubsub-to-bigquery.

The solution

The overall solution architecture has two components:

  • the event handler - the service responsible for reading and parsing the Pub/Sub messages
  • the sink - the service responsible for pushing the parsed messages to the destination

Note: The source code presented in this post is not complete and will not run if you just copy/paste it. For the full solution, check out the repository at github.com/gabihodoroaga/pubsub-to-bigquery.

Let’s start by defining our models.


// EventHandler is the abstraction of the events processor
type EventHandler interface {
	Start(ctx context.Context) error
	Stats(ctx context.Context) (HandlerStats, error)
}

// HandlerStats is the model for reporting the event processing statistics
type HandlerStats struct {
	Received int64 `json:"received"`
	Success  int64 `json:"success"`
	Errors   int64 `json:"error"`
}

// EventSink is the abstraction of the event destination
type EventSink interface {
	Save(ctx context.Context, events []*Event) error
}
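
These interfaces make it easy to swap implementations. For example, a trivial sink that only logs the incoming events can be useful during local development; the following is just a sketch, not part of the repository:

// logSink is a minimal EventSink sketch that only logs the incoming
// events. Useful for local testing; not part of the repository code.
type logSink struct{}

// Save implements model.EventSink by logging the number of events.
func (s *logSink) Save(ctx context.Context, events []*model.Event) error {
	zap.L().Sugar().Infof("logSink received %d events", len(events))
	return nil
}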

Now let's implement the EventHandler.

// EventHandlerPubsub implements the EventHandler using Pub/Sub
type EventHandlerPubsub struct {
	host         string
	project      string
	subscription string
	sink         model.EventSink
	stats        *model.HandlerStats
}

and the Start() function implementation.

// Start implements model.EventHandler and starts listening for events
func (c *EventHandlerPubsub) Start(ctx context.Context) error {

	client, err := pubsub.NewClient(ctx, c.project)
	if err != nil {
		return errors.Wrap(err, "failed to create PubSub client")
	}
	sub := client.Subscription(c.subscription)
	go func() {
		for {
			zap.L().Sugar().Infof("begin receive messages from subscription %s.", sub.String())
			err = sub.Receive(ctx, func(msgCtx context.Context, msg *pubsub.Message) {

				atomic.AddInt64(&c.stats.Received, 1)
				requestID := uniuri.NewLen(10)
				newCtx := context.WithValue(msgCtx, model.ContextKey("request_id"), requestID)
				newCtx, span := tracer.Start(newCtx, "pubsub/receive")
				defer span.End()

				err := c.handleMessage(newCtx, msg.Data)
				if err != nil {
					msg.Nack()
					atomic.AddInt64(&c.stats.Errors, 1)
				} else {
					msg.Ack()
					atomic.AddInt64(&c.stats.Success, 1)
				}
			})
			zap.L().Sugar().Infof("pubsub receive exit for subscription %s", sub.String())
			if err != nil {
				zap.L().Error(fmt.Sprintf("pubsub receive error for subscription %s, receive will be retried in 2 seconds", sub.String()), zap.Error(err))
				time.Sleep(time.Second * 2)
				continue
			}
			// if no error is returned, the context has been canceled and we just exit
			zap.L().Sugar().Infof("receive done on subscription %s. No messages will be processed.", sub.String())
			client.Close()
			return
		}
	}()
	return nil
}

The above code creates the Pub/Sub client and starts a goroutine that listens for messages. There is also a simple retry logic in case the connection to the Pub/Sub server gets interrupted.

Next let’s define the handle message function.

func (c *EventHandlerPubsub) handleMessage(ctx context.Context, data []byte) error {
	logger := zap.L().With(zap.Any("request_id", ctx.Value(model.ContextKey("request_id"))))
	logger.Debug("handleMessage: begin request")

	ctx, span := tracer.Start(ctx, "pubsub/handleMessage")
	defer span.End()

	message := &model.Event{}
	if err := json.Unmarshal(data, message); err != nil {
		return err
	}
	// TODO: Do some processing here

	// Bundle the messages together
	events := []*model.Event{message}

	// send the messages to the sink
	err := c.sink.Save(ctx, events)
	if err != nil {
		return errors.Wrap(err, "failed to send message to sink")
	}

	return nil
}

This function unmarshals the JSON bytes into an Event message and then calls c.sink.Save().
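
As an illustration of the kind of transformation you could plug in at the TODO step, here is a hypothetical example; the Timestamp and Source fields are assumptions and may not exist on the real Event model:

// transformEvent is a hypothetical transformation step that fills in
// missing fields and normalizes values before the event reaches the sink.
func transformEvent(event *model.Event) {
	// default the event time to the processing time when it is missing
	if event.Timestamp == 0 {
		event.Timestamp = time.Now().UnixMilli()
	}
	// normalize the source identifier
	event.Source = strings.ToLower(strings.TrimSpace(event.Source))
}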

Next let’s add the implementation for our sink service, the one responsible for pushing the message to the BigQuery table.

type EventSinkBigQuery struct {
	managedStream *managedwriter.ManagedStream
}

The managedStream needs to be created before we can send data to BigQuery.

func NewEventSinkBigQuery(ctx context.Context, project, dataset, table string) (model.EventSink, error) {

	client, err := managedwriter.NewClient(ctx, project)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create managedwriter client")
	}

	m := &model.Event{}
	descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

	tableName := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table)
	managedStream, err := client.NewManagedStream(ctx,
		managedwriter.WithDestinationTable(tableName),
		managedwriter.WithType(managedwriter.DefaultStream),
		managedwriter.WithSchemaDescriptor(descriptorProto))
	if err != nil {
		return nil, errors.Wrap(err, "failed to create managedwriter stream")
	}

	return &EventSinkBigQuery{
		managedStream: managedStream,
	}, nil
}

The above code creates a new managed writer client and a new managed stream. In order to send data to BigQuery we need to provide the schema of the messages we will send. The easiest way is to create a proto file, generate the Go code from it, and then use the google.golang.org/protobuf/reflect/protodesc package to generate the schema descriptor.
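
For reference, a minimal proto definition might look like the sketch below. The field names are hypothetical; the real schema is in the repository:

syntax = "proto3";

package model;

option go_package = "github.com/gabihodoroaga/pubsub-to-bigquery/model";

// Event is an example message; the fields must match the BigQuery table schema.
message Event {
  string id = 1;
  string source = 2;
  int64 timestamp = 3;
}

The Go code can then be generated with protoc and the protoc-gen-go plugin.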

Once the managed stream is created let’s add the Save() function implementation.

func (s *EventSinkBigQuery) Save(ctx context.Context, events []*model.Event) error {
	logger := zap.L().With(zap.Any("request_id", ctx.Value(model.ContextKey("request_id"))))
	logger.Debug("EventSinkBigQuery.save: begin request")

	ctx, span := tracer.Start(ctx, "bigquery/save")
	defer span.End()

	// Encode the messages into binary format.
	encoded := make([][]byte, len(events))
	for k, v := range events {
		b, err := proto.Marshal(v)
		if err != nil {
			return errors.Wrap(err, "failed to marshal event")
		}
		encoded[k] = b
	}
	// Send the rows to the service
	result, err := s.managedStream.AppendRows(ctx, encoded)
	if err != nil {
		return errors.Wrap(err, "failed to append rows")
	}
	// Block until the write is complete and return the result.
	_, err = result.GetResult(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to save rows")
	}

	return nil
}

Nothing special here: we encode the messages into binary protobuf format, send them to BigQuery with the AppendRows() function, and wait for the result.

Next, let's glue the two services together.

// setup the required services
sink, err := service.NewEventSinkBigQuery(ctx, cfg.BigQueryProject, cfg.BigQueryDataset, cfg.BigQueryTable)
if err != nil {
    fmt.Printf("failed to create EventSinkBigQuery: %v\n", err)
    os.Exit(1)
}

handler, err := service.NewEventHandlerPubsub("", cfg.PubsubProject, cfg.PubSubSubscription, sink)
if err != nil {
    fmt.Printf("failed to create EventHandlerPubsub: %v\n", err)
    os.Exit(1)
}

err = handler.Start(ctx)
if err != nil {
    fmt.Printf("failed to start EventHandlerPubsub: %v\n", err)
    os.Exit(1)
}
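
The ctx passed to the services above can be a signal-aware context, so the receive loop shuts down cleanly when Cloud Run stops the instance. A minimal sketch, assuming Go 1.16+:

// create a context that is canceled on SIGINT/SIGTERM, so the
// Pub/Sub receive loop exits and the client is closed gracefully
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()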

The final step is to create a very simple gin server in order to keep the process running. Later you can use the server to retrieve some useful statistics and status information.

r := gin.New()
r.GET("/", func(c *gin.Context) {
    c.Status(http.StatusOK)
})
r.Run()
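
For example, you could expose the HandlerStats counters through a /stats endpoint. This is only a sketch of such an extension, not code from the repository:

// a hypothetical /stats endpoint that returns the processing counters
r.GET("/stats", func(c *gin.Context) {
    stats, err := handler.Stats(c.Request.Context())
    if err != nil {
        c.Status(http.StatusInternalServerError)
        return
    }
    c.JSON(http.StatusOK, stats)
})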

Done. If you want to see it in action, the best way is to follow the steps below and deploy the service to Cloud Run.

Deploy to Cloud Run

Clone the repository

git clone https://github.com/gabihodoroaga/pubsub-to-bigquery.git

Set up your variables

PROJECT_ID=test-244421
REGION=us-central1

Make sure you have the necessary credentials to run the gcloud commands.

gcloud auth login

Create the Pub/Sub topic and subscription

TOPIC_ID=demo-events

gcloud pubsub topics create $TOPIC_ID \
    --project $PROJECT_ID

gcloud pubsub subscriptions create $TOPIC_ID-sub \
    --topic=$TOPIC_ID \
    --project $PROJECT_ID

Create the BigQuery dataset and table from the schema

# Create the dataset
DATASET_ID=demo_bq_dataset
bq --location=us-central1 mk \
    --dataset \
     $PROJECT_ID:$DATASET_ID

bq mk \
    --table \
    $PROJECT_ID:$DATASET_ID.demo_events \
    model/bq_schema.json
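
The model/bq_schema.json file describes the table columns. As a rough illustration only (the real file is in the repository, and the fields depend on the Event model), a bq schema file looks like this:

[
  {"name": "id", "type": "STRING", "mode": "REQUIRED"},
  {"name": "source", "type": "STRING", "mode": "NULLABLE"},
  {"name": "timestamp", "type": "TIMESTAMP", "mode": "NULLABLE"}
]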

Deploy on GCP

gcloud builds submit --config=cloudbuild-staging.yaml \
    --substitutions=SHORT_SHA=local,_SERVICE_NAME=pubsub-to-bq,_PROJECT=$PROJECT_ID,_REGION=$REGION,_GIN_MODE=release,_LOG_LEVEL=info,_TRACE_SAMPLE=1,_PUBSUB_PROJECT=$PROJECT_ID,_PUBSUB_TOPIC=$TOPIC_ID,_PUBSUB_SUBSCRIPTION=$TOPIC_ID-sub,_BIGQUERY_PROJECT=$PROJECT_ID,_BIGQUERY_DATASET=$DATASET_ID,_BIGQUERY_TABLE=demo_events \
    .

After the service has been deployed, publish some test messages.

PUBSUB_PROJECT=$PROJECT_ID \
PUBSUB_TOPIC=$TOPIC_ID \
python3 testdata/push_to_pubsub.py
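
If you prefer not to run the Python script, you can also publish a single message with gcloud. The JSON payload below is only a hypothetical example and must match the fields of your Event model:

gcloud pubsub topics publish $TOPIC_ID \
    --project $PROJECT_ID \
    --message='{"id":"test-1","source":"demo","timestamp":1700000000000}'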

Check the BigQuery table to see the events.

SELECT * 
FROM `test-244421.demo_bq_dataset.demo_events` 

Done.

Cleaning up

In order to remove all the resources created and avoid unnecessary charges, run the following commands:

# delete the Cloud Run service
gcloud -q run services delete pubsub-to-bq --project=$PROJECT_ID --region=$REGION
# delete the BigQuery table
bq rm -f -t $PROJECT_ID:$DATASET_ID.demo_events
# delete the BigQuery dataset
bq rm -f -d $PROJECT_ID:$DATASET_ID
# delete the subscription
gcloud -q pubsub subscriptions delete $TOPIC_ID-sub --project=$PROJECT_ID
# delete the pubsub topic
gcloud pubsub topics delete $TOPIC_ID --project=$PROJECT_ID

Conclusion

Most of the time, I choose to write a solution from scratch that fits my needs perfectly rather than use a ready-made solution with compromises. I hope someone else will find this project useful, and if so, please add a star to the GitHub project.