人生苦短,我用python。

一、进程

  • 定义方面:进程是程序在某个数据集合上的一次运行活动;线程是进程中的一个执行路径。
  • 角色方面:在支持线程机制的系统中,进程是系统资源分配的单位,线程是系统调度的单位。
  • 资源共享方面:进程之间不能共享资源,而线程共享所在进程的地址空间和其它资源。同时线程还有自己的栈和栈指针,程序计数器等寄存器。
  • 独立性方面:进程有自己独立的地址空间,而线程没有,线程必须依赖于进程而存在。

二、multiprocessing模块

multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

1、Process类

Process类用来描述一个进程对象。创建子进程的时候,只需要传入一个执行函数和函数的参数即可完成Process实例的创建。
Process([group [, target [, name [, args [, kwargs]]]]])

参数介绍:

  • group:参数未使用,值始终为None
  • target:表示调用对象,即子进程要执行的任务
  • args:表示调用对象的位置参数元组,args=(1,2,’hexin’,)
  • kwargs:表示调用对象的字典,kwargs={‘name’:’hexin’,’age’:18}
  • name:为子进程的名称

方法介绍:

  • p.start():启动进程,并调用该子进程中的p.run()
  • p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
  • p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
  • p.is_alive():如果p仍然运行,返回True
  • p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程

属性介绍:

  • p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
  • p.name: 进程的名称
  • p.pid:进程的pid
  • p.exitcode: 进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
  • p.authkey: 进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

2、创建进程

(1)用multiprocessing.multiprocessing直接创建进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 直接创建进程:
from multiprocessing import Process
import os
import time

def run_proc(name, age, **kwargs): # 子进程要执行的代码
for i in range(10):
print('子进程:name= %s,age=%d ,pid=%d' % (name, age,os.getpid())) # os.getpid() 获得当前进程PID值
print('他的父进程pid=%d' % os.getppid()) # os.getppid() 子进程获得父进程的PID值
print(kwargs)
time.sleep(0.5)

if __name__=='__main__':
print('父进程:pid=%d' % os.getpid()) # os.getpid() 获得当前进程PID值

p = Process(target=run_proc, args=('jack',18), kwargs={"grade":5}, name='run_proc')
# Process(target=这个进程实例所调用对象, args=调用对象的位置参数元组, kwargs=调用对象的关键字参数字典, name=为当前进程实例的别名)

print('子进程%s将要执行' % p.name) # name:当前进程实例别名,默认为Process-N,N为从1开始递增的整数
p.start() # start():启动进程实例(创建子进程)
time.sleep(1)

p.terminate() # terminate():不管任务是否完成,立即终止
p.join(2) # join([timeout]):是否等待进程实例执行结束,或等待多少秒

print('子进程是否还在运行行:%s' % p.is_alive()) # is_alive(): 判断进程实例是否还在执行
print('子进程pid=%d已结束' % p.pid) # pid:当前进程实例的PID值

(2)通过继承multiprocessing.multiprocessing类来创建线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 通过类创建进程:
from multiprocessing import Process
import os
import time

class Process_Class(Process): # 创建进程的类,需要继承Process类
def __init__(self,interval):
Process.__init__(self) # 用父类Process的__init__进行初始化
self.interval = interval

def run(self): # 重写父类Process的run()方法,用于后面start()调用
print('子进程pid=%s开始执行,父进程为pid=%s' % (os.getpid(),os.getppid()))
t_start = time.time()

time.sleep(self.interval)

t_stop = time.time()
print('子进程pid=%s执行结束,耗时%0.2f秒' % (os.getpid(),t_stop-t_start))

if __name__ == '__main__':
t_start = time.time()
print('父进程pid=%s' % os.getpid())

p = Process_Class(2) # 实例化继承了Process类的类,就等同于实例化一个进程对象
p.start() # 对一个不包含target属性的Process类执行start()方法,会运行这个类中的run()方法,所以这里会执行p.run()
p.join()

t_stop = time.time()
print('父进程pid=%s执行结束,耗时%0.2f秒' % (os.getpid(),t_stop-t_start))

三、进程同步

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的。

用文件共享数据,加锁的是为了保证多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,牺牲了速度而保证了数据安全。

multiprocessing.Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#文件当做数据库,模拟抢票(Lock互斥锁)
#文件db的内容为:{"count":2}
#注意一定要用双引号,不然json无法识别

from multiprocessing import Process,Lock
import json
import time
import random
import os

def work(filename,lock): #买票
# lock.acquire()
with lock:
with open(filename,encoding='utf-8') as f:
dic=json.loads(f.read())
# print('剩余票数: %s' % dic['count'])
if dic['count'] > 0:
dic['count']-=1
time.sleep(random.randint(1,3)) #模拟网络延迟
with open(filename,'w',encoding='utf-8') as f:
f.write(json.dumps(dic))
print('%s 购票成功' %os.getpid())
else:
print('%s 购票失败' %os.getpid())
# lock.release()

if __name__ == '__main__':
lock=Lock()
p_l=[]
for i in range(10):
p=Process(target=work,args=('db',lock))
p_l.append(p)
p.start()
for p in p_l:
p.join()

print('主线程')

四、进程异步

同步调用就是‘你’喊‘你朋友’吃饭 ,‘你朋友’在忙 ,‘你’就一直在那等,等‘你朋友’忙完了 ,‘你们’一起去
异步调用就是‘你’喊‘你朋友’吃饭 ,‘你朋友’说知道了 ,待会忙完去找‘你’ ,‘你’就去做别的了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 异步调用

from multiprocessing import Pool
import time
import os

def test():
print("---进程池中的进程---pid=%d,ppid=%d--"%(os.getpid(),os.getppid()))
for i in range(3):
print("----%d---"%i)
time.sleep(1)
return "hahah" # 子进程执行结束后,return返回给操作系统,操作系统再传给test2函数

def test2(args):
print("---callback func--pid=%d"%os.getpid())
print("---callback func--args=%s"%args)

pool = Pool(3)
pool.apply_async(func=test,callback=test2) # callback回调来实现异步调用,告诉主进程停下来正在做的,调用下test2函数后(不用等),再接着做主进程的事

time.sleep(5)
print("----主进程-pid=%d----"%os.getpid())

五、进程池

Pool可以提供指定数量的进程供用户使用,默认是CPU核数。当有新的请求提交到Poll的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待。
Pool([numprocess [,initializer [, initargs]]])

参数介绍:

  • numprocess: 要创建的进程数,如果省略,将默认使用cpu_count()的值
  • initializer:是每个工作进程启动时要执行的可调用对象,默认为None
  • initargs:是要传给initializer的参数组

方法介绍:

  • p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
  • p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
  • p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
  • P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法:

  • obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
  • obj.ready():如果调用完成,返回True
  • obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
  • obj.wait([timeout]):等待结果变为可用。
  • obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 进程池 Pool:
# Pool可以提供指定数量的进程供用户使用,默认是CPU核数。
# 当有新的请求提交到Poll的时候,如果池子没有满,会创建一个进程来执行,否则就会让该请求等待。

from multiprocessing import Pool
import os
import time
import random

def worker(args,**kwds):
t_start = time.time()
print("%s开始执行,进程号为%d"%(msg,os.getpid()))
time.sleep(random.random()*2)
t_stop = time.time()
print(args,kwds,"执行完毕,耗时%0.2f"%(t_stop-t_start))

po=Pool(3) # 定义一个进程池,最大进程数3

for i in range(0,10):
#每次循环将会用空闲出来的子进程去调用目标
po.apply_async(worker,(i,{"key":i})) # 非阻塞方式调用

# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,),传递给目标的关键字参数列表)
# Pool.apply(func[, args[, kwds]]) 是阻塞方式调用func

print("----start----")
po.terminate() # 不管任务是否完成,立即终止
po.close() # 关闭进程池,不再接收新的请求
po.join() # 主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用
print("-----end-----")

六、进程间通信

Process之间肯定是需要通信的,Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。
Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

  • maxsize:是队列中允许最大项数,省略则无大小限制。
  • q.put()方法用以插入数据到队列中

    put方法还有两个可选参数:blocked和timeout。
    如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。
    如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

  • q.get()方法可以从队列读取并且删除一个元素。

    get方法有两个可选参数:blocked和timeout。
    如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。
    如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.

  • q.get_nowait():同q.get(False)
  • q.put_nowait():同q.put(False)
  • q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
  • q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
  • q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# 进程间通信 Queue:

# (1)可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序
from multiprocessing import Process,Queue
import os
import time
import random

def write(q): # 写数据进程执行的代码
for value in ['A', 'B', 'C']:
if not q.full(): # Queue.full() 判断消息队列是否已满
print('put %s ro queue' % value)
q.put(value) # Queue.put(item,[block[, timeout]]) 向消息队列写入一条消息
# q.put_notwait(value) # 相当Queue.put(item, False)
time.sleep(random.random())
else:
print('消息列队已满,现有消息数量:%s' % q.qsize()) # Queue.qsize() 返回当前队列包含的消息数量

def read(q): # 读数据进程执行的代码
while True:
if not q.empty(): # Queue.empty() 判断消息队列是否为空
value = q.get(True) # Queue.get([block[, timeout]]) 获取队列中的一条消息,然后将其从列队中移除
# value = q.get_nowait() # 相当Queue.get(False)
print('Get %s from queue.' % value)
time.sleep(random.random())
else:
break

if __name__ == '__main__':
q = Queue() # 父进程创建Queue,并传给各个子进程

pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))

pw.start()
pw.join()

pr.start()
pr.join() # pr进程里是死循环,无法等待其结束,只能强行终止

print('所有数据都写入并且读完')


# (2)如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue()
from multiprocessing import Pool,Manager
import os,time,random

def writer(q):
print("writer启动(%s),父进程为(%s)" % (os.getpid(),os.getppid()))
for i in ['A','B','C']:
q.put(i)

def reader(q):
print("reader启动(%s),父进程为(%s)" % (os.getpid(),os.getppid()))
for i in range(q.qsize()):
print("reader从Queue获取到消息:%s" % q.get(True))

if __name__=="__main__":
print("(%s) start" % os.getpid())
q=Manager().Queue() #使用Manager中的Queue来初始化
po=Pool()

#使用阻塞模式创建进程,这样就不需要在reader中使用死循环了,可以让writer完全执行完成后,再用reader去读取
po.apply(writer,(q,))
po.apply(reader,(q,))

po.close()
po.join()

print("(%s) End" % os.getpid())

七、子进程

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。
subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

1
2
3
4
5
6
#Python代码中运行命令nslookup www.python.org
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

如果子进程还需要输入,则可以通过communicate()方法输入:

1
2
3
4
5
6
7
import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

持续更新…

最后更新: 2018年12月05日 14:18

原始链接: http://pythonfood.github.io/2017/12/30/python多进程/

× 多少都行~
打赏二维码