Introduction
Go's sync.Mutex is a mutual exclusion lock that guards critical sections of code so that multiple goroutines within the same program cannot access shared resources concurrently. It ensures that only one goroutine at a time can execute a block of code protected by the mutex, thereby avoiding race conditions and providing safe and predictable access to shared variables or structures. Once a goroutine has locked the mutex, it can read from or write to the shared variables or data structures without the risk of concurrent modification. Other goroutines that attempt to acquire the same mutex while it is locked are blocked until the goroutine holding the lock releases it.
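The following is a minimal sketch of that behaviour, not part of the wallet service. The counter type and the countWithMutex helper are illustrative names chosen for this example; the program has many goroutines increment one shared integer, with a sync.Mutex guarding each increment.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical shared resource guarded by a mutex.
type counter struct {
	mu    sync.Mutex
	value int
}

// increment locks the mutex so only one goroutine updates value at a time.
func (c *counter) increment() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.value++
}

// countWithMutex starts n goroutines that each increment the counter once
// and returns the final value after all of them have finished.
func countWithMutex(n int) int {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.increment()
		}()
	}
	wg.Wait()
	return c.value
}

func main() {
	fmt.Println(countWithMutex(1000)) // prints 1000
}
```

Because every increment happens while the lock is held, no update is lost and the result always equals the number of goroutines.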
sync.Mutex works well for preventing concurrent modifications within a single program. Its limitation is that the lock lives in the program's own memory space, so it cannot coordinate concurrent modifications across multiple instances of the program. Nor can sync.Mutex lock resources stored in external systems, such as databases, caches, or external API services.
In distributed environments, where multiple instances of the same program run simultaneously, it becomes essential to implement a mechanism to lock shared resources across these instances. For example, when two processes attempt to write to the same database record simultaneously, without proper locking, one operation could unintentionally overwrite the results of the other, leading to data corruption or unexpected behaviour.
In this article, we will explore how to use a distributed mutex by creating a distributed wallet service that uses the github.com/go-redsync/redsync library to manage the mutex. This application is a simple wallet service designed to run in a distributed environment, allowing multiple concurrent requests to update (increment or decrement) the same user's balance.
Requirements
- Ensure you have a Redis server installed and running in your development environment
- Install the Redis Go client:
go get github.com/redis/go-redis/v9
- Install the Redsync library:
go get github.com/go-redsync/redsync/v4
Wallet Service without Locking
To begin, we'll create a basic version of the wallet service that does not use a locking mechanism. To minimise software dependencies for this article, we will also use Redis as the database.
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/redis/go-redis/v9"
)

type updateBalanceRequest struct {
	UserID string `json:"userId"`
	Amount int    `json:"amount"`
}

func main() {
	redisClient := redis.NewClient(&redis.Options{
		Network: "tcp",
		Addr:    "localhost:6379",
	})

	httpServer := http.NewServeMux()
	httpServer.HandleFunc("/wallets/balance", func(w http.ResponseWriter, r *http.Request) {
		var req updateBalanceRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "Invalid request", http.StatusBadRequest)
			return
		}
		log.Printf("Request received for updating balance for user: %s with amount: %d", req.UserID, req.Amount)

		ctx := r.Context()
		walletID := fmt.Sprintf("wallet:%s", req.UserID)

		currentBalance, err := redisClient.Get(ctx, walletID).Int()
		if err != nil {
			if !errors.Is(err, redis.Nil) {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			// If the balance is not found, we set it to 0.
			currentBalance = 0
		}

		// Let's sleep for 10 seconds to simulate a long-running process.
		// In a real-world scenario, this might be a long-running process
		// like checking limits, transaction processing, etc.
		time.Sleep(10 * time.Second)

		newBalance := currentBalance + req.Amount
		err = redisClient.Set(ctx, walletID, newBalance, 0).Err()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		w.WriteHeader(http.StatusOK)
		fmt.Fprintf(w, "New Balance: %d", newBalance)
		log.Printf("Request completed for updating balance for user: %s with amount: %d", req.UserID, req.Amount)
	})

	// Start the server
	port := os.Getenv("PORT")
	if port == "" {
		log.Fatal("PORT environment variable is not set")
	}
	log.Println("Starting server on port " + port)
	// ListenAndServe only returns on failure, so report the error instead of dropping it.
	log.Fatal(http.ListenAndServe(":"+port, httpServer))
}
👀 Why not just use INCRBY/DECRBY?
Redis's INCRBY/DECRBY commands are atomic and would fix a simple counter in one line. I am skipping them here to illustrate the general pattern: locking around multi-step logic.
To simulate a distributed setup, we will run two separate instances of the HTTP service on ports :8010 (Instance A) and :8020 (Instance B).
- Terminal 1:
PORT=8010 go run main.go
- Terminal 2:
PORT=8020 go run main.go
We will send two concurrent POST requests, one to each instance. The first request will increment the balance by 500, while the second request will decrement the balance by 350. If the user's initial balance is zero (0), we would expect the final balance to be 150, calculated as 500 - 350. However, the final balance could end up being -350 or 500, depending on which request writes to the database last.
This occurs because both requests are processed simultaneously. Each instance retrieves the current balance and then spends some time on further processing before it computes and writes the new balance. Neither instance sees the other's write in the meantime, so both end up writing a balance derived from the same initial value.
For example, Instance A retrieves the current balance of 0 and starts processing. Shortly after, Instance B also retrieves the current balance, which remains at 0 since Instance A is still processing. After completing the processing, Instance A increments the balance from 0 to 500, while Instance B decrements it from 0 to -350. This illustrates a classic case of a race condition, where the outcome depends on the timing of the processes.
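To make the interleaving concrete, here is a small deterministic sketch that replays it in memory. The store type and the lostUpdate function are hypothetical names for this illustration, and a plain map stands in for Redis; the point is only to show that when every request reads before any request writes, the last write wins.

```go
package main

import "fmt"

// store is a hypothetical in-memory stand-in for the Redis database.
type store map[string]int

// lostUpdate replays the interleaving described above: every request reads
// the balance first, and only afterwards do the requests write their results,
// in the order given by deltas.
func lostUpdate(s store, walletID string, deltas ...int) int {
	// Step 1: each request reads the same starting balance.
	reads := make([]int, len(deltas))
	for i := range deltas {
		reads[i] = s[walletID]
	}
	// Step 2: each request writes "what it read + its own amount".
	for i, d := range deltas {
		s[walletID] = reads[i] + d
	}
	return s[walletID]
}

func main() {
	fmt.Println(lostUpdate(store{}, "wallet:1", 500, -350)) // prints -350
	fmt.Println(lostUpdate(store{}, "wallet:1", -350, 500)) // prints 500
}
```

In neither ordering does the balance reach the expected 150, because the second write is computed from a stale read.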
Distributed Mutex to the Rescue
Race conditions can be chaotic in a distributed system. To mitigate this issue, we need to lock the resource, using a unique identifier as the lock key, before retrieving its value and performing any operations on it. This ensures that every read sees the result of the last completed update rather than a value another process is about to overwrite.
In our case, we will use a distributed mutex that locks the user wallet resource using the wallet ID as the lock key before initiating the balance update operation. Once the operation is complete, we will unlock the mutex, allowing other processes to access the wallet resource safely.
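Before bringing Redis into the picture, the lock, read, modify, write, unlock sequence can be sketched inside a single process. This is only an in-process analogy under stated assumptions: keyedLocker and applyUpdates are hypothetical names, a map stands in for the database, and sync.Mutex stands in for the distributed lock, so it does not protect anything across separate instances.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// keyedLocker hands out one mutex per key. It is a hypothetical,
// single-process stand-in for a distributed lock service.
type keyedLocker struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newKeyedLocker() *keyedLocker {
	return &keyedLocker{locks: make(map[string]*sync.Mutex)}
}

// forKey returns the mutex for key, creating it on first use.
func (k *keyedLocker) forKey(key string) *sync.Mutex {
	k.mu.Lock()
	defer k.mu.Unlock()
	if _, ok := k.locks[key]; !ok {
		k.locks[key] = &sync.Mutex{}
	}
	return k.locks[key]
}

// applyUpdates runs one goroutine per amount. Each goroutine locks the wallet,
// reads the balance, simulates slow processing, and writes the new balance.
func applyUpdates(walletID string, amounts ...int) int {
	locker := newKeyedLocker()
	balances := map[string]int{} // stand-in for the database
	var wg sync.WaitGroup
	for _, amount := range amounts {
		wg.Add(1)
		go func(amount int) {
			defer wg.Done()
			lock := locker.forKey("mutex:" + walletID)
			lock.Lock()
			defer lock.Unlock()
			current := balances[walletID]
			time.Sleep(10 * time.Millisecond) // simulated processing
			balances[walletID] = current + amount
		}(amount)
	}
	wg.Wait()
	return balances[walletID]
}

func main() {
	fmt.Println(applyUpdates("wallet:1", 500, -350)) // prints 150
}
```

Whichever goroutine gets the lock first, the other reads the balance only after the first has written it, so the result is 150 in both orderings.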
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v9"
	"github.com/redis/go-redis/v9"
)

type updateBalanceRequest struct {
	UserID string `json:"userId"`
	Amount int    `json:"amount"`
}

func main() {
	// Initialize the Redis client. This is a connection to a single-node Redis instance.
	// For a cluster setup, we can use redis.NewClusterClient(...), e.g.
	// clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
	// 	Addrs: []string{
	// 		"localhost:6379",
	// 		"localhost:6380",
	// 		// ... other redis nodes
	// 	},
	// })
	redisClient := redis.NewClient(&redis.Options{
		Network: "tcp",
		Addr:    "localhost:6379",
	})
	rs := redsync.New(goredis.NewPool(redisClient))

	httpServer := http.NewServeMux()
	httpServer.HandleFunc("/wallets/balance", func(w http.ResponseWriter, r *http.Request) {
		var req updateBalanceRequest
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "Invalid request", http.StatusBadRequest)
			return
		}
		log.Printf("Request received for updating balance for user: %s with amount: %d", req.UserID, req.Amount)

		ctx := r.Context()
		walletID := fmt.Sprintf("wallet:%s", req.UserID)

		// lockKey should be a unique value representing the resource to be locked,
		// in this case the walletID.
		//
		// We add the "mutex:" prefix to the walletID because we are also using Redis as the database.
		// With a different database, we could use the walletID itself as the lockKey.
		lockKey := fmt.Sprintf("mutex:%s", walletID)

		// Create a mutex with the unique lockKey
		mutex := rs.NewMutex(
			lockKey,
			// We don't want to lock the resource for too long, so we set an expiry.
			// This avoids deadlocks in case the process holding the lock crashes
			// or hangs.
			redsync.WithExpiry(15*time.Second),
			// We want to retry acquiring the lock if it fails, i.e. if it's already locked by another process.
			// We set a delay of 5 seconds between retries.
			redsync.WithRetryDelay(5*time.Second),
			// We try to acquire the lock 3 times; if all tries fail, we return an error.
			// This avoids waiting indefinitely for the lock, which might happen if multiple
			// processes are competing for the same lock.
			redsync.WithTries(3),
		)
		if err := mutex.LockContext(ctx); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		defer func() {
			// Unlock reports failure if the lock could not be released, for example
			// because it had already expired. Log it so the problem is visible.
			if ok, err := mutex.Unlock(); !ok || err != nil {
				log.Printf("Failed to release lock %s: %v", lockKey, err)
			}
		}()

		// Always fetch the current balance after acquiring the lock to ensure that
		// another process has not updated the balance while we were waiting for the lock
		currentBalance, err := redisClient.Get(ctx, walletID).Int()
		if err != nil {
			if !errors.Is(err, redis.Nil) {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			// If the balance is not found, we set it to 0.
			currentBalance = 0
		}

		// Let's sleep for 10 seconds to simulate a long-running process.
		// In a real-world scenario, this might be a long-running process
		// like checking limits, transaction processing, etc.
		time.Sleep(10 * time.Second)

		newBalance := currentBalance + req.Amount
		err = redisClient.Set(ctx, walletID, newBalance, 0).Err()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}

		w.WriteHeader(http.StatusOK)
		fmt.Fprintf(w, "New Balance: %d", newBalance)
		log.Printf("Request completed for updating balance for user: %s with amount: %d", req.UserID, req.Amount)
	})

	// Start the server
	port := os.Getenv("PORT")
	if port == "" {
		log.Fatal("PORT environment variable is not set")
	}
	log.Println("Starting server on port " + port)
	// ListenAndServe only returns on failure, so report the error instead of dropping it.
	log.Fatal(http.ListenAndServe(":"+port, httpServer))
}
Now, if we receive multiple simultaneous requests to update a user's balance, the balance is updated correctly regardless of the order in which the requests are received. Each request must acquire the lock on the user's wallet before it reads or writes the balance. A request that cannot acquire the lock within the configured number of tries receives an error instead of silently overwriting another request's update.
Conclusion
In this article, we explored the use of a distributed mutex in Go with Redis, using the Redsync library. To keep things straightforward and avoid unnecessary complexity, I chose not to include comprehensive error handling in the HTTP handler or to separate the code beyond the main function. If you want to learn more about Redsync and its features, I encourage you to check out the library's code and documentation on GitHub: https://github.com/go-redsync/redsync