Zero to Inf

技术栈的流动-从零到无穷

使用Go语言实现Redis分布式锁(附有看门狗自动续期机制)

Redis我们日常开发经常使用,而分布式锁的一个重要实现就是通过Redis完成,分布式锁要解决的核心问题是防止对某个资源进行重复或者过度请求,例如我们在分布式系统中创建订单之前,必须获取分布式锁才能创建订单,其要解决的主要问题有两个:

1)如果用户重复点击提交订单按钮,可以通过分布式锁避免重复创建订单

2)商品库存有限,通过分布式锁能够解决超卖问题

Redis实现分布式锁

由于Redis中set nx命令的原子性,只有在键值不存在的时候才能设置值,因此可以通过set nx实现分布式锁,但是它有弊端就是:

如果业务还没有执行完,那么会导致业务没有执行完,锁就被释放了

因此延伸出了基于Redis延伸的Redisson分布式锁框架,它实现的原理在于使用Redis单线程模型执行SET NX命令lua脚本确保获取锁操作原子性,同时它内置了更为丰富的看门狗机制,满足了在业务执行过程中,自动续期锁

Go语言实现分布式锁

上面都是基于理论介绍,接下来就实现基于Redis的分布式锁

明确原子性操作

在实现之前,我们要明确加锁、释放锁续期锁的机制:

加锁:设置键值与过期时间,通过setnx px实现,如果键值已经存在直接返回错误

释放锁:我们一定要确保释放的锁是自己加的锁,因此要判断value值是否是之前设置的value值,只有判断正确才能够释放锁

续期锁:与释放锁同理,只有当前锁是自己加的锁才续期

package utils

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
)

// 预定义错误,便于调用方进行错误判断
var (
	// ErrLockNotHeld 释放锁时发现锁不属于当前实例
	ErrLockNotHeld = errors.New("lock not held")
	// ErrLockAcquireFailed 加锁失败(因已被他人持有或超时)
	ErrLockAcquireFailed = errors.New("failed to acquire lock")
	// ErrLockLost 锁在持有期间丢失(续期失败),此时业务应中断
	ErrLockLost = errors.New("lock lost during hold")
)

// Logger 可注入的日志接口,便于集成各类日志库
type Logger interface {
	Printf(format string, v ...interface{})
}

// defaultLogger 默认日志实现,输出到标准输出
type defaultLogger struct{}

func (l *defaultLogger) Printf(format string, v ...interface{}) {
	fmt.Printf(format+"\n", v...)
}

// RenewFailedCallback 续期失败时的回调函数,用于通知业务方锁已丢失
type RenewFailedCallback func(key string, err error)

// RetryStrategy 重试策略类型,用于自定义获取锁失败时的等待行为
type RetryStrategy func(attempt int) time.Duration

// Mutex 基于 Redis 的分布式锁,内置看门狗自动续期。
// 每个实例仅允许一次 Lock → Unlock 生命周期,不可复用。
type Mutex struct {
	client redis.UniversalClient // Redis 客户端,支持单机、哨兵、集群
	key    string                // Redis 中锁的键名
	value  string                // 锁持有者唯一标识(UUID)
	ttl    time.Duration         // 锁的过期时间

	// 状态保护
	mu     sync.Mutex
	locked bool   // 是否已成功加锁
	lost   bool   // 锁是否已丢失(续期失败导致),丢失后不应再尝试释放
	stopCh chan struct{} // 通知看门狗退出的通道
	wg     sync.WaitGroup

	// 看门狗启动保护
	watchdogOnce sync.Once // 确保看门狗只启动一次

	// 配置项
	retryStrategy     RetryStrategy      // 自定义重试策略
	minRenewInterval  time.Duration      // 最小续期间隔,防止高频续期
	renewTimeout      time.Duration      // 续期操作的超时时间
	logger            Logger             // 日志接口
	renewFailedCb     RenewFailedCallback // 续期失败回调
}

// Option 函数式配置选项
type Option func(*Mutex)

// WithRetryDelay 设置固定间隔的重试策略(简单场景)
func WithRetryDelay(d time.Duration) Option {
	return func(m *Mutex) {
		m.retryStrategy = func(attempt int) time.Duration {
			return d
		}
	}
}

// WithExponentialBackoff 设置指数退避重试策略
// 初始间隔为 initial,最大间隔为 max,每次重试间隔翻倍
func WithExponentialBackoff(initial, max time.Duration) Option {
	return func(m *Mutex) {
		m.retryStrategy = func(attempt int) time.Duration {
			d := initial
			for i := 0; i < attempt; i++ {
				d *= 2
				if d > max {
					return max
				}
			}
			return d
		}
	}
}

// WithMinRenewInterval 设置看门狗最小续期间隔,避免高频续期(默认 200ms)
func WithMinRenewInterval(d time.Duration) Option {
	return func(m *Mutex) {
		m.minRenewInterval = d
	}
}

// WithRenewTimeout 设置单次续期操作的超时时间(默认 2s)
func WithRenewTimeout(d time.Duration) Option {
	return func(m *Mutex) {
		m.renewTimeout = d
	}
}

// WithLogger 注入自定义日志实现
func WithLogger(l Logger) Option {
	return func(m *Mutex) {
		m.logger = l
	}
}

// WithRenewFailedCallback 设置续期失败回调
func WithRenewFailedCallback(cb RenewFailedCallback) Option {
	return func(m *Mutex) {
		m.renewFailedCb = cb
	}
}

// NewMutex 创建一个新的分布式锁实例
// client: Redis 客户端
// key: 锁的键名,建议使用业务唯一标识,如 "order:lock:123"
// ttl: 锁的过期时间,建议设置为业务处理时间的 2~3 倍
func NewMutex(client redis.UniversalClient, key string, ttl time.Duration, opts ...Option) *Mutex {
	if ttl <= 0 {
		ttl = 30 * time.Second
	}
	m := &Mutex{
		client:           client,
		key:              key,
		value:            uuid.New().String(), // 生成唯一持有者标识
		ttl:              ttl,
		stopCh:           make(chan struct{}),
		retryStrategy:    func(attempt int) time.Duration { return 50 * time.Millisecond }, // 默认固定 50ms
		minRenewInterval: 200 * time.Millisecond,
		renewTimeout:     2 * time.Second,
		logger:           &defaultLogger{},
	}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

// Lock 阻塞式获取锁,直到成功或 ctx 超时/取消。
// 成功获取锁后自动启动看门狗进行续期。
func (m *Mutex) Lock(ctx context.Context) error {
	attempt := 0
	for {
		ok, err := m.tryAcquire(ctx)
		if err != nil {
			return err
		}
		if ok {
			// 加锁成功,启动看门狗
			m.startWatchdog()
			return nil
		}

		// 未获取到锁,计算等待时间
		delay := m.retryStrategy(attempt)
		attempt++

		select {
		case <-ctx.Done():
			return fmt.Errorf("lock %s acquire timeout: %w", m.key, ctx.Err())
		case <-time.After(delay):
			// 继续重试
		}
	}
}

// TryLock 非阻塞尝试获取一次锁,立即返回结果。
// 若成功获取,会自动启动看门狗。
func (m *Mutex) TryLock(ctx context.Context) (bool, error) {
	ok, err := m.tryAcquire(ctx)
	if err != nil {
		return false, err
	}
	if ok {
		m.startWatchdog()
		return true, nil
	}
	return false, nil
}

// Unlock 释放锁,并停止看门狗。
// 只有锁的当前持有者才能成功释放,否则返回 ErrLockNotHeld。
// 若锁在持有期间已丢失(续期失败),则返回 ErrLockLost。
func (m *Mutex) Unlock(ctx context.Context) error {
	// 先停止看门狗,确保不再有续期操作
	m.stopWatchdog()

	m.mu.Lock()
	defer m.mu.Unlock()

	if m.lost {
		return ErrLockLost
	}
	if !m.locked {
		return ErrLockNotHeld
	}

	err := m.releaseLock(ctx)
	if err == nil {
		m.locked = false
	}
	return err
}

// IsHeld 检查当前实例是否仍持有锁(通过 Redis 实时验证)
func (m *Mutex) IsHeld(ctx context.Context) (bool, error) {
	val, err := m.client.Get(ctx, m.key).Result()
	if err == redis.Nil {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return val == m.value, nil
}

// tryAcquire 执行一次加锁尝试,内部更新 locked 状态
func (m *Mutex) tryAcquire(ctx context.Context) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.locked {
		return false, fmt.Errorf("mutex already locked")
	}

	ok, err := m.client.SetNX(ctx, m.key, m.value, m.ttl).Result()
	if err != nil {
		return false, err
	}
	if ok {
		m.locked = true
		m.lost = false
		return true, nil
	}
	return false, nil
}

// releaseLock 执行 Redis 释放锁的 Lua 脚本,确保原子性
func (m *Mutex) releaseLock(ctx context.Context) error {
	script := `
		if redis.call("GET", KEYS[1]) == ARGV[1] then
			return redis.call("DEL", KEYS[1])
		else
			return 0
		end
	`
	result, err := m.client.Eval(ctx, script, []string{m.key}, m.value).Result()
	if err != nil {
		return fmt.Errorf("redis eval error: %w", err)
	}
	if result.(int64) == 0 {
		return ErrLockNotHeld
	}
	return nil
}

// startWatchdog 启动后台续期协程(内部使用 Once 保证只启动一次)
func (m *Mutex) startWatchdog() {
	m.watchdogOnce.Do(func() {
		m.wg.Add(1)
		go m.watchdogLoop()
	})
}

// watchdogLoop 看门狗主循环,定期续期
func (m *Mutex) watchdogLoop() {
	defer m.wg.Done()

	// 计算续期间隔:TTL 的 1/3,且不小于最小间隔
	interval := m.ttl / 3
	if interval < m.minRenewInterval {
		interval = m.minRenewInterval
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := m.renew(); err != nil {
				// 续期失败,标记锁已丢失,并通知外部
				m.markLost()
				m.logger.Printf("watchdog renew failed for key %s: %v", m.key, err)
				if m.renewFailedCb != nil {
					m.renewFailedCb(m.key, err)
				}
				return
			}
		case <-m.stopCh:
			// 收到停止信号,正常退出
			return
		}
	}
}

// renew 执行一次续期操作,使用 Lua 脚本验证持有者身份并延长 TTL
func (m *Mutex) renew() error {
	ctx, cancel := context.WithTimeout(context.Background(), m.renewTimeout)
	defer cancel()

	script := `
		if redis.call("GET", KEYS[1]) == ARGV[1] then
			return redis.call("EXPIRE", KEYS[1], ARGV[2])
		else
			return 0
		end
	`
	result, err := m.client.Eval(ctx, script, []string{m.key}, m.value, int64(m.ttl.Seconds())).Result()
	if err != nil {
		return err
	}
	if result.(int64) == 0 {
		return errors.New("lock not held or key missing")
	}
	return nil
}

// markLost 标记锁已丢失,由续期失败时调用
func (m *Mutex) markLost() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.lost = true
	m.locked = false // 丢失后即认为不再持有锁
}

// stopWatchdog 停止看门狗协程并等待其退出
func (m *Mutex) stopWatchdog() {
	// 关闭 stopCh 通知看门狗退出(使用 select 避免重复关闭)
	select {
	case <-m.stopCh:
		// 已经关闭
	default:
		close(m.stopCh)
	}
	// 等待看门狗协程完全退出
	m.wg.Wait()
}

完整测试用例:

测试依赖 miniredis 模拟 Redis 服务,请先安装:

go get github.com/alicebob/miniredis/v2

测试文件:mutex_test.go

package redislock

import (
    "context"
    "sync"
    "testing"
    "time"

    "github.com/alicebob/miniredis/v2"
    "github.com/redis/go-redis/v9"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

// 创建测试用的 Redis 客户端(基于 miniredis)
func setupTestRedis(t *testing.T) (*redis.Client, *miniredis.Miniredis) {
    mr, err := miniredis.Run()
    require.NoError(t, err)

    client := redis.NewClient(&redis.Options{
        Addr: mr.Addr(),
    })
    return client, mr
}

// 测试:正常加锁和解锁流程
func TestMutex_LockUnlock(t *testing.T) {
    client, mr := setupTestRedis(t)
    defer mr.Close()
    defer client.Close()

    mutex := NewMutex(client, "test:lock", 5*time.Second)
    ctx := context.Background()

    // 加锁
    err := mutex.Lock(ctx)
    require.NoError(t, err)

    // 验证 Redis 中确实存在该 key
    val, err := client.Get(ctx, "test:lock").Result()
    require.NoError(t, err)
    assert.Equal(t, mutex.value, val)

    // 解锁
    err = mutex.Unlock(ctx)
    require.NoError(t, err)

    // 验证 key 已被删除
    _, err = client.Get(ctx, "test:lock").Result()
    assert.ErrorIs(t, err, redis.Nil)
}

// 测试:TryLock 非阻塞获取
func TestMutex_TryLock(t *testing.T) {
    client, mr := setupTestRedis(t)
    defer mr.Close()
    defer client.Close()

    mutex1 := NewMutex(client, "test:trylock", 5*time.Second)
    mutex2 := NewMutex(client, "test:trylock", 5*time.Second)

    ctx := context.Background()

    // 第一个实例获取锁应成功
    ok, err := mutex1.TryLock(ctx)
    require.NoError(t, err)
    assert.True(t, ok)

    // 第二个实例获取锁应失败(已被占用)
    ok, err = mutex2.TryLock(ctx)
    require.NoError(t, err)
    assert.False(t, ok)

    // 释放锁后,第二个实例应能获取
    err = mutex1.Unlock(ctx)
    require.NoError(t, err)

    ok, err = mutex2.TryLock(ctx)
    require.NoError(t, err)
    assert.True(t, ok)
}

// 测试:阻塞等待超时
func TestMutex_LockTimeout(t *testing.T) {
    client, mr := setupTestRedis(t)
    defer mr.Close()
    defer client.Close()

    // 先占用锁
    holder := NewMutex(client, "test:timeout", 10*time.Second)
    err := holder.TryLock(context.Background())
    require.NoError(t, err)
    defer holder.Unlock(context.Background())

    // 第二个实例尝试获取,设置较短超时
    waiter := NewMutex(client, "test:timeout", 10*time.Second)
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    start := time.Now()
    err = waiter.Lock(ctx)
    elapsed := time.Since(start)

    assert.Error(t, err)
    assert.Contains(t, err.Error(), "acquire timeout")
    assert.True(t, elapsed >= 500*time.Millisecond)
    assert.True(t, elapsed < 1*time.Second)
}

// 测试:看门狗自动续期
func TestMutex_WatchdogRenew(t *testing.T) {
    client, mr := setupTestRedis(t)
    defer mr.Close()
    defer client.Close()

    // 设置较短的 TTL 和续期间隔,加快测试
    mutex := NewMutex(
        client,
        "test:watchdog",
        2*time.Second,
        WithMinRenewInterval(200*time.Millisecond),
    )

    ctx := context.Background()
    err := mutex.Lock(ctx)
    require.NoError(t, err)

    // 快进 Redis 时间,模拟 3 秒过去(超过原始 TTL)
    mr.FastForward(3 * time.Second)

    // 锁应该仍然存在(被看门狗续期)
    val, err := client.Get(ctx, "test:watchdog").Result()
    require.NoError(t, err)
    assert.Equal(t, mutex.value, val)

    // 释放锁
    err = mutex.Unlock(ctx)
    require.NoError(t, err)
}

// 测试:续期失败回调
func TestMutex_RenewFailedCallback(t *testing.T) {
    client, mr := setupTestRedis(t)
    defer mr.Close()
    defer client.Close()

    callbackCalled := make(chan error, 1)
    mutex := NewMutex(
        client,
        "test:callback",
        2*time.Second,
        WithMinRenewInterval(100*time.Millisecond),
        WithRenewFailedCallback(func(key string, err error) {
            callbackCalled <- err
        }),
    )

    ctx := context.Background()
    err := mutex.Lock(ctx)
    require.NoError(t, err)

    // 模拟锁被外部删除(如手动 DEL)
    client.Del(ctx, "test:callback")

    // 等待回调被触发(续期时会发现锁不存在)
    select {
    case err := <-callbackCalled:
        assert.Error(t, err)
    case <-time.After(2 * time.Second):
        t.Fatal("续期失败回调未被调用")
    }

    // 解锁时应返回 ErrLockLost
    err = mutex.Unlock(ctx)
    assert.ErrorIs(t, err, ErrLockLost)
}

// 测试:锁持有者身份校验(防止误解锁)
func TestMutex_IdentityProtection(t *testing.T) {
    client, mr := setupTestRedis(t)
    defer mr.Close()
    defer client.Close()

    mutex1 := NewMutex(client, "test:identity", 5*time.Second)
    mutex2 := NewMutex(client, "test:identity", 5*time.Second)

    ctx := context.Background()

    // mutex1 加锁
    err := mutex1.Lock(ctx)
    require.NoError(t, err)

    // mutex2 尝试解锁应失败
    err = mutex2.Unlock(ctx)
    assert.ErrorIs(t, err, ErrLockNotHeld)

    // mutex1 解锁应成功
    err = mutex1.Unlock(ctx)
    assert.NoError(t, err)
}

// 测试:IsHeld 实时状态查询
func TestMutex_IsHeld(t *testing.T) {
    client, mr := setupTestRedis(t)
    defer mr.Close()
    defer client.Close()

    mutex := NewMutex(client, "test:isheld", 5*time.Second)
    ctx := context.Background()

    // 加锁前
    held, err := mutex.IsHeld(ctx)
    require.NoError(t, err)
    assert.False(t, held)

    // 加锁后
    err = mutex.Lock(ctx)
    require.NoError(t, err)
    held, err = mutex.IsHeld(ctx)
    require.NoError(t, err)
    assert.True(t, held)

    // 解锁后
    err = mutex.Unlock(ctx)
    require.NoError(t, err)
    held, err = mutex.IsHeld(ctx)
    require.NoError(t, err)
    assert.False(t, held)
}

// 测试:并发竞争场景
func TestMutex_Concurrent(t *testing.T) {
    client, mr := setupTestRedis(t)
    defer mr.Close()
    defer client.Close()

    const goroutines = 20
    var wg sync.WaitGroup
    counter := 0
    var mu sync.Mutex // 仅用于保护 counter 的本地累加

    for i := 0; i < goroutines; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            mutex := NewMutex(client, "test:concurrent", 5*time.Second)
            ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
            defer cancel()

            if err := mutex.Lock(ctx); err != nil {
                t.Logf("goroutine 获取锁失败: %v", err)
                return
            }
            defer mutex.Unlock(context.Background())

            // 临界区:安全地修改共享资源
            mu.Lock()
            counter++
            mu.Unlock()

            // 模拟业务处理
            time.Sleep(50 * time.Millisecond)
        }()
    }

    wg.Wait()

    // 最终 counter 应等于成功获取锁的 goroutine 数量(即 goroutines)
    assert.Equal(t, goroutines, counter)
}

// 测试:指数退避重试策略
func TestMutex_ExponentialBackoff(t *testing.T) {
    client, mr := setupTestRedis(t)
    defer mr.Close()
    defer client.Close()

    // 先占用锁
    holder := NewMutex(client, "test:backoff", 10*time.Second)
    err := holder.TryLock(context.Background())
    require.NoError(t, err)
    defer holder.Unlock(context.Background())

    // 第二个实例使用指数退避
    waiter := NewMutex(
        client,
        "test:backoff",
        10*time.Second,
        WithExponentialBackoff(50*time.Millisecond, 1*time.Second),
    )

    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    start := time.Now()
    err = waiter.Lock(ctx)
    elapsed := time.Since(start)

    assert.Error(t, err)
    // 验证确实等待了一段时间(由于退避,总等待时间应大于最小间隔)
    assert.True(t, elapsed > 100*time.Millisecond)
}

运行测试

go test -v -race ./...

-race 标志会启用数据竞争检测,确保并发安全。


注意事项

  1. miniredis 时间快进mr.FastForward() 会推进 Redis 内部时钟,用于模拟过期时间,这对测试看门狗续期非常有用。
  2. 测试并发安全:使用 -race 运行测试可发现潜在的竞态条件。
  3. 生产环境日志:示例中的 Logger 接口可以替换为 *slog.Logger 或 logrus 实现,方便集成。
  4. 上下文传递:建议业务层在调用 Lock 时传入带有 trace_id 的 context,便于问题追踪。

发表回复

Your email address will not be published. Required fields are marked *.

*
*