一、进程
- 定义方面:进程是程序在某个数据集合上的一次运行活动;线程是进程中的一个执行路径。
- 角色方面:在支持线程机制的系统中,进程是系统资源分配的单位,线程是系统调度的单位。
- 资源共享方面:进程之间不能共享资源,而线程共享所在进程的地址空间和其它资源。同时线程还有自己的栈和栈指针,程序计数器等寄存器。
- 独立性方面:进程有自己独立的地址空间,而线程没有,线程必须依赖于进程而存在。
二、multiprocessing模块
multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
1、Process类
Process类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成Process实例的创建。Process([group [, target [, name [, args [, kwargs]]]]])
参数介绍:
- group:参数未使用,值始终为None
- target:表示调用对象,即子进程要执行的任务
- args:表示调用对象的位置参数元组,args=(1,2,’hexin’,)
- kwargs:表示调用对象的字典,kwargs={‘name’:’hexin’,’age’:18}
- name:为子进程的名称
方法介绍:
p.start()
:启动进程,并调用该子进程中的p.run()p.run()
:进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法p.terminate()
:强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁p.is_alive()
:如果p仍然运行,返回Truep.join([timeout])
:主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
属性介绍:
p.daemon
:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置p.name
: 进程的名称p.pid
:进程的pidp.exitcode
: 进程在运行时为None、如果为–N,表示被信号N结束(了解即可)p.authkey
: 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
2、创建进程
(1)用multiprocessing.multiprocessing直接创建进程1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27# 直接创建进程:
from multiprocessing import Process
import os
import time
def run_proc(name, age, **kwargs): # 子进程要执行的代码
for i in range(10):
print('子进程:name= %s,age=%d ,pid=%d' % (name, age,os.getpid())) # os.getpid() 获得当前进程PID值
print('他的父进程pid=%d' % os.getppid()) # os.getppid() 子进程获得父进程的PID值
print(kwargs)
time.sleep(0.5)
if __name__=='__main__':
print('父进程:pid=%d' % os.getpid()) # os.getpid() 获得当前进程PID值
p = Process(target=run_proc, args=('jack',18), kwargs={"grade":5}, name='run_proc')
# Process(target=这个进程实例所调用对象, args=调用对象的位置参数元组, kwargs=调用对象的关键字参数字典, name=为当前进程实例的别名)
print('子进程%s将要执行' % p.name) # name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数
p.start() # start():启动进程实例(创建子进程)
time.sleep(1)
p.terminate() # terminate():不管任务是否完成,立即终止
p.join(2) # join([timeout]):是否等待进程实例执行结束,或等待多少秒
print('子进程是否还在运行行:%s' % p.is_alive()) # is_alive(): 判断进程实例是否还在执行
print('子进程pid=%d已结束' % p.pid) # pid:当前进程实例的PID值
(2)通过继承multiprocessing.multiprocessing类来创建线程1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29# 通过类创建进程:
from multiprocessing import Process
import os
import time
class Process_Class(Process): # 创建进程的类,需要继承Process类
def __init__(self,interval):
Process.__init__(self) # 用父类Process的__init__进行初始化
self.interval = interval
def run(self): # 重写父类Process的run()方法,用于后面start()调用
print('子进程pid=%s开始执行,父进程为pid=%s' % (os.getpid(),os.getppid()))
t_start = time.time()
time.sleep(self.interval)
t_stop = time.time()
print('子进程pid=%s执行结束,耗时%0.2f秒' % (os.getpid(),t_stop-t_start))
if __name__ == '__main__':
t_start = time.time()
print('父进程pid=%s' % os.getpid())
p = Process_Class(2) # 实例化继承了Process类的类,就等同于实例化一个进程对象
p.start() # 对一个不包含target属性的Process类执行start()方法,会运行这个类中的run()方法,所以这里会执行p.run()
p.join()
t_stop = time.time()
print('父进程pid=%s执行结束,耗时%0.2f秒' % (os.getpid(),t_stop-t_start))
三、进程同步
进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的。
用文件共享数据,加锁的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,牺牲了速度而保证了数据安全。
multiprocessing.Lock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37#文件当做数据库,模拟抢票(Lock互斥锁)
#文件db的内容为:{"count":2}
#注意一定要用双引号,不然json无法识别
from multiprocessing import Process,Lock
import json
import time
import random
import os
def work(filename,lock): #买票
# lock.acquire()
with lock:
with open(filename,encoding='utf-8') as f:
dic=json.loads(f.read())
# print('剩余票数: %s' % dic['count'])
if dic['count'] > 0:
dic['count']-=1
time.sleep(random.randint(1,3)) #模拟网络延迟
with open(filename,'w',encoding='utf-8') as f:
f.write(json.dumps(dic))
print('%s 购票成功' %os.getpid())
else:
print('%s 购票失败' %os.getpid())
# lock.release()
if __name__ == '__main__':
lock=Lock()
p_l=[]
for i in range(10):
p=Process(target=work,args=('db',lock))
p_l.append(p)
p.start()
for p in p_l:
p.join()
print('主线程')
四、进程异步
同步调用就是‘你’喊‘你朋友’吃饭 ,‘你朋友’在忙 ,‘你’就一直在那等,等‘你朋友’忙完了 ,‘你们’一起去
异步调用就是‘你’喊‘你朋友’吃饭 ,‘你朋友’说知道了 ,待会忙完去找‘你’ ,‘你’就去做别的了。
1 | # 异步调用 |
五、进程池
Pool可以提供指定数量的进程供用户使用,默认是CPU核数。当有新的请求提交到Poll的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待。Pool([numprocess [,initializer [, initargs]]])
参数介绍:
- numprocess: 要创建的进程数,如果省略,将默认使用cpu_count()的值
- initializer:是每个工作进程启动时要执行的可调用对象,默认为None
- initargs:是要传给initializer的参数组
方法介绍:
p.apply(func [, args [, kwargs]])
:在一个池工作进程中执行func(args,*kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()p.apply_async(func [, args [, kwargs]])
:在一个池工作进程中执行func(args,*kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。p.close()
:关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成P.jion()
:等待所有工作进程退出。此方法只能在close()或teminate()之后调用
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法:
obj.get()
:返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。obj.ready()
:如果调用完成,返回Trueobj.successful()
:如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常obj.wait([timeout])
:等待结果变为可用。obj.terminate()
:立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
1 | # 进程池 Pool: |
六、进程间通信
Process之间肯定是需要通信的,Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。Queue([maxsize])
:创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
maxsize
:是队列中允许最大项数,省略则无大小限制。q.put()
方法用以插入数据到队列中put方法还有两个可选参数:blocked和timeout。
如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。
如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。q.get()
方法可以从队列读取并且删除一个元素。get方法有两个可选参数:blocked和timeout。
如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。
如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.q.get_nowait()
:同q.get(False)q.put_nowait()
:同q.put(False)q.empty()
:调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。q.full()
:调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。q.qsize()
:返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
1 | # 进程间通信 Queue: |
七、子进程
很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。subprocess
模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。
1
2
3
4
5
6 #Python代码中运行命令nslookup www.python.org
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
如果子进程还需要输入,则可以通过communicate()
方法输入:
1
2
3
4
5
6
7 import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
持续更新…
最后更新: 2018年12月05日 14:18