博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
理解Python并发编程-PoolExecutor篇
阅读量:7282 次
发布时间:2019-06-30

本文共 4550 字,大约阅读时间需要 15 分钟。

摘要: 之前我们使用多线程(threading)和多进程(multiprocessing)完成常规的需求,在启动的时候start、jon等步骤不能省,复杂的需要还要用1-2个队列。随着需求越来越复杂,如果没有良好的设计和抽象这部分的功能层次,代码量越多调试的难度就越大。

之前我们使用多线程(threading)和多进程(multiprocessing)完成常规的需求,在启动的时候start、jon等步骤不能省,复杂的需要还要用1-2个队列。随着需求越来越复杂,如果没有良好的设计和抽象这部分的功能层次,代码量越多调试的难度就越大。有没有什么好的方法把这些步骤抽象一下呢,让我们不关注这些细节,轻装上阵呢?

答案是:有的。

从Python3.2开始一个叫做concurrent.futures被纳入了标准库,而在Python2它属于第三方的futures库,需要手动安装:

pip install futures```                                             这个模块中有2个类:ThreadPoolExecutor和ProcessPoolExecutor,也就是对threading和multiprocessing的进行了高级别的抽象,暴露出统一的接口,帮助开发者非常方便的实现异步调用:```pythonimport timefrom concurrent.futures import ProcessPoolExecutor, as_completedNUMBERS = range(25, 38) def fib(n): if n<= 2: return 1 return fib(n-1) + fib(n-2) start = time.time() with ProcessPoolExecutor(max_workers=3) as executor: for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result) print 'COST: {}'.format(time.time() - start)

感受下是不是很轻便呢?看一下花费的时间:

python fib_executor.pyfib(25) = 75025 fib(26) = 121393 fib(27) = 196418 fib(28) = 317811 fib(29) = 514229 fib(30) = 832040 fib(31) = 1346269 fib(32) = 2178309 fib(33) = 3524578 fib(34) = 5702887 fib(35) = 9227465 fib(36) = 14930352 fib(37) = 24157817 COST: 10.8920350075

除了用map,另外一个常用的方法是submit。如果你要提交的任务的函数是一样的,就可以简化成map。但是假如提交的任务函数是不一样的,或者执行的过程之可能出现异常(使用map执行过程中发现问题会直接抛出错误)就要用到submit:

from concurrent.futures import ThreadPoolExecutor, as_completedNUMBERS = range(30, 35) def fib(n): if n == 34: raise Exception("Don't do this") if n<= 2: return 1 return fib(n-1) + fib(n-2) with ThreadPoolExecutor(max_workers=3) as executor: future_to_num = {executor.submit(fib, num): num for num in NUMBERS} for future in as_completed(future_to_num): num = future_to_num[future] try: result = future.result() except Exception as e: print 'raise an exception: {}'.format(e) else: print 'fib({}) = {}'.format(num, result) with ThreadPoolExecutor(max_workers=3) as executor: for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result)

执一下:

python fib_executor_with_raise.pyfib(30) = 832040 fib(31) = 1346269 raise an exception: Don't do this fib(32) = 2178309 fib(33) = 3524578 Traceback (most recent call last): File "fib_executor_with_raise.py", line 28, in 
for num, result in zip(NUMBERS, executor.map(fib, NUMBERS)): File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 580, in map yield future.result() File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 400, in result return self.__get_result() File "/Library/Python/2.7/site-packages/concurrent/futures/_base.py", line 359, in __get_result reraise(self._exception, self._traceback) File "/Library/Python/2.7/site-packages/concurrent/futures/_compat.py", line 107, in reraise exec('raise exc_type, exc_value, traceback', { }, locals_) File "/Library/Python/2.7/site-packages/concurrent/futures/thread.py", line 61, in run result = self.fn(*self.args, **self.kwargs) File "fib_executor_with_raise.py", line 9, in fib raise Exception("Don't do this") Exception: Don't do this

可以看到,第一次捕捉到了异常,但是第二次执行的时候错误直接抛出来了。

上面说到的map,有些同学马上会说,这不是进程(线程)池的效果吗?看起来确实是的:

import timefrom multiprocessing.pool import PoolNUMBERS = range(25, 38) def fib(n): if n<= 2: return 1 return fib(n-1) + fib(n-2) start = time.time() pool = Pool(3) results = pool.map(fib, NUMBERS) for num, result in zip(NUMBERS, pool.map(fib, NUMBERS)): print 'fib({}) = {}'.format(num, result) print 'COST: {}'.format(time.time() - start)

好像代码量更小哟。好吧,看一下花费的时间:

 
 
python fib_pool.pyfib(25) = 75025fib(26) = 121393fib(27) = 196418fib(28) = 317811fib(29) = 514229fib(30) = 832040fib(31) = 1346269fib(32) = 2178309fib(33) = 3524578fib(34) = 5702887fib(35) = 9227465fib(36) = 14930352fib(37) = 24157817COST: 17.1342718601
 
 

WhatTF竟然花费了1.7倍的时间。为什么?

BTW,有兴趣的同学可以对比下ThreadPool和ThreadPoolExecutor,由于GIL的缘故,对比的差距一定会更多。

原理

我们就拿ProcessPoolExecutor介绍下它的原理,引用官方代码注释中的流程图:

|======================= In-process =====================|== Out-of-process ==| +----------+ +----------+ +--------+ +-----------+ +---------+ | | => | Work Ids | => | | => | Call Q | => | | | | +----------+ | | +-----------+ | | | | | ... | | | | ... | | | | | | 6 | | | | 5, call() | | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | | Executor | | Thread | | | | | +----------- + | | +-----------+ | | | | <=> | Work Items | <=> | | <= | Result Q | <= | | | | +------------+ | | +

转载于:https://www.cnblogs.com/jzy996492849/p/7054079.html

你可能感兴趣的文章
【GitHub】README.md文件中 markdown语法 插入超链接
查看>>
移动着,心就变了
查看>>
2014冬去春来
查看>>
Python全栈--6.1-match-search-findall-group(s)的区别以及计算器实例
查看>>
基本概念
查看>>
《Linux内核设计与实现》读书笔记(10)--- 定时器和时间管理(2)
查看>>
Spark On YARN内存分配
查看>>
Python学习笔记【第十三篇】:Python网络编程一Socket基础
查看>>
Hibernate ORM框架——项目一:Hibernate查询;项目二:集合相关查询
查看>>
Ionic2开发环境搭建
查看>>
ccf 最优灌溉
查看>>
(30)批处理文件.bat
查看>>
基于MFC和opencv的FFT
查看>>
0823模拟赛
查看>>
Ajax
查看>>
HDU 1849 Rabbit and Grass 【Nim博弈】
查看>>
JMeter-Java压力测试工具-01
查看>>
搜狐在线笔试 时间复杂度O(n)实现数组A[n]中所有元素循环左移k个位置
查看>>
写python时加入缩进设置
查看>>
ubuntu下安装opencv 2.4.9 脚本,支持摄像头和cuda
查看>>