TOC

    为了解决多线程的问题,出现了很多技术,前面刚刚看到了Event,而Event是一个比较但简单的东西,它就是等待着一个开关量的变化,或者说一个事件的变化,Event是从False变为True,我们可以通过Event的is_set方法来判断是否发生了变化,总之,只要拿到了就是True,拿不到就是False,那么有了Event就能解决很多问题,在源代码中也经常能够看到它的身影,包括循环等待,都可以使用它,比如ATM机取钱,目前只有一台ATM机,那么在第一个人进去门自动会上锁,后来的人只有等到这个锁开了之后才能进去取钱,这和独占锁其实是一个意思;
    那么接下来需要讲解的是一个称之为锁(Lock)的概念,它是高并发的解决方案当中其中就说到了争抢和锁,我们说排队没必要加锁,因为他们说有顺序的,所有的任务一个一个有序来,但凡是锁,就是就期望独占,或者说优先使用,大多数情况下都是这样,这个时候往往都会上锁,因为如果不上锁,那么很可能出现争抢的情况,线程特定的代码还没有执行完成,这个资源就被其他的线程抢走了;
    一般情况下,上锁的目的就是为了独占,线程使用完了这个资源之后再去释放锁,在没用完之前其他都线程全部都得等待,这种我们也一般称之为"独占锁" ,独占锁是目前使用得最多的一种锁,对于独占锁而言,因为资源只有一个,所以说当资源紧缺的时候,又有多个线程都要使用这个资源,那么这个时候就必定要上锁,解决线程间争抢的问题,一旦不上锁,可能指令还没执行完,资源就被抢走了;

独占锁

     在Python的threading多线程模块当中,提供了两种锁,一种是Lock,一种是RLock,Lock即独占锁,凡事存在共享资源争抢的情况下都可以上锁,从而保证在特定得时间内只有一个使用者可以完全使用这个共享的资源,这是独占锁的一种模式,也有那种并非独占的锁,比如优先使用,或者更多的使用,但是独占锁是用得最多的。
    锁是一个非常实用的技术,也在各个技术领域内大批量应用,不论是数据库、中间件、消息队列等等等,使用非常广泛,加锁的时机是非常重要的,凡事存在共享的资源可能会出现争抢的情况下,都可以上锁,从而保证只有一个使用者,可以完全独享这个资源的使用;
    对于独占锁来讲,它是一旦抢到这个资源,就会马上上锁,那么在上锁期间,只有这个上锁的线程能够使用,那么其他的线程只能等待这个锁被释放,它们都将被阻塞,然后进行新一轮的锁争抢,当然,线程也可以不阻塞;
acquire(blocking=True,timeout=-1):默认阻塞,阻塞可以设置超时时间,blocking为False时为非阻塞,timeout禁止设置,成功获得锁,将返回True,否则返回False;
release():释放锁,可以从任何线程中调用释放,以上锁的锁会被重置为unlocked,未上锁的锁调用该方法,则会抛出RuntimeError异常;

基本使用

    对于锁的基本使用方式如下,需要注意的是,一旦我们设置了blocking为False时,其实就没必要设置timeout了,因为blocking为False时,会去尝试获得锁,一旦抢不到锁,就不再等待这个锁;
lock=threading.Lock()
print(lock.acquire()) # 争抢锁
# True
print(lock.acquire(timeout=2)) # 争抢锁
# False
print(lock.acquire(blocking=False)) # 争抢锁,blocking=False表示不阻塞,意思就是尝试获取锁,能抢到就抢,抢不到就返回False
# False
print(lock.acquire()) # 争抢锁,此时已形成死锁,因为第一个lock.acquire()获取到这个锁之后,一直没有release,所以此处只会一直处于阻塞状态
    对于锁而言,在任何一个线程中,都是可以访问的,那么在众多线程中,对同一个锁争抢,当在争抢的时候,获得锁的线程没有release,那么其他线程只能阻塞,或者不阻塞,总之只要获取不到这一把锁,返回就是False;
Lock=threading.Lock()
Lock.acquire()

def worker(lock):
    print("进入工作线程")
    lock.acquire()
    print("获得锁,开始处理")
    lock.release()
    print("处理完成,锁已释放")

threading.Thread(target=worker,args=(Lock,)).start()
while True:
    cmd=input("请输入选项q退出/r释放锁:").strip()
    if cmd == 'q':
        break
    elif cmd == 'r':
        Lock.release()
        time.sleep(1)
    else:
        print(threading.enumerate())
# 进入工作线程
# 请输入选项q退出/r释放锁:
# [<_MainThread(MainThread, started 140734927617472)>, <Thread(Thread-1, started 123145348476928)>]
# 请输入选项q退出/r释放锁:r
# 获得锁,开始处理
# 处理完成,锁已释放
# 请输入选项q退出/r释放锁:
    可以看到上面的脚本,主线程一创建锁,就占用了这个锁资源,因此当上文中的子线程启动了之后,打印出了第一行日志"进入工作线程",没有继续往下执行了,因为此时,该子线程一直都在等待这个锁的释放;
    那么当,我们在使用命令"r"释放了这个锁的时候,就会发现,子线程就立马从阻塞态转变为了运行态,立马打印出了lock.acquire()下面的结果,这也就是独占锁的一种表现,只要没有释放锁,不管什么场景下,都得等待着;
    上述是一个子线程和主线程同时去争抢一把锁,那么当有多个子线程时,其实它们都是一样的,但是不管有多少个线程,一般而言,相同的业务场景锁只有一把,所有的子线程同时去争抢这一把锁,如下;
Lock=threading.Lock()
Lock.acquire()

def worker(lock):
    print("进入工作线程")
    lock.acquire()
    print("%s获得锁,开始处理"%threading.current_thread().name)

for id in range(5):
    threading.Thread(target=worker,daemon=True,args=(Lock,),name="worker-%s"%id).start()
while True:
    cmd=input("请输入选项q退出/r释放锁:").strip()
    if cmd == 'q':
        break
    elif cmd == 'r':
        Lock.release()
        time.sleep(1)
    else:
        print(threading.enumerate())
# 进入工作线程
# 进入工作线程
# 进入工作线程
# 进入工作线程
# 进入工作线程
# 请输入选项q退出/r释放锁:
# [<_MainThread(MainThread, started 140734756306368)>, <Thread(worker-0, started daemon 123145575800832)>, <Thread(worker-1, started daemon 123145581056000)>, <Thread(worker-3, started daemon 123145591566336)>, <Thread(worker-4, started daemon 123145596821504)>, <Thread(worker-2, started daemon 123145586311168)>]
# 请输入选项q退出/r释放锁:r
# worker-0获得锁,开始处理
# 请输入选项q退出/r释放锁:r
# worker-1获得锁,开始处理
# 请输入选项q退出/r释放锁:
    可以看到上述代码,在程序启动之初,就立马创建了五个线程,这个五个线程起初全部是阻塞态,因为它们都需要等待锁,那么待我们手动使用"r"命令,来释放锁时,会发现锁立马就会被抢走,那么此时,抢走后的这个线程将专为运行态,那么此时,还有四个线程依旧处于阻塞状态;
    所以说,它们其实是一种争抢的模式去获得锁,但是我们也不要被上述的结果给混淆了,其实我们是无法预测到底是哪一个线程会抢到锁,上述第一个是worker-0,第二个是worker-1这并不能说明,在多线程模型下的锁是有顺序的,其实它们是无序的;
    此外,对于上面的示例,其实它就是一种死锁的现象,因为没有哪一个线程里面有对这个锁release的语句,所有的线程都在等着锁释放,结果没有一个线程会去释放这把锁,死锁这个概念,就是指的一把无法获取的锁,其实死锁本身的概念是这样的,表示这把锁无解,没有谁能将其打开,所有线程全部阻塞;
  • 注意:需要注意的是,上面这种编码方式其实是存在很大的问题的,因为一般来讲,锁这个东西,都是哪个线程获取了锁,就由哪个线程去释放锁,上述是例子是子线程拿到了锁,由主线程去释放这个锁,这样是不可取的,存在非常大的问题,当然,此处只是为了演示锁的应用;

with语句

    对于锁来讲,它也支持上下文管理,即,支持with语句,虽然无法查看Lock的源码,但是可以从RLock中看到,在使用with语句时,会调用Lock类的__enter__方法,此时会调用Lock对象的acquire方法,那么在退出时,会调用类的__exit__方法,从而调用lock对象的release方法,;
lock=threading.Lock()
with lock:
    print(1)

案例一

    有一个例子,如一个工厂收到一份订单,这个订单要求生产1000个杯子,但是这个工厂非常小,只有10个工人,那么怎么能够保证准确无误的不浪费的生产1000个杯子,这是我们需要思考的;
    有的工人生产速度快,有的工人生产速度慢,它们的生产的速度并不是一致的,所以我们需要解决的问题是,10个工人,齐头并进生产1000个杯子应该怎么使用多线程的方式去处理;
cups=[]
def worker(count,lock_obj):
    while True: # 因为每个工人都在不停的生产,所以此处需要用到while True
        lock.acquire() # 在工人获取是否已经生产足量的杯子时,加锁
        if len(cups) < count:
            time.sleep(0.001) # 此处必须要加入time.sleep,因为此时要阻塞线程,让线程自动放弃CPU使用权,因为CPU的计算能力太快了,如果不使其阻塞,可能10个线程,第一个线程就直接将1000个杯子生产完了,所以,必须阻塞,这样,其他线程才能有机会被调度
            cups.append(threading.current_thread().name)
            lock_obj.release() # 此处的解锁很关键,我们需要在数据正式入库的之后才能解锁,只有数据入库之后,别人使用if len(cups) < count才会拿到目前正确的杯子数量
        else:
            lock_obj.release() # 此处也需要解锁,因为如果上面的if条件为False,那么上面的代码段就不会执行,因此锁就得不到释放,此处的解锁就是为了解决上面if判断的问题;
            break

lock=threading.Lock()
for id in range(100):
    thread=threading.Thread(target=worker,name="worker-%s"%id,args=(1000,lock))
    thread.start()
    可以看到上述代码,那么首先,需要先声明一点,代码中的time.sleep主要是因为需要使线程进入阻塞态,因为CPU的执行效率太高,可能在很快的一段时间就执行完成了所有代码,可能一个线程就将这个1000个杯子就直接造完了,所以,我们需要借助time.sleep()来使当前线程主动放弃CPU的使用权,从而达到每个线程在启动时,先阻塞一段时间,让其他现在有调度的机会;
    其次,对于以上代码,我们主要是看加锁和释放锁的两个地方,这个加锁的时间不宜早不宜晚,过早的加锁,会影响多线程的执行效率,过晚的加锁,可能会影响最终的结果不正确,所以加锁的位置是一个非常重要的关键点,在需要正好在准备操作数据的那一刻进行加锁,这样才能更好的利用多线程的特性。
    上文加锁是在判断当前的杯子的数量是否已经满足之后,这个位置刚刚好,如果判断杯子数量是否已经满足之前,即while True之前加锁,那么就把while循环锁在里面了,可能第一个线程,获取锁之后进入了while True,然后就直接将这1000个杯子直接生产完成了,主要原因是因为加锁太早了,这是普遍现象,那么对于如上的代码,如果在while True之前加锁,那么可以看到代码中的while循环内部,第一次循环会释放锁,第二次循环又会释放锁,这样就出现了多次释放锁的问题,抛出release unlocked lock异常;
    那么对于释放锁也是个很重要的关键点,上文中释放锁有两处,第一次释放是在杯子数量未满1000时,第二处是在杯子已满1000时,第一个主要的作用是在一个线程生产出一个杯子之后,立马就释放锁,这样,那么当其他线程拿到这个锁的时候,才是当前线程将杯子制作完成之后才获得的这个锁,如果在杯子未制作完成别的线程就拿到了这个锁,那么可能就出现超出1000个杯子问题,因为每个线程在正式生产之前首先会看看目前有没有满足1000个杯子,如果没有满足才会生产,满足就不继续生产,所以我们应该在每个获得锁的线程将杯子制作完成并将制作完成的杯子加入到杯子池中,才能让其他的线程去看当前是否已满足1000个杯子;
    这样别的线程就能拿到锁,拿到锁然后进行判断,判断杯子是否满足一千,不满足继续生产然后释放锁,满足释放获取到的锁,此线程结束;

案例二

    计数器类,可以加可以减
class Counter():
    def __init__(self):
        self._value=0
    @property
    def value(self):
        return self._value
    def inc(self):
        self._value+=1
    def dec(self):
        self._value-=1

def balance(obj,loop):
    for i in range(loop):
        for j in range(-50,50):
            if j < 0:
                obj.dec()
            else:
                obj.inc()
    else:
        print(obj.value)
c=Counter()
for i in range(10):
    thread=threading.Thread(target=balance,args=(c,1000))
    thread.start()
# ...
# 10
    可以看到上述代码,一个计数器类,10个线程针对这个计数器类进行100次计算,分别为50次加1和50次减1,一般情况,在不考虑多线程带来的线程安全的情况下,不管多少个线程计算多少次,他门最终的结果应该都是0,但是可以很明显的看到上述的结果为10,其实如果将计算的数量变小,比如把100修改为10,这样我们是可以得到正确的结果0的,那么我们改为100之后,把整个线程的执行时间拉长了,这在多线程情况下就出现了线程安全的问题,问题主要有两个;
问题一
    问题一,为线程安全问题,这种+=或者-+的赋值语句分三步,先获取原值,然后计算,最后赋值,假如,线程1在做加法的时候,首先会去获取到目前self._value的值,那么在此时,这个获取self._value的值时,线程2、线程3...在此时也可能会去获取self._value的值,如果此时self._value的值为999,那么所有的线程在这一时刻可能获取到的值都是999,那么线程1最终计算的结果是1000,线程2最终计算的结果也是1000,线程3也是一样,本来正确到结果是1002,正因为这种在获取值的时候没有加锁,从而导致中间有很多累加的过程就全丢掉了,这就说明Counter这个类线程不安全;
    通过上述的描述,我们找到问题的根源了,所以此时我们应该在Counter这个类上面加锁,就是从看到self._value这个值,到self._value赋值完成这个过程,是不允许其他线程能够访问self._value的,如下示例;
class Counter():
    def __init__(self):
        self._value=0
        self._lock=threading.Lock()
    @property
    def value(self):
        # with self._lock: # 此处是可以不加的,下面的赋值语句并非从此处获取值,而是从构造函数里面去获取self._value的值
            return self._value
    def inc(self):
        with self._lock:
            self._value+=1
    def dec(self):
        with self._lock:
            self._value-=1

def balance(obj,loop):
    for i in range(loop):
        for j in range(-50,50):
            if j < 0:
                obj.dec()
            else:
                obj.inc()
    else:
        print(obj.value)
c=Counter()
for i in range(10):
    thread=threading.Thread(target=balance,args=(c,1000))
    thread.start()
# ...
# 0
问题二
    可以看到如上代码,如果我们期望在计算结束之后打印出最终的结果,那么上面的代码是没办法实现的,此时,我们可能会想到在for循环里面添加一个join,但是需要知道的是,join这个方法主要作用是暂停主线程的执行,也就是第一个主线程start,并且join之后,for循环就不会继续往下执行了,由此第二个线程也没法创建和启动,这样,这个代码就不再是并行了,而是串形;
    其实这种情况我们也有很多方式可以实现,比如说,我们可以另启一个while循环,不停的判断当前的线程是否为1,这个1是主线程,一旦只剩下主线程,那么就打印最终计算的value值;
while True:
    time.sleep(1)
    if threading.active_count() == 1:
        print(c.value)
        break
    当然,我们也可以使用join的方法来实现,将线程对象加入到一个list里面去,然后利用for循环将其一个一个进行join,这种方法,就不会直接阻塞主线程的操作;
    那么在这种情况下,假设有3个线程,第一个线程需要1秒执行完成,第二个线程需要3秒执行完成,第三个线程需要5秒执行完成,那么主线程最少需要join的时间为5秒,以最大为准,因为他们是并行执行的,所以在第三个线程执行完成之后,第一个和第二个线程早就执行完了;
class Counter():
    def __init__(self):
        self._value=0
        self._lock=threading.Lock()
    @property
    def value(self):
       with self._lock: # 在获取的时候,其他的全部阻塞
            return self._value
    def inc(self):
        with self._lock:
            self._value+=1
    def dec(self):
        with self._lock:
            self._value-=1

def balance(obj,loop):
    for i in range(loop):
        for j in range(-50,50):
            if j < 0:
                obj.dec()
            else:
                obj.inc()
c=Counter()
thread_list=[]
for i in range(10):
    thread=threading.Thread(target=balance,args=(c,1000))
    thread.start()
    thread_list.append(thread)

for thread in thread_list:
    thread.join()
print(c.value) # 0

死锁

    加锁解锁是比较简单的,但是加锁后解锁前,还有一些代码执行,就有可能有异常,一旦出现异常,程序被异常终止,锁是没有被释放的,这就会导致其他线程会无限的等待下去,这种现象我们就称之为死锁,产生死锁的原因有很多,包括循环等待或者简介等待,A等B,B等A等;
    还好,Python提供了with语法,它保证在出现异常、return、break这些情况下,都可以保证退出with之后释放锁,所以更加推荐使用with语法,当然,我们也可以使用try...finally语句;

锁的应用场景

    在同一个资源被所有的线程或者进程使用时,这个时候都会出现资源争用的问题,从而在这种场景下都会使用锁,来解决在资源争抢的情况下带来的线程安全问题;
    如果全部是读取一个资源时,一般是不需要锁的,因为只是读取,不管跨多少个线程,反正没人改,但是只要是如果需要改变这个资源的时候,就需要用到锁了;

锁的注意事项

    第一个,能不用则不用,使用了锁,多线程访问被锁的资源时,就形成了串行,一个完了,另一个才来,这就锁串行的效果,要么排队,要么争抢;
    第二个,加锁的时间越短越好,不需要就立即释放锁;
    第三个,一定要避免死锁;

非阻塞锁的使用

    非阻塞锁就是线程在运行起来时,它这个阻塞的现象,我们可以将其去掉,变成一个非阻塞,非阻塞锁和阻塞锁两种实现代码是不一样的,如下;
Lock=threading.Lock()

def worker(lock):
    print('enter')
    while True:
        time.sleep(1)
        Flag=lock.acquire(False) # 判断是否拿到锁,拿到锁才执行逻辑
        if Flag:
            print(True)
            lock.release()
            break
        else:
            print(False)

    print('exit')

for i in range(2):
    thread=threading.Thread(target=worker,name='worker-%s'%i,args=(Lock,))
    thread.start()
# enter
# enter
# True
# exit
# True
# exit

可重入锁

    除了Lock之外,在Python中还有一个RLock,即可重入锁,也叫递归锁,它是一个与线程相关的锁,线程A可以获得可重入锁,并且该线程还可以多次成功获取,不会阻塞,但是只要换一个线程就必定会阻塞,或者选择不阻塞,但是一定拿不到这把锁;
    可重入锁,指的是只要一个线程里面获取到一回锁,就可以在该线程里面重复获取多次锁,但是需要知道的是,获取了多少次锁,就得释放多少次锁,否则就其他线程就死锁了;
Lock=threading.RLock()

print(Lock.acquire())
print(Lock.acquire())
Lock.release()

def sub(lock):
    print("enter")
    print(lock.acquire())
    print('exit')
threading.Thread(target=sub,args=(Lock,)).start()

# True
# True
# enter  # 此时进入阻塞状态
    可以看到上述代码,在我们使用可重入锁时,获取了2次,但是只释放了一次,在这种情况下在主线程内部是可以继续获得锁的,这没什么问题,但是除了主线程的其他线程是没有资格去争抢这把锁的,由此,可以得出,RLock主要与线程相关,同一线程内可以获取多次锁,但获取多次的同时也需要释放多次锁,只要只要释放次数和获取次数不一致,那么其他线程无法获得这个锁;
    另外,需要注意的是,释放锁,必须在加锁的线程里面去释放,比如在A线程中获取锁,但是我们在B现在中去释放锁,这种情况是行不通的,这个锁和线程相关,无法代替其他线程释放;
lock=threading.RLock()
print(lock.acquire())
print(lock.acquire())
lock.release()
def sub(Lock):
    print("enter")
    Lock.release()
    print("exit")
threading.Thread(target=sub,args=(lock,)).start() 
# cannot release un-acquired lock
    通过源码也可以看到,RLock在release时,首先会判断release的线程是否为当前线程,如果不是则抛出RuntimeError异常,如果是,那么就将self._count这个属性-1,那么一旦count等于0,则将锁的属主设置为None,然后真正的将这个锁进行释放,此时,其他现在就可以来进行争抢这个锁了;
    def release(self):
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count = count = self._count - 1
        if not count:
            self._owner = None
            self._block.release()
    def __exit__(self, t, v, tb):
        self.release()

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注