并发和并行
并发(Concurrency):拥有交替处理多个任务的能力,但不是同时。通常适用于 I/O 密集型任务。
并行(Parallelism):拥有同时处理多个任务的能力。通常适用于 CPU 密集型任务。
锁机制 GIL(全局解释器锁) GIL(Global Interpreter Lock)全局解释器锁,是最流行的 Python 解释器 CPython (官方解释器)中引入的一个概念,它使得同一个进程下任何时刻 仅有一个线程在执行 ,无法利用多核优势。由于GIL的存在,导致Python多线程性能甚至比单线程更糟。
在 Cpython 中,每一个线程在开始执行时,都会获得 GIL 阻止别的线程执行,在线程遇到 I/O 等会引起阻塞状态的操作时,会释放 GIL 以允许其他线程开始利用资源。同时,CPython 中还有一个 间隔检查(check interval) 机制,即 CPython 解释器会轮询检查线程 GIL 锁的获得情况,每隔一段时间就会强制当前线程释放 GIL,使别的线程能有执行的机会。这样一来,用户看到的就是“伪”并行,即 Python 线程在交替执行,但并没有真正的并行。
线程抢占 GIL 的调度流程,如下图所示:
引入 GIL 的原因
设计者为了规避类似于内存管理这样复杂的条件竞争问题。
CPython 使用了大量 C 语言库,但大部分 C 语言库都不是原生线程安全的。
注:Python GIL 不能绝对保证线程安全 ,因为即便 GIL 仅允许一个 Python 线程执行,但 CPython 还有 check interval 这样的抢占机制。即 GIL 的设计,主要是为了方便 CPython 解释器层的编写者,而不是为了 Python 应用层的软件工程师。
如何绕过 GIL
绕过 CPython,使用 JPython 等其他解释器。
把关键的性能代码,用 C 语言来实现。
多核 CPU 可使用多进程并行的方式,替代多线程并发。
示例:多线程访问同一变量(未加锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import threadingimport time total = 10 def sale (): global total print ('售出第%s张票...' % (10 - total + 1 )) total -= 1 print ('剩余%s张票\n' % total) threads = []for i in range (10 ): t = threading.Thread(target=sale, args=()) threads.append(t)for t in threads: t.start()
互斥锁(同步锁) 互斥锁(Mutex),保证同一时间只能有一个线程修改共享数据,解决了上述io密集型场景产生的计算错误问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import threadingimport time total = 10 lock = threading.Lock()def sale (): global total lock.acquire() print ('正在售出第%s张票...' % (10 - total + 1 )) time.sleep(0.001 ) total -= 1 time.sleep(0.001 ) print ('剩余%s张票\n' % total) lock.release() threads = []for i in range (10 ): t = threading.Thread(target=sale, args=()) threads.append(t)for t in threads: t.start()
死锁 由于保护不同的数据应该使用不同的互斥锁,当有多个互斥锁存在的时候,就可能会导致死锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import threadingimport time lockA = threading.Lock() lockB = threading.Lock()def foo (): lockA.acquire() print ('func foo acquire lockA' ) lockB.acquire() print ('func foo acquire lockB' ) lockA.release() print ('func foo release lockA' ) lockB.release() print ('func foo release lockB' )def bar (): lockB.acquire() print ('func bar acquire lockB' ) time.sleep(1 ) lockA.acquire() print ('func bar acquire lockA' ) lockB.release() print ('func bar release lockB' ) lockA.release() print ('func bar release lockA' )def run (): foo() bar()for i in range (5 ): t = threading.Thread(target=run, args=()) t.start()
递归锁(可重入锁) 递归锁(Recursive Mutex)可以被获取多次,并且只能被所有者释放,用于解决多个互斥锁死锁情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import threadingimport time rlock = threading.RLock()def foo (): rlock.acquire() print ('func foo acquire lock-1' ) rlock.acquire() print ('func foo acquire lock-2' ) rlock.release() print ('func foo release lock-2' ) rlock.release() print ('func foo release lock-1' )def bar (): rlock.acquire() print ('func bar acquire lock-1' ) time.sleep(1 ) rlock.acquire() print ('func bar acquire lock-2' ) rlock.release() print ('func bar release lock-2' ) rlock.release() print ('func bar release lock-1' )def run (): foo() bar()for i in range (5 ): t = threading.Thread(target=run, args=()) t.start()
信号量 信号量(Semaphore),用于多线程同步,限制线程的并发量。
mutex是semaphore的一种特殊情况(n=1时)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import threadingimport time sem = threading.Semaphore(5 ) def foo (): sem.acquire() time.sleep(2 ) print (threading.current_thread().name) sem.release()def bar (): with sem: time.sleep(2 ) print (threading.current_thread().name)for i in range (10 ): t = threading.Thread(target=foo, args=[]) t.start()