Python 线程池和进程池
Python3.2 之后,标准库里引入了concurrent.futures
模块,为异步调用提供了高级的接口。在此记录下我对其中的ThreadPoolExecutor
和ProcessPoolExecutor
类的学习和理解。
ThreadPoolExecutor
ThreadPoolExecutor
是Executor
类的子类。它有一个参数是max_workers
,指定了线程池中最多同时执行的线程数量。这个是最常用的参数,3.5 版本之后又增加了几个参数,但是不常用,可以去文档里查看。
1 个线程
开启 1 个线程访问 https://www.baidu.com/
from concurrent.futures import ThreadPoolExecutor
import requests
def get_headers(url):
return (url, requests.get(url).headers)
with ThreadPoolExecutor(1) as executor:
f = executor.submit(get_headers, 'https://www.baidu.com/')
print(f.result())
3 个线程
开启 3 个线程分别访问 https://www.baidu.com/、https://www.163.com/、https://www.qq.com/
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def get_headers(url):
return (url, requests.get(url).headers)
with ThreadPoolExecutor(3) as executor:
fs = [
executor.submit(get_headers, 'https://www.baidu.com/'),
executor.submit(get_headers, 'https://www.163.com/'),
executor.submit(get_headers, 'https://www.qq.com/'),
]
for f in as_completed(fs):
print(f.result())
也可以使用map
函数
from concurrent.futures import ThreadPoolExecutor
import requests
def get_headers(url):
return (url, requests.get(url).headers)
with ThreadPoolExecutor(3) as executor:
urls = [
'https://www.baidu.com/',
'https://www.163.com/',
'https://www.qq.com/',
]
for result in executor.map(get_headers, urls):
print(result)
需要注意的是,使用map
函数时,返回的结果的顺序和 Python 内置的 map 函数一样,是按照传递进去的顺序来的,并不是真正的它们完成的顺序。比如在上面的代码里,第一个返回的永远是 baidu 的请求。通过查看源码也印证了这一问题。
Executor.map
的源码
def map(self, fn, *iterables, timeout=None, chunksize=1):
if timeout is not None:
end_time = timeout + time.monotonic()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
try:
# reverse to keep finishing order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()
else:
yield fs.pop().result(end_time - time.monotonic())
finally:
for future in fs:
future.cancel()
return result_iterator()
先调用fs.reverse
转置列表,然后不断调用 pop 取最后一个的 result。
死锁
import time
from concurrent.futures import ThreadPoolExecutor
def a_work():
time.sleep(5)
print(b.result())
return 'a'
def b_work():
time.sleep(5)
print(a.result())
return 'b'
with ThreadPoolExecutor(2) as executor:
a = executor.submit(a_work)
b = executor.submit(b_work)
这个程序永远都不会运行结束,因为线程 a 在等待 b 的结果,b 在等待 a 的结果。其中的time.sleep(5)
非常关键,它可以保证这两个线程都开始运行了。
去掉它之后,可以发现程序居然可以运行结束了,但是却没有任何的输出。这其实就是第二个常见错误,只有调用在 future 对象上调用result()
函数之后,异常才可以被捕获。
异常
from concurrent.futures import ThreadPoolExecutor
def a_work():
print(after_b)
print(b.result())
return 'a'
def b_work():
print(a.result())
return 'b'
after_b = False
with ThreadPoolExecutor(2) as executor:
a = executor.submit(a_work)
b = executor.submit(b_work)
after_b = True
print(a.result(), b.result())
运行上面的代码,可以发现程序输出了一个False
,和一个异常:NameError: name 'b' is not defined
,这是因为 a 线程已经开始运行了,但是 b 还没有开始,即,submit 函数还没有返回,那么这个时候 after_b 的值仍然是 False,而且变量 b 还是未定义的。
我们之前的代码,之所以没有出现异常,就是因为没有调用 result 函数。下面的代码可以更直观的说明问题。
from concurrent.futures import ThreadPoolExecutor
def raise_exception():
raise ValueError('123')
with ThreadPoolExecutor(1) as executor:
a = executor.submit(raise_exception)
# a.result()
运行这段代码,不会有任何输出。把a.result()
这一行的注释取消,就可以看到异常:ValueError: 123
。
ProcessPoolExecutor
和ThreadPoolExecutor
的用法基本上类似,不同之处在于,只有可以被 pickle 库序列化的对象才可以被执行和返回。因为需要在不同的进程之间传递消息。这个类使用了 multiprocessing 库。
为什么要使用进程池和线程池?
在我的理解看来,线程和进程的创建是需要开销的,而之前创建的线程或进程在执行完了任务之后,可以先不销毁,继续用来执行以后的任务。还有一个优势就是方便对这些创建的进程和线程进行统一的管理。
参考资料
官方文档:https://docs.python.org/3/library/concurrent.futures.html