multiprocessing类似于threading模块支持生成进程的包,是Python的标准模块,它既可以用来编写多进程,也可以用来编写多线程。由于python使用全局解释器锁(GIL),他会将进程中的线程序列化,也就是多核cpu实际上并不能达到并行提高速度的目的,而使用多进程则是不受限的,所以现实应用中都是使用多进程的。
如果每个子进程执行需要消耗的时间非常短(执行+1操作等),这不必使用多进程,因为进程的启动关闭也会耗费资源。当然使用多进程往往是用来处理CPU密集型(科学计算)的需求,如果是IO密集型(文件读取,爬虫等)则可以使用多线程去处理。
管理进程模块
函数名 | 功能 |
---|---|
Process | 用于创建进程模块 |
Pool | 用于创建管理进程池 |
Queue | 创建‘队列对象’,用于进程通信,资源共享 |
Pipe | 用于管道通信 |
Manager | 用于资源共享 |
Value,Array | 用于进程通信,资源共享 |
同步子进程模块
函数名 | 功能 |
---|---|
Condition | Condition(条件变量)通常与一个锁关联 |
Event | 多线程,事件对象 |
Lock | Lock锁,线程同步 |
RLock | 和lock用法基本一致,但是对RLock进行多次acquire()操作,程序不会阻塞。 |
Semaphore | Semaphore是一个工厂函数,负责返回一个新的信号量对象。 |
process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。
语法:Process([group [, target [, name [, args [, kwargs]]]]])
参数介绍:
group:参数未使用,默认值为None。
target:表示调用对象,即子进程要执行的任务。
args:表示调用的位置参数元祖。
kwargs:表示调用对象的字典。如kwargs = {‘name’:Jack, ‘age’:18}。
name:子进程名称。
方法名 | 功能 |
---|---|
run() | 第 2 种创建进程的方式需要用到,继承类中需要对方法进行重写,该方法中包含的是新进程要执行的代码。 |
start() | 和启动子线程一样,新创建的进程也需要手动启动,该方法的功能就是启动新创建的线程。 |
join([timeout]) | 和 thread 类 join() 方法的用法类似,其功能是在多进程执行过程,其他进程必须等到调用 join() 方法的进程执行完毕(或者执行规定的 timeout 时间)后,才能继续执行; |
is_alive() | 判断当前进程是否存活。 |
terminate() | 中断该进程。 |
name属性 | 可以为该进程重命名,也可以获得该进程的名称。 |
daemon | 和守护线程类似,通过设置该属性为 True,可将新建进程设置为“守护进程”。 |
pid | 返回进程的 ID 号。每个进程都有一个唯一的 ID 号。 |
方法一:直接调用multiprocessing下面的process模块
'''
方法一,直接调用multiprocessing下面的process模块
'''
from multiprocessing import Process
import time
def task(name,prompt):
print('%s 开启--%s--任务 %s'%(name,prompt,time.strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(3)
print('%s 结束--%s--任务 %s' % (name,prompt, time.strftime('%Y-%m-%d %H:%M:%S')))
if __name__ == "__main__":
obj0 = Process(target=task,args=('Jasn','进程一')) # 进程一
obj1 = Process(target=task, args=('alex','进程二')) # 进程二
obj2 = Process(target=task, args=('wupeqi','进程三')) # 进程三
obj3 = Process(target=task, args=('yuanhao','进程四')) # 进程四
obj0.start()
obj1.start()
obj2.start()
obj3.start()
print('主进程')
'''打印结果 :
主进程
Jasn 开启--进程一--任务 2020-03-18 21:38:14
alex 开启--进程二--任务 2020-03-18 21:38:14
wupeqi 开启--进程三--任务 2020-03-18 21:38:14
yuanhao 开启--进程四--任务 2020-03-18 21:38:14
Jasn 结束--进程一--任务 2020-03-18 21:38:17
alex 结束--进程二--任务 2020-03-18 21:38:17
wupeqi 结束--进程三--任务 2020-03-18 21:38:17
yuanhao 结束--进程四--任务 2020-03-18 21:38:17
'''
方法二:通过继承 Process 类的子类,创建新的进程
'''
通过继承 Process 类的子类,创建实例对象,也可以创建新的进程。注意,继承 Process 类的子类需重写父类的 run() 方法。
'''
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name,prompt):
super(MyProcess,self).__init__()
self.name = name
self.prompt = prompt
def run(self):
print('%s 开启--%s--任务 %s' % (self.name, self.prompt, time.strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(3)
print('%s 结束--%s--任务 %s' % (self.name, self.prompt, time.strftime('%Y-%m-%d %H:%M:%S')))
if __name__ == "__main__":
obj0 = MyProcess('Jasn', '进程一') # 进程一
obj1 = MyProcess('alex','进程二') # 进程二
obj2 = MyProcess('wupeqi','进程三') # 进程三
obj3 = MyProcess('yuanhao','进程四') # 进程四
print('===========进程方法============')
obj0.start() # 重新开启进程方法 ,时间2020-03-18 15:38:20
obj1.start() # 2020-03-18 15:38:20
obj2.start() # 2020-03-18 15:38:20
obj3.start() # 2020-03-18 15:38:20
time.sleep(5)
print('===========普通方法============')
obj0.run() # Jasn 开启--进程一--任务 2020-03-18 15:42:48
obj1.run() # alex 开启--进程二--任务 2020-03-18 15:42:51
obj2.run() # wupeqi 开启--进程三--任务 2020-03-18 15:42:54
obj3.run() # yuanhao 开启--进程四--任务 2020-03-18 15:42:57
'''
obj3.run();
只是调用了一个普通方法,并没有启动另一个线程,程序还是会按照顺序执行相应的代码。
obj3.start();
则表示,重新开启一个进程,不必等待其他线程运行完,只要得到CPU就可以运行该进程程。
'''
注意:在windows中Process()必须放到# if name == ‘main’:下
3.2 Process对象的join方法'''
Process对象的join方法
'''
from multiprocessing import Process
import time
import random
def Task(name):
print('%s 开启----任务 %s' % (name, time.strftime('%Y-%m-%d %H:%M:%S')))
time.sleep(random.randint(1, 3))
print('%s 结束----任务 %s' % (name, time.strftime('%Y-%m-%d %H:%M:%S')))
if __name__ == "__main__":
p1 = Process(target=Task, args=('Jasn',))
p2 = Process(target=Task, args=('nancy',))
p3 = Process(target=Task, args=('刘亦菲',))
p4 = Process(target=Task, args=('花木兰',))
for p in [p1, p2, p3, p4]: # 简写
p.start()
p1.join()
print('主线程')
# 有的同学会有疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?
# 当然不是了,必须明确:p.join()是让谁等?
# 很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,
详细解析:
进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有4个并发的进程了
而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束
可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
'''
进程对象的其他方法一:terminate,is_alive
'''
from multiprocessing import Process
import time
import random
class Task(Process):
def __init__(self, name):
self.name = name
super().__init__() # Process的__init__方法会执行self.name= Task-1 ,所以加到这里,会覆盖我们的self.name=name,
# 我们开启的进程设置名字的做法,是将self.name = name 放在父类方法下
def run(self):
print('%s is Tasking' % self.name)
time.sleep(random.randrange(2, 5))
print('%s is Task end' % self.name)
if __name__ == "__main__":
p1 = Task('Jasn')
p1.start()
p1.terminate() # 关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
print(p1.is_alive()) # 结果为True
print('开始')
print(p1.is_alive()) # 结果为False
4、守护进程
'''
守护进程:一定具有daemon属性标志,(thread.setDaemon(True))就表示这个线程“不重要”。
'''
from multiprocessing import Process
import time
import random
class Task(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print('%s is Tasking' %self.name)
time.sleep(random.randrange(1,3))
print('%s is Task end' %self.name)
if __name__ == "__main__":
p = Task('Jasn')
p.daemon = True # 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
p.start()
print('主')
总结:
主进程退出,不管子进程是否执行完,子进程都会退出; 非守护进程运行完,主进程才算真正的运行完。 如果是多进程的话,那么主进程先运行,再运行子进程;如果是多线程的话,那么按照代码的输出顺序输出。 如果在守护进程中子进程加了join()(起到阻塞作用),那么主进程会等子进程都运行完。参考博客:https://blog.csdn.net/u013210620/article/details/78710532
5、僵尸进程和孤儿进程 5.1 僵尸进程概念僵尸进程:(父进程没结束,子进程提前结束,父进程没有处理子进程的状态)-------有害,应当避免
一个进程使用fork创建子进程,如果子进程退出,而父进程没有调用wait或waitpid获取进程的状态信息,那么子进程的进程描述符仍保存在系统中,这种进程称为僵死进程。
孤儿进程:(父进程提前退出,子进程还没结束,子进程成为孤儿进程)--------无害
一个父进程退出,而它的一个或着多个子进程还在运行,那么这些子进程将称为孤儿进程。孤儿进程将被init进程(进程号1)所收养, 并由init进程对它们完成状态收集工作。
参考博客:https://blog.csdn.net/Lovegengxin/article/details/80347468
6、进程互斥锁互斥锁:就是将要执行任务的部门代码(只涉及到修改共享数据的代码)变成串行
实验:实现简单的抢票功能
分析:
1、查看变为并发(实现大家查询到的车票信息是一致的)
2、购买变为串行(因为购买涉及到了修改数据信息,所以一定要遵循先到先得)
(理解为用锁来限制,同一时间只能让一个人拿着锁去改数据,先抢到锁的人 就有优先购买的权限)
db.txt 文件
{"count": 1} # 假设只有一张余票,10个人来抢
执行文件
from multiprocessing import Process,Lock
import time
import json,os
import random
def search():
dic = json.load(open('db.txt','r',encoding='utf-8'))
print('用户[%s]---剩余票数%s' % (os.getpid(), dic['count'])) # 不同进程ID号唯一,用来区分用户
def get(lock):
time.sleep(random.uniform(3, 6)) # 模拟读数据的网络延迟
lock.acquire() # 加上互斥锁,让并行变成串行
dic = json.load(open('db.txt','r',encoding='utf-8'))
if dic['count'] > 0: # 如果大于0 ,则说明有票
dic['count'] -= 1
time.sleep(0.2) # 模拟写数据的网络延迟
json.dump(dic, open('db.txt','w'))
print('\n用户[%s],购票成功'%os.getpid())
else:
print('\n用户[%s],票已售罄!'%os.getpid())
lock.release() # 退出互斥锁
def task(lock):
search() # 查票 (并行访问)
get(lock) #抢票(加入互斥锁,实现串行访问,先到先得原则)
if __name__ == '__main__':
lock = Lock()
for i in range(10): # 模拟并发10个客户端抢票
p = Process(target=task, args=(lock,))
p.start()
7、队列 (Queue)
7.1 队列介绍
Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递。
7.2 基本FIFO队列语法:
class Queue.Queue(maxsize=0)
FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
from multiprocessing import Queue
q = Queue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get())
结果:
0
1
2
3
4
语法:
class Queue.LifoQueue([maxsize])
LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上
import queue
q = queue.LifoQueue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get())
结果:
4
3
2
1
0
属性名或方法名 | 功能 |
---|---|
Queue.qsize() | 返回队列的大小 |
Queue.empty() | 如果队列为空,返回True,反之False |
Queue.full() | 如果队列满了,返回True,反之False |
Queue.full | 与 maxsize 大小对应 |
Queue.get([block[, timeout]])() | 获取队列,timeout等待时间 |
Queue.get_nowait() | 相当Queue.get(False) |
Queue.put(item) | 写入队列,timeout等待时间 |
Queue.put_nowait(item | 当Queue.put(item, False) |
Queue.task_done() | 在完成一项工作后,Queue.task_done()函数向任务已经完成的队列发送一个信号 |
Queue.join() | 实际上意味着等到队列为空,再执行别的操作 |
1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6
7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
其他博文:【python内功修炼001】:python多进程编程基础