TOC

Condition条件变量

   Condition类似条件变量,它里面要传入一个锁,这个锁一般是可重入锁,它与锁相关联,这个锁可以是外部传入的锁或是系统默认创建的锁。Condition主要是用于生产者和消费者模型,为了解决生产者和消费者速度不匹配的问题,因为生产者一般情况下都比消费者慢;
    使用Condition必须先acquie,用完了之后需要release,因为内部使用了锁,默认使用RLock锁,也可以使用Lock锁,最好的方式死使用with上下文管理的方式进程编码;
acquire(blocking=True,timeout=None):获取锁;
wait(self,timeout=None):等待或超时;
notify(n-1):唤醒至多指定数目个等待的线程,没有等待的线程就不会有任何操作;
notify_all():唤醒所有等待的线程;
    还是之前的例子,假设有两个监工,盯着一个工人,生产一千只杯子,那么这个时候,我们可以通过Event来实现,如下;
def boss():
    print("I am waching")
    event.wait()
    print("Good Job")

def worker(count=1000):
    print("Starting...")
    cups=[]
    while len(cups) <=count:
        time.sleep(0.001)
        cups.append(1)
    else:
        print("finished")
    event.set()

event=threading.Event()
thread1=threading.Thread(target=boss)
thread2=threading.Thread(target=boss)
thread3=threading.Thread(target=worker,args=(1000,))
thread1.start()
thread2.start()
thread3.start()
# I am waching
# I am waching
# Starting...
# finished
# Good Job
# Good Job
    但是Event有个问题,它只能够实现这种一对全体的这种通知方式,如果我们想对其中的某一些,而不是全体,这个时候Condition就有更大的灵活性了,但是,我们需要知道Event内部其实就是利用Condition来实现的,所以我们可以认为Event是被封装过的Condition;
cond=threading.Condition()
def boss():
    print("I am waching")
    cond.acquire()
    cond.wait()
    cond.release()
    print("Good Job")

def worker(count=1000):
    print("Starting...")
    cups=[]
    while len(cups) <count:
        time.sleep(0.001)
        cups.append(1)
    print("finished",len(cups))
    cond.acquire()
    cond.notify_all()
    cond.release()
thread_boss1=threading.Thread(target=boss)
thread_boss2=threading.Thread(target=boss)
thread_worker1=threading.Thread(target=worker,args=(1000,))
thread_boss1.start()
thread_boss2.start()
thread_worker1.start()
# I am waching
# I am waching
# Starting...
# finished 1000
# Good Job
# Good Job
    Condition更多的是用在生产者消费者模型当中,主要为了解决消费者消费的速度大于生产者生产速度的问题,生产者有的时候一秒钟能够达到几千甚至几百个,而消费者一秒钟甚至只能消费几个,也就是说,生产的速度可能往往大于消费的速度;
class Dispatcher():
    def __init__(self):
        self.data=None
    def produce(self): # 生产者
        print("starting...")
        while True:
            time.sleep(1) # 模拟生产速度慢
            self.data=random.randint(1,50)
    def consume(self):
        print("consume...")
        while True:
            time.sleep(0.5) # 模拟消费速度快
            print(self.data)

d1=Dispatcher()
consume=threading.Thread(target=d1.consume,name="consume")
produce=threading.Thread(target=d1.produce,name="produce")
consume.start()
produce.start()
# consume...
# starting...
# None
# 21
# 21
# 35
# 35
# ...
    那么如果在某一时刻,消费者的消费变慢了,比如凌晨夜晚,业务量变小了,也可能出现生产者的速度快于消费者的情况,那么针对这种情况,对于这种生产者消费者的模型下,即使没有新的消息产生,生产者也会一直处于轮训的状态,那么针对这种情况,生产者快于消费者,就白白浪费了大量的主机资源,那么针对这种情况,我们去使用Condition消息通知的机制,更加的合适,在消息没有生产出来之前,消费者处于阻塞状态,一旦消费者消息生产完成之后,通知生产者来消费数据,这样对性能消耗更小;
class Dispatcher():
    def __init__(self):
        self.data=None
        self.cond=threading.Condition(threading.Lock())
    def produce(self): # 生产者
        print("starting...")
        while True:
            time.sleep(0.5) # 模拟生产速度慢
            self.data=random.randint(1,50)
            with self.cond:
                self.cond.notify_all()
    def consume(self):
        print("consume...")
        while True:
            with self.cond:
                self.cond.wait()
            time.sleep(1) # 模拟消费速度快
            print(self.data)

d1=Dispatcher()
consume1=threading.Thread(target=d1.consume,name="consume")
produce=threading.Thread(target=d1.produce,name="produce")
consume1.start()
produce.start()
# consume...
# starting...
# None
# 44
# 36
# 16
# ...
    那么针对上面的代码,其实也有一个非常大的问题,就是当生产大于消费时,我们可以看到上面的代码,一旦出现这种情况,那么之前的数据就会被刷掉,这是不允许的,这个时候往往会加入队列的技术,因为一旦生产速度得到大幅度的提升,那么避免不了的就是生产速度大于消费速度,这个时候我们就需要一个缓冲区,来缓存还没有被消费的历史数据;
class Dispatcher():
    def __init__(self):
        self.queue=queue.Queue()
        self.cond=threading.Condition(threading.Lock())
    def produce(self): # 生产者
        print("starting...")
        while True:
            time.sleep(0.5) # 模拟生产速度快
            with self.cond:
                self.queue.put(random.randint(1,50))
                self.cond.notify_all()

    def consume(self):
        print("consume...")
        while True:
            with self.cond:
                self.cond.wait()
            time.sleep(1) # 模拟消费速度慢
            print(self.queue.get())

d1=Dispatcher()
consume1=threading.Thread(target=d1.consume,name="consume")
produce=threading.Thread(target=d1.produce,name="produce")
consume1.start()
produce.start()
# consume...
# starting...
# 19
# 36
# 45
# ...
    可以看到,我们可以使用notify_all(),来实现广播,当然,我们也可以使用notify()实现单播,或者多播,这个就主要看业务了;

Semaphore信号量

    信号量和Lock很像,但是信号量内部维护着一个倒计算的计数器,每acquire一次都会减少1,当计数器为0,就阻塞请求,直到其他线程对信号量release后,计数器大于0,才能恢复阻塞的线程,所以计数器永远不可能小于0,常用方法如下;
Semaphore(value=1):构造方法,value必须大于0,否则抛出ValueError异常;
acquire(blocking=True,timeout=None):获取信号量,计数器减1,获取成功则返回True;
release():释放信号量,计数器加1;
    如下示例,在创建Semaphore对象时,我们需要给定一个value值,这个值表示,这个锁最多可以acquire多少次,一旦超出这个次数,将阻塞,为了演示,下面设定了blocking=False;
s = threading.Semaphore(value=2)
print(s.acquire(blocking=False))
print(s.acquire(blocking=False))
print(s.acquire(blocking=False))
print(s.__dict__) # 我们也可以通过字典属性的方式查询到目前的value值
# True
# True
# False
# {'_cond': <Condition(<unlocked _thread.lock object at 0x1028c0d78>, 0)>, '_value': 0}

release超界

    通过上面的代码可以看到,acquire的次数是不可以超出预定值的,但是这个Semaphore也存在一个问题,就是它可以release多次,这里的主要的问题是,Semaphore在归还的时候,没有做边界的判断,它可以让你无限release,使得可acquire可以超出预定的value界限,所以它也有它的问题;
    如下,可以很明显的看到,我们最大只可以acquire两次,但是我们在进行release一次之后,竟然可以acquire三次;
s = threading.Semaphore(value=2)
s.release()
print(s._value)
print(s.acquire())
print(s.acquire())
print(s.acquire())
# 3
# True
# True
# True
    如果需要解决这种无限release的问题,我们可以修改成有界信号量,即BoundedSemaphore,将Semaphore更换为BoundedSemaphore即可,其他都不用修改,BoundedSemaphore不允许使用release超出初始值,否则,抛出ValueError异常;

线程共享

    需要知道的是,Semaphore是可以在多个线程中操作同一个对象的,如下,我们新建一个线程进行release一样可以将Semaphore的value值加大;
s = threading.Semaphore(value=2)
threading.Thread(target=lambda s: s.release(), args=(s,)).start()
print(s._value)  # 3

线程池应用场景

    对于Semaphore而言,比较常见的应用场景就是连接池,因为资源有限,且开启一个链接的成本较高,所以就需要用到链接池,假设现在数据库最大并发仅支持5个链接,那么此时如果我们有6个链接同时进来时,那么只有5个链接是可以正常链接到数据库的,而剩下的一个就只能等待,等待这5个链接中某一个断开链接,才能与数据库进行通信,如下就是一个基础版本链接池;
class Conn(): # 链接类
    def __init__(self,name):
        self.name=name

class ConnPool(): # 链接池类
    def __init__(self,count=5):
        self.count=count
        self.conns=[ Conn(x) for x in range(count)] # 初始创建count个链接
    def get_conn(self):
        if len(self.conns) > 0: # 获取链接的方法,最多只能获取count个链接
            return self.conns.pop()
    def return_conn(self,conn_obj):
        if len(self.conns) < self.count: # 归还链接的方法,最多只能归还count个链接
            self.conns.append(conn_obj)
    以上代码有个问题,可能会出现连接数超界的情况,不管是归还链接还是获取链接,在多线程场景下,可能A线程在获取len(self.conns) 是否大于 0,那么B线程也可能同时在获取len(self.conns) 是否大于 0,那么在这种场景下,就可能出现超界的情况,所以为了解决这个问题,我们可以引入Lock;
class Conn(): # 链接类
    def __init__(self,name):
        self.name=name

class ConnPool(): # 链接池类
    def __init__(self,count=5):
        self.count=count
        self.conns=[ Conn(x) for x in range(count)] # 初始创建count个链接
        self.lock=threading.Lock()
    def get_conn(self):
        with self.lock: # 在判断和操作语句之前先加一把锁
            if len(self.conns) > 0: # 获取链接的方法,最多只能获取count个链接
                return self.conns.pop()
    def return_conn(self,conn_obj):
        with self.lock: # 在判断和操作语句之前先加一把锁
            if len(self.conns) < self.count: # 归还链接的方法,最多只能归还count个链接
                self.conns.append(conn_obj)
    其实除了锁,还有更好的实现方式,即Semaphore,它主要就是用来解决这种应用场景的问题的,我们可以将Semaphore的value值设置为最大连接数,一旦有人获取链接,那么就acquire一次,一旦有人归还链接,那么就release一次,假设目前最大支持2个链接,然后当有3个人需要获取链接时,只有前2个能获取到链接,最后一个处于阻塞态,只有等待前两个用户归还了链接之后,第三个才能获取到链接,如下示例;
class Conn(): # 链接类
    def __init__(self,name):
        self.name=name

class ConnPool(): # 链接池类
    def __init__(self,count=2):
        self.count=count
        self.conns=[ Conn(x) for x in range(count)] # 初始创建count个链接
        self.sema=threading.BoundedSemaphore(count) # 最大值允许acquire的次数为count
    def get_conn(self):
        print("开始获取链接")
        self.sema.acquire() # 获取一次acquire一次
        print("获取成功")
        return self.conns.pop()
    def return_conn(self,conn_obj):
        self.conns.append(conn_obj)
        self.sema.release() # 归还一次release一次,需要在归还之后,先归还,后通知确定归还

c=ConnPool()
c.get_conn()
conn_obj=c.get_conn()
c.return_conn(conn_obj) # 归还链接
c.get_conn()

# 可以看到,一共获取了3次链接,我们程序最多只允许获取3次链接,主要原因是因为我们在后面归还了一次
# 开始获取链接
# 获取成功
# 开始获取链接
# 获取成功
# 开始获取链接
# 获取成功

多线程之Queue

    标准版queue模块,提供队列和优先队列,Queue类是线程安全的,适用于多线程间安全交换数据,但是需要注意的是,虽然这个Queue模块是线程安全的,虽然内部使用Lock和Condition保障了线程安全,但是依然不能保证数据的完全安全;
q = queue.Queue(8)

if q.qsize() == 7:
    q.put()  # 多线程场景下,很有可能多个线程同时判断此时等于7,那么就可能多个线程同时向队列里面塞数据
if q.qsize() == 1:
    q.get() # 多线程场景下,很有可能多个线程同时判断此时等于1,然后第一个线程拿走了,第二个线程拿的时候,却拿不到
    所谓的不安全,不是说queue模块本身不安全,queue本身是线程安全的,但是它只保护了数据的存取,但是它没法保证程序员的代码是否能够线程安全,所以多线程场景下上述代码出现问题的几率非常大,所以应该在判断之前,加个锁,然后在操作完成后,释放锁;

GIL全局解释器锁

    在Python中有一个非常著名的锁,GIL全局解释器锁,在解释器这个级别上有一把锁,解释器是解释器进程,所以它是一个进程级别的锁,在这个进程级别加锁主要是用来控制这个进程内部的线程的,GIL是在CPython的解释器进程当中,GIL要求在同一时刻,只能有一个线程在执行字节码,不管有多少棵核心都没用,在单个进程内在同一时刻只能有一个线程能被CPU调度;
    如果有很多线程都需要用CPU,那只能依靠CPU的时间片来进行切换,未被切换到CPU调度的线程,全部阻塞,所以前面说了,在Python中的多线程,都是假并行,因为在同一时刻只有一个线程能够被调度到CPU上,只不过由于CPU的执行效率非常高,在Python多线程的场景下,给了我们一种错觉,认为它是并行的;
    GIL这把锁是进程级别锁,它不能管别的进程,只能管理自己进程内的所有线程,由于Python存在这样的问题,所以它并不是非常适合那种CPU密集型的程序,因为它无法绕开GIL这把全局解释器锁;
    但是如果是IO密集型的,用多线程是非常合适的,因为IO密集型线程一旦访问IO就会主动放弃CPU的执行权,从运行态转为阻塞态,转为阻塞态这个瞬间会立即释放GIL锁,这个时候CPU就会去调度其他的线程,所以IO密集型,我们是看不出来它有什么大的问题的,再加上线程上比较轻量级的,所以在IO密集型时,推荐使用多线程;
    而对于CPU密集型,很可能当前线程连续获得多次GIL这把大锁,那么其他的线程将很少能够获得到GIL锁,有可能一个线程被调度的次数比较多,这是一种不公平的分配法则,本来利用多线程是为了能够提高并发数的,但是这样却变成了单线程了,变成了串行,所以针对CPU密集型的在Python的多线程场景下是不推荐使用的;
    如果一定要使用Python可以使用多进程来进行开发,因为GIL这把锁是进程级别的锁,它只能负责本进程内的内部事务,但是需要知道的是,每个进程内部的所有线程一样要受到GIL的控制,所以我们可以在多进程的场景下使用单线程运行程序,从而就达到了多核CPU的利用,这就是为什么CPU密集型要运行多进程的场景当中的原因,就是要饶开GIL;

串行

    下面的代码就是一个CPU密集计算型的代码,通过传统的串行方式去执行,如下示例
def calc():
    sum=0
    for i in range(90000000):
        sum+=1
start=datetime.datetime.now()
calc()
calc()
calc()
calc()
print((datetime.datetime.now()-start).total_seconds())
# 21.144253

多线程

    下面的代码就是一个CPU密集计算型的代码,通过Python的多线程方式并行执行,如下示例
def calc():
    sum=0
    for i in range(90000000):
        sum+=1
pools=[]
for _ in range(4):
    pools.append(threading.Thread(target=calc))
start=datetime.datetime.now()
for pool in pools:
    pool.start()
for pool in pools:
    pool.join()
print((datetime.datetime.now()-start).total_seconds())
# 21.134625
    此处说明一下,为什么另启一个for循环去join线程不会串行,其实和之前是一样的,只不过之前是启动一个线程然后join,下一个线程才能启动,这样做,就可以让所有的线程先start运行起来,然后一个一个join,至于join的时间,以执行时间最长的线程为准;
总结
    串行代码是单线程程序,所有的calc()依次执行,他们根本就不是并发,在主线程内函数以串行的方式一个一个排队执行,对于多线程代码,calc()在不同的线程中执行,但是由于GIL的存在,线程的执行变成了假并发,虽然是假并发,但是这些线程可以被调度到不同的CPU核心上执行,只不过GIL的限制,同一时刻该进程内只有一个线程能够被CPU调度;
    从两段程序测试结果来看,CPython中多线程根本有任何优势,和一个线程执行时间相当,其主要原因就是因为GIL的存在,这样实际上也就没有用上计算机多核心的优势;

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注