python并发编程之多进程(实践篇)

一 multiprocessing 模块介绍

python 中的多线程无法利用多核优势,如果想要充分地使用多核 CPU 的资源,在 python 中大部分情况需要使用多进程。Python 提供了 multiprocessing。

multiprocessing 模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),multiprocessing 模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了 Process、Queue、Pipe、Lock 等组件。

与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内 

 

二 Process 类的介绍与使用

 

  创建进程的类

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)

强调:

  1. 需要使用关键字的方式来指定参数
  2. args 指定的为传给 target 函数的位置参数,是一个元组形式,必须有逗号

    参数介绍:

复制代码
1 group 参数未使用,值始终为 None
2 target 表示调用对象,即子进程要执行的任务
3 args 表示调用对象的位置参数元组,args=(1,2,'anne',)
4 kwargs 表示调用对象的字典,kwargs={'name':'anne','age':18}
5 name 为子进程的名称
复制代码

创建并开启进程的两种方法

#方法一 直接调用
import
time import random from multiprocessing import Process def run(name): print('%s runing' %name) time.sleep(random.randrange(1,5)) print('%s running end' %name)

p1=Process(target=run,args=('anne',)) #必须加, 号
p2=Process(target=run,args=('alice',))
p3
=Process(target=run,args=('biantai',))
p4
=Process(target=run,args=('haha',))

p1.start()
p2.start()
p3.start()
p4.start()
print('主线程')

 

#方法二 继承式调用
import time
import random
from multiprocessing import Process

class Run(Process):
def init(self,name):
super().
init()
self.name
=name
def run(self):
print('%s runing' %self.name)
time.sleep(random.randrange(
1,5))
print('%s runing end' %self.name)

p1=Run('anne')
p2
=Run('alex')
p3
=Run('ab')
p4
=Run('hey')
p1.start()
#start 会自动调用 run
p2.start()
p3.start()
p4.start()
print('主线程')

Process 对象的 join 方法

import time
import random
from multiprocessing import Process

class Run(Process):
def init(self,name):
super().
init()
self.name
=name
def run(self):
print('%s runing' %self.name)
time.sleep(random.randrange(
1,5))
print('%s runing end' %self.name)

p1=Run('anne')
p2
=Run('alex')
p3
=Run('ab')
p4
=Run('hey')
p1.start()
#start 会自动调用 run
p2.start()
p3.start()
p4.start()
p1.join()
#等待 p1 进程停止
p2.join()
p3.join()
p4.join()
print('主线程')

#注意上面的代码是主进程等待子进程,等待的是主进程,所以等待的总时间是子进程中耗费时间最长的那个进程运行的时间

#上述启动进程与 join 进程可以简写为
#
p_l=[p1,p2,p3,p4]
#

#
for p in p_l:
#
p.start()
#

#
for p in p_l:
#
p.join()

主进程等,等待子进程结束

三 守护进程

主进程创建守护进程

1)守护进程会在主进程代码执行结束后就终止

2)守护进程内无法再开启子进程, 否则抛出异常:AssertionError: daemonic processes are not allowed to have children

注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

from multiprocessing import Process
import time
import random

class Run(Process):
def init(self,name):
self.name
=name
super().
init()
def run(self):
print('%s is piaoing' %self.name)
time.sleep(random.randrange(
1,3))
print('%s is piao end' %self.name)

p=Run('anne')
p.daemon
=True #一定要在 p.start() 前设置, 设置 p 为守护进程, 禁止 p 创建子进程, 并且父进程代码执行结束,p 即终止运行
p.start()
print('')

设置守护进程
#主进程代码运行完毕, 守护进程就会结束
from multiprocessing import Process
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")

def bar():
print(456)
time.sleep(
3)
print("end456")

p1=Process(target=foo)
p2
=Process(target=bar)

p1.daemon=True
p1.start()
p2.start()
print("main-------") #打印该行则主进程代码结束, 则守护进程 p1 应该被终止, 可能会有 p1 任务执行的打印信息 123, 因为主进程打印 main---- 时,p1 也执行了, 但是随即被终止

守护进程生命周期

只是守护进程结束,非守护进程不受影响

四 进程同步 (锁)

进程之间数据不共享, 但是共享同一套文件系统, 所以访问同一个文件, 或同一个打印终端, 是没有问题的,

而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

例 1:多个进程共享同一打印终端

#并发运行, 效率高, 但竞争同一打印终端, 带来了打印错乱
from multiprocessing import Process
import os,time
def work():
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())

if name == 'main':
for i in range(3):
p
=Process(target=work)
p.start()

不加锁
#由并发变成了串行, 牺牲了运行效率, 但避免了竞争
from multiprocessing import Process,Lock
import os,time
def work(lock):
    lock.acquire()
    print('%s is running' %os.getpid())
    time.sleep(2)
    print('%s is done' %os.getpid())lock.release()
if __name__ == '__main__':
    lock=Lock()
    for i in range(3):
        p=Process(target=work,args=(lock,))
        p.start()
加锁

例 2: 多个进程共享同一文件

文件当数据库,模拟抢票

#文件 db 的内容为:{"count":1}
#注意一定要用双引号,不然 json 无法识别
#并发运行,效率高,但竞争写同一文件,数据写入错乱
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db.txt'))
    print('\033[43m 剩余票数 %s\033[0m' %dic['count'])

def get():
dic
=json.load(open('db.txt'))
time.sleep(
0.1) #模拟读数据的网络延迟
if dic['count'] >0:
dic[
'count']-=1
time.sleep(
0.2) #模拟写数据的网络延迟
json.dump(dic,open('db.txt','w'))
print('\033[43m 购票成功 \033[0m')

def task(lock):
search()
get()
if name == 'main':
lock
=Lock()
for i in range(100): #模拟并发 100 个客户端抢票
p=Process(target=task,args=(lock,))
p.start()

不加锁
#文件 db 的内容为:{"count":1}
#注意一定要用双引号,不然 json 无法识别
#购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db.txt'))
    print('\033[43m 剩余票数 %s\033[0m' %dic['count'])

def get():
dic
=json.load(open('db.txt'))
time.sleep(
0.1) #模拟读数据的网络延迟
if dic['count'] >0:
dic[
'count']-=1
time.sleep(
0.2) #模拟写数据的网络延迟
json.dump(dic,open('db.txt','w'))
print('\033[43m 购票成功 \033[0m')

def task(lock):
search()
lock.acquire()
#获取锁
get()
lock.release()
#释放锁
if name == 'main':
lock
=Lock()
for i in range(100): #模拟并发 100 个客户端抢票
p=Process(target=task,args=(lock,))
p.start()

加锁


加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

五 进程间通信

虽然可以用文件共享数据实现进程间通信,但问题是:

1)效率低(共享数据基于文件,而文件是硬盘上的数据) 2)需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:1)效率高(多个进程共享一块内存的数据)2)帮我们处理好锁问题。

mutiprocessing 模块为我们提供的基于消息的 IPC 通信机制:队列和管道。

1 队列和管道都是将数据存放于内存中

2 队列又是基于(管道 + 锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性

1. 队列(推荐使用)

创建队列的类(底层就是以管道和锁定的方式实现)

 Queue([maxsize]): 创建共享的进程队列,Queue 是多进程安全的队列,可以使用 Queue 实现多进程之间的数据传递。 

 参数介绍:

 maxsize 是队列中允许最大项数,省略则无大小限制。    

应用:

“‘multiprocessing 模块支持进程间通信的两种主要形式: 管道和队列
都是基于消息传递实现的, 但是队列接口
'''

from multiprocessing import Process,Queue
import time
q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了

队列的使用

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
for i in range(10):
time.sleep(random.randint(
1,3))
res
='包子 %s' %i
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))

if name == 'main':
q
=Queue()
#生产者们: 即厨师们
p1=Process(target=producer,args=(q,))

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">消费者们:即吃货们</span>
c1=Process(target=consumer,args=<span style="color: rgba(0, 0, 0, 1)">(q,))

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">开始</span>

p1.start()
c1.start()
print('')

基于队列的生产者消费者模型

此时的问题是主进程永远不会结束,原因是:生产者 p 在生产完后就结束了,但是消费者 c 在取空了 q 之后,则一直处于死循环中且卡在 q.get() 这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以 break 出死循环。

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
for i in range(10):
time.sleep(random.randint(
1,3))
res
='包子 %s' %i
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
q.put(None)
#发送结束信号
if name == 'main':
q
=Queue()
#生产者们: 即厨师们
p1=Process(target=producer,args=(q,))

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">消费者们:即吃货们</span>
c1=Process(target=consumer,args=<span style="color: rgba(0, 0, 0, 1)">(q,))

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">开始</span>

p1.start()
c1.start()
print('')

生产者在生产完毕后发送结束信号 None

注意:结束信号 None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
for i in range(2):
time.sleep(random.randint(
1,3))
res
='包子 %s' %i
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))

if name == 'main':
q
=Queue()
#生产者们: 即厨师们
p1=Process(target=producer,args=(q,))

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">消费者们:即吃货们</span>
c1=Process(target=consumer,args=<span style="color: rgba(0, 0, 0, 1)">(q,))

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">开始</span>

p1.start()
c1.start()

p1.join()
q.put(None) </span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">发送结束信号</span>
<span style="color: rgba(0, 0, 255, 1)">print</span>(<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">主</span><span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(0, 0, 0, 1)">)

主进程在生产者生产完毕后发送结束信号 None

主进程在生产者生产完毕后发送结束信号 None

但上述解决方式,在有多个生产者和多个消费者时,应该怎么做呢?有几个消费者就发几次信号?

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到结束信号则结束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(name,q):
for i in range(2):
time.sleep(random.randint(
1,3))
res
='%s%s' %(name,i)
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))

if name == 'main':
q
=Queue()
#生产者们: 即厨师们
p1=Process(target=producer,args=('包子',q))
p2
=Process(target=producer,args=('骨头',q))
p3
=Process(target=producer,args=('泔水',q))

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">消费者们:即吃货们</span>
c1=Process(target=consumer,args=<span style="color: rgba(0, 0, 0, 1)">(q,))
c2</span>=Process(target=consumer,args=<span style="color: rgba(0, 0, 0, 1)">(q,))

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">开始</span>

p1.start()
p2.start()
p3.start()
c1.start()

p1.join() </span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">必须保证生产者全部生产完毕,才应该发送结束信号</span>

p2.join()
p3.join()
q.put(None)
#有几个消费者就应该发送几次结束信号 None
q.put(None) #发送结束信号
print('')

有几个消费者就应该发送几次结束信号 None

其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制

1
2
3
4
5
6
7
8
#JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
 
   #参数介绍:
    maxsize是队列中允许最大项数,省略则无大小限制。   
  #方法介绍:
    JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))
    q.task_done() </span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">向q.join()发送一次信号,证明一个数据已经被取走了</span>

def producer(name,q):
for i in range(10):
time.sleep(random.randint(
1,3))
res
='%s%s' %(name,i)
q.put(res)
print('\033[44m%s 生产了 %s\033[0m' %(os.getpid(),res))
q.join()

if name == 'main':
q
=JoinableQueue()
#生产者们: 即厨师们
p1=Process(target=producer,args=('包子',q))
p2
=Process(target=producer,args=('骨头',q))
p3
=Process(target=producer,args=('泔水',q))

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">消费者们:即吃货们</span>
c1=Process(target=consumer,args=<span style="color: rgba(0, 0, 0, 1)">(q,))
c2</span>=Process(target=consumer,args=<span style="color: rgba(0, 0, 0, 1)">(q,))
c1.daemon</span>=<span style="color: rgba(0, 0, 0, 1)">True
c2.daemon</span>=<span style="color: rgba(0, 0, 0, 1)">True

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">开始</span>
p_l=<span style="color: rgba(0, 0, 0, 1)">[p1,p2,p3,c1,c2]
</span><span style="color: rgba(0, 0, 255, 1)">for</span> p <span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> p_l:
    p.start()

p1.join()
p2.join()
p3.join()
</span><span style="color: rgba(0, 0, 255, 1)">print</span>(<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">主</span><span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(0, 0, 0, 1)">) 

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">主进程等---&gt;p1,p2,p3等----&gt;c1,c2</span>
<span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据</span>
<span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程</span></pre>
多消费者多生产者

2. 管道

创建管道的类:

Pipe([duplex]): 在进程之间创建一条管道,并返回元组(conn1,conn2), 其中 conn1,conn2 表示管道两端的连接对象,强调一点:必须在产生 Process 对象之前产生管道

参数介绍:

dumplex: 默认管道是全双工的,如果将 duplex 射成 False,conn1 只能用于接收,conn2 只能用于发送。

from multiprocessing import Process,Pipe

import time,os
def consumer(p,name):
left,right
=p
left.close()
while True:
try:
baozi
=right.recv()
print('%s 收到包子:%s' %(name,baozi))
except EOFError:
right.close()
break
def producer(seq,p):
left,right
=p
right.close()
for i in seq:
left.send(i)
# time.sleep(1)
else:
left.close()
if name == 'main':
left,right
=Pipe()

c1</span>=Process(target=consumer,args=((left,right),<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">c1</span><span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(0, 0, 0, 1)">))
c1.start()


seq</span>=(i <span style="color: rgba(0, 0, 255, 1)">for</span> i <span style="color: rgba(0, 0, 255, 1)">in</span> range(10<span style="color: rgba(0, 0, 0, 1)">))
producer(seq,(left,right))

right.close()
left.close()

c1.join()
</span><span style="color: rgba(0, 0, 255, 1)">print</span>(<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">主进程</span><span style="color: rgba(128, 0, 0, 1)">'</span>)</pre>
基于管道实现进程间通信(与队列的方式是类似的,队列就是管道加锁实现的)

注意:生产者和消费者都没有使用管道的某个端点,就应该将其关闭,如在生产者中关闭管道的右端,在消费者中关闭管道的左端。如果忘记执行这些步骤,程序可能再消费者中的 recv() 操作上挂起。管道是由操作系统进行引用计数的, 必须在所有进程中关闭管道后才能生产 EOFError 异常。因此在生产者中关闭管道不会有任何效果,付费消费者中也关闭了相同的管道端点。

管道可以用于双向通信,利用通常在客户端 / 服务器中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序

from multiprocessing import Process,Pipe

import time,os
def adder(p,name):
server,client
=p
client.close()
while True:
try:
x,y
=server.recv()
except EOFError:
server.close()
break
res
=x+y
server.send(res)
print('server done')
if name == 'main':
server,client
=Pipe()

c1</span>=Process(target=adder,args=((server,client),<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">c1</span><span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(0, 0, 0, 1)">))
c1.start()

server.close()

client.send((</span>10,20<span style="color: rgba(0, 0, 0, 1)">))
</span><span style="color: rgba(0, 0, 255, 1)">print</span><span style="color: rgba(0, 0, 0, 1)">(client.recv())
client.close()

c1.join()
</span><span style="color: rgba(0, 0, 255, 1)">print</span>(<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">主进程</span><span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(0, 0, 0, 1)">)

#注意:send()和 recv() 方法使用 pickle 模块对对象进行序列化。

管道用于双向通信

 

3. 共享数据

展望未来,基于消息传递的并发编程是大势所趋

即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合

通过消息队列交换数据。这样极大地减少了对使用锁定和其他同步手段的需求,

还可以扩展到分布式系统中

进程间通信应该尽量避免使用本节所讲的共享数据的方式

进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
虽然进程间数据独立,但可以通过 Manager 实现数据共享
from multiprocessing import Manager,Process,Lock
import os
def work(d,lock):
    # with lock: #不加锁而操作共享的数据, 肯定会出现数据错乱
        d['count']-=1

if name == 'main':
lock
=Lock()
with Manager() as m:
dic
=m.dict({'count':100})
p_l
=[]
for i in range(100):
p
=Process(target=work,args=(dic,lock))
p_l.append(p)
p.start()
for p in p_l:
p.join()
print(dic)
#{'count': 94}

Manger 实现进程间数据共享

4. 信号量 ( 了解)

互斥锁 同时只允许一个线程更改数据,而 Semaphore 是同时允许一定数量的线程更改数据 ,比如厕所有 3 个坑,那最多只允许 3 个人上厕所,后面的人只能等里面有人出来了才能再进去,如果指定信号量为 3,那么来一个人获得一把锁,计数加 1,当计数等于 3 时,后面的人均需要等待。一旦释放,就有人可以获得一把锁
信号量与进程池的概念很像,但是要区分开,信号量涉及到加锁的概念

from multiprocessing import Process,Semaphore
import time,random

def go_wc(sem,user):
sem.acquire()
print('%s 占到一个茅坑' %user)
time.sleep(random.randint(0,
3)) #模拟每个人拉屎速度不一样,0 代表有的人蹲下就起来了
sem.release()

if name == 'main':
sem
=Semaphore(5)
p_l
=[]
for i in range(13):
p
=Process(target=go_wc,args=(sem,'user%s' %i,))
p.start()
p_l.append(p)

</span><span style="color: rgba(0, 0, 255, 1)">for</span> i <span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> p_l:
    i.join()
</span><span style="color: rgba(0, 0, 255, 1)">print</span>(<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">============》</span><span style="color: rgba(128, 0, 0, 1)">'</span>)</pre>
同线程一样

5. 事件 (了解)

python 线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

clear:将“Flag”设置为 False
set:将“Flag”设置为 True

#coding:utf-8
#
!/usr/bin/env python

from multiprocessing import Process,Event
import time,random

def car(e,n):
while True:
if not e.is_set(): #Flase
print('\033[31m 红灯亮 \033[0m,car%s 等着' %n)
e.wait()
print('\033[32m 车 %s 看见绿灯亮了 \033[0m' %n)
time.sleep(random.randint(
3,6))
if not e.is_set():
continue
print('走你,car', n)
break

def police_car(e,n):
while True:
if not e.is_set():
print('\033[31m 红灯亮 \033[0m,car%s 等着' % n)
e.wait(
1)
print('灯的是 %s,警车走了,car %s' %(e.is_set(),n))
break

def traffic_lights(e,inverval):
while True:
time.sleep(inverval)
if e.is_set():
e.clear()
#e.is_set() ---->False
else:
e.set()

if name == 'main':
e
=Event()
# for i in range(10):
# p=Process(target=car,args=(e,i,))
# p.start()

<span style="color: rgba(0, 0, 255, 1)">for</span> i <span style="color: rgba(0, 0, 255, 1)">in</span> range(5<span style="color: rgba(0, 0, 0, 1)">):
    p </span>= Process(target=police_car, args=<span style="color: rgba(0, 0, 0, 1)">(e, i,))
    p.start()
t</span>=Process(target=traffic_lights,args=(e,10<span style="color: rgba(0, 0, 0, 1)">))
t.start()

</span><span style="color: rgba(0, 0, 255, 1)">print</span>(<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">============》</span><span style="color: rgba(128, 0, 0, 1)">'</span>)</pre>
红绿灯(同线程一样)

6. 进程池

在利用 Python 进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:

1)很明显需要并发执行的任务通常要远大于核数

2)一个操作系统不可能无限开启进程,通常有几个核就开几个进程

3)进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

例如当被操作对象数目不大时,可以直接利用 multiprocessing 中的 Process 动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

我们就可以通过维护一个进程池来控制进程数目,比如 httpd 的进程模式,规定最小进程数和最大进程数... 

对于远程过程调用的高级应用程序而言,应该使用进程池,Pool 可以提供指定数量的进程,供用户调用,当有新的请求提交到 pool 中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

创建进程池的类:如果指定 numprocess 为 3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程

1 Pool([numprocess  [,initializer [, initargs]]]): 创建进程池 

 参数介绍:

1 numprocess: 要创建的进程数,如果省略,将默认使用 cpu_count() 的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为 None
3 initargs:是要传给 initializer 的参数组
 

主要方法:

1 p.apply(func [, args [, kwargs]])
在一个池工作进程中执行 func(*args,**kwargs), 然后返回结果。
需要强调的是:此操作并不会在所有池工作进程中并执行 func 函数。如果要通过不同参数并发地执行 func 函数,必须从不同线程调用 p.apply()函数或者使用 p.apply_async()
2 p.apply_async(func [, args [, kwargs]]):
在一个池工作进程中执行 func(*args,**kwargs), 然后返回结果。
此方法的结果是 AsyncResult 类的实例,callback 是可调用对象,接收输入参数。当 func 的结果变为可用时,
将理解传递给 callback。callback 禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。   
3 p.close(): 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
4 P.jion(): 等待所有工作进程退出。此方法只能在 close()或 teminate() 之后调用

应用

from multiprocessing import Pool
import os,time
def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if name == 'main':
p
=Pool(3) #进程池中从无到有创建三个进程, 以后一直是这三个进程在执行任务
res_l=[]
for i in range(10):
res
=p.apply(work,args=(i,)) #同步调用,直到本次任务执行完毕拿到 res,等待任务 work 执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走 cpu 的执行权限
res_l.append(res)
print(res_l)

同步调用 applay
from multiprocessing import Pool
import os,time
def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if name == 'main':
p
=Pool(3) #进程池中从无到有创建三个进程, 以后一直是这三个进程在执行任务
res_l=[]
for i in range(10):
res
=p.apply_async(work,args=(i,)) #同步运行, 阻塞、直到本次任务执行完毕拿到 res
res_l.append(res)

</span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了</span>

p.close()
p.join()
for res in res_l:
print(res.get()) #使用 get 来获取 apply_aync 的结果, 如果是 apply, 则没有 get 方法, 因为 apply 是同步执行, 立刻获取结果, 也根本无需 get

异步调用 apply_async
#一:使用进程池(异步调用,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
print( "msg:", msg)
time.sleep(
1)
return msg

if name == "main":
pool
= Pool(processes = 3)
res_l
=[]
for i in range(10):
msg
= "hello %d" %(i)
res
=pool.apply_async(func, (msg,)) #维持执行的进程总数为 processes,当一个进程执行完毕后会添加新的进程进去
res_l.append(res)
print("==============================>") #没有后面的 join,或 get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了

pool.close()
#关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
pool.join() #调用 join 之前,先调用 close 函数,否则会出错。执行完 close 后不会有新的进程加入到 pool,join 函数等待所有子进程结束

<span style="color: rgba(0, 0, 255, 1)">print</span>(res_l) <span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">看到的是&lt;multiprocessing.pool.ApplyResult object at 0x10357c4e0&gt;对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果</span>
<span style="color: rgba(0, 0, 255, 1)">for</span> i <span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> res_l:
    </span><span style="color: rgba(0, 0, 255, 1)">print</span>(i.get()) <span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get</span>

#二:使用进程池(同步调用,apply)
#
coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
print( "msg:", msg)
time.sleep(
0.1)
return msg

if name == "main":
pool
= Pool(processes = 3)
res_l
=[]
for i in range(10):
msg
= "hello %d" %(i)
res
=pool.apply(func, (msg,)) #维持执行的进程总数为 processes,当一个进程执行完毕后会添加新的进程进去
res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另外一个
print("==============================>")
pool.close()
pool.join()
#调用 join 之前,先调用 close 函数,否则会出错。执行完 close 后不会有新的进程加入到 pool,join 函数等待所有子进程结束

<span style="color: rgba(0, 0, 255, 1)">print</span>(res_l) <span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">看到的就是最终的结果组成的列表</span>
<span style="color: rgba(0, 0, 255, 1)">for</span> i <span style="color: rgba(0, 0, 255, 1)">in</span> res_l: <span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">apply是同步的,所以直接得到结果,没有get()方法</span>
    <span style="color: rgba(0, 0, 255, 1)">print</span>(i)</pre>
apply_async 与 apply 详解

使用进程池维护固定数目的进程

#Pool 内的进程数默认是 cpu 核数,假设为 4(查看方法 os.cpu_count())
#开启 6 个客户端,会发现 2 个客户端处于等待状态
#在每个进程内查看 pid,会发现 pid 使用为 4 个,即多个客户端公用 4 个进程
from socket import *
from multiprocessing import Pool
import os

server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,
1)
server.bind((
'127.0.0.1',8080))
server.listen(
5)

def talk(conn,client_addr):
print('进程 pid: %s' %os.getpid())
while True:
try:
msg
=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break

if name == 'main':
p
=Pool()
while True:
conn,client_addr
=server.accept()
p.apply_async(talk,args
=(conn,client_addr))
# p.apply(talk,args=(conn,client_addr)) #同步的话,则同一时间只有一个客户端能访问

server 端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect((
'127.0.0.1',8080))

while True:
msg
=input('>>: ').strip()
if not msg:continue

client.send(msg.encode(</span><span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">utf-8</span><span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(0, 0, 0, 1)">))
msg</span>=client.recv(1024<span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 0, 255, 1)">print</span>(msg.decode(<span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">utf-8</span><span style="color: rgba(128, 0, 0, 1)">'</span>))</pre>
客户端

发现:并发开启多个客户端,服务端同一时间只有 3 个不同的 pid,干掉一个客户端,另外一个客户端才会进来,被 3 个进程之一处理

回掉函数:

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了 I/O 的过程,直接拿到的是任务的结果。

from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
print('< 进程 %s> get %s' %(os.getpid(),url))
respone
=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}

def pasrse_page(res):
print('< 进程 %s> parse %s' %(os.getpid(),res['url']))
parse_res
='url:<%s> size:[%s]\n' %(res['url'],len(res['text']))
with open(
'db.txt','a') as f:
f.write(parse_res)

if name == 'main':
urls
=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]

p</span>=Pool(3<span style="color: rgba(0, 0, 0, 1)">)
res_l</span>=<span style="color: rgba(0, 0, 0, 1)">[]
</span><span style="color: rgba(0, 0, 255, 1)">for</span> url <span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> urls:
    res</span>=p.apply_async(get_page,args=(url,),callback=<span style="color: rgba(0, 0, 0, 1)">pasrse_page)
    res_l.append(res)

p.close()
p.join()
</span><span style="color: rgba(0, 0, 255, 1)">print</span>([res.get() <span style="color: rgba(0, 0, 255, 1)">for</span> res <span style="color: rgba(0, 0, 255, 1)">in</span> res_l]) <span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了</span>

'''
打印结果:
< 进程 3388> get https://www.baidu.com
< 进程 3389> get https://www.python.org
< 进程 3390> get https://www.openstack.org
< 进程 3388> get https://help.github.com/
< 进程 3387> parse https://www.baidu.com
< 进程 3389> get http://www.sina.com.cn/
< 进程 3387> parse https://www.python.org
< 进程 3387> parse https://help.github.com/
< 进程 3387> parse http://www.sina.com.cn/
< 进程 3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com'

View Code

如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数

from multiprocessing import Pool
import time,random,os

def work(n):
time.sleep(
1)
return n**2
if name == 'main':
p
=Pool()

res_l</span>=<span style="color: rgba(0, 0, 0, 1)">[]
</span><span style="color: rgba(0, 0, 255, 1)">for</span> i <span style="color: rgba(0, 0, 255, 1)">in</span> range(10<span style="color: rgba(0, 0, 0, 1)">):
    res</span>=p.apply_async(work,args=<span style="color: rgba(0, 0, 0, 1)">(i,))
    res_l.append(res)

p.close()
p.join() </span><span style="color: rgba(0, 128, 0, 1)">#</span><span style="color: rgba(0, 128, 0, 1)">等待进程池中所有进程执行完毕</span>
nums=[] for res in res_l: nums.append(res.get()) #拿到所有结果 print(nums) #主进程拿到所有的处理结果, 可以在主进程中进行统一进行处理
View Code