TOC
线程同步
多线程都存在所谓的线程安全的问题,在多线程的应用场景下,多个子线程去操作同一个数据,一般情况我们根本无法预期它最终的结果,本来我们是想并行,但是并行之后,连结果都不知道是什么,那么这多个线程之间,如何让它们能够协作,这就是线程同步的技术,所要做的事情;
目前说的是线程同步技术,即,同一个进程下的不同子线程如何来协同工作,在之前的例子也能够看到,在多线程场景如果涉及到全局作用域的数据操作,是很容易出现线程安全的问题的,那么解决线程同步的技术有很多,如Event、Lock、Condition、Semaphore和信号量等,这些技术都用得到;
Event事件
Event事件,我们可以认为它是一个开关量,它是线程间通信机制中最简单的实现,它使用一个内部的标记Flag,通过Flag的True或Flase的变化来进行操作,在前面说过的线程无法start两次的原因就是因为线程内部使用了Event这个开关量来实现的,Event使用也非常简单,如下;
set():标记设置为True;
clear():标记设置为False;
is_set():查看标记是否为True;
wait(timeout=None):设置等待标记为True的时长,None为无限期等待,如果超时了都无法拿到True,则返回False
案例
老板雇用了一个工人,让这个工人生产杯子,在生产杯子的过程中,老板一直等待,直到工人生产了10个杯子,老板才将货物发出去;
那么对于这么一个案例,我们可以分两步做,两个函数,一个为工人函数,一个为老板函数,工人负责生产杯子,老板负责等待生产完成,如下,在没有使用Python提供的线程同步的技术来实现;
Flag=False
def worker():
global Flag
num=0
while num <= 10:
num+=1
time.sleep(1)
Flag=True
def boss():
while True:
if Flag:
print(True)
break
thread1=threading.Thread(target=boss,name="boos")
thread2=threading.Thread(target=worker,name="worker-1")
thread1.start()
thread2.start()
# True
上面这种方法有一个非常大的弊端,就是boss函数,boss函数里面的while True耗费了大量的CPU时间片,其实我们期望的是,我们有一个Flag,在这个Flag在未转变为我们想要的状态时,boss线程可以处于睡眠状态,不去浪费CPU的时间,就是将boss线程阻塞,那么当这个Flag转变为期望的状态时,boss线程立马能够从睡眠的状态切换到运行状态,执行boss函数下面的指令;
那么Python中的Event就提供了很好的线程同步解决方式,首先需要创建一个Event对象,这个Event对象默认为False,那么当工人函数10个杯子处理完成之后,可以使用set()方法,将这个Event对象的开关量,置为True,那么当boss函数发现这个Event对象的开关量为True时,才会继续执行下面的指令,当未转变为True时,boss函数一直处于阻塞状态;
在最初说线程几大状态时,就说过,一个阻塞态的线程是不会占用CPU的资源的,因为阻塞状态是指线程因为某种原因放弃了CPU使用权,让出了cpu timeslice,暂时停止运行。直到线程进入可运行(runnable)状态,才有机会再次获得cpu timeslice 转到运行(running)状态,而对于Event来讲,就是当对这个Event对象使用了set()方法,才会进入runnable状态;
对于Event的使用也非常简单,如下示例;
event=threading.Event() # 创建Event对象,初始状态是False
def worker():
num=0
while num <= 10:
num+=1
time.sleep(1)
event.set() # 当10个杯子制作完成之后,设置event对象为True
def boos():
event.wait() # 等待event对象状态转变为True,此时处于阻塞状态
print(True)
thread1=threading.Thread(target=boos,name="boos")
thread2=threading.Thread(target=worker,name="worker-1")
thread1.start()
thread2.start()
# True
那么对于线程同步,我们可以在多线程的场景下,当一个线程访问某个数据的时候,使其他线程不能访问这个数据,通过这种方式去完成数据的操作,操作完成之后,其他线程才能访问这个数据,这样的话,我们就可以保证数据是安全的;
在多线程的情况下,我们也可以使用同一个Event对象,Event对象不限制等待线程的个数,wait不限制个数,可以同时有N个线程一起等,它们之间是“一对多”的关系,所有的线程等待一个Event对象,当这个Event对象的开关量为True时,那么所有等待的线程将全部激活;
event=threading.Event() # 创建Event对象,初始状态是False
def worker():
while not event.wait(1): # 一直等待event开关量变成True
print("%s is waiting..."%threading.current_thread().name)
for _ in range(2):
threading.Thread(target=worker).start()
event.wait(3) # 阻塞主进程3秒
event.set()
# Thread-1 is waiting...
# Thread-2 is waiting...
# Thread-2 is waiting...
# Thread-1 is waiting...
总结
可以看到如上代码,如果想实现线程一秒钟执行一次的效果,就可以使用wait来做,使其一秒钟执行一次,这个wait的超时时间是1秒,当然,也可以更长时间;
但是需要的知道的时,这个是超时时间,并非阻塞时间,在这个超时时间内,只要是开关量置为True就会立即停止wait,不会等到这个超时时间结束,换而言之就是wait方法里面的参数是一个超时时间,并非是一个等待间隔时间;
Timer定时器
在threading模块下,有一个Timer类,它是Event的一个最佳实践,它集成自Thread类,通过源码可以看到,它重写了Thread类的run方法,没有重写start方法,它也没法重写,因为start方法主要的作用就是调用操作系统内核执行创建一个线程,这些都是底层的C来实现的;
同时,通过源码,可以看到,它是借助Event来实现线程的阻塞,一旦使用Timer类生成一个Thread对象时,就会创建一个Event对象,一旦进入run方法,就会进入线程阻塞,那么在这个阻塞的过程中,我们也可以调用Timer提供的cancel方法来绕过这个线程函数的执行,即取消这个定时器;
一般情况下,这个Timer类主要是用于一个定时器的作用,它定时器的实现主要就是借助Event线程同步的技术,如下,这就是定时器的所有源码;
class Timer(Thread):
def __init__(self, interval, function, args=None, kwargs=None):
Thread.__init__(self)
self.interval = interval
self.function = function
self.args = args if args is not None else []
self.kwargs = kwargs if kwargs is not None else {}
self.finished = Event()
def cancel(self):
self.finished.set()
def run(self):
self.finished.wait(self.interval)
if not self.finished.is_set():
self.function(*self.args, **self.kwargs)
self.finished.set()
延迟线程
第一个作用就是实现延迟执行,延迟只是就真的是延迟执行,它会首先创建好线程,然后使线程调用一个Event对象,使其进入阻塞状态,然后wait等待多少秒之后,才会真正去执行这个线程工作函数;
def worker():
print(threading.current_thread().name)
timer=threading.Timer(2,worker)
timer.start()
# 等待了2秒钟之后,打印出如下结果
# Thread-1
定时器
第二个作用就是定时器,即每隔多少秒之后,开辟一个线程,然后去执行该线程的工作函数,就是一个定时器,tkinter窗口的after函数就是这样;
def worker():
print(threading.enumerate())
timer=threading.Timer(1,worker)
timer.start()
worker()
# 每隔一秒,开辟一个新的线程去执行工作函数
# [<_MainThread(MainThread, started 140734806838720)>]
# [<_MainThread(MainThread, stopped 140734806838720)>, <Timer(Thread-1, started 123145503813632)>]
# [<_MainThread(MainThread, stopped 140734806838720)>, <Timer(Thread-2, started 123145509068800)>]
案例
如下,就是一个摘自互联网的一个60s验证码自动过期的案例;
import random
from threading import Timer
# 定义Code类
class Code:
# 初始化时调用缓存
def __init__(self):
self.make_cache()
def make_cache(self, interval=60):
# 先生成一个验证码
self.cache = self.make_code()
print(self.cache)
# 开启定时器,60s后重新生成验证码
self.t = Timer(interval, self.make_cache)
self.t.start()
# 随机生成4位数验证码
def make_code(self, n=4):
res = ''
for i in range(n):
s1 = str(random.randint(0, 9))
s2 = chr(random.randint(65, 90))
res += random.choice([s1, s2])
return res
# 验证验证码
def check(self):
while True:
code = input('请输入验证码(不区分大小写):').strip()
if code.upper() == self.cache:
print('验证码输入正确')
# 正确输入验证码后,取消定时器任务
self.t.cancel()
break
obj = Code()
obj.check()