python进程池:multiprocessing.pool
在利用 Python 进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用 multiprocessing 中的 Process 动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool 可以提供指定数量的进程供用户调用,当有新的请求提交到 pool 中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
例 1:使用进程池
#coding: utf-8 import multiprocessing import timedef func(msg):
print "msg:", msg
time.sleep(3)
print "end"if name == "main":
pool = multiprocessing.Pool(processes = 3)
for i in xrange(4):
msg = "hello %d" %(i)
pool.apply_async(func, (msg,)) #维持执行的进程总数为 processes,当一个进程执行完毕后会添加新的进程进去<span style="color: rgba(0, 0, 255, 1)">print</span> <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> pool.close() pool.join() </span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束</span> <span style="color: rgba(0, 0, 255, 1)">print</span> <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Sub-process(es) done.</span><span style="color: rgba(128, 0, 0, 1)">"</span></pre>
一次执行结果
1 2 3 4 5 6 7 8 9 10 | mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0 msg: hello 1 msg: hello 2 end msg: hello 3 end end end Sub-process(es) done. |
函数解释:
- apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]]) 是阻塞的(理解区别,看例 1 例 2 结果区别)
- close() 关闭 pool,使其不在接受新的任务。
- terminate() 结束工作进程,不在处理未完成的任务。
- join() 主进程阻塞,等待子进程的退出, join 方法要在 close 或 terminate 之后使用。
执行说明:创建一个进程池 pool,并设定进程的数量为 3,xrange(4) 会相继产生四个对象 [0, 1, 2, 4],四个对象被提交到 pool 中,因 pool 指定进程数为 3,所以 0、1、2 会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象 3,所以会出现输出“msg: hello 3”出现在 "end" 后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完 for 循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在 pool.join()处等待各个进程的结束。
例 2:使用进程池(阻塞)
#coding: utf-8 import multiprocessing import timedef func(msg):
print "msg:", msg
time.sleep(3)
print "end"if name == "main":
pool = multiprocessing.Pool(processes = 3)
for i in xrange(4):
msg = "hello %d" %(i)
pool.apply(func, (msg,)) #维持执行的进程总数为 processes,当一个进程执行完毕后会添加新的进程进去<span style="color: rgba(0, 0, 255, 1)">print</span> <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> pool.close() pool.join() </span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束</span> <span style="color: rgba(0, 0, 255, 1)">print</span> <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Sub-process(es) done.</span><span style="color: rgba(128, 0, 0, 1)">"</span></pre>
一次执行的结果
1 2 3 4 5 6 7 8 9 10 | msg: hello 0 end msg: hello 1 end msg: hello 2 end msg: hello 3 end Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ Sub-process(es) done. |
例 3:使用进程池,并关注结果
import multiprocessing import timedef func(msg):
print "msg:", msg
time.sleep(3)
print "end"
return "done" + msgif name == "main":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(3):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg,)))
pool.close()
pool.join()
for res in result:
print ":::", res.get()
print "Sub-process(es) done."
一次执行结果
1 2 3 4 5 6 7 8 9 10 | msg: hello 0 msg: hello 1 msg: hello 2 end end end ::: donehello 0 ::: donehello 1 ::: donehello 2 Sub-process(es) done. |
注:get() 函数得出每个返回结果的值
例 4:使用多个进程池
#coding: utf-8 import multiprocessing import os, time, randomdef Lee():
print "\nRun task Lee-%s" %(os.getpid()) #os.getpid() 获取当前的进程的 ID
start = time.time()
time.sleep(random.random() * 10) #random.random() 随机生成 0-1 之间的小数
end = time.time()
print 'Task Lee, runs %0.2f seconds.' %(end - start)def Marlon():
print "\nRun task Marlon-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 40)
end=time.time()
print 'Task Marlon runs %0.2f seconds.' %(end - start)def Allen():
print "\nRun task Allen-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 30)
end = time.time()
print 'Task Allen runs %0.2f seconds.' %(end - start)def Frank():
print "\nRun task Frank-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 20)
end = time.time()
print 'Task Frank runs %0.2f seconds.' %(end - start)if name=='main':
function_list= [Lee, Marlon, Allen, Frank]
print "parent process %s" %(os.getpid())pool</span>=multiprocessing.Pool(4<span style="color: rgba(0, 0, 0, 1)">) </span><span style="color: rgba(0, 0, 255, 1)">for</span> func <span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> function_list: pool.apply_async(func) </span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中</span> <span style="color: rgba(0, 0, 255, 1)">print</span> <span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">Waiting for all subprocesses done...</span><span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(0, 0, 0, 1)"> pool.close() pool.join() </span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束</span> <span style="color: rgba(0, 0, 255, 1)">print</span> <span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">All subprocesses done.</span><span style="color: rgba(128, 0, 0, 1)">'</span></pre>
一次执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | parent process 7704 Waiting for all subprocesses done... Run task Lee -6948 Run task Marlon -2896 Run task Allen -7304 Run task Frank -3052 Task Lee, runs 1.59 seconds. Task Marlon runs 8.48 seconds. Task Frank runs 15.68 seconds. Task Allen runs 18.08 seconds. All subprocesses done. |
#coding: utf-8 import multiprocessingdef m1(x):
print x * xif name == 'main':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
i_list = range(8)
pool.map(m1, i_list)
一次执行结果
1 2 3 4 5 6 7 8 | 0 1 4 9 16 25 36 49 |
参考:http://www.dotblogs.com.tw/rickyteng/archive/2012/02/20/69635.aspx
问题:http://bbs.chinaunix.net/thread-4111379-1-1.html
#coding: utf-8 import multiprocessing import loggingdef create_logger(i):
print iclass CreateLogger(object):
def init(self, func):
self.func = funcif name == 'main':
ilist = range(10)cl </span>=<span style="color: rgba(0, 0, 0, 1)"> CreateLogger(create_logger) pool </span>=<span style="color: rgba(0, 0, 0, 1)"> multiprocessing.Pool(multiprocessing.cpu_count()) pool.map(cl.func, ilist) </span><span style="color: rgba(0, 0, 255, 1)">print</span> <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">hello------------></span><span style="color: rgba(128, 0, 0, 1)">"</span></pre>
一次执行结果
1 2 3 4 5 6 7 8 9 10 11 | 0 1 2 3 4 5 6 7 8 9 hello------------> |