浅析 Python 中各种锁机制

并发和并行

  • 并发(Concurrency):拥有交替处理多个任务的能力,但不是同时。通常适用于 I/O 密集型任务。
  • 并行(Parallelism):拥有同时处理多个任务的能力。通常适用于 CPU 密集型任务。

锁机制

GIL(全局解释器锁)

GIL(Global Interpreter Lock)全局解释器锁,是最流行的 Python 解释器 CPython(官方解释器)中引入的一个概念,它使得同一个进程下任何时刻 仅有一个线程在执行,无法利用多核优势。由于GIL的存在,导致Python多线程性能甚至比单线程更糟。

在 Cpython 中,每一个线程在开始执行时,都会获得 GIL 阻止别的线程执行,在线程遇到 I/O 等会引起阻塞状态的操作时,会释放 GIL 以允许其他线程开始利用资源。同时,CPython 中还有一个 间隔检查(check interval) 机制,即 CPython 解释器会轮询检查线程 GIL 锁的获得情况,每隔一段时间就会强制当前线程释放 GIL,使别的线程能有执行的机会。这样一来,用户看到的就是“伪”并行,即 Python 线程在交替执行,但并没有真正的并行。

线程抢占 GIL 的调度流程,如下图所示:

GIL 工作流程示意图

引入 GIL 的原因

  1. 设计者为了规避类似于内存管理这样复杂的条件竞争问题。
  2. CPython 使用了大量 C 语言库,但大部分 C 语言库都不是原生线程安全的。

注:Python GIL 不能绝对保证线程安全,因为即便 GIL 仅允许一个 Python 线程执行,但 CPython 还有 check interval 这样的抢占机制。即 GIL 的设计,主要是为了方便 CPython 解释器层的编写者,而不是为了 Python 应用层的软件工程师。

如何绕过 GIL

  1. 绕过 CPython,使用 JPython 等其他解释器。
  2. 把关键的性能代码,用 C 语言来实现。
  3. 多核 CPU 可使用多进程并行的方式,替代多线程并发。

示例:多线程访问同一变量(未加锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading
import time

total = 10


def sale():
global total
print('售出第%s张票...' % (10 - total + 1))
# time.sleep(0.001) # 等待io操作
total -= 1
# time.sleep(0.001)
print('剩余%s张票\n' % total)


threads = []
for i in range(10):
t = threading.Thread(target=sale, args=())
threads.append(t)
for t in threads:
t.start()
# cpu密集型(注释掉io操作):结果符合预期(线程start前total-=1已经计算完成)
# io密集型:结果异常

互斥锁(同步锁)

互斥锁(Mutex),保证同一时间只能有一个线程修改共享数据,解决了上述io密集型场景产生的计算错误问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import threading
import time

total = 10
lock = threading.Lock()


def sale():
global total
lock.acquire()
print('正在售出第%s张票...' % (10 - total + 1))
time.sleep(0.001) # 等待io操作
total -= 1
time.sleep(0.001)
print('剩余%s张票\n' % total)
lock.release()


threads = []
for i in range(10):
t = threading.Thread(target=sale, args=())
threads.append(t)
for t in threads:
t.start()

死锁

由于保护不同的数据应该使用不同的互斥锁,当有多个互斥锁存在的时候,就可能会导致死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 发生死锁的情况
import threading
import time

lockA = threading.Lock()
lockB = threading.Lock()


def foo():
lockA.acquire()
print('func foo acquire lockA')
lockB.acquire()
print('func foo acquire lockB')
lockA.release()
print('func foo release lockA')
lockB.release()
print('func foo release lockB')


def bar():
lockB.acquire()
print('func bar acquire lockB')
time.sleep(1) # 等待io操作
lockA.acquire()
print('func bar acquire lockA')
lockB.release()
print('func bar release lockB')
lockA.release()
print('func bar release lockA')


def run():
foo()
bar()


for i in range(5):
t = threading.Thread(target=run, args=())
t.start()

递归锁(可重入锁)

递归锁(Recursive Mutex)可以被获取多次,并且只能被所有者释放,用于解决多个互斥锁死锁情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import threading
import time

rlock = threading.RLock()


def foo():
rlock.acquire()
print('func foo acquire lock-1')
rlock.acquire()
print('func foo acquire lock-2')
rlock.release()
print('func foo release lock-2')
rlock.release()
print('func foo release lock-1')


def bar():
rlock.acquire()
print('func bar acquire lock-1')
time.sleep(1) # 等待io操作
rlock.acquire()
print('func bar acquire lock-2')
rlock.release()
print('func bar release lock-2')
rlock.release()
print('func bar release lock-1')


def run():
foo()
bar()


for i in range(5):
t = threading.Thread(target=run, args=())
t.start()

信号量

信号量(Semaphore),用于多线程同步,限制线程的并发量。

mutex是semaphore的一种特殊情况(n=1时)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import threading
import time

sem = threading.Semaphore(5) # 每2秒打印5次线程名称


def foo():
sem.acquire()
time.sleep(2)
print(threading.current_thread().name)
sem.release()


def bar():
with sem: # 使用with语句
time.sleep(2)
print(threading.current_thread().name)


for i in range(10):
t = threading.Thread(target=foo, args=[])
t.start()

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!