Leader election implementation in golang

Sometimes you need to ensure that at most one process or instance can perform a specific task. In today’s cloud world, however, the best practice is to deploy the new version, make sure it stays up, and only then update your config or load balancer to point to the new instance. For a short time both versions run side by side, which is a problem when the requirement is that at most one instance may be the leader. Many serverless solutions don’t guarantee that at most one instance is running (Google Cloud Run does not, according to its documentation). One solution is to use a simple VM and make sure you run the instance only once, but then you lose the huge advantages of a managed serverless platform. Alternatively, you can implement a very simple leader election algorithm using a backend of your choice.

TL;DR

You can find the complete implementation and the documentation at github.com/gabihodoroaga/leaderelection.

The solution

The leader election process begins with the creation of a lock object. The leader updates the timestamp in this lock at regular intervals to tell the other replicas that it is still the leader.

The lock object can be a SQL table row, a file in cloud blob storage, a remote file, or anything else that guarantees atomic reads and writes. The lock also holds the identity of the current leader.

If the leader fails to update the timestamp within the given interval, it is assumed to have crashed. At that point the inactive replicas race to acquire leadership by updating the lock with their own identity.

First, let’s define an interface for our backend implementation.

type Backend interface {
	WriteEntry(ctx context.Context, id string, term time.Duration) (bool, error)
}

WriteEntry attempts to insert or update an entry in the storage, using a condition that evaluates the term. It returns true if the lock was obtained and false if not. A non-nil error means that something unexpected happened; it is not equivalent to returning false.

The id parameter is the unique identity of each process. It is used to know when the leader has changed. The term parameter is how long to wait for the previous leader to renew before it is considered dead.
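
To make the contract concrete, here is a minimal sketch of an in-memory backend. It is not part of the original package: MemoryBackend, its injectable clock and the scenario function are names made up for this illustration, and because it lives inside a single process it is only useful for tests. A write succeeds if the lock is free, expired, or already held by the same id.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Backend mirrors the interface defined above.
type Backend interface {
	WriteEntry(ctx context.Context, id string, term time.Duration) (bool, error)
}

// MemoryBackend is a hypothetical in-process backend for tests only.
// It cannot coordinate separate processes.
type MemoryBackend struct {
	mu         sync.Mutex
	leaderID   string
	validUntil time.Time
	now        func() time.Time // injectable clock (assumption, for testability)
}

// Compile-time check that MemoryBackend satisfies Backend.
var _ Backend = (*MemoryBackend)(nil)

func NewMemoryBackend(now func() time.Time) *MemoryBackend {
	return &MemoryBackend{now: now}
}

// WriteEntry takes or renews the lock when it is free, expired, or already
// held by id. The mutex makes the check and the update one atomic step.
func (m *MemoryBackend) WriteEntry(ctx context.Context, id string, term time.Duration) (bool, error) {
	if err := ctx.Err(); err != nil {
		return false, err
	}
	m.mu.Lock()
	defer m.mu.Unlock()

	now := m.now()
	heldByOther := m.leaderID != "" && m.leaderID != id
	if heldByOther && !m.validUntil.Before(now) {
		return false, nil // another leader still holds a valid lock
	}
	m.leaderID = id
	m.validUntil = now.Add(term)
	return true, nil
}

// scenario replays a small timeline with a fake clock and returns the
// result of each WriteEntry call.
func scenario() []bool {
	clock := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	b := NewMemoryBackend(func() time.Time { return clock })
	ctx := context.Background()
	term := 8 * time.Second

	var out []bool
	try := func(id string) {
		ok, _ := b.WriteEntry(ctx, id, term)
		out = append(out, ok)
	}
	try("a") // lock is free: a becomes the leader
	try("b") // a's lock is still valid: b is rejected
	clock = clock.Add(4 * time.Second)
	try("a") // a renews its own lock
	clock = clock.Add(9 * time.Second)
	try("b") // a missed its renewal: b takes over
	try("a") // a is rejected now
	return out
}

func main() {
	fmt.Println(scenario())
}
```

Running it prints [true false true true false]: the first caller wins, a competing caller is rejected while the lock is valid, and leadership only changes after the term has passed without a renewal.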

There are 3 time intervals involved:

  • term is how long a leader’s lock remains valid without renewal. After this time the leader is considered dead and the leader election starts again
  • renew is the time interval at which the leader will try to renew its lock
  • retry is the amount of time to wait between attempts to become the leader

type Config struct {
	Term  time.Duration
	Renew time.Duration
	Retry time.Duration
}
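
The three intervals are related: the leader has to renew before the term runs out, otherwise its lock expires between two renewals and another replica can take over. A small check like the one below can catch a bad configuration early. Validate is a hypothetical helper, not part of the original package, and the rules it enforces are assumptions drawn from the description above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Config mirrors the struct defined above.
type Config struct {
	Term  time.Duration
	Renew time.Duration
	Retry time.Duration
}

// Validate is a hypothetical sanity check for the three intervals.
func (c Config) Validate() error {
	switch {
	case c.Term <= 0 || c.Renew <= 0 || c.Retry <= 0:
		// Renew and Retry are later passed to time.NewTicker, which
		// panics on non-positive durations. Requiring a positive Term
		// is an assumption of this sketch.
		return errors.New("term, renew and retry must all be positive")
	case c.Renew >= c.Term:
		// Assumption: the lock must be renewed before it expires.
		return errors.New("renew must be shorter than term")
	}
	return nil
}

func main() {
	good := Config{Term: 8 * time.Second, Renew: 4 * time.Second, Retry: 2 * time.Second}
	bad := Config{Term: 4 * time.Second, Renew: 8 * time.Second, Retry: 2 * time.Second}

	fmt.Println("good config:", good.Validate())
	fmt.Println("bad config:", bad.Validate())
}
```

How much shorter renew should be than term is a judgment call. Leaving room for at least one failed renewal attempt inside the term seems a reasonable starting point.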

To be useful, the manager needs to signal that the current process is the leader so that an action can be performed. It is also important to signal when the current process is no longer the leader. Of course, we also need a way to capture and log all the errors that occur during the election or renewal process.

The leader manager will have this structure

type LeaderManager struct {
	backend   Backend
	config    *Config
	identity  string
	OnElected func(string, context.CancelFunc)
	OnOusting func(string, context.CancelFunc)
	OnError   func(string, error, context.CancelFunc)
}

Now we need to implement the first part of the election process: trying to get the lock.

func (l *LeaderManager) tryToLock(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return context.Canceled
	default:
		success, err := l.backend.WriteEntry(ctx, l.identity, l.config.Term)
		if err != nil {
			return err
		}
		if success {
			return nil
		}
	}
	retryTicker := time.NewTicker(l.config.Retry)
	defer retryTicker.Stop()
	for {
		select {
		case <-ctx.Done():
			return context.Canceled
		case <-retryTicker.C:
			success, err := l.backend.WriteEntry(ctx, l.identity, l.config.Term)
			if err != nil {
				return err
			}
			if success {
				return nil
			}
		}
	}
}

The function calls the backend’s WriteEntry function to obtain the lock. If WriteEntry succeeds or returns an error, the function exits; otherwise it retries forever until WriteEntry returns success or the context is canceled. The first select statement ensures that the first attempt to take the leadership happens immediately and not after the first retry interval.

The second part of the process is the renewal.

func (l *LeaderManager) renewLock(ctx context.Context) error {
	renewTicker := time.NewTicker(l.config.Renew)
	defer renewTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return context.Canceled
		case <-renewTicker.C:
			success, err := l.backend.WriteEntry(ctx, l.identity, l.config.Term)
			if err != nil {
				return err
			}
			if !success {
				return nil
			}
		}
	}
}

This function calls WriteEntry at every renew interval to try to renew the leadership lock. If WriteEntry returns false or an error, the function exits and the renew process is over.

Now let’s put everything together and add a Start function.

func (l *LeaderManager) Start(ctx context.Context) {
	newCtx, cancel := context.WithCancel(ctx)

	go func() {
		// release the derived context when the election loop exits
		defer cancel()
		for {
			err := l.tryToLock(newCtx)
			if err != nil {
				if errors.Is(err, context.Canceled) {
					return
				}
				// OnError is optional, like the other callbacks
				if l.OnError != nil {
					l.OnError(l.identity, err, cancel)
				}
				time.Sleep(l.config.Retry)
				continue
			}
			// if we got here we are the leader
			if l.OnElected != nil {
				l.OnElected(l.identity, cancel)
			}

			err = l.renewLock(newCtx)
			// if we get here we are not the leader anymore
			if l.OnOusting != nil {
				l.OnOusting(l.identity, cancel)
			}
			if err != nil {
				if errors.Is(err, context.Canceled) {
					return
				}
				if l.OnError != nil {
					l.OnError(l.identity, err, cancel)
				}
			}
		}
	}()
}

This function creates a new cancelable context and starts a new goroutine that tries to get the lock. When it succeeds, it signals that this process is the new leader and starts the renew process. When the renewal fails, it signals that the leadership has been lost, and the process starts again by trying to become the leader. If an error occurs, the error is signaled and the process continues.

The only way to stop the election process is to cancel the context. A cancel function is passed as a parameter to the callback functions.
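
As an illustration of how the callbacks might be used, the sketch below starts some work when the process is elected and stops it when the process is ousted. leaderTask and demo are hypothetical names for this example, not part of the original package; the two methods simply have the same signatures as OnElected and OnOusting.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// leaderTask is a hypothetical helper that runs a function only while this
// process is the leader.
type leaderTask struct {
	mu   sync.Mutex
	stop context.CancelFunc
	done chan struct{}
	run  func(ctx context.Context)
}

// OnElected starts the task in its own goroutine.
func (t *leaderTask) OnElected(id string, _ context.CancelFunc) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.stop != nil {
		return // already running
	}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	t.stop, t.done = cancel, done
	go func() {
		defer close(done)
		t.run(ctx)
	}()
}

// OnOusting cancels the task and waits until it has returned.
func (t *leaderTask) OnOusting(id string, _ context.CancelFunc) {
	t.mu.Lock()
	stop, done := t.stop, t.done
	t.stop, t.done = nil, nil
	t.mu.Unlock()
	if stop == nil {
		return // nothing was running
	}
	stop()
	<-done
}

// demo simulates one election followed by one ousting, without a real
// LeaderManager, and returns what the task did.
func demo() []string {
	var events []string
	started := make(chan struct{})
	task := &leaderTask{run: func(ctx context.Context) {
		events = append(events, "work started")
		close(started)
		<-ctx.Done() // leader-only work would go here
		events = append(events, "work stopped")
	}}

	task.OnElected("node-1", nil)
	<-started
	task.OnOusting("node-1", nil)
	return events
}

func main() {
	fmt.Println(demo())
}
```

With a real LeaderManager you would assign task.OnElected and task.OnOusting to the corresponding fields before calling Start. Waiting for the work to finish inside OnOusting is a design choice of this sketch: it keeps the old leader from still working while it is already trying to get the lock again.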

We also need a constructor for our LeaderManager

func NewLeaderManager(backend Backend, config *Config) (*LeaderManager, error) {
	return &LeaderManager{
		backend:  backend,
		config:   config,
		identity: uuid.NewString(),
	}, nil
}

For the backend we will use a PostgreSQL instance; the implementation is very simple.

type PostgresqlBackend struct {
	key string
	db  *pgxpool.Pool
}

func (b *PostgresqlBackend) WriteEntry(ctx context.Context, leaderID string, ttl time.Duration) (bool, error) {

	result, err := b.db.Exec(ctx, `
    INSERT INTO leaderlock as t (key, leader_id, valid_until) VALUES ($1, $2, NOW() at time zone 'utc' + $3 * INTERVAL '1 seconds'  ) 
	ON CONFLICT (key) DO
	UPDATE SET leader_id = $2, valid_until =  NOW() at time zone 'utc' + $3 * INTERVAL '1 seconds'
	WHERE t.valid_until < NOW() at time zone 'utc' OR t.leader_id = $2
    `, b.key, leaderID, int64(ttl.Seconds()))

	if err != nil {
		return false, err
	}

	return result.RowsAffected() == 1, nil
}

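The key field is just a simple key that identifies the leader bucket, so one table can hold several independent locks. Note that the ON CONFLICT (key) clause requires a unique constraint or primary key on the key column of the leaderlock table, and that the term is passed to the query in whole seconds, so any sub-second part is truncated.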

The final step is the demo:

func main() {
	ctx := context.Background()

	conn, err := pgxpool.New(ctx, "user=postgres password=password database=leaderelection host=localhost")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	le, err := NewLeaderManager(
		&PostgresqlBackend{key: "test-key", db: conn},
		&Config{Term: 8 * time.Second, Renew: 4 * time.Second, Retry: 2 * time.Second})

	if err != nil {
		panic(err)
	}

	le.OnElected = func(id string, cancel context.CancelFunc) {
		fmt.Printf("leader elected %s\n", id)
	}
	le.OnOusting = func(id string, cancel context.CancelFunc) {
		fmt.Printf("leader ousting %s\n", id)
	}
	le.OnError = func(id string, err error, cancel context.CancelFunc) {
		fmt.Printf("error for leader %s: %v\n", id, err)
	}

	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)

	le.Start(ctx)
	fmt.Printf("leader manager started \n")

	<-ctx.Done()
	stop()
	fmt.Printf("server exit\n")
}

Run the same program in 2 different terminal windows and see what happens.

go run main.go

Conclusion

Sometimes relying on an existing strongly consistent system to decide which node holds the lock helps a lot, and it makes the implementation much quicker to write.