Redis分布式锁解决超卖问题

思考并回答以下问题:

  • Redis的multi命令是什么意思?
  • watch要在multi前面吗?

Redis事务

Redis事务介绍

  • 1.redis事务可以一次执行多个命令,本质是一组命令的集合。
  • 2.一个事务中的所有命令都会序列化,按顺序串行化的执行而不会被其他命令插入。

作用:一个队列中,一次性、顺序性、排他性的执行一系列命令。

multi指令的使用

  • 1.下面指令演示了一个完整的事务过程,所有指令在exec前不执行,而是缓存在服务器的一个事务队列中。
  • 2.服务器一旦收到exec指令才开始执行事务队列,执行完毕后一次性返回所有结果。
  • 3.因为redis是单线程的,所以不必担心自己在执行队列时被打断,可以保证“原子性”。

注:Redis事务在遇到指令失败后,后面的指令会继续执行。

1
2
3
4
5
6
7
# Multi命令用于标记一个事务块的开始
# 事务块内的多条命令会按照先后顺序被放进一个队列当中,最后由EXEC命令原子性(atomic)的执行
> multi #(开始一个redis事务)
incr books
incr books
> exec #(执行事务)
> discard #(丢弃事务)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def multi_test():
r = redis.Redis(host='127.0.0.1')
pipe = r.pipeline()
pipe.multi() # 开启事务
pipe.set('key2', 400) # 存储子命令
pipe.execute() # 执行事务

print("第一次事务提交后的结果" + r.get('key2').decode("utf-8"))

pipe.multi() # 开启事务
pipe.set('key2', 100) # 存储子命令

print("第二次未提交事务的结果" + r.get("key2").decode("utf-8"))

#第一次事务提交后的结果400
#第二次未提交事务的结果400

注:mysql的rollback与redis的discard的区别

  • 1.mysql回滚为sql全部成功才执行,一条sql失败则全部失败,执行rollback后所有语句造成的影响消失。
  • 2.redis的discard只是结束本次事务,正确命令造成的影响仍然还在。
    • 1)redis如果在一个事务中的命令出现错误,那么所有的命令都不会执行;
    • 2)redis如果在一个事务中出现运行错误,那么正确的命令会被执行。

watch指令作用

  • 1.watch其实就是redis提供的一种乐观锁,可以解决并发修改问题。
  • 2.watch会在事务开始前盯住一个或多个关键变量,当服务器收到exec指令要顺序执行缓存中的事务队列时,redis会检查关键变量自watch后是否被修改。
  • 3.watch只会在数据被其他客户端抢先修改了的情况下通知执行命令的这个客户端(通过WatchError异常),但不会阻止其他客户端对数据的修改。

watch+multi实现乐观锁

setnx指令(redis的分布式锁)

  • 1.分布式锁本质是占一个坑,当别的进程也要来占坑时发现已经被占,就会放弃或者稍后重试。
  • 2.占坑一般使用setnx(set if not exists)指令,只允许一个客户端占坑。
  • 3.先来先占,用完了在调用del指令释放坑。
1
2
3
> setnx lock:codehole true
.... do something critical ....
> del lock:codehole
  • 4.但是这样有一个问题,如果逻辑执行到中间出现异常,可能导致del指令没有被调用,这样就会陷入死锁,锁永远无法释放。
  • 5.为了解决死锁问题,我们拿到锁时可以加上一个expire过期时间,这样即使出现异常,当到达过期时间也会自动释放锁。
1
2
3
4
> setnx lock:codehole true
> expire lock:codehole 5
.... do something critical ....
> del lock:codehole
  • 6.这样又有一个问题,setnx和expire是两条指令而不是原子指令,如果两条指令之间进程挂掉依然会出现死锁。
  • 7.为了治理上面乱象,在redis2.8中加入了set指令的扩展参数,使setnx和expire指令可以一起执行。
1
2
3
> set lock:codehole true ex 5 nx
''' do something '''
> del lock:codehole

Redis解决超卖问题

使用Redis的watch+multi指令实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#! /usr/bin/env python
# -*- coding: utf-8 -*-
import redis

def sale(rs):
while True:
with rs.pipeline() as p:
try:
p.watch('apple') # 监听key值为apple的数据数量改变
count = int(rs.get('apple'))
print('拿取到了苹果的数量: %d' % count)

p.multi() # 事务开始
if count> 0 : # 如果此时还有库存
p.set('apple', count - 1)
p.execute() # 执行事务
p.unwatch()
break # 当库存成功减一或没有库存时跳出执行循环

except Exception as e: # 当出现watch监听值出现修改时,WatchError异常抛出
print('[Error]: %s' % e)
continue # 继续尝试执行

rs = redis.Redis(host='127.0.0.1', port=6379) # 连接redis
rs.set('apple',1000) # 首先在redis中设置某商品apple对应数量value值为1000
sale(rs)

1)原理

  • 1.当用户购买时,通过WATCH监听用户库存,如果库存在watch监听后发生改变,就会捕获异常而放弃对库存减一操作;
  • 2.如果库存没有监听到变化并且数量大于1,则库存数量减一,并执行任务。

2)弊端

  • 1.Redis在尝试完成一个事务的时候,可能会因为事务的失败而重复尝试重新执行。
  • 2.保证商品的库存量正确是一件很重要的事情,但是单纯的使用WATCH这样的机制对服务器压力过大。

使用Redis的watch+multi+setnx指令实现

1)为什么要自己构建锁

  • 1.虽然有类似的SETNX命令可以实现Redis中的锁的功能,但他锁提供的机制并不完整。
  • 2.并且setnx也不具备分布式锁的一些高级特性,还是得通过我们手动构建。

2)创建一个redis锁

  • 1.在Redis中,可以通过使用SETNX命令来构建锁:rs.setnx(lock_name, uuid值)。
  • 2.而锁要做的事情就是将一个随机生成的128位UUID设置位键的值,防止该锁被其他进程获取。

3)释放锁

  • 1.锁的删除操作很简单,只需要将对应锁的key值获取到的uuid结果进行判断验证;
  • 2.符合条件(判断uuid值)通过delete在redis中删除即可,rs.delete(lockname);
  • 3.此外当其他用户持有同名锁时,由于uuid的不同,经过验证后不会错误释放掉别人的锁。

4)解决锁无法释放问题

  • 1.在之前的锁中,还出现这样的问题,比如某个进程持有锁之后突然程序崩溃,那么会导致锁无法释放;
  • 2.而其他进程无法持有锁继续工作,为了解决这样的问题,可以在获取锁的时候加上锁的超时功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import redis
import uuid
import time

# 1.初始化连接函数
def get_conn(host,port=6379):
rs = redis.Redis(host=host, port=port)
return rs

# 2. 构建redis锁
def acquire_lock(rs, lock_name, expire_time=10):
'''
rs: 连接对象
lock_name: 锁标识
acquire_time: 过期超时时间
return -> False 获锁失败 or True 获锁成功
'''
identifier = str(uuid.uuid4())
end = time.time() + expire_time
while time.time() < end:
# 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
if rs.setnx(lock_name, identifier): # 尝试取得锁
return identifier
# time.sleep(.001)
return False

# 3. 释放锁
def release_lock(rs, lockname, identifier):
'''
rs: 连接对象
lockname: 锁标识
identifier: 锁的value值,用来校验
'''
if rs.get(lockname).decode() == identifier: # 防止其他进程同名锁被误删
rs.delete(lockname)
return True # 删除锁
else:
return False # 删除失败

#有过期时间的锁
def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10):
'''
rs: 连接对象
lock_name: 锁标识
acquire_time: 过期超时时间
locked_time: 锁的有效时间
return -> False 获锁失败 or True 获锁成功
'''
identifier = str(uuid.uuid4())
end = time.time() + expire_time
while time.time() < end:
# 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
if rs.setnx(lock_name, identifier): # 尝试取得锁
# print('锁已设置: %s' % identifier)
rs.expire(lock_name, locked_time)
return identifier
time.sleep(.001)
return False


'''在业务函数中使用上面的锁'''
def sale(rs):
start = time.time() # 程序启动时间
with rs.pipeline() as p:
'''
通过管道方式进行连接
多条命令执行结束,一次性获取结果
'''

while 1:
lock = acquire_lock(rs, 'lock')
if not lock: # 持锁失败
continue

#开始监测"lock"
p.watch("lock")
try:
#开启事务
p.multi()
count = int(rs.get('apple')) # 取量
p.set('apple', count-1) # 减量
# time.sleep(5)
#提交事务
p.execute()
print('当前库存量: %s' % count)
#成功则跳出循环
break
except:
#事务失败对应处理
print("修改数据失败")

#无论成功与否最终都需要释放锁
finally:

res = release_lock(rs, 'lock', lock)
#释放锁成功,
if res:
print("删除锁成功")
#释放锁失败,强制删除
else:
print("删除锁失败,强制删除锁")
res = rs.delete('lock')
print(res)

print('[time]: %.2f' % (time.time() - start))

rs = redis.Redis(host='127.0.0.1', port=6379) # 连接redis
# rs.set('apple',1000) # # 首先在redis中设置某商品apple 对应数量value值为1000
sale(rs)

优化锁无法释放的问题,为锁添加过期时间

def acquire_expire_lock(rs, lock_name, expire_time=10, locked_time=10):
    '''
    rs: 连接对象
    lock_name: 锁标识
    acquire_time: 过期超时时间
    locked_time: 锁的有效时间
    return -> False 获锁失败 or True 获锁成功
    '''
    identifier = str(uuid.uuid4())
    end = time.time() + expire_time
    while time.time() < end:
        # 当获取锁的行为超过有效时间,则退出循环,本次取锁失败,返回False
        if rs.setnx(lock_name, identifier): # 尝试取得锁
            # print('锁已设置: %s' % identifier)
            rs.expire(lock_name, locked_time)
            return identifier
        time.sleep(.001)
    return False
0%