logo
首页
标签
关于

Python 线程池和进程池

2019-04-11
python

Python3.2 之后,标准库里引入了concurrent.futures模块,为异步调用提供了高级的接口。在此记录下我对其中的ThreadPoolExecutor和ProcessPoolExecutor类的学习和理解。

ThreadPoolExecutor

ThreadPoolExecutor是Executor类的子类。它有一个参数是max_workers,指定了线程池中最多同时执行的线程数量。这个是最常用的参数,3.5 版本之后又增加了几个参数,但是不常用,可以去文档里查看。

1 个线程

开启 1 个线程访问 https://www.baidu.com/

from concurrent.futures import ThreadPoolExecutor

import requests

def get_headers(url):
    return (url, requests.get(url).headers)

with ThreadPoolExecutor(1) as executor:
    f = executor.submit(get_headers, 'https://www.baidu.com/')
    print(f.result())

3 个线程

开启 3 个线程分别访问 https://www.baidu.com/、https://www.163.com/、https://www.qq.com/

from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

def get_headers(url):
    return (url, requests.get(url).headers)

with ThreadPoolExecutor(3) as executor:
    fs = [
        executor.submit(get_headers, 'https://www.baidu.com/'),
        executor.submit(get_headers, 'https://www.163.com/'),
        executor.submit(get_headers, 'https://www.qq.com/'),
    ]
    for f in as_completed(fs):
        print(f.result())

也可以使用map函数

from concurrent.futures import ThreadPoolExecutor

import requests

def get_headers(url):
    return (url, requests.get(url).headers)

with ThreadPoolExecutor(3) as executor:
    urls = [
        'https://www.baidu.com/',
        'https://www.163.com/',
        'https://www.qq.com/',
    ]
    for result in executor.map(get_headers, urls):
        print(result)

需要注意的是,使用map函数时,返回的结果的顺序和 Python 内置的 map 函数一样,是按照传递进去的顺序来的,并不是真正的它们完成的顺序。比如在上面的代码里,第一个返回的永远是 baidu 的请求。通过查看源码也印证了这一问题。

Executor.map的源码

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()

        return result_iterator()

先调用fs.reverse转置列表,然后不断调用 pop 取最后一个的 result。

死锁

import time
from concurrent.futures import ThreadPoolExecutor

def a_work():
    time.sleep(5)
    print(b.result())
    return 'a'

def b_work():
    time.sleep(5)
    print(a.result())
    return 'b'

with ThreadPoolExecutor(2) as executor:
    a = executor.submit(a_work)
    b = executor.submit(b_work)

这个程序永远都不会运行结束,因为线程 a 在等待 b 的结果,b 在等待 a 的结果。其中的time.sleep(5)非常关键,它可以保证这两个线程都开始运行了。

去掉它之后,可以发现程序居然可以运行结束了,但是却没有任何的输出。这其实就是第二个常见错误,只有调用在 future 对象上调用result()函数之后,异常才可以被捕获。

异常

from concurrent.futures import ThreadPoolExecutor

def a_work():
    print(after_b)
    print(b.result())
    return 'a'

def b_work():
    print(a.result())
    return 'b'

after_b = False
with ThreadPoolExecutor(2) as executor:
    a = executor.submit(a_work)
    b = executor.submit(b_work)
    after_b = True
    print(a.result(), b.result())

运行上面的代码,可以发现程序输出了一个False,和一个异常:NameError: name 'b' is not defined,这是因为 a 线程已经开始运行了,但是 b 还没有开始,即,submit 函数还没有返回,那么这个时候 after_b 的值仍然是 False,而且变量 b 还是未定义的。

我们之前的代码,之所以没有出现异常,就是因为没有调用 result 函数。下面的代码可以更直观的说明问题。

from concurrent.futures import ThreadPoolExecutor

def raise_exception():
    raise ValueError('123')

with ThreadPoolExecutor(1) as executor:
    a = executor.submit(raise_exception)
    # a.result()

运行这段代码,不会有任何输出。把a.result()这一行的注释取消,就可以看到异常:ValueError: 123。

ProcessPoolExecutor

和ThreadPoolExecutor的用法基本上类似,不同之处在于,只有可以被 pickle 库序列化的对象才可以被执行和返回。因为需要在不同的进程之间传递消息。这个类使用了 multiprocessing 库。

为什么要使用进程池和线程池?

在我的理解看来,线程和进程的创建是需要开销的,而之前创建的线程或进程在执行完了任务之后,可以先不销毁,继续用来执行以后的任务。还有一个优势就是方便对这些创建的进程和线程进行统一的管理。

参考资料

官方文档:https://docs.python.org/3/library/concurrent.futures.html