day22 并发编程(上)
网络编程,了解网络相关的知识点并且要知道几乎所有网络的通信本质上都是通过socket模块实现。例如:网站、网络爬虫。
并发编程,提升代码执行的效率。原来代码执行需要20分钟,学习并发编程后可以加快到1分钟执行完毕。
今日课程目标:初步了解进程和线程并可以基于线程实现并发编程。
今日概要:
- 初识进程和线程
- 多线程开发
- 线程安全
- 线程锁
- 死锁
- 线程池
1. 进程和线程
先来了解下进程和线程。
类比:
一个工厂,至少有一个车间,一个车间中至少有一个工人,最终是工人在工作。
一个程序,至少有一个进程,一个进程中至少有一个线程,最终是线程在工作。
1
上述串行的代码示例就是一个程序,在使用python xx.py 运行时,内部就创建一个进程(主进程),在进程中创建了一个线程(主线程),由线程逐行运行代码。
进程和线程:
1 | 线程,是计算机中可以被cpu调度的最小单元(真正在工作)。 |
以前我们开发的程序中所有的行为都只能通过串行的形式运行,排队逐一执行,前面未完成,后面也无法继续。例如:
1 | import time |
1 | import time |
通过 进程 和 线程 都可以将 串行 的程序变为并发,对于上述示例来说就是同时下载三个视频,这样很短的时间内就可以下载完成。
1.1 多线程
基于多线程对上述串行示例进行优化:
- 一个工厂,创建一个车间,这个车间中创建 3个工人,并行处理任务。
- 一个程序,创建一个进程,这个进程中创建 3个线程,并行处理任务。
1 | import time |
1 | import time |
1.2 多进程
基于多进程对上述串行示例进行优化:
- 一个工厂,创建 三个车间,每个车间 一个工人(共3人),并行处理任务。
- 一个程序,创建 三个进程,每个进程 一个线程(共3人),并行处理任务。
1 | import time |
综上所述,大家会发现 多进程 的开销比 多线程 的开销大。哪是不是使用多线程要比多进程更好呀?
接下来,给大家再来介绍一个Python内置的GIL锁的知识,然后再根据 进程 和 线程 各自的特点总结各自适合应用场景。
1.3 GIL锁
GIL, 全局解释器锁(Global Interpreter Lock),是CPython解释器特有一个玩意,让一个进程中同一个时刻只能有一个线程可以被CPU调用。

如果程序想利用 计算机的多核优势,让CPU同时处理一些任务,适合用多进程开发(即使资源开销大)。

如果程序不利用 计算机的多核优势,适合用多线程开发。

常见的程序开发中,计算操作需要使用CPU多核优势,IO操作不需要利用CPU的多核优势,所以,就有这一句话:
- 计算密集型,用多进程,例如:大量的数据计算【累加计算示例】。
- IO密集型,用多线程,例如:文件读写、网络数据传输【下载抖音视频示例】。
累加计算示例(计算密集型):
串行处理
1
2
3
4
5
6
7
8
9
10
11
12import time
start = time.time()
result = 0
for i in range(100000000):
result += i
print(result)
end = time.time()
print("耗时:", end - start) # 耗时: 9.522780179977417多进程处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29import time
import multiprocessing
def task(start, end, queue):
result = 0
for i in range(start, end):
result += i
queue.put(result)
if __name__ == '__main__':
queue = multiprocessing.Queue()
start_time = time.time()
p1 = multiprocessing.Process(target=task, args=(0, 50000000, queue))
p1.start()
p2 = multiprocessing.Process(target=task, args=(50000000, 100000000, queue))
p2.start()
v1 = queue.get(block=True)
v2 = queue.get(block=True)
print(v1 + v2)
end_time = time.time()
print("耗时:", end_time - start_time) # 耗时: 2.6232550144195557
当然,在程序开发中 多线程 和 多进程 是可以结合使用,例如:创建2个进程(建议与CPU个数相同),每个进程中创建3个线程。
1 | import multiprocessing |
2. 多线程开发
1 | import threading |
线程的常见方法:
t.start(),当前线程准备就绪(等待CPU调度,具体时间是由CPU来决定)。1
2
3
4
5
6
7
8
9
10
11
12
13
14import threading
loop = 10000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
t = threading.Thread(target=_add,args=(loop,))
t.start()
print(number)t.join(),等待当前线程的任务执行完毕后再向下继续执行。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15import threading
number = 0
def _add():
global number
for i in range(10000000):
number += 1
t = threading.Thread(target=_add)
t.start()
t.join() # 主线程等待中...
print(number)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import threading
number = 0
def _add():
global number
for i in range(10000000):
number += 1
def _sub():
global number
for i in range(10000000):
number -= 1
t1 = threading.Thread(target=_add)
t2 = threading.Thread(target=_sub)
t1.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.start()
t2.join() # t2线程执行完毕,才继续往后走
print(number)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import threading
loop = 10000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
def _sub(count):
global number
for i in range(count):
number -= 1
t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走
print(number)t.setDaemon(布尔值),守护线程(必须放在start之前)t.setDaemon(True),设置为守护线程,主线程执行完毕后,子线程也自动关闭。t.setDaemon(False),设置为非守护线程,主线程等待子线程,子线程执行完毕后,主线程才结束。(默认)
1
2
3
4
5
6
7
8
9
10
11
12import threading
import time
def task(arg):
time.sleep(5)
print('任务')
t = threading.Thread(target=task, args=(11,))
t.setDaemon(True) # True/False
t.start()
print('END')线程名称的设置和获取
1
2
3
4
5
6
7
8
9
10
11
12
13import threading
def task(arg):
# 获取当前执行此代码的线程
name = threading.current_thread().getName()
print(name)
for i in range(10):
t = threading.Thread(target=task, args=(11,))
t.setName('日魔-{}'.format(i))
t.start()自定义线程类,直接将线程需要做的事写到run方法中。
1
2
3
4
5
6
7
8
9
10import threading
class MyThread(threading.Thread):
def run(self):
print('执行此线程', self._args)
t = MyThread(args=(100,))
t.start()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import requests
import threading
class DouYinThread(threading.Thread):
def run(self):
file_name, video_url = self._args
res = requests.get(video_url)
with open(file_name, mode='wb') as f:
f.write(res.content)
url_list = [
("东北F4模仿秀.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0300f570000bvbmace0gvch7lo53oog"),
("卡特扣篮.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f3e0000bv52fpn5t6p007e34q1g"),
("罗斯mvp.mp4", "https://aweme.snssdk.com/aweme/v1/playwm/?video_id=v0200f240000buuer5aa4tij4gv6ajqg")
]
for item in url_list:
t = DouYinThread(args=(item[0], item[1]))
t.start()
3. 线程安全
一个进程中可以有多个线程,且线程共享所有进程中的资源。
多个线程同时去操作一个”东西”,可能会存在数据混乱的情况,例如:
示例1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import threading
loop = 10000000
number = 0
def _add(count):
global number
for i in range(count):
number += 1
def _sub(count):
global number
for i in range(count):
number -= 1
t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走
print(number)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34import threading
lock_object = threading.RLock()
loop = 10000000
number = 0
def _add(count):
lock_object.acquire() # 加锁
global number
for i in range(count):
number += 1
lock_object.release() # 释放锁
def _sub(count):
lock_object.acquire() # 申请锁(等待)
global number
for i in range(count):
number -= 1
lock_object.release() # 释放锁
t1 = threading.Thread(target=_add, args=(loop,))
t2 = threading.Thread(target=_sub, args=(loop,))
t1.start()
t2.start()
t1.join() # t1线程执行完毕,才继续往后走
t2.join() # t2线程执行完毕,才继续往后走
print(number)示例2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14import threading
num = 0
def task():
global num
for i in range(1000000):
num += 1
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import threading
num = 0
lock_object = threading.RLock()
def task():
print("开始")
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
global num
for i in range(1000000):
num += 1
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18import threading
num = 0
lock_object = threading.RLock()
def task():
print("开始")
with lock_object: # 基于上下文管理,内部自动执行 acquire 和 release
global num
for i in range(1000000):
num += 1
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
在开发的过程中要注意有些操作默认都是 线程安全的(内部集成了锁的机制),我们在使用的时无需再通过锁再处理,例如:
1 | import threading |

所以,要多注意看一些开发文档中是否标明线程安全。
4. 线程锁
在程序中如果想要自己手动加锁,一般有两种:Lock 和 RLock。
Lock,同步锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21import threading
num = 0
lock_object = threading.Lock()
def task():
print("开始")
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
global num
for i in range(1000000):
num += 1
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()RLock,递归锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import threading
num = 0
lock_object = threading.RLock()
def task():
print("开始")
lock_object.acquire() # 第1个抵达的线程进入并上锁,其他线程就需要再此等待。
global num
for i in range(1000000):
num += 1
lock_object.release() # 线程出去,并解开锁,其他线程就可以进入并执行了
print(num)
for i in range(2):
t = threading.Thread(target=task)
t.start()
RLock支持多次申请锁和多次释放;Lock不支持。例如:
1 | import threading |
1 | import threading |
5.死锁
死锁,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象。
1 | import threading |
1 | import threading |
6.线程池
Python3中官方才正式提供线程池。
线程不是开的越多越好,开的多了可能会导致系统的性能更低了,例如:如下的代码是不推荐在项目开发中编写。
不建议:无限制的创建线程。
1 | import threading |
建议:使用线程池
示例1:
1 | import time |
示例2:等待线程池的任务执行完毕。
1 | import time |
示例3:任务执行完任务,再干点其他事。
1 | import time |
示例4:最终统一获取结果。
1 | import time |
案例:基于线程池下载豆瓣图片。
1 | 26044585,Hush,https://hbimg.huabanimg.com/51d46dc32abe7ac7f83b94c67bb88cacc46869954f478-aP4Q3V |
1 | import os |
1 | import os |
7.单例模式(扩展)
面向对象 + 多线程相关的一个面试题(以后项目和源码中会用到)。
之前写一个类,每次执行 类() 都会实例化一个类的对象。
1 | class Foo: |
以后开发会遇到单例模式,每次实例化类的对象时,都是最开始创建的那个对象,不再重复创建对象。
简单的实现单例模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17class Singleton:
instance = None
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
# 返回空对象
if cls.instance:
return cls.instance
cls.instance = object.__new__(cls)
return cls.instance
obj1 = Singleton('alex')
obj2 = Singleton('SB')
print(obj1,obj2)多线程执行单例模式,有BUG
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27import threading
import time
class Singleton:
instance = None
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
if cls.instance:
return cls.instance
time.sleep(0.1)
cls.instance = object.__new__(cls)
return cls.instance
def task():
obj = Singleton('x')
print(obj)
for i in range(10):
t = threading.Thread(target=task)
t.start()加锁解决BUG
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import threading
import time
class Singleton:
instance = None
lock = threading.RLock()
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
with cls.lock:
if cls.instance:
return cls.instance
time.sleep(0.1)
cls.instance = object.__new__(cls)
return cls.instance
def task():
obj = Singleton('x')
print(obj)
for i in range(10):
t = threading.Thread(target=task)
t.start()加判断,提升性能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32import threading
import time
class Singleton:
instance = None
lock = threading.RLock()
def __init__(self, name):
self.name = name
def __new__(cls, *args, **kwargs):
if cls.instance:
return cls.instance
with cls.lock:
if cls.instance:
return cls.instance
time.sleep(0.1)
cls.instance = object.__new__(cls)
return cls.instance
def task():
obj = Singleton('x')
print(obj)
for i in range(10):
t = threading.Thread(target=task)
t.start()
# 执行1000行代码
data = Singleton('asdfasdf')
print(data)
总结
- 进程和线程的区别和应用场景。
- 什么是GIL锁
- 多线程和线程池的使用。
- 线程安全 & 线程锁 & 死锁
- 单例模式
作业
简述进程和线程的区别以及应用场景。
什么是GIL锁
手写单例模式
程序从flag a执行到falg b的时间大致是多少秒?
1
2
3
4
5
6
7
8
9import threading
import time
def _wait():
time.sleep(60)
# flag a
t = threading.Thread(target=_wait)
t.setDaemon(False)
t.start()
# flag b程序从flag a执行到falg b的时间大致是多少秒?
1
2
3
4
5
6
7
8
9import threading
import time
def _wait():
time.sleep(60)
# flag a
t = threading.Thread(target=_wait)
t.setDaemon(True)
t.start()
# flag b程序从flag a执行到falg b的时间大致是多少秒?
1
2
3
4
5
6
7
8
9import threading
import time
def _wait():
time.sleep(60)
# flag a
t = threading.Thread(target=_wait)
t.start()
t.join()
# flag b读程序,请确认执行到最后number是否一定为0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17import threading
loop = int(1E7)
def _add(loop = 1):
global number
for _ in range(loop):
number += 1
def _sub(loop = 1):
global number
for _ in range(loop):
number -= 1
number = 0
ta = threading.Thread(target=_add,args=(loop,))
ts = threading.Thread(target=_sub,args=(loop,))
ta.start()
ta.join()
ts.start()
ts.join()读程序,请确认执行到最后number是否一定为0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19import threading
loop = int(1E7)
number = 0
def _add(loop = 1):
global number
for _ in range(loop):
number += 1
def _sub(loop = 1):
global number
for _ in range(loop):
number -= 1
ta = threading.Thread(target=_add,args=(loop,))
ts = threading.Thread(target=_sub,args=(loop,))
ta.start()
ts.start()
ta.join()
ts.join()data.txt 文件中共有 10000 条数据,请为每 100行 数据创建一个线程,并在线程中把当前100条数据的num列相加并打印。
1
2
3
4
5
6
7subscription_id,erotic,num
ASDFOKASDJF423KASDFJASDF,5,1
FSD23R23SFSDF4DFGDFGDFGDF,5,99
ASDDSFGWERTCERT44GFGDSFG,5,2
FFFFFFSDSVFG5RTFGDDFFFFA,5,11
ASDFASDF3234XCVWEGDFGSAF,5,1
...