一、线程
线程是CPU分配资源的基本单位。但一个程序开始运行,这个程序就变成了一个进程,而一个进程相当于一个或者多个线程。当没有多线程编程时,一个进程也是一个主线程,但有多线程编程时,一个进程包含多个线程,包括主线程。使用线程可以实现程序的并发。
进程VS线程:
功能:
(1)进程,能够完成多任务,比如 在一台电脑上能够同时运行多个QQ
(2)线程,能够完成多任务,比如 一个QQ中的多个聊天窗口定义不同:
(1)进程是系统进行资源分配和调度的一个独立单位
(2)线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.区别:
(1)一个程序至少有一个进程,一个进程至少有一个线程.
(2)线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高。
(3)进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率
(4)线线程不能够独立执行,必须依存在进程中优缺点:
线程执行开销小,但不利于资源的管理和保护;而进程正相反
Python3通过两个标准库_thread
和threading
提供对线程的支持:
(1)_thread
提供了低级别的、原始的线程以及一个简单的锁,它相比于threading模块的功能还是比较有限的。
(2)threading
模块除了包含_thread 模块中的所有方法外,还提供的其他方法:
threading.currentThread()
: 返回当前的线程变量。threading.enumerate()
: 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。threading.activeCount()
: 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:run()
: 用以表示线程活动的方法。start()
:启动线程活动。join([time])
: 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。isAlive()
: 返回线程是否活动的。getName()
: 返回线程名。setName()
: 设置线程名。
二、_thread模块
函数式:调用_thread模块中的start_new_thread()函数来产生新线程。_thread.start_new_thread ( function, args[, kwargs] )
- function - 线程函数。
- args - 传递给线程函数的参数,他必须是个tuple类型。
- kwargs - 可选参数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import _thread
import time
# 为线程定义一个函数
def print_time(threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print('s%: s%' % (threadName, time.ctime(time.time())))
# 创建两个线程
try:
_thread.start_new_thread(print_time, ('thread-1', 2))
_thread.start_new_thread(print_time, ('thread-2', 4))
except:
print('E:无法启动线程')
while 1:
pass
三、threading模块
1、用threading.Thread直接在线程中运行函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 # 直接创建线程:
import threading
import time
def sing(name,**kwargs):
for i in range(3):
print('正在唱歌<<%s>>_%d' % (name, i))
print(kwargs)
time.sleep(1)
def dance(name,**kwargs):
for i in range(3):
print('正在跳舞<<%s>>_%d' % (name, i))
print(kwargs)
time.sleep(1)
if __name__ == '__main__':
# 主线程会等待所有的子线程结束后才结束
print('开始%s' % time.ctime())
# threading.Thread(target=这个线程实例所调用对象, args=调用对象的位置参数元组, kwargs=调用对象的关键字参数字典)
t1 = threading.Thread(target=sing, args=('欢乐颂',), kwargs={'时长':'5min'}) # 创建线程
t2 = threading.Thread(target=dance, args=('新年好',), kwargs={'时长':'20min'})
t1.start() # start() 启动线程活动
t2.start()
t1.join() # join([time]) 等待线程中止,可设置超时时间
t2.join()
print('线程是否活动:', t1.isAlive()) # isAlive() 返回线程是否活动的
print('线程是否活动:', t2.isAlive())
t1.setName('thread-t1') # setName() 设置线程名
print('获取线程名称:', t1.getName()) # getName() 返回线程名
print('当前运行的线程:', threading.currentThread()) # threading.currentThread() 返回当前的线程变量
while True:
print('当前运行的线程数%d' % threading.activeCount()) # threading.activeCount() 返回正在运行的线程数量
print('当前运行的所有线程:', threading.enumerate()) # threading.enumerate() 返回一个包含正在运行的线程的列表
if len(threading.enumerate()) <= 1: # len(threading.enumerate()) 等同与 threading.activeCount()
break
time.sleep(0.5)
print('结束%s' % time.ctime())
2、通过继承threading.Thread类来创建线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 # 使用类创建线程:
import threading
import time
class MyThread(threading.Thread):
def __init__(self,name):
super().__init__() # 使用父类__init__进行初始化
self.name = name
def run(self): # 重写父类Thread的run()方法,定义线程的功能函数,用于后面start()调用
for i in range(3):
time.sleep(1)
print('I am ' + self.name + ' @ ' + str(i))
if __name__ == '__main__':
t = MyThread('Thread-t') # 实例化就生成一个线程
t.start() # 执行start()方法,会运行这个类中的run()方法,所以这里会执行t.run()
四、线程同步
1、多线程共享全局变量
在一个进程内的所有线程共享全局变量,能够在不适用其他方式的前提下完成多线程之间的数据共享(这点要比多进程要好)
缺点就是,线程是对全局变量随意遂改可能造成多线程之间对全局变量的混乱(即线程非安全)
1 | import threading |
2、线程同步:
如果没有控制多个线程对同一资源的访问,对数据造成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。
解决思路就是引入线程同步,同步就是协同步调,按预定的先后次序进行运行。
线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入一个状态:锁定/非锁定。
- 锁的好处:确保了某段关键代码只能由一个线程从头到尾完整地执行
- 锁的坏处:
- 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了
- 由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁
使用Thread对象的Lock()
和Rlock()
可以实现简单的线程同步,这两个对象都有acquire()
方法和release()
方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire()和release()方法之间。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32import threading
import time
g_num = 0
def work1():
global g_num
for i in range(1000000):
mutexFlag = mutex.acquire(True) # acquire([blocking]) 锁定
# True表示堵塞 即如果这个锁在上锁之前已经被上锁了,那么这个线程会在这里一直等待到解锁为止
# False表示非堵塞,即不管本次调用能够成功上锁,都不会卡在这,而是继续执行下面的代码
if mutexFlag:
g_num +=1
mutex.release() # release() 解锁
print('work1:g_num=%d' % g_num)
def work2():
global g_num
for i in range(1000000):
mutexFlag = mutex.acquire(True)
if mutexFlag:
g_num +=1
mutex.release()
print('work1:g_num=%d' % g_num)
mutex = threading.Lock() # 创建一个互斥锁,这个锁的默认状态是未上锁
t1 = threading.Thread(target=work1)
t2 = threading.Thread(target=work2)
t1.start()
t2.start()
print('g_num=%d' % g_num)
3、死锁:
死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
避免死锁:
- 程序设计时要尽量避免(银行家算法)
- 添加超时时间等
RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。
1 | import threading |
五、全局解释器锁
Python GIL(Global Interpreter Lock)
如果使用多核CPU,在Cpython解释器中,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。
面向I/O的(会调用内建的操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果某线程并未使用很多I/O操作,它会在自己的时间片内一直占用处理器和GIL。也就是说,I/O密集型的Python程序比计算密集型的Python程序更能充分利用多线程的好处。
1、i/o密集型
多线程用于IO密集型,如socket,爬虫,web。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 from threading import Thread
from multiprocessing import Process
import time
import os
def work():
time.sleep(2) #模拟I/O操作,可以打开一个文件来测试I/O,与sleep是一个效果
print(os.getpid())
if __name__ == '__main__':
t_l=[]
start_time=time.time()
for i in range(1000):
t=Thread(target=work) #耗时大概为2秒
# t=Process(target=work) #耗时大概为25秒,创建进程的开销远高于线程,而且对于I/O密集型,多cpu根本不管用
t_l.append(t)
t.start()
for t in t_l:
t.join()
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
2、cpu密集型
多进程用于计算密集型,如金融分析,视频解码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33 from threading import Thread
from multiprocessing import Process
import os
import time
def work():
res=0
for i in range(1000000):
res+=i
if __name__ == '__main__':
t_l=[]
start_time=time.time()
for i in range(300):
# t=Thread(target=work) #多线程49.64094281196594
t=Process(target=work) #多进程11.664679050445557
t_l.append(t)
t.start()
for i in t_l:
i.join()
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
print('主线程')
# run time is 49.64094281196594
# 主线程
# run time is 11.664679050445557
# 主线程
六、线程优先级队列
Queue模块中提供了同步的、线程安全的队列类,包括:
- FIFO(先入先出)队列 Queue
- LIFO(后入先出)队列 LifoQueue
- 优先级队列 PriorityQueue
这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么就做完),能够在多线程中直接使用,可以使用队列来实现线程间的同步。
Queue模块中的常用方法:
Queue.qsize()
返回队列的大小Queue.empty()
如果队列为空,返回True,反之FalseQueue.full()
如果队列满了,返回True,反之FalseQueue.full
与 maxsize 大小对应Queue.get([block[, timeout]])
获取队列,timeout等待时间Queue.get_nowait()
相当Queue.get(False)Queue.put(item)
写入队列,timeout等待时间Queue.put_nowait(item)
相当Queue.put(item, False)Queue.task_done()
在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号Queue.join()
实际上意味着等到队列为空,再执行别的操作
1 | import threading |
七、ThreadLocal
ThreadLoca可以解决参数在一个线程中各个函数之间互相传递的问题。
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等。
1 | import threading |
持续更新…
最后更新: 2018年12月05日 15:02