4.1实现锁

利用watch+multi来实现乐观锁

乐观锁,顾名思义,乐观的认为数据不会被修改,只有当更新时才去判断数据是否被修改过,通常用版本号或时间戳来实现。 redis中通过watch和multi来实现,watch会监视给定的key是否发生更改,当exec的时候如果监视的key发生过改变,则整个事务会失败。 当然我们可以调用多次watch监视多个key。

<?php

$redis = new Redis();
$redis->connect('127.0.0.1', 6379, 60);

//设置商品的库存数为100
$redis->set('goods_stock_nums', 100);
//监视该key
$redis->watch('goods_stock_nums');

//开启事务
$redis->multi();

//修改库存数,incr原子性递增,decr原子性递减
$redis->decr('goods_stock_nums');

//提交事务,如果在此期间有其他请求修改了该key,那么事务会失败
if ($redis->exec()) {
    echo '抢购成功';
} else {
    echo '数据错误,请重新再试';
}

使用setnx来实现悲观锁

<?php

function getRedis()
{
    $redis = new Redis();
    $redis->connect('127.0.0.1', 6379, 60);
    return $redis;
}

function lock($key, $random)
{
    $redis = getRedis();
    return $redis->set($key, $random, ['nx', 'ex' => 3]);
}

function unlock($key, $random)
{
    $redis = getRedis();
    //使用lua脚本保证原子性
    $script = 'if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end';
    return $redis->eval($script, [$key, $random], 1);
}

function decrGoodsStockNums()
{
    $redis = getRedis();

    //获取商品库存数
    $ret = $redis->get('goods_stock_nums');

    if ($ret === false) {
        return false;
    }

    if ($ret <= 0) {
        return false;
    }
 
    $random = mt_rand();
    //先获取锁
    if (lock('goods_stock_nums_lock', $random)) {
        //修改库存数
        $redis->decr('goods_stock_nums');

        //释放锁
        unlock('goods_stock_nums_lock', $random);
        return true;
    } else {
        usleep(100);
        decrGoodsStockNums();
    }
}

decrGoodsStockNums();

分布式锁

问题主要集中在如何获取分布式锁 setnx创建锁成功 或者 锁过期且getset时间为过期时间

/**
     * 实现Redis分布锁
     */
    $key        = 'demo';       //要更新信息的缓存KEY
    $lockKey    = 'lock:'.$key; //设置锁KEY
    $lockExpire = 10;           //设置锁的有效期为10秒,防止死锁
     
    //获取缓存信息
    $result = $redis->get($key);
    //判断缓存中是否有数据
    if(empty($result))
    {
        $status = TRUE;
        while ($status)
        {
            //设置锁值为当前时间戳 + 有效期
            $lockValue = time() + $lockExpire;
            /**
             * 创建锁
             * 试图以$lockKey为key创建一个缓存,value值为当前时间戳
             * 由于setnx()函数只有在不存在当前key的缓存时才会创建成功
             * 所以,用此函数就可以判断当前执行的操作是否已经有其他进程在执行了
             * @var [type]
             */
            $lock = $redis->setnx($lockKey, $lockValue);
            /**
             * 满足两个条件中的一个即可进行操作
             * 1、上面一步创建锁成功;
             * 2、   1)判断锁的值(时间戳)是否小于当前时间    $redis->get()
             *      2)同时给锁设置新值成功    $redis->getset()
             */
            if(!empty($lock) || ($redis->get($lockKey) < time() && $redis->getSet($lockKey, $lockValue) < time() ))
            {
                //给锁设置生存时间
                $redis->expire($lockKey, $lockExpire);
                //******************************
                //此处执行插入、更新缓存操作...
                //******************************
     
                //以上程序走完删除锁
                //检测锁是否过期,过期锁没必要删除
                if($redis->ttl($lockKey))
                    $redis->del($lockKey);
                $status = FALSE;
            }else{
                /**
                 * 如果存在有效锁这里做相应处理
                 *      等待当前操作完成再执行此次请求
                 *      直接返回
                 */
                sleep(2);//等待2秒后再尝试执行操作
            }
        }
    }

redis锁(setnx)有一定的安全风险,redis挂掉。第一个客户端没释放锁,第二个客户端就获取到锁 持久化 rdb 镜像复制 aof 增量复制 1、每操作 2、每时间段 3、每cpubuffer 都有空窗期 redis集群默认是弱一致性,开启强一致性 1、每操作 降低性能 zookeeper是解决分布式锁的较好方案(zookeeper 安全性能非最优) 大数据etcd 安全性能优

基于golang实现

package main

import (
    "fmt"
    "sync"
    "time"

    "github.com/go-redis/redis"
)

var client *redis.Client

func init() {
    c := redis.NewClient(&redis.Options{
        Addr:     "node01:6379",
        Password: "",
        DB:       0,
        PoolSize: 100,
    })
    client = c
}

func incr() {

    var lockKey = "counter_lock"
    var counterKey = "counter"

    resp := client.SetNX(lockKey, 1, time.Second*5)
    lockSuccess, err := resp.Result()
    if err != nil || !lockSuccess {
        fmt.Println(err, "lock result: ", lockSuccess)
        //incr()
        return
    }
    getResp := client.Get(counterKey)
    cntValue, err := getResp.Int64()
    if err == nil || err == redis.Nil {
        cntValue++
        resp := client.Set(counterKey, cntValue, 0)
        _, err := resp.Result()
        if err != nil {
            panic("set value error!")
        }
    }
    delResp := client.Del(lockKey)
    unlockSuccess, err := delResp.Result()
    if err == nil && unlockSuccess > 0 {
    } else {
        panic("unlock failed")

    }
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            incr()
        }()
    }
    wg.Wait()
    getResp := client.Get("counter")
    cntValue, err := getResp.Int64()
    if err == nil || err == redis.Nil {
        println(cntValue)
    }
}

最后更新于