python concurrent.futures

python 因为其全局解释器锁 GIL 而无法通过线程实现真正的平行计算。这个论断我们不展开,但是有个概念我们要说明,IO 密集型 vs. 计算密集型。

IO 密集型:读取文件,读取网络套接字频繁。

计算密集型:大量消耗 CPU 的数学与逻辑运算,也就是我们这里说的平行计算。

而 concurrent.futures 模块,可以利用 multiprocessing 实现真正的平行计算。

核心原理是:concurrent.futures 会以子进程的形式,平行的运行多个 python 解释器,从而令 python 程序可以利用多核 CPU 来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个 CPU 内核。

 

 第一章 concurrent.futures 性能阐述

  • 最大公约数

这个函数是一个计算密集型的函数。

# -*- coding:utf-8 -*-
# 求最大公约数
def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

numbers = [
(
1963309, 2265973), (1879675, 2493670), (2030677, 3814172),
(
1551645, 2229620), (1988912, 4736670), (2198964, 7876293)
]

 

  • 不使用多线程 / 多进程
import time

start = time.time()
results
= list(map(gcd, numbers))
end
= time.time()
print 'Took %.3f seconds.' % (end - start)

Took 2.507 seconds.

消耗时间是:2.507。

 

  • 多线程 ThreadPoolExecutor
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor

start = time.time()
pool
= ThreadPoolExecutor(max_workers=2)
results
= list(pool.map(gcd, numbers))
end
= time.time()
print 'Took %.3f seconds.' % (end - start)

Took 2.840 seconds.

消耗时间是:2.840。

上面说过 gcd 是一个计算密集型函数,因为 GIL 的原因,多线程是无法提升效率的。同时,线程启动的时候,有一定的开销,与线程池进行通信,也会有开销,所以这个程序使用了多线程反而更慢了。

 

  • 多进程 ProcessPoolExecutor
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor

start = time.time()
pool
= ProcessPoolExecutor(max_workers=2)
results
= list(pool.map(gcd, numbers))
end
= time.time()
print 'Took %.3f seconds.' % (end - start)

Took 1.861 seconds.

消耗时间:1.861。

在两个 CPU 核心的机器上运行多进程程序,比其他两个版本都快。这是因为,ProcessPoolExecutor 类会利用 multiprocessing 模块所提供的底层机制,完成下列操作:

1)把 numbers 列表中的每一项输入数据都传给 map。

2)用 pickle 模块对数据进行序列化,将其变成二进制形式。

3)通过本地套接字,将序列化之后的数据从煮解释器所在的进程,发送到子解释器所在的进程。

4)在子进程中,用 pickle 对二进制数据进行反序列化,将其还原成 python 对象。

5)引入包含 gcd 函数的 python 模块。

6)各个子进程并行的对各自的输入数据进行计算。

7)对运行的结果进行序列化操作,将其转变成字节。

8)将这些字节通过 socket 复制到主进程之中。

9)主进程对这些字节执行反序列化操作,将其还原成 python 对象。

10)最后,把每个子进程所求出的计算结果合并到一份列表之中,并返回给调用者。

multiprocessing 开销比较大,原因就在于:主进程和子进程之间通信,必须进行序列化和反序列化的操作。

 

 

第二章 concurrent.futures 源码分析

  • Executor

可以任务 Executor 是一个抽象类,提供了如下抽象方法 submit,map(上面已经使用过),shutdown。值得一提的是 Executor 实现了 __enter__ 和 __exit__ 使得其对象可以使用 with 操作符。关于上下文管理和 with 操作符详细请参看这篇博客http://www.cnblogs.com/kangoroo/p/7627167.html

ThreadPoolExecutor 和 ProcessPoolExecutor 继承了 Executor,分别被用来创建线程池和进程池的代码。

class Executor(object):
    """This is an abstract base class for concrete asynchronous executors."""
<span style="color: rgba(0, 0, 255, 1)">def</span> submit(self, fn, *args, **<span style="color: rgba(0, 0, 0, 1)">kwargs):
    </span><span style="color: rgba(128, 0, 0, 1)">"""</span><span style="color: rgba(128, 0, 0, 1)">Submits a callable to be executed with the given arguments.

    Schedules the callable to be executed as fn(*args, **kwargs) and returns
    a Future instance representing the execution of the callable.

    Returns:
        A Future representing the given call.
    </span><span style="color: rgba(128, 0, 0, 1)">"""</span>
    <span style="color: rgba(0, 0, 255, 1)">raise</span><span style="color: rgba(0, 0, 0, 1)"> NotImplementedError()

</span><span style="color: rgba(0, 0, 255, 1)">def</span> map(self, fn, *iterables, **<span style="color: rgba(0, 0, 0, 1)">kwargs):
    </span><span style="color: rgba(128, 0, 0, 1)">"""</span><span style="color: rgba(128, 0, 0, 1)">Returns a iterator equivalent to map(fn, iter).

    Args:
        fn: A callable that will take as many arguments as there are
            passed iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.

    Returns:
        An iterator equivalent to: map(func, *iterables) but the calls may
        be evaluated out-of-order.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
        Exception: If fn(*args) raises for any values.
    </span><span style="color: rgba(128, 0, 0, 1)">"""</span><span style="color: rgba(0, 0, 0, 1)">
    timeout </span>= kwargs.get(<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">timeout</span><span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(0, 0, 0, 1)">)
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> timeout <span style="color: rgba(0, 0, 255, 1)">is</span> <span style="color: rgba(0, 0, 255, 1)">not</span><span style="color: rgba(0, 0, 0, 1)"> None:
        end_time </span>= timeout +<span style="color: rgba(0, 0, 0, 1)"> time.time()

    fs </span>= [self.submit(fn, *args) <span style="color: rgba(0, 0, 255, 1)">for</span> args <span style="color: rgba(0, 0, 255, 1)">in</span> itertools.izip(*<span style="color: rgba(0, 0, 0, 1)">iterables)]

    </span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)"> Yield must be hidden in closure so that the futures are submitted</span>
    <span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)"> before the first iterator value is required.</span>
    <span style="color: rgba(0, 0, 255, 1)">def</span><span style="color: rgba(0, 0, 0, 1)"> result_iterator():
        </span><span style="color: rgba(0, 0, 255, 1)">try</span><span style="color: rgba(0, 0, 0, 1)">:
            </span><span style="color: rgba(0, 0, 255, 1)">for</span> future <span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> fs:
                </span><span style="color: rgba(0, 0, 255, 1)">if</span> timeout <span style="color: rgba(0, 0, 255, 1)">is</span><span style="color: rgba(0, 0, 0, 1)"> None:
                    </span><span style="color: rgba(0, 0, 255, 1)">yield</span><span style="color: rgba(0, 0, 0, 1)"> future.result()
                </span><span style="color: rgba(0, 0, 255, 1)">else</span><span style="color: rgba(0, 0, 0, 1)">:
                    </span><span style="color: rgba(0, 0, 255, 1)">yield</span> future.result(end_time -<span style="color: rgba(0, 0, 0, 1)"> time.time())
        </span><span style="color: rgba(0, 0, 255, 1)">finally</span><span style="color: rgba(0, 0, 0, 1)">:
            </span><span style="color: rgba(0, 0, 255, 1)">for</span> future <span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> fs:
                future.cancel()
    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> result_iterator()

</span><span style="color: rgba(0, 0, 255, 1)">def</span> shutdown(self, wait=<span style="color: rgba(0, 0, 0, 1)">True):
    </span><span style="color: rgba(128, 0, 0, 1)">"""</span><span style="color: rgba(128, 0, 0, 1)">Clean-up the resources associated with the Executor.

    It is safe to call this method several times. Otherwise, no other
    methods can be called after this one.

    Args:
        wait: If True then shutdown will not return until all running
            futures have finished executing and the resources used by the
            executor have been reclaimed.
    </span><span style="color: rgba(128, 0, 0, 1)">"""</span>
    <span style="color: rgba(0, 0, 255, 1)">pass</span>

<span style="color: rgba(0, 0, 255, 1)">def</span> <span style="color: rgba(128, 0, 128, 1)">__enter__</span><span style="color: rgba(0, 0, 0, 1)">(self):
    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> self

</span><span style="color: rgba(0, 0, 255, 1)">def</span> <span style="color: rgba(128, 0, 128, 1)">__exit__</span><span style="color: rgba(0, 0, 0, 1)">(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait</span>=<span style="color: rgba(0, 0, 0, 1)">True)
    </span><span style="color: rgba(0, 0, 255, 1)">return</span> False</pre>

下面我们以线程 ProcessPoolExecutor 的方式说明其中的各个方法。

 

  • map
map(self, fn, *iterables, **kwargs)

map 方法的实例我们上面已经实现过,值得注意的是,返回的 results 列表是有序的,顺序和 *iterables 迭代器的顺序一致。

这里我们使用 with 操作符,使得当任务执行完成之后,自动执行 shutdown 函数,而无需编写相关释放代码。

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor

start = time.time()
with ProcessPoolExecutor(max_workers
=2) as pool:
results
= list(pool.map(gcd, numbers))
print 'results: %s' % results
end
= time.time()
print 'Took %.3f seconds.' % (end - start)

产出结果是:

results: [1, 5, 1, 5, 2, 3]
Took 1.617 seconds.

 

  • submit
submit(self, fn, *args, **kwargs)

submit 方法用于提交一个可并行的方法,submit 方法同时返回一个 future 实例。

future 对象标识这个线程 / 进程异步进行,并在未来的某个时间执行完成。future 实例表示线程 / 进程状态的回调。

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor

start = time.time()
futures
= list()
with ProcessPoolExecutor(max_workers
=2) as pool:
for pair in numbers:
future
= pool.submit(gcd, pair)
futures.append(future)
print 'results: %s' % [future.result() for future in futures]
end
= time.time()
print 'Took %.3f seconds.' % (end - start)

产出结果是:

results: [1, 5, 1, 5, 2, 3]
Took 2.289 seconds.

 

  • future

submit 函数返回 future 对象,future 提供了跟踪任务执行状态的方法。比如判断任务是否执行中 future.running(),判断任务是否执行完成 future.done() 等等。

as_completed 方法传入 futures 迭代器和 timeout 两个参数

默认 timeout=None,阻塞等待任务执行完成,并返回执行完成的 future 对象迭代器,迭代器是通过 yield 实现的。 

timeout>0,等待 timeout 时间,如果 timeout 时间到仍有任务未能完成,不再执行并抛出异常 TimeoutError

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completed

start = time.time()
with ProcessPoolExecutor(max_workers
=2) as pool:
futures
= [pool.submit(gcd, pair) for pair in numbers]
for future in futures:
print '执行中:%s, 已完成:%s' % (future.running(), future.done())
print '#### 分界线 ####'
for future in as_completed(futures, timeout=2):
print '执行中:%s, 已完成:%s' % (future.running(), future.done())
end
= time.time()
print 'Took %.3f seconds.' % (end - start)

 

  • wait

wait 方法接会返回一个 tuple(元组),tuple 中包含两个 set(集合),一个是 completed(已完成的) 另外一个是 uncompleted(未完成的)。

使用 wait 方法的一个优势就是获得更大的自由度,它接收三个参数 FIRST_COMPLETED, FIRST_EXCEPTION 和 ALL_COMPLETE,默认设置为 ALL_COMPLETED。

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, Executor, as_completed, wait, ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION

start = time.time()
with ProcessPoolExecutor(max_workers
=2) as pool:
futures
= [pool.submit(gcd, pair) for pair in numbers]
for future in futures:
print '执行中:%s, 已完成:%s' % (future.running(), future.done())
print '#### 分界线 ####'
done, unfinished
= wait(futures, timeout=2, return_when=ALL_COMPLETED)
for d in done:
print '执行中:%s, 已完成:%s' % (d.running(), d.done())
print d.result()
end
= time.time()
print 'Took %.3f seconds.' % (end - start)

由于设置了 ALL_COMPLETED,所以 wait 等待所有的 task 执行完成,可以看到 6 个任务都执行完成了。

执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:False, 已完成:False
执行中:False, 已完成:False
#### 分界线 ####
执行中:False, 已完成:True
执行中:False, 已完成:True
执行中:False, 已完成:True
执行中:False, 已完成:True
执行中:False, 已完成:True
执行中:False, 已完成:True
Took 1.518 seconds.

 

如果我们将配置改为 FIRST_COMPLETED,wait 会等待直到第一个任务执行完成,返回当时所有执行成功的任务。这里并没有做并发控制。

重跑,结构如下,可以看到执行了 2 个任务。

执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:True, 已完成:False
执行中:False, 已完成:False
执行中:False, 已完成:False
#### 分界线 ####
执行中:False, 已完成:True
执行中:False, 已完成:True
Took 1.517 seconds.