人生苦短,我用python。

一、线程

线程是CPU分配资源的基本单位。但一个程序开始运行,这个程序就变成了一个进程,而一个进程相当于一个或者多个线程。当没有多线程编程时,一个进程也是一个主线程,但有多线程编程时,一个进程包含多个线程,包括主线程。使用线程可以实现程序的并发。

进程VS线程:

功能:
(1)进程,能够完成多任务,比如 在一台电脑上能够同时运行多个QQ
(2)线程,能够完成多任务,比如 一个QQ中的多个聊天窗口

定义不同:
(1)进程是系统进行资源分配和调度的一个独立单位
(2)线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.

区别:
(1)一个程序至少有一个进程,一个进程至少有一个线程.
(2)线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高。
(3)进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率
(4)线线程不能够独立执行,必须依存在进程中

优缺点:
线程执行开销小,但不利于资源的管理和保护;而进程正相反

Python3通过两个标准库_threadthreading提供对线程的支持:
(1)_thread提供了低级别的、原始的线程以及一个简单的锁,它相比于threading模块的功能还是比较有限的。
(2)threading模块除了包含_thread 模块中的所有方法外,还提供的其他方法:

  • threading.currentThread(): 返回当前的线程变量。
  • threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
    除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
  • run(): 用以表示线程活动的方法。
  • start():启动线程活动。
  • join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
  • isAlive(): 返回线程是否活动的。
  • getName(): 返回线程名。
  • setName(): 设置线程名。

二、_thread模块

函数式:调用_thread模块中的start_new_thread()函数来产生新线程。
_thread.start_new_thread ( function, args[, kwargs] )

  • function - 线程函数。
  • args - 传递给线程函数的参数,他必须是个tuple类型。
  • kwargs - 可选参数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import _thread
import time

# 为线程定义一个函数
def print_time(threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print('s%: s%' % (threadName, time.ctime(time.time())))

# 创建两个线程
try:
_thread.start_new_thread(print_time, ('thread-1', 2))
_thread.start_new_thread(print_time, ('thread-2', 4))
except:
print('E:无法启动线程')

while 1:
pass

三、threading模块

1、用threading.Thread直接在线程中运行函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 直接创建线程:
import threading
import time

def sing(name,**kwargs):
for i in range(3):
print('正在唱歌<<%s>>_%d' % (name, i))
print(kwargs)
time.sleep(1)

def dance(name,**kwargs):
for i in range(3):
print('正在跳舞<<%s>>_%d' % (name, i))
print(kwargs)
time.sleep(1)

if __name__ == '__main__':
# 主线程会等待所有的子线程结束后才结束
print('开始%s' % time.ctime())

# threading.Thread(target=这个线程实例所调用对象, args=调用对象的位置参数元组, kwargs=调用对象的关键字参数字典)
t1 = threading.Thread(target=sing, args=('欢乐颂',), kwargs={'时长':'5min'}) # 创建线程
t2 = threading.Thread(target=dance, args=('新年好',), kwargs={'时长':'20min'})

t1.start() # start() 启动线程活动
t2.start()
t1.join() # join([time]) 等待线程中止,可设置超时时间
t2.join()

print('线程是否活动:', t1.isAlive()) # isAlive() 返回线程是否活动的
print('线程是否活动:', t2.isAlive())

t1.setName('thread-t1') # setName() 设置线程名
print('获取线程名称:', t1.getName()) # getName() 返回线程名
print('当前运行的线程:', threading.currentThread()) # threading.currentThread() 返回当前的线程变量

while True:
print('当前运行的线程数%d' % threading.activeCount()) # threading.activeCount() 返回正在运行的线程数量
print('当前运行的所有线程:', threading.enumerate()) # threading.enumerate() 返回一个包含正在运行的线程的列表
if len(threading.enumerate()) <= 1: # len(threading.enumerate()) 等同与 threading.activeCount()
break
time.sleep(0.5)

print('结束%s' % time.ctime())

2、通过继承threading.Thread类来创建线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 使用类创建线程:
import threading
import time

class MyThread(threading.Thread):

def __init__(self,name):
super().__init__() # 使用父类__init__进行初始化
self.name = name

def run(self): # 重写父类Thread的run()方法,定义线程的功能函数,用于后面start()调用
for i in range(3):
time.sleep(1)
print('I am ' + self.name + ' @ ' + str(i))

if __name__ == '__main__':
t = MyThread('Thread-t') # 实例化就生成一个线程
t.start() # 执行start()方法,会运行这个类中的run()方法,所以这里会执行t.run()

四、线程同步

1、多线程共享全局变量

在一个进程内的所有线程共享全局变量,能够在不适用其他方式的前提下完成多线程之间的数据共享(这点要比多进程要好)

缺点就是,线程是对全局变量随意遂改可能造成多线程之间对全局变量的混乱(即线程非安全)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import threading
import time

g_num = 100
g_nums = [100, 200, 300]
print('多线程执行前g_num=%d' % g_num)
print('多线程执行前g_nums=', g_nums)

def work1(nums):
global g_num
g_num += 1 # 线程修改全局变量
nums.append(400) # 列表当做实参传递到线程中,也可以修改
print('多线程work1执行后g_num=%d' % g_num)
print('多线程work1执行后g_nums=', nums)

def work2(nums):
global g_num
g_num += 1
nums.append(500)
print('多线程work2执行后g_num=%d' % g_num)
print('多线程work2执行后g_nums=', nums)

t1 = threading.Thread(target=work1, args=(g_nums,))
t2 = threading.Thread(target=work2, args=(g_nums,))
t1.start()
time.sleep(1) # 延时一会,保证t1线程中的事情做完
t2.start()

2、线程同步:

如果没有控制多个线程对同一资源的访问,对数据造成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。
解决思路就是引入线程同步,同步就是协同步调,按预定的先后次序进行运行。

线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入一个状态:锁定/非锁定。

  • 锁的好处:确保了某段关键代码只能由一个线程从头到尾完整地执行
  • 锁的坏处:
    • 阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了
    • 由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁

使用Thread对象的Lock()Rlock()可以实现简单的线程同步,这两个对象都有acquire()方法和release()方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire()和release()方法之间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import threading
import time

g_num = 0

def work1():
global g_num
for i in range(1000000):
mutexFlag = mutex.acquire(True) # acquire([blocking]) 锁定
# True表示堵塞 即如果这个锁在上锁之前已经被上锁了,那么这个线程会在这里一直等待到解锁为止
# False表示非堵塞,即不管本次调用能够成功上锁,都不会卡在这,而是继续执行下面的代码
if mutexFlag:
g_num +=1
mutex.release() # release() 解锁
print('work1:g_num=%d' % g_num)

def work2():
global g_num
for i in range(1000000):
mutexFlag = mutex.acquire(True)
if mutexFlag:
g_num +=1
mutex.release()
print('work1:g_num=%d' % g_num)

mutex = threading.Lock() # 创建一个互斥锁,这个锁的默认状态是未上锁

t1 = threading.Thread(target=work1)
t2 = threading.Thread(target=work2)
t1.start()
t2.start()
print('g_num=%d' % g_num)

3、死锁:

死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

避免死锁:

  • 程序设计时要尽量避免(银行家算法)
  • 添加超时时间等

RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import threading
import time

class MyThread1(threading.Thread):
def run(self):
if mutexA.acquire(): # 对mutexA上锁
print(self.name+'----do1---up----')
time.sleep(1)
if mutexB.acquire(): # 等待mutexB解锁
print(self.name+'----do1---down----')
mutexB.release()
mutexA.release()

class MyThread2(threading.Thread):
def run(self):
if mutexB.acquire(): # 对mutexB上锁
print(self.name+'----do2---up----')
time.sleep(1)
if mutexA.acquire(): # 等待mutexA解锁
print(self.name+'----do2---down----')
mutexA.release()
mutexB.release()

mutexA = threading.Lock()
mutexB = threading.Lock()

if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()

五、全局解释器锁

Python GIL(Global Interpreter Lock)
如果使用多核CPU,在Cpython解释器中,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

面向I/O的(会调用内建的操作系统C代码的)程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果某线程并未使用很多I/O操作,它会在自己的时间片内一直占用处理器和GIL。也就是说,I/O密集型的Python程序比计算密集型的Python程序更能充分利用多线程的好处。

1、i/o密集型
多线程用于IO密集型,如socket,爬虫,web。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from threading import Thread
from multiprocessing import Process
import time
import os
def work():
time.sleep(2) #模拟I/O操作,可以打开一个文件来测试I/O,与sleep是一个效果
print(os.getpid())

if __name__ == '__main__':
t_l=[]
start_time=time.time()
for i in range(1000):
t=Thread(target=work) #耗时大概为2秒
# t=Process(target=work) #耗时大概为25秒,创建进程的开销远高于线程,而且对于I/O密集型,多cpu根本不管用
t_l.append(t)
t.start()

for t in t_l:
t.join()
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))

2、cpu密集型
多进程用于计算密集型,如金融分析,视频解码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from threading import Thread
from multiprocessing import Process
import os
import time
def work():
res=0
for i in range(1000000):
res+=i

if __name__ == '__main__':
t_l=[]
start_time=time.time()

for i in range(300):
# t=Thread(target=work) #多线程49.64094281196594
t=Process(target=work) #多进程11.664679050445557
t_l.append(t)
t.start()

for i in t_l:
i.join()

stop_time=time.time()
print('run time is %s' %(stop_time-start_time))

print('主线程')


# run time is 49.64094281196594
# 主线程

# run time is 11.664679050445557
# 主线程

六、线程优先级队列

Queue模块中提供了同步的、线程安全的队列类,包括:

  • FIFO(先入先出)队列 Queue
  • LIFO(后入先出)队列 LifoQueue
  • 优先级队列 PriorityQueue

这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么就做完),能够在多线程中直接使用,可以使用队列来实现线程间的同步。

Queue模块中的常用方法:

  • Queue.qsize() 返回队列的大小
  • Queue.empty() 如果队列为空,返回True,反之False
  • Queue.full() 如果队列满了,返回True,反之False
  • Queue.full 与 maxsize 大小对应
  • Queue.get([block[, timeout]])获取队列,timeout等待时间
  • Queue.get_nowait() 相当Queue.get(False)
  • Queue.put(item) 写入队列,timeout等待时间
  • Queue.put_nowait(item) 相当Queue.put(item, False)
  • Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
  • Queue.join() 实际上意味着等到队列为空,再执行别的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import threading
import time
from queue import Queue
# Queue.qsize() 返回队列的大小
# Queue.empty() 如果队列为空,返回True,反之False
# Queue.full() 如果队列满了,返回True,反之False
# Queue.full 与 maxsize 大小对应
# Queue.get([block[, timeout]])获取队列,timeout等待时间
# Queue.get_nowait() 相当Queue.get(False)
# Queue.put(item) 写入队列,timeout等待时间
# Queue.put_nowait(item) 相当Queue.put(item, False)
# Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
# Queue.join() 实际上意味着等到队列为空,再执行别的操作

class Producer(threading.Thread):
def run(self):
global queue
count = 0
while True:
if queue.qsize() < 1000:
for i in range(100):
count = count +1
msg = '生成产品'+str(count)
queue.put(msg)
print(msg)
time.sleep(0.5)

class Consumer(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
msg = self.name + '消费了 '+queue.get()
print(msg)
time.sleep(1)

if __name__ == '__main__':
queue = Queue()
for i in range(500):
queue.put('初始产品'+str(i))

for i in range(2):
p = Producer()
p.start()

for i in range(5):
c = Consumer()
c.start()

七、ThreadLocal

ThreadLoca可以解决参数在一个线程中各个函数之间互相传递的问题。

ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import threading

local_school = threading.local() # 创建全局ThreadLocal对象
# 全局变量local_school就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。
# 但每个属性如local_school.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
# 可以理解为全局变量local_school是一个dict,不但可以用local_school.student,还可以绑定其他变量

def process_student():
# 获取当前线程关联的student属性
std = local_school.student
print('Student %s in %s ' % (std, threading.current_thread().name))

def process_thread(name):
# 绑定ThreadLocal的student
local_school.student = name
process_student()

t1 = threading.Thread(target=process_thread, args=('Jack'), name='Thread-t1')
t2 = threading.Thread(target=process_thread, args=('Tom'), name='Thread-t2')
t1.start()
t2.start()
t1.join()
t2.join()

持续更新…

最后更新: 2018年12月05日 15:02

原始链接: http://pythonfood.github.io/2017/12/30/python多线程/

× 多少都行~
打赏二维码